View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import com.google.common.annotations.VisibleForTesting;
23
24  import java.io.IOException;
25  import java.io.InterruptedIOException;
26  import java.util.ArrayList;
27  import java.util.HashSet;
28  import java.util.List;
29  import java.util.NavigableSet;
30  import java.util.Set;
31  import java.util.concurrent.CountDownLatch;
32  import java.util.concurrent.locks.ReentrantLock;
33
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.Cell;
37  import org.apache.hadoop.hbase.CellComparator;
38  import org.apache.hadoop.hbase.CellUtil;
39  import org.apache.hadoop.hbase.DoNotRetryIOException;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.KeyValue;
42  import org.apache.hadoop.hbase.classification.InterfaceAudience;
43  import org.apache.hadoop.hbase.client.IsolationLevel;
44  import org.apache.hadoop.hbase.client.Scan;
45  import org.apache.hadoop.hbase.executor.ExecutorService;
46  import org.apache.hadoop.hbase.filter.Filter;
47  import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
48  import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
49  import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
50  import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
51  import org.apache.hadoop.hbase.regionserver.querymatcher.LegacyScanQueryMatcher;
52  import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
53  import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
54  import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
55  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
56
57  /**
58   * Scanner scans both the memstore and the Store. Coalesce KeyValue stream
59   * into List<KeyValue> for a single row.
60   */
61  @InterfaceAudience.Private
62  public class StoreScanner extends NonReversedNonLazyKeyValueScanner
63      implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  private static final Log LOG = LogFactory.getLog(StoreScanner.class);
  // In unit tests, the store could be null
  protected final Store store;
  // Decides, cell by cell, whether a cell is included, skipped or triggers a seek.
  protected ScanQueryMatcher matcher;
  // Merges the currently selected scanners; null once the scanner has been (partially) closed.
  protected KeyValueHeap heap;
  protected boolean cacheBlocks;

  // Number of cells included so far for the current row; reset whenever a new row starts.
  protected long countPerRow = 0;
  // Maximum number of cells returned per row for this store; -1 disables the limit.
  protected int storeLimit = -1;
  // Number of included cells to skip at the start of each row before returning results.
  protected int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107)
  // Doesn't need to be volatile because it's always accessed via synchronized methods
  // NOTE(review): close(boolean) reads and writes this flag without being declared
  // synchronized -- confirm that the statement above still holds.
  protected boolean closing = false;
  // True when the scan is a Get (scan.isGetScan()).
  protected final boolean get;
  // True when at least one explicit column was requested.
  protected final boolean explicitColumnQuery;
  // True when more than one column was requested, or exactly one column on a non-Get scan.
  protected final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  protected boolean parallelSeekEnabled = false;
  // Executor service obtained from the region server services when parallel seek is enabled.
  protected ExecutorService executor;
  protected final Scan scan;
  protected final NavigableSet<byte[]> columns;
  // Computed as now - TTL of the scanned store.
  protected final long oldestUnexpiredTS;
  // Time at which this scanner was constructed.
  protected final long now;
  protected final int minVersions;
  protected final long maxRowSize;
  protected final long cellsPerHeartbeatCheck;

  // Collects all the KVHeap that are eagerly getting closed during the
  // course of a scan
  protected Set<KeyValueHeap> heapsForDelayedClose = new HashSet<KeyValueHeap>();

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
   * KVs skipped via seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  // Last cell handed to the query matcher; compared by identity to avoid double counting.
  private Cell prevCell = null;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
      "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  protected static boolean lazySeekEnabledGlobally =
      LAZY_SEEK_ENABLED_BY_DEFAULT;

  /**
   * The number of cells scanned in between timeout checks. Specifying a larger value means that
   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
   * timeout checks.
   */
  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
      "hbase.cells.scanned.per.heartbeat.check";

  /**
   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
   */
  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

  // if heap == null and lastTop != null, you need to reseek given the key below
  protected Cell lastTop = null;

  // A flag whether use pread for scan
  private boolean scanUsePread = false;
  // Indicates whether there was flush during the course of the scan
  protected volatile boolean flushed = false;
  // generally we get one file from a flush
  protected List<StoreFile> flushedStoreFiles = new ArrayList<StoreFile>(1);
  // The current list of scanners
  protected List<KeyValueScanner> currentScanners = new ArrayList<KeyValueScanner>();
  // flush update lock
  private ReentrantLock flushLock = new ReentrantLock();

  // Read point handed to the store when its scanners are opened.
  protected final long readPt;
142
  /**
   * Used by the injection framework to test the race between StoreScanner construction and
   * compaction. The constants presumably name the injection points (before the initial seek,
   * after it, and once the compaction has completed) -- confirm against the injection sites.
   */
  enum StoreScannerCompactionRace {
    BEFORE_SEEK,
    AFTER_SEEK,
    COMPACT_COMPLETE
  }
149
150   /** An internal constructor. */
151   protected StoreScanner(Store store, Scan scan, final ScanInfo scanInfo,
152       final NavigableSet<byte[]> columns, long readPt, boolean cacheBlocks) {
153     this.readPt = readPt;
154     this.store = store;
155     this.cacheBlocks = cacheBlocks;
156     get = scan.isGetScan();
157     int numCol = columns == null ? 0 : columns.size();
158     explicitColumnQuery = numCol > 0;
159     this.scan = scan;
160     this.columns = columns;
161     this.now = EnvironmentEdgeManager.currentTime();
162     this.oldestUnexpiredTS = now - scanInfo.getTtl();
163     this.minVersions = scanInfo.getMinVersions();
164
165      // We look up row-column Bloom filters for multi-column queries as part of
166      // the seek operation. However, we also look the row-column Bloom filter
167      // for multi-row (non-"get") scans because this is not done in
168      // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
169      this.useRowColBloom = numCol > 1 || (!get && numCol == 1);
170
171      this.maxRowSize = scanInfo.getTableMaxRowSize();
172      this.scanUsePread = scan.isSmall()? true: scanInfo.isUsePread();
173      this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
174      // Parallel seeking is on if the config allows and more there is more than one store file.
175      if (this.store != null && this.store.getStorefilesCount() > 1) {
176        RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
177        if (rsService != null && scanInfo.isParallelSeekEnabled()) {
178          this.parallelSeekEnabled = true;
179          this.executor = rsService.getExecutorService();
180        }
181      }
182   }
183
184   protected void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
185     this.currentScanners.addAll(scanners);
186   }
187
188   /**
189    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
190    * are not in a compaction.
191    *
192    * @param store who we scan
193    * @param scan the spec
194    * @param columns which columns we are scanning
195    * @throws IOException
196    */
197   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
198       long readPt)
199   throws IOException {
200     this(store, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
201     if (columns != null && scan.isRaw()) {
202       throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
203     }
204     matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
205       store.getCoprocessorHost());
206
207     this.store.addChangedReaderObserver(this);
208
209     try {
210       // Pass columns to try to filter out unnecessary StoreFiles.
211       List<KeyValueScanner> scanners = getScannersNoCompaction();
212
213       // Seek all scanners to the start of the Row (or if the exact matching row
214       // key does not exist, then to the start of the next matching Row).
215       // Always check bloom filter to optimize the top row seek for delete
216       // family marker.
217       seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
218         parallelSeekEnabled);
219
220       // set storeLimit
221       this.storeLimit = scan.getMaxResultsPerColumnFamily();
222
223       // set rowOffset
224       this.storeOffset = scan.getRowOffsetPerColumnFamily();
225       addCurrentScanners(scanners);
226       // Combine all seeked scanners with a heap
227       resetKVHeap(scanners, store.getComparator());
228     } catch (IOException e) {
229       // remove us from the HStore#changedReaderObservers here or we'll have no chance to
230       // and might cause memory leak
231       this.store.deleteChangedReaderObserver(this);
232       throw e;
233     }
234   }
235
  /**
   * Used for compactions.<p>
   *
   * Opens a scanner across specified StoreFiles. Delegates to the private compaction
   * constructor without a drop-deletes row range.
   * @param store who we scan
   * @param scanInfo per-store scan settings
   * @param scan the spec
   * @param scanners ancillary scanners
   * @param scanType the type of scan, handed to the query matcher
   * @param smallestReadPoint the readPoint that we should use for tracking
   *          versions
   * @param earliestPutTs handed to the query matcher
   * @throws IOException if the scanners cannot be sought
   */
  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
      List<? extends KeyValueScanner> scanners, ScanType scanType,
      long smallestReadPoint, long earliestPutTs) throws IOException {
    this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
  }
251
  /**
   * Used for compactions that drop deletes from a limited range of rows.<p>
   *
   * Opens a scanner across specified StoreFiles. The scan type is always
   * {@link ScanType#COMPACT_RETAIN_DELETES}.
   * @param store who we scan
   * @param scanInfo per-store scan settings
   * @param scan the spec
   * @param scanners ancillary scanners
   * @param smallestReadPoint the readPoint that we should use for tracking versions
   * @param earliestPutTs handed to the query matcher
   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
   * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
   * @throws IOException if the scanners cannot be sought
   */
  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
      List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
      byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
    this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
  }
269
270   private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
271       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
272       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
273     this(store, scan, scanInfo, null,
274         ((HStore) store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false);
275     if (scan.hasFilter() || (scan.getStartRow() != null && scan.getStartRow().length > 0)
276         || (scan.getStopRow() != null && scan.getStopRow().length > 0)
277         || !scan.getTimeRange().isAllTime()) {
278       // use legacy query matcher since we do not consider the scan object in our code. Only used to
279       // keep compatibility for coprocessor.
280       matcher = LegacyScanQueryMatcher.create(scan, scanInfo, null, scanType, smallestReadPoint,
281         earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow,
282         store.getCoprocessorHost());
283     } else {
284       matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint,
285         earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow,
286         store.getCoprocessorHost());
287     }
288
289     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
290     scanners = selectScannersFrom(scanners);
291
292     // Seek all scanners to the initial key
293     seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
294     addCurrentScanners(scanners);
295     // Combine all seeked scanners with a heap
296     resetKVHeap(scanners, store.getComparator());
297   }
298
  /**
   * Test-only constructor that bypasses the Store. Uses
   * {@link HConstants#LATEST_TIMESTAMP} as the earliest put timestamp and 0 as the read point.
   *
   * @param scan the spec
   * @param scanInfo per-store scan settings
   * @param scanType the type of scan
   * @param columns which columns we are scanning, or null
   * @param scanners the scanners to read from
   * @throws IOException if the scanners cannot be sought
   */
  @VisibleForTesting
  StoreScanner(final Scan scan, ScanInfo scanInfo,
      ScanType scanType, final NavigableSet<byte[]> columns,
      final List<KeyValueScanner> scanners) throws IOException {
    this(scan, scanInfo, scanType, columns, scanners,
        HConstants.LATEST_TIMESTAMP,
        // 0 is passed as readpoint because the test bypasses Store
        0);
  }
308
  /**
   * Test-only constructor that bypasses the Store and uses 0 as the read point.
   *
   * @param scan the spec
   * @param scanInfo per-store scan settings
   * @param scanType the type of scan
   * @param columns which columns we are scanning, or null
   * @param scanners the scanners to read from
   * @param earliestPutTs handed to the query matcher for non-user scans
   * @throws IOException if the scanners cannot be sought
   */
  @VisibleForTesting
  StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners, long earliestPutTs)
        throws IOException {
    this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
      // 0 is passed as readpoint because the test bypasses Store
      0);
  }
318
319   public StoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType,
320       final NavigableSet<byte[]> columns, final List<KeyValueScanner> scanners, long earliestPutTs,
321       long readPt) throws IOException {
322     this(null, scan, scanInfo, columns, readPt,
323         scanType == ScanType.USER_SCAN ? scan.getCacheBlocks() : false);
324     if (scanType == ScanType.USER_SCAN) {
325       this.matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
326         null);
327     } else {
328       if (scan.hasFilter() || (scan.getStartRow() != null && scan.getStartRow().length > 0)
329           || (scan.getStopRow() != null && scan.getStopRow().length > 0)
330           || !scan.getTimeRange().isAllTime() || columns != null) {
331         // use legacy query matcher since we do not consider the scan object in our code. Only used
332         // to keep compatibility for coprocessor.
333         matcher = LegacyScanQueryMatcher.create(scan, scanInfo, columns, scanType, Long.MAX_VALUE,
334           earliestPutTs, oldestUnexpiredTS, now, null, null, store.getCoprocessorHost());
335       } else {
336         this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
337           earliestPutTs, oldestUnexpiredTS, now, null, null, null);
338       }
339     }
340
341     // Seek all scanners to the initial key
342     seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
343     addCurrentScanners(scanners);
344     resetKVHeap(scanners, scanInfo.getComparator());
345   }
346
347   /**
348    * Get a filtered list of scanners. Assumes we are not in a compaction.
349    * @return list of scanners to seek
350    */
351   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
352     final boolean isCompaction = false;
353     boolean usePread = get || scanUsePread;
354     return selectScannersFrom(store.getScanners(cacheBlocks, get, usePread,
355         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
356   }
357
358   /**
359    * Seek the specified scanners with the given key
360    * @param scanners
361    * @param seekKey
362    * @param isLazy true if using lazy seek
363    * @param isParallelSeek true if using parallel seek
364    * @throws IOException
365    */
366   protected void seekScanners(List<? extends KeyValueScanner> scanners,
367       Cell seekKey, boolean isLazy, boolean isParallelSeek)
368       throws IOException {
369     // Seek all scanners to the start of the Row (or if the exact matching row
370     // key does not exist, then to the start of the next matching Row).
371     // Always check bloom filter to optimize the top row seek for delete
372     // family marker.
373     if (isLazy) {
374       for (KeyValueScanner scanner : scanners) {
375         scanner.requestSeek(seekKey, false, true);
376       }
377     } else {
378       if (!isParallelSeek) {
379         long totalScannersSoughtBytes = 0;
380         for (KeyValueScanner scanner : scanners) {
381           if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) {
382             throw new RowTooBigException("Max row size allowed: " + maxRowSize
383               + ", but row is bigger than that");
384           }
385           scanner.seek(seekKey);
386           Cell c = scanner.peek();
387           if (c != null) {
388             totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
389           }
390         }
391       } else {
392         parallelSeek(scanners, seekKey);
393       }
394     }
395   }
396
397   protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
398       CellComparator comparator) throws IOException {
399     // Combine all seeked scanners with a heap
400     heap = new KeyValueHeap(scanners, comparator);
401   }
402
403   /**
404    * Filters the given list of scanners using Bloom filter, time range, and
405    * TTL.
406    */
407   protected List<KeyValueScanner> selectScannersFrom(
408       final List<? extends KeyValueScanner> allScanners) {
409     boolean memOnly;
410     boolean filesOnly;
411     if (scan instanceof InternalScan) {
412       InternalScan iscan = (InternalScan)scan;
413       memOnly = iscan.isCheckOnlyMemStore();
414       filesOnly = iscan.isCheckOnlyStoreFiles();
415     } else {
416       memOnly = false;
417       filesOnly = false;
418     }
419
420     List<KeyValueScanner> scanners =
421         new ArrayList<KeyValueScanner>(allScanners.size());
422
423     // We can only exclude store files based on TTL if minVersions is set to 0.
424     // Otherwise, we might have to return KVs that have technically expired.
425     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS: Long.MIN_VALUE;
426
427     // include only those scan files which pass all filters
428     for (KeyValueScanner kvs : allScanners) {
429       boolean isFile = kvs.isFileScanner();
430       if ((!isFile && filesOnly) || (isFile && memOnly)) {
431         continue;
432       }
433
434       if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
435         scanners.add(kvs);
436       } else {
437         kvs.close();
438       }
439     }
440     return scanners;
441   }
442
443   @Override
444   public Cell peek() {
445     if (this.heap == null) {
446       return this.lastTop;
447     }
448     return this.heap.peek();
449   }
450
451   @Override
452   public KeyValue next() {
453     // throw runtime exception perhaps?
454     throw new RuntimeException("Never call StoreScanner.next()");
455   }
456
457   @Override
458   public void close() {
459     close(true);
460   }
461
462   private void close(boolean withHeapClose) {
463     if (this.closing) {
464       return;
465     }
466     if (withHeapClose) this.closing = true;
467     // Under test, we dont have a this.store
468     if (this.store != null) this.store.deleteChangedReaderObserver(this);
469     if (withHeapClose) {
470       for (KeyValueHeap h : this.heapsForDelayedClose) {
471         h.close();
472       }
473       this.heapsForDelayedClose.clear();
474       if (this.heap != null) {
475         this.heap.close();
476         this.currentScanners.clear();
477         this.heap = null; // CLOSED!
478       }
479     } else {
480       if (this.heap != null) {
481         this.heapsForDelayedClose.add(this.heap);
482         this.currentScanners.clear();
483         this.heap = null;
484       }
485     }
486     this.lastTop = null; // If both are null, we are closed.
487   }
488
489   @Override
490   public boolean seek(Cell key) throws IOException {
491     boolean flushed = checkFlushed();
492     // reset matcher state, in case that underlying store changed
493     checkReseek(flushed);
494     return this.heap.seek(key);
495   }
496
497   @Override
498   public boolean next(List<Cell> outResult) throws IOException {
499     return next(outResult, NoLimitScannerContext.getInstance());
500   }
501
502   /**
503    * Get the next row of values from this Store.
504    * @param outResult
505    * @param scannerContext
506    * @return true if there are more rows, false if scanner is done
507    */
508   @Override
509   public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
510     if (scannerContext == null) {
511       throw new IllegalArgumentException("Scanner context cannot be null");
512     }
513     boolean flushed = checkFlushed();
514     if (checkReseek(flushed)) {
515       return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
516     }
517
518     // if the heap was left null, then the scanners had previously run out anyways, close and
519     // return.
520     if (this.heap == null) {
521       // By this time partial close should happened because already heap is null
522       close(false);// Do all cleanup except heap.close()
523       return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
524     }
525
526     Cell cell = this.heap.peek();
527     if (cell == null) {
528       close(false);// Do all cleanup except heap.close()
529       return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
530     }
531
532     // only call setRow if the row changes; avoids confusing the query matcher
533     // if scanning intra-row
534
535     // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
536     // rows. Else it is possible we are still traversing the same row so we must perform the row
537     // comparison.
538     if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) {
539       this.countPerRow = 0;
540       matcher.setToNewRow(cell);
541     }
542
543     // Clear progress away unless invoker has indicated it should be kept.
544     if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
545
546     // Only do a sanity-check if store and comparator are available.
547     CellComparator comparator = store != null ? store.getComparator() : null;
548
549     int count = 0;
550     long totalBytesRead = 0;
551
552     LOOP: do {
553       // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
554       if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
555         scannerContext.updateTimeProgress();
556         if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
557           return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
558         }
559       }
560
561       if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
562       checkScanOrder(prevCell, cell, comparator);
563       prevCell = cell;
564       ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
565       qcode = optimize(qcode, cell);
566       switch (qcode) {
567         case INCLUDE:
568         case INCLUDE_AND_SEEK_NEXT_ROW:
569         case INCLUDE_AND_SEEK_NEXT_COL:
570
571           Filter f = matcher.getFilter();
572           if (f != null) {
573             cell = f.transformCell(cell);
574           }
575
576           this.countPerRow++;
577           if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) {
578             // do what SEEK_NEXT_ROW does.
579             if (!matcher.moreRowsMayExistAfter(cell)) {
580               close(false);// Do all cleanup except heap.close()
581               return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
582             }
583             matcher.clearCurrentRow();
584             seekToNextRow(cell);
585             break LOOP;
586           }
587
588           // add to results only if we have skipped #storeOffset kvs
589           // also update metric accordingly
590           if (this.countPerRow > storeOffset) {
591             outResult.add(cell);
592
593             // Update local tracking information
594             count++;
595             totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
596
597             // Update the progress of the scanner context
598             scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
599             scannerContext.incrementBatchProgress(1);
600
601             if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
602               throw new RowTooBigException(
603                   "Max row size allowed: " + maxRowSize + ", but the row is bigger than that.");
604             }
605           }
606
607           if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
608             if (!matcher.moreRowsMayExistAfter(cell)) {
609               close(false);// Do all cleanup except heap.close()
610               return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
611             }
612             matcher.clearCurrentRow();
613             seekToNextRow(cell);
614           } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
615             seekAsDirection(matcher.getKeyForNextColumn(cell));
616           } else {
617             this.heap.next();
618           }
619
620           if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
621             break LOOP;
622           }
623           if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
624             break LOOP;
625           }
626           continue;
627
628         case DONE:
629           // Optimization for Gets! If DONE, no more to get on this row, early exit!
630           if (this.scan.isGetScan()) {
631             // Then no more to this row... exit.
632             close(false);// Do all cleanup except heap.close()
633             return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
634           }
635           matcher.clearCurrentRow();
636           return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
637
638         case DONE_SCAN:
639           close(false);// Do all cleanup except heap.close()
640           return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
641
642         case SEEK_NEXT_ROW:
643           // This is just a relatively simple end of scan fix, to short-cut end
644           // us if there is an endKey in the scan.
645           if (!matcher.moreRowsMayExistAfter(cell)) {
646             close(false);// Do all cleanup except heap.close()
647             return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
648           }
649           matcher.clearCurrentRow();
650           seekToNextRow(cell);
651           break;
652
653         case SEEK_NEXT_COL:
654           seekAsDirection(matcher.getKeyForNextColumn(cell));
655           break;
656
657         case SKIP:
658           this.heap.next();
659           break;
660
661         case SEEK_NEXT_USING_HINT:
662           Cell nextKV = matcher.getNextKeyHint(cell);
663           if (nextKV != null) {
664             seekAsDirection(nextKV);
665           } else {
666             heap.next();
667           }
668           break;
669
670         default:
671           throw new RuntimeException("UNEXPECTED");
672       }
673     } while ((cell = this.heap.peek()) != null);
674
675     if (count > 0) {
676       return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
677     }
678
679     // No more keys
680     close(false);// Do all cleanup except heap.close()
681     return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
682   }
683
684   /**
685    * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).
686    * This method works together with ColumnTrackers and Filters. ColumnTrackers may issue SEEK
687    * hints, such as seek to next column, next row, or seek to an arbitrary seek key.
688    * This method intercepts these qcodes and decides whether a seek is the most efficient _actual_
689    * way to get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, SKIP inside the
690    * current, loaded block).
691    * It does this by looking at the next indexed key of the current HFile. This key
692    * is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible key
693    * on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work with
694    * the current Cell but compare as though it were a seek key; see down in
695    * matcher.compareKeyForNextRow, etc). If the compare gets us onto the
696    * next block we *_SEEK, otherwise we just INCLUDE or SKIP, and let the ColumnTrackers or Filters
697    * go through the next Cell, and so on)
698    *
699    * <p>The ColumnTrackers and Filters must behave correctly in all cases, i.e. if they are past the
700    * Cells they care about they must issue a SKIP or SEEK.
701    *
702    * <p>Other notes:
703    * <ul>
704    * <li>Rows can straddle block boundaries</li>
705    * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a
706    * different block than column C1 at T2)</li>
707    * <li>We want to SKIP and INCLUDE if the chance is high that we'll find the desired Cell after a
708    * few SKIPs...</li>
709    * <li>We want to INCLUDE_AND_SEEK and SEEK when the chance is high that we'll be able to seek
710    * past many Cells, especially if we know we need to go to the next block.</li>
711    * </ul>
712    * <p>A good proxy (best effort) to determine whether INCLUDE/SKIP is better than SEEK is whether
713    * we'll likely end up seeking to the next block (or past the next block) to get our next column.
714    * Example:
715    * <pre>
716    * |    BLOCK 1              |     BLOCK 2                   |
717    * |  r1/c1, r1/c2, r1/c3    |    r1/c4, r1/c5, r2/c1        |
718    *                                   ^         ^
719    *                                   |         |
720    *                           Next Index Key   SEEK_NEXT_ROW (before r2/c1)
721    *
722    *
723    * |    BLOCK 1                       |     BLOCK 2                      |
724    * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |    r1/c1/t2, r1/c1/T1, r1/c2/T3  |
725    *                                            ^              ^
726    *                                            |              |
727    *                                    Next Index Key        SEEK_NEXT_COL
728    * </pre>
729    * Now imagine we want columns c1 and c3 (see first diagram above), the 'Next Index Key' of r1/c4
730    * is > r1/c3 so we should seek to get to the c1 on the next row, r2. In second case, say we only
731    * want one version of c1, after we have it, a SEEK_COL will be issued to get to c2. Looking at
732    * the 'Next Index Key', it would land us in the next block, so we should SEEK. In other scenarios
733    * where the SEEK will not land us in the next block, it is very likely better to issue a series
734    * of SKIPs.
735    */
736   @VisibleForTesting
737   protected ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
738     switch(qcode) {
739     case INCLUDE_AND_SEEK_NEXT_COL:
740     case SEEK_NEXT_COL:
741     {
742       Cell nextIndexedKey = getNextIndexedKey();
743       if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
744           && matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
745         return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
746       }
747       break;
748     }
749     case INCLUDE_AND_SEEK_NEXT_ROW:
750     case SEEK_NEXT_ROW:
751     {
752       // If it is a Get Scan, then we know that we are done with this row; there are no more
753       // rows beyond the current one: don't try to optimize. We are DONE. Return the *_NEXT_ROW
754       // qcode as is. When the caller gets these flags on a Get Scan, it knows it can shut down the
755       // Scan.
756       if (!this.scan.isGetScan()) {
757         Cell nextIndexedKey = getNextIndexedKey();
758         if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
759             && matcher.compareKeyForNextRow(nextIndexedKey, cell) > 0) {
760           return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
761         }
762       }
763       break;
764     }
765     default:
766       break;
767     }
768     return qcode;
769   }
770
771   // Implementation of ChangedReadersObserver
772   @Override
773   public void updateReaders(List<StoreFile> sfs) throws IOException {
774     flushed = true;
775     flushLock.lock();
776     try {
777       flushedStoreFiles.addAll(sfs);
778     } finally {
779       flushLock.unlock();
780     }
781     // Let the next() call handle re-creating and seeking
782   }
783
784   /**
785    * @param flushed indicates if there was a flush
786    * @return true if top of heap has changed (and KeyValueHeap has to try the
787    *         next KV)
788    * @throws IOException
789    */
790   protected boolean checkReseek(boolean flushed) throws IOException {
791     if (flushed && this.lastTop != null) {
792       resetScannerStack(this.lastTop);
793       if (this.heap.peek() == null
794           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
795         LOG.debug("Storescanner.peek() is changed where before = "
796             + this.lastTop.toString() + ",and after = " + this.heap.peek());
797         this.lastTop = null;
798         return true;
799       }
800       this.lastTop = null; // gone!
801     }
802     // else dont need to reseek
803     return false;
804   }
805
806   protected void resetScannerStack(Cell lastTopKey) throws IOException {
807     /* When we have the scan object, should we not pass it to getScanners()
808      * to get a limited set of scanners? We did so in the constructor and we
809      * could have done it now by storing the scan object from the constructor
810      */
811
812     final boolean isCompaction = false;
813     boolean usePread = get || scanUsePread;
814     List<KeyValueScanner> scanners = null;
815     try {
816       flushLock.lock();
817       scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread,
818         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
819       // Clear the current set of flushed store files so that they don't get added again
820       flushedStoreFiles.clear();
821     } finally {
822       flushLock.unlock();
823     }
824
825     // Seek the new scanners to the last key
826     seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
827     // remove the older memstore scanner
828     for (int i = 0; i < currentScanners.size(); i++) {
829       if (!currentScanners.get(i).isFileScanner()) {
830         currentScanners.remove(i);
831         break;
832       }
833     }
834     // add the newly created scanners on the flushed files and the current active memstore scanner
835     addCurrentScanners(scanners);
836     // Combine all seeked scanners with a heap
837     resetKVHeap(this.currentScanners, store.getComparator());
838     // Reset the state of the Query Matcher and set to top row.
839     // Only reset and call setRow if the row changes; avoids confusing the
840     // query matcher if scanning intra-row.
841     Cell cell = heap.peek();
842     if (cell == null) {
843       cell = lastTopKey;
844     }
845     if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {
846       this.countPerRow = 0;
847       // The setToNewRow will call reset internally
848       matcher.setToNewRow(cell);
849     }
850   }
851
852   /**
853    * Check whether scan as expected order
854    * @param prevKV
855    * @param kv
856    * @param comparator
857    * @throws IOException
858    */
859   protected void checkScanOrder(Cell prevKV, Cell kv,
860       CellComparator comparator) throws IOException {
861     // Check that the heap gives us KVs in an increasing order.
862     assert prevKV == null || comparator == null
863         || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
864         + " followed by a " + "smaller key " + kv + " in cf " + store;
865   }
866
867   protected boolean seekToNextRow(Cell c) throws IOException {
868     return reseek(CellUtil.createLastOnRow(c));
869   }
870
871   /**
872    * Do a reseek in a normal StoreScanner(scan forward)
873    * @param kv
874    * @return true if scanner has values left, false if end of scanner
875    * @throws IOException
876    */
877   protected boolean seekAsDirection(Cell kv)
878       throws IOException {
879     return reseek(kv);
880   }
881
882   @Override
883   public boolean reseek(Cell kv) throws IOException {
884     boolean flushed = checkFlushed();
885     // Heap will not be null, if this is called from next() which.
886     // If called from RegionScanner.reseek(...) make sure the scanner
887     // stack is reset if needed.
888     checkReseek(flushed);
889     if (explicitColumnQuery && lazySeekEnabledGlobally) {
890       return heap.requestSeek(kv, true, useRowColBloom);
891     }
892     return heap.reseek(kv);
893   }
894
895   protected boolean checkFlushed() {
896     // check the var without any lock. Suppose even if we see the old
897     // value here still it is ok to continue because we will not be resetting
898     // the heap but will continue with the referenced memstore's snapshot. For compactions
899     // any way we don't need the updateReaders at all to happen as we still continue with
900     // the older files
901     if (flushed) {
902       // If there is a flush and the current scan is notified on the flush ensure that the
903       // scan's heap gets reset and we do a seek on the newly flushed file.
904       if(!this.closing) {
905         this.lastTop = this.peek();
906       } else {
907         return false;
908       }
909       // reset the flag
910       flushed = false;
911       return true;
912     }
913     return false;
914   }
915
916   /**
917    * @see KeyValueScanner#getScannerOrder()
918    */
919   @Override
920   public long getScannerOrder() {
921     return 0;
922   }
923
924   /**
925    * Seek storefiles in parallel to optimize IO latency as much as possible
926    * @param scanners the list {@link KeyValueScanner}s to be read from
927    * @param kv the KeyValue on which the operation is being requested
928    * @throws IOException
929    */
930   private void parallelSeek(final List<? extends KeyValueScanner>
931       scanners, final Cell kv) throws IOException {
932     if (scanners.isEmpty()) return;
933     int storeFileScannerCount = scanners.size();
934     CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
935     List<ParallelSeekHandler> handlers =
936         new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
937     for (KeyValueScanner scanner : scanners) {
938       if (scanner instanceof StoreFileScanner) {
939         ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
940           this.readPt, latch);
941         executor.submit(seekHandler);
942         handlers.add(seekHandler);
943       } else {
944         scanner.seek(kv);
945         latch.countDown();
946       }
947     }
948
949     try {
950       latch.await();
951     } catch (InterruptedException ie) {
952       throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
953     }
954
955     for (ParallelSeekHandler handler : handlers) {
956       if (handler.getErr() != null) {
957         throw new IOException(handler.getErr());
958       }
959     }
960   }
961
962   /**
963    * Used in testing.
964    * @return all scanners in no particular order
965    */
966   List<KeyValueScanner> getAllScannersForTesting() {
967     List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
968     KeyValueScanner current = heap.getCurrentForTesting();
969     if (current != null)
970       allScanners.add(current);
971     for (KeyValueScanner scanner : heap.getHeap())
972       allScanners.add(scanner);
973     return allScanners;
974   }
975
976   static void enableLazySeekGlobally(boolean enable) {
977     lazySeekEnabledGlobally = enable;
978   }
979
980   /**
981    * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
982    */
983   public long getEstimatedNumberOfKvsScanned() {
984     return this.kvsScanned;
985   }
986
987   @Override
988   public Cell getNextIndexedKey() {
989     return this.heap.getNextIndexedKey();
990   }
991
992   @Override
993   public void shipped() throws IOException {
994     for (KeyValueHeap h : this.heapsForDelayedClose) {
995       h.close();// There wont be further fetch of Cells from these scanners. Just close.
996     }
997     this.heapsForDelayedClose.clear();
998     if (this.heap != null) {
999       this.heap.shipped();
1000     }
1001   }
1002 }
1003