1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.concurrent.ExecutorService;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.KeyValue.MetaComparator;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.Cell;
34 import org.apache.hadoop.hbase.CellComparator;
35 import org.apache.hadoop.hbase.CellUtil;
36 import org.apache.hadoop.hbase.DoNotRetryIOException;
37 import org.apache.hadoop.hbase.HBaseConfiguration;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.HRegionInfo;
40 import org.apache.hadoop.hbase.NotServingRegionException;
41 import org.apache.hadoop.hbase.TableName;
42 import org.apache.hadoop.hbase.UnknownScannerException;
43 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
44 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
45 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
46 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
47 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
48 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
49 import org.apache.hadoop.hbase.util.Bytes;
50
51 import com.google.common.annotations.VisibleForTesting;
52
53
54
55
56
57
58 @InterfaceAudience.Private
59 public class ClientScanner extends AbstractClientScanner {
// Per-instance logger, named after the runtime class of this scanner.
60 private final Log LOG = LogFactory.getLog(this.getClass());
61
62
// Suffix appended (or returned as-is for an empty row) by createClosestRowBefore().
63 static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
// The scan being executed; its start row is rewritten as the scanner moves on or restarts.
64 protected Scan scan;
// Set by close(); once true, next() returns null as soon as the cache is drained.
65 protected boolean closed = false;
66
67
// Region currently being scanned; null before the first region is opened and after a
// recoverable failure in loadCache() resets the scanner.
68 protected HRegionInfo currentRegion = null;
// Callable bound to the currently open region scanner; null when none is open.
69 protected ScannerCallableWithReplicas callable = null;
// Results already fetched and waiting to be handed out by next().
70 protected final LinkedList<Result> cache = new LinkedList<Result>();
71
72
73
74
75
// Partial results of a single row, accumulated until the row can be emitted as one Result.
76 protected final LinkedList<Result> partialResults = new LinkedList<Result>();
77
78
79
80
81
// Row shared by everything in partialResults; null while partialResults is empty.
82 protected byte[] partialResultsRow = null;
83
84
85
// Last cell put into the cache while results are partial or batched; filterLoadedCell()
// uses it to drop cells that were already delivered. Null otherwise.
86 protected Cell lastCellLoadedToCache = null;
// Number of rows requested per fetch (scan setting, else the configured default).
87 protected final int caching;
// Wall-clock millis of the most recent fetch; feeds the "time between nexts" metric.
88 protected long lastNext;
89
// Last Result added to the cache; used to compute the restart row after a failure.
90 protected Result lastResult = null;
// Byte budget for a single loadCache() round (scan setting, else the configured default).
91 protected final long maxScannerResultSize;
92 private final ClusterConnection connection;
93 private final TableName tableName;
// Timeout handed to the caller for every scanner RPC.
94 protected final int scannerTimeout;
// True once writeScanMetrics() has stored the metrics on the scan.
95 protected boolean scanMetricsPublished = false;
96 protected RpcRetryingCaller<Result []> caller;
97 protected RpcControllerFactory rpcControllerFactory;
98 protected Configuration conf;
99
100
101
102
103
// Passed through to ScannerCallableWithReplicas when a region scanner is created.
104 protected final int primaryOperationTimeout;
// Value of HConstants.HBASE_CLIENT_RETRIES_NUMBER read at construction time.
105 private int retries;
106 protected final ExecutorService pool;
// Row comparator used by compare() when the current region is a meta region.
107 private static MetaComparator metaComparator = new MetaComparator();
108
109
110
111
112
113
114
115
116
117
118 public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
119 ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
120 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
121 throws IOException {
122 if (LOG.isTraceEnabled()) {
123 LOG.trace("Scan table=" + tableName
124 + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
125 }
126 this.scan = scan;
127 this.tableName = tableName;
128 this.lastNext = System.currentTimeMillis();
129 this.connection = connection;
130 this.pool = pool;
131 this.primaryOperationTimeout = primaryOperationTimeout;
132 this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
133 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
134 if (scan.getMaxResultSize() > 0) {
135 this.maxScannerResultSize = scan.getMaxResultSize();
136 } else {
137 this.maxScannerResultSize = conf.getLong(
138 HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
139 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
140 }
141 this.scannerTimeout = HBaseConfiguration.getInt(conf,
142 HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
143 HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
144 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
145
146
147 initScanMetrics(scan);
148
149
150 if (this.scan.getCaching() > 0) {
151 this.caching = this.scan.getCaching();
152 } else {
153 this.caching = conf.getInt(
154 HConstants.HBASE_CLIENT_SCANNER_CACHING,
155 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
156 }
157
158 this.caller = rpcFactory.<Result[]> newCaller();
159 this.rpcControllerFactory = controllerFactory;
160
161 this.conf = conf;
162 initializeScannerInConstruction();
163 }
164
165 protected void initializeScannerInConstruction() throws IOException{
166
167 nextScanner(this.caching, false);
168 }
169
170 protected ClusterConnection getConnection() {
171 return this.connection;
172 }
173
174
175
176
177
178
179
180 @Deprecated
181 protected byte [] getTableName() {
182 return this.tableName.getName();
183 }
184
185 protected TableName getTable() {
186 return this.tableName;
187 }
188
189 protected int getRetries() {
190 return this.retries;
191 }
192
193 protected int getScannerTimeout() {
194 return this.scannerTimeout;
195 }
196
197 protected Configuration getConf() {
198 return this.conf;
199 }
200
201 protected Scan getScan() {
202 return scan;
203 }
204
205 protected ExecutorService getPool() {
206 return pool;
207 }
208
209 protected int getPrimaryOperationTimeout() {
210 return primaryOperationTimeout;
211 }
212
213 protected int getCaching() {
214 return caching;
215 }
216
217 protected long getTimestamp() {
218 return lastNext;
219 }
220
221 @VisibleForTesting
222 protected long getMaxResultSize() {
223 return maxScannerResultSize;
224 }
225
226
227 protected boolean checkScanStopRow(final byte [] endKey) {
228 if (this.scan.getStopRow().length > 0) {
229
230 byte [] stopRow = scan.getStopRow();
231 int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
232 endKey, 0, endKey.length);
233 if (cmp <= 0) {
234
235
236 return true;
237 }
238 }
239 return false;
240 }
241
242 private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException {
243
244
245
246
247 if (callable != null && callable.switchedToADifferentReplica()) return true;
248 return nextScanner(nbRows, done);
249 }
250
251
252
253
254
255
256
257
258
259
260 protected boolean nextScanner(int nbRows, final boolean done)
261 throws IOException {
262
263 if (this.callable != null) {
264 this.callable.setClose();
265 call(callable, caller, scannerTimeout);
266 this.callable = null;
267 }
268
269
270 byte [] localStartKey;
271
272
273 if (this.currentRegion != null) {
274 byte [] endKey = this.currentRegion.getEndKey();
275 if (endKey == null ||
276 Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
277 checkScanStopRow(endKey) ||
278 done) {
279 close();
280 if (LOG.isTraceEnabled()) {
281 LOG.trace("Finished " + this.currentRegion);
282 }
283 return false;
284 }
285 localStartKey = endKey;
286 if (LOG.isTraceEnabled()) {
287 LOG.trace("Finished " + this.currentRegion);
288 }
289 } else {
290 localStartKey = this.scan.getStartRow();
291 }
292
293 if (LOG.isDebugEnabled() && this.currentRegion != null) {
294
295 LOG.debug("Advancing internal scanner to startKey at '" +
296 Bytes.toStringBinary(localStartKey) + "'");
297 }
298 try {
299 callable = getScannerCallable(localStartKey, nbRows);
300
301
302 call(callable, caller, scannerTimeout);
303 this.currentRegion = callable.getHRegionInfo();
304 if (this.scanMetrics != null) {
305 this.scanMetrics.countOfRegions.incrementAndGet();
306 }
307 } catch (IOException e) {
308 close();
309 throw e;
310 }
311 return true;
312 }
313
314 @VisibleForTesting
315 boolean isAnyRPCcancelled() {
316 return callable.isAnyRPCcancelled();
317 }
318
319 Result[] call(ScannerCallableWithReplicas callable,
320 RpcRetryingCaller<Result[]> caller, int scannerTimeout)
321 throws IOException, RuntimeException {
322 if (Thread.interrupted()) {
323 throw new InterruptedIOException();
324 }
325
326
327 return caller.callWithoutRetries(callable, scannerTimeout);
328 }
329
330 @InterfaceAudience.Private
331 protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey,
332 int nbRows) {
333 scan.setStartRow(localStartKey);
334 ScannerCallable s =
335 new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
336 this.rpcControllerFactory);
337 s.setCaching(nbRows);
338 ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(),
339 s, pool, primaryOperationTimeout, scan,
340 retries, scannerTimeout, caching, conf, caller);
341 return sr;
342 }
343
344
345
346
347
348
349
350
351
352
353
354
355 protected void writeScanMetrics() {
356 if (this.scanMetrics == null || scanMetricsPublished) {
357 return;
358 }
359 MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
360 scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
361 scanMetricsPublished = true;
362 }
363
364 @Override
365 public Result next() throws IOException {
366
367 if (cache.size() == 0 && this.closed) {
368 return null;
369 }
370 if (cache.size() == 0) {
371 loadCache();
372 }
373
374 if (cache.size() > 0) {
375 return cache.poll();
376 }
377
378
379 writeScanMetrics();
380 return null;
381 }
382
383 @VisibleForTesting
384 public int getCacheSize() {
385 return cache != null ? cache.size() : 0;
386 }
387
388
389
390
391 protected void loadCache() throws IOException {
392 Result[] values = null;
393 long remainingResultSize = maxScannerResultSize;
394 int countdown = this.caching;
395
396 callable.setCaching(this.caching);
397
398
399 boolean retryAfterOutOfOrderException = true;
400
401
402 boolean serverHasMoreResults = false;
403 boolean allResultsSkipped = false;
404 do {
405 allResultsSkipped = false;
406 try {
407
408
409
410 values = call(callable, caller, scannerTimeout);
411
412
413
414
415
416 if (values == null && callable.switchedToADifferentReplica()) {
417
418
419 clearPartialResults();
420 this.currentRegion = callable.getHRegionInfo();
421 continue;
422 }
423 retryAfterOutOfOrderException = true;
424 } catch (DoNotRetryIOException e) {
425
426
427 clearPartialResults();
428
429
430
431
432
433 Throwable cause = e.getCause();
434 if ((cause != null && cause instanceof NotServingRegionException) ||
435 (cause != null && cause instanceof RegionServerStoppedException) ||
436 e instanceof OutOfOrderScannerNextException ||
437 e instanceof UnknownScannerException ||
438 e instanceof ScannerResetException) {
439
440
441 } else {
442 throw e;
443 }
444
445
446 if (this.lastResult != null) {
447
448
449
450
451 if (!this.lastResult.isPartial() && scan.getBatch() < 0 ) {
452 if (scan.isReversed()) {
453 scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
454 } else {
455 scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
456 }
457 } else {
458
459 scan.setStartRow(lastResult.getRow());
460 }
461 }
462 if (e instanceof OutOfOrderScannerNextException) {
463 if (retryAfterOutOfOrderException) {
464 retryAfterOutOfOrderException = false;
465 } else {
466
467 throw new DoNotRetryIOException("Failed after retry of " +
468 "OutOfOrderScannerNextException: was there a rpc timeout?", e);
469 }
470 }
471
472 this.currentRegion = null;
473
474
475 callable = null;
476
477 continue;
478 }
479 long currentTime = System.currentTimeMillis();
480 if (this.scanMetrics != null) {
481 this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
482 }
483 lastNext = currentTime;
484
485
486
487 List<Result> resultsToAddToCache =
488 getResultsToAddToCache(values, callable.isHeartbeatMessage());
489 if (!resultsToAddToCache.isEmpty()) {
490 for (Result rs : resultsToAddToCache) {
491 rs = filterLoadedCell(rs);
492 if (rs == null) {
493 continue;
494 }
495 cache.add(rs);
496 for (Cell cell : rs.rawCells()) {
497 remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
498 }
499 countdown--;
500 this.lastResult = rs;
501 if (this.lastResult.isPartial() || scan.getBatch() > 0 ) {
502 updateLastCellLoadedToCache(this.lastResult);
503 } else {
504 this.lastCellLoadedToCache = null;
505 }
506 }
507 if (cache.isEmpty()) {
508
509 allResultsSkipped = true;
510 continue;
511 }
512 }
513 if (callable.isHeartbeatMessage()) {
514 if (cache.size() > 0) {
515
516
517
518
519 if (LOG.isTraceEnabled()) {
520 LOG.trace("Heartbeat message received and cache contains Results."
521 + " Breaking out of scan loop");
522 }
523 break;
524 }
525 continue;
526 }
527
528
529
530
531
532 if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
533
534
535 serverHasMoreResults = callable.getServerHasMoreResults() && partialResults.isEmpty();
536 }
537
538
539
540
541 } while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage())
542 || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
543 && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))));
544 }
545
546
547
548
549
550
551
552
553 private boolean doneWithRegion(long remainingResultSize, int remainingRows,
554 boolean regionHasMoreResults) {
555 return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults;
556 }
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571 protected List<Result>
572 getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage)
573 throws IOException {
574 int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
575 List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
576
577 final boolean isBatchSet = scan != null && scan.getBatch() > 0;
578 final boolean allowPartials = scan != null && scan.getAllowPartialResults();
579
580
581
582
583
584
585
586 if (allowPartials || isBatchSet) {
587 addResultsToList(resultsToAddToCache, resultsFromServer, 0,
588 (null == resultsFromServer ? 0 : resultsFromServer.length));
589 return resultsToAddToCache;
590 }
591
592
593
594
595 if (resultsFromServer == null || resultsFromServer.length == 0) {
596
597
598
599 if (!partialResults.isEmpty() && !heartbeatMessage) {
600 resultsToAddToCache.add(Result.createCompleteResult(partialResults));
601 clearPartialResults();
602 }
603
604 return resultsToAddToCache;
605 }
606
607
608
609 Result last = resultsFromServer[resultsFromServer.length - 1];
610 Result partial = last.isPartial() ? last : null;
611
612 if (LOG.isTraceEnabled()) {
613 StringBuilder sb = new StringBuilder();
614 sb.append("number results from RPC: ").append(resultsFromServer.length).append(",");
615 sb.append("partial != null: ").append(partial != null).append(",");
616 sb.append("number of partials so far: ").append(partialResults.size());
617 LOG.trace(sb.toString());
618 }
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640 if (partial != null && partialResults.isEmpty()) {
641 addToPartialResults(partial);
642
643
644 addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length - 1);
645 } else if (!partialResults.isEmpty()) {
646 for (int i = 0; i < resultsFromServer.length; i++) {
647 Result result = resultsFromServer[i];
648
649
650
651 if (Bytes.equals(partialResultsRow, result.getRow())) {
652 addToPartialResults(result);
653
654
655
656 if (!result.isPartial()) {
657 resultsToAddToCache.add(Result.createCompleteResult(partialResults));
658 clearPartialResults();
659 }
660 } else {
661
662
663
664 if (!partialResults.isEmpty()) {
665 resultsToAddToCache.add(Result.createCompleteResult(partialResults));
666 clearPartialResults();
667 }
668
669
670
671
672 if (result.isPartial()) {
673 addToPartialResults(result);
674 } else {
675 resultsToAddToCache.add(result);
676 }
677 }
678 }
679 } else {
680 addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length);
681 }
682
683 return resultsToAddToCache;
684 }
685
686
687
688
689
690
691
692 private void addToPartialResults(final Result result) throws IOException {
693 final byte[] row = result.getRow();
694 if (partialResultsRow != null && !Bytes.equals(row, partialResultsRow)) {
695 throw new IOException("Partial result row does not match. All partial results must come "
696 + "from the same row. partialResultsRow: " + Bytes.toString(partialResultsRow) + "row: "
697 + Bytes.toString(row));
698 }
699 partialResultsRow = row;
700 partialResults.add(result);
701 }
702
703
704
705
706 private void clearPartialResults() {
707 partialResults.clear();
708 partialResultsRow = null;
709 }
710
711
712
713
714
715
716
717
718 private void addResultsToList(List<Result> outputList, Result[] inputArray, int start, int end) {
719 if (inputArray == null || start < 0 || end > inputArray.length) return;
720
721 for (int i = start; i < end; i++) {
722 outputList.add(inputArray[i]);
723 }
724 }
725
726 @Override
727 public void close() {
728 if (!scanMetricsPublished) writeScanMetrics();
729 if (callable != null) {
730 callable.setClose();
731 try {
732 call(callable, caller, scannerTimeout);
733 } catch (UnknownScannerException e) {
734
735
736
737 } catch (IOException e) {
738
739 LOG.warn("scanner failed to close. Exception follows: " + e);
740 }
741 callable = null;
742 }
743 closed = true;
744 }
745
746
747
748
749
750
751 protected static byte[] createClosestRowBefore(byte[] row) {
752 if (row == null) {
753 throw new IllegalArgumentException("The passed row is empty");
754 }
755 if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) {
756 return MAX_BYTE_ARRAY;
757 }
758 if (row[row.length - 1] == 0) {
759 return Arrays.copyOf(row, row.length - 1);
760 } else {
761 byte[] closestFrontRow = Arrays.copyOf(row, row.length);
762 closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1);
763 closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY);
764 return closestFrontRow;
765 }
766 }
767
768 @Override
769 public boolean renewLease() {
770 if (callable != null) {
771
772 callable.setRenew(true);
773 try {
774 this.caller.callWithoutRetries(callable, this.scannerTimeout);
775 } catch (Exception e) {
776 return false;
777 } finally {
778 callable.setRenew(false);
779 }
780 return true;
781 }
782 return false;
783 }
784
785 protected void updateLastCellLoadedToCache(Result result) {
786 if (result.rawCells().length == 0) {
787 return;
788 }
789 this.lastCellLoadedToCache = result.rawCells()[result.rawCells().length - 1];
790 }
791
792
793
794
795
796 private int compare(Cell a, Cell b) {
797 int r = 0;
798 if (currentRegion != null && currentRegion.isMetaRegion()) {
799 r = metaComparator.compareRows(a, b);
800 } else {
801 r = CellComparator.compareRows(a, b);
802 }
803 if (r != 0) {
804 return this.scan.isReversed() ? -r : r;
805 }
806 return CellComparator.compareWithoutRow(a, b);
807 }
808
809 private Result filterLoadedCell(Result result) {
810
811
812
813
814 if (lastCellLoadedToCache == null || result.rawCells().length == 0) {
815 return result;
816 }
817 if (compare(this.lastCellLoadedToCache, result.rawCells()[0]) < 0) {
818
819
820 return result;
821 }
822 if (compare(this.lastCellLoadedToCache, result.rawCells()[result.rawCells().length - 1]) >= 0) {
823
824 return null;
825 }
826
827
828 int index = 1;
829 while (index < result.rawCells().length) {
830 if (compare(this.lastCellLoadedToCache, result.rawCells()[index]) < 0) {
831 break;
832 }
833 index++;
834 }
835 Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
836 return Result.create(list, result.getExists(), result.isStale(), result.isPartial());
837 }
838 }