001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.AbstractList;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.NavigableSet;
029import java.util.Optional;
030import java.util.Set;
031import java.util.concurrent.ConcurrentHashMap;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellComparator;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.DoNotRetryIOException;
037import org.apache.hadoop.hbase.ExtendedCell;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.KeyValue;
040import org.apache.hadoop.hbase.PrivateCellUtil;
041import org.apache.hadoop.hbase.UnknownScannerException;
042import org.apache.hadoop.hbase.client.ClientInternalHelper;
043import org.apache.hadoop.hbase.client.IsolationLevel;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.Scan;
046import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
047import org.apache.hadoop.hbase.filter.FilterWrapper;
048import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
049import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
050import org.apache.hadoop.hbase.ipc.RpcCall;
051import org.apache.hadoop.hbase.ipc.RpcCallback;
052import org.apache.hadoop.hbase.ipc.RpcServer;
053import org.apache.hadoop.hbase.regionserver.Region.Operation;
054import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
055import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
056import org.apache.hadoop.hbase.trace.TraceUtil;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.yetus.audience.InterfaceAudience;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
063
064/**
065 * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
066 */
067@InterfaceAudience.Private
068public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
069
  private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class);

  // Heap over the scanners that must be read for every row: the additional scanners plus every
  // family scanner the filter needs (all of them when there is no filter or on-demand column
  // family loading is off). nextRaw treats a null value as a closed scanner.
  // Package local for testability
  KeyValueHeap storeHeap = null;

  /**
   * Heap of key-values that are not essential for the provided filters and are thus read on demand,
   * if on-demand column family loading is enabled. Stays null when no family is read on demand.
   */
  KeyValueHeap joinedHeap = null;

  /**
   * If the joined heap data gathering is interrupted due to scan limits, this will contain the row
   * for which we are populating the values. Null when no joined-heap read is pending.
   */
  protected ExtendedCell joinedContinuationRow = null;
  // When true, next(List, ScannerContext) rejects calls with UnknownScannerException.
  // NOTE(review): not set anywhere in the visible code; presumably set on close — verify.
  private boolean filterClosed = false;

  // Scan bounds and direction copied from the Scan in the constructor, plus the owning region
  // and its cell comparator.
  protected final byte[] stopRow;
  protected final boolean includeStopRow;
  protected final boolean reversed;
  protected final HRegion region;
  protected final CellComparator comparator;

  // The owning region's map of open scanners to their MVCC read points (the region's own map,
  // not a copy). The constructor registers this scanner here; handleException removes it.
  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  // MVCC read point this scanner reads at; chosen once in the constructor.
  private final long readPt;
  // Copied from Scan#getMaxResultSize().
  private final long maxResultSize;
  // Context used by next/nextRaw when the caller supplies none; enforces the scan's batch limit.
  private final ScannerContext defaultScannerContext;
  // Wrapper around the scan's filter; null when the scan has no filter.
  private final FilterWrapper filter;
  // Copied from Scan#getId().
  private final String operationId;

  // Region server services of the owning region; may be null (hasNonce checks for that).
  private RegionServerServices rsServices;

  // NOTE(review): not referenced in the visible code; presumably records the store files read
  // by this scan — verify against the rest of the file.
  private final Set<Path> filesRead = new HashSet<>();
105
106  @Override
107  public RegionInfo getRegionInfo() {
108    return region.getRegionInfo();
109  }
110
111  private static boolean hasNonce(HRegion region, long nonce) {
112    RegionServerServices rsServices = region.getRegionServerServices();
113    return nonce != HConstants.NO_NONCE && rsServices != null
114      && rsServices.getNonceManager() != null;
115  }
116
  /**
   * Creates a scanner over {@code region} for {@code scan}, registers its MVCC read point with the
   * region and opens the underlying store scanners.
   * @param scan               the client scan (families, filter, batch, bounds, isolation level)
   * @param additionalScanners extra scanners merged into the main store heap; may be null or empty
   * @param region             the region to scan
   * @param nonceGroup         nonce group used together with {@code nonce} to look up a read point
   * @param nonce              operation nonce; {@link HConstants#NO_NONCE} when there is none
   * @throws IOException if a store scanner or one of the heaps cannot be created
   */
  RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
    long nonceGroup, long nonce) throws IOException {
    this.region = region;
    this.maxResultSize = scan.getMaxResultSize();
    if (scan.hasFilter()) {
      this.filter = new FilterWrapper(scan.getFilter());
    } else {
      this.filter = null;
    }
    this.comparator = region.getCellComparator();
    /**
     * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
     * scanner context that can be used to enforce the batch limit in the event that a
     * ScannerContext is not specified during an invocation of next/nextRaw
     */
    defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
    this.stopRow = scan.getStopRow();
    this.includeStopRow = scan.includeStopRow();
    this.reversed = scan.isReversed();
    this.operationId = scan.getId();

    // Hold the region's smallestReadPointCalcLock in RECORDING mode while choosing this
    // scanner's read point and publishing it to scannerReadPoints, so that nobody calculates
    // getSmallestReadPoint before scannerReadPoints is updated.
    // Read point priority: an explicit MVCC read point carried by the scan, then the read point
    // the nonce manager recorded for (nonceGroup, nonce), then the region's read point for the
    // scan's isolation level.
    IsolationLevel isolationLevel = scan.getIsolationLevel();
    long mvccReadPoint = ClientInternalHelper.getMvccReadPoint(scan);
    this.scannerReadPoints = region.scannerReadPoints;
    this.rsServices = region.getRegionServerServices();
    region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    try {
      if (mvccReadPoint > 0) {
        this.readPt = mvccReadPoint;
      } else if (hasNonce(region, nonce)) {
        this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
      } else {
        this.readPt = region.getReadPoint(isolationLevel);
      }
      scannerReadPoints.put(this, this.readPt);
    } finally {
      region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    }
    // Runs after the read point is registered; on failure initializeScanners unregisters it
    // again (through handleException) before rethrowing.
    initializeScanners(scan, additionalScanners);
  }
159
160  public ScannerContext getContext() {
161    return defaultScannerContext;
162  }
163
164  private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
165    throws IOException {
166    // Here we separate all scanners into two lists - scanner that provide data required
167    // by the filter to operate (scanners list) and all others (joinedScanners list).
168    List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());
169    List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());
170    // Store all already instantiated scanners for exception handling
171    List<KeyValueScanner> instantiatedScanners = new ArrayList<>();
172    // handle additionalScanners
173    if (additionalScanners != null && !additionalScanners.isEmpty()) {
174      scanners.addAll(additionalScanners);
175      instantiatedScanners.addAll(additionalScanners);
176    }
177
178    try {
179      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
180        HStore store = region.getStore(entry.getKey());
181        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
182        instantiatedScanners.add(scanner);
183        if (
184          this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
185            || this.filter.isFamilyEssential(entry.getKey())
186        ) {
187          scanners.add(scanner);
188        } else {
189          joinedScanners.add(scanner);
190        }
191      }
192      initializeKVHeap(scanners, joinedScanners, region);
193    } catch (Throwable t) {
194      throw handleException(instantiatedScanners, t);
195    }
196  }
197
198  protected void initializeKVHeap(List<KeyValueScanner> scanners,
199    List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
200    this.storeHeap = new KeyValueHeap(scanners, comparator);
201    if (!joinedScanners.isEmpty()) {
202      this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
203    }
204  }
205
206  private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) {
207    // remove scaner read point before throw the exception
208    scannerReadPoints.remove(this);
209    if (storeHeap != null) {
210      storeHeap.close();
211      storeHeap = null;
212      if (joinedHeap != null) {
213        joinedHeap.close();
214        joinedHeap = null;
215      }
216    } else {
217      // close all already instantiated scanners before throwing the exception
218      for (KeyValueScanner scanner : instantiatedScanners) {
219        scanner.close();
220      }
221    }
222    return t instanceof IOException ? (IOException) t : new IOException(t);
223  }
224
225  @Override
226  public long getMaxResultSize() {
227    return maxResultSize;
228  }
229
230  @Override
231  public long getMvccReadPoint() {
232    return this.readPt;
233  }
234
235  @Override
236  public int getBatch() {
237    return this.defaultScannerContext.getBatchLimit();
238  }
239
240  @Override
241  public String getOperationId() {
242    return operationId;
243  }
244
245  /**
246   * Reset both the filter and the old filter.
247   * @throws IOException in case a filter raises an I/O exception.
248   */
249  protected final void resetFilters() throws IOException {
250    if (filter != null) {
251      filter.reset();
252    }
253  }
254
255  @Override
256  public boolean next(List<? super ExtendedCell> outResults) throws IOException {
257    // apply the batching limit by default
258    return next(outResults, defaultScannerContext);
259  }
260
261  @Override
262  public synchronized boolean next(List<? super ExtendedCell> outResults,
263    ScannerContext scannerContext) throws IOException {
264    if (this.filterClosed) {
265      throw new UnknownScannerException("Scanner was closed (timed out?) "
266        + "after we renewed it. Could be caused by a very slow scanner "
267        + "or a lengthy garbage collection");
268    }
269    region.startRegionOperation(Operation.SCAN);
270    try {
271      return nextRaw(outResults, scannerContext);
272    } finally {
273      region.closeRegionOperation(Operation.SCAN);
274    }
275  }
276
277  @Override
278  public boolean nextRaw(List<? super ExtendedCell> outResults) throws IOException {
279    // Use the RegionScanner's context by default
280    return nextRaw(outResults, defaultScannerContext);
281  }
282
283  @Override
284  public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext scannerContext)
285    throws IOException {
286    if (storeHeap == null) {
287      // scanner is closed
288      throw new UnknownScannerException("Scanner was closed");
289    }
290    boolean moreValues = false;
291    if (outResults.isEmpty()) {
292      // Usually outResults is empty. This is true when next is called
293      // to handle scan or get operation.
294      moreValues = nextInternal(outResults, scannerContext);
295    } else {
296      List<ExtendedCell> tmpList = new ArrayList<>();
297      moreValues = nextInternal(tmpList, scannerContext);
298      outResults.addAll(tmpList);
299    }
300    region.addReadRequestsCount(1);
301    if (region.getMetrics() != null) {
302      region.getMetrics().updateReadRequestCount();
303    }
304
305    // If the size limit was reached it means a partial Result is being returned. Returning a
306    // partial Result means that we should not reset the filters; filters should only be reset in
307    // between rows
308    if (!scannerContext.mayHaveMoreCellsInRow()) {
309      resetFilters();
310    }
311
312    if (isFilterDoneInternal()) {
313      moreValues = false;
314    }
315    return moreValues;
316  }
317
318  /** Returns true if more cells exist after this batch, false if scanner is done */
319  private boolean populateFromJoinedHeap(List<? super ExtendedCell> results,
320    ScannerContext scannerContext) throws IOException {
321    assert joinedContinuationRow != null;
322    boolean moreValues =
323      populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow);
324
325    if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
326      // We are done with this row, reset the continuation.
327      joinedContinuationRow = null;
328    }
329    // As the data is obtained from two independent heaps, we need to
330    // ensure that result list is sorted, because Result relies on that.
331    ((List<Cell>) results).sort(comparator);
332    return moreValues;
333  }
334
335  /**
336   * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is
337   * reached, or remainingResultSize (if not -1) is reaced
338   * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.
339   * @return state of last call to {@link KeyValueHeap#next()}
340   */
341  private boolean populateResult(List<? super ExtendedCell> results, KeyValueHeap heap,
342    ScannerContext scannerContext, ExtendedCell currentRowCell) throws IOException {
343    Cell nextKv;
344    boolean moreCellsInRow = false;
345    boolean tmpKeepProgress = scannerContext.getKeepProgress();
346    // Scanning between column families and thus the scope is between cells
347    LimitScope limitScope = LimitScope.BETWEEN_CELLS;
348    do {
349      // Check for thread interrupt status in case we have been signaled from
350      // #interruptRegionOperation.
351      region.checkInterrupt();
352
353      // We want to maintain any progress that is made towards the limits while scanning across
354      // different column families. To do this, we toggle the keep progress flag on during calls
355      // to the StoreScanner to ensure that any progress made thus far is not wiped away.
356      scannerContext.setKeepProgress(true);
357      heap.next(results, scannerContext);
358      scannerContext.setKeepProgress(tmpKeepProgress);
359
360      nextKv = heap.peek();
361      moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
362      if (!moreCellsInRow) {
363        incrementCountOfRowsScannedMetric(scannerContext);
364      }
365      if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
366        return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
367      } else if (scannerContext.checkSizeLimit(limitScope)) {
368        ScannerContext.NextState state =
369          moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
370        return scannerContext.setScannerState(state).hasMoreValues();
371      } else if (scannerContext.checkTimeLimit(limitScope)) {
372        ScannerContext.NextState state =
373          moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
374        return scannerContext.setScannerState(state).hasMoreValues();
375      }
376    } while (moreCellsInRow);
377    return nextKv != null;
378  }
379
380  /**
381   * Based on the nextKv in the heap, and the current row, decide whether or not there are more
382   * cells to be read in the heap. If the row of the nextKv in the heap matches the current row then
383   * there are more cells to be read in the row.
384   * @return true When there are more cells in the row to be read
385   */
386  private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
387    return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);
388  }
389
390  /** Returns True if a filter rules the scanner is over, done. */
391  @Override
392  public synchronized boolean isFilterDone() throws IOException {
393    return isFilterDoneInternal();
394  }
395
396  private boolean isFilterDoneInternal() throws IOException {
397    return this.filter != null && this.filter.filterAllRemaining();
398  }
399
400  private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException {
401    if (rpcCall.isPresent()) {
402      // If a user specifies a too-restrictive or too-slow scanner, the
403      // client might time out and disconnect while the server side
404      // is still processing the request. We should abort aggressively
405      // in that case.
406      long afterTime = rpcCall.get().disconnectSince();
407      if (afterTime >= 0) {
408        throw new CallerDisconnectedException(
409          "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this
410            + " after " + afterTime + " ms, since " + "caller disconnected");
411      }
412    }
413  }
414
415  private void resetProgress(ScannerContext scannerContext, int initialBatchProgress,
416    long initialSizeProgress, long initialHeapSizeProgress) {
417    // Starting to scan a new row. Reset the scanner progress according to whether or not
418    // progress should be kept.
419    if (scannerContext.getKeepProgress()) {
420      // Progress should be kept. Reset to initial values seen at start of method invocation.
421      scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
422        initialHeapSizeProgress);
423    } else {
424      scannerContext.clearProgress();
425    }
426  }
427
  /**
   * Reads the next row, or part of it when a limit stops the scan mid-row, into {@code results}.
   * Rows that the filter removes entirely are skipped, and the loop moves on to the following
   * row. Non-essential families are read from the joined heap only after the row has passed the
   * filter.
   * @param results        must be empty on entry (checked); receives the row's cells
   * @param scannerContext limits and progress for this call; must not be null (checked)
   * @return true if more values may follow, false when the scan is exhausted
   * @throws IncompatibleFilterException if a filter with hasFilterRow() would require stopping
   *                                     mid-row because of a limit
   * @throws IOException                 if reading or filtering fails, or the caller disconnected
   */
  private boolean nextInternal(List<? super ExtendedCell> results, ScannerContext scannerContext)
    throws IOException {
    Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list");
    Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null");
    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();

    // Save the initial progress from the Scanner context in these local variables. The progress
    // may need to be reset a few times if rows are being filtered out so we save the initial
    // progress.
    int initialBatchProgress = scannerContext.getBatchProgress();
    long initialSizeProgress = scannerContext.getDataSizeProgress();
    long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();

    // Scope for the limit checks made after a row has been skipped or filtered out; widened to
    // BETWEEN_ROWS below when the filter has filterRow.
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;

    // The loop here is used only when at some point during the next we determine
    // that due to effects of filters or otherwise, we have an empty row in the result.
    // Then we loop and try again. Otherwise, we must get out on the first iteration via return,
    // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
    // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow).
    while (true) {
      resetProgress(scannerContext, initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
      checkClientDisconnect(rpcCall);

      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // Let's see what we have in the storeHeap.
      ExtendedCell current = this.storeHeap.peek();

      boolean shouldStop = shouldStop(current);
      // When has filter row is true it means that the all the cells for a particular row must be
      // read before a filtering decision can be made. This means that filters where hasFilterRow
      // run the risk of encountering out of memory errors in the case that they are applied to a
      // table that has very large rows.
      boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

      // If filter#hasFilterRow is true, partial results are not allowed since allowing them
      // would prevent the filters from being evaluated. Thus, if it is true, change the
      // scope of any limits that could potentially create partial results to
      // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row
      if (hasFilterRow) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
            + " formed. Changing scope of limits that may create partials");
        }
        scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
        scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
        limitScope = LimitScope.BETWEEN_ROWS;
      }

      if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
        if (hasFilterRow) {
          throw new IncompatibleFilterException(
            "Filter whose hasFilterRow() returns true is incompatible with scans that must "
              + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
        }
        return true;
      }

      // Check if we were getting data from the joinedHeap and hit the limit.
      // If not, then it's main path - getting results from storeHeap.
      if (joinedContinuationRow == null) {
        // First, check if we are at a stop row. If so, there are no more results.
        if (shouldStop) {
          if (hasFilterRow) {
            filter.filterRowCells((List<Cell>) results);
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Check if rowkey filter wants to exclude this row. If so, loop to next.
        // Technically, if we hit limits before on this row, we don't need this call.
        if (filterRowKey(current)) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          // early check, see HBASE-16296
          if (isFilterDoneInternal()) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          // HBASE-29974: ask the filter for a seek hint so we can jump directly past the rejected
          // row instead of iterating through its cells one-by-one via nextRow().
          ExtendedCell rowHint = getHintForRejectedRow(current);
          // Typically the count of rows scanned is incremented inside #populateResult. However,
          // here we are filtering a row based purely on its row key, preventing us from calling
          // #populateResult. Thus, perform the necessary increment here to rows scanned metric.
          // Placed after getHintForRejectedRow so that a buggy filter throwing DNRIOE doesn't
          // leave the metric incremented for a row that was never actually processed.
          incrementCountOfRowsScannedMetric(scannerContext);
          boolean moreRows = (rowHint != null)
            ? nextRowViaHint(scannerContext, current, rowHint)
            : nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          results.clear();

          // Read nothing as the rowkey was filtered, but still need to check time limit
          // We also check size limit because we might have read blocks in getting to this point.
          if (scannerContext.checkAnyLimitReached(limitScope)) {
            return true;
          }
          continue;
        }

        // Ok, we are good, let's try to get some results from the main heap.
        populateResult(results, this.storeHeap, scannerContext, current);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          if (hasFilterRow) {
            throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
          }
          return true;
        }

        // Check for thread interrupt status in case we have been signaled from
        // #interruptRegionOperation.
        region.checkInterrupt();

        Cell nextKv = this.storeHeap.peek();
        shouldStop = shouldStop(nextKv);
        // save that the row was empty before filters applied to it.
        final boolean isEmptyRow = results.isEmpty();

        // We have the part of the row necessary for filtering (all of it, usually).
        // First filter with the filterRow(List).
        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
        if (hasFilterRow) {
          ret = filter.filterRowCellsWithRet((List<Cell>) results);

          // We don't know how the results have changed after being filtered. Must set progress
          // according to contents of results now.
          if (scannerContext.getKeepProgress()) {
            scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
              initialHeapSizeProgress);
          } else {
            scannerContext.clearProgress();
          }
          scannerContext.incrementBatchProgress(results.size());
          for (ExtendedCell cell : (List<ExtendedCell>) results) {
            scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
              cell.heapSize());
          }
        }

        if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          results.clear();
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // This row was totally filtered out, if this is NOT the last row,
          // we should continue on. Otherwise, nothing else to do.
          if (!shouldStop) {
            // Read nothing as the cells was filtered, but still need to check time limit.
            // We also check size limit because we might have read blocks in getting to this point.
            if (scannerContext.checkAnyLimitReached(limitScope)) {
              return true;
            }
            continue;
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Ok, we are done with storeHeap for this row.
        // Now we may need to fetch additional, non-essential data into row.
        // These values are not needed for filter to work, so we postpone their
        // fetch to (possibly) reduce amount of data loads from disk.
        if (this.joinedHeap != null) {
          boolean mayHaveData = joinedHeapMayHaveData(current);
          if (mayHaveData) {
            joinedContinuationRow = current;
            populateFromJoinedHeap(results, scannerContext);

            if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
              return true;
            }
          }
        }
      } else {
        // Populating from the joined heap was stopped by limits, populate some more.
        populateFromJoinedHeap(results, scannerContext);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          return true;
        }
      }
      // We may have just called populateFromJoinedHeap and hit the limits. If that is
      // the case, we need to call it again on the next next() invocation.
      if (joinedContinuationRow != null) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // Finally, we are done with both joinedHeap and storeHeap.
      // Double check to prevent empty rows from appearing in result. It could be
      // the case when SingleColumnValueExcludeFilter is used.
      if (results.isEmpty()) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        boolean moreRows = nextRow(scannerContext, current);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        if (!shouldStop) {
          // We check size limit because we might have read blocks in the nextRow call above, or
          // in the populateResult call. Only scans with hasFilterRow should reach this point,
          // and for those scans which filter row _cells_ this is the only place we can actually
          // enforce that the scan does not exceed limits since it bypasses all other checks above.
          if (scannerContext.checkSizeLimit(limitScope)) {
            return true;
          }
          continue;
        }
      }

      if (shouldStop) {
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      } else {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }
    }
  }
653
654  private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
655    region.filteredReadRequestsCount.increment();
656    if (region.getMetrics() != null) {
657      region.getMetrics().updateFilteredRecords();
658    }
659
660    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
661      return;
662    }
663
664    scannerContext.getMetrics()
665      .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME, 1);
666  }
667
668  private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
669    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
670      return;
671    }
672
673    scannerContext.getMetrics()
674      .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 1);
675  }
676
677  /** Returns true when the joined heap may have data for the current row */
678  private boolean joinedHeapMayHaveData(ExtendedCell currentRowCell) throws IOException {
679    Cell nextJoinedKv = joinedHeap.peek();
680    boolean matchCurrentRow =
681      nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell);
682    boolean matchAfterSeek = false;
683
684    // If the next value in the joined heap does not match the current row, try to seek to the
685    // correct row
686    if (!matchCurrentRow) {
687      ExtendedCell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell);
688      boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
689      matchAfterSeek = seekSuccessful && joinedHeap.peek() != null
690        && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell);
691    }
692
693    return matchCurrentRow || matchAfterSeek;
694  }
695
696  /**
697   * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines both
698   * filterRow & filterRow({@code List<KeyValue> kvs}) functions. While 0.94 code or older, it may
699   * not implement hasFilterRow as HBase-6429 expects because 0.94 hasFilterRow() only returns true
700   * when filterRow({@code List<KeyValue> kvs}) is overridden not the filterRow(). Therefore, the
701   * filterRow() will be skipped.
702   */
703  private boolean filterRow() throws IOException {
704    // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
705    // filterRowCells(List<Cell> kvs) so we skip that scenario here.
706    return filter != null && (!filter.hasFilterRow()) && filter.filterRow();
707  }
708
709  private boolean filterRowKey(Cell current) throws IOException {
710    return filter != null && filter.filterRowKey(current);
711  }
712
713  /**
714   * A mocked list implementation - discards all updates.
715   */
716  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
717
718    @Override
719    public void add(int index, Cell element) {
720      // do nothing
721    }
722
723    @Override
724    public boolean addAll(int index, Collection<? extends Cell> c) {
725      return false; // this list is never changed as a result of an update
726    }
727
728    @Override
729    public KeyValue get(int index) {
730      throw new UnsupportedOperationException();
731    }
732
733    @Override
734    public int size() {
735      return 0;
736    }
737  };
738
739  protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
740    assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
741
742    // Enable skipping row mode, which disables limits and skips tracking progress for all
743    // but block size. We keep tracking block size because skipping a row in this way
744    // might involve reading blocks along the way.
745    scannerContext.setSkippingRow(true);
746
747    Cell next;
748    while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
749      // Check for thread interrupt status in case we have been signaled from
750      // #interruptRegionOperation.
751      region.checkInterrupt();
752      this.storeHeap.next(MOCKED_LIST, scannerContext);
753    }
754
755    scannerContext.setSkippingRow(false);
756    resetFilters();
757
758    // Calling the hook in CP which allows it to do a fast forward
759    return this.region.getCoprocessorHost() == null
760      || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
761  }
762
763  /**
764   * Fast-path alternative to {@link #nextRow} used when the filter has provided a seek hint via
765   * {@link org.apache.hadoop.hbase.filter.Filter#getHintForRejectedRow(Cell)}. Instead of iterating
766   * through every cell in the rejected row one-by-one, this method issues a single seek to jump
767   * directly to the filter's suggested position ({@code requestSeek} for forward scans,
768   * {@code backwardSeek} for reversed scans).
769   * <p>
770   * The skipping-row mode flag is set around the seek so that block-level size tracking continues
771   * to function (consistent with {@link #nextRow}), and the filter state is reset afterwards so the
772   * next row starts with a clean filter context.
773   * <p>
774   * <strong>Stop-row invariant:</strong> This method does not validate that {@code hint} falls
775   * within the scan's stop row. If the hint overshoots, the next iteration's
776   * {@link #shouldStop(Cell)} check catches it and returns NO_MORE_VALUES. One wasted seek may
777   * occur, but correctness is maintained.
778   * <p>
779   * <strong>Metrics note:</strong> The rows-scanned metric is incremented once by the caller for
780   * the rejected row. Rows physically skipped by the seek are not individually counted — this
781   * reflects the fact that no per-row work was done for those rows.
782   * <p>
783   * <strong>Coprocessor note:</strong> {@code postScannerFilterRow} is invoked once with
784   * {@code curRowCell}, not once per skipped row. Coprocessors counting filtered rows should be
785   * aware of this semantic when the hint path is used.
786   * @param scannerContext scanner context used for limit tracking
787   * @param curRowCell     the first cell of the row that was rejected by {@code filterRowKey};
788   *                       passed to the coprocessor hook for observability
789   * @param hint           the validated {@link ExtendedCell} returned by the filter; the scanner
790   *                       will seek to this position
791   * @return {@code true} if scanning should continue, {@code false} if a coprocessor requests an
792   *         early stop (mirrors the contract of {@link #nextRow})
793   * @throws IOException if the seek or the coprocessor hook signals a failure
794   */
795  private boolean nextRowViaHint(ScannerContext scannerContext, Cell curRowCell, ExtendedCell hint)
796    throws IOException {
797    assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
798
799    int difference = comparator.compareRows(hint, curRowCell);
800    if ((!reversed && difference > 0) || (reversed && difference < 0)) {
801      scannerContext.setSkippingRow(true);
802      if (reversed) {
803        // ReversedKeyValueHeap does not support requestSeek; use backwardSeek
804        // to position at-or-before the hint within the target row.
805        // seekToPreviousRow would skip past the hint row entirely.
806        this.storeHeap.backwardSeek(hint);
807      } else {
808        this.storeHeap.requestSeek(hint, true, true);
809      }
810      scannerContext.setSkippingRow(false);
811
812      resetFilters();
813
814      return this.region.getCoprocessorHost() == null
815        || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
816    }
817
818    return nextRow(scannerContext, curRowCell);
819  }
820
821  /**
822   * Asks the current {@link org.apache.hadoop.hbase.filter.FilterWrapper} for a seek hint to use
823   * after a row has been rejected by {@link #filterRowKey}. If the wrapped filter overrides
824   * {@link org.apache.hadoop.hbase.filter.Filter#getHintForRejectedRow(Cell)}, this returns its
825   * answer as an {@link ExtendedCell}; otherwise returns {@code null}.
826   * <p>
827   * The returned cell is validated to be an {@link ExtendedCell} because filters run on the server
828   * side and the scanner infrastructure requires {@code ExtendedCell} references.
829   * @param rowCell the first cell of the rejected row (same cell passed to {@code filterRowKey})
830   * @return a validated {@link ExtendedCell} seek target, or {@code null} if the filter provides no
831   *         hint
832   * @throws DoNotRetryIOException if the filter returns a non-{@link ExtendedCell} instance
833   * @throws IOException           if the filter signals an I/O failure
834   */
835  private ExtendedCell getHintForRejectedRow(Cell rowCell) throws IOException {
836    if (filter == null) {
837      return null;
838    }
839    Cell hint = filter.getHintForRejectedRow(rowCell);
840    if (hint == null) {
841      return null;
842    }
843    if (!(hint instanceof ExtendedCell)) {
844      throw new DoNotRetryIOException(
845        "Incorrect filter implementation: the Cell returned by getHintForRejectedRow "
846          + "is not an ExtendedCell. Filter class: " + filter.getClass().getName());
847    }
848    return (ExtendedCell) hint;
849  }
850
851  protected boolean shouldStop(Cell currentRowCell) {
852    if (currentRowCell == null) {
853      return true;
854    }
855    if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {
856      return false;
857    }
858    int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length);
859    return c > 0 || (c == 0 && !includeStopRow);
860  }
861
  /**
   * Releases the scanner's heaps. For each non-null heap (store heap, then joined heap), it closes
   * the heap, adds the heap's files to {@code filesRead`, and clears the heap reference. Finally it
   * removes this scanner from {@code scannerReadPoints} and sets {@code filterClosed}.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "this method is only called inside close which is synchronized")
  private void closeInternal() {
    if (storeHeap != null) {
      storeHeap.close();
      // NOTE(review): files are collected after close(); presumably the heap's file set is only
      // final once closed — confirm before reordering these statements.
      filesRead.addAll(storeHeap.getFilesRead());
      storeHeap = null;
    }
    if (joinedHeap != null) {
      joinedHeap.close();
      filesRead.addAll(joinedHeap.getFilesRead());
      joinedHeap = null;
    }
    // no need to synchronize here.
    scannerReadPoints.remove(this);
    this.filterClosed = true;
  }
879
  /**
   * Closes this scanner by running {@code closeInternal()} inside a "RegionScanner.close" tracing
   * span. The method is synchronized, so it cannot interleave with other synchronized scanner
   * methods such as {@code reseek}.
   */
  @Override
  public synchronized void close() {
    TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
  }
884
885  /**
886   * Returns the set of store file paths that were successfully read by this scanner. Populated at
887   * close from the underlying store heap and joined heap (if any).
888   */
889  @Override
890  public Set<Path> getFilesRead() {
891    return Collections.unmodifiableSet(filesRead);
892  }
893
894  @Override
895  public synchronized boolean reseek(byte[] row) throws IOException {
896    return TraceUtil.trace(() -> {
897      if (row == null) {
898        throw new IllegalArgumentException("Row cannot be null.");
899      }
900      boolean result = false;
901      region.startRegionOperation();
902      ExtendedCell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
903      try {
904        // use request seek to make use of the lazy seek option. See HBASE-5520
905        result = this.storeHeap.requestSeek(kv, true, true);
906        if (this.joinedHeap != null) {
907          result = this.joinedHeap.requestSeek(kv, true, true) || result;
908        }
909      } finally {
910        region.closeRegionOperation();
911      }
912      return result;
913    }, () -> region.createRegionSpan("RegionScanner.reseek"));
914  }
915
  /**
   * Forwards {@code shipped()} to the store heap and then, if present, to the joined heap. Either
   * heap may already be {@code null} (closeInternal() clears both), so each call is null-guarded.
   * @throws IOException if either heap's {@code shipped()} fails
   */
  @Override
  public void shipped() throws IOException {
    if (storeHeap != null) {
      storeHeap.shipped();
    }
    if (joinedHeap != null) {
      joinedHeap.shipped();
    }
  }
925
  @Override
  public void run() throws IOException {
    // This is the RPC callback method that gets executed; the scanner is closed in this
    // callback.
    this.close();
  }
932}