001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.NavigableSet;
025import java.util.Optional;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.atomic.AtomicBoolean;
028import java.util.concurrent.locks.ReentrantLock;
029import java.util.function.IntConsumer;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellComparator;
032import org.apache.hadoop.hbase.CellUtil;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.ExtendedCell;
035import org.apache.hadoop.hbase.HBaseInterfaceAudience;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.KeyValue;
038import org.apache.hadoop.hbase.KeyValueUtil;
039import org.apache.hadoop.hbase.PrivateCellUtil;
040import org.apache.hadoop.hbase.PrivateConstants;
041import org.apache.hadoop.hbase.client.IsolationLevel;
042import org.apache.hadoop.hbase.client.Scan;
043import org.apache.hadoop.hbase.conf.ConfigKey;
044import org.apache.hadoop.hbase.executor.ExecutorService;
045import org.apache.hadoop.hbase.filter.Filter;
046import org.apache.hadoop.hbase.ipc.RpcCall;
047import org.apache.hadoop.hbase.ipc.RpcServer;
048import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
049import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
050import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
051import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
052import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
053import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
054import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
055import org.apache.yetus.audience.InterfaceAudience;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
060import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
061
/**
 * Scanner that scans both the memstore and the store files, coalescing the KeyValue stream into a
 * {@code List<KeyValue>} for a single row.
 * <p>
 * The implementation is not thread safe: {@code next} and {@code close} are not expected to be
 * called concurrently, so there is no race between them. The only exception is
 * {@code updateReaders}, which is called from the memstore flush thread to indicate that a flush
 * has happened.
 */
070@InterfaceAudience.Private
071public class StoreScanner extends NonReversedNonLazyKeyValueScanner
072  implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class);
  // The store being scanned. May be null: unit tests and mob compaction have no store instance.
  protected final HStore store;
  private final CellComparator comparator;
  private ScanQueryMatcher matcher;
  protected KeyValueHeap heap;
  private boolean cacheBlocks;

  // Number of cells of the current row accepted so far; reset whenever the matcher moves to a new
  // row and compared against storeOffset / storeLimit.
  private long countPerRow = 0;
  // Max cells to return per column family per row (Scan#getMaxResultsPerColumnFamily); -1 = no limit.
  private int storeLimit = -1;
  // Cells per column family per row to skip before returning any
  // (Scan#getRowOffsetPerColumnFamily).
  private int storeOffset = 0;

  // Used to indicate that the scanner has been fully closed (see HBASE-1107).
  private volatile boolean closing = false;
  // True when the scan is a Get (Scan#isGetScan); gets use pread and exit early on DONE.
  private final boolean get;
  // True when specific columns were requested; together with lazySeekEnabledGlobally this turns
  // on lazy seeking in the user-scan constructor.
  private final boolean explicitColumnQuery;
  // Whether row-column Bloom filters should be used when seeking; computed in the base constructor.
  private final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  private boolean parallelSeekEnabled = false;
  // Executor used for parallel seeks; only assigned when parallelSeekEnabled is set to true.
  private ExecutorService executor;
  private final Scan scan;
  // TTL cutoff timestamp: now - TTL, or 0 for raw scans.
  private final long oldestUnexpiredTS;
  // Wall-clock time captured when the scanner was constructed.
  private final long now;
  private final int minVersions;
  // For user scans, the maximum number of bytes a single row may contribute before a
  // RowTooBigException is thrown.
  private final long maxRowSize;
  // The time limit is checked once every this many scanned cells.
  private final long cellsPerHeartbeatCheck;
  // NOTE(review): presumably read-metric counters (memstore-only reads vs. reads that also touched
  // store files); they are not updated within this view.
  long memstoreOnlyReads;
  long mixedReads;

  // 1) Collects all the KeyValueHeaps that are eagerly closed during the course of a scan.
  // 2) Collects the unused memstore scanners. If we closed the memstore scanners before sending
  // data to the client, the chunk could be reclaimed by other updates and the data would be
  // corrupted.
  private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>();

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not KVs skipped via
   * seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  // The last cell handed to the matcher; used to count distinct cells and to check scan order.
  private ExtendedCell prevCell = null;

  // Bytes a Scan.ReadType.DEFAULT scan may read with pread before switching to stream reads.
  private final long preadMaxBytes;
  // Estimated serialized bytes of all cells examined so far; compared against preadMaxBytes.
  private long bytesRead;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  // Configuration key for enabling parallel seeking of store file scanners.
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
    "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;

  /**
   * The number of cells scanned in between timeout checks. Specifying a larger value means that
   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
   * timeout checks.
   */
  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
    ConfigKey.LONG("hbase.cells.scanned.per.heartbeat.check");

  /**
   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
   */
  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

  /**
   * If the read type is Scan.ReadType.DEFAULT, we start with pread, and once the bytes we have
   * scanned reach this limit, we reopen the scanner with stream reads. The default value is four
   * times the block size of this store. If configured with a value < 0, all scans with read type
   * DEFAULT open their scanners in stream mode right away.
   */
  public static final String STORESCANNER_PREAD_MAX_BYTES =
    ConfigKey.LONG("hbase.storescanner.pread.max.bytes");

  private final Scan.ReadType readType;

  // Whether the scan currently uses pread.
  // It may be changed if we use Scan.ReadType.DEFAULT and we have read lots of data.
  private boolean scanUsePread;
  // Indicates whether there was a flush during the course of the scan.
  private volatile boolean flushed = false;
  // Scanners over store files produced by a flush during the scan; generally one file per flush.
  private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1);
  // Since CompactingMemstore is now the default, we get three memstore scanners from a flush.
  private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
  // The current list of scanners (the ones the heap was built from).
  final List<KeyValueScanner> currentScanners = new ArrayList<>();
  // Lock guarding flush-driven reader updates.
  private final ReentrantLock flushLock = new ReentrantLock();
  // Lock held while closing.
  private final ReentrantLock closeLock = new ReentrantLock();

  // The read point this scanner was created with; the user-scan constructor passes it to
  // HStore#getScanners.
  protected final long readPt;
  // NOTE(review): reset to false before each match in next(); where it becomes true is outside
  // this view — confirm.
  private boolean topChanged = false;

  // These are used to verify the state of the scanner during testing.
  private static AtomicBoolean hasUpdatedReaders;
  private static AtomicBoolean hasSwitchedToStreamRead;
176  /** An internal constructor. */
177  private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt,
178    boolean cacheBlocks, ScanType scanType) {
179    this.readPt = readPt;
180    this.store = store;
181    this.cacheBlocks = cacheBlocks;
182    this.comparator = Preconditions.checkNotNull(scanInfo.getComparator());
183    get = scan.isGetScan();
184    explicitColumnQuery = numColumns > 0;
185    this.scan = scan;
186    this.now = EnvironmentEdgeManager.currentTime();
187    this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl();
188    this.minVersions = scanInfo.getMinVersions();
189
190    // We look up row-column Bloom filters for multi-column queries as part of
191    // the seek operation. However, we also look the row-column Bloom filter
192    // for multi-row (non-"get") scans because this is not done in
193    // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
194    this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1) && (store == null
195      || store.getColumnFamilyDescriptor().getBloomFilterType() == BloomType.ROWCOL);
196    this.maxRowSize = scanInfo.getTableMaxRowSize();
197    this.preadMaxBytes = scanInfo.getPreadMaxBytes();
198    if (get) {
199      this.readType = Scan.ReadType.PREAD;
200      this.scanUsePread = true;
201    } else if (scanType != ScanType.USER_SCAN) {
202      // For compaction scanners never use Pread as already we have stream based scanners on the
203      // store files to be compacted
204      this.readType = Scan.ReadType.STREAM;
205      this.scanUsePread = false;
206    } else {
207      if (scan.getReadType() == Scan.ReadType.DEFAULT) {
208        if (scanInfo.isUsePread()) {
209          this.readType = Scan.ReadType.PREAD;
210        } else if (this.preadMaxBytes < 0) {
211          this.readType = Scan.ReadType.STREAM;
212        } else {
213          this.readType = Scan.ReadType.DEFAULT;
214        }
215      } else {
216        this.readType = scan.getReadType();
217      }
218      // Always start with pread unless user specific stream. Will change to stream later if
219      // readType is default if the scan keeps running for a long time.
220      this.scanUsePread = this.readType != Scan.ReadType.STREAM;
221    }
222    this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
223    // Parallel seeking is on if the config allows and more there is more than one store file.
224    if (store != null && store.getStorefilesCount() > 1) {
225      RegionServerServices rsService = store.getHRegion().getRegionServerServices();
226      if (rsService != null && scanInfo.isParallelSeekEnabled()) {
227        this.parallelSeekEnabled = true;
228        this.executor = rsService.getExecutorService();
229      }
230    }
231  }
232
233  private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
234    this.currentScanners.addAll(scanners);
235  }
236
237  private static boolean isOnlyLatestVersionScan(Scan scan) {
238    // No need to check for Scan#getMaxVersions because live version files generated by store file
239    // writer retains max versions specified in ColumnFamilyDescriptor for the given CF
240    return !scan.isRaw() && scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP;
241  }
242
243  /**
244   * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we are not in a
245   * compaction.
246   * @param store   who we scan
247   * @param scan    the spec
248   * @param columns which columns we are scanning
249   */
250  public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
251    long readPt) throws IOException {
252    this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt, scan.getCacheBlocks(),
253      ScanType.USER_SCAN);
254    if (columns != null && scan.isRaw()) {
255      throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
256    }
257    matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
258      store.getCoprocessorHost());
259
260    store.addChangedReaderObserver(this);
261
262    List<KeyValueScanner> scanners = null;
263    try {
264      // Pass columns to try to filter out unnecessary StoreFiles.
265      scanners = selectScannersFrom(store,
266        store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
267          scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt,
268          isOnlyLatestVersionScan(scan)));
269
270      // Seek all scanners to the start of the Row (or if the exact matching row
271      // key does not exist, then to the start of the next matching Row).
272      // Always check bloom filter to optimize the top row seek for delete
273      // family marker.
274      seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
275        parallelSeekEnabled);
276
277      // set storeLimit
278      this.storeLimit = scan.getMaxResultsPerColumnFamily();
279
280      // set rowOffset
281      this.storeOffset = scan.getRowOffsetPerColumnFamily();
282      addCurrentScanners(scanners);
283      // Combine all seeked scanners with a heap
284      resetKVHeap(scanners, comparator);
285    } catch (IOException e) {
286      clearAndClose(scanners);
287      // remove us from the HStore#changedReaderObservers here or we'll have no chance to
288      // and might cause memory leak
289      store.deleteChangedReaderObserver(this);
290      throw e;
291    }
292  }
293
  // A shared, default-constructed dummy Scan used as the scan spec by the compaction constructors,
  // which have no user-supplied Scan.
  private static final Scan SCAN_FOR_COMPACTION = new Scan();
296
297  /**
298   * Used for store file compaction and memstore compaction.
299   * <p>
300   * Opens a scanner across specified StoreFiles/MemStoreSegments.
301   * @param store             who we scan
302   * @param scanners          ancillary scanners
303   * @param smallestReadPoint the readPoint that we should use for tracking versions
304   */
305  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
306    ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
307    this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
308  }
309
310  /**
311   * Used for compactions that drop deletes from a limited range of rows.
312   * <p>
313   * Opens a scanner across specified StoreFiles.
314   * @param store              who we scan
315   * @param scanners           ancillary scanners
316   * @param smallestReadPoint  the readPoint that we should use for tracking versions
317   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
318   * @param dropDeletesToRow   The exclusive right bound of the range; can be EMPTY_END_ROW.
319   */
320  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
321    long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow)
322    throws IOException {
323    this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
324      earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
325  }
326
327  private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
328    ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
329    byte[] dropDeletesToRow) throws IOException {
330    this(store, SCAN_FOR_COMPACTION, scanInfo, 0,
331      store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType);
332    assert scanType != ScanType.USER_SCAN;
333    matcher =
334      CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs,
335        oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
336
337    // Filter the list of scanners using Bloom filters, time range, TTL, etc.
338    scanners = selectScannersFrom(store, scanners);
339
340    // Seek all scanners to the initial key
341    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
342    addCurrentScanners(scanners);
343    // Combine all seeked scanners with a heap
344    resetKVHeap(scanners, comparator);
345  }
346
347  private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners)
348    throws IOException {
349    // Seek all scanners to the initial key
350    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
351    addCurrentScanners(scanners);
352    resetKVHeap(scanners, comparator);
353  }
354
355  // For mob compaction only as we do not have a Store instance when doing mob compaction.
356  public StoreScanner(ScanInfo scanInfo, ScanType scanType,
357    List<? extends KeyValueScanner> scanners) throws IOException {
358    this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
359    assert scanType != ScanType.USER_SCAN;
360    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L,
361      oldestUnexpiredTS, now, null, null, null);
362    seekAllScanner(scanInfo, scanners);
363  }
364
365  // Used to instantiate a scanner for user scan in test
366  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
367    List<? extends KeyValueScanner> scanners, ScanType scanType) throws IOException {
368    // 0 is passed as readpoint because the test bypasses Store
369    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
370      scanType);
371    if (scanType == ScanType.USER_SCAN) {
372      this.matcher =
373        UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
374    } else {
375      this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
376        PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
377    }
378    seekAllScanner(scanInfo, scanners);
379  }
380
381  // Used to instantiate a scanner for user scan in test
382  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
383    List<? extends KeyValueScanner> scanners) throws IOException {
384    // 0 is passed as readpoint because the test bypasses Store
385    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
386      ScanType.USER_SCAN);
387    this.matcher =
388      UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
389    seekAllScanner(scanInfo, scanners);
390  }
391
392  // Used to instantiate a scanner for compaction in test
393  StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType,
394    List<? extends KeyValueScanner> scanners) throws IOException {
395    // 0 is passed as readpoint because the test bypasses Store
396    this(null, maxVersions > 0 ? new Scan().readVersions(maxVersions) : SCAN_FOR_COMPACTION,
397      scanInfo, 0, 0L, false, scanType);
398    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
399      PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
400    seekAllScanner(scanInfo, scanners);
401  }
402
403  boolean isScanUsePread() {
404    return this.scanUsePread;
405  }
406
407  /**
408   * Seek the specified scanners with the given key
409   * @param isLazy         true if using lazy seek
410   * @param isParallelSeek true if using parallel seek
411   */
412  protected void seekScanners(List<? extends KeyValueScanner> scanners, ExtendedCell seekKey,
413    boolean isLazy, boolean isParallelSeek) throws IOException {
414    // Seek all scanners to the start of the Row (or if the exact matching row
415    // key does not exist, then to the start of the next matching Row).
416    // Always check bloom filter to optimize the top row seek for delete
417    // family marker.
418    if (isLazy) {
419      for (KeyValueScanner scanner : scanners) {
420        scanner.requestSeek(seekKey, false, true);
421      }
422    } else {
423      if (!isParallelSeek) {
424        long totalScannersSoughtBytes = 0;
425        for (KeyValueScanner scanner : scanners) {
426          if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) {
427            throw new RowTooBigException(
428              "Max row size allowed: " + maxRowSize + ", but row is bigger than that");
429          }
430          scanner.seek(seekKey);
431          Cell c = scanner.peek();
432          if (c != null) {
433            totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c);
434          }
435        }
436      } else {
437        parallelSeek(scanners, seekKey);
438      }
439    }
440  }
441
442  protected void resetKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
443    throws IOException {
444    // Combine all seeked scanners with a heap
445    heap = newKVHeap(scanners, comparator);
446  }
447
448  protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners,
449    CellComparator comparator) throws IOException {
450    return new KeyValueHeap(scanners, comparator);
451  }
452
453  /**
454   * Filters the given list of scanners using Bloom filter, time range, and TTL.
455   * <p>
456   * Will be overridden by testcase so declared as protected.
457   */
458  protected List<KeyValueScanner> selectScannersFrom(HStore store,
459    List<? extends KeyValueScanner> allScanners) {
460    boolean memOnly;
461    boolean filesOnly;
462    if (scan instanceof InternalScan) {
463      InternalScan iscan = (InternalScan) scan;
464      memOnly = iscan.isCheckOnlyMemStore();
465      filesOnly = iscan.isCheckOnlyStoreFiles();
466    } else {
467      memOnly = false;
468      filesOnly = false;
469    }
470
471    List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size());
472
473    // We can only exclude store files based on TTL if minVersions is set to 0.
474    // Otherwise, we might have to return KVs that have technically expired.
475    long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE;
476
477    // include only those scan files which pass all filters
478    for (KeyValueScanner kvs : allScanners) {
479      boolean isFile = kvs.isFileScanner();
480      if ((!isFile && filesOnly) || (isFile && memOnly)) {
481        kvs.close();
482        continue;
483      }
484
485      if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
486        scanners.add(kvs);
487      } else {
488        kvs.close();
489      }
490    }
491    return scanners;
492  }
493
494  @Override
495  public ExtendedCell peek() {
496    return heap != null ? heap.peek() : null;
497  }
498
499  @Override
500  public KeyValue next() {
501    // throw runtime exception perhaps?
502    throw new RuntimeException("Never call StoreScanner.next()");
503  }
504
505  @Override
506  public void close() {
507    close(true);
508  }
509
  /**
   * Closes this scanner, either fully or partially, while holding {@code closeLock}.
   * <p>
   * Does nothing once a full close has happened. Otherwise it unregisters this instance from the
   * store's changed-readers observers (when a store exists) and, if a heap exists, clears
   * {@link #currentScanners} and drops the heap reference.
   * @param withDelayedScannersClose if true, mark the scanner as closing and close everything: the
   *                                 delayed-close scanners, the scanners collected after a flush,
   *                                 and the heap. If false, the heap is not closed but queued in
   *                                 scannersForDelayedClose, and the closing flag is left unset,
   *                                 so a later full close still runs and releases it.
   */
  private void close(boolean withDelayedScannersClose) {
    closeLock.lock();
    // Holding closeLock serializes this with updateReaders(); per the original design note, any
    // updateReaders() call arriving after close is ignored (that check lives outside this method).
    try {
      // Already fully closed: nothing left to release.
      if (this.closing) {
        return;
      }
      // Only a full close marks the scanner as closing; a partial close must allow a later full one.
      if (withDelayedScannersClose) {
        this.closing = true;
      }
      // For mob compaction, we do not have a store.
      if (this.store != null) {
        this.store.deleteChangedReaderObserver(this);
      }
      if (withDelayedScannersClose) {
        // Full close: release everything, including scanners whose close had been deferred.
        clearAndClose(scannersForDelayedClose);
        clearAndClose(memStoreScannersAfterFlush);
        clearAndClose(flushedstoreFileScanners);
        if (this.heap != null) {
          this.heap.close();
          this.currentScanners.clear();
          this.heap = null; // CLOSED!
        }
      } else {
        // Partial close: do not close the heap yet, since its memstore chunks may still back data
        // not yet sent to the client; defer it to the next full close.
        if (this.heap != null) {
          this.scannersForDelayedClose.add(this.heap);
          this.currentScanners.clear();
          this.heap = null;
        }
      }
    } finally {
      closeLock.unlock();
    }
  }
545
546  @Override
547  public boolean seek(ExtendedCell key) throws IOException {
548    if (checkFlushed()) {
549      reopenAfterFlush();
550    }
551    return this.heap.seek(key);
552  }
553
554  /**
555   * Get the next row of values from this Store.
556   * @return true if there are more rows, false if scanner is done
557   */
558  @Override
559  public boolean next(List<? super ExtendedCell> outResult, ScannerContext scannerContext)
560    throws IOException {
561    if (scannerContext == null) {
562      throw new IllegalArgumentException("Scanner context cannot be null");
563    }
564    if (checkFlushed() && reopenAfterFlush()) {
565      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
566    }
567
568    // if the heap was left null, then the scanners had previously run out anyways, close and
569    // return.
570    if (this.heap == null) {
571      // By this time partial close should happened because already heap is null
572      close(false);// Do all cleanup except heap.close()
573      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
574    }
575
576    ExtendedCell cell = this.heap.peek();
577    if (cell == null) {
578      close(false);// Do all cleanup except heap.close()
579      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
580    }
581
582    // only call setRow if the row changes; avoids confusing the query matcher
583    // if scanning intra-row
584
585    // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
586    // rows. Else it is possible we are still traversing the same row so we must perform the row
587    // comparison.
588    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) {
589      this.countPerRow = 0;
590      matcher.setToNewRow(cell);
591    }
592
593    // Clear progress away unless invoker has indicated it should be kept.
594    if (!scannerContext.getKeepProgress() && !scannerContext.getSkippingRow()) {
595      scannerContext.clearProgress();
596    }
597
598    Optional<RpcCall> rpcCall =
599      matcher.isUserScan() ? RpcServer.getCurrentCall() : Optional.empty();
600    // re-useable closure to avoid allocations
601    IntConsumer recordBlockSize = blockSize -> {
602      if (rpcCall.isPresent()) {
603        rpcCall.get().incrementBlockBytesScanned(blockSize);
604      }
605      scannerContext.incrementBlockProgress(blockSize);
606    };
607
608    int count = 0;
609    long totalBytesRead = 0;
610    // track the cells for metrics only if it is a user read request.
611    boolean onlyFromMemstore = matcher.isUserScan();
612    try {
613      LOOP: do {
614        // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
615        // Or if the preadMaxBytes is reached and we may want to return so we can switch to stream
616        // in
617        // the shipped method below.
618        if (
619          kvsScanned % cellsPerHeartbeatCheck == 0
620            || (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)
621        ) {
622          if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
623            return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
624          }
625        }
626        // Do object compare - we set prevKV from the same heap.
627        if (prevCell != cell) {
628          ++kvsScanned;
629        }
630        checkScanOrder(prevCell, cell, comparator);
631        int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
632        bytesRead += cellSize;
633        if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) {
634          // return immediately if we want to switch from pread to stream. We need this because we
635          // can
636          // only switch in the shipped method, if user use a filter to filter out everything and
637          // rpc
638          // timeout is very large then the shipped method will never be called until the whole scan
639          // is finished, but at that time we have already scan all the data...
640          // See HBASE-20457 for more details.
641          // And there is still a scenario that can not be handled. If we have a very large row,
642          // which
643          // have millions of qualifiers, and filter.filterRow is used, then even if we set the flag
644          // here, we still need to scan all the qualifiers before returning...
645          scannerContext.returnImmediately();
646        }
647
648        heap.recordBlockSize(recordBlockSize);
649
650        prevCell = cell;
651        scannerContext.setLastPeekedCell(cell);
652        topChanged = false;
653        ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
654        switch (qcode) {
655          case INCLUDE:
656          case INCLUDE_AND_SEEK_NEXT_ROW:
657          case INCLUDE_AND_SEEK_NEXT_COL:
658            Filter f = matcher.getFilter();
659            if (f != null) {
660              Cell transformedCell = f.transformCell(cell);
661              // fast path, most filters just return the same cell instance
662              if (transformedCell != cell) {
663                if (transformedCell instanceof ExtendedCell) {
664                  cell = (ExtendedCell) transformedCell;
665                } else {
666                  throw new DoNotRetryIOException("Incorrect filter implementation, "
667                    + "the Cell returned by transformCell is not an ExtendedCell. Filter class: "
668                    + f.getClass().getName());
669                }
670              }
671            }
672            this.countPerRow++;
673
674            // add to results only if we have skipped #storeOffset kvs
675            // also update metric accordingly
676            if (this.countPerRow > storeOffset) {
677              outResult.add(cell);
678
679              // Update local tracking information
680              count++;
681              totalBytesRead += cellSize;
682
683              /**
684               * Increment the metric if all the cells are from memstore. If not we will account it
685               * for mixed reads
686               */
687              onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore();
688              // Update the progress of the scanner context
689              scannerContext.incrementSizeProgress(cellSize, cell.heapSize());
690              scannerContext.incrementBatchProgress(1);
691
692              if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
693                String message = "Max row size allowed: " + maxRowSize
694                  + ", but the row is bigger than that, the row info: "
695                  + CellUtil.toString(cell, false) + ", already have process row cells = "
696                  + outResult.size() + ", it belong to region = "
697                  + store.getHRegion().getRegionInfo().getRegionNameAsString();
698                LOG.warn(message);
699                throw new RowTooBigException(message);
700              }
701
702              if (storeLimit > -1 && this.countPerRow >= (storeLimit + storeOffset)) {
703                // do what SEEK_NEXT_ROW does.
704                if (!matcher.moreRowsMayExistAfter(cell)) {
705                  close(false);// Do all cleanup except heap.close()
706                  return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
707                }
708                matcher.clearCurrentRow();
709                seekToNextRow(cell);
710                break LOOP;
711              }
712            }
713
714            if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
715              if (!matcher.moreRowsMayExistAfter(cell)) {
716                close(false);// Do all cleanup except heap.close()
717                return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
718              }
719              matcher.clearCurrentRow();
720              seekOrSkipToNextRow(cell);
721            } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
722              seekOrSkipToNextColumn(cell);
723            } else {
724              this.heap.next();
725            }
726
727            if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
728              break LOOP;
729            }
730            if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
731              break LOOP;
732            }
733            continue;
734
735          case DONE:
736            // Optimization for Gets! If DONE, no more to get on this row, early exit!
737            if (get) {
738              // Then no more to this row... exit.
739              close(false);// Do all cleanup except heap.close()
740              // update metric
741              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
742            }
743            matcher.clearCurrentRow();
744            return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
745
746          case DONE_SCAN:
747            close(false);// Do all cleanup except heap.close()
748            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
749
750          case SEEK_NEXT_ROW:
751            // This is just a relatively simple end of scan fix, to short-cut end
752            // us if there is an endKey in the scan.
753            if (!matcher.moreRowsMayExistAfter(cell)) {
754              close(false);// Do all cleanup except heap.close()
755              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
756            }
757            matcher.clearCurrentRow();
758            seekOrSkipToNextRow(cell);
759            NextState stateAfterSeekNextRow = needToReturn();
760            if (stateAfterSeekNextRow != null) {
761              return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
762            }
763            break;
764
765          case SEEK_NEXT_COL:
766            seekOrSkipToNextColumn(cell);
767            NextState stateAfterSeekNextColumn = needToReturn();
768            if (stateAfterSeekNextColumn != null) {
769              return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
770            }
771            break;
772
773          case SKIP:
774            this.heap.next();
775            break;
776
777          case SEEK_NEXT_USING_HINT:
778            ExtendedCell nextKV = matcher.getNextKeyHint(cell);
779            if (nextKV != null) {
780              int difference = comparator.compare(nextKV, cell);
781              if (
782                ((!scan.isReversed() && difference > 0) || (scan.isReversed() && difference < 0))
783              ) {
784                seekAsDirection(nextKV);
785                NextState stateAfterSeekByHint = needToReturn();
786                if (stateAfterSeekByHint != null) {
787                  return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
788                }
789                break;
790              }
791            }
792            heap.next();
793            break;
794
795          default:
796            throw new RuntimeException("UNEXPECTED");
797        }
798
799        // One last chance to break due to size limit. The INCLUDE* cases above already check
800        // limit and continue. For the various filtered cases, we need to check because block
801        // size limit may have been exceeded even if we don't add cells to result list.
802        if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
803          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
804        }
805      } while ((cell = this.heap.peek()) != null);
806
807      if (count > 0) {
808        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
809      }
810
811      // No more keys
812      close(false);// Do all cleanup except heap.close()
813      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
814    } finally {
815      // increment only if we have some result
816      if (count > 0 && matcher.isUserScan()) {
817        // if true increment memstore metrics, if not the mixed one
818        updateMetricsStore(onlyFromMemstore);
819      }
820    }
821  }
822
823  private void updateMetricsStore(boolean memstoreRead) {
824    if (store != null) {
825      store.updateMetricsStore(memstoreRead);
826    } else {
827      // for testing.
828      if (memstoreRead) {
829        memstoreOnlyReads++;
830      } else {
831        mixedReads++;
832      }
833    }
834  }
835
836  /**
837   * If the top cell won't be flushed into disk, the new top cell may be changed after
838   * #reopenAfterFlush. Because the older top cell only exist in the memstore scanner but the
839   * memstore scanner is replaced by hfile scanner after #reopenAfterFlush. If the row of top cell
840   * is changed, we should return the current cells. Otherwise, we may return the cells across
841   * different rows.
842   * @return null is the top cell doesn't change. Otherwise, the NextState to return
843   */
844  private NextState needToReturn() {
845    if (topChanged) {
846      return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
847    }
848    return null;
849  }
850
851  private void seekOrSkipToNextRow(ExtendedCell cell) throws IOException {
852    // If it is a Get Scan, then we know that we are done with this row; there are no more
853    // rows beyond the current one: don't try to optimize.
854    if (!get) {
855      if (trySkipToNextRow(cell)) {
856        return;
857      }
858    }
859    seekToNextRow(cell);
860  }
861
862  private void seekOrSkipToNextColumn(ExtendedCell cell) throws IOException {
863    if (!trySkipToNextColumn(cell)) {
864      seekAsDirection(matcher.getKeyForNextColumn(cell));
865    }
866  }
867
868  /**
869   * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).
870   * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row, or seek to an
871   * arbitrary seek key. This method decides whether a seek is the most efficient _actual_ way to
872   * get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, SKIP inside the
873   * current, loaded block). It does this by looking at the next indexed key of the current HFile.
874   * This key is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible
875   * key on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work
876   * with the current Cell but compare as though it were a seek key; see down in
877   * matcher.compareKeyForNextRow, etc). If the compare gets us onto the next block we *_SEEK,
878   * otherwise we just SKIP to the next requested cell.
879   * <p>
880   * Other notes:
881   * <ul>
882   * <li>Rows can straddle block boundaries</li>
883   * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a
884   * different block than column C1 at T2)</li>
885   * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a few
886   * SKIPs...</li>
887   * <li>We want to SEEK when the chance is high that we'll be able to seek past many Cells,
888   * especially if we know we need to go to the next block.</li>
889   * </ul>
890   * <p>
891   * A good proxy (best effort) to determine whether SKIP is better than SEEK is whether we'll
892   * likely end up seeking to the next block (or past the next block) to get our next column.
893   * Example:
894   *
895   * <pre>
896   * |    BLOCK 1              |     BLOCK 2                   |
897   * |  r1/c1, r1/c2, r1/c3    |    r1/c4, r1/c5, r2/c1        |
898   *                                   ^         ^
899   *                                   |         |
900   *                           Next Index Key   SEEK_NEXT_ROW (before r2/c1)
901   *
902   *
903   * |    BLOCK 1                       |     BLOCK 2                      |
904   * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |    r1/c1/t2, r1/c1/T1, r1/c2/T3  |
905   *                                            ^              ^
906   *                                            |              |
907   *                                    Next Index Key        SEEK_NEXT_COL
908   * </pre>
909   *
910   * Now imagine we want columns c1 and c3 (see first diagram above), the 'Next Index Key' of r1/c4
911   * is > r1/c3 so we should seek to get to the c1 on the next row, r2. In second case, say we only
912   * want one version of c1, after we have it, a SEEK_COL will be issued to get to c2. Looking at
913   * the 'Next Index Key', it would land us in the next block, so we should SEEK. In other scenarios
914   * where the SEEK will not land us in the next block, it is very likely better to issues a series
915   * of SKIPs.
916   * @param cell current cell
917   * @return true means skip to next row, false means not
918   */
919  protected boolean trySkipToNextRow(ExtendedCell cell) throws IOException {
920    ExtendedCell nextCell = null;
921    // used to guard against a changed next indexed key by doing a identity comparison
922    // when the identity changes we need to compare the bytes again
923    ExtendedCell previousIndexedKey = null;
924    do {
925      ExtendedCell nextIndexedKey = getNextIndexedKey();
926      if (
927        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
928          && (nextIndexedKey == previousIndexedKey
929            || matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0)
930      ) {
931        this.heap.next();
932        ++kvsScanned;
933        previousIndexedKey = nextIndexedKey;
934      } else {
935        return false;
936      }
937    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell));
938    return true;
939  }
940
941  /**
942   * See {@link #trySkipToNextRow(ExtendedCell)}
943   * @param cell current cell
944   * @return true means skip to next column, false means not
945   */
946  protected boolean trySkipToNextColumn(ExtendedCell cell) throws IOException {
947    ExtendedCell nextCell = null;
948    // used to guard against a changed next indexed key by doing a identity comparison
949    // when the identity changes we need to compare the bytes again
950    ExtendedCell previousIndexedKey = null;
951    do {
952      ExtendedCell nextIndexedKey = getNextIndexedKey();
953      if (
954        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
955          && (nextIndexedKey == previousIndexedKey
956            || matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0)
957      ) {
958        this.heap.next();
959        ++kvsScanned;
960        previousIndexedKey = nextIndexedKey;
961      } else {
962        return false;
963      }
964    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell));
965    // We need this check because it may happen that the new scanner that we get
966    // during heap.next() is requiring reseek due of fake KV previously generated for
967    // ROWCOL bloom filter optimization. See HBASE-19863 for more details
968    if (
969      useRowColBloom && nextCell != null && cell.getTimestamp() == PrivateConstants.OLDEST_TIMESTAMP
970    ) {
971      return false;
972    }
973    return true;
974  }
975
976  @Override
977  public long getReadPoint() {
978    return this.readPt;
979  }
980
981  private static void clearAndClose(List<KeyValueScanner> scanners) {
982    if (scanners == null) {
983      return;
984    }
985    for (KeyValueScanner s : scanners) {
986      s.close();
987    }
988    scanners.clear();
989  }
990
991  // Implementation of ChangedReadersObserver
992  @Override
993  public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners)
994    throws IOException {
995    if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) {
996      return;
997    }
998    boolean updateReaders = false;
999    flushLock.lock();
1000    try {
1001      if (!closeLock.tryLock()) {
1002        // The reason for doing this is that when the current store scanner does not retrieve
1003        // any new cells, then the scanner is considered to be done. The heap of this scanner
1004        // is not closed till the shipped() call is completed. Hence in that case if at all
1005        // the partial close (close (false)) has been called before updateReaders(), there is no
1006        // need for the updateReaders() to happen.
1007        LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
1008        // no lock acquired.
1009        clearAndClose(memStoreScanners);
1010        return;
1011      }
1012      // lock acquired
1013      updateReaders = true;
1014      if (this.closing) {
1015        LOG.debug("StoreScanner already closing. There is no need to updateReaders");
1016        clearAndClose(memStoreScanners);
1017        return;
1018      }
1019      flushed = true;
1020      final boolean isCompaction = false;
1021      boolean usePread = get || scanUsePread;
1022      // SEE HBASE-19468 where the flushed files are getting compacted even before a scanner
1023      // calls next(). So its better we create scanners here rather than next() call. Ensure
1024      // these scanners are properly closed() whether or not the scan is completed successfully
1025      // Eagerly creating scanners so that we have the ref counting ticking on the newly created
1026      // store files. In case of stream scanners this eager creation does not induce performance
1027      // penalty because in scans (that uses stream scanners) the next() call is bound to happen.
1028      List<KeyValueScanner> scanners =
1029        store.getScanners(sfs, cacheBlocks, get, usePread, isCompaction, matcher,
1030          scan.getStartRow(), scan.getStopRow(), this.readPt, false, isOnlyLatestVersionScan(scan));
1031      flushedstoreFileScanners.addAll(scanners);
1032      if (!CollectionUtils.isEmpty(memStoreScanners)) {
1033        clearAndClose(memStoreScannersAfterFlush);
1034        memStoreScannersAfterFlush.addAll(memStoreScanners);
1035      }
1036    } finally {
1037      flushLock.unlock();
1038      if (updateReaders) {
1039        closeLock.unlock();
1040      }
1041      if (hasUpdatedReaders != null) {
1042        hasUpdatedReaders.set(true);
1043      }
1044    }
1045    // Let the next() call handle re-creating and seeking
1046  }
1047
1048  /** Returns if top of heap has changed (and KeyValueHeap has to try the next KV) */
1049  protected final boolean reopenAfterFlush() throws IOException {
1050    // here we can make sure that we have a Store instance so no null check on store.
1051    ExtendedCell lastTop = heap.peek();
1052    // When we have the scan object, should we not pass it to getScanners() to get a limited set of
1053    // scanners? We did so in the constructor and we could have done it now by storing the scan
1054    // object from the constructor
1055    List<KeyValueScanner> scanners;
1056    flushLock.lock();
1057    try {
1058      List<KeyValueScanner> allScanners =
1059        new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());
1060      allScanners.addAll(flushedstoreFileScanners);
1061      allScanners.addAll(memStoreScannersAfterFlush);
1062      scanners = selectScannersFrom(store, allScanners);
1063      // Clear the current set of flushed store files scanners so that they don't get added again
1064      flushedstoreFileScanners.clear();
1065      memStoreScannersAfterFlush.clear();
1066    } finally {
1067      flushLock.unlock();
1068    }
1069
1070    // Seek the new scanners to the last key
1071    seekScanners(scanners, lastTop, false, parallelSeekEnabled);
1072    // remove the older memstore scanner
1073    for (int i = currentScanners.size() - 1; i >= 0; i--) {
1074      if (!currentScanners.get(i).isFileScanner()) {
1075        scannersForDelayedClose.add(currentScanners.remove(i));
1076      } else {
1077        // we add the memstore scanner to the end of currentScanners
1078        break;
1079      }
1080    }
1081    // add the newly created scanners on the flushed files and the current active memstore scanner
1082    addCurrentScanners(scanners);
1083    // Combine all seeked scanners with a heap
1084    resetKVHeap(this.currentScanners, store.getComparator());
1085    resetQueryMatcher(lastTop);
1086    if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
1087      LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString()
1088        + ",and after = " + heap.peek());
1089      topChanged = true;
1090    } else {
1091      topChanged = false;
1092    }
1093    return topChanged;
1094  }
1095
1096  private void resetQueryMatcher(ExtendedCell lastTopKey) {
1097    // Reset the state of the Query Matcher and set to top row.
1098    // Only reset and call setRow if the row changes; avoids confusing the
1099    // query matcher if scanning intra-row.
1100    ExtendedCell cell = heap.peek();
1101    if (cell == null) {
1102      cell = lastTopKey;
1103    }
1104    if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {
1105      this.countPerRow = 0;
1106      // The setToNewRow will call reset internally
1107      matcher.setToNewRow(cell);
1108    }
1109  }
1110
1111  /**
1112   * Check whether scan as expected order
1113   */
1114  protected void checkScanOrder(Cell prevKV, Cell kv, CellComparator comparator)
1115    throws IOException {
1116    // Check that the heap gives us KVs in an increasing order.
1117    assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0
1118      : "Key " + prevKV + " followed by a smaller key " + kv + " in cf " + store;
1119  }
1120
1121  protected boolean seekToNextRow(ExtendedCell c) throws IOException {
1122    return reseek(PrivateCellUtil.createLastOnRow(c));
1123  }
1124
1125  /**
1126   * Do a reseek in a normal StoreScanner(scan forward)
1127   * @return true if scanner has values left, false if end of scanner
1128   */
1129  protected boolean seekAsDirection(ExtendedCell kv) throws IOException {
1130    return reseek(kv);
1131  }
1132
1133  @Override
1134  public boolean reseek(ExtendedCell kv) throws IOException {
1135    if (checkFlushed()) {
1136      reopenAfterFlush();
1137    }
1138    if (explicitColumnQuery && lazySeekEnabledGlobally) {
1139      return heap.requestSeek(kv, true, useRowColBloom);
1140    }
1141    return heap.reseek(kv);
1142  }
1143
  /**
   * Switches this scanner from pread to stream reads once it has read enough bytes
   * ({@code bytesRead >= preadMaxBytes}). This happens only for {@code ReadType.DEFAULT} scans
   * that still use pread, are not closing, and have a top cell. See HBASE-18055.
   * <p>
   * Memstore (non-file) scanners are kept as they are. File scanners are recreated through
   * {@code store.recreateScanners}, seeked to the current top cell, and combined with the memstore
   * scanners into a new heap. The old file scanners are closed only after the new heap is
   * installed.
   * <p>
   * {@code scanUsePread} is cleared before the attempt. If recreation returns {@code null} or
   * throws, the method gives up without restoring it, so the switch is not retried. On an
   * exception, any newly created file scanners are closed and the current heap stays in use.
   */
  void trySwitchToStreamRead() {
    if (
      readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null
        || bytesRead < preadMaxBytes
    ) {
      return;
    }
    LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead,
      this.store.getColumnFamilyName());
    scanUsePread = false;
    ExtendedCell lastTop = heap.peek();
    List<KeyValueScanner> memstoreScanners = new ArrayList<>();
    List<KeyValueScanner> scannersToClose = new ArrayList<>();
    for (KeyValueScanner kvs : currentScanners) {
      if (!kvs.isFileScanner()) {
        // collect memstorescanners here
        memstoreScanners.add(kvs);
      } else {
        // file scanners are replaced below and closed once the new heap is in place
        scannersToClose.add(kvs);
      }
    }
    List<KeyValueScanner> fileScanners = null;
    List<KeyValueScanner> newCurrentScanners;
    KeyValueHeap newHeap;
    try {
      // We must have a store instance here so no null check
      // recreate the scanners on the current file scanners
      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, matcher,
        scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
        readPt, false);
      if (fileScanners == null) {
        // nothing to switch to; keep the current scanners (scanUsePread stays false)
        return;
      }
      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
      newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
      newCurrentScanners.addAll(fileScanners);
      newCurrentScanners.addAll(memstoreScanners);
      newHeap = newKVHeap(newCurrentScanners, comparator);
    } catch (Exception e) {
      // best effort: keep scanning with the existing heap, only release what we just created
      LOG.warn("failed to switch to stream read", e);
      if (fileScanners != null) {
        fileScanners.forEach(KeyValueScanner::close);
      }
      return;
    }
    // commit: install the new scanners and heap before closing the replaced file scanners
    currentScanners.clear();
    addCurrentScanners(newCurrentScanners);
    this.heap = newHeap;
    resetQueryMatcher(lastTop);
    scannersToClose.forEach(KeyValueScanner::close);
    if (hasSwitchedToStreamRead != null) {
      // test-only instrumentation, see instrument()
      hasSwitchedToStreamRead.set(true);
    }
  }
1198
1199  protected final boolean checkFlushed() {
1200    // check the var without any lock. Suppose even if we see the old
1201    // value here still it is ok to continue because we will not be resetting
1202    // the heap but will continue with the referenced memstore's snapshot. For compactions
1203    // any way we don't need the updateReaders at all to happen as we still continue with
1204    // the older files
1205    if (flushed) {
1206      // If there is a flush and the current scan is notified on the flush ensure that the
1207      // scan's heap gets reset and we do a seek on the newly flushed file.
1208      if (this.closing) {
1209        return false;
1210      }
1211      // reset the flag
1212      flushed = false;
1213      return true;
1214    }
1215    return false;
1216  }
1217
1218  /**
1219   * Seek storefiles in parallel to optimize IO latency as much as possible
1220   * @param scanners the list {@link KeyValueScanner}s to be read from
1221   * @param kv       the KeyValue on which the operation is being requested
1222   */
1223  private void parallelSeek(final List<? extends KeyValueScanner> scanners, final ExtendedCell kv)
1224    throws IOException {
1225    if (scanners.isEmpty()) return;
1226    int storeFileScannerCount = scanners.size();
1227    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
1228    List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount);
1229    for (KeyValueScanner scanner : scanners) {
1230      if (scanner instanceof StoreFileScanner) {
1231        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, this.readPt, latch);
1232        executor.submit(seekHandler);
1233        handlers.add(seekHandler);
1234      } else {
1235        scanner.seek(kv);
1236        latch.countDown();
1237      }
1238    }
1239
1240    try {
1241      latch.await();
1242    } catch (InterruptedException ie) {
1243      throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
1244    }
1245
1246    for (ParallelSeekHandler handler : handlers) {
1247      if (handler.getErr() != null) {
1248        throw new IOException(handler.getErr());
1249      }
1250    }
1251  }
1252
1253  /**
1254   * Used in testing.
1255   * @return all scanners in no particular order
1256   */
1257  List<KeyValueScanner> getAllScannersForTesting() {
1258    List<KeyValueScanner> allScanners = new ArrayList<>();
1259    KeyValueScanner current = heap.getCurrentForTesting();
1260    if (current != null) allScanners.add(current);
1261    for (KeyValueScanner scanner : heap.getHeap())
1262      allScanners.add(scanner);
1263    return allScanners;
1264  }
1265
1266  static void enableLazySeekGlobally(boolean enable) {
1267    lazySeekEnabledGlobally = enable;
1268  }
1269
1270  /** Returns The estimated number of KVs seen by this scanner (includes some skipped KVs). */
1271  public long getEstimatedNumberOfKvsScanned() {
1272    return this.kvsScanned;
1273  }
1274
1275  @Override
1276  public ExtendedCell getNextIndexedKey() {
1277    return this.heap.getNextIndexedKey();
1278  }
1279
1280  @Override
1281  public void shipped() throws IOException {
1282    if (prevCell != null) {
1283      // Do the copy here so that in case the prevCell ref is pointing to the previous
1284      // blocks we can safely release those blocks.
1285      // This applies to blocks that are got from Bucket cache, L1 cache and the blocks
1286      // fetched from HDFS. Copying this would ensure that we let go the references to these
1287      // blocks so that they can be GCed safely(in case of bucket cache)
1288      prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
1289    }
1290    matcher.beforeShipped();
1291    // There wont be further fetch of Cells from these scanners. Just close.
1292    clearAndClose(scannersForDelayedClose);
1293    if (this.heap != null) {
1294      this.heap.shipped();
1295      // When switching from pread to stream, we will open a new scanner for each store file, but
1296      // the old scanner may still track the HFileBlocks we have scanned but not sent back to client
1297      // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others
1298      // before we serialize and send it back to client. The HFileBlocks will be released in shipped
1299      // method, so we here will also open new scanners and close old scanners in shipped method.
1300      // See HBASE-18055 for more details.
1301      trySwitchToStreamRead();
1302    }
1303  }
1304
1305  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
1306  static final void instrument() {
1307    hasUpdatedReaders = new AtomicBoolean(false);
1308    hasSwitchedToStreamRead = new AtomicBoolean(false);
1309  }
1310
1311  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
1312  static final boolean hasUpdatedReaders() {
1313    return hasUpdatedReaders.get();
1314  }
1315
1316  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
1317  static final boolean hasSwitchedToStreamRead() {
1318    return hasSwitchedToStreamRead.get();
1319  }
1320
1321  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
1322  static final void resetHasUpdatedReaders() {
1323    hasUpdatedReaders.set(false);
1324  }
1325
1326  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
1327  static final void resetHasSwitchedToStreamRead() {
1328    hasSwitchedToStreamRead.set(false);
1329  }
1330}