View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.HashSet;
26  import java.util.List;
27  import java.util.NavigableSet;
28  import java.util.Set;
29  import java.util.concurrent.CountDownLatch;
30  import java.util.concurrent.locks.ReentrantLock;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.Cell;
36  import org.apache.hadoop.hbase.CellComparator;
37  import org.apache.hadoop.hbase.CellUtil;
38  import org.apache.hadoop.hbase.DoNotRetryIOException;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.KeyValue;
41  import org.apache.hadoop.hbase.KeyValueUtil;
42  import org.apache.hadoop.hbase.classification.InterfaceAudience;
43  import org.apache.hadoop.hbase.client.IsolationLevel;
44  import org.apache.hadoop.hbase.client.Scan;
45  import org.apache.hadoop.hbase.executor.ExecutorService;
46  import org.apache.hadoop.hbase.filter.Filter;
47  import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
48  import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
49  import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
50  import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
51  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
52  
/**
 * Scanner scans both the memstore and the Store. Coalesces the KeyValue stream
 * into a {@code List<Cell>} for a single row.
 */
57  @InterfaceAudience.Private
58  public class StoreScanner extends NonReversedNonLazyKeyValueScanner
59      implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
60    private static final Log LOG = LogFactory.getLog(StoreScanner.class);
61    protected Store store;
62    protected ScanQueryMatcher matcher;
63    protected KeyValueHeap heap;
64    protected boolean cacheBlocks;
65  
66    protected int countPerRow = 0;
67    protected int storeLimit = -1;
68    protected int storeOffset = 0;
69  
  // Used to indicate that the scanner has closed (see HBASE-1107).
  // Doesn't need to be volatile because it is only read and written while holding 'lock'.
72    protected boolean closing = false;
73    protected final boolean isGet;
74    protected final boolean explicitColumnQuery;
75    protected final boolean useRowColBloom;
76    /**
77     * A flag that enables StoreFileScanner parallel-seeking
78     */
79    protected boolean isParallelSeekEnabled = false;
80    protected ExecutorService executor;
81    protected final Scan scan;
82    protected final NavigableSet<byte[]> columns;
83    protected final long oldestUnexpiredTS;
84    protected final long now;
85    protected final int minVersions;
86    protected final long maxRowSize;
87    protected final long cellsPerHeartbeatCheck;
88  
89    protected Set<KeyValueHeap> heapsForDelayedClose = new HashSet<KeyValueHeap>();
90  
91    /**
92     * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
93     * KVs skipped via seeking to next row/column. TODO: estimate them?
94     */
95    private long kvsScanned = 0;
96    private Cell prevCell = null;
97  
98    /** We don't ever expect to change this, the constant is just for clarity. */
99    static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
100   public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
101       "hbase.storescanner.parallel.seek.enable";
102 
103   /** Used during unit testing to ensure that lazy seek does save seek ops */
104   protected static boolean lazySeekEnabledGlobally =
105       LAZY_SEEK_ENABLED_BY_DEFAULT;
106 
107   /**
108    * The number of cells scanned in between timeout checks. Specifying a larger value means that
109    * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
110    * timeout checks.
111    */
112   public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
113       "hbase.cells.scanned.per.heartbeat.check";
114 
115   /**
116    * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
117    */
118   public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;
119 
120   // if heap == null and lastTop != null, you need to reseek given the key below
121   protected Cell lastTop = null;
122 
  // Flag indicating whether to use pread for this scan
124   private boolean scanUsePread = false;
125   protected ReentrantLock lock = new ReentrantLock();
126   
127   private final long readPt;
128 
  // Used by the injection framework to test the race between StoreScanner construction and
  // compaction. The constants name the points at which a test can hook in.
  enum StoreScannerCompactionRace {
    BEFORE_SEEK,
    AFTER_SEEK,
    COMPACT_COMPLETE
  }
135   
136   /** An internal constructor. */
137   protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
138       final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt) {
139     this.readPt = readPt;
140     this.store = store;
141     this.cacheBlocks = cacheBlocks;
142     isGet = scan.isGetScan();
143     int numCol = columns == null ? 0 : columns.size();
144     explicitColumnQuery = numCol > 0;
145     this.scan = scan;
146     this.columns = columns;
147     this.now = EnvironmentEdgeManager.currentTime();
148     this.oldestUnexpiredTS = now - ttl;
149     this.minVersions = minVersions;
150 
151     if (store != null && ((HStore)store).getHRegion() != null
152         && ((HStore)store).getHRegion().getBaseConf() != null) {
153       Configuration conf = ((HStore) store).getHRegion().getBaseConf();
154       this.maxRowSize =
155           conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
156       this.scanUsePread = conf.getBoolean("hbase.storescanner.use.pread", scan.isSmall());
157 
158       long tmpCellsPerTimeoutCheck =
159           conf.getLong(HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK,
160             DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK);
161       this.cellsPerHeartbeatCheck =
162           tmpCellsPerTimeoutCheck > 0 ? tmpCellsPerTimeoutCheck
163               : DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
164     } else {
165       this.maxRowSize = HConstants.TABLE_MAX_ROWSIZE_DEFAULT;
166       this.scanUsePread = scan.isSmall();
167       this.cellsPerHeartbeatCheck = DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
168     }
169 
170     // We look up row-column Bloom filters for multi-column queries as part of
171     // the seek operation. However, we also look the row-column Bloom filter
172     // for multi-row (non-"get") scans because this is not done in
173     // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
174     useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
175 
176     // The parallel-seeking is on :
177     // 1) the config value is *true*
178     // 2) store has more than one store file
179     if (store != null && ((HStore)store).getHRegion() != null
180         && store.getStorefilesCount() > 1) {
181       RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
182       if (rsService == null || !rsService.getConfiguration().getBoolean(
183             STORESCANNER_PARALLEL_SEEK_ENABLE, false)) return;
184       isParallelSeekEnabled = true;
185       executor = rsService.getExecutorService();
186     }
187   }
188 
189   /**
190    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
191    * are not in a compaction.
192    *
193    * @param store who we scan
194    * @param scan the spec
195    * @param columns which columns we are scanning
196    * @throws IOException
197    */
198   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
199       long readPt)
200                               throws IOException {
201     this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
202         scanInfo.getMinVersions(), readPt);
203     if (columns != null && scan.isRaw()) {
204       throw new DoNotRetryIOException(
205           "Cannot specify any column for a raw scan");
206     }
207     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
208         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
209         oldestUnexpiredTS, now, store.getCoprocessorHost());
210 
211     this.store.addChangedReaderObserver(this);
212 
213     // Pass columns to try to filter out unnecessary StoreFiles.
214     List<KeyValueScanner> scanners = getScannersNoCompaction();
215 
216     // Seek all scanners to the start of the Row (or if the exact matching row
217     // key does not exist, then to the start of the next matching Row).
218     // Always check bloom filter to optimize the top row seek for delete
219     // family marker.
220     seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
221         && lazySeekEnabledGlobally, isParallelSeekEnabled);
222 
223     // set storeLimit
224     this.storeLimit = scan.getMaxResultsPerColumnFamily();
225 
226     // set rowOffset
227     this.storeOffset = scan.getRowOffsetPerColumnFamily();
228 
229     // Combine all seeked scanners with a heap
230     resetKVHeap(scanners, store.getComparator());
231   }
232 
233   /**
234    * Used for compactions.<p>
235    *
236    * Opens a scanner across specified StoreFiles.
237    * @param store who we scan
238    * @param scan the spec
239    * @param scanners ancillary scanners
240    * @param smallestReadPoint the readPoint that we should use for tracking
241    *          versions
242    */
243   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
244       List<? extends KeyValueScanner> scanners, ScanType scanType,
245       long smallestReadPoint, long earliestPutTs) throws IOException {
246     this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
247   }
248 
249   /**
250    * Used for compactions that drop deletes from a limited range of rows.<p>
251    *
252    * Opens a scanner across specified StoreFiles.
253    * @param store who we scan
254    * @param scan the spec
255    * @param scanners ancillary scanners
256    * @param smallestReadPoint the readPoint that we should use for tracking versions
257    * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
258    * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
259    */
260   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
261       List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
262       byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
263     this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
264         earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
265   }
266 
267   private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
268       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
269       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
270     this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(),
271         ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
272     if (dropDeletesFromRow == null) {
273       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
274           earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
275     } else {
276       matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
277           oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
278     }
279 
280     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
281     scanners = selectScannersFrom(scanners);
282 
283     // Seek all scanners to the initial key
284     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
285 
286     // Combine all seeked scanners with a heap
287     resetKVHeap(scanners, store.getComparator());
288   }
289 
290   /** Constructor for testing. */
291   StoreScanner(final Scan scan, ScanInfo scanInfo,
292       ScanType scanType, final NavigableSet<byte[]> columns,
293       final List<KeyValueScanner> scanners) throws IOException {
294     this(scan, scanInfo, scanType, columns, scanners,
295         HConstants.LATEST_TIMESTAMP,
296         // 0 is passed as readpoint because the test bypasses Store
297         0);
298   }
299 
300   // Constructor for testing.
301   StoreScanner(final Scan scan, ScanInfo scanInfo,
302     ScanType scanType, final NavigableSet<byte[]> columns,
303     final List<KeyValueScanner> scanners, long earliestPutTs)
304         throws IOException {
305     this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
306       // 0 is passed as readpoint because the test bypasses Store
307       0);
308   }
309   
310   private StoreScanner(final Scan scan, ScanInfo scanInfo,
311       ScanType scanType, final NavigableSet<byte[]> columns,
312       final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
313           throws IOException {
314     this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
315         scanInfo.getMinVersions(), readPt);
316     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
317         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
318 
319     // In unit tests, the store could be null
320     if (this.store != null) {
321       this.store.addChangedReaderObserver(this);
322     }
323     // Seek all scanners to the initial key
324     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
325     resetKVHeap(scanners, scanInfo.getComparator());
326   }
327 
328   /**
329    * Get a filtered list of scanners. Assumes we are not in a compaction.
330    * @return list of scanners to seek
331    */
332   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
333     final boolean isCompaction = false;
334     boolean usePread = isGet || scanUsePread;
335     return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
336         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
337   }
338 
339   /**
340    * Seek the specified scanners with the given key
341    * @param scanners
342    * @param seekKey
343    * @param isLazy true if using lazy seek
344    * @param isParallelSeek true if using parallel seek
345    * @throws IOException
346    */
347   protected void seekScanners(List<? extends KeyValueScanner> scanners,
348       Cell seekKey, boolean isLazy, boolean isParallelSeek)
349       throws IOException {
350     // Seek all scanners to the start of the Row (or if the exact matching row
351     // key does not exist, then to the start of the next matching Row).
352     // Always check bloom filter to optimize the top row seek for delete
353     // family marker.
354     if (isLazy) {
355       for (KeyValueScanner scanner : scanners) {
356         scanner.requestSeek(seekKey, false, true);
357       }
358     } else {
359       if (!isParallelSeek) {
360         long totalScannersSoughtBytes = 0;
361         for (KeyValueScanner scanner : scanners) {
362           if (totalScannersSoughtBytes >= maxRowSize) {
363             throw new RowTooBigException("Max row size allowed: " + maxRowSize
364               + ", but row is bigger than that");
365           }
366           scanner.seek(seekKey);
367           Cell c = scanner.peek();
368           if (c != null) {
369             totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
370           }
371         }
372       } else {
373         parallelSeek(scanners, seekKey);
374       }
375     }
376   }
377 
378   protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
379       CellComparator comparator) throws IOException {
380     // Combine all seeked scanners with a heap
381     heap = new KeyValueHeap(scanners, comparator);
382   }
383 
384   /**
385    * Filters the given list of scanners using Bloom filter, time range, and
386    * TTL.
387    */
388   protected List<KeyValueScanner> selectScannersFrom(
389       final List<? extends KeyValueScanner> allScanners) {
390     boolean memOnly;
391     boolean filesOnly;
392     if (scan instanceof InternalScan) {
393       InternalScan iscan = (InternalScan)scan;
394       memOnly = iscan.isCheckOnlyMemStore();
395       filesOnly = iscan.isCheckOnlyStoreFiles();
396     } else {
397       memOnly = false;
398       filesOnly = false;
399     }
400 
401     List<KeyValueScanner> scanners =
402         new ArrayList<KeyValueScanner>(allScanners.size());
403 
404     // We can only exclude store files based on TTL if minVersions is set to 0.
405     // Otherwise, we might have to return KVs that have technically expired.
406     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
407         Long.MIN_VALUE;
408 
409     // include only those scan files which pass all filters
410     for (KeyValueScanner kvs : allScanners) {
411       boolean isFile = kvs.isFileScanner();
412       if ((!isFile && filesOnly) || (isFile && memOnly)) {
413         continue;
414       }
415 
416       if (kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)) {
417         scanners.add(kvs);
418       }
419     }
420     return scanners;
421   }
422 
423   @Override
424   public Cell peek() {
425     lock.lock();
426     try {
427     if (this.heap == null) {
428       return this.lastTop;
429     }
430     return this.heap.peek();
431     } finally {
432       lock.unlock();
433     }
434   }
435 
436   @Override
437   public KeyValue next() {
438     // throw runtime exception perhaps?
439     throw new RuntimeException("Never call StoreScanner.next()");
440   }
441 
442   @Override
443   public void close() {
444     close(true);
445   }
446 
447   private void close(boolean withHeapClose){
448     lock.lock();
449     try {
450       if (this.closing) return;
451       this.closing = true;
452       // under test, we dont have a this.store
453       if (this.store != null) this.store.deleteChangedReaderObserver(this);
454       if (withHeapClose) {
455         for (KeyValueHeap h : this.heapsForDelayedClose) {
456           h.close();
457         }
458         this.heapsForDelayedClose.clear();
459         if (this.heap != null) {
460           this.heap.close();
461           this.heap = null; // CLOSED!
462         }
463       } else {
464         if (this.heap != null) {
465           this.heapsForDelayedClose.add(this.heap);
466           this.heap = null;
467         }
468       }
469       this.lastTop = null; // If both are null, we are closed.
470     } finally {
471       lock.unlock();
472     }
473   }
474 
475   @Override
476   public boolean seek(Cell key) throws IOException {
477     lock.lock();
478     try {
479     // reset matcher state, in case that underlying store changed
480     checkReseek();
481     return this.heap.seek(key);
482     } finally {
483       lock.unlock();
484     }
485   }
486 
487   @Override
488   public boolean next(List<Cell> outResult) throws IOException {
489     return next(outResult, NoLimitScannerContext.getInstance());
490   }
491 
  /**
   * Get the next row of values from this Store.
   *
   * <p>Runs entirely under {@link #lock}. Cells are taken from the heap one at a time and
   * handed to the query matcher; the returned match code decides whether the cell is added
   * to {@code outResult}, skipped, or used as the starting point of a seek. The loop ends
   * when the heap is exhausted, the matcher reports the row or scan as done, a batch, size
   * or time limit in the {@code BETWEEN_CELLS} scope is reached, or the per-row store limit
   * is exceeded.
   *
   * @param outResult list the included cells are appended to
   * @param scannerContext tracks limits and progress; must not be null
   * @return true if there are more rows, false if scanner is done (the value comes from
   *         {@code hasMoreValues()} after the scanner state has been set)
   * @throws IllegalArgumentException if {@code scannerContext} is null
   * @throws RowTooBigException if the cells added during this call exceed {@link #maxRowSize}
   * @throws IOException if a seek or reseek fails
   */
  @Override
  public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
    lock.lock();

    try {
    if (scannerContext == null) {
      throw new IllegalArgumentException("Scanner context cannot be null");
    }
    // checkReseek() rebuilds the heap if updateReaders() dropped it. When the top of the heap
    // changed as a result, return without adding cells and let the caller try again.
    if (checkReseek()) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // if the heap was left null, then the scanners had previously run out anyways, close and
    // return.
    if (this.heap == null) {
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    Cell peeked = this.heap.peek();
    if (peeked == null) {
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    // only call setRow if the row changes; avoids confusing the query matcher
    // if scanning intra-row

    // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
    // rows. Else it is possible we are still traversing the same row so we must perform the row
    // comparison.
    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null ||
        !CellUtil.matchingRow(peeked, matcher.curCell)) {
      this.countPerRow = 0;
      matcher.setToNewRow(peeked);
    }

    // Clear progress away unless invoker has indicated it should be kept.
    if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();

    Cell cell;

    // Only do a sanity-check if store and comparator are available.
    CellComparator comparator =
        store != null ? store.getComparator() : null;

    // Number of cells added to outResult and their estimated serialized size, for this call.
    int count = 0;
    long totalBytesRead = 0;

    LOOP: while((cell = this.heap.peek()) != null) {
      // Update and check the time limit based on the configured value of cellsPerHeartbeatCheck
      if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
        scannerContext.updateTimeProgress();
        if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
          return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
        }
      }

      if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
      checkScanOrder(prevCell, cell, comparator);
      prevCell = cell;

      ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
      // A seek code may be downgraded to a plain SKIP/INCLUDE (see optimize()).
      qcode = optimize(qcode, cell);
      switch(qcode) {
        case INCLUDE:
        case INCLUDE_AND_SEEK_NEXT_ROW:
        case INCLUDE_AND_SEEK_NEXT_COL:

          // Let the filter, if any, transform the cell before it is returned.
          Filter f = matcher.getFilter();
          if (f != null) {
            cell = f.transformCell(cell);
          }

          this.countPerRow++;
          // More cells than offset + limit have been seen in this row: stop and move on.
          if (storeLimit > -1 &&
              this.countPerRow > (storeLimit + storeOffset)) {
            // do what SEEK_NEXT_ROW does.
            if (!matcher.moreRowsMayExistAfter(cell)) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            seekToNextRow(cell);
            break LOOP;
          }

          // add to results only if we have skipped #storeOffset kvs
          // also update metric accordingly
          if (this.countPerRow > storeOffset) {
            outResult.add(cell);

            // Update local tracking information
            count++;
            totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);

            // Update the progress of the scanner context
            scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
            scannerContext.incrementBatchProgress(1);

            if (totalBytesRead > maxRowSize) {
              throw new RowTooBigException("Max row size allowed: " + maxRowSize
              + ", but the row is bigger than that.");
            }
          }

          // Advance the heap according to the exact include code.
          if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
            if (!matcher.moreRowsMayExistAfter(cell)) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            seekToNextRow(cell);
          } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
            seekAsDirection(matcher.getKeyForNextColumn(cell));
          } else {
            this.heap.next();
          }

          // Stop once a between-cells batch or size limit has been reached.
          if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          continue;

        case DONE:
          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();

        case DONE_SCAN:
          close(false);// Do all cleanup except heap.close()
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();

        case SEEK_NEXT_ROW:
          // This is just a relatively simple end of scan fix, to short-cut end
          // us if there is an endKey in the scan.
          if (!matcher.moreRowsMayExistAfter(cell)) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          seekToNextRow(cell);
          break;

        case SEEK_NEXT_COL:
          seekAsDirection(matcher.getKeyForNextColumn(cell));
          break;

        case SKIP:
          this.heap.next();
          break;

        case SEEK_NEXT_USING_HINT:
          // Seek to the hinted key when the matcher supplies one; otherwise just step.
          Cell nextKV = matcher.getNextKeyHint(cell);
          if (nextKV != null) {
            seekAsDirection(nextKV);
          } else {
            heap.next();
          }
          break;

        default:
          throw new RuntimeException("UNEXPECTED");
      }
    }

    // The loop ended without an explicit return: report more values if any cell was added.
    if (count > 0) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // No more keys
    close(false);// Do all cleanup except heap.close()
    return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    } finally {
      lock.unlock();
    }
  }
671 
672   /*
673    * See if we should actually SEEK or rather just SKIP to the next Cell.
674    * (see HBASE-13109)
675    */
676   private ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
677     Cell nextIndexedKey = getNextIndexedKey();
678     if (nextIndexedKey == null || nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY ||
679         store == null) {
680       return qcode;
681     }
682     switch(qcode) {
683     case INCLUDE_AND_SEEK_NEXT_COL:
684     case SEEK_NEXT_COL:
685     {
686       if (matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
687         return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
688       }
689       break;
690     }
691     case INCLUDE_AND_SEEK_NEXT_ROW:
692     case SEEK_NEXT_ROW:
693     {
694       if (matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
695         return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
696       }
697       break;
698     }
699     default:
700       break;
701     }
702     return qcode;
703   }
704 
705   // Implementation of ChangedReadersObserver
706   @Override
707   public void updateReaders() throws IOException {
708     lock.lock();
709     try {
710     if (this.closing) return;
711 
712     // All public synchronized API calls will call 'checkReseek' which will cause
713     // the scanner stack to reseek if this.heap==null && this.lastTop != null.
714     // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
715     // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
716     // which is NOT what we want, not to mention could cause an NPE. So we early out here.
717     if (this.heap == null) return;
718 
719     // this could be null.
720     this.lastTop = this.peek();
721 
722     //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
723 
724     // close scanners to old obsolete Store files
725     this.heapsForDelayedClose.add(this.heap);// Don't close now. Delay it till StoreScanner#close
726     this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
727 
728     // Let the next() call handle re-creating and seeking
729     } finally {
730       lock.unlock();
731     }
732   }
733 
734   /**
735    * @return true if top of heap has changed (and KeyValueHeap has to try the
736    *         next KV)
737    * @throws IOException
738    */
739   protected boolean checkReseek() throws IOException {
740     if (this.heap == null && this.lastTop != null) {
741       resetScannerStack(this.lastTop);
742       if (this.heap.peek() == null
743           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
744         LOG.debug("Storescanner.peek() is changed where before = "
745             + this.lastTop.toString() + ",and after = " + this.heap.peek());
746         this.lastTop = null;
747         return true;
748       }
749       this.lastTop = null; // gone!
750     }
751     // else dont need to reseek
752     return false;
753   }
754 
755   protected void resetScannerStack(Cell lastTopKey) throws IOException {
756     if (heap != null) {
757       throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
758     }
759 
760     /* When we have the scan object, should we not pass it to getScanners()
761      * to get a limited set of scanners? We did so in the constructor and we
762      * could have done it now by storing the scan object from the constructor */
763     List<KeyValueScanner> scanners = getScannersNoCompaction();
764 
765     // Seek all scanners to the initial key
766     seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled);
767 
768     // Combine all seeked scanners with a heap
769     resetKVHeap(scanners, store.getComparator());
770 
771     // Reset the state of the Query Matcher and set to top row.
772     // Only reset and call setRow if the row changes; avoids confusing the
773     // query matcher if scanning intra-row.
774     Cell cell = heap.peek();
775     if (cell == null) {
776       cell = lastTopKey;
777     }
778     if ((matcher.curCell == null) || !CellUtil.matchingRows(cell, matcher.curCell)) {
779       this.countPerRow = 0;
780       matcher.reset();
781       matcher.setToNewRow(cell);
782     }
783   }
784 
785   /**
786    * Check whether scan as expected order
787    * @param prevKV
788    * @param kv
789    * @param comparator
790    * @throws IOException
791    */
792   protected void checkScanOrder(Cell prevKV, Cell kv,
793       CellComparator comparator) throws IOException {
794     // Check that the heap gives us KVs in an increasing order.
795     assert prevKV == null || comparator == null
796         || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
797         + " followed by a " + "smaller key " + kv + " in cf " + store;
798   }
799 
800   protected boolean seekToNextRow(Cell kv) throws IOException {
801     return reseek(KeyValueUtil.createLastOnRow(kv));
802   }
803 
804   /**
805    * Do a reseek in a normal StoreScanner(scan forward)
806    * @param kv
807    * @return true if scanner has values left, false if end of scanner
808    * @throws IOException
809    */
810   protected boolean seekAsDirection(Cell kv)
811       throws IOException {
812     return reseek(kv);
813   }
814 
815   @Override
816   public boolean reseek(Cell kv) throws IOException {
817     lock.lock();
818     try {
819     //Heap will not be null, if this is called from next() which.
820     //If called from RegionScanner.reseek(...) make sure the scanner
821     //stack is reset if needed.
822     checkReseek();
823     if (explicitColumnQuery && lazySeekEnabledGlobally) {
824       return heap.requestSeek(kv, true, useRowColBloom);
825     }
826     return heap.reseek(kv);
827     } finally {
828       lock.unlock();
829     }
830   }
831 
832   @Override
833   public long getSequenceID() {
834     return 0;
835   }
836 
837   /**
838    * Seek storefiles in parallel to optimize IO latency as much as possible
839    * @param scanners the list {@link KeyValueScanner}s to be read from
840    * @param kv the KeyValue on which the operation is being requested
841    * @throws IOException
842    */
843   private void parallelSeek(final List<? extends KeyValueScanner>
844       scanners, final Cell kv) throws IOException {
845     if (scanners.isEmpty()) return;
846     int storeFileScannerCount = scanners.size();
847     CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
848     List<ParallelSeekHandler> handlers = 
849         new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
850     for (KeyValueScanner scanner : scanners) {
851       if (scanner instanceof StoreFileScanner) {
852         ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
853           this.readPt, latch);
854         executor.submit(seekHandler);
855         handlers.add(seekHandler);
856       } else {
857         scanner.seek(kv);
858         latch.countDown();
859       }
860     }
861 
862     try {
863       latch.await();
864     } catch (InterruptedException ie) {
865       throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
866     }
867 
868     for (ParallelSeekHandler handler : handlers) {
869       if (handler.getErr() != null) {
870         throw new IOException(handler.getErr());
871       }
872     }
873   }
874 
875   /**
876    * Used in testing.
877    * @return all scanners in no particular order
878    */
879   List<KeyValueScanner> getAllScannersForTesting() {
880     List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
881     KeyValueScanner current = heap.getCurrentForTesting();
882     if (current != null)
883       allScanners.add(current);
884     for (KeyValueScanner scanner : heap.getHeap())
885       allScanners.add(scanner);
886     return allScanners;
887   }
888 
889   static void enableLazySeekGlobally(boolean enable) {
890     lazySeekEnabledGlobally = enable;
891   }
892 
893   /**
894    * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
895    */
896   public long getEstimatedNumberOfKvsScanned() {
897     return this.kvsScanned;
898   }
899 
900   @Override
901   public Cell getNextIndexedKey() {
902     return this.heap.getNextIndexedKey();
903   }
904 }
905