1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.concurrent.ExecutorService;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.KeyValue.MetaComparator;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.Cell;
34 import org.apache.hadoop.hbase.CellComparator;
35 import org.apache.hadoop.hbase.CellUtil;
36 import org.apache.hadoop.hbase.DoNotRetryIOException;
37 import org.apache.hadoop.hbase.HBaseConfiguration;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.HRegionInfo;
40 import org.apache.hadoop.hbase.NotServingRegionException;
41 import org.apache.hadoop.hbase.TableName;
42 import org.apache.hadoop.hbase.UnknownScannerException;
43 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
44 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
45 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
46 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
47 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
48 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
49 import org.apache.hadoop.hbase.util.Bytes;
50
51 import com.google.common.annotations.VisibleForTesting;
52
53
54
55
56
57
58 @InterfaceAudience.Private
59 public class ClientScanner extends AbstractClientScanner {
60 private static final Log LOG = LogFactory.getLog(ClientScanner.class);
61
62
63 static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
64 protected Scan scan;
65 protected boolean closed = false;
66
67
68 protected HRegionInfo currentRegion = null;
69 protected ScannerCallableWithReplicas callable = null;
70 protected final LinkedList<Result> cache = new LinkedList<Result>();
71
72
73
74
75
76 protected final LinkedList<Result> partialResults = new LinkedList<Result>();
77
78
79
80
81
82 protected byte[] partialResultsRow = null;
83
84
85
86 protected Cell lastCellLoadedToCache = null;
87 protected final int caching;
88 protected long lastNext;
89
90 protected Result lastResult = null;
91 protected final long maxScannerResultSize;
92 private final ClusterConnection connection;
93 private final TableName tableName;
94 protected final int scannerTimeout;
95 protected boolean scanMetricsPublished = false;
96 protected RpcRetryingCaller<Result []> caller;
97 protected RpcControllerFactory rpcControllerFactory;
98 protected Configuration conf;
99
100
101
102
103
104 protected final int primaryOperationTimeout;
105 private int retries;
106 protected final ExecutorService pool;
107 private static MetaComparator metaComparator = new MetaComparator();
108
109
110
111
112
113
114
115
116
117
118 public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
119 ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
120 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
121 throws IOException {
122 if (LOG.isTraceEnabled()) {
123 LOG.trace("Scan table=" + tableName
124 + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
125 }
126 this.scan = scan;
127 this.tableName = tableName;
128 this.lastNext = System.currentTimeMillis();
129 this.connection = connection;
130 this.pool = pool;
131 this.primaryOperationTimeout = primaryOperationTimeout;
132 this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
133 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
134 if (scan.getMaxResultSize() > 0) {
135 this.maxScannerResultSize = scan.getMaxResultSize();
136 } else {
137 this.maxScannerResultSize = conf.getLong(
138 HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
139 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
140 }
141 this.scannerTimeout = HBaseConfiguration.getInt(conf,
142 HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
143 HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
144 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
145
146
147 initScanMetrics(scan);
148
149
150 if (this.scan.getCaching() > 0) {
151 this.caching = this.scan.getCaching();
152 } else {
153 this.caching = conf.getInt(
154 HConstants.HBASE_CLIENT_SCANNER_CACHING,
155 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
156 }
157
158 this.caller = rpcFactory.<Result[]> newCaller();
159 this.rpcControllerFactory = controllerFactory;
160
161 this.conf = conf;
162 initializeScannerInConstruction();
163 }
164
165 protected void initializeScannerInConstruction() throws IOException{
166
167 nextScanner(this.caching, false);
168 }
169
170 protected ClusterConnection getConnection() {
171 return this.connection;
172 }
173
174
175
176
177
178
179
180 @Deprecated
181 protected byte [] getTableName() {
182 return this.tableName.getName();
183 }
184
185 protected TableName getTable() {
186 return this.tableName;
187 }
188
189 protected int getRetries() {
190 return this.retries;
191 }
192
193 protected int getScannerTimeout() {
194 return this.scannerTimeout;
195 }
196
197 protected Configuration getConf() {
198 return this.conf;
199 }
200
201 protected Scan getScan() {
202 return scan;
203 }
204
205 protected ExecutorService getPool() {
206 return pool;
207 }
208
209 protected int getPrimaryOperationTimeout() {
210 return primaryOperationTimeout;
211 }
212
213 protected int getCaching() {
214 return caching;
215 }
216
217 protected long getTimestamp() {
218 return lastNext;
219 }
220
221 @VisibleForTesting
222 protected long getMaxResultSize() {
223 return maxScannerResultSize;
224 }
225
226
227 protected boolean checkScanStopRow(final byte [] endKey) {
228 if (this.scan.getStopRow().length > 0) {
229
230 byte [] stopRow = scan.getStopRow();
231 int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
232 endKey, 0, endKey.length);
233 if (cmp <= 0) {
234
235
236 return true;
237 }
238 }
239 return false;
240 }
241
242 private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException {
243
244
245
246
247 if (callable != null && callable.switchedToADifferentReplica()) return true;
248 return nextScanner(nbRows, done);
249 }
250
251
252
253
254
255
256
257
258
259
260 protected boolean nextScanner(int nbRows, final boolean done)
261 throws IOException {
262
263 if (this.callable != null) {
264 this.callable.setClose();
265 call(callable, caller, scannerTimeout);
266 this.callable = null;
267 }
268
269
270 byte [] localStartKey;
271
272
273 if (this.currentRegion != null) {
274 byte [] endKey = this.currentRegion.getEndKey();
275 if (endKey == null ||
276 Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
277 checkScanStopRow(endKey) ||
278 done) {
279 close();
280 if (LOG.isTraceEnabled()) {
281 LOG.trace("Finished " + this.currentRegion);
282 }
283 return false;
284 }
285 localStartKey = endKey;
286 if (LOG.isTraceEnabled()) {
287 LOG.trace("Finished " + this.currentRegion);
288 }
289 } else {
290 localStartKey = this.scan.getStartRow();
291 }
292
293 if (LOG.isDebugEnabled() && this.currentRegion != null) {
294
295 LOG.debug("Advancing internal scanner to startKey at '" +
296 Bytes.toStringBinary(localStartKey) + "'");
297 }
298 try {
299 callable = getScannerCallable(localStartKey, nbRows);
300
301
302 call(callable, caller, scannerTimeout);
303 this.currentRegion = callable.getHRegionInfo();
304 if (this.scanMetrics != null) {
305 this.scanMetrics.countOfRegions.incrementAndGet();
306 }
307 } catch (IOException e) {
308 close();
309 throw e;
310 }
311 return true;
312 }
313
314 @VisibleForTesting
315 boolean isAnyRPCcancelled() {
316 return callable.isAnyRPCcancelled();
317 }
318
319 Result[] call(ScannerCallableWithReplicas callable,
320 RpcRetryingCaller<Result[]> caller, int scannerTimeout)
321 throws IOException, RuntimeException {
322 if (Thread.interrupted()) {
323 throw new InterruptedIOException();
324 }
325
326
327 return caller.callWithoutRetries(callable, scannerTimeout);
328 }
329
330 @InterfaceAudience.Private
331 protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey,
332 int nbRows) {
333 scan.setStartRow(localStartKey);
334 ScannerCallable s =
335 new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
336 this.rpcControllerFactory);
337 s.setCaching(nbRows);
338 ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(),
339 s, pool, primaryOperationTimeout, scan,
340 retries, scannerTimeout, caching, conf, caller);
341 return sr;
342 }
343
344
345
346
347
348
349
350
351
352
353
354
355 protected void writeScanMetrics() {
356 if (this.scanMetrics == null || scanMetricsPublished) {
357 return;
358 }
359 MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
360 scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
361 scanMetricsPublished = true;
362 }
363
364 @Override
365 public Result next() throws IOException {
366
367 if (cache.size() == 0 && this.closed) {
368 return null;
369 }
370 if (cache.size() == 0) {
371 loadCache();
372 }
373
374 if (cache.size() > 0) {
375 return cache.poll();
376 }
377
378
379 writeScanMetrics();
380 return null;
381 }
382
383 @VisibleForTesting
384 public int getCacheSize() {
385 return cache != null ? cache.size() : 0;
386 }
387
388
389
390
/**
 * Fetches Results from the server into {@code cache}.
 * Keeps issuing scanner RPCs (moving on to the next region through possiblyNextScanner when
 * the current one is finished) until the row budget ({@code caching}) or the byte budget
 * ({@code maxScannerResultSize}) is used up, the server reports more results for the region,
 * a heartbeat arrives while the cache already holds Results, or the scan ends.
 * Selected failures are retried up to getRetries() times by resetting the scan's start row
 * and dropping the current region and callable.
 *
 * @throws IOException if an RPC fails with a non-retriable error or the retries are exhausted
 */
391 protected void loadCache() throws IOException {
392 Result[] values = null;
// Byte and row budgets for this fill of the cache.
393 long remainingResultSize = maxScannerResultSize;
394 int countdown = this.caching;
395
// nextScanner may have created the callable with a smaller caching value (it is called with
// the remaining countdown from the loop condition below), so set it back to the full value.
396 callable.setCaching(this.caching);
397
398
// True while one OutOfOrderScannerNextException may still be retried; re-armed after every
// successful call.
399 boolean retryAfterOutOfOrderException = true;
400
401
// Whether the server said the current region still has more results (see end of the loop body).
402 boolean serverHasMoreResults = false;
// Set when every Result of a response was dropped by filterLoadedCell and the cache is still
// empty, which forces another iteration.
403 boolean allResultsSkipped = false;
404
405
406 int retriesLeft = getRetries();
407 do {
408 allResultsSkipped = false;
409 try {
410
411
412
413 values = call(callable, caller, scannerTimeout);
414
415
416
417
418
// A null response together with a replica switch: partial Results collected so far came from
// the previous replica, so drop them, adopt the new region info and go around again.
// Note that 'continue' in a do-while jumps to the loop condition.
419 if (values == null && callable.switchedToADifferentReplica()) {
420
421
422 clearPartialResults();
423 this.currentRegion = callable.getHRegionInfo();
424 continue;
425 }
426 retryAfterOutOfOrderException = true;
427 } catch (DoNotRetryIOException | NeedUnmanagedConnectionException e) {
428
429
// The scanner will be reopened (or the exception rethrown), so partial Results held so far
// are discarded.
430 clearPartialResults();
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
// Only these failures are retried here: a cause of NotServingRegionException or
// RegionServerStoppedException, or the exception itself being OutOfOrderScannerNextException,
// UnknownScannerException or ScannerResetException. Anything else is rethrown as is.
446 Throwable cause = e.getCause();
447 if ((cause != null && cause instanceof NotServingRegionException) ||
448 (cause != null && cause instanceof RegionServerStoppedException) ||
449 e instanceof OutOfOrderScannerNextException ||
450 e instanceof UnknownScannerException ||
451 e instanceof ScannerResetException) {
452
453
// Give up once the retry budget is used; the post-decrement consumes one retry otherwise.
454 if (retriesLeft-- <= 0) {
455 throw e;
456 }
457 } else {
458 throw e;
459 }
460
461
// Reset the scan's start row so the reopened scanner resumes after what was already cached.
462 if (this.lastResult != null) {
463
464
465
466
// lastResult is a complete row and no batch is set (getBatch() < 0): resume strictly past that
// row -- the closest row before it for a reversed scan, the row with a zero byte appended
// otherwise.
467 if (!this.lastResult.isPartial() && scan.getBatch() < 0 ) {
468 if (scan.isReversed()) {
469 scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
470 } else {
471 scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
472 }
473 } else {
474
// The last row may be incomplete, so restart at that same row; cells already cached are
// dropped later by filterLoadedCell using lastCellLoadedToCache.
475 scan.setStartRow(lastResult.getRow());
476 }
477 }
// A second consecutive OutOfOrderScannerNextException is not retried.
478 if (e instanceof OutOfOrderScannerNextException) {
479 if (retryAfterOutOfOrderException) {
480 retryAfterOutOfOrderException = false;
481 } else {
482
483 throw new DoNotRetryIOException("Failed after retry of " +
484 "OutOfOrderScannerNextException: was there a rpc timeout?", e);
485 }
486 }
487
// With no current region, nextScanner starts the new scanner at scan.getStartRow(), i.e. the
// row just set above.
488 this.currentRegion = null;
489
490
// With no callable, nextScanner skips the close RPC on the failed scanner and
// possiblyNextScanner goes straight to nextScanner.
491 callable = null;
492
493 continue;
494 }
// Record the time since the previous fetch in the scan metrics, when they are enabled.
495 long currentTime = System.currentTimeMillis();
496 if (this.scanMetrics != null) {
497 this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
498 }
499 lastNext = currentTime;
500
501
502
// Decide which Results can go to the cache now; partial Results may be held back in
// partialResults until their row is complete.
503 List<Result> resultsToAddToCache =
504 getResultsToAddToCache(values, callable.isHeartbeatMessage());
505 if (!resultsToAddToCache.isEmpty()) {
506 for (Result rs : resultsToAddToCache) {
// Strip cells that were already delivered; null means the whole Result was already seen.
507 rs = filterLoadedCell(rs);
508 if (rs == null) {
509 continue;
510 }
511 cache.add(rs);
// Charge the estimated heap size of every cell against the byte budget.
512 for (Cell cell : rs.rawCells()) {
513 remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
514 }
515 countdown--;
516 this.lastResult = rs;
// Track the last cached cell only when a row can be split across Results (partial Result or
// batch set); otherwise clear the marker so filterLoadedCell passes Results through.
517 if (this.lastResult.isPartial() || scan.getBatch() > 0 ) {
518 updateLastCellLoadedToCache(this.lastResult);
519 } else {
520 this.lastCellLoadedToCache = null;
521 }
522 }
523 if (cache.isEmpty()) {
524
// Every Result in this response was filtered out: ask the server again.
525 allResultsSkipped = true;
526 continue;
527 }
528 }
// Heartbeat response: return to the caller if there is anything in the cache, otherwise keep
// scanning (the loop condition re-enters on isHeartbeatMessage()).
529 if (callable.isHeartbeatMessage()) {
530 if (cache.size() > 0) {
531
532
533
534
535 if (LOG.isTraceEnabled()) {
536 LOG.trace("Heartbeat message received and cache contains Results."
537 + " Breaking out of scan loop");
538 }
539 break;
540 }
541 continue;
542 }
543
544
545
546
547
// Trust the server's "more results" flag only when the response carried Results and the
// callable has that context, and only while no partial Results are pending.
548 if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
549
550
551 serverHasMoreResults = callable.getServerHasMoreResults() && partialResults.isEmpty();
552 }
553
554
555
556
// Loop again when: everything was filtered out; the last response was a heartbeat; or budgets
// remain and the region is not known to have more results, and either partial Results are
// still pending or a next scanner could be obtained.
557 } while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage())
558 || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
559 && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))));
560 }
561
562
563
564
565
566
567
568
569 private boolean doneWithRegion(long remainingResultSize, int remainingRows,
570 boolean regionHasMoreResults) {
571 return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults;
572 }
573
574
575
576
577
578
579
580
581
582
583
584
585
586
/**
 * Works out which of the Results returned by the server can be added to the cache now.
 * When the scan allows partial Results or has a batch set, everything is passed through
 * unchanged. Otherwise partial Results are accumulated in {@code partialResults} and only
 * complete Results are returned.
 *
 * @param resultsFromServer Results of the last RPC; may be null or empty
 * @param heartbeatMessage whether the last response was a heartbeat message
 * @return the Results to add to the cache; possibly empty, never null
 * @throws IOException if partial Results from different rows would be combined, or if building
 *           the complete Result fails
 */
587 protected List<Result>
588 getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage)
589 throws IOException {
590 int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
591 List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
592
593 final boolean isBatchSet = scan != null && scan.getBatch() > 0;
594 final boolean allowPartials = scan != null && scan.getAllowPartialResults();
595
596
597
598
599
600
601
// The caller accepts Results that do not cover a whole row (partials allowed or batching in
// use), so nothing is held back.
602 if (allowPartials || isBatchSet) {
603 addResultsToList(resultsToAddToCache, resultsFromServer, 0,
604 (null == resultsFromServer ? 0 : resultsFromServer.length));
605 return resultsToAddToCache;
606 }
607
608
609
610
// Empty response: unless it was only a heartbeat, no further pieces will arrive for the
// pending row, so whatever was accumulated is turned into one Result.
// NOTE(review): Result.createCompleteResult presumably stitches the partials into a single
// Result -- confirm in Result.
611 if (resultsFromServer == null || resultsFromServer.length == 0) {
612
613
614
615 if (!partialResults.isEmpty() && !heartbeatMessage) {
616 resultsToAddToCache.add(Result.createCompleteResult(partialResults));
617 clearPartialResults();
618 }
619
620 return resultsToAddToCache;
621 }
622
623
624
// Only the last Result of the response is inspected here; 'partial' is non-null when that
// last Result is flagged as partial.
625 Result last = resultsFromServer[resultsFromServer.length - 1];
626 Result partial = last.isPartial() ? last : null;
627
628 if (LOG.isTraceEnabled()) {
629 StringBuilder sb = new StringBuilder();
630 sb.append("number results from RPC: ").append(resultsFromServer.length).append(",");
631 sb.append("partial != null: ").append(partial != null).append(",");
632 sb.append("number of partials so far: ").append(partialResults.size());
633 LOG.trace(sb.toString());
634 }
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
// Case 1: nothing pending and the response ends with a partial. Hold the partial back and
// pass on every Result before it.
656 if (partial != null && partialResults.isEmpty()) {
657 addToPartialResults(partial);
658
659
660 addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length - 1);
// Case 2: partials are pending from an earlier response. Walk the response one Result at a time.
661 } else if (!partialResults.isEmpty()) {
662 for (int i = 0; i < resultsFromServer.length; i++) {
663 Result result = resultsFromServer[i];
664
665
666
// Same row as the pending partials: append it, and emit the combined Result once a
// non-partial piece arrives.
667 if (Bytes.equals(partialResultsRow, result.getRow())) {
668 addToPartialResults(result);
669
670
671
672 if (!result.isPartial()) {
673 resultsToAddToCache.add(Result.createCompleteResult(partialResults));
674 clearPartialResults();
675 }
676 } else {
677
678
679
// A different row: first emit whatever was pending for the previous row, if anything.
// partialResultsRow is null after a clear, so later Results in this response also land here.
680 if (!partialResults.isEmpty()) {
681 resultsToAddToCache.add(Result.createCompleteResult(partialResults));
682 clearPartialResults();
683 }
684
685
686
687
// Then either start accumulating the new row or pass the complete Result straight through.
688 if (result.isPartial()) {
689 addToPartialResults(result);
690 } else {
691 resultsToAddToCache.add(result);
692 }
693 }
694 }
// Case 3: nothing pending and the last Result is not partial. Pass the whole response through.
695 } else {
696 addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length);
697 }
698
699 return resultsToAddToCache;
700 }
701
702
703
704
705
706
707
708 private void addToPartialResults(final Result result) throws IOException {
709 final byte[] row = result.getRow();
710 if (partialResultsRow != null && !Bytes.equals(row, partialResultsRow)) {
711 throw new IOException("Partial result row does not match. All partial results must come "
712 + "from the same row. partialResultsRow: " + Bytes.toString(partialResultsRow) + "row: "
713 + Bytes.toString(row));
714 }
715 partialResultsRow = row;
716 partialResults.add(result);
717 }
718
719
720
721
722 private void clearPartialResults() {
723 partialResults.clear();
724 partialResultsRow = null;
725 }
726
727
728
729
730
731
732
733
734 private void addResultsToList(List<Result> outputList, Result[] inputArray, int start, int end) {
735 if (inputArray == null || start < 0 || end > inputArray.length) return;
736
737 for (int i = start; i < end; i++) {
738 outputList.add(inputArray[i]);
739 }
740 }
741
742 @Override
743 public void close() {
744 if (!scanMetricsPublished) writeScanMetrics();
745 if (callable != null) {
746 callable.setClose();
747 try {
748 call(callable, caller, scannerTimeout);
749 } catch (UnknownScannerException e) {
750
751
752
753 } catch (IOException e) {
754
755 LOG.warn("scanner failed to close. Exception follows: " + e);
756 }
757 callable = null;
758 }
759 closed = true;
760 }
761
762
763
764
765
766
767 protected static byte[] createClosestRowBefore(byte[] row) {
768 if (row == null) {
769 throw new IllegalArgumentException("The passed row is empty");
770 }
771 if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) {
772 return MAX_BYTE_ARRAY;
773 }
774 if (row[row.length - 1] == 0) {
775 return Arrays.copyOf(row, row.length - 1);
776 } else {
777 byte[] closestFrontRow = Arrays.copyOf(row, row.length);
778 closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1);
779 closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY);
780 return closestFrontRow;
781 }
782 }
783
784 @Override
785 public boolean renewLease() {
786 if (callable != null) {
787
788 callable.setRenew(true);
789 try {
790 this.caller.callWithoutRetries(callable, this.scannerTimeout);
791 } catch (Exception e) {
792 return false;
793 } finally {
794 callable.setRenew(false);
795 }
796 return true;
797 }
798 return false;
799 }
800
801 protected void updateLastCellLoadedToCache(Result result) {
802 if (result.rawCells().length == 0) {
803 return;
804 }
805 this.lastCellLoadedToCache = result.rawCells()[result.rawCells().length - 1];
806 }
807
808
809
810
811
812 private int compare(Cell a, Cell b) {
813 int r = 0;
814 if (currentRegion != null && currentRegion.isMetaRegion()) {
815 r = metaComparator.compareRows(a, b);
816 } else {
817 r = CellComparator.compareRows(a, b);
818 }
819 if (r != 0) {
820 return this.scan.isReversed() ? -r : r;
821 }
822 return CellComparator.compareWithoutRow(a, b);
823 }
824
825 private Result filterLoadedCell(Result result) {
826
827
828
829
830 if (lastCellLoadedToCache == null || result.rawCells().length == 0) {
831 return result;
832 }
833 if (compare(this.lastCellLoadedToCache, result.rawCells()[0]) < 0) {
834
835
836 return result;
837 }
838 if (compare(this.lastCellLoadedToCache, result.rawCells()[result.rawCells().length - 1]) >= 0) {
839
840 return null;
841 }
842
843
844 int index = 1;
845 while (index < result.rawCells().length) {
846 if (compare(this.lastCellLoadedToCache, result.rawCells()[index]) < 0) {
847 break;
848 }
849 index++;
850 }
851 Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
852 return Result.create(list, result.getExists(), result.isStale(), result.isPartial());
853 }
854 }