1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.Executor;
23 import java.util.concurrent.RunnableFuture;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.htrace.Trace;
29
30
31
32
33
34
35
36 @InterfaceAudience.Private
37 public class ResultBoundedCompletionService<V> {
38 private final RpcRetryingCallerFactory retryingCallerFactory;
39 private final Executor executor;
40 private final QueueingFuture<V>[] tasks;
41 private volatile QueueingFuture<V> completed = null;
42 private volatile boolean cancelled = false;
43
44 class QueueingFuture<T> implements RunnableFuture<T> {
45 private final RetryingCallable<T> future;
46 private T result = null;
47 private ExecutionException exeEx = null;
48 private volatile boolean cancelled = false;
49 private final int callTimeout;
50 private final RpcRetryingCaller<T> retryingCaller;
51 private boolean resultObtained = false;
52
53
54 public QueueingFuture(RetryingCallable<T> future, int callTimeout) {
55 this.future = future;
56 this.callTimeout = callTimeout;
57 this.retryingCaller = retryingCallerFactory.<T>newCaller();
58 }
59
60 @SuppressWarnings("unchecked")
61 @Override
62 public void run() {
63 try {
64 if (!cancelled) {
65 result = this.retryingCaller.callWithRetries(future, callTimeout);
66 resultObtained = true;
67 }
68 } catch (Throwable t) {
69 exeEx = new ExecutionException(t);
70 } finally {
71 synchronized (tasks) {
72
73 if (!cancelled && completed == null) {
74 completed = (QueueingFuture<V>) QueueingFuture.this;
75 }
76
77
78
79 tasks.notify();
80 }
81 }
82 }
83 @Override
84 public boolean cancel(boolean mayInterruptIfRunning) {
85 if (resultObtained || exeEx != null) return false;
86 retryingCaller.cancel();
87 if (future instanceof Cancellable) ((Cancellable)future).cancel();
88 cancelled = true;
89 return true;
90 }
91
92 @Override
93 public boolean isCancelled() {
94 return cancelled;
95 }
96
97 @Override
98 public boolean isDone() {
99 return resultObtained || exeEx != null;
100 }
101
102 @Override
103 public T get() throws InterruptedException, ExecutionException {
104 try {
105 return get(1000, TimeUnit.DAYS);
106 } catch (TimeoutException e) {
107 throw new RuntimeException("You did wait for 1000 days here?", e);
108 }
109 }
110
111 @Override
112 public T get(long timeout, TimeUnit unit)
113 throws InterruptedException, ExecutionException, TimeoutException {
114 synchronized (tasks) {
115 if (resultObtained) {
116 return result;
117 }
118 if (exeEx != null) {
119 throw exeEx;
120 }
121 unit.timedWait(tasks, timeout);
122 }
123 if (resultObtained) {
124 return result;
125 }
126 if (exeEx != null) {
127 throw exeEx;
128 }
129
130 throw new TimeoutException("timeout=" + timeout + ", " + unit);
131 }
132 }
133
134 @SuppressWarnings("unchecked")
135 public ResultBoundedCompletionService(
136 RpcRetryingCallerFactory retryingCallerFactory, Executor executor,
137 int maxTasks) {
138 this.retryingCallerFactory = retryingCallerFactory;
139 this.executor = executor;
140 this.tasks = new QueueingFuture[maxTasks];
141 }
142
143
144 public void submit(RetryingCallable<V> task, int callTimeout, int id) {
145 QueueingFuture<V> newFuture = new QueueingFuture<V>(task, callTimeout);
146 executor.execute(Trace.wrap(newFuture));
147 tasks[id] = newFuture;
148 }
149
150 public QueueingFuture<V> take() throws InterruptedException {
151 synchronized (tasks) {
152 while (completed == null && !cancelled) tasks.wait();
153 }
154 return completed;
155 }
156
157 public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
158 synchronized (tasks) {
159 if (completed == null && !cancelled) unit.timedWait(tasks, timeout);
160 }
161 return completed;
162 }
163
164 public void cancelAll() {
165
166 synchronized (tasks) {
167 cancelled = true;
168 }
169 for (QueueingFuture<V> future : tasks) {
170 if (future != null) future.cancel(true);
171 }
172 }
173 }