View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.NavigableSet;
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.CellUtil;
35  import org.apache.hadoop.hbase.DoNotRetryIOException;
36  import org.apache.hadoop.hbase.HConstants;
37  import org.apache.hadoop.hbase.KeyValue;
38  import org.apache.hadoop.hbase.KeyValue.KVComparator;
39  import org.apache.hadoop.hbase.KeyValueUtil;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.client.IsolationLevel;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.executor.ExecutorService;
44  import org.apache.hadoop.hbase.filter.Filter;
45  import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
46  import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
47  import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
48  import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51  
52  /**
53   * Scanner scans both the memstore and the Store. Coalesce KeyValue stream
54   * into List<KeyValue> for a single row.
55   */
56  @InterfaceAudience.Private
57  public class StoreScanner extends NonReversedNonLazyKeyValueScanner
58      implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  static final Log LOG = LogFactory.getLog(StoreScanner.class);
  /** Store being scanned; the test-only constructors leave this null. */
  protected Store store;
  /** Decides, cell by cell, whether to include, skip or seek; created by the constructors. */
  protected ScanQueryMatcher matcher;
  /** Heap over the underlying scanners; set to null by close() and by updateReaders(). */
  protected KeyValueHeap heap;
  protected boolean cacheBlocks;

  /** Number of cells matched as INCLUDE* in the current row; reset when the row changes. */
  protected int countPerRow = 0;
  /** Taken from Scan#getMaxResultsPerColumnFamily() for user scans; -1 disables the limit. */
  protected int storeLimit = -1;
  /** Taken from Scan#getRowOffsetPerColumnFamily() for user scans. */
  protected int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107)
  // Doesn't need to be volatile: in this class it is only read and written while holding 'lock'.
  protected boolean closing = false;
  /** Value of Scan#isGetScan() at construction time. */
  protected final boolean isGet;
  /** True when the caller supplied a non-empty column set. */
  protected final boolean explicitColumnQuery;
  /** True for multi-column queries, and for single-column queries that are not gets. */
  protected final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  protected boolean isParallelSeekEnabled = false;
  /** Executor used for parallel seeks; only assigned when parallel seeking is enabled. */
  protected ExecutorService executor;
  protected final Scan scan;
  protected final NavigableSet<byte[]> columns;
  /** Computed as {@code now - ttl}. */
  protected final long oldestUnexpiredTS;
  /** EnvironmentEdgeManager time captured when the scanner was constructed. */
  protected final long now;
  protected final int minVersions;
  /** Byte threshold above which a RowTooBigException is thrown. */
  protected final long maxRowSize;
  /** Number of scanned cells between time-limit checks in next(); always positive. */
  protected final long cellsPerHeartbeatCheck;

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
   * KVs skipped via seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  /** Last cell examined by next(); compared by identity to avoid double counting. */
  private Cell prevCell = null;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  /** Configuration key that switches on parallel seeking of store file scanners. */
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
      "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  protected static boolean lazySeekEnabledGlobally =
      LAZY_SEEK_ENABLED_BY_DEFAULT;

  /**
   * The number of cells scanned in between timeout checks. Specifying a larger value means that
   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
   * timeout checks.
   */
  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
      "hbase.cells.scanned.per.heartbeat.check";

  /**
   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
   */
  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

  // If heap == null and lastTop != null, the scanner stack must be rebuilt and re-seeked to
  // this key before it can be used again (see checkReseek()).
  protected Cell lastTop = null;

  // Whether to use pread for the scan; gets always use pread regardless of this flag.
  private boolean scanUsePread = false;
  /** Held by peek, close, seek, reseek, next(List, ScannerContext) and updateReaders. */
  protected ReentrantLock lock = new ReentrantLock();
  
  /** Read point handed to the store when scanners are requested and to parallel seeks. */
  private final long readPt;

  // used by the injection framework to test race between StoreScanner construction and compaction
  enum StoreScannerCompactionRace {
    BEFORE_SEEK,
    AFTER_SEEK,
    COMPACT_COMPLETE
  }
132   
133   /** An internal constructor. */
134   protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
135       final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt) {
136     this.readPt = readPt;
137     this.store = store;
138     this.cacheBlocks = cacheBlocks;
139     isGet = scan.isGetScan();
140     int numCol = columns == null ? 0 : columns.size();
141     explicitColumnQuery = numCol > 0;
142     this.scan = scan;
143     this.columns = columns;
144     this.now = EnvironmentEdgeManager.currentTime();
145     this.oldestUnexpiredTS = now - ttl;
146     this.minVersions = minVersions;
147 
148     if (store != null && ((HStore)store).getHRegion() != null
149         && ((HStore)store).getHRegion().getBaseConf() != null) {
150       Configuration conf = ((HStore) store).getHRegion().getBaseConf();
151       this.maxRowSize =
152           conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
153       this.scanUsePread = conf.getBoolean("hbase.storescanner.use.pread", scan.isSmall());
154 
155       long tmpCellsPerTimeoutCheck =
156           conf.getLong(HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK,
157             DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK);
158       this.cellsPerHeartbeatCheck =
159           tmpCellsPerTimeoutCheck > 0 ? tmpCellsPerTimeoutCheck
160               : DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
161     } else {
162       this.maxRowSize = HConstants.TABLE_MAX_ROWSIZE_DEFAULT;
163       this.scanUsePread = scan.isSmall();
164       this.cellsPerHeartbeatCheck = DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
165     }
166 
167     // We look up row-column Bloom filters for multi-column queries as part of
168     // the seek operation. However, we also look the row-column Bloom filter
169     // for multi-row (non-"get") scans because this is not done in
170     // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
171     useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
172 
173     // The parallel-seeking is on :
174     // 1) the config value is *true*
175     // 2) store has more than one store file
176     if (store != null && ((HStore)store).getHRegion() != null
177         && store.getStorefilesCount() > 1) {
178       RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
179       if (rsService == null || !rsService.getConfiguration().getBoolean(
180             STORESCANNER_PARALLEL_SEEK_ENABLE, false)) return;
181       isParallelSeekEnabled = true;
182       executor = rsService.getExecutorService();
183     }
184   }
185 
186   /**
187    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
188    * are not in a compaction.
189    *
190    * @param store who we scan
191    * @param scan the spec
192    * @param columns which columns we are scanning
193    * @throws IOException
194    */
195   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
196       long readPt)
197                               throws IOException {
198     this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
199         scanInfo.getMinVersions(), readPt);
200     if (columns != null && scan.isRaw()) {
201       throw new DoNotRetryIOException(
202           "Cannot specify any column for a raw scan");
203     }
204     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
205         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
206         oldestUnexpiredTS, now, store.getCoprocessorHost());
207 
208     this.store.addChangedReaderObserver(this);
209 
210     // Pass columns to try to filter out unnecessary StoreFiles.
211     List<KeyValueScanner> scanners = getScannersNoCompaction();
212 
213     // Seek all scanners to the start of the Row (or if the exact matching row
214     // key does not exist, then to the start of the next matching Row).
215     // Always check bloom filter to optimize the top row seek for delete
216     // family marker.
217     seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
218         && lazySeekEnabledGlobally, isParallelSeekEnabled);
219 
220     // set storeLimit
221     this.storeLimit = scan.getMaxResultsPerColumnFamily();
222 
223     // set rowOffset
224     this.storeOffset = scan.getRowOffsetPerColumnFamily();
225 
226     // Combine all seeked scanners with a heap
227     resetKVHeap(scanners, store.getComparator());
228   }
229 
  /**
   * Used for compactions.<p>
   *
   * Opens a scanner across specified StoreFiles. Delegates to the private compaction
   * constructor with no drop-deletes row range.
   * @param store who we scan
   * @param scanInfo supplies the TTL and minimum versions, and is passed to the query matcher
   * @param scan the spec
   * @param scanners ancillary scanners
   * @param scanType scan type handed to the query matcher
   * @param smallestReadPoint the readPoint that we should use for tracking
   *          versions
   * @param earliestPutTs earliest put timestamp handed to the query matcher
   * @throws IOException if seeking the scanners fails
   */
  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
      List<? extends KeyValueScanner> scanners, ScanType scanType,
      long smallestReadPoint, long earliestPutTs) throws IOException {
    this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
  }
245 
  /**
   * Used for compactions that drop deletes from a limited range of rows.<p>
   *
   * Opens a scanner across specified StoreFiles. The scan type passed on is always
   * {@link ScanType#COMPACT_RETAIN_DELETES}.
   * @param store who we scan
   * @param scanInfo supplies the TTL and minimum versions, and is passed to the query matcher
   * @param scan the spec
   * @param scanners ancillary scanners
   * @param smallestReadPoint the readPoint that we should use for tracking versions
   * @param earliestPutTs earliest put timestamp handed to the query matcher
   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
   * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
   * @throws IOException if seeking the scanners fails
   */
  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
      List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
      byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
    this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
  }
263 
264   private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
265       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
266       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
267     this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(),
268         ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
269     if (dropDeletesFromRow == null) {
270       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
271           earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
272     } else {
273       matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
274           oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
275     }
276 
277     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
278     scanners = selectScannersFrom(scanners);
279 
280     // Seek all scanners to the initial key
281     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
282 
283     // Combine all seeked scanners with a heap
284     resetKVHeap(scanners, store.getComparator());
285   }
286 
  /**
   * Constructor for testing. Uses {@link HConstants#LATEST_TIMESTAMP} as the earliest put
   * timestamp and 0 as the read point; no Store is involved.
   */
  StoreScanner(final Scan scan, ScanInfo scanInfo,
      ScanType scanType, final NavigableSet<byte[]> columns,
      final List<KeyValueScanner> scanners) throws IOException {
    this(scan, scanInfo, scanType, columns, scanners,
        HConstants.LATEST_TIMESTAMP,
        // 0 is passed as readpoint because the test bypasses Store
        0);
  }
296 
  // Constructor for testing. Same as the five-argument test constructor, but lets the caller
  // choose the earliest put timestamp handed to the query matcher.
  StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners, long earliestPutTs)
        throws IOException {
    this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
      // 0 is passed as readpoint because the test bypasses Store
      0);
  }
306   
307   private StoreScanner(final Scan scan, ScanInfo scanInfo,
308       ScanType scanType, final NavigableSet<byte[]> columns,
309       final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
310           throws IOException {
311     this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
312         scanInfo.getMinVersions(), readPt);
313     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
314         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
315 
316     // In unit tests, the store could be null
317     if (this.store != null) {
318       this.store.addChangedReaderObserver(this);
319     }
320     // Seek all scanners to the initial key
321     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
322     resetKVHeap(scanners, scanInfo.getComparator());
323   }
324 
325   /**
326    * Get a filtered list of scanners. Assumes we are not in a compaction.
327    * @return list of scanners to seek
328    */
329   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
330     final boolean isCompaction = false;
331     boolean usePread = isGet || scanUsePread;
332     return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
333         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
334   }
335 
336   /**
337    * Seek the specified scanners with the given key
338    * @param scanners
339    * @param seekKey
340    * @param isLazy true if using lazy seek
341    * @param isParallelSeek true if using parallel seek
342    * @throws IOException
343    */
344   protected void seekScanners(List<? extends KeyValueScanner> scanners,
345       Cell seekKey, boolean isLazy, boolean isParallelSeek)
346       throws IOException {
347     // Seek all scanners to the start of the Row (or if the exact matching row
348     // key does not exist, then to the start of the next matching Row).
349     // Always check bloom filter to optimize the top row seek for delete
350     // family marker.
351     if (isLazy) {
352       for (KeyValueScanner scanner : scanners) {
353         scanner.requestSeek(seekKey, false, true);
354       }
355     } else {
356       if (!isParallelSeek) {
357         long totalScannersSoughtBytes = 0;
358         for (KeyValueScanner scanner : scanners) {
359           if (totalScannersSoughtBytes >= maxRowSize) {
360             throw new RowTooBigException("Max row size allowed: " + maxRowSize
361               + ", but row is bigger than that");
362           }
363           scanner.seek(seekKey);
364           Cell c = scanner.peek();
365           if (c != null) {
366             totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
367           }
368         }
369       } else {
370         parallelSeek(scanners, seekKey);
371       }
372     }
373   }
374 
375   protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
376       KVComparator comparator) throws IOException {
377     // Combine all seeked scanners with a heap
378     heap = new KeyValueHeap(scanners, comparator);
379   }
380 
381   /**
382    * Filters the given list of scanners using Bloom filter, time range, and
383    * TTL.
384    */
385   protected List<KeyValueScanner> selectScannersFrom(
386       final List<? extends KeyValueScanner> allScanners) {
387     boolean memOnly;
388     boolean filesOnly;
389     if (scan instanceof InternalScan) {
390       InternalScan iscan = (InternalScan)scan;
391       memOnly = iscan.isCheckOnlyMemStore();
392       filesOnly = iscan.isCheckOnlyStoreFiles();
393     } else {
394       memOnly = false;
395       filesOnly = false;
396     }
397 
398     List<KeyValueScanner> scanners =
399         new ArrayList<KeyValueScanner>(allScanners.size());
400 
401     // We can only exclude store files based on TTL if minVersions is set to 0.
402     // Otherwise, we might have to return KVs that have technically expired.
403     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
404         Long.MIN_VALUE;
405 
406     // include only those scan files which pass all filters
407     for (KeyValueScanner kvs : allScanners) {
408       boolean isFile = kvs.isFileScanner();
409       if ((!isFile && filesOnly) || (isFile && memOnly)) {
410         continue;
411       }
412 
413       if (kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)) {
414         scanners.add(kvs);
415       }
416     }
417     return scanners;
418   }
419 
420   @Override
421   public Cell peek() {
422     lock.lock();
423     try {
424     if (this.heap == null) {
425       return this.lastTop;
426     }
427     return this.heap.peek();
428     } finally {
429       lock.unlock();
430     }
431   }
432 
  /**
   * Not supported by this scanner: rows are retrieved through the {@code next(List...)}
   * overloads instead.
   * @throws RuntimeException always
   */
  @Override
  public KeyValue next() {
    // throw runtime exception perhaps?
    throw new RuntimeException("Never call StoreScanner.next()");
  }
438 
439   @Override
440   public void close() {
441     lock.lock();
442     try {
443     if (this.closing) return;
444     this.closing = true;
445     // under test, we dont have a this.store
446     if (this.store != null)
447       this.store.deleteChangedReaderObserver(this);
448     if (this.heap != null)
449       this.heap.close();
450     this.heap = null; // CLOSED!
451     this.lastTop = null; // If both are null, we are closed.
452     } finally {
453       lock.unlock();
454     }
455   }
456 
457   @Override
458   public boolean seek(Cell key) throws IOException {
459     lock.lock();
460     try {
461     // reset matcher state, in case that underlying store changed
462     checkReseek();
463     return this.heap.seek(key);
464     } finally {
465       lock.unlock();
466     }
467   }
468 
  /**
   * Same as {@link #next(List, ScannerContext)} using the shared
   * {@link NoLimitScannerContext} instance.
   */
  @Override
  public boolean next(List<Cell> outResult) throws IOException {
    return next(outResult, NoLimitScannerContext.getInstance());
  }
473 
  /**
   * Get the next row of values from this Store.
   * <p>
   * Cells are taken from the heap one at a time and classified by the query matcher; included
   * cells are appended to {@code outResult}. The loop ends when the matcher signals the end of
   * the row or scan, when a batch/size/time limit of {@code scannerContext} is reached, when
   * the per-family store limit is exceeded, or when the heap is exhausted. The whole call runs
   * while holding {@link #lock}.
   * @param outResult list that included cells are appended to
   * @param scannerContext limits and progress for this call; must not be null
   * @return true if there are more rows, false if scanner is done (the value reported by
   *         {@code hasMoreValues()} for the state recorded on {@code scannerContext})
   * @throws IllegalArgumentException if {@code scannerContext} is null
   * @throws RowTooBigException if the cells returned by this call exceed {@link #maxRowSize}
   * @throws IOException if reading from the underlying scanners fails
   */
  @Override
  public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
    lock.lock();

    try {
    if (scannerContext == null) {
      throw new IllegalArgumentException("Scanner context cannot be null");
    }
    // The readers changed and the rebuilt heap no longer starts on the row we were on:
    // return without reading anything so the caller re-evaluates the new top.
    if (checkReseek()) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // if the heap was left null, then the scanners had previously run out anyways, close and
    // return.
    if (this.heap == null) {
      close();
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    Cell peeked = this.heap.peek();
    if (peeked == null) {
      close();
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    // only call setRow if the row changes; avoids confusing the query matcher
    // if scanning intra-row
    byte[] row = peeked.getRowArray();
    int offset = peeked.getRowOffset();
    short length = peeked.getRowLength();

    // If no limit exists in the scope LimitScope.BETWEEN_CELLS then we are sure we are changing
    // rows. Else it is possible we are still traversing the same row so we must perform the row
    // comparison.
    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.row == null ||
        !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) {
      this.countPerRow = 0;
      matcher.setRow(row, offset, length);
    }

    // Clear progress away unless invoker has indicated it should be kept.
    if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
    
    Cell cell;

    // Only do a sanity-check if store and comparator are available.
    KeyValue.KVComparator comparator =
        store != null ? store.getComparator() : null;

    // Per-call tallies: number of cells added to outResult and their estimated serialized size.
    int count = 0;
    long totalBytesRead = 0;

    LOOP: while((cell = this.heap.peek()) != null) {
      // Update and check the time limit based on the configured value of cellsPerHeartbeatCheck
      if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
        scannerContext.updateTimeProgress();
        if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
          return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
        }
      }

      if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevCell from the same heap.
      checkScanOrder(prevCell, cell, comparator);
      prevCell = cell;

      ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
      // May downgrade a seek to a plain skip/include when the target is in the current block.
      qcode = optimize(qcode, cell);
      switch(qcode) {
        case INCLUDE:
        case INCLUDE_AND_SEEK_NEXT_ROW:
        case INCLUDE_AND_SEEK_NEXT_COL:

          Filter f = matcher.getFilter();
          if (f != null) {
            // TODO convert Scan Query Matcher to be Cell instead of KV based ?
            cell = f.transformCell(cell);
          }

          this.countPerRow++;
          // Past storeOffset + storeLimit included cells for this row: stop reading the row.
          if (storeLimit > -1 &&
              this.countPerRow > (storeLimit + storeOffset)) {
            // do what SEEK_NEXT_ROW does.
            if (!matcher.moreRowsMayExistAfter(cell)) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            seekToNextRow(cell);
            break LOOP;
          }

          // add to results only if we have skipped #storeOffset kvs
          // also update metric accordingly
          if (this.countPerRow > storeOffset) {
            outResult.add(cell);

            // Update local tracking information
            count++;
            totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);

            // Update the progress of the scanner context
            scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
            scannerContext.incrementBatchProgress(1);

            if (totalBytesRead > maxRowSize) {
              throw new RowTooBigException("Max row size allowed: " + maxRowSize
              + ", but the row is bigger than that.");
            }
          }

          // Advance the heap according to the (possibly optimized) match code.
          if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
            if (!matcher.moreRowsMayExistAfter(cell)) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            seekToNextRow(cell);
          } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
            seekAsDirection(matcher.getKeyForNextColumn(cell));
          } else {
            this.heap.next();
          }

          // Stop mid-row once a between-cells batch or size limit has been reached.
          if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          continue;

        case DONE:
          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();

        case DONE_SCAN:
          close();
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();

        case SEEK_NEXT_ROW:
          // This is just a relatively simple end of scan fix, to short-cut end
          // us if there is an endKey in the scan.
          if (!matcher.moreRowsMayExistAfter(cell)) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          seekToNextRow(cell);
          break;

        case SEEK_NEXT_COL:
          seekAsDirection(matcher.getKeyForNextColumn(cell));
          break;

        case SKIP:
          this.heap.next();
          break;

        case SEEK_NEXT_USING_HINT:
          // TODO convert reseek to Cell?
          Cell nextKV = matcher.getNextKeyHint(cell);
          if (nextKV != null) {
            seekAsDirection(nextKV);
          } else {
            heap.next();
          }
          break;

        default:
          throw new RuntimeException("UNEXPECTED");
      }
    }

    // Reached when the heap ran dry or the loop was left via "break LOOP".
    if (count > 0) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // No more keys
    close();
    return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    } finally {
      lock.unlock();
    }
  }
658 
659   /*
660    * See if we should actually SEEK or rather just SKIP to the next Cell.
661    * (see HBASE-13109)
662    */
663   private ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
664     Cell nextIndexedKey = getNextIndexedKey();
665     if (nextIndexedKey == null || nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY ||
666         store == null) {
667       return qcode;
668     }
669     switch(qcode) {
670     case INCLUDE_AND_SEEK_NEXT_COL:
671     case SEEK_NEXT_COL:
672     {
673       if (matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
674         return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
675       }
676       break;
677     }
678     case INCLUDE_AND_SEEK_NEXT_ROW:
679     case SEEK_NEXT_ROW:
680     {
681       if (matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
682         return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
683       }
684       break;
685     }
686     default:
687       break;
688     }
689     return qcode;
690   }
691 
692   // Implementation of ChangedReadersObserver
693   @Override
694   public void updateReaders() throws IOException {
695     lock.lock();
696     try {
697     if (this.closing) return;
698 
699     // All public synchronized API calls will call 'checkReseek' which will cause
700     // the scanner stack to reseek if this.heap==null && this.lastTop != null.
701     // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
702     // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
703     // which is NOT what we want, not to mention could cause an NPE. So we early out here.
704     if (this.heap == null) return;
705 
706     // this could be null.
707     this.lastTop = this.peek();
708 
709     //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
710 
711     // close scanners to old obsolete Store files
712     this.heap.close(); // bubble thru and close all scanners.
713     this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
714 
715     // Let the next() call handle re-creating and seeking
716     } finally {
717       lock.unlock();
718     }
719   }
720 
721   /**
722    * @return true if top of heap has changed (and KeyValueHeap has to try the
723    *         next KV)
724    * @throws IOException
725    */
726   protected boolean checkReseek() throws IOException {
727     if (this.heap == null && this.lastTop != null) {
728       resetScannerStack(this.lastTop);
729       if (this.heap.peek() == null
730           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
731         LOG.debug("Storescanner.peek() is changed where before = "
732             + this.lastTop.toString() + ",and after = " + this.heap.peek());
733         this.lastTop = null;
734         return true;
735       }
736       this.lastTop = null; // gone!
737     }
738     // else dont need to reseek
739     return false;
740   }
741 
742   protected void resetScannerStack(Cell lastTopKey) throws IOException {
743     if (heap != null) {
744       throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
745     }
746 
747     /* When we have the scan object, should we not pass it to getScanners()
748      * to get a limited set of scanners? We did so in the constructor and we
749      * could have done it now by storing the scan object from the constructor */
750     List<KeyValueScanner> scanners = getScannersNoCompaction();
751 
752     // Seek all scanners to the initial key
753     seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled);
754 
755     // Combine all seeked scanners with a heap
756     resetKVHeap(scanners, store.getComparator());
757 
758     // Reset the state of the Query Matcher and set to top row.
759     // Only reset and call setRow if the row changes; avoids confusing the
760     // query matcher if scanning intra-row.
761     Cell kv = heap.peek();
762     if (kv == null) {
763       kv = lastTopKey;
764     }
765     byte[] row = kv.getRowArray();
766     int offset = kv.getRowOffset();
767     short length = kv.getRowLength();
768     if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row,
769         matcher.rowOffset, matcher.rowLength)) {
770       this.countPerRow = 0;
771       matcher.reset();
772       matcher.setRow(row, offset, length);
773     }
774   }
775 
  /**
   * Check whether the scan proceeds in the expected order. The check is a Java
   * {@code assert}, so it only runs when assertions are enabled, and it is skipped when
   * either {@code prevKV} or {@code comparator} is null.
   * @param prevKV previously examined cell, or null
   * @param kv cell being examined now
   * @param comparator comparator defining the expected order, or null to skip the check
   * @throws IOException declared for overriding implementations; not thrown here
   */
  protected void checkScanOrder(Cell prevKV, Cell kv,
      KeyValue.KVComparator comparator) throws IOException {
    // Check that the heap gives us KVs in an increasing order.
    assert prevKV == null || comparator == null
        || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
        + " followed by a " + "smaller key " + kv + " in cf " + store;
  }
790 
791   protected boolean seekToNextRow(Cell kv) throws IOException {
792     return reseek(KeyValueUtil.createLastOnRow(kv));
793   }
794 
  /**
   * Do a reseek in a normal StoreScanner (scan forward). Here this is simply
   * {@link #reseek(Cell)}.
   * @param kv key to move to
   * @return true if scanner has values left, false if end of scanner
   * @throws IOException if the reseek fails
   */
  protected boolean seekAsDirection(Cell kv)
      throws IOException {
    return reseek(kv);
  }
805 
806   @Override
807   public boolean reseek(Cell kv) throws IOException {
808     lock.lock();
809     try {
810     //Heap will not be null, if this is called from next() which.
811     //If called from RegionScanner.reseek(...) make sure the scanner
812     //stack is reset if needed.
813     checkReseek();
814     if (explicitColumnQuery && lazySeekEnabledGlobally) {
815       return heap.requestSeek(kv, true, useRowColBloom);
816     }
817     return heap.reseek(kv);
818     } finally {
819       lock.unlock();
820     }
821   }
822 
  /** Always returns 0 for a StoreScanner. */
  @Override
  public long getSequenceID() {
    return 0;
  }
827 
828   /**
829    * Seek storefiles in parallel to optimize IO latency as much as possible
830    * @param scanners the list {@link KeyValueScanner}s to be read from
831    * @param kv the KeyValue on which the operation is being requested
832    * @throws IOException
833    */
834   private void parallelSeek(final List<? extends KeyValueScanner>
835       scanners, final Cell kv) throws IOException {
836     if (scanners.isEmpty()) return;
837     int storeFileScannerCount = scanners.size();
838     CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
839     List<ParallelSeekHandler> handlers = 
840         new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
841     for (KeyValueScanner scanner : scanners) {
842       if (scanner instanceof StoreFileScanner) {
843         ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
844           this.readPt, latch);
845         executor.submit(seekHandler);
846         handlers.add(seekHandler);
847       } else {
848         scanner.seek(kv);
849         latch.countDown();
850       }
851     }
852 
853     try {
854       latch.await();
855     } catch (InterruptedException ie) {
856       throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
857     }
858 
859     for (ParallelSeekHandler handler : handlers) {
860       if (handler.getErr() != null) {
861         throw new IOException(handler.getErr());
862       }
863     }
864   }
865 
866   /**
867    * Used in testing.
868    * @return all scanners in no particular order
869    */
870   List<KeyValueScanner> getAllScannersForTesting() {
871     List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
872     KeyValueScanner current = heap.getCurrentForTesting();
873     if (current != null)
874       allScanners.add(current);
875     for (KeyValueScanner scanner : heap.getHeap())
876       allScanners.add(scanner);
877     return allScanners;
878   }
879 
  /**
   * Sets the static {@link #lazySeekEnabledGlobally} flag, which is consulted by the user-scan
   * constructor and by {@link #reseek(Cell)} when deciding whether to seek lazily.
   * @param enable new value of the flag
   */
  static void enableLazySeekGlobally(boolean enable) {
    lazySeekEnabledGlobally = enable;
  }
883 
  /**
   * The counter is advanced in {@link #next(List, ScannerContext)} each time the cell at the
   * top of the heap is a different object than the previously examined one. It is read here
   * without taking {@link #lock}.
   * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
   */
  public long getEstimatedNumberOfKvsScanned() {
    return this.kvsScanned;
  }
890 
  /**
   * Returns the heap's next indexed key.
   * <p>
   * NOTE(review): this dereferences {@link #heap} without a null check and without taking
   * {@link #lock}; the heap is null after {@link #close()} and after
   * {@link #updateReaders()} until it is rebuilt, so callers must only use this while a heap
   * is in place (as {@code optimize} does from within {@code next}).
   */
  @Override
  public Cell getNextIndexedKey() {
    return this.heap.getNextIndexedKey();
  }
895 }
896