View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.HashSet;
26  import java.util.List;
27  import java.util.NavigableSet;
28  import java.util.Set;
29  import java.util.concurrent.CountDownLatch;
30  import java.util.concurrent.locks.ReentrantLock;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.CellComparator;
36  import org.apache.hadoop.hbase.CellUtil;
37  import org.apache.hadoop.hbase.DoNotRetryIOException;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.KeyValue;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.client.IsolationLevel;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.executor.ExecutorService;
44  import org.apache.hadoop.hbase.filter.Filter;
45  import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
46  import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
47  import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
48  import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
49  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50  
51  import com.google.common.annotations.VisibleForTesting;
52  
53  /**
54   * Scanner scans both the memstore and the Store. Coalesces the Cell stream
55   * into a {@code List<Cell>} for a single row.
56   */
57  @InterfaceAudience.Private
58  public class StoreScanner extends NonReversedNonLazyKeyValueScanner
59      implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
60    private static final Log LOG = LogFactory.getLog(StoreScanner.class);
61    // In unit tests, the store could be null
62    protected final Store store;
      // Classifies each cell (include / skip / seek / done); consulted by next().
63    protected ScanQueryMatcher matcher;
      // Merged view over the current scanners; set to null when the scanner is closed.
64    protected KeyValueHeap heap;
      // Passed to store.getScanners(...) when the underlying scanners are opened.
65    protected boolean cacheBlocks;
66  
      // Number of cells the matcher has included for the current row; reset to 0 on a new row.
67    protected long countPerRow = 0;
      // Per-row cap taken from Scan#getMaxResultsPerColumnFamily(); -1 disables the check.
68    protected int storeLimit = -1;
      // Number of included cells to skip per row, from Scan#getRowOffsetPerColumnFamily().
69    protected int storeOffset = 0;
70  
71    // Used to indicate that the scanner has closed (see HBASE-1107)
72    // Doesn't need to be volatile because it's always accessed via synchronized methods
      // NOTE(review): close(boolean) in this file is not declared synchronized -- confirm the
      // claim above still holds.
73    protected boolean closing = false;
      // True when the scan is a Get (Scan#isGetScan()).
74    protected final boolean get;
      // True when at least one column was explicitly requested.
75    protected final boolean explicitColumnQuery;
      // True for more than one requested column, or exactly one column on a non-Get scan.
76    protected final boolean useRowColBloom;
77    /**
78     * A flag that enables StoreFileScanner parallel-seeking
79     */
80    protected boolean parallelSeekEnabled = false;
      // Executor obtained from the region server services when parallel seek is enabled.
81    protected ExecutorService executor;
82    protected final Scan scan;
83    protected final NavigableSet<byte[]> columns;
      // now - TTL, computed once at construction.
84    protected final long oldestUnexpiredTS;
      // EnvironmentEdgeManager.currentTime() captured at construction.
85    protected final long now;
86    protected final int minVersions;
      // Row size above which a user scan fails with RowTooBigException.
87    protected final long maxRowSize;
      // next() checks the time limit whenever kvsScanned is a multiple of this value.
88    protected final long cellsPerHeartbeatCheck;
89  
90    // Collects all the KVHeap that are eagerly getting closed during the
91    // course of a scan
92    protected Set<KeyValueHeap> heapsForDelayedClose = new HashSet<KeyValueHeap>();
93  
94    /**
95     * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
96     * KVs skipped via seeking to next row/column. TODO: estimate them?
97     */
98    private long kvsScanned = 0;
      // Last cell handed to the matcher; compared by identity in next() to avoid double counting.
99    private Cell prevCell = null;
100 
101   /** We don't ever expect to change this, the constant is just for clarity. */
102   static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
      /** Configuration key for enabling parallel seek. */
103   public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
104       "hbase.storescanner.parallel.seek.enable";
105 
106   /** Used during unit testing to ensure that lazy seek does save seek ops */
107   protected static boolean lazySeekEnabledGlobally =
108       LAZY_SEEK_ENABLED_BY_DEFAULT;
109 
110   /**
111    * The number of cells scanned in between timeout checks. Specifying a larger value means that
112    * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
113    * timeout checks.
114    */
115   public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
116       "hbase.cells.scanned.per.heartbeat.check";
117 
118   /**
119    * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
120    */
121   public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;
122 
123   // if heap == null and lastTop != null, you need to reseek given the key below
124   protected Cell lastTop = null;
125 
126   // Whether to use pread for this scan
127   private boolean scanUsePread = false;
128   // Indicates whether there was flush during the course of the scan
129   protected volatile boolean flushed = false;
130   // generally we get one file from a flush
131   protected List<StoreFile> flushedStoreFiles = new ArrayList<StoreFile>(1);
132   // The current list of scanners
133   protected List<KeyValueScanner> currentScanners = new ArrayList<KeyValueScanner>();
134   // flush update lock; held by updateReaders() while it appends to flushedStoreFiles
135   private ReentrantLock flushLock = new ReentrantLock();
136 
      // Read point supplied at construction; passed to store.getScanners(...).
137   protected final long readPt;
138 
139   // used by the injection framework to test race between StoreScanner construction and compaction
140   enum StoreScannerCompactionRace {
141     BEFORE_SEEK,
142     AFTER_SEEK,
143     COMPACT_COMPLETE
144   }
145 
146   /** An internal constructor. */
147   protected StoreScanner(Store store, Scan scan, final ScanInfo scanInfo,
148       final NavigableSet<byte[]> columns, long readPt, boolean cacheBlocks) {
149     this.readPt = readPt;
150     this.store = store;
151     this.cacheBlocks = cacheBlocks;
152     get = scan.isGetScan();
153     int numCol = columns == null ? 0 : columns.size();
154     explicitColumnQuery = numCol > 0;
155     this.scan = scan;
156     this.columns = columns;
157     this.now = EnvironmentEdgeManager.currentTime();
158     this.oldestUnexpiredTS = now - scanInfo.getTtl();
159     this.minVersions = scanInfo.getMinVersions();
160 
161      // We look up row-column Bloom filters for multi-column queries as part of
162      // the seek operation. However, we also look the row-column Bloom filter
163      // for multi-row (non-"get") scans because this is not done in
164      // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
165      this.useRowColBloom = numCol > 1 || (!get && numCol == 1);
166 
167      this.maxRowSize = scanInfo.getTableMaxRowSize();
168      this.scanUsePread = scan.isSmall()? true: scanInfo.isUsePread();
169      this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
170      // Parallel seeking is on if the config allows and more there is more than one store file.
171      if (this.store != null && this.store.getStorefilesCount() > 1) {
172        RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
173        if (rsService != null && scanInfo.isParallelSeekEnabled()) {
174          this.parallelSeekEnabled = true;
175          this.executor = rsService.getExecutorService();
176        }
177      }
178   }
179 
180   protected void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
181     this.currentScanners.addAll(scanners);
182   }
183   /**
184    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
185    * are not in a compaction.
186    *
187    * @param store who we scan
188    * @param scan the spec
189    * @param columns which columns we are scanning
190    * @throws IOException
191    */
192   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
193       long readPt)
194   throws IOException {
195     this(store, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
196     if (columns != null && scan.isRaw()) {
197       throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
198     }
199     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
200         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
201         oldestUnexpiredTS, now, store.getCoprocessorHost());
202 
203     this.store.addChangedReaderObserver(this);
204 
205     try {
206       // Pass columns to try to filter out unnecessary StoreFiles.
207       List<KeyValueScanner> scanners = getScannersNoCompaction();
208
209       // Seek all scanners to the start of the Row (or if the exact matching row
210       // key does not exist, then to the start of the next matching Row).
211       // Always check bloom filter to optimize the top row seek for delete
212       // family marker.
213       seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
214         parallelSeekEnabled);
215
216       // set storeLimit
217       this.storeLimit = scan.getMaxResultsPerColumnFamily();
218
219       // set rowOffset
220       this.storeOffset = scan.getRowOffsetPerColumnFamily();
221       addCurrentScanners(scanners);
222       // Combine all seeked scanners with a heap
223       resetKVHeap(scanners, store.getComparator());
224     } catch (IOException e) {
225       // remove us from the HStore#changedReaderObservers here or we'll have no chance to
226       // and might cause memory leak
227       this.store.deleteChangedReaderObserver(this);
228       throw e;
229     }
230   }
231
232   /**
233    * Used for compactions.<p>
234    *
235    * Opens a scanner across specified StoreFiles.
236    * @param store who we scan
237    * @param scan the spec
238    * @param scanners ancillary scanners
239    * @param smallestReadPoint the readPoint that we should use for tracking
240    *          versions
241    */
242   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
243       List<? extends KeyValueScanner> scanners, ScanType scanType,
244       long smallestReadPoint, long earliestPutTs) throws IOException {
245     this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
246   }
247
248   /**
249    * Used for compactions that drop deletes from a limited range of rows.<p>
250    *
251    * Opens a scanner across specified StoreFiles.
252    * @param store who we scan
253    * @param scan the spec
254    * @param scanners ancillary scanners
255    * @param smallestReadPoint the readPoint that we should use for tracking versions
256    * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
257    * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
258    */
259   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
260       List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
261       byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
262     this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
263         earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
264   }
265
266   private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
267       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
268       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
269     this(store, scan, scanInfo, null,
270       ((HStore)store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false);
271     if (dropDeletesFromRow == null) {
272       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
273           earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
274     } else {
275       matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
276           oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
277     }
278
279     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
280     scanners = selectScannersFrom(scanners);
281 
282     // Seek all scanners to the initial key
283     seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
284     addCurrentScanners(scanners);
285     // Combine all seeked scanners with a heap
286     resetKVHeap(scanners, store.getComparator());
287   }
288
289   @VisibleForTesting
290   StoreScanner(final Scan scan, ScanInfo scanInfo,
291       ScanType scanType, final NavigableSet<byte[]> columns,
292       final List<KeyValueScanner> scanners) throws IOException {
293     this(scan, scanInfo, scanType, columns, scanners,
294         HConstants.LATEST_TIMESTAMP,
295         // 0 is passed as readpoint because the test bypasses Store
296         0);
297   }
298
299   @VisibleForTesting
300   StoreScanner(final Scan scan, ScanInfo scanInfo,
301     ScanType scanType, final NavigableSet<byte[]> columns,
302     final List<KeyValueScanner> scanners, long earliestPutTs)
303         throws IOException {
304     this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
305       // 0 is passed as readpoint because the test bypasses Store
306       0);
307   }
308
309   public StoreScanner(final Scan scan, ScanInfo scanInfo,
310       ScanType scanType, final NavigableSet<byte[]> columns,
311       final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
312   throws IOException {
313     this(null, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
314     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
315         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
316
317     // In unit tests, the store could be null
318     if (this.store != null) {
319       this.store.addChangedReaderObserver(this);
320     }
321     // Seek all scanners to the initial key
322     seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
323     addCurrentScanners(scanners);
324     resetKVHeap(scanners, scanInfo.getComparator());
325   }
326
327   /**
328    * Get a filtered list of scanners. Assumes we are not in a compaction.
329    * @return list of scanners to seek
330    */
331   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
332     final boolean isCompaction = false;
333     boolean usePread = get || scanUsePread;
334     return selectScannersFrom(store.getScanners(cacheBlocks, get, usePread,
335         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
336   }
337
338   /**
339    * Seek the specified scanners with the given key
340    * @param scanners
341    * @param seekKey
342    * @param isLazy true if using lazy seek
343    * @param isParallelSeek true if using parallel seek
344    * @throws IOException
345    */
346   protected void seekScanners(List<? extends KeyValueScanner> scanners,
347       Cell seekKey, boolean isLazy, boolean isParallelSeek)
348       throws IOException {
349     // Seek all scanners to the start of the Row (or if the exact matching row
350     // key does not exist, then to the start of the next matching Row).
351     // Always check bloom filter to optimize the top row seek for delete
352     // family marker.
353     if (isLazy) {
354       for (KeyValueScanner scanner : scanners) {
355         scanner.requestSeek(seekKey, false, true);
356       }
357     } else {
358       if (!isParallelSeek) {
359         long totalScannersSoughtBytes = 0;
360         for (KeyValueScanner scanner : scanners) {
361           if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) {
362             throw new RowTooBigException("Max row size allowed: " + maxRowSize
363               + ", but row is bigger than that");
364           }
365           scanner.seek(seekKey);
366           Cell c = scanner.peek();
367           if (c != null) {
368             totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
369           }
370         }
371       } else {
372         parallelSeek(scanners, seekKey);
373       }
374     }
375   }
376
377   protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
378       CellComparator comparator) throws IOException {
379     // Combine all seeked scanners with a heap
380     heap = new KeyValueHeap(scanners, comparator);
381   }
382
383   /**
384    * Filters the given list of scanners using Bloom filter, time range, and
385    * TTL.
386    */
387   protected List<KeyValueScanner> selectScannersFrom(
388       final List<? extends KeyValueScanner> allScanners) {
389     boolean memOnly;
390     boolean filesOnly;
391     if (scan instanceof InternalScan) {
392       InternalScan iscan = (InternalScan)scan;
393       memOnly = iscan.isCheckOnlyMemStore();
394       filesOnly = iscan.isCheckOnlyStoreFiles();
395     } else {
396       memOnly = false;
397       filesOnly = false;
398     }
399
400     List<KeyValueScanner> scanners =
401         new ArrayList<KeyValueScanner>(allScanners.size());
402
403     // We can only exclude store files based on TTL if minVersions is set to 0.
404     // Otherwise, we might have to return KVs that have technically expired.
405     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
406         Long.MIN_VALUE;
407 
408     // include only those scan files which pass all filters
409     for (KeyValueScanner kvs : allScanners) {
410       boolean isFile = kvs.isFileScanner();
411       if ((!isFile && filesOnly) || (isFile && memOnly)) {
412         continue;
413       }
414
415       if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
416         scanners.add(kvs);
417       } else {
418         kvs.close();
419       }
420     }
421     return scanners;
422   }
423
424   @Override
425   public Cell peek() {
426     if (this.heap == null) {
427       return this.lastTop;
428     }
429     return this.heap.peek();
430   }
431
432   @Override
433   public KeyValue next() {
434     // throw runtime exception perhaps?
435     throw new RuntimeException("Never call StoreScanner.next()");
436   }
437
438   @Override
439   public void close() {
440     close(true);
441   }
442
443   private void close(boolean withHeapClose) {
444     if (this.closing) {
445       return;
446     }
447     if (withHeapClose) this.closing = true;
448     // Under test, we dont have a this.store
449     if (this.store != null) this.store.deleteChangedReaderObserver(this);
450     if (withHeapClose) {
451       for (KeyValueHeap h : this.heapsForDelayedClose) {
452         h.close();
453       }
454       this.heapsForDelayedClose.clear();
455       if (this.heap != null) {
456         this.heap.close();
457         this.currentScanners.clear();
458         this.heap = null; // CLOSED!
459       }
460     } else {
461       if (this.heap != null) {
462         this.heapsForDelayedClose.add(this.heap);
463         this.currentScanners.clear();
464         this.heap = null;
465       }
466     }
467     this.lastTop = null; // If both are null, we are closed.
468   }
469
470   @Override
471   public boolean seek(Cell key) throws IOException {
472     boolean flushed = checkFlushed();
473     // reset matcher state, in case that underlying store changed
474     checkReseek(flushed);
475     return this.heap.seek(key);
476   }
477
478   @Override
479   public boolean next(List<Cell> outResult) throws IOException {
480     return next(outResult, NoLimitScannerContext.getInstance());
481   }
482
483   /**
484    * Get the next row of values from this Store.
      * <p>Cells are taken from the heap one at a time and classified by the query matcher.
      * Included cells are appended to {@code outResult} until the matcher reports completion,
      * the heap runs out, or a batch, size or time limit of {@code scannerContext} is reached.
485    * @param outResult list that included cells are appended to
486    * @param scannerContext carries the limits and progress of the scan; must not be null
487    * @return true if there are more rows, false if scanner is done
      * @throws IllegalArgumentException if {@code scannerContext} is null
      * @throws RowTooBigException on a user scan when the bytes collected in this call exceed
      *           the maximum row size
488    */
489   @Override
490   public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
491     if (scannerContext == null) {
492       throw new IllegalArgumentException("Scanner context cannot be null");
493     }
        // If a flush was observed, checkReseek rebuilds the scanner stack. A true result means
        // the top of the heap is no longer on the remembered row, so report "more values"
        // without collecting anything in this call.
494     boolean flushed = checkFlushed();
495     if (checkReseek(flushed)) {
496       return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
497     }
498
499     // if the heap was left null, then the scanners had previously run out anyways, close and
500     // return.
501     if (this.heap == null) {
502       // By this time partial close should have happened because already heap is null
503       close(false);// Do all cleanup except heap.close()
504       return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
505     }
506
507     Cell cell = this.heap.peek();
508     if (cell == null) {
509       close(false);// Do all cleanup except heap.close()
510       return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
511     }
512
513     // only call setRow if the row changes; avoids confusing the query matcher
514     // if scanning intra-row
515
516     // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
517     // rows. Else it is possible we are still traversing the same row so we must perform the row
518     // comparison.
519     if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null) {
520       this.countPerRow = 0;
521       matcher.setToNewRow(cell);
522     }
523
524     // Clear progress away unless invoker has indicated it should be kept.
525     if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
526
527     // Only do a sanity-check if store and comparator are available.
528     CellComparator comparator = store != null ? store.getComparator() : null;
529
        // Cells added to outResult, and their estimated serialized size, during this call only.
530     int count = 0;
531     long totalBytesRead = 0;
532
533     LOOP: do {
534       // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
          // NOTE(review): a cellsPerHeartbeatCheck of 0 would make this modulo throw
          // ArithmeticException -- presumably ScanInfo never supplies 0; confirm.
535       if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
536         scannerContext.updateTimeProgress();
537         if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
538           return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
539         }
540       }
541
542       if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
543       checkScanOrder(prevCell, cell, comparator);
544       prevCell = cell;
          // Ask the matcher what to do with this cell; optimize() may downgrade a seek to a
          // plain INCLUDE or SKIP.
545       ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
546       qcode = optimize(qcode, cell);
547       switch (qcode) {
548       case INCLUDE:
549       case INCLUDE_AND_SEEK_NEXT_ROW:
550       case INCLUDE_AND_SEEK_NEXT_COL:
551
            // Give the scan's filter, if any, the chance to transform the cell before it is
            // returned.
552         Filter f = matcher.getFilter();
553         if (f != null) {
554           cell = f.transformCell(cell);
555         }
556
557         this.countPerRow++;
            // Once more than storeLimit + storeOffset cells were included for this row, the
            // rest of the row is skipped.
558         if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) {
559           // do what SEEK_NEXT_ROW does.
560           if (!matcher.moreRowsMayExistAfter(cell)) {
561             close(false);// Do all cleanup except heap.close()
562             return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
563           }
564           matcher.curCell = null;
565           seekToNextRow(cell);
566           break LOOP;
567         }
568
569         // add to results only if we have skipped #storeOffset kvs
570         // also update metric accordingly
571         if (this.countPerRow > storeOffset) {
572           outResult.add(cell);
573
574           // Update local tracking information
575           count++;
576           totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
577
578           // Update the progress of the scanner context
579           scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
580           scannerContext.incrementBatchProgress(1);
581
582           if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
583             throw new RowTooBigException(
584                 "Max row size allowed: " + maxRowSize + ", but the row is bigger than that.");
585           }
586         }
587
            // Advance the heap the way the match code asks: next row, next column, or simply
            // the next cell.
588         if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
589           if (!matcher.moreRowsMayExistAfter(cell)) {
590             close(false);// Do all cleanup except heap.close()
591             return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
592           }
593           matcher.curCell = null;
594           seekToNextRow(cell);
595         } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
596           seekAsDirection(matcher.getKeyForNextColumn(cell));
597         } else {
598           this.heap.next();
599         }
600
            // Stop collecting once a batch or size limit that applies between cells is hit.
601         if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
602           break LOOP;
603         }
604         if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
605           break LOOP;
606         }
607         continue;
608
609       case DONE:
610         // Optimization for Gets! If DONE, no more to get on this row, early exit!
611         if (this.scan.isGetScan()) {
612           // Then no more to this row... exit.
613           close(false);// Do all cleanup except heap.close()
614           return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
615         }
616         matcher.curCell = null;
617         return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
618
619       case DONE_SCAN:
620         close(false);// Do all cleanup except heap.close()
621         return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
622
623       case SEEK_NEXT_ROW:
624         // This is just a relatively simple end of scan fix, to short-cut end
625         // us if there is an endKey in the scan.
626         if (!matcher.moreRowsMayExistAfter(cell)) {
627           close(false);// Do all cleanup except heap.close()
628           return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
629         }
630         matcher.curCell = null;
631         seekToNextRow(cell);
632         break;
633
634       case SEEK_NEXT_COL:
635         seekAsDirection(matcher.getKeyForNextColumn(cell));
636         break;
637
638       case SKIP:
639         this.heap.next();
640         break;
641
642       case SEEK_NEXT_USING_HINT:
            // Seek to the key hinted by the matcher; without a hint just step to the next cell.
643         Cell nextKV = matcher.getNextKeyHint(cell);
644         if (nextKV != null) {
645           seekAsDirection(nextKV);
646         } else {
647           heap.next();
648         }
649         break;
650
651       default:
652         throw new RuntimeException("UNEXPECTED");
653       }
654     } while ((cell = this.heap.peek()) != null);
655
        // Reached when the heap is exhausted or a limit broke out of the loop: something was
        // collected, so report that more values may follow.
656     if (count > 0) {
657       return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
658     }
659
660     // No more keys
661     close(false);// Do all cleanup except heap.close()
662     return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
663   }
664
665   /**
666    * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).
667    * This method works together with ColumnTrackers and Filters. ColumnTrackers may issue SEEK
668    * hints, such as seek to next column, next row, or seek to an arbitrary seek key.
669    * This method intercepts these qcodes and decides whether a seek is the most efficient _actual_
670    * way to get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, SKIP inside the
671    * current, loaded block).
672    * It does this by looking at the next indexed key of the current HFile. This key
673    * is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible key
674    * on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work with
675    * the current Cell but compare as though it were a seek key; see down in
676    * matcher.compareKeyForNextRow, etc). If the compare gets us onto the
677    * next block we *_SEEK, otherwise we just INCLUDE or SKIP, and let the ColumnTrackers or Filters
678    * go through the next Cell, and so on)
679    *
680    * <p>The ColumnTrackers and Filters must behave correctly in all cases, i.e. if they are past the
681    * Cells they care about they must issues a SKIP or SEEK.
682    *
683    * <p>Other notes:
684    * <ul>
685    * <li>Rows can straddle block boundaries</li>
686    * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a
687    * different block than column C1 at T2)</li>
688    * <li>We want to SKIP and INCLUDE if the chance is high that we'll find the desired Cell after a
689    * few SKIPs...</li>
690    * <li>We want to INCLUDE_AND_SEEK and SEEK when the chance is high that we'll be able to seek
691    * past many Cells, especially if we know we need to go to the next block.</li>
692    * </ul>
693    * <p>A good proxy (best effort) to determine whether INCLUDE/SKIP is better than SEEK is whether
694    * we'll likely end up seeking to the next block (or past the next block) to get our next column.
695    * Example:
696    * <pre>
697    * |    BLOCK 1              |     BLOCK 2                   |
698    * |  r1/c1, r1/c2, r1/c3    |    r1/c4, r1/c5, r2/c1        |
699    *                                   ^         ^
700    *                                   |         |
701    *                           Next Index Key   SEEK_NEXT_ROW (before r2/c1)
702    *
703    *
704    * |    BLOCK 1                       |     BLOCK 2                      |
705    * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |    r1/c1/t2, r1/c1/T1, r1/c2/T3  |
706    *                                            ^              ^
707    *                                            |              |
708    *                                    Next Index Key        SEEK_NEXT_COL
709    * </pre>
710    * Now imagine we want columns c1 and c3 (see first diagram above), the 'Next Index Key' of r1/c4
711    * is > r1/c3 so we should seek to get to the c1 on the next row, r2. In second case, say we only
712    * want one version of c1, after we have it, a SEEK_COL will be issued to get to c2. Looking at
713    * the 'Next Index Key', it would land us in the next block, so we should SEEK. In other scenarios
714    * where the SEEK will not land us in the next block, it is very likely better to issues a series
715    * of SKIPs.
716    */
717   @VisibleForTesting
718   protected ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
719     switch(qcode) {
720     case INCLUDE_AND_SEEK_NEXT_COL:
721     case SEEK_NEXT_COL:
722     {
723       Cell nextIndexedKey = getNextIndexedKey();
724       if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
725           && matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
726         return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
727       }
728       break;
729     }
730     case INCLUDE_AND_SEEK_NEXT_ROW:
731     case SEEK_NEXT_ROW:
732     {
733       // If it is a Get Scan, then we know that we are done with this row; there are no more
734       // rows beyond the current one: don't try to optimize. We are DONE. Return the *_NEXT_ROW
735       // qcode as is. When the caller gets these flags on a Get Scan, it knows it can shut down the
736       // Scan.
737       if (!this.scan.isGetScan()) {
738         Cell nextIndexedKey = getNextIndexedKey();
739         if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
740             && matcher.compareKeyForNextRow(nextIndexedKey, cell) > 0) {
741           return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
742         }
743       }
744       break;
745     }
746     default:
747       break;
748     }
749     return qcode;
750   }
751
752   // Implementation of ChangedReadersObserver
753   @Override
754   public void updateReaders(List<StoreFile> sfs) throws IOException {
755     flushed = true;
756     flushLock.lock();
757     try {
758       flushedStoreFiles.addAll(sfs);
759     } finally {
760       flushLock.unlock();
761     }
762     // Let the next() call handle re-creating and seeking
763   }
764
765   /**
766    * @param flushed indicates if there was a flush
767    * @return true if top of heap has changed (and KeyValueHeap has to try the
768    *         next KV)
769    * @throws IOException
770    */
771   protected boolean checkReseek(boolean flushed) throws IOException {
772     if (flushed && this.lastTop != null) {
773       resetScannerStack(this.lastTop);
774       if (this.heap.peek() == null
775           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
776         LOG.debug("Storescanner.peek() is changed where before = "
777             + this.lastTop.toString() + ",and after = " + this.heap.peek());
778         this.lastTop = null;
779         return true;
780       }
781       this.lastTop = null; // gone!
782     }
783     // else dont need to reseek
784     return false;
785   }
786
787   protected void resetScannerStack(Cell lastTopKey) throws IOException {
788     /* When we have the scan object, should we not pass it to getScanners()
789      * to get a limited set of scanners? We did so in the constructor and we
790      * could have done it now by storing the scan object from the constructor
791      */
792
793     final boolean isCompaction = false;
794     boolean usePread = get || scanUsePread;
795     List<KeyValueScanner> scanners = null;
796     try {
797       flushLock.lock();
798       scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread,
799         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
800       // Clear the current set of flushed store files so that they don't get added again
801       flushedStoreFiles.clear();
802     } finally {
803       flushLock.unlock();
804     }
805
806     // Seek the new scanners to the last key
807     seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
808     // remove the older memstore scanner
809     for (int i = 0; i < currentScanners.size(); i++) {
810       if (!currentScanners.get(i).isFileScanner()) {
811         currentScanners.remove(i);
812         break;
813       }
814     }
815     // add the newly created scanners on the flushed files and the current active memstore scanner
816     addCurrentScanners(scanners);
817     // Combine all seeked scanners with a heap
818     resetKVHeap(this.currentScanners, store.getComparator());
819     // Reset the state of the Query Matcher and set to top row.
820     // Only reset and call setRow if the row changes; avoids confusing the
821     // query matcher if scanning intra-row.
822     Cell cell = heap.peek();
823     if (cell == null) {
824       cell = lastTopKey;
825     }
826     if ((matcher.curCell == null) || !CellUtil.matchingRows(cell, matcher.curCell)) {
827       this.countPerRow = 0;
828       // The setToNewRow will call reset internally
829       matcher.setToNewRow(cell);
830     }
831   }
832
833   /**
834    * Check whether scan as expected order
835    * @param prevKV
836    * @param kv
837    * @param comparator
838    * @throws IOException
839    */
840   protected void checkScanOrder(Cell prevKV, Cell kv,
841       CellComparator comparator) throws IOException {
842     // Check that the heap gives us KVs in an increasing order.
843     assert prevKV == null || comparator == null
844         || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
845         + " followed by a " + "smaller key " + kv + " in cf " + store;
846   }
847
848   protected boolean seekToNextRow(Cell c) throws IOException {
849     return reseek(CellUtil.createLastOnRow(c));
850   }
851
852   /**
853    * Do a reseek in a normal StoreScanner(scan forward)
854    * @param kv
855    * @return true if scanner has values left, false if end of scanner
856    * @throws IOException
857    */
858   protected boolean seekAsDirection(Cell kv)
859       throws IOException {
860     return reseek(kv);
861   }
862
863   @Override
864   public boolean reseek(Cell kv) throws IOException {
865     boolean flushed = checkFlushed();
866     // Heap will not be null, if this is called from next() which.
867     // If called from RegionScanner.reseek(...) make sure the scanner
868     // stack is reset if needed.
869     checkReseek(flushed);
870     if (explicitColumnQuery && lazySeekEnabledGlobally) {
871       return heap.requestSeek(kv, true, useRowColBloom);
872     }
873     return heap.reseek(kv);
874   }
875
876   protected boolean checkFlushed() {
877     // check the var without any lock. Suppose even if we see the old
878     // value here still it is ok to continue because we will not be resetting
879     // the heap but will continue with the referenced memstore's snapshot. For compactions
880     // any way we don't need the updateReaders at all to happen as we still continue with
881     // the older files
882     if (flushed) {
883       // If there is a flush and the current scan is notified on the flush ensure that the
884       // scan's heap gets reset and we do a seek on the newly flushed file.
885       if(!this.closing) {
886         this.lastTop = this.peek();
887       } else {
888         return false;
889       }
890       // reset the flag
891       flushed = false;
892       return true;
893     }
894     return false;
895   }
896
897   /**
898    * @see KeyValueScanner#getScannerOrder()
899    */
900   @Override
901   public long getScannerOrder() {
902     return 0;
903   }
904
905   /**
906    * Seek storefiles in parallel to optimize IO latency as much as possible
907    * @param scanners the list {@link KeyValueScanner}s to be read from
908    * @param kv the KeyValue on which the operation is being requested
909    * @throws IOException
910    */
911   private void parallelSeek(final List<? extends KeyValueScanner>
912       scanners, final Cell kv) throws IOException {
913     if (scanners.isEmpty()) return;
914     int storeFileScannerCount = scanners.size();
915     CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
916     List<ParallelSeekHandler> handlers =
917         new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
918     for (KeyValueScanner scanner : scanners) {
919       if (scanner instanceof StoreFileScanner) {
920         ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
921           this.readPt, latch);
922         executor.submit(seekHandler);
923         handlers.add(seekHandler);
924       } else {
925         scanner.seek(kv);
926         latch.countDown();
927       }
928     }
929
930     try {
931       latch.await();
932     } catch (InterruptedException ie) {
933       throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
934     }
935
936     for (ParallelSeekHandler handler : handlers) {
937       if (handler.getErr() != null) {
938         throw new IOException(handler.getErr());
939       }
940     }
941   }
942
943   /**
944    * Used in testing.
945    * @return all scanners in no particular order
946    */
947   List<KeyValueScanner> getAllScannersForTesting() {
948     List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
949     KeyValueScanner current = heap.getCurrentForTesting();
950     if (current != null)
951       allScanners.add(current);
952     for (KeyValueScanner scanner : heap.getHeap())
953       allScanners.add(scanner);
954     return allScanners;
955   }
956
957   static void enableLazySeekGlobally(boolean enable) {
958     lazySeekEnabledGlobally = enable;
959   }
960
961   /**
962    * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
963    */
964   public long getEstimatedNumberOfKvsScanned() {
965     return this.kvsScanned;
966   }
967
968   @Override
969   public Cell getNextIndexedKey() {
970     return this.heap.getNextIndexedKey();
971   }
972
973   @Override
974   public void shipped() throws IOException {
975     for (KeyValueHeap h : this.heapsForDelayedClose) {
976       h.close();// There wont be further fetch of Cells from these scanners. Just close.
977     }
978     this.heapsForDelayedClose.clear();
979     if (this.heap != null) {
980       this.heap.shipped();
981     }
982   }
983 }
984