View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.NavigableSet;
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.CellUtil;
36  import org.apache.hadoop.hbase.DoNotRetryIOException;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.KeyValue;
39  import org.apache.hadoop.hbase.KeyValue.KVComparator;
40  import org.apache.hadoop.hbase.KeyValueUtil;
41  import org.apache.hadoop.hbase.client.IsolationLevel;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.executor.ExecutorService;
44  import org.apache.hadoop.hbase.filter.Filter;
45  import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
46  import org.apache.hadoop.hbase.util.Bytes;
47  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
48  
49  /**
50   * Scanner scans both the memstore and the Store. Coalesce KeyValue stream
51   * into List<KeyValue> for a single row.
52   */
53  @InterfaceAudience.Private
54  public class StoreScanner extends NonReversedNonLazyKeyValueScanner
55      implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
56    static final Log LOG = LogFactory.getLog(StoreScanner.class);
57    protected Store store;
58    protected ScanQueryMatcher matcher;
59    protected KeyValueHeap heap;
60    protected boolean cacheBlocks;
61  
62    protected int countPerRow = 0;
63    protected int storeLimit = -1;
64    protected int storeOffset = 0;
65  
66    // Used to indicate that the scanner has closed (see HBASE-1107)
67    // Doesn't need to be volatile because it's always accessed while holding the lock
68    protected boolean closing = false;
69    protected final boolean isGet;
70    protected final boolean explicitColumnQuery;
71    protected final boolean useRowColBloom;
72    /**
73     * A flag that enables StoreFileScanner parallel-seeking
74     */
75    protected boolean isParallelSeekEnabled = false;
76    protected ExecutorService executor;
77    protected final Scan scan;
78    protected final NavigableSet<byte[]> columns;
79    protected final long oldestUnexpiredTS;
80    protected final long now;
81    protected final int minVersions;
82    protected final long maxRowSize;
83  
84    /**
85     * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
86     * KVs skipped via seeking to next row/column. TODO: estimate them?
87     */
88    private long kvsScanned = 0;
89    private Cell prevCell = null;
90  
91    /** We don't ever expect to change this, the constant is just for clarity. */
92    static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
93    public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
94        "hbase.storescanner.parallel.seek.enable";
95  
96    /** Used during unit testing to ensure that lazy seek does save seek ops */
97    protected static boolean lazySeekEnabledGlobally =
98        LAZY_SEEK_ENABLED_BY_DEFAULT;
99  
100   // if heap == null and lastTop != null, you need to reseek given the key below
101   protected Cell lastTop = null;
102 
103   // Whether to use pread for the scan
104   private boolean scanUsePread = false;
105   protected ReentrantLock lock = new ReentrantLock();
106   
107   private final long readPt;
108 
/**
 * Injection points used by the injection framework to test the race between
 * StoreScanner construction and compaction.
 */
enum StoreScannerCompactionRace {
  BEFORE_SEEK, AFTER_SEEK, COMPACT_COMPLETE
}
115   
116   /** An internal constructor. */
117   protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
118       final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt) {
119     this.readPt = readPt;
120     this.store = store;
121     this.cacheBlocks = cacheBlocks;
122     isGet = scan.isGetScan();
123     int numCol = columns == null ? 0 : columns.size();
124     explicitColumnQuery = numCol > 0;
125     this.scan = scan;
126     this.columns = columns;
127     this.now = EnvironmentEdgeManager.currentTime();
128     this.oldestUnexpiredTS = now - ttl;
129     this.minVersions = minVersions;
130 
131     if (store != null && ((HStore)store).getHRegion() != null
132         && ((HStore)store).getHRegion().getBaseConf() != null) {
133       Configuration conf = ((HStore) store).getHRegion().getBaseConf();
134       this.maxRowSize =
135           conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
136       this.scanUsePread = conf.getBoolean("hbase.storescanner.use.pread", scan.isSmall());
137     } else {
138       this.maxRowSize = HConstants.TABLE_MAX_ROWSIZE_DEFAULT;
139       this.scanUsePread = scan.isSmall();
140     }
141 
142     // We look up row-column Bloom filters for multi-column queries as part of
143     // the seek operation. However, we also look the row-column Bloom filter
144     // for multi-row (non-"get") scans because this is not done in
145     // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
146     useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
147 
148     // The parallel-seeking is on :
149     // 1) the config value is *true*
150     // 2) store has more than one store file
151     if (store != null && ((HStore)store).getHRegion() != null
152         && store.getStorefilesCount() > 1) {
153       RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
154       if (rsService == null || !rsService.getConfiguration().getBoolean(
155             STORESCANNER_PARALLEL_SEEK_ENABLE, false)) return;
156       isParallelSeekEnabled = true;
157       executor = rsService.getExecutorService();
158     }
159   }
160 
161   /**
162    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
163    * are not in a compaction.
164    *
165    * @param store who we scan
166    * @param scan the spec
167    * @param columns which columns we are scanning
168    * @throws IOException
169    */
170   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
171       long readPt)
172                               throws IOException {
173     this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
174         scanInfo.getMinVersions(), readPt);
175     if (columns != null && scan.isRaw()) {
176       throw new DoNotRetryIOException(
177           "Cannot specify any column for a raw scan");
178     }
179     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
180         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
181         oldestUnexpiredTS, now, store.getCoprocessorHost());
182 
183     this.store.addChangedReaderObserver(this);
184 
185     // Pass columns to try to filter out unnecessary StoreFiles.
186     List<KeyValueScanner> scanners = getScannersNoCompaction();
187 
188     // Seek all scanners to the start of the Row (or if the exact matching row
189     // key does not exist, then to the start of the next matching Row).
190     // Always check bloom filter to optimize the top row seek for delete
191     // family marker.
192     seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
193         && lazySeekEnabledGlobally, isParallelSeekEnabled);
194 
195     // set storeLimit
196     this.storeLimit = scan.getMaxResultsPerColumnFamily();
197 
198     // set rowOffset
199     this.storeOffset = scan.getRowOffsetPerColumnFamily();
200 
201     // Combine all seeked scanners with a heap
202     resetKVHeap(scanners, store.getComparator());
203   }
204 
/**
 * Used for compactions.<p>
 *
 * Opens a scanner across the specified StoreFiles. Delegates to the private
 * compaction constructor without a drop-deletes row range.
 *
 * @param store who we scan
 * @param scanInfo supplies the TTL and minimum versions and is handed to the query matcher
 * @param scan the spec
 * @param scanners ancillary scanners
 * @param scanType the scan type handed to the query matcher
 * @param smallestReadPoint the readPoint that we should use for tracking
 *          versions
 * @param earliestPutTs handed to the query matcher
 * @throws IOException if seeking the scanners fails
 */
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
    List<? extends KeyValueScanner> scanners, ScanType scanType,
    long smallestReadPoint, long earliestPutTs) throws IOException {
  this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs,
      null, null);
}
220 
/**
 * Used for compactions that drop deletes from a limited range of rows.<p>
 *
 * Opens a scanner across the specified StoreFiles, always using
 * {@link ScanType#COMPACT_RETAIN_DELETES} as the scan type.
 *
 * @param store who we scan
 * @param scanInfo supplies the TTL and minimum versions and is handed to the query matcher
 * @param scan the spec
 * @param scanners ancillary scanners
 * @param smallestReadPoint the readPoint that we should use for tracking versions
 * @param earliestPutTs handed to the query matcher
 * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
 * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
 * @throws IOException if seeking the scanners fails
 */
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
    List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
    byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
  this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES,
      smallestReadPoint, earliestPutTs,
      dropDeletesFromRow, dropDeletesToRow);
}
238 
239   private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
240       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
241       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
242     this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(),
243         ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
244     if (dropDeletesFromRow == null) {
245       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
246           earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
247     } else {
248       matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
249           oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
250     }
251 
252     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
253     scanners = selectScannersFrom(scanners);
254 
255     // Seek all scanners to the initial key
256     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
257 
258     // Combine all seeked scanners with a heap
259     resetKVHeap(scanners, store.getComparator());
260   }
261 
/**
 * Constructor for testing. Uses {@link HConstants#LATEST_TIMESTAMP} as the
 * earliest put timestamp and a read point of 0.
 */
StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners) throws IOException {
  // The read point is 0 because the test bypasses Store.
  this(scan, scanInfo, scanType, columns, scanners, HConstants.LATEST_TIMESTAMP, 0);
}
271 
/**
 * Constructor for testing that lets the caller choose the earliest put
 * timestamp. The read point is 0.
 */
StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners, long earliestPutTs)
    throws IOException {
  // The read point is 0 because the test bypasses Store.
  this(scan, scanInfo, scanType, columns, scanners, earliestPutTs, 0);
}
281   
/**
 * Implementation behind the test constructors. It passes a null store to the
 * internal constructor, builds the matcher without a coprocessor host, and
 * seeks the given scanners as-is (they are not filtered first).
 */
private StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
    throws IOException {
  this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
      scanInfo.getMinVersions(), readPt);
  this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
      Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);

  // In unit tests, the store could be null
  if (this.store != null) {
    this.store.addChangedReaderObserver(this);
  }

  // Position every scanner on the initial key, then merge them into the heap.
  Cell startKey = matcher.getStartKey();
  seekScanners(scanners, startKey, false, isParallelSeekEnabled);
  resetKVHeap(scanners, scanInfo.getComparator());
}
299 
300   /**
301    * Get a filtered list of scanners. Assumes we are not in a compaction.
302    * @return list of scanners to seek
303    */
304   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
305     final boolean isCompaction = false;
306     boolean usePread = isGet || scanUsePread;
307     return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
308         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
309   }
310 
311   /**
312    * Seek the specified scanners with the given key
313    * @param scanners
314    * @param seekKey
315    * @param isLazy true if using lazy seek
316    * @param isParallelSeek true if using parallel seek
317    * @throws IOException
318    */
319   protected void seekScanners(List<? extends KeyValueScanner> scanners,
320       Cell seekKey, boolean isLazy, boolean isParallelSeek)
321       throws IOException {
322     // Seek all scanners to the start of the Row (or if the exact matching row
323     // key does not exist, then to the start of the next matching Row).
324     // Always check bloom filter to optimize the top row seek for delete
325     // family marker.
326     if (isLazy) {
327       for (KeyValueScanner scanner : scanners) {
328         scanner.requestSeek(seekKey, false, true);
329       }
330     } else {
331       if (!isParallelSeek) {
332         long totalScannersSoughtBytes = 0;
333         for (KeyValueScanner scanner : scanners) {
334           if (totalScannersSoughtBytes >= maxRowSize) {
335             throw new RowTooBigException("Max row size allowed: " + maxRowSize
336               + ", but row is bigger than that");
337           }
338           scanner.seek(seekKey);
339           Cell c = scanner.peek();
340           if (c != null ) {
341             totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
342           }
343         }
344       } else {
345         parallelSeek(scanners, seekKey);
346       }
347     }
348   }
349 
350   protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
351       KVComparator comparator) throws IOException {
352     // Combine all seeked scanners with a heap
353     heap = new KeyValueHeap(scanners, comparator);
354   }
355 
356   /**
357    * Filters the given list of scanners using Bloom filter, time range, and
358    * TTL.
359    */
360   protected List<KeyValueScanner> selectScannersFrom(
361       final List<? extends KeyValueScanner> allScanners) {
362     boolean memOnly;
363     boolean filesOnly;
364     if (scan instanceof InternalScan) {
365       InternalScan iscan = (InternalScan)scan;
366       memOnly = iscan.isCheckOnlyMemStore();
367       filesOnly = iscan.isCheckOnlyStoreFiles();
368     } else {
369       memOnly = false;
370       filesOnly = false;
371     }
372 
373     List<KeyValueScanner> scanners =
374         new ArrayList<KeyValueScanner>(allScanners.size());
375 
376     // We can only exclude store files based on TTL if minVersions is set to 0.
377     // Otherwise, we might have to return KVs that have technically expired.
378     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
379         Long.MIN_VALUE;
380 
381     // include only those scan files which pass all filters
382     for (KeyValueScanner kvs : allScanners) {
383       boolean isFile = kvs.isFileScanner();
384       if ((!isFile && filesOnly) || (isFile && memOnly)) {
385         continue;
386       }
387 
388       if (kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)) {
389         scanners.add(kvs);
390       }
391     }
392     return scanners;
393   }
394 
395   @Override
396   public Cell peek() {
397     lock.lock();
398     try {
399     if (this.heap == null) {
400       return this.lastTop;
401     }
402     return this.heap.peek();
403     } finally {
404       lock.unlock();
405     }
406   }
407 
408   @Override
409   public KeyValue next() {
410     // throw runtime exception perhaps?
411     throw new RuntimeException("Never call StoreScanner.next()");
412   }
413 
414   @Override
415   public void close() {
416     lock.lock();
417     try {
418     if (this.closing) return;
419     this.closing = true;
420     // under test, we dont have a this.store
421     if (this.store != null)
422       this.store.deleteChangedReaderObserver(this);
423     if (this.heap != null)
424       this.heap.close();
425     this.heap = null; // CLOSED!
426     this.lastTop = null; // If both are null, we are closed.
427     } finally {
428       lock.unlock();
429     }
430   }
431 
432   @Override
433   public boolean seek(Cell key) throws IOException {
434     lock.lock();
435     try {
436     // reset matcher state, in case that underlying store changed
437     checkReseek();
438     return this.heap.seek(key);
439     } finally {
440       lock.unlock();
441     }
442   }
443 
444   /**
445    * Get the next row of values from this Store.
       * <p>
       * Cells are taken from the heap one at a time and handed to the query
       * matcher; the match code decides whether a cell is added to
       * {@code outResult}, skipped, or whether the heap is reseeked.
       *
446    * @param outResult list that included cells are appended to
447    * @param limit if positive, the call stops once this many cells have been
       *          added; a negative value also makes every call reset the
       *          per-row counter and the matcher row
448    * @return true if there are more rows, false if scanner is done
       * @throws IOException if an underlying scanner fails; a RowTooBigException
       *           is thrown when the estimated size of the cells added in this
       *           call exceeds {@code maxRowSize}
449    */
450   @Override
451   public boolean next(List<Cell> outResult, int limit) throws IOException {
452     lock.lock();
453     try {
          // The scanner stack was rebuilt and now points at a different row (or is
          // exhausted): report "more" without adding cells so the caller re-peeks.
454     if (checkReseek()) {
455       return true;
456     }
457 
458     // if the heap was left null, then the scanners had previously run out anyways, close and
459     // return.
460     if (this.heap == null) {
461       close();
462       return false;
463     }
464 
465     Cell peeked = this.heap.peek();
466     if (peeked == null) {
467       close();
468       return false;
469     }
470 
471     // only call setRow if the row changes; avoids confusing the query matcher
472     // if scanning intra-row
473     byte[] row = peeked.getRowArray();
474     int offset = peeked.getRowOffset();
475     short length = peeked.getRowLength();
476     if (limit < 0 || matcher.row == null || !Bytes.equals(row, offset, length, matcher.row,
477         matcher.rowOffset, matcher.rowLength)) {
478       this.countPerRow = 0;
479       matcher.setRow(row, offset, length);
480     }
481 
482     Cell cell;
483 
484     // Only do a sanity-check if store and comparator are available.
485     KeyValue.KVComparator comparator =
486         store != null ? store.getComparator() : null;
487 
          // count: cells added to outResult by this call; totalBytesRead: their
          // summed estimated serialized size, checked against maxRowSize.
488     int count = 0;
489     long totalBytesRead = 0;
490 
491     LOOP: while((cell = this.heap.peek()) != null) {
492       if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevCell from the same heap.
493       checkScanOrder(prevCell, cell, comparator);
494       prevCell = cell;
495 
496       ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
497       switch(qcode) {
498         case INCLUDE:
499         case INCLUDE_AND_SEEK_NEXT_ROW:
500         case INCLUDE_AND_SEEK_NEXT_COL:
501 
502           Filter f = matcher.getFilter();
503           if (f != null) {
504             // TODO convert Scan Query Matcher to be Cell instead of KV based ?
505             cell = f.transformCell(cell);
506           }
507 
508           this.countPerRow++;
                // Past storeOffset + storeLimit cells for this row: nothing more
                // from this row may be returned.
509           if (storeLimit > -1 &&
510               this.countPerRow > (storeLimit + storeOffset)) {
511             // do what SEEK_NEXT_ROW does.
512             if (!matcher.moreRowsMayExistAfter(cell)) {
513               return false;
514             }
515             seekToNextRow(cell);
516             break LOOP;
517           }
518 
519           // add to results only if we have skipped #storeOffset kvs
520           // also update metric accordingly
521           if (this.countPerRow > storeOffset) {
522             outResult.add(cell);
523             count++;
524             totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
525             if (totalBytesRead > maxRowSize) {
526               throw new RowTooBigException("Max row size allowed: " + maxRowSize
527               + ", but the row is bigger than that.");
528             }
529           }
530 
                // Advance the heap according to the exact INCLUDE variant.
531           if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
532             if (!matcher.moreRowsMayExistAfter(cell)) {
533               return false;
534             }
535             seekToNextRow(cell);
536           } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
537             seekAsDirection(matcher.getKeyForNextColumn(cell));
538           } else {
539             this.heap.next();
540           }
541 
                // The caller's per-call cell limit has been reached.
542           if (limit > 0 && (count == limit)) {
543             break LOOP;
544           }
545           continue;
546 
547         case DONE:
548           return true;
549 
550         case DONE_SCAN:
551           close();
552           return false;
553 
554         case SEEK_NEXT_ROW:
555           // This is just a relatively simple end of scan fix, to short-cut end
556           // us if there is an endKey in the scan.
557           if (!matcher.moreRowsMayExistAfter(cell)) {
558             return false;
559           }
560 
561           seekToNextRow(cell);
562           break;
563 
564         case SEEK_NEXT_COL:
565           seekAsDirection(matcher.getKeyForNextColumn(cell));
566           break;
567 
568         case SKIP:
569           this.heap.next();
570           break;
571 
572         case SEEK_NEXT_USING_HINT:
573           // TODO convert reseek to Cell?
574           Cell nextKV = matcher.getNextKeyHint(cell);
                // Without a hint, fall back to stepping to the next cell.
575           if (nextKV != null) {
576             seekAsDirection(nextKV);
577           } else {
578             heap.next();
579           }
580           break;
581 
582         default:
583           throw new RuntimeException("UNEXPECTED");
584       }
585     }
586 
          // Reached when the heap ran dry or the loop was left via "break LOOP".
587     if (count > 0) {
588       return true;
589     }
590 
591     // No more keys
592     close();
593     return false;
594     } finally {
595       lock.unlock();
596     }
597   }
598 
599   @Override
600   public boolean next(List<Cell> outResult) throws IOException {
601     return next(outResult, -1);
602   }
603 
604   // Implementation of ChangedReadersObserver
605   @Override
606   public void updateReaders() throws IOException {
607     lock.lock();
608     try {
609     if (this.closing) return;
610 
611     // All public synchronized API calls will call 'checkReseek' which will cause
612     // the scanner stack to reseek if this.heap==null && this.lastTop != null.
613     // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
614     // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
615     // which is NOT what we want, not to mention could cause an NPE. So we early out here.
616     if (this.heap == null) return;
617 
618     // this could be null.
619     this.lastTop = this.peek();
620 
621     //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
622 
623     // close scanners to old obsolete Store files
624     this.heap.close(); // bubble thru and close all scanners.
625     this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
626 
627     // Let the next() call handle re-creating and seeking
628     } finally {
629       lock.unlock();
630     }
631   }
632 
633   /**
634    * @return true if top of heap has changed (and KeyValueHeap has to try the
635    *         next KV)
636    * @throws IOException
637    */
638   protected boolean checkReseek() throws IOException {
639     if (this.heap == null && this.lastTop != null) {
640       resetScannerStack(this.lastTop);
641       if (this.heap.peek() == null
642           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
643         LOG.debug("Storescanner.peek() is changed where before = "
644             + this.lastTop.toString() + ",and after = " + this.heap.peek());
645         this.lastTop = null;
646         return true;
647       }
648       this.lastTop = null; // gone!
649     }
650     // else dont need to reseek
651     return false;
652   }
653 
654   protected void resetScannerStack(Cell lastTopKey) throws IOException {
655     if (heap != null) {
656       throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
657     }
658 
659     /* When we have the scan object, should we not pass it to getScanners()
660      * to get a limited set of scanners? We did so in the constructor and we
661      * could have done it now by storing the scan object from the constructor */
662     List<KeyValueScanner> scanners = getScannersNoCompaction();
663 
664     // Seek all scanners to the initial key
665     seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled);
666 
667     // Combine all seeked scanners with a heap
668     resetKVHeap(scanners, store.getComparator());
669 
670     // Reset the state of the Query Matcher and set to top row.
671     // Only reset and call setRow if the row changes; avoids confusing the
672     // query matcher if scanning intra-row.
673     Cell kv = heap.peek();
674     if (kv == null) {
675       kv = lastTopKey;
676     }
677     byte[] row = kv.getRowArray();
678     int offset = kv.getRowOffset();
679     short length = kv.getRowLength();
680     if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row,
681         matcher.rowOffset, matcher.rowLength)) {
682       this.countPerRow = 0;
683       matcher.reset();
684       matcher.setRow(row, offset, length);
685     }
686   }
687 
688   /**
689    * Check whether scan as expected order
690    * @param prevKV
691    * @param kv
692    * @param comparator
693    * @throws IOException
694    */
695   protected void checkScanOrder(Cell prevKV, Cell kv,
696       KeyValue.KVComparator comparator) throws IOException {
697     // Check that the heap gives us KVs in an increasing order.
698     assert prevKV == null || comparator == null
699         || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
700         + " followed by a " + "smaller key " + kv + " in cf " + store;
701   }
702 
703   protected boolean seekToNextRow(Cell kv) throws IOException {
704     return reseek(KeyValueUtil.createLastOnRow(kv));
705   }
706 
707   /**
708    * Do a reseek in a normal StoreScanner(scan forward)
709    * @param kv
710    * @return true if scanner has values left, false if end of scanner
711    * @throws IOException
712    */
713   protected boolean seekAsDirection(Cell kv)
714       throws IOException {
715     return reseek(kv);
716   }
717 
718   @Override
719   public boolean reseek(Cell kv) throws IOException {
720     lock.lock();
721     try {
722     //Heap will not be null, if this is called from next() which.
723     //If called from RegionScanner.reseek(...) make sure the scanner
724     //stack is reset if needed.
725     checkReseek();
726     if (explicitColumnQuery && lazySeekEnabledGlobally) {
727       return heap.requestSeek(kv, true, useRowColBloom);
728     }
729     return heap.reseek(kv);
730     } finally {
731       lock.unlock();
732     }
733   }
734 
735   @Override
736   public long getSequenceID() {
737     return 0;
738   }
739 
740   /**
741    * Seek storefiles in parallel to optimize IO latency as much as possible
742    * @param scanners the list {@link KeyValueScanner}s to be read from
743    * @param kv the KeyValue on which the operation is being requested
744    * @throws IOException
745    */
746   private void parallelSeek(final List<? extends KeyValueScanner>
747       scanners, final Cell kv) throws IOException {
748     if (scanners.isEmpty()) return;
749     int storeFileScannerCount = scanners.size();
750     CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
751     List<ParallelSeekHandler> handlers = 
752         new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
753     for (KeyValueScanner scanner : scanners) {
754       if (scanner instanceof StoreFileScanner) {
755         ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
756           this.readPt, latch);
757         executor.submit(seekHandler);
758         handlers.add(seekHandler);
759       } else {
760         scanner.seek(kv);
761         latch.countDown();
762       }
763     }
764 
765     try {
766       latch.await();
767     } catch (InterruptedException ie) {
768       throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
769     }
770 
771     for (ParallelSeekHandler handler : handlers) {
772       if (handler.getErr() != null) {
773         throw new IOException(handler.getErr());
774       }
775     }
776   }
777 
778   /**
779    * Used in testing.
780    * @return all scanners in no particular order
781    */
782   List<KeyValueScanner> getAllScannersForTesting() {
783     List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
784     KeyValueScanner current = heap.getCurrentForTesting();
785     if (current != null)
786       allScanners.add(current);
787     for (KeyValueScanner scanner : heap.getHeap())
788       allScanners.add(scanner);
789     return allScanners;
790   }
791 
792   static void enableLazySeekGlobally(boolean enable) {
793     lazySeekEnabledGlobally = enable;
794   }
795 
796   /**
797    * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
798    */
799   public long getEstimatedNumberOfKvsScanned() {
800     return this.kvsScanned;
801   }
802 }
803