1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.NavigableSet;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.locks.ReentrantLock;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.CellUtil;
34 import org.apache.hadoop.hbase.DoNotRetryIOException;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.KeyValue;
37 import org.apache.hadoop.hbase.KeyValue.KVComparator;
38 import org.apache.hadoop.hbase.KeyValueUtil;
39 import org.apache.hadoop.hbase.classification.InterfaceAudience;
40 import org.apache.hadoop.hbase.client.IsolationLevel;
41 import org.apache.hadoop.hbase.client.Scan;
42 import org.apache.hadoop.hbase.executor.ExecutorService;
43 import org.apache.hadoop.hbase.filter.Filter;
44 import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
45 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
46 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
47 import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50
51 import com.google.common.annotations.VisibleForTesting;
52
53
54
55
56
57 @InterfaceAudience.Private
58 public class StoreScanner extends NonReversedNonLazyKeyValueScanner
59 implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
60 private static final Log LOG = LogFactory.getLog(StoreScanner.class);
61
62 protected final Store store;
63 protected ScanQueryMatcher matcher;
64 protected KeyValueHeap heap;
65 protected boolean cacheBlocks;
66
67 protected long countPerRow = 0;
68 protected int storeLimit = -1;
69 protected int storeOffset = 0;
70
71
72
73 protected boolean closing = false;
74 protected final boolean get;
75 protected final boolean explicitColumnQuery;
76 protected final boolean useRowColBloom;
77
78
79
80 protected boolean parallelSeekEnabled = false;
81 protected ExecutorService executor;
82 protected final Scan scan;
83 protected final NavigableSet<byte[]> columns;
84 protected final long oldestUnexpiredTS;
85 protected final long now;
86 protected final int minVersions;
87 protected final long maxRowSize;
88 protected final long cellsPerHeartbeatCheck;
89
90
91
92
93
94 private long kvsScanned = 0;
95 private Cell prevCell = null;
96
97
98 static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
99 public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
100 "hbase.storescanner.parallel.seek.enable";
101
102
103 protected static boolean lazySeekEnabledGlobally =
104 LAZY_SEEK_ENABLED_BY_DEFAULT;
105
106
107
108
109
110
111 public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
112 "hbase.cells.scanned.per.heartbeat.check";
113
114
115
116
117 public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;
118
119
120 protected Cell lastTop = null;
121
122
123 private boolean scanUsePread = false;
124 protected ReentrantLock lock = new ReentrantLock();
125
126 private final long readPt;
127
128
/**
 * Named points relative to a seek and a compaction. Not referenced elsewhere in this class;
 * presumably used by tests to inject a scanner/compaction race (TODO confirm).
 */
enum StoreScannerCompactionRace { BEFORE_SEEK, AFTER_SEEK, COMPACT_COMPLETE }
134
135
136 protected StoreScanner(Store store, Scan scan, final ScanInfo scanInfo,
137 final NavigableSet<byte[]> columns, long readPt, boolean cacheBlocks) {
138 this.readPt = readPt;
139 this.store = store;
140 this.cacheBlocks = cacheBlocks;
141 get = scan.isGetScan();
142 int numCol = columns == null ? 0 : columns.size();
143 explicitColumnQuery = numCol > 0;
144 this.scan = scan;
145 this.columns = columns;
146 this.now = EnvironmentEdgeManager.currentTime();
147 this.oldestUnexpiredTS = now - scanInfo.getTtl();
148 this.minVersions = scanInfo.getMinVersions();
149
150
151
152
153
154 this.useRowColBloom = numCol > 1 || (!get && numCol == 1);
155
156 this.maxRowSize = scanInfo.getTableMaxRowSize();
157 this.scanUsePread = scan.isSmall()? true: scanInfo.isUsePread();
158 this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
159
160 if (this.store != null && this.store.getStorefilesCount() > 1) {
161 RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
162 if (rsService != null && scanInfo.isParallelSeekEnabled()) {
163 this.parallelSeekEnabled = true;
164 this.executor = rsService.getExecutorService();
165 }
166 }
167 }
168
169
170
171
172
173
174
175
176
177
178 public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
179 long readPt)
180 throws IOException {
181 this(store, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
182 if (columns != null && scan.isRaw()) {
183 throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
184 }
185 matcher = new ScanQueryMatcher(scan, scanInfo, columns,
186 ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
187 oldestUnexpiredTS, now, store.getCoprocessorHost());
188
189 this.store.addChangedReaderObserver(this);
190
191 try {
192
193 List<KeyValueScanner> scanners = getScannersNoCompaction();
194
195
196
197
198
199 seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
200 parallelSeekEnabled);
201
202
203 this.storeLimit = scan.getMaxResultsPerColumnFamily();
204
205
206 this.storeOffset = scan.getRowOffsetPerColumnFamily();
207
208
209 resetKVHeap(scanners, store.getComparator());
210 } catch (IOException e) {
211
212
213 this.store.deleteChangedReaderObserver(this);
214 throw e;
215 }
216 }
217
218
219
220
221
222
223
224
225
226
227
/**
 * Opens a scanner over an explicit list of scanners with the given scan type and no
 * drop-deletes row range (both range bounds are passed as {@code null}).
 *
 * @param store store the scanners belong to
 * @param scanInfo per-store scan settings
 * @param scan scan specification
 * @param scanners scanners to select from and seek
 * @param scanType scan type handed to the query matcher
 * @param smallestReadPoint read point handed to the query matcher
 * @param earliestPutTs earliest put timestamp handed to the query matcher
 * @throws IOException if seeking the scanners fails
 */
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
    List<? extends KeyValueScanner> scanners, ScanType scanType,
    long smallestReadPoint, long earliestPutTs) throws IOException {
  this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs,
      null /* dropDeletesFromRow */, null /* dropDeletesToRow */);
}
233
234
235
236
237
238
239
240
241
242
243
244
/**
 * Opens a scanner over an explicit list of scanners using
 * {@link ScanType#COMPACT_RETAIN_DELETES} together with a drop-deletes row range.
 *
 * @param store store the scanners belong to
 * @param scanInfo per-store scan settings
 * @param scan scan specification
 * @param scanners scanners to select from and seek
 * @param smallestReadPoint read point handed to the query matcher
 * @param earliestPutTs earliest put timestamp handed to the query matcher
 * @param dropDeletesFromRow lower bound of the drop-deletes range handed to the matcher
 * @param dropDeletesToRow upper bound of the drop-deletes range handed to the matcher
 * @throws IOException if seeking the scanners fails
 */
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
    List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
    byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
  this(store, scanInfo, scan, scanners,
      ScanType.COMPACT_RETAIN_DELETES,
      smallestReadPoint, earliestPutTs,
      dropDeletesFromRow, dropDeletesToRow);
}
251
252 private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
253 List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
254 long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
255 this(store, scan, scanInfo, null,
256 ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED), false);
257 if (dropDeletesFromRow == null) {
258 matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
259 earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
260 } else {
261 matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
262 oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
263 }
264
265
266 scanners = selectScannersFrom(scanners);
267
268
269 seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
270
271
272 resetKVHeap(scanners, store.getComparator());
273 }
274
/**
 * Test-only constructor over an explicit scanner list, using
 * {@link HConstants#LATEST_TIMESTAMP} as the earliest put timestamp and a read point of 0.
 */
@VisibleForTesting
StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners) throws IOException {
  this(scan, scanInfo, scanType, columns, scanners,
      HConstants.LATEST_TIMESTAMP /* earliestPutTs */,
      0 /* readPt */);
}
284
/**
 * Test-only constructor over an explicit scanner list with a caller-supplied earliest put
 * timestamp and a read point of 0.
 */
@VisibleForTesting
StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners, long earliestPutTs)
    throws IOException {
  this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
      0 /* readPt */);
}
294
/**
 * Implementation behind the test-only constructors. The base constructor is invoked with a
 * {@code null} store, the given scanners are used as-is (no selection step) and the comparator
 * comes from {@code scanInfo}.
 *
 * @throws IOException if seeking the scanners fails
 */
private StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
    throws IOException {
  this(null, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
  this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
      Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);

  // NOTE(review): the store was passed as null just above, so this registration does not
  // run on this path.
  if (this.store != null) {
    this.store.addChangedReaderObserver(this);
  }

  // Always an eager seek here.
  seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
  resetKVHeap(scanners, scanInfo.getComparator());
}
311
312
313
314
315
316 protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
317 final boolean isCompaction = false;
318 boolean usePread = get || scanUsePread;
319 return selectScannersFrom(store.getScanners(cacheBlocks, get, usePread,
320 isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
321 }
322
323
324
325
326
327
328
329
330
331 protected void seekScanners(List<? extends KeyValueScanner> scanners,
332 Cell seekKey, boolean isLazy, boolean isParallelSeek)
333 throws IOException {
334
335
336
337
338 if (isLazy) {
339 for (KeyValueScanner scanner : scanners) {
340 scanner.requestSeek(seekKey, false, true);
341 }
342 } else {
343 if (!isParallelSeek) {
344 long totalScannersSoughtBytes = 0;
345 for (KeyValueScanner scanner : scanners) {
346 if (totalScannersSoughtBytes >= maxRowSize) {
347 throw new RowTooBigException("Max row size allowed: " + maxRowSize
348 + ", but row is bigger than that");
349 }
350 scanner.seek(seekKey);
351 Cell c = scanner.peek();
352 if (c != null) {
353 totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
354 }
355 }
356 } else {
357 parallelSeek(scanners, seekKey);
358 }
359 }
360 }
361
362 protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
363 KVComparator comparator) throws IOException {
364
365 heap = new KeyValueHeap(scanners, comparator);
366 }
367
368
369
370
371
372 protected List<KeyValueScanner> selectScannersFrom(
373 final List<? extends KeyValueScanner> allScanners) {
374 boolean memOnly;
375 boolean filesOnly;
376 if (scan instanceof InternalScan) {
377 InternalScan iscan = (InternalScan)scan;
378 memOnly = iscan.isCheckOnlyMemStore();
379 filesOnly = iscan.isCheckOnlyStoreFiles();
380 } else {
381 memOnly = false;
382 filesOnly = false;
383 }
384
385 List<KeyValueScanner> scanners =
386 new ArrayList<KeyValueScanner>(allScanners.size());
387
388
389
390 long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
391 Long.MIN_VALUE;
392
393
394 for (KeyValueScanner kvs : allScanners) {
395 boolean isFile = kvs.isFileScanner();
396 if ((!isFile && filesOnly) || (isFile && memOnly)) {
397 continue;
398 }
399
400 if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
401 scanners.add(kvs);
402 }
403 }
404 return scanners;
405 }
406
407 @Override
408 public Cell peek() {
409 lock.lock();
410 try {
411 if (this.heap == null) {
412 return this.lastTop;
413 }
414 return this.heap.peek();
415 } finally {
416 lock.unlock();
417 }
418 }
419
420 @Override
421 public KeyValue next() {
422
423 throw new RuntimeException("Never call StoreScanner.next()");
424 }
425
426 @Override
427 public void close() {
428 lock.lock();
429 try {
430 if (this.closing) return;
431 this.closing = true;
432
433 if (this.store != null)
434 this.store.deleteChangedReaderObserver(this);
435 if (this.heap != null)
436 this.heap.close();
437 this.heap = null;
438 this.lastTop = null;
439 } finally {
440 lock.unlock();
441 }
442 }
443
444 @Override
445 public boolean seek(Cell key) throws IOException {
446 lock.lock();
447 try {
448
449 checkReseek();
450 return this.heap.seek(key);
451 } finally {
452 lock.unlock();
453 }
454 }
455
456 @Override
457 public boolean next(List<Cell> outResult) throws IOException {
458 return next(outResult, NoLimitScannerContext.getInstance());
459 }
460
461
462
463
464
465
466
467 @Override
468 public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
469 lock.lock();
470
471 try {
472 if (scannerContext == null) {
473 throw new IllegalArgumentException("Scanner context cannot be null");
474 }
475 if (checkReseek()) {
476 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
477 }
478
479
480
481 if (this.heap == null) {
482 close();
483 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
484 }
485
486 Cell cell = this.heap.peek();
487 if (cell == null) {
488 close();
489 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
490 }
491
492
493
494 byte[] row = cell.getRowArray();
495 int offset = cell.getRowOffset();
496 short length = cell.getRowLength();
497
498
499
500
501 if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.row == null) {
502 this.countPerRow = 0;
503 matcher.setRow(row, offset, length);
504 }
505
506
507 if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
508
509
510 KeyValue.KVComparator comparator =
511 store != null ? store.getComparator() : null;
512
513 int count = 0;
514 long totalBytesRead = 0;
515
516 LOOP: do {
517
518 if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
519 scannerContext.updateTimeProgress();
520 if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
521 return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
522 }
523 }
524
525 if (prevCell != cell) ++kvsScanned;
526 checkScanOrder(prevCell, cell, comparator);
527 prevCell = cell;
528
529 ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
530 qcode = optimize(qcode, cell);
531 switch(qcode) {
532 case INCLUDE:
533 case INCLUDE_AND_SEEK_NEXT_ROW:
534 case INCLUDE_AND_SEEK_NEXT_COL:
535
536 Filter f = matcher.getFilter();
537 if (f != null) {
538
539 cell = f.transformCell(cell);
540 }
541
542 this.countPerRow++;
543 if (storeLimit > -1 &&
544 this.countPerRow > (storeLimit + storeOffset)) {
545
546 if (!matcher.moreRowsMayExistAfter(cell)) {
547 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
548 }
549
550
551
552 matcher.row = null;
553 seekToNextRow(cell);
554 break LOOP;
555 }
556
557
558
559 if (this.countPerRow > storeOffset) {
560 outResult.add(cell);
561
562
563 count++;
564 totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
565
566
567 scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
568 scannerContext.incrementBatchProgress(1);
569
570 if (totalBytesRead > maxRowSize) {
571 throw new RowTooBigException("Max row size allowed: " + maxRowSize
572 + ", but the row is bigger than that.");
573 }
574 }
575
576 if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
577 if (!matcher.moreRowsMayExistAfter(cell)) {
578 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
579 }
580
581
582
583 matcher.row = null;
584 seekToNextRow(cell);
585 } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
586 seekAsDirection(matcher.getKeyForNextColumn(cell));
587 } else {
588 this.heap.next();
589 }
590
591 if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
592 break LOOP;
593 }
594 if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
595 break LOOP;
596 }
597 continue;
598
599 case DONE:
600
601
602
603 matcher.row = null;
604 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
605
606 case DONE_SCAN:
607 close();
608 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
609
610 case SEEK_NEXT_ROW:
611
612
613 if (!matcher.moreRowsMayExistAfter(cell)) {
614 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
615 }
616
617
618
619 matcher.row = null;
620 seekToNextRow(cell);
621 break;
622
623 case SEEK_NEXT_COL:
624 seekAsDirection(matcher.getKeyForNextColumn(cell));
625 break;
626
627 case SKIP:
628 this.heap.next();
629 break;
630
631 case SEEK_NEXT_USING_HINT:
632
633 Cell nextKV = matcher.getNextKeyHint(cell);
634 if (nextKV != null) {
635 seekAsDirection(nextKV);
636 } else {
637 heap.next();
638 }
639 break;
640
641 default:
642 throw new RuntimeException("UNEXPECTED");
643 }
644 } while((cell = this.heap.peek()) != null);
645
646 if (count > 0) {
647 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
648 }
649
650
651 close();
652 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
653 } finally {
654 lock.unlock();
655 }
656 }
657
658
659
660
661
662 private ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
663 switch(qcode) {
664 case INCLUDE_AND_SEEK_NEXT_COL:
665 case SEEK_NEXT_COL:
666 {
667 Cell nextIndexedKey = getNextIndexedKey();
668 if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
669 && matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
670 return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
671 }
672 break;
673 }
674 case INCLUDE_AND_SEEK_NEXT_ROW:
675 case SEEK_NEXT_ROW:
676 {
677 Cell nextIndexedKey = getNextIndexedKey();
678 if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
679 && matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
680 return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
681 }
682 break;
683 }
684 default:
685 break;
686 }
687 return qcode;
688 }
689
690
691 @Override
692 public void updateReaders() throws IOException {
693 lock.lock();
694 try {
695 if (this.closing) return;
696
697
698
699
700
701
702 if (this.heap == null) return;
703
704
705 this.lastTop = this.peek();
706
707
708
709
710 this.heap.close();
711 this.heap = null;
712
713
714 } finally {
715 lock.unlock();
716 }
717 }
718
719
720
721
722
723
724 protected boolean checkReseek() throws IOException {
725 if (this.heap == null && this.lastTop != null) {
726 resetScannerStack(this.lastTop);
727 if (this.heap.peek() == null
728 || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
729 LOG.debug("Storescanner.peek() is changed where before = "
730 + this.lastTop.toString() + ",and after = " + this.heap.peek());
731 this.lastTop = null;
732 return true;
733 }
734 this.lastTop = null;
735 }
736
737 return false;
738 }
739
740 protected void resetScannerStack(Cell lastTopKey) throws IOException {
741 if (heap != null) {
742 throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
743 }
744
745
746
747
748 List<KeyValueScanner> scanners = getScannersNoCompaction();
749
750
751 seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
752
753
754 resetKVHeap(scanners, store.getComparator());
755
756
757
758
759 Cell kv = heap.peek();
760 if (kv == null) {
761 kv = lastTopKey;
762 }
763 byte[] row = kv.getRowArray();
764 int offset = kv.getRowOffset();
765 short length = kv.getRowLength();
766 if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row,
767 matcher.rowOffset, matcher.rowLength)) {
768 this.countPerRow = 0;
769 matcher.reset();
770 matcher.setRow(row, offset, length);
771 }
772 }
773
774
775
776
777
778
779
780
781 protected void checkScanOrder(Cell prevKV, Cell kv,
782 KeyValue.KVComparator comparator) throws IOException {
783
784 assert prevKV == null || comparator == null
785 || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
786 + " followed by a " + "smaller key " + kv + " in cf " + store;
787 }
788
789 protected boolean seekToNextRow(Cell kv) throws IOException {
790 return reseek(KeyValueUtil.createLastOnRow(kv));
791 }
792
793
794
795
796
797
798
799 protected boolean seekAsDirection(Cell kv)
800 throws IOException {
801 return reseek(kv);
802 }
803
804 @Override
805 public boolean reseek(Cell kv) throws IOException {
806 lock.lock();
807 try {
808
809
810
811 checkReseek();
812 if (explicitColumnQuery && lazySeekEnabledGlobally) {
813 return heap.requestSeek(kv, true, useRowColBloom);
814 }
815 return heap.reseek(kv);
816 } finally {
817 lock.unlock();
818 }
819 }
820
821 @Override
822 public long getSequenceID() {
823 return 0;
824 }
825
826
827
828
829
830
831
832 private void parallelSeek(final List<? extends KeyValueScanner>
833 scanners, final Cell kv) throws IOException {
834 if (scanners.isEmpty()) return;
835 int storeFileScannerCount = scanners.size();
836 CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
837 List<ParallelSeekHandler> handlers =
838 new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
839 for (KeyValueScanner scanner : scanners) {
840 if (scanner instanceof StoreFileScanner) {
841 ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
842 this.readPt, latch);
843 executor.submit(seekHandler);
844 handlers.add(seekHandler);
845 } else {
846 scanner.seek(kv);
847 latch.countDown();
848 }
849 }
850
851 try {
852 latch.await();
853 } catch (InterruptedException ie) {
854 throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
855 }
856
857 for (ParallelSeekHandler handler : handlers) {
858 if (handler.getErr() != null) {
859 throw new IOException(handler.getErr());
860 }
861 }
862 }
863
864
865
866
867
868 List<KeyValueScanner> getAllScannersForTesting() {
869 List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
870 KeyValueScanner current = heap.getCurrentForTesting();
871 if (current != null)
872 allScanners.add(current);
873 for (KeyValueScanner scanner : heap.getHeap())
874 allScanners.add(scanner);
875 return allScanners;
876 }
877
878 static void enableLazySeekGlobally(boolean enable) {
879 lazySeekEnabledGlobally = enable;
880 }
881
882
883
884
885 public long getEstimatedNumberOfKvsScanned() {
886 return this.kvsScanned;
887 }
888
889 @Override
890 public Cell getNextIndexedKey() {
891 return this.heap.getNextIndexedKey();
892 }
893 }
894