View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.HashSet;
26  import java.util.List;
27  import java.util.NavigableSet;
28  import java.util.Set;
29  import java.util.concurrent.CountDownLatch;
30  import java.util.concurrent.locks.ReentrantLock;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.Cell;
36  import org.apache.hadoop.hbase.CellComparator;
37  import org.apache.hadoop.hbase.CellUtil;
38  import org.apache.hadoop.hbase.DoNotRetryIOException;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.KeyValue;
41  import org.apache.hadoop.hbase.classification.InterfaceAudience;
42  import org.apache.hadoop.hbase.client.IsolationLevel;
43  import org.apache.hadoop.hbase.client.Scan;
44  import org.apache.hadoop.hbase.executor.ExecutorService;
45  import org.apache.hadoop.hbase.filter.Filter;
46  import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
47  import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
48  import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
49  import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
50  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51  
/**
 * Scanner that scans both the memstore and the store files of a Store, coalescing the
 * resulting Cell stream into a {@code List<Cell>} for a single row.
 */
56  @InterfaceAudience.Private
57  public class StoreScanner extends NonReversedNonLazyKeyValueScanner
58      implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  private static final Log LOG = LogFactory.getLog(StoreScanner.class);
  // The Store being scanned. Null for scanners built by the store-less (test) constructor.
  protected Store store;
  // Decides, cell by cell, whether to include, skip or seek (consulted in next()).
  protected ScanQueryMatcher matcher;
  // Merged view over the selected scanners. Set to null by close() and updateReaders();
  // rebuilt lazily by checkReseek() when lastTop is set.
  protected KeyValueHeap heap;
  // Forwarded to Store#getScanners: Scan#getCacheBlocks() for user scans, false for compactions.
  protected boolean cacheBlocks;

  // Cells matched with an INCLUDE* code so far in the current row; reset on a new row.
  protected int countPerRow = 0;
  // Max cells per row for this family (Scan#getMaxResultsPerColumnFamily); -1 means no limit.
  protected int storeLimit = -1;
  // Matching cells to skip at the start of each row (Scan#getRowOffsetPerColumnFamily).
  protected int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107)
  // Doesn't need to be volatile because, within this class, it is only read and written
  // while holding {@code lock} (see close(boolean) and updateReaders()).
  protected boolean closing = false;
  // True when the Scan is a Get (Scan#isGetScan()).
  protected final boolean isGet;
  // True when at least one column was explicitly requested.
  protected final boolean explicitColumnQuery;
  // Whether row-column Bloom filters are consulted on lazy reseeks (see the internal constructor).
  protected final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  protected boolean isParallelSeekEnabled = false;
  // Region server executor; only assigned when parallel seeking is enabled
  // (presumably consumed by parallelSeek).
  protected ExecutorService executor;
  protected final Scan scan;
  // Explicitly requested columns; null or empty means all columns.
  protected final NavigableSet<byte[]> columns;
  // now - ttl: cells older than this are treated as expired.
  protected final long oldestUnexpiredTS;
  // Time captured once at construction (EnvironmentEdgeManager.currentTime()).
  protected final long now;
  // TTL-based store file exclusion only applies when this is 0 (see selectScannersFrom).
  protected final int minVersions;
  // Upper bound on the serialized size of a row before RowTooBigException is thrown.
  protected final long maxRowSize;
  // Cells examined between time-limit checks in next(); always positive.
  protected final long cellsPerHeartbeatCheck;

  // Collects all the KVHeap that are eagerly getting closed during the
  // course of a scan
  protected Set<KeyValueHeap> heapsForDelayedClose = new HashSet<KeyValueHeap>();

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
   * KVs skipped via seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  // Last cell examined by next(); used for the scan-order assertion and to avoid
  // counting the same cell object twice in kvsScanned.
  private Cell prevCell = null;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  // Region server configuration key (default false) that turns on parallel seeking.
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
      "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  protected static boolean lazySeekEnabledGlobally =
      LAZY_SEEK_ENABLED_BY_DEFAULT;

  /**
   * The number of cells scanned in between timeout checks. Specifying a larger value means that
   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
   * timeout checks.
   */
  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
      "hbase.cells.scanned.per.heartbeat.check";

  /**
   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
   */
  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

  // if heap == null and lastTop != null, you need to reseek given the key below
  protected Cell lastTop = null;

  // Whether scans (not only gets) use positional reads; see getScannersNoCompaction().
  private boolean scanUsePread = false;
  // Guards the scanner state in peek/next/seek/reseek/close/updateReaders.
  protected ReentrantLock lock = new ReentrantLock();

  // Read point forwarded to Store#getScanners (presumably the MVCC read point bounding
  // which cells are visible -- confirm).
  protected final long readPt;
129 
  // used by the injection framework to test race between StoreScanner construction and compaction
  // NOTE(review): not referenced in the visible part of this class; presumably consumed by
  // test-only injection hooks -- confirm before changing the constants.
  enum StoreScannerCompactionRace {
    BEFORE_SEEK,
    AFTER_SEEK,
    COMPACT_COMPLETE
  }
136   
137   /** An internal constructor. */
138   protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
139       final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt) {
140     this.readPt = readPt;
141     this.store = store;
142     this.cacheBlocks = cacheBlocks;
143     isGet = scan.isGetScan();
144     int numCol = columns == null ? 0 : columns.size();
145     explicitColumnQuery = numCol > 0;
146     this.scan = scan;
147     this.columns = columns;
148     this.now = EnvironmentEdgeManager.currentTime();
149     this.oldestUnexpiredTS = now - ttl;
150     this.minVersions = minVersions;
151 
152     if (store != null && ((HStore)store).getHRegion() != null
153         && ((HStore)store).getHRegion().getBaseConf() != null) {
154       Configuration conf = ((HStore) store).getHRegion().getBaseConf();
155       this.maxRowSize =
156           conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
157       this.scanUsePread = conf.getBoolean("hbase.storescanner.use.pread", scan.isSmall());
158 
159       long tmpCellsPerTimeoutCheck =
160           conf.getLong(HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK,
161             DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK);
162       this.cellsPerHeartbeatCheck =
163           tmpCellsPerTimeoutCheck > 0 ? tmpCellsPerTimeoutCheck
164               : DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
165     } else {
166       this.maxRowSize = HConstants.TABLE_MAX_ROWSIZE_DEFAULT;
167       this.scanUsePread = scan.isSmall();
168       this.cellsPerHeartbeatCheck = DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
169     }
170 
171     // We look up row-column Bloom filters for multi-column queries as part of
172     // the seek operation. However, we also look the row-column Bloom filter
173     // for multi-row (non-"get") scans because this is not done in
174     // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
175     useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
176 
177     // The parallel-seeking is on :
178     // 1) the config value is *true*
179     // 2) store has more than one store file
180     if (store != null && ((HStore)store).getHRegion() != null
181         && store.getStorefilesCount() > 1) {
182       RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
183       if (rsService == null || !rsService.getConfiguration().getBoolean(
184             STORESCANNER_PARALLEL_SEEK_ENABLE, false)) return;
185       isParallelSeekEnabled = true;
186       executor = rsService.getExecutorService();
187     }
188   }
189 
190   /**
191    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
192    * are not in a compaction.
193    *
194    * @param store who we scan
195    * @param scan the spec
196    * @param columns which columns we are scanning
197    * @throws IOException
198    */
199   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
200       long readPt)
201                               throws IOException {
202     this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
203         scanInfo.getMinVersions(), readPt);
204     if (columns != null && scan.isRaw()) {
205       throw new DoNotRetryIOException(
206           "Cannot specify any column for a raw scan");
207     }
208     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
209         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
210         oldestUnexpiredTS, now, store.getCoprocessorHost());
211 
212     this.store.addChangedReaderObserver(this);
213 
214     // Pass columns to try to filter out unnecessary StoreFiles.
215     List<KeyValueScanner> scanners = getScannersNoCompaction();
216 
217     // Seek all scanners to the start of the Row (or if the exact matching row
218     // key does not exist, then to the start of the next matching Row).
219     // Always check bloom filter to optimize the top row seek for delete
220     // family marker.
221     seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
222         && lazySeekEnabledGlobally, isParallelSeekEnabled);
223 
224     // set storeLimit
225     this.storeLimit = scan.getMaxResultsPerColumnFamily();
226 
227     // set rowOffset
228     this.storeOffset = scan.getRowOffsetPerColumnFamily();
229 
230     // Combine all seeked scanners with a heap
231     resetKVHeap(scanners, store.getComparator());
232   }
233 
234   /**
235    * Used for compactions.<p>
236    *
237    * Opens a scanner across specified StoreFiles.
238    * @param store who we scan
239    * @param scan the spec
240    * @param scanners ancillary scanners
241    * @param smallestReadPoint the readPoint that we should use for tracking
242    *          versions
243    */
244   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
245       List<? extends KeyValueScanner> scanners, ScanType scanType,
246       long smallestReadPoint, long earliestPutTs) throws IOException {
247     this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
248   }
249 
250   /**
251    * Used for compactions that drop deletes from a limited range of rows.<p>
252    *
253    * Opens a scanner across specified StoreFiles.
254    * @param store who we scan
255    * @param scan the spec
256    * @param scanners ancillary scanners
257    * @param smallestReadPoint the readPoint that we should use for tracking versions
258    * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
259    * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
260    */
261   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
262       List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
263       byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
264     this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
265         earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
266   }
267 
268   private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
269       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
270       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
271     this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(),
272         ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
273     if (dropDeletesFromRow == null) {
274       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
275           earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
276     } else {
277       matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
278           oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
279     }
280 
281     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
282     scanners = selectScannersFrom(scanners);
283 
284     // Seek all scanners to the initial key
285     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
286 
287     // Combine all seeked scanners with a heap
288     resetKVHeap(scanners, store.getComparator());
289   }
290 
291   /** Constructor for testing. */
292   StoreScanner(final Scan scan, ScanInfo scanInfo,
293       ScanType scanType, final NavigableSet<byte[]> columns,
294       final List<KeyValueScanner> scanners) throws IOException {
295     this(scan, scanInfo, scanType, columns, scanners,
296         HConstants.LATEST_TIMESTAMP,
297         // 0 is passed as readpoint because the test bypasses Store
298         0);
299   }
300 
301   // Constructor for testing.
302   StoreScanner(final Scan scan, ScanInfo scanInfo,
303     ScanType scanType, final NavigableSet<byte[]> columns,
304     final List<KeyValueScanner> scanners, long earliestPutTs)
305         throws IOException {
306     this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
307       // 0 is passed as readpoint because the test bypasses Store
308       0);
309   }
310 
311   public StoreScanner(final Scan scan, ScanInfo scanInfo,
312       ScanType scanType, final NavigableSet<byte[]> columns,
313       final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
314           throws IOException {
315     this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
316         scanInfo.getMinVersions(), readPt);
317     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
318         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
319 
320     // In unit tests, the store could be null
321     if (this.store != null) {
322       this.store.addChangedReaderObserver(this);
323     }
324     // Seek all scanners to the initial key
325     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
326     resetKVHeap(scanners, scanInfo.getComparator());
327   }
328 
329   /**
330    * Get a filtered list of scanners. Assumes we are not in a compaction.
331    * @return list of scanners to seek
332    */
333   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
334     final boolean isCompaction = false;
335     boolean usePread = isGet || scanUsePread;
336     return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
337         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
338   }
339 
340   /**
341    * Seek the specified scanners with the given key
342    * @param scanners
343    * @param seekKey
344    * @param isLazy true if using lazy seek
345    * @param isParallelSeek true if using parallel seek
346    * @throws IOException
347    */
348   protected void seekScanners(List<? extends KeyValueScanner> scanners,
349       Cell seekKey, boolean isLazy, boolean isParallelSeek)
350       throws IOException {
351     // Seek all scanners to the start of the Row (or if the exact matching row
352     // key does not exist, then to the start of the next matching Row).
353     // Always check bloom filter to optimize the top row seek for delete
354     // family marker.
355     if (isLazy) {
356       for (KeyValueScanner scanner : scanners) {
357         scanner.requestSeek(seekKey, false, true);
358       }
359     } else {
360       if (!isParallelSeek) {
361         long totalScannersSoughtBytes = 0;
362         for (KeyValueScanner scanner : scanners) {
363           if (totalScannersSoughtBytes >= maxRowSize) {
364             throw new RowTooBigException("Max row size allowed: " + maxRowSize
365               + ", but row is bigger than that");
366           }
367           scanner.seek(seekKey);
368           Cell c = scanner.peek();
369           if (c != null) {
370             totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
371           }
372         }
373       } else {
374         parallelSeek(scanners, seekKey);
375       }
376     }
377   }
378 
379   protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
380       CellComparator comparator) throws IOException {
381     // Combine all seeked scanners with a heap
382     heap = new KeyValueHeap(scanners, comparator);
383   }
384 
385   /**
386    * Filters the given list of scanners using Bloom filter, time range, and
387    * TTL.
388    */
389   protected List<KeyValueScanner> selectScannersFrom(
390       final List<? extends KeyValueScanner> allScanners) {
391     boolean memOnly;
392     boolean filesOnly;
393     if (scan instanceof InternalScan) {
394       InternalScan iscan = (InternalScan)scan;
395       memOnly = iscan.isCheckOnlyMemStore();
396       filesOnly = iscan.isCheckOnlyStoreFiles();
397     } else {
398       memOnly = false;
399       filesOnly = false;
400     }
401 
402     List<KeyValueScanner> scanners =
403         new ArrayList<KeyValueScanner>(allScanners.size());
404 
405     // We can only exclude store files based on TTL if minVersions is set to 0.
406     // Otherwise, we might have to return KVs that have technically expired.
407     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
408         Long.MIN_VALUE;
409 
410     // include only those scan files which pass all filters
411     for (KeyValueScanner kvs : allScanners) {
412       boolean isFile = kvs.isFileScanner();
413       if ((!isFile && filesOnly) || (isFile && memOnly)) {
414         continue;
415       }
416 
417       if (kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)) {
418         scanners.add(kvs);
419       }
420     }
421     return scanners;
422   }
423 
424   @Override
425   public Cell peek() {
426     lock.lock();
427     try {
428     if (this.heap == null) {
429       return this.lastTop;
430     }
431     return this.heap.peek();
432     } finally {
433       lock.unlock();
434     }
435   }
436 
437   @Override
438   public KeyValue next() {
439     // throw runtime exception perhaps?
440     throw new RuntimeException("Never call StoreScanner.next()");
441   }
442 
443   @Override
444   public void close() {
445     close(true);
446   }
447 
448   private void close(boolean withHeapClose){
449     lock.lock();
450     try {
451       if (this.closing) {
452         return;
453       }
454       if (withHeapClose) this.closing = true;
455       // under test, we dont have a this.store
456       if (this.store != null) this.store.deleteChangedReaderObserver(this);
457       if (withHeapClose) {
458         for (KeyValueHeap h : this.heapsForDelayedClose) {
459           h.close();
460         }
461         this.heapsForDelayedClose.clear();
462         if (this.heap != null) {
463           this.heap.close();
464           this.heap = null; // CLOSED!
465         }
466       } else {
467         if (this.heap != null) {
468           this.heapsForDelayedClose.add(this.heap);
469           this.heap = null;
470         }
471       }
472       this.lastTop = null; // If both are null, we are closed.
473     } finally {
474       lock.unlock();
475     }
476   }
477 
478   @Override
479   public boolean seek(Cell key) throws IOException {
480     lock.lock();
481     try {
482     // reset matcher state, in case that underlying store changed
483     checkReseek();
484     return this.heap.seek(key);
485     } finally {
486       lock.unlock();
487     }
488   }
489 
490   @Override
491   public boolean next(List<Cell> outResult) throws IOException {
492     return next(outResult, NoLimitScannerContext.getInstance());
493   }
494 
  /**
   * Get the next row of values from this Store.
   * <p>
   * Cells are pulled from {@link #heap}, run through the {@link ScanQueryMatcher}, and matching
   * cells are appended to {@code outResult}. This continues until the matcher reports DONE or
   * DONE_SCAN, a batch/size/time limit in {@code scannerContext} is reached, the per-family
   * store limit is hit, or the heap is exhausted. Throws {@code RowTooBigException} once the
   * estimated serialized size of the cells added for the row exceeds {@code maxRowSize}.
   * @param outResult list that matching cells of the current row are appended to
   * @param scannerContext limits and progress tracking for this call; must not be null
   * @return true if there are more rows, false if scanner is done (this is
   *         {@code hasMoreValues()} of the state recorded in {@code scannerContext})
   * @throws IllegalArgumentException if {@code scannerContext} is null
   * @throws IOException if reading from or seeking the underlying scanners fails
   */
  @Override
  public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
    lock.lock();

    try {
    if (scannerContext == null) {
      throw new IllegalArgumentException("Scanner context cannot be null");
    }
    // checkReseek() rebuilds the heap if updateReaders() detached it. If the top row moved as a
    // result, return (reporting MORE_VALUES) without reading any cells in this call.
    if (checkReseek()) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // If the heap was left null, the scanners had previously run out anyway; close and
    // return.
    if (this.heap == null) {
      // By this time a partial close should have happened, because the heap is already null
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    Cell peeked = this.heap.peek();
    if (peeked == null) {
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    // only call setRow if the row changes; avoids confusing the query matcher
    // if scanning intra-row

    // If no limits exist in the scope LimitScope.BETWEEN_CELLS then we are sure we are changing
    // rows. Else it is possible we are still traversing the same row so we must perform the row
    // comparison.
    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null ||
        !CellUtil.matchingRow(peeked, matcher.curCell)) {
      this.countPerRow = 0;
      matcher.setToNewRow(peeked);
    }

    // Clear progress away unless invoker has indicated it should be kept.
    if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
    
    Cell cell;

    // Only do a sanity-check if store and comparator are available.
    CellComparator comparator =
        store != null ? store.getComparator() : null;

    // Cells added to outResult in this call, and their estimated serialized size (for the
    // maxRowSize check).
    int count = 0;
    long totalBytesRead = 0;

    LOOP: while((cell = this.heap.peek()) != null) {
      // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
      if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
        scannerContext.updateTimeProgress();
        if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
          return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
        }
      }

      if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
      checkScanOrder(prevCell, cell, comparator);
      prevCell = cell;

      ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
      // May turn a SEEK into a cheaper SKIP/INCLUDE when the seek target is close (HBASE-13109).
      qcode = optimize(qcode, cell);
      switch(qcode) {
        case INCLUDE:
        case INCLUDE_AND_SEEK_NEXT_ROW:
        case INCLUDE_AND_SEEK_NEXT_COL:

          Filter f = matcher.getFilter();
          if (f != null) {
            cell = f.transformCell(cell);
          }

          this.countPerRow++;
          // Past storeLimit cells (after the storeOffset skipped ones): stop this row.
          if (storeLimit > -1 &&
              this.countPerRow > (storeLimit + storeOffset)) {
            // do what SEEK_NEXT_ROW does.
            if (!matcher.moreRowsMayExistAfter(cell)) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            seekToNextRow(cell);
            break LOOP;
          }

          // add to results only if we have skipped #storeOffset kvs
          // also update metric accordingly
          if (this.countPerRow > storeOffset) {
            outResult.add(cell);

            // Update local tracking information
            count++;
            totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);

            // Update the progress of the scanner context
            scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
            scannerContext.incrementBatchProgress(1);

            if (totalBytesRead > maxRowSize) {
              throw new RowTooBigException("Max row size allowed: " + maxRowSize
              + ", but the row is bigger than that.");
            }
          }

          // Advance according to the (possibly optimized) match code.
          if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
            if (!matcher.moreRowsMayExistAfter(cell)) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            seekToNextRow(cell);
          } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
            seekAsDirection(matcher.getKeyForNextColumn(cell));
          } else {
            this.heap.next();
          }

          // Batch and size limits are only checked after a cell has been included.
          if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          continue;

        case DONE:
          // The matcher has finished the current row; more rows may follow.
          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();

        case DONE_SCAN:
          close(false);// Do all cleanup except heap.close()
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();

        case SEEK_NEXT_ROW:
          // This is just a relatively simple end of scan fix, to short-cut end
          // us if there is an endKey in the scan.
          if (!matcher.moreRowsMayExistAfter(cell)) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          seekToNextRow(cell);
          break;

        case SEEK_NEXT_COL:
          seekAsDirection(matcher.getKeyForNextColumn(cell));
          break;

        case SKIP:
          this.heap.next();
          break;

        case SEEK_NEXT_USING_HINT:
          // Seek to the matcher's hint when it provides one; otherwise just step forward.
          Cell nextKV = matcher.getNextKeyHint(cell);
          if (nextKV != null) {
            seekAsDirection(nextKV);
          } else {
            heap.next();
          }
          break;

        default:
          throw new RuntimeException("UNEXPECTED");
      }
    }

    // The loop was left via a limit or the store limit (break LOOP), or the heap ran dry.
    if (count > 0) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // No more keys
    close(false);// Do all cleanup except heap.close()
    return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    } finally {
      lock.unlock();
    }
  }
675 
676   /*
677    * See if we should actually SEEK or rather just SKIP to the next Cell.
678    * (see HBASE-13109)
679    */
680   private ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
681     Cell nextIndexedKey = getNextIndexedKey();
682     if (nextIndexedKey == null || nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY ||
683         store == null) {
684       return qcode;
685     }
686     switch(qcode) {
687     case INCLUDE_AND_SEEK_NEXT_COL:
688     case SEEK_NEXT_COL:
689     {
690       if (matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
691         return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
692       }
693       break;
694     }
695     case INCLUDE_AND_SEEK_NEXT_ROW:
696     case SEEK_NEXT_ROW:
697     {
698       if (matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
699         return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
700       }
701       break;
702     }
703     default:
704       break;
705     }
706     return qcode;
707   }
708 
709   // Implementation of ChangedReadersObserver
710   @Override
711   public void updateReaders() throws IOException {
712     lock.lock();
713     try {
714     if (this.closing) return;
715 
716     // All public synchronized API calls will call 'checkReseek' which will cause
717     // the scanner stack to reseek if this.heap==null && this.lastTop != null.
718     // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
719     // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
720     // which is NOT what we want, not to mention could cause an NPE. So we early out here.
721     if (this.heap == null) return;
722 
723     // this could be null.
724     this.lastTop = this.peek();
725 
726     //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
727 
728     // close scanners to old obsolete Store files
729     this.heapsForDelayedClose.add(this.heap);// Don't close now. Delay it till StoreScanner#close
730     this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
731 
732     // Let the next() call handle re-creating and seeking
733     } finally {
734       lock.unlock();
735     }
736   }
737 
738   /**
739    * @return true if top of heap has changed (and KeyValueHeap has to try the
740    *         next KV)
741    * @throws IOException
742    */
743   protected boolean checkReseek() throws IOException {
744     if (this.heap == null && this.lastTop != null) {
745       resetScannerStack(this.lastTop);
746       if (this.heap.peek() == null
747           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
748         LOG.debug("Storescanner.peek() is changed where before = "
749             + this.lastTop.toString() + ",and after = " + this.heap.peek());
750         this.lastTop = null;
751         return true;
752       }
753       this.lastTop = null; // gone!
754     }
755     // else dont need to reseek
756     return false;
757   }
758 
759   protected void resetScannerStack(Cell lastTopKey) throws IOException {
760     if (heap != null) {
761       throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
762     }
763 
764     /* When we have the scan object, should we not pass it to getScanners()
765      * to get a limited set of scanners? We did so in the constructor and we
766      * could have done it now by storing the scan object from the constructor */
767     List<KeyValueScanner> scanners = getScannersNoCompaction();
768 
769     // Seek all scanners to the initial key
770     seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled);
771 
772     // Combine all seeked scanners with a heap
773     resetKVHeap(scanners, store.getComparator());
774 
775     // Reset the state of the Query Matcher and set to top row.
776     // Only reset and call setRow if the row changes; avoids confusing the
777     // query matcher if scanning intra-row.
778     Cell cell = heap.peek();
779     if (cell == null) {
780       cell = lastTopKey;
781     }
782     if ((matcher.curCell == null) || !CellUtil.matchingRows(cell, matcher.curCell)) {
783       this.countPerRow = 0;
784       matcher.reset();
785       matcher.setToNewRow(cell);
786     }
787   }
788 
789   /**
790    * Check whether scan as expected order
791    * @param prevKV
792    * @param kv
793    * @param comparator
794    * @throws IOException
795    */
796   protected void checkScanOrder(Cell prevKV, Cell kv,
797       CellComparator comparator) throws IOException {
798     // Check that the heap gives us KVs in an increasing order.
799     assert prevKV == null || comparator == null
800         || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
801         + " followed by a " + "smaller key " + kv + " in cf " + store;
802   }
803 
804   protected boolean seekToNextRow(Cell c) throws IOException {
805     return reseek(CellUtil.createLastOnRow(c));
806   }
807 
808   /**
809    * Do a reseek in a normal StoreScanner(scan forward)
810    * @param kv
811    * @return true if scanner has values left, false if end of scanner
812    * @throws IOException
813    */
814   protected boolean seekAsDirection(Cell kv)
815       throws IOException {
816     return reseek(kv);
817   }
818 
819   @Override
820   public boolean reseek(Cell kv) throws IOException {
821     lock.lock();
822     try {
823     //Heap will not be null, if this is called from next() which.
824     //If called from RegionScanner.reseek(...) make sure the scanner
825     //stack is reset if needed.
826     checkReseek();
827     if (explicitColumnQuery && lazySeekEnabledGlobally) {
828       return heap.requestSeek(kv, true, useRowColBloom);
829     }
830     return heap.reseek(kv);
831     } finally {
832       lock.unlock();
833     }
834   }
835 
836   @Override
837   public long getSequenceID() {
838     return 0;
839   }
840 
841   /**
842    * Seek storefiles in parallel to optimize IO latency as much as possible
843    * @param scanners the list {@link KeyValueScanner}s to be read from
844    * @param kv the KeyValue on which the operation is being requested
845    * @throws IOException
846    */
847   private void parallelSeek(final List<? extends KeyValueScanner>
848       scanners, final Cell kv) throws IOException {
849     if (scanners.isEmpty()) return;
850     int storeFileScannerCount = scanners.size();
851     CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
852     List<ParallelSeekHandler> handlers = 
853         new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
854     for (KeyValueScanner scanner : scanners) {
855       if (scanner instanceof StoreFileScanner) {
856         ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
857           this.readPt, latch);
858         executor.submit(seekHandler);
859         handlers.add(seekHandler);
860       } else {
861         scanner.seek(kv);
862         latch.countDown();
863       }
864     }
865 
866     try {
867       latch.await();
868     } catch (InterruptedException ie) {
869       throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
870     }
871 
872     for (ParallelSeekHandler handler : handlers) {
873       if (handler.getErr() != null) {
874         throw new IOException(handler.getErr());
875       }
876     }
877   }
878 
879   /**
880    * Used in testing.
881    * @return all scanners in no particular order
882    */
883   List<KeyValueScanner> getAllScannersForTesting() {
884     List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
885     KeyValueScanner current = heap.getCurrentForTesting();
886     if (current != null)
887       allScanners.add(current);
888     for (KeyValueScanner scanner : heap.getHeap())
889       allScanners.add(scanner);
890     return allScanners;
891   }
892 
893   static void enableLazySeekGlobally(boolean enable) {
894     lazySeekEnabledGlobally = enable;
895   }
896 
897   /**
898    * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
899    */
900   public long getEstimatedNumberOfKvsScanned() {
901     return this.kvsScanned;
902   }
903 
904   @Override
905   public Cell getNextIndexedKey() {
906     return this.heap.getNextIndexedKey();
907   }
908 
909   @Override
910   public void shipped() throws IOException {
911     lock.lock();
912     try {
913       for (KeyValueHeap h : this.heapsForDelayedClose) {
914         h.close();// There wont be further fetch of Cells from these scanners. Just close.
915       }
916       this.heapsForDelayedClose.clear();
917       if (this.heap != null) {
918         this.heap.shipped();
919       }
920     } finally {
921       lock.unlock();
922     }
923   }
924 }
925