/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * Scanner scans both the memstore and the Store. Coalesces the KeyValue stream into a
 * List&lt;KeyValue&gt; for a single row.
 * <p>
 * The implementation is not thread safe, so callers must ensure that next and close are never
 * called concurrently. The only exception is updateReaders, which is called from the memstore
 * flush thread to indicate that a flush has happened.
 */
@InterfaceAudience.Private
public class StoreScanner extends NonReversedNonLazyKeyValueScanner
    implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class);
  // In unit tests, the store could be null
  protected final HStore store;
  private final CellComparator comparator;
  private ScanQueryMatcher matcher;
  protected KeyValueHeap heap;
  private boolean cacheBlocks;

  private long countPerRow = 0;
  private int storeLimit = -1;
  private int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107)
  private volatile boolean closing = false;
  private final boolean get;
  private final boolean explicitColumnQuery;
  private final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  private boolean parallelSeekEnabled = false;
  private ExecutorService executor;
  private final Scan scan;
  private final long oldestUnexpiredTS;
  private final long now;
  private final int minVersions;
  private final long maxRowSize;
  private final long cellsPerHeartbeatCheck;

  // 1) Collects all the KVHeaps that are eagerly closed during the
  //    course of a scan
  // 2) Collects the unused memstore scanners. If we close the memstore scanners
  //    before sending data to the client, the chunk may be reclaimed by other
  //    updates and the data will be corrupt.
  private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>();

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
   * KVs skipped via seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  private Cell prevCell = null;

  private final long preadMaxBytes;
  private long bytesRead;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
      "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;

  /**
   * The number of cells scanned in between timeout checks. Specifying a larger value means that
   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
   * timeout checks.
   */
  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
      "hbase.cells.scanned.per.heartbeat.check";

  /**
   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
   */
  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;
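
  // Illustrative sketch (not part of this class): the heartbeat check interval above is picked up
  // from the server-side Configuration (surfaced to this class through ScanInfo), so lowering it
  // makes time-limit checks more responsive at the cost of a little extra work per scanned cell.
  // Something along these lines, assuming a Configuration is at hand:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   // check the time limit every 1000 cells instead of the default 10000
  //   conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1000);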

  /**
   * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we have
   * scanned reach this limit, we will reopen the scanner with stream. The default value is 4 times
   * the block size for this store.
   */
  public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes";
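
  // A hedged usage sketch: a client can pin the read type on the Scan and bypass the automatic
  // pread-to-stream switch entirely; otherwise, with ReadType.DEFAULT, the switch happens once
  // bytesRead exceeds the value configured under STORESCANNER_PREAD_MAX_BYTES.
  //
  //   Scan smallScan = new Scan().setReadType(Scan.ReadType.PREAD);   // stay on pread
  //   Scan bigScan = new Scan().setReadType(Scan.ReadType.STREAM);    // stream from the start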

  private final Scan.ReadType readType;

  // A flag for whether to use pread for the scan;
  // it may be changed if we use Scan.ReadType.DEFAULT and have read lots of data.
  private boolean scanUsePread;
  // Indicates whether there was a flush during the course of the scan
  private volatile boolean flushed = false;
  // generally we get one file from a flush
  private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1);
  // Since CompactingMemstore is now the default, we get three memstore scanners from a flush
  private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
  // The current list of scanners
  @VisibleForTesting
  final List<KeyValueScanner> currentScanners = new ArrayList<>();
  // flush update lock
  private final ReentrantLock flushLock = new ReentrantLock();
  // lock for closing.
  private final ReentrantLock closeLock = new ReentrantLock();

  protected final long readPt;
  private boolean topChanged = false;

  /** An internal constructor. */
  private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo,
      int numColumns, long readPt, boolean cacheBlocks, ScanType scanType) {
    this.readPt = readPt;
    this.store = store;
    this.cacheBlocks = cacheBlocks;
    this.comparator = Preconditions.checkNotNull(scanInfo.getComparator());
    get = scan.isGetScan();
    explicitColumnQuery = numColumns > 0;
    this.scan = scan;
    this.now = EnvironmentEdgeManager.currentTime();
    this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl();
    this.minVersions = scanInfo.getMinVersions();

    // We look up row-column Bloom filters for multi-column queries as part of
    // the seek operation. However, we also look at the row-column Bloom filter
    // for multi-row (non-"get") scans because this is not done in
    // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
    this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1);
    this.maxRowSize = scanInfo.getTableMaxRowSize();
    if (get) {
      this.readType = Scan.ReadType.PREAD;
      this.scanUsePread = true;
    } else if (scanType != ScanType.USER_SCAN) {
      // Compaction scanners never use pread, as we already have stream based scanners on the
      // store files to be compacted
      this.readType = Scan.ReadType.STREAM;
      this.scanUsePread = false;
    } else {
      if (scan.getReadType() == Scan.ReadType.DEFAULT) {
        this.readType = scanInfo.isUsePread() ? Scan.ReadType.PREAD : Scan.ReadType.DEFAULT;
      } else {
        this.readType = scan.getReadType();
      }
      // Always start with pread unless the user specifically asks for stream reads. If the read
      // type is DEFAULT we will change to stream later, should the scan keep running for a long
      // time.
      this.scanUsePread = this.readType != Scan.ReadType.STREAM;
    }
    this.preadMaxBytes = scanInfo.getPreadMaxBytes();
    this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
    // Parallel seeking is enabled if the config allows it and there is more than one store file.
    if (store != null && store.getStorefilesCount() > 1) {
      RegionServerServices rsService = store.getHRegion().getRegionServerServices();
      if (rsService != null && scanInfo.isParallelSeekEnabled()) {
        this.parallelSeekEnabled = true;
        this.executor = rsService.getExecutorService();
      }
    }
  }
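
  // Informational summary (not normative) of the read type decision made above:
  //   get                               -> PREAD
  //   compaction / non-user scan        -> STREAM
  //   user scan, Scan says DEFAULT      -> PREAD or DEFAULT, depending on ScanInfo#isUsePread
  //   user scan, Scan says PREAD/STREAM -> exactly as requested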

  private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
    this.currentScanners.addAll(scanners);
  }

  /**
   * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
   * are not in a compaction.
   *
   * @param store the store we scan
   * @param scan the spec
   * @param columns which columns we are scanning
   * @throws IOException if the scanners cannot be created or seeked
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
      long readPt) throws IOException {
    this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt,
        scan.getCacheBlocks(), ScanType.USER_SCAN);
    if (columns != null && scan.isRaw()) {
      throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
    }
    matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
      store.getCoprocessorHost());

    store.addChangedReaderObserver(this);

    List<KeyValueScanner> scanners = null;
    try {
      // Pass columns to try to filter out unnecessary StoreFiles.
      scanners = selectScannersFrom(store,
        store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
          scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt));

      // Seek all scanners to the start of the Row (or if the exact matching row
      // key does not exist, then to the start of the next matching Row).
      // Always check bloom filter to optimize the top row seek for delete
      // family marker.
      seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
        parallelSeekEnabled);

      // set storeLimit
      this.storeLimit = scan.getMaxResultsPerColumnFamily();

      // set rowOffset
      this.storeOffset = scan.getRowOffsetPerColumnFamily();
      addCurrentScanners(scanners);
      // Combine all seeked scanners with a heap
      resetKVHeap(scanners, comparator);
    } catch (IOException e) {
      clearAndClose(scanners);
      // Remove us from HStore#changedReaderObservers here, otherwise we will never be removed
      // and might cause a memory leak.
      store.deleteChangedReaderObserver(this);
      throw e;
    }
  }
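
  // Hedged sketch of how this user-scan constructor is typically reached (the helper names below
  // come from the surrounding region server code and may differ between versions): a region
  // scanner asks each HStore for a scanner over the columns the client requested, roughly:
  //
  //   NavigableSet<byte[]> columns = scan.getFamilyMap().get(family);
  //   KeyValueScanner storeScanner = store.getScanner(scan, columns, readPt);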

  // a dummy scan instance for compaction.
  private static final Scan SCAN_FOR_COMPACTION = new Scan();

  /**
   * Used for store file compaction and memstore compaction.
   * <p>
   * Opens a scanner across specified StoreFiles/MemStoreSegments.
   * @param store who we scan
   * @param scanners ancillary scanners
   * @param smallestReadPoint the readPoint that we should use for tracking versions
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
    this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
  }

  /**
   * Used for compactions that drop deletes from a limited range of rows.
   * <p>
   * Opens a scanner across specified StoreFiles.
   * @param store who we scan
   * @param scanners ancillary scanners
   * @param smallestReadPoint the readPoint that we should use for tracking versions
   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
   * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
      long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
      byte[] dropDeletesToRow) throws IOException {
    this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
  }

  private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
      ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
      byte[] dropDeletesToRow) throws IOException {
    this(store, SCAN_FOR_COMPACTION, scanInfo, 0,
        store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType);
    assert scanType != ScanType.USER_SCAN;
    matcher =
        CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs,
          oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());

    // Filter the list of scanners using Bloom filters, time range, TTL, etc.
    scanners = selectScannersFrom(store, scanners);

    // Seek all scanners to the initial key
    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
    addCurrentScanners(scanners);
    // Combine all seeked scanners with a heap
    resetKVHeap(scanners, comparator);
  }

  private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners)
      throws IOException {
    // Seek all scanners to the initial key
    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
    addCurrentScanners(scanners);
    resetKVHeap(scanners, comparator);
  }

  // For mob compaction only as we do not have a Store instance when doing mob compaction.
  public StoreScanner(ScanInfo scanInfo, ScanType scanType,
      List<? extends KeyValueScanner> scanners) throws IOException {
    this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
    assert scanType != ScanType.USER_SCAN;
    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L,
      oldestUnexpiredTS, now, null, null, null);
    seekAllScanner(scanInfo, scanners);
  }

  // Used to instantiate a scanner for user scan in test
  @VisibleForTesting
  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
      List<? extends KeyValueScanner> scanners) throws IOException {
    // 0 is passed as readpoint because the test bypasses Store
    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L,
        scan.getCacheBlocks(), ScanType.USER_SCAN);
    this.matcher =
        UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
    seekAllScanner(scanInfo, scanners);
  }

  // Used to instantiate a scanner for compaction in test
  @VisibleForTesting
  StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType,
      List<? extends KeyValueScanner> scanners) throws IOException {
    // 0 is passed as readpoint because the test bypasses Store
    this(null, maxVersions > 0 ? new Scan().readVersions(maxVersions)
      : SCAN_FOR_COMPACTION, scanInfo, 0, 0L, false, scanType);
    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
      HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
    seekAllScanner(scanInfo, scanners);
  }

  @VisibleForTesting
  boolean isScanUsePread() {
    return this.scanUsePread;
  }
  /**
   * Seek the specified scanners with the given key
   * @param scanners the scanners to seek
   * @param seekKey the key to seek to
   * @param isLazy true if using lazy seek
   * @param isParallelSeek true if using parallel seek
   * @throws IOException if an underlying scanner fails to seek
   */
  protected void seekScanners(List<? extends KeyValueScanner> scanners,
      Cell seekKey, boolean isLazy, boolean isParallelSeek)
      throws IOException {
    // Seek all scanners to the start of the Row (or if the exact matching row
    // key does not exist, then to the start of the next matching Row).
    // Always check bloom filter to optimize the top row seek for delete
    // family marker.
    if (isLazy) {
      for (KeyValueScanner scanner : scanners) {
        scanner.requestSeek(seekKey, false, true);
      }
    } else {
      if (!isParallelSeek) {
        long totalScannersSoughtBytes = 0;
        for (KeyValueScanner scanner : scanners) {
          if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) {
            throw new RowTooBigException("Max row size allowed: " + maxRowSize
              + ", but row is bigger than that");
          }
          scanner.seek(seekKey);
          Cell c = scanner.peek();
          if (c != null) {
            totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c);
          }
        }
      } else {
        parallelSeek(scanners, seekKey);
      }
    }
  }

  @VisibleForTesting
  protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
      CellComparator comparator) throws IOException {
    // Combine all seeked scanners with a heap
    heap = newKVHeap(scanners, comparator);
  }

  protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners,
      CellComparator comparator) throws IOException {
    return new KeyValueHeap(scanners, comparator);
  }

  /**
   * Filters the given list of scanners using Bloom filter, time range, and TTL.
   * <p>
   * Will be overridden by test cases, so it is declared protected.
   */
  @VisibleForTesting
  protected List<KeyValueScanner> selectScannersFrom(HStore store,
      List<? extends KeyValueScanner> allScanners) {
    boolean memOnly;
    boolean filesOnly;
    if (scan instanceof InternalScan) {
      InternalScan iscan = (InternalScan) scan;
      memOnly = iscan.isCheckOnlyMemStore();
      filesOnly = iscan.isCheckOnlyStoreFiles();
    } else {
      memOnly = false;
      filesOnly = false;
    }

    List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size());

    // We can only exclude store files based on TTL if minVersions is set to 0.
    // Otherwise, we might have to return KVs that have technically expired.
    long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE;

    // include only those scan files which pass all filters
    for (KeyValueScanner kvs : allScanners) {
      boolean isFile = kvs.isFileScanner();
      if ((!isFile && filesOnly) || (isFile && memOnly)) {
        continue;
      }

      if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
        scanners.add(kvs);
      } else {
        kvs.close();
      }
    }
    return scanners;
  }
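
  // A small hedged example of the memOnly / filesOnly switches handled above: internal callers
  // can restrict a scan to one side of the store via InternalScan, roughly:
  //
  //   InternalScan iscan = new InternalScan(scan);
  //   iscan.checkOnlyMemStore();   // selectScannersFrom will then drop all file scanners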

  @Override
  public Cell peek() {
    return heap != null ? heap.peek() : null;
  }

  @Override
  public KeyValue next() {
    // throw runtime exception perhaps?
    throw new RuntimeException("Never call StoreScanner.next()");
  }

  @Override
  public void close() {
    close(true);
  }

  private void close(boolean withDelayedScannersClose) {
    closeLock.lock();
    // If the closeLock is acquired then any subsequent updateReaders()
    // call is ignored.
    try {
      if (this.closing) {
        return;
      }
      if (withDelayedScannersClose) {
        this.closing = true;
      }
      // For mob compaction, we do not have a store.
      if (this.store != null) {
        this.store.deleteChangedReaderObserver(this);
      }
      if (withDelayedScannersClose) {
        clearAndClose(scannersForDelayedClose);
        clearAndClose(memStoreScannersAfterFlush);
        clearAndClose(flushedstoreFileScanners);
        if (this.heap != null) {
          this.heap.close();
          this.currentScanners.clear();
          this.heap = null; // CLOSED!
        }
      } else {
        if (this.heap != null) {
          this.scannersForDelayedClose.add(this.heap);
          this.currentScanners.clear();
          this.heap = null;
        }
      }
    } finally {
      closeLock.unlock();
    }
  }

  @Override
  public boolean seek(Cell key) throws IOException {
    if (checkFlushed()) {
      reopenAfterFlush();
    }
    return this.heap.seek(key);
  }

  /**
   * Get the next row of values from this Store.
   * @param outResult the list to populate with the next row's cells
   * @param scannerContext the context that tracks batch, size and time limits
   * @return true if there are more rows, false if scanner is done
   */
  @Override
  public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
    if (scannerContext == null) {
      throw new IllegalArgumentException("Scanner context cannot be null");
    }
    if (checkFlushed() && reopenAfterFlush()) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // If the heap was left null, then the scanners had previously run out anyway; close and
    // return.
    if (this.heap == null) {
      // By this time the partial close should have happened, because the heap is already null
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    Cell cell = this.heap.peek();
    if (cell == null) {
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    // only call setRow if the row changes; avoids confusing the query matcher
    // if scanning intra-row

    // If no limits exist in the scope LimitScope.Between_Cells then we are sure we are changing
    // rows. Else it is possible we are still traversing the same row, so we must perform the row
    // comparison.
    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) {
      this.countPerRow = 0;
      matcher.setToNewRow(cell);
    }

    // Clear progress away unless invoker has indicated it should be kept.
    if (!scannerContext.getKeepProgress()) {
      scannerContext.clearProgress();
    }

    int count = 0;
    long totalBytesRead = 0;

    LOOP: do {
      // Update and check the time limit based on the configured value of cellsPerTimeoutCheck.
      // Also check whether preadMaxBytes has been reached, in which case we may want to return
      // so we can switch to stream reads in the shipped method below.
      if (kvsScanned % cellsPerHeartbeatCheck == 0 || (scanUsePread &&
        readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)) {
        if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
          return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
        }
      }
      // Do object compare - we set prevKV from the same heap.
      if (prevCell != cell) {
        ++kvsScanned;
      }
      checkScanOrder(prevCell, cell, comparator);
      int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
      bytesRead += cellSize;
      if (scanUsePread && readType == Scan.ReadType.DEFAULT &&
        bytesRead > preadMaxBytes) {
        // Return immediately if we want to switch from pread to stream. We need this because we
        // can only switch in the shipped method; if the user uses a filter that filters out
        // everything and the rpc timeout is very large, then the shipped method will never be
        // called until the whole scan is finished, but by that time we have already scanned all
        // the data...
        // See HBASE-20457 for more details.
        // And there is still a scenario that cannot be handled: if we have a very large row with
        // millions of qualifiers, and filter.filterRow is used, then even if we set the flag
        // here, we still need to scan all the qualifiers before returning...
        scannerContext.returnImmediately();
      }
      prevCell = cell;
      scannerContext.setLastPeekedCell(cell);
      topChanged = false;
      ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
      switch (qcode) {
        case INCLUDE:
        case INCLUDE_AND_SEEK_NEXT_ROW:
        case INCLUDE_AND_SEEK_NEXT_COL:

          Filter f = matcher.getFilter();
          if (f != null) {
            cell = f.transformCell(cell);
          }

          this.countPerRow++;
          if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) {
            // do what SEEK_NEXT_ROW does.
            if (!matcher.moreRowsMayExistAfter(cell)) {
              close(false);// Do all cleanup except heap.close()
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            matcher.clearCurrentRow();
            seekToNextRow(cell);
            break LOOP;
          }

          // add to results only if we have skipped #storeOffset kvs
          // also update metric accordingly
          if (this.countPerRow > storeOffset) {
            outResult.add(cell);

            // Update local tracking information
            count++;
            totalBytesRead += cellSize;

            // Update the progress of the scanner context
            scannerContext.incrementSizeProgress(cellSize,
              PrivateCellUtil.estimatedSizeOfCell(cell));
            scannerContext.incrementBatchProgress(1);

            if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
              throw new RowTooBigException(
                  "Max row size allowed: " + maxRowSize + ", but the row is bigger than that.");
            }
          }

          if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
            if (!matcher.moreRowsMayExistAfter(cell)) {
              close(false);// Do all cleanup except heap.close()
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            matcher.clearCurrentRow();
            seekOrSkipToNextRow(cell);
          } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
            seekOrSkipToNextColumn(cell);
          } else {
            this.heap.next();
          }

          if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          continue;

        case DONE:
          // Optimization for Gets! If DONE, no more to get on this row, early exit!
          if (get) {
            // Then no more to this row... exit.
            close(false);// Do all cleanup except heap.close()
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          matcher.clearCurrentRow();
          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();

        case DONE_SCAN:
          close(false);// Do all cleanup except heap.close()
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();

        case SEEK_NEXT_ROW:
          // This is just a relatively simple end-of-scan fix, to short-circuit the scan
          // if there is an endKey in the scan.
          if (!matcher.moreRowsMayExistAfter(cell)) {
            close(false);// Do all cleanup except heap.close()
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          matcher.clearCurrentRow();
          seekOrSkipToNextRow(cell);
          NextState stateAfterSeekNextRow = needToReturn(outResult);
          if (stateAfterSeekNextRow != null) {
            return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
          }
          break;

        case SEEK_NEXT_COL:
          seekOrSkipToNextColumn(cell);
          NextState stateAfterSeekNextColumn = needToReturn(outResult);
          if (stateAfterSeekNextColumn != null) {
            return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
          }
          break;

        case SKIP:
          this.heap.next();
          break;

        case SEEK_NEXT_USING_HINT:
          Cell nextKV = matcher.getNextKeyHint(cell);
          if (nextKV != null && comparator.compare(nextKV, cell) > 0) {
            seekAsDirection(nextKV);
            NextState stateAfterSeekByHint = needToReturn(outResult);
            if (stateAfterSeekByHint != null) {
              return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
            }
          } else {
            heap.next();
          }
          break;

        default:
          throw new RuntimeException("UNEXPECTED");
      }
    } while ((cell = this.heap.peek()) != null);

    if (count > 0) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // No more keys
    close(false);// Do all cleanup except heap.close()
    return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
  }
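
  // Hedged sketch of the calling convention for the next() implementation above: the caller owns
  // the result list and the ScannerContext, and keeps calling until false (NO_MORE_VALUES) is
  // returned, e.g.
  //
  //   List<Cell> cells = new ArrayList<>();
  //   boolean more;
  //   do {
  //     cells.clear();
  //     more = scanner.next(cells, scannerContext);
  //     // ship `cells` to the consumer here
  //   } while (more);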

  /**
   * If the top cell won't be flushed to disk, the new top cell may change after
   * #reopenAfterFlush, because the older top cell only exists in the memstore scanner, and the
   * memstore scanner is replaced by an hfile scanner after #reopenAfterFlush. If the row of the
   * top cell changes, we should return the current cells; otherwise, we may return cells across
   * different rows.
   * @param outResult the cells which are visible for user scan
   * @return null if the top cell doesn't change. Otherwise, the NextState
   *         to return
   */
  private NextState needToReturn(List<Cell> outResult) {
    if (!outResult.isEmpty() && topChanged) {
      return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
    }
    return null;
  }

  private void seekOrSkipToNextRow(Cell cell) throws IOException {
    // If it is a Get Scan, then we know that we are done with this row; there are no more
    // rows beyond the current one: don't try to optimize.
    if (!get) {
      if (trySkipToNextRow(cell)) {
        return;
      }
    }
    seekToNextRow(cell);
  }

  private void seekOrSkipToNextColumn(Cell cell) throws IOException {
    if (!trySkipToNextColumn(cell)) {
      seekAsDirection(matcher.getKeyForNextColumn(cell));
    }
  }

  /**
   * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).
   * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row,
   * or seek to an arbitrary seek key. This method decides whether a seek is the most efficient
   * _actual_ way to get us to the requested cell (SEEKs are more expensive than SKIP, SKIP,
   * SKIP inside the current, loaded block).
   * It does this by looking at the next indexed key of the current HFile. This key
   * is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible key
   * on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work with
   * the current Cell but compare as though it were a seek key; see down in
   * matcher.compareKeyForNextRow, etc). If the compare gets us onto the
   * next block we *_SEEK, otherwise we just SKIP to the next requested cell.
   *
   * <p>Other notes:
   * <ul>
   * <li>Rows can straddle block boundaries</li>
   * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a
   * different block than column C1 at T2)</li>
   * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a
   * few SKIPs...</li>
   * <li>We want to SEEK when the chance is high that we'll be able to seek
   * past many Cells, especially if we know we need to go to the next block.</li>
   * </ul>
   * <p>A good proxy (best effort) to determine whether SKIP is better than SEEK is whether
   * we'll likely end up seeking to the next block (or past the next block) to get our next column.
   * Example:
   * <pre>
   * |    BLOCK 1              |     BLOCK 2                   |
   * |  r1/c1, r1/c2, r1/c3    |    r1/c4, r1/c5, r2/c1        |
   *                                   ^         ^
   *                                   |         |
   *                           Next Index Key   SEEK_NEXT_ROW (before r2/c1)
   *
   *
   * |    BLOCK 1                       |     BLOCK 2                      |
   * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |    r1/c1/t2, r1/c1/T1, r1/c2/T3  |
   *                                            ^              ^
   *                                            |              |
   *                                    Next Index Key        SEEK_NEXT_COL
   * </pre>
   * Now imagine we want columns c1 and c3 (see first diagram above), the 'Next Index Key' of r1/c4
   * is > r1/c3 so we should seek to get to the c1 on the next row, r2. In the second case, say we
   * only want one version of c1; after we have it, a SEEK_COL will be issued to get to c2. Looking
   * at the 'Next Index Key', it would land us in the next block, so we should SEEK. In other
   * scenarios where the SEEK will not land us in the next block, it is very likely better to issue
   * a series of SKIPs.
   * @param cell current cell
   * @return true means skip to next row, false means not
   */
  @VisibleForTesting
  protected boolean trySkipToNextRow(Cell cell) throws IOException {
    Cell nextCell = null;
    // used to guard against a changed next indexed key by doing an identity comparison
    // when the identity changes we need to compare the bytes again
    Cell previousIndexedKey = null;
    do {
      Cell nextIndexedKey = getNextIndexedKey();
      if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY &&
          (nextIndexedKey == previousIndexedKey ||
          matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0)) {
        this.heap.next();
        ++kvsScanned;
        previousIndexedKey = nextIndexedKey;
      } else {
        return false;
      }
    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell));
    return true;
  }

  /**
   * See {@link org.apache.hadoop.hbase.regionserver.StoreScanner#trySkipToNextRow(Cell)}
   * @param cell current cell
   * @return true means skip to next column, false means not
   */
  @VisibleForTesting
  protected boolean trySkipToNextColumn(Cell cell) throws IOException {
    Cell nextCell = null;
    // used to guard against a changed next indexed key by doing an identity comparison
    // when the identity changes we need to compare the bytes again
    Cell previousIndexedKey = null;
    do {
      Cell nextIndexedKey = getNextIndexedKey();
      if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY &&
          (nextIndexedKey == previousIndexedKey ||
          matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0)) {
        this.heap.next();
        ++kvsScanned;
        previousIndexedKey = nextIndexedKey;
      } else {
        return false;
      }
    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell));
    // We need this check because it may happen that the new scanner that we get
    // during heap.next() requires a reseek due to a fake KV previously generated for
    // the ROWCOL bloom filter optimization. See HBASE-19863 for more details.
    if (nextCell != null && matcher.compareKeyForNextColumn(nextCell, cell) < 0) {
      return false;
    }
    return true;
  }

  @Override
  public long getReadPoint() {
    return this.readPt;
  }

  private static void clearAndClose(List<KeyValueScanner> scanners) {
    if (scanners == null) {
      return;
    }
    for (KeyValueScanner s : scanners) {
      s.close();
    }
    scanners.clear();
  }

  // Implementation of ChangedReadersObserver
  @Override
  public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners)
      throws IOException {
    if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) {
      return;
    }
    boolean updateReaders = false;
    flushLock.lock();
    try {
      if (!closeLock.tryLock()) {
        // The reason for doing this is that when the current store scanner does not retrieve
        // any new cells, then the scanner is considered to be done. The heap of this scanner
        // is not closed till the shipped() call is completed. Hence in that case, if the
        // partial close (close(false)) has already been called before updateReaders(), there
        // is no need for the updateReaders() to happen.
        LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
        // no lock acquired.
        clearAndClose(memStoreScanners);
        return;
      }
      // lock acquired
      updateReaders = true;
      if (this.closing) {
        LOG.debug("StoreScanner already closing. There is no need to updateReaders");
        clearAndClose(memStoreScanners);
        return;
      }
      flushed = true;
      final boolean isCompaction = false;
      boolean usePread = get || scanUsePread;
      // See HBASE-19468, where the flushed files were getting compacted even before a scanner
      // called next(). So it is better to create the scanners here rather than in the next()
      // call. Ensure these scanners are properly closed whether or not the scan completes
      // successfully. Eagerly creating scanners keeps the ref counting ticking on the newly
      // created store files. In the case of stream scanners this eager creation does not induce
      // a performance penalty, because in scans (which use stream scanners) the next() call is
      // bound to happen.
      List<KeyValueScanner> scanners = store.getScanners(sfs, cacheBlocks, get, usePread,
        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false);
      flushedstoreFileScanners.addAll(scanners);
      if (!CollectionUtils.isEmpty(memStoreScanners)) {
        clearAndClose(memStoreScannersAfterFlush);
        memStoreScannersAfterFlush.addAll(memStoreScanners);
      }
    } finally {
      flushLock.unlock();
      if (updateReaders) {
        closeLock.unlock();
      }
    }
    // Let the next() call handle re-creating and seeking
  }

  /**
   * @return true if the top of the heap has changed (and KeyValueHeap has to try the next KV)
   */
  protected final boolean reopenAfterFlush() throws IOException {
    // here we can make sure that we have a Store instance so no null check on store.
    Cell lastTop = heap.peek();
    // When we have the scan object, should we not pass it to getScanners() to get a limited set of
    // scanners? We did so in the constructor and we could have done it now by storing the scan
    // object from the constructor
    List<KeyValueScanner> scanners;
    flushLock.lock();
    try {
      List<KeyValueScanner> allScanners =
          new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());
      allScanners.addAll(flushedstoreFileScanners);
      allScanners.addAll(memStoreScannersAfterFlush);
      scanners = selectScannersFrom(store, allScanners);
      // Clear the current set of flushed store file scanners so that they don't get added again
      flushedstoreFileScanners.clear();
      memStoreScannersAfterFlush.clear();
    } finally {
      flushLock.unlock();
    }

    // Seek the new scanners to the last key
    seekScanners(scanners, lastTop, false, parallelSeekEnabled);
    // remove the older memstore scanner
    for (int i = currentScanners.size() - 1; i >= 0; i--) {
      if (!currentScanners.get(i).isFileScanner()) {
        scannersForDelayedClose.add(currentScanners.remove(i));
      } else {
        // we add the memstore scanners to the end of currentScanners, so we can stop at the
        // first file scanner
        break;
      }
    }
    // add the newly created scanners on the flushed files and the current active memstore scanner
    addCurrentScanners(scanners);
    // Combine all seeked scanners with a heap
    resetKVHeap(this.currentScanners, store.getComparator());
    resetQueryMatcher(lastTop);
    if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
      LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString() +
          ", and after = " + heap.peek());
      topChanged = true;
    } else {
      topChanged = false;
    }
    return topChanged;
  }

  private void resetQueryMatcher(Cell lastTopKey) {
    // Reset the state of the Query Matcher and set to top row.
    // Only reset and call setRow if the row changes; avoids confusing the
    // query matcher if scanning intra-row.
    Cell cell = heap.peek();
    if (cell == null) {
      cell = lastTopKey;
    }
    if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {
      this.countPerRow = 0;
      // The setToNewRow will call reset internally
      matcher.setToNewRow(cell);
    }
  }

  /**
   * Check whether the scan is in the expected order.
   * @param prevKV the previous cell returned from the heap
   * @param kv the current cell returned from the heap
   * @param comparator the comparator used to check ordering
   * @throws IOException if an error occurs while checking the order
   */
  protected void checkScanOrder(Cell prevKV, Cell kv,
      CellComparator comparator) throws IOException {
    // Check that the heap gives us KVs in an increasing order.
    assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 : "Key "
        + prevKV + " followed by a smaller key " + kv + " in cf " + store;
  }

  protected boolean seekToNextRow(Cell c) throws IOException {
    return reseek(PrivateCellUtil.createLastOnRow(c));
  }

  /**
   * Do a reseek in a normal StoreScanner (scan forward).
   * @param kv the key to reseek to
   * @return true if scanner has values left, false if end of scanner
   * @throws IOException if the underlying reseek fails
   */
  protected boolean seekAsDirection(Cell kv)
      throws IOException {
    return reseek(kv);
  }

  @Override
  public boolean reseek(Cell kv) throws IOException {
    if (checkFlushed()) {
      reopenAfterFlush();
    }
    if (explicitColumnQuery && lazySeekEnabledGlobally) {
      return heap.requestSeek(kv, true, useRowColBloom);
    }
    return heap.reseek(kv);
  }

  @VisibleForTesting
  void trySwitchToStreamRead() {
    if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing ||
        heap.peek() == null || bytesRead < preadMaxBytes) {
      return;
    }
    LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead,
        this.store.getColumnFamilyName());
    scanUsePread = false;
    Cell lastTop = heap.peek();
    List<KeyValueScanner> memstoreScanners = new ArrayList<>();
    List<KeyValueScanner> scannersToClose = new ArrayList<>();
    for (KeyValueScanner kvs : currentScanners) {
      if (!kvs.isFileScanner()) {
        // collect memstore scanners here
        memstoreScanners.add(kvs);
      } else {
        scannersToClose.add(kvs);
      }
    }
    List<KeyValueScanner> fileScanners = null;
    List<KeyValueScanner> newCurrentScanners;
    KeyValueHeap newHeap;
    try {
      // We must have a store instance here so no null check
      // recreate the scanners on the current file scanners
      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false,
        matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(),
        scan.includeStopRow(), readPt, false);
      if (fileScanners == null) {
        return;
      }
      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
      newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
      newCurrentScanners.addAll(fileScanners);
      newCurrentScanners.addAll(memstoreScanners);
      newHeap = newKVHeap(newCurrentScanners, comparator);
    } catch (Exception e) {
      LOG.warn("failed to switch to stream read", e);
      if (fileScanners != null) {
        fileScanners.forEach(KeyValueScanner::close);
      }
      return;
    }
    currentScanners.clear();
    addCurrentScanners(newCurrentScanners);
    this.heap = newHeap;
    resetQueryMatcher(lastTop);
    scannersToClose.forEach(KeyValueScanner::close);
  }

  protected final boolean checkFlushed() {
    // Check the flag without any lock. Even if we see a stale value here it is still ok to
    // continue, because we will not be resetting the heap but will continue with the referenced
    // memstore's snapshot. For compactions we don't need updateReaders to happen at all, as we
    // continue with the older files anyway.
    if (flushed) {
      // If there is a flush and the current scan is notified on the flush ensure that the
      // scan's heap gets reset and we do a seek on the newly flushed file.
      if (this.closing) {
        return false;
      }
      // reset the flag
      flushed = false;
      return true;
    }
    return false;
  }


  /**
   * Seek storefiles in parallel to optimize IO latency as much as possible
   * @param scanners the list of {@link KeyValueScanner}s to be read from
   * @param kv the KeyValue on which the operation is being requested
   * @throws IOException if a parallel seek fails
   */
  private void parallelSeek(final List<? extends KeyValueScanner>
      scanners, final Cell kv) throws IOException {
    if (scanners.isEmpty()) return;
    int storeFileScannerCount = scanners.size();
    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
    List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount);
    for (KeyValueScanner scanner : scanners) {
      if (scanner instanceof StoreFileScanner) {
        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
          this.readPt, latch);
        executor.submit(seekHandler);
        handlers.add(seekHandler);
      } else {
        scanner.seek(kv);
        latch.countDown();
      }
    }

    try {
      latch.await();
    } catch (InterruptedException ie) {
      throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
    }

    for (ParallelSeekHandler handler : handlers) {
      if (handler.getErr() != null) {
        throw new IOException(handler.getErr());
      }
    }
  }
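
  // Hedged configuration sketch: per the internal constructor above, parallel seeking is only
  // used when the server-side config enables it and the store has more than one store file.
  // Something along these lines in the region server Configuration:
  //
  //   conf.setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, true);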

  /**
   * Used in testing.
   * @return all scanners in no particular order
   */
  @VisibleForTesting
  List<KeyValueScanner> getAllScannersForTesting() {
    List<KeyValueScanner> allScanners = new ArrayList<>();
    KeyValueScanner current = heap.getCurrentForTesting();
    if (current != null)
      allScanners.add(current);
    for (KeyValueScanner scanner : heap.getHeap())
      allScanners.add(scanner);
    return allScanners;
  }

  static void enableLazySeekGlobally(boolean enable) {
    lazySeekEnabledGlobally = enable;
  }

  /**
   * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
   */
  public long getEstimatedNumberOfKvsScanned() {
    return this.kvsScanned;
  }

  @Override
  public Cell getNextIndexedKey() {
    return this.heap.getNextIndexedKey();
  }

  @Override
  public void shipped() throws IOException {
    if (prevCell != null) {
      // Do the copy here so that in case the prevCell ref is pointing to the previous
      // blocks we can safely release those blocks.
      // This applies to blocks that come from the bucket cache, the L1 cache and blocks
      // fetched from HDFS. Copying ensures that we let go of the references to these
      // blocks so that they can be GCed safely (in the case of the bucket cache).
      prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
    }
    matcher.beforeShipped();
    // There won't be further fetches of Cells from these scanners. Just close.
    clearAndClose(scannersForDelayedClose);
    if (this.heap != null) {
      this.heap.shipped();
      // When switching from pread to stream, we will open a new scanner for each store file, but
      // the old scanner may still track the HFileBlocks we have scanned but not yet sent back to
      // the client. If we close the scanner immediately then the HFileBlocks may be messed up by
      // others before we serialize and send them back to the client. The HFileBlocks will be
      // released in the shipped method, so here we also open new scanners and close old scanners
      // in the shipped method.
      // See HBASE-18055 for more details.
      trySwitchToStreamRead();
    }
  }
}