View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.NavigableSet;
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.DoNotRetryIOException;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.KeyValue;
37  import org.apache.hadoop.hbase.KeyValue.KVComparator;
38  import org.apache.hadoop.hbase.KeyValueUtil;
39  import org.apache.hadoop.hbase.client.IsolationLevel;
40  import org.apache.hadoop.hbase.client.Scan;
41  import org.apache.hadoop.hbase.executor.ExecutorService;
42  import org.apache.hadoop.hbase.filter.Filter;
43  import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
46  
47  /**
48   * Scanner scans both the memstore and the Store. Coalesces the KeyValue stream
49   * into a {@code List<Cell>} for a single row.
50   */
51  @InterfaceAudience.Private
52  public class StoreScanner extends NonReversedNonLazyKeyValueScanner
53      implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
54    static final Log LOG = LogFactory.getLog(StoreScanner.class);
55    protected Store store;
56    protected ScanQueryMatcher matcher;
57    protected KeyValueHeap heap;
58    protected boolean cacheBlocks;
59  
60    protected int countPerRow = 0;
61    protected int storeLimit = -1;
62    protected int storeOffset = 0;
63  
64    // Used to indicate that the scanner has closed (see HBASE-1107)
65    // Doesn't need to be volatile because it is only accessed while holding 'lock'
66    protected boolean closing = false;
67    protected final boolean isGet;
68    protected final boolean explicitColumnQuery;
69    protected final boolean useRowColBloom;
70    /**
71     * A flag that enables StoreFileScanner parallel-seeking
72     */
73    protected boolean isParallelSeekEnabled = false;
74    protected ExecutorService executor;
75    protected final Scan scan;
76    protected final NavigableSet<byte[]> columns;
77    protected final long oldestUnexpiredTS;
78    protected final int minVersions;
79  
80    /**
81     * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
82     * KVs skipped via seeking to next row/column. TODO: estimate them?
83     */
84    private long kvsScanned = 0;
85    private KeyValue prevKV = null;
86  
87    /** We don't ever expect to change this, the constant is just for clarity. */
88    static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
89    public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
90        "hbase.storescanner.parallel.seek.enable";
91  
92    /** Used during unit testing to ensure that lazy seek does save seek ops */
93    protected static boolean lazySeekEnabledGlobally =
94        LAZY_SEEK_ENABLED_BY_DEFAULT;
95  
96    // if heap == null and lastTop != null, you need to reseek given the key below
97    protected KeyValue lastTop = null;
98  
99    // A flag whether use pread for scan
100   private boolean scanUsePread = false;
101   protected ReentrantLock lock = new ReentrantLock();
102   
103   private final long readPt;
104 
105   // used by the injection framework to test race between StoreScanner construction and compaction
106   enum StoreScannerCompactionRace {
107     BEFORE_SEEK,
108     AFTER_SEEK,
109     COMPACT_COMPLETE
110   }
111   
112   /** An internal constructor. */
113   protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
114       final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt) {
115     this.readPt = readPt;
116     this.store = store;
117     this.cacheBlocks = cacheBlocks;
118     isGet = scan.isGetScan();
119     int numCol = columns == null ? 0 : columns.size();
120     explicitColumnQuery = numCol > 0;
121     this.scan = scan;
122     this.columns = columns;
123     oldestUnexpiredTS = EnvironmentEdgeManager.currentTimeMillis() - ttl;
124     this.minVersions = minVersions;
125 
126     // We look up row-column Bloom filters for multi-column queries as part of
127     // the seek operation. However, we also look the row-column Bloom filter
128     // for multi-row (non-"get") scans because this is not done in
129     // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
130     useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
131     this.scanUsePread = scan.isSmall();
132     // The parallel-seeking is on :
133     // 1) the config value is *true*
134     // 2) store has more than one store file
135     if (store != null && ((HStore)store).getHRegion() != null
136         && store.getStorefilesCount() > 1) {
137       RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
138       if (rsService == null || !rsService.getConfiguration().getBoolean(
139             STORESCANNER_PARALLEL_SEEK_ENABLE, false)) return;
140       isParallelSeekEnabled = true;
141       executor = rsService.getExecutorService();
142     }
143   }
144 
145   /**
146    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
147    * are not in a compaction.
148    *
149    * @param store who we scan
150    * @param scan the spec
151    * @param columns which columns we are scanning
152    * @throws IOException
153    */
154   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
155       long readPt)
156                               throws IOException {
157     this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
158         scanInfo.getMinVersions(), readPt);
159     if (columns != null && scan.isRaw()) {
160       throw new DoNotRetryIOException(
161           "Cannot specify any column for a raw scan");
162     }
163     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
164         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
165         oldestUnexpiredTS);
166 
167     this.store.addChangedReaderObserver(this);
168 
169     // Pass columns to try to filter out unnecessary StoreFiles.
170     List<KeyValueScanner> scanners = getScannersNoCompaction();
171 
172     // Seek all scanners to the start of the Row (or if the exact matching row
173     // key does not exist, then to the start of the next matching Row).
174     // Always check bloom filter to optimize the top row seek for delete
175     // family marker.
176     seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
177         && lazySeekEnabledGlobally, isParallelSeekEnabled);
178 
179     // set storeLimit
180     this.storeLimit = scan.getMaxResultsPerColumnFamily();
181 
182     // set rowOffset
183     this.storeOffset = scan.getRowOffsetPerColumnFamily();
184 
185     // Combine all seeked scanners with a heap
186     resetKVHeap(scanners, store.getComparator());
187   }
188 
189   /**
190    * Used for compactions.<p>
191    *
192    * Opens a scanner across specified StoreFiles.
193    * @param store who we scan
194    * @param scan the spec
195    * @param scanners ancillary scanners
196    * @param smallestReadPoint the readPoint that we should use for tracking
197    *          versions
198    */
199   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
200       List<? extends KeyValueScanner> scanners, ScanType scanType,
201       long smallestReadPoint, long earliestPutTs) throws IOException {
202     this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
203   }
204 
205   /**
206    * Used for compactions that drop deletes from a limited range of rows.<p>
207    *
208    * Opens a scanner across specified StoreFiles.
209    * @param store who we scan
210    * @param scan the spec
211    * @param scanners ancillary scanners
212    * @param smallestReadPoint the readPoint that we should use for tracking versions
213    * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
214    * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
215    */
216   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
217       List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
218       byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
219     this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
220         earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
221   }
222 
223   private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
224       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
225       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
226     this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(),
227         ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
228     if (dropDeletesFromRow == null) {
229       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType,
230           smallestReadPoint, earliestPutTs, oldestUnexpiredTS);
231     } else {
232       matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint,
233           earliestPutTs, oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow);
234     }
235 
236     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
237     scanners = selectScannersFrom(scanners);
238 
239     // Seek all scanners to the initial key
240     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
241 
242     // Combine all seeked scanners with a heap
243     resetKVHeap(scanners, store.getComparator());
244   }
245 
246   /** Constructor for testing. */
247   StoreScanner(final Scan scan, ScanInfo scanInfo,
248       ScanType scanType, final NavigableSet<byte[]> columns,
249       final List<KeyValueScanner> scanners) throws IOException {
250     this(scan, scanInfo, scanType, columns, scanners,
251         HConstants.LATEST_TIMESTAMP,
252         // 0 is passed as readpoint because the test bypasses Store
253         0);
254   }
255 
256   // Constructor for testing.
257   StoreScanner(final Scan scan, ScanInfo scanInfo,
258     ScanType scanType, final NavigableSet<byte[]> columns,
259     final List<KeyValueScanner> scanners, long earliestPutTs)
260         throws IOException {
261     this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
262       // 0 is passed as readpoint because the test bypasses Store
263       0);
264   }
265   
266   private StoreScanner(final Scan scan, ScanInfo scanInfo,
267       ScanType scanType, final NavigableSet<byte[]> columns,
268       final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
269           throws IOException {
270     this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
271         scanInfo.getMinVersions(), readPt);
272     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
273         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS);
274 
275     // In unit tests, the store could be null
276     if (this.store != null) {
277       this.store.addChangedReaderObserver(this);
278     }
279     // Seek all scanners to the initial key
280     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
281     resetKVHeap(scanners, scanInfo.getComparator());
282   }
283 
284   /**
285    * Get a filtered list of scanners. Assumes we are not in a compaction.
286    * @return list of scanners to seek
287    */
288   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
289     final boolean isCompaction = false;
290     boolean usePread = isGet || scanUsePread;
291     return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
292         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
293   }
294 
295   /**
296    * Seek the specified scanners with the given key
297    * @param scanners
298    * @param seekKey
299    * @param isLazy true if using lazy seek
300    * @param isParallelSeek true if using parallel seek
301    * @throws IOException
302    */
303   protected void seekScanners(List<? extends KeyValueScanner> scanners,
304       KeyValue seekKey, boolean isLazy, boolean isParallelSeek)
305       throws IOException {
306     // Seek all scanners to the start of the Row (or if the exact matching row
307     // key does not exist, then to the start of the next matching Row).
308     // Always check bloom filter to optimize the top row seek for delete
309     // family marker.
310     if (isLazy) {
311       for (KeyValueScanner scanner : scanners) {
312         scanner.requestSeek(seekKey, false, true);
313       }
314     } else {
315       if (!isParallelSeek) {
316         for (KeyValueScanner scanner : scanners) {
317           scanner.seek(seekKey);
318         }
319       } else {
320         parallelSeek(scanners, seekKey);
321       }
322     }
323   }
324 
325   protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
326       KVComparator comparator) throws IOException {
327     // Combine all seeked scanners with a heap
328     heap = new KeyValueHeap(scanners, comparator);
329   }
330 
331   /**
332    * Filters the given list of scanners using Bloom filter, time range, and
333    * TTL.
334    */
335   protected List<KeyValueScanner> selectScannersFrom(
336       final List<? extends KeyValueScanner> allScanners) {
337     boolean memOnly;
338     boolean filesOnly;
339     if (scan instanceof InternalScan) {
340       InternalScan iscan = (InternalScan)scan;
341       memOnly = iscan.isCheckOnlyMemStore();
342       filesOnly = iscan.isCheckOnlyStoreFiles();
343     } else {
344       memOnly = false;
345       filesOnly = false;
346     }
347 
348     List<KeyValueScanner> scanners =
349         new ArrayList<KeyValueScanner>(allScanners.size());
350 
351     // We can only exclude store files based on TTL if minVersions is set to 0.
352     // Otherwise, we might have to return KVs that have technically expired.
353     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
354         Long.MIN_VALUE;
355 
356     // include only those scan files which pass all filters
357     for (KeyValueScanner kvs : allScanners) {
358       boolean isFile = kvs.isFileScanner();
359       if ((!isFile && filesOnly) || (isFile && memOnly)) {
360         continue;
361       }
362 
363       if (kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)) {
364         scanners.add(kvs);
365       }
366     }
367     return scanners;
368   }
369 
370   @Override
371   public KeyValue peek() {
372     lock.lock();
373     try {
374     if (this.heap == null) {
375       return this.lastTop;
376     }
377     return this.heap.peek();
378     } finally {
379       lock.unlock();
380     }
381   }
382 
383   @Override
384   public KeyValue next() {
385     // throw runtime exception perhaps?
386     throw new RuntimeException("Never call StoreScanner.next()");
387   }
388 
389   @Override
390   public void close() {
391     lock.lock();
392     try {
393     if (this.closing) return;
394     this.closing = true;
395     // under test, we dont have a this.store
396     if (this.store != null)
397       this.store.deleteChangedReaderObserver(this);
398     if (this.heap != null)
399       this.heap.close();
400     this.heap = null; // CLOSED!
401     this.lastTop = null; // If both are null, we are closed.
402     } finally {
403       lock.unlock();
404     }
405   }
406 
407   @Override
408   public boolean seek(KeyValue key) throws IOException {
409     lock.lock();
410     try {
411     // reset matcher state, in case that underlying store changed
412     checkReseek();
413     return this.heap.seek(key);
414     } finally {
415       lock.unlock();
416     }
417   }
418 
419   /**
420    * Get the next row of values from this Store.
       * <p>
       * Runs under {@link #lock}. KeyValues are taken from the heap one at a time
       * and handed to the {@link ScanQueryMatcher}; the match code decides whether
       * a KeyValue is added to {@code outResult}, skipped, or used as the starting
       * point of a seek. Included KeyValues are subject to the per-store
       * {@link #storeOffset} and {@link #storeLimit}.
421    * @param outResult list that included KeyValues are appended to
422    * @param limit maximum number of KeyValues to add in this call; a value
       *          &lt;= 0 imposes no maximum, and a negative value additionally
       *          makes every call reset the per-row count and the matcher row
423    * @return true if there are more rows, false if scanner is done
       * @throws IOException if reseeking or seeking the underlying scanners fails
424    */
425   @Override
426   public boolean next(List<Cell> outResult, int limit) throws IOException {
427     lock.lock();
428     try {
        // checkReseek() returns true when the scanner stack was rebuilt and the
        // top row changed; return right away and leave outResult untouched.
429     if (checkReseek()) {
430       return true;
431     }
432
433     // if the heap was left null, then the scanners had previously run out anyways, close and
434     // return.
435     if (this.heap == null) {
436       close();
437       return false;
438     }
439
440     KeyValue peeked = this.heap.peek();
441     if (peeked == null) {
442       close();
443       return false;
444     }
445
446     // only call setRow if the row changes; avoids confusing the query matcher
447     // if scanning intra-row
448     byte[] row = peeked.getRowArray();
449     int offset = peeked.getRowOffset();
450     short length = peeked.getRowLength();
451     if (limit < 0 || matcher.row == null || !Bytes.equals(row, offset, length, matcher.row,
452         matcher.rowOffset, matcher.rowLength)) {
453       this.countPerRow = 0;
454       matcher.setRow(row, offset, length);
455     }
456
457     KeyValue kv;
458
459     // Only do a sanity-check if store and comparator are available.
460     KeyValue.KVComparator comparator =
461         store != null ? store.getComparator() : null;
462
        // number of KeyValues added to outResult by this call
463     int count = 0;
464     LOOP: while((kv = this.heap.peek()) != null) {
465       if (prevKV != kv) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
466       checkScanOrder(prevKV, kv, comparator);
467       prevKV = kv;
468
469       ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
470       switch(qcode) {
471         case INCLUDE:
472         case INCLUDE_AND_SEEK_NEXT_ROW:
473         case INCLUDE_AND_SEEK_NEXT_COL:
474
                // Give the filter, if any, the chance to transform the cell
                // before it is counted and returned.
475           Filter f = matcher.getFilter();
476           if (f != null) {
477             // TODO convert Scan Query Matcher to be Cell instead of KV based ?
478             kv = KeyValueUtil.ensureKeyValue(f.transformCell(kv));
479           }
480
481           this.countPerRow++;
482           if (storeLimit > -1 &&
483               this.countPerRow > (storeLimit + storeOffset)) {
484             // do what SEEK_NEXT_ROW does.
485             if (!matcher.moreRowsMayExistAfter(kv)) {
486               return false;
487             }
488             seekToNextRow(kv);
489             break LOOP;
490           }
491
492           // add to results only if we have skipped #storeOffset kvs
493           // also update metric accordingly
494           if (this.countPerRow > storeOffset) {
495             outResult.add(kv);
496             count++;
497           }
498
                // Advance the heap according to the exact INCLUDE variant.
499           if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
500             if (!matcher.moreRowsMayExistAfter(kv)) {
501               return false;
502             }
503             seekToNextRow(kv);
504           } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
505             seekAsDirection(matcher.getKeyForNextColumn(kv));
506           } else {
507             this.heap.next();
508           }
509
                // Stop once the caller's limit is reached (only when limit > 0).
510           if (limit > 0 && (count == limit)) {
511             break LOOP;
512           }
513           continue;
514
515         case DONE:
516           return true;
517
518         case DONE_SCAN:
519           close();
520           return false;
521
522         case SEEK_NEXT_ROW:
523           // This is just a relatively simple end of scan fix, to short-cut end
524           // us if there is an endKey in the scan.
525           if (!matcher.moreRowsMayExistAfter(kv)) {
526             return false;
527           }
528
529           seekToNextRow(kv);
530           break;
531
532         case SEEK_NEXT_COL:
533           seekAsDirection(matcher.getKeyForNextColumn(kv));
534           break;
535
536         case SKIP:
537           this.heap.next();
538           break;
539
540         case SEEK_NEXT_USING_HINT:
541           // TODO convert reseek to Cell?
542           KeyValue nextKV = KeyValueUtil.ensureKeyValue(matcher.getNextKeyHint(kv));
                // Without a hint, just step to the next KeyValue.
543           if (nextKV != null) {
544             seekAsDirection(nextKV);
545           } else {
546             heap.next();
547           }
548           break;
549
550         default:
551           throw new RuntimeException("UNEXPECTED");
552       }
553     }
554
        // Reached when the heap ran dry or the loop was left via "break LOOP".
555     if (count > 0) {
556       return true;
557     }
558
559     // No more keys
560     close();
561     return false;
562     } finally {
563       lock.unlock();
564     }
565   }
566 
567   @Override
568   public boolean next(List<Cell> outResult) throws IOException {
569     return next(outResult, -1);
570   }
571 
572   // Implementation of ChangedReadersObserver
573   @Override
574   public void updateReaders() throws IOException {
575     lock.lock();
576     try {
577     if (this.closing) return;
578 
579     // All public synchronized API calls will call 'checkReseek' which will cause
580     // the scanner stack to reseek if this.heap==null && this.lastTop != null.
581     // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
582     // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
583     // which is NOT what we want, not to mention could cause an NPE. So we early out here.
584     if (this.heap == null) return;
585 
586     // this could be null.
587     this.lastTop = this.peek();
588 
589     //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
590 
591     // close scanners to old obsolete Store files
592     this.heap.close(); // bubble thru and close all scanners.
593     this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
594 
595     // Let the next() call handle re-creating and seeking
596     } finally {
597       lock.unlock();
598     }
599   }
600 
601   /**
602    * @return true if top of heap has changed (and KeyValueHeap has to try the
603    *         next KV)
604    * @throws IOException
605    */
606   protected boolean checkReseek() throws IOException {
607     if (this.heap == null && this.lastTop != null) {
608       resetScannerStack(this.lastTop);
609       if (this.heap.peek() == null
610           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
611         LOG.debug("Storescanner.peek() is changed where before = "
612             + this.lastTop.toString() + ",and after = " + this.heap.peek());
613         this.lastTop = null;
614         return true;
615       }
616       this.lastTop = null; // gone!
617     }
618     // else dont need to reseek
619     return false;
620   }
621 
622   protected void resetScannerStack(KeyValue lastTopKey) throws IOException {
623     if (heap != null) {
624       throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
625     }
626 
627     /* When we have the scan object, should we not pass it to getScanners()
628      * to get a limited set of scanners? We did so in the constructor and we
629      * could have done it now by storing the scan object from the constructor */
630     List<KeyValueScanner> scanners = getScannersNoCompaction();
631 
632     // Seek all scanners to the initial key
633     seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled);
634 
635     // Combine all seeked scanners with a heap
636     resetKVHeap(scanners, store.getComparator());
637 
638     // Reset the state of the Query Matcher and set to top row.
639     // Only reset and call setRow if the row changes; avoids confusing the
640     // query matcher if scanning intra-row.
641     KeyValue kv = heap.peek();
642     if (kv == null) {
643       kv = lastTopKey;
644     }
645     byte[] row = kv.getRowArray();
646     int offset = kv.getRowOffset();
647     short length = kv.getRowLength();
648     if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row,
649         matcher.rowOffset, matcher.rowLength)) {
650       this.countPerRow = 0;
651       matcher.reset();
652       matcher.setRow(row, offset, length);
653     }
654   }
655 
656   /**
657    * Check whether scan as expected order
658    * @param prevKV
659    * @param kv
660    * @param comparator
661    * @throws IOException
662    */
663   protected void checkScanOrder(KeyValue prevKV, KeyValue kv,
664       KeyValue.KVComparator comparator) throws IOException {
665     // Check that the heap gives us KVs in an increasing order.
666     assert prevKV == null || comparator == null
667         || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
668         + " followed by a " + "smaller key " + kv + " in cf " + store;
669   }
670 
671   protected boolean seekToNextRow(KeyValue kv) throws IOException {
672     return reseek(matcher.getKeyForNextRow(kv));
673   }
674 
675   /**
676    * Do a reseek in a normal StoreScanner(scan forward)
677    * @param kv
678    * @return true if scanner has values left, false if end of scanner
679    * @throws IOException
680    */
681   protected boolean seekAsDirection(KeyValue kv)
682       throws IOException {
683     return reseek(kv);
684   }
685 
686   @Override
687   public boolean reseek(KeyValue kv) throws IOException {
688     lock.lock();
689     try {
690     //Heap will not be null, if this is called from next() which.
691     //If called from RegionScanner.reseek(...) make sure the scanner
692     //stack is reset if needed.
693     checkReseek();
694     if (explicitColumnQuery && lazySeekEnabledGlobally) {
695       return heap.requestSeek(kv, true, useRowColBloom);
696     }
697     return heap.reseek(kv);
698     } finally {
699       lock.unlock();
700     }
701   }
702 
703   @Override
704   public long getSequenceID() {
705     return 0;
706   }
707 
708   /**
709    * Seek storefiles in parallel to optimize IO latency as much as possible
710    * @param scanners the list {@link KeyValueScanner}s to be read from
711    * @param kv the KeyValue on which the operation is being requested
712    * @throws IOException
713    */
714   private void parallelSeek(final List<? extends KeyValueScanner>
715       scanners, final KeyValue kv) throws IOException {
716     if (scanners.isEmpty()) return;
717     int storeFileScannerCount = scanners.size();
718     CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
719     List<ParallelSeekHandler> handlers = 
720         new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
721     for (KeyValueScanner scanner : scanners) {
722       if (scanner instanceof StoreFileScanner) {
723         ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
724           this.readPt, latch);
725         executor.submit(seekHandler);
726         handlers.add(seekHandler);
727       } else {
728         scanner.seek(kv);
729         latch.countDown();
730       }
731     }
732 
733     try {
734       latch.await();
735     } catch (InterruptedException ie) {
736       throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
737     }
738 
739     for (ParallelSeekHandler handler : handlers) {
740       if (handler.getErr() != null) {
741         throw new IOException(handler.getErr());
742       }
743     }
744   }
745 
746   /**
747    * Used in testing.
748    * @return all scanners in no particular order
749    */
750   List<KeyValueScanner> getAllScannersForTesting() {
751     List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
752     KeyValueScanner current = heap.getCurrentForTesting();
753     if (current != null)
754       allScanners.add(current);
755     for (KeyValueScanner scanner : heap.getHeap())
756       allScanners.add(scanner);
757     return allScanners;
758   }
759 
760   static void enableLazySeekGlobally(boolean enable) {
761     lazySeekEnabledGlobally = enable;
762   }
763 
764   /**
765    * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
766    */
767   public long getEstimatedNumberOfKvsScanned() {
768     return this.kvsScanned;
769   }
770 }
771