Samples JDK
ThrottlingSender.java
1 package com.freemindcafe.concurrency.sample4;
2 
3 import java.util.ArrayList;
4 import java.util.List;
5 import java.util.concurrent.ArrayBlockingQueue;
6 import java.util.concurrent.BlockingQueue;
7 import java.util.concurrent.ExecutorService;
8 import java.util.concurrent.Executors;
9 import java.util.concurrent.LinkedBlockingQueue;
10 import java.util.concurrent.ScheduledExecutorService;
11 import java.util.concurrent.SynchronousQueue;
12 import java.util.concurrent.ThreadPoolExecutor;
13 import java.util.concurrent.TimeUnit;
14 import java.util.function.Consumer;
15 
16 public class ThrottlingSender<E> implements ISender<E> {
17 
18  private final int maxRequestsPerUnitOfTime;
19  private final int unitOfTime;
20  private final TimeUnit timeUnit;
21  private final int maxElementsPerRequest;
22  private final Consumer<List<E>> requestConsumer;
23  private final BlockingQueue<RequestToken> throttlingQueue;
24  private final BlockingQueue<E> incomingElementsQueue;
25  private final ExecutorService incomingElementsQueueExecutorService;
26  private Consumer<E> rejectedRequestHandler = (e) -> {};
27  private volatile boolean shutdown = false;
28 
29 
30  public ThrottlingSender(int maxRequestsPerUnitOfTime, int unitOfTime, TimeUnit timeUnit, int maxElementsPerRequest, Consumer<List<E>> requestConsumer){
31  this.maxRequestsPerUnitOfTime = maxRequestsPerUnitOfTime;
32  this.unitOfTime = unitOfTime;
33  this.timeUnit = timeUnit;
34  this.maxElementsPerRequest = maxElementsPerRequest;
35  this.requestConsumer = requestConsumer;
36  this.throttlingQueue = new ArrayBlockingQueue<>(this.maxRequestsPerUnitOfTime);
37  this.incomingElementsQueue = new LinkedBlockingQueue<E>(); //unbounded thread-safe, we don't want to discard any requests
38  this.incomingElementsQueueExecutorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
39  this.incomingElementsQueueExecutorService.execute(new ThrottledSenderRunnable());
40  }
41 
42 
43  @Override
44  public void send(E event) {
45  try {
46  if(shutdown){
47  rejectedRequestHandler.accept(event);
48  }
49  else{
50  incomingElementsQueue.put(event);
51  }
52  } catch (InterruptedException e) {
53  Thread.currentThread().interrupt();
54  }
55  }
56 
57  public void shutdown(){
58  //set shutdown flag
59  //Reject further work
60  //Call shutdown on thread pool
61  //Wait till queue is empty
62  //Exit then
63  shutdown = true;
64  incomingElementsQueueExecutorService.shutdown();
65  }
66 
67  public List<E> shutdownNow(){
68  //set shutdown flag
69  //Reject further work
70  //Exit
71  //Return all pending events
72  shutdown = true;
73  incomingElementsQueueExecutorService.shutdownNow();
74  List<E> allEvents = new ArrayList<>();
75  incomingElementsQueue.drainTo(allEvents);
76  return allEvents;
77  }
78 
79  private class ThrottledSenderRunnable implements Runnable{
80 
81  @Override
82  public void run() {
83  while(true){
84  try {
85  if(!incomingElementsQueue.isEmpty()){
86  RequestToken token = new RequestToken();
87  throttlingQueue.put(token);
88  token.start();
89  List<E> elements = new ArrayList<E>();
90  incomingElementsQueue.drainTo(elements, maxElementsPerRequest);
91  requestConsumer.accept(elements);
92  }
93  else{
94  if(shutdown){
95  break;
96  }
97  }
98  } catch (InterruptedException e) {
99  Thread.currentThread().interrupt();
100  //In case of interrupt, come out of the loop
101  //As the thread pool executor will interrupt the thread in case of shutdown
102  break;
103  }
104  }
105 
106  }
107 
108  }
109 
110  private class RequestToken{
111 
112  private void start(){
113  ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
114  service.schedule(()->throttlingQueue.remove(this), unitOfTime, timeUnit);
115 
116  }
117 
118  }
119 
120  public void setrRejectedRequestHandler(Consumer<E> rejectedRequestHandler){
121  this.rejectedRequestHandler = rejectedRequestHandler;
122  }
123 
124 }