1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefore;
22
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.util.HashSet;
26 import java.util.Set;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.CancellationException;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.TimeoutException;
34 import java.util.concurrent.atomic.AtomicBoolean;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.hbase.classification.InterfaceAudience;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.HRegionInfo;
41 import org.apache.hadoop.hbase.RegionLocations;
42 import org.apache.hadoop.hbase.TableName;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.hadoop.hbase.util.Pair;
45
46 import com.google.common.annotations.VisibleForTesting;
47
48
49
50
51
52
53
54
55
56
57
58
59
60 @InterfaceAudience.Private
61 class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
62 private static final Log LOG = LogFactory.getLog(ScannerCallableWithReplicas.class);
63 volatile ScannerCallable currentScannerCallable;
64 AtomicBoolean replicaSwitched = new AtomicBoolean(false);
65 final ClusterConnection cConnection;
66 protected final ExecutorService pool;
67 protected final int timeBeforeReplicas;
68 private final Scan scan;
69 private final int retries;
70 private Result lastResult;
71 private final RpcRetryingCaller<Result[]> caller;
72 private final TableName tableName;
73 private Configuration conf;
74 private int scannerTimeout;
75 private Set<ScannerCallable> outstandingCallables = new HashSet<ScannerCallable>();
76 private boolean someRPCcancelled = false;
77
78 public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
79 ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
80 int retries, int scannerTimeout, int caching, Configuration conf,
81 RpcRetryingCaller<Result []> caller) {
82 this.currentScannerCallable = baseCallable;
83 this.cConnection = cConnection;
84 this.pool = pool;
85 if (timeBeforeReplicas < 0) {
86 throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
87 }
88 this.timeBeforeReplicas = timeBeforeReplicas;
89 this.scan = scan;
90 this.retries = retries;
91 this.tableName = tableName;
92 this.conf = conf;
93 this.scannerTimeout = scannerTimeout;
94 this.caller = caller;
95 }
96
97 public void setClose() {
98 currentScannerCallable.setClose();
99 }
100
101 public void setRenew(boolean val) {
102 currentScannerCallable.setRenew(val);
103 }
104
105 public void setCaching(int caching) {
106 currentScannerCallable.setCaching(caching);
107 }
108
109 public int getCaching() {
110 return currentScannerCallable.getCaching();
111 }
112
113 public HRegionInfo getHRegionInfo() {
114 return currentScannerCallable.getHRegionInfo();
115 }
116
117 public boolean getServerHasMoreResults() {
118 return currentScannerCallable.getServerHasMoreResults();
119 }
120
121 public void setServerHasMoreResults(boolean serverHasMoreResults) {
122 currentScannerCallable.setServerHasMoreResults(serverHasMoreResults);
123 }
124
125 public boolean hasMoreResultsContext() {
126 return currentScannerCallable.hasMoreResultsContext();
127 }
128
129 public void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
130 currentScannerCallable.setHasMoreResultsContext(serverHasMoreResultsContext);
131 }
132
133 @Override
134 public Result [] call(int timeout) throws IOException {
135
136
137
138
139 if (currentScannerCallable != null && currentScannerCallable.closed) {
140
141
142 if (LOG.isTraceEnabled()) {
143 LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId);
144 }
145 Result[] r = currentScannerCallable.call(timeout);
146 currentScannerCallable = null;
147 return r;
148 }
149
150
151
152
153
154
155
156 RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
157 RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
158 currentScannerCallable.getRow());
159
160
161
162 ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
163 new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>(
164 RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool,
165 rl.size() * 5);
166
167 AtomicBoolean done = new AtomicBoolean(false);
168 replicaSwitched.set(false);
169
170 addCallsForCurrentReplica(cs, rl);
171
172 try {
173
174 Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
175 TimeUnit.MICROSECONDS);
176 if (f != null) {
177 Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
178 if (r != null && r.getSecond() != null) {
179 updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
180 }
181 return r == null ? null : r.getFirst();
182 }
183 } catch (ExecutionException e) {
184 RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
185 } catch (CancellationException e) {
186 throw new InterruptedIOException(e.getMessage());
187 } catch (InterruptedException e) {
188 throw new InterruptedIOException(e.getMessage());
189 } catch (TimeoutException e) {
190 throw new InterruptedIOException(e.getMessage());
191 }
192
193
194
195 addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
196
197 try {
198 Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeout, TimeUnit.MILLISECONDS);
199 if (f != null) {
200 Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
201 if (r != null && r.getSecond() != null) {
202 updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
203 }
204 return r == null ? null : r.getFirst();
205 } else {
206 throw new IOException("Failed to get result within timeout, timeout="
207 + timeout + "ms");
208 }
209 } catch (ExecutionException e) {
210 RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
211 } catch (CancellationException e) {
212 throw new InterruptedIOException(e.getMessage());
213 } catch (InterruptedException e) {
214 throw new InterruptedIOException(e.getMessage());
215 } catch (TimeoutException e) {
216 throw new InterruptedIOException(e.getMessage());
217 } finally {
218
219
220 cs.cancelAll();
221 }
222 LOG.error("Imposible? Arrive at an unreachable line...");
223 throw new IOException("Imposible? Arrive at an unreachable line...");
224 }
225
226 private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
227 AtomicBoolean done, ExecutorService pool) {
228 if (done.compareAndSet(false, true)) {
229 if (currentScannerCallable != scanner) replicaSwitched.set(true);
230 currentScannerCallable = scanner;
231
232 if (result != null && result.length != 0) this.lastResult = result[result.length - 1];
233 if (LOG.isTraceEnabled()) {
234 LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId +
235 " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId());
236 }
237
238 outstandingCallables.remove(scanner);
239 for (ScannerCallable s : outstandingCallables) {
240 if (LOG.isTraceEnabled()) {
241 LOG.trace("Closing scanner id=" + s.scannerId +
242 ", replica=" + s.getHRegionInfo().getRegionId() +
243 " because slow and replica=" +
244 this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded");
245 }
246
247
248
249 s.setClose();
250 final RetryingRPC r = new RetryingRPC(s);
251 pool.submit(new Callable<Void>(){
252 @Override
253 public Void call() throws Exception {
254 r.call(scannerTimeout);
255 return null;
256 }
257 });
258 }
259
260 outstandingCallables.clear();
261 }
262 }
263
264
265
266
267
268
269 public boolean switchedToADifferentReplica() {
270 return replicaSwitched.get();
271 }
272
273
274
275
276
277
278
279 public boolean isHeartbeatMessage() {
280 return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
281 }
282
283 private void addCallsForCurrentReplica(
284 ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
285 RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
286 outstandingCallables.add(currentScannerCallable);
287 cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
288 }
289
290 private void addCallsForOtherReplicas(
291 ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
292 int min, int max) {
293 if (scan.getConsistency() == Consistency.STRONG) {
294 return;
295 }
296 for (int id = min; id <= max; id++) {
297 if (currentScannerCallable.id == id) {
298 continue;
299 }
300 ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
301 setStartRowForReplicaCallable(s);
302 outstandingCallables.add(s);
303 RetryingRPC retryingOnReplica = new RetryingRPC(s);
304 cs.submit(retryingOnReplica, scannerTimeout, id);
305 }
306 }
307
308
309
310
311
312 private void setStartRowForReplicaCallable(ScannerCallable callable) {
313 if (this.lastResult == null || callable == null) return;
314
315 if (this.lastResult.isPartial()) {
316
317
318
319
320 callable.getScan().setStartRow(this.lastResult.getRow());
321 } else {
322
323
324
325 if (callable.getScan().isReversed()) {
326 callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
327 } else {
328 callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
329 }
330 }
331 }
332
333 @VisibleForTesting
334 boolean isAnyRPCcancelled() {
335 return someRPCcancelled;
336 }
337
338 class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
339 final ScannerCallable callable;
340 RpcRetryingCaller<Result[]> caller;
341 private volatile boolean cancelled = false;
342
343 RetryingRPC(ScannerCallable callable) {
344 this.callable = callable;
345
346
347
348
349
350 this.caller = ScannerCallableWithReplicas.this.caller;
351 if (scan.getConsistency() == Consistency.TIMELINE) {
352 this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf)
353 .<Result[]>newCaller();
354 }
355 }
356
357 @Override
358 public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
359
360
361 if (cancelled) {
362 return null;
363 }
364 Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
365 return new Pair<Result[], ScannerCallable>(res, this.callable);
366 }
367
368 @Override
369 public void prepare(boolean reload) throws IOException {
370 if (cancelled) return;
371
372 if (Thread.interrupted()) {
373 throw new InterruptedIOException();
374 }
375
376 callable.prepare(reload);
377 }
378
379 @Override
380 public void throwable(Throwable t, boolean retrying) {
381 callable.throwable(t, retrying);
382 }
383
384 @Override
385 public String getExceptionMessageAdditionalDetail() {
386 return callable.getExceptionMessageAdditionalDetail();
387 }
388
389 @Override
390 public long sleep(long pause, int tries) {
391 return callable.sleep(pause, tries);
392 }
393
394 @Override
395 public void cancel() {
396 cancelled = true;
397 caller.cancel();
398 if (callable.getController() != null) {
399 callable.getController().startCancel();
400 }
401 someRPCcancelled = true;
402 }
403
404 @Override
405 public boolean isCancelled() {
406 return cancelled;
407 }
408 }
409
410 @Override
411 public void prepare(boolean reload) throws IOException {
412 }
413
414 @Override
415 public void throwable(Throwable t, boolean retrying) {
416 currentScannerCallable.throwable(t, retrying);
417 }
418
419 @Override
420 public String getExceptionMessageAdditionalDetail() {
421 return currentScannerCallable.getExceptionMessageAdditionalDetail();
422 }
423
424 @Override
425 public long sleep(long pause, int tries) {
426 return currentScannerCallable.sleep(pause, tries);
427 }
428 }