View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.HashSet;
26  import java.util.List;
27  import java.util.NavigableSet;
28  import java.util.Set;
29  import java.util.concurrent.CountDownLatch;
30  import java.util.concurrent.locks.ReentrantLock;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.CellComparator;
36  import org.apache.hadoop.hbase.CellUtil;
37  import org.apache.hadoop.hbase.DoNotRetryIOException;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.KeyValue;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.client.IsolationLevel;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.executor.ExecutorService;
44  import org.apache.hadoop.hbase.filter.Filter;
45  import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
46  import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
47  import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
48  import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
49  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50  
51  import com.google.common.annotations.VisibleForTesting;
52  
/**
 * Scanner scans both the memstore and the Store. Coalesces the Cell stream
 * into a {@code List<Cell>} for a single row.
 */
57  @InterfaceAudience.Private
58  public class StoreScanner extends NonReversedNonLazyKeyValueScanner
59      implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  private static final Log LOG = LogFactory.getLog(StoreScanner.class);
  // In unit tests, the store could be null
  protected final Store store;
  protected ScanQueryMatcher matcher;
  // Merged view over all current scanners; set to null by close(...).
  protected KeyValueHeap heap;
  protected boolean cacheBlocks;

  // Number of included cells seen in the current row; compared against storeLimit + storeOffset.
  protected long countPerRow = 0;
  // Max cells returned per row for this family; a negative value (the default -1) means no limit.
  protected int storeLimit = -1;
  // Number of included cells to skip at the start of each row before returning any.
  protected int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107)
  // Not volatile. NOTE(review): an earlier comment claimed it is always accessed via
  // synchronized methods, but none of the methods in this file that read or write it are
  // declared synchronized; confirm the threading model before relying on that.
  protected boolean closing = false;
  protected final boolean get;
  protected final boolean explicitColumnQuery;
  protected final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  protected boolean parallelSeekEnabled = false;
  protected ExecutorService executor;
  protected final Scan scan;
  protected final NavigableSet<byte[]> columns;
  protected final long oldestUnexpiredTS;
  protected final long now;
  protected final int minVersions;
  protected final long maxRowSize;
  protected final long cellsPerHeartbeatCheck;

  // Collects all the KVHeap that are eagerly getting closed during the
  // course of a scan: close(false) parks the current heap here and close(true) closes them.
  protected Set<KeyValueHeap> heapsForDelayedClose = new HashSet<KeyValueHeap>();

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
   * KVs skipped via seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  // The previously examined cell; used for the scan-order assertion and kvsScanned counting.
  private Cell prevCell = null;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
      "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  protected static boolean lazySeekEnabledGlobally =
      LAZY_SEEK_ENABLED_BY_DEFAULT;

  /**
   * The number of cells scanned in between timeout checks. Specifying a larger value means that
   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
   * timeout checks.
   */
  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
      "hbase.cells.scanned.per.heartbeat.check";

  /**
   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
   */
  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

  // if heap == null and lastTop != null, you need to reseek given the key below.
  // Also holds the heap top captured by checkFlushed() until checkReseek() consumes it.
  protected Cell lastTop = null;

  // A flag whether use pread for scan
  private boolean scanUsePread = false;
  // Indicates whether there was flush during the course of the scan.
  // Raised by updateReaders(), cleared by checkFlushed().
  protected volatile boolean flushed = false;
  // Files from flushes not yet folded into the heap; guarded by flushLock.
  // generally we get one file from a flush
  protected List<StoreFile> flushedStoreFiles = new ArrayList<StoreFile>(1);
  // The current list of scanners
  protected List<KeyValueScanner> currentScanners = new ArrayList<KeyValueScanner>();
  // flush update lock: guards flushedStoreFiles between updateReaders() and resetScannerStack()
  private ReentrantLock flushLock = new ReentrantLock();

  protected final long readPt;
138 
  // Used by the injection framework to test race between StoreScanner construction and
  // compaction. The constants name points in that race; they are not referenced in the code
  // shown here (presumably consumed by test injection hooks, TODO confirm).
  enum StoreScannerCompactionRace {
    BEFORE_SEEK,
    AFTER_SEEK,
    COMPACT_COMPLETE
  }
145 
146   /** An internal constructor. */
147   protected StoreScanner(Store store, Scan scan, final ScanInfo scanInfo,
148       final NavigableSet<byte[]> columns, long readPt, boolean cacheBlocks) {
149     this.readPt = readPt;
150     this.store = store;
151     this.cacheBlocks = cacheBlocks;
152     get = scan.isGetScan();
153     int numCol = columns == null ? 0 : columns.size();
154     explicitColumnQuery = numCol > 0;
155     this.scan = scan;
156     this.columns = columns;
157     this.now = EnvironmentEdgeManager.currentTime();
158     this.oldestUnexpiredTS = now - scanInfo.getTtl();
159     this.minVersions = scanInfo.getMinVersions();
160 
161      // We look up row-column Bloom filters for multi-column queries as part of
162      // the seek operation. However, we also look the row-column Bloom filter
163      // for multi-row (non-"get") scans because this is not done in
164      // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
165      this.useRowColBloom = numCol > 1 || (!get && numCol == 1);
166 
167      this.maxRowSize = scanInfo.getTableMaxRowSize();
168      this.scanUsePread = scan.isSmall()? true: scanInfo.isUsePread();
169      this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
170      // Parallel seeking is on if the config allows and more there is more than one store file.
171      if (this.store != null && this.store.getStorefilesCount() > 1) {
172        RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
173        if (rsService != null && scanInfo.isParallelSeekEnabled()) {
174          this.parallelSeekEnabled = true;
175          this.executor = rsService.getExecutorService();
176        }
177      }
178   }
179 
180   protected void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
181     this.currentScanners.addAll(scanners);
182   }
183   /**
184    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
185    * are not in a compaction.
186    *
187    * @param store who we scan
188    * @param scan the spec
189    * @param columns which columns we are scanning
190    * @throws IOException
191    */
192   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
193       long readPt)
194   throws IOException {
195     this(store, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
196     if (columns != null && scan.isRaw()) {
197       throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
198     }
199     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
200         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
201         oldestUnexpiredTS, now, store.getCoprocessorHost());
202 
203     this.store.addChangedReaderObserver(this);
204 
205     // Pass columns to try to filter out unnecessary StoreFiles.
206     List<KeyValueScanner> scanners = getScannersNoCompaction();
207 
208     // Seek all scanners to the start of the Row (or if the exact matching row
209     // key does not exist, then to the start of the next matching Row).
210     // Always check bloom filter to optimize the top row seek for delete
211     // family marker.
212     seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
213         && lazySeekEnabledGlobally, parallelSeekEnabled);
214 
215     // set storeLimit
216     this.storeLimit = scan.getMaxResultsPerColumnFamily();
217 
218     // set rowOffset
219     this.storeOffset = scan.getRowOffsetPerColumnFamily();
220     addCurrentScanners(scanners);
221     // Combine all seeked scanners with a heap
222     resetKVHeap(scanners, store.getComparator());
223   }
224 
225   /**
226    * Used for compactions.<p>
227    *
228    * Opens a scanner across specified StoreFiles.
229    * @param store who we scan
230    * @param scan the spec
231    * @param scanners ancillary scanners
232    * @param smallestReadPoint the readPoint that we should use for tracking
233    *          versions
234    */
235   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
236       List<? extends KeyValueScanner> scanners, ScanType scanType,
237       long smallestReadPoint, long earliestPutTs) throws IOException {
238     this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
239   }
240 
241   /**
242    * Used for compactions that drop deletes from a limited range of rows.<p>
243    *
244    * Opens a scanner across specified StoreFiles.
245    * @param store who we scan
246    * @param scan the spec
247    * @param scanners ancillary scanners
248    * @param smallestReadPoint the readPoint that we should use for tracking versions
249    * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
250    * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
251    */
252   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
253       List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
254       byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
255     this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
256         earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
257   }
258 
259   private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
260       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
261       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
262     this(store, scan, scanInfo, null,
263       ((HStore)store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false);
264     if (dropDeletesFromRow == null) {
265       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
266           earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
267     } else {
268       matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
269           oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
270     }
271 
272     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
273     scanners = selectScannersFrom(scanners);
274 
275     // Seek all scanners to the initial key
276     seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
277     addCurrentScanners(scanners);
278     // Combine all seeked scanners with a heap
279     resetKVHeap(scanners, store.getComparator());
280   }
281 
282   @VisibleForTesting
283   StoreScanner(final Scan scan, ScanInfo scanInfo,
284       ScanType scanType, final NavigableSet<byte[]> columns,
285       final List<KeyValueScanner> scanners) throws IOException {
286     this(scan, scanInfo, scanType, columns, scanners,
287         HConstants.LATEST_TIMESTAMP,
288         // 0 is passed as readpoint because the test bypasses Store
289         0);
290   }
291 
292   @VisibleForTesting
293   StoreScanner(final Scan scan, ScanInfo scanInfo,
294     ScanType scanType, final NavigableSet<byte[]> columns,
295     final List<KeyValueScanner> scanners, long earliestPutTs)
296         throws IOException {
297     this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
298       // 0 is passed as readpoint because the test bypasses Store
299       0);
300   }
301 
302   public StoreScanner(final Scan scan, ScanInfo scanInfo,
303       ScanType scanType, final NavigableSet<byte[]> columns,
304       final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
305   throws IOException {
306     this(null, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
307     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
308         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
309 
310     // In unit tests, the store could be null
311     if (this.store != null) {
312       this.store.addChangedReaderObserver(this);
313     }
314     // Seek all scanners to the initial key
315     seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
316     addCurrentScanners(scanners);
317     resetKVHeap(scanners, scanInfo.getComparator());
318   }
319 
320   /**
321    * Get a filtered list of scanners. Assumes we are not in a compaction.
322    * @return list of scanners to seek
323    */
324   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
325     final boolean isCompaction = false;
326     boolean usePread = get || scanUsePread;
327     return selectScannersFrom(store.getScanners(cacheBlocks, get, usePread,
328         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
329   }
330 
331   /**
332    * Seek the specified scanners with the given key
333    * @param scanners
334    * @param seekKey
335    * @param isLazy true if using lazy seek
336    * @param isParallelSeek true if using parallel seek
337    * @throws IOException
338    */
339   protected void seekScanners(List<? extends KeyValueScanner> scanners,
340       Cell seekKey, boolean isLazy, boolean isParallelSeek)
341       throws IOException {
342     // Seek all scanners to the start of the Row (or if the exact matching row
343     // key does not exist, then to the start of the next matching Row).
344     // Always check bloom filter to optimize the top row seek for delete
345     // family marker.
346     if (isLazy) {
347       for (KeyValueScanner scanner : scanners) {
348         scanner.requestSeek(seekKey, false, true);
349       }
350     } else {
351       if (!isParallelSeek) {
352         long totalScannersSoughtBytes = 0;
353         for (KeyValueScanner scanner : scanners) {
354           if (totalScannersSoughtBytes >= maxRowSize) {
355             throw new RowTooBigException("Max row size allowed: " + maxRowSize
356               + ", but row is bigger than that");
357           }
358           scanner.seek(seekKey);
359           Cell c = scanner.peek();
360           if (c != null) {
361             totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
362           }
363         }
364       } else {
365         parallelSeek(scanners, seekKey);
366       }
367     }
368   }
369 
370   protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
371       CellComparator comparator) throws IOException {
372     // Combine all seeked scanners with a heap
373     heap = new KeyValueHeap(scanners, comparator);
374   }
375 
376   /**
377    * Filters the given list of scanners using Bloom filter, time range, and
378    * TTL.
379    */
380   protected List<KeyValueScanner> selectScannersFrom(
381       final List<? extends KeyValueScanner> allScanners) {
382     boolean memOnly;
383     boolean filesOnly;
384     if (scan instanceof InternalScan) {
385       InternalScan iscan = (InternalScan)scan;
386       memOnly = iscan.isCheckOnlyMemStore();
387       filesOnly = iscan.isCheckOnlyStoreFiles();
388     } else {
389       memOnly = false;
390       filesOnly = false;
391     }
392 
393     List<KeyValueScanner> scanners =
394         new ArrayList<KeyValueScanner>(allScanners.size());
395 
396     // We can only exclude store files based on TTL if minVersions is set to 0.
397     // Otherwise, we might have to return KVs that have technically expired.
398     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
399         Long.MIN_VALUE;
400 
401     // include only those scan files which pass all filters
402     for (KeyValueScanner kvs : allScanners) {
403       boolean isFile = kvs.isFileScanner();
404       if ((!isFile && filesOnly) || (isFile && memOnly)) {
405         continue;
406       }
407 
408       if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
409         scanners.add(kvs);
410       } else {
411         kvs.close();
412       }
413     }
414     return scanners;
415   }
416 
417   @Override
418   public Cell peek() {
419     if (this.heap == null) {
420       return this.lastTop;
421     }
422     return this.heap.peek();
423   }
424 
425   @Override
426   public KeyValue next() {
427     // throw runtime exception perhaps?
428     throw new RuntimeException("Never call StoreScanner.next()");
429   }
430 
431   @Override
432   public void close() {
433     close(true);
434   }
435 
436   private void close(boolean withHeapClose) {
437     if (this.closing) {
438       return;
439     }
440     if (withHeapClose) this.closing = true;
441     // Under test, we dont have a this.store
442     if (this.store != null) this.store.deleteChangedReaderObserver(this);
443     if (withHeapClose) {
444       for (KeyValueHeap h : this.heapsForDelayedClose) {
445         h.close();
446       }
447       this.heapsForDelayedClose.clear();
448       if (this.heap != null) {
449         this.heap.close();
450         this.currentScanners.clear();
451         this.heap = null; // CLOSED!
452       }
453     } else {
454       if (this.heap != null) {
455         this.heapsForDelayedClose.add(this.heap);
456         this.currentScanners.clear();
457         this.heap = null;
458       }
459     }
460     this.lastTop = null; // If both are null, we are closed.
461   }
462 
463   @Override
464   public boolean seek(Cell key) throws IOException {
465     boolean flushed = checkFlushed();
466     // reset matcher state, in case that underlying store changed
467     checkReseek(flushed);
468     return this.heap.seek(key);
469   }
470 
471   @Override
472   public boolean next(List<Cell> outResult) throws IOException {
473     return next(outResult, NoLimitScannerContext.getInstance());
474   }
475 
  /**
   * Get the next row of values from this Store.
   *
   * <p>Cells are taken from the heap and classified by the query matcher. Included cells are
   * passed through the filter's transformCell (if a filter is set). They are appended to
   * {@code outResult} once the row has produced more than {@code storeOffset} included cells.
   * The heap is then advanced, or reseeked to the next column or next row, according to the
   * match code. The loop ends early when the scanner context's time limit (checked every
   * cellsPerHeartbeatCheck scanned cells), batch limit, or size limit is reached.
   *
   * @param outResult list to which the returned cells are appended
   * @param scannerContext limits and progress tracking for this call; must not be null
   * @return true if there are more rows, false if scanner is done
   * @throws IllegalArgumentException if {@code scannerContext} is null
   * @throws RowTooBigException if the estimated serialized size of the cells added by this
   *           call exceeds maxRowSize
   * @throws IOException if reading from the underlying scanners fails
   */
  @Override
  public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
    if (scannerContext == null) {
      throw new IllegalArgumentException("Scanner context cannot be null");
    }
    // Fold in any flush noticed since the last call. If the rebuilt heap's top is now in a
    // different row (or the heap is empty), return MORE_VALUES with no cells added.
    boolean flushed = checkFlushed();
    if (checkReseek(flushed)) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // if the heap was left null, then the scanners had previously run out anyways, close and
    // return.
    if (this.heap == null) {
      // By this time partial close should happened because already heap is null
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    Cell cell = this.heap.peek();
    if (cell == null) {
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    // only call setRow if the row changes; avoids confusing the query matcher
    // if scanning intra-row

    // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
    // rows. Else it is possible we are still traversing the same row so we must perform the row
    // comparison.
    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null) {
      this.countPerRow = 0;
      matcher.setToNewRow(cell);
    }

    // Clear progress away unless invoker has indicated it should be kept.
    if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();

    // Only do a sanity-check if store and comparator are available.
    CellComparator comparator = store != null ? store.getComparator() : null;

    // count: cells added to outResult by this call.
    // totalBytesRead: their estimated serialized size, compared against maxRowSize.
    int count = 0;
    long totalBytesRead = 0;

    LOOP: do {
      // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
      // NOTE(review): this modulo assumes cellsPerHeartbeatCheck > 0; zero would throw an
      // ArithmeticException. Confirm ScanInfo never supplies a non-positive value.
      if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
        scannerContext.updateTimeProgress();
        if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
          return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
        }
      }

      if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
      checkScanOrder(prevCell, cell, comparator);
      prevCell = cell;
      ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
      // May replace a seek with a cheaper SKIP/INCLUDE when the next indexed key is close.
      qcode = optimize(qcode, cell);
      switch (qcode) {
      case INCLUDE:
      case INCLUDE_AND_SEEK_NEXT_ROW:
      case INCLUDE_AND_SEEK_NEXT_COL:

        Filter f = matcher.getFilter();
        if (f != null) {
          // NOTE(review): from here on the transformed cell (not the heap's cell) is used for
          // moreRowsMayExistAfter() and seek positioning; confirm filters never alter the key.
          cell = f.transformCell(cell);
        }

        this.countPerRow++;
        // A negative storeLimit (default -1) disables the per-family limit; otherwise stop
        // the row once storeOffset skipped plus storeLimit returned cells have been seen.
        if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) {
          // do what SEEK_NEXT_ROW does.
          if (!matcher.moreRowsMayExistAfter(cell)) {
            close(false);// Do all cleanup except heap.close()
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          matcher.curCell = null;
          seekToNextRow(cell);
          break LOOP;
        }

        // add to results only if we have skipped #storeOffset kvs
        // also update metric accordingly
        if (this.countPerRow > storeOffset) {
          outResult.add(cell);

          // Update local tracking information
          count++;
          totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);

          // Update the progress of the scanner context
          scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
          scannerContext.incrementBatchProgress(1);

          if (totalBytesRead > maxRowSize) {
            throw new RowTooBigException(
                "Max row size allowed: " + maxRowSize + ", but the row is bigger than that.");
          }
        }

        if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
          if (!matcher.moreRowsMayExistAfter(cell)) {
            close(false);// Do all cleanup except heap.close()
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          matcher.curCell = null;
          seekToNextRow(cell);
        } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
          seekAsDirection(matcher.getKeyForNextColumn(cell));
        } else {
          this.heap.next();
        }

        // Stop accumulating once the batch or size limit between cells is hit; the caller
        // resumes from the current heap position on the next call.
        if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
          break LOOP;
        }
        if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
          break LOOP;
        }
        continue;

      case DONE:
        // Row finished; reset matcher row state so the next call starts a new row.
        matcher.curCell = null;
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();

      case DONE_SCAN:
        close(false);// Do all cleanup except heap.close()
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();

      case SEEK_NEXT_ROW:
        // This is just a relatively simple end of scan fix, to short-cut end
        // us if there is an endKey in the scan.
        if (!matcher.moreRowsMayExistAfter(cell)) {
          close(false);// Do all cleanup except heap.close()
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        matcher.curCell = null;
        seekToNextRow(cell);
        break;

      case SEEK_NEXT_COL:
        seekAsDirection(matcher.getKeyForNextColumn(cell));
        break;

      case SKIP:
        this.heap.next();
        break;

      case SEEK_NEXT_USING_HINT:
        // Seek to the filter's hint when one is given; otherwise just advance one cell.
        Cell nextKV = matcher.getNextKeyHint(cell);
        if (nextKV != null) {
          seekAsDirection(nextKV);
        } else {
          heap.next();
        }
        break;

      default:
        throw new RuntimeException("UNEXPECTED");
      }
    } while ((cell = this.heap.peek()) != null);

    if (count > 0) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // No more keys
    close(false);// Do all cleanup except heap.close()
    return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
  }
651 
652   /*
653    * See if we should actually SEEK or rather just SKIP to the next Cell.
654    * (see HBASE-13109)
655    */
656   private ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
657     switch(qcode) {
658     case INCLUDE_AND_SEEK_NEXT_COL:
659     case SEEK_NEXT_COL:
660     {
661       Cell nextIndexedKey = getNextIndexedKey();
662       if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
663           && matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
664         return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
665       }
666       break;
667     }
668     case INCLUDE_AND_SEEK_NEXT_ROW:
669     case SEEK_NEXT_ROW:
670     {
671       Cell nextIndexedKey = getNextIndexedKey();
672       if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
673           && matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
674         return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
675       }
676       break;
677     }
678     default:
679       break;
680     }
681     return qcode;
682   }
683 
684   // Implementation of ChangedReadersObserver
685   @Override
686   public void updateReaders(List<StoreFile> sfs) throws IOException {
687     flushed = true;
688     flushLock.lock();
689     try {
690       flushedStoreFiles.addAll(sfs);
691     } finally {
692       flushLock.unlock();
693     }
694     // Let the next() call handle re-creating and seeking
695   }
696 
697   /**
698    * @param flushed indicates if there was a flush
699    * @return true if top of heap has changed (and KeyValueHeap has to try the
700    *         next KV)
701    * @throws IOException
702    */
703   protected boolean checkReseek(boolean flushed) throws IOException {
704     if (flushed && this.lastTop != null) {
705       resetScannerStack(this.lastTop);
706       if (this.heap.peek() == null
707           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
708         LOG.debug("Storescanner.peek() is changed where before = "
709             + this.lastTop.toString() + ",and after = " + this.heap.peek());
710         this.lastTop = null;
711         return true;
712       }
713       this.lastTop = null; // gone!
714     }
715     // else dont need to reseek
716     return false;
717   }
718 
719   protected void resetScannerStack(Cell lastTopKey) throws IOException {
720     /* When we have the scan object, should we not pass it to getScanners()
721      * to get a limited set of scanners? We did so in the constructor and we
722      * could have done it now by storing the scan object from the constructor
723      */
724 
725     final boolean isCompaction = false;
726     boolean usePread = get || scanUsePread;
727     List<KeyValueScanner> scanners = null;
728     try {
729       flushLock.lock();
730       scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread,
731         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
732       // Clear the current set of flushed store files so that they don't get added again
733       flushedStoreFiles.clear();
734     } finally {
735       flushLock.unlock();
736     }
737 
738     // Seek the new scanners to the last key
739     seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
740     // remove the older memstore scanner
741     for (int i = 0; i < currentScanners.size(); i++) {
742       if (!currentScanners.get(i).isFileScanner()) {
743         currentScanners.remove(i);
744         break;
745       }
746     }
747     // add the newly created scanners on the flushed files and the current active memstore scanner
748     addCurrentScanners(scanners);
749     // Combine all seeked scanners with a heap
750     resetKVHeap(this.currentScanners, store.getComparator());
751     // Reset the state of the Query Matcher and set to top row.
752     // Only reset and call setRow if the row changes; avoids confusing the
753     // query matcher if scanning intra-row.
754     Cell cell = heap.peek();
755     if (cell == null) {
756       cell = lastTopKey;
757     }
758     if ((matcher.curCell == null) || !CellUtil.matchingRows(cell, matcher.curCell)) {
759       this.countPerRow = 0;
760       // The setToNewRow will call reset internally
761       matcher.setToNewRow(cell);
762     }
763   }
764 
765   /**
766    * Check whether scan as expected order
767    * @param prevKV
768    * @param kv
769    * @param comparator
770    * @throws IOException
771    */
772   protected void checkScanOrder(Cell prevKV, Cell kv,
773       CellComparator comparator) throws IOException {
774     // Check that the heap gives us KVs in an increasing order.
775     assert prevKV == null || comparator == null
776         || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
777         + " followed by a " + "smaller key " + kv + " in cf " + store;
778   }
779 
780   protected boolean seekToNextRow(Cell c) throws IOException {
781     return reseek(CellUtil.createLastOnRow(c));
782   }
783 
784   /**
785    * Do a reseek in a normal StoreScanner(scan forward)
786    * @param kv
787    * @return true if scanner has values left, false if end of scanner
788    * @throws IOException
789    */
790   protected boolean seekAsDirection(Cell kv)
791       throws IOException {
792     return reseek(kv);
793   }
794 
795   @Override
796   public boolean reseek(Cell kv) throws IOException {
797     boolean flushed = checkFlushed();
798     // Heap will not be null, if this is called from next() which.
799     // If called from RegionScanner.reseek(...) make sure the scanner
800     // stack is reset if needed.
801     checkReseek(flushed);
802     if (explicitColumnQuery && lazySeekEnabledGlobally) {
803       return heap.requestSeek(kv, true, useRowColBloom);
804     }
805     return heap.reseek(kv);
806   }
807 
808   protected boolean checkFlushed() {
809     // check the var without any lock. Suppose even if we see the old
810     // value here still it is ok to continue because we will not be resetting
811     // the heap but will continue with the referenced memstore's snapshot. For compactions
812     // any way we don't need the updateReaders at all to happen as we still continue with 
813     // the older files
814     if (flushed) {
815       // If there is a flush and the current scan is notified on the flush ensure that the 
816       // scan's heap gets reset and we do a seek on the newly flushed file.
817       if(!this.closing) {
818         this.lastTop = this.peek();
819       } else {
820         return false;
821       }
822       // reset the flag
823       flushed = false;
824       return true;
825     }
826     return false;
827   }
828 
829   @Override
830   public long getSequenceID() {
831     return 0;
832   }
833 
834   /**
835    * Seek storefiles in parallel to optimize IO latency as much as possible
836    * @param scanners the list {@link KeyValueScanner}s to be read from
837    * @param kv the KeyValue on which the operation is being requested
838    * @throws IOException
839    */
840   private void parallelSeek(final List<? extends KeyValueScanner>
841       scanners, final Cell kv) throws IOException {
842     if (scanners.isEmpty()) return;
843     int storeFileScannerCount = scanners.size();
844     CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
845     List<ParallelSeekHandler> handlers = 
846         new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
847     for (KeyValueScanner scanner : scanners) {
848       if (scanner instanceof StoreFileScanner) {
849         ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
850           this.readPt, latch);
851         executor.submit(seekHandler);
852         handlers.add(seekHandler);
853       } else {
854         scanner.seek(kv);
855         latch.countDown();
856       }
857     }
858 
859     try {
860       latch.await();
861     } catch (InterruptedException ie) {
862       throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
863     }
864 
865     for (ParallelSeekHandler handler : handlers) {
866       if (handler.getErr() != null) {
867         throw new IOException(handler.getErr());
868       }
869     }
870   }
871 
872   /**
873    * Used in testing.
874    * @return all scanners in no particular order
875    */
876   List<KeyValueScanner> getAllScannersForTesting() {
877     List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
878     KeyValueScanner current = heap.getCurrentForTesting();
879     if (current != null)
880       allScanners.add(current);
881     for (KeyValueScanner scanner : heap.getHeap())
882       allScanners.add(scanner);
883     return allScanners;
884   }
885 
886   static void enableLazySeekGlobally(boolean enable) {
887     lazySeekEnabledGlobally = enable;
888   }
889 
890   /**
891    * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
892    */
893   public long getEstimatedNumberOfKvsScanned() {
894     return this.kvsScanned;
895   }
896 
897   @Override
898   public Cell getNextIndexedKey() {
899     return this.heap.getNextIndexedKey();
900   }
901 
902   @Override
903   public void shipped() throws IOException {
904     for (KeyValueHeap h : this.heapsForDelayedClose) {
905       h.close();// There wont be further fetch of Cells from these scanners. Just close.
906     }
907     this.heapsForDelayedClose.clear();
908     if (this.heap != null) {
909       this.heap.shipped();
910     }
911   }
912 }
913