1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.util;
21
22
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.concurrent.ArrayBlockingQueue;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.FutureTask;
31 import java.util.concurrent.TimeUnit;
32
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34
35
36
37
38
39
40
41 @InterfaceAudience.Private
42 public class BoundedCompletionService<V> {
43 private final Executor executor;
44 private final List<Future<V>> tasks;
45 private final BlockingQueue<Future<V>> completed;
46
47 class QueueingFuture extends FutureTask<V> {
48
49 public QueueingFuture(Callable<V> callable) {
50 super(callable);
51 }
52
53 @Override
54 protected void done() {
55 completed.add(QueueingFuture.this);
56 }
57 }
58
59 public BoundedCompletionService(Executor executor, int maxTasks) {
60 this.executor = executor;
61 this.tasks = new ArrayList<Future<V>>(maxTasks);
62 this.completed = new ArrayBlockingQueue<Future<V>>(maxTasks);
63 }
64
65
66 public Future<V> submit(Callable<V> task) {
67 QueueingFuture newFuture = new QueueingFuture(task);
68 executor.execute(newFuture);
69 tasks.add(newFuture);
70 return newFuture;
71 }
72
73 public Future<V> take() throws InterruptedException{
74 return completed.take();
75 }
76
77 public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException{
78 return completed.poll(timeout, unit);
79 }
80
81 public void cancelAll(boolean interrupt) {
82 for (Future<V> future : tasks) {
83 future.cancel(interrupt);
84 }
85 }
86 }