001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.NavigableSet;
025import java.util.Optional;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.locks.ReentrantLock;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellComparator;
030import org.apache.hadoop.hbase.CellUtil;
031import org.apache.hadoop.hbase.DoNotRetryIOException;
032import org.apache.hadoop.hbase.ExtendedCell;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.KeyValue;
035import org.apache.hadoop.hbase.KeyValueUtil;
036import org.apache.hadoop.hbase.PrivateCellUtil;
037import org.apache.hadoop.hbase.PrivateConstants;
038import org.apache.hadoop.hbase.client.IsolationLevel;
039import org.apache.hadoop.hbase.client.Scan;
040import org.apache.hadoop.hbase.executor.ExecutorService;
041import org.apache.hadoop.hbase.filter.Filter;
042import org.apache.hadoop.hbase.ipc.RpcCall;
043import org.apache.hadoop.hbase.ipc.RpcServer;
044import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
045import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
046import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
047import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
048import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
049import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
056import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
057
058/**
 * Scanner scans both the memstore and the Store. Coalesce KeyValue stream into
 * {@code List<KeyValue>} for a single row.
061 * <p>
062 * The implementation is not thread safe. So there will be no race between next and close. The only
063 * exception is updateReaders, it will be called in the memstore flush thread to indicate that there
064 * is a flush.
065 */
066@InterfaceAudience.Private
067public class StoreScanner extends NonReversedNonLazyKeyValueScanner
068  implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
069  private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class);
070  // In unit tests, the store could be null
071  protected final HStore store;
072  private final CellComparator comparator;
073  private ScanQueryMatcher matcher;
074  protected KeyValueHeap heap;
075  private boolean cacheBlocks;
076
077  private long countPerRow = 0;
078  private int storeLimit = -1;
079  private int storeOffset = 0;
080
081  // Used to indicate that the scanner has closed (see HBASE-1107)
082  private volatile boolean closing = false;
083  private final boolean get;
084  private final boolean explicitColumnQuery;
085  private final boolean useRowColBloom;
086  /**
087   * A flag that enables StoreFileScanner parallel-seeking
088   */
089  private boolean parallelSeekEnabled = false;
090  private ExecutorService executor;
091  private final Scan scan;
092  private final long oldestUnexpiredTS;
093  private final long now;
094  private final int minVersions;
095  private final long maxRowSize;
096  private final long cellsPerHeartbeatCheck;
097  long memstoreOnlyReads;
098  long mixedReads;
099
100  // 1) Collects all the KVHeap that are eagerly getting closed during the
101  // course of a scan
102  // 2) Collects the unused memstore scanners. If we close the memstore scanners
103  // before sending data to client, the chunk may be reclaimed by other
104  // updates and the data will be corrupt.
105  private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>();
106
107  /**
108   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not KVs skipped via
109   * seeking to next row/column. TODO: estimate them?
110   */
111  private long kvsScanned = 0;
112  private ExtendedCell prevCell = null;
113
114  private final long preadMaxBytes;
115  private long bytesRead;
116
117  /** We don't ever expect to change this, the constant is just for clarity. */
118  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
119  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
120    "hbase.storescanner.parallel.seek.enable";
121
122  /** Used during unit testing to ensure that lazy seek does save seek ops */
123  private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;
124
125  /**
126   * The number of cells scanned in between timeout checks. Specifying a larger value means that
127   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
128   * timeout checks.
129   */
130  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
131    "hbase.cells.scanned.per.heartbeat.check";
132
133  /**
134   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
135   */
136  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;
137
138  /**
139   * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we scanned
140   * reaches this limit, we will reopen the scanner with stream. The default value is 4 times of
141   * block size for this store. If configured with a value <0, for all scans with ReadType DEFAULT,
142   * we will open scanner with stream mode itself.
143   */
144  public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes";
145
146  private final Scan.ReadType readType;
147
  // Whether pread is currently used for the scan.
  // It may be changed if we use Scan.ReadType.DEFAULT and we have read lots of data.
150  private boolean scanUsePread;
151  // Indicates whether there was flush during the course of the scan
152  private volatile boolean flushed = false;
153  // generally we get one file from a flush
154  private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1);
155  // Since CompactingMemstore is now default, we get three memstore scanners from a flush
156  private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
157  // The current list of scanners
158  final List<KeyValueScanner> currentScanners = new ArrayList<>();
159  // flush update lock
160  private final ReentrantLock flushLock = new ReentrantLock();
161  // lock for closing.
162  private final ReentrantLock closeLock = new ReentrantLock();
163
164  protected final long readPt;
165  private boolean topChanged = false;
166
167  /** An internal constructor. */
168  private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt,
169    boolean cacheBlocks, ScanType scanType) {
170    this.readPt = readPt;
171    this.store = store;
172    this.cacheBlocks = cacheBlocks;
173    this.comparator = Preconditions.checkNotNull(scanInfo.getComparator());
174    get = scan.isGetScan();
175    explicitColumnQuery = numColumns > 0;
176    this.scan = scan;
177    this.now = EnvironmentEdgeManager.currentTime();
178    this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl();
179    this.minVersions = scanInfo.getMinVersions();
180
181    // We look up row-column Bloom filters for multi-column queries as part of
182    // the seek operation. However, we also look the row-column Bloom filter
183    // for multi-row (non-"get") scans because this is not done in
184    // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
185    this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1) && (store == null
186      || store.getColumnFamilyDescriptor().getBloomFilterType() == BloomType.ROWCOL);
187    this.maxRowSize = scanInfo.getTableMaxRowSize();
188    this.preadMaxBytes = scanInfo.getPreadMaxBytes();
189    if (get) {
190      this.readType = Scan.ReadType.PREAD;
191      this.scanUsePread = true;
192    } else if (scanType != ScanType.USER_SCAN) {
193      // For compaction scanners never use Pread as already we have stream based scanners on the
194      // store files to be compacted
195      this.readType = Scan.ReadType.STREAM;
196      this.scanUsePread = false;
197    } else {
198      if (scan.getReadType() == Scan.ReadType.DEFAULT) {
199        if (scanInfo.isUsePread()) {
200          this.readType = Scan.ReadType.PREAD;
201        } else if (this.preadMaxBytes < 0) {
202          this.readType = Scan.ReadType.STREAM;
203        } else {
204          this.readType = Scan.ReadType.DEFAULT;
205        }
206      } else {
207        this.readType = scan.getReadType();
208      }
209      // Always start with pread unless user specific stream. Will change to stream later if
210      // readType is default if the scan keeps running for a long time.
211      this.scanUsePread = this.readType != Scan.ReadType.STREAM;
212    }
213    this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
214    // Parallel seeking is on if the config allows and more there is more than one store file.
215    if (store != null && store.getStorefilesCount() > 1) {
216      RegionServerServices rsService = store.getHRegion().getRegionServerServices();
217      if (rsService != null && scanInfo.isParallelSeekEnabled()) {
218        this.parallelSeekEnabled = true;
219        this.executor = rsService.getExecutorService();
220      }
221    }
222  }
223
224  private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
225    this.currentScanners.addAll(scanners);
226  }
227
228  private static boolean isOnlyLatestVersionScan(Scan scan) {
229    // No need to check for Scan#getMaxVersions because live version files generated by store file
230    // writer retains max versions specified in ColumnFamilyDescriptor for the given CF
231    return !scan.isRaw() && scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP;
232  }
233
234  /**
235   * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we are not in a
236   * compaction.
237   * @param store   who we scan
238   * @param scan    the spec
239   * @param columns which columns we are scanning
240   */
241  public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
242    long readPt) throws IOException {
243    this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt, scan.getCacheBlocks(),
244      ScanType.USER_SCAN);
245    if (columns != null && scan.isRaw()) {
246      throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
247    }
248    matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
249      store.getCoprocessorHost());
250
251    store.addChangedReaderObserver(this);
252
253    List<KeyValueScanner> scanners = null;
254    try {
255      // Pass columns to try to filter out unnecessary StoreFiles.
256      scanners = selectScannersFrom(store,
257        store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
258          scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt,
259          isOnlyLatestVersionScan(scan)));
260
261      // Seek all scanners to the start of the Row (or if the exact matching row
262      // key does not exist, then to the start of the next matching Row).
263      // Always check bloom filter to optimize the top row seek for delete
264      // family marker.
265      seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
266        parallelSeekEnabled);
267
268      // set storeLimit
269      this.storeLimit = scan.getMaxResultsPerColumnFamily();
270
271      // set rowOffset
272      this.storeOffset = scan.getRowOffsetPerColumnFamily();
273      addCurrentScanners(scanners);
274      // Combine all seeked scanners with a heap
275      resetKVHeap(scanners, comparator);
276    } catch (IOException e) {
277      clearAndClose(scanners);
278      // remove us from the HStore#changedReaderObservers here or we'll have no chance to
279      // and might cause memory leak
280      store.deleteChangedReaderObserver(this);
281      throw e;
282    }
283  }
284
285  // a dummy scan instance for compaction.
286  private static final Scan SCAN_FOR_COMPACTION = new Scan();
287
288  /**
289   * Used for store file compaction and memstore compaction.
290   * <p>
291   * Opens a scanner across specified StoreFiles/MemStoreSegments.
292   * @param store             who we scan
293   * @param scanners          ancillary scanners
294   * @param smallestReadPoint the readPoint that we should use for tracking versions
295   */
296  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
297    ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
298    this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
299  }
300
301  /**
302   * Used for compactions that drop deletes from a limited range of rows.
303   * <p>
304   * Opens a scanner across specified StoreFiles.
305   * @param store              who we scan
306   * @param scanners           ancillary scanners
307   * @param smallestReadPoint  the readPoint that we should use for tracking versions
308   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
309   * @param dropDeletesToRow   The exclusive right bound of the range; can be EMPTY_END_ROW.
310   */
311  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
312    long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow)
313    throws IOException {
314    this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
315      earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
316  }
317
318  private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
319    ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
320    byte[] dropDeletesToRow) throws IOException {
321    this(store, SCAN_FOR_COMPACTION, scanInfo, 0,
322      store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType);
323    assert scanType != ScanType.USER_SCAN;
324    matcher =
325      CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs,
326        oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
327
328    // Filter the list of scanners using Bloom filters, time range, TTL, etc.
329    scanners = selectScannersFrom(store, scanners);
330
331    // Seek all scanners to the initial key
332    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
333    addCurrentScanners(scanners);
334    // Combine all seeked scanners with a heap
335    resetKVHeap(scanners, comparator);
336  }
337
338  private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners)
339    throws IOException {
340    // Seek all scanners to the initial key
341    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
342    addCurrentScanners(scanners);
343    resetKVHeap(scanners, comparator);
344  }
345
346  // For mob compaction only as we do not have a Store instance when doing mob compaction.
347  public StoreScanner(ScanInfo scanInfo, ScanType scanType,
348    List<? extends KeyValueScanner> scanners) throws IOException {
349    this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
350    assert scanType != ScanType.USER_SCAN;
351    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L,
352      oldestUnexpiredTS, now, null, null, null);
353    seekAllScanner(scanInfo, scanners);
354  }
355
356  // Used to instantiate a scanner for user scan in test
357  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
358    List<? extends KeyValueScanner> scanners, ScanType scanType) throws IOException {
359    // 0 is passed as readpoint because the test bypasses Store
360    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
361      scanType);
362    if (scanType == ScanType.USER_SCAN) {
363      this.matcher =
364        UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
365    } else {
366      this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
367        PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
368    }
369    seekAllScanner(scanInfo, scanners);
370  }
371
372  // Used to instantiate a scanner for user scan in test
373  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
374    List<? extends KeyValueScanner> scanners) throws IOException {
375    // 0 is passed as readpoint because the test bypasses Store
376    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
377      ScanType.USER_SCAN);
378    this.matcher =
379      UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
380    seekAllScanner(scanInfo, scanners);
381  }
382
383  // Used to instantiate a scanner for compaction in test
384  StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType,
385    List<? extends KeyValueScanner> scanners) throws IOException {
386    // 0 is passed as readpoint because the test bypasses Store
387    this(null, maxVersions > 0 ? new Scan().readVersions(maxVersions) : SCAN_FOR_COMPACTION,
388      scanInfo, 0, 0L, false, scanType);
389    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
390      PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
391    seekAllScanner(scanInfo, scanners);
392  }
393
394  boolean isScanUsePread() {
395    return this.scanUsePread;
396  }
397
398  /**
399   * Seek the specified scanners with the given key
400   * @param isLazy         true if using lazy seek
401   * @param isParallelSeek true if using parallel seek
402   */
403  protected void seekScanners(List<? extends KeyValueScanner> scanners, ExtendedCell seekKey,
404    boolean isLazy, boolean isParallelSeek) throws IOException {
405    // Seek all scanners to the start of the Row (or if the exact matching row
406    // key does not exist, then to the start of the next matching Row).
407    // Always check bloom filter to optimize the top row seek for delete
408    // family marker.
409    if (isLazy) {
410      for (KeyValueScanner scanner : scanners) {
411        scanner.requestSeek(seekKey, false, true);
412      }
413    } else {
414      if (!isParallelSeek) {
415        long totalScannersSoughtBytes = 0;
416        for (KeyValueScanner scanner : scanners) {
417          if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) {
418            throw new RowTooBigException(
419              "Max row size allowed: " + maxRowSize + ", but row is bigger than that");
420          }
421          scanner.seek(seekKey);
422          Cell c = scanner.peek();
423          if (c != null) {
424            totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c);
425          }
426        }
427      } else {
428        parallelSeek(scanners, seekKey);
429      }
430    }
431  }
432
433  protected void resetKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
434    throws IOException {
435    // Combine all seeked scanners with a heap
436    heap = newKVHeap(scanners, comparator);
437  }
438
439  protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners,
440    CellComparator comparator) throws IOException {
441    return new KeyValueHeap(scanners, comparator);
442  }
443
444  /**
445   * Filters the given list of scanners using Bloom filter, time range, and TTL.
446   * <p>
447   * Will be overridden by testcase so declared as protected.
448   */
449  protected List<KeyValueScanner> selectScannersFrom(HStore store,
450    List<? extends KeyValueScanner> allScanners) {
451    boolean memOnly;
452    boolean filesOnly;
453    if (scan instanceof InternalScan) {
454      InternalScan iscan = (InternalScan) scan;
455      memOnly = iscan.isCheckOnlyMemStore();
456      filesOnly = iscan.isCheckOnlyStoreFiles();
457    } else {
458      memOnly = false;
459      filesOnly = false;
460    }
461
462    List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size());
463
464    // We can only exclude store files based on TTL if minVersions is set to 0.
465    // Otherwise, we might have to return KVs that have technically expired.
466    long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE;
467
468    // include only those scan files which pass all filters
469    for (KeyValueScanner kvs : allScanners) {
470      boolean isFile = kvs.isFileScanner();
471      if ((!isFile && filesOnly) || (isFile && memOnly)) {
472        kvs.close();
473        continue;
474      }
475
476      if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
477        scanners.add(kvs);
478      } else {
479        kvs.close();
480      }
481    }
482    return scanners;
483  }
484
485  @Override
486  public ExtendedCell peek() {
487    return heap != null ? heap.peek() : null;
488  }
489
490  @Override
491  public KeyValue next() {
492    // throw runtime exception perhaps?
493    throw new RuntimeException("Never call StoreScanner.next()");
494  }
495
496  @Override
497  public void close() {
498    close(true);
499  }
500
501  private void close(boolean withDelayedScannersClose) {
502    closeLock.lock();
503    // If the closeLock is acquired then any subsequent updateReaders()
504    // call is ignored.
505    try {
506      if (this.closing) {
507        return;
508      }
509      if (withDelayedScannersClose) {
510        this.closing = true;
511      }
512      // For mob compaction, we do not have a store.
513      if (this.store != null) {
514        this.store.deleteChangedReaderObserver(this);
515      }
516      if (withDelayedScannersClose) {
517        clearAndClose(scannersForDelayedClose);
518        clearAndClose(memStoreScannersAfterFlush);
519        clearAndClose(flushedstoreFileScanners);
520        if (this.heap != null) {
521          this.heap.close();
522          this.currentScanners.clear();
523          this.heap = null; // CLOSED!
524        }
525      } else {
526        if (this.heap != null) {
527          this.scannersForDelayedClose.add(this.heap);
528          this.currentScanners.clear();
529          this.heap = null;
530        }
531      }
532    } finally {
533      closeLock.unlock();
534    }
535  }
536
537  @Override
538  public boolean seek(ExtendedCell key) throws IOException {
539    if (checkFlushed()) {
540      reopenAfterFlush();
541    }
542    return this.heap.seek(key);
543  }
544
545  /**
546   * Get the next row of values from this Store.
547   * @return true if there are more rows, false if scanner is done
548   */
549  @Override
550  public boolean next(List<? super ExtendedCell> outResult, ScannerContext scannerContext)
551    throws IOException {
552    if (scannerContext == null) {
553      throw new IllegalArgumentException("Scanner context cannot be null");
554    }
555    if (checkFlushed() && reopenAfterFlush()) {
556      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
557    }
558
559    // if the heap was left null, then the scanners had previously run out anyways, close and
560    // return.
561    if (this.heap == null) {
562      // By this time partial close should happened because already heap is null
563      close(false);// Do all cleanup except heap.close()
564      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
565    }
566
567    ExtendedCell cell = this.heap.peek();
568    if (cell == null) {
569      close(false);// Do all cleanup except heap.close()
570      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
571    }
572
573    // only call setRow if the row changes; avoids confusing the query matcher
574    // if scanning intra-row
575
576    // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
577    // rows. Else it is possible we are still traversing the same row so we must perform the row
578    // comparison.
579    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) {
580      this.countPerRow = 0;
581      matcher.setToNewRow(cell);
582    }
583
584    // Clear progress away unless invoker has indicated it should be kept.
585    if (!scannerContext.getKeepProgress() && !scannerContext.getSkippingRow()) {
586      scannerContext.clearProgress();
587    }
588
589    Optional<RpcCall> rpcCall =
590      matcher.isUserScan() ? RpcServer.getCurrentCall() : Optional.empty();
591
592    int count = 0;
593    long totalBytesRead = 0;
594    // track the cells for metrics only if it is a user read request.
595    boolean onlyFromMemstore = matcher.isUserScan();
596    try {
597      LOOP: do {
598        // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
599        // Or if the preadMaxBytes is reached and we may want to return so we can switch to stream
600        // in
601        // the shipped method below.
602        if (
603          kvsScanned % cellsPerHeartbeatCheck == 0
604            || (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)
605        ) {
606          if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
607            return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
608          }
609        }
610        // Do object compare - we set prevKV from the same heap.
611        if (prevCell != cell) {
612          ++kvsScanned;
613        }
614        checkScanOrder(prevCell, cell, comparator);
615        int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
616        bytesRead += cellSize;
617        if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) {
618          // return immediately if we want to switch from pread to stream. We need this because we
619          // can
620          // only switch in the shipped method, if user use a filter to filter out everything and
621          // rpc
622          // timeout is very large then the shipped method will never be called until the whole scan
623          // is finished, but at that time we have already scan all the data...
624          // See HBASE-20457 for more details.
625          // And there is still a scenario that can not be handled. If we have a very large row,
626          // which
627          // have millions of qualifiers, and filter.filterRow is used, then even if we set the flag
628          // here, we still need to scan all the qualifiers before returning...
629          scannerContext.returnImmediately();
630        }
631
632        heap.recordBlockSize(blockSize -> {
633          if (rpcCall.isPresent()) {
634            rpcCall.get().incrementBlockBytesScanned(blockSize);
635          }
636          scannerContext.incrementBlockProgress(blockSize);
637        });
638
639        prevCell = cell;
640        scannerContext.setLastPeekedCell(cell);
641        topChanged = false;
642        ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
643        switch (qcode) {
644          case INCLUDE:
645          case INCLUDE_AND_SEEK_NEXT_ROW:
646          case INCLUDE_AND_SEEK_NEXT_COL:
647            Filter f = matcher.getFilter();
648            if (f != null) {
649              Cell transformedCell = f.transformCell(cell);
650              // fast path, most filters just return the same cell instance
651              if (transformedCell != cell) {
652                if (transformedCell instanceof ExtendedCell) {
653                  cell = (ExtendedCell) transformedCell;
654                } else {
655                  throw new DoNotRetryIOException("Incorrect filter implementation, "
656                    + "the Cell returned by transformCell is not an ExtendedCell. Filter class: "
657                    + f.getClass().getName());
658                }
659              }
660            }
661            this.countPerRow++;
662
663            // add to results only if we have skipped #storeOffset kvs
664            // also update metric accordingly
665            if (this.countPerRow > storeOffset) {
666              outResult.add(cell);
667
668              // Update local tracking information
669              count++;
670              totalBytesRead += cellSize;
671
672              /**
673               * Increment the metric if all the cells are from memstore. If not we will account it
674               * for mixed reads
675               */
676              onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore();
677              // Update the progress of the scanner context
678              scannerContext.incrementSizeProgress(cellSize, cell.heapSize());
679              scannerContext.incrementBatchProgress(1);
680
681              if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
682                String message = "Max row size allowed: " + maxRowSize
683                  + ", but the row is bigger than that, the row info: "
684                  + CellUtil.toString(cell, false) + ", already have process row cells = "
685                  + outResult.size() + ", it belong to region = "
686                  + store.getHRegion().getRegionInfo().getRegionNameAsString();
687                LOG.warn(message);
688                throw new RowTooBigException(message);
689              }
690
691              if (storeLimit > -1 && this.countPerRow >= (storeLimit + storeOffset)) {
692                // do what SEEK_NEXT_ROW does.
693                if (!matcher.moreRowsMayExistAfter(cell)) {
694                  close(false);// Do all cleanup except heap.close()
695                  return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
696                }
697                matcher.clearCurrentRow();
698                seekToNextRow(cell);
699                break LOOP;
700              }
701            }
702
703            if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
704              if (!matcher.moreRowsMayExistAfter(cell)) {
705                close(false);// Do all cleanup except heap.close()
706                return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
707              }
708              matcher.clearCurrentRow();
709              seekOrSkipToNextRow(cell);
710            } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
711              seekOrSkipToNextColumn(cell);
712            } else {
713              this.heap.next();
714            }
715
716            if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
717              break LOOP;
718            }
719            if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
720              break LOOP;
721            }
722            continue;
723
724          case DONE:
725            // Optimization for Gets! If DONE, no more to get on this row, early exit!
726            if (get) {
727              // Then no more to this row... exit.
728              close(false);// Do all cleanup except heap.close()
729              // update metric
730              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
731            }
732            matcher.clearCurrentRow();
733            return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
734
735          case DONE_SCAN:
736            close(false);// Do all cleanup except heap.close()
737            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
738
739          case SEEK_NEXT_ROW:
740            // This is just a relatively simple end of scan fix, to short-cut end
741            // us if there is an endKey in the scan.
742            if (!matcher.moreRowsMayExistAfter(cell)) {
743              close(false);// Do all cleanup except heap.close()
744              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
745            }
746            matcher.clearCurrentRow();
747            seekOrSkipToNextRow(cell);
748            NextState stateAfterSeekNextRow = needToReturn(outResult);
749            if (stateAfterSeekNextRow != null) {
750              return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
751            }
752            break;
753
754          case SEEK_NEXT_COL:
755            seekOrSkipToNextColumn(cell);
756            NextState stateAfterSeekNextColumn = needToReturn(outResult);
757            if (stateAfterSeekNextColumn != null) {
758              return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
759            }
760            break;
761
762          case SKIP:
763            this.heap.next();
764            break;
765
766          case SEEK_NEXT_USING_HINT:
767            ExtendedCell nextKV = matcher.getNextKeyHint(cell);
768            if (nextKV != null) {
769              int difference = comparator.compare(nextKV, cell);
770              if (
771                ((!scan.isReversed() && difference > 0) || (scan.isReversed() && difference < 0))
772              ) {
773                seekAsDirection(nextKV);
774                NextState stateAfterSeekByHint = needToReturn(outResult);
775                if (stateAfterSeekByHint != null) {
776                  return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
777                }
778                break;
779              }
780            }
781            heap.next();
782            break;
783
784          default:
785            throw new RuntimeException("UNEXPECTED");
786        }
787
788        // One last chance to break due to size limit. The INCLUDE* cases above already check
789        // limit and continue. For the various filtered cases, we need to check because block
790        // size limit may have been exceeded even if we don't add cells to result list.
791        if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
792          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
793        }
794      } while ((cell = this.heap.peek()) != null);
795
796      if (count > 0) {
797        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
798      }
799
800      // No more keys
801      close(false);// Do all cleanup except heap.close()
802      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
803    } finally {
804      // increment only if we have some result
805      if (count > 0 && matcher.isUserScan()) {
806        // if true increment memstore metrics, if not the mixed one
807        updateMetricsStore(onlyFromMemstore);
808      }
809    }
810  }
811
812  private void updateMetricsStore(boolean memstoreRead) {
813    if (store != null) {
814      store.updateMetricsStore(memstoreRead);
815    } else {
816      // for testing.
817      if (memstoreRead) {
818        memstoreOnlyReads++;
819      } else {
820        mixedReads++;
821      }
822    }
823  }
824
825  /**
826   * If the top cell won't be flushed into disk, the new top cell may be changed after
827   * #reopenAfterFlush. Because the older top cell only exist in the memstore scanner but the
828   * memstore scanner is replaced by hfile scanner after #reopenAfterFlush. If the row of top cell
829   * is changed, we should return the current cells. Otherwise, we may return the cells across
830   * different rows.
831   * @param outResult the cells which are visible for user scan
832   * @return null is the top cell doesn't change. Otherwise, the NextState to return
833   */
834  private NextState needToReturn(List<? super ExtendedCell> outResult) {
835    if (!outResult.isEmpty() && topChanged) {
836      return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
837    }
838    return null;
839  }
840
841  private void seekOrSkipToNextRow(ExtendedCell cell) throws IOException {
842    // If it is a Get Scan, then we know that we are done with this row; there are no more
843    // rows beyond the current one: don't try to optimize.
844    if (!get) {
845      if (trySkipToNextRow(cell)) {
846        return;
847      }
848    }
849    seekToNextRow(cell);
850  }
851
852  private void seekOrSkipToNextColumn(ExtendedCell cell) throws IOException {
853    if (!trySkipToNextColumn(cell)) {
854      seekAsDirection(matcher.getKeyForNextColumn(cell));
855    }
856  }
857
858  /**
859   * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).
860   * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row, or seek to an
861   * arbitrary seek key. This method decides whether a seek is the most efficient _actual_ way to
862   * get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, SKIP inside the
863   * current, loaded block). It does this by looking at the next indexed key of the current HFile.
864   * This key is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible
865   * key on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work
866   * with the current Cell but compare as though it were a seek key; see down in
867   * matcher.compareKeyForNextRow, etc). If the compare gets us onto the next block we *_SEEK,
868   * otherwise we just SKIP to the next requested cell.
869   * <p>
870   * Other notes:
871   * <ul>
872   * <li>Rows can straddle block boundaries</li>
873   * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a
874   * different block than column C1 at T2)</li>
875   * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a few
876   * SKIPs...</li>
877   * <li>We want to SEEK when the chance is high that we'll be able to seek past many Cells,
878   * especially if we know we need to go to the next block.</li>
879   * </ul>
880   * <p>
881   * A good proxy (best effort) to determine whether SKIP is better than SEEK is whether we'll
882   * likely end up seeking to the next block (or past the next block) to get our next column.
883   * Example:
884   *
885   * <pre>
886   * |    BLOCK 1              |     BLOCK 2                   |
887   * |  r1/c1, r1/c2, r1/c3    |    r1/c4, r1/c5, r2/c1        |
888   *                                   ^         ^
889   *                                   |         |
890   *                           Next Index Key   SEEK_NEXT_ROW (before r2/c1)
891   *
892   *
893   * |    BLOCK 1                       |     BLOCK 2                      |
894   * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |    r1/c1/t2, r1/c1/T1, r1/c2/T3  |
895   *                                            ^              ^
896   *                                            |              |
897   *                                    Next Index Key        SEEK_NEXT_COL
898   * </pre>
899   *
900   * Now imagine we want columns c1 and c3 (see first diagram above), the 'Next Index Key' of r1/c4
901   * is > r1/c3 so we should seek to get to the c1 on the next row, r2. In second case, say we only
902   * want one version of c1, after we have it, a SEEK_COL will be issued to get to c2. Looking at
903   * the 'Next Index Key', it would land us in the next block, so we should SEEK. In other scenarios
904   * where the SEEK will not land us in the next block, it is very likely better to issues a series
905   * of SKIPs.
906   * @param cell current cell
907   * @return true means skip to next row, false means not
908   */
909  protected boolean trySkipToNextRow(ExtendedCell cell) throws IOException {
910    ExtendedCell nextCell = null;
911    // used to guard against a changed next indexed key by doing a identity comparison
912    // when the identity changes we need to compare the bytes again
913    ExtendedCell previousIndexedKey = null;
914    do {
915      ExtendedCell nextIndexedKey = getNextIndexedKey();
916      if (
917        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
918          && (nextIndexedKey == previousIndexedKey
919            || matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0)
920      ) {
921        this.heap.next();
922        ++kvsScanned;
923        previousIndexedKey = nextIndexedKey;
924      } else {
925        return false;
926      }
927    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell));
928    return true;
929  }
930
931  /**
932   * See {@link #trySkipToNextRow(ExtendedCell)}
933   * @param cell current cell
934   * @return true means skip to next column, false means not
935   */
936  protected boolean trySkipToNextColumn(ExtendedCell cell) throws IOException {
937    ExtendedCell nextCell = null;
938    // used to guard against a changed next indexed key by doing a identity comparison
939    // when the identity changes we need to compare the bytes again
940    ExtendedCell previousIndexedKey = null;
941    do {
942      ExtendedCell nextIndexedKey = getNextIndexedKey();
943      if (
944        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
945          && (nextIndexedKey == previousIndexedKey
946            || matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0)
947      ) {
948        this.heap.next();
949        ++kvsScanned;
950        previousIndexedKey = nextIndexedKey;
951      } else {
952        return false;
953      }
954    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell));
955    // We need this check because it may happen that the new scanner that we get
956    // during heap.next() is requiring reseek due of fake KV previously generated for
957    // ROWCOL bloom filter optimization. See HBASE-19863 for more details
958    if (
959      useRowColBloom && nextCell != null && cell.getTimestamp() == PrivateConstants.OLDEST_TIMESTAMP
960    ) {
961      return false;
962    }
963    return true;
964  }
965
966  @Override
967  public long getReadPoint() {
968    return this.readPt;
969  }
970
971  private static void clearAndClose(List<KeyValueScanner> scanners) {
972    if (scanners == null) {
973      return;
974    }
975    for (KeyValueScanner s : scanners) {
976      s.close();
977    }
978    scanners.clear();
979  }
980
981  // Implementation of ChangedReadersObserver
982  @Override
983  public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners)
984    throws IOException {
985    if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) {
986      return;
987    }
988    boolean updateReaders = false;
989    flushLock.lock();
990    try {
991      if (!closeLock.tryLock()) {
992        // The reason for doing this is that when the current store scanner does not retrieve
993        // any new cells, then the scanner is considered to be done. The heap of this scanner
994        // is not closed till the shipped() call is completed. Hence in that case if at all
995        // the partial close (close (false)) has been called before updateReaders(), there is no
996        // need for the updateReaders() to happen.
997        LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
998        // no lock acquired.
999        clearAndClose(memStoreScanners);
1000        return;
1001      }
1002      // lock acquired
1003      updateReaders = true;
1004      if (this.closing) {
1005        LOG.debug("StoreScanner already closing. There is no need to updateReaders");
1006        clearAndClose(memStoreScanners);
1007        return;
1008      }
1009      flushed = true;
1010      final boolean isCompaction = false;
1011      boolean usePread = get || scanUsePread;
1012      // SEE HBASE-19468 where the flushed files are getting compacted even before a scanner
1013      // calls next(). So its better we create scanners here rather than next() call. Ensure
1014      // these scanners are properly closed() whether or not the scan is completed successfully
1015      // Eagerly creating scanners so that we have the ref counting ticking on the newly created
1016      // store files. In case of stream scanners this eager creation does not induce performance
1017      // penalty because in scans (that uses stream scanners) the next() call is bound to happen.
1018      List<KeyValueScanner> scanners =
1019        store.getScanners(sfs, cacheBlocks, get, usePread, isCompaction, matcher,
1020          scan.getStartRow(), scan.getStopRow(), this.readPt, false, isOnlyLatestVersionScan(scan));
1021      flushedstoreFileScanners.addAll(scanners);
1022      if (!CollectionUtils.isEmpty(memStoreScanners)) {
1023        clearAndClose(memStoreScannersAfterFlush);
1024        memStoreScannersAfterFlush.addAll(memStoreScanners);
1025      }
1026    } finally {
1027      flushLock.unlock();
1028      if (updateReaders) {
1029        closeLock.unlock();
1030      }
1031    }
1032    // Let the next() call handle re-creating and seeking
1033  }
1034
1035  /** Returns if top of heap has changed (and KeyValueHeap has to try the next KV) */
1036  protected final boolean reopenAfterFlush() throws IOException {
1037    // here we can make sure that we have a Store instance so no null check on store.
1038    ExtendedCell lastTop = heap.peek();
1039    // When we have the scan object, should we not pass it to getScanners() to get a limited set of
1040    // scanners? We did so in the constructor and we could have done it now by storing the scan
1041    // object from the constructor
1042    List<KeyValueScanner> scanners;
1043    flushLock.lock();
1044    try {
1045      List<KeyValueScanner> allScanners =
1046        new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());
1047      allScanners.addAll(flushedstoreFileScanners);
1048      allScanners.addAll(memStoreScannersAfterFlush);
1049      scanners = selectScannersFrom(store, allScanners);
1050      // Clear the current set of flushed store files scanners so that they don't get added again
1051      flushedstoreFileScanners.clear();
1052      memStoreScannersAfterFlush.clear();
1053    } finally {
1054      flushLock.unlock();
1055    }
1056
1057    // Seek the new scanners to the last key
1058    seekScanners(scanners, lastTop, false, parallelSeekEnabled);
1059    // remove the older memstore scanner
1060    for (int i = currentScanners.size() - 1; i >= 0; i--) {
1061      if (!currentScanners.get(i).isFileScanner()) {
1062        scannersForDelayedClose.add(currentScanners.remove(i));
1063      } else {
1064        // we add the memstore scanner to the end of currentScanners
1065        break;
1066      }
1067    }
1068    // add the newly created scanners on the flushed files and the current active memstore scanner
1069    addCurrentScanners(scanners);
1070    // Combine all seeked scanners with a heap
1071    resetKVHeap(this.currentScanners, store.getComparator());
1072    resetQueryMatcher(lastTop);
1073    if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
1074      LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString()
1075        + ",and after = " + heap.peek());
1076      topChanged = true;
1077    } else {
1078      topChanged = false;
1079    }
1080    return topChanged;
1081  }
1082
1083  private void resetQueryMatcher(ExtendedCell lastTopKey) {
1084    // Reset the state of the Query Matcher and set to top row.
1085    // Only reset and call setRow if the row changes; avoids confusing the
1086    // query matcher if scanning intra-row.
1087    ExtendedCell cell = heap.peek();
1088    if (cell == null) {
1089      cell = lastTopKey;
1090    }
1091    if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {
1092      this.countPerRow = 0;
1093      // The setToNewRow will call reset internally
1094      matcher.setToNewRow(cell);
1095    }
1096  }
1097
1098  /**
1099   * Check whether scan as expected order
1100   */
1101  protected void checkScanOrder(Cell prevKV, Cell kv, CellComparator comparator)
1102    throws IOException {
1103    // Check that the heap gives us KVs in an increasing order.
1104    assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0
1105      : "Key " + prevKV + " followed by a smaller key " + kv + " in cf " + store;
1106  }
1107
1108  protected boolean seekToNextRow(ExtendedCell c) throws IOException {
1109    return reseek(PrivateCellUtil.createLastOnRow(c));
1110  }
1111
1112  /**
1113   * Do a reseek in a normal StoreScanner(scan forward)
1114   * @return true if scanner has values left, false if end of scanner
1115   */
1116  protected boolean seekAsDirection(ExtendedCell kv) throws IOException {
1117    return reseek(kv);
1118  }
1119
1120  @Override
1121  public boolean reseek(ExtendedCell kv) throws IOException {
1122    if (checkFlushed()) {
1123      reopenAfterFlush();
1124    }
1125    if (explicitColumnQuery && lazySeekEnabledGlobally) {
1126      return heap.requestSeek(kv, true, useRowColBloom);
1127    }
1128    return heap.reseek(kv);
1129  }
1130
1131  void trySwitchToStreamRead() {
1132    if (
1133      readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null
1134        || bytesRead < preadMaxBytes
1135    ) {
1136      return;
1137    }
1138    LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead,
1139      this.store.getColumnFamilyName());
1140    scanUsePread = false;
1141    ExtendedCell lastTop = heap.peek();
1142    List<KeyValueScanner> memstoreScanners = new ArrayList<>();
1143    List<KeyValueScanner> scannersToClose = new ArrayList<>();
1144    for (KeyValueScanner kvs : currentScanners) {
1145      if (!kvs.isFileScanner()) {
1146        // collect memstorescanners here
1147        memstoreScanners.add(kvs);
1148      } else {
1149        scannersToClose.add(kvs);
1150      }
1151    }
1152    List<KeyValueScanner> fileScanners = null;
1153    List<KeyValueScanner> newCurrentScanners;
1154    KeyValueHeap newHeap;
1155    try {
1156      // We must have a store instance here so no null check
1157      // recreate the scanners on the current file scanners
1158      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, matcher,
1159        scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
1160        readPt, false);
1161      if (fileScanners == null) {
1162        return;
1163      }
1164      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
1165      newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
1166      newCurrentScanners.addAll(fileScanners);
1167      newCurrentScanners.addAll(memstoreScanners);
1168      newHeap = newKVHeap(newCurrentScanners, comparator);
1169    } catch (Exception e) {
1170      LOG.warn("failed to switch to stream read", e);
1171      if (fileScanners != null) {
1172        fileScanners.forEach(KeyValueScanner::close);
1173      }
1174      return;
1175    }
1176    currentScanners.clear();
1177    addCurrentScanners(newCurrentScanners);
1178    this.heap = newHeap;
1179    resetQueryMatcher(lastTop);
1180    scannersToClose.forEach(KeyValueScanner::close);
1181  }
1182
1183  protected final boolean checkFlushed() {
1184    // check the var without any lock. Suppose even if we see the old
1185    // value here still it is ok to continue because we will not be resetting
1186    // the heap but will continue with the referenced memstore's snapshot. For compactions
1187    // any way we don't need the updateReaders at all to happen as we still continue with
1188    // the older files
1189    if (flushed) {
1190      // If there is a flush and the current scan is notified on the flush ensure that the
1191      // scan's heap gets reset and we do a seek on the newly flushed file.
1192      if (this.closing) {
1193        return false;
1194      }
1195      // reset the flag
1196      flushed = false;
1197      return true;
1198    }
1199    return false;
1200  }
1201
1202  /**
1203   * Seek storefiles in parallel to optimize IO latency as much as possible
1204   * @param scanners the list {@link KeyValueScanner}s to be read from
1205   * @param kv       the KeyValue on which the operation is being requested
1206   */
1207  private void parallelSeek(final List<? extends KeyValueScanner> scanners, final ExtendedCell kv)
1208    throws IOException {
1209    if (scanners.isEmpty()) return;
1210    int storeFileScannerCount = scanners.size();
1211    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
1212    List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount);
1213    for (KeyValueScanner scanner : scanners) {
1214      if (scanner instanceof StoreFileScanner) {
1215        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, this.readPt, latch);
1216        executor.submit(seekHandler);
1217        handlers.add(seekHandler);
1218      } else {
1219        scanner.seek(kv);
1220        latch.countDown();
1221      }
1222    }
1223
1224    try {
1225      latch.await();
1226    } catch (InterruptedException ie) {
1227      throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
1228    }
1229
1230    for (ParallelSeekHandler handler : handlers) {
1231      if (handler.getErr() != null) {
1232        throw new IOException(handler.getErr());
1233      }
1234    }
1235  }
1236
1237  /**
1238   * Used in testing.
1239   * @return all scanners in no particular order
1240   */
1241  List<KeyValueScanner> getAllScannersForTesting() {
1242    List<KeyValueScanner> allScanners = new ArrayList<>();
1243    KeyValueScanner current = heap.getCurrentForTesting();
1244    if (current != null) allScanners.add(current);
1245    for (KeyValueScanner scanner : heap.getHeap())
1246      allScanners.add(scanner);
1247    return allScanners;
1248  }
1249
1250  static void enableLazySeekGlobally(boolean enable) {
1251    lazySeekEnabledGlobally = enable;
1252  }
1253
1254  /** Returns The estimated number of KVs seen by this scanner (includes some skipped KVs). */
1255  public long getEstimatedNumberOfKvsScanned() {
1256    return this.kvsScanned;
1257  }
1258
1259  @Override
1260  public ExtendedCell getNextIndexedKey() {
1261    return this.heap.getNextIndexedKey();
1262  }
1263
1264  @Override
1265  public void shipped() throws IOException {
1266    if (prevCell != null) {
1267      // Do the copy here so that in case the prevCell ref is pointing to the previous
1268      // blocks we can safely release those blocks.
1269      // This applies to blocks that are got from Bucket cache, L1 cache and the blocks
1270      // fetched from HDFS. Copying this would ensure that we let go the references to these
1271      // blocks so that they can be GCed safely(in case of bucket cache)
1272      prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
1273    }
1274    matcher.beforeShipped();
1275    // There wont be further fetch of Cells from these scanners. Just close.
1276    clearAndClose(scannersForDelayedClose);
1277    if (this.heap != null) {
1278      this.heap.shipped();
1279      // When switching from pread to stream, we will open a new scanner for each store file, but
1280      // the old scanner may still track the HFileBlocks we have scanned but not sent back to client
1281      // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others
1282      // before we serialize and send it back to client. The HFileBlocks will be released in shipped
1283      // method, so we here will also open new scanners and close old scanners in shipped method.
1284      // See HBASE-18055 for more details.
1285      trySwitchToStreamRead();
1286    }
1287  }
1288}