View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.NavigableSet;
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.hbase.Cell;
33  import org.apache.hadoop.hbase.CellUtil;
34  import org.apache.hadoop.hbase.DoNotRetryIOException;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.KeyValue;
37  import org.apache.hadoop.hbase.KeyValue.KVComparator;
38  import org.apache.hadoop.hbase.KeyValueUtil;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.client.IsolationLevel;
41  import org.apache.hadoop.hbase.client.Scan;
42  import org.apache.hadoop.hbase.executor.ExecutorService;
43  import org.apache.hadoop.hbase.filter.Filter;
44  import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
45  import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
46  import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
47  import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50  
51  import com.google.common.annotations.VisibleForTesting;
52  
/**
 * Scanner scans both the memstore and the Store. Coalesces the KeyValue stream
 * into a {@code List<Cell>} for a single row.
 */
57  @InterfaceAudience.Private
58  public class StoreScanner extends NonReversedNonLazyKeyValueScanner
59      implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  private static final Log LOG = LogFactory.getLog(StoreScanner.class);
  // In unit tests, the store could be null
  protected final Store store;
  // Decides, cell by cell, what next() does with the cell at the top of the heap.
  protected ScanQueryMatcher matcher;
  // Merged view over the underlying scanners; set to null by updateReaders() and close().
  protected KeyValueHeap heap;
  protected boolean cacheBlocks;

  // Number of cells included so far for the current row; compared against
  // storeLimit/storeOffset in next().
  protected long countPerRow = 0;
  // Taken from Scan#getMaxResultsPerColumnFamily(); -1 disables the limit check in next().
  protected int storeLimit = -1;
  // Taken from Scan#getRowOffsetPerColumnFamily(); that many included cells are skipped per row.
  protected int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107)
  // Not volatile: the accesses visible in this class (close(), updateReaders()) happen while
  // holding 'lock'.
  protected boolean closing = false;
  protected final boolean get;
  protected final boolean explicitColumnQuery;
  protected final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  protected boolean parallelSeekEnabled = false;
  protected ExecutorService executor;
  protected final Scan scan;
  protected final NavigableSet<byte[]> columns;
  // Computed in the constructor as 'now - scanInfo.getTtl()'.
  protected final long oldestUnexpiredTS;
  // Wall-clock time captured once at construction via EnvironmentEdgeManager.
  protected final long now;
  protected final int minVersions;
  protected final long maxRowSize;
  protected final long cellsPerHeartbeatCheck;

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
   * KVs skipped via seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  // Last cell examined by next(); used for the scan-order assertion and for counting kvsScanned.
  private Cell prevCell = null;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
      "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  protected static boolean lazySeekEnabledGlobally =
      LAZY_SEEK_ENABLED_BY_DEFAULT;

  /**
   * The number of cells scanned in between timeout checks. Specifying a larger value means that
   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
   * timeout checks.
   */
  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
      "hbase.cells.scanned.per.heartbeat.check";

  /**
   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
   */
  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

  // if heap == null and lastTop != null, you need to reseek given the key below
  protected Cell lastTop = null;

  // Whether to use pread for the scan
  private boolean scanUsePread = false;
  // Guards heap/lastTop/closing; taken by peek(), close(), seek(), reseek(), next() and
  // updateReaders().
  protected ReentrantLock lock = new ReentrantLock();
  
  // Read point handed to Store#getScanners() when the scanner stack is (re)built.
  private final long readPt;
127 
  /**
   * Used by the injection framework to test the race between StoreScanner construction and
   * compaction.
   */
  enum StoreScannerCompactionRace {
    BEFORE_SEEK,
    AFTER_SEEK,
    COMPACT_COMPLETE
  }
134 
135   /** An internal constructor. */
136   protected StoreScanner(Store store, Scan scan, final ScanInfo scanInfo,
137       final NavigableSet<byte[]> columns, long readPt, boolean cacheBlocks) {
138     this.readPt = readPt;
139     this.store = store;
140     this.cacheBlocks = cacheBlocks;
141     get = scan.isGetScan();
142     int numCol = columns == null ? 0 : columns.size();
143     explicitColumnQuery = numCol > 0;
144     this.scan = scan;
145     this.columns = columns;
146     this.now = EnvironmentEdgeManager.currentTime();
147     this.oldestUnexpiredTS = now - scanInfo.getTtl();
148     this.minVersions = scanInfo.getMinVersions();
149 
150      // We look up row-column Bloom filters for multi-column queries as part of
151      // the seek operation. However, we also look the row-column Bloom filter
152      // for multi-row (non-"get") scans because this is not done in
153      // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
154      this.useRowColBloom = numCol > 1 || (!get && numCol == 1);
155 
156      this.maxRowSize = scanInfo.getTableMaxRowSize();
157      this.scanUsePread = scan.isSmall()? true: scanInfo.isUsePread();
158      this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
159      // Parallel seeking is on if the config allows and more there is more than one store file.
160      if (this.store != null && this.store.getStorefilesCount() > 1) {
161        RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
162        if (rsService != null && scanInfo.isParallelSeekEnabled()) {
163          this.parallelSeekEnabled = true;
164          this.executor = rsService.getExecutorService();
165        }
166      }
167   }
168 
169   /**
170    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
171    * are not in a compaction.
172    *
173    * @param store who we scan
174    * @param scan the spec
175    * @param columns which columns we are scanning
176    * @throws IOException
177    */
178   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
179       long readPt)
180   throws IOException {
181     this(store, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
182     if (columns != null && scan.isRaw()) {
183       throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
184     }
185     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
186         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
187         oldestUnexpiredTS, now, store.getCoprocessorHost());
188 
189     this.store.addChangedReaderObserver(this);
190 
191     try {
192       // Pass columns to try to filter out unnecessary StoreFiles.
193       List<KeyValueScanner> scanners = getScannersNoCompaction();
194 
195       // Seek all scanners to the start of the Row (or if the exact matching row
196       // key does not exist, then to the start of the next matching Row).
197       // Always check bloom filter to optimize the top row seek for delete
198       // family marker.
199       seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
200         parallelSeekEnabled);
201 
202       // set storeLimit
203       this.storeLimit = scan.getMaxResultsPerColumnFamily();
204 
205       // set rowOffset
206       this.storeOffset = scan.getRowOffsetPerColumnFamily();
207 
208       // Combine all seeked scanners with a heap
209       resetKVHeap(scanners, store.getComparator());
210     } catch (IOException e) {
211       // remove us from the HStore#changedReaderObservers here or we'll have no chance to
212       // and might cause memory leak
213       this.store.deleteChangedReaderObserver(this);
214       throw e;
215     }
216   }
217 
  /**
   * Used for compactions.<p>
   *
   * Opens a scanner across specified StoreFiles. Delegates to the private compaction
   * constructor with no drop-deletes row range.
   * @param store who we scan
   * @param scanInfo per-store scan settings
   * @param scan the spec
   * @param scanners ancillary scanners
   * @param scanType scan type handed to the query matcher
   * @param smallestReadPoint the readPoint that we should use for tracking
   *          versions
   * @param earliestPutTs earliest put timestamp, handed to the query matcher
   * @throws IOException if seeking the scanners fails
   */
  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
      List<? extends KeyValueScanner> scanners, ScanType scanType,
      long smallestReadPoint, long earliestPutTs) throws IOException {
    this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
  }
233 
  /**
   * Used for compactions that drop deletes from a limited range of rows.<p>
   *
   * Opens a scanner across specified StoreFiles. Always uses
   * {@link ScanType#COMPACT_RETAIN_DELETES} as the scan type.
   * @param store who we scan
   * @param scanInfo per-store scan settings
   * @param scan the spec
   * @param scanners ancillary scanners
   * @param smallestReadPoint the readPoint that we should use for tracking versions
   * @param earliestPutTs earliest put timestamp, handed to the query matcher
   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
   * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
   * @throws IOException if seeking the scanners fails
   */
  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
      List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
      byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
    this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
  }
251 
252   private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
253       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
254       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
255     this(store, scan, scanInfo, null,
256       ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED), false);
257     if (dropDeletesFromRow == null) {
258       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
259           earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
260     } else {
261       matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
262           oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
263     }
264 
265     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
266     scanners = selectScannersFrom(scanners);
267 
268     // Seek all scanners to the initial key
269     seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
270 
271     // Combine all seeked scanners with a heap
272     resetKVHeap(scanners, store.getComparator());
273   }
274 
  /**
   * Test-only constructor that bypasses the Store and scans the given scanners directly,
   * using {@link HConstants#LATEST_TIMESTAMP} as the earliest put timestamp.
   */
  @VisibleForTesting
  StoreScanner(final Scan scan, ScanInfo scanInfo,
      ScanType scanType, final NavigableSet<byte[]> columns,
      final List<KeyValueScanner> scanners) throws IOException {
    this(scan, scanInfo, scanType, columns, scanners,
        HConstants.LATEST_TIMESTAMP,
        // 0 is passed as readpoint because the test bypasses Store
        0);
  }
284 
  /**
   * Test-only constructor that bypasses the Store and scans the given scanners directly,
   * with a caller-supplied earliest put timestamp.
   */
  @VisibleForTesting
  StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners, long earliestPutTs)
        throws IOException {
    this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
      // 0 is passed as readpoint because the test bypasses Store
      0);
  }
294   
295   private StoreScanner(final Scan scan, ScanInfo scanInfo,
296       ScanType scanType, final NavigableSet<byte[]> columns,
297       final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
298   throws IOException {
299     this(null, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
300     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
301         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
302 
303     // In unit tests, the store could be null
304     if (this.store != null) {
305       this.store.addChangedReaderObserver(this);
306     }
307     // Seek all scanners to the initial key
308     seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
309     resetKVHeap(scanners, scanInfo.getComparator());
310   }
311 
312   /**
313    * Get a filtered list of scanners. Assumes we are not in a compaction.
314    * @return list of scanners to seek
315    */
316   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
317     final boolean isCompaction = false;
318     boolean usePread = get || scanUsePread;
319     return selectScannersFrom(store.getScanners(cacheBlocks, get, usePread,
320         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
321   }
322 
323   /**
324    * Seek the specified scanners with the given key
325    * @param scanners
326    * @param seekKey
327    * @param isLazy true if using lazy seek
328    * @param isParallelSeek true if using parallel seek
329    * @throws IOException
330    */
331   protected void seekScanners(List<? extends KeyValueScanner> scanners,
332       Cell seekKey, boolean isLazy, boolean isParallelSeek)
333       throws IOException {
334     // Seek all scanners to the start of the Row (or if the exact matching row
335     // key does not exist, then to the start of the next matching Row).
336     // Always check bloom filter to optimize the top row seek for delete
337     // family marker.
338     if (isLazy) {
339       for (KeyValueScanner scanner : scanners) {
340         scanner.requestSeek(seekKey, false, true);
341       }
342     } else {
343       if (!isParallelSeek) {
344         long totalScannersSoughtBytes = 0;
345         for (KeyValueScanner scanner : scanners) {
346           if (totalScannersSoughtBytes >= maxRowSize) {
347             throw new RowTooBigException("Max row size allowed: " + maxRowSize
348               + ", but row is bigger than that");
349           }
350           scanner.seek(seekKey);
351           Cell c = scanner.peek();
352           if (c != null) {
353             totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
354           }
355         }
356       } else {
357         parallelSeek(scanners, seekKey);
358       }
359     }
360   }
361 
  /**
   * Replaces {@link #heap} with a new {@link KeyValueHeap} over the given scanners.
   *
   * @param scanners already-sought scanners to merge
   * @param comparator ordering used by the heap
   * @throws IOException if the heap cannot be built
   */
  protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
      KVComparator comparator) throws IOException {
    // Combine all seeked scanners with a heap
    heap = new KeyValueHeap(scanners, comparator);
  }
367 
368   /**
369    * Filters the given list of scanners using Bloom filter, time range, and
370    * TTL.
371    */
372   protected List<KeyValueScanner> selectScannersFrom(
373       final List<? extends KeyValueScanner> allScanners) {
374     boolean memOnly;
375     boolean filesOnly;
376     if (scan instanceof InternalScan) {
377       InternalScan iscan = (InternalScan)scan;
378       memOnly = iscan.isCheckOnlyMemStore();
379       filesOnly = iscan.isCheckOnlyStoreFiles();
380     } else {
381       memOnly = false;
382       filesOnly = false;
383     }
384 
385     List<KeyValueScanner> scanners =
386         new ArrayList<KeyValueScanner>(allScanners.size());
387 
388     // We can only exclude store files based on TTL if minVersions is set to 0.
389     // Otherwise, we might have to return KVs that have technically expired.
390     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
391         Long.MIN_VALUE;
392 
393     // include only those scan files which pass all filters
394     for (KeyValueScanner kvs : allScanners) {
395       boolean isFile = kvs.isFileScanner();
396       if ((!isFile && filesOnly) || (isFile && memOnly)) {
397         continue;
398       }
399 
400       if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
401         scanners.add(kvs);
402       }
403     }
404     return scanners;
405   }
406 
407   @Override
408   public Cell peek() {
409     lock.lock();
410     try {
411     if (this.heap == null) {
412       return this.lastTop;
413     }
414     return this.heap.peek();
415     } finally {
416       lock.unlock();
417     }
418   }
419 
  /**
   * Not supported on a StoreScanner; use {@link #next(List)} or
   * {@link #next(List, ScannerContext)} instead.
   *
   * @throws RuntimeException always
   */
  @Override
  public KeyValue next() {
    // throw runtime exception perhaps?
    throw new RuntimeException("Never call StoreScanner.next()");
  }
425 
426   @Override
427   public void close() {
428     lock.lock();
429     try {
430     if (this.closing) return;
431     this.closing = true;
432     // Under test, we dont have a this.store
433     if (this.store != null)
434       this.store.deleteChangedReaderObserver(this);
435     if (this.heap != null)
436       this.heap.close();
437     this.heap = null; // CLOSED!
438     this.lastTop = null; // If both are null, we are closed.
439     } finally {
440       lock.unlock();
441     }
442   }
443 
  /**
   * Seeks the underlying heap to {@code key}, first rebuilding the scanner stack via
   * {@link #checkReseek()} if it was torn down.
   *
   * @param key the key to seek to
   * @return the result of the heap's seek
   * @throws IOException if rebuilding the scanner stack or seeking fails
   */
  @Override
  public boolean seek(Cell key) throws IOException {
    lock.lock();
    try {
    // reset matcher state, in case that underlying store changed
    checkReseek();
    return this.heap.seek(key);
    } finally {
      lock.unlock();
    }
  }
455 
  /**
   * Same as {@link #next(List, ScannerContext)} using the shared
   * {@link NoLimitScannerContext} instance.
   */
  @Override
  public boolean next(List<Cell> outResult) throws IOException {
    return next(outResult, NoLimitScannerContext.getInstance());
  }
460 
  /**
   * Get the next row of values from this Store.
   *
   * Cells are taken from the heap one at a time; the query matcher decides whether each is
   * included, skipped, or replaced by a seek. Included cells are appended to
   * {@code outResult}. The loop ends when the matcher reports the row or scan is done, when a
   * batch/size/time limit in {@code scannerContext} is reached, when the per-family
   * limit ({@code storeLimit}) is exceeded, or when the heap runs dry.
   *
   * @param outResult list that included cells are appended to
   * @param scannerContext limits and progress tracking for this call; must not be null
   * @return true if there are more rows, false if scanner is done
   * @throws IllegalArgumentException if {@code scannerContext} is null
   * @throws IOException on read failure, or a {@link RowTooBigException} when the cells
   *           returned by this call exceed {@code maxRowSize}
   */
  @Override
  public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
    lock.lock();

    try {
    if (scannerContext == null) {
      throw new IllegalArgumentException("Scanner context cannot be null");
    }
    // The scanner stack was rebuilt and its top moved to a different row: report "more
    // values" without returning any cell from this call.
    if (checkReseek()) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // if the heap was left null, then the scanners had previously run out anyways, close and
    // return.
    if (this.heap == null) {
      close();
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    Cell cell = this.heap.peek();
    if (cell == null) {
      close();
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    // only call setRow if the row changes; avoids confusing the query matcher
    // if scanning intra-row
    byte[] row = cell.getRowArray();
    int offset = cell.getRowOffset();
    short length = cell.getRowLength();

    // If no limits exist in the scope LimitScope.BETWEEN_CELLS then we are sure we are changing
    // rows. Else it is possible we are still traversing the same row so we must perform the row
    // comparison.
    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.row == null) {
      this.countPerRow = 0;
      matcher.setRow(row, offset, length);
    }

    // Clear progress away unless invoker has indicated it should be kept.
    if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
    
    // Only do a sanity-check if store and comparator are available.
    KeyValue.KVComparator comparator =
        store != null ? store.getComparator() : null;

    // Number of cells added to outResult by this call, and their estimated serialized size.
    int count = 0;
    long totalBytesRead = 0;

    LOOP: do {
      // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
      if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
        scannerContext.updateTimeProgress();
        if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
          return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
        }
      }

      if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
      checkScanOrder(prevCell, cell, comparator);
      prevCell = cell;

      // Ask the matcher what to do with this cell, then let optimize() downgrade a seek
      // to a plain skip/include where it decides that is sufficient.
      ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
      qcode = optimize(qcode, cell);
      switch(qcode) {
        case INCLUDE:
        case INCLUDE_AND_SEEK_NEXT_ROW:
        case INCLUDE_AND_SEEK_NEXT_COL:

          Filter f = matcher.getFilter();
          if (f != null) {
            // TODO convert Scan Query Matcher to be Cell instead of KV based ?
            cell = f.transformCell(cell);
          }

          this.countPerRow++;
          // Per-family limit exceeded for this row: stop collecting and move to the next row.
          if (storeLimit > -1 &&
              this.countPerRow > (storeLimit + storeOffset)) {
            // do what SEEK_NEXT_ROW does.
            if (!matcher.moreRowsMayExistAfter(cell)) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            // Setting the matcher.row = null, will mean that after the subsequent seekToNextRow()
            // the heap.peek() will any way be in the next row. So the SQM.match(cell) need do
            // another compareRow to say the current row is DONE
            matcher.row = null;
            seekToNextRow(cell);
            break LOOP;
          }

          // add to results only if we have skipped #storeOffset kvs
          // also update metric accordingly
          if (this.countPerRow > storeOffset) {
            outResult.add(cell);

            // Update local tracking information
            count++;
            totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);

            // Update the progress of the scanner context
            scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
            scannerContext.incrementBatchProgress(1);

            if (totalBytesRead > maxRowSize) {
              throw new RowTooBigException("Max row size allowed: " + maxRowSize
                  + ", but the row is bigger than that.");
            }
          }

          // Advance the heap according to the specific INCLUDE variant.
          if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
            if (!matcher.moreRowsMayExistAfter(cell)) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            // Setting the matcher.row = null, will mean that after the subsequent seekToNextRow()
            // the heap.peek() will any way be in the next row. So the SQM.match(cell) need do
            // another compareRow to say the current row is DONE
            matcher.row = null;
            seekToNextRow(cell);
          } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
            seekAsDirection(matcher.getKeyForNextColumn(cell));
          } else {
            this.heap.next();
          }

          // Batch and size limits are checked only after the heap has been advanced.
          if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          continue;

        case DONE:
          // We are sure that this row is done and we are in the next row.
          // So subsequent StoresScanner.next() call need not do another compare
          // and set the matcher.row
          matcher.row = null;
          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();

        case DONE_SCAN:
          close();
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();

        case SEEK_NEXT_ROW:
          // This is just a relatively simple end of scan fix, to short-cut end
          // us if there is an endKey in the scan.
          if (!matcher.moreRowsMayExistAfter(cell)) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          // Setting the matcher.row = null, will mean that after the subsequent seekToNextRow()
          // the heap.peek() will any way be in the next row. So the SQM.match(cell) need do
          // another compareRow to say the current row is DONE
          matcher.row = null;
          seekToNextRow(cell);
          break;

        case SEEK_NEXT_COL:
          seekAsDirection(matcher.getKeyForNextColumn(cell));
          break;

        case SKIP:
          this.heap.next();
          break;

        case SEEK_NEXT_USING_HINT:
          // TODO convert reseek to Cell?
          // Seek to the matcher's hint when it supplies one; otherwise just step forward.
          Cell nextKV = matcher.getNextKeyHint(cell);
          if (nextKV != null) {
            seekAsDirection(nextKV);
          } else {
            heap.next();
          }
          break;

        default:
          throw new RuntimeException("UNEXPECTED");
      }
    } while((cell = this.heap.peek()) != null);

    // Reached via 'break LOOP' or heap exhaustion: report more values only if this call
    // actually produced cells.
    if (count > 0) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // No more keys
    close();
    return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    } finally {
      lock.unlock();
    }
  }
657 
658   /*
659    * See if we should actually SEEK or rather just SKIP to the next Cell.
660    * (see HBASE-13109)
661    */
662   private ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
663     switch(qcode) {
664     case INCLUDE_AND_SEEK_NEXT_COL:
665     case SEEK_NEXT_COL:
666     {
667       Cell nextIndexedKey = getNextIndexedKey();
668       if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
669           && matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
670         return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
671       }
672       break;
673     }
674     case INCLUDE_AND_SEEK_NEXT_ROW:
675     case SEEK_NEXT_ROW:
676     {
677       Cell nextIndexedKey = getNextIndexedKey();
678       if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
679           && matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
680         return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
681       }
682       break;
683     }
684     default:
685       break;
686     }
687     return qcode;
688   }
689 
690   // Implementation of ChangedReadersObserver
691   @Override
692   public void updateReaders() throws IOException {
693     lock.lock();
694     try {
695     if (this.closing) return;
696 
697     // All public synchronized API calls will call 'checkReseek' which will cause
698     // the scanner stack to reseek if this.heap==null && this.lastTop != null.
699     // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
700     // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
701     // which is NOT what we want, not to mention could cause an NPE. So we early out here.
702     if (this.heap == null) return;
703 
704     // this could be null.
705     this.lastTop = this.peek();
706 
707     //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
708 
709     // close scanners to old obsolete Store files
710     this.heap.close(); // bubble thru and close all scanners.
711     this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
712 
713     // Let the next() call handle re-creating and seeking
714     } finally {
715       lock.unlock();
716     }
717   }
718 
719   /**
720    * @return true if top of heap has changed (and KeyValueHeap has to try the
721    *         next KV)
722    * @throws IOException
723    */
724   protected boolean checkReseek() throws IOException {
725     if (this.heap == null && this.lastTop != null) {
726       resetScannerStack(this.lastTop);
727       if (this.heap.peek() == null
728           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
729         LOG.debug("Storescanner.peek() is changed where before = "
730             + this.lastTop.toString() + ",and after = " + this.heap.peek());
731         this.lastTop = null;
732         return true;
733       }
734       this.lastTop = null; // gone!
735     }
736     // else dont need to reseek
737     return false;
738   }
739 
740   protected void resetScannerStack(Cell lastTopKey) throws IOException {
741     if (heap != null) {
742       throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
743     }
744 
745     /* When we have the scan object, should we not pass it to getScanners()
746      * to get a limited set of scanners? We did so in the constructor and we
747      * could have done it now by storing the scan object from the constructor */
748     List<KeyValueScanner> scanners = getScannersNoCompaction();
749 
750     // Seek all scanners to the initial key
751     seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
752 
753     // Combine all seeked scanners with a heap
754     resetKVHeap(scanners, store.getComparator());
755 
756     // Reset the state of the Query Matcher and set to top row.
757     // Only reset and call setRow if the row changes; avoids confusing the
758     // query matcher if scanning intra-row.
759     Cell kv = heap.peek();
760     if (kv == null) {
761       kv = lastTopKey;
762     }
763     byte[] row = kv.getRowArray();
764     int offset = kv.getRowOffset();
765     short length = kv.getRowLength();
766     if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row,
767         matcher.rowOffset, matcher.rowLength)) {
768       this.countPerRow = 0;
769       matcher.reset();
770       matcher.setRow(row, offset, length);
771     }
772   }
773 
774   /**
775    * Check whether scan as expected order
776    * @param prevKV
777    * @param kv
778    * @param comparator
779    * @throws IOException
780    */
781   protected void checkScanOrder(Cell prevKV, Cell kv,
782       KeyValue.KVComparator comparator) throws IOException {
783     // Check that the heap gives us KVs in an increasing order.
784     assert prevKV == null || comparator == null
785         || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
786         + " followed by a " + "smaller key " + kv + " in cf " + store;
787   }
788 
789   protected boolean seekToNextRow(Cell kv) throws IOException {
790     return reseek(KeyValueUtil.createLastOnRow(kv));
791   }
792 
793   /**
794    * Do a reseek in a normal StoreScanner(scan forward)
795    * @param kv
796    * @return true if scanner has values left, false if end of scanner
797    * @throws IOException
798    */
799   protected boolean seekAsDirection(Cell kv)
800       throws IOException {
801     return reseek(kv);
802   }
803 
804   @Override
805   public boolean reseek(Cell kv) throws IOException {
806     lock.lock();
807     try {
808     //Heap will not be null, if this is called from next() which.
809     //If called from RegionScanner.reseek(...) make sure the scanner
810     //stack is reset if needed.
811     checkReseek();
812     if (explicitColumnQuery && lazySeekEnabledGlobally) {
813       return heap.requestSeek(kv, true, useRowColBloom);
814     }
815     return heap.reseek(kv);
816     } finally {
817       lock.unlock();
818     }
819   }
820 
821   @Override
822   public long getSequenceID() {
823     return 0;
824   }
825 
826   /**
827    * Seek storefiles in parallel to optimize IO latency as much as possible
828    * @param scanners the list {@link KeyValueScanner}s to be read from
829    * @param kv the KeyValue on which the operation is being requested
830    * @throws IOException
831    */
832   private void parallelSeek(final List<? extends KeyValueScanner>
833       scanners, final Cell kv) throws IOException {
834     if (scanners.isEmpty()) return;
835     int storeFileScannerCount = scanners.size();
836     CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
837     List<ParallelSeekHandler> handlers = 
838         new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
839     for (KeyValueScanner scanner : scanners) {
840       if (scanner instanceof StoreFileScanner) {
841         ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
842           this.readPt, latch);
843         executor.submit(seekHandler);
844         handlers.add(seekHandler);
845       } else {
846         scanner.seek(kv);
847         latch.countDown();
848       }
849     }
850 
851     try {
852       latch.await();
853     } catch (InterruptedException ie) {
854       throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
855     }
856 
857     for (ParallelSeekHandler handler : handlers) {
858       if (handler.getErr() != null) {
859         throw new IOException(handler.getErr());
860       }
861     }
862   }
863 
864   /**
865    * Used in testing.
866    * @return all scanners in no particular order
867    */
868   List<KeyValueScanner> getAllScannersForTesting() {
869     List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
870     KeyValueScanner current = heap.getCurrentForTesting();
871     if (current != null)
872       allScanners.add(current);
873     for (KeyValueScanner scanner : heap.getHeap())
874       allScanners.add(scanner);
875     return allScanners;
876   }
877 
878   static void enableLazySeekGlobally(boolean enable) {
879     lazySeekEnabledGlobally = enable;
880   }
881 
882   /**
883    * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
884    */
885   public long getEstimatedNumberOfKvsScanned() {
886     return this.kvsScanned;
887   }
888 
889   @Override
890   public Cell getNextIndexedKey() {
891     return this.heap.getNextIndexedKey();
892   }
893 }
894