View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.HashSet;
26  import java.util.List;
27  import java.util.NavigableSet;
28  import java.util.Set;
29  import java.util.concurrent.CountDownLatch;
30  import java.util.concurrent.locks.ReentrantLock;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.CellComparator;
36  import org.apache.hadoop.hbase.CellUtil;
37  import org.apache.hadoop.hbase.DoNotRetryIOException;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.KeyValue;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.client.IsolationLevel;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.executor.ExecutorService;
44  import org.apache.hadoop.hbase.filter.Filter;
45  import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
46  import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
47  import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
48  import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
49  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50  
51  import com.google.common.annotations.VisibleForTesting;
52  
/**
 * Scans both the memstore and the Store's files, coalescing the resulting Cell stream into a
 * {@code List<Cell>} for a single row.
 */
57  @InterfaceAudience.Private
58  public class StoreScanner extends NonReversedNonLazyKeyValueScanner
59      implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
60    private static final Log LOG = LogFactory.getLog(StoreScanner.class);
61    // In unit tests, the store could be null
62    protected final Store store;
63    protected ScanQueryMatcher matcher;
64    protected KeyValueHeap heap;
65    protected boolean cacheBlocks;
66  
67    protected long countPerRow = 0;
68    protected int storeLimit = -1;
69    protected int storeOffset = 0;
70  
71    // Used to indicate that the scanner has closed (see HBASE-1107)
72    // Doesnt need to be volatile because it's always accessed via synchronized methods
73    protected boolean closing = false;
74    protected final boolean get;
75    protected final boolean explicitColumnQuery;
76    protected final boolean useRowColBloom;
77    /**
78     * A flag that enables StoreFileScanner parallel-seeking
79     */
80    protected boolean parallelSeekEnabled = false;
81    protected ExecutorService executor;
82    protected final Scan scan;
83    protected final NavigableSet<byte[]> columns;
84    protected final long oldestUnexpiredTS;
85    protected final long now;
86    protected final int minVersions;
87    protected final long maxRowSize;
88    protected final long cellsPerHeartbeatCheck;
89  
90    // Collects all the KVHeap that are eagerly getting closed during the
91    // course of a scan
92    protected Set<KeyValueHeap> heapsForDelayedClose = new HashSet<KeyValueHeap>();
93  
94    /**
95     * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
96     * KVs skipped via seeking to next row/column. TODO: estimate them?
97     */
98    private long kvsScanned = 0;
99    private Cell prevCell = null;
100 
101   /** We don't ever expect to change this, the constant is just for clarity. */
102   static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
103   public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
104       "hbase.storescanner.parallel.seek.enable";
105 
106   /** Used during unit testing to ensure that lazy seek does save seek ops */
107   protected static boolean lazySeekEnabledGlobally =
108       LAZY_SEEK_ENABLED_BY_DEFAULT;
109 
110   /**
111    * The number of cells scanned in between timeout checks. Specifying a larger value means that
112    * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
113    * timeout checks.
114    */
115   public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
116       "hbase.cells.scanned.per.heartbeat.check";
117 
118   /**
119    * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
120    */
121   public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;
122 
123   // if heap == null and lastTop != null, you need to reseek given the key below
124   protected Cell lastTop = null;
125 
126   // A flag whether use pread for scan
127   private boolean scanUsePread = false;
128   // Indicates whether there was flush during the course of the scan
129   protected volatile boolean flushed = false;
130   // generally we get one file from a flush
131   protected List<StoreFile> flushedStoreFiles = new ArrayList<StoreFile>(1);
132   // The current list of scanners
133   protected List<KeyValueScanner> currentScanners = new ArrayList<KeyValueScanner>();
134   // flush update lock
135   private ReentrantLock flushLock = new ReentrantLock();
136 
137   protected final long readPt;
138 
139   // used by the injection framework to test race between StoreScanner construction and compaction
140   enum StoreScannerCompactionRace {
141     BEFORE_SEEK,
142     AFTER_SEEK,
143     COMPACT_COMPLETE
144   }
145 
146   /** An internal constructor. */
147   protected StoreScanner(Store store, Scan scan, final ScanInfo scanInfo,
148       final NavigableSet<byte[]> columns, long readPt, boolean cacheBlocks) {
149     this.readPt = readPt;
150     this.store = store;
151     this.cacheBlocks = cacheBlocks;
152     get = scan.isGetScan();
153     int numCol = columns == null ? 0 : columns.size();
154     explicitColumnQuery = numCol > 0;
155     this.scan = scan;
156     this.columns = columns;
157     this.now = EnvironmentEdgeManager.currentTime();
158     this.oldestUnexpiredTS = now - scanInfo.getTtl();
159     this.minVersions = scanInfo.getMinVersions();
160 
161      // We look up row-column Bloom filters for multi-column queries as part of
162      // the seek operation. However, we also look the row-column Bloom filter
163      // for multi-row (non-"get") scans because this is not done in
164      // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
165      this.useRowColBloom = numCol > 1 || (!get && numCol == 1);
166 
167      this.maxRowSize = scanInfo.getTableMaxRowSize();
168      this.scanUsePread = scan.isSmall()? true: scanInfo.isUsePread();
169      this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
170      // Parallel seeking is on if the config allows and more there is more than one store file.
171      if (this.store != null && this.store.getStorefilesCount() > 1) {
172        RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
173        if (rsService != null && scanInfo.isParallelSeekEnabled()) {
174          this.parallelSeekEnabled = true;
175          this.executor = rsService.getExecutorService();
176        }
177      }
178   }
179 
180   protected void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
181     this.currentScanners.addAll(scanners);
182   }
183   /**
184    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
185    * are not in a compaction.
186    *
187    * @param store who we scan
188    * @param scan the spec
189    * @param columns which columns we are scanning
190    * @throws IOException
191    */
192   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
193       long readPt)
194   throws IOException {
195     this(store, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
196     if (columns != null && scan.isRaw()) {
197       throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
198     }
199     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
200         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
201         oldestUnexpiredTS, now, store.getCoprocessorHost());
202 
203     this.store.addChangedReaderObserver(this);
204 
205     // Pass columns to try to filter out unnecessary StoreFiles.
206     List<KeyValueScanner> scanners = getScannersNoCompaction();
207 
208     // Seek all scanners to the start of the Row (or if the exact matching row
209     // key does not exist, then to the start of the next matching Row).
210     // Always check bloom filter to optimize the top row seek for delete
211     // family marker.
212     seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
213         && lazySeekEnabledGlobally, parallelSeekEnabled);
214 
215     // set storeLimit
216     this.storeLimit = scan.getMaxResultsPerColumnFamily();
217 
218     // set rowOffset
219     this.storeOffset = scan.getRowOffsetPerColumnFamily();
220     addCurrentScanners(scanners);
221     // Combine all seeked scanners with a heap
222     resetKVHeap(scanners, store.getComparator());
223   }
224 
/**
 * Used for compactions: opens a scanner across the specified StoreFiles, with no range in
 * which deletes are dropped.
 *
 * @param store who we scan
 * @param scanInfo the store's scan settings
 * @param scan the spec
 * @param scanners ancillary scanners
 * @param scanType the type of compaction scan
 * @param smallestReadPoint the readPoint that we should use for tracking versions
 * @param earliestPutTs handed through to the ScanQueryMatcher
 * @throws IOException if the scanners cannot be seeked
 */
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
    List<? extends KeyValueScanner> scanners, ScanType scanType,
    long smallestReadPoint, long earliestPutTs) throws IOException {
  // Both drop-deletes bounds are left unset.
  this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs,
      null, null);
}
240 
/**
 * Used for compactions that drop deletes from a limited range of rows. Opens a scanner across
 * the specified StoreFiles using {@link ScanType#COMPACT_RETAIN_DELETES}.
 *
 * @param store who we scan
 * @param scanInfo the store's scan settings
 * @param scan the spec
 * @param scanners ancillary scanners
 * @param smallestReadPoint the readPoint that we should use for tracking versions
 * @param earliestPutTs handed through to the ScanQueryMatcher
 * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
 * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
 * @throws IOException if the scanners cannot be seeked
 */
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
    List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
    byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
  this(store, scanInfo, scan, scanners,
      ScanType.COMPACT_RETAIN_DELETES,
      smallestReadPoint, earliestPutTs,
      dropDeletesFromRow, dropDeletesToRow);
}
258 
259   private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
260       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
261       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
262     this(store, scan, scanInfo, null,
263       ((HStore)store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false);
264     if (dropDeletesFromRow == null) {
265       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
266           earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
267     } else {
268       matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
269           oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
270     }
271 
272     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
273     scanners = selectScannersFrom(scanners);
274 
275     // Seek all scanners to the initial key
276     seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
277     addCurrentScanners(scanners);
278     // Combine all seeked scanners with a heap
279     resetKVHeap(scanners, store.getComparator());
280   }
281 
/**
 * Test-only constructor: scans the given scanners with the latest timestamp as the earliest
 * put timestamp, without a backing Store.
 */
@VisibleForTesting
StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners) throws IOException {
  // Read point 0 because the test bypasses the Store.
  this(scan, scanInfo, scanType, columns, scanners, HConstants.LATEST_TIMESTAMP, 0);
}
291 
/**
 * Test-only constructor: scans the given scanners with an explicit earliest put timestamp,
 * without a backing Store.
 */
@VisibleForTesting
StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners, long earliestPutTs) throws IOException {
  // Read point 0 because the test bypasses the Store.
  this(scan, scanInfo, scanType, columns, scanners, earliestPutTs, 0);
}
301 
/**
 * Opens a scanner over an explicit list of KeyValueScanners with no backing Store.
 *
 * <p>The internal constructor is always invoked with a {@code null} store, so this instance is
 * never registered via {@code store.addChangedReaderObserver}. The matcher is created without a
 * coprocessor host, and the heap uses {@code scanInfo}'s comparator.
 *
 * @param scan the spec
 * @param scanInfo scan settings, including the comparator used for the heap
 * @param scanType the type of scan the matcher should perform
 * @param columns which columns we are scanning
 * @param scanners the scanners to merge
 * @param earliestPutTs handed through to the ScanQueryMatcher
 * @param readPt the read point this scan uses
 * @throws IOException if the scanners cannot be seeked
 */
public StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
throws IOException {
  // store is always null on this path. The previous "if (this.store != null)" observer
  // registration was unreachable and has been removed.
  this(null, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
  this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
      Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);

  // Seek all scanners to the initial key
  seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
  addCurrentScanners(scanners);
  resetKVHeap(scanners, scanInfo.getComparator());
}
319 
320   /**
321    * Get a filtered list of scanners. Assumes we are not in a compaction.
322    * @return list of scanners to seek
323    */
324   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
325     final boolean isCompaction = false;
326     boolean usePread = get || scanUsePread;
327     return selectScannersFrom(store.getScanners(cacheBlocks, get, usePread,
328         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
329   }
330 
331   /**
332    * Seek the specified scanners with the given key
333    * @param scanners
334    * @param seekKey
335    * @param isLazy true if using lazy seek
336    * @param isParallelSeek true if using parallel seek
337    * @throws IOException
338    */
339   protected void seekScanners(List<? extends KeyValueScanner> scanners,
340       Cell seekKey, boolean isLazy, boolean isParallelSeek)
341       throws IOException {
342     // Seek all scanners to the start of the Row (or if the exact matching row
343     // key does not exist, then to the start of the next matching Row).
344     // Always check bloom filter to optimize the top row seek for delete
345     // family marker.
346     if (isLazy) {
347       for (KeyValueScanner scanner : scanners) {
348         scanner.requestSeek(seekKey, false, true);
349       }
350     } else {
351       if (!isParallelSeek) {
352         long totalScannersSoughtBytes = 0;
353         for (KeyValueScanner scanner : scanners) {
354           if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) {
355             throw new RowTooBigException("Max row size allowed: " + maxRowSize
356               + ", but row is bigger than that");
357           }
358           scanner.seek(seekKey);
359           Cell c = scanner.peek();
360           if (c != null) {
361             totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
362           }
363         }
364       } else {
365         parallelSeek(scanners, seekKey);
366       }
367     }
368   }
369 
370   protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
371       CellComparator comparator) throws IOException {
372     // Combine all seeked scanners with a heap
373     heap = new KeyValueHeap(scanners, comparator);
374   }
375 
376   /**
377    * Filters the given list of scanners using Bloom filter, time range, and
378    * TTL.
379    */
380   protected List<KeyValueScanner> selectScannersFrom(
381       final List<? extends KeyValueScanner> allScanners) {
382     boolean memOnly;
383     boolean filesOnly;
384     if (scan instanceof InternalScan) {
385       InternalScan iscan = (InternalScan)scan;
386       memOnly = iscan.isCheckOnlyMemStore();
387       filesOnly = iscan.isCheckOnlyStoreFiles();
388     } else {
389       memOnly = false;
390       filesOnly = false;
391     }
392 
393     List<KeyValueScanner> scanners =
394         new ArrayList<KeyValueScanner>(allScanners.size());
395 
396     // We can only exclude store files based on TTL if minVersions is set to 0.
397     // Otherwise, we might have to return KVs that have technically expired.
398     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
399         Long.MIN_VALUE;
400 
401     // include only those scan files which pass all filters
402     for (KeyValueScanner kvs : allScanners) {
403       boolean isFile = kvs.isFileScanner();
404       if ((!isFile && filesOnly) || (isFile && memOnly)) {
405         continue;
406       }
407 
408       if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
409         scanners.add(kvs);
410       } else {
411         kvs.close();
412       }
413     }
414     return scanners;
415   }
416 
417   @Override
418   public Cell peek() {
419     if (this.heap == null) {
420       return this.lastTop;
421     }
422     return this.heap.peek();
423   }
424 
425   @Override
426   public KeyValue next() {
427     // throw runtime exception perhaps?
428     throw new RuntimeException("Never call StoreScanner.next()");
429   }
430 
431   @Override
432   public void close() {
433     close(true);
434   }
435 
436   private void close(boolean withHeapClose) {
437     if (this.closing) {
438       return;
439     }
440     if (withHeapClose) this.closing = true;
441     // Under test, we dont have a this.store
442     if (this.store != null) this.store.deleteChangedReaderObserver(this);
443     if (withHeapClose) {
444       for (KeyValueHeap h : this.heapsForDelayedClose) {
445         h.close();
446       }
447       this.heapsForDelayedClose.clear();
448       if (this.heap != null) {
449         this.heap.close();
450         this.currentScanners.clear();
451         this.heap = null; // CLOSED!
452       }
453     } else {
454       if (this.heap != null) {
455         this.heapsForDelayedClose.add(this.heap);
456         this.currentScanners.clear();
457         this.heap = null;
458       }
459     }
460     this.lastTop = null; // If both are null, we are closed.
461   }
462 
463   @Override
464   public boolean seek(Cell key) throws IOException {
465     boolean flushed = checkFlushed();
466     // reset matcher state, in case that underlying store changed
467     checkReseek(flushed);
468     return this.heap.seek(key);
469   }
470 
471   @Override
472   public boolean next(List<Cell> outResult) throws IOException {
473     return next(outResult, NoLimitScannerContext.getInstance());
474   }
475 
476   /**
477    * Get the next row of values from this Store.
478    * @param outResult
479    * @param scannerContext
480    * @return true if there are more rows, false if scanner is done
481    */
482   @Override
483   public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
484     if (scannerContext == null) {
485       throw new IllegalArgumentException("Scanner context cannot be null");
486     }
487     boolean flushed = checkFlushed();
488     if (checkReseek(flushed)) {
489       return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
490     }
491 
492     // if the heap was left null, then the scanners had previously run out anyways, close and
493     // return.
494     if (this.heap == null) {
495       // By this time partial close should happened because already heap is null
496       close(false);// Do all cleanup except heap.close()
497       return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
498     }
499 
500     Cell cell = this.heap.peek();
501     if (cell == null) {
502       close(false);// Do all cleanup except heap.close()
503       return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
504     }
505 
506     // only call setRow if the row changes; avoids confusing the query matcher
507     // if scanning intra-row
508 
509     // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
510     // rows. Else it is possible we are still traversing the same row so we must perform the row
511     // comparison.
512     if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null) {
513       this.countPerRow = 0;
514       matcher.setToNewRow(cell);
515     }
516 
517     // Clear progress away unless invoker has indicated it should be kept.
518     if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
519 
520     // Only do a sanity-check if store and comparator are available.
521     CellComparator comparator = store != null ? store.getComparator() : null;
522 
523     int count = 0;
524     long totalBytesRead = 0;
525 
526     LOOP: do {
527       // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
528       if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
529         scannerContext.updateTimeProgress();
530         if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
531           return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
532         }
533       }
534 
535       if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
536       checkScanOrder(prevCell, cell, comparator);
537       prevCell = cell;
538       ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
539       qcode = optimize(qcode, cell);
540       switch (qcode) {
541       case INCLUDE:
542       case INCLUDE_AND_SEEK_NEXT_ROW:
543       case INCLUDE_AND_SEEK_NEXT_COL:
544 
545         Filter f = matcher.getFilter();
546         if (f != null) {
547           cell = f.transformCell(cell);
548         }
549 
550         this.countPerRow++;
551         if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) {
552           // do what SEEK_NEXT_ROW does.
553           if (!matcher.moreRowsMayExistAfter(cell)) {
554             close(false);// Do all cleanup except heap.close()
555             return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
556           }
557           matcher.curCell = null;
558           seekToNextRow(cell);
559           break LOOP;
560         }
561 
562         // add to results only if we have skipped #storeOffset kvs
563         // also update metric accordingly
564         if (this.countPerRow > storeOffset) {
565           outResult.add(cell);
566 
567           // Update local tracking information
568           count++;
569           totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
570 
571           // Update the progress of the scanner context
572           scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
573           scannerContext.incrementBatchProgress(1);
574 
575           if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
576             throw new RowTooBigException(
577                 "Max row size allowed: " + maxRowSize + ", but the row is bigger than that.");
578           }
579         }
580 
581         if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
582           if (!matcher.moreRowsMayExistAfter(cell)) {
583             close(false);// Do all cleanup except heap.close()
584             return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
585           }
586           matcher.curCell = null;
587           seekToNextRow(cell);
588         } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
589           seekAsDirection(matcher.getKeyForNextColumn(cell));
590         } else {
591           this.heap.next();
592         }
593 
594         if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
595           break LOOP;
596         }
597         if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
598           break LOOP;
599         }
600         continue;
601 
602       case DONE:
603         // Optimization for Gets! If DONE, no more to get on this row, early exit!
604         if (this.scan.isGetScan()) {
605           // Then no more to this row... exit.
606           close(false);// Do all cleanup except heap.close()
607           return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
608         }
609         matcher.curCell = null;
610         return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
611 
612       case DONE_SCAN:
613         close(false);// Do all cleanup except heap.close()
614         return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
615 
616       case SEEK_NEXT_ROW:
617         // This is just a relatively simple end of scan fix, to short-cut end
618         // us if there is an endKey in the scan.
619         if (!matcher.moreRowsMayExistAfter(cell)) {
620           close(false);// Do all cleanup except heap.close()
621           return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
622         }
623         matcher.curCell = null;
624         seekToNextRow(cell);
625         break;
626 
627       case SEEK_NEXT_COL:
628         seekAsDirection(matcher.getKeyForNextColumn(cell));
629         break;
630 
631       case SKIP:
632         this.heap.next();
633         break;
634 
635       case SEEK_NEXT_USING_HINT:
636         Cell nextKV = matcher.getNextKeyHint(cell);
637         if (nextKV != null) {
638           seekAsDirection(nextKV);
639         } else {
640           heap.next();
641         }
642         break;
643 
644       default:
645         throw new RuntimeException("UNEXPECTED");
646       }
647     } while ((cell = this.heap.peek()) != null);
648 
649     if (count > 0) {
650       return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
651     }
652 
653     // No more keys
654     close(false);// Do all cleanup except heap.close()
655     return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
656   }
657 
658   /**
659    * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).
660    * This method works together with ColumnTrackers and Filters. ColumnTrackers may issue SEEK
661    * hints, such as seek to next column, next row, or seek to an arbitrary seek key.
662    * This method intercepts these qcodes and decides whether a seek is the most efficient _actual_
663    * way to get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, SKIP inside the
664    * current, loaded block).
665    * It does this by looking at the next indexed key of the current HFile. This key
666    * is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible key
667    * on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work with
668    * the current Cell but compare as though it were a seek key; see down in
669    * matcher.compareKeyForNextRow, etc). If the compare gets us onto the
670    * next block we *_SEEK, otherwise we just INCLUDE or SKIP, and let the ColumnTrackers or Filters
671    * go through the next Cell, and so on)
672    *
673    * <p>The ColumnTrackers and Filters must behave correctly in all cases, i.e. if they are past the
674    * Cells they care about they must issues a SKIP or SEEK.
675    *
676    * <p>Other notes:
677    * <ul>
678    * <li>Rows can straddle block boundaries</li>
679    * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a
680    * different block than column C1 at T2)</li>
681    * <li>We want to SKIP and INCLUDE if the chance is high that we'll find the desired Cell after a
682    * few SKIPs...</li>
683    * <li>We want to INCLUDE_AND_SEEK and SEEK when the chance is high that we'll be able to seek
684    * past many Cells, especially if we know we need to go to the next block.</li>
685    * </ul>
686    * <p>A good proxy (best effort) to determine whether INCLUDE/SKIP is better than SEEK is whether
687    * we'll likely end up seeking to the next block (or past the next block) to get our next column.
688    * Example:
689    * <pre>
690    * |    BLOCK 1              |     BLOCK 2                   |
691    * |  r1/c1, r1/c2, r1/c3    |    r1/c4, r1/c5, r2/c1        |
692    *                                   ^         ^
693    *                                   |         |
694    *                           Next Index Key   SEEK_NEXT_ROW (before r2/c1)
695    *
696    *
697    * |    BLOCK 1                       |     BLOCK 2                      |
698    * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |    r1/c1/t2, r1/c1/T1, r1/c2/T3  |
699    *                                            ^              ^
700    *                                            |              |
701    *                                    Next Index Key        SEEK_NEXT_COL
702    * </pre>
703    * Now imagine we want columns c1 and c3 (see first diagram above), the 'Next Index Key' of r1/c4
704    * is > r1/c3 so we should seek to get to the c1 on the next row, r2. In second case, say we only
705    * want one version of c1, after we have it, a SEEK_COL will be issued to get to c2. Looking at
706    * the 'Next Index Key', it would land us in the next block, so we should SEEK. In other scenarios
707    * where the SEEK will not land us in the next block, it is very likely better to issues a series
708    * of SKIPs.
709    */
710   @VisibleForTesting
711   protected ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
712     switch(qcode) {
713     case INCLUDE_AND_SEEK_NEXT_COL:
714     case SEEK_NEXT_COL:
715     {
716       Cell nextIndexedKey = getNextIndexedKey();
717       if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
718           && matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
719         return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
720       }
721       break;
722     }
723     case INCLUDE_AND_SEEK_NEXT_ROW:
724     case SEEK_NEXT_ROW:
725     {
726       // If it is a Get Scan, then we know that we are done with this row; there are no more
727       // rows beyond the current one: don't try to optimize. We are DONE. Return the *_NEXT_ROW
728       // qcode as is. When the caller gets these flags on a Get Scan, it knows it can shut down the
729       // Scan.
730       if (!this.scan.isGetScan()) {
731         Cell nextIndexedKey = getNextIndexedKey();
732         if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
733             && matcher.compareKeyForNextRow(nextIndexedKey, cell) > 0) {
734           return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
735         }
736       }
737       break;
738     }
739     default:
740       break;
741     }
742     return qcode;
743   }
744 
745   // Implementation of ChangedReadersObserver
746   @Override
747   public void updateReaders(List<StoreFile> sfs) throws IOException {
748     flushed = true;
749     flushLock.lock();
750     try {
751       flushedStoreFiles.addAll(sfs);
752     } finally {
753       flushLock.unlock();
754     }
755     // Let the next() call handle re-creating and seeking
756   }
757 
758   /**
759    * @param flushed indicates if there was a flush
760    * @return true if top of heap has changed (and KeyValueHeap has to try the
761    *         next KV)
762    * @throws IOException
763    */
764   protected boolean checkReseek(boolean flushed) throws IOException {
765     if (flushed && this.lastTop != null) {
766       resetScannerStack(this.lastTop);
767       if (this.heap.peek() == null
768           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
769         LOG.debug("Storescanner.peek() is changed where before = "
770             + this.lastTop.toString() + ",and after = " + this.heap.peek());
771         this.lastTop = null;
772         return true;
773       }
774       this.lastTop = null; // gone!
775     }
776     // else dont need to reseek
777     return false;
778   }
779 
780   protected void resetScannerStack(Cell lastTopKey) throws IOException {
781     /* When we have the scan object, should we not pass it to getScanners()
782      * to get a limited set of scanners? We did so in the constructor and we
783      * could have done it now by storing the scan object from the constructor
784      */
785 
786     final boolean isCompaction = false;
787     boolean usePread = get || scanUsePread;
788     List<KeyValueScanner> scanners = null;
789     try {
790       flushLock.lock();
791       scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread,
792         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
793       // Clear the current set of flushed store files so that they don't get added again
794       flushedStoreFiles.clear();
795     } finally {
796       flushLock.unlock();
797     }
798 
799     // Seek the new scanners to the last key
800     seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
801     // remove the older memstore scanner
802     for (int i = 0; i < currentScanners.size(); i++) {
803       if (!currentScanners.get(i).isFileScanner()) {
804         currentScanners.remove(i);
805         break;
806       }
807     }
808     // add the newly created scanners on the flushed files and the current active memstore scanner
809     addCurrentScanners(scanners);
810     // Combine all seeked scanners with a heap
811     resetKVHeap(this.currentScanners, store.getComparator());
812     // Reset the state of the Query Matcher and set to top row.
813     // Only reset and call setRow if the row changes; avoids confusing the
814     // query matcher if scanning intra-row.
815     Cell cell = heap.peek();
816     if (cell == null) {
817       cell = lastTopKey;
818     }
819     if ((matcher.curCell == null) || !CellUtil.matchingRows(cell, matcher.curCell)) {
820       this.countPerRow = 0;
821       // The setToNewRow will call reset internally
822       matcher.setToNewRow(cell);
823     }
824   }
825 
826   /**
827    * Check whether scan as expected order
828    * @param prevKV
829    * @param kv
830    * @param comparator
831    * @throws IOException
832    */
833   protected void checkScanOrder(Cell prevKV, Cell kv,
834       CellComparator comparator) throws IOException {
835     // Check that the heap gives us KVs in an increasing order.
836     assert prevKV == null || comparator == null
837         || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
838         + " followed by a " + "smaller key " + kv + " in cf " + store;
839   }
840 
841   protected boolean seekToNextRow(Cell c) throws IOException {
842     return reseek(CellUtil.createLastOnRow(c));
843   }
844 
845   /**
846    * Do a reseek in a normal StoreScanner(scan forward)
847    * @param kv
848    * @return true if scanner has values left, false if end of scanner
849    * @throws IOException
850    */
851   protected boolean seekAsDirection(Cell kv)
852       throws IOException {
853     return reseek(kv);
854   }
855 
856   @Override
857   public boolean reseek(Cell kv) throws IOException {
858     boolean flushed = checkFlushed();
859     // Heap will not be null, if this is called from next() which.
860     // If called from RegionScanner.reseek(...) make sure the scanner
861     // stack is reset if needed.
862     checkReseek(flushed);
863     if (explicitColumnQuery && lazySeekEnabledGlobally) {
864       return heap.requestSeek(kv, true, useRowColBloom);
865     }
866     return heap.reseek(kv);
867   }
868 
869   protected boolean checkFlushed() {
870     // check the var without any lock. Suppose even if we see the old
871     // value here still it is ok to continue because we will not be resetting
872     // the heap but will continue with the referenced memstore's snapshot. For compactions
873     // any way we don't need the updateReaders at all to happen as we still continue with
874     // the older files
875     if (flushed) {
876       // If there is a flush and the current scan is notified on the flush ensure that the
877       // scan's heap gets reset and we do a seek on the newly flushed file.
878       if(!this.closing) {
879         this.lastTop = this.peek();
880       } else {
881         return false;
882       }
883       // reset the flag
884       flushed = false;
885       return true;
886     }
887     return false;
888   }
889 
890   /**
891    * @see KeyValueScanner#getScannerOrder()
892    */
893   @Override
894   public long getScannerOrder() {
895     return 0;
896   }
897
898   /**
899    * Seek storefiles in parallel to optimize IO latency as much as possible
900    * @param scanners the list {@link KeyValueScanner}s to be read from
901    * @param kv the KeyValue on which the operation is being requested
902    * @throws IOException
903    */
904   private void parallelSeek(final List<? extends KeyValueScanner>
905       scanners, final Cell kv) throws IOException {
906     if (scanners.isEmpty()) return;
907     int storeFileScannerCount = scanners.size();
908     CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
909     List<ParallelSeekHandler> handlers =
910         new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
911     for (KeyValueScanner scanner : scanners) {
912       if (scanner instanceof StoreFileScanner) {
913         ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
914           this.readPt, latch);
915         executor.submit(seekHandler);
916         handlers.add(seekHandler);
917       } else {
918         scanner.seek(kv);
919         latch.countDown();
920       }
921     }
922
923     try {
924       latch.await();
925     } catch (InterruptedException ie) {
926       throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
927     }
928
929     for (ParallelSeekHandler handler : handlers) {
930       if (handler.getErr() != null) {
931         throw new IOException(handler.getErr());
932       }
933     }
934   }
935
936   /**
937    * Used in testing.
938    * @return all scanners in no particular order
939    */
940   List<KeyValueScanner> getAllScannersForTesting() {
941     List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
942     KeyValueScanner current = heap.getCurrentForTesting();
943     if (current != null)
944       allScanners.add(current);
945     for (KeyValueScanner scanner : heap.getHeap())
946       allScanners.add(scanner);
947     return allScanners;
948   }
949
950   static void enableLazySeekGlobally(boolean enable) {
951     lazySeekEnabledGlobally = enable;
952   }
953
954   /**
955    * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
956    */
957   public long getEstimatedNumberOfKvsScanned() {
958     return this.kvsScanned;
959   }
960
961   @Override
962   public Cell getNextIndexedKey() {
963     return this.heap.getNextIndexedKey();
964   }
965
966   @Override
967   public void shipped() throws IOException {
968     for (KeyValueHeap h : this.heapsForDelayedClose) {
969       h.close();// There wont be further fetch of Cells from these scanners. Just close.
970     }
971     this.heapsForDelayedClose.clear();
972     if (this.heap != null) {
973       this.heap.shipped();
974     }
975   }
976 }
977