001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.NavigableSet;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.locks.ReentrantLock;
028
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellComparator;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.DoNotRetryIOException;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.PrivateCellUtil;
035import org.apache.hadoop.hbase.KeyValue;
036import org.apache.hadoop.hbase.KeyValueUtil;
037import org.apache.hadoop.hbase.client.IsolationLevel;
038import org.apache.hadoop.hbase.client.Scan;
039import org.apache.hadoop.hbase.executor.ExecutorService;
040import org.apache.hadoop.hbase.filter.Filter;
041import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
042import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
043import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
044import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
045import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
046import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
048import org.apache.yetus.audience.InterfaceAudience;
049
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
054import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
055import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
056
/**
 * Scanner scans both the memstore and the Store. Coalesces the KeyValue stream into a
 * List<KeyValue> for a single row.
 * <p>
 * The implementation is not thread safe; it is expected to be driven by a single thread, so
 * there is no race between next and close. The only exception is updateReaders, which is called
 * from the memstore flush thread to signal that a flush has happened.
 */
065@InterfaceAudience.Private
066public class StoreScanner extends NonReversedNonLazyKeyValueScanner
067    implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class);
  // The store being scanned. In unit tests the store could be null, and the mob-compaction
  // constructor passes null as well, so uses of it must be null-guarded.
  protected final HStore store;
  // Comparator taken from the ScanInfo; used to build the heap and to check scan order.
  private final CellComparator comparator;
  // Decides, for each cell, whether to include, skip or seek; assigned by the constructors.
  private ScanQueryMatcher matcher;
  // Merges the current scanners; set to null when this scanner is (fully or partially) closed.
  protected KeyValueHeap heap;
  // Whether blocks read by this scan should be cached.
  private boolean cacheBlocks;

  // Number of cells matched for inclusion in the current row; reset when a new row starts.
  private long countPerRow = 0;
  // Max cells to return per row for this family, from Scan#getMaxResultsPerColumnFamily();
  // -1 means no limit.
  private int storeLimit = -1;
  // Cells to skip at the start of each row, from Scan#getRowOffsetPerColumnFamily().
  private int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107)
  private volatile boolean closing = false;
  // True when the Scan is a Get (Scan#isGetScan()).
  private final boolean get;
  // True when at least one column was explicitly requested.
  private final boolean explicitColumnQuery;
  // Whether row-column Bloom filters should be consulted; computed in the constructor.
  private final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  private boolean parallelSeekEnabled = false;
  // Executor used for parallel seeks; only assigned when parallel seeking is enabled.
  private ExecutorService executor;
  private final Scan scan;
  // TTL cutoff computed as now - ttl (0 for raw scans).
  private final long oldestUnexpiredTS;
  // Time captured when this scanner was created.
  private final long now;
  private final int minVersions;
  // Maximum row size for user scans; exceeding it raises RowTooBigException.
  private final long maxRowSize;
  // The time limit is checked every this many scanned cells.
  private final long cellsPerHeartbeatCheck;
  // Read-metric counters: rows whose returned cells all came from the memstore vs. mixed rows
  // (presumably incremented at the end of next(); the increment is not visible here).
  @VisibleForTesting
  long memstoreOnlyReads;
  @VisibleForTesting
  long mixedReads;

  // 1) Collects all the KVHeap that are eagerly getting closed during the
  //    course of a scan
  // 2) Collects the unused memstore scanners. If we close the memstore scanners
  //    before sending data to client, the chunk may be reclaimed by other
  //    updates and the data will be corrupt.
  private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>();

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
   * KVs skipped via seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  // The last cell examined by next(); used to count kvsScanned and to check scan order.
  private Cell prevCell = null;

  // Pread byte budget for Scan.ReadType.DEFAULT scans before switching to stream reads.
  private final long preadMaxBytes;
  // Serialized bytes of all cells examined by next() so far.
  private long bytesRead;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
      "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;

  /**
   * The number of cells scanned in between timeout checks. Specifying a larger value means that
   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
   * timeout checks.
   */
  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
      "hbase.cells.scanned.per.heartbeat.check";

  /**
   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
   */
  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

  /**
   * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we scanned
   * reach this limit, we will reopen the scanner with stream. The default value is 4 times the
   * block size for this store.
   */
  public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes";

  // The effective read type chosen in the constructor (PREAD, STREAM or DEFAULT).
  private final Scan.ReadType readType;

  // Whether this scan currently uses pread.
  // It may change if we use Scan.ReadType.DEFAULT and we have read lots of data.
  private boolean scanUsePread;
  // Indicates whether there was flush during the course of the scan
  private volatile boolean flushed = false;
  // generally we get one file from a flush
  private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1);
  // Since CompactingMemstore is now default, we get three memstore scanners from a flush
  private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
  // The current list of scanners
  @VisibleForTesting
  final List<KeyValueScanner> currentScanners = new ArrayList<>();
  // flush update lock
  private final ReentrantLock flushLock = new ReentrantLock();
  // Lock for closing; while it is held by close(), updateReaders() calls are ignored.
  private final ReentrantLock closeLock = new ReentrantLock();

  // The read point this scan reads at (passed in by the constructors).
  protected final long readPt;
  // Reset to false in next() before each cell is matched.
  private boolean topChanged = false;
167
168  /** An internal constructor. */
169  private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo,
170      int numColumns, long readPt, boolean cacheBlocks, ScanType scanType) {
171    this.readPt = readPt;
172    this.store = store;
173    this.cacheBlocks = cacheBlocks;
174    this.comparator = Preconditions.checkNotNull(scanInfo.getComparator());
175    get = scan.isGetScan();
176    explicitColumnQuery = numColumns > 0;
177    this.scan = scan;
178    this.now = EnvironmentEdgeManager.currentTime();
179    this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl();
180    this.minVersions = scanInfo.getMinVersions();
181
182    // We look up row-column Bloom filters for multi-column queries as part of
183    // the seek operation. However, we also look the row-column Bloom filter
184    // for multi-row (non-"get") scans because this is not done in
185    // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
186    this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1)
187        && (store == null || store.getColumnFamilyDescriptor().getBloomFilterType() == BloomType.ROWCOL);
188    this.maxRowSize = scanInfo.getTableMaxRowSize();
189    if (get) {
190      this.readType = Scan.ReadType.PREAD;
191      this.scanUsePread = true;
192    } else if (scanType != ScanType.USER_SCAN) {
193      // For compaction scanners never use Pread as already we have stream based scanners on the
194      // store files to be compacted
195      this.readType = Scan.ReadType.STREAM;
196      this.scanUsePread = false;
197    } else {
198      if (scan.getReadType() == Scan.ReadType.DEFAULT) {
199        this.readType = scanInfo.isUsePread() ? Scan.ReadType.PREAD : Scan.ReadType.DEFAULT;
200      } else {
201        this.readType = scan.getReadType();
202      }
203      // Always start with pread unless user specific stream. Will change to stream later if
204      // readType is default if the scan keeps running for a long time.
205      this.scanUsePread = this.readType != Scan.ReadType.STREAM;
206    }
207    this.preadMaxBytes = scanInfo.getPreadMaxBytes();
208    this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
209    // Parallel seeking is on if the config allows and more there is more than one store file.
210    if (store != null && store.getStorefilesCount() > 1) {
211      RegionServerServices rsService = store.getHRegion().getRegionServerServices();
212      if (rsService != null && scanInfo.isParallelSeekEnabled()) {
213        this.parallelSeekEnabled = true;
214        this.executor = rsService.getExecutorService();
215      }
216    }
217  }
218
219  private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
220    this.currentScanners.addAll(scanners);
221  }
222
223  /**
224   * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
225   * are not in a compaction.
226   *
227   * @param store who we scan
228   * @param scan the spec
229   * @param columns which columns we are scanning
230   * @throws IOException
231   */
232  public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
233      long readPt) throws IOException {
234    this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt,
235        scan.getCacheBlocks(), ScanType.USER_SCAN);
236    if (columns != null && scan.isRaw()) {
237      throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
238    }
239    matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
240      store.getCoprocessorHost());
241
242    store.addChangedReaderObserver(this);
243
244    List<KeyValueScanner> scanners = null;
245    try {
246      // Pass columns to try to filter out unnecessary StoreFiles.
247      scanners = selectScannersFrom(store,
248        store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
249          scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt));
250
251      // Seek all scanners to the start of the Row (or if the exact matching row
252      // key does not exist, then to the start of the next matching Row).
253      // Always check bloom filter to optimize the top row seek for delete
254      // family marker.
255      seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
256        parallelSeekEnabled);
257
258      // set storeLimit
259      this.storeLimit = scan.getMaxResultsPerColumnFamily();
260
261      // set rowOffset
262      this.storeOffset = scan.getRowOffsetPerColumnFamily();
263      addCurrentScanners(scanners);
264      // Combine all seeked scanners with a heap
265      resetKVHeap(scanners, comparator);
266    } catch (IOException e) {
267      clearAndClose(scanners);
268      // remove us from the HStore#changedReaderObservers here or we'll have no chance to
269      // and might cause memory leak
270      store.deleteChangedReaderObserver(this);
271      throw e;
272    }
273  }
274
  // A dummy Scan instance shared by the constructors that have no user Scan of their own
  // (store/memstore compaction, mob compaction, and the test compaction constructor).
  // NOTE(review): Scan is mutable and this instance is shared statically; it must never be
  // modified.
  private static final Scan SCAN_FOR_COMPACTION = new Scan();
277
278  /**
279   * Used for store file compaction and memstore compaction.
280   * <p>
281   * Opens a scanner across specified StoreFiles/MemStoreSegments.
282   * @param store who we scan
283   * @param scanners ancillary scanners
284   * @param smallestReadPoint the readPoint that we should use for tracking versions
285   */
286  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
287      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
288    this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
289  }
290
291  /**
292   * Used for compactions that drop deletes from a limited range of rows.
293   * <p>
294   * Opens a scanner across specified StoreFiles.
295   * @param store who we scan
296   * @param scanners ancillary scanners
297   * @param smallestReadPoint the readPoint that we should use for tracking versions
298   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
299   * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
300   */
301  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
302      long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
303      byte[] dropDeletesToRow) throws IOException {
304    this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
305        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
306  }
307
308  private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
309      ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
310      byte[] dropDeletesToRow) throws IOException {
311    this(store, SCAN_FOR_COMPACTION, scanInfo, 0,
312        store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType);
313    assert scanType != ScanType.USER_SCAN;
314    matcher =
315        CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs,
316          oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
317
318    // Filter the list of scanners using Bloom filters, time range, TTL, etc.
319    scanners = selectScannersFrom(store, scanners);
320
321    // Seek all scanners to the initial key
322    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
323    addCurrentScanners(scanners);
324    // Combine all seeked scanners with a heap
325    resetKVHeap(scanners, comparator);
326  }
327
328  private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners)
329      throws IOException {
330    // Seek all scanners to the initial key
331    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
332    addCurrentScanners(scanners);
333    resetKVHeap(scanners, comparator);
334  }
335
336  // For mob compaction only as we do not have a Store instance when doing mob compaction.
337  public StoreScanner(ScanInfo scanInfo, ScanType scanType,
338      List<? extends KeyValueScanner> scanners) throws IOException {
339    this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
340    assert scanType != ScanType.USER_SCAN;
341    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L,
342      oldestUnexpiredTS, now, null, null, null);
343    seekAllScanner(scanInfo, scanners);
344  }
345
346  // Used to instantiate a scanner for user scan in test
347  @VisibleForTesting
348  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
349      List<? extends KeyValueScanner> scanners, ScanType scanType) throws IOException {
350    // 0 is passed as readpoint because the test bypasses Store
351    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
352        scanType);
353    if (scanType == ScanType.USER_SCAN) {
354      this.matcher =
355          UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
356    } else {
357      this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
358        HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
359    }
360    seekAllScanner(scanInfo, scanners);
361  }
362
363  // Used to instantiate a scanner for user scan in test
364  @VisibleForTesting
365  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
366      List<? extends KeyValueScanner> scanners) throws IOException {
367    // 0 is passed as readpoint because the test bypasses Store
368    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
369        ScanType.USER_SCAN);
370    this.matcher =
371        UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
372    seekAllScanner(scanInfo, scanners);
373  }
374
375  // Used to instantiate a scanner for compaction in test
376  @VisibleForTesting
377  StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType,
378      List<? extends KeyValueScanner> scanners) throws IOException {
379    // 0 is passed as readpoint because the test bypasses Store
380    this(null, maxVersions > 0 ? new Scan().readVersions(maxVersions)
381      : SCAN_FOR_COMPACTION, scanInfo, 0, 0L, false, scanType);
382    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
383      HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
384    seekAllScanner(scanInfo, scanners);
385  }
386
387  @VisibleForTesting
388  boolean isScanUsePread() {
389    return this.scanUsePread;
390  }
391  /**
392   * Seek the specified scanners with the given key
393   * @param scanners
394   * @param seekKey
395   * @param isLazy true if using lazy seek
396   * @param isParallelSeek true if using parallel seek
397   * @throws IOException
398   */
399  protected void seekScanners(List<? extends KeyValueScanner> scanners,
400      Cell seekKey, boolean isLazy, boolean isParallelSeek)
401      throws IOException {
402    // Seek all scanners to the start of the Row (or if the exact matching row
403    // key does not exist, then to the start of the next matching Row).
404    // Always check bloom filter to optimize the top row seek for delete
405    // family marker.
406    if (isLazy) {
407      for (KeyValueScanner scanner : scanners) {
408        scanner.requestSeek(seekKey, false, true);
409      }
410    } else {
411      if (!isParallelSeek) {
412        long totalScannersSoughtBytes = 0;
413        for (KeyValueScanner scanner : scanners) {
414          if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) {
415            throw new RowTooBigException("Max row size allowed: " + maxRowSize
416              + ", but row is bigger than that");
417          }
418          scanner.seek(seekKey);
419          Cell c = scanner.peek();
420          if (c != null) {
421            totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c);
422          }
423        }
424      } else {
425        parallelSeek(scanners, seekKey);
426      }
427    }
428  }
429
430  @VisibleForTesting
431  protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
432      CellComparator comparator) throws IOException {
433    // Combine all seeked scanners with a heap
434    heap = newKVHeap(scanners, comparator);
435  }
436
437  protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners,
438      CellComparator comparator) throws IOException {
439    return new KeyValueHeap(scanners, comparator);
440  }
441
442  /**
443   * Filters the given list of scanners using Bloom filter, time range, and TTL.
444   * <p>
445   * Will be overridden by testcase so declared as protected.
446   */
447  @VisibleForTesting
448  protected List<KeyValueScanner> selectScannersFrom(HStore store,
449      List<? extends KeyValueScanner> allScanners) {
450    boolean memOnly;
451    boolean filesOnly;
452    if (scan instanceof InternalScan) {
453      InternalScan iscan = (InternalScan) scan;
454      memOnly = iscan.isCheckOnlyMemStore();
455      filesOnly = iscan.isCheckOnlyStoreFiles();
456    } else {
457      memOnly = false;
458      filesOnly = false;
459    }
460
461    List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size());
462
463    // We can only exclude store files based on TTL if minVersions is set to 0.
464    // Otherwise, we might have to return KVs that have technically expired.
465    long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE;
466
467    // include only those scan files which pass all filters
468    for (KeyValueScanner kvs : allScanners) {
469      boolean isFile = kvs.isFileScanner();
470      if ((!isFile && filesOnly) || (isFile && memOnly)) {
471        continue;
472      }
473
474      if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
475        scanners.add(kvs);
476      } else {
477        kvs.close();
478      }
479    }
480    return scanners;
481  }
482
483  @Override
484  public Cell peek() {
485    return heap != null ? heap.peek() : null;
486  }
487
488  @Override
489  public KeyValue next() {
490    // throw runtime exception perhaps?
491    throw new RuntimeException("Never call StoreScanner.next()");
492  }
493
494  @Override
495  public void close() {
496    close(true);
497  }
498
  /**
   * Closes this scanner, either fully or partially, while holding {@code closeLock}.
   * <p>
   * Returns immediately if a full close already happened. Both modes deregister this scanner
   * from the store's changed-reader observers (when there is a store) and drop the heap.
   * <ul>
   * <li>Full close ({@code withDelayedScannersClose == true}): marks the scanner as closing,
   * closes the delayed-close, post-flush memstore and flushed store-file scanners, and closes the
   * heap.</li>
   * <li>Partial close ({@code false}): the heap is not closed but queued in
   * {@code scannersForDelayedClose}, so the memstore chunks backing already-returned cells are
   * not released before the data is sent to the client.</li>
   * </ul>
   *
   * @param withDelayedScannersClose true for a full close, false for a partial close
   */
  private void close(boolean withDelayedScannersClose) {
    closeLock.lock();
    // If the closeLock is acquired then any subsequent updateReaders()
    // call is ignored.
    try {
      if (this.closing) {
        return;
      }
      // Only a full close marks the scanner as closing; a partial close can be repeated.
      if (withDelayedScannersClose) {
        this.closing = true;
      }
      // For mob compaction, we do not have a store.
      if (this.store != null) {
        this.store.deleteChangedReaderObserver(this);
      }
      if (withDelayedScannersClose) {
        clearAndClose(scannersForDelayedClose);
        clearAndClose(memStoreScannersAfterFlush);
        clearAndClose(flushedstoreFileScanners);
        if (this.heap != null) {
          this.heap.close();
          this.currentScanners.clear();
          this.heap = null; // CLOSED!
        }
      } else {
        // Defer closing the heap until the full close.
        if (this.heap != null) {
          this.scannersForDelayedClose.add(this.heap);
          this.currentScanners.clear();
          this.heap = null;
        }
      }
    } finally {
      closeLock.unlock();
    }
  }
534
535  @Override
536  public boolean seek(Cell key) throws IOException {
537    if (checkFlushed()) {
538      reopenAfterFlush();
539    }
540    return this.heap.seek(key);
541  }
542
543  /**
544   * Get the next row of values from this Store.
545   * @param outResult
546   * @param scannerContext
547   * @return true if there are more rows, false if scanner is done
548   */
549  @Override
550  public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
551    if (scannerContext == null) {
552      throw new IllegalArgumentException("Scanner context cannot be null");
553    }
554    if (checkFlushed() && reopenAfterFlush()) {
555      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
556    }
557
558    // if the heap was left null, then the scanners had previously run out anyways, close and
559    // return.
560    if (this.heap == null) {
561      // By this time partial close should happened because already heap is null
562      close(false);// Do all cleanup except heap.close()
563      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
564    }
565
566    Cell cell = this.heap.peek();
567    if (cell == null) {
568      close(false);// Do all cleanup except heap.close()
569      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
570    }
571
572    // only call setRow if the row changes; avoids confusing the query matcher
573    // if scanning intra-row
574
575    // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
576    // rows. Else it is possible we are still traversing the same row so we must perform the row
577    // comparison.
578    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) {
579      this.countPerRow = 0;
580      matcher.setToNewRow(cell);
581    }
582
583    // Clear progress away unless invoker has indicated it should be kept.
584    if (!scannerContext.getKeepProgress()) {
585      scannerContext.clearProgress();
586    }
587
588    int count = 0;
589    long totalBytesRead = 0;
590    // track the cells for metrics only if it is a user read request.
591    boolean onlyFromMemstore = matcher.isUserScan();
592    try {
593      LOOP: do {
594        // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
595        // Or if the preadMaxBytes is reached and we may want to return so we can switch to stream
596        // in
597        // the shipped method below.
598        if (kvsScanned % cellsPerHeartbeatCheck == 0
599            || (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)) {
600          if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
601            return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
602          }
603        }
604        // Do object compare - we set prevKV from the same heap.
605        if (prevCell != cell) {
606          ++kvsScanned;
607        }
608        checkScanOrder(prevCell, cell, comparator);
609        int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
610        bytesRead += cellSize;
611        if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) {
612          // return immediately if we want to switch from pread to stream. We need this because we
613          // can
614          // only switch in the shipped method, if user use a filter to filter out everything and
615          // rpc
616          // timeout is very large then the shipped method will never be called until the whole scan
617          // is finished, but at that time we have already scan all the data...
618          // See HBASE-20457 for more details.
619          // And there is still a scenario that can not be handled. If we have a very large row,
620          // which
621          // have millions of qualifiers, and filter.filterRow is used, then even if we set the flag
622          // here, we still need to scan all the qualifiers before returning...
623          scannerContext.returnImmediately();
624        }
625        prevCell = cell;
626        scannerContext.setLastPeekedCell(cell);
627        topChanged = false;
628        ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
629        switch (qcode) {
630          case INCLUDE:
631          case INCLUDE_AND_SEEK_NEXT_ROW:
632          case INCLUDE_AND_SEEK_NEXT_COL:
633
634            Filter f = matcher.getFilter();
635            if (f != null) {
636              cell = f.transformCell(cell);
637            }
638            this.countPerRow++;
639            if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) {
640              // do what SEEK_NEXT_ROW does.
641              if (!matcher.moreRowsMayExistAfter(cell)) {
642                close(false);// Do all cleanup except heap.close()
643                return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
644              }
645              matcher.clearCurrentRow();
646              seekToNextRow(cell);
647              break LOOP;
648            }
649
650            // add to results only if we have skipped #storeOffset kvs
651            // also update metric accordingly
652            if (this.countPerRow > storeOffset) {
653              outResult.add(cell);
654
655              // Update local tracking information
656              count++;
657              totalBytesRead += cellSize;
658
659              /**
660               * Increment the metric if all the cells are from memstore.
661               * If not we will account it for mixed reads
662               */
663              onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore();
664              // Update the progress of the scanner context
665              scannerContext.incrementSizeProgress(cellSize, cell.heapSize());
666              scannerContext.incrementBatchProgress(1);
667
668              if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
669                String message = "Max row size allowed: " + maxRowSize
670                    + ", but the row is bigger than that, the row info: "
671                    + CellUtil.toString(cell, false) + ", already have process row cells = "
672                    + outResult.size() + ", it belong to region = "
673                    + store.getHRegion().getRegionInfo().getRegionNameAsString();
674                LOG.warn(message);
675                throw new RowTooBigException(message);
676              }
677            }
678
679            if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
680              if (!matcher.moreRowsMayExistAfter(cell)) {
681                close(false);// Do all cleanup except heap.close()
682                return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
683              }
684              matcher.clearCurrentRow();
685              seekOrSkipToNextRow(cell);
686            } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
687              seekOrSkipToNextColumn(cell);
688            } else {
689              this.heap.next();
690            }
691
692            if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
693              break LOOP;
694            }
695            if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
696              break LOOP;
697            }
698            continue;
699
700          case DONE:
701            // Optimization for Gets! If DONE, no more to get on this row, early exit!
702            if (get) {
703              // Then no more to this row... exit.
704              close(false);// Do all cleanup except heap.close()
705              // update metric
706              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
707            }
708            matcher.clearCurrentRow();
709            return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
710
711          case DONE_SCAN:
712            close(false);// Do all cleanup except heap.close()
713            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
714
715          case SEEK_NEXT_ROW:
716            // This is just a relatively simple end of scan fix, to short-cut end
717            // us if there is an endKey in the scan.
718            if (!matcher.moreRowsMayExistAfter(cell)) {
719              close(false);// Do all cleanup except heap.close()
720              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
721            }
722            matcher.clearCurrentRow();
723            seekOrSkipToNextRow(cell);
724            NextState stateAfterSeekNextRow = needToReturn(outResult);
725            if (stateAfterSeekNextRow != null) {
726              return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
727            }
728            break;
729
730          case SEEK_NEXT_COL:
731            seekOrSkipToNextColumn(cell);
732            NextState stateAfterSeekNextColumn = needToReturn(outResult);
733            if (stateAfterSeekNextColumn != null) {
734              return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
735            }
736            break;
737
738          case SKIP:
739            this.heap.next();
740            break;
741
742          case SEEK_NEXT_USING_HINT:
743            Cell nextKV = matcher.getNextKeyHint(cell);
744            if (nextKV != null && comparator.compare(nextKV, cell) > 0) {
745              seekAsDirection(nextKV);
746              NextState stateAfterSeekByHint = needToReturn(outResult);
747              if (stateAfterSeekByHint != null) {
748                return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
749              }
750            } else {
751              heap.next();
752            }
753            break;
754
755          default:
756            throw new RuntimeException("UNEXPECTED");
757        }
758      } while ((cell = this.heap.peek()) != null);
759
760      if (count > 0) {
761        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
762      }
763
764      // No more keys
765      close(false);// Do all cleanup except heap.close()
766      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
767    } finally {
768      // increment only if we have some result
769      if (count > 0 && matcher.isUserScan()) {
770        // if true increment memstore metrics, if not the mixed one
771        updateMetricsStore(onlyFromMemstore);
772      }
773    }
774  }
775
776  private void updateMetricsStore(boolean memstoreRead) {
777    if (store != null) {
778      store.updateMetricsStore(memstoreRead);
779    } else {
780      // for testing.
781      if (memstoreRead) {
782        memstoreOnlyReads++;
783      } else {
784        mixedReads++;
785      }
786    }
787  }
788
789  /**
790   * If the top cell won't be flushed into disk, the new top cell may be
791   * changed after #reopenAfterFlush. Because the older top cell only exist
792   * in the memstore scanner but the memstore scanner is replaced by hfile
793   * scanner after #reopenAfterFlush. If the row of top cell is changed,
794   * we should return the current cells. Otherwise, we may return
795   * the cells across different rows.
796   * @param outResult the cells which are visible for user scan
797   * @return null is the top cell doesn't change. Otherwise, the NextState
798   *         to return
799   */
800  private NextState needToReturn(List<Cell> outResult) {
801    if (!outResult.isEmpty() && topChanged) {
802      return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
803    }
804    return null;
805  }
806
807  private void seekOrSkipToNextRow(Cell cell) throws IOException {
808    // If it is a Get Scan, then we know that we are done with this row; there are no more
809    // rows beyond the current one: don't try to optimize.
810    if (!get) {
811      if (trySkipToNextRow(cell)) {
812        return;
813      }
814    }
815    seekToNextRow(cell);
816  }
817
818  private void seekOrSkipToNextColumn(Cell cell) throws IOException {
819    if (!trySkipToNextColumn(cell)) {
820      seekAsDirection(matcher.getKeyForNextColumn(cell));
821    }
822  }
823
824  /**
825   * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).
826   * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row,
827   * or seek to an arbitrary seek key. This method decides whether a seek is the most efficient
828   * _actual_ way to get us to the requested cell (SEEKs are more expensive than SKIP, SKIP,
829   * SKIP inside the current, loaded block).
830   * It does this by looking at the next indexed key of the current HFile. This key
831   * is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible key
832   * on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work with
833   * the current Cell but compare as though it were a seek key; see down in
834   * matcher.compareKeyForNextRow, etc). If the compare gets us onto the
835   * next block we *_SEEK, otherwise we just SKIP to the next requested cell.
836   *
837   * <p>Other notes:
838   * <ul>
839   * <li>Rows can straddle block boundaries</li>
840   * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a
841   * different block than column C1 at T2)</li>
842   * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a
843   * few SKIPs...</li>
844   * <li>We want to SEEK when the chance is high that we'll be able to seek
845   * past many Cells, especially if we know we need to go to the next block.</li>
846   * </ul>
847   * <p>A good proxy (best effort) to determine whether SKIP is better than SEEK is whether
848   * we'll likely end up seeking to the next block (or past the next block) to get our next column.
849   * Example:
850   * <pre>
851   * |    BLOCK 1              |     BLOCK 2                   |
852   * |  r1/c1, r1/c2, r1/c3    |    r1/c4, r1/c5, r2/c1        |
853   *                                   ^         ^
854   *                                   |         |
855   *                           Next Index Key   SEEK_NEXT_ROW (before r2/c1)
856   *
857   *
858   * |    BLOCK 1                       |     BLOCK 2                      |
859   * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |    r1/c1/t2, r1/c1/T1, r1/c2/T3  |
860   *                                            ^              ^
861   *                                            |              |
862   *                                    Next Index Key        SEEK_NEXT_COL
863   * </pre>
864   * Now imagine we want columns c1 and c3 (see first diagram above), the 'Next Index Key' of r1/c4
865   * is > r1/c3 so we should seek to get to the c1 on the next row, r2. In second case, say we only
866   * want one version of c1, after we have it, a SEEK_COL will be issued to get to c2. Looking at
867   * the 'Next Index Key', it would land us in the next block, so we should SEEK. In other scenarios
868   * where the SEEK will not land us in the next block, it is very likely better to issues a series
869   * of SKIPs.
870   * @param cell current cell
871   * @return true means skip to next row, false means not
872   */
873  @VisibleForTesting
874  protected boolean trySkipToNextRow(Cell cell) throws IOException {
875    Cell nextCell = null;
876    // used to guard against a changed next indexed key by doing a identity comparison
877    // when the identity changes we need to compare the bytes again
878    Cell previousIndexedKey = null;
879    do {
880      Cell nextIndexedKey = getNextIndexedKey();
881      if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY &&
882          (nextIndexedKey == previousIndexedKey ||
883          matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0)) {
884        this.heap.next();
885        ++kvsScanned;
886        previousIndexedKey = nextIndexedKey;
887      } else {
888        return false;
889      }
890    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell));
891    return true;
892  }
893
894  /**
895   * See {@link org.apache.hadoop.hbase.regionserver.StoreScanner#trySkipToNextRow(Cell)}
896   * @param cell current cell
897   * @return true means skip to next column, false means not
898   */
899  @VisibleForTesting
900  protected boolean trySkipToNextColumn(Cell cell) throws IOException {
901    Cell nextCell = null;
902    // used to guard against a changed next indexed key by doing a identity comparison
903    // when the identity changes we need to compare the bytes again
904    Cell previousIndexedKey = null;
905    do {
906      Cell nextIndexedKey = getNextIndexedKey();
907      if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY &&
908          (nextIndexedKey == previousIndexedKey ||
909          matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0)) {
910        this.heap.next();
911        ++kvsScanned;
912        previousIndexedKey = nextIndexedKey;
913      } else {
914        return false;
915      }
916    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell));
917    // We need this check because it may happen that the new scanner that we get
918    // during heap.next() is requiring reseek due of fake KV previously generated for
919    // ROWCOL bloom filter optimization. See HBASE-19863 for more details
920    if (useRowColBloom && nextCell != null && matcher.compareKeyForNextColumn(nextCell, cell) < 0) {
921      return false;
922    }
923    return true;
924  }
925
926  @Override
927  public long getReadPoint() {
928    return this.readPt;
929  }
930
931  private static void clearAndClose(List<KeyValueScanner> scanners) {
932    if (scanners == null) {
933      return;
934    }
935    for (KeyValueScanner s : scanners) {
936      s.close();
937    }
938    scanners.clear();
939  }
940
  /**
   * Implementation of ChangedReadersObserver.
   * <p>
   * Eagerly opens scanners on the given (newly flushed) store files and parks them in
   * {@code flushedstoreFileScanners}. Non-empty {@code memStoreScanners} replace whatever was
   * parked in {@code memStoreScannersAfterFlush}; the previously parked ones are closed. The
   * method then sets {@code flushed}, so the next reseek/next() call swaps them in through
   * reopenAfterFlush(). If the close lock is already held, or the scanner is closing, the
   * passed memstore scanners are closed and nothing else is done.
   * @param sfs store files to open scanners on; may be empty
   * @param memStoreScanners replacement memstore scanners; may be empty. On the normal paths
   *          they are either retained or closed by this method. NOTE(review): if
   *          store.getScanners() throws, they are neither, so confirm who closes them then.
   * @throws IOException if opening scanners on the store files fails
   */
  @Override
  public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners)
      throws IOException {
    if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) {
      return;
    }
    // Tracks whether closeLock was acquired and therefore must be released in finally.
    boolean updateReaders = false;
    // flushLock guards flushedstoreFileScanners / memStoreScannersAfterFlush, which
    // reopenAfterFlush() drains under the same lock.
    flushLock.lock();
    try {
      if (!closeLock.tryLock()) {
        // The reason for doing this is that when the current store scanner does not retrieve
        // any new cells, then the scanner is considered to be done. The heap of this scanner
        // is not closed till the shipped() call is completed. Hence in that case if at all
        // the partial close (close (false)) has been called before updateReaders(), there is no
        // need for the updateReaders() to happen.
        LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
        // no lock acquired.
        clearAndClose(memStoreScanners);
        return;
      }
      // lock acquired
      updateReaders = true;
      if (this.closing) {
        LOG.debug("StoreScanner already closing. There is no need to updateReaders");
        clearAndClose(memStoreScanners);
        return;
      }
      // Picked up (and reset) by checkFlushed() on the scanning thread.
      flushed = true;
      final boolean isCompaction = false;
      boolean usePread = get || scanUsePread;
      // SEE HBASE-19468 where the flushed files are getting compacted even before a scanner
      // calls next(). So its better we create scanners here rather than next() call. Ensure
      // these scanners are properly closed() whether or not the scan is completed successfully
      // Eagerly creating scanners so that we have the ref counting ticking on the newly created
      // store files. In case of stream scanners this eager creation does not induce performance
      // penalty because in scans (that uses stream scanners) the next() call is bound to happen.
      List<KeyValueScanner> scanners = store.getScanners(sfs, cacheBlocks, get, usePread,
        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false);
      flushedstoreFileScanners.addAll(scanners);
      if (!CollectionUtils.isEmpty(memStoreScanners)) {
        // Replace (and close) any memstore scanners parked by an earlier, not yet consumed flush.
        clearAndClose(memStoreScannersAfterFlush);
        memStoreScannersAfterFlush.addAll(memStoreScanners);
      }
    } finally {
      flushLock.unlock();
      if (updateReaders) {
        closeLock.unlock();
      }
    }
    // Let the next() call handle re-creating and seeking
  }
993
994  /**
995   * @return if top of heap has changed (and KeyValueHeap has to try the next KV)
996   */
997  protected final boolean reopenAfterFlush() throws IOException {
998    // here we can make sure that we have a Store instance so no null check on store.
999    Cell lastTop = heap.peek();
1000    // When we have the scan object, should we not pass it to getScanners() to get a limited set of
1001    // scanners? We did so in the constructor and we could have done it now by storing the scan
1002    // object from the constructor
1003    List<KeyValueScanner> scanners;
1004    flushLock.lock();
1005    try {
1006      List<KeyValueScanner> allScanners =
1007          new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());
1008      allScanners.addAll(flushedstoreFileScanners);
1009      allScanners.addAll(memStoreScannersAfterFlush);
1010      scanners = selectScannersFrom(store, allScanners);
1011      // Clear the current set of flushed store files scanners so that they don't get added again
1012      flushedstoreFileScanners.clear();
1013      memStoreScannersAfterFlush.clear();
1014    } finally {
1015      flushLock.unlock();
1016    }
1017
1018    // Seek the new scanners to the last key
1019    seekScanners(scanners, lastTop, false, parallelSeekEnabled);
1020    // remove the older memstore scanner
1021    for (int i = currentScanners.size() - 1; i >=0; i--) {
1022      if (!currentScanners.get(i).isFileScanner()) {
1023        scannersForDelayedClose.add(currentScanners.remove(i));
1024      } else {
1025        // we add the memstore scanner to the end of currentScanners
1026        break;
1027      }
1028    }
1029    // add the newly created scanners on the flushed files and the current active memstore scanner
1030    addCurrentScanners(scanners);
1031    // Combine all seeked scanners with a heap
1032    resetKVHeap(this.currentScanners, store.getComparator());
1033    resetQueryMatcher(lastTop);
1034    if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
1035      LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString() +
1036          ",and after = " + heap.peek());
1037      topChanged = true;
1038    } else {
1039      topChanged = false;
1040    }
1041    return topChanged;
1042  }
1043
1044  private void resetQueryMatcher(Cell lastTopKey) {
1045    // Reset the state of the Query Matcher and set to top row.
1046    // Only reset and call setRow if the row changes; avoids confusing the
1047    // query matcher if scanning intra-row.
1048    Cell cell = heap.peek();
1049    if (cell == null) {
1050      cell = lastTopKey;
1051    }
1052    if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {
1053      this.countPerRow = 0;
1054      // The setToNewRow will call reset internally
1055      matcher.setToNewRow(cell);
1056    }
1057  }
1058
1059  /**
1060   * Check whether scan as expected order
1061   * @param prevKV
1062   * @param kv
1063   * @param comparator
1064   * @throws IOException
1065   */
1066  protected void checkScanOrder(Cell prevKV, Cell kv,
1067      CellComparator comparator) throws IOException {
1068    // Check that the heap gives us KVs in an increasing order.
1069    assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 : "Key "
1070        + prevKV + " followed by a smaller key " + kv + " in cf " + store;
1071  }
1072
1073  protected boolean seekToNextRow(Cell c) throws IOException {
1074    return reseek(PrivateCellUtil.createLastOnRow(c));
1075  }
1076
1077  /**
1078   * Do a reseek in a normal StoreScanner(scan forward)
1079   * @param kv
1080   * @return true if scanner has values left, false if end of scanner
1081   * @throws IOException
1082   */
1083  protected boolean seekAsDirection(Cell kv)
1084      throws IOException {
1085    return reseek(kv);
1086  }
1087
1088  @Override
1089  public boolean reseek(Cell kv) throws IOException {
1090    if (checkFlushed()) {
1091      reopenAfterFlush();
1092    }
1093    if (explicitColumnQuery && lazySeekEnabledGlobally) {
1094      return heap.requestSeek(kv, true, useRowColBloom);
1095    }
1096    return heap.reseek(kv);
1097  }
1098
  /**
   * Switches this scan from pread to stream reads once enough bytes have been read.
   * <p>
   * It returns immediately unless all of the following hold:
   * <ul>
   * <li>the read type is {@code DEFAULT}</li>
   * <li>the scan currently uses pread</li>
   * <li>the scanner is not closing</li>
   * <li>the heap still has a top cell</li>
   * <li>at least {@code preadMaxBytes} bytes have been read</li>
   * </ul>
   * Otherwise the file scanners are recreated and the memstore scanners are kept. The new
   * scanners are seeked to the current top cell and combined into a new heap, which replaces
   * the old one. The old file scanners are then closed. Any exception is logged and the existing
   * scanners stay in use (best effort).
   * <p>
   * NOTE(review): {@code scanUsePread} is cleared before the attempt. If recreation returns
   * null or throws, the switch is therefore never retried — confirm this is intended.
   */
  @VisibleForTesting
  void trySwitchToStreamRead() {
    if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing ||
        heap.peek() == null || bytesRead < preadMaxBytes) {
      return;
    }
    LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead,
        this.store.getColumnFamilyName());
    scanUsePread = false;
    Cell lastTop = heap.peek();
    List<KeyValueScanner> memstoreScanners = new ArrayList<>();
    List<KeyValueScanner> scannersToClose = new ArrayList<>();
    // Partition: memstore scanners are carried over; file scanners are replaced and closed.
    for (KeyValueScanner kvs : currentScanners) {
      if (!kvs.isFileScanner()) {
        // collect memstorescanners here
        memstoreScanners.add(kvs);
      } else {
        scannersToClose.add(kvs);
      }
    }
    List<KeyValueScanner> fileScanners = null;
    List<KeyValueScanner> newCurrentScanners;
    KeyValueHeap newHeap;
    try {
      // We must have a store instance here so no null check
      // recreate the scanners on the current file scanners
      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false,
        matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(),
        scan.includeStopRow(), readPt, false);
      if (fileScanners == null) {
        return;
      }
      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
      newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
      newCurrentScanners.addAll(fileScanners);
      newCurrentScanners.addAll(memstoreScanners);
      newHeap = newKVHeap(newCurrentScanners, comparator);
    } catch (Exception e) {
      // Best effort: keep using the old scanners and release only the half-built new ones.
      LOG.warn("failed to switch to stream read", e);
      if (fileScanners != null) {
        fileScanners.forEach(KeyValueScanner::close);
      }
      return;
    }
    // Commit the switch only after everything new was built successfully.
    currentScanners.clear();
    addCurrentScanners(newCurrentScanners);
    this.heap = newHeap;
    resetQueryMatcher(lastTop);
    scannersToClose.forEach(KeyValueScanner::close);
  }
1149
1150  protected final boolean checkFlushed() {
1151    // check the var without any lock. Suppose even if we see the old
1152    // value here still it is ok to continue because we will not be resetting
1153    // the heap but will continue with the referenced memstore's snapshot. For compactions
1154    // any way we don't need the updateReaders at all to happen as we still continue with
1155    // the older files
1156    if (flushed) {
1157      // If there is a flush and the current scan is notified on the flush ensure that the
1158      // scan's heap gets reset and we do a seek on the newly flushed file.
1159      if (this.closing) {
1160        return false;
1161      }
1162      // reset the flag
1163      flushed = false;
1164      return true;
1165    }
1166    return false;
1167  }
1168
1169
1170  /**
1171   * Seek storefiles in parallel to optimize IO latency as much as possible
1172   * @param scanners the list {@link KeyValueScanner}s to be read from
1173   * @param kv the KeyValue on which the operation is being requested
1174   * @throws IOException
1175   */
1176  private void parallelSeek(final List<? extends KeyValueScanner>
1177      scanners, final Cell kv) throws IOException {
1178    if (scanners.isEmpty()) return;
1179    int storeFileScannerCount = scanners.size();
1180    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
1181    List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount);
1182    for (KeyValueScanner scanner : scanners) {
1183      if (scanner instanceof StoreFileScanner) {
1184        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
1185          this.readPt, latch);
1186        executor.submit(seekHandler);
1187        handlers.add(seekHandler);
1188      } else {
1189        scanner.seek(kv);
1190        latch.countDown();
1191      }
1192    }
1193
1194    try {
1195      latch.await();
1196    } catch (InterruptedException ie) {
1197      throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
1198    }
1199
1200    for (ParallelSeekHandler handler : handlers) {
1201      if (handler.getErr() != null) {
1202        throw new IOException(handler.getErr());
1203      }
1204    }
1205  }
1206
1207  /**
1208   * Used in testing.
1209   * @return all scanners in no particular order
1210   */
1211  @VisibleForTesting
1212  List<KeyValueScanner> getAllScannersForTesting() {
1213    List<KeyValueScanner> allScanners = new ArrayList<>();
1214    KeyValueScanner current = heap.getCurrentForTesting();
1215    if (current != null)
1216      allScanners.add(current);
1217    for (KeyValueScanner scanner : heap.getHeap())
1218      allScanners.add(scanner);
1219    return allScanners;
1220  }
1221
1222  static void enableLazySeekGlobally(boolean enable) {
1223    lazySeekEnabledGlobally = enable;
1224  }
1225
1226  /**
1227   * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
1228   */
1229  public long getEstimatedNumberOfKvsScanned() {
1230    return this.kvsScanned;
1231  }
1232
1233  @Override
1234  public Cell getNextIndexedKey() {
1235    return this.heap.getNextIndexedKey();
1236  }
1237
1238  @Override
1239  public void shipped() throws IOException {
1240    if (prevCell != null) {
1241      // Do the copy here so that in case the prevCell ref is pointing to the previous
1242      // blocks we can safely release those blocks.
1243      // This applies to blocks that are got from Bucket cache, L1 cache and the blocks
1244      // fetched from HDFS. Copying this would ensure that we let go the references to these
1245      // blocks so that they can be GCed safely(in case of bucket cache)
1246      prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
1247    }
1248    matcher.beforeShipped();
1249    // There wont be further fetch of Cells from these scanners. Just close.
1250    clearAndClose(scannersForDelayedClose);
1251    if (this.heap != null) {
1252      this.heap.shipped();
1253      // When switching from pread to stream, we will open a new scanner for each store file, but
1254      // the old scanner may still track the HFileBlocks we have scanned but not sent back to client
1255      // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others
1256      // before we serialize and send it back to client. The HFileBlocks will be released in shipped
1257      // method, so we here will also open new scanners and close old scanners in shipped method.
1258      // See HBASE-18055 for more details.
1259      trySwitchToStreamRead();
1260    }
1261  }
1262}