1 package com.freemindcafe.concurrency.sample4;
3 import java.util.ArrayList;
5 import java.util.concurrent.ArrayBlockingQueue;
6 import java.util.concurrent.BlockingQueue;
7 import java.util.concurrent.ExecutorService;
8 import java.util.concurrent.Executors;
9 import java.util.concurrent.LinkedBlockingQueue;
10 import java.util.concurrent.ScheduledExecutorService;
11 import java.util.concurrent.SynchronousQueue;
12 import java.util.concurrent.ThreadPoolExecutor;
13 import java.util.concurrent.TimeUnit;
14 import java.util.function.Consumer;
// Configuration and internal state of the throttling sender.
// NOTE(review): the enclosing class declaration is not visible in this excerpt.

// Capacity of throttlingQueue, i.e. how many RequestTokens may be outstanding at once.
18 private final int maxRequestsPerUnitOfTime;
// Delay amount (with timeUnit) after which a RequestToken is scheduled for removal
// from throttlingQueue.
19 private final int unitOfTime;
// Unit of the unitOfTime delay.
20 private final TimeUnit timeUnit;
// Upper bound on the number of elements drained from incomingElementsQueue per batch.
21 private final int maxElementsPerRequest;
// Callback that receives each drained batch of elements.
22 private final Consumer<List<E>> requestConsumer;
// Bounded queue of tokens; put() on it blocks while it is at capacity.
23 private final BlockingQueue<RequestToken> throttlingQueue;
// Buffer of elements awaiting dispatch; created without an explicit capacity.
24 private final BlockingQueue<E> incomingElementsQueue;
// Executor (one core thread, one max thread) on which ThrottledSenderRunnable is run.
25 private final ExecutorService incomingElementsQueueExecutorService;
// Callback invoked by send() for events that are not enqueued; defaults to a no-op.
// NOTE(review): this field is neither final nor volatile although it is written by the
// setter and read by send() — confirm whether cross-thread visibility matters here.
26 private Consumer<E> rejectedRequestHandler = (e) -> {};
// Shutdown flag, initially false. Its reads and writes are not visible in this excerpt.
27 private volatile boolean shutdown =
false;
/**
 * Creates the sender, allocates its queues and submits the background
 * {@code ThrottledSenderRunnable} to a dedicated executor.
 *
 * @param maxRequestsPerUnitOfTime capacity of the throttling token queue;
 *        {@code ArrayBlockingQueue} rejects values below 1 with
 *        {@code IllegalArgumentException}
 * @param unitOfTime delay amount after which a token is scheduled for removal
 * @param timeUnit unit of {@code unitOfTime}
 * @param maxElementsPerRequest maximum number of elements drained into one batch
 * @param requestConsumer callback that receives each batch
 */
30 public ThrottlingSender(
int maxRequestsPerUnitOfTime,
int unitOfTime, TimeUnit timeUnit,
int maxElementsPerRequest, Consumer<List<E>> requestConsumer){
31 this.maxRequestsPerUnitOfTime = maxRequestsPerUnitOfTime;
32 this.unitOfTime = unitOfTime;
33 this.timeUnit = timeUnit;
34 this.maxElementsPerRequest = maxElementsPerRequest;
35 this.requestConsumer = requestConsumer;
// Bounded: at most maxRequestsPerUnitOfTime tokens can be held at any moment.
36 this.throttlingQueue =
new ArrayBlockingQueue<>(this.maxRequestsPerUnitOfTime);
37 this.incomingElementsQueue =
new LinkedBlockingQueue<E>();
// Exactly one thread; the SynchronousQueue work queue holds no pending tasks, so
// tasks are handed directly to that thread.
38 this.incomingElementsQueueExecutorService =
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>());
// Start the background worker immediately.
39 this.incomingElementsQueueExecutorService.execute(
new ThrottledSenderRunnable());
/**
 * Accepts an event for throttled delivery.
 *
 * NOTE(review): several lines of this method are missing from this excerpt.
 * The visible statements hand the event to {@code rejectedRequestHandler}
 * and put it on {@code incomingElementsQueue}; presumably the rejection
 * path is guarded by the {@code shutdown} flag — confirm against the full source.
 *
 * @param event the element to enqueue
 */
44 public void send(E event) {
// Rejection path: delegate the event to the configured handler.
47 rejectedRequestHandler.accept(event);
// Normal path: buffer the event for the background worker.
50 incomingElementsQueue.put(event);
52 }
catch (InterruptedException e) {
// Restore the interrupt status so callers can observe the interruption.
53 Thread.currentThread().interrupt();
/**
 * Shuts the sender down.
 *
 * NOTE(review): the statements between the method header and the executor
 * shutdown call are missing from this excerpt; only the call below is visible.
 */
57 public void shutdown(){
// Orderly executor shutdown: no new tasks are accepted, the running task is not interrupted.
64 incomingElementsQueueExecutorService.shutdown();
/**
 * Stops the background executor immediately and collects the elements that
 * are still waiting in {@code incomingElementsQueue}.
 *
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably the drained list is returned — confirm.
 */
67 public List<E> shutdownNow(){
// shutdownNow() attempts to stop the running worker (typically by interrupting it).
73 incomingElementsQueueExecutorService.shutdownNow();
74 List<E> allEvents =
new ArrayList<>();
// Move every element still buffered into the result list.
75 incomingElementsQueue.drainTo(allEvents);
/**
 * Background task that batches buffered elements and passes each batch to
 * {@code requestConsumer}, acquiring a slot in {@code throttlingQueue} first.
 *
 * NOTE(review): the {@code run()} header, any enclosing loop and the closing
 * braces are missing from this excerpt. No call to {@code RequestToken.start()}
 * is visible here either — confirm that the token's removal is actually scheduled.
 */
79 private class ThrottledSenderRunnable
implements Runnable{
// Only take a throttling slot when there is something to send.
85 if(!incomingElementsQueue.isEmpty()){
86 RequestToken token =
new RequestToken();
// Blocks while throttlingQueue is at capacity; this is what enforces the rate limit.
87 throttlingQueue.put(token);
89 List<E> elements =
new ArrayList<E>();
// Take at most maxElementsPerRequest elements for this batch.
90 incomingElementsQueue.drainTo(elements, maxElementsPerRequest);
91 requestConsumer.accept(elements);
98 }
catch (InterruptedException e) {
// Restore the interrupt status after an interrupted blocking call.
99 Thread.currentThread().interrupt();
/**
 * Marker object occupying one slot of {@code throttlingQueue}.
 *
 * NOTE(review): parts of this class (including its closing braces) are not
 * visible in this excerpt.
 */
110 private class RequestToken{
// Schedules removal of this token from throttlingQueue after unitOfTime/timeUnit,
// which frees the slot it occupies.
112 private void start(){
// NOTE(review): a new single-thread scheduled executor is created on every call and
// no shutdown of it is visible here — possible thread leak; confirm.
113 ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
114 service.schedule(()->throttlingQueue.remove(
this), unitOfTime, timeUnit);
/**
 * Replaces the callback that {@code send} invokes for rejected events.
 *
 * NOTE(review): the method name contains what looks like a typo
 * ("setrRejected..."); it is left unchanged because it is part of the public API.
 *
 * @param rejectedRequestHandler the new handler; no null check is visible here
 */
120 public void setrRejectedRequestHandler(Consumer<E> rejectedRequestHandler){
121 this.rejectedRequestHandler = rejectedRequestHandler;