View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.NavigableSet;
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.CellUtil;
35  import org.apache.hadoop.hbase.DoNotRetryIOException;
36  import org.apache.hadoop.hbase.HConstants;
37  import org.apache.hadoop.hbase.KeyValue;
38  import org.apache.hadoop.hbase.KeyValue.KVComparator;
39  import org.apache.hadoop.hbase.KeyValueUtil;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.client.IsolationLevel;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.executor.ExecutorService;
44  import org.apache.hadoop.hbase.filter.Filter;
45  import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
46  import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
47  import org.apache.hadoop.hbase.util.Bytes;
48  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
49  
50  /**
51   * Scanner scans both the memstore and the Store. Coalesce KeyValue stream
52   * into List<KeyValue> for a single row.
53   */
54  @InterfaceAudience.Private
55  public class StoreScanner extends NonReversedNonLazyKeyValueScanner
56      implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
57    static final Log LOG = LogFactory.getLog(StoreScanner.class);
58    protected Store store;
59    protected ScanQueryMatcher matcher;
60    protected KeyValueHeap heap;
61    protected boolean cacheBlocks;
62  
63    protected int countPerRow = 0;
64    protected int storeLimit = -1;
65    protected int storeOffset = 0;
66  
67    // Used to indicate that the scanner has closed (see HBASE-1107)
68    // Doesnt need to be volatile because it's always accessed via synchronized methods
69    protected boolean closing = false;
70    protected final boolean isGet;
71    protected final boolean explicitColumnQuery;
72    protected final boolean useRowColBloom;
73    /**
74     * A flag that enables StoreFileScanner parallel-seeking
75     */
76    protected boolean isParallelSeekEnabled = false;
77    protected ExecutorService executor;
78    protected final Scan scan;
79    protected final NavigableSet<byte[]> columns;
80    protected final long oldestUnexpiredTS;
81    protected final long now;
82    protected final int minVersions;
83    protected final long maxRowSize;
84  
85    /**
86     * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
87     * KVs skipped via seeking to next row/column. TODO: estimate them?
88     */
89    private long kvsScanned = 0;
90    private Cell prevCell = null;
91  
92    /** We don't ever expect to change this, the constant is just for clarity. */
93    static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
94    public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
95        "hbase.storescanner.parallel.seek.enable";
96  
97    /** Used during unit testing to ensure that lazy seek does save seek ops */
98    protected static boolean lazySeekEnabledGlobally =
99        LAZY_SEEK_ENABLED_BY_DEFAULT;
100 
101   // if heap == null and lastTop != null, you need to reseek given the key below
102   protected Cell lastTop = null;
103 
104   // A flag whether use pread for scan
105   private boolean scanUsePread = false;
106   protected ReentrantLock lock = new ReentrantLock();
107   
108   private final long readPt;
109 
110   // used by the injection framework to test race between StoreScanner construction and compaction
111   enum StoreScannerCompactionRace {
112     BEFORE_SEEK,
113     AFTER_SEEK,
114     COMPACT_COMPLETE
115   }
116   
117   /** An internal constructor. */
118   protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
119       final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt) {
120     this.readPt = readPt;
121     this.store = store;
122     this.cacheBlocks = cacheBlocks;
123     isGet = scan.isGetScan();
124     int numCol = columns == null ? 0 : columns.size();
125     explicitColumnQuery = numCol > 0;
126     this.scan = scan;
127     this.columns = columns;
128     this.now = EnvironmentEdgeManager.currentTime();
129     this.oldestUnexpiredTS = now - ttl;
130     this.minVersions = minVersions;
131 
132     if (store != null && ((HStore)store).getHRegion() != null
133         && ((HStore)store).getHRegion().getBaseConf() != null) {
134       Configuration conf = ((HStore) store).getHRegion().getBaseConf();
135       this.maxRowSize =
136           conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
137       this.scanUsePread = conf.getBoolean("hbase.storescanner.use.pread", scan.isSmall());
138     } else {
139       this.maxRowSize = HConstants.TABLE_MAX_ROWSIZE_DEFAULT;
140       this.scanUsePread = scan.isSmall();
141     }
142 
143     // We look up row-column Bloom filters for multi-column queries as part of
144     // the seek operation. However, we also look the row-column Bloom filter
145     // for multi-row (non-"get") scans because this is not done in
146     // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
147     useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
148 
149     // The parallel-seeking is on :
150     // 1) the config value is *true*
151     // 2) store has more than one store file
152     if (store != null && ((HStore)store).getHRegion() != null
153         && store.getStorefilesCount() > 1) {
154       RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
155       if (rsService == null || !rsService.getConfiguration().getBoolean(
156             STORESCANNER_PARALLEL_SEEK_ENABLE, false)) return;
157       isParallelSeekEnabled = true;
158       executor = rsService.getExecutorService();
159     }
160   }
161 
162   /**
163    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
164    * are not in a compaction.
165    *
166    * @param store who we scan
167    * @param scan the spec
168    * @param columns which columns we are scanning
169    * @throws IOException
170    */
171   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
172       long readPt)
173                               throws IOException {
174     this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
175         scanInfo.getMinVersions(), readPt);
176     if (columns != null && scan.isRaw()) {
177       throw new DoNotRetryIOException(
178           "Cannot specify any column for a raw scan");
179     }
180     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
181         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
182         oldestUnexpiredTS, now, store.getCoprocessorHost());
183 
184     this.store.addChangedReaderObserver(this);
185 
186     // Pass columns to try to filter out unnecessary StoreFiles.
187     List<KeyValueScanner> scanners = getScannersNoCompaction();
188 
189     // Seek all scanners to the start of the Row (or if the exact matching row
190     // key does not exist, then to the start of the next matching Row).
191     // Always check bloom filter to optimize the top row seek for delete
192     // family marker.
193     seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
194         && lazySeekEnabledGlobally, isParallelSeekEnabled);
195 
196     // set storeLimit
197     this.storeLimit = scan.getMaxResultsPerColumnFamily();
198 
199     // set rowOffset
200     this.storeOffset = scan.getRowOffsetPerColumnFamily();
201 
202     // Combine all seeked scanners with a heap
203     resetKVHeap(scanners, store.getComparator());
204   }
205 
206   /**
207    * Used for compactions.<p>
208    *
209    * Opens a scanner across specified StoreFiles.
210    * @param store who we scan
211    * @param scan the spec
212    * @param scanners ancillary scanners
213    * @param smallestReadPoint the readPoint that we should use for tracking
214    *          versions
215    */
216   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
217       List<? extends KeyValueScanner> scanners, ScanType scanType,
218       long smallestReadPoint, long earliestPutTs) throws IOException {
219     this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
220   }
221 
222   /**
223    * Used for compactions that drop deletes from a limited range of rows.<p>
224    *
225    * Opens a scanner across specified StoreFiles.
226    * @param store who we scan
227    * @param scan the spec
228    * @param scanners ancillary scanners
229    * @param smallestReadPoint the readPoint that we should use for tracking versions
230    * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
231    * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
232    */
233   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
234       List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
235       byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
236     this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
237         earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
238   }
239 
240   private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
241       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
242       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
243     this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(),
244         ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
245     if (dropDeletesFromRow == null) {
246       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
247           earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
248     } else {
249       matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
250           oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
251     }
252 
253     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
254     scanners = selectScannersFrom(scanners);
255 
256     // Seek all scanners to the initial key
257     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
258 
259     // Combine all seeked scanners with a heap
260     resetKVHeap(scanners, store.getComparator());
261   }
262 
263   /** Constructor for testing. */
264   StoreScanner(final Scan scan, ScanInfo scanInfo,
265       ScanType scanType, final NavigableSet<byte[]> columns,
266       final List<KeyValueScanner> scanners) throws IOException {
267     this(scan, scanInfo, scanType, columns, scanners,
268         HConstants.LATEST_TIMESTAMP,
269         // 0 is passed as readpoint because the test bypasses Store
270         0);
271   }
272 
273   // Constructor for testing.
274   StoreScanner(final Scan scan, ScanInfo scanInfo,
275     ScanType scanType, final NavigableSet<byte[]> columns,
276     final List<KeyValueScanner> scanners, long earliestPutTs)
277         throws IOException {
278     this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
279       // 0 is passed as readpoint because the test bypasses Store
280       0);
281   }
282   
283   private StoreScanner(final Scan scan, ScanInfo scanInfo,
284       ScanType scanType, final NavigableSet<byte[]> columns,
285       final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
286           throws IOException {
287     this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
288         scanInfo.getMinVersions(), readPt);
289     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
290         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
291 
292     // In unit tests, the store could be null
293     if (this.store != null) {
294       this.store.addChangedReaderObserver(this);
295     }
296     // Seek all scanners to the initial key
297     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
298     resetKVHeap(scanners, scanInfo.getComparator());
299   }
300 
301   /**
302    * Get a filtered list of scanners. Assumes we are not in a compaction.
303    * @return list of scanners to seek
304    */
305   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
306     final boolean isCompaction = false;
307     boolean usePread = isGet || scanUsePread;
308     return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
309         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
310   }
311 
312   /**
313    * Seek the specified scanners with the given key
314    * @param scanners
315    * @param seekKey
316    * @param isLazy true if using lazy seek
317    * @param isParallelSeek true if using parallel seek
318    * @throws IOException
319    */
320   protected void seekScanners(List<? extends KeyValueScanner> scanners,
321       Cell seekKey, boolean isLazy, boolean isParallelSeek)
322       throws IOException {
323     // Seek all scanners to the start of the Row (or if the exact matching row
324     // key does not exist, then to the start of the next matching Row).
325     // Always check bloom filter to optimize the top row seek for delete
326     // family marker.
327     if (isLazy) {
328       for (KeyValueScanner scanner : scanners) {
329         scanner.requestSeek(seekKey, false, true);
330       }
331     } else {
332       if (!isParallelSeek) {
333         long totalScannersSoughtBytes = 0;
334         for (KeyValueScanner scanner : scanners) {
335           if (totalScannersSoughtBytes >= maxRowSize) {
336             throw new RowTooBigException("Max row size allowed: " + maxRowSize
337               + ", but row is bigger than that");
338           }
339           scanner.seek(seekKey);
340           Cell c = scanner.peek();
341           if (c != null) {
342             totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
343           }
344         }
345       } else {
346         parallelSeek(scanners, seekKey);
347       }
348     }
349   }
350 
351   protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
352       KVComparator comparator) throws IOException {
353     // Combine all seeked scanners with a heap
354     heap = new KeyValueHeap(scanners, comparator);
355   }
356 
357   /**
358    * Filters the given list of scanners using Bloom filter, time range, and
359    * TTL.
360    */
361   protected List<KeyValueScanner> selectScannersFrom(
362       final List<? extends KeyValueScanner> allScanners) {
363     boolean memOnly;
364     boolean filesOnly;
365     if (scan instanceof InternalScan) {
366       InternalScan iscan = (InternalScan)scan;
367       memOnly = iscan.isCheckOnlyMemStore();
368       filesOnly = iscan.isCheckOnlyStoreFiles();
369     } else {
370       memOnly = false;
371       filesOnly = false;
372     }
373 
374     List<KeyValueScanner> scanners =
375         new ArrayList<KeyValueScanner>(allScanners.size());
376 
377     // We can only exclude store files based on TTL if minVersions is set to 0.
378     // Otherwise, we might have to return KVs that have technically expired.
379     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
380         Long.MIN_VALUE;
381 
382     // include only those scan files which pass all filters
383     for (KeyValueScanner kvs : allScanners) {
384       boolean isFile = kvs.isFileScanner();
385       if ((!isFile && filesOnly) || (isFile && memOnly)) {
386         continue;
387       }
388 
389       if (kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)) {
390         scanners.add(kvs);
391       }
392     }
393     return scanners;
394   }
395 
396   @Override
397   public Cell peek() {
398     lock.lock();
399     try {
400     if (this.heap == null) {
401       return this.lastTop;
402     }
403     return this.heap.peek();
404     } finally {
405       lock.unlock();
406     }
407   }
408 
409   @Override
410   public KeyValue next() {
411     // throw runtime exception perhaps?
412     throw new RuntimeException("Never call StoreScanner.next()");
413   }
414 
415   @Override
416   public void close() {
417     lock.lock();
418     try {
419     if (this.closing) return;
420     this.closing = true;
421     // under test, we dont have a this.store
422     if (this.store != null)
423       this.store.deleteChangedReaderObserver(this);
424     if (this.heap != null)
425       this.heap.close();
426     this.heap = null; // CLOSED!
427     this.lastTop = null; // If both are null, we are closed.
428     } finally {
429       lock.unlock();
430     }
431   }
432 
433   @Override
434   public boolean seek(Cell key) throws IOException {
435     lock.lock();
436     try {
437     // reset matcher state, in case that underlying store changed
438     checkReseek();
439     return this.heap.seek(key);
440     } finally {
441       lock.unlock();
442     }
443   }
444 
445   /**
446    * Get the next row of values from this Store.
447    * @param outResult
448    * @param limit
449    * @return true if there are more rows, false if scanner is done
450    */
451   @Override
452   public NextState next(List<Cell> outResult, int limit) throws IOException {
453     // -1 means no limit
454     return next(outResult, limit, -1);
455   }
456 
457   /**
458    * Get the next row of values from this Store.
459    * @param outResult
460    * @param limit
461    * @param remainingResultSize
462    * @return true if there are more rows, false if scanner is done
463    */
464   @Override
465   public NextState next(List<Cell> outResult, int limit, long remainingResultSize)
466       throws IOException {
467     lock.lock();
468     try {
469     if (checkReseek()) {
470       return NextState.makeState(NextState.State.MORE_VALUES, 0);
471     }
472 
473     // if the heap was left null, then the scanners had previously run out anyways, close and
474     // return.
475     if (this.heap == null) {
476       close();
477       return NextState.makeState(NextState.State.NO_MORE_VALUES, 0);
478     }
479 
480     Cell peeked = this.heap.peek();
481     if (peeked == null) {
482       close();
483       return NextState.makeState(NextState.State.NO_MORE_VALUES, 0);
484     }
485 
486     // only call setRow if the row changes; avoids confusing the query matcher
487     // if scanning intra-row
488     byte[] row = peeked.getRowArray();
489     int offset = peeked.getRowOffset();
490     short length = peeked.getRowLength();
491 
492     // If limit < 0 and remainingResultSize < 0 we can skip the row comparison because we know
493     // the row has changed. Else it is possible we are still traversing the same row so we
494     // must perform the row comparison.
495       if ((limit < 0 && remainingResultSize < 0) || matcher.row == null
496           || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset,
497             matcher.rowLength)) {
498         this.countPerRow = 0;
499         matcher.setRow(row, offset, length);
500     }
501 
502     Cell cell;
503 
504     // Only do a sanity-check if store and comparator are available.
505     KeyValue.KVComparator comparator =
506         store != null ? store.getComparator() : null;
507 
508     int count = 0;
509     long totalBytesRead = 0;
510       long totalHeapSize = 0;
511 
512     LOOP: while((cell = this.heap.peek()) != null) {
513       if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
514       checkScanOrder(prevCell, cell, comparator);
515       prevCell = cell;
516 
517       ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
518       qcode = optimize(qcode, cell);
519       switch(qcode) {
520         case INCLUDE:
521         case INCLUDE_AND_SEEK_NEXT_ROW:
522         case INCLUDE_AND_SEEK_NEXT_COL:
523 
524           Filter f = matcher.getFilter();
525           if (f != null) {
526             // TODO convert Scan Query Matcher to be Cell instead of KV based ?
527             cell = f.transformCell(cell);
528           }
529 
530           this.countPerRow++;
531           if (storeLimit > -1 &&
532               this.countPerRow > (storeLimit + storeOffset)) {
533             // do what SEEK_NEXT_ROW does.
534             if (!matcher.moreRowsMayExistAfter(cell)) {
535               return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize);
536             }
537             seekToNextRow(cell);
538             break LOOP;
539           }
540 
541           // add to results only if we have skipped #storeOffset kvs
542           // also update metric accordingly
543           if (this.countPerRow > storeOffset) {
544             outResult.add(cell);
545             count++;
546             totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
547             totalHeapSize += CellUtil.estimatedHeapSizeOf(cell);
548             if (totalBytesRead > maxRowSize) {
549               throw new RowTooBigException("Max row size allowed: " + maxRowSize
550               + ", but the row is bigger than that.");
551             }
552           }
553 
554           if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
555             if (!matcher.moreRowsMayExistAfter(cell)) {
556               return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize);
557             }
558             seekToNextRow(cell);
559           } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
560             seekAsDirection(matcher.getKeyForNextColumn(cell));
561           } else {
562             this.heap.next();
563           }
564 
565           if (limit > 0 && (count == limit)) {
566             break LOOP;
567           }
568           if (remainingResultSize > 0 && (totalHeapSize >= remainingResultSize)) {
569             break LOOP;
570           }
571           continue;
572 
573         case DONE:
574           return NextState.makeState(NextState.State.MORE_VALUES, totalHeapSize);
575 
576         case DONE_SCAN:
577           close();
578           return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize);
579 
580         case SEEK_NEXT_ROW:
581           // This is just a relatively simple end of scan fix, to short-cut end
582           // us if there is an endKey in the scan.
583           if (!matcher.moreRowsMayExistAfter(cell)) {
584             return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize);
585           }
586 
587           seekToNextRow(cell);
588           break;
589 
590         case SEEK_NEXT_COL:
591           seekAsDirection(matcher.getKeyForNextColumn(cell));
592           break;
593 
594         case SKIP:
595           this.heap.next();
596           break;
597 
598         case SEEK_NEXT_USING_HINT:
599           // TODO convert resee to Cell?
600           Cell nextKV = matcher.getNextKeyHint(cell);
601           if (nextKV != null) {
602             seekAsDirection(nextKV);
603           } else {
604             heap.next();
605           }
606           break;
607 
608         default:
609           throw new RuntimeException("UNEXPECTED");
610       }
611     }
612 
613     if (count > 0) {
614       return NextState.makeState(NextState.State.MORE_VALUES, totalHeapSize);
615     }
616 
617     // No more keys
618     close();
619     return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize);
620     } finally {
621       lock.unlock();
622     }
623   }
624 
625   /*
626    * See if we should actually SEEK or rather just SKIP to the next Cell.
627    * (see HBASE-13109)
628    */
629   private ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
630     Cell nextIndexedKey = getNextIndexedKey();
631     if (nextIndexedKey == null || nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY ||
632         store == null) {
633       return qcode;
634     }
635     switch(qcode) {
636     case INCLUDE_AND_SEEK_NEXT_COL:
637     case SEEK_NEXT_COL:
638     {
639       if (matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
640         return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
641       }
642       break;
643     }
644     case INCLUDE_AND_SEEK_NEXT_ROW:
645     case SEEK_NEXT_ROW:
646     {
647       if (matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
648         return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
649       }
650       break;
651     }
652     default:
653       break;
654     }
655     return qcode;
656   }
657 
658   @Override
659   public NextState next(List<Cell> outResult) throws IOException {
660     return next(outResult, -1);
661   }
662 
663   // Implementation of ChangedReadersObserver
664   @Override
665   public void updateReaders() throws IOException {
666     lock.lock();
667     try {
668     if (this.closing) return;
669 
670     // All public synchronized API calls will call 'checkReseek' which will cause
671     // the scanner stack to reseek if this.heap==null && this.lastTop != null.
672     // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
673     // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
674     // which is NOT what we want, not to mention could cause an NPE. So we early out here.
675     if (this.heap == null) return;
676 
677     // this could be null.
678     this.lastTop = this.peek();
679 
680     //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
681 
682     // close scanners to old obsolete Store files
683     this.heap.close(); // bubble thru and close all scanners.
684     this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
685 
686     // Let the next() call handle re-creating and seeking
687     } finally {
688       lock.unlock();
689     }
690   }
691 
692   /**
693    * @return true if top of heap has changed (and KeyValueHeap has to try the
694    *         next KV)
695    * @throws IOException
696    */
697   protected boolean checkReseek() throws IOException {
698     if (this.heap == null && this.lastTop != null) {
699       resetScannerStack(this.lastTop);
700       if (this.heap.peek() == null
701           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
702         LOG.debug("Storescanner.peek() is changed where before = "
703             + this.lastTop.toString() + ",and after = " + this.heap.peek());
704         this.lastTop = null;
705         return true;
706       }
707       this.lastTop = null; // gone!
708     }
709     // else dont need to reseek
710     return false;
711   }
712 
713   protected void resetScannerStack(Cell lastTopKey) throws IOException {
714     if (heap != null) {
715       throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
716     }
717 
718     /* When we have the scan object, should we not pass it to getScanners()
719      * to get a limited set of scanners? We did so in the constructor and we
720      * could have done it now by storing the scan object from the constructor */
721     List<KeyValueScanner> scanners = getScannersNoCompaction();
722 
723     // Seek all scanners to the initial key
724     seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled);
725 
726     // Combine all seeked scanners with a heap
727     resetKVHeap(scanners, store.getComparator());
728 
729     // Reset the state of the Query Matcher and set to top row.
730     // Only reset and call setRow if the row changes; avoids confusing the
731     // query matcher if scanning intra-row.
732     Cell kv = heap.peek();
733     if (kv == null) {
734       kv = lastTopKey;
735     }
736     byte[] row = kv.getRowArray();
737     int offset = kv.getRowOffset();
738     short length = kv.getRowLength();
739     if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row,
740         matcher.rowOffset, matcher.rowLength)) {
741       this.countPerRow = 0;
742       matcher.reset();
743       matcher.setRow(row, offset, length);
744     }
745   }
746 
747   /**
748    * Check whether scan as expected order
749    * @param prevKV
750    * @param kv
751    * @param comparator
752    * @throws IOException
753    */
754   protected void checkScanOrder(Cell prevKV, Cell kv,
755       KeyValue.KVComparator comparator) throws IOException {
756     // Check that the heap gives us KVs in an increasing order.
757     assert prevKV == null || comparator == null
758         || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
759         + " followed by a " + "smaller key " + kv + " in cf " + store;
760   }
761 
762   protected boolean seekToNextRow(Cell kv) throws IOException {
763     return reseek(KeyValueUtil.createLastOnRow(kv));
764   }
765 
766   /**
767    * Do a reseek in a normal StoreScanner(scan forward)
768    * @param kv
769    * @return true if scanner has values left, false if end of scanner
770    * @throws IOException
771    */
772   protected boolean seekAsDirection(Cell kv)
773       throws IOException {
774     return reseek(kv);
775   }
776 
777   @Override
778   public boolean reseek(Cell kv) throws IOException {
779     lock.lock();
780     try {
781     //Heap will not be null, if this is called from next() which.
782     //If called from RegionScanner.reseek(...) make sure the scanner
783     //stack is reset if needed.
784     checkReseek();
785     if (explicitColumnQuery && lazySeekEnabledGlobally) {
786       return heap.requestSeek(kv, true, useRowColBloom);
787     }
788     return heap.reseek(kv);
789     } finally {
790       lock.unlock();
791     }
792   }
793 
794   @Override
795   public long getSequenceID() {
796     return 0;
797   }
798 
799   /**
800    * Seek storefiles in parallel to optimize IO latency as much as possible
801    * @param scanners the list {@link KeyValueScanner}s to be read from
802    * @param kv the KeyValue on which the operation is being requested
803    * @throws IOException
804    */
805   private void parallelSeek(final List<? extends KeyValueScanner>
806       scanners, final Cell kv) throws IOException {
807     if (scanners.isEmpty()) return;
808     int storeFileScannerCount = scanners.size();
809     CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
810     List<ParallelSeekHandler> handlers = 
811         new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
812     for (KeyValueScanner scanner : scanners) {
813       if (scanner instanceof StoreFileScanner) {
814         ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
815           this.readPt, latch);
816         executor.submit(seekHandler);
817         handlers.add(seekHandler);
818       } else {
819         scanner.seek(kv);
820         latch.countDown();
821       }
822     }
823 
824     try {
825       latch.await();
826     } catch (InterruptedException ie) {
827       throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
828     }
829 
830     for (ParallelSeekHandler handler : handlers) {
831       if (handler.getErr() != null) {
832         throw new IOException(handler.getErr());
833       }
834     }
835   }
836 
837   /**
838    * Used in testing.
839    * @return all scanners in no particular order
840    */
841   List<KeyValueScanner> getAllScannersForTesting() {
842     List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
843     KeyValueScanner current = heap.getCurrentForTesting();
844     if (current != null)
845       allScanners.add(current);
846     for (KeyValueScanner scanner : heap.getHeap())
847       allScanners.add(scanner);
848     return allScanners;
849   }
850 
851   static void enableLazySeekGlobally(boolean enable) {
852     lazySeekEnabledGlobally = enable;
853   }
854 
855   /**
856    * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
857    */
858   public long getEstimatedNumberOfKvsScanned() {
859     return this.kvsScanned;
860   }
861 
862   @Override
863   public Cell getNextIndexedKey() {
864     return this.heap.getNextIndexedKey();
865   }
866 }
867