View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.NavigableSet;
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.CellUtil;
35  import org.apache.hadoop.hbase.DoNotRetryIOException;
36  import org.apache.hadoop.hbase.HConstants;
37  import org.apache.hadoop.hbase.KeyValue;
38  import org.apache.hadoop.hbase.KeyValue.KVComparator;
39  import org.apache.hadoop.hbase.KeyValueUtil;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.client.IsolationLevel;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.executor.ExecutorService;
44  import org.apache.hadoop.hbase.filter.Filter;
45  import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
46  import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
47  import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
48  import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51  
52  /**
53   * Scanner scans both the memstore and the store files. Coalesces the Cell stream
54   * into a {@code List<Cell>} for a single row.
55   */
56  @InterfaceAudience.Private
57  public class StoreScanner extends NonReversedNonLazyKeyValueScanner
58      implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
59    static final Log LOG = LogFactory.getLog(StoreScanner.class);
      // The store being scanned; null when built through the test-only constructors.
60    protected Store store;
      // Classifies each cell handled by next(List, ScannerContext) into a MatchCode.
61    protected ScanQueryMatcher matcher;
      // Heap over the seeked scanners; set to null by close() and by updateReaders().
62    protected KeyValueHeap heap;
      // Forwarded to Store#getScanners whenever the store scanners are (re)opened.
63    protected boolean cacheBlocks;
64  
      // Number of cells matched so far in the current row; compared with storeLimit + storeOffset.
65    protected int countPerRow = 0;
      // Taken from Scan#getMaxResultsPerColumnFamily(); the limit only applies when > -1.
66    protected int storeLimit = -1;
      // Taken from Scan#getRowOffsetPerColumnFamily(); matched cells to skip before returning any.
67    protected int storeOffset = 0;
68  
69    // Used to indicate that the scanner has closed (see HBASE-1107)
70    // Doesn't need to be volatile: this class only reads and writes it while holding 'lock'
71    protected boolean closing = false;
      // Result of Scan#isGetScan() at construction time.
72    protected final boolean isGet;
      // True when at least one explicit column was requested.
73    protected final boolean explicitColumnQuery;
      // True for more than one requested column, or exactly one column on a non-get scan.
74    protected final boolean useRowColBloom;
75    /**
76     * A flag that enables StoreFileScanner parallel-seeking
77     */
78    protected boolean isParallelSeekEnabled = false;
      // Only assigned when parallel seeking is enabled (taken from the region server services).
79    protected ExecutorService executor;
80    protected final Scan scan;
81    protected final NavigableSet<byte[]> columns;
      // 'now' minus the TTL handed to the constructor.
82    protected final long oldestUnexpiredTS;
      // EnvironmentEdgeManager.currentTime() captured at construction.
83    protected final long now;
84    protected final int minVersions;
      // From HConstants.TABLE_MAX_ROWSIZE_KEY when a region configuration is reachable, else the default.
85    protected final long maxRowSize;
      // Number of scanned cells between two time-limit checks in next(List, ScannerContext).
86    protected final long cellsPerHeartbeatCheck;
87  
88    /**
89     * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
90     * KVs skipped via seeking to next row/column. TODO: estimate them?
91     */
92    private long kvsScanned = 0;
      // Cell handled by the previous loop iteration of next(List, ScannerContext).
93    private Cell prevCell = null;
94  
95    /** We don't ever expect to change this, the constant is just for clarity. */
96    static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
      /** Configuration key read from the region server configuration to enable parallel seeking. */
97    public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
98        "hbase.storescanner.parallel.seek.enable";
99  
100   /** Used during unit testing to ensure that lazy seek does save seek ops */
101   protected static boolean lazySeekEnabledGlobally =
102       LAZY_SEEK_ENABLED_BY_DEFAULT;
103 
104   /**
105    * The number of cells scanned in between timeout checks. Specifying a larger value means that
106    * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
107    * timeout checks.
108    */
109   public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
110       "hbase.cells.scanned.per.heartbeat.check";
111 
112   /**
113    * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
114    */
115   public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;
116 
117   // if heap == null and lastTop != null, you need to reseek given the key below
118   protected Cell lastTop = null;
119 
120   // A flag whether use pread for scan
121   private boolean scanUsePread = false;
      // Held by peek(), close(), seek(), next(List, ScannerContext) and updateReaders().
122   protected ReentrantLock lock = new ReentrantLock();
123   
      // Read point forwarded to Store#getScanners when the store scanners are opened.
124   private final long readPt;
125 
126   // used by the injection framework to test race between StoreScanner construction and compaction
127   enum StoreScannerCompactionRace {
128     BEFORE_SEEK,
129     AFTER_SEEK,
130     COMPACT_COMPLETE
131   }
132   
133   /** An internal constructor. */
134   protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
135       final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt) {
136     this.readPt = readPt;
137     this.store = store;
138     this.cacheBlocks = cacheBlocks;
139     isGet = scan.isGetScan();
140     int numCol = columns == null ? 0 : columns.size();
141     explicitColumnQuery = numCol > 0;
142     this.scan = scan;
143     this.columns = columns;
144     this.now = EnvironmentEdgeManager.currentTime();
145     this.oldestUnexpiredTS = now - ttl;
146     this.minVersions = minVersions;
147 
148     if (store != null && ((HStore)store).getHRegion() != null
149         && ((HStore)store).getHRegion().getBaseConf() != null) {
150       Configuration conf = ((HStore) store).getHRegion().getBaseConf();
151       this.maxRowSize =
152           conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
153       this.scanUsePread = conf.getBoolean("hbase.storescanner.use.pread", scan.isSmall());
154 
155       long tmpCellsPerTimeoutCheck =
156           conf.getLong(HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK,
157             DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK);
158       this.cellsPerHeartbeatCheck =
159           tmpCellsPerTimeoutCheck > 0 ? tmpCellsPerTimeoutCheck
160               : DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
161     } else {
162       this.maxRowSize = HConstants.TABLE_MAX_ROWSIZE_DEFAULT;
163       this.scanUsePread = scan.isSmall();
164       this.cellsPerHeartbeatCheck = DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
165     }
166 
167     // We look up row-column Bloom filters for multi-column queries as part of
168     // the seek operation. However, we also look the row-column Bloom filter
169     // for multi-row (non-"get") scans because this is not done in
170     // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
171     useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
172 
173     // The parallel-seeking is on :
174     // 1) the config value is *true*
175     // 2) store has more than one store file
176     if (store != null && ((HStore)store).getHRegion() != null
177         && store.getStorefilesCount() > 1) {
178       RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
179       if (rsService == null || !rsService.getConfiguration().getBoolean(
180             STORESCANNER_PARALLEL_SEEK_ENABLE, false)) return;
181       isParallelSeekEnabled = true;
182       executor = rsService.getExecutorService();
183     }
184   }
185 
186   /**
187    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
188    * are not in a compaction.
189    *
190    * @param store who we scan
191    * @param scan the spec
192    * @param columns which columns we are scanning
193    * @throws IOException
194    */
195   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
196       long readPt)
197                               throws IOException {
198     this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
199         scanInfo.getMinVersions(), readPt);
200     if (columns != null && scan.isRaw()) {
201       throw new DoNotRetryIOException(
202           "Cannot specify any column for a raw scan");
203     }
204     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
205         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
206         oldestUnexpiredTS, now, store.getCoprocessorHost());
207 
208     this.store.addChangedReaderObserver(this);
209 
210     try {
211       // Pass columns to try to filter out unnecessary StoreFiles.
212       List<KeyValueScanner> scanners = getScannersNoCompaction();
213 
214       // Seek all scanners to the start of the Row (or if the exact matching row
215       // key does not exist, then to the start of the next matching Row).
216       // Always check bloom filter to optimize the top row seek for delete
217       // family marker.
218       seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
219         isParallelSeekEnabled);
220 
221       // set storeLimit
222       this.storeLimit = scan.getMaxResultsPerColumnFamily();
223 
224       // set rowOffset
225       this.storeOffset = scan.getRowOffsetPerColumnFamily();
226 
227       // Combine all seeked scanners with a heap
228       resetKVHeap(scanners, store.getComparator());
229     } catch (IOException e) {
230       // remove us from the HStore#changedReaderObservers here or we'll have no chance to
231       // and might cause memory leak
232       this.store.deleteChangedReaderObserver(this);
233       throw e;
234     }
235   }
236 
237   /**
238    * Used for compactions.<p>
239    *
240    * Opens a scanner across specified StoreFiles.
241    * @param store who we scan
       * @param scanInfo supplies the TTL and minimum versions and is handed to the query matcher
242    * @param scan the spec
243    * @param scanners ancillary scanners
       * @param scanType scan type handed to the query matcher
244    * @param smallestReadPoint the readPoint that we should use for tracking
245    *          versions
       * @param earliestPutTs handed to the query matcher (NOTE(review): presumably the earliest
       *          put timestamp among the compacted files -- confirm against the caller)
       * @throws IOException if seeking the scanners fails
246    */
247   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
248       List<? extends KeyValueScanner> scanners, ScanType scanType,
249       long smallestReadPoint, long earliestPutTs) throws IOException {
        // No drop-deletes row range: both bounds are passed as null.
250     this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
251   }
252 
253   /**
254    * Used for compactions that drop deletes from a limited range of rows.<p>
255    *
256    * Opens a scanner across specified StoreFiles.
257    * @param store who we scan
       * @param scanInfo supplies the TTL and minimum versions and is handed to the query matcher
258    * @param scan the spec
259    * @param scanners ancillary scanners
260    * @param smallestReadPoint the readPoint that we should use for tracking versions
       * @param earliestPutTs handed to the query matcher (NOTE(review): presumably the earliest
       *          put timestamp among the compacted files -- confirm against the caller)
261    * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
262    * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
       * @throws IOException if seeking the scanners fails
263    */
264   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
265       List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
266       byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
        // The scan type is fixed to COMPACT_RETAIN_DELETES for this variant.
267     this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
268         earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
269   }
270 
271   private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
272       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
273       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
274     this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(),
275         ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
276     if (dropDeletesFromRow == null) {
277       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
278           earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
279     } else {
280       matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
281           oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
282     }
283 
284     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
285     scanners = selectScannersFrom(scanners);
286 
287     // Seek all scanners to the initial key
288     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
289 
290     // Combine all seeked scanners with a heap
291     resetKVHeap(scanners, store.getComparator());
292   }
293 
294   /**
       * Constructor for testing. Scans the supplied scanners directly with no Store attached,
       * using HConstants.LATEST_TIMESTAMP as the earliest put timestamp and 0 as the read point.
       */
295   StoreScanner(final Scan scan, ScanInfo scanInfo,
296       ScanType scanType, final NavigableSet<byte[]> columns,
297       final List<KeyValueScanner> scanners) throws IOException {
298     this(scan, scanInfo, scanType, columns, scanners,
299         HConstants.LATEST_TIMESTAMP,
300         // 0 is passed as readpoint because the test bypasses Store
301         0);
302   }
303 
304   // Constructor for testing.
      // Same as the test constructor without earliestPutTs, but lets the caller choose the
      // earliest put timestamp handed to the query matcher.
305   StoreScanner(final Scan scan, ScanInfo scanInfo,
306     ScanType scanType, final NavigableSet<byte[]> columns,
307     final List<KeyValueScanner> scanners, long earliestPutTs)
308         throws IOException {
309     this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
310       // 0 is passed as readpoint because the test bypasses Store
311       0);
312   }
313   
      // Shared implementation of the test-only constructors: builds the matcher without a
      // coprocessor host, seeks the supplied scanners eagerly and merges them using the
      // comparator from scanInfo (there is no Store to ask for one).
314   private StoreScanner(final Scan scan, ScanInfo scanInfo,
315       ScanType scanType, final NavigableSet<byte[]> columns,
316       final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
317           throws IOException {
318     this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
319         scanInfo.getMinVersions(), readPt);
320     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
321         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
322 
323     // In unit tests, the store could be null
        // NOTE(review): the this(...) call above passes null as the store, so this check
        // looks always false on this path -- confirm before relying on the registration.
324     if (this.store != null) {
325       this.store.addChangedReaderObserver(this);
326     }
327     // Seek all scanners to the initial key
328     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
329     resetKVHeap(scanners, scanInfo.getComparator());
330   }
331 
332   /**
333    * Get a filtered list of scanners. Assumes we are not in a compaction.
334    * @return list of scanners to seek
335    */
336   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
337     final boolean isCompaction = false;
338     boolean usePread = isGet || scanUsePread;
339     return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
340         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
341   }
342 
343   /**
344    * Seek the specified scanners with the given key
345    * @param scanners
346    * @param seekKey
347    * @param isLazy true if using lazy seek
348    * @param isParallelSeek true if using parallel seek
349    * @throws IOException
350    */
351   protected void seekScanners(List<? extends KeyValueScanner> scanners,
352       Cell seekKey, boolean isLazy, boolean isParallelSeek)
353       throws IOException {
354     // Seek all scanners to the start of the Row (or if the exact matching row
355     // key does not exist, then to the start of the next matching Row).
356     // Always check bloom filter to optimize the top row seek for delete
357     // family marker.
358     if (isLazy) {
359       for (KeyValueScanner scanner : scanners) {
360         scanner.requestSeek(seekKey, false, true);
361       }
362     } else {
363       if (!isParallelSeek) {
364         long totalScannersSoughtBytes = 0;
365         for (KeyValueScanner scanner : scanners) {
366           if (totalScannersSoughtBytes >= maxRowSize) {
367             throw new RowTooBigException("Max row size allowed: " + maxRowSize
368               + ", but row is bigger than that");
369           }
370           scanner.seek(seekKey);
371           Cell c = scanner.peek();
372           if (c != null) {
373             totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
374           }
375         }
376       } else {
377         parallelSeek(scanners, seekKey);
378       }
379     }
380   }
381 
382   protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
383       KVComparator comparator) throws IOException {
384     // Combine all seeked scanners with a heap
385     heap = new KeyValueHeap(scanners, comparator);
386   }
387 
388   /**
389    * Filters the given list of scanners using Bloom filter, time range, and
390    * TTL.
391    */
392   protected List<KeyValueScanner> selectScannersFrom(
393       final List<? extends KeyValueScanner> allScanners) {
394     boolean memOnly;
395     boolean filesOnly;
396     if (scan instanceof InternalScan) {
397       InternalScan iscan = (InternalScan)scan;
398       memOnly = iscan.isCheckOnlyMemStore();
399       filesOnly = iscan.isCheckOnlyStoreFiles();
400     } else {
401       memOnly = false;
402       filesOnly = false;
403     }
404 
405     List<KeyValueScanner> scanners =
406         new ArrayList<KeyValueScanner>(allScanners.size());
407 
408     // We can only exclude store files based on TTL if minVersions is set to 0.
409     // Otherwise, we might have to return KVs that have technically expired.
410     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
411         Long.MIN_VALUE;
412 
413     // include only those scan files which pass all filters
414     for (KeyValueScanner kvs : allScanners) {
415       boolean isFile = kvs.isFileScanner();
416       if ((!isFile && filesOnly) || (isFile && memOnly)) {
417         continue;
418       }
419 
420       if (kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)) {
421         scanners.add(kvs);
422       }
423     }
424     return scanners;
425   }
426 
427   @Override
428   public Cell peek() {
429     lock.lock();
430     try {
431     if (this.heap == null) {
432       return this.lastTop;
433     }
434     return this.heap.peek();
435     } finally {
436       lock.unlock();
437     }
438   }
439 
440   @Override
441   public KeyValue next() {
442     // throw runtime exception perhaps?
443     throw new RuntimeException("Never call StoreScanner.next()");
444   }
445 
446   @Override
447   public void close() {
448     lock.lock();
449     try {
450     if (this.closing) return;
451     this.closing = true;
452     // under test, we dont have a this.store
453     if (this.store != null)
454       this.store.deleteChangedReaderObserver(this);
455     if (this.heap != null)
456       this.heap.close();
457     this.heap = null; // CLOSED!
458     this.lastTop = null; // If both are null, we are closed.
459     } finally {
460       lock.unlock();
461     }
462   }
463 
464   @Override
465   public boolean seek(Cell key) throws IOException {
466     lock.lock();
467     try {
468     // reset matcher state, in case that underlying store changed
469     checkReseek();
470     return this.heap.seek(key);
471     } finally {
472       lock.unlock();
473     }
474   }
475 
476   @Override
477   public boolean next(List<Cell> outResult) throws IOException {
478     return next(outResult, NoLimitScannerContext.getInstance());
479   }
480 
481   /**
482    * Get the next row of values from this Store.
       * <p>
       * Runs under {@link #lock}. Cells are pulled from the heap and classified by the query
       * matcher until the row is done, a batch/size/time limit of the scanner context is hit,
       * or the heap is exhausted. The scanner state is recorded on the context before returning.
483    * @param outResult list the included cells are added to
484    * @param scannerContext limits and progress tracking for this call; must not be null
485    * @return true if there are more rows, false if scanner is done
       * @throws IOException if reading fails; a RowTooBigException is thrown when the bytes
       *           added by this call exceed the maximum row size
486    */
487   @Override
488   public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
489     lock.lock();
490 
491     try {
492     if (scannerContext == null) {
493       throw new IllegalArgumentException("Scanner context cannot be null");
494     }
        // The scanner stack was rebuilt and its top is no longer on the remembered row (or the
        // heap is empty): return without adding anything and let the caller call again.
495     if (checkReseek()) {
496       return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
497     }
498 
499     // if the heap was left null, then the scanners had previously run out anyways, close and
500     // return.
501     if (this.heap == null) {
502       close();
503       return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
504     }
505 
506     Cell cell = this.heap.peek();
507     if (cell == null) {
508       close();
509       return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
510     }
511 
512     // only call setRow if the row changes; avoids confusing the query matcher
513     // if scanning intra-row
514     byte[] row = cell.getRowArray();
515     int offset = cell.getRowOffset();
516     short length = cell.getRowLength();
517 
518     // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
519     // rows. Else it is possible we are still traversing the same row so we must perform the row
520     // comparison.
521     if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.row == null) {
522       this.countPerRow = 0;
523       matcher.setRow(row, offset, length);
524     }
525 
526     // Clear progress away unless invoker has indicated it should be kept.
527     if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
528     
529     // Only do a sanity-check if store and comparator are available.
530     KeyValue.KVComparator comparator =
531         store != null ? store.getComparator() : null;
532 
        // Cells added to outResult by this call, and their estimated serialized size.
533     int count = 0;
534     long totalBytesRead = 0;
535 
536     LOOP: do {
537       // Update and check the time limit every cellsPerHeartbeatCheck scanned cells
538       if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
539         scannerContext.updateTimeProgress();
540         if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
541           return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
542         }
543       }
544 
545       if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevCell from the same heap.
546       checkScanOrder(prevCell, cell, comparator);
547       prevCell = cell;
548 
549       ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
          // May downgrade a seek code to a plain SKIP / INCLUDE (see optimize()).
550       qcode = optimize(qcode, cell);
551       switch(qcode) {
552         case INCLUDE:
553         case INCLUDE_AND_SEEK_NEXT_ROW:
554         case INCLUDE_AND_SEEK_NEXT_COL:
555 
556           Filter f = matcher.getFilter();
557           if (f != null) {
558             // TODO convert Scan Query Matcher to be Cell instead of KV based ?
559             cell = f.transformCell(cell);
560           }
561 
562           this.countPerRow++;
              // Per-row limit (offset + limit) exceeded: this cell is not added.
563           if (storeLimit > -1 &&
564               this.countPerRow > (storeLimit + storeOffset)) {
565             // do what SEEK_NEXT_ROW does.
566             if (!matcher.moreRowsMayExistAfter(cell)) {
567               return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
568             }
569             // Setting the matcher.row = null, will mean that after the subsequent seekToNextRow()
570             // the heap.peek() will any way be in the next row. So the SQM.match(cell) need do
571             // another compareRow to say the current row is DONE
572             matcher.row = null;
573             seekToNextRow(cell);
                // Leaves the do/while; the return value is then decided by 'count' below.
574             break LOOP;
575           }
576 
577           // add to results only if we have skipped #storeOffset kvs
578           // also update metric accordingly
579           if (this.countPerRow > storeOffset) {
580             outResult.add(cell);
581 
582             // Update local tracking information
583             count++;
584             totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
585 
586             // Update the progress of the scanner context
587             scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
588             scannerContext.incrementBatchProgress(1);
589 
590             if (totalBytesRead > maxRowSize) {
591               throw new RowTooBigException("Max row size allowed: " + maxRowSize
592               + ", but the row is bigger than that.");
593             }
594           }
595 
              // Advance the heap according to the (possibly optimized) match code.
596           if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
597             if (!matcher.moreRowsMayExistAfter(cell)) {
598               return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
599             }
600             // Setting the matcher.row = null, will mean that after the subsequent seekToNextRow()
601             // the heap.peek() will any way be in the next row. So the SQM.match(cell) need do
602             // another compareRow to say the current row is DONE
603             matcher.row = null;
604             seekToNextRow(cell);
605           } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
606             seekAsDirection(matcher.getKeyForNextColumn(cell));
607           } else {
608             this.heap.next();
609           }
610 
611           if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
612             break LOOP;
613           }
614           if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
615             break LOOP;
616           }
              // Jumps to the while condition, which peeks the next cell from the heap.
617           continue;
618 
619         case DONE:
620           // We are sure that this row is done and we are in the next row.
621           // So subsequent StoresScanner.next() call need not do another compare
622           // and set the matcher.row
623           matcher.row = null;
624           return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
625 
626         case DONE_SCAN:
627           close();
628           return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
629 
630         case SEEK_NEXT_ROW:
631           // This is just a relatively simple end of scan fix, to short-cut end
632           // us if there is an endKey in the scan.
633           if (!matcher.moreRowsMayExistAfter(cell)) {
634             return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
635           }
636           // Setting the matcher.row = null, will mean that after the subsequent seekToNextRow()
637           // the heap.peek() will any way be in the next row. So the SQM.match(cell) need do
638           // another compareRow to say the current row is DONE
639           matcher.row = null;
640           seekToNextRow(cell);
641           break;
642 
643         case SEEK_NEXT_COL:
644           seekAsDirection(matcher.getKeyForNextColumn(cell));
645           break;
646 
647         case SKIP:
648           this.heap.next();
649           break;
650 
651         case SEEK_NEXT_USING_HINT:
652           // TODO convert reseek to Cell?
653           Cell nextKV = matcher.getNextKeyHint(cell);
654           if (nextKV != null) {
655             seekAsDirection(nextKV);
656           } else {
657             heap.next();
658           }
659           break;
660 
661         default:
662           throw new RuntimeException("UNEXPECTED");
663       }
664     } while((cell = this.heap.peek()) != null);
665 
        // Reached when the heap ran dry or a LOOP break fired: anything added means more values.
666     if (count > 0) {
667       return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
668     }
669 
670     // No more keys
671     close();
672     return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
673     } finally {
674       lock.unlock();
675     }
676   }
677 
678   /*
679    * See if we should actually SEEK or rather just SKIP to the next Cell.
680    * (see HBASE-13109)
681    */
682   private ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
683     switch(qcode) {
684     case INCLUDE_AND_SEEK_NEXT_COL:
685     case SEEK_NEXT_COL:
686     {
687       Cell nextIndexedKey = getNextIndexedKey();
688       if (nextIndexedKey != null && nextIndexedKey != HConstants.NO_NEXT_INDEXED_KEY
689           && matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
690         return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
691       }
692       break;
693     }
694     case INCLUDE_AND_SEEK_NEXT_ROW:
695     case SEEK_NEXT_ROW:
696     {
697       Cell nextIndexedKey = getNextIndexedKey();
698       if (nextIndexedKey != null && nextIndexedKey != HConstants.NO_NEXT_INDEXED_KEY
699           && matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
700         return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
701       }
702       break;
703     }
704     default:
705       break;
706     }
707     return qcode;
708   }
709 
710   // Implementation of ChangedReadersObserver
711   @Override
712   public void updateReaders() throws IOException {
713     lock.lock();
714     try {
715     if (this.closing) return;
716 
717     // All public synchronized API calls will call 'checkReseek' which will cause
718     // the scanner stack to reseek if this.heap==null && this.lastTop != null.
719     // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
720     // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
721     // which is NOT what we want, not to mention could cause an NPE. So we early out here.
722     if (this.heap == null) return;
723 
724     // this could be null.
725     this.lastTop = this.peek();
726 
727     //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
728 
729     // close scanners to old obsolete Store files
730     this.heap.close(); // bubble thru and close all scanners.
731     this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
732 
733     // Let the next() call handle re-creating and seeking
734     } finally {
735       lock.unlock();
736     }
737   }
738 
739   /**
740    * @return true if top of heap has changed (and KeyValueHeap has to try the
741    *         next KV)
742    * @throws IOException
743    */
744   protected boolean checkReseek() throws IOException {
745     if (this.heap == null && this.lastTop != null) {
746       resetScannerStack(this.lastTop);
747       if (this.heap.peek() == null
748           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
749         LOG.debug("Storescanner.peek() is changed where before = "
750             + this.lastTop.toString() + ",and after = " + this.heap.peek());
751         this.lastTop = null;
752         return true;
753       }
754       this.lastTop = null; // gone!
755     }
756     // else dont need to reseek
757     return false;
758   }
759 
760   protected void resetScannerStack(Cell lastTopKey) throws IOException {
761     if (heap != null) {
762       throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
763     }
764 
765     /* When we have the scan object, should we not pass it to getScanners()
766      * to get a limited set of scanners? We did so in the constructor and we
767      * could have done it now by storing the scan object from the constructor */
768     List<KeyValueScanner> scanners = getScannersNoCompaction();
769 
770     // Seek all scanners to the initial key
771     seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled);
772 
773     // Combine all seeked scanners with a heap
774     resetKVHeap(scanners, store.getComparator());
775 
776     // Reset the state of the Query Matcher and set to top row.
777     // Only reset and call setRow if the row changes; avoids confusing the
778     // query matcher if scanning intra-row.
779     Cell kv = heap.peek();
780     if (kv == null) {
781       kv = lastTopKey;
782     }
783     byte[] row = kv.getRowArray();
784     int offset = kv.getRowOffset();
785     short length = kv.getRowLength();
786     if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row,
787         matcher.rowOffset, matcher.rowLength)) {
788       this.countPerRow = 0;
789       matcher.reset();
790       matcher.setRow(row, offset, length);
791     }
792   }
793 
794   /**
795    * Check whether scan as expected order
796    * @param prevKV
797    * @param kv
798    * @param comparator
799    * @throws IOException
800    */
801   protected void checkScanOrder(Cell prevKV, Cell kv,
802       KeyValue.KVComparator comparator) throws IOException {
803     // Check that the heap gives us KVs in an increasing order.
804     assert prevKV == null || comparator == null
805         || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
806         + " followed by a " + "smaller key " + kv + " in cf " + store;
807   }
808 
809   protected boolean seekToNextRow(Cell kv) throws IOException {
810     return reseek(KeyValueUtil.createLastOnRow(kv));
811   }
812 
813   /**
814    * Do a reseek in a normal StoreScanner(scan forward)
815    * @param kv
816    * @return true if scanner has values left, false if end of scanner
817    * @throws IOException
818    */
819   protected boolean seekAsDirection(Cell kv)
820       throws IOException {
821     return reseek(kv);
822   }
823 
824   @Override
825   public boolean reseek(Cell kv) throws IOException {
826     lock.lock();
827     try {
828     //Heap will not be null, if this is called from next() which.
829     //If called from RegionScanner.reseek(...) make sure the scanner
830     //stack is reset if needed.
831     checkReseek();
832     if (explicitColumnQuery && lazySeekEnabledGlobally) {
833       return heap.requestSeek(kv, true, useRowColBloom);
834     }
835     return heap.reseek(kv);
836     } finally {
837       lock.unlock();
838     }
839   }
840 
841   @Override
842   public long getSequenceID() {
843     return 0;
844   }
845 
846   /**
847    * Seek storefiles in parallel to optimize IO latency as much as possible
848    * @param scanners the list {@link KeyValueScanner}s to be read from
849    * @param kv the KeyValue on which the operation is being requested
850    * @throws IOException
851    */
852   private void parallelSeek(final List<? extends KeyValueScanner>
853       scanners, final Cell kv) throws IOException {
854     if (scanners.isEmpty()) return;
855     int storeFileScannerCount = scanners.size();
856     CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
857     List<ParallelSeekHandler> handlers = 
858         new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
859     for (KeyValueScanner scanner : scanners) {
860       if (scanner instanceof StoreFileScanner) {
861         ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
862           this.readPt, latch);
863         executor.submit(seekHandler);
864         handlers.add(seekHandler);
865       } else {
866         scanner.seek(kv);
867         latch.countDown();
868       }
869     }
870 
871     try {
872       latch.await();
873     } catch (InterruptedException ie) {
874       throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
875     }
876 
877     for (ParallelSeekHandler handler : handlers) {
878       if (handler.getErr() != null) {
879         throw new IOException(handler.getErr());
880       }
881     }
882   }
883 
884   /**
885    * Used in testing.
886    * @return all scanners in no particular order
887    */
888   List<KeyValueScanner> getAllScannersForTesting() {
889     List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
890     KeyValueScanner current = heap.getCurrentForTesting();
891     if (current != null)
892       allScanners.add(current);
893     for (KeyValueScanner scanner : heap.getHeap())
894       allScanners.add(scanner);
895     return allScanners;
896   }
897 
898   static void enableLazySeekGlobally(boolean enable) {
899     lazySeekEnabledGlobally = enable;
900   }
901 
902   /**
903    * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
904    */
905   public long getEstimatedNumberOfKvsScanned() {
906     return this.kvsScanned;
907   }
908 
909   @Override
910   public Cell getNextIndexedKey() {
911     return this.heap.getNextIndexedKey();
912   }
913 }
914