Samples JDK
Test.java
1 package com.freemindcafe.concurrency.sample1;
2 
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.HashSet;
6 import java.util.List;
7 import java.util.Set;
8 import java.util.concurrent.CopyOnWriteArrayList;
9 import java.util.concurrent.ExecutorService;
10 import java.util.concurrent.LinkedBlockingQueue;
11 import java.util.concurrent.RejectedExecutionException;
12 import java.util.concurrent.SynchronousQueue;
13 import java.util.concurrent.ThreadPoolExecutor;
14 import java.util.concurrent.TimeUnit;
15 
16 import junit.framework.Assert;
17 
18 public class Test {
19 
20  @org.junit.Test
21  public void simple_pool_having_runnable() throws InterruptedException{
22 
23  List<Integer> list = new ArrayList<>();
24 
25  Runnable r1 = () -> list.add(1);
26  Runnable r2 = () -> list.add(1);
27 
28  ExecutorService executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
29  executor.execute(r1);
30  executor.execute(r2);
31  executor.shutdown();
32  executor.awaitTermination(1, TimeUnit.HOURS);
33 
34  Assert.assertEquals(2, list.stream().mapToInt(Integer::intValue).sum());
35 
36  }
37 
38  //If you run it multiple times (4-5), it will fail in one or two runs.
39  //We should not use non thread safe collections.
40  @org.junit.Test
41  public void pool_having_multiple_threads_random_behaviour_as_list_is_unsynchronized() throws InterruptedException{
42 
43  List<Integer> list = new ArrayList<>();
44  Set<Long> threadIds = new HashSet<>();
45 
46  Runnable r1 = () -> {list.add(1);threadIds.add(Thread.currentThread().getId());};
47  Runnable r2 = () -> {list.add(1);threadIds.add(Thread.currentThread().getId());};
48 
49  ExecutorService executor = new ThreadPoolExecutor(5, 100, 0, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
50  executor.execute(r1);
51  executor.execute(r2);
52  executor.shutdown();
53  Assert.assertTrue(executor.awaitTermination(1, TimeUnit.HOURS));
54 
55  Assert.assertNotNull(list.get(0));
56  Assert.assertNotNull(list.get(1));
57  Assert.assertEquals(2, list.stream().mapToInt(Integer::intValue).sum());
58  Assert.assertEquals(2, threadIds.size());
59 
60  }
61 
62  //Using thread safe collection
63  @org.junit.Test
64  public void pool_having_multiple_threads_success_as_list_is_copy_on_write() throws InterruptedException{
65 
66  List<Integer> list = new CopyOnWriteArrayList<>();
67  Set<Long> threadIds = new HashSet<>();
68 
69  Runnable r1 = () -> {list.add(1);threadIds.add(Thread.currentThread().getId());};
70  Runnable r2 = () -> {list.add(1);threadIds.add(Thread.currentThread().getId());};
71 
72  ExecutorService executor = new ThreadPoolExecutor(5, 100, 0, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
73  executor.execute(r1);
74  executor.execute(r2);
75  executor.shutdown();
76  Assert.assertTrue(executor.awaitTermination(1, TimeUnit.HOURS));
77 
78  Assert.assertNotNull(list.get(0));
79  Assert.assertNotNull(list.get(1));
80  Assert.assertEquals(2, list.stream().mapToInt(Integer::intValue).sum());
81  Assert.assertEquals(2, threadIds.size());
82 
83  }
84 
85  //Thread safe collection
86  @org.junit.Test
87  public void pool_having_multiple_threads_success_as_list_is_synchronized() throws InterruptedException{
88 
89  List<Integer> list = Collections.synchronizedList(new ArrayList<>());
90  Set<String> threadNames = new HashSet<>();
91 
92  Runnable r1 = () -> {list.add(1);try{Thread.sleep(1000);}catch(Exception ex){}threadNames.add(Thread.currentThread().getName());};
93  Runnable r2 = () -> {list.add(1);threadNames.add(Thread.currentThread().getName());};
94 
95  ExecutorService executor = new ThreadPoolExecutor(5, 100, 0, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
96  executor.execute(r1);
97  executor.execute(r2);
98  executor.shutdown();
99  Assert.assertTrue(executor.awaitTermination(1, TimeUnit.HOURS));
100 
101  Assert.assertNotNull(list.get(0));
102  Assert.assertNotNull(list.get(1));
103  Assert.assertEquals(2, list.stream().mapToInt(Integer::intValue).sum());
104  Assert.assertEquals(2, threadNames.stream().filter(s->s.startsWith("pool-")).count());
105  Assert.assertEquals(0, threadNames.stream().filter(s->!s.startsWith("pool-")).count());
106 
107  }
108 
109  @org.junit.Test(expected=RejectedExecutionException.class)
110  public void in_absebse_of_buffer_second_task_is_rejected() throws InterruptedException{
111 
112  List<Integer> list = new ArrayList<>();
113  Set<Long> threadIds = new HashSet<>();
114 
115  Runnable r1 = () -> {list.add(1);try {
116  Thread.sleep(2000);
117  threadIds.add(Thread.currentThread().getId());
118  } catch (Exception e) {
119  // TODO Auto-generated catch block
120  e.printStackTrace();
121  }};
122  Runnable r2 = () -> {list.add(1);try {
123  Thread.sleep(2000);
124  threadIds.add(Thread.currentThread().getId());
125  } catch (Exception e) {
126  // TODO Auto-generated catch block
127  e.printStackTrace();
128  }};
129 
130  ExecutorService executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.HOURS, new SynchronousQueue<Runnable>());
131  executor.execute(r1);
132  executor.execute(r2);
133  executor.shutdown();
134  Assert.assertTrue(executor.awaitTermination(1, TimeUnit.HOURS));
135 
136  Assert.assertEquals(2, list.stream().mapToInt(Integer::intValue).sum());
137  Assert.assertEquals(1, threadIds.size());
138 
139  }
140 
141  @org.junit.Test
142  public void rejected_task_runs_in_the_caller_thread() throws InterruptedException{
143 
144  List<Integer> list = new ArrayList<>();
145  Set<String> threadNames = new HashSet<>();
146 
147  Runnable r1 = () -> {list.add(1);try {
148  Thread.sleep(2000);
149  threadNames.add(Thread.currentThread().getName());
150  } catch (Exception e) {
151  // TODO Auto-generated catch block
152  e.printStackTrace();
153  }};
154  Runnable r2 = () -> {list.add(1);try {
155  Thread.sleep(2000);
156  threadNames.add(Thread.currentThread().getName());
157  } catch (Exception e) {
158  // TODO Auto-generated catch block
159  e.printStackTrace();
160  }};
161 
162  ExecutorService executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.HOURS, new SynchronousQueue<Runnable>(), (r, t)->r.run());
163  executor.execute(r1);
164  executor.execute(r2);
165  executor.shutdown();
166  Assert.assertTrue(executor.awaitTermination(1, TimeUnit.HOURS));
167 
168  Assert.assertEquals(2, list.stream().mapToInt(Integer::intValue).sum());
169  Assert.assertEquals(2, threadNames.size());
170  Assert.assertEquals(1, threadNames.stream().filter(s->s.startsWith("pool-")).count());
171  Assert.assertEquals(1, threadNames.stream().filter(s->!s.startsWith("pool-")).count());
172 
173  }
174 
175 }