001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.NavigableSet;
026import java.util.OptionalInt;
027import java.util.concurrent.CountDownLatch;
028import java.util.concurrent.locks.ReentrantLock;
029
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellComparator;
032import org.apache.hadoop.hbase.CellUtil;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.PrivateCellUtil;
036import org.apache.hadoop.hbase.KeyValue;
037import org.apache.hadoop.hbase.KeyValueUtil;
038import org.apache.hadoop.hbase.client.IsolationLevel;
039import org.apache.hadoop.hbase.client.Scan;
040import org.apache.hadoop.hbase.executor.ExecutorService;
041import org.apache.hadoop.hbase.filter.Filter;
042import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
043import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
044import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
045import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
046import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
047import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
048import org.apache.hadoop.hbase.util.CollectionUtils;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
054import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
055
056/**
 * Scanner scans both the memstore and the Store. Coalesces the KeyValue stream into a
 * {@code List<KeyValue>} for a single row.
059 * <p>
060 * The implementation is not thread safe. So there will be no race between next and close. The only
061 * exception is updateReaders, it will be called in the memstore flush thread to indicate that there
062 * is a flush.
063 */
064@InterfaceAudience.Private
065public class StoreScanner extends NonReversedNonLazyKeyValueScanner
066    implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
067  private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class);
068  // In unit tests, the store could be null
069  protected final HStore store;
070  private final CellComparator comparator;
071  private ScanQueryMatcher matcher;
072  protected KeyValueHeap heap;
073  private boolean cacheBlocks;
074
075  private long countPerRow = 0;
076  private int storeLimit = -1;
077  private int storeOffset = 0;
078
079  // Used to indicate that the scanner has closed (see HBASE-1107)
080  private volatile boolean closing = false;
081  private final boolean get;
082  private final boolean explicitColumnQuery;
083  private final boolean useRowColBloom;
084  /**
085   * A flag that enables StoreFileScanner parallel-seeking
086   */
087  private boolean parallelSeekEnabled = false;
088  private ExecutorService executor;
089  private final Scan scan;
090  private final long oldestUnexpiredTS;
091  private final long now;
092  private final int minVersions;
093  private final long maxRowSize;
094  private final long cellsPerHeartbeatCheck;
095
096  // 1) Collects all the KVHeap that are eagerly getting closed during the
097  //    course of a scan
098  // 2) Collects the unused memstore scanners. If we close the memstore scanners
099  //    before sending data to client, the chunk may be reclaimed by other
100  //    updates and the data will be corrupt.
101  private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>();
102
103  /**
104   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
105   * KVs skipped via seeking to next row/column. TODO: estimate them?
106   */
107  private long kvsScanned = 0;
108  private Cell prevCell = null;
109
110  private final long preadMaxBytes;
111  private long bytesRead;
112
113  /** We don't ever expect to change this, the constant is just for clarity. */
114  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
115  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
116      "hbase.storescanner.parallel.seek.enable";
117
118  /** Used during unit testing to ensure that lazy seek does save seek ops */
119  private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;
120
121  /**
122   * The number of cells scanned in between timeout checks. Specifying a larger value means that
123   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
124   * timeout checks.
125   */
126  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
127      "hbase.cells.scanned.per.heartbeat.check";
128
129  /**
130   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
131   */
132  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;
133
134  /**
   * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we scanned
136   * reaches this limit, we will reopen the scanner with stream. The default value is 4 times of
137   * block size for this store.
138   */
139  public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes";
140
141  private final Scan.ReadType readType;
142
143  // A flag whether use pread for scan
144  // it maybe changed if we use Scan.ReadType.DEFAULT and we have read lots of data.
145  private boolean scanUsePread;
146  // Indicates whether there was flush during the course of the scan
147  private volatile boolean flushed = false;
148  // generally we get one file from a flush
149  private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1);
150  // Since CompactingMemstore is now default, we get three memstore scanners from a flush
151  private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
152  // The current list of scanners
153  @VisibleForTesting
154  final List<KeyValueScanner> currentScanners = new ArrayList<>();
155  // flush update lock
156  private final ReentrantLock flushLock = new ReentrantLock();
157  // lock for closing.
158  private final ReentrantLock closeLock = new ReentrantLock();
159
160  protected final long readPt;
161  private boolean topChanged = false;
162
163  /** An internal constructor. */
164  private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo,
165      int numColumns, long readPt, boolean cacheBlocks, ScanType scanType) {
166    this.readPt = readPt;
167    this.store = store;
168    this.cacheBlocks = cacheBlocks;
169    this.comparator = Preconditions.checkNotNull(scanInfo.getComparator());
170    get = scan.isGetScan();
171    explicitColumnQuery = numColumns > 0;
172    this.scan = scan;
173    this.now = EnvironmentEdgeManager.currentTime();
174    this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl();
175    this.minVersions = scanInfo.getMinVersions();
176
177     // We look up row-column Bloom filters for multi-column queries as part of
178     // the seek operation. However, we also look the row-column Bloom filter
179     // for multi-row (non-"get") scans because this is not done in
180     // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
181     this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1);
182     this.maxRowSize = scanInfo.getTableMaxRowSize();
183    if (get) {
184      this.readType = Scan.ReadType.PREAD;
185      this.scanUsePread = true;
186    } else if(scanType != ScanType.USER_SCAN) {
187      // For compaction scanners never use Pread as already we have stream based scanners on the
188      // store files to be compacted
189      this.readType = Scan.ReadType.STREAM;
190      this.scanUsePread = false;
191    } else {
192      if (scan.getReadType() == Scan.ReadType.DEFAULT) {
193        this.readType = scanInfo.isUsePread() ? Scan.ReadType.PREAD : Scan.ReadType.DEFAULT;
194      } else {
195        this.readType = scan.getReadType();
196      }
197      // Always start with pread unless user specific stream. Will change to stream later if
198      // readType is default if the scan keeps running for a long time.
199      this.scanUsePread = this.readType != Scan.ReadType.STREAM;
200    }
201    this.preadMaxBytes = scanInfo.getPreadMaxBytes();
202    this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
203    // Parallel seeking is on if the config allows and more there is more than one store file.
204    if (store != null && store.getStorefilesCount() > 1) {
205      RegionServerServices rsService = store.getHRegion().getRegionServerServices();
206      if (rsService != null && scanInfo.isParallelSeekEnabled()) {
207        this.parallelSeekEnabled = true;
208        this.executor = rsService.getExecutorService();
209      }
210    }
211  }
212
213  private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
214    this.currentScanners.addAll(scanners);
215  }
216
217  /**
218   * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
219   * are not in a compaction.
220   *
221   * @param store who we scan
222   * @param scan the spec
223   * @param columns which columns we are scanning
224   * @throws IOException
225   */
226  public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
227      long readPt) throws IOException {
228    this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt,
229        scan.getCacheBlocks(), ScanType.USER_SCAN);
230    if (columns != null && scan.isRaw()) {
231      throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
232    }
233    matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
234      store.getCoprocessorHost());
235
236    store.addChangedReaderObserver(this);
237
238    try {
239      // Pass columns to try to filter out unnecessary StoreFiles.
240      List<KeyValueScanner> scanners = selectScannersFrom(store,
241        store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
242          scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt));
243
244      // Seek all scanners to the start of the Row (or if the exact matching row
245      // key does not exist, then to the start of the next matching Row).
246      // Always check bloom filter to optimize the top row seek for delete
247      // family marker.
248      seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
249        parallelSeekEnabled);
250
251      // set storeLimit
252      this.storeLimit = scan.getMaxResultsPerColumnFamily();
253
254      // set rowOffset
255      this.storeOffset = scan.getRowOffsetPerColumnFamily();
256      addCurrentScanners(scanners);
257      // Combine all seeked scanners with a heap
258      resetKVHeap(scanners, comparator);
259    } catch (IOException e) {
260      // remove us from the HStore#changedReaderObservers here or we'll have no chance to
261      // and might cause memory leak
262      store.deleteChangedReaderObserver(this);
263      throw e;
264    }
265  }
266
267  // a dummy scan instance for compaction.
268  private static final Scan SCAN_FOR_COMPACTION = new Scan();
269
270  /**
271   * Used for store file compaction and memstore compaction.
272   * <p>
273   * Opens a scanner across specified StoreFiles/MemStoreSegments.
274   * @param store who we scan
275   * @param scanners ancillary scanners
276   * @param smallestReadPoint the readPoint that we should use for tracking versions
277   */
278  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
279      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
280    this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
281  }
282
283  /**
284   * Used for compactions that drop deletes from a limited range of rows.
285   * <p>
286   * Opens a scanner across specified StoreFiles.
287   * @param store who we scan
288   * @param scanners ancillary scanners
289   * @param smallestReadPoint the readPoint that we should use for tracking versions
290   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
291   * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
292   */
293  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
294      long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
295      byte[] dropDeletesToRow) throws IOException {
296    this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
297        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
298  }
299
300  private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
301      ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
302      byte[] dropDeletesToRow) throws IOException {
303    this(store, SCAN_FOR_COMPACTION, scanInfo, 0,
304        store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType);
305    assert scanType != ScanType.USER_SCAN;
306    matcher =
307        CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs,
308          oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
309
310    // Filter the list of scanners using Bloom filters, time range, TTL, etc.
311    scanners = selectScannersFrom(store, scanners);
312
313    // Seek all scanners to the initial key
314    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
315    addCurrentScanners(scanners);
316    // Combine all seeked scanners with a heap
317    resetKVHeap(scanners, comparator);
318  }
319
320  private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners)
321      throws IOException {
322    // Seek all scanners to the initial key
323    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
324    addCurrentScanners(scanners);
325    resetKVHeap(scanners, comparator);
326  }
327
328  // For mob compaction only as we do not have a Store instance when doing mob compaction.
329  public StoreScanner(ScanInfo scanInfo, ScanType scanType,
330      List<? extends KeyValueScanner> scanners) throws IOException {
331    this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
332    assert scanType != ScanType.USER_SCAN;
333    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L,
334      oldestUnexpiredTS, now, null, null, null);
335    seekAllScanner(scanInfo, scanners);
336  }
337
338  // Used to instantiate a scanner for user scan in test
339  @VisibleForTesting
340  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
341      List<? extends KeyValueScanner> scanners) throws IOException {
342    // 0 is passed as readpoint because the test bypasses Store
343    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L,
344        scan.getCacheBlocks(), ScanType.USER_SCAN);
345    this.matcher =
346        UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
347    seekAllScanner(scanInfo, scanners);
348  }
349
350  // Used to instantiate a scanner for compaction in test
351  @VisibleForTesting
352  StoreScanner(ScanInfo scanInfo, OptionalInt maxVersions, ScanType scanType,
353      List<? extends KeyValueScanner> scanners) throws IOException {
354    // 0 is passed as readpoint because the test bypasses Store
355    this(null, maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt())
356        : SCAN_FOR_COMPACTION, scanInfo, 0, 0L, false, scanType);
357    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
358      HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
359    seekAllScanner(scanInfo, scanners);
360  }
361
362  @VisibleForTesting
363  boolean isScanUsePread() {
364    return this.scanUsePread;
365  }
366  /**
367   * Seek the specified scanners with the given key
368   * @param scanners
369   * @param seekKey
370   * @param isLazy true if using lazy seek
371   * @param isParallelSeek true if using parallel seek
372   * @throws IOException
373   */
374  protected void seekScanners(List<? extends KeyValueScanner> scanners,
375      Cell seekKey, boolean isLazy, boolean isParallelSeek)
376      throws IOException {
377    // Seek all scanners to the start of the Row (or if the exact matching row
378    // key does not exist, then to the start of the next matching Row).
379    // Always check bloom filter to optimize the top row seek for delete
380    // family marker.
381    if (isLazy) {
382      for (KeyValueScanner scanner : scanners) {
383        scanner.requestSeek(seekKey, false, true);
384      }
385    } else {
386      if (!isParallelSeek) {
387        long totalScannersSoughtBytes = 0;
388        for (KeyValueScanner scanner : scanners) {
389          if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) {
390            throw new RowTooBigException("Max row size allowed: " + maxRowSize
391              + ", but row is bigger than that");
392          }
393          scanner.seek(seekKey);
394          Cell c = scanner.peek();
395          if (c != null) {
396            totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c);
397          }
398        }
399      } else {
400        parallelSeek(scanners, seekKey);
401      }
402    }
403  }
404
405  @VisibleForTesting
406  protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
407      CellComparator comparator) throws IOException {
408    // Combine all seeked scanners with a heap
409    heap = newKVHeap(scanners, comparator);
410  }
411
412  protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners,
413      CellComparator comparator) throws IOException {
414    return new KeyValueHeap(scanners, comparator);
415  }
416
417  /**
418   * Filters the given list of scanners using Bloom filter, time range, and TTL.
419   * <p>
420   * Will be overridden by testcase so declared as protected.
421   */
422  @VisibleForTesting
423  protected List<KeyValueScanner> selectScannersFrom(HStore store,
424      List<? extends KeyValueScanner> allScanners) {
425    boolean memOnly;
426    boolean filesOnly;
427    if (scan instanceof InternalScan) {
428      InternalScan iscan = (InternalScan) scan;
429      memOnly = iscan.isCheckOnlyMemStore();
430      filesOnly = iscan.isCheckOnlyStoreFiles();
431    } else {
432      memOnly = false;
433      filesOnly = false;
434    }
435
436    List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size());
437
438    // We can only exclude store files based on TTL if minVersions is set to 0.
439    // Otherwise, we might have to return KVs that have technically expired.
440    long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE;
441
442    // include only those scan files which pass all filters
443    for (KeyValueScanner kvs : allScanners) {
444      boolean isFile = kvs.isFileScanner();
445      if ((!isFile && filesOnly) || (isFile && memOnly)) {
446        continue;
447      }
448
449      if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
450        scanners.add(kvs);
451      } else {
452        kvs.close();
453      }
454    }
455    return scanners;
456  }
457
458  @Override
459  public Cell peek() {
460    return heap != null ? heap.peek() : null;
461  }
462
463  @Override
464  public KeyValue next() {
465    // throw runtime exception perhaps?
466    throw new RuntimeException("Never call StoreScanner.next()");
467  }
468
469  @Override
470  public void close() {
471    close(true);
472  }
473
474  private void close(boolean withDelayedScannersClose) {
475    closeLock.lock();
476    // If the closeLock is acquired then any subsequent updateReaders()
477    // call is ignored.
478    try {
479      if (this.closing) {
480        return;
481      }
482      if (withDelayedScannersClose) {
483        this.closing = true;
484      }
485      // For mob compaction, we do not have a store.
486      if (this.store != null) {
487        this.store.deleteChangedReaderObserver(this);
488      }
489      if (withDelayedScannersClose) {
490        clearAndClose(scannersForDelayedClose);
491        clearAndClose(memStoreScannersAfterFlush);
492        clearAndClose(flushedstoreFileScanners);
493        if (this.heap != null) {
494          this.heap.close();
495          this.currentScanners.clear();
496          this.heap = null; // CLOSED!
497        }
498      } else {
499        if (this.heap != null) {
500          this.scannersForDelayedClose.add(this.heap);
501          this.currentScanners.clear();
502          this.heap = null;
503        }
504      }
505    } finally {
506      closeLock.unlock();
507    }
508  }
509
510  @Override
511  public boolean seek(Cell key) throws IOException {
512    if (checkFlushed()) {
513      reopenAfterFlush();
514    }
515    return this.heap.seek(key);
516  }
517
518  /**
519   * Get the next row of values from this Store.
520   * @param outResult
521   * @param scannerContext
522   * @return true if there are more rows, false if scanner is done
523   */
524  @Override
525  public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
526    if (scannerContext == null) {
527      throw new IllegalArgumentException("Scanner context cannot be null");
528    }
529    if (checkFlushed() && reopenAfterFlush()) {
530      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
531    }
532
533    // if the heap was left null, then the scanners had previously run out anyways, close and
534    // return.
535    if (this.heap == null) {
536      // By this time partial close should happened because already heap is null
537      close(false);// Do all cleanup except heap.close()
538      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
539    }
540
541    Cell cell = this.heap.peek();
542    if (cell == null) {
543      close(false);// Do all cleanup except heap.close()
544      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
545    }
546
547    // only call setRow if the row changes; avoids confusing the query matcher
548    // if scanning intra-row
549
550    // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
551    // rows. Else it is possible we are still traversing the same row so we must perform the row
552    // comparison.
553    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) {
554      this.countPerRow = 0;
555      matcher.setToNewRow(cell);
556    }
557
558    // Clear progress away unless invoker has indicated it should be kept.
559    if (!scannerContext.getKeepProgress()) {
560      scannerContext.clearProgress();
561    }
562
563    int count = 0;
564    long totalBytesRead = 0;
565
566    LOOP: do {
567      // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
568      if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
569        if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
570          return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
571        }
572      }
573      // Do object compare - we set prevKV from the same heap.
574      if (prevCell != cell) {
575        ++kvsScanned;
576      }
577      checkScanOrder(prevCell, cell, comparator);
578      int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
579      bytesRead += cellSize;
580      prevCell = cell;
581      scannerContext.setLastPeekedCell(cell);
582      topChanged = false;
583      ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
584      switch (qcode) {
585        case INCLUDE:
586        case INCLUDE_AND_SEEK_NEXT_ROW:
587        case INCLUDE_AND_SEEK_NEXT_COL:
588
589          Filter f = matcher.getFilter();
590          if (f != null) {
591            cell = f.transformCell(cell);
592          }
593
594          this.countPerRow++;
595          if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) {
596            // do what SEEK_NEXT_ROW does.
597            if (!matcher.moreRowsMayExistAfter(cell)) {
598              close(false);// Do all cleanup except heap.close()
599              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
600            }
601            matcher.clearCurrentRow();
602            seekToNextRow(cell);
603            break LOOP;
604          }
605
606          // add to results only if we have skipped #storeOffset kvs
607          // also update metric accordingly
608          if (this.countPerRow > storeOffset) {
609            outResult.add(cell);
610
611            // Update local tracking information
612            count++;
613            totalBytesRead += cellSize;
614
615            // Update the progress of the scanner context
616            scannerContext.incrementSizeProgress(cellSize,
617              PrivateCellUtil.estimatedSizeOfCell(cell));
618            scannerContext.incrementBatchProgress(1);
619
620            if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
621              throw new RowTooBigException(
622                  "Max row size allowed: " + maxRowSize + ", but the row is bigger than that.");
623            }
624          }
625
626          if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
627            if (!matcher.moreRowsMayExistAfter(cell)) {
628              close(false);// Do all cleanup except heap.close()
629              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
630            }
631            matcher.clearCurrentRow();
632            seekOrSkipToNextRow(cell);
633          } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
634            seekOrSkipToNextColumn(cell);
635          } else {
636            this.heap.next();
637          }
638
639          if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
640            break LOOP;
641          }
642          if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
643            break LOOP;
644          }
645          continue;
646
647        case DONE:
648          // Optimization for Gets! If DONE, no more to get on this row, early exit!
649          if (get) {
650            // Then no more to this row... exit.
651            close(false);// Do all cleanup except heap.close()
652            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
653          }
654          matcher.clearCurrentRow();
655          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
656
657        case DONE_SCAN:
658          close(false);// Do all cleanup except heap.close()
659          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
660
661        case SEEK_NEXT_ROW:
662          // This is just a relatively simple end of scan fix, to short-cut end
663          // us if there is an endKey in the scan.
664          if (!matcher.moreRowsMayExistAfter(cell)) {
665            close(false);// Do all cleanup except heap.close()
666            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
667          }
668          matcher.clearCurrentRow();
669          seekOrSkipToNextRow(cell);
670          NextState stateAfterSeekNextRow = needToReturn(outResult);
671          if (stateAfterSeekNextRow != null) {
672            return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
673          }
674          break;
675
676        case SEEK_NEXT_COL:
677          seekOrSkipToNextColumn(cell);
678          NextState stateAfterSeekNextColumn = needToReturn(outResult);
679          if (stateAfterSeekNextColumn != null) {
680            return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
681          }
682          break;
683
684        case SKIP:
685          this.heap.next();
686          break;
687
688        case SEEK_NEXT_USING_HINT:
689          Cell nextKV = matcher.getNextKeyHint(cell);
690          if (nextKV != null && comparator.compare(nextKV, cell) > 0) {
691            seekAsDirection(nextKV);
692            NextState stateAfterSeekByHint = needToReturn(outResult);
693            if (stateAfterSeekByHint != null) {
694              return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
695            }
696          } else {
697            heap.next();
698          }
699          break;
700
701        default:
702          throw new RuntimeException("UNEXPECTED");
703      }
704    } while ((cell = this.heap.peek()) != null);
705
706    if (count > 0) {
707      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
708    }
709
710    // No more keys
711    close(false);// Do all cleanup except heap.close()
712    return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
713  }
714
715  /**
716   * If the top cell won't be flushed into disk, the new top cell may be
717   * changed after #reopenAfterFlush. Because the older top cell only exist
718   * in the memstore scanner but the memstore scanner is replaced by hfile
719   * scanner after #reopenAfterFlush. If the row of top cell is changed,
720   * we should return the current cells. Otherwise, we may return
721   * the cells across different rows.
722   * @param outResult the cells which are visible for user scan
723   * @return null is the top cell doesn't change. Otherwise, the NextState
724   *         to return
725   */
726  private NextState needToReturn(List<Cell> outResult) {
727    if (!outResult.isEmpty() && topChanged) {
728      return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
729    }
730    return null;
731  }
732
733  private void seekOrSkipToNextRow(Cell cell) throws IOException {
734    // If it is a Get Scan, then we know that we are done with this row; there are no more
735    // rows beyond the current one: don't try to optimize.
736    if (!get) {
737      if (trySkipToNextRow(cell)) {
738        return;
739      }
740    }
741    seekToNextRow(cell);
742  }
743
744  private void seekOrSkipToNextColumn(Cell cell) throws IOException {
745    if (!trySkipToNextColumn(cell)) {
746      seekAsDirection(matcher.getKeyForNextColumn(cell));
747    }
748  }
749
750  /**
751   * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).
752   * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row,
753   * or seek to an arbitrary seek key. This method decides whether a seek is the most efficient
754   * _actual_ way to get us to the requested cell (SEEKs are more expensive than SKIP, SKIP,
755   * SKIP inside the current, loaded block).
756   * It does this by looking at the next indexed key of the current HFile. This key
757   * is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible key
758   * on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work with
759   * the current Cell but compare as though it were a seek key; see down in
760   * matcher.compareKeyForNextRow, etc). If the compare gets us onto the
761   * next block we *_SEEK, otherwise we just SKIP to the next requested cell.
762   *
763   * <p>Other notes:
764   * <ul>
765   * <li>Rows can straddle block boundaries</li>
766   * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a
767   * different block than column C1 at T2)</li>
768   * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a
769   * few SKIPs...</li>
770   * <li>We want to SEEK when the chance is high that we'll be able to seek
771   * past many Cells, especially if we know we need to go to the next block.</li>
772   * </ul>
773   * <p>A good proxy (best effort) to determine whether SKIP is better than SEEK is whether
774   * we'll likely end up seeking to the next block (or past the next block) to get our next column.
775   * Example:
776   * <pre>
777   * |    BLOCK 1              |     BLOCK 2                   |
778   * |  r1/c1, r1/c2, r1/c3    |    r1/c4, r1/c5, r2/c1        |
779   *                                   ^         ^
780   *                                   |         |
781   *                           Next Index Key   SEEK_NEXT_ROW (before r2/c1)
782   *
783   *
784   * |    BLOCK 1                       |     BLOCK 2                      |
785   * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |    r1/c1/t2, r1/c1/T1, r1/c2/T3  |
786   *                                            ^              ^
787   *                                            |              |
788   *                                    Next Index Key        SEEK_NEXT_COL
789   * </pre>
790   * Now imagine we want columns c1 and c3 (see first diagram above), the 'Next Index Key' of r1/c4
791   * is > r1/c3 so we should seek to get to the c1 on the next row, r2. In second case, say we only
792   * want one version of c1, after we have it, a SEEK_COL will be issued to get to c2. Looking at
793   * the 'Next Index Key', it would land us in the next block, so we should SEEK. In other scenarios
   * where the SEEK will not land us in the next block, it is very likely better to issue a series
   * of SKIPs.
796   * @param cell current cell
797   * @return true means skip to next row, false means not
798   */
799  @VisibleForTesting
800  protected boolean trySkipToNextRow(Cell cell) throws IOException {
801    Cell nextCell = null;
802    // used to guard against a changed next indexed key by doing a identity comparison
803    // when the identity changes we need to compare the bytes again
804    Cell previousIndexedKey = null;
805    do {
806      Cell nextIndexedKey = getNextIndexedKey();
807      if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY &&
808          (nextIndexedKey == previousIndexedKey ||
809          matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0)) {
810        this.heap.next();
811        ++kvsScanned;
812        previousIndexedKey = nextIndexedKey;
813      } else {
814        return false;
815      }
816    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell));
817    return true;
818  }
819
820  /**
821   * See {@link org.apache.hadoop.hbase.regionserver.StoreScanner#trySkipToNextRow(Cell)}
822   * @param cell current cell
823   * @return true means skip to next column, false means not
824   */
825  @VisibleForTesting
826  protected boolean trySkipToNextColumn(Cell cell) throws IOException {
827    Cell nextCell = null;
828    // used to guard against a changed next indexed key by doing a identity comparison
829    // when the identity changes we need to compare the bytes again
830    Cell previousIndexedKey = null;
831    do {
832      Cell nextIndexedKey = getNextIndexedKey();
833      if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY &&
834          (nextIndexedKey == previousIndexedKey ||
835          matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0)) {
836        this.heap.next();
837        ++kvsScanned;
838        previousIndexedKey = nextIndexedKey;
839      } else {
840        return false;
841      }
842    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell));
843    // We need this check because it may happen that the new scanner that we get
844    // during heap.next() is requiring reseek due of fake KV previously generated for
845    // ROWCOL bloom filter optimization. See HBASE-19863 for more details
846    if (nextCell != null && matcher.compareKeyForNextColumn(nextCell, cell) < 0) {
847      return false;
848    }
849    return true;
850  }
851
852  @Override
853  public long getReadPoint() {
854    return this.readPt;
855  }
856
857  private static void clearAndClose(List<KeyValueScanner> scanners) {
858    for (KeyValueScanner s : scanners) {
859      s.close();
860    }
861    scanners.clear();
862  }
863
864  // Implementation of ChangedReadersObserver
865  @Override
866  public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners)
867      throws IOException {
868    if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) {
869      return;
870    }
871    boolean updateReaders = false;
872    flushLock.lock();
873    try {
874      if (!closeLock.tryLock()) {
875        // The reason for doing this is that when the current store scanner does not retrieve
876        // any new cells, then the scanner is considered to be done. The heap of this scanner
877        // is not closed till the shipped() call is completed. Hence in that case if at all
878        // the partial close (close (false)) has been called before updateReaders(), there is no
879        // need for the updateReaders() to happen.
880        LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
881        // no lock acquired.
882        return;
883      }
884      // lock acquired
885      updateReaders = true;
886      if (this.closing) {
887        LOG.debug("StoreScanner already closing. There is no need to updateReaders");
888        return;
889      }
890      flushed = true;
891      final boolean isCompaction = false;
892      boolean usePread = get || scanUsePread;
893      // SEE HBASE-19468 where the flushed files are getting compacted even before a scanner
894      // calls next(). So its better we create scanners here rather than next() call. Ensure
895      // these scanners are properly closed() whether or not the scan is completed successfully
896      // Eagerly creating scanners so that we have the ref counting ticking on the newly created
897      // store files. In case of stream scanners this eager creation does not induce performance
898      // penalty because in scans (that uses stream scanners) the next() call is bound to happen.
899      List<KeyValueScanner> scanners = store.getScanners(sfs, cacheBlocks, get, usePread,
900        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false);
901      flushedstoreFileScanners.addAll(scanners);
902      if (!CollectionUtils.isEmpty(memStoreScanners)) {
903        clearAndClose(memStoreScannersAfterFlush);
904        memStoreScannersAfterFlush.addAll(memStoreScanners);
905      }
906    } finally {
907      flushLock.unlock();
908      if (updateReaders) {
909        closeLock.unlock();
910      }
911    }
912    // Let the next() call handle re-creating and seeking
913  }
914
915  /**
916   * @return if top of heap has changed (and KeyValueHeap has to try the next KV)
917   */
918  protected final boolean reopenAfterFlush() throws IOException {
919    // here we can make sure that we have a Store instance so no null check on store.
920    Cell lastTop = heap.peek();
921    // When we have the scan object, should we not pass it to getScanners() to get a limited set of
922    // scanners? We did so in the constructor and we could have done it now by storing the scan
923    // object from the constructor
924    List<KeyValueScanner> scanners;
925    flushLock.lock();
926    try {
927      List<KeyValueScanner> allScanners =
928          new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());
929      allScanners.addAll(flushedstoreFileScanners);
930      allScanners.addAll(memStoreScannersAfterFlush);
931      scanners = selectScannersFrom(store, allScanners);
932      // Clear the current set of flushed store files scanners so that they don't get added again
933      flushedstoreFileScanners.clear();
934      memStoreScannersAfterFlush.clear();
935    } finally {
936      flushLock.unlock();
937    }
938
939    // Seek the new scanners to the last key
940    seekScanners(scanners, lastTop, false, parallelSeekEnabled);
941    // remove the older memstore scanner
942    for (int i = currentScanners.size() - 1; i >=0; i--) {
943      if (!currentScanners.get(i).isFileScanner()) {
944        scannersForDelayedClose.add(currentScanners.remove(i));
945      } else {
946        // we add the memstore scanner to the end of currentScanners
947        break;
948      }
949    }
950    // add the newly created scanners on the flushed files and the current active memstore scanner
951    addCurrentScanners(scanners);
952    // Combine all seeked scanners with a heap
953    resetKVHeap(this.currentScanners, store.getComparator());
954    resetQueryMatcher(lastTop);
955    if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
956      LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString() +
957          ",and after = " + heap.peek());
958      topChanged = true;
959    } else {
960      topChanged = false;
961    }
962    return topChanged;
963  }
964
965  private void resetQueryMatcher(Cell lastTopKey) {
966    // Reset the state of the Query Matcher and set to top row.
967    // Only reset and call setRow if the row changes; avoids confusing the
968    // query matcher if scanning intra-row.
969    Cell cell = heap.peek();
970    if (cell == null) {
971      cell = lastTopKey;
972    }
973    if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {
974      this.countPerRow = 0;
975      // The setToNewRow will call reset internally
976      matcher.setToNewRow(cell);
977    }
978  }
979
980  /**
981   * Check whether scan as expected order
982   * @param prevKV
983   * @param kv
984   * @param comparator
985   * @throws IOException
986   */
987  protected void checkScanOrder(Cell prevKV, Cell kv,
988      CellComparator comparator) throws IOException {
989    // Check that the heap gives us KVs in an increasing order.
990    assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 : "Key "
991        + prevKV + " followed by a smaller key " + kv + " in cf " + store;
992  }
993
994  protected boolean seekToNextRow(Cell c) throws IOException {
995    return reseek(PrivateCellUtil.createLastOnRow(c));
996  }
997
998  /**
999   * Do a reseek in a normal StoreScanner(scan forward)
1000   * @param kv
1001   * @return true if scanner has values left, false if end of scanner
1002   * @throws IOException
1003   */
1004  protected boolean seekAsDirection(Cell kv)
1005      throws IOException {
1006    return reseek(kv);
1007  }
1008
1009  @Override
1010  public boolean reseek(Cell kv) throws IOException {
1011    if (checkFlushed()) {
1012      reopenAfterFlush();
1013    }
1014    if (explicitColumnQuery && lazySeekEnabledGlobally) {
1015      return heap.requestSeek(kv, true, useRowColBloom);
1016    }
1017    return heap.reseek(kv);
1018  }
1019
1020  @VisibleForTesting
1021  void trySwitchToStreamRead() {
1022    if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing ||
1023        heap.peek() == null || bytesRead < preadMaxBytes) {
1024      return;
1025    }
1026    LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead,
1027        this.store.getColumnFamilyName());
1028    scanUsePread = false;
1029    Cell lastTop = heap.peek();
1030    List<KeyValueScanner> memstoreScanners = new ArrayList<>();
1031    List<KeyValueScanner> scannersToClose = new ArrayList<>();
1032    for (KeyValueScanner kvs : currentScanners) {
1033      if (!kvs.isFileScanner()) {
1034        // collect memstorescanners here
1035        memstoreScanners.add(kvs);
1036      } else {
1037        scannersToClose.add(kvs);
1038      }
1039    }
1040    List<KeyValueScanner> fileScanners = null;
1041    List<KeyValueScanner> newCurrentScanners;
1042    KeyValueHeap newHeap;
1043    try {
1044      // We must have a store instance here so no null check
1045      // recreate the scanners on the current file scanners
1046      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false,
1047        matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(),
1048        scan.includeStopRow(), readPt, false);
1049      if (fileScanners == null) {
1050        return;
1051      }
1052      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
1053      newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
1054      newCurrentScanners.addAll(fileScanners);
1055      newCurrentScanners.addAll(memstoreScanners);
1056      newHeap = newKVHeap(newCurrentScanners, comparator);
1057    } catch (Exception e) {
1058      LOG.warn("failed to switch to stream read", e);
1059      if (fileScanners != null) {
1060        fileScanners.forEach(KeyValueScanner::close);
1061      }
1062      return;
1063    }
1064    currentScanners.clear();
1065    addCurrentScanners(newCurrentScanners);
1066    this.heap = newHeap;
1067    resetQueryMatcher(lastTop);
1068    scannersToClose.forEach(KeyValueScanner::close);
1069  }
1070
1071  protected final boolean checkFlushed() {
1072    // check the var without any lock. Suppose even if we see the old
1073    // value here still it is ok to continue because we will not be resetting
1074    // the heap but will continue with the referenced memstore's snapshot. For compactions
1075    // any way we don't need the updateReaders at all to happen as we still continue with
1076    // the older files
1077    if (flushed) {
1078      // If there is a flush and the current scan is notified on the flush ensure that the
1079      // scan's heap gets reset and we do a seek on the newly flushed file.
1080      if (this.closing) {
1081        return false;
1082      }
1083      // reset the flag
1084      flushed = false;
1085      return true;
1086    }
1087    return false;
1088  }
1089
1090  /**
1091   * @see KeyValueScanner#getScannerOrder()
1092   */
1093  @Override
1094  public long getScannerOrder() {
1095    return 0;
1096  }
1097
1098  /**
1099   * Seek storefiles in parallel to optimize IO latency as much as possible
1100   * @param scanners the list {@link KeyValueScanner}s to be read from
1101   * @param kv the KeyValue on which the operation is being requested
1102   * @throws IOException
1103   */
1104  private void parallelSeek(final List<? extends KeyValueScanner>
1105      scanners, final Cell kv) throws IOException {
1106    if (scanners.isEmpty()) return;
1107    int storeFileScannerCount = scanners.size();
1108    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
1109    List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount);
1110    for (KeyValueScanner scanner : scanners) {
1111      if (scanner instanceof StoreFileScanner) {
1112        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
1113          this.readPt, latch);
1114        executor.submit(seekHandler);
1115        handlers.add(seekHandler);
1116      } else {
1117        scanner.seek(kv);
1118        latch.countDown();
1119      }
1120    }
1121
1122    try {
1123      latch.await();
1124    } catch (InterruptedException ie) {
1125      throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
1126    }
1127
1128    for (ParallelSeekHandler handler : handlers) {
1129      if (handler.getErr() != null) {
1130        throw new IOException(handler.getErr());
1131      }
1132    }
1133  }
1134
1135  /**
1136   * Used in testing.
1137   * @return all scanners in no particular order
1138   */
1139  @VisibleForTesting
1140  List<KeyValueScanner> getAllScannersForTesting() {
1141    List<KeyValueScanner> allScanners = new ArrayList<>();
1142    KeyValueScanner current = heap.getCurrentForTesting();
1143    if (current != null)
1144      allScanners.add(current);
1145    for (KeyValueScanner scanner : heap.getHeap())
1146      allScanners.add(scanner);
1147    return allScanners;
1148  }
1149
1150  static void enableLazySeekGlobally(boolean enable) {
1151    lazySeekEnabledGlobally = enable;
1152  }
1153
1154  /**
1155   * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
1156   */
1157  public long getEstimatedNumberOfKvsScanned() {
1158    return this.kvsScanned;
1159  }
1160
1161  @Override
1162  public Cell getNextIndexedKey() {
1163    return this.heap.getNextIndexedKey();
1164  }
1165
1166  @Override
1167  public void shipped() throws IOException {
1168    if (prevCell != null) {
1169      // Do the copy here so that in case the prevCell ref is pointing to the previous
1170      // blocks we can safely release those blocks.
1171      // This applies to blocks that are got from Bucket cache, L1 cache and the blocks
1172      // fetched from HDFS. Copying this would ensure that we let go the references to these
1173      // blocks so that they can be GCed safely(in case of bucket cache)
1174      prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
1175    }
1176    matcher.beforeShipped();
1177    // There wont be further fetch of Cells from these scanners. Just close.
1178    clearAndClose(scannersForDelayedClose);
1179    if (this.heap != null) {
1180      this.heap.shipped();
1181      // When switching from pread to stream, we will open a new scanner for each store file, but
1182      // the old scanner may still track the HFileBlocks we have scanned but not sent back to client
1183      // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others
1184      // before we serialize and send it back to client. The HFileBlocks will be released in shipped
1185      // method, so we here will also open new scanners and close old scanners in shipped method.
1186      // See HBASE-18055 for more details.
1187      trySwitchToStreamRead();
1188    }
1189  }
1190}
1191