001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.NavigableSet;
025import java.util.Optional;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.locks.ReentrantLock;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellComparator;
030import org.apache.hadoop.hbase.CellUtil;
031import org.apache.hadoop.hbase.DoNotRetryIOException;
032import org.apache.hadoop.hbase.KeyValue;
033import org.apache.hadoop.hbase.KeyValueUtil;
034import org.apache.hadoop.hbase.PrivateCellUtil;
035import org.apache.hadoop.hbase.PrivateConstants;
036import org.apache.hadoop.hbase.client.IsolationLevel;
037import org.apache.hadoop.hbase.client.Scan;
038import org.apache.hadoop.hbase.executor.ExecutorService;
039import org.apache.hadoop.hbase.filter.Filter;
040import org.apache.hadoop.hbase.ipc.RpcCall;
041import org.apache.hadoop.hbase.ipc.RpcServer;
042import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
043import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
044import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
045import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
046import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
047import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
054import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
055
056/**
 * Scanner scans both the memstore and the Store. Coalesces the KeyValue stream into a
 * {@code List<KeyValue>} for a single row.
059 * <p>
060 * The implementation is not thread safe. So there will be no race between next and close. The only
061 * exception is updateReaders, it will be called in the memstore flush thread to indicate that there
062 * is a flush.
063 */
064@InterfaceAudience.Private
065public class StoreScanner extends NonReversedNonLazyKeyValueScanner
066  implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
067  private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class);
068  // In unit tests, the store could be null
069  protected final HStore store;
070  private final CellComparator comparator;
071  private ScanQueryMatcher matcher;
072  protected KeyValueHeap heap;
073  private boolean cacheBlocks;
074
075  private long countPerRow = 0;
076  private int storeLimit = -1;
077  private int storeOffset = 0;
078
079  // Used to indicate that the scanner has closed (see HBASE-1107)
080  private volatile boolean closing = false;
081  private final boolean get;
082  private final boolean explicitColumnQuery;
083  private final boolean useRowColBloom;
084  /**
085   * A flag that enables StoreFileScanner parallel-seeking
086   */
087  private boolean parallelSeekEnabled = false;
088  private ExecutorService executor;
089  private final Scan scan;
090  private final long oldestUnexpiredTS;
091  private final long now;
092  private final int minVersions;
093  private final long maxRowSize;
094  private final long cellsPerHeartbeatCheck;
095  long memstoreOnlyReads;
096  long mixedReads;
097
098  // 1) Collects all the KVHeap that are eagerly getting closed during the
099  // course of a scan
100  // 2) Collects the unused memstore scanners. If we close the memstore scanners
101  // before sending data to client, the chunk may be reclaimed by other
102  // updates and the data will be corrupt.
103  private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>();
104
105  /**
106   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not KVs skipped via
107   * seeking to next row/column. TODO: estimate them?
108   */
109  private long kvsScanned = 0;
110  private Cell prevCell = null;
111
112  private final long preadMaxBytes;
113  private long bytesRead;
114
115  /** We don't ever expect to change this, the constant is just for clarity. */
116  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
117  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
118    "hbase.storescanner.parallel.seek.enable";
119
120  /** Used during unit testing to ensure that lazy seek does save seek ops */
121  private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;
122
123  /**
124   * The number of cells scanned in between timeout checks. Specifying a larger value means that
125   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
126   * timeout checks.
127   */
128  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
129    "hbase.cells.scanned.per.heartbeat.check";
130
131  /**
132   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
133   */
134  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;
135
136  /**
137   * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we scanned
138   * reaches this limit, we will reopen the scanner with stream. The default value is 4 times of
139   * block size for this store. If configured with a value <0, for all scans with ReadType DEFAULT,
140   * we will open scanner with stream mode itself.
141   */
142  public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes";
143
144  private final Scan.ReadType readType;
145
  // Whether pread is currently used for this scan.
  // It may be changed if we use Scan.ReadType.DEFAULT and we have read lots of data.
148  private boolean scanUsePread;
149  // Indicates whether there was flush during the course of the scan
150  private volatile boolean flushed = false;
151  // generally we get one file from a flush
152  private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1);
153  // Since CompactingMemstore is now default, we get three memstore scanners from a flush
154  private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
155  // The current list of scanners
156  final List<KeyValueScanner> currentScanners = new ArrayList<>();
157  // flush update lock
158  private final ReentrantLock flushLock = new ReentrantLock();
159  // lock for closing.
160  private final ReentrantLock closeLock = new ReentrantLock();
161
162  protected final long readPt;
163  private boolean topChanged = false;
164
165  /** An internal constructor. */
166  private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt,
167    boolean cacheBlocks, ScanType scanType) {
168    this.readPt = readPt;
169    this.store = store;
170    this.cacheBlocks = cacheBlocks;
171    this.comparator = Preconditions.checkNotNull(scanInfo.getComparator());
172    get = scan.isGetScan();
173    explicitColumnQuery = numColumns > 0;
174    this.scan = scan;
175    this.now = EnvironmentEdgeManager.currentTime();
176    this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl();
177    this.minVersions = scanInfo.getMinVersions();
178
179    // We look up row-column Bloom filters for multi-column queries as part of
180    // the seek operation. However, we also look the row-column Bloom filter
181    // for multi-row (non-"get") scans because this is not done in
182    // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
183    this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1) && (store == null
184      || store.getColumnFamilyDescriptor().getBloomFilterType() == BloomType.ROWCOL);
185    this.maxRowSize = scanInfo.getTableMaxRowSize();
186    this.preadMaxBytes = scanInfo.getPreadMaxBytes();
187    if (get) {
188      this.readType = Scan.ReadType.PREAD;
189      this.scanUsePread = true;
190    } else if (scanType != ScanType.USER_SCAN) {
191      // For compaction scanners never use Pread as already we have stream based scanners on the
192      // store files to be compacted
193      this.readType = Scan.ReadType.STREAM;
194      this.scanUsePread = false;
195    } else {
196      if (scan.getReadType() == Scan.ReadType.DEFAULT) {
197        if (scanInfo.isUsePread()) {
198          this.readType = Scan.ReadType.PREAD;
199        } else if (this.preadMaxBytes < 0) {
200          this.readType = Scan.ReadType.STREAM;
201        } else {
202          this.readType = Scan.ReadType.DEFAULT;
203        }
204      } else {
205        this.readType = scan.getReadType();
206      }
207      // Always start with pread unless user specific stream. Will change to stream later if
208      // readType is default if the scan keeps running for a long time.
209      this.scanUsePread = this.readType != Scan.ReadType.STREAM;
210    }
211    this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
212    // Parallel seeking is on if the config allows and more there is more than one store file.
213    if (store != null && store.getStorefilesCount() > 1) {
214      RegionServerServices rsService = store.getHRegion().getRegionServerServices();
215      if (rsService != null && scanInfo.isParallelSeekEnabled()) {
216        this.parallelSeekEnabled = true;
217        this.executor = rsService.getExecutorService();
218      }
219    }
220  }
221
  /**
   * Appends the given scanners to {@link #currentScanners}.
   * @param scanners the scanners to record as currently in use
   */
  private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
    this.currentScanners.addAll(scanners);
  }
225
  /**
   * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we are not in a
   * compaction.
   * <p>
   * This scanner is registered with the store as a changed-readers observer before the store's
   * scanners are opened. If opening or seeking them fails with an IOException, clearAndClose is
   * invoked on the scanners obtained so far and the observer registration is removed before the
   * exception is rethrown.
   * @param store    who we scan
   * @param scanInfo settings handed to the internal constructor and to the query matcher
   * @param scan     the spec
   * @param columns  which columns we are scanning; may be null
   * @param readPt   the read point passed to the store when its scanners are opened
   * @throws DoNotRetryIOException if columns are given for a raw scan
   * @throws IOException           if opening or seeking the scanners fails
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
    long readPt) throws IOException {
    this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt, scan.getCacheBlocks(),
      ScanType.USER_SCAN);
    if (columns != null && scan.isRaw()) {
      throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
    }
    matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
      store.getCoprocessorHost());

    // Register before opening the scanners; the catch block below undoes this on failure.
    store.addChangedReaderObserver(this);

    List<KeyValueScanner> scanners = null;
    try {
      // Pass columns to try to filter out unnecessary StoreFiles.
      scanners = selectScannersFrom(store,
        store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
          scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt));

      // Seek all scanners to the start of the Row (or if the exact matching row
      // key does not exist, then to the start of the next matching Row).
      // Always check bloom filter to optimize the top row seek for delete
      // family marker.
      // Lazy seek is only used when columns were requested explicitly and it is enabled globally.
      seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
        parallelSeekEnabled);

      // set storeLimit
      this.storeLimit = scan.getMaxResultsPerColumnFamily();

      // set rowOffset
      this.storeOffset = scan.getRowOffsetPerColumnFamily();
      addCurrentScanners(scanners);
      // Combine all seeked scanners with a heap
      resetKVHeap(scanners, comparator);
    } catch (IOException e) {
      // scanners is still null here if store.getScanners(...) itself threw.
      clearAndClose(scanners);
      // remove us from the HStore#changedReaderObservers here or we'll have no chance to
      // and might cause memory leak
      store.deleteChangedReaderObserver(this);
      throw e;
    }
  }
275
  // A dummy scan instance for compaction: the compaction constructors pass this shared,
  // default-constructed Scan to the internal constructor because they have no user-supplied Scan.
  private static final Scan SCAN_FOR_COMPACTION = new Scan();
278
  /**
   * Used for store file compaction and memstore compaction.
   * <p>
   * Opens a scanner across specified StoreFiles/MemStoreSegments. Delegates to the private
   * compaction constructor with both drop-deletes row bounds set to null.
   * @param store             who we scan
   * @param scanInfo          settings handed to the internal constructor and the query matcher
   * @param scanners          ancillary scanners
   * @param scanType          the compaction scan type, forwarded unchanged
   * @param smallestReadPoint the readPoint that we should use for tracking versions
   * @param earliestPutTs     forwarded unchanged to the delegate constructor
   * @throws IOException if the delegate constructor fails
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
    ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
    this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
  }
291
  /**
   * Used for compactions that drop deletes from a limited range of rows.
   * <p>
   * Opens a scanner across specified StoreFiles. Delegates to the private compaction constructor
   * with the scan type fixed to {@code ScanType.COMPACT_RETAIN_DELETES}.
   * @param store              who we scan
   * @param scanInfo           settings handed to the internal constructor and the query matcher
   * @param scanners           ancillary scanners
   * @param smallestReadPoint  the readPoint that we should use for tracking versions
   * @param earliestPutTs      forwarded unchanged to the delegate constructor
   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
   * @param dropDeletesToRow   The exclusive right bound of the range; can be EMPTY_END_ROW.
   * @throws IOException if the delegate constructor fails
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
    long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow)
    throws IOException {
    this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
      earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
  }
308
309  private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
310    ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
311    byte[] dropDeletesToRow) throws IOException {
312    this(store, SCAN_FOR_COMPACTION, scanInfo, 0,
313      store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType);
314    assert scanType != ScanType.USER_SCAN;
315    matcher =
316      CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs,
317        oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
318
319    // Filter the list of scanners using Bloom filters, time range, TTL, etc.
320    scanners = selectScannersFrom(store, scanners);
321
322    // Seek all scanners to the initial key
323    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
324    addCurrentScanners(scanners);
325    // Combine all seeked scanners with a heap
326    resetKVHeap(scanners, comparator);
327  }
328
329  private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners)
330    throws IOException {
331    // Seek all scanners to the initial key
332    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
333    addCurrentScanners(scanners);
334    resetKVHeap(scanners, comparator);
335  }
336
  /**
   * For mob compaction only as we do not have a Store instance when doing mob compaction.
   * <p>
   * The store is null, the read point is {@code Long.MAX_VALUE} and block caching is off. The
   * given scanners are used as-is: selectScannersFrom is not applied to them.
   * @param scanInfo settings handed to the internal constructor and the query matcher
   * @param scanType the compaction scan type; asserted not to be USER_SCAN
   * @param scanners the scanners to seek and combine
   * @throws IOException if seeking the scanners or building the heap fails
   */
  public StoreScanner(ScanInfo scanInfo, ScanType scanType,
    List<? extends KeyValueScanner> scanners) throws IOException {
    this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
    assert scanType != ScanType.USER_SCAN;
    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L,
      oldestUnexpiredTS, now, null, null, null);
    seekAllScanner(scanInfo, scanners);
  }
346
347  // Used to instantiate a scanner for user scan in test
348  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
349    List<? extends KeyValueScanner> scanners, ScanType scanType) throws IOException {
350    // 0 is passed as readpoint because the test bypasses Store
351    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
352      scanType);
353    if (scanType == ScanType.USER_SCAN) {
354      this.matcher =
355        UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
356    } else {
357      this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
358        PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
359    }
360    seekAllScanner(scanInfo, scanners);
361  }
362
  /**
   * Used to instantiate a scanner for user scan in test. No Store is involved: the store is null
   * and the given scanners are used directly.
   * @param scan     the scan specification
   * @param scanInfo settings handed to the internal constructor and the query matcher
   * @param columns  which columns we are scanning; may be null
   * @param scanners the scanners to seek and combine
   * @throws IOException if seeking the scanners or building the heap fails
   */
  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
    List<? extends KeyValueScanner> scanners) throws IOException {
    // 0 is passed as readpoint because the test bypasses Store
    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
      ScanType.USER_SCAN);
    this.matcher =
      UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
    seekAllScanner(scanInfo, scanners);
  }
373
  /**
   * Used to instantiate a scanner for compaction in test. No Store is involved: the store is null
   * and the given scanners are used directly.
   * @param scanInfo    settings handed to the internal constructor and the query matcher
   * @param maxVersions when positive, a fresh Scan reading that many versions is used; otherwise
   *                    the shared SCAN_FOR_COMPACTION instance is used
   * @param scanType    the scan type handed to the compaction query matcher
   * @param scanners    the scanners to seek and combine
   * @throws IOException if seeking the scanners or building the heap fails
   */
  StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType,
    List<? extends KeyValueScanner> scanners) throws IOException {
    // 0 is passed as readpoint because the test bypasses Store
    this(null, maxVersions > 0 ? new Scan().readVersions(maxVersions) : SCAN_FOR_COMPACTION,
      scanInfo, 0, 0L, false, scanType);
    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
      PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
    seekAllScanner(scanInfo, scanners);
  }
384
  /**
   * Returns the current value of the {@code scanUsePread} flag.
   * @return true if this scanner is currently flagged to use pread
   */
  boolean isScanUsePread() {
    return this.scanUsePread;
  }
388
389  /**
390   * Seek the specified scanners with the given key
391   * @param isLazy         true if using lazy seek
392   * @param isParallelSeek true if using parallel seek
393   */
394  protected void seekScanners(List<? extends KeyValueScanner> scanners, Cell seekKey,
395    boolean isLazy, boolean isParallelSeek) throws IOException {
396    // Seek all scanners to the start of the Row (or if the exact matching row
397    // key does not exist, then to the start of the next matching Row).
398    // Always check bloom filter to optimize the top row seek for delete
399    // family marker.
400    if (isLazy) {
401      for (KeyValueScanner scanner : scanners) {
402        scanner.requestSeek(seekKey, false, true);
403      }
404    } else {
405      if (!isParallelSeek) {
406        long totalScannersSoughtBytes = 0;
407        for (KeyValueScanner scanner : scanners) {
408          if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) {
409            throw new RowTooBigException(
410              "Max row size allowed: " + maxRowSize + ", but row is bigger than that");
411          }
412          scanner.seek(seekKey);
413          Cell c = scanner.peek();
414          if (c != null) {
415            totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c);
416          }
417        }
418      } else {
419        parallelSeek(scanners, seekKey);
420      }
421    }
422  }
423
  /**
   * Replaces {@link #heap} with a new heap built by newKVHeap over the given scanners.
   * @param scanners   the already-seeked scanners to combine
   * @param comparator the comparator handed to the new heap
   * @throws IOException if building the heap fails
   */
  protected void resetKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
    throws IOException {
    // Combine all seeked scanners with a heap
    heap = newKVHeap(scanners, comparator);
  }
429
  /**
   * Creates the heap that merges the given scanners. Declared protected, so a subclass can supply
   * a different heap.
   * @param scanners   the scanners to combine
   * @param comparator the comparator handed to the heap
   * @return a new KeyValueHeap over the scanners
   * @throws IOException if constructing the heap fails
   */
  protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners,
    CellComparator comparator) throws IOException {
    return new KeyValueHeap(scanners, comparator);
  }
434
435  /**
436   * Filters the given list of scanners using Bloom filter, time range, and TTL.
437   * <p>
438   * Will be overridden by testcase so declared as protected.
439   */
440  protected List<KeyValueScanner> selectScannersFrom(HStore store,
441    List<? extends KeyValueScanner> allScanners) {
442    boolean memOnly;
443    boolean filesOnly;
444    if (scan instanceof InternalScan) {
445      InternalScan iscan = (InternalScan) scan;
446      memOnly = iscan.isCheckOnlyMemStore();
447      filesOnly = iscan.isCheckOnlyStoreFiles();
448    } else {
449      memOnly = false;
450      filesOnly = false;
451    }
452
453    List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size());
454
455    // We can only exclude store files based on TTL if minVersions is set to 0.
456    // Otherwise, we might have to return KVs that have technically expired.
457    long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE;
458
459    // include only those scan files which pass all filters
460    for (KeyValueScanner kvs : allScanners) {
461      boolean isFile = kvs.isFileScanner();
462      if ((!isFile && filesOnly) || (isFile && memOnly)) {
463        kvs.close();
464        continue;
465      }
466
467      if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
468        scanners.add(kvs);
469      } else {
470        kvs.close();
471      }
472    }
473    return scanners;
474  }
475
476  @Override
477  public Cell peek() {
478    return heap != null ? heap.peek() : null;
479  }
480
  /**
   * Not supported by this scanner: always throws.
   * @return never returns normally
   * @throws RuntimeException always
   */
  @Override
  public KeyValue next() {
    // Cell-at-a-time iteration is deliberately unsupported here; this class also declares
    // next(List, ScannerContext), which presumably is the intended entry point.
    throw new RuntimeException("Never call StoreScanner.next()");
  }
486
  /**
   * Fully closes this scanner; equivalent to {@code close(true)}.
   */
  @Override
  public void close() {
    close(true);
  }
491
492  private void close(boolean withDelayedScannersClose) {
493    closeLock.lock();
494    // If the closeLock is acquired then any subsequent updateReaders()
495    // call is ignored.
496    try {
497      if (this.closing) {
498        return;
499      }
500      if (withDelayedScannersClose) {
501        this.closing = true;
502      }
503      // For mob compaction, we do not have a store.
504      if (this.store != null) {
505        this.store.deleteChangedReaderObserver(this);
506      }
507      if (withDelayedScannersClose) {
508        clearAndClose(scannersForDelayedClose);
509        clearAndClose(memStoreScannersAfterFlush);
510        clearAndClose(flushedstoreFileScanners);
511        if (this.heap != null) {
512          this.heap.close();
513          this.currentScanners.clear();
514          this.heap = null; // CLOSED!
515        }
516      } else {
517        if (this.heap != null) {
518          this.scannersForDelayedClose.add(this.heap);
519          this.currentScanners.clear();
520          this.heap = null;
521        }
522      }
523    } finally {
524      closeLock.unlock();
525    }
526  }
527
  /**
   * Seeks the underlying heap to the given key. If checkFlushed() reports a flush, the scanners
   * are first reopened through reopenAfterFlush().
   * @param key the key to seek to
   * @return whatever the heap's seek returns
   * @throws IOException if reopening the scanners or seeking fails
   */
  @Override
  public boolean seek(Cell key) throws IOException {
    if (checkFlushed()) {
      reopenAfterFlush();
    }
    // NOTE(review): close(boolean) sets heap to null, so this presumably must not be called
    // after the scanner has been closed - confirm against callers.
    return this.heap.seek(key);
  }
535
536  /**
537   * Get the next row of values from this Store.
538   * @return true if there are more rows, false if scanner is done
539   */
540  @Override
541  public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
542    if (scannerContext == null) {
543      throw new IllegalArgumentException("Scanner context cannot be null");
544    }
545    if (checkFlushed() && reopenAfterFlush()) {
546      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
547    }
548
549    // if the heap was left null, then the scanners had previously run out anyways, close and
550    // return.
551    if (this.heap == null) {
552      // By this time partial close should happened because already heap is null
553      close(false);// Do all cleanup except heap.close()
554      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
555    }
556
557    Cell cell = this.heap.peek();
558    if (cell == null) {
559      close(false);// Do all cleanup except heap.close()
560      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
561    }
562
563    // only call setRow if the row changes; avoids confusing the query matcher
564    // if scanning intra-row
565
566    // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
567    // rows. Else it is possible we are still traversing the same row so we must perform the row
568    // comparison.
569    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) {
570      this.countPerRow = 0;
571      matcher.setToNewRow(cell);
572    }
573
574    // Clear progress away unless invoker has indicated it should be kept.
575    if (!scannerContext.getKeepProgress() && !scannerContext.getSkippingRow()) {
576      scannerContext.clearProgress();
577    }
578
579    Optional<RpcCall> rpcCall =
580      matcher.isUserScan() ? RpcServer.getCurrentCall() : Optional.empty();
581
582    int count = 0;
583    long totalBytesRead = 0;
584    // track the cells for metrics only if it is a user read request.
585    boolean onlyFromMemstore = matcher.isUserScan();
586    try {
587      LOOP: do {
588        // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
589        // Or if the preadMaxBytes is reached and we may want to return so we can switch to stream
590        // in
591        // the shipped method below.
592        if (
593          kvsScanned % cellsPerHeartbeatCheck == 0
594            || (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)
595        ) {
596          if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
597            return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
598          }
599        }
600        // Do object compare - we set prevKV from the same heap.
601        if (prevCell != cell) {
602          ++kvsScanned;
603        }
604        checkScanOrder(prevCell, cell, comparator);
605        int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
606        bytesRead += cellSize;
607        if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) {
608          // return immediately if we want to switch from pread to stream. We need this because we
609          // can
610          // only switch in the shipped method, if user use a filter to filter out everything and
611          // rpc
612          // timeout is very large then the shipped method will never be called until the whole scan
613          // is finished, but at that time we have already scan all the data...
614          // See HBASE-20457 for more details.
615          // And there is still a scenario that can not be handled. If we have a very large row,
616          // which
617          // have millions of qualifiers, and filter.filterRow is used, then even if we set the flag
618          // here, we still need to scan all the qualifiers before returning...
619          scannerContext.returnImmediately();
620        }
621
622        heap.recordBlockSize(blockSize -> {
623          if (rpcCall.isPresent()) {
624            rpcCall.get().incrementBlockBytesScanned(blockSize);
625          }
626          scannerContext.incrementBlockProgress(blockSize);
627        });
628
629        prevCell = cell;
630        scannerContext.setLastPeekedCell(cell);
631        topChanged = false;
632        ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
633        switch (qcode) {
634          case INCLUDE:
635          case INCLUDE_AND_SEEK_NEXT_ROW:
636          case INCLUDE_AND_SEEK_NEXT_COL:
637            Filter f = matcher.getFilter();
638            if (f != null) {
639              cell = f.transformCell(cell);
640            }
641            this.countPerRow++;
642
643            // add to results only if we have skipped #storeOffset kvs
644            // also update metric accordingly
645            if (this.countPerRow > storeOffset) {
646              outResult.add(cell);
647
648              // Update local tracking information
649              count++;
650              totalBytesRead += cellSize;
651
652              /**
653               * Increment the metric if all the cells are from memstore. If not we will account it
654               * for mixed reads
655               */
656              onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore();
657              // Update the progress of the scanner context
658              scannerContext.incrementSizeProgress(cellSize, cell.heapSize());
659              scannerContext.incrementBatchProgress(1);
660
661              if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
662                String message = "Max row size allowed: " + maxRowSize
663                  + ", but the row is bigger than that, the row info: "
664                  + CellUtil.toString(cell, false) + ", already have process row cells = "
665                  + outResult.size() + ", it belong to region = "
666                  + store.getHRegion().getRegionInfo().getRegionNameAsString();
667                LOG.warn(message);
668                throw new RowTooBigException(message);
669              }
670
671              if (storeLimit > -1 && this.countPerRow >= (storeLimit + storeOffset)) {
672                // do what SEEK_NEXT_ROW does.
673                if (!matcher.moreRowsMayExistAfter(cell)) {
674                  close(false);// Do all cleanup except heap.close()
675                  return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
676                }
677                matcher.clearCurrentRow();
678                seekToNextRow(cell);
679                break LOOP;
680              }
681            }
682
683            if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
684              if (!matcher.moreRowsMayExistAfter(cell)) {
685                close(false);// Do all cleanup except heap.close()
686                return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
687              }
688              matcher.clearCurrentRow();
689              seekOrSkipToNextRow(cell);
690            } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
691              seekOrSkipToNextColumn(cell);
692            } else {
693              this.heap.next();
694            }
695
696            if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
697              break LOOP;
698            }
699            if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
700              break LOOP;
701            }
702            continue;
703
704          case DONE:
705            // Optimization for Gets! If DONE, no more to get on this row, early exit!
706            if (get) {
707              // Then no more to this row... exit.
708              close(false);// Do all cleanup except heap.close()
709              // update metric
710              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
711            }
712            matcher.clearCurrentRow();
713            return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
714
715          case DONE_SCAN:
716            close(false);// Do all cleanup except heap.close()
717            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
718
719          case SEEK_NEXT_ROW:
720            // This is just a relatively simple end of scan fix, to short-cut end
721            // us if there is an endKey in the scan.
722            if (!matcher.moreRowsMayExistAfter(cell)) {
723              close(false);// Do all cleanup except heap.close()
724              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
725            }
726            matcher.clearCurrentRow();
727            seekOrSkipToNextRow(cell);
728            NextState stateAfterSeekNextRow = needToReturn(outResult);
729            if (stateAfterSeekNextRow != null) {
730              return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
731            }
732            break;
733
734          case SEEK_NEXT_COL:
735            seekOrSkipToNextColumn(cell);
736            NextState stateAfterSeekNextColumn = needToReturn(outResult);
737            if (stateAfterSeekNextColumn != null) {
738              return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
739            }
740            break;
741
742          case SKIP:
743            this.heap.next();
744            break;
745
746          case SEEK_NEXT_USING_HINT:
747            Cell nextKV = matcher.getNextKeyHint(cell);
748            if (nextKV != null) {
749              int difference = comparator.compare(nextKV, cell);
750              if (
751                ((!scan.isReversed() && difference > 0) || (scan.isReversed() && difference < 0))
752              ) {
753                seekAsDirection(nextKV);
754                NextState stateAfterSeekByHint = needToReturn(outResult);
755                if (stateAfterSeekByHint != null) {
756                  return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
757                }
758                break;
759              }
760            }
761            heap.next();
762            break;
763
764          default:
765            throw new RuntimeException("UNEXPECTED");
766        }
767
768        // One last chance to break due to size limit. The INCLUDE* cases above already check
769        // limit and continue. For the various filtered cases, we need to check because block
770        // size limit may have been exceeded even if we don't add cells to result list.
771        if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
772          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
773        }
774      } while ((cell = this.heap.peek()) != null);
775
776      if (count > 0) {
777        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
778      }
779
780      // No more keys
781      close(false);// Do all cleanup except heap.close()
782      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
783    } finally {
784      // increment only if we have some result
785      if (count > 0 && matcher.isUserScan()) {
786        // if true increment memstore metrics, if not the mixed one
787        updateMetricsStore(onlyFromMemstore);
788      }
789    }
790  }
791
792  private void updateMetricsStore(boolean memstoreRead) {
793    if (store != null) {
794      store.updateMetricsStore(memstoreRead);
795    } else {
796      // for testing.
797      if (memstoreRead) {
798        memstoreOnlyReads++;
799      } else {
800        mixedReads++;
801      }
802    }
803  }
804
805  /**
806   * If the top cell won't be flushed into disk, the new top cell may be changed after
807   * #reopenAfterFlush. Because the older top cell only exist in the memstore scanner but the
808   * memstore scanner is replaced by hfile scanner after #reopenAfterFlush. If the row of top cell
809   * is changed, we should return the current cells. Otherwise, we may return the cells across
810   * different rows.
811   * @param outResult the cells which are visible for user scan
812   * @return null is the top cell doesn't change. Otherwise, the NextState to return
813   */
814  private NextState needToReturn(List<Cell> outResult) {
815    if (!outResult.isEmpty() && topChanged) {
816      return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
817    }
818    return null;
819  }
820
821  private void seekOrSkipToNextRow(Cell cell) throws IOException {
822    // If it is a Get Scan, then we know that we are done with this row; there are no more
823    // rows beyond the current one: don't try to optimize.
824    if (!get) {
825      if (trySkipToNextRow(cell)) {
826        return;
827      }
828    }
829    seekToNextRow(cell);
830  }
831
832  private void seekOrSkipToNextColumn(Cell cell) throws IOException {
833    if (!trySkipToNextColumn(cell)) {
834      seekAsDirection(matcher.getKeyForNextColumn(cell));
835    }
836  }
837
838  /**
839   * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).
840   * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row, or seek to an
841   * arbitrary seek key. This method decides whether a seek is the most efficient _actual_ way to
842   * get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, SKIP inside the
843   * current, loaded block). It does this by looking at the next indexed key of the current HFile.
844   * This key is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible
845   * key on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work
846   * with the current Cell but compare as though it were a seek key; see down in
847   * matcher.compareKeyForNextRow, etc). If the compare gets us onto the next block we *_SEEK,
848   * otherwise we just SKIP to the next requested cell.
849   * <p>
850   * Other notes:
851   * <ul>
852   * <li>Rows can straddle block boundaries</li>
853   * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a
854   * different block than column C1 at T2)</li>
855   * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a few
856   * SKIPs...</li>
857   * <li>We want to SEEK when the chance is high that we'll be able to seek past many Cells,
858   * especially if we know we need to go to the next block.</li>
859   * </ul>
860   * <p>
861   * A good proxy (best effort) to determine whether SKIP is better than SEEK is whether we'll
862   * likely end up seeking to the next block (or past the next block) to get our next column.
863   * Example:
864   *
865   * <pre>
866   * |    BLOCK 1              |     BLOCK 2                   |
867   * |  r1/c1, r1/c2, r1/c3    |    r1/c4, r1/c5, r2/c1        |
868   *                                   ^         ^
869   *                                   |         |
870   *                           Next Index Key   SEEK_NEXT_ROW (before r2/c1)
871   *
872   *
873   * |    BLOCK 1                       |     BLOCK 2                      |
874   * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |    r1/c1/t2, r1/c1/T1, r1/c2/T3  |
875   *                                            ^              ^
876   *                                            |              |
877   *                                    Next Index Key        SEEK_NEXT_COL
878   * </pre>
879   *
   * Now imagine we want columns c1 and c3 (see first diagram above), the 'Next Index Key' of r1/c4
   * is > r1/c3 so we should seek to get to the c1 on the next row, r2. In the second case, say we
   * only want one version of c1, after we have it, a SEEK_COL will be issued to get to c2. Looking
   * at the 'Next Index Key', it would land us in the next block, so we should SEEK. In other
   * scenarios where the SEEK will not land us in the next block, it is very likely better to issue
   * a series of SKIPs.
886   * @param cell current cell
887   * @return true means skip to next row, false means not
888   */
889  protected boolean trySkipToNextRow(Cell cell) throws IOException {
890    Cell nextCell = null;
891    // used to guard against a changed next indexed key by doing a identity comparison
892    // when the identity changes we need to compare the bytes again
893    Cell previousIndexedKey = null;
894    do {
895      Cell nextIndexedKey = getNextIndexedKey();
896      if (
897        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
898          && (nextIndexedKey == previousIndexedKey
899            || matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0)
900      ) {
901        this.heap.next();
902        ++kvsScanned;
903        previousIndexedKey = nextIndexedKey;
904      } else {
905        return false;
906      }
907    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell));
908    return true;
909  }
910
911  /**
912   * See {@link org.apache.hadoop.hbase.regionserver.StoreScanner#trySkipToNextRow(Cell)}
913   * @param cell current cell
914   * @return true means skip to next column, false means not
915   */
916  protected boolean trySkipToNextColumn(Cell cell) throws IOException {
917    Cell nextCell = null;
918    // used to guard against a changed next indexed key by doing a identity comparison
919    // when the identity changes we need to compare the bytes again
920    Cell previousIndexedKey = null;
921    do {
922      Cell nextIndexedKey = getNextIndexedKey();
923      if (
924        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
925          && (nextIndexedKey == previousIndexedKey
926            || matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0)
927      ) {
928        this.heap.next();
929        ++kvsScanned;
930        previousIndexedKey = nextIndexedKey;
931      } else {
932        return false;
933      }
934    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell));
935    // We need this check because it may happen that the new scanner that we get
936    // during heap.next() is requiring reseek due of fake KV previously generated for
937    // ROWCOL bloom filter optimization. See HBASE-19863 for more details
938    if (
939      useRowColBloom && nextCell != null && cell.getTimestamp() == PrivateConstants.OLDEST_TIMESTAMP
940    ) {
941      return false;
942    }
943    return true;
944  }
945
946  @Override
947  public long getReadPoint() {
948    return this.readPt;
949  }
950
951  private static void clearAndClose(List<KeyValueScanner> scanners) {
952    if (scanners == null) {
953      return;
954    }
955    for (KeyValueScanner s : scanners) {
956      s.close();
957    }
958    scanners.clear();
959  }
960
961  // Implementation of ChangedReadersObserver
962  @Override
963  public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners)
964    throws IOException {
965    if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) {
966      return;
967    }
968    boolean updateReaders = false;
969    flushLock.lock();
970    try {
971      if (!closeLock.tryLock()) {
972        // The reason for doing this is that when the current store scanner does not retrieve
973        // any new cells, then the scanner is considered to be done. The heap of this scanner
974        // is not closed till the shipped() call is completed. Hence in that case if at all
975        // the partial close (close (false)) has been called before updateReaders(), there is no
976        // need for the updateReaders() to happen.
977        LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
978        // no lock acquired.
979        clearAndClose(memStoreScanners);
980        return;
981      }
982      // lock acquired
983      updateReaders = true;
984      if (this.closing) {
985        LOG.debug("StoreScanner already closing. There is no need to updateReaders");
986        clearAndClose(memStoreScanners);
987        return;
988      }
989      flushed = true;
990      final boolean isCompaction = false;
991      boolean usePread = get || scanUsePread;
992      // SEE HBASE-19468 where the flushed files are getting compacted even before a scanner
993      // calls next(). So its better we create scanners here rather than next() call. Ensure
994      // these scanners are properly closed() whether or not the scan is completed successfully
995      // Eagerly creating scanners so that we have the ref counting ticking on the newly created
996      // store files. In case of stream scanners this eager creation does not induce performance
997      // penalty because in scans (that uses stream scanners) the next() call is bound to happen.
998      List<KeyValueScanner> scanners = store.getScanners(sfs, cacheBlocks, get, usePread,
999        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false);
1000      flushedstoreFileScanners.addAll(scanners);
1001      if (!CollectionUtils.isEmpty(memStoreScanners)) {
1002        clearAndClose(memStoreScannersAfterFlush);
1003        memStoreScannersAfterFlush.addAll(memStoreScanners);
1004      }
1005    } finally {
1006      flushLock.unlock();
1007      if (updateReaders) {
1008        closeLock.unlock();
1009      }
1010    }
1011    // Let the next() call handle re-creating and seeking
1012  }
1013
1014  /** Returns if top of heap has changed (and KeyValueHeap has to try the next KV) */
1015  protected final boolean reopenAfterFlush() throws IOException {
1016    // here we can make sure that we have a Store instance so no null check on store.
1017    Cell lastTop = heap.peek();
1018    // When we have the scan object, should we not pass it to getScanners() to get a limited set of
1019    // scanners? We did so in the constructor and we could have done it now by storing the scan
1020    // object from the constructor
1021    List<KeyValueScanner> scanners;
1022    flushLock.lock();
1023    try {
1024      List<KeyValueScanner> allScanners =
1025        new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());
1026      allScanners.addAll(flushedstoreFileScanners);
1027      allScanners.addAll(memStoreScannersAfterFlush);
1028      scanners = selectScannersFrom(store, allScanners);
1029      // Clear the current set of flushed store files scanners so that they don't get added again
1030      flushedstoreFileScanners.clear();
1031      memStoreScannersAfterFlush.clear();
1032    } finally {
1033      flushLock.unlock();
1034    }
1035
1036    // Seek the new scanners to the last key
1037    seekScanners(scanners, lastTop, false, parallelSeekEnabled);
1038    // remove the older memstore scanner
1039    for (int i = currentScanners.size() - 1; i >= 0; i--) {
1040      if (!currentScanners.get(i).isFileScanner()) {
1041        scannersForDelayedClose.add(currentScanners.remove(i));
1042      } else {
1043        // we add the memstore scanner to the end of currentScanners
1044        break;
1045      }
1046    }
1047    // add the newly created scanners on the flushed files and the current active memstore scanner
1048    addCurrentScanners(scanners);
1049    // Combine all seeked scanners with a heap
1050    resetKVHeap(this.currentScanners, store.getComparator());
1051    resetQueryMatcher(lastTop);
1052    if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
1053      LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString()
1054        + ",and after = " + heap.peek());
1055      topChanged = true;
1056    } else {
1057      topChanged = false;
1058    }
1059    return topChanged;
1060  }
1061
1062  private void resetQueryMatcher(Cell lastTopKey) {
1063    // Reset the state of the Query Matcher and set to top row.
1064    // Only reset and call setRow if the row changes; avoids confusing the
1065    // query matcher if scanning intra-row.
1066    Cell cell = heap.peek();
1067    if (cell == null) {
1068      cell = lastTopKey;
1069    }
1070    if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {
1071      this.countPerRow = 0;
1072      // The setToNewRow will call reset internally
1073      matcher.setToNewRow(cell);
1074    }
1075  }
1076
1077  /**
1078   * Check whether scan as expected order
1079   */
1080  protected void checkScanOrder(Cell prevKV, Cell kv, CellComparator comparator)
1081    throws IOException {
1082    // Check that the heap gives us KVs in an increasing order.
1083    assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0
1084      : "Key " + prevKV + " followed by a smaller key " + kv + " in cf " + store;
1085  }
1086
1087  protected boolean seekToNextRow(Cell c) throws IOException {
1088    return reseek(PrivateCellUtil.createLastOnRow(c));
1089  }
1090
1091  /**
1092   * Do a reseek in a normal StoreScanner(scan forward)
1093   * @return true if scanner has values left, false if end of scanner
1094   */
1095  protected boolean seekAsDirection(Cell kv) throws IOException {
1096    return reseek(kv);
1097  }
1098
1099  @Override
1100  public boolean reseek(Cell kv) throws IOException {
1101    if (checkFlushed()) {
1102      reopenAfterFlush();
1103    }
1104    if (explicitColumnQuery && lazySeekEnabledGlobally) {
1105      return heap.requestSeek(kv, true, useRowColBloom);
1106    }
1107    return heap.reseek(kv);
1108  }
1109
1110  void trySwitchToStreamRead() {
1111    if (
1112      readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null
1113        || bytesRead < preadMaxBytes
1114    ) {
1115      return;
1116    }
1117    LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead,
1118      this.store.getColumnFamilyName());
1119    scanUsePread = false;
1120    Cell lastTop = heap.peek();
1121    List<KeyValueScanner> memstoreScanners = new ArrayList<>();
1122    List<KeyValueScanner> scannersToClose = new ArrayList<>();
1123    for (KeyValueScanner kvs : currentScanners) {
1124      if (!kvs.isFileScanner()) {
1125        // collect memstorescanners here
1126        memstoreScanners.add(kvs);
1127      } else {
1128        scannersToClose.add(kvs);
1129      }
1130    }
1131    List<KeyValueScanner> fileScanners = null;
1132    List<KeyValueScanner> newCurrentScanners;
1133    KeyValueHeap newHeap;
1134    try {
1135      // We must have a store instance here so no null check
1136      // recreate the scanners on the current file scanners
1137      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, matcher,
1138        scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
1139        readPt, false);
1140      if (fileScanners == null) {
1141        return;
1142      }
1143      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
1144      newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
1145      newCurrentScanners.addAll(fileScanners);
1146      newCurrentScanners.addAll(memstoreScanners);
1147      newHeap = newKVHeap(newCurrentScanners, comparator);
1148    } catch (Exception e) {
1149      LOG.warn("failed to switch to stream read", e);
1150      if (fileScanners != null) {
1151        fileScanners.forEach(KeyValueScanner::close);
1152      }
1153      return;
1154    }
1155    currentScanners.clear();
1156    addCurrentScanners(newCurrentScanners);
1157    this.heap = newHeap;
1158    resetQueryMatcher(lastTop);
1159    scannersToClose.forEach(KeyValueScanner::close);
1160  }
1161
1162  protected final boolean checkFlushed() {
1163    // check the var without any lock. Suppose even if we see the old
1164    // value here still it is ok to continue because we will not be resetting
1165    // the heap but will continue with the referenced memstore's snapshot. For compactions
1166    // any way we don't need the updateReaders at all to happen as we still continue with
1167    // the older files
1168    if (flushed) {
1169      // If there is a flush and the current scan is notified on the flush ensure that the
1170      // scan's heap gets reset and we do a seek on the newly flushed file.
1171      if (this.closing) {
1172        return false;
1173      }
1174      // reset the flag
1175      flushed = false;
1176      return true;
1177    }
1178    return false;
1179  }
1180
1181  /**
1182   * Seek storefiles in parallel to optimize IO latency as much as possible
1183   * @param scanners the list {@link KeyValueScanner}s to be read from
1184   * @param kv       the KeyValue on which the operation is being requested
1185   */
1186  private void parallelSeek(final List<? extends KeyValueScanner> scanners, final Cell kv)
1187    throws IOException {
1188    if (scanners.isEmpty()) return;
1189    int storeFileScannerCount = scanners.size();
1190    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
1191    List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount);
1192    for (KeyValueScanner scanner : scanners) {
1193      if (scanner instanceof StoreFileScanner) {
1194        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, this.readPt, latch);
1195        executor.submit(seekHandler);
1196        handlers.add(seekHandler);
1197      } else {
1198        scanner.seek(kv);
1199        latch.countDown();
1200      }
1201    }
1202
1203    try {
1204      latch.await();
1205    } catch (InterruptedException ie) {
1206      throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
1207    }
1208
1209    for (ParallelSeekHandler handler : handlers) {
1210      if (handler.getErr() != null) {
1211        throw new IOException(handler.getErr());
1212      }
1213    }
1214  }
1215
1216  /**
1217   * Used in testing.
1218   * @return all scanners in no particular order
1219   */
1220  List<KeyValueScanner> getAllScannersForTesting() {
1221    List<KeyValueScanner> allScanners = new ArrayList<>();
1222    KeyValueScanner current = heap.getCurrentForTesting();
1223    if (current != null) allScanners.add(current);
1224    for (KeyValueScanner scanner : heap.getHeap())
1225      allScanners.add(scanner);
1226    return allScanners;
1227  }
1228
  /**
   * Sets the static lazySeekEnabledGlobally flag, which reseek consults together with
   * explicitColumnQuery when choosing between requestSeek and a direct reseek on the heap.
   * @param enable the new value of the flag
   */
  static void enableLazySeekGlobally(boolean enable) {
    lazySeekEnabledGlobally = enable;
  }
1232
1233  /** Returns The estimated number of KVs seen by this scanner (includes some skipped KVs). */
1234  public long getEstimatedNumberOfKvsScanned() {
1235    return this.kvsScanned;
1236  }
1237
1238  @Override
1239  public Cell getNextIndexedKey() {
1240    return this.heap.getNextIndexedKey();
1241  }
1242
1243  @Override
1244  public void shipped() throws IOException {
1245    if (prevCell != null) {
1246      // Do the copy here so that in case the prevCell ref is pointing to the previous
1247      // blocks we can safely release those blocks.
1248      // This applies to blocks that are got from Bucket cache, L1 cache and the blocks
1249      // fetched from HDFS. Copying this would ensure that we let go the references to these
1250      // blocks so that they can be GCed safely(in case of bucket cache)
1251      prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
1252    }
1253    matcher.beforeShipped();
1254    // There wont be further fetch of Cells from these scanners. Just close.
1255    clearAndClose(scannersForDelayedClose);
1256    if (this.heap != null) {
1257      this.heap.shipped();
1258      // When switching from pread to stream, we will open a new scanner for each store file, but
1259      // the old scanner may still track the HFileBlocks we have scanned but not sent back to client
1260      // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others
1261      // before we serialize and send it back to client. The HFileBlocks will be released in shipped
1262      // method, so we here will also open new scanners and close old scanners in shipped method.
1263      // See HBASE-18055 for more details.
1264      trySwitchToStreamRead();
1265    }
1266  }
1267}