View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.NavigableSet;
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.CellUtil;
36  import org.apache.hadoop.hbase.DoNotRetryIOException;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.KeyValue;
39  import org.apache.hadoop.hbase.KeyValue.KVComparator;
40  import org.apache.hadoop.hbase.KeyValueUtil;
41  import org.apache.hadoop.hbase.client.IsolationLevel;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.executor.ExecutorService;
44  import org.apache.hadoop.hbase.filter.Filter;
45  import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
46  import org.apache.hadoop.hbase.util.Bytes;
47  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
48  
/**
 * Scanner that scans both the memstore and the store files of a Store. It coalesces
 * the cell stream into a {@code List<Cell>} for a single row.
 */
53  @InterfaceAudience.Private
54  public class StoreScanner extends NonReversedNonLazyKeyValueScanner
55      implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  static final Log LOG = LogFactory.getLog(StoreScanner.class);
  // The store being scanned; close() notes that it is absent (null) under test.
  protected Store store;
  // Decides, cell by cell, what next() does with the current cell (include, skip, seek, ...).
  protected ScanQueryMatcher matcher;
  // Merged view over all selected scanners; null once closed or after updateReaders().
  protected KeyValueHeap heap;
  protected boolean cacheBlocks;

  // Number of cells the matcher has included for the current row; reset when the row changes.
  protected int countPerRow = 0;
  // Per-row cap taken from Scan#getMaxResultsPerColumnFamily(); -1 disables the check in next().
  protected int storeLimit = -1;
  // Number of included cells to skip per row, from Scan#getRowOffsetPerColumnFamily().
  protected int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107).
  // Not volatile: close() and updateReaders() read and write it while holding 'lock'.
  protected boolean closing = false;
  // Value of Scan#isGetScan() captured at construction.
  protected final boolean isGet;
  // True when the caller supplied at least one explicit column.
  protected final boolean explicitColumnQuery;
  // Passed to KeyValueHeap#requestSeek in reseek(); computed from the column count and isGet.
  protected final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  protected boolean isParallelSeekEnabled = false;
  // Executor used by parallelSeek(); only assigned when parallel seeking is enabled.
  protected ExecutorService executor;
  protected final Scan scan;
  protected final NavigableSet<byte[]> columns;
  // Current time minus the TTL given to the constructor.
  protected final long oldestUnexpiredTS;
  protected final int minVersions;
  // Upper bound on the estimated serialized size of one row; exceeding it raises
  // RowTooBigException.
  protected final long maxRowSize;

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
   * KVs skipped via seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  // Last cell handed to the matcher; used for the kvsScanned count and the order check.
  private Cell prevCell = null;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  /** Configuration key that switches on parallel seeking of store file scanners. */
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
      "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  protected static boolean lazySeekEnabledGlobally =
      LAZY_SEEK_ENABLED_BY_DEFAULT;

  // if heap == null and lastTop != null, you need to reseek given the key below
  protected Cell lastTop = null;

  // Whether to use pread for the scan (a get always uses pread, see getScannersNoCompaction()).
  private boolean scanUsePread = false;
  // Guards heap, lastTop and closing across next/peek/seek/reseek/close/updateReaders.
  protected ReentrantLock lock = new ReentrantLock();
  
  // Read point handed to Store#getScanners and to each ParallelSeekHandler.
  private final long readPt;
107 
  // Used by the injection framework to test the race between StoreScanner construction
  // and compaction. The constants name the points of interest: before the initial seek,
  // after it, and once the compaction has completed.
  enum StoreScannerCompactionRace {
    BEFORE_SEEK,
    AFTER_SEEK,
    COMPACT_COMPLETE
  }
114   
115   /** An internal constructor. */
116   protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
117       final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt) {
118     this.readPt = readPt;
119     this.store = store;
120     this.cacheBlocks = cacheBlocks;
121     isGet = scan.isGetScan();
122     int numCol = columns == null ? 0 : columns.size();
123     explicitColumnQuery = numCol > 0;
124     this.scan = scan;
125     this.columns = columns;
126     oldestUnexpiredTS = EnvironmentEdgeManager.currentTime() - ttl;
127     this.minVersions = minVersions;
128 
129     if (store != null && ((HStore)store).getHRegion() != null
130         && ((HStore)store).getHRegion().getBaseConf() != null) {
131       Configuration conf = ((HStore) store).getHRegion().getBaseConf();
132       this.maxRowSize =
133           conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
134       this.scanUsePread = conf.getBoolean("hbase.storescanner.use.pread", scan.isSmall());
135     } else {
136       this.maxRowSize = HConstants.TABLE_MAX_ROWSIZE_DEFAULT;
137       this.scanUsePread = scan.isSmall();
138     }
139 
140     // We look up row-column Bloom filters for multi-column queries as part of
141     // the seek operation. However, we also look the row-column Bloom filter
142     // for multi-row (non-"get") scans because this is not done in
143     // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
144     useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
145 
146     // The parallel-seeking is on :
147     // 1) the config value is *true*
148     // 2) store has more than one store file
149     if (store != null && ((HStore)store).getHRegion() != null
150         && store.getStorefilesCount() > 1) {
151       RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
152       if (rsService == null || !rsService.getConfiguration().getBoolean(
153             STORESCANNER_PARALLEL_SEEK_ENABLE, false)) return;
154       isParallelSeekEnabled = true;
155       executor = rsService.getExecutorService();
156     }
157   }
158 
159   /**
160    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
161    * are not in a compaction.
162    *
163    * @param store who we scan
164    * @param scan the spec
165    * @param columns which columns we are scanning
166    * @throws IOException
167    */
168   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
169       long readPt)
170                               throws IOException {
171     this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
172         scanInfo.getMinVersions(), readPt);
173     if (columns != null && scan.isRaw()) {
174       throw new DoNotRetryIOException(
175           "Cannot specify any column for a raw scan");
176     }
177     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
178         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
179         oldestUnexpiredTS, store.getCoprocessorHost());
180 
181     this.store.addChangedReaderObserver(this);
182 
183     // Pass columns to try to filter out unnecessary StoreFiles.
184     List<KeyValueScanner> scanners = getScannersNoCompaction();
185 
186     // Seek all scanners to the start of the Row (or if the exact matching row
187     // key does not exist, then to the start of the next matching Row).
188     // Always check bloom filter to optimize the top row seek for delete
189     // family marker.
190     seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
191         && lazySeekEnabledGlobally, isParallelSeekEnabled);
192 
193     // set storeLimit
194     this.storeLimit = scan.getMaxResultsPerColumnFamily();
195 
196     // set rowOffset
197     this.storeOffset = scan.getRowOffsetPerColumnFamily();
198 
199     // Combine all seeked scanners with a heap
200     resetKVHeap(scanners, store.getComparator());
201   }
202 
203   /**
204    * Used for compactions.<p>
205    *
206    * Opens a scanner across specified StoreFiles.
207    * @param store who we scan
208    * @param scan the spec
209    * @param scanners ancillary scanners
210    * @param smallestReadPoint the readPoint that we should use for tracking
211    *          versions
212    */
213   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
214       List<? extends KeyValueScanner> scanners, ScanType scanType,
215       long smallestReadPoint, long earliestPutTs) throws IOException {
216     this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
217   }
218 
219   /**
220    * Used for compactions that drop deletes from a limited range of rows.<p>
221    *
222    * Opens a scanner across specified StoreFiles.
223    * @param store who we scan
224    * @param scan the spec
225    * @param scanners ancillary scanners
226    * @param smallestReadPoint the readPoint that we should use for tracking versions
227    * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
228    * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
229    */
230   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
231       List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
232       byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
233     this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
234         earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
235   }
236 
237   private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
238       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
239       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
240     this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(),
241         ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
242     if (dropDeletesFromRow == null) {
243       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
244           earliestPutTs, oldestUnexpiredTS, store.getCoprocessorHost());
245     } else {
246       matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
247           oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
248     }
249 
250     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
251     scanners = selectScannersFrom(scanners);
252 
253     // Seek all scanners to the initial key
254     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
255 
256     // Combine all seeked scanners with a heap
257     resetKVHeap(scanners, store.getComparator());
258   }
259 
260   /** Constructor for testing. */
261   StoreScanner(final Scan scan, ScanInfo scanInfo,
262       ScanType scanType, final NavigableSet<byte[]> columns,
263       final List<KeyValueScanner> scanners) throws IOException {
264     this(scan, scanInfo, scanType, columns, scanners,
265         HConstants.LATEST_TIMESTAMP,
266         // 0 is passed as readpoint because the test bypasses Store
267         0);
268   }
269 
270   // Constructor for testing.
271   StoreScanner(final Scan scan, ScanInfo scanInfo,
272     ScanType scanType, final NavigableSet<byte[]> columns,
273     final List<KeyValueScanner> scanners, long earliestPutTs)
274         throws IOException {
275     this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
276       // 0 is passed as readpoint because the test bypasses Store
277       0);
278   }
279   
280   private StoreScanner(final Scan scan, ScanInfo scanInfo,
281       ScanType scanType, final NavigableSet<byte[]> columns,
282       final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
283           throws IOException {
284     this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
285         scanInfo.getMinVersions(), readPt);
286     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
287         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, null);
288 
289     // In unit tests, the store could be null
290     if (this.store != null) {
291       this.store.addChangedReaderObserver(this);
292     }
293     // Seek all scanners to the initial key
294     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
295     resetKVHeap(scanners, scanInfo.getComparator());
296   }
297 
298   /**
299    * Get a filtered list of scanners. Assumes we are not in a compaction.
300    * @return list of scanners to seek
301    */
302   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
303     final boolean isCompaction = false;
304     boolean usePread = isGet || scanUsePread;
305     return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
306         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
307   }
308 
309   /**
310    * Seek the specified scanners with the given key
311    * @param scanners
312    * @param seekKey
313    * @param isLazy true if using lazy seek
314    * @param isParallelSeek true if using parallel seek
315    * @throws IOException
316    */
317   protected void seekScanners(List<? extends KeyValueScanner> scanners,
318       Cell seekKey, boolean isLazy, boolean isParallelSeek)
319       throws IOException {
320     // Seek all scanners to the start of the Row (or if the exact matching row
321     // key does not exist, then to the start of the next matching Row).
322     // Always check bloom filter to optimize the top row seek for delete
323     // family marker.
324     if (isLazy) {
325       for (KeyValueScanner scanner : scanners) {
326         scanner.requestSeek(seekKey, false, true);
327       }
328     } else {
329       if (!isParallelSeek) {
330         long totalScannersSoughtBytes = 0;
331         for (KeyValueScanner scanner : scanners) {
332           if (totalScannersSoughtBytes >= maxRowSize) {
333             throw new RowTooBigException("Max row size allowed: " + maxRowSize
334               + ", but row is bigger than that");
335           }
336           scanner.seek(seekKey);
337           Cell c = scanner.peek();
338           if (c != null ) {
339             totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
340           }
341         }
342       } else {
343         parallelSeek(scanners, seekKey);
344       }
345     }
346   }
347 
348   protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
349       KVComparator comparator) throws IOException {
350     // Combine all seeked scanners with a heap
351     heap = new KeyValueHeap(scanners, comparator);
352   }
353 
354   /**
355    * Filters the given list of scanners using Bloom filter, time range, and
356    * TTL.
357    */
358   protected List<KeyValueScanner> selectScannersFrom(
359       final List<? extends KeyValueScanner> allScanners) {
360     boolean memOnly;
361     boolean filesOnly;
362     if (scan instanceof InternalScan) {
363       InternalScan iscan = (InternalScan)scan;
364       memOnly = iscan.isCheckOnlyMemStore();
365       filesOnly = iscan.isCheckOnlyStoreFiles();
366     } else {
367       memOnly = false;
368       filesOnly = false;
369     }
370 
371     List<KeyValueScanner> scanners =
372         new ArrayList<KeyValueScanner>(allScanners.size());
373 
374     // We can only exclude store files based on TTL if minVersions is set to 0.
375     // Otherwise, we might have to return KVs that have technically expired.
376     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
377         Long.MIN_VALUE;
378 
379     // include only those scan files which pass all filters
380     for (KeyValueScanner kvs : allScanners) {
381       boolean isFile = kvs.isFileScanner();
382       if ((!isFile && filesOnly) || (isFile && memOnly)) {
383         continue;
384       }
385 
386       if (kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)) {
387         scanners.add(kvs);
388       }
389     }
390     return scanners;
391   }
392 
393   @Override
394   public Cell peek() {
395     lock.lock();
396     try {
397     if (this.heap == null) {
398       return this.lastTop;
399     }
400     return this.heap.peek();
401     } finally {
402       lock.unlock();
403     }
404   }
405 
406   @Override
407   public KeyValue next() {
408     // throw runtime exception perhaps?
409     throw new RuntimeException("Never call StoreScanner.next()");
410   }
411 
412   @Override
413   public void close() {
414     lock.lock();
415     try {
416     if (this.closing) return;
417     this.closing = true;
418     // under test, we dont have a this.store
419     if (this.store != null)
420       this.store.deleteChangedReaderObserver(this);
421     if (this.heap != null)
422       this.heap.close();
423     this.heap = null; // CLOSED!
424     this.lastTop = null; // If both are null, we are closed.
425     } finally {
426       lock.unlock();
427     }
428   }
429 
430   @Override
431   public boolean seek(Cell key) throws IOException {
432     lock.lock();
433     try {
434     // reset matcher state, in case that underlying store changed
435     checkReseek();
436     return this.heap.seek(key);
437     } finally {
438       lock.unlock();
439     }
440   }
441 
442   /**
443    * Get the next row of values from this Store.
444    * @param outResult
445    * @param limit
446    * @return true if there are more rows, false if scanner is done
447    */
448   @Override
449   public boolean next(List<Cell> outResult, int limit) throws IOException {
450     lock.lock();
451     try {
452     if (checkReseek()) {
453       return true;
454     }
455 
456     // if the heap was left null, then the scanners had previously run out anyways, close and
457     // return.
458     if (this.heap == null) {
459       close();
460       return false;
461     }
462 
463     Cell peeked = this.heap.peek();
464     if (peeked == null) {
465       close();
466       return false;
467     }
468 
469     // only call setRow if the row changes; avoids confusing the query matcher
470     // if scanning intra-row
471     byte[] row = peeked.getRowArray();
472     int offset = peeked.getRowOffset();
473     short length = peeked.getRowLength();
474     if (limit < 0 || matcher.row == null || !Bytes.equals(row, offset, length, matcher.row,
475         matcher.rowOffset, matcher.rowLength)) {
476       this.countPerRow = 0;
477       matcher.setRow(row, offset, length);
478     }
479 
480     Cell cell;
481 
482     // Only do a sanity-check if store and comparator are available.
483     KeyValue.KVComparator comparator =
484         store != null ? store.getComparator() : null;
485 
486     int count = 0;
487     long totalBytesRead = 0;
488 
489     LOOP: while((cell = this.heap.peek()) != null) {
490       if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
491       checkScanOrder(prevCell, cell, comparator);
492       prevCell = cell;
493 
494       ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
495       switch(qcode) {
496         case INCLUDE:
497         case INCLUDE_AND_SEEK_NEXT_ROW:
498         case INCLUDE_AND_SEEK_NEXT_COL:
499 
500           Filter f = matcher.getFilter();
501           if (f != null) {
502             // TODO convert Scan Query Matcher to be Cell instead of KV based ?
503             cell = f.transformCell(cell);
504           }
505 
506           this.countPerRow++;
507           if (storeLimit > -1 &&
508               this.countPerRow > (storeLimit + storeOffset)) {
509             // do what SEEK_NEXT_ROW does.
510             if (!matcher.moreRowsMayExistAfter(cell)) {
511               return false;
512             }
513             seekToNextRow(cell);
514             break LOOP;
515           }
516 
517           // add to results only if we have skipped #storeOffset kvs
518           // also update metric accordingly
519           if (this.countPerRow > storeOffset) {
520             outResult.add(cell);
521             count++;
522             totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
523             if (totalBytesRead > maxRowSize) {
524               throw new RowTooBigException("Max row size allowed: " + maxRowSize
525               + ", but the row is bigger than that.");
526             }
527           }
528 
529           if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
530             if (!matcher.moreRowsMayExistAfter(cell)) {
531               return false;
532             }
533             seekToNextRow(cell);
534           } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
535             seekAsDirection(matcher.getKeyForNextColumn(cell));
536           } else {
537             this.heap.next();
538           }
539 
540           if (limit > 0 && (count == limit)) {
541             break LOOP;
542           }
543           continue;
544 
545         case DONE:
546           return true;
547 
548         case DONE_SCAN:
549           close();
550           return false;
551 
552         case SEEK_NEXT_ROW:
553           // This is just a relatively simple end of scan fix, to short-cut end
554           // us if there is an endKey in the scan.
555           if (!matcher.moreRowsMayExistAfter(cell)) {
556             return false;
557           }
558 
559           seekToNextRow(cell);
560           break;
561 
562         case SEEK_NEXT_COL:
563           seekAsDirection(matcher.getKeyForNextColumn(cell));
564           break;
565 
566         case SKIP:
567           this.heap.next();
568           break;
569 
570         case SEEK_NEXT_USING_HINT:
571           // TODO convert resee to Cell?
572           Cell nextKV = matcher.getNextKeyHint(cell);
573           if (nextKV != null) {
574             seekAsDirection(nextKV);
575           } else {
576             heap.next();
577           }
578           break;
579 
580         default:
581           throw new RuntimeException("UNEXPECTED");
582       }
583     }
584 
585     if (count > 0) {
586       return true;
587     }
588 
589     // No more keys
590     close();
591     return false;
592     } finally {
593       lock.unlock();
594     }
595   }
596 
597   @Override
598   public boolean next(List<Cell> outResult) throws IOException {
599     return next(outResult, -1);
600   }
601 
602   // Implementation of ChangedReadersObserver
603   @Override
604   public void updateReaders() throws IOException {
605     lock.lock();
606     try {
607     if (this.closing) return;
608 
609     // All public synchronized API calls will call 'checkReseek' which will cause
610     // the scanner stack to reseek if this.heap==null && this.lastTop != null.
611     // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
612     // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
613     // which is NOT what we want, not to mention could cause an NPE. So we early out here.
614     if (this.heap == null) return;
615 
616     // this could be null.
617     this.lastTop = this.peek();
618 
619     //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
620 
621     // close scanners to old obsolete Store files
622     this.heap.close(); // bubble thru and close all scanners.
623     this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
624 
625     // Let the next() call handle re-creating and seeking
626     } finally {
627       lock.unlock();
628     }
629   }
630 
631   /**
632    * @return true if top of heap has changed (and KeyValueHeap has to try the
633    *         next KV)
634    * @throws IOException
635    */
636   protected boolean checkReseek() throws IOException {
637     if (this.heap == null && this.lastTop != null) {
638       resetScannerStack(this.lastTop);
639       if (this.heap.peek() == null
640           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
641         LOG.debug("Storescanner.peek() is changed where before = "
642             + this.lastTop.toString() + ",and after = " + this.heap.peek());
643         this.lastTop = null;
644         return true;
645       }
646       this.lastTop = null; // gone!
647     }
648     // else dont need to reseek
649     return false;
650   }
651 
652   protected void resetScannerStack(Cell lastTopKey) throws IOException {
653     if (heap != null) {
654       throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
655     }
656 
657     /* When we have the scan object, should we not pass it to getScanners()
658      * to get a limited set of scanners? We did so in the constructor and we
659      * could have done it now by storing the scan object from the constructor */
660     List<KeyValueScanner> scanners = getScannersNoCompaction();
661 
662     // Seek all scanners to the initial key
663     seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled);
664 
665     // Combine all seeked scanners with a heap
666     resetKVHeap(scanners, store.getComparator());
667 
668     // Reset the state of the Query Matcher and set to top row.
669     // Only reset and call setRow if the row changes; avoids confusing the
670     // query matcher if scanning intra-row.
671     Cell kv = heap.peek();
672     if (kv == null) {
673       kv = lastTopKey;
674     }
675     byte[] row = kv.getRowArray();
676     int offset = kv.getRowOffset();
677     short length = kv.getRowLength();
678     if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row,
679         matcher.rowOffset, matcher.rowLength)) {
680       this.countPerRow = 0;
681       matcher.reset();
682       matcher.setRow(row, offset, length);
683     }
684   }
685 
686   /**
687    * Check whether scan as expected order
688    * @param prevKV
689    * @param kv
690    * @param comparator
691    * @throws IOException
692    */
693   protected void checkScanOrder(Cell prevKV, Cell kv,
694       KeyValue.KVComparator comparator) throws IOException {
695     // Check that the heap gives us KVs in an increasing order.
696     assert prevKV == null || comparator == null
697         || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
698         + " followed by a " + "smaller key " + kv + " in cf " + store;
699   }
700 
701   protected boolean seekToNextRow(Cell kv) throws IOException {
702     return reseek(KeyValueUtil.createLastOnRow(kv));
703   }
704 
705   /**
706    * Do a reseek in a normal StoreScanner(scan forward)
707    * @param kv
708    * @return true if scanner has values left, false if end of scanner
709    * @throws IOException
710    */
711   protected boolean seekAsDirection(Cell kv)
712       throws IOException {
713     return reseek(kv);
714   }
715 
716   @Override
717   public boolean reseek(Cell kv) throws IOException {
718     lock.lock();
719     try {
720     //Heap will not be null, if this is called from next() which.
721     //If called from RegionScanner.reseek(...) make sure the scanner
722     //stack is reset if needed.
723     checkReseek();
724     if (explicitColumnQuery && lazySeekEnabledGlobally) {
725       return heap.requestSeek(kv, true, useRowColBloom);
726     }
727     return heap.reseek(kv);
728     } finally {
729       lock.unlock();
730     }
731   }
732 
733   @Override
734   public long getSequenceID() {
735     return 0;
736   }
737 
738   /**
739    * Seek storefiles in parallel to optimize IO latency as much as possible
740    * @param scanners the list {@link KeyValueScanner}s to be read from
741    * @param kv the KeyValue on which the operation is being requested
742    * @throws IOException
743    */
744   private void parallelSeek(final List<? extends KeyValueScanner>
745       scanners, final Cell kv) throws IOException {
746     if (scanners.isEmpty()) return;
747     int storeFileScannerCount = scanners.size();
748     CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
749     List<ParallelSeekHandler> handlers = 
750         new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
751     for (KeyValueScanner scanner : scanners) {
752       if (scanner instanceof StoreFileScanner) {
753         ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
754           this.readPt, latch);
755         executor.submit(seekHandler);
756         handlers.add(seekHandler);
757       } else {
758         scanner.seek(kv);
759         latch.countDown();
760       }
761     }
762 
763     try {
764       latch.await();
765     } catch (InterruptedException ie) {
766       throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
767     }
768 
769     for (ParallelSeekHandler handler : handlers) {
770       if (handler.getErr() != null) {
771         throw new IOException(handler.getErr());
772       }
773     }
774   }
775 
776   /**
777    * Used in testing.
778    * @return all scanners in no particular order
779    */
780   List<KeyValueScanner> getAllScannersForTesting() {
781     List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
782     KeyValueScanner current = heap.getCurrentForTesting();
783     if (current != null)
784       allScanners.add(current);
785     for (KeyValueScanner scanner : heap.getHeap())
786       allScanners.add(scanner);
787     return allScanners;
788   }
789 
790   static void enableLazySeekGlobally(boolean enable) {
791     lazySeekEnabledGlobally = enable;
792   }
793 
794   /**
795    * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
796    */
797   public long getEstimatedNumberOfKvsScanned() {
798     return this.kvsScanned;
799   }
800 }
801