1 package com.freemindcafe.concurrency.sample1;
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.HashSet;
8 import java.util.concurrent.CopyOnWriteArrayList;
9 import java.util.concurrent.ExecutorService;
10 import java.util.concurrent.LinkedBlockingQueue;
11 import java.util.concurrent.RejectedExecutionException;
12 import java.util.concurrent.SynchronousQueue;
13 import java.util.concurrent.ThreadPoolExecutor;
14 import java.util.concurrent.TimeUnit;
16 import junit.framework.Assert;
21 public void simple_pool_having_runnable()
throws InterruptedException{
23 List<Integer> list =
new ArrayList<>();
25 Runnable r1 = () -> list.add(1);
26 Runnable r2 = () -> list.add(1);
28 ExecutorService executor =
new ThreadPoolExecutor(1, 1, 0, TimeUnit.HOURS,
new LinkedBlockingQueue<Runnable>());
32 executor.awaitTermination(1, TimeUnit.HOURS);
34 Assert.assertEquals(2, list.stream().mapToInt(Integer::intValue).sum());
41 public void pool_having_multiple_threads_random_behaviour_as_list_is_unsynchronized()
throws InterruptedException{
43 List<Integer> list =
new ArrayList<>();
44 Set<Long> threadIds =
new HashSet<>();
46 Runnable r1 = () -> {list.add(1);threadIds.add(Thread.currentThread().getId());};
47 Runnable r2 = () -> {list.add(1);threadIds.add(Thread.currentThread().getId());};
49 ExecutorService executor =
new ThreadPoolExecutor(5, 100, 0, TimeUnit.HOURS,
new LinkedBlockingQueue<Runnable>());
53 Assert.assertTrue(executor.awaitTermination(1, TimeUnit.HOURS));
55 Assert.assertNotNull(list.get(0));
56 Assert.assertNotNull(list.get(1));
57 Assert.assertEquals(2, list.stream().mapToInt(Integer::intValue).sum());
58 Assert.assertEquals(2, threadIds.size());
64 public void pool_having_multiple_threads_success_as_list_is_copy_on_write()
throws InterruptedException{
66 List<Integer> list =
new CopyOnWriteArrayList<>();
67 Set<Long> threadIds =
new HashSet<>();
69 Runnable r1 = () -> {list.add(1);threadIds.add(Thread.currentThread().getId());};
70 Runnable r2 = () -> {list.add(1);threadIds.add(Thread.currentThread().getId());};
72 ExecutorService executor =
new ThreadPoolExecutor(5, 100, 0, TimeUnit.HOURS,
new LinkedBlockingQueue<Runnable>());
76 Assert.assertTrue(executor.awaitTermination(1, TimeUnit.HOURS));
78 Assert.assertNotNull(list.get(0));
79 Assert.assertNotNull(list.get(1));
80 Assert.assertEquals(2, list.stream().mapToInt(Integer::intValue).sum());
81 Assert.assertEquals(2, threadIds.size());
87 public void pool_having_multiple_threads_success_as_list_is_synchronized()
throws InterruptedException{
89 List<Integer> list = Collections.synchronizedList(
new ArrayList<>());
90 Set<String> threadNames =
new HashSet<>();
92 Runnable r1 = () -> {list.add(1);
try{Thread.sleep(1000);}
catch(Exception ex){}threadNames.add(Thread.currentThread().getName());};
93 Runnable r2 = () -> {list.add(1);threadNames.add(Thread.currentThread().getName());};
95 ExecutorService executor =
new ThreadPoolExecutor(5, 100, 0, TimeUnit.HOURS,
new LinkedBlockingQueue<Runnable>());
99 Assert.assertTrue(executor.awaitTermination(1, TimeUnit.HOURS));
101 Assert.assertNotNull(list.get(0));
102 Assert.assertNotNull(list.get(1));
103 Assert.assertEquals(2, list.stream().mapToInt(Integer::intValue).sum());
104 Assert.assertEquals(2, threadNames.stream().filter(s->s.startsWith(
"pool-")).count());
105 Assert.assertEquals(0, threadNames.stream().filter(s->!s.startsWith(
"pool-")).count());
109 @
org.junit.Test(expected=RejectedExecutionException.class)
110 public void in_absebse_of_buffer_second_task_is_rejected()
throws InterruptedException{
112 List<Integer> list =
new ArrayList<>();
113 Set<Long> threadIds =
new HashSet<>();
115 Runnable r1 = () -> {list.add(1);
try {
117 threadIds.add(Thread.currentThread().getId());
118 }
catch (Exception e) {
122 Runnable r2 = () -> {list.add(1);
try {
124 threadIds.add(Thread.currentThread().getId());
125 }
catch (Exception e) {
130 ExecutorService executor =
new ThreadPoolExecutor(1, 1, 0, TimeUnit.HOURS,
new SynchronousQueue<Runnable>());
131 executor.execute(r1);
132 executor.execute(r2);
134 Assert.assertTrue(executor.awaitTermination(1, TimeUnit.HOURS));
136 Assert.assertEquals(2, list.stream().mapToInt(Integer::intValue).sum());
137 Assert.assertEquals(1, threadIds.size());
142 public void rejected_task_runs_in_the_caller_thread()
throws InterruptedException{
144 List<Integer> list =
new ArrayList<>();
145 Set<String> threadNames =
new HashSet<>();
147 Runnable r1 = () -> {list.add(1);
try {
149 threadNames.add(Thread.currentThread().getName());
150 }
catch (Exception e) {
154 Runnable r2 = () -> {list.add(1);
try {
156 threadNames.add(Thread.currentThread().getName());
157 }
catch (Exception e) {
162 ExecutorService executor =
new ThreadPoolExecutor(1, 1, 0, TimeUnit.HOURS,
new SynchronousQueue<Runnable>(), (r, t)->r.run());
163 executor.execute(r1);
164 executor.execute(r2);
166 Assert.assertTrue(executor.awaitTermination(1, TimeUnit.HOURS));
168 Assert.assertEquals(2, list.stream().mapToInt(Integer::intValue).sum());
169 Assert.assertEquals(2, threadNames.size());
170 Assert.assertEquals(1, threadNames.stream().filter(s->s.startsWith(
"pool-")).count());
171 Assert.assertEquals(1, threadNames.stream().filter(s->!s.startsWith(
"pool-")).count());