View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.NavigableSet;
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.CellUtil;
35  import org.apache.hadoop.hbase.DoNotRetryIOException;
36  import org.apache.hadoop.hbase.HConstants;
37  import org.apache.hadoop.hbase.KeyValue;
38  import org.apache.hadoop.hbase.KeyValue.KVComparator;
39  import org.apache.hadoop.hbase.KeyValueUtil;
40  import org.apache.hadoop.hbase.client.IsolationLevel;
41  import org.apache.hadoop.hbase.client.Scan;
42  import org.apache.hadoop.hbase.executor.ExecutorService;
43  import org.apache.hadoop.hbase.filter.Filter;
44  import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
47  
/**
 * Scanner scans both the memstore and the Store. Coalesces the KeyValue stream
 * into a {@code List<Cell>} for a single row.
 */
52  @InterfaceAudience.Private
53  public class StoreScanner extends NonReversedNonLazyKeyValueScanner
54      implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  static final Log LOG = LogFactory.getLog(StoreScanner.class);
  // The store being scanned; null when a test constructor bypasses the Store.
  protected Store store;
  // Decides, cell by cell, whether next() includes, skips or seeks.
  protected ScanQueryMatcher matcher;
  // Merged view over the underlying scanners; set to null by close() and updateReaders().
  protected KeyValueHeap heap;
  // Passed through to store.getScanners() when the scanners are opened.
  protected boolean cacheBlocks;

  // Number of cells included so far for the current row; reset when the row changes.
  protected int countPerRow = 0;
  // Taken from scan.getMaxResultsPerColumnFamily(); next() only enforces it when > -1.
  protected int storeLimit = -1;
  // Taken from scan.getRowOffsetPerColumnFamily(); that many included cells are skipped per row.
  protected int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107).
  // Does not need to be volatile: close() and updateReaders() only touch it while holding 'lock'.
  protected boolean closing = false;
  // Result of scan.isGetScan().
  protected final boolean isGet;
  // True when at least one column was explicitly requested.
  protected final boolean explicitColumnQuery;
  // Forwarded to heap.requestSeek() by reseek() when lazy seek is in use.
  protected final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  protected boolean isParallelSeekEnabled = false;
  // Executor the ParallelSeekHandlers are submitted to; only assigned when parallel seek is on.
  protected ExecutorService executor;
  protected final Scan scan;
  protected final NavigableSet<byte[]> columns;
  // Current time at construction minus the TTL.
  protected final long oldestUnexpiredTS;
  protected final int minVersions;
  // Upper bound on accumulated cell bytes before a RowTooBigException is thrown.
  protected final long maxRowSize;

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
   * KVs skipped via seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  // Last cell examined by next(); used for the ordering check and to avoid double counting.
  private Cell prevCell = null;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  // Configuration key read in the internal constructor to switch parallel seeking on.
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
      "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  protected static boolean lazySeekEnabledGlobally =
      LAZY_SEEK_ENABLED_BY_DEFAULT;

  // if heap == null and lastTop != null, you need to reseek given the key below
  protected Cell lastTop = null;

  // Whether to use pread for the scan; initialised from scan.isSmall().
  private boolean scanUsePread = false;
  // Held by peek/close/seek/reseek/next/updateReaders while they touch heap, lastTop and closing.
  protected ReentrantLock lock = new ReentrantLock();
  
  // Read point handed to store.getScanners() and to the parallel seek handlers.
  private final long readPt;
106 
107   // used by the injection framework to test race between StoreScanner construction and compaction
108   enum StoreScannerCompactionRace {
109     BEFORE_SEEK,
110     AFTER_SEEK,
111     COMPACT_COMPLETE
112   }
113   
114   /** An internal constructor. */
115   protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
116       final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt) {
117     this.readPt = readPt;
118     this.store = store;
119     this.cacheBlocks = cacheBlocks;
120     isGet = scan.isGetScan();
121     int numCol = columns == null ? 0 : columns.size();
122     explicitColumnQuery = numCol > 0;
123     this.scan = scan;
124     this.columns = columns;
125     oldestUnexpiredTS = EnvironmentEdgeManager.currentTimeMillis() - ttl;
126     this.minVersions = minVersions;
127 
128     if (store != null && ((HStore)store).getHRegion() != null
129         && ((HStore)store).getHRegion().getBaseConf() != null) {
130       this.maxRowSize = ((HStore) store).getHRegion().getBaseConf().getLong(
131         HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
132     } else {
133       this.maxRowSize = HConstants.TABLE_MAX_ROWSIZE_DEFAULT;
134     }
135 
136     // We look up row-column Bloom filters for multi-column queries as part of
137     // the seek operation. However, we also look the row-column Bloom filter
138     // for multi-row (non-"get") scans because this is not done in
139     // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
140     useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
141     this.scanUsePread = scan.isSmall();
142     // The parallel-seeking is on :
143     // 1) the config value is *true*
144     // 2) store has more than one store file
145     if (store != null && ((HStore)store).getHRegion() != null
146         && store.getStorefilesCount() > 1) {
147       RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
148       if (rsService == null || !rsService.getConfiguration().getBoolean(
149             STORESCANNER_PARALLEL_SEEK_ENABLE, false)) return;
150       isParallelSeekEnabled = true;
151       executor = rsService.getExecutorService();
152     }
153   }
154 
155   /**
156    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
157    * are not in a compaction.
158    *
159    * @param store who we scan
160    * @param scan the spec
161    * @param columns which columns we are scanning
162    * @throws IOException
163    */
164   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
165       long readPt)
166                               throws IOException {
167     this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
168         scanInfo.getMinVersions(), readPt);
169     if (columns != null && scan.isRaw()) {
170       throw new DoNotRetryIOException(
171           "Cannot specify any column for a raw scan");
172     }
173     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
174         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
175         oldestUnexpiredTS, store.getCoprocessorHost());
176 
177     this.store.addChangedReaderObserver(this);
178 
179     // Pass columns to try to filter out unnecessary StoreFiles.
180     List<KeyValueScanner> scanners = getScannersNoCompaction();
181 
182     // Seek all scanners to the start of the Row (or if the exact matching row
183     // key does not exist, then to the start of the next matching Row).
184     // Always check bloom filter to optimize the top row seek for delete
185     // family marker.
186     seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
187         && lazySeekEnabledGlobally, isParallelSeekEnabled);
188 
189     // set storeLimit
190     this.storeLimit = scan.getMaxResultsPerColumnFamily();
191 
192     // set rowOffset
193     this.storeOffset = scan.getRowOffsetPerColumnFamily();
194 
195     // Combine all seeked scanners with a heap
196     resetKVHeap(scanners, store.getComparator());
197   }
198 
199   /**
200    * Used for compactions.<p>
201    *
202    * Opens a scanner across specified StoreFiles.
203    * @param store who we scan
204    * @param scan the spec
205    * @param scanners ancillary scanners
206    * @param smallestReadPoint the readPoint that we should use for tracking
207    *          versions
208    */
209   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
210       List<? extends KeyValueScanner> scanners, ScanType scanType,
211       long smallestReadPoint, long earliestPutTs) throws IOException {
212     this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
213   }
214 
215   /**
216    * Used for compactions that drop deletes from a limited range of rows.<p>
217    *
218    * Opens a scanner across specified StoreFiles.
219    * @param store who we scan
220    * @param scan the spec
221    * @param scanners ancillary scanners
222    * @param smallestReadPoint the readPoint that we should use for tracking versions
223    * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
224    * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
225    */
226   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
227       List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
228       byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
229     this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
230         earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
231   }
232 
233   private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
234       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
235       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
236     this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(),
237         ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
238     if (dropDeletesFromRow == null) {
239       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
240           earliestPutTs, oldestUnexpiredTS, store.getCoprocessorHost());
241     } else {
242       matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
243           oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
244     }
245 
246     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
247     scanners = selectScannersFrom(scanners);
248 
249     // Seek all scanners to the initial key
250     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
251 
252     // Combine all seeked scanners with a heap
253     resetKVHeap(scanners, store.getComparator());
254   }
255 
256   /** Constructor for testing. */
257   StoreScanner(final Scan scan, ScanInfo scanInfo,
258       ScanType scanType, final NavigableSet<byte[]> columns,
259       final List<KeyValueScanner> scanners) throws IOException {
260     this(scan, scanInfo, scanType, columns, scanners,
261         HConstants.LATEST_TIMESTAMP,
262         // 0 is passed as readpoint because the test bypasses Store
263         0);
264   }
265 
266   // Constructor for testing.
267   StoreScanner(final Scan scan, ScanInfo scanInfo,
268     ScanType scanType, final NavigableSet<byte[]> columns,
269     final List<KeyValueScanner> scanners, long earliestPutTs)
270         throws IOException {
271     this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
272       // 0 is passed as readpoint because the test bypasses Store
273       0);
274   }
275   
276   private StoreScanner(final Scan scan, ScanInfo scanInfo,
277       ScanType scanType, final NavigableSet<byte[]> columns,
278       final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
279           throws IOException {
280     this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
281         scanInfo.getMinVersions(), readPt);
282     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
283         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, null);
284 
285     // In unit tests, the store could be null
286     if (this.store != null) {
287       this.store.addChangedReaderObserver(this);
288     }
289     // Seek all scanners to the initial key
290     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
291     resetKVHeap(scanners, scanInfo.getComparator());
292   }
293 
294   /**
295    * Get a filtered list of scanners. Assumes we are not in a compaction.
296    * @return list of scanners to seek
297    */
298   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
299     final boolean isCompaction = false;
300     boolean usePread = isGet || scanUsePread;
301     return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
302         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
303   }
304 
305   /**
306    * Seek the specified scanners with the given key
307    * @param scanners
308    * @param seekKey
309    * @param isLazy true if using lazy seek
310    * @param isParallelSeek true if using parallel seek
311    * @throws IOException
312    */
313   protected void seekScanners(List<? extends KeyValueScanner> scanners,
314       Cell seekKey, boolean isLazy, boolean isParallelSeek)
315       throws IOException {
316     // Seek all scanners to the start of the Row (or if the exact matching row
317     // key does not exist, then to the start of the next matching Row).
318     // Always check bloom filter to optimize the top row seek for delete
319     // family marker.
320     if (isLazy) {
321       for (KeyValueScanner scanner : scanners) {
322         scanner.requestSeek(seekKey, false, true);
323       }
324     } else {
325       if (!isParallelSeek) {
326         long totalScannersSoughtBytes = 0;
327         for (KeyValueScanner scanner : scanners) {
328           if (totalScannersSoughtBytes >= maxRowSize) {
329             throw new RowTooBigException("Max row size allowed: " + maxRowSize
330               + ", but row is bigger than that");
331           }
332           scanner.seek(seekKey);
333           Cell c = scanner.peek();
334           if (c != null ) {
335             totalScannersSoughtBytes += CellUtil.estimatedSizeOf(c);
336           }
337         }
338       } else {
339         parallelSeek(scanners, seekKey);
340       }
341     }
342   }
343 
344   protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
345       KVComparator comparator) throws IOException {
346     // Combine all seeked scanners with a heap
347     heap = new KeyValueHeap(scanners, comparator);
348   }
349 
350   /**
351    * Filters the given list of scanners using Bloom filter, time range, and
352    * TTL.
353    */
354   protected List<KeyValueScanner> selectScannersFrom(
355       final List<? extends KeyValueScanner> allScanners) {
356     boolean memOnly;
357     boolean filesOnly;
358     if (scan instanceof InternalScan) {
359       InternalScan iscan = (InternalScan)scan;
360       memOnly = iscan.isCheckOnlyMemStore();
361       filesOnly = iscan.isCheckOnlyStoreFiles();
362     } else {
363       memOnly = false;
364       filesOnly = false;
365     }
366 
367     List<KeyValueScanner> scanners =
368         new ArrayList<KeyValueScanner>(allScanners.size());
369 
370     // We can only exclude store files based on TTL if minVersions is set to 0.
371     // Otherwise, we might have to return KVs that have technically expired.
372     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
373         Long.MIN_VALUE;
374 
375     // include only those scan files which pass all filters
376     for (KeyValueScanner kvs : allScanners) {
377       boolean isFile = kvs.isFileScanner();
378       if ((!isFile && filesOnly) || (isFile && memOnly)) {
379         continue;
380       }
381 
382       if (kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)) {
383         scanners.add(kvs);
384       }
385     }
386     return scanners;
387   }
388 
389   @Override
390   public Cell peek() {
391     lock.lock();
392     try {
393     if (this.heap == null) {
394       return this.lastTop;
395     }
396     return this.heap.peek();
397     } finally {
398       lock.unlock();
399     }
400   }
401 
402   @Override
403   public KeyValue next() {
404     // throw runtime exception perhaps?
405     throw new RuntimeException("Never call StoreScanner.next()");
406   }
407 
408   @Override
409   public void close() {
410     lock.lock();
411     try {
412     if (this.closing) return;
413     this.closing = true;
414     // under test, we dont have a this.store
415     if (this.store != null)
416       this.store.deleteChangedReaderObserver(this);
417     if (this.heap != null)
418       this.heap.close();
419     this.heap = null; // CLOSED!
420     this.lastTop = null; // If both are null, we are closed.
421     } finally {
422       lock.unlock();
423     }
424   }
425 
426   @Override
427   public boolean seek(Cell key) throws IOException {
428     lock.lock();
429     try {
430     // reset matcher state, in case that underlying store changed
431     checkReseek();
432     return this.heap.seek(key);
433     } finally {
434       lock.unlock();
435     }
436   }
437 
438   /**
439    * Get the next row of values from this Store.
440    * @param outResult
441    * @param limit
442    * @return true if there are more rows, false if scanner is done
443    */
444   @Override
445   public boolean next(List<Cell> outResult, int limit) throws IOException {
446     lock.lock();
447     try {
448     if (checkReseek()) {
449       return true;
450     }
451 
452     // if the heap was left null, then the scanners had previously run out anyways, close and
453     // return.
454     if (this.heap == null) {
455       close();
456       return false;
457     }
458 
459     Cell peeked = this.heap.peek();
460     if (peeked == null) {
461       close();
462       return false;
463     }
464 
465     // only call setRow if the row changes; avoids confusing the query matcher
466     // if scanning intra-row
467     byte[] row = peeked.getRowArray();
468     int offset = peeked.getRowOffset();
469     short length = peeked.getRowLength();
470     if (limit < 0 || matcher.row == null || !Bytes.equals(row, offset, length, matcher.row,
471         matcher.rowOffset, matcher.rowLength)) {
472       this.countPerRow = 0;
473       matcher.setRow(row, offset, length);
474     }
475 
476     Cell cell;
477 
478     // Only do a sanity-check if store and comparator are available.
479     KeyValue.KVComparator comparator =
480         store != null ? store.getComparator() : null;
481 
482     int count = 0;
483     long totalBytesRead = 0;
484 
485     LOOP: while((cell = this.heap.peek()) != null) {
486       if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
487       checkScanOrder(prevCell, cell, comparator);
488       prevCell = cell;
489 
490       ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
491       switch(qcode) {
492         case INCLUDE:
493         case INCLUDE_AND_SEEK_NEXT_ROW:
494         case INCLUDE_AND_SEEK_NEXT_COL:
495 
496           Filter f = matcher.getFilter();
497           if (f != null) {
498             // TODO convert Scan Query Matcher to be Cell instead of KV based ?
499             cell = f.transformCell(cell);
500           }
501 
502           this.countPerRow++;
503           if (storeLimit > -1 &&
504               this.countPerRow > (storeLimit + storeOffset)) {
505             // do what SEEK_NEXT_ROW does.
506             if (!matcher.moreRowsMayExistAfter(cell)) {
507               return false;
508             }
509             seekToNextRow(cell);
510             break LOOP;
511           }
512 
513           // add to results only if we have skipped #storeOffset kvs
514           // also update metric accordingly
515           if (this.countPerRow > storeOffset) {
516             outResult.add(cell);
517             count++;
518             totalBytesRead += CellUtil.estimatedSizeOf(cell);
519             if (totalBytesRead > maxRowSize) {
520               throw new RowTooBigException("Max row size allowed: " + maxRowSize
521               + ", but the row is bigger than that.");
522             }
523           }
524 
525           if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
526             if (!matcher.moreRowsMayExistAfter(cell)) {
527               return false;
528             }
529             seekToNextRow(cell);
530           } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
531             seekAsDirection(matcher.getKeyForNextColumn(cell));
532           } else {
533             this.heap.next();
534           }
535 
536           if (limit > 0 && (count == limit)) {
537             break LOOP;
538           }
539           continue;
540 
541         case DONE:
542           return true;
543 
544         case DONE_SCAN:
545           close();
546           return false;
547 
548         case SEEK_NEXT_ROW:
549           // This is just a relatively simple end of scan fix, to short-cut end
550           // us if there is an endKey in the scan.
551           if (!matcher.moreRowsMayExistAfter(cell)) {
552             return false;
553           }
554 
555           seekToNextRow(cell);
556           break;
557 
558         case SEEK_NEXT_COL:
559           seekAsDirection(matcher.getKeyForNextColumn(cell));
560           break;
561 
562         case SKIP:
563           this.heap.next();
564           break;
565 
566         case SEEK_NEXT_USING_HINT:
567           // TODO convert resee to Cell?
568           Cell nextKV = matcher.getNextKeyHint(cell);
569           if (nextKV != null) {
570             seekAsDirection(nextKV);
571           } else {
572             heap.next();
573           }
574           break;
575 
576         default:
577           throw new RuntimeException("UNEXPECTED");
578       }
579     }
580 
581     if (count > 0) {
582       return true;
583     }
584 
585     // No more keys
586     close();
587     return false;
588     } finally {
589       lock.unlock();
590     }
591   }
592 
593   @Override
594   public boolean next(List<Cell> outResult) throws IOException {
595     return next(outResult, -1);
596   }
597 
598   // Implementation of ChangedReadersObserver
599   @Override
600   public void updateReaders() throws IOException {
601     lock.lock();
602     try {
603     if (this.closing) return;
604 
605     // All public synchronized API calls will call 'checkReseek' which will cause
606     // the scanner stack to reseek if this.heap==null && this.lastTop != null.
607     // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
608     // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
609     // which is NOT what we want, not to mention could cause an NPE. So we early out here.
610     if (this.heap == null) return;
611 
612     // this could be null.
613     this.lastTop = this.peek();
614 
615     //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
616 
617     // close scanners to old obsolete Store files
618     this.heap.close(); // bubble thru and close all scanners.
619     this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
620 
621     // Let the next() call handle re-creating and seeking
622     } finally {
623       lock.unlock();
624     }
625   }
626 
627   /**
628    * @return true if top of heap has changed (and KeyValueHeap has to try the
629    *         next KV)
630    * @throws IOException
631    */
632   protected boolean checkReseek() throws IOException {
633     if (this.heap == null && this.lastTop != null) {
634       resetScannerStack(this.lastTop);
635       if (this.heap.peek() == null
636           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
637         LOG.debug("Storescanner.peek() is changed where before = "
638             + this.lastTop.toString() + ",and after = " + this.heap.peek());
639         this.lastTop = null;
640         return true;
641       }
642       this.lastTop = null; // gone!
643     }
644     // else dont need to reseek
645     return false;
646   }
647 
648   protected void resetScannerStack(Cell lastTopKey) throws IOException {
649     if (heap != null) {
650       throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
651     }
652 
653     /* When we have the scan object, should we not pass it to getScanners()
654      * to get a limited set of scanners? We did so in the constructor and we
655      * could have done it now by storing the scan object from the constructor */
656     List<KeyValueScanner> scanners = getScannersNoCompaction();
657 
658     // Seek all scanners to the initial key
659     seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled);
660 
661     // Combine all seeked scanners with a heap
662     resetKVHeap(scanners, store.getComparator());
663 
664     // Reset the state of the Query Matcher and set to top row.
665     // Only reset and call setRow if the row changes; avoids confusing the
666     // query matcher if scanning intra-row.
667     Cell kv = heap.peek();
668     if (kv == null) {
669       kv = lastTopKey;
670     }
671     byte[] row = kv.getRowArray();
672     int offset = kv.getRowOffset();
673     short length = kv.getRowLength();
674     if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row,
675         matcher.rowOffset, matcher.rowLength)) {
676       this.countPerRow = 0;
677       matcher.reset();
678       matcher.setRow(row, offset, length);
679     }
680   }
681 
682   /**
683    * Check whether scan as expected order
684    * @param prevKV
685    * @param kv
686    * @param comparator
687    * @throws IOException
688    */
689   protected void checkScanOrder(Cell prevKV, Cell kv,
690       KeyValue.KVComparator comparator) throws IOException {
691     // Check that the heap gives us KVs in an increasing order.
692     assert prevKV == null || comparator == null
693         || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
694         + " followed by a " + "smaller key " + kv + " in cf " + store;
695   }
696 
697   protected boolean seekToNextRow(Cell kv) throws IOException {
698     return reseek(KeyValueUtil.createLastOnRow(kv));
699   }
700 
701   /**
702    * Do a reseek in a normal StoreScanner(scan forward)
703    * @param kv
704    * @return true if scanner has values left, false if end of scanner
705    * @throws IOException
706    */
707   protected boolean seekAsDirection(Cell kv)
708       throws IOException {
709     return reseek(kv);
710   }
711 
712   @Override
713   public boolean reseek(Cell kv) throws IOException {
714     lock.lock();
715     try {
716     //Heap will not be null, if this is called from next() which.
717     //If called from RegionScanner.reseek(...) make sure the scanner
718     //stack is reset if needed.
719     checkReseek();
720     if (explicitColumnQuery && lazySeekEnabledGlobally) {
721       return heap.requestSeek(kv, true, useRowColBloom);
722     }
723     return heap.reseek(kv);
724     } finally {
725       lock.unlock();
726     }
727   }
728 
729   @Override
730   public long getSequenceID() {
731     return 0;
732   }
733 
734   /**
735    * Seek storefiles in parallel to optimize IO latency as much as possible
736    * @param scanners the list {@link KeyValueScanner}s to be read from
737    * @param kv the KeyValue on which the operation is being requested
738    * @throws IOException
739    */
740   private void parallelSeek(final List<? extends KeyValueScanner>
741       scanners, final Cell kv) throws IOException {
742     if (scanners.isEmpty()) return;
743     int storeFileScannerCount = scanners.size();
744     CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
745     List<ParallelSeekHandler> handlers = 
746         new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
747     for (KeyValueScanner scanner : scanners) {
748       if (scanner instanceof StoreFileScanner) {
749         ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
750           this.readPt, latch);
751         executor.submit(seekHandler);
752         handlers.add(seekHandler);
753       } else {
754         scanner.seek(kv);
755         latch.countDown();
756       }
757     }
758 
759     try {
760       latch.await();
761     } catch (InterruptedException ie) {
762       throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
763     }
764 
765     for (ParallelSeekHandler handler : handlers) {
766       if (handler.getErr() != null) {
767         throw new IOException(handler.getErr());
768       }
769     }
770   }
771 
772   /**
773    * Used in testing.
774    * @return all scanners in no particular order
775    */
776   List<KeyValueScanner> getAllScannersForTesting() {
777     List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
778     KeyValueScanner current = heap.getCurrentForTesting();
779     if (current != null)
780       allScanners.add(current);
781     for (KeyValueScanner scanner : heap.getHeap())
782       allScanners.add(scanner);
783     return allScanners;
784   }
785 
786   static void enableLazySeekGlobally(boolean enable) {
787     lazySeekEnabledGlobally = enable;
788   }
789 
790   /**
791    * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
792    */
793   public long getEstimatedNumberOfKvsScanned() {
794     return this.kvsScanned;
795   }
796 }
797