001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.NavigableSet;
025import java.util.Optional;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.locks.ReentrantLock;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellComparator;
030import org.apache.hadoop.hbase.CellUtil;
031import org.apache.hadoop.hbase.DoNotRetryIOException;
032import org.apache.hadoop.hbase.ExtendedCell;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.KeyValue;
035import org.apache.hadoop.hbase.KeyValueUtil;
036import org.apache.hadoop.hbase.PrivateCellUtil;
037import org.apache.hadoop.hbase.PrivateConstants;
038import org.apache.hadoop.hbase.client.IsolationLevel;
039import org.apache.hadoop.hbase.client.Scan;
040import org.apache.hadoop.hbase.conf.ConfigKey;
041import org.apache.hadoop.hbase.executor.ExecutorService;
042import org.apache.hadoop.hbase.filter.Filter;
043import org.apache.hadoop.hbase.ipc.RpcCall;
044import org.apache.hadoop.hbase.ipc.RpcServer;
045import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
046import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
047import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
048import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
049import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
050import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
052import org.apache.yetus.audience.InterfaceAudience;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
057import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
058
059/**
060 * Scanner scans both the memstore and the Store. Coalesce KeyValue stream into List<KeyValue>
061 * for a single row.
062 * <p>
 * The implementation is not thread safe: callers are expected not to use it concurrently, so next
 * and close never race with each other. The only exception is updateReaders, which is called from
 * the memstore flush thread to signal that a flush has happened.
066 */
067@InterfaceAudience.Private
068public class StoreScanner extends NonReversedNonLazyKeyValueScanner
069  implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class);
  // The store being scanned. In unit tests, and for MOB compaction, the store could be null.
  protected final HStore store;
  // Cell ordering, taken from ScanInfo#getComparator in the constructor.
  private final CellComparator comparator;
  // Decides, cell by cell, whether to include, skip or seek (returns a ScanQueryMatcher.MatchCode).
  private ScanQueryMatcher matcher;
  // Merges the current scanners; set to null once this scanner is closed or partially closed.
  protected KeyValueHeap heap;
  // Passed to HStore#getScanners as its cache-blocks flag (from Scan#getCacheBlocks for user scans).
  private boolean cacheBlocks;

  // Cells counted so far for the current row; compared against storeOffset and storeLimit.
  private long countPerRow = 0;
  // Max cells returned per row for this column family (Scan#getMaxResultsPerColumnFamily for user
  // scans); -1 means no limit.
  private int storeLimit = -1;
  // Cells to skip per row before any are returned (Scan#getRowOffsetPerColumnFamily).
  private int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107)
  private volatile boolean closing = false;
  // Whether the scan is a get (Scan#isGetScan); gets always read with pread.
  private final boolean get;
  // Whether specific columns were requested (more than zero columns).
  private final boolean explicitColumnQuery;
  // Whether row-column Bloom filters should be consulted; computed in the internal constructor.
  private final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  private boolean parallelSeekEnabled = false;
  // Region server executor; only set when parallel seeking is enabled.
  private ExecutorService executor;
  private final Scan scan;
  // TTL cut-off timestamp (now - TTL); 0 for raw scans.
  private final long oldestUnexpiredTS;
  // Time at which this scanner was created (EnvironmentEdgeManager#currentTime).
  private final long now;
  private final int minVersions;
  // Max accumulated cell bytes per row for user scans before RowTooBigException is thrown.
  private final long maxRowSize;
  // Number of scanned cells between time-limit checks in next().
  private final long cellsPerHeartbeatCheck;
  // NOTE(review): read counters, presumably split by whether all returned cells came from the
  // memstore (see onlyFromMemstore in next()) - confirm where they are incremented.
  long memstoreOnlyReads;
  long mixedReads;

  // 1) Collects all the KVHeap that are eagerly getting closed during the
  // course of a scan
  // 2) Collects the unused memstore scanners. If we close the memstore scanners
  // before sending data to client, the chunk may be reclaimed by other
  // updates and the data will be corrupt.
  private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>();

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not KVs skipped via
   * seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  // Cell handled in the previous iteration of next(); used for scan-order checks and to avoid
  // counting the same cell instance twice in kvsScanned.
  private ExtendedCell prevCell = null;

  // Bytes a DEFAULT read-type scan may read with pread before it asks to switch to stream
  // (a negative value makes DEFAULT user scans use stream from the start).
  private final long preadMaxBytes;
  // Estimated serialized size of the cells seen so far; compared against preadMaxBytes.
  private long bytesRead;
117
  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  // Configuration key for enabling parallel seeking of store file scanners.
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
    "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;

  /**
   * The number of cells scanned in between timeout checks. Specifying a larger value means that
   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
   * timeout checks.
   */
  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
    ConfigKey.LONG("hbase.cells.scanned.per.heartbeat.check");

  /**
   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
   */
  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

  /**
   * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the bytes we have
   * scanned reach this limit, we will reopen the scanner with stream. The default value is 4 times
   * the block size for this store. If configured with a value {@code < 0}, all scans with ReadType
   * DEFAULT open their scanners in stream mode from the start.
   */
  public static final String STORESCANNER_PREAD_MAX_BYTES =
    ConfigKey.LONG("hbase.storescanner.pread.max.bytes");

  // The read type chosen in the constructor: PREAD, STREAM or DEFAULT.
  private final Scan.ReadType readType;

  // Whether the scan currently uses pread. It may change if we use Scan.ReadType.DEFAULT and have
  // read a lot of data.
  private boolean scanUsePread;
  // Indicates whether there was a flush during the course of the scan
  private volatile boolean flushed = false;
  // Scanners on flushed store files; generally we get one file from a flush
  private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1);
  // Since CompactingMemstore is now default, we get three memstore scanners from a flush
  private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
  // The current list of scanners
  final List<KeyValueScanner> currentScanners = new ArrayList<>();
  // Flush update lock. NOTE(review): presumably guards the two flush lists above, which are filled
  // from the memstore flush thread - confirm in updateReaders.
  private final ReentrantLock flushLock = new ReentrantLock();
  // Lock for closing; held for the whole of close(boolean).
  private final ReentrantLock closeLock = new ReentrantLock();

  // Read point of this scan; passed to HStore#getScanners for user scans.
  protected final long readPt;
  // Reset to false for every cell handled in next(); NOTE(review): other updates not visible here.
  private boolean topChanged = false;
168
169  /** An internal constructor. */
170  private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt,
171    boolean cacheBlocks, ScanType scanType) {
172    this.readPt = readPt;
173    this.store = store;
174    this.cacheBlocks = cacheBlocks;
175    this.comparator = Preconditions.checkNotNull(scanInfo.getComparator());
176    get = scan.isGetScan();
177    explicitColumnQuery = numColumns > 0;
178    this.scan = scan;
179    this.now = EnvironmentEdgeManager.currentTime();
180    this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl();
181    this.minVersions = scanInfo.getMinVersions();
182
183    // We look up row-column Bloom filters for multi-column queries as part of
184    // the seek operation. However, we also look the row-column Bloom filter
185    // for multi-row (non-"get") scans because this is not done in
186    // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
187    this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1) && (store == null
188      || store.getColumnFamilyDescriptor().getBloomFilterType() == BloomType.ROWCOL);
189    this.maxRowSize = scanInfo.getTableMaxRowSize();
190    this.preadMaxBytes = scanInfo.getPreadMaxBytes();
191    if (get) {
192      this.readType = Scan.ReadType.PREAD;
193      this.scanUsePread = true;
194    } else if (scanType != ScanType.USER_SCAN) {
195      // For compaction scanners never use Pread as already we have stream based scanners on the
196      // store files to be compacted
197      this.readType = Scan.ReadType.STREAM;
198      this.scanUsePread = false;
199    } else {
200      if (scan.getReadType() == Scan.ReadType.DEFAULT) {
201        if (scanInfo.isUsePread()) {
202          this.readType = Scan.ReadType.PREAD;
203        } else if (this.preadMaxBytes < 0) {
204          this.readType = Scan.ReadType.STREAM;
205        } else {
206          this.readType = Scan.ReadType.DEFAULT;
207        }
208      } else {
209        this.readType = scan.getReadType();
210      }
211      // Always start with pread unless user specific stream. Will change to stream later if
212      // readType is default if the scan keeps running for a long time.
213      this.scanUsePread = this.readType != Scan.ReadType.STREAM;
214    }
215    this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
216    // Parallel seeking is on if the config allows and more there is more than one store file.
217    if (store != null && store.getStorefilesCount() > 1) {
218      RegionServerServices rsService = store.getHRegion().getRegionServerServices();
219      if (rsService != null && scanInfo.isParallelSeekEnabled()) {
220        this.parallelSeekEnabled = true;
221        this.executor = rsService.getExecutorService();
222      }
223    }
224  }
225
226  private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
227    this.currentScanners.addAll(scanners);
228  }
229
230  private static boolean isOnlyLatestVersionScan(Scan scan) {
231    // No need to check for Scan#getMaxVersions because live version files generated by store file
232    // writer retains max versions specified in ColumnFamilyDescriptor for the given CF
233    return !scan.isRaw() && scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP;
234  }
235
236  /**
237   * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we are not in a
238   * compaction.
239   * @param store   who we scan
240   * @param scan    the spec
241   * @param columns which columns we are scanning
242   */
243  public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
244    long readPt) throws IOException {
245    this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt, scan.getCacheBlocks(),
246      ScanType.USER_SCAN);
247    if (columns != null && scan.isRaw()) {
248      throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
249    }
250    matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
251      store.getCoprocessorHost());
252
253    store.addChangedReaderObserver(this);
254
255    List<KeyValueScanner> scanners = null;
256    try {
257      // Pass columns to try to filter out unnecessary StoreFiles.
258      scanners = selectScannersFrom(store,
259        store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
260          scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt,
261          isOnlyLatestVersionScan(scan)));
262
263      // Seek all scanners to the start of the Row (or if the exact matching row
264      // key does not exist, then to the start of the next matching Row).
265      // Always check bloom filter to optimize the top row seek for delete
266      // family marker.
267      seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
268        parallelSeekEnabled);
269
270      // set storeLimit
271      this.storeLimit = scan.getMaxResultsPerColumnFamily();
272
273      // set rowOffset
274      this.storeOffset = scan.getRowOffsetPerColumnFamily();
275      addCurrentScanners(scanners);
276      // Combine all seeked scanners with a heap
277      resetKVHeap(scanners, comparator);
278    } catch (IOException e) {
279      clearAndClose(scanners);
280      // remove us from the HStore#changedReaderObservers here or we'll have no chance to
281      // and might cause memory leak
282      store.deleteChangedReaderObserver(this);
283      throw e;
284    }
285  }
286
  // A dummy scan instance for compaction. It is shared by the compaction and MOB compaction
  // constructors (and by the test compaction constructor when maxVersions <= 0).
  // NOTE(review): being a shared static instance, it presumably must never be mutated.
  private static final Scan SCAN_FOR_COMPACTION = new Scan();
289
290  /**
291   * Used for store file compaction and memstore compaction.
292   * <p>
293   * Opens a scanner across specified StoreFiles/MemStoreSegments.
294   * @param store             who we scan
295   * @param scanners          ancillary scanners
296   * @param smallestReadPoint the readPoint that we should use for tracking versions
297   */
298  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
299    ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
300    this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
301  }
302
303  /**
304   * Used for compactions that drop deletes from a limited range of rows.
305   * <p>
306   * Opens a scanner across specified StoreFiles.
307   * @param store              who we scan
308   * @param scanners           ancillary scanners
309   * @param smallestReadPoint  the readPoint that we should use for tracking versions
310   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
311   * @param dropDeletesToRow   The exclusive right bound of the range; can be EMPTY_END_ROW.
312   */
313  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
314    long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow)
315    throws IOException {
316    this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
317      earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
318  }
319
320  private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
321    ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
322    byte[] dropDeletesToRow) throws IOException {
323    this(store, SCAN_FOR_COMPACTION, scanInfo, 0,
324      store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType);
325    assert scanType != ScanType.USER_SCAN;
326    matcher =
327      CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs,
328        oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
329
330    // Filter the list of scanners using Bloom filters, time range, TTL, etc.
331    scanners = selectScannersFrom(store, scanners);
332
333    // Seek all scanners to the initial key
334    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
335    addCurrentScanners(scanners);
336    // Combine all seeked scanners with a heap
337    resetKVHeap(scanners, comparator);
338  }
339
340  private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners)
341    throws IOException {
342    // Seek all scanners to the initial key
343    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
344    addCurrentScanners(scanners);
345    resetKVHeap(scanners, comparator);
346  }
347
348  // For mob compaction only as we do not have a Store instance when doing mob compaction.
349  public StoreScanner(ScanInfo scanInfo, ScanType scanType,
350    List<? extends KeyValueScanner> scanners) throws IOException {
351    this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
352    assert scanType != ScanType.USER_SCAN;
353    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L,
354      oldestUnexpiredTS, now, null, null, null);
355    seekAllScanner(scanInfo, scanners);
356  }
357
358  // Used to instantiate a scanner for user scan in test
359  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
360    List<? extends KeyValueScanner> scanners, ScanType scanType) throws IOException {
361    // 0 is passed as readpoint because the test bypasses Store
362    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
363      scanType);
364    if (scanType == ScanType.USER_SCAN) {
365      this.matcher =
366        UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
367    } else {
368      this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
369        PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
370    }
371    seekAllScanner(scanInfo, scanners);
372  }
373
374  // Used to instantiate a scanner for user scan in test
375  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
376    List<? extends KeyValueScanner> scanners) throws IOException {
377    // 0 is passed as readpoint because the test bypasses Store
378    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
379      ScanType.USER_SCAN);
380    this.matcher =
381      UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
382    seekAllScanner(scanInfo, scanners);
383  }
384
385  // Used to instantiate a scanner for compaction in test
386  StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType,
387    List<? extends KeyValueScanner> scanners) throws IOException {
388    // 0 is passed as readpoint because the test bypasses Store
389    this(null, maxVersions > 0 ? new Scan().readVersions(maxVersions) : SCAN_FOR_COMPACTION,
390      scanInfo, 0, 0L, false, scanType);
391    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
392      PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
393    seekAllScanner(scanInfo, scanners);
394  }
395
396  boolean isScanUsePread() {
397    return this.scanUsePread;
398  }
399
400  /**
401   * Seek the specified scanners with the given key
402   * @param isLazy         true if using lazy seek
403   * @param isParallelSeek true if using parallel seek
404   */
405  protected void seekScanners(List<? extends KeyValueScanner> scanners, ExtendedCell seekKey,
406    boolean isLazy, boolean isParallelSeek) throws IOException {
407    // Seek all scanners to the start of the Row (or if the exact matching row
408    // key does not exist, then to the start of the next matching Row).
409    // Always check bloom filter to optimize the top row seek for delete
410    // family marker.
411    if (isLazy) {
412      for (KeyValueScanner scanner : scanners) {
413        scanner.requestSeek(seekKey, false, true);
414      }
415    } else {
416      if (!isParallelSeek) {
417        long totalScannersSoughtBytes = 0;
418        for (KeyValueScanner scanner : scanners) {
419          if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) {
420            throw new RowTooBigException(
421              "Max row size allowed: " + maxRowSize + ", but row is bigger than that");
422          }
423          scanner.seek(seekKey);
424          Cell c = scanner.peek();
425          if (c != null) {
426            totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c);
427          }
428        }
429      } else {
430        parallelSeek(scanners, seekKey);
431      }
432    }
433  }
434
435  protected void resetKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
436    throws IOException {
437    // Combine all seeked scanners with a heap
438    heap = newKVHeap(scanners, comparator);
439  }
440
441  protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners,
442    CellComparator comparator) throws IOException {
443    return new KeyValueHeap(scanners, comparator);
444  }
445
446  /**
447   * Filters the given list of scanners using Bloom filter, time range, and TTL.
448   * <p>
449   * Will be overridden by testcase so declared as protected.
450   */
451  protected List<KeyValueScanner> selectScannersFrom(HStore store,
452    List<? extends KeyValueScanner> allScanners) {
453    boolean memOnly;
454    boolean filesOnly;
455    if (scan instanceof InternalScan) {
456      InternalScan iscan = (InternalScan) scan;
457      memOnly = iscan.isCheckOnlyMemStore();
458      filesOnly = iscan.isCheckOnlyStoreFiles();
459    } else {
460      memOnly = false;
461      filesOnly = false;
462    }
463
464    List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size());
465
466    // We can only exclude store files based on TTL if minVersions is set to 0.
467    // Otherwise, we might have to return KVs that have technically expired.
468    long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE;
469
470    // include only those scan files which pass all filters
471    for (KeyValueScanner kvs : allScanners) {
472      boolean isFile = kvs.isFileScanner();
473      if ((!isFile && filesOnly) || (isFile && memOnly)) {
474        kvs.close();
475        continue;
476      }
477
478      if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
479        scanners.add(kvs);
480      } else {
481        kvs.close();
482      }
483    }
484    return scanners;
485  }
486
487  @Override
488  public ExtendedCell peek() {
489    return heap != null ? heap.peek() : null;
490  }
491
492  @Override
493  public KeyValue next() {
494    // throw runtime exception perhaps?
495    throw new RuntimeException("Never call StoreScanner.next()");
496  }
497
498  @Override
499  public void close() {
500    close(true);
501  }
502
503  private void close(boolean withDelayedScannersClose) {
504    closeLock.lock();
505    // If the closeLock is acquired then any subsequent updateReaders()
506    // call is ignored.
507    try {
508      if (this.closing) {
509        return;
510      }
511      if (withDelayedScannersClose) {
512        this.closing = true;
513      }
514      // For mob compaction, we do not have a store.
515      if (this.store != null) {
516        this.store.deleteChangedReaderObserver(this);
517      }
518      if (withDelayedScannersClose) {
519        clearAndClose(scannersForDelayedClose);
520        clearAndClose(memStoreScannersAfterFlush);
521        clearAndClose(flushedstoreFileScanners);
522        if (this.heap != null) {
523          this.heap.close();
524          this.currentScanners.clear();
525          this.heap = null; // CLOSED!
526        }
527      } else {
528        if (this.heap != null) {
529          this.scannersForDelayedClose.add(this.heap);
530          this.currentScanners.clear();
531          this.heap = null;
532        }
533      }
534    } finally {
535      closeLock.unlock();
536    }
537  }
538
539  @Override
540  public boolean seek(ExtendedCell key) throws IOException {
541    if (checkFlushed()) {
542      reopenAfterFlush();
543    }
544    return this.heap.seek(key);
545  }
546
547  /**
548   * Get the next row of values from this Store.
549   * @return true if there are more rows, false if scanner is done
550   */
551  @Override
552  public boolean next(List<? super ExtendedCell> outResult, ScannerContext scannerContext)
553    throws IOException {
554    if (scannerContext == null) {
555      throw new IllegalArgumentException("Scanner context cannot be null");
556    }
557    if (checkFlushed() && reopenAfterFlush()) {
558      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
559    }
560
561    // if the heap was left null, then the scanners had previously run out anyways, close and
562    // return.
563    if (this.heap == null) {
564      // By this time partial close should happened because already heap is null
565      close(false);// Do all cleanup except heap.close()
566      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
567    }
568
569    ExtendedCell cell = this.heap.peek();
570    if (cell == null) {
571      close(false);// Do all cleanup except heap.close()
572      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
573    }
574
575    // only call setRow if the row changes; avoids confusing the query matcher
576    // if scanning intra-row
577
578    // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
579    // rows. Else it is possible we are still traversing the same row so we must perform the row
580    // comparison.
581    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) {
582      this.countPerRow = 0;
583      matcher.setToNewRow(cell);
584    }
585
586    // Clear progress away unless invoker has indicated it should be kept.
587    if (!scannerContext.getKeepProgress() && !scannerContext.getSkippingRow()) {
588      scannerContext.clearProgress();
589    }
590
591    Optional<RpcCall> rpcCall =
592      matcher.isUserScan() ? RpcServer.getCurrentCall() : Optional.empty();
593
594    int count = 0;
595    long totalBytesRead = 0;
596    // track the cells for metrics only if it is a user read request.
597    boolean onlyFromMemstore = matcher.isUserScan();
598    try {
599      LOOP: do {
600        // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
601        // Or if the preadMaxBytes is reached and we may want to return so we can switch to stream
602        // in
603        // the shipped method below.
604        if (
605          kvsScanned % cellsPerHeartbeatCheck == 0
606            || (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)
607        ) {
608          if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
609            return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
610          }
611        }
612        // Do object compare - we set prevKV from the same heap.
613        if (prevCell != cell) {
614          ++kvsScanned;
615        }
616        checkScanOrder(prevCell, cell, comparator);
617        int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
618        bytesRead += cellSize;
619        if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) {
620          // return immediately if we want to switch from pread to stream. We need this because we
621          // can
622          // only switch in the shipped method, if user use a filter to filter out everything and
623          // rpc
624          // timeout is very large then the shipped method will never be called until the whole scan
625          // is finished, but at that time we have already scan all the data...
626          // See HBASE-20457 for more details.
627          // And there is still a scenario that can not be handled. If we have a very large row,
628          // which
629          // have millions of qualifiers, and filter.filterRow is used, then even if we set the flag
630          // here, we still need to scan all the qualifiers before returning...
631          scannerContext.returnImmediately();
632        }
633
634        heap.recordBlockSize(blockSize -> {
635          if (rpcCall.isPresent()) {
636            rpcCall.get().incrementBlockBytesScanned(blockSize);
637          }
638          scannerContext.incrementBlockProgress(blockSize);
639        });
640
641        prevCell = cell;
642        scannerContext.setLastPeekedCell(cell);
643        topChanged = false;
644        ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
645        switch (qcode) {
646          case INCLUDE:
647          case INCLUDE_AND_SEEK_NEXT_ROW:
648          case INCLUDE_AND_SEEK_NEXT_COL:
649            Filter f = matcher.getFilter();
650            if (f != null) {
651              Cell transformedCell = f.transformCell(cell);
652              // fast path, most filters just return the same cell instance
653              if (transformedCell != cell) {
654                if (transformedCell instanceof ExtendedCell) {
655                  cell = (ExtendedCell) transformedCell;
656                } else {
657                  throw new DoNotRetryIOException("Incorrect filter implementation, "
658                    + "the Cell returned by transformCell is not an ExtendedCell. Filter class: "
659                    + f.getClass().getName());
660                }
661              }
662            }
663            this.countPerRow++;
664
665            // add to results only if we have skipped #storeOffset kvs
666            // also update metric accordingly
667            if (this.countPerRow > storeOffset) {
668              outResult.add(cell);
669
670              // Update local tracking information
671              count++;
672              totalBytesRead += cellSize;
673
674              /**
675               * Increment the metric if all the cells are from memstore. If not we will account it
676               * for mixed reads
677               */
678              onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore();
679              // Update the progress of the scanner context
680              scannerContext.incrementSizeProgress(cellSize, cell.heapSize());
681              scannerContext.incrementBatchProgress(1);
682
683              if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
684                String message = "Max row size allowed: " + maxRowSize
685                  + ", but the row is bigger than that, the row info: "
686                  + CellUtil.toString(cell, false) + ", already have process row cells = "
687                  + outResult.size() + ", it belong to region = "
688                  + store.getHRegion().getRegionInfo().getRegionNameAsString();
689                LOG.warn(message);
690                throw new RowTooBigException(message);
691              }
692
693              if (storeLimit > -1 && this.countPerRow >= (storeLimit + storeOffset)) {
694                // do what SEEK_NEXT_ROW does.
695                if (!matcher.moreRowsMayExistAfter(cell)) {
696                  close(false);// Do all cleanup except heap.close()
697                  return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
698                }
699                matcher.clearCurrentRow();
700                seekToNextRow(cell);
701                break LOOP;
702              }
703            }
704
705            if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
706              if (!matcher.moreRowsMayExistAfter(cell)) {
707                close(false);// Do all cleanup except heap.close()
708                return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
709              }
710              matcher.clearCurrentRow();
711              seekOrSkipToNextRow(cell);
712            } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
713              seekOrSkipToNextColumn(cell);
714            } else {
715              this.heap.next();
716            }
717
718            if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
719              break LOOP;
720            }
721            if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
722              break LOOP;
723            }
724            continue;
725
726          case DONE:
727            // Optimization for Gets! If DONE, no more to get on this row, early exit!
728            if (get) {
729              // Then no more to this row... exit.
730              close(false);// Do all cleanup except heap.close()
731              // update metric
732              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
733            }
734            matcher.clearCurrentRow();
735            return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
736
737          case DONE_SCAN:
738            close(false);// Do all cleanup except heap.close()
739            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
740
741          case SEEK_NEXT_ROW:
742            // This is just a relatively simple end of scan fix, to short-cut end
743            // us if there is an endKey in the scan.
744            if (!matcher.moreRowsMayExistAfter(cell)) {
745              close(false);// Do all cleanup except heap.close()
746              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
747            }
748            matcher.clearCurrentRow();
749            seekOrSkipToNextRow(cell);
750            NextState stateAfterSeekNextRow = needToReturn(outResult);
751            if (stateAfterSeekNextRow != null) {
752              return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
753            }
754            break;
755
756          case SEEK_NEXT_COL:
757            seekOrSkipToNextColumn(cell);
758            NextState stateAfterSeekNextColumn = needToReturn(outResult);
759            if (stateAfterSeekNextColumn != null) {
760              return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
761            }
762            break;
763
764          case SKIP:
765            this.heap.next();
766            break;
767
768          case SEEK_NEXT_USING_HINT:
769            ExtendedCell nextKV = matcher.getNextKeyHint(cell);
770            if (nextKV != null) {
771              int difference = comparator.compare(nextKV, cell);
772              if (
773                ((!scan.isReversed() && difference > 0) || (scan.isReversed() && difference < 0))
774              ) {
775                seekAsDirection(nextKV);
776                NextState stateAfterSeekByHint = needToReturn(outResult);
777                if (stateAfterSeekByHint != null) {
778                  return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
779                }
780                break;
781              }
782            }
783            heap.next();
784            break;
785
786          default:
787            throw new RuntimeException("UNEXPECTED");
788        }
789
790        // One last chance to break due to size limit. The INCLUDE* cases above already check
791        // limit and continue. For the various filtered cases, we need to check because block
792        // size limit may have been exceeded even if we don't add cells to result list.
793        if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
794          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
795        }
796      } while ((cell = this.heap.peek()) != null);
797
798      if (count > 0) {
799        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
800      }
801
802      // No more keys
803      close(false);// Do all cleanup except heap.close()
804      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
805    } finally {
806      // increment only if we have some result
807      if (count > 0 && matcher.isUserScan()) {
808        // if true increment memstore metrics, if not the mixed one
809        updateMetricsStore(onlyFromMemstore);
810      }
811    }
812  }
813
814  private void updateMetricsStore(boolean memstoreRead) {
815    if (store != null) {
816      store.updateMetricsStore(memstoreRead);
817    } else {
818      // for testing.
819      if (memstoreRead) {
820        memstoreOnlyReads++;
821      } else {
822        mixedReads++;
823      }
824    }
825  }
826
827  /**
828   * If the top cell won't be flushed into disk, the new top cell may be changed after
829   * #reopenAfterFlush. Because the older top cell only exist in the memstore scanner but the
830   * memstore scanner is replaced by hfile scanner after #reopenAfterFlush. If the row of top cell
831   * is changed, we should return the current cells. Otherwise, we may return the cells across
832   * different rows.
833   * @param outResult the cells which are visible for user scan
834   * @return null is the top cell doesn't change. Otherwise, the NextState to return
835   */
836  private NextState needToReturn(List<? super ExtendedCell> outResult) {
837    if (!outResult.isEmpty() && topChanged) {
838      return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
839    }
840    return null;
841  }
842
843  private void seekOrSkipToNextRow(ExtendedCell cell) throws IOException {
844    // If it is a Get Scan, then we know that we are done with this row; there are no more
845    // rows beyond the current one: don't try to optimize.
846    if (!get) {
847      if (trySkipToNextRow(cell)) {
848        return;
849      }
850    }
851    seekToNextRow(cell);
852  }
853
854  private void seekOrSkipToNextColumn(ExtendedCell cell) throws IOException {
855    if (!trySkipToNextColumn(cell)) {
856      seekAsDirection(matcher.getKeyForNextColumn(cell));
857    }
858  }
859
860  /**
861   * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).
862   * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row, or seek to an
863   * arbitrary seek key. This method decides whether a seek is the most efficient _actual_ way to
864   * get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, SKIP inside the
865   * current, loaded block). It does this by looking at the next indexed key of the current HFile.
866   * This key is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible
867   * key on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work
868   * with the current Cell but compare as though it were a seek key; see down in
869   * matcher.compareKeyForNextRow, etc). If the compare gets us onto the next block we *_SEEK,
870   * otherwise we just SKIP to the next requested cell.
871   * <p>
872   * Other notes:
873   * <ul>
874   * <li>Rows can straddle block boundaries</li>
875   * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a
876   * different block than column C1 at T2)</li>
877   * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a few
878   * SKIPs...</li>
879   * <li>We want to SEEK when the chance is high that we'll be able to seek past many Cells,
880   * especially if we know we need to go to the next block.</li>
881   * </ul>
882   * <p>
883   * A good proxy (best effort) to determine whether SKIP is better than SEEK is whether we'll
884   * likely end up seeking to the next block (or past the next block) to get our next column.
885   * Example:
886   *
887   * <pre>
888   * |    BLOCK 1              |     BLOCK 2                   |
889   * |  r1/c1, r1/c2, r1/c3    |    r1/c4, r1/c5, r2/c1        |
890   *                                   ^         ^
891   *                                   |         |
892   *                           Next Index Key   SEEK_NEXT_ROW (before r2/c1)
893   *
894   *
895   * |    BLOCK 1                       |     BLOCK 2                      |
896   * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |    r1/c1/t2, r1/c1/T1, r1/c2/T3  |
897   *                                            ^              ^
898   *                                            |              |
899   *                                    Next Index Key        SEEK_NEXT_COL
900   * </pre>
901   *
902   * Now imagine we want columns c1 and c3 (see first diagram above), the 'Next Index Key' of r1/c4
903   * is > r1/c3 so we should seek to get to the c1 on the next row, r2. In second case, say we only
904   * want one version of c1, after we have it, a SEEK_COL will be issued to get to c2. Looking at
905   * the 'Next Index Key', it would land us in the next block, so we should SEEK. In other scenarios
906   * where the SEEK will not land us in the next block, it is very likely better to issues a series
907   * of SKIPs.
908   * @param cell current cell
909   * @return true means skip to next row, false means not
910   */
911  protected boolean trySkipToNextRow(ExtendedCell cell) throws IOException {
912    ExtendedCell nextCell = null;
913    // used to guard against a changed next indexed key by doing a identity comparison
914    // when the identity changes we need to compare the bytes again
915    ExtendedCell previousIndexedKey = null;
916    do {
917      ExtendedCell nextIndexedKey = getNextIndexedKey();
918      if (
919        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
920          && (nextIndexedKey == previousIndexedKey
921            || matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0)
922      ) {
923        this.heap.next();
924        ++kvsScanned;
925        previousIndexedKey = nextIndexedKey;
926      } else {
927        return false;
928      }
929    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell));
930    return true;
931  }
932
933  /**
934   * See {@link #trySkipToNextRow(ExtendedCell)}
935   * @param cell current cell
936   * @return true means skip to next column, false means not
937   */
938  protected boolean trySkipToNextColumn(ExtendedCell cell) throws IOException {
939    ExtendedCell nextCell = null;
940    // used to guard against a changed next indexed key by doing a identity comparison
941    // when the identity changes we need to compare the bytes again
942    ExtendedCell previousIndexedKey = null;
943    do {
944      ExtendedCell nextIndexedKey = getNextIndexedKey();
945      if (
946        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
947          && (nextIndexedKey == previousIndexedKey
948            || matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0)
949      ) {
950        this.heap.next();
951        ++kvsScanned;
952        previousIndexedKey = nextIndexedKey;
953      } else {
954        return false;
955      }
956    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell));
957    // We need this check because it may happen that the new scanner that we get
958    // during heap.next() is requiring reseek due of fake KV previously generated for
959    // ROWCOL bloom filter optimization. See HBASE-19863 for more details
960    if (
961      useRowColBloom && nextCell != null && cell.getTimestamp() == PrivateConstants.OLDEST_TIMESTAMP
962    ) {
963      return false;
964    }
965    return true;
966  }
967
968  @Override
969  public long getReadPoint() {
970    return this.readPt;
971  }
972
973  private static void clearAndClose(List<KeyValueScanner> scanners) {
974    if (scanners == null) {
975      return;
976    }
977    for (KeyValueScanner s : scanners) {
978      s.close();
979    }
980    scanners.clear();
981  }
982
  /**
   * Implementation of ChangedReadersObserver.
   * <p>
   * Receives new store files and/or memstore scanners (for example after a flush). Scanners on
   * {@code sfs} are created eagerly and stored in {@code flushedstoreFileScanners}. A non-empty
   * {@code memStoreScanners} replaces (and closes) any previously stored
   * {@code memStoreScannersAfterFlush}. Setting {@code flushed} makes the next
   * {@code checkFlushed()} report the change, so the heap is rebuilt later by
   * {@code reopenAfterFlush()}.
   * <p>
   * Ownership of {@code memStoreScanners} passes to this scanner. If the update is not needed
   * (close lock unavailable, or the scanner is closing) they are closed here.
   * <p>
   * NOTE(review): if {@code store.getScanners(...)} throws, {@code memStoreScanners} is neither
   * stored nor closed. Confirm whether the caller cleans them up in that case.
   * @param sfs              new store files to create scanners on; may be empty
   * @param memStoreScanners new memstore scanners; may be empty
   * @throws IOException if creating scanners on {@code sfs} fails
   */
  @Override
  public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners)
    throws IOException {
    // Nothing new to hand over.
    if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) {
      return;
    }
    // Tracks whether closeLock was acquired, so the finally block only unlocks what it holds.
    boolean updateReaders = false;
    flushLock.lock();
    try {
      if (!closeLock.tryLock()) {
        // The reason for doing this is that when the current store scanner does not retrieve
        // any new cells, then the scanner is considered to be done. The heap of this scanner
        // is not closed till the shipped() call is completed. Hence in that case if at all
        // the partial close (close (false)) has been called before updateReaders(), there is no
        // need for the updateReaders() to happen.
        LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
        // no lock acquired.
        clearAndClose(memStoreScanners);
        return;
      }
      // lock acquired
      updateReaders = true;
      if (this.closing) {
        LOG.debug("StoreScanner already closing. There is no need to updateReaders");
        clearAndClose(memStoreScanners);
        return;
      }
      // Signal to checkFlushed() that the heap must be rebuilt.
      flushed = true;
      final boolean isCompaction = false;
      boolean usePread = get || scanUsePread;
      // SEE HBASE-19468 where the flushed files are getting compacted even before a scanner
      // calls next(). So its better we create scanners here rather than next() call. Ensure
      // these scanners are properly closed() whether or not the scan is completed successfully
      // Eagerly creating scanners so that we have the ref counting ticking on the newly created
      // store files. In case of stream scanners this eager creation does not induce performance
      // penalty because in scans (that uses stream scanners) the next() call is bound to happen.
      List<KeyValueScanner> scanners =
        store.getScanners(sfs, cacheBlocks, get, usePread, isCompaction, matcher,
          scan.getStartRow(), scan.getStopRow(), this.readPt, false, isOnlyLatestVersionScan(scan));
      flushedstoreFileScanners.addAll(scanners);
      if (!CollectionUtils.isEmpty(memStoreScanners)) {
        // Only the latest memstore scanners are kept; older unconsumed ones are closed.
        clearAndClose(memStoreScannersAfterFlush);
        memStoreScannersAfterFlush.addAll(memStoreScanners);
      }
    } finally {
      flushLock.unlock();
      if (updateReaders) {
        closeLock.unlock();
      }
    }
    // Let the next() call handle re-creating and seeking
  }
1036
1037  /** Returns if top of heap has changed (and KeyValueHeap has to try the next KV) */
1038  protected final boolean reopenAfterFlush() throws IOException {
1039    // here we can make sure that we have a Store instance so no null check on store.
1040    ExtendedCell lastTop = heap.peek();
1041    // When we have the scan object, should we not pass it to getScanners() to get a limited set of
1042    // scanners? We did so in the constructor and we could have done it now by storing the scan
1043    // object from the constructor
1044    List<KeyValueScanner> scanners;
1045    flushLock.lock();
1046    try {
1047      List<KeyValueScanner> allScanners =
1048        new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());
1049      allScanners.addAll(flushedstoreFileScanners);
1050      allScanners.addAll(memStoreScannersAfterFlush);
1051      scanners = selectScannersFrom(store, allScanners);
1052      // Clear the current set of flushed store files scanners so that they don't get added again
1053      flushedstoreFileScanners.clear();
1054      memStoreScannersAfterFlush.clear();
1055    } finally {
1056      flushLock.unlock();
1057    }
1058
1059    // Seek the new scanners to the last key
1060    seekScanners(scanners, lastTop, false, parallelSeekEnabled);
1061    // remove the older memstore scanner
1062    for (int i = currentScanners.size() - 1; i >= 0; i--) {
1063      if (!currentScanners.get(i).isFileScanner()) {
1064        scannersForDelayedClose.add(currentScanners.remove(i));
1065      } else {
1066        // we add the memstore scanner to the end of currentScanners
1067        break;
1068      }
1069    }
1070    // add the newly created scanners on the flushed files and the current active memstore scanner
1071    addCurrentScanners(scanners);
1072    // Combine all seeked scanners with a heap
1073    resetKVHeap(this.currentScanners, store.getComparator());
1074    resetQueryMatcher(lastTop);
1075    if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
1076      LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString()
1077        + ",and after = " + heap.peek());
1078      topChanged = true;
1079    } else {
1080      topChanged = false;
1081    }
1082    return topChanged;
1083  }
1084
1085  private void resetQueryMatcher(ExtendedCell lastTopKey) {
1086    // Reset the state of the Query Matcher and set to top row.
1087    // Only reset and call setRow if the row changes; avoids confusing the
1088    // query matcher if scanning intra-row.
1089    ExtendedCell cell = heap.peek();
1090    if (cell == null) {
1091      cell = lastTopKey;
1092    }
1093    if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {
1094      this.countPerRow = 0;
1095      // The setToNewRow will call reset internally
1096      matcher.setToNewRow(cell);
1097    }
1098  }
1099
1100  /**
1101   * Check whether scan as expected order
1102   */
1103  protected void checkScanOrder(Cell prevKV, Cell kv, CellComparator comparator)
1104    throws IOException {
1105    // Check that the heap gives us KVs in an increasing order.
1106    assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0
1107      : "Key " + prevKV + " followed by a smaller key " + kv + " in cf " + store;
1108  }
1109
1110  protected boolean seekToNextRow(ExtendedCell c) throws IOException {
1111    return reseek(PrivateCellUtil.createLastOnRow(c));
1112  }
1113
1114  /**
1115   * Do a reseek in a normal StoreScanner(scan forward)
1116   * @return true if scanner has values left, false if end of scanner
1117   */
1118  protected boolean seekAsDirection(ExtendedCell kv) throws IOException {
1119    return reseek(kv);
1120  }
1121
1122  @Override
1123  public boolean reseek(ExtendedCell kv) throws IOException {
1124    if (checkFlushed()) {
1125      reopenAfterFlush();
1126    }
1127    if (explicitColumnQuery && lazySeekEnabledGlobally) {
1128      return heap.requestSeek(kv, true, useRowColBloom);
1129    }
1130    return heap.reseek(kv);
1131  }
1132
  /**
   * Switches this scanner from pread to stream reads once {@code bytesRead} reaches
   * {@code preadMaxBytes}. This only applies to DEFAULT read type, while pread is in use, the
   * scanner is not closing, and the heap is not exhausted.
   * <p>
   * The file scanners are recreated via {@code store.recreateScanners} and seeked to the current
   * top cell. The existing memstore scanners are reused. Only after the new heap is installed are
   * the old file scanners closed.
   * <p>
   * Best effort: on failure it logs a warning and closes any newly created file scanners, and
   * the old scanners and heap stay in place. {@code scanUsePread} has already been cleared at
   * that point, so the switch is not retried.
   */
  void trySwitchToStreamRead() {
    if (
      readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null
        || bytesRead < preadMaxBytes
    ) {
      return;
    }
    LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead,
      this.store.getColumnFamilyName());
    scanUsePread = false;
    ExtendedCell lastTop = heap.peek();
    // Partition current scanners: memstore scanners are kept, file scanners get replaced.
    List<KeyValueScanner> memstoreScanners = new ArrayList<>();
    List<KeyValueScanner> scannersToClose = new ArrayList<>();
    for (KeyValueScanner kvs : currentScanners) {
      if (!kvs.isFileScanner()) {
        // collect memstorescanners here
        memstoreScanners.add(kvs);
      } else {
        scannersToClose.add(kvs);
      }
    }
    List<KeyValueScanner> fileScanners = null;
    List<KeyValueScanner> newCurrentScanners;
    KeyValueHeap newHeap;
    try {
      // We must have a store instance here so no null check
      // recreate the scanners on the current file scanners
      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, matcher,
        scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
        readPt, false);
      if (fileScanners == null) {
        return;
      }
      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
      newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
      newCurrentScanners.addAll(fileScanners);
      newCurrentScanners.addAll(memstoreScanners);
      newHeap = newKVHeap(newCurrentScanners, comparator);
    } catch (Exception e) {
      // Best effort: keep the existing scanners/heap and release only what was just created.
      LOG.warn("failed to switch to stream read", e);
      if (fileScanners != null) {
        fileScanners.forEach(KeyValueScanner::close);
      }
      return;
    }
    // Install the new scanners and heap, then close the replaced file scanners.
    currentScanners.clear();
    addCurrentScanners(newCurrentScanners);
    this.heap = newHeap;
    resetQueryMatcher(lastTop);
    scannersToClose.forEach(KeyValueScanner::close);
  }
1184
1185  protected final boolean checkFlushed() {
1186    // check the var without any lock. Suppose even if we see the old
1187    // value here still it is ok to continue because we will not be resetting
1188    // the heap but will continue with the referenced memstore's snapshot. For compactions
1189    // any way we don't need the updateReaders at all to happen as we still continue with
1190    // the older files
1191    if (flushed) {
1192      // If there is a flush and the current scan is notified on the flush ensure that the
1193      // scan's heap gets reset and we do a seek on the newly flushed file.
1194      if (this.closing) {
1195        return false;
1196      }
1197      // reset the flag
1198      flushed = false;
1199      return true;
1200    }
1201    return false;
1202  }
1203
1204  /**
1205   * Seek storefiles in parallel to optimize IO latency as much as possible
1206   * @param scanners the list {@link KeyValueScanner}s to be read from
1207   * @param kv       the KeyValue on which the operation is being requested
1208   */
1209  private void parallelSeek(final List<? extends KeyValueScanner> scanners, final ExtendedCell kv)
1210    throws IOException {
1211    if (scanners.isEmpty()) return;
1212    int storeFileScannerCount = scanners.size();
1213    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
1214    List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount);
1215    for (KeyValueScanner scanner : scanners) {
1216      if (scanner instanceof StoreFileScanner) {
1217        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, this.readPt, latch);
1218        executor.submit(seekHandler);
1219        handlers.add(seekHandler);
1220      } else {
1221        scanner.seek(kv);
1222        latch.countDown();
1223      }
1224    }
1225
1226    try {
1227      latch.await();
1228    } catch (InterruptedException ie) {
1229      throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
1230    }
1231
1232    for (ParallelSeekHandler handler : handlers) {
1233      if (handler.getErr() != null) {
1234        throw new IOException(handler.getErr());
1235      }
1236    }
1237  }
1238
1239  /**
1240   * Used in testing.
1241   * @return all scanners in no particular order
1242   */
1243  List<KeyValueScanner> getAllScannersForTesting() {
1244    List<KeyValueScanner> allScanners = new ArrayList<>();
1245    KeyValueScanner current = heap.getCurrentForTesting();
1246    if (current != null) allScanners.add(current);
1247    for (KeyValueScanner scanner : heap.getHeap())
1248      allScanners.add(scanner);
1249    return allScanners;
1250  }
1251
1252  static void enableLazySeekGlobally(boolean enable) {
1253    lazySeekEnabledGlobally = enable;
1254  }
1255
1256  /** Returns The estimated number of KVs seen by this scanner (includes some skipped KVs). */
1257  public long getEstimatedNumberOfKvsScanned() {
1258    return this.kvsScanned;
1259  }
1260
1261  @Override
1262  public ExtendedCell getNextIndexedKey() {
1263    return this.heap.getNextIndexedKey();
1264  }
1265
  /**
   * Called once the cells returned so far have been shipped. Steps, in order:
   * <ol>
   * <li>Replace {@code prevCell} with a key-only copy, so it no longer references shared
   * blocks.</li>
   * <li>Let the matcher do the same ({@code matcher.beforeShipped()}).</li>
   * <li>Close the scanners whose close was deferred.</li>
   * <li>Forward to the heap, then possibly switch from pread to stream reads.</li>
   * </ol>
   */
  @Override
  public void shipped() throws IOException {
    if (prevCell != null) {
      // Do the copy here so that in case the prevCell ref is pointing to the previous
      // blocks we can safely release those blocks.
      // This applies to blocks that are got from Bucket cache, L1 cache and the blocks
      // fetched from HDFS. Copying this would ensure that we let go the references to these
      // blocks so that they can be GCed safely(in case of bucket cache)
      prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
    }
    matcher.beforeShipped();
    // There wont be further fetch of Cells from these scanners. Just close.
    clearAndClose(scannersForDelayedClose);
    if (this.heap != null) {
      this.heap.shipped();
      // When switching from pread to stream, we will open a new scanner for each store file, but
      // the old scanner may still track the HFileBlocks we have scanned but not sent back to client
      // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others
      // before we serialize and send it back to client. The HFileBlocks will be released in shipped
      // method, so we here will also open new scanners and close old scanners in shipped method.
      // See HBASE-18055 for more details.
      trySwitchToStreamRead();
    }
  }
1290}