1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.NavigableSet;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.locks.ReentrantLock;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.Cell;
34 import org.apache.hadoop.hbase.CellUtil;
35 import org.apache.hadoop.hbase.DoNotRetryIOException;
36 import org.apache.hadoop.hbase.HConstants;
37 import org.apache.hadoop.hbase.KeyValue;
38 import org.apache.hadoop.hbase.KeyValue.KVComparator;
39 import org.apache.hadoop.hbase.KeyValueUtil;
40 import org.apache.hadoop.hbase.classification.InterfaceAudience;
41 import org.apache.hadoop.hbase.client.IsolationLevel;
42 import org.apache.hadoop.hbase.client.Scan;
43 import org.apache.hadoop.hbase.executor.ExecutorService;
44 import org.apache.hadoop.hbase.filter.Filter;
45 import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
46 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
47 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
48 import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
49 import org.apache.hadoop.hbase.util.Bytes;
50 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51
52
53
54
55
56 @InterfaceAudience.Private
57 public class StoreScanner extends NonReversedNonLazyKeyValueScanner
58 implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
/** Logger shared by all instances of this class. */
static final Log LOG = LogFactory.getLog(StoreScanner.class);
/** Store being scanned; may be null (the base constructor and close() both guard for that). */
protected Store store;
/** Decides, cell by cell, whether next() includes, skips or seeks past a cell. */
protected ScanQueryMatcher matcher;
/** Heap over the selected scanners; set to null by close() and by updateReaders(). */
protected KeyValueHeap heap;
/** Passed through to {@code store.getScanners(...)} when scanners are (re)opened. */
protected boolean cacheBlocks;

/** Number of cells included so far for the current row; reset when the matcher row is reset. */
protected int countPerRow = 0;
/** Per-row cap taken from the scan's max-results-per-column-family; -1 disables the check. */
protected int storeLimit = -1;
/** Number of leading included cells per row that are not added to the result list. */
protected int storeOffset = 0;

/** Set once by close(); updateReaders() becomes a no-op afterwards. */
protected boolean closing = false;
/** Cached value of {@code scan.isGetScan()}. */
protected final boolean isGet;
/** True when the caller supplied a non-empty column set. */
protected final boolean explicitColumnQuery;
/**
 * True for more than one explicit column, or exactly one column on a non-get scan. Passed as the
 * last argument of {@code heap.requestSeek(...)} in reseek().
 */
protected final boolean useRowColBloom;

/** True when the region server configuration enables parallel seeks and the store has > 1 file. */
protected boolean isParallelSeekEnabled = false;
/** Executor used by parallelSeek(); only assigned when parallel seek is enabled. */
protected ExecutorService executor;
/** The scan this scanner serves. */
protected final Scan scan;
/** Explicitly requested columns, or null. */
protected final NavigableSet<byte[]> columns;
/** {@code now - ttl}, computed once at construction. */
protected final long oldestUnexpiredTS;
/** Timestamp captured from EnvironmentEdgeManager at construction. */
protected final long now;
/** Minimum versions setting; when non-zero, selectScannersFrom() uses no expiry cutoff. */
protected final int minVersions;
/** Size threshold beyond which a RowTooBigException is raised. */
protected final long maxRowSize;
/** next() re-checks the time limit whenever kvsScanned is a multiple of this value. */
protected final long cellsPerHeartbeatCheck;

/**
 * Number of cells examined by next(). Incremented only when the examined cell is not the same
 * object as {@link #prevCell}.
 */
private long kvsScanned = 0;
/** The cell most recently examined by next(). */
private Cell prevCell = null;

/** Initial value of {@link #lazySeekEnabledGlobally}. */
static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
/** Region server configuration key that turns on parallel seeking (read with default false). */
public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
    "hbase.storescanner.parallel.seek.enable";

/** Process-wide lazy-seek switch; changed through enableLazySeekGlobally(). */
protected static boolean lazySeekEnabledGlobally =
    LAZY_SEEK_ENABLED_BY_DEFAULT;

/** Configuration key for {@link #cellsPerHeartbeatCheck}. */
public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
    "hbase.cells.scanned.per.heartbeat.check";

/** Value used when the key above is unset or configured as a non-positive number. */
public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

/**
 * Top cell remembered by updateReaders() before it drops the heap. When heap is null and this is
 * non-null, checkReseek() rebuilds the heap positioned at this cell.
 */
protected Cell lastTop = null;

/** Whether non-get scans ask the store for pread scanners; see the base constructor. */
private boolean scanUsePread = false;
/** Held by peek(), close(), seek(), next(), updateReaders() and reseek(). */
protected ReentrantLock lock = new ReentrantLock();

/** Read point handed to the store and to parallel seek handlers. */
private final long readPt;
125
126
/**
 * Named points around scanner seeking and compaction completion. Not referenced anywhere else in
 * this file. NOTE(review): presumably consumed by test/injection hooks that exercise races
 * between scanner construction and compaction -- confirm against callers.
 */
enum StoreScannerCompactionRace {
  BEFORE_SEEK,
  AFTER_SEEK,
  COMPACT_COMPLETE
}
132
133
134 protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
135 final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt) {
136 this.readPt = readPt;
137 this.store = store;
138 this.cacheBlocks = cacheBlocks;
139 isGet = scan.isGetScan();
140 int numCol = columns == null ? 0 : columns.size();
141 explicitColumnQuery = numCol > 0;
142 this.scan = scan;
143 this.columns = columns;
144 this.now = EnvironmentEdgeManager.currentTime();
145 this.oldestUnexpiredTS = now - ttl;
146 this.minVersions = minVersions;
147
148 if (store != null && ((HStore)store).getHRegion() != null
149 && ((HStore)store).getHRegion().getBaseConf() != null) {
150 Configuration conf = ((HStore) store).getHRegion().getBaseConf();
151 this.maxRowSize =
152 conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
153 this.scanUsePread = conf.getBoolean("hbase.storescanner.use.pread", scan.isSmall());
154
155 long tmpCellsPerTimeoutCheck =
156 conf.getLong(HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK,
157 DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK);
158 this.cellsPerHeartbeatCheck =
159 tmpCellsPerTimeoutCheck > 0 ? tmpCellsPerTimeoutCheck
160 : DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
161 } else {
162 this.maxRowSize = HConstants.TABLE_MAX_ROWSIZE_DEFAULT;
163 this.scanUsePread = scan.isSmall();
164 this.cellsPerHeartbeatCheck = DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
165 }
166
167
168
169
170
171 useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
172
173
174
175
176 if (store != null && ((HStore)store).getHRegion() != null
177 && store.getStorefilesCount() > 1) {
178 RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
179 if (rsService == null || !rsService.getConfiguration().getBoolean(
180 STORESCANNER_PARALLEL_SEEK_ENABLE, false)) return;
181 isParallelSeekEnabled = true;
182 executor = rsService.getExecutorService();
183 }
184 }
185
186
187
188
189
190
191
192
193
194
195 public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
196 long readPt)
197 throws IOException {
198 this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
199 scanInfo.getMinVersions(), readPt);
200 if (columns != null && scan.isRaw()) {
201 throw new DoNotRetryIOException(
202 "Cannot specify any column for a raw scan");
203 }
204 matcher = new ScanQueryMatcher(scan, scanInfo, columns,
205 ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
206 oldestUnexpiredTS, now, store.getCoprocessorHost());
207
208 this.store.addChangedReaderObserver(this);
209
210 try {
211
212 List<KeyValueScanner> scanners = getScannersNoCompaction();
213
214
215
216
217
218 seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
219 isParallelSeekEnabled);
220
221
222 this.storeLimit = scan.getMaxResultsPerColumnFamily();
223
224
225 this.storeOffset = scan.getRowOffsetPerColumnFamily();
226
227
228 resetKVHeap(scanners, store.getComparator());
229 } catch (IOException e) {
230
231
232 this.store.deleteChangedReaderObserver(this);
233 throw e;
234 }
235 }
236
237
238
239
240
241
242
243
244
245
246
/**
 * Builds a scanner over caller-supplied scanners with an explicit {@link ScanType}. Delegates to
 * the private constructor with no drop-deletes row range (both bounds null).
 *
 * @param store store the scanners belong to
 * @param scanInfo supplies TTL and minimum versions
 * @param scan the scan being served
 * @param scanners scanners to filter, seek and merge
 * @param scanType scan type handed to the query matcher
 * @param smallestReadPoint forwarded to the query matcher
 * @param earliestPutTs forwarded to the query matcher
 * @throws IOException if seeking the scanners fails
 */
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
    List<? extends KeyValueScanner> scanners, ScanType scanType,
    long smallestReadPoint, long earliestPutTs) throws IOException {
  this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
}
252
253
254
255
256
257
258
259
260
261
262
263
/**
 * Builds a scanner over caller-supplied scanners using {@code ScanType.COMPACT_RETAIN_DELETES}
 * together with a drop-deletes row range that is forwarded to the query matcher.
 *
 * @param store store the scanners belong to
 * @param scanInfo supplies TTL and minimum versions
 * @param scan the scan being served
 * @param scanners scanners to filter, seek and merge
 * @param smallestReadPoint forwarded to the query matcher
 * @param earliestPutTs forwarded to the query matcher
 * @param dropDeletesFromRow start of the drop-deletes range; when null the private constructor
 *          builds the matcher without any range
 * @param dropDeletesToRow end of the drop-deletes range
 * @throws IOException if seeking the scanners fails
 */
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
    List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
    byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
  this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
      earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
}
270
271 private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
272 List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
273 long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
274 this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(),
275 ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
276 if (dropDeletesFromRow == null) {
277 matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
278 earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
279 } else {
280 matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
281 oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
282 }
283
284
285 scanners = selectScannersFrom(scanners);
286
287
288 seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
289
290
291 resetKVHeap(scanners, store.getComparator());
292 }
293
294
/**
 * Package-private constructor that works without a store. Delegates with
 * {@code HConstants.LATEST_TIMESTAMP} as the earliest put timestamp and 0 as the read point.
 *
 * @throws IOException if seeking the scanners fails
 */
StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners) throws IOException {
  this(scan, scanInfo, scanType, columns, scanners,
      HConstants.LATEST_TIMESTAMP,
      // read point
      0);
}
303
304
/**
 * Package-private constructor that works without a store and takes an explicit earliest put
 * timestamp. Delegates with 0 as the read point.
 *
 * @throws IOException if seeking the scanners fails
 */
StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners, long earliestPutTs)
    throws IOException {
  this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
      // read point
      0);
}
313
/**
 * Store-less constructor behind the two package-private constructors. The scanner is built
 * directly over the supplied scanners, using the comparator from {@code scanInfo} and a query
 * matcher with no coprocessor host.
 *
 * <p>Because this chains to the base constructor with a null store, there is no store to
 * register with as a changed-readers observer, so no registration is attempted.
 *
 * @param scan the scan being served
 * @param scanInfo supplies TTL, minimum versions and the comparator
 * @param scanType scan type handed to the query matcher
 * @param columns explicitly requested columns, or null
 * @param scanners scanners to seek and merge (used as given, without filtering)
 * @param earliestPutTs forwarded to the query matcher
 * @param readPt read point
 * @throws IOException if seeking the scanners fails
 */
private StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
    throws IOException {
  this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
      scanInfo.getMinVersions(), readPt);
  this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
      Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);

  // Eager, non-lazy seek to the start key, then merge.
  seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
  resetKVHeap(scanners, scanInfo.getComparator());
}
331
332
333
334
335
336 protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
337 final boolean isCompaction = false;
338 boolean usePread = isGet || scanUsePread;
339 return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
340 isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
341 }
342
343
344
345
346
347
348
349
350
351 protected void seekScanners(List<? extends KeyValueScanner> scanners,
352 Cell seekKey, boolean isLazy, boolean isParallelSeek)
353 throws IOException {
354
355
356
357
358 if (isLazy) {
359 for (KeyValueScanner scanner : scanners) {
360 scanner.requestSeek(seekKey, false, true);
361 }
362 } else {
363 if (!isParallelSeek) {
364 long totalScannersSoughtBytes = 0;
365 for (KeyValueScanner scanner : scanners) {
366 if (totalScannersSoughtBytes >= maxRowSize) {
367 throw new RowTooBigException("Max row size allowed: " + maxRowSize
368 + ", but row is bigger than that");
369 }
370 scanner.seek(seekKey);
371 Cell c = scanner.peek();
372 if (c != null) {
373 totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
374 }
375 }
376 } else {
377 parallelSeek(scanners, seekKey);
378 }
379 }
380 }
381
/**
 * Replaces {@link #heap} with a new {@link KeyValueHeap} built over the given scanners.
 *
 * @param scanners already-positioned scanners to merge
 * @param comparator ordering used by the heap
 * @throws IOException if the heap cannot be built
 */
protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
    KVComparator comparator) throws IOException {
  // Combine all seeked scanners with a heap
  heap = new KeyValueHeap(scanners, comparator);
}
387
388
389
390
391
392 protected List<KeyValueScanner> selectScannersFrom(
393 final List<? extends KeyValueScanner> allScanners) {
394 boolean memOnly;
395 boolean filesOnly;
396 if (scan instanceof InternalScan) {
397 InternalScan iscan = (InternalScan)scan;
398 memOnly = iscan.isCheckOnlyMemStore();
399 filesOnly = iscan.isCheckOnlyStoreFiles();
400 } else {
401 memOnly = false;
402 filesOnly = false;
403 }
404
405 List<KeyValueScanner> scanners =
406 new ArrayList<KeyValueScanner>(allScanners.size());
407
408
409
410 long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
411 Long.MIN_VALUE;
412
413
414 for (KeyValueScanner kvs : allScanners) {
415 boolean isFile = kvs.isFileScanner();
416 if ((!isFile && filesOnly) || (isFile && memOnly)) {
417 continue;
418 }
419
420 if (kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)) {
421 scanners.add(kvs);
422 }
423 }
424 return scanners;
425 }
426
427 @Override
428 public Cell peek() {
429 lock.lock();
430 try {
431 if (this.heap == null) {
432 return this.lastTop;
433 }
434 return this.heap.peek();
435 } finally {
436 lock.unlock();
437 }
438 }
439
/**
 * Unsupported single-cell form of next; always throws.
 *
 * @throws RuntimeException always
 */
@Override
public KeyValue next() {
  // Callers must use next(List) / next(List, ScannerContext) instead.
  throw new RuntimeException("Never call StoreScanner.next()");
}
445
446 @Override
447 public void close() {
448 lock.lock();
449 try {
450 if (this.closing) return;
451 this.closing = true;
452
453 if (this.store != null)
454 this.store.deleteChangedReaderObserver(this);
455 if (this.heap != null)
456 this.heap.close();
457 this.heap = null;
458 this.lastTop = null;
459 } finally {
460 lock.unlock();
461 }
462 }
463
/**
 * Seeks the underlying heap to {@code key} while holding the scanner lock.
 *
 * @param key key to seek to
 * @return whatever {@code heap.seek(key)} returns
 * @throws IOException if rebuilding the heap or the seek itself fails
 */
@Override
public boolean seek(Cell key) throws IOException {
  lock.lock();
  try {
    // Rebuild the heap first if updateReaders() dropped it; the result is ignored here.
    checkReseek();
    return this.heap.seek(key);
  } finally {
    lock.unlock();
  }
}
475
/**
 * Convenience overload that runs {@link #next(List, ScannerContext)} with the shared
 * {@link NoLimitScannerContext} instance.
 *
 * @param outResult list that receives the included cells
 * @return the result of the two-argument overload
 * @throws IOException if the scan fails
 */
@Override
public boolean next(List<Cell> outResult) throws IOException {
  return next(outResult, NoLimitScannerContext.getInstance());
}
480
481
482
483
484
485
486
/**
 * Appends matching cells to {@code outResult}, starting at the heap's current position, until
 * the matcher signals completion, a batch/size/time limit in {@code scannerContext} is reached,
 * the per-row store limit is exceeded, or the heap runs dry.
 *
 * <p>The whole call runs under {@link #lock}. The outcome is recorded on {@code scannerContext}
 * via {@code setScannerState(...)} and the method returns that state's {@code hasMoreValues()}.
 *
 * @param outResult list that receives the included cells
 * @param scannerContext carries limits and progress; must not be null
 * @return {@code hasMoreValues()} of the scanner state that was set
 * @throws IllegalArgumentException if {@code scannerContext} is null
 * @throws RowTooBigException if the estimated serialized size of the cells added by this call
 *           exceeds {@code maxRowSize}
 * @throws IOException if the underlying scanners fail
 */
@Override
public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
  lock.lock();

  try {
    if (scannerContext == null) {
      throw new IllegalArgumentException("Scanner context cannot be null");
    }
    // The heap was rebuilt and its top moved to a different row (or vanished): report
    // MORE_VALUES without adding anything and let the caller call again.
    if (checkReseek()) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // If the heap is still null here the scanner has nothing to read from (close() nulls
    // both heap and lastTop).
    if (this.heap == null) {
      close();
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    Cell cell = this.heap.peek();
    if (cell == null) {
      close();
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    // Row coordinates of the first cell; only used to (re)initialise the matcher below.
    byte[] row = cell.getRowArray();
    int offset = cell.getRowOffset();
    short length = cell.getRowLength();

    // Start a new row in the matcher unless a between-cells limit is active and the matcher
    // still holds a row, in which case the per-row state (countPerRow, matcher row) is kept.
    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.row == null) {
      this.countPerRow = 0;
      matcher.setRow(row, offset, length);
    }

    // Clear progress away unless invoker has indicated it should be kept.
    if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();

    // Only used by checkScanOrder(); null when there is no store.
    KeyValue.KVComparator comparator =
        store != null ? store.getComparator() : null;

    int count = 0;
    long totalBytesRead = 0;

    LOOP: do {
      // Refresh the time progress every cellsPerHeartbeatCheck scanned cells and stop with
      // TIME_LIMIT_REACHED when the between-cells time limit has been hit.
      if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
        scannerContext.updateTimeProgress();
        if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
          return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
        }
      }

      // Count each distinct cell object once (identity comparison).
      if (prevCell != cell) ++kvsScanned;
      checkScanOrder(prevCell, cell, comparator);
      prevCell = cell;

      ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
      // May downgrade a seek code to SKIP/INCLUDE; see optimize().
      qcode = optimize(qcode, cell);
      switch(qcode) {
        case INCLUDE:
        case INCLUDE_AND_SEEK_NEXT_ROW:
        case INCLUDE_AND_SEEK_NEXT_COL:

          Filter f = matcher.getFilter();
          if (f != null) {
            // Let the filter substitute the cell that will be returned.
            cell = f.transformCell(cell);
          }

          this.countPerRow++;
          if (storeLimit > -1 &&
              this.countPerRow > (storeLimit + storeOffset)) {
            // Per-row limit exceeded: finish the scan, or jump to the next row and return.
            if (!matcher.moreRowsMayExistAfter(cell)) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            // Clearing matcher.row makes the next call re-initialise the matcher row and
            // countPerRow (see the matcher.row == null check above).
            matcher.row = null;
            seekToNextRow(cell);
            break LOOP;
          }

          // Only cells past the per-row offset are returned.
          if (this.countPerRow > storeOffset) {
            outResult.add(cell);

            // Update local tracking information
            count++;
            totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);

            // Update the progress of the scanner context
            scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
            scannerContext.incrementBatchProgress(1);

            if (totalBytesRead > maxRowSize) {
              throw new RowTooBigException("Max row size allowed: " + maxRowSize
                  + ", but the row is bigger than that.");
            }
          }

          // Advance according to the match code.
          if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
            if (!matcher.moreRowsMayExistAfter(cell)) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            // Force a matcher row reset on the next call, then move past this row.
            matcher.row = null;
            seekToNextRow(cell);
          } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
            seekAsDirection(matcher.getKeyForNextColumn(cell));
          } else {
            this.heap.next();
          }

          // Stop once a between-cells batch or size limit is reached.
          if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          // Jump to the loop condition, which re-peeks the heap.
          continue;

        case DONE:
          // Force a matcher row reset on the next call and report MORE_VALUES.
          matcher.row = null;
          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();

        case DONE_SCAN:
          close();
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();

        case SEEK_NEXT_ROW:
          // Finish immediately when the matcher says no further rows can exist.
          if (!matcher.moreRowsMayExistAfter(cell)) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          // Force a matcher row reset on the next call, then move past this row.
          matcher.row = null;
          seekToNextRow(cell);
          break;

        case SEEK_NEXT_COL:
          seekAsDirection(matcher.getKeyForNextColumn(cell));
          break;

        case SKIP:
          this.heap.next();
          break;

        case SEEK_NEXT_USING_HINT:
          // Seek to the matcher's hint when it has one, otherwise just step forward.
          Cell nextKV = matcher.getNextKeyHint(cell);
          if (nextKV != null) {
            seekAsDirection(nextKV);
          } else {
            heap.next();
          }
          break;

        default:
          throw new RuntimeException("UNEXPECTED");
      }
    } while((cell = this.heap.peek()) != null);

    // Reached by break LOOP or heap exhaustion: anything added means MORE_VALUES.
    if (count > 0) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // No more keys
    close();
    return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
  } finally {
    lock.unlock();
  }
}
677
678
679
680
681
682 private ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
683 switch(qcode) {
684 case INCLUDE_AND_SEEK_NEXT_COL:
685 case SEEK_NEXT_COL:
686 {
687 Cell nextIndexedKey = getNextIndexedKey();
688 if (nextIndexedKey != null && nextIndexedKey != HConstants.NO_NEXT_INDEXED_KEY
689 && matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
690 return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
691 }
692 break;
693 }
694 case INCLUDE_AND_SEEK_NEXT_ROW:
695 case SEEK_NEXT_ROW:
696 {
697 Cell nextIndexedKey = getNextIndexedKey();
698 if (nextIndexedKey != null && nextIndexedKey != HConstants.NO_NEXT_INDEXED_KEY
699 && matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
700 return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
701 }
702 break;
703 }
704 default:
705 break;
706 }
707 return qcode;
708 }
709
710
711 @Override
712 public void updateReaders() throws IOException {
713 lock.lock();
714 try {
715 if (this.closing) return;
716
717
718
719
720
721
722 if (this.heap == null) return;
723
724
725 this.lastTop = this.peek();
726
727
728
729
730 this.heap.close();
731 this.heap = null;
732
733
734 } finally {
735 lock.unlock();
736 }
737 }
738
739
740
741
742
743
744 protected boolean checkReseek() throws IOException {
745 if (this.heap == null && this.lastTop != null) {
746 resetScannerStack(this.lastTop);
747 if (this.heap.peek() == null
748 || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
749 LOG.debug("Storescanner.peek() is changed where before = "
750 + this.lastTop.toString() + ",and after = " + this.heap.peek());
751 this.lastTop = null;
752 return true;
753 }
754 this.lastTop = null;
755 }
756
757 return false;
758 }
759
760 protected void resetScannerStack(Cell lastTopKey) throws IOException {
761 if (heap != null) {
762 throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
763 }
764
765
766
767
768 List<KeyValueScanner> scanners = getScannersNoCompaction();
769
770
771 seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled);
772
773
774 resetKVHeap(scanners, store.getComparator());
775
776
777
778
779 Cell kv = heap.peek();
780 if (kv == null) {
781 kv = lastTopKey;
782 }
783 byte[] row = kv.getRowArray();
784 int offset = kv.getRowOffset();
785 short length = kv.getRowLength();
786 if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row,
787 matcher.rowOffset, matcher.rowLength)) {
788 this.countPerRow = 0;
789 matcher.reset();
790 matcher.setRow(row, offset, length);
791 }
792 }
793
794
795
796
797
798
799
800
/**
 * Asserts that {@code kv} does not sort before {@code prevKV}. The check is a Java
 * {@code assert}, so it only runs when assertions are enabled, and it is skipped when either
 * {@code prevKV} or {@code comparator} is null.
 *
 * @param prevKV previously examined cell, or null
 * @param kv cell being examined now
 * @param comparator ordering to check against, or null to skip the check
 * @throws IOException declared in the signature; this implementation does not throw it
 */
protected void checkScanOrder(Cell prevKV, Cell kv,
    KeyValue.KVComparator comparator) throws IOException {
  // Check that the heap gives us KVs in an increasing order.
  assert prevKV == null || comparator == null
      || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
      + " followed by a " + "smaller key " + kv + " in cf " + store;
}
808
/**
 * Reseeks to the key produced by {@code KeyValueUtil.createLastOnRow(kv)}.
 *
 * @param kv a cell in the row to leave
 * @return the result of {@link #reseek(Cell)}
 * @throws IOException if the reseek fails
 */
protected boolean seekToNextRow(Cell kv) throws IOException {
  return reseek(KeyValueUtil.createLastOnRow(kv));
}
812
813
814
815
816
817
818
/**
 * Moves the scanner to {@code kv}; in this class that is simply a {@link #reseek(Cell)}.
 *
 * @param kv key to move to
 * @return the result of {@link #reseek(Cell)}
 * @throws IOException if the reseek fails
 */
protected boolean seekAsDirection(Cell kv)
    throws IOException {
  return reseek(kv);
}
823
824 @Override
825 public boolean reseek(Cell kv) throws IOException {
826 lock.lock();
827 try {
828
829
830
831 checkReseek();
832 if (explicitColumnQuery && lazySeekEnabledGlobally) {
833 return heap.requestSeek(kv, true, useRowColBloom);
834 }
835 return heap.reseek(kv);
836 } finally {
837 lock.unlock();
838 }
839 }
840
/**
 * @return always 0 for this scanner
 */
@Override
public long getSequenceID() {
  return 0;
}
845
846
847
848
849
850
851
852 private void parallelSeek(final List<? extends KeyValueScanner>
853 scanners, final Cell kv) throws IOException {
854 if (scanners.isEmpty()) return;
855 int storeFileScannerCount = scanners.size();
856 CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
857 List<ParallelSeekHandler> handlers =
858 new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
859 for (KeyValueScanner scanner : scanners) {
860 if (scanner instanceof StoreFileScanner) {
861 ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
862 this.readPt, latch);
863 executor.submit(seekHandler);
864 handlers.add(seekHandler);
865 } else {
866 scanner.seek(kv);
867 latch.countDown();
868 }
869 }
870
871 try {
872 latch.await();
873 } catch (InterruptedException ie) {
874 throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
875 }
876
877 for (ParallelSeekHandler handler : handlers) {
878 if (handler.getErr() != null) {
879 throw new IOException(handler.getErr());
880 }
881 }
882 }
883
884
885
886
887
888 List<KeyValueScanner> getAllScannersForTesting() {
889 List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
890 KeyValueScanner current = heap.getCurrentForTesting();
891 if (current != null)
892 allScanners.add(current);
893 for (KeyValueScanner scanner : heap.getHeap())
894 allScanners.add(scanner);
895 return allScanners;
896 }
897
/**
 * Sets the static {@link #lazySeekEnabledGlobally} switch, which affects every scanner
 * instance.
 *
 * @param enable new value of the switch
 */
static void enableLazySeekGlobally(boolean enable) {
  lazySeekEnabledGlobally = enable;
}
901
902
903
904
/**
 * @return the number of cells counted by next() so far (the {@code kvsScanned} counter)
 */
public long getEstimatedNumberOfKvsScanned() {
  return this.kvsScanned;
}
908
/**
 * Delegates to the heap. Note that this dereferences {@code heap} without a null check and
 * without taking the scanner lock.
 *
 * @return the heap's next indexed key
 */
@Override
public Cell getNextIndexedKey() {
  return this.heap.getNextIndexedKey();
}
913 }
914