View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.HashSet;
26  import java.util.List;
27  import java.util.NavigableSet;
28  import java.util.Set;
29  import java.util.concurrent.CountDownLatch;
30  import java.util.concurrent.locks.ReentrantLock;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.CellComparator;
36  import org.apache.hadoop.hbase.CellUtil;
37  import org.apache.hadoop.hbase.DoNotRetryIOException;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.KeyValue;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.client.IsolationLevel;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.executor.ExecutorService;
44  import org.apache.hadoop.hbase.filter.Filter;
45  import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
46  import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
47  import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
48  import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
49  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50  
51  import com.google.common.annotations.VisibleForTesting;
52  
53  /**
54   * Scanner scans both the memstore and the Store. Coalesce KeyValue stream
55   * into List<KeyValue> for a single row.
56   */
57  @InterfaceAudience.Private
58  public class StoreScanner extends NonReversedNonLazyKeyValueScanner
59      implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  private static final Log LOG = LogFactory.getLog(StoreScanner.class);
  // In unit tests, the store could be null
  protected final Store store;
  // Classifies each cell seen by next() (include, skip, seek, done, ...).
  protected ScanQueryMatcher matcher;
  // Heap over the scanners currently in use; set to null by close().
  protected KeyValueHeap heap;
  protected boolean cacheBlocks;

  // Number of cells included so far in the row being read; next() resets it on a new row.
  protected long countPerRow = 0;
  // Per-row cap on included cells, checked in next() as storeLimit + storeOffset.
  // The user-scan constructor sets it from Scan#getMaxResultsPerColumnFamily(); -1 disables it.
  protected int storeLimit = -1;
  // Number of included cells skipped at the start of each row before results are emitted.
  // The user-scan constructor sets it from Scan#getRowOffsetPerColumnFamily().
  protected int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107)
  // Doesn't need to be volatile because it's always accessed via synchronized methods
  // NOTE(review): close(boolean), which reads and writes this flag, is not declared
  // synchronized -- confirm that the claim above still holds.
  protected boolean closing = false;
  // True when the scan is a get (Scan#isGetScan()).
  protected final boolean get;
  // True when at least one column was explicitly requested.
  protected final boolean explicitColumnQuery;
  // True for multi-column queries, and for single-column scans that are not gets.
  protected final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  protected boolean parallelSeekEnabled = false;
  // Executor obtained from the region server services when parallel seeking is enabled.
  protected ExecutorService executor;
  protected final Scan scan;
  protected final NavigableSet<byte[]> columns;
  // now minus the TTL reported by ScanInfo.
  protected final long oldestUnexpiredTS;
  // Time at which this scanner was constructed (EnvironmentEdgeManager.currentTime()).
  protected final long now;
  protected final int minVersions;
  // Largest row size allowed for user scans; exceeding it raises RowTooBigException.
  protected final long maxRowSize;
  // next() refreshes and checks the time limit whenever kvsScanned is a multiple of this value.
  protected final long cellsPerHeartbeatCheck;

  // Heaps detached by a partial close (close(false)). They are not closed at that
  // point; they are closed when the scanner is fully closed.
  protected Set<KeyValueHeap> heapsForDelayedClose = new HashSet<KeyValueHeap>();

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
   * KVs skipped via seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  // Last cell handed to the matcher by next(); compared by identity to avoid double counting.
  private Cell prevCell = null;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  /** Configuration key for enabling parallel seeking of store file scanners. */
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
      "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  protected static boolean lazySeekEnabledGlobally =
      LAZY_SEEK_ENABLED_BY_DEFAULT;

  /**
   * The number of cells scanned in between timeout checks. Specifying a larger value means that
   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
   * timeout checks.
   */
  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
      "hbase.cells.scanned.per.heartbeat.check";

  /**
   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
   */
  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

  // if heap == null and lastTop != null, you need to reseek given the key below
  protected Cell lastTop = null;

  // Whether to use pread for the scan
  private boolean scanUsePread = false;
  // Indicates whether there was a flush during the course of the scan
  protected volatile boolean flushed = false;
  // generally we get one file from a flush
  protected List<StoreFile> flushedStoreFiles = new ArrayList<StoreFile>(1);
  // The current list of scanners
  protected List<KeyValueScanner> currentScanners = new ArrayList<KeyValueScanner>();
  // Guards updates to flushedStoreFiles
  private ReentrantLock flushLock = new ReentrantLock();

  // Read point passed in at construction and handed to Store#getScanners.
  protected final long readPt;
138 
  // Used by the injection framework to test the race between StoreScanner construction and
  // compaction. The constants name points in that sequence: before the scanners are sought,
  // after they are sought, and once the compaction has completed.
  enum StoreScannerCompactionRace {
    BEFORE_SEEK,
    AFTER_SEEK,
    COMPACT_COMPLETE
  }
145 
146   /** An internal constructor. */
147   protected StoreScanner(Store store, Scan scan, final ScanInfo scanInfo,
148       final NavigableSet<byte[]> columns, long readPt, boolean cacheBlocks) {
149     this.readPt = readPt;
150     this.store = store;
151     this.cacheBlocks = cacheBlocks;
152     get = scan.isGetScan();
153     int numCol = columns == null ? 0 : columns.size();
154     explicitColumnQuery = numCol > 0;
155     this.scan = scan;
156     this.columns = columns;
157     this.now = EnvironmentEdgeManager.currentTime();
158     this.oldestUnexpiredTS = now - scanInfo.getTtl();
159     this.minVersions = scanInfo.getMinVersions();
160 
161      // We look up row-column Bloom filters for multi-column queries as part of
162      // the seek operation. However, we also look the row-column Bloom filter
163      // for multi-row (non-"get") scans because this is not done in
164      // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
165      this.useRowColBloom = numCol > 1 || (!get && numCol == 1);
166 
167      this.maxRowSize = scanInfo.getTableMaxRowSize();
168      this.scanUsePread = scan.isSmall()? true: scanInfo.isUsePread();
169      this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
170      // Parallel seeking is on if the config allows and more there is more than one store file.
171      if (this.store != null && this.store.getStorefilesCount() > 1) {
172        RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
173        if (rsService != null && scanInfo.isParallelSeekEnabled()) {
174          this.parallelSeekEnabled = true;
175          this.executor = rsService.getExecutorService();
176        }
177      }
178   }
179 
180   protected void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
181     this.currentScanners.addAll(scanners);
182   }
  /**
   * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
   * are not in a compaction.
   *
   * @param store who we scan
   * @param scanInfo settings handed to the internal constructor and the query matcher
   * @param scan the spec
   * @param columns which columns we are scanning
   * @param readPt the read point to scan at
   * @throws DoNotRetryIOException if columns are specified for a raw scan
   * @throws IOException if obtaining or seeking the scanners fails; in that case this scanner
   *           is removed from the store's changed-reader observers before the rethrow
   */
  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
      long readPt)
  throws IOException {
    this(store, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
    if (columns != null && scan.isRaw()) {
      throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
    }
    matcher = new ScanQueryMatcher(scan, scanInfo, columns,
        ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
        oldestUnexpiredTS, now, store.getCoprocessorHost());

    // Register before seeking so that updateReaders() is called for flushes from now on.
    this.store.addChangedReaderObserver(this);

    try {
      // Pass columns to try to filter out unnecessary StoreFiles.
      List<KeyValueScanner> scanners = getScannersNoCompaction();

      // Seek all scanners to the start of the Row (or if the exact matching row
      // key does not exist, then to the start of the next matching Row).
      // Always check bloom filter to optimize the top row seek for delete
      // family marker.
      seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
        parallelSeekEnabled);

      // set storeLimit
      this.storeLimit = scan.getMaxResultsPerColumnFamily();

      // set rowOffset
      this.storeOffset = scan.getRowOffsetPerColumnFamily();
      addCurrentScanners(scanners);
      // Combine all seeked scanners with a heap
      resetKVHeap(scanners, store.getComparator());
    } catch (IOException e) {
      // remove us from the HStore#changedReaderObservers here or we'll have no chance to
      // and might cause memory leak
      this.store.deleteChangedReaderObserver(this);
      throw e;
    }
  }
231
  /**
   * Used for compactions.<p>
   *
   * Opens a scanner across specified StoreFiles. Delegates to the private compaction
   * constructor with no drop-deletes row range.
   * @param store who we scan
   * @param scanInfo settings handed to the internal constructor and the query matcher
   * @param scan the spec
   * @param scanners ancillary scanners
   * @param scanType the scan type handed to the query matcher
   * @param smallestReadPoint the readPoint that we should use for tracking
   *          versions
   * @param earliestPutTs handed to the query matcher
   * @throws IOException if seeking the scanners fails
   */
  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
      List<? extends KeyValueScanner> scanners, ScanType scanType,
      long smallestReadPoint, long earliestPutTs) throws IOException {
    this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
  }
247
  /**
   * Used for compactions that drop deletes from a limited range of rows.<p>
   *
   * Opens a scanner across specified StoreFiles. Delegates to the private compaction
   * constructor with the scan type fixed to {@link ScanType#COMPACT_RETAIN_DELETES}.
   * @param store who we scan
   * @param scanInfo settings handed to the internal constructor and the query matcher
   * @param scan the spec
   * @param scanners ancillary scanners
   * @param smallestReadPoint the readPoint that we should use for tracking versions
   * @param earliestPutTs handed to the query matcher
   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
   * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
   * @throws IOException if seeking the scanners fails
   */
  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
      List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
      byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
    this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
  }
265
  /**
   * Shared implementation of the two compaction constructors. Reads at the region's
   * READ_COMMITTED read point with block caching off, builds the query matcher, filters the
   * given scanners, seeks them to the matcher's start key and combines them in a heap.
   * Unlike the user-scan constructor, this one does not register with the store as a
   * changed-reader observer.
   *
   * @param dropDeletesFromRow when null the matcher is built from scanType; otherwise it is
   *          built with the drop-deletes row range and scanType is not passed to it
   * @param dropDeletesToRow end of the drop-deletes row range; only used when
   *          dropDeletesFromRow is non-null
   */
  private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
      List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
      long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
    this(store, scan, scanInfo, null,
      ((HStore)store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false);
    if (dropDeletesFromRow == null) {
      matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
          earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
    } else {
      matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
          oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
    }

    // Filter the list of scanners using Bloom filters, time range, TTL, etc.
    scanners = selectScannersFrom(scanners);

    // Seek all scanners to the initial key
    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
    addCurrentScanners(scanners);
    // Combine all seeked scanners with a heap
    resetKVHeap(scanners, store.getComparator());
  }
288
  /**
   * Test-only constructor that scans the given scanners without a Store, using
   * {@link HConstants#LATEST_TIMESTAMP} as the earliest put timestamp and 0 as the read point.
   */
  @VisibleForTesting
  StoreScanner(final Scan scan, ScanInfo scanInfo,
      ScanType scanType, final NavigableSet<byte[]> columns,
      final List<KeyValueScanner> scanners) throws IOException {
    this(scan, scanInfo, scanType, columns, scanners,
        HConstants.LATEST_TIMESTAMP,
        // 0 is passed as readpoint because the test bypasses Store
        0);
  }
298
  /**
   * Test-only constructor that scans the given scanners without a Store, with a
   * caller-supplied earliest put timestamp and 0 as the read point.
   */
  @VisibleForTesting
  StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners, long earliestPutTs)
        throws IOException {
    this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
      // 0 is passed as readpoint because the test bypasses Store
      0);
  }
308
  /**
   * Builds a scanner directly over the supplied scanners, without a Store. The matcher is
   * created without a coprocessor host and the heap uses the comparator from scanInfo.
   *
   * @param scan the spec
   * @param scanInfo settings handed to the internal constructor and the query matcher
   * @param scanType the scan type handed to the query matcher
   * @param columns which columns we are scanning, or null
   * @param scanners the scanners to seek and combine; they are not filtered first
   * @param earliestPutTs handed to the query matcher
   * @param readPt the read point to scan at
   * @throws IOException if seeking the scanners fails
   */
  public StoreScanner(final Scan scan, ScanInfo scanInfo,
      ScanType scanType, final NavigableSet<byte[]> columns,
      final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
  throws IOException {
    this(null, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
    this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
        Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);

    // In unit tests, the store could be null
    // (this.store is always null here: this constructor passes null to the internal one,
    // so the registration below does not run)
    if (this.store != null) {
      this.store.addChangedReaderObserver(this);
    }
    // Seek all scanners to the initial key
    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
    addCurrentScanners(scanners);
    resetKVHeap(scanners, scanInfo.getComparator());
  }
326
327   /**
328    * Get a filtered list of scanners. Assumes we are not in a compaction.
329    * @return list of scanners to seek
330    */
331   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
332     final boolean isCompaction = false;
333     boolean usePread = get || scanUsePread;
334     return selectScannersFrom(store.getScanners(cacheBlocks, get, usePread,
335         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
336   }
337
338   /**
339    * Seek the specified scanners with the given key
340    * @param scanners
341    * @param seekKey
342    * @param isLazy true if using lazy seek
343    * @param isParallelSeek true if using parallel seek
344    * @throws IOException
345    */
346   protected void seekScanners(List<? extends KeyValueScanner> scanners,
347       Cell seekKey, boolean isLazy, boolean isParallelSeek)
348       throws IOException {
349     // Seek all scanners to the start of the Row (or if the exact matching row
350     // key does not exist, then to the start of the next matching Row).
351     // Always check bloom filter to optimize the top row seek for delete
352     // family marker.
353     if (isLazy) {
354       for (KeyValueScanner scanner : scanners) {
355         scanner.requestSeek(seekKey, false, true);
356       }
357     } else {
358       if (!isParallelSeek) {
359         long totalScannersSoughtBytes = 0;
360         for (KeyValueScanner scanner : scanners) {
361           if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) {
362             throw new RowTooBigException("Max row size allowed: " + maxRowSize
363               + ", but row is bigger than that");
364           }
365           scanner.seek(seekKey);
366           Cell c = scanner.peek();
367           if (c != null) {
368             totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
369           }
370         }
371       } else {
372         parallelSeek(scanners, seekKey);
373       }
374     }
375   }
376
377   protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
378       CellComparator comparator) throws IOException {
379     // Combine all seeked scanners with a heap
380     heap = new KeyValueHeap(scanners, comparator);
381   }
382
383   /**
384    * Filters the given list of scanners using Bloom filter, time range, and
385    * TTL.
386    */
387   protected List<KeyValueScanner> selectScannersFrom(
388       final List<? extends KeyValueScanner> allScanners) {
389     boolean memOnly;
390     boolean filesOnly;
391     if (scan instanceof InternalScan) {
392       InternalScan iscan = (InternalScan)scan;
393       memOnly = iscan.isCheckOnlyMemStore();
394       filesOnly = iscan.isCheckOnlyStoreFiles();
395     } else {
396       memOnly = false;
397       filesOnly = false;
398     }
399
400     List<KeyValueScanner> scanners =
401         new ArrayList<KeyValueScanner>(allScanners.size());
402
403     // We can only exclude store files based on TTL if minVersions is set to 0.
404     // Otherwise, we might have to return KVs that have technically expired.
405     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS: Long.MIN_VALUE;
406
407     // include only those scan files which pass all filters
408     for (KeyValueScanner kvs : allScanners) {
409       boolean isFile = kvs.isFileScanner();
410       if ((!isFile && filesOnly) || (isFile && memOnly)) {
411         continue;
412       }
413
414       if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
415         scanners.add(kvs);
416       } else {
417         kvs.close();
418       }
419     }
420     return scanners;
421   }
422
423   @Override
424   public Cell peek() {
425     if (this.heap == null) {
426       return this.lastTop;
427     }
428     return this.heap.peek();
429   }
430
431   @Override
432   public KeyValue next() {
433     // throw runtime exception perhaps?
434     throw new RuntimeException("Never call StoreScanner.next()");
435   }
436
437   @Override
438   public void close() {
439     close(true);
440   }
441
442   private void close(boolean withHeapClose) {
443     if (this.closing) {
444       return;
445     }
446     if (withHeapClose) this.closing = true;
447     // Under test, we dont have a this.store
448     if (this.store != null) this.store.deleteChangedReaderObserver(this);
449     if (withHeapClose) {
450       for (KeyValueHeap h : this.heapsForDelayedClose) {
451         h.close();
452       }
453       this.heapsForDelayedClose.clear();
454       if (this.heap != null) {
455         this.heap.close();
456         this.currentScanners.clear();
457         this.heap = null; // CLOSED!
458       }
459     } else {
460       if (this.heap != null) {
461         this.heapsForDelayedClose.add(this.heap);
462         this.currentScanners.clear();
463         this.heap = null;
464       }
465     }
466     this.lastTop = null; // If both are null, we are closed.
467   }
468
469   @Override
470   public boolean seek(Cell key) throws IOException {
471     boolean flushed = checkFlushed();
472     // reset matcher state, in case that underlying store changed
473     checkReseek(flushed);
474     return this.heap.seek(key);
475   }
476
477   @Override
478   public boolean next(List<Cell> outResult) throws IOException {
479     return next(outResult, NoLimitScannerContext.getInstance());
480   }
481
  /**
   * Get the next row of values from this Store.
   * <p>
   * Cells are taken from the heap one at a time and classified by the query matcher, after
   * {@link #optimize} has had a chance to turn a seek code into INCLUDE or SKIP. Included
   * cells are appended to outResult once the per-row store offset has been skipped. The loop
   * ends when the heap is exhausted, when the matcher reports the row or scan as done, or
   * when a batch, size or time limit in the scanner context is reached between cells.
   * @param outResult list that included cells are appended to
   * @param scannerContext carries the limits and progress of this call and receives the
   *          resulting scanner state; must not be null
   * @return true if there are more rows, false if scanner is done
   * @throws IllegalArgumentException if scannerContext is null
   * @throws RowTooBigException on a user scan, if the cells returned by this call exceed the
   *           maximum row size
   * @throws IOException if reading from the underlying scanners fails
   */
  @Override
  public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
    if (scannerContext == null) {
      throw new IllegalArgumentException("Scanner context cannot be null");
    }
    // Note: this local shadows the 'flushed' field.
    boolean flushed = checkFlushed();
    if (checkReseek(flushed)) {
      // The top of the heap moved to another row; report more values so the caller retries.
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // if the heap was left null, then the scanners had previously run out anyways, close and
    // return.
    if (this.heap == null) {
      // By this time partial close should happened because already heap is null
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    Cell cell = this.heap.peek();
    if (cell == null) {
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    // only call setRow if the row changes; avoids confusing the query matcher
    // if scanning intra-row

    // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
    // rows. Else it is possible we are still traversing the same row so we must perform the row
    // comparison.
    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null) {
      this.countPerRow = 0;
      matcher.setToNewRow(cell);
    }

    // Clear progress away unless invoker has indicated it should be kept.
    if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();

    // Only do a sanity-check if store and comparator are available.
    CellComparator comparator = store != null ? store.getComparator() : null;

    // Cells added to outResult by this call, and their estimated serialized size.
    int count = 0;
    long totalBytesRead = 0;

    LOOP: do {
      // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
      if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
        scannerContext.updateTimeProgress();
        if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
          return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
        }
      }

      if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
      checkScanOrder(prevCell, cell, comparator);
      prevCell = cell;
      ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
      qcode = optimize(qcode, cell);
      switch (qcode) {
      case INCLUDE:
      case INCLUDE_AND_SEEK_NEXT_ROW:
      case INCLUDE_AND_SEEK_NEXT_COL:

        // Give the filter, if any, the chance to transform the cell before it is returned.
        Filter f = matcher.getFilter();
        if (f != null) {
          cell = f.transformCell(cell);
        }

        this.countPerRow++;
        if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) {
          // do what SEEK_NEXT_ROW does.
          if (!matcher.moreRowsMayExistAfter(cell)) {
            close(false);// Do all cleanup except heap.close()
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          matcher.curCell = null;
          seekToNextRow(cell);
          break LOOP;
        }

        // add to results only if we have skipped #storeOffset kvs
        // also update metric accordingly
        if (this.countPerRow > storeOffset) {
          outResult.add(cell);

          // Update local tracking information
          count++;
          totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);

          // Update the progress of the scanner context
          scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
          scannerContext.incrementBatchProgress(1);

          if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
            throw new RowTooBigException(
                "Max row size allowed: " + maxRowSize + ", but the row is bigger than that.");
          }
        }

        // Advance according to the specific INCLUDE variant.
        if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
          if (!matcher.moreRowsMayExistAfter(cell)) {
            close(false);// Do all cleanup except heap.close()
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          matcher.curCell = null;
          seekToNextRow(cell);
        } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
          seekAsDirection(matcher.getKeyForNextColumn(cell));
        } else {
          this.heap.next();
        }

        // Stop between cells once a batch or size limit has been reached.
        if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
          break LOOP;
        }
        if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
          break LOOP;
        }
        continue;

      case DONE:
        // Optimization for Gets! If DONE, no more to get on this row, early exit!
        if (this.scan.isGetScan()) {
          // Then no more to this row... exit.
          close(false);// Do all cleanup except heap.close()
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        matcher.curCell = null;
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();

      case DONE_SCAN:
        close(false);// Do all cleanup except heap.close()
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();

      case SEEK_NEXT_ROW:
        // This is just a relatively simple end of scan fix, to short-cut end
        // us if there is an endKey in the scan.
        if (!matcher.moreRowsMayExistAfter(cell)) {
          close(false);// Do all cleanup except heap.close()
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        matcher.curCell = null;
        seekToNextRow(cell);
        break;

      case SEEK_NEXT_COL:
        seekAsDirection(matcher.getKeyForNextColumn(cell));
        break;

      case SKIP:
        this.heap.next();
        break;

      case SEEK_NEXT_USING_HINT:
        // Seek to the matcher's hint when it provides one; otherwise just step forward.
        Cell nextKV = matcher.getNextKeyHint(cell);
        if (nextKV != null) {
          seekAsDirection(nextKV);
        } else {
          heap.next();
        }
        break;

      default:
        throw new RuntimeException("UNEXPECTED");
      }
    } while ((cell = this.heap.peek()) != null);

    // Something was collected, so report more values and leave the scanner open.
    if (count > 0) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // No more keys
    close(false);// Do all cleanup except heap.close()
    return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
  }
663
664   /**
665    * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).
666    * This method works together with ColumnTrackers and Filters. ColumnTrackers may issue SEEK
667    * hints, such as seek to next column, next row, or seek to an arbitrary seek key.
668    * This method intercepts these qcodes and decides whether a seek is the most efficient _actual_
669    * way to get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, SKIP inside the
670    * current, loaded block).
671    * It does this by looking at the next indexed key of the current HFile. This key
672    * is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible key
673    * on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work with
674    * the current Cell but compare as though it were a seek key; see down in
675    * matcher.compareKeyForNextRow, etc). If the compare gets us onto the
676    * next block we *_SEEK, otherwise we just INCLUDE or SKIP, and let the ColumnTrackers or Filters
677    * go through the next Cell, and so on)
678    *
679    * <p>The ColumnTrackers and Filters must behave correctly in all cases, i.e. if they are past the
680    * Cells they care about they must issues a SKIP or SEEK.
681    *
682    * <p>Other notes:
683    * <ul>
684    * <li>Rows can straddle block boundaries</li>
685    * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a
686    * different block than column C1 at T2)</li>
687    * <li>We want to SKIP and INCLUDE if the chance is high that we'll find the desired Cell after a
688    * few SKIPs...</li>
689    * <li>We want to INCLUDE_AND_SEEK and SEEK when the chance is high that we'll be able to seek
690    * past many Cells, especially if we know we need to go to the next block.</li>
691    * </ul>
692    * <p>A good proxy (best effort) to determine whether INCLUDE/SKIP is better than SEEK is whether
693    * we'll likely end up seeking to the next block (or past the next block) to get our next column.
694    * Example:
695    * <pre>
696    * |    BLOCK 1              |     BLOCK 2                   |
697    * |  r1/c1, r1/c2, r1/c3    |    r1/c4, r1/c5, r2/c1        |
698    *                                   ^         ^
699    *                                   |         |
700    *                           Next Index Key   SEEK_NEXT_ROW (before r2/c1)
701    *
702    *
703    * |    BLOCK 1                       |     BLOCK 2                      |
704    * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |    r1/c1/t2, r1/c1/T1, r1/c2/T3  |
705    *                                            ^              ^
706    *                                            |              |
707    *                                    Next Index Key        SEEK_NEXT_COL
708    * </pre>
709    * Now imagine we want columns c1 and c3 (see first diagram above), the 'Next Index Key' of r1/c4
710    * is > r1/c3 so we should seek to get to the c1 on the next row, r2. In second case, say we only
711    * want one version of c1, after we have it, a SEEK_COL will be issued to get to c2. Looking at
712    * the 'Next Index Key', it would land us in the next block, so we should SEEK. In other scenarios
713    * where the SEEK will not land us in the next block, it is very likely better to issues a series
714    * of SKIPs.
715    */
716   @VisibleForTesting
717   protected ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
718     switch(qcode) {
719     case INCLUDE_AND_SEEK_NEXT_COL:
720     case SEEK_NEXT_COL:
721     {
722       Cell nextIndexedKey = getNextIndexedKey();
723       if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
724           && matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
725         return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
726       }
727       break;
728     }
729     case INCLUDE_AND_SEEK_NEXT_ROW:
730     case SEEK_NEXT_ROW:
731     {
732       // If it is a Get Scan, then we know that we are done with this row; there are no more
733       // rows beyond the current one: don't try to optimize. We are DONE. Return the *_NEXT_ROW
734       // qcode as is. When the caller gets these flags on a Get Scan, it knows it can shut down the
735       // Scan.
736       if (!this.scan.isGetScan()) {
737         Cell nextIndexedKey = getNextIndexedKey();
738         if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
739             && matcher.compareKeyForNextRow(nextIndexedKey, cell) > 0) {
740           return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
741         }
742       }
743       break;
744     }
745     default:
746       break;
747     }
748     return qcode;
749   }
750
751   // Implementation of ChangedReadersObserver
752   @Override
753   public void updateReaders(List<StoreFile> sfs) throws IOException {
754     flushed = true;
755     flushLock.lock();
756     try {
757       flushedStoreFiles.addAll(sfs);
758     } finally {
759       flushLock.unlock();
760     }
761     // Let the next() call handle re-creating and seeking
762   }
763
764   /**
765    * @param flushed indicates if there was a flush
766    * @return true if top of heap has changed (and KeyValueHeap has to try the
767    *         next KV)
768    * @throws IOException
769    */
770   protected boolean checkReseek(boolean flushed) throws IOException {
771     if (flushed && this.lastTop != null) {
772       resetScannerStack(this.lastTop);
773       if (this.heap.peek() == null
774           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
775         LOG.debug("Storescanner.peek() is changed where before = "
776             + this.lastTop.toString() + ",and after = " + this.heap.peek());
777         this.lastTop = null;
778         return true;
779       }
780       this.lastTop = null; // gone!
781     }
782     // else dont need to reseek
783     return false;
784   }
785
786   protected void resetScannerStack(Cell lastTopKey) throws IOException {
787     /* When we have the scan object, should we not pass it to getScanners()
788      * to get a limited set of scanners? We did so in the constructor and we
789      * could have done it now by storing the scan object from the constructor
790      */
791
792     final boolean isCompaction = false;
793     boolean usePread = get || scanUsePread;
794     List<KeyValueScanner> scanners = null;
795     try {
796       flushLock.lock();
797       scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread,
798         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
799       // Clear the current set of flushed store files so that they don't get added again
800       flushedStoreFiles.clear();
801     } finally {
802       flushLock.unlock();
803     }
804
805     // Seek the new scanners to the last key
806     seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
807     // remove the older memstore scanner
808     for (int i = 0; i < currentScanners.size(); i++) {
809       if (!currentScanners.get(i).isFileScanner()) {
810         currentScanners.remove(i);
811         break;
812       }
813     }
814     // add the newly created scanners on the flushed files and the current active memstore scanner
815     addCurrentScanners(scanners);
816     // Combine all seeked scanners with a heap
817     resetKVHeap(this.currentScanners, store.getComparator());
818     // Reset the state of the Query Matcher and set to top row.
819     // Only reset and call setRow if the row changes; avoids confusing the
820     // query matcher if scanning intra-row.
821     Cell cell = heap.peek();
822     if (cell == null) {
823       cell = lastTopKey;
824     }
825     if ((matcher.curCell == null) || !CellUtil.matchingRows(cell, matcher.curCell)) {
826       this.countPerRow = 0;
827       // The setToNewRow will call reset internally
828       matcher.setToNewRow(cell);
829     }
830   }
831
832   /**
833    * Check whether scan as expected order
834    * @param prevKV
835    * @param kv
836    * @param comparator
837    * @throws IOException
838    */
839   protected void checkScanOrder(Cell prevKV, Cell kv,
840       CellComparator comparator) throws IOException {
841     // Check that the heap gives us KVs in an increasing order.
842     assert prevKV == null || comparator == null
843         || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
844         + " followed by a " + "smaller key " + kv + " in cf " + store;
845   }
846
847   protected boolean seekToNextRow(Cell c) throws IOException {
848     return reseek(CellUtil.createLastOnRow(c));
849   }
850
851   /**
852    * Do a reseek in a normal StoreScanner(scan forward)
853    * @param kv
854    * @return true if scanner has values left, false if end of scanner
855    * @throws IOException
856    */
857   protected boolean seekAsDirection(Cell kv)
858       throws IOException {
859     return reseek(kv);
860   }
861
862   @Override
863   public boolean reseek(Cell kv) throws IOException {
864     boolean flushed = checkFlushed();
865     // Heap will not be null, if this is called from next() which.
866     // If called from RegionScanner.reseek(...) make sure the scanner
867     // stack is reset if needed.
868     checkReseek(flushed);
869     if (explicitColumnQuery && lazySeekEnabledGlobally) {
870       return heap.requestSeek(kv, true, useRowColBloom);
871     }
872     return heap.reseek(kv);
873   }
874
875   protected boolean checkFlushed() {
876     // check the var without any lock. Suppose even if we see the old
877     // value here still it is ok to continue because we will not be resetting
878     // the heap but will continue with the referenced memstore's snapshot. For compactions
879     // any way we don't need the updateReaders at all to happen as we still continue with
880     // the older files
881     if (flushed) {
882       // If there is a flush and the current scan is notified on the flush ensure that the
883       // scan's heap gets reset and we do a seek on the newly flushed file.
884       if(!this.closing) {
885         this.lastTop = this.peek();
886       } else {
887         return false;
888       }
889       // reset the flag
890       flushed = false;
891       return true;
892     }
893     return false;
894   }
895
896   /**
897    * @see KeyValueScanner#getScannerOrder()
898    */
899   @Override
900   public long getScannerOrder() {
901     return 0;
902   }
903
904   /**
905    * Seek storefiles in parallel to optimize IO latency as much as possible
906    * @param scanners the list {@link KeyValueScanner}s to be read from
907    * @param kv the KeyValue on which the operation is being requested
908    * @throws IOException
909    */
910   private void parallelSeek(final List<? extends KeyValueScanner>
911       scanners, final Cell kv) throws IOException {
912     if (scanners.isEmpty()) return;
913     int storeFileScannerCount = scanners.size();
914     CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
915     List<ParallelSeekHandler> handlers =
916         new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
917     for (KeyValueScanner scanner : scanners) {
918       if (scanner instanceof StoreFileScanner) {
919         ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
920           this.readPt, latch);
921         executor.submit(seekHandler);
922         handlers.add(seekHandler);
923       } else {
924         scanner.seek(kv);
925         latch.countDown();
926       }
927     }
928
929     try {
930       latch.await();
931     } catch (InterruptedException ie) {
932       throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
933     }
934
935     for (ParallelSeekHandler handler : handlers) {
936       if (handler.getErr() != null) {
937         throw new IOException(handler.getErr());
938       }
939     }
940   }
941
942   /**
943    * Used in testing.
944    * @return all scanners in no particular order
945    */
946   List<KeyValueScanner> getAllScannersForTesting() {
947     List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
948     KeyValueScanner current = heap.getCurrentForTesting();
949     if (current != null)
950       allScanners.add(current);
951     for (KeyValueScanner scanner : heap.getHeap())
952       allScanners.add(scanner);
953     return allScanners;
954   }
955
956   static void enableLazySeekGlobally(boolean enable) {
957     lazySeekEnabledGlobally = enable;
958   }
959
960   /**
961    * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
962    */
963   public long getEstimatedNumberOfKvsScanned() {
964     return this.kvsScanned;
965   }
966
967   @Override
968   public Cell getNextIndexedKey() {
969     return this.heap.getNextIndexedKey();
970   }
971
972   @Override
973   public void shipped() throws IOException {
974     for (KeyValueHeap h : this.heapsForDelayedClose) {
975       h.close();// There wont be further fetch of Cells from these scanners. Just close.
976     }
977     this.heapsForDelayedClose.clear();
978     if (this.heap != null) {
979       this.heap.shipped();
980     }
981   }
982 }
983