/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * Scanner scans both the memstore and the Store. Coalesces the KeyValue stream into a
 * List&lt;KeyValue&gt; for a single row.
 * <p>
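 * A simplified usage sketch (illustrative only; assumes the caller already has the store, scan,
 * column set, read point and a ScannerContext at hand):
 *
 * <pre>{@code
 * StoreScanner scanner = new StoreScanner(store, scanInfo, scan, columns, readPt);
 * List<Cell> cells = new ArrayList<>();
 * boolean more;
 * do {
 *   cells.clear();
 *   more = scanner.next(cells, scannerContext);
 *   // hand this batch of cells back to the caller...
 * } while (more);
 * scanner.close();
 * }</pre>
 * <p>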
 * The implementation is not thread safe, so there will be no race between next and close. The
 * only exception is updateReaders, which is called in the memstore flush thread to indicate that
 * there is a flush.
 */
@InterfaceAudience.Private
public class StoreScanner extends NonReversedNonLazyKeyValueScanner
  implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class);
  // In unit tests, the store could be null
  protected final HStore store;
  private final CellComparator comparator;
  private ScanQueryMatcher matcher;
  protected KeyValueHeap heap;
  private boolean cacheBlocks;

  private long countPerRow = 0;
  private int storeLimit = -1;
  private int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107)
  private volatile boolean closing = false;
  private final boolean get;
  private final boolean explicitColumnQuery;
  private final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  private boolean parallelSeekEnabled = false;
  private ExecutorService executor;
  private final Scan scan;
  private final long oldestUnexpiredTS;
  private final long now;
  private final int minVersions;
  private final long maxRowSize;
  private final long cellsPerHeartbeatCheck;
  long memstoreOnlyReads;
  long mixedReads;

  // 1) Collects all the KVHeaps that are eagerly closed during the course of a scan
  // 2) Collects the unused memstore scanners. If we close the memstore scanners
  // before sending data to the client, the chunk may be reclaimed by other
  // updates and the data will be corrupted.
  private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>();

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not KVs skipped via
   * seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  private Cell prevCell = null;

  private final long preadMaxBytes;
  private long bytesRead;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
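
  /**
   * Config key intended to enable parallel seeking of store file scanners. The scanner consults
   * {@link ScanInfo#isParallelSeekEnabled()} in its constructor and only parallel-seeks when
   * there is more than one store file (see {@link #parallelSeek}).
   */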
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
    "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;

  /**
   * The number of cells scanned in between timeout checks. Specifying a larger value means that
   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
   * timeout checks.
   */
  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
    "hbase.cells.scanned.per.heartbeat.check";

  /**
   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
   */
  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

  /**
   * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we have
   * scanned reach this limit, we will reopen the scanner with stream. The default value is 4 times
   * the block size of this store. If configured with a value &lt;0, then for all scans with
   * ReadType DEFAULT we will open the scanner in stream mode directly.
   */
  public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes";

  private final Scan.ReadType readType;

  // A flag for whether to use pread for this scan;
  // it may be changed if we use Scan.ReadType.DEFAULT and we have read lots of data.
  private boolean scanUsePread;
  // Indicates whether there was a flush during the course of the scan
  private volatile boolean flushed = false;
  // generally we get one file from a flush
  private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1);
  // Since CompactingMemstore is now default, we get three memstore scanners from a flush
  private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
  // The current list of scanners
  final List<KeyValueScanner> currentScanners = new ArrayList<>();
  // flush update lock
  private final ReentrantLock flushLock = new ReentrantLock();
  // lock for closing.
  private final ReentrantLock closeLock = new ReentrantLock();

  protected final long readPt;
  private boolean topChanged = false;

  /** An internal constructor. */
  private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt,
    boolean cacheBlocks, ScanType scanType) {
    this.readPt = readPt;
    this.store = store;
    this.cacheBlocks = cacheBlocks;
    this.comparator = Preconditions.checkNotNull(scanInfo.getComparator());
    get = scan.isGetScan();
    explicitColumnQuery = numColumns > 0;
    this.scan = scan;
    this.now = EnvironmentEdgeManager.currentTime();
    this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl();
    this.minVersions = scanInfo.getMinVersions();

    // We look up row-column Bloom filters for multi-column queries as part of
    // the seek operation. However, we also look up the row-column Bloom filter
    // for multi-row (non-"get") scans because this is not done in
    // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
    this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1) && (store == null
      || store.getColumnFamilyDescriptor().getBloomFilterType() == BloomType.ROWCOL);
    this.maxRowSize = scanInfo.getTableMaxRowSize();
    this.preadMaxBytes = scanInfo.getPreadMaxBytes();
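    // How the store files will be read:
    // - Gets always use pread.
    // - Compaction scans (non-user scans) always use stream reads.
    // - User scans honor Scan#getReadType(). For ReadType.DEFAULT we start with pread and may
    //   switch to stream later (see trySwitchToStreamRead()) after preadMaxBytes have been read;
    //   if ScanInfo prefers pread we stay on pread, and if preadMaxBytes < 0 we go straight to
    //   stream.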
    if (get) {
      this.readType = Scan.ReadType.PREAD;
      this.scanUsePread = true;
    } else if (scanType != ScanType.USER_SCAN) {
      // For compaction scanners, never use pread as we already have stream-based scanners on the
      // store files to be compacted
      this.readType = Scan.ReadType.STREAM;
      this.scanUsePread = false;
    } else {
      if (scan.getReadType() == Scan.ReadType.DEFAULT) {
        if (scanInfo.isUsePread()) {
          this.readType = Scan.ReadType.PREAD;
        } else if (this.preadMaxBytes < 0) {
          this.readType = Scan.ReadType.STREAM;
        } else {
          this.readType = Scan.ReadType.DEFAULT;
        }
      } else {
        this.readType = scan.getReadType();
      }
      // Always start with pread unless the user explicitly asked for stream reads. If readType is
      // DEFAULT, we will switch to stream later if the scan keeps running for a long time.
      this.scanUsePread = this.readType != Scan.ReadType.STREAM;
    }
    this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
    // Parallel seeking is on if the config allows it and there is more than one store file.
    if (store != null && store.getStorefilesCount() > 1) {
      RegionServerServices rsService = store.getHRegion().getRegionServerServices();
      if (rsService != null && scanInfo.isParallelSeekEnabled()) {
        this.parallelSeekEnabled = true;
        this.executor = rsService.getExecutorService();
      }
    }
  }

  private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
    this.currentScanners.addAll(scanners);
  }

  /**
   * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we are not in a
   * compaction.
   * @param store   the store we scan
   * @param scan    the spec
   * @param columns which columns we are scanning
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
    long readPt) throws IOException {
    this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt, scan.getCacheBlocks(),
      ScanType.USER_SCAN);
    if (columns != null && scan.isRaw()) {
      throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
    }
    matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
      store.getCoprocessorHost());

    store.addChangedReaderObserver(this);

    List<KeyValueScanner> scanners = null;
    try {
      // Pass columns to try to filter out unnecessary StoreFiles.
      scanners = selectScannersFrom(store,
        store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
          scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt));

      // Seek all scanners to the start of the Row (or if the exact matching row
      // key does not exist, then to the start of the next matching Row).
      // Always check bloom filter to optimize the top row seek for delete
      // family marker.
      seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
        parallelSeekEnabled);

      // set storeLimit
      this.storeLimit = scan.getMaxResultsPerColumnFamily();

      // set rowOffset
      this.storeOffset = scan.getRowOffsetPerColumnFamily();
      addCurrentScanners(scanners);
      // Combine all seeked scanners with a heap
      resetKVHeap(scanners, comparator);
    } catch (IOException e) {
      clearAndClose(scanners);
      // Remove us from the HStore#changedReaderObservers here, or we'll have no chance to do it
      // later, which might cause a memory leak.
      store.deleteChangedReaderObserver(this);
      throw e;
    }
  }

  // a dummy scan instance for compaction.
  private static final Scan SCAN_FOR_COMPACTION = new Scan();

  /**
   * Used for store file compaction and memstore compaction.
   * <p>
   * Opens a scanner across specified StoreFiles/MemStoreSegments.
   * @param store             the store we scan
   * @param scanners          ancillary scanners
   * @param smallestReadPoint the readPoint that we should use for tracking versions
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
    ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
    this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
  }

  /**
   * Used for compactions that drop deletes from a limited range of rows.
   * <p>
   * Opens a scanner across specified StoreFiles.
   * @param store              the store we scan
   * @param scanners           ancillary scanners
   * @param smallestReadPoint  the readPoint that we should use for tracking versions
   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
   * @param dropDeletesToRow   The exclusive right bound of the range; can be EMPTY_END_ROW.
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
    long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow)
    throws IOException {
    this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
      earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
  }

  private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
    ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
    byte[] dropDeletesToRow) throws IOException {
    this(store, SCAN_FOR_COMPACTION, scanInfo, 0,
      store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType);
    assert scanType != ScanType.USER_SCAN;
    matcher =
      CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs,
        oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());

    // Filter the list of scanners using Bloom filters, time range, TTL, etc.
    scanners = selectScannersFrom(store, scanners);

    // Seek all scanners to the initial key
    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
    addCurrentScanners(scanners);
    // Combine all seeked scanners with a heap
    resetKVHeap(scanners, comparator);
  }

  private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners)
    throws IOException {
    // Seek all scanners to the initial key
    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
    addCurrentScanners(scanners);
    resetKVHeap(scanners, comparator);
  }

  // For mob compaction only as we do not have a Store instance when doing mob compaction.
  public StoreScanner(ScanInfo scanInfo, ScanType scanType,
    List<? extends KeyValueScanner> scanners) throws IOException {
    this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
    assert scanType != ScanType.USER_SCAN;
    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L,
      oldestUnexpiredTS, now, null, null, null);
    seekAllScanner(scanInfo, scanners);
  }

  // Used to instantiate a scanner for user scan in test
  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
    List<? extends KeyValueScanner> scanners) throws IOException {
    // 0 is passed as readpoint because the test bypasses Store
    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
      ScanType.USER_SCAN);
    this.matcher =
      UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
    seekAllScanner(scanInfo, scanners);
  }

  // Used to instantiate a scanner for user scan in test
  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
    List<? extends KeyValueScanner> scanners, ScanType scanType) throws IOException {
    // 0 is passed as readpoint because the test bypasses Store
    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
      scanType);
    if (scanType == ScanType.USER_SCAN) {
      this.matcher =
        UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
    } else {
      this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
        HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
    }
    seekAllScanner(scanInfo, scanners);
  }

  // Used to instantiate a scanner for compaction in test
  StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType,
    List<? extends KeyValueScanner> scanners) throws IOException {
    // 0 is passed as readpoint because the test bypasses Store
    this(null, maxVersions > 0 ? new Scan().readVersions(maxVersions) : SCAN_FOR_COMPACTION,
      scanInfo, 0, 0L, false, scanType);
    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
      HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
    seekAllScanner(scanInfo, scanners);
  }

  boolean isScanUsePread() {
    return this.scanUsePread;
  }

  /**
   * Seek the specified scanners with the given key.
   * @param scanners       the scanners to seek
   * @param seekKey        the key to seek to
   * @param isLazy         true if using lazy seek
   * @param isParallelSeek true if using parallel seek
   */
  protected void seekScanners(List<? extends KeyValueScanner> scanners, Cell seekKey,
    boolean isLazy, boolean isParallelSeek) throws IOException {
    // Seek all scanners to the start of the Row (or if the exact matching row
    // key does not exist, then to the start of the next matching Row).
    // Always check bloom filter to optimize the top row seek for delete
    // family marker.
    if (isLazy) {
      for (KeyValueScanner scanner : scanners) {
        scanner.requestSeek(seekKey, false, true);
      }
    } else {
      if (!isParallelSeek) {
        long totalScannersSoughtBytes = 0;
        for (KeyValueScanner scanner : scanners) {
          if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) {
            throw new RowTooBigException(
              "Max row size allowed: " + maxRowSize + ", but row is bigger than that");
          }
          scanner.seek(seekKey);
          Cell c = scanner.peek();
          if (c != null) {
            totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c);
          }
        }
      } else {
        parallelSeek(scanners, seekKey);
      }
    }
  }

  protected void resetKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
    throws IOException {
    // Combine all seeked scanners with a heap
    heap = newKVHeap(scanners, comparator);
  }

  protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners,
    CellComparator comparator) throws IOException {
    return new KeyValueHeap(scanners, comparator);
  }

  /**
   * Filters the given list of scanners using Bloom filter, time range, and TTL.
   * <p>
   * Will be overridden by test cases, so declared as protected.
   */
  protected List<KeyValueScanner> selectScannersFrom(HStore store,
    List<? extends KeyValueScanner> allScanners) {
    boolean memOnly;
    boolean filesOnly;
    if (scan instanceof InternalScan) {
      InternalScan iscan = (InternalScan) scan;
      memOnly = iscan.isCheckOnlyMemStore();
      filesOnly = iscan.isCheckOnlyStoreFiles();
    } else {
      memOnly = false;
      filesOnly = false;
    }

    List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size());

    // We can only exclude store files based on TTL if minVersions is set to 0.
    // Otherwise, we might have to return KVs that have technically expired.
    long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE;

    // include only those scanners which pass all filters
    for (KeyValueScanner kvs : allScanners) {
      boolean isFile = kvs.isFileScanner();
      if ((!isFile && filesOnly) || (isFile && memOnly)) {
        kvs.close();
        continue;
      }

      if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
        scanners.add(kvs);
      } else {
        kvs.close();
      }
    }
    return scanners;
  }

  @Override
  public Cell peek() {
    return heap != null ? heap.peek() : null;
  }

  @Override
  public KeyValue next() {
    // throw runtime exception perhaps?
    throw new RuntimeException("Never call StoreScanner.next()");
  }

  @Override
  public void close() {
    close(true);
  }

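  // A partial close (withDelayedScannersClose == false) is used when the scan has run out of
  // values but returned cells may still reference blocks held by the heap: the heap is parked in
  // scannersForDelayedClose and only really closed later, in close(true) or shipped().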
  private void close(boolean withDelayedScannersClose) {
    closeLock.lock();
    // If the closeLock is acquired then any subsequent updateReaders()
    // call is ignored.
    try {
      if (this.closing) {
        return;
      }
      if (withDelayedScannersClose) {
        this.closing = true;
      }
      // For mob compaction, we do not have a store.
      if (this.store != null) {
        this.store.deleteChangedReaderObserver(this);
      }
      if (withDelayedScannersClose) {
        clearAndClose(scannersForDelayedClose);
        clearAndClose(memStoreScannersAfterFlush);
        clearAndClose(flushedstoreFileScanners);
        if (this.heap != null) {
          this.heap.close();
          this.currentScanners.clear();
          this.heap = null; // CLOSED!
        }
      } else {
        if (this.heap != null) {
          this.scannersForDelayedClose.add(this.heap);
          this.currentScanners.clear();
          this.heap = null;
        }
      }
    } finally {
      closeLock.unlock();
    }
  }

  @Override
  public boolean seek(Cell key) throws IOException {
    if (checkFlushed()) {
      reopenAfterFlush();
    }
    return this.heap.seek(key);
  }

  /**
   * Get the next row of values from this Store.
   * @return true if there are more rows, false if scanner is done
   */
  @Override
  public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
    if (scannerContext == null) {
      throw new IllegalArgumentException("Scanner context cannot be null");
    }
    if (checkFlushed() && reopenAfterFlush()) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // If the heap was left null, then the scanners had previously run out anyway; close and
    // return.
    if (this.heap == null) {
      // By this time the partial close should have happened, because the heap is already null
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    Cell cell = this.heap.peek();
    if (cell == null) {
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    // only call setRow if the row changes; avoids confusing the query matcher
    // if scanning intra-row

    // If no limit exists in the scope LimitScope.BETWEEN_CELLS then we are sure we are changing
    // rows. Else it is possible we are still traversing the same row so we must perform the row
    // comparison.
    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) {
      this.countPerRow = 0;
      matcher.setToNewRow(cell);
    }

    // Clear progress away unless invoker has indicated it should be kept.
    if (!scannerContext.getKeepProgress()) {
      scannerContext.clearProgress();
    }

    int count = 0;
    long totalBytesRead = 0;
    boolean onlyFromMemstore = matcher.isUserScan();
    try {
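      // Main per-cell loop: ask the query matcher what to do with the cell currently at the top
      // of the heap and act on the returned MatchCode (include it, skip it, seek ahead, or stop).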
      LOOP: do {
        // Update and check the time limit based on the configured value of cellsPerTimeoutCheck,
        // or if preadMaxBytes is reached, in which case we may want to return so we can switch
        // to stream reads in the shipped method below.
        if (
          kvsScanned % cellsPerHeartbeatCheck == 0
            || (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)
        ) {
          if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
            return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
          }
        }
        // Do object compare - we set prevKV from the same heap.
        if (prevCell != cell) {
          ++kvsScanned;
        }
        checkScanOrder(prevCell, cell, comparator);
        int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
        bytesRead += cellSize;
        if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) {
          // Return immediately if we want to switch from pread to stream. We need this because we
          // can only switch in the shipped method; if the user uses a filter that filters out
          // everything and the rpc timeout is very large, then the shipped method would never be
          // called until the whole scan is finished, but by then we would already have scanned
          // all the data... See HBASE-20457 for more details.
          // And there is still a scenario that cannot be handled: if we have a very large row
          // with millions of qualifiers and filter.filterRow is used, then even if we set the
          // flag here, we still need to scan all the qualifiers before returning...
          scannerContext.returnImmediately();
        }
        prevCell = cell;
        scannerContext.setLastPeekedCell(cell);
        topChanged = false;
        ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
        switch (qcode) {
          case INCLUDE:
          case INCLUDE_AND_SEEK_NEXT_ROW:
          case INCLUDE_AND_SEEK_NEXT_COL:
            Filter f = matcher.getFilter();
            if (f != null) {
              cell = f.transformCell(cell);
            }
            this.countPerRow++;

            // add to results only if we have skipped #storeOffset kvs
            // also update metric accordingly
            if (this.countPerRow > storeOffset) {
              outResult.add(cell);

              // Update local tracking information
              count++;
              totalBytesRead += cellSize;

              /**
               * Increment the metric if all the cells are from memstore. If not, we will account
               * it as a mixed read.
               */
              onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore();
              // Update the progress of the scanner context
              scannerContext.incrementSizeProgress(cellSize, cell.heapSize());
              scannerContext.incrementBatchProgress(1);

              if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
                String message = "Max row size allowed: " + maxRowSize
                  + ", but the row is bigger than that, the row info: "
                  + CellUtil.toString(cell, false) + ", already processed row cells = "
                  + outResult.size() + ", it belongs to region = "
                  + store.getHRegion().getRegionInfo().getRegionNameAsString();
                LOG.warn(message);
                throw new RowTooBigException(message);
              }

              if (storeLimit > -1 && this.countPerRow >= (storeLimit + storeOffset)) {
                // do what SEEK_NEXT_ROW does.
                if (!matcher.moreRowsMayExistAfter(cell)) {
                  close(false);// Do all cleanup except heap.close()
                  return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
                }
                matcher.clearCurrentRow();
                seekToNextRow(cell);
                break LOOP;
              }
            }

            if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
              if (!matcher.moreRowsMayExistAfter(cell)) {
                close(false);// Do all cleanup except heap.close()
                return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
              }
              matcher.clearCurrentRow();
              seekOrSkipToNextRow(cell);
            } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
              seekOrSkipToNextColumn(cell);
            } else {
              this.heap.next();
            }

            if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
              break LOOP;
            }
            if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
              break LOOP;
            }
            continue;

          case DONE:
            // Optimization for Gets! If DONE, there is no more to get on this row, early exit!
            if (get) {
              // Then no more to this row... exit.
              close(false);// Do all cleanup except heap.close()
              // update metric
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            matcher.clearCurrentRow();
            return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();

          case DONE_SCAN:
            close(false);// Do all cleanup except heap.close()
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();

          case SEEK_NEXT_ROW:
            // This is just a relatively simple end-of-scan fix, to short-circuit the scan
            // if there is an endKey in the scan.
            if (!matcher.moreRowsMayExistAfter(cell)) {
              close(false);// Do all cleanup except heap.close()
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            matcher.clearCurrentRow();
            seekOrSkipToNextRow(cell);
            NextState stateAfterSeekNextRow = needToReturn(outResult);
            if (stateAfterSeekNextRow != null) {
              return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
            }
            break;

          case SEEK_NEXT_COL:
            seekOrSkipToNextColumn(cell);
            NextState stateAfterSeekNextColumn = needToReturn(outResult);
            if (stateAfterSeekNextColumn != null) {
              return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
            }
            break;

          case SKIP:
            this.heap.next();
            break;

          case SEEK_NEXT_USING_HINT:
            Cell nextKV = matcher.getNextKeyHint(cell);
            if (nextKV != null) {
              int difference = comparator.compare(nextKV, cell);
              if (
                ((!scan.isReversed() && difference > 0) || (scan.isReversed() && difference < 0))
              ) {
                seekAsDirection(nextKV);
                NextState stateAfterSeekByHint = needToReturn(outResult);
                if (stateAfterSeekByHint != null) {
                  return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
                }
                break;
              }
            }
            heap.next();
            break;

          default:
            throw new RuntimeException("UNEXPECTED");
        }
      } while ((cell = this.heap.peek()) != null);

      if (count > 0) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // No more keys
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    } finally {
      // increment only if we have some result
      if (count > 0 && matcher.isUserScan()) {
        // if onlyFromMemstore is true, increment the memstore metric; otherwise the mixed one
        updateMetricsStore(onlyFromMemstore);
      }
    }
  }

  private void updateMetricsStore(boolean memstoreRead) {
    if (store != null) {
      store.updateMetricsStore(memstoreRead);
    } else {
      // for testing.
      if (memstoreRead) {
        memstoreOnlyReads++;
      } else {
        mixedReads++;
      }
    }
  }

  /**
   * If the top cell won't be flushed into disk, the new top cell may be changed after
   * #reopenAfterFlush. This is because the older top cell only exists in the memstore scanner, and
   * the memstore scanner is replaced by an hfile scanner after #reopenAfterFlush. If the row of
   * the top cell has changed, we should return the current cells; otherwise, we may return cells
   * across different rows.
   * @param outResult the cells which are visible for user scan
   * @return null if the top cell doesn't change. Otherwise, the NextState to return
   */
  private NextState needToReturn(List<Cell> outResult) {
    if (!outResult.isEmpty() && topChanged) {
      return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
    }
    return null;
  }

  private void seekOrSkipToNextRow(Cell cell) throws IOException {
    // If it is a Get Scan, then we know that we are done with this row; there are no more
    // rows beyond the current one: don't try to optimize.
    if (!get) {
      if (trySkipToNextRow(cell)) {
        return;
      }
    }
    seekToNextRow(cell);
  }

  private void seekOrSkipToNextColumn(Cell cell) throws IOException {
    if (!trySkipToNextColumn(cell)) {
      seekAsDirection(matcher.getKeyForNextColumn(cell));
    }
  }

  /**
   * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).
   * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row, or seek to an
   * arbitrary seek key. This method decides whether a seek is the most efficient _actual_ way to
   * get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, SKIP inside the
   * current, loaded block). It does this by looking at the next indexed key of the current HFile.
   * This key is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible
   * key on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work
   * with the current Cell but compare as though it were a seek key; see down in
   * matcher.compareKeyForNextRow, etc). If the compare gets us onto the next block we *_SEEK,
   * otherwise we just SKIP to the next requested cell.
   * <p>
   * Other notes:
   * <ul>
   * <li>Rows can straddle block boundaries</li>
   * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a
   * different block than column C1 at T2)</li>
   * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a few
   * SKIPs...</li>
   * <li>We want to SEEK when the chance is high that we'll be able to seek past many Cells,
   * especially if we know we need to go to the next block.</li>
   * </ul>
   * <p>
   * A good proxy (best effort) to determine whether SKIP is better than SEEK is whether we'll
   * likely end up seeking to the next block (or past the next block) to get our next column.
   * Example:
   *
   * <pre>
   * |    BLOCK 1              |     BLOCK 2                   |
   * |  r1/c1, r1/c2, r1/c3    |    r1/c4, r1/c5, r2/c1        |
   *                                   ^         ^
   *                                   |         |
   *                           Next Index Key   SEEK_NEXT_ROW (before r2/c1)
   *
   *
   * |    BLOCK 1                       |     BLOCK 2                      |
   * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |    r1/c1/t2, r1/c1/T1, r1/c2/T3  |
   *                                            ^              ^
   *                                            |              |
   *                                    Next Index Key        SEEK_NEXT_COL
   * </pre>
   *
   * Now imagine we want columns c1 and c3 (see first diagram above); the 'Next Index Key' of r1/c4
   * is > r1/c3 so we should seek to get to the c1 on the next row, r2. In the second case, say we
   * only want one version of c1; after we have it, a SEEK_COL will be issued to get to c2. Looking
   * at the 'Next Index Key', it would land us in the next block, so we should SEEK. In other
   * scenarios where the SEEK will not land us in the next block, it is very likely better to issue
   * a series of SKIPs.
   * @param cell current cell
   * @return true means skip to next row, false means not
   */
  protected boolean trySkipToNextRow(Cell cell) throws IOException {
    Cell nextCell = null;
    // used to guard against a changed next indexed key by doing an identity comparison
    // when the identity changes we need to compare the bytes again
    Cell previousIndexedKey = null;
    do {
      Cell nextIndexedKey = getNextIndexedKey();
      if (
        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
          && (nextIndexedKey == previousIndexedKey
            || matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0)
      ) {
        this.heap.next();
        ++kvsScanned;
        previousIndexedKey = nextIndexedKey;
      } else {
        return false;
      }
    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell));
    return true;
  }

  /**
   * See {@link org.apache.hadoop.hbase.regionserver.StoreScanner#trySkipToNextRow(Cell)}
   * @param cell current cell
   * @return true means skip to next column, false means not
   */
  protected boolean trySkipToNextColumn(Cell cell) throws IOException {
    Cell nextCell = null;
    // used to guard against a changed next indexed key by doing an identity comparison
    // when the identity changes we need to compare the bytes again
    Cell previousIndexedKey = null;
    do {
      Cell nextIndexedKey = getNextIndexedKey();
      if (
        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
          && (nextIndexedKey == previousIndexedKey
            || matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0)
      ) {
        this.heap.next();
        ++kvsScanned;
        previousIndexedKey = nextIndexedKey;
      } else {
        return false;
      }
    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell));
    // We need this check because it may happen that the new scanner that we get
    // during heap.next() requires a reseek due to the fake KV previously generated for
    // the ROWCOL bloom filter optimization. See HBASE-19863 for more details.
    if (useRowColBloom && nextCell != null && matcher.compareKeyForNextColumn(nextCell, cell) < 0) {
      return false;
    }
    return true;
  }

  @Override
  public long getReadPoint() {
    return this.readPt;
  }

  private static void clearAndClose(List<KeyValueScanner> scanners) {
    if (scanners == null) {
      return;
    }
    for (KeyValueScanner s : scanners) {
      s.close();
    }
    scanners.clear();
  }

  // Implementation of ChangedReadersObserver
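  // updateReaders() is invoked by the memstore flush thread. It eagerly creates scanners for the
  // newly flushed store files (see HBASE-19468) and stashes them, together with the replacement
  // memstore scanners, for the scan thread to pick up in reopenAfterFlush() on its next
  // next()/seek()/reseek() call.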
  @Override
  public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners)
    throws IOException {
    if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) {
      return;
    }
    boolean updateReaders = false;
    flushLock.lock();
    try {
      if (!closeLock.tryLock()) {
        // The reason for doing this is that when the current store scanner does not retrieve
        // any new cells, then the scanner is considered to be done. The heap of this scanner
        // is not closed till the shipped() call is completed. Hence, if the partial close
        // (close(false)) has already been called before updateReaders(), there is no need for
        // updateReaders() to happen.
        LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
        // no lock acquired.
        clearAndClose(memStoreScanners);
        return;
      }
      // lock acquired
      updateReaders = true;
      if (this.closing) {
        LOG.debug("StoreScanner already closing. There is no need to updateReaders");
        clearAndClose(memStoreScanners);
        return;
      }
      flushed = true;
      final boolean isCompaction = false;
      boolean usePread = get || scanUsePread;
      // See HBASE-19468, where the flushed files were getting compacted even before a scanner
      // called next(). So it's better to create the scanners here rather than in the next() call.
      // Ensure these scanners are properly closed() whether or not the scan is completed
      // successfully. Eagerly creating scanners also means we have the ref counting ticking on
      // the newly created store files. In the case of stream scanners this eager creation does
      // not induce a performance penalty, because for scans that use stream scanners the next()
      // call is bound to happen.
      List<KeyValueScanner> scanners = store.getScanners(sfs, cacheBlocks, get, usePread,
        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false);
      flushedstoreFileScanners.addAll(scanners);
      if (!CollectionUtils.isEmpty(memStoreScanners)) {
        clearAndClose(memStoreScannersAfterFlush);
        memStoreScannersAfterFlush.addAll(memStoreScanners);
      }
    } finally {
      flushLock.unlock();
      if (updateReaders) {
        closeLock.unlock();
      }
    }
    // Let the next() call handle re-creating and seeking
  }

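  // Called from seek()/reseek()/next() once checkFlushed() reports a flush: swaps the old
  // memstore scanners for the scanners stashed by updateReaders() and re-seeks them to the last
  // top key of the heap.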
  /** Returns if top of heap has changed (and KeyValueHeap has to try the next KV) */
  protected final boolean reopenAfterFlush() throws IOException {
    // here we can make sure that we have a Store instance so no null check on store.
    Cell lastTop = heap.peek();
    // When we have the scan object, should we not pass it to getScanners() to get a limited set of
    // scanners? We did so in the constructor and we could have done it now by storing the scan
    // object from the constructor
    List<KeyValueScanner> scanners;
    flushLock.lock();
    try {
      List<KeyValueScanner> allScanners =
        new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());
      allScanners.addAll(flushedstoreFileScanners);
      allScanners.addAll(memStoreScannersAfterFlush);
      scanners = selectScannersFrom(store, allScanners);
      // Clear the current set of flushed store files scanners so that they don't get added again
      flushedstoreFileScanners.clear();
      memStoreScannersAfterFlush.clear();
    } finally {
      flushLock.unlock();
    }

    // Seek the new scanners to the last key
    seekScanners(scanners, lastTop, false, parallelSeekEnabled);
    // remove the older memstore scanner
    for (int i = currentScanners.size() - 1; i >= 0; i--) {
      if (!currentScanners.get(i).isFileScanner()) {
        scannersForDelayedClose.add(currentScanners.remove(i));
      } else {
        // we add the memstore scanner to the end of currentScanners
        break;
      }
    }
    // add the newly created scanners on the flushed files and the current active memstore scanner
    addCurrentScanners(scanners);
    // Combine all seeked scanners with a heap
    resetKVHeap(this.currentScanners, store.getComparator());
    resetQueryMatcher(lastTop);
    if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
      LOG.info("StoreScanner.peek() is changed where before = " + lastTop.toString()
        + ", and after = " + heap.peek());
      topChanged = true;
    } else {
      topChanged = false;
    }
    return topChanged;
  }

  private void resetQueryMatcher(Cell lastTopKey) {
    // Reset the state of the Query Matcher and set to top row.
    // Only reset and call setRow if the row changes; avoids confusing the
    // query matcher if scanning intra-row.
    Cell cell = heap.peek();
    if (cell == null) {
      cell = lastTopKey;
    }
    if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {
      this.countPerRow = 0;
      // The setToNewRow will call reset internally
      matcher.setToNewRow(cell);
    }
  }

  /**
   * Check that the scan is in the expected (increasing) order.
   */
  protected void checkScanOrder(Cell prevKV, Cell kv, CellComparator comparator)
    throws IOException {
    // Check that the heap gives us KVs in an increasing order.
    assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0
      : "Key " + prevKV + " followed by a smaller key " + kv + " in cf " + store;
  }

  protected boolean seekToNextRow(Cell c) throws IOException {
    return reseek(PrivateCellUtil.createLastOnRow(c));
  }

  /**
   * Do a reseek in a normal StoreScanner (scan forward).
   * @return true if scanner has values left, false if end of scanner
   */
  protected boolean seekAsDirection(Cell kv) throws IOException {
    return reseek(kv);
  }

  @Override
  public boolean reseek(Cell kv) throws IOException {
    if (checkFlushed()) {
      reopenAfterFlush();
    }
    if (explicitColumnQuery && lazySeekEnabledGlobally) {
      return heap.requestSeek(kv, true, useRowColBloom);
    }
    return heap.reseek(kv);
  }

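  // Attempts to switch from pread to stream reads for a long-running DEFAULT-read-type scan.
  // Called from shipped(): recreates the store file scanners in stream mode, seeks them back to
  // the current top key, and swaps in a new heap; the old file scanners are only closed after
  // the new ones are in place.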
  void trySwitchToStreamRead() {
    if (
      readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null
        || bytesRead < preadMaxBytes
    ) {
      return;
    }
    LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead,
      this.store.getColumnFamilyName());
    scanUsePread = false;
    Cell lastTop = heap.peek();
    List<KeyValueScanner> memstoreScanners = new ArrayList<>();
    List<KeyValueScanner> scannersToClose = new ArrayList<>();
    for (KeyValueScanner kvs : currentScanners) {
      if (!kvs.isFileScanner()) {
        // collect memstore scanners here
        memstoreScanners.add(kvs);
      } else {
        scannersToClose.add(kvs);
      }
    }
    List<KeyValueScanner> fileScanners = null;
    List<KeyValueScanner> newCurrentScanners;
    KeyValueHeap newHeap;
    try {
      // We must have a store instance here so no null check
      // recreate the scanners on the current file scanners
      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, matcher,
        scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
        readPt, false);
      if (fileScanners == null) {
        return;
      }
      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
      newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
      newCurrentScanners.addAll(fileScanners);
      newCurrentScanners.addAll(memstoreScanners);
      newHeap = newKVHeap(newCurrentScanners, comparator);
    } catch (Exception e) {
      LOG.warn("failed to switch to stream read", e);
      if (fileScanners != null) {
        fileScanners.forEach(KeyValueScanner::close);
      }
      return;
    }
    currentScanners.clear();
    addCurrentScanners(newCurrentScanners);
    this.heap = newHeap;
    resetQueryMatcher(lastTop);
    scannersToClose.forEach(KeyValueScanner::close);
  }

  protected final boolean checkFlushed() {
    // Check the flag without any lock. Even if we see a stale value here, it is still ok to
    // continue, because we will not be resetting the heap but will continue with the referenced
    // memstore's snapshot. For compactions we don't need updateReaders to happen at all, as we
    // still continue with the older files.
    if (flushed) {
      // If there is a flush and the current scan is notified on the flush, ensure that the
      // scan's heap gets reset and we do a seek on the newly flushed file.
      if (this.closing) {
        return false;
      }
      // reset the flag
      flushed = false;
      return true;
    }
    return false;
  }

  /**
   * Seek storefiles in parallel to optimize IO latency as much as possible
   * @param scanners the list of {@link KeyValueScanner}s to be read from
   * @param kv       the KeyValue on which the operation is being requested
   */
  private void parallelSeek(final List<? extends KeyValueScanner> scanners, final Cell kv)
    throws IOException {
    if (scanners.isEmpty()) return;
    int storeFileScannerCount = scanners.size();
    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
    List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount);
    for (KeyValueScanner scanner : scanners) {
      if (scanner instanceof StoreFileScanner) {
        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, this.readPt, latch);
        executor.submit(seekHandler);
        handlers.add(seekHandler);
      } else {
        scanner.seek(kv);
        latch.countDown();
      }
    }

    try {
      latch.await();
    } catch (InterruptedException ie) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
    }

    for (ParallelSeekHandler handler : handlers) {
      if (handler.getErr() != null) {
        throw new IOException(handler.getErr());
      }
    }
  }

  /**
   * Used in testing.
   * @return all scanners in no particular order
   */
  List<KeyValueScanner> getAllScannersForTesting() {
    List<KeyValueScanner> allScanners = new ArrayList<>();
    KeyValueScanner current = heap.getCurrentForTesting();
    if (current != null) allScanners.add(current);
    for (KeyValueScanner scanner : heap.getHeap())
      allScanners.add(scanner);
    return allScanners;
  }

  static void enableLazySeekGlobally(boolean enable) {
    lazySeekEnabledGlobally = enable;
  }

  /** Returns the estimated number of KVs seen by this scanner (includes some skipped KVs). */
  public long getEstimatedNumberOfKvsScanned() {
    return this.kvsScanned;
  }

  @Override
  public Cell getNextIndexedKey() {
    return this.heap.getNextIndexedKey();
  }

  @Override
  public void shipped() throws IOException {
    if (prevCell != null) {
      // Do the copy here so that in case the prevCell ref is pointing to the previous
      // blocks we can safely release those blocks.
      // This applies to blocks obtained from the bucket cache, the L1 cache, and blocks fetched
      // from HDFS. Copying ensures that we let go of the references to these blocks so that they
      // can be GCed safely (in the case of the bucket cache).
      prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
    }
    matcher.beforeShipped();
    // There won't be further fetches of Cells from these scanners; just close them.
    clearAndClose(scannersForDelayedClose);
    if (this.heap != null) {
      this.heap.shipped();
      // When switching from pread to stream, we will open a new scanner for each store file, but
      // the old scanner may still track the HFileBlocks we have scanned but not yet sent back to
      // the client. If we close the old scanner immediately, those HFileBlocks may be messed up
      // by others before we serialize them and send them back to the client. The HFileBlocks will
      // be released in the shipped method, so here we also open new scanners and close old
      // scanners in the shipped method. See HBASE-18055 for more details.
      trySwitchToStreamRead();
    }
  }
}