View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.HashSet;
26  import java.util.List;
27  import java.util.NavigableSet;
28  import java.util.Set;
29  import java.util.concurrent.CountDownLatch;
30  import java.util.concurrent.locks.ReentrantLock;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.Cell;
36  import org.apache.hadoop.hbase.CellComparator;
37  import org.apache.hadoop.hbase.CellUtil;
38  import org.apache.hadoop.hbase.DoNotRetryIOException;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.KeyValue;
41  import org.apache.hadoop.hbase.classification.InterfaceAudience;
42  import org.apache.hadoop.hbase.client.IsolationLevel;
43  import org.apache.hadoop.hbase.client.Scan;
44  import org.apache.hadoop.hbase.executor.ExecutorService;
45  import org.apache.hadoop.hbase.filter.Filter;
46  import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
47  import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
48  import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
49  import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
50  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51  
52  /**
53   * Scanner scans both the memstore and the Store. Coalesce KeyValue stream
54   * into List<KeyValue> for a single row.
55   */
56  @InterfaceAudience.Private
57  public class StoreScanner extends NonReversedNonLazyKeyValueScanner
58      implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  private static final Log LOG = LogFactory.getLog(StoreScanner.class);
  // The store being scanned; null when built through the store-less (test) constructors.
  protected Store store;
  // Classifies each cell (include / skip / seek) while next() walks the heap.
  protected ScanQueryMatcher matcher;
  // Merged view over the current scanners. Set to null by close() and updateReaders();
  // rebuilt lazily by checkReseek().
  protected KeyValueHeap heap;
  // Passed through to Store#getScanners.
  protected boolean cacheBlocks;

  // Cells matched so far in the current row; compared against storeLimit + storeOffset in next().
  protected int countPerRow = 0;
  // Max cells per row taken from this store (Scan#getMaxResultsPerColumnFamily); -1 = no cap.
  protected int storeLimit = -1;
  // Leading cells per row to skip (Scan#getRowOffsetPerColumnFamily).
  protected int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107).
  // Doesn't need to be volatile: in this file it is only read and written while holding
  // {@link #lock} (see close(boolean) and updateReaders()).
  protected boolean closing = false;
  protected final boolean isGet;
  // True when the caller asked for specific columns (columns non-empty).
  protected final boolean explicitColumnQuery;
  // Whether seeks should consult row-column Bloom filters (see the internal constructor).
  protected final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  protected boolean isParallelSeekEnabled = false;
  // Region server executor; assigned only when isParallelSeekEnabled is turned on
  // (presumably consumed by parallelSeek — confirm).
  protected ExecutorService executor;
  protected final Scan scan;
  protected final NavigableSet<byte[]> columns;
  // now - ttl: TTL cut-off handed to the matcher and used to exclude store files by age.
  protected final long oldestUnexpiredTS;
  // Time captured once at construction.
  protected final long now;
  protected final int minVersions;
  // Limit enforced with RowTooBigException in seekScanners() and next().
  protected final long maxRowSize;
  // How many cells next() scans between time-limit checks; always > 0.
  protected final long cellsPerHeartbeatCheck;

  // Collects all the KVHeap that are eagerly getting closed during the
  // course of a scan
  protected Set<KeyValueHeap> heapsForDelayedClose = new HashSet<KeyValueHeap>();

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
   * KVs skipped via seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  // Previous cell taken from the heap; used for order checking and for counting kvsScanned.
  private Cell prevCell = null;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
      "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  protected static boolean lazySeekEnabledGlobally =
      LAZY_SEEK_ENABLED_BY_DEFAULT;

  /**
   * The number of cells scanned in between timeout checks. Specifying a larger value means that
   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
   * timeout checks.
   */
  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
      "hbase.cells.scanned.per.heartbeat.check";

  /**
   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
   */
  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

  // If heap == null and lastTop != null, the scanner stack must be rebuilt and reseeked to
  // this key (done by checkReseek()).
  protected Cell lastTop = null;

  // Whether to use pread for this scan: config "hbase.storescanner.use.pread", defaulting to
  // Scan#isSmall(). Gets always use pread (see getScannersNoCompaction()).
  private boolean scanUsePread = false;
  // Guards the scanner's mutable state; taken by peek/seek/reseek/next/close and updateReaders().
  protected ReentrantLock lock = new ReentrantLock();
  
  // Read point passed to Store#getScanners.
  protected final long readPt;
129 
  // Used by the injection framework to test the race between StoreScanner construction and
  // compaction.
  // NOTE(review): these constants are not referenced in the visible part of this file; the
  // per-constant notes below are inferred from the names only — confirm against the tests.
  enum StoreScannerCompactionRace {
    // presumably: injection point before the scanners are seeked
    BEFORE_SEEK,
    // presumably: injection point after the scanners are seeked
    AFTER_SEEK,
    // presumably: point at which the racing compaction has completed
    COMPACT_COMPLETE
  }
136   
137   /** An internal constructor. */
138   protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
139       final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt) {
140     this.readPt = readPt;
141     this.store = store;
142     this.cacheBlocks = cacheBlocks;
143     isGet = scan.isGetScan();
144     int numCol = columns == null ? 0 : columns.size();
145     explicitColumnQuery = numCol > 0;
146     this.scan = scan;
147     this.columns = columns;
148     this.now = EnvironmentEdgeManager.currentTime();
149     this.oldestUnexpiredTS = now - ttl;
150     this.minVersions = minVersions;
151 
152     if (store != null && ((HStore)store).getHRegion() != null
153         && ((HStore)store).getHRegion().getBaseConf() != null) {
154       Configuration conf = ((HStore) store).getHRegion().getBaseConf();
155       this.maxRowSize =
156           conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
157       this.scanUsePread = conf.getBoolean("hbase.storescanner.use.pread", scan.isSmall());
158 
159       long tmpCellsPerTimeoutCheck =
160           conf.getLong(HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK,
161             DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK);
162       this.cellsPerHeartbeatCheck =
163           tmpCellsPerTimeoutCheck > 0 ? tmpCellsPerTimeoutCheck
164               : DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
165     } else {
166       this.maxRowSize = HConstants.TABLE_MAX_ROWSIZE_DEFAULT;
167       this.scanUsePread = scan.isSmall();
168       this.cellsPerHeartbeatCheck = DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
169     }
170 
171     // We look up row-column Bloom filters for multi-column queries as part of
172     // the seek operation. However, we also look the row-column Bloom filter
173     // for multi-row (non-"get") scans because this is not done in
174     // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
175     useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
176 
177     // The parallel-seeking is on :
178     // 1) the config value is *true*
179     // 2) store has more than one store file
180     if (store != null && ((HStore)store).getHRegion() != null
181         && store.getStorefilesCount() > 1) {
182       RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
183       if (rsService == null || !rsService.getConfiguration().getBoolean(
184             STORESCANNER_PARALLEL_SEEK_ENABLE, false)) return;
185       isParallelSeekEnabled = true;
186       executor = rsService.getExecutorService();
187     }
188   }
189 
190   /**
191    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
192    * are not in a compaction.
193    *
194    * @param store who we scan
195    * @param scan the spec
196    * @param columns which columns we are scanning
197    * @throws IOException
198    */
199   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
200       long readPt)
201                               throws IOException {
202     this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
203         scanInfo.getMinVersions(), readPt);
204     if (columns != null && scan.isRaw()) {
205       throw new DoNotRetryIOException(
206           "Cannot specify any column for a raw scan");
207     }
208     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
209         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
210         oldestUnexpiredTS, now, store.getCoprocessorHost());
211 
212     this.store.addChangedReaderObserver(this);
213 
214     // Pass columns to try to filter out unnecessary StoreFiles.
215     List<KeyValueScanner> scanners = getScannersNoCompaction();
216 
217     // Seek all scanners to the start of the Row (or if the exact matching row
218     // key does not exist, then to the start of the next matching Row).
219     // Always check bloom filter to optimize the top row seek for delete
220     // family marker.
221     seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
222         && lazySeekEnabledGlobally, isParallelSeekEnabled);
223 
224     // set storeLimit
225     this.storeLimit = scan.getMaxResultsPerColumnFamily();
226 
227     // set rowOffset
228     this.storeOffset = scan.getRowOffsetPerColumnFamily();
229 
230     // Combine all seeked scanners with a heap
231     resetKVHeap(scanners, store.getComparator());
232   }
233 
234   /**
235    * Used for compactions.<p>
236    *
237    * Opens a scanner across specified StoreFiles.
238    * @param store who we scan
239    * @param scan the spec
240    * @param scanners ancillary scanners
241    * @param smallestReadPoint the readPoint that we should use for tracking
242    *          versions
243    */
244   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
245       List<? extends KeyValueScanner> scanners, ScanType scanType,
246       long smallestReadPoint, long earliestPutTs) throws IOException {
247     this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
248   }
249 
250   /**
251    * Used for compactions that drop deletes from a limited range of rows.<p>
252    *
253    * Opens a scanner across specified StoreFiles.
254    * @param store who we scan
255    * @param scan the spec
256    * @param scanners ancillary scanners
257    * @param smallestReadPoint the readPoint that we should use for tracking versions
258    * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
259    * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
260    */
261   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
262       List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
263       byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
264     this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
265         earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
266   }
267 
268   private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
269       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
270       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
271     this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(),
272         ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
273     if (dropDeletesFromRow == null) {
274       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
275           earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
276     } else {
277       matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
278           oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
279     }
280 
281     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
282     scanners = selectScannersFrom(scanners);
283 
284     // Seek all scanners to the initial key
285     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
286 
287     // Combine all seeked scanners with a heap
288     resetKVHeap(scanners, store.getComparator());
289   }
290 
291   /** Constructor for testing. */
292   StoreScanner(final Scan scan, ScanInfo scanInfo,
293       ScanType scanType, final NavigableSet<byte[]> columns,
294       final List<KeyValueScanner> scanners) throws IOException {
295     this(scan, scanInfo, scanType, columns, scanners,
296         HConstants.LATEST_TIMESTAMP,
297         // 0 is passed as readpoint because the test bypasses Store
298         0);
299   }
300 
301   // Constructor for testing.
302   StoreScanner(final Scan scan, ScanInfo scanInfo,
303     ScanType scanType, final NavigableSet<byte[]> columns,
304     final List<KeyValueScanner> scanners, long earliestPutTs)
305         throws IOException {
306     this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
307       // 0 is passed as readpoint because the test bypasses Store
308       0);
309   }
310 
311   public StoreScanner(final Scan scan, ScanInfo scanInfo,
312       ScanType scanType, final NavigableSet<byte[]> columns,
313       final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
314           throws IOException {
315     this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
316         scanInfo.getMinVersions(), readPt);
317     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
318         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
319 
320     // In unit tests, the store could be null
321     if (this.store != null) {
322       this.store.addChangedReaderObserver(this);
323     }
324     // Seek all scanners to the initial key
325     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
326     resetKVHeap(scanners, scanInfo.getComparator());
327   }
328 
329   /**
330    * Get a filtered list of scanners. Assumes we are not in a compaction.
331    * @return list of scanners to seek
332    */
333   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
334     final boolean isCompaction = false;
335     boolean usePread = isGet || scanUsePread;
336     return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
337         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
338   }
339 
340   /**
341    * Seek the specified scanners with the given key
342    * @param scanners
343    * @param seekKey
344    * @param isLazy true if using lazy seek
345    * @param isParallelSeek true if using parallel seek
346    * @throws IOException
347    */
348   protected void seekScanners(List<? extends KeyValueScanner> scanners,
349       Cell seekKey, boolean isLazy, boolean isParallelSeek)
350       throws IOException {
351     // Seek all scanners to the start of the Row (or if the exact matching row
352     // key does not exist, then to the start of the next matching Row).
353     // Always check bloom filter to optimize the top row seek for delete
354     // family marker.
355     if (isLazy) {
356       for (KeyValueScanner scanner : scanners) {
357         scanner.requestSeek(seekKey, false, true);
358       }
359     } else {
360       if (!isParallelSeek) {
361         long totalScannersSoughtBytes = 0;
362         for (KeyValueScanner scanner : scanners) {
363           if (totalScannersSoughtBytes >= maxRowSize) {
364             throw new RowTooBigException("Max row size allowed: " + maxRowSize
365               + ", but row is bigger than that");
366           }
367           scanner.seek(seekKey);
368           Cell c = scanner.peek();
369           if (c != null) {
370             totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
371           }
372         }
373       } else {
374         parallelSeek(scanners, seekKey);
375       }
376     }
377   }
378 
379   protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
380       CellComparator comparator) throws IOException {
381     // Combine all seeked scanners with a heap
382     heap = new KeyValueHeap(scanners, comparator);
383   }
384 
385   /**
386    * Filters the given list of scanners using Bloom filter, time range, and
387    * TTL.
388    */
389   protected List<KeyValueScanner> selectScannersFrom(
390       final List<? extends KeyValueScanner> allScanners) {
391     boolean memOnly;
392     boolean filesOnly;
393     if (scan instanceof InternalScan) {
394       InternalScan iscan = (InternalScan)scan;
395       memOnly = iscan.isCheckOnlyMemStore();
396       filesOnly = iscan.isCheckOnlyStoreFiles();
397     } else {
398       memOnly = false;
399       filesOnly = false;
400     }
401 
402     List<KeyValueScanner> scanners =
403         new ArrayList<KeyValueScanner>(allScanners.size());
404 
405     // We can only exclude store files based on TTL if minVersions is set to 0.
406     // Otherwise, we might have to return KVs that have technically expired.
407     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
408         Long.MIN_VALUE;
409 
410     // include only those scan files which pass all filters
411     for (KeyValueScanner kvs : allScanners) {
412       boolean isFile = kvs.isFileScanner();
413       if ((!isFile && filesOnly) || (isFile && memOnly)) {
414         continue;
415       }
416 
417       if (kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)) {
418         scanners.add(kvs);
419       }
420     }
421     return scanners;
422   }
423 
424   @Override
425   public Cell peek() {
426     lock.lock();
427     try {
428     if (this.heap == null) {
429       return this.lastTop;
430     }
431     return this.heap.peek();
432     } finally {
433       lock.unlock();
434     }
435   }
436 
437   @Override
438   public KeyValue next() {
439     // throw runtime exception perhaps?
440     throw new RuntimeException("Never call StoreScanner.next()");
441   }
442 
443   @Override
444   public void close() {
445     close(true);
446   }
447 
448   private void close(boolean withHeapClose){
449     lock.lock();
450     try {
451       if (this.closing) {
452         return;
453       }
454       if (withHeapClose) this.closing = true;
455       // under test, we dont have a this.store
456       if (this.store != null) this.store.deleteChangedReaderObserver(this);
457       if (withHeapClose) {
458         for (KeyValueHeap h : this.heapsForDelayedClose) {
459           h.close();
460         }
461         this.heapsForDelayedClose.clear();
462         if (this.heap != null) {
463           this.heap.close();
464           this.heap = null; // CLOSED!
465         }
466       } else {
467         if (this.heap != null) {
468           this.heapsForDelayedClose.add(this.heap);
469           this.heap = null;
470         }
471       }
472       this.lastTop = null; // If both are null, we are closed.
473     } finally {
474       lock.unlock();
475     }
476   }
477 
478   @Override
479   public boolean seek(Cell key) throws IOException {
480     lock.lock();
481     try {
482     // reset matcher state, in case that underlying store changed
483     checkReseek();
484     return this.heap.seek(key);
485     } finally {
486       lock.unlock();
487     }
488   }
489 
490   @Override
491   public boolean next(List<Cell> outResult) throws IOException {
492     return next(outResult, NoLimitScannerContext.getInstance());
493   }
494 
  /**
   * Get the next row of values from this Store.
   *
   * <p>Appends the cells of the current row that the query matcher includes to
   * {@code outResult}. It stops early when {@code scannerContext} reports a batch, size or time
   * limit in {@link LimitScope#BETWEEN_CELLS}. If {@link #checkReseek()} reports that the top of
   * the heap moved after a reader change, it returns MORE_VALUES without reading any cell.
   *
   * @param outResult list the included cells are appended to
   * @param scannerContext tracks limits and progress for this call; must not be null
   * @return true if there are more rows, false if scanner is done. Concretely, the value of
   *         {@code scannerContext.setScannerState(state).hasMoreValues()} for the state this call
   *         ends in (MORE_VALUES, NO_MORE_VALUES or TIME_LIMIT_REACHED).
   * @throws IllegalArgumentException if {@code scannerContext} is null
   * @throws RowTooBigException if the cells added for one row exceed {@code maxRowSize}
   *         estimated serialized bytes
   * @throws IOException if reseeking or seeking the underlying scanners fails
   */
  @Override
  public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
    lock.lock();

    try {
    if (scannerContext == null) {
      throw new IllegalArgumentException("Scanner context cannot be null");
    }
    // Rebuild the scanner stack if updateReaders() dropped it. If the new top of heap is on a
    // different row (or the heap is empty), return MORE_VALUES without reading any cell.
    if (checkReseek()) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // if the heap was left null, then the scanners had previously run out anyways, close and
    // return.
    if (this.heap == null) {
      // A partial close has already happened (the heap is null). Calling close(false) again only
      // deregisters from the store and clears lastTop.
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    Cell cell = this.heap.peek();
    if (cell == null) {
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    // only call setRow if the row changes; avoids confusing the query matcher
    // if scanning intra-row

    // If no limits exist in the scope LimitScope.BETWEEN_CELLS then we are sure we are changing
    // rows. Else it is possible we are still traversing the same row so we must perform the row
    // comparison.
    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null ||
        !CellUtil.matchingRow(cell, matcher.curCell)) {
      this.countPerRow = 0;
      matcher.setToNewRow(cell);
    }

    // Clear progress away unless invoker has indicated it should be kept.
    if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();

    // Only do a sanity-check if store and comparator are available.
    CellComparator comparator =
        store != null ? store.getComparator() : null;

    // count: cells added to outResult by this call; totalBytesRead: their estimated
    // serialized size, checked against maxRowSize.
    int count = 0;
    long totalBytesRead = 0;

    LOOP: do {
      // Update and check the time limit based on the configured value of cellsPerHeartbeatCheck:
      // the clock is only consulted once every cellsPerHeartbeatCheck scanned cells.
      if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
        scannerContext.updateTimeProgress();
        if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
          return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
        }
      }

      if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
      checkScanOrder(prevCell, cell, comparator);
      prevCell = cell;

      // optimize() may replace a SEEK_NEXT_COL/SEEK_NEXT_ROW (or the INCLUDE_AND_ variant) with a
      // plain SKIP/INCLUDE based on the next indexed key (HBASE-13109).
      ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
      qcode = optimize(qcode, cell);
      switch(qcode) {
        case INCLUDE:
        case INCLUDE_AND_SEEK_NEXT_ROW:
        case INCLUDE_AND_SEEK_NEXT_COL:

          // Let the filter replace the cell before it is counted and returned.
          // NOTE(review): the transformed cell is also what the seeks below are computed from
          // (moreRowsMayExistAfter / seekToNextRow / getKeyForNextColumn).
          Filter f = matcher.getFilter();
          if (f != null) {
            cell = f.transformCell(cell);
          }

          // storeLimit/storeOffset cap and offset the cells taken per row from this store;
          // storeLimit == -1 means no cap.
          this.countPerRow++;
          if (storeLimit > -1 &&
              this.countPerRow > (storeLimit + storeOffset)) {
            // do what SEEK_NEXT_ROW does.
            if (!matcher.moreRowsMayExistAfter(cell)) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            seekToNextRow(cell);
            break LOOP;
          }

          // add to results only if we have skipped #storeOffset kvs
          // also update metric accordingly
          if (this.countPerRow > storeOffset) {
            outResult.add(cell);

            // Update local tracking information
            count++;
            totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);

            // Update the progress of the scanner context
            scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
            scannerContext.incrementBatchProgress(1);

            if (totalBytesRead > maxRowSize) {
              throw new RowTooBigException("Max row size allowed: " + maxRowSize
              + ", but the row is bigger than that.");
            }
          }

          // Move past the included cell as the match code requests.
          if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
            if (!matcher.moreRowsMayExistAfter(cell)) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            seekToNextRow(cell);
          } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
            seekAsDirection(matcher.getKeyForNextColumn(cell));
          } else {
            this.heap.next();
          }

          // Stop between cells once the batch or size limit is reached.
          if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          continue;

        case DONE:
          // Return what has been collected; report that more values may follow.
          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();

        case DONE_SCAN:
          close(false);// Do all cleanup except heap.close()
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();

        case SEEK_NEXT_ROW:
          // This is just a relatively simple end of scan fix, to short-cut end
          // us if there is an endKey in the scan.
          if (!matcher.moreRowsMayExistAfter(cell)) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          seekToNextRow(cell);
          break;

        case SEEK_NEXT_COL:
          seekAsDirection(matcher.getKeyForNextColumn(cell));
          break;

        case SKIP:
          this.heap.next();
          break;

        case SEEK_NEXT_USING_HINT:
          // Seek to the hint if the matcher (filter) supplies one; otherwise just advance.
          Cell nextKV = matcher.getNextKeyHint(cell);
          if (nextKV != null) {
            seekAsDirection(nextKV);
          } else {
            heap.next();
          }
          break;

        default:
          throw new RuntimeException("UNEXPECTED");
      }
    } while((cell = this.heap.peek()) != null);

    // Reached when the heap ran dry or the loop was left via `break LOOP`.
    if (count > 0) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // No more keys
    // NOTE(review): a `break LOOP` taken while count == 0 (e.g. storeLimit == 0, or a batch/size
    // limit already reached through kept progress) also ends up here and reports NO_MORE_VALUES.
    close(false);// Do all cleanup except heap.close()
    return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    } finally {
      lock.unlock();
    }
  }
673 
674   /*
675    * See if we should actually SEEK or rather just SKIP to the next Cell.
676    * (see HBASE-13109)
677    */
678   private ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
679     Cell nextIndexedKey = getNextIndexedKey();
680     if (nextIndexedKey == null || nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY ||
681         store == null) {
682       return qcode;
683     }
684     switch(qcode) {
685     case INCLUDE_AND_SEEK_NEXT_COL:
686     case SEEK_NEXT_COL:
687     {
688       if (matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
689         return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
690       }
691       break;
692     }
693     case INCLUDE_AND_SEEK_NEXT_ROW:
694     case SEEK_NEXT_ROW:
695     {
696       if (matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
697         return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
698       }
699       break;
700     }
701     default:
702       break;
703     }
704     return qcode;
705   }
706 
707   // Implementation of ChangedReadersObserver
708   @Override
709   public void updateReaders() throws IOException {
710     lock.lock();
711     try {
712     if (this.closing) return;
713 
714     // All public synchronized API calls will call 'checkReseek' which will cause
715     // the scanner stack to reseek if this.heap==null && this.lastTop != null.
716     // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
717     // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
718     // which is NOT what we want, not to mention could cause an NPE. So we early out here.
719     if (this.heap == null) return;
720 
721     // this could be null.
722     this.lastTop = this.peek();
723 
724     //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
725 
726     // close scanners to old obsolete Store files
727     this.heapsForDelayedClose.add(this.heap);// Don't close now. Delay it till StoreScanner#close
728     this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
729 
730     // Let the next() call handle re-creating and seeking
731     } finally {
732       lock.unlock();
733     }
734   }
735 
736   /**
737    * @return true if top of heap has changed (and KeyValueHeap has to try the
738    *         next KV)
739    * @throws IOException
740    */
741   protected boolean checkReseek() throws IOException {
742     if (this.heap == null && this.lastTop != null) {
743       resetScannerStack(this.lastTop);
744       if (this.heap.peek() == null
745           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
746         LOG.debug("Storescanner.peek() is changed where before = "
747             + this.lastTop.toString() + ",and after = " + this.heap.peek());
748         this.lastTop = null;
749         return true;
750       }
751       this.lastTop = null; // gone!
752     }
753     // else dont need to reseek
754     return false;
755   }
756 
757   protected void resetScannerStack(Cell lastTopKey) throws IOException {
758     if (heap != null) {
759       throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
760     }
761 
762     /* When we have the scan object, should we not pass it to getScanners()
763      * to get a limited set of scanners? We did so in the constructor and we
764      * could have done it now by storing the scan object from the constructor */
765     List<KeyValueScanner> scanners = getScannersNoCompaction();
766 
767     // Seek all scanners to the initial key
768     seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled);
769 
770     // Combine all seeked scanners with a heap
771     resetKVHeap(scanners, store.getComparator());
772 
773     // Reset the state of the Query Matcher and set to top row.
774     // Only reset and call setRow if the row changes; avoids confusing the
775     // query matcher if scanning intra-row.
776     Cell cell = heap.peek();
777     if (cell == null) {
778       cell = lastTopKey;
779     }
780     if ((matcher.curCell == null) || !CellUtil.matchingRows(cell, matcher.curCell)) {
781       this.countPerRow = 0;
782       matcher.reset();
783       matcher.setToNewRow(cell);
784     }
785   }
786 
787   /**
788    * Check whether scan as expected order
789    * @param prevKV
790    * @param kv
791    * @param comparator
792    * @throws IOException
793    */
794   protected void checkScanOrder(Cell prevKV, Cell kv,
795       CellComparator comparator) throws IOException {
796     // Check that the heap gives us KVs in an increasing order.
797     assert prevKV == null || comparator == null
798         || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
799         + " followed by a " + "smaller key " + kv + " in cf " + store;
800   }
801 
802   protected boolean seekToNextRow(Cell c) throws IOException {
803     return reseek(CellUtil.createLastOnRow(c));
804   }
805 
806   /**
807    * Do a reseek in a normal StoreScanner(scan forward)
808    * @param kv
809    * @return true if scanner has values left, false if end of scanner
810    * @throws IOException
811    */
812   protected boolean seekAsDirection(Cell kv)
813       throws IOException {
814     return reseek(kv);
815   }
816 
817   @Override
818   public boolean reseek(Cell kv) throws IOException {
819     lock.lock();
820     try {
821     //Heap will not be null, if this is called from next() which.
822     //If called from RegionScanner.reseek(...) make sure the scanner
823     //stack is reset if needed.
824     checkReseek();
825     if (explicitColumnQuery && lazySeekEnabledGlobally) {
826       return heap.requestSeek(kv, true, useRowColBloom);
827     }
828     return heap.reseek(kv);
829     } finally {
830       lock.unlock();
831     }
832   }
833 
834   @Override
835   public long getSequenceID() {
836     return 0;
837   }
838 
839   /**
840    * Seek storefiles in parallel to optimize IO latency as much as possible
841    * @param scanners the list {@link KeyValueScanner}s to be read from
842    * @param kv the KeyValue on which the operation is being requested
843    * @throws IOException
844    */
845   private void parallelSeek(final List<? extends KeyValueScanner>
846       scanners, final Cell kv) throws IOException {
847     if (scanners.isEmpty()) return;
848     int storeFileScannerCount = scanners.size();
849     CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
850     List<ParallelSeekHandler> handlers = 
851         new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
852     for (KeyValueScanner scanner : scanners) {
853       if (scanner instanceof StoreFileScanner) {
854         ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
855           this.readPt, latch);
856         executor.submit(seekHandler);
857         handlers.add(seekHandler);
858       } else {
859         scanner.seek(kv);
860         latch.countDown();
861       }
862     }
863 
864     try {
865       latch.await();
866     } catch (InterruptedException ie) {
867       throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
868     }
869 
870     for (ParallelSeekHandler handler : handlers) {
871       if (handler.getErr() != null) {
872         throw new IOException(handler.getErr());
873       }
874     }
875   }
876 
877   /**
878    * Used in testing.
879    * @return all scanners in no particular order
880    */
881   List<KeyValueScanner> getAllScannersForTesting() {
882     List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
883     KeyValueScanner current = heap.getCurrentForTesting();
884     if (current != null)
885       allScanners.add(current);
886     for (KeyValueScanner scanner : heap.getHeap())
887       allScanners.add(scanner);
888     return allScanners;
889   }
890 
891   static void enableLazySeekGlobally(boolean enable) {
892     lazySeekEnabledGlobally = enable;
893   }
894 
895   /**
896    * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
897    */
898   public long getEstimatedNumberOfKvsScanned() {
899     return this.kvsScanned;
900   }
901 
902   @Override
903   public Cell getNextIndexedKey() {
904     return this.heap.getNextIndexedKey();
905   }
906 
907   @Override
908   public void shipped() throws IOException {
909     lock.lock();
910     try {
911       for (KeyValueHeap h : this.heapsForDelayedClose) {
912         h.close();// There wont be further fetch of Cells from these scanners. Just close.
913       }
914       this.heapsForDelayedClose.clear();
915       if (this.heap != null) {
916         this.heap.shipped();
917       }
918     } finally {
919       lock.unlock();
920     }
921   }
922 }
923