001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashSet;
025import java.util.List;
026import java.util.NavigableSet;
027import java.util.Optional;
028import java.util.Set;
029import java.util.concurrent.CountDownLatch;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.concurrent.locks.ReentrantLock;
032import java.util.function.IntConsumer;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.CellComparator;
036import org.apache.hadoop.hbase.CellUtil;
037import org.apache.hadoop.hbase.DoNotRetryIOException;
038import org.apache.hadoop.hbase.ExtendedCell;
039import org.apache.hadoop.hbase.HBaseInterfaceAudience;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.KeyValue;
042import org.apache.hadoop.hbase.KeyValueUtil;
043import org.apache.hadoop.hbase.PrivateCellUtil;
044import org.apache.hadoop.hbase.PrivateConstants;
045import org.apache.hadoop.hbase.client.IsolationLevel;
046import org.apache.hadoop.hbase.client.Scan;
047import org.apache.hadoop.hbase.conf.ConfigKey;
048import org.apache.hadoop.hbase.executor.ExecutorService;
049import org.apache.hadoop.hbase.filter.Filter;
050import org.apache.hadoop.hbase.ipc.RpcCall;
051import org.apache.hadoop.hbase.ipc.RpcServer;
052import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
053import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
054import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
055import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
056import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
057import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
058import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
059import org.apache.yetus.audience.InterfaceAudience;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
064import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
065
066/**
 * Scanner scans both the memstore and the Store. Coalesces the KeyValue stream into
 * {@code List<KeyValue>} for a single row.
069 * <p>
070 * The implementation is not thread safe. So there will be no race between next and close. The only
071 * exception is updateReaders, it will be called in the memstore flush thread to indicate that there
072 * is a flush.
073 */
074@InterfaceAudience.Private
075public class StoreScanner extends NonReversedNonLazyKeyValueScanner
076  implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class);
  // In unit tests, the store could be null
  protected final HStore store;
  // Cell ordering for the heap; taken from ScanInfo in the constructor.
  private final CellComparator comparator;
  // Decides, cell by cell, whether to include, skip, or seek; set by the concrete constructors.
  private ScanQueryMatcher matcher;
  // Merges the per-scanner streams (memstore + store files) into one sorted stream.
  protected KeyValueHeap heap;
  private boolean cacheBlocks;

  // Cells emitted for the current row; compared against storeLimit/storeOffset below.
  private long countPerRow = 0;
  // Max results per column family (-1 = unlimited); from Scan#getMaxResultsPerColumnFamily.
  private int storeLimit = -1;
  // Cells to skip at the start of each row; from Scan#getRowOffsetPerColumnFamily.
  private int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107)
  private volatile boolean closing = false;
  // True when this scan is a Get (single-row point lookup).
  private final boolean get;
  // True when the caller requested specific columns (numColumns > 0).
  private final boolean explicitColumnQuery;
  // True when row-column Bloom filters should be consulted during seeks.
  private final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  private boolean parallelSeekEnabled = false;
  // Executor for parallel seeks; only assigned when parallelSeekEnabled is set.
  private ExecutorService executor;
  private final Scan scan;
  // Cells older than this timestamp are considered expired (0 for raw scans, i.e. no cutoff).
  private final long oldestUnexpiredTS;
  // Wall-clock time captured once at construction; used for TTL math.
  private final long now;
  private final int minVersions;
  private final long maxRowSize;
  private final long cellsPerHeartbeatCheck;
  // Read metrics, split by whether results came only from the memstore or were mixed.
  long memstoreOnlyReads;
  long mixedReads;

  // 1) Collects all the KVHeap that are eagerly getting closed during the
  // course of a scan
  // 2) Collects the unused memstore scanners. If we close the memstore scanners
  // before sending data to client, the chunk may be reclaimed by other
  // updates and the data will be corrupt.
  private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>();

  // Tracks file paths successfully read (scanners closed) by this store scanner.
  private final Set<Path> filesRead = new HashSet<>();

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not KVs skipped via
   * seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  // Previous cell handed out by the heap; used for scan-order sanity checks (checkScanOrder).
  private ExtendedCell prevCell = null;

  // Once bytesRead exceeds this, a DEFAULT-read-type scan asks to switch from pread to stream.
  private final long preadMaxBytes;
  private long bytesRead;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
    "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;

  /**
   * The number of cells scanned in between timeout checks. Specifying a larger value means that
   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
   * timeout checks.
   */
  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
    ConfigKey.LONG("hbase.cells.scanned.per.heartbeat.check");

  /**
   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
   */
  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

  /**
   * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we scanned
   * reaches this limit, we will reopen the scanner with stream. The default value is 4 times of
   * block size for this store. If configured with a value <0, for all scans with ReadType DEFAULT,
   * we will open scanner with stream mode itself.
   */
  public static final String STORESCANNER_PREAD_MAX_BYTES =
    ConfigKey.LONG("hbase.storescanner.pread.max.bytes");

  private final Scan.ReadType readType;

  // A flag whether use pread for scan
  // it maybe changed if we use Scan.ReadType.DEFAULT and we have read lots of data.
  private boolean scanUsePread;
  // Indicates whether there was flush during the course of the scan
  private volatile boolean flushed = false;
  // generally we get one file from a flush
  private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1);
  // Since CompactingMemstore is now default, we get three memstore scanners from a flush
  private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
  // The current list of scanners
  final List<KeyValueScanner> currentScanners = new ArrayList<>();
  // flush update lock
  private final ReentrantLock flushLock = new ReentrantLock();
  // lock for closing.
  private final ReentrantLock closeLock = new ReentrantLock();

  protected final long readPt;
  // Reset to false for every cell processed in next(); presumably set when a flush changes the
  // heap top — confirm in reopenAfterFlush (not visible in this chunk).
  private boolean topChanged = false;

  // These are used to verify the state of the scanner during testing.
  private static AtomicBoolean hasUpdatedReaders;
  private static AtomicBoolean hasSwitchedToStreamRead;
182
  /**
   * An internal constructor. Performs the setup common to every kind of scan (user scan,
   * compaction, tests): records the read point, computes the TTL cutoff, resolves the effective
   * read type (pread vs. stream) and decides whether parallel seeking is enabled.
   * @param store       the store to scan; may be null in unit tests and mob compaction
   * @param scan        the scan spec ({@code SCAN_FOR_COMPACTION} for compaction scans)
   * @param scanInfo    immutable per-column-family scan configuration
   * @param numColumns  number of explicitly requested columns, 0 when none
   * @param readPt      MVCC read point this scanner must respect
   * @param cacheBlocks whether blocks read on behalf of this scanner may be cached
   * @param scanType    distinguishes user scans from the compaction scan variants
   */
  private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt,
    boolean cacheBlocks, ScanType scanType) {
    this.readPt = readPt;
    this.store = store;
    this.cacheBlocks = cacheBlocks;
    this.comparator = Preconditions.checkNotNull(scanInfo.getComparator());
    get = scan.isGetScan();
    explicitColumnQuery = numColumns > 0;
    this.scan = scan;
    this.now = EnvironmentEdgeManager.currentTime();
    // Raw scans must return expired cells too, so disable the TTL cutoff (0) for them.
    this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl();
    this.minVersions = scanInfo.getMinVersions();

    // We look up row-column Bloom filters for multi-column queries as part of
    // the seek operation. However, we also look the row-column Bloom filter
    // for multi-row (non-"get") scans because this is not done in
    // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
    // NOTE(review): '&&' binds tighter than '||', so when numColumns > 1 the bloom-type /
    // null-store check is bypassed entirely — confirm this grouping is intended.
    this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1) && (store == null
      || store.getColumnFamilyDescriptor().getBloomFilterType() == BloomType.ROWCOL);
    this.maxRowSize = scanInfo.getTableMaxRowSize();
    this.preadMaxBytes = scanInfo.getPreadMaxBytes();
    if (get) {
      // Gets touch a single row; always use positional read.
      this.readType = Scan.ReadType.PREAD;
      this.scanUsePread = true;
    } else if (scanType != ScanType.USER_SCAN) {
      // For compaction scanners never use Pread as already we have stream based scanners on the
      // store files to be compacted
      this.readType = Scan.ReadType.STREAM;
      this.scanUsePread = false;
    } else {
      if (scan.getReadType() == Scan.ReadType.DEFAULT) {
        if (scanInfo.isUsePread()) {
          this.readType = Scan.ReadType.PREAD;
        } else if (this.preadMaxBytes < 0) {
          // A negative pread byte limit means "always stream" for DEFAULT read type.
          this.readType = Scan.ReadType.STREAM;
        } else {
          this.readType = Scan.ReadType.DEFAULT;
        }
      } else {
        this.readType = scan.getReadType();
      }
      // Always start with pread unless user specific stream. Will change to stream later if
      // readType is default if the scan keeps running for a long time.
      this.scanUsePread = this.readType != Scan.ReadType.STREAM;
    }
    this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
    // Parallel seeking is on if the config allows it and there is more than one store file.
    if (store != null && store.getStorefilesCount() > 1) {
      RegionServerServices rsService = store.getHRegion().getRegionServerServices();
      if (rsService != null && scanInfo.isParallelSeekEnabled()) {
        this.parallelSeekEnabled = true;
        this.executor = rsService.getExecutorService();
      }
    }
  }
239
240  private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
241    this.currentScanners.addAll(scanners);
242  }
243
244  private static boolean isOnlyLatestVersionScan(Scan scan) {
245    // No need to check for Scan#getMaxVersions because live version files generated by store file
246    // writer retains max versions specified in ColumnFamilyDescriptor for the given CF
247    return !scan.isRaw() && scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP;
248  }
249
  /**
   * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we are not in a
   * compaction.
   * @param store   who we scan
   * @param scan    the spec
   * @param columns which columns we are scanning
   * @param readPt  MVCC read point the scan must respect
   * @throws DoNotRetryIOException if columns are specified for a raw scan
   * @throws IOException           if opening or seeking the underlying scanners fails
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
    long readPt) throws IOException {
    this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt, scan.getCacheBlocks(),
      ScanType.USER_SCAN);
    if (columns != null && scan.isRaw()) {
      throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
    }
    matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
      store.getCoprocessorHost());

    // Register for notification when a flush changes the store's readers.
    store.addChangedReaderObserver(this);

    List<KeyValueScanner> scanners = null;
    try {
      // Pass columns to try to filter out unnecessary StoreFiles.
      scanners = selectScannersFrom(store,
        store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
          scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt,
          isOnlyLatestVersionScan(scan)));

      // Seek all scanners to the start of the Row (or if the exact matching row
      // key does not exist, then to the start of the next matching Row).
      // Always check bloom filter to optimize the top row seek for delete
      // family marker.

      seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
        parallelSeekEnabled);

      // set storeLimit
      this.storeLimit = scan.getMaxResultsPerColumnFamily();

      // set rowOffset
      this.storeOffset = scan.getRowOffsetPerColumnFamily();
      addCurrentScanners(scanners);
      // Combine all seeked scanners with a heap
      resetKVHeap(scanners, comparator);
    } catch (IOException e) {
      clearAndClose(scanners, false); // do not track files when closing due to exception
      // remove us from the HStore#changedReaderObservers here or we'll have no chance to
      // and might cause memory leak
      store.deleteChangedReaderObserver(this);
      throw e;
    }
  }
301
302  // a dummy scan instance for compaction.
303  private static final Scan SCAN_FOR_COMPACTION = new Scan();
304
  /**
   * Used for store file compaction and memstore compaction.
   * <p>
   * Opens a scanner across specified StoreFiles/MemStoreSegments.
   * @param store             who we scan
   * @param scanInfo          per-column-family scan configuration
   * @param scanners          ancillary scanners
   * @param scanType          the compaction scan variant (must not be USER_SCAN)
   * @param smallestReadPoint the readPoint that we should use for tracking versions
   * @param earliestPutTs     earliest put timestamp; forwarded to the compaction query matcher
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
    ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
    // No drop-deletes row range: delegate with null bounds.
    this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
  }
317
  /**
   * Used for compactions that drop deletes from a limited range of rows.
   * <p>
   * Opens a scanner across specified StoreFiles.
   * @param store              who we scan
   * @param scanInfo           per-column-family scan configuration
   * @param scanners           ancillary scanners
   * @param smallestReadPoint  the readPoint that we should use for tracking versions
   * @param earliestPutTs      earliest put timestamp; forwarded to the compaction query matcher
   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
   * @param dropDeletesToRow   The exclusive right bound of the range; can be EMPTY_END_ROW.
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
    long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow)
    throws IOException {
    // COMPACT_RETAIN_DELETES with explicit bounds: deletes are dropped only inside the row range.
    this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
      earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
  }
334
335  private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
336    ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
337    byte[] dropDeletesToRow) throws IOException {
338    this(store, SCAN_FOR_COMPACTION, scanInfo, 0,
339      store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType);
340    assert scanType != ScanType.USER_SCAN;
341    matcher =
342      CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs,
343        oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
344
345    // Filter the list of scanners using Bloom filters, time range, TTL, etc.
346    scanners = selectScannersFrom(store, scanners);
347
348    // Seek all scanners to the initial key
349    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
350    addCurrentScanners(scanners);
351    // Combine all seeked scanners with a heap
352    resetKVHeap(scanners, comparator);
353  }
354
  /**
   * Seeks every scanner to the matcher's start key (non-lazy), registers them as the current
   * scanners and combines them into the KeyValueHeap.
   * @param scanInfo currently unused by this method; kept for signature stability
   * @param scanners the scanners to seek and combine
   */
  private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners)
    throws IOException {
    // Seek all scanners to the initial key
    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
    addCurrentScanners(scanners);
    resetKVHeap(scanners, comparator);
  }
362
  // For mob compaction only as we do not have a Store instance when doing mob compaction.
  public StoreScanner(ScanInfo scanInfo, ScanType scanType,
    List<? extends KeyValueScanner> scanners) throws IOException {
    // store == null; Long.MAX_VALUE read point — presumably so MVCC filters nothing during mob
    // compaction (confirm against the MVCC read-point semantics).
    this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
    assert scanType != ScanType.USER_SCAN;
    // Trailing null: no coprocessor host is available without a store.
    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L,
      oldestUnexpiredTS, now, null, null, null);
    seekAllScanner(scanInfo, scanners);
  }
372
  // Used to instantiate a scanner for user scan in test
  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
    List<? extends KeyValueScanner> scanners, ScanType scanType) throws IOException {
    // 0 is passed as readpoint because the test bypasses Store
    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
      scanType);
    if (scanType == ScanType.USER_SCAN) {
      this.matcher =
        UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
    } else {
      // Non-user scan types get a compaction matcher with wide-open read point / oldest timestamp.
      this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
        PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
    }
    seekAllScanner(scanInfo, scanners);
  }
388
  // Used to instantiate a scanner for user scan in test
  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
    List<? extends KeyValueScanner> scanners) throws IOException {
    // 0 is passed as readpoint because the test bypasses Store
    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
      ScanType.USER_SCAN);
    // No coprocessor host in tests, hence the trailing null.
    this.matcher =
      UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
    seekAllScanner(scanInfo, scanners);
  }
399
  // Used to instantiate a scanner for compaction in test
  StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType,
    List<? extends KeyValueScanner> scanners) throws IOException {
    // 0 is passed as readpoint because the test bypasses Store
    // maxVersions > 0 lets the test cap versions; otherwise the shared compaction Scan is used.
    this(null, maxVersions > 0 ? new Scan().readVersions(maxVersions) : SCAN_FOR_COMPACTION,
      scanInfo, 0, 0L, false, scanType);
    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
      PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
    seekAllScanner(scanInfo, scanners);
  }
410
411  boolean isScanUsePread() {
412    return this.scanUsePread;
413  }
414
415  /**
416   * Seek the specified scanners with the given key
417   * @param isLazy         true if using lazy seek
418   * @param isParallelSeek true if using parallel seek
419   */
420  protected void seekScanners(List<? extends KeyValueScanner> scanners, ExtendedCell seekKey,
421    boolean isLazy, boolean isParallelSeek) throws IOException {
422    // Seek all scanners to the start of the Row (or if the exact matching row
423    // key does not exist, then to the start of the next matching Row).
424    // Always check bloom filter to optimize the top row seek for delete
425    // family marker.
426    if (isLazy) {
427      for (KeyValueScanner scanner : scanners) {
428        scanner.requestSeek(seekKey, false, true);
429      }
430    } else {
431      if (!isParallelSeek) {
432        long totalScannersSoughtBytes = 0;
433        for (KeyValueScanner scanner : scanners) {
434          if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) {
435            throw new RowTooBigException(
436              "Max row size allowed: " + maxRowSize + ", but row is bigger than that");
437          }
438          scanner.seek(seekKey);
439          Cell c = scanner.peek();
440          if (c != null) {
441            totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c);
442          }
443        }
444      } else {
445        parallelSeek(scanners, seekKey);
446      }
447    }
448  }
449
  /**
   * (Re)creates the KeyValueHeap over the given already-seeked scanners; the heap construction
   * itself is delegated to the overridable {@link #newKVHeap}.
   */
  protected void resetKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
    throws IOException {
    // Combine all seeked scanners with a heap
    heap = newKVHeap(scanners, comparator);
  }
455
  /**
   * Creates the KeyValueHeap merging the given scanners; protected so subclasses/tests can supply
   * a different heap implementation.
   */
  protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners,
    CellComparator comparator) throws IOException {
    return new KeyValueHeap(scanners, comparator);
  }
460
461  /**
462   * Filters the given list of scanners using Bloom filter, time range, and TTL.
463   * <p>
464   * Will be overridden by testcase so declared as protected.
465   */
466  protected List<KeyValueScanner> selectScannersFrom(HStore store,
467    List<? extends KeyValueScanner> allScanners) {
468    boolean memOnly;
469    boolean filesOnly;
470    if (scan instanceof InternalScan) {
471      InternalScan iscan = (InternalScan) scan;
472      memOnly = iscan.isCheckOnlyMemStore();
473      filesOnly = iscan.isCheckOnlyStoreFiles();
474    } else {
475      memOnly = false;
476      filesOnly = false;
477    }
478    List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size());
479
480    // We can only exclude store files based on TTL if minVersions is set to 0.
481    // Otherwise, we might have to return KVs that have technically expired.
482    long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE;
483
484    // include only those scan files which pass all filters
485    for (KeyValueScanner kvs : allScanners) {
486      boolean isFile = kvs.isFileScanner();
487      if ((!isFile && filesOnly) || (isFile && memOnly)) {
488        kvs.close();
489        filesRead.addAll(kvs.getFilesRead());
490        continue;
491      }
492
493      if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
494        scanners.add(kvs);
495      } else {
496        kvs.close();
497        filesRead.addAll(kvs.getFilesRead());
498      }
499    }
500    return scanners;
501  }
502
503  @Override
504  public ExtendedCell peek() {
505    return heap != null ? heap.peek() : null;
506  }
507
508  @Override
509  public KeyValue next() {
510    // throw runtime exception perhaps?
511    throw new RuntimeException("Never call StoreScanner.next()");
512  }
513
  /** Fully closes the scanner: releases the heap and all deferred/flushed scanners. */
  @Override
  public void close() {
    close(true);
  }
518
  /**
   * Closes the scanner.
   * @param withDelayedScannersClose true for a real close: the delayed-close, post-flush memstore
   *                                 and flushed-file scanners and the heap are all released and
   *                                 {@code closing} is set. False for a partial close: the heap is
   *                                 parked in scannersForDelayedClose so memstore data already
   *                                 handed to the client is not reclaimed underneath it.
   */
  private void close(boolean withDelayedScannersClose) {
    closeLock.lock();
    // If the closeLock is acquired then any subsequent updateReaders()
    // call is ignored.
    try {
      // Idempotent: a second close after a full close is a no-op.
      if (this.closing) {
        return;
      }
      if (withDelayedScannersClose) {
        this.closing = true;
      }
      // For mob compaction, we do not have a store.
      if (this.store != null) {
        this.store.deleteChangedReaderObserver(this);
      }
      if (withDelayedScannersClose) {
        clearAndClose(scannersForDelayedClose);
        clearAndClose(memStoreScannersAfterFlush);
        clearAndClose(flushedstoreFileScanners);
        if (this.heap != null) {
          this.heap.close();
          // Record the files this heap finished reading before discarding it.
          this.filesRead.addAll(this.heap.getFilesRead());
          this.currentScanners.clear();
          this.heap = null; // CLOSED!
        }
      } else {
        if (this.heap != null) {
          // Partial close: defer the heap close (see scannersForDelayedClose field comment).
          this.scannersForDelayedClose.add(this.heap);
          this.currentScanners.clear();
          this.heap = null;
        }
      }
    } finally {
      closeLock.unlock();
    }
  }
555
  /**
   * Seeks the heap to the given key, first reopening the underlying scanners if a flush occurred
   * since the last operation.
   * @return the result of the heap seek (per KeyValueHeap#seek semantics)
   */
  @Override
  public boolean seek(ExtendedCell key) throws IOException {
    if (checkFlushed()) {
      reopenAfterFlush();
    }
    return this.heap.seek(key);
  }
563
564  /**
565   * Get the next row of values from this Store.
566   * @return true if there are more rows, false if scanner is done
567   */
568  @Override
569  public boolean next(List<? super ExtendedCell> outResult, ScannerContext scannerContext)
570    throws IOException {
571    if (scannerContext == null) {
572      throw new IllegalArgumentException("Scanner context cannot be null");
573    }
574    if (checkFlushed() && reopenAfterFlush()) {
575      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
576    }
577
578    // if the heap was left null, then the scanners had previously run out anyways, close and
579    // return.
580    if (this.heap == null) {
581      // By this time partial close should happened because already heap is null
582      close(false);// Do all cleanup except heap.close()
583      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
584    }
585
586    ExtendedCell cell = this.heap.peek();
587    if (cell == null) {
588      close(false);// Do all cleanup except heap.close()
589      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
590    }
591
592    // only call setRow if the row changes; avoids confusing the query matcher
593    // if scanning intra-row
594
595    // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
596    // rows. Else it is possible we are still traversing the same row so we must perform the row
597    // comparison.
598    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) {
599      this.countPerRow = 0;
600      matcher.setToNewRow(cell);
601    }
602
603    // Clear progress away unless invoker has indicated it should be kept.
604    if (!scannerContext.getKeepProgress() && !scannerContext.getSkippingRow()) {
605      scannerContext.clearProgress();
606    }
607
608    Optional<RpcCall> rpcCall =
609      matcher.isUserScan() ? RpcServer.getCurrentCall() : Optional.empty();
610    // re-useable closure to avoid allocations
611    IntConsumer recordBlockSize = blockSize -> {
612      if (rpcCall.isPresent()) {
613        rpcCall.get().incrementBlockBytesScanned(blockSize);
614      }
615      scannerContext.incrementBlockProgress(blockSize);
616    };
617
618    int count = 0;
619    long totalBytesRead = 0;
620    // track the cells for metrics only if it is a user read request.
621    boolean onlyFromMemstore = matcher.isUserScan();
622    try {
623      LOOP: do {
624        // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
625        // Or if the preadMaxBytes is reached and we may want to return so we can switch to stream
626        // in
627        // the shipped method below.
628        if (
629          kvsScanned % cellsPerHeartbeatCheck == 0
630            || (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)
631        ) {
632          if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
633            return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
634          }
635        }
636        // Do object compare - we set prevKV from the same heap.
637        if (prevCell != cell) {
638          ++kvsScanned;
639        }
640        checkScanOrder(prevCell, cell, comparator);
641        int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
642        bytesRead += cellSize;
643        if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) {
644          // return immediately if we want to switch from pread to stream. We need this because we
645          // can
646          // only switch in the shipped method, if user use a filter to filter out everything and
647          // rpc
648          // timeout is very large then the shipped method will never be called until the whole scan
649          // is finished, but at that time we have already scan all the data...
650          // See HBASE-20457 for more details.
651          // And there is still a scenario that can not be handled. If we have a very large row,
652          // which
653          // have millions of qualifiers, and filter.filterRow is used, then even if we set the flag
654          // here, we still need to scan all the qualifiers before returning...
655          scannerContext.returnImmediately();
656        }
657
658        heap.recordBlockSize(recordBlockSize);
659
660        prevCell = cell;
661        scannerContext.setLastPeekedCell(cell);
662        topChanged = false;
663        ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
664        switch (qcode) {
665          case INCLUDE:
666          case INCLUDE_AND_SEEK_NEXT_ROW:
667          case INCLUDE_AND_SEEK_NEXT_COL:
668            Filter f = matcher.getFilter();
669            if (f != null) {
670              Cell transformedCell = f.transformCell(cell);
671              // fast path, most filters just return the same cell instance
672              if (transformedCell != cell) {
673                if (transformedCell instanceof ExtendedCell) {
674                  cell = (ExtendedCell) transformedCell;
675                } else {
676                  throw new DoNotRetryIOException("Incorrect filter implementation, "
677                    + "the Cell returned by transformCell is not an ExtendedCell. Filter class: "
678                    + f.getClass().getName());
679                }
680              }
681            }
682            this.countPerRow++;
683
684            // add to results only if we have skipped #storeOffset kvs
685            // also update metric accordingly
686            if (this.countPerRow > storeOffset) {
687              outResult.add(cell);
688
689              // Update local tracking information
690              count++;
691              totalBytesRead += cellSize;
692
693              /**
694               * Increment the metric if all the cells are from memstore. If not we will account it
695               * for mixed reads
696               */
697              onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore();
698              // Update the progress of the scanner context
699              scannerContext.incrementSizeProgress(cellSize, cell.heapSize());
700              scannerContext.incrementBatchProgress(1);
701
702              if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
703                String message = "Max row size allowed: " + maxRowSize
704                  + ", but the row is bigger than that, the row info: "
705                  + CellUtil.toString(cell, false) + ", already have process row cells = "
706                  + outResult.size() + ", it belong to region = "
707                  + store.getHRegion().getRegionInfo().getRegionNameAsString();
708                LOG.warn(message);
709                throw new RowTooBigException(message);
710              }
711
712              if (storeLimit > -1 && this.countPerRow >= (storeLimit + storeOffset)) {
713                // do what SEEK_NEXT_ROW does.
714                if (!matcher.moreRowsMayExistAfter(cell)) {
715                  close(false);// Do all cleanup except heap.close()
716                  return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
717                }
718                matcher.clearCurrentRow();
719                seekToNextRow(cell);
720                break LOOP;
721              }
722            }
723
724            if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
725              if (!matcher.moreRowsMayExistAfter(cell)) {
726                close(false);// Do all cleanup except heap.close()
727                return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
728              }
729              matcher.clearCurrentRow();
730              seekOrSkipToNextRow(cell);
731            } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
732              seekOrSkipToNextColumn(cell);
733            } else {
734              this.heap.next();
735            }
736
737            if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
738              break LOOP;
739            }
740            if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
741              break LOOP;
742            }
743            continue;
744
745          case DONE:
746            // Optimization for Gets! If DONE, no more to get on this row, early exit!
747            if (get) {
748              // Then no more to this row... exit.
749              close(false);// Do all cleanup except heap.close()
750              // update metric
751              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
752            }
753            matcher.clearCurrentRow();
754            return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
755
756          case DONE_SCAN:
757            close(false);// Do all cleanup except heap.close()
758            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
759
760          case SEEK_NEXT_ROW:
761            // This is just a relatively simple end of scan fix, to short-cut end
762            // us if there is an endKey in the scan.
763            if (!matcher.moreRowsMayExistAfter(cell)) {
764              close(false);// Do all cleanup except heap.close()
765              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
766            }
767            matcher.clearCurrentRow();
768            seekOrSkipToNextRow(cell);
769            NextState stateAfterSeekNextRow = needToReturn();
770            if (stateAfterSeekNextRow != null) {
771              return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
772            }
773            break;
774
775          case SEEK_NEXT_COL:
776            seekOrSkipToNextColumn(cell);
777            NextState stateAfterSeekNextColumn = needToReturn();
778            if (stateAfterSeekNextColumn != null) {
779              return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
780            }
781            break;
782
783          case SKIP:
784            this.heap.next();
785            break;
786
787          case SEEK_NEXT_USING_HINT:
788            ExtendedCell nextKV = matcher.getNextKeyHint(cell);
789            if (nextKV != null) {
790              int difference = comparator.compare(nextKV, cell);
791              if (
792                ((!scan.isReversed() && difference > 0) || (scan.isReversed() && difference < 0))
793              ) {
794                seekAsDirection(nextKV);
795                NextState stateAfterSeekByHint = needToReturn();
796                if (stateAfterSeekByHint != null) {
797                  return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
798                }
799                break;
800              }
801            }
802            heap.next();
803            break;
804
805          default:
806            throw new RuntimeException("UNEXPECTED");
807        }
808
809        // One last chance to break due to size limit. The INCLUDE* cases above already check
810        // limit and continue. For the various filtered cases, we need to check because block
811        // size limit may have been exceeded even if we don't add cells to result list.
812        if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
813          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
814        }
815      } while ((cell = this.heap.peek()) != null);
816
817      if (count > 0) {
818        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
819      }
820
821      // No more keys
822      close(false);// Do all cleanup except heap.close()
823      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
824    } finally {
825      // increment only if we have some result
826      if (count > 0 && matcher.isUserScan()) {
827        // if true increment memstore metrics, if not the mixed one
828        updateMetricsStore(onlyFromMemstore);
829      }
830    }
831  }
832
833  private void updateMetricsStore(boolean memstoreRead) {
834    if (store != null) {
835      store.updateMetricsStore(memstoreRead);
836    } else {
837      // for testing.
838      if (memstoreRead) {
839        memstoreOnlyReads++;
840      } else {
841        mixedReads++;
842      }
843    }
844  }
845
846  /**
847   * If the top cell won't be flushed into disk, the new top cell may be changed after
848   * #reopenAfterFlush. Because the older top cell only exist in the memstore scanner but the
849   * memstore scanner is replaced by hfile scanner after #reopenAfterFlush. If the row of top cell
850   * is changed, we should return the current cells. Otherwise, we may return the cells across
851   * different rows.
852   * @return null is the top cell doesn't change. Otherwise, the NextState to return
853   */
854  private NextState needToReturn() {
855    if (topChanged) {
856      return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
857    }
858    return null;
859  }
860
861  private void seekOrSkipToNextRow(ExtendedCell cell) throws IOException {
862    // If it is a Get Scan, then we know that we are done with this row; there are no more
863    // rows beyond the current one: don't try to optimize.
864    if (!get) {
865      if (trySkipToNextRow(cell)) {
866        return;
867      }
868    }
869    seekToNextRow(cell);
870  }
871
872  private void seekOrSkipToNextColumn(ExtendedCell cell) throws IOException {
873    if (!trySkipToNextColumn(cell)) {
874      seekAsDirection(matcher.getKeyForNextColumn(cell));
875    }
876  }
877
878  /**
879   * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).
880   * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row, or seek to an
881   * arbitrary seek key. This method decides whether a seek is the most efficient _actual_ way to
882   * get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, SKIP inside the
883   * current, loaded block). It does this by looking at the next indexed key of the current HFile.
884   * This key is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible
885   * key on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work
886   * with the current Cell but compare as though it were a seek key; see down in
887   * matcher.compareKeyForNextRow, etc). If the compare gets us onto the next block we *_SEEK,
888   * otherwise we just SKIP to the next requested cell.
889   * <p>
890   * Other notes:
891   * <ul>
892   * <li>Rows can straddle block boundaries</li>
893   * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a
894   * different block than column C1 at T2)</li>
895   * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a few
896   * SKIPs...</li>
897   * <li>We want to SEEK when the chance is high that we'll be able to seek past many Cells,
898   * especially if we know we need to go to the next block.</li>
899   * </ul>
900   * <p>
901   * A good proxy (best effort) to determine whether SKIP is better than SEEK is whether we'll
902   * likely end up seeking to the next block (or past the next block) to get our next column.
903   * Example:
904   *
905   * <pre>
906   * |    BLOCK 1              |     BLOCK 2                   |
907   * |  r1/c1, r1/c2, r1/c3    |    r1/c4, r1/c5, r2/c1        |
908   *                                   ^         ^
909   *                                   |         |
910   *                           Next Index Key   SEEK_NEXT_ROW (before r2/c1)
911   *
912   *
913   * |    BLOCK 1                       |     BLOCK 2                      |
914   * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |    r1/c1/t2, r1/c1/T1, r1/c2/T3  |
915   *                                            ^              ^
916   *                                            |              |
917   *                                    Next Index Key        SEEK_NEXT_COL
918   * </pre>
919   *
920   * Now imagine we want columns c1 and c3 (see first diagram above), the 'Next Index Key' of r1/c4
921   * is > r1/c3 so we should seek to get to the c1 on the next row, r2. In second case, say we only
922   * want one version of c1, after we have it, a SEEK_COL will be issued to get to c2. Looking at
923   * the 'Next Index Key', it would land us in the next block, so we should SEEK. In other scenarios
924   * where the SEEK will not land us in the next block, it is very likely better to issues a series
925   * of SKIPs.
926   * @param cell current cell
927   * @return true means skip to next row, false means not
928   */
929  protected boolean trySkipToNextRow(ExtendedCell cell) throws IOException {
930    ExtendedCell nextCell = null;
931    // used to guard against a changed next indexed key by doing a identity comparison
932    // when the identity changes we need to compare the bytes again
933    ExtendedCell previousIndexedKey = null;
934    do {
935      ExtendedCell nextIndexedKey = getNextIndexedKey();
936      if (
937        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
938          && (nextIndexedKey == previousIndexedKey
939            || matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0)
940      ) {
941        this.heap.next();
942        ++kvsScanned;
943        previousIndexedKey = nextIndexedKey;
944      } else {
945        return false;
946      }
947    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell));
948    return true;
949  }
950
951  /**
952   * See {@link #trySkipToNextRow(ExtendedCell)}
953   * @param cell current cell
954   * @return true means skip to next column, false means not
955   */
956  protected boolean trySkipToNextColumn(ExtendedCell cell) throws IOException {
957    ExtendedCell nextCell = null;
958    // used to guard against a changed next indexed key by doing a identity comparison
959    // when the identity changes we need to compare the bytes again
960    ExtendedCell previousIndexedKey = null;
961    do {
962      ExtendedCell nextIndexedKey = getNextIndexedKey();
963      if (
964        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
965          && (nextIndexedKey == previousIndexedKey
966            || matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0)
967      ) {
968        this.heap.next();
969        ++kvsScanned;
970        previousIndexedKey = nextIndexedKey;
971      } else {
972        return false;
973      }
974    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell));
975    // We need this check because it may happen that the new scanner that we get
976    // during heap.next() is requiring reseek due of fake KV previously generated for
977    // ROWCOL bloom filter optimization. See HBASE-19863 and HBASE-29907 for more details
978    if (useRowColBloom && nextCell != null && matcher.compareKeyForNextColumn(nextCell, cell) < 0) {
979      return false;
980    }
981    return true;
982  }
983
984  @Override
985  public long getReadPoint() {
986    return this.readPt;
987  }
988
989  private void clearAndClose(List<KeyValueScanner> scanners) {
990    clearAndClose(scanners, true);
991  }
992
993  private void clearAndClose(List<KeyValueScanner> scanners, boolean trackFiles) {
994    if (scanners == null) {
995      return;
996    }
997    for (KeyValueScanner s : scanners) {
998      s.close();
999      if (trackFiles) {
1000        this.filesRead.addAll(s.getFilesRead());
1001      }
1002    }
1003    scanners.clear();
1004  }
1005
  // Implementation of ChangedReadersObserver
  /**
   * Notification that the store's readers changed: a flush produced new store files and/or
   * swapped in memstore snapshot scanners. Eagerly opens scanners on the flushed files under
   * {@code flushLock} (so ref counting on them starts right away), but leaves heap re-creation
   * and seeking to the next {@code next()} call.
   * @param sfs              newly flushed store files; may be empty
   * @param memStoreScanners scanners over the memstore snapshot; may be empty
   */
  @Override
  public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners)
    throws IOException {
    if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) {
      return;
    }
    boolean updateReaders = false;
    flushLock.lock();
    try {
      if (!closeLock.tryLock()) {
        // The reason for doing this is that when the current store scanner does not retrieve
        // any new cells, then the scanner is considered to be done. The heap of this scanner
        // is not closed till the shipped() call is completed. Hence in that case if at all
        // the partial close (close (false)) has been called before updateReaders(), there is no
        // need for the updateReaders() to happen.
        LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
        // no lock acquired.
        clearAndClose(memStoreScanners);
        return;
      }
      // lock acquired
      updateReaders = true;
      if (this.closing) {
        LOG.debug("StoreScanner already closing. There is no need to updateReaders");
        clearAndClose(memStoreScanners);
        return;
      }
      // Mark so the next reseek/next() knows to rebuild the heap (see checkFlushed()).
      flushed = true;
      final boolean isCompaction = false;
      boolean usePread = get || scanUsePread;
      // SEE HBASE-19468 where the flushed files are getting compacted even before a scanner
      // calls next(). So its better we create scanners here rather than next() call. Ensure
      // these scanners are properly closed() whether or not the scan is completed successfully
      // Eagerly creating scanners so that we have the ref counting ticking on the newly created
      // store files. In case of stream scanners this eager creation does not induce performance
      // penalty because in scans (that uses stream scanners) the next() call is bound to happen.
      List<KeyValueScanner> scanners =
        store.getScanners(sfs, cacheBlocks, get, usePread, isCompaction, matcher,
          scan.getStartRow(), scan.getStopRow(), this.readPt, false, isOnlyLatestVersionScan(scan));
      flushedstoreFileScanners.addAll(scanners);
      if (!CollectionUtils.isEmpty(memStoreScanners)) {
        // Replace any previously tracked snapshot scanners with the latest set.
        clearAndClose(memStoreScannersAfterFlush);
        memStoreScannersAfterFlush.addAll(memStoreScanners);
      }
    } finally {
      flushLock.unlock();
      // Only release closeLock if tryLock() above succeeded.
      if (updateReaders) {
        closeLock.unlock();
      }
      // Test-only instrumentation hook (see instrument()).
      if (hasUpdatedReaders != null) {
        hasUpdatedReaders.set(true);
      }
    }
    // Let the next() call handle re-creating and seeking
  }
1062
  /**
   * Rebuilds the KeyValueHeap after a flush: takes the scanners that updateReaders() eagerly
   * created (flushed store files plus memstore snapshot), seeks them to the previous top key,
   * drops the old memstore scanners, and resets the query matcher.
   * @return if top of heap has changed (and KeyValueHeap has to try the next KV)
   */
  protected final boolean reopenAfterFlush() throws IOException {
    // here we can make sure that we have a Store instance so no null check on store.
    ExtendedCell lastTop = heap.peek();
    // When we have the scan object, should we not pass it to getScanners() to get a limited set of
    // scanners? We did so in the constructor and we could have done it now by storing the scan
    // object from the constructor
    List<KeyValueScanner> scanners;
    flushLock.lock();
    try {
      List<KeyValueScanner> allScanners =
        new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());
      allScanners.addAll(flushedstoreFileScanners);
      allScanners.addAll(memStoreScannersAfterFlush);
      scanners = selectScannersFrom(store, allScanners);
      // Clear the current set of flushed store files scanners so that they don't get added again
      flushedstoreFileScanners.clear();
      memStoreScannersAfterFlush.clear();
    } finally {
      flushLock.unlock();
    }

    // Seek the new scanners to the last key
    seekScanners(scanners, lastTop, false, parallelSeekEnabled);
    // remove the older memstore scanner
    for (int i = currentScanners.size() - 1; i >= 0; i--) {
      if (!currentScanners.get(i).isFileScanner()) {
        scannersForDelayedClose.add(currentScanners.remove(i));
      } else {
        // we add the memstore scanner to the end of currentScanners
        break;
      }
    }
    // add the newly created scanners on the flushed files and the current active memstore scanner
    addCurrentScanners(scanners);
    // Combine all seeked scanners with a heap
    resetKVHeap(this.currentScanners, store.getComparator());
    resetQueryMatcher(lastTop);
    // If the new top is on a different row (or gone), callers must return current cells now —
    // see needToReturn().
    if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
      LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString()
        + ",and after = " + heap.peek());
      topChanged = true;
    } else {
      topChanged = false;
    }
    return topChanged;
  }
1110
1111  private void resetQueryMatcher(ExtendedCell lastTopKey) {
1112    // Reset the state of the Query Matcher and set to top row.
1113    // Only reset and call setRow if the row changes; avoids confusing the
1114    // query matcher if scanning intra-row.
1115    ExtendedCell cell = heap.peek();
1116    if (cell == null) {
1117      cell = lastTopKey;
1118    }
1119    if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {
1120      this.countPerRow = 0;
1121      // The setToNewRow will call reset internally
1122      matcher.setToNewRow(cell);
1123    }
1124  }
1125
1126  /**
1127   * Check whether scan as expected order
1128   */
1129  protected void checkScanOrder(Cell prevKV, Cell kv, CellComparator comparator)
1130    throws IOException {
1131    // Check that the heap gives us KVs in an increasing order.
1132    assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0
1133      : "Key " + prevKV + " followed by a smaller key " + kv + " in cf " + store;
1134  }
1135
1136  protected boolean seekToNextRow(ExtendedCell c) throws IOException {
1137    return reseek(PrivateCellUtil.createLastOnRow(c));
1138  }
1139
1140  /**
1141   * Do a reseek in a normal StoreScanner(scan forward)
1142   * @return true if scanner has values left, false if end of scanner
1143   */
1144  protected boolean seekAsDirection(ExtendedCell kv) throws IOException {
1145    return reseek(kv);
1146  }
1147
1148  @Override
1149  public boolean reseek(ExtendedCell kv) throws IOException {
1150    if (checkFlushed()) {
1151      reopenAfterFlush();
1152    }
1153    if (explicitColumnQuery && lazySeekEnabledGlobally) {
1154      return heap.requestSeek(kv, true, useRowColBloom);
1155    }
1156    return heap.reseek(kv);
1157  }
1158
  /**
   * Switches this scanner from pread to stream read once {@code bytesRead} passes
   * {@code preadMaxBytes} (only for DEFAULT read type). Store-file scanners are re-created in
   * stream mode and swapped into a fresh heap; memstore scanners are reused as-is. On any
   * failure the switch is abandoned and the scanner stays in pread mode.
   */
  void trySwitchToStreamRead() {
    // Bail out unless: DEFAULT read type, currently on pread, not closing, heap non-empty,
    // and we have read enough bytes to justify the switch.
    if (
      readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null
        || bytesRead < preadMaxBytes
    ) {
      return;
    }
    LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead,
      this.store.getColumnFamilyName());
    scanUsePread = false;
    ExtendedCell lastTop = heap.peek();
    List<KeyValueScanner> memstoreScanners = new ArrayList<>();
    List<KeyValueScanner> scannersToClose = new ArrayList<>();
    for (KeyValueScanner kvs : currentScanners) {
      if (!kvs.isFileScanner()) {
        // collect memstorescanners here
        memstoreScanners.add(kvs);
      } else {
        scannersToClose.add(kvs);
      }
    }
    List<KeyValueScanner> fileScanners = null;
    List<KeyValueScanner> newCurrentScanners;
    KeyValueHeap newHeap;
    try {
      // We must have a store instance here so no null check
      // recreate the scanners on the current file scanners
      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, matcher,
        scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
        readPt, false);
      if (fileScanners == null) {
        return;
      }
      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
      newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
      newCurrentScanners.addAll(fileScanners);
      newCurrentScanners.addAll(memstoreScanners);
      newHeap = newKVHeap(newCurrentScanners, comparator);
    } catch (Exception e) {
      // Best-effort switch: log, clean up anything we created, and keep the old heap.
      LOG.warn("failed to switch to stream read", e);
      if (fileScanners != null) {
        fileScanners.forEach(KeyValueScanner::close);
      }
      return;
    }
    // Swap in the new scanners/heap only after everything above succeeded.
    currentScanners.clear();
    addCurrentScanners(newCurrentScanners);
    this.heap = newHeap;
    resetQueryMatcher(lastTop);
    for (KeyValueScanner scanner : scannersToClose) {
      scanner.close();
      this.filesRead.addAll(scanner.getFilesRead());
    }
    // Test-only instrumentation hook (see instrument()).
    if (hasSwitchedToStreamRead != null) {
      hasSwitchedToStreamRead.set(true);
    }
  }
1216
1217  protected final boolean checkFlushed() {
1218    // check the var without any lock. Suppose even if we see the old
1219    // value here still it is ok to continue because we will not be resetting
1220    // the heap but will continue with the referenced memstore's snapshot. For compactions
1221    // any way we don't need the updateReaders at all to happen as we still continue with
1222    // the older files
1223    if (flushed) {
1224      // If there is a flush and the current scan is notified on the flush ensure that the
1225      // scan's heap gets reset and we do a seek on the newly flushed file.
1226      if (this.closing) {
1227        return false;
1228      }
1229      // reset the flag
1230      flushed = false;
1231      return true;
1232    }
1233    return false;
1234  }
1235
1236  /**
1237   * Seek storefiles in parallel to optimize IO latency as much as possible
1238   * @param scanners the list {@link KeyValueScanner}s to be read from
1239   * @param kv       the KeyValue on which the operation is being requested
1240   */
1241  private void parallelSeek(final List<? extends KeyValueScanner> scanners, final ExtendedCell kv)
1242    throws IOException {
1243    if (scanners.isEmpty()) return;
1244    int storeFileScannerCount = scanners.size();
1245    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
1246    List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount);
1247    for (KeyValueScanner scanner : scanners) {
1248      if (scanner instanceof StoreFileScanner) {
1249        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, this.readPt, latch);
1250        executor.submit(seekHandler);
1251        handlers.add(seekHandler);
1252      } else {
1253        scanner.seek(kv);
1254        latch.countDown();
1255      }
1256    }
1257
1258    try {
1259      latch.await();
1260    } catch (InterruptedException ie) {
1261      throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
1262    }
1263
1264    for (ParallelSeekHandler handler : handlers) {
1265      if (handler.getErr() != null) {
1266        throw new IOException(handler.getErr());
1267      }
1268    }
1269  }
1270
1271  /**
1272   * Used in testing.
1273   * @return all scanners in no particular order
1274   */
1275  List<KeyValueScanner> getAllScannersForTesting() {
1276    List<KeyValueScanner> allScanners = new ArrayList<>();
1277    KeyValueScanner current = heap.getCurrentForTesting();
1278    if (current != null) allScanners.add(current);
1279    for (KeyValueScanner scanner : heap.getHeap())
1280      allScanners.add(scanner);
1281    return allScanners;
1282  }
1283
  /** Test hook: toggles the global lazy-seek optimization used by {@code reseek}. */
  static void enableLazySeekGlobally(boolean enable) {
    lazySeekEnabledGlobally = enable;
  }
1287
1288  /** Returns The estimated number of KVs seen by this scanner (includes some skipped KVs). */
1289  public long getEstimatedNumberOfKvsScanned() {
1290    return this.kvsScanned;
1291  }
1292
1293  /**
1294   * Returns the set of store file paths that were successfully read by this scanner. Populated at
1295   * close from the key-value heap and any closed child scanners.
1296   */
1297  @Override
1298  public Set<Path> getFilesRead() {
1299    return Collections.unmodifiableSet(filesRead);
1300  }
1301
1302  @Override
1303  public ExtendedCell getNextIndexedKey() {
1304    return this.heap.getNextIndexedKey();
1305  }
1306
  /**
   * Called once the current batch of cells has been shipped to the client. Copies
   * {@code prevCell} off its backing block so the block can be released, closes scanners queued
   * for delayed close, and lets the heap release shipped blocks (possibly switching to stream
   * read afterwards).
   */
  @Override
  public void shipped() throws IOException {
    if (prevCell != null) {
      // Do the copy here so that in case the prevCell ref is pointing to the previous
      // blocks we can safely release those blocks.
      // This applies to blocks that are got from Bucket cache, L1 cache and the blocks
      // fetched from HDFS. Copying this would ensure that we let go the references to these
      // blocks so that they can be GCed safely(in case of bucket cache)
      prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
    }
    matcher.beforeShipped();
    // There wont be further fetch of Cells from these scanners. Just close.
    clearAndClose(scannersForDelayedClose);
    if (this.heap != null) {
      this.heap.shipped();
      // When switching from pread to stream, we will open a new scanner for each store file, but
      // the old scanner may still track the HFileBlocks we have scanned but not sent back to client
      // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others
      // before we serialize and send it back to client. The HFileBlocks will be released in shipped
      // method, so we here will also open new scanners and close old scanners in shipped method.
      // See HBASE-18055 for more details.
      trySwitchToStreamRead();
    }
  }
1331
  /** Test hook: enables the instrumentation flags observed by the static accessors below. */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
  static final void instrument() {
    hasUpdatedReaders = new AtomicBoolean(false);
    hasSwitchedToStreamRead = new AtomicBoolean(false);
  }
1337
  /** Test hook: whether updateReaders() has run since instrument()/reset. */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
  static final boolean hasUpdatedReaders() {
    return hasUpdatedReaders.get();
  }
1342
  /** Test hook: whether trySwitchToStreamRead() completed a switch since instrument()/reset. */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
  static final boolean hasSwitchedToStreamRead() {
    return hasSwitchedToStreamRead.get();
  }
1347
  /** Test hook: clears the updateReaders() instrumentation flag. */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
  static final void resetHasUpdatedReaders() {
    hasUpdatedReaders.set(false);
  }
1352
  /** Test hook: clears the stream-read-switch instrumentation flag. */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
  static final void resetHasSwitchedToStreamRead() {
    hasSwitchedToStreamRead.set(false);
  }
1357}