001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.AbstractList;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.List;
025import java.util.Map;
026import java.util.NavigableSet;
027import java.util.Optional;
028import java.util.concurrent.ConcurrentHashMap;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellComparator;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.ExtendedCell;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.KeyValue;
035import org.apache.hadoop.hbase.PrivateCellUtil;
036import org.apache.hadoop.hbase.UnknownScannerException;
037import org.apache.hadoop.hbase.client.ClientInternalHelper;
038import org.apache.hadoop.hbase.client.IsolationLevel;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.client.Scan;
041import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
042import org.apache.hadoop.hbase.filter.FilterWrapper;
043import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
044import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
045import org.apache.hadoop.hbase.ipc.RpcCall;
046import org.apache.hadoop.hbase.ipc.RpcCallback;
047import org.apache.hadoop.hbase.ipc.RpcServer;
048import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
049import org.apache.hadoop.hbase.regionserver.Region.Operation;
050import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
051import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
052import org.apache.hadoop.hbase.trace.TraceUtil;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.yetus.audience.InterfaceAudience;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
059
060/**
061 * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
062 */
063@InterfaceAudience.Private
064public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
065
  private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class);

  /**
   * Heap over the scanners whose data the filter needs, plus any additional scanners passed to the
   * constructor. nextRaw treats a null storeHeap as a closed scanner.
   */
  // Package local for testability
  KeyValueHeap storeHeap = null;

  /**
   * Heap of key-values that are not essential for the provided filters and are thus read on demand,
   * if on-demand column family loading is enabled. Null when there are no such scanners.
   */
  KeyValueHeap joinedHeap = null;

  /**
   * If the joined heap data gathering is interrupted due to scan limits, this will contain the row
   * for which we are populating the values.
   */
  protected ExtendedCell joinedContinuationRow = null;
  // When true, next(List, ScannerContext) rejects the call with UnknownScannerException.
  private boolean filterClosed = false;

  // Stop row and its inclusiveness, copied from the Scan in the constructor.
  protected final byte[] stopRow;
  protected final boolean includeStopRow;
  protected final HRegion region;
  protected final CellComparator comparator;

  // The region's shared map of scanner -> read point; this scanner registers itself on creation
  // and is removed again if initialization fails.
  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  // MVCC read point chosen in the constructor (explicit scan value, nonce context, or region).
  private final long readPt;
  // Maximum result size taken from the Scan.
  private final long maxResultSize;
  // Context used when next/nextRaw are called without one; it carries the scan's batch limit.
  private final ScannerContext defaultScannerContext;
  // Wrapped scan filter, or null when the scan has no filter.
  private final FilterWrapper filter;
  // Operation id taken from the Scan.
  private final String operationId;

  // May be null (hasNonce checks for that).
  private RegionServerServices rsServices;

  // Metrics collected while opening the store scanners (only when scan metrics are enabled); merged
  // into the context's metrics by the first metrics-tracking nextRaw call, then set back to null.
  private ServerSideScanMetrics scannerInitMetrics = null;
100
101  @Override
102  public RegionInfo getRegionInfo() {
103    return region.getRegionInfo();
104  }
105
106  private static boolean hasNonce(HRegion region, long nonce) {
107    RegionServerServices rsServices = region.getRegionServerServices();
108    return nonce != HConstants.NO_NONCE && rsServices != null
109      && rsServices.getNonceManager() != null;
110  }
111
  /**
   * Creates a scanner over the stores of {@code region} that are selected by {@code scan}.
   * <p>
   * The MVCC read point is taken from the scan's explicit read point when it is positive. Otherwise
   * it comes from the nonce manager's operation context, when a nonce is supplied and a nonce
   * manager is available. Failing both, it is the region's read point for the scan's isolation
   * level. The chosen read point is registered in the region's {@code scannerReadPoints} map.
   * <p>
   * NOTE(review): this constructor reaches the protected, overridable {@code initializeKVHeap}
   * through {@code initializeScanners}. An override therefore runs before any subclass fields are
   * initialized.
   * @param scan               the client scan (families, filter, limits, metrics settings)
   * @param additionalScanners extra scanners merged into the main heap; may be null or empty
   * @param region             the region being scanned
   * @param nonceGroup         nonce group for the nonce-based read point lookup
   * @param nonce              nonce for the nonce-based read point lookup, or
   *                           {@link HConstants#NO_NONCE}
   * @throws IOException if the store scanners cannot be opened. In that case the read point
   *                     registration is removed and the scanners opened so far are closed.
   */
  RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
    long nonceGroup, long nonce) throws IOException {
    this.region = region;
    this.maxResultSize = scan.getMaxResultSize();
    if (scan.hasFilter()) {
      this.filter = new FilterWrapper(scan.getFilter());
    } else {
      this.filter = null;
    }
    this.comparator = region.getCellComparator();
    /**
     * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
     * scanner context that can be used to enforce the batch limit in the event that a
     * ScannerContext is not specified during an invocation of next/nextRaw
     */
    defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
    this.stopRow = scan.getStopRow();
    this.includeStopRow = scan.includeStopRow();
    this.operationId = scan.getId();

    // Choose and record the read point while holding the RECORDING_LOCK of
    // smallestReadPointCalcLock. The intent is that the smallest read point is not calculated
    // before scannerReadPoints contains this scanner.
    IsolationLevel isolationLevel = scan.getIsolationLevel();
    long mvccReadPoint = ClientInternalHelper.getMvccReadPoint(scan);
    this.scannerReadPoints = region.scannerReadPoints;
    this.rsServices = region.getRegionServerServices();
    region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    try {
      if (mvccReadPoint > 0) {
        this.readPt = mvccReadPoint;
      } else if (hasNonce(region, nonce)) {
        this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
      } else {
        this.readPt = region.getReadPoint(isolationLevel);
      }
      scannerReadPoints.put(this, this.readPt);
    } finally {
      region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    }
    // Metrics gathered while opening the store scanners are captured into scannerInitMetrics.
    // nextRaw later adds them to the first metrics-tracking call's context.
    boolean isScanMetricsEnabled = scan.isScanMetricsEnabled();
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(isScanMetricsEnabled);
    if (isScanMetricsEnabled) {
      this.scannerInitMetrics = new ServerSideScanMetrics();
      ThreadLocalServerSideScanMetrics.reset();
    }
    initializeScanners(scan, additionalScanners);
    if (isScanMetricsEnabled) {
      ThreadLocalServerSideScanMetrics.populateServerSideScanMetrics(scannerInitMetrics);
    }
  }
162
163  public ScannerContext getContext() {
164    return defaultScannerContext;
165  }
166
167  private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
168    throws IOException {
169    // Here we separate all scanners into two lists - scanner that provide data required
170    // by the filter to operate (scanners list) and all others (joinedScanners list).
171    List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());
172    List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());
173    // Store all already instantiated scanners for exception handling
174    List<KeyValueScanner> instantiatedScanners = new ArrayList<>();
175    // handle additionalScanners
176    if (additionalScanners != null && !additionalScanners.isEmpty()) {
177      scanners.addAll(additionalScanners);
178      instantiatedScanners.addAll(additionalScanners);
179    }
180
181    try {
182      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
183        HStore store = region.getStore(entry.getKey());
184        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
185        instantiatedScanners.add(scanner);
186        if (
187          this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
188            || this.filter.isFamilyEssential(entry.getKey())
189        ) {
190          scanners.add(scanner);
191        } else {
192          joinedScanners.add(scanner);
193        }
194      }
195      initializeKVHeap(scanners, joinedScanners, region);
196    } catch (Throwable t) {
197      throw handleException(instantiatedScanners, t);
198    }
199  }
200
201  protected void initializeKVHeap(List<KeyValueScanner> scanners,
202    List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
203    this.storeHeap = new KeyValueHeap(scanners, comparator);
204    if (!joinedScanners.isEmpty()) {
205      this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
206    }
207  }
208
209  private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) {
210    // remove scaner read point before throw the exception
211    scannerReadPoints.remove(this);
212    if (storeHeap != null) {
213      storeHeap.close();
214      storeHeap = null;
215      if (joinedHeap != null) {
216        joinedHeap.close();
217        joinedHeap = null;
218      }
219    } else {
220      // close all already instantiated scanners before throwing the exception
221      for (KeyValueScanner scanner : instantiatedScanners) {
222        scanner.close();
223      }
224    }
225    return t instanceof IOException ? (IOException) t : new IOException(t);
226  }
227
228  @Override
229  public long getMaxResultSize() {
230    return maxResultSize;
231  }
232
233  @Override
234  public long getMvccReadPoint() {
235    return this.readPt;
236  }
237
238  @Override
239  public int getBatch() {
240    return this.defaultScannerContext.getBatchLimit();
241  }
242
243  @Override
244  public String getOperationId() {
245    return operationId;
246  }
247
248  /**
249   * Reset both the filter and the old filter.
250   * @throws IOException in case a filter raises an I/O exception.
251   */
252  protected final void resetFilters() throws IOException {
253    if (filter != null) {
254      filter.reset();
255    }
256  }
257
258  @Override
259  public boolean next(List<? super ExtendedCell> outResults) throws IOException {
260    // apply the batching limit by default
261    return next(outResults, defaultScannerContext);
262  }
263
264  @Override
265  public synchronized boolean next(List<? super ExtendedCell> outResults,
266    ScannerContext scannerContext) throws IOException {
267    if (this.filterClosed) {
268      throw new UnknownScannerException("Scanner was closed (timed out?) "
269        + "after we renewed it. Could be caused by a very slow scanner "
270        + "or a lengthy garbage collection");
271    }
272    region.startRegionOperation(Operation.SCAN);
273    try {
274      return nextRaw(outResults, scannerContext);
275    } finally {
276      region.closeRegionOperation(Operation.SCAN);
277    }
278  }
279
280  @Override
281  public boolean nextRaw(List<? super ExtendedCell> outResults) throws IOException {
282    // Use the RegionScanner's context by default
283    return nextRaw(outResults, defaultScannerContext);
284  }
285
  /**
   * Reads the next row, or part of a row when a limit is reached, into {@code outResults}.
   * <p>
   * Unlike {@link #next(List, ScannerContext)}, this method is not synchronized and does not start
   * or close a SCAN region operation.
   * <p>
   * When the context tracks metrics, the metrics collected during scanner initialization are added
   * to the context's metrics on the first such call and then dropped. Every call counts as one read
   * request for the region. Filters are reset unless the context reports that the current row may
   * have more cells.
   * @param outResults     list the row's cells are appended to; may already contain cells
   * @param scannerContext limits, progress and metrics for this call
   * @return true if more values may follow; false when the scan is exhausted or the filter rules
   *         out all remaining rows
   * @throws UnknownScannerException if the scanner has been closed
   * @throws IOException if reading the underlying stores fails
   */
  @Override
  public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext scannerContext)
    throws IOException {
    if (storeHeap == null) {
      // scanner is closed
      throw new UnknownScannerException("Scanner was closed");
    }
    boolean moreValues = false;
    boolean isScanMetricsEnabled = scannerContext.isTrackingMetrics();
    // Enable or disable thread-local scan metrics for this call to match the context.
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(isScanMetricsEnabled);
    if (isScanMetricsEnabled) {
      ThreadLocalServerSideScanMetrics.reset();
      ServerSideScanMetrics scanMetrics = scannerContext.getMetrics();
      // Fold in the metrics gathered by the constructor exactly once, then drop them.
      if (scannerInitMetrics != null) {
        scannerInitMetrics.getMetricsMap().forEach(scanMetrics::addToCounter);
        scannerInitMetrics = null;
      }
    }
    if (outResults.isEmpty()) {
      // Usually outResults is empty. This is true when next is called
      // to handle scan or get operation.
      moreValues = nextInternal(outResults, scannerContext);
    } else {
      // nextInternal requires an empty list, so read into a scratch list and append it.
      List<ExtendedCell> tmpList = new ArrayList<>();
      moreValues = nextInternal(tmpList, scannerContext);
      outResults.addAll(tmpList);
    }
    if (isScanMetricsEnabled) {
      // Presumably transfers the thread-local counters gathered during this call into the
      // context's metrics (verify against ThreadLocalServerSideScanMetrics).
      ServerSideScanMetrics scanMetrics = scannerContext.getMetrics();
      ThreadLocalServerSideScanMetrics.populateServerSideScanMetrics(scanMetrics);
    }
    region.addReadRequestsCount(1);
    if (region.getMetrics() != null) {
      region.getMetrics().updateReadRequestCount();
    }

    // If the size limit was reached it means a partial Result is being returned. Returning a
    // partial Result means that we should not reset the filters; filters should only be reset in
    // between rows
    if (!scannerContext.mayHaveMoreCellsInRow()) {
      resetFilters();
    }

    // Even if rows remain, the scan is over once the filter rules out everything remaining.
    if (isFilterDoneInternal()) {
      moreValues = false;
    }
    return moreValues;
  }
334
335  /** Returns true if more cells exist after this batch, false if scanner is done */
336  private boolean populateFromJoinedHeap(List<? super ExtendedCell> results,
337    ScannerContext scannerContext) throws IOException {
338    assert joinedContinuationRow != null;
339    boolean moreValues =
340      populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow);
341
342    if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
343      // We are done with this row, reset the continuation.
344      joinedContinuationRow = null;
345    }
346    // As the data is obtained from two independent heaps, we need to
347    // ensure that result list is sorted, because Result relies on that.
348    ((List<Cell>) results).sort(comparator);
349    return moreValues;
350  }
351
352  /**
353   * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is
354   * reached, or remainingResultSize (if not -1) is reaced
355   * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.
356   * @return state of last call to {@link KeyValueHeap#next()}
357   */
358  private boolean populateResult(List<? super ExtendedCell> results, KeyValueHeap heap,
359    ScannerContext scannerContext, ExtendedCell currentRowCell) throws IOException {
360    Cell nextKv;
361    boolean moreCellsInRow = false;
362    boolean tmpKeepProgress = scannerContext.getKeepProgress();
363    // Scanning between column families and thus the scope is between cells
364    LimitScope limitScope = LimitScope.BETWEEN_CELLS;
365    do {
366      // Check for thread interrupt status in case we have been signaled from
367      // #interruptRegionOperation.
368      region.checkInterrupt();
369
370      // We want to maintain any progress that is made towards the limits while scanning across
371      // different column families. To do this, we toggle the keep progress flag on during calls
372      // to the StoreScanner to ensure that any progress made thus far is not wiped away.
373      scannerContext.setKeepProgress(true);
374      heap.next(results, scannerContext);
375      scannerContext.setKeepProgress(tmpKeepProgress);
376
377      nextKv = heap.peek();
378      moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
379      if (!moreCellsInRow) {
380        incrementCountOfRowsScannedMetric(scannerContext);
381      }
382      if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
383        return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
384      } else if (scannerContext.checkSizeLimit(limitScope)) {
385        ScannerContext.NextState state =
386          moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
387        return scannerContext.setScannerState(state).hasMoreValues();
388      } else if (scannerContext.checkTimeLimit(limitScope)) {
389        ScannerContext.NextState state =
390          moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
391        return scannerContext.setScannerState(state).hasMoreValues();
392      }
393    } while (moreCellsInRow);
394    return nextKv != null;
395  }
396
397  /**
398   * Based on the nextKv in the heap, and the current row, decide whether or not there are more
399   * cells to be read in the heap. If the row of the nextKv in the heap matches the current row then
400   * there are more cells to be read in the row.
401   * @return true When there are more cells in the row to be read
402   */
403  private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
404    return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);
405  }
406
407  /** Returns True if a filter rules the scanner is over, done. */
408  @Override
409  public synchronized boolean isFilterDone() throws IOException {
410    return isFilterDoneInternal();
411  }
412
413  private boolean isFilterDoneInternal() throws IOException {
414    return this.filter != null && this.filter.filterAllRemaining();
415  }
416
417  private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException {
418    if (rpcCall.isPresent()) {
419      // If a user specifies a too-restrictive or too-slow scanner, the
420      // client might time out and disconnect while the server side
421      // is still processing the request. We should abort aggressively
422      // in that case.
423      long afterTime = rpcCall.get().disconnectSince();
424      if (afterTime >= 0) {
425        throw new CallerDisconnectedException(
426          "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this
427            + " after " + afterTime + " ms, since " + "caller disconnected");
428      }
429    }
430  }
431
432  private void resetProgress(ScannerContext scannerContext, int initialBatchProgress,
433    long initialSizeProgress, long initialHeapSizeProgress) {
434    // Starting to scan a new row. Reset the scanner progress according to whether or not
435    // progress should be kept.
436    if (scannerContext.getKeepProgress()) {
437      // Progress should be kept. Reset to initial values seen at start of method invocation.
438      scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
439        initialHeapSizeProgress);
440    } else {
441      scannerContext.clearProgress();
442    }
443  }
444
  /**
   * Reads the next row, or part of it when a limit is reached mid-row, into {@code results}.
   * <p>
   * Rows rejected by the filter are skipped and the loop moves on to the next row, resetting the
   * context's progress via resetProgress. A row can be rejected by its row key, by its row cells,
   * or because it ends up empty. Cells from the joined heap are fetched only for rows that pass the
   * filter.
   * @param results        must be empty on entry; receives the cells of the row
   * @param scannerContext must not be null; supplies the limits and receives progress and state
   * @return true if more values may follow, false when the scan is exhausted
   * @throws IncompatibleFilterException if a filter whose hasFilterRow() is true would have to stop
   *                                     mid-row because of a limit
   * @throws IOException on read failure, interruption, or client disconnect
   */
  private boolean nextInternal(List<? super ExtendedCell> results, ScannerContext scannerContext)
    throws IOException {
    Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list");
    Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null");
    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();

    // Save the initial progress from the Scanner context in these local variables. The progress
    // may need to be reset a few times if rows are being filtered out so we save the initial
    // progress.
    int initialBatchProgress = scannerContext.getBatchProgress();
    long initialSizeProgress = scannerContext.getDataSizeProgress();
    long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();

    // Scope for the limit checks made after a row was filtered out. It is widened to BETWEEN_ROWS
    // below when the filter has hasFilterRow.
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;

    // The loop here is used only when at some point during the next we determine
    // that due to effects of filters or otherwise, we have an empty row in the result.
    // Then we loop and try again. Otherwise, we must get out on the first iteration via return,
    // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
    // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow)).
    while (true) {
      resetProgress(scannerContext, initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
      checkClientDisconnect(rpcCall);

      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // Let's see what we have in the storeHeap.
      ExtendedCell current = this.storeHeap.peek();

      boolean shouldStop = shouldStop(current);
      // When hasFilterRow is true, all the cells of a row must be read before a filtering
      // decision can be made. Filters with hasFilterRow therefore run the risk of encountering
      // out of memory errors when they are applied to a table that has very large rows.
      boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

      // If filter#hasFilterRow is true, partial results are not allowed since allowing them
      // would prevent the filters from being evaluated. Thus, if it is true, change the
      // scope of any limits that could potentially create partial results to
      // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row
      if (hasFilterRow) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
            + " formed. Changing scope of limits that may create partials");
        }
        scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
        scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
        limitScope = LimitScope.BETWEEN_ROWS;
      }

      if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
        if (hasFilterRow) {
          throw new IncompatibleFilterException(
            "Filter whose hasFilterRow() returns true is incompatible with scans that must "
              + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
        }
        return true;
      }

      // Check if we were getting data from the joinedHeap and hit the limit.
      // If not, then it's main path - getting results from storeHeap.
      if (joinedContinuationRow == null) {
        // First, check if we are at a stop row. If so, there are no more results.
        if (shouldStop) {
          if (hasFilterRow) {
            filter.filterRowCells((List<Cell>) results);
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Check if rowkey filter wants to exclude this row. If so, loop to next.
        // Technically, if we hit limits before on this row, we don't need this call.
        if (filterRowKey(current)) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          // early check, see HBASE-16296
          if (isFilterDoneInternal()) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          // Typically the count of rows scanned is incremented inside #populateResult. However,
          // here we are filtering a row based purely on its row key, preventing us from calling
          // #populateResult. Thus, perform the necessary increment here to rows scanned metric
          incrementCountOfRowsScannedMetric(scannerContext);
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          results.clear();

          // Read nothing as the rowkey was filtered, but still need to check time limit
          // We also check size limit because we might have read blocks in getting to this point.
          if (scannerContext.checkAnyLimitReached(limitScope)) {
            return true;
          }
          continue;
        }

        // Ok, we are good, let's try to get some results from the main heap.
        populateResult(results, this.storeHeap, scannerContext, current);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          if (hasFilterRow) {
            throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
          }
          return true;
        }

        // Check for thread interrupt status in case we have been signaled from
        // #interruptRegionOperation.
        region.checkInterrupt();

        Cell nextKv = this.storeHeap.peek();
        shouldStop = shouldStop(nextKv);
        // save that the row was empty before filters applied to it.
        final boolean isEmptyRow = results.isEmpty();

        // We have the part of the row necessary for filtering (all of it, usually).
        // First filter with the filterRow(List).
        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
        if (hasFilterRow) {
          ret = filter.filterRowCellsWithRet((List<Cell>) results);

          // We don't know how the results have changed after being filtered. Must set progress
          // according to contents of results now.
          if (scannerContext.getKeepProgress()) {
            scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
              initialHeapSizeProgress);
          } else {
            scannerContext.clearProgress();
          }
          scannerContext.incrementBatchProgress(results.size());
          for (ExtendedCell cell : (List<ExtendedCell>) results) {
            scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
              cell.heapSize());
          }
        }

        if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          results.clear();
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // This row was totally filtered out, if this is NOT the last row,
          // we should continue on. Otherwise, nothing else to do.
          if (!shouldStop) {
            // Read nothing as the cells was filtered, but still need to check time limit.
            // We also check size limit because we might have read blocks in getting to this point.
            if (scannerContext.checkAnyLimitReached(limitScope)) {
              return true;
            }
            continue;
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Ok, we are done with storeHeap for this row.
        // Now we may need to fetch additional, non-essential data into row.
        // These values are not needed for filter to work, so we postpone their
        // fetch to (possibly) reduce amount of data loads from disk.
        if (this.joinedHeap != null) {
          boolean mayHaveData = joinedHeapMayHaveData(current);
          if (mayHaveData) {
            joinedContinuationRow = current;
            populateFromJoinedHeap(results, scannerContext);

            if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
              return true;
            }
          }
        }
      } else {
        // Populating from the joined heap was stopped by limits, populate some more.
        populateFromJoinedHeap(results, scannerContext);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          return true;
        }
      }
      // We may have just called populateFromJoinedHeap and hit the limits. If that is
      // the case, we need to call it again on the next next() invocation.
      if (joinedContinuationRow != null) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // Finally, we are done with both joinedHeap and storeHeap.
      // Double check to prevent empty rows from appearing in result. It could be
      // the case when SingleColumnValueExcludeFilter is used.
      if (results.isEmpty()) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        boolean moreRows = nextRow(scannerContext, current);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        if (!shouldStop) {
          // We check size limit because we might have read blocks in the nextRow call above, or
          // in the populateResult call. Only scans with hasFilterRow should reach this point,
          // and for those scans which filter row _cells_ this is the only place we can actually
          // enforce that the scan does not exceed limits since it bypasses all other checks above.
          if (scannerContext.checkSizeLimit(limitScope)) {
            return true;
          }
          continue;
        }
      }

      if (shouldStop) {
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      } else {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }
    }
  }
663
664  private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
665    region.filteredReadRequestsCount.increment();
666    if (region.getMetrics() != null) {
667      region.getMetrics().updateFilteredRecords();
668    }
669
670    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
671      return;
672    }
673
674    scannerContext.getMetrics()
675      .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME, 1);
676  }
677
678  private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
679    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
680      return;
681    }
682
683    scannerContext.getMetrics()
684      .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 1);
685  }
686
687  /** Returns true when the joined heap may have data for the current row */
688  private boolean joinedHeapMayHaveData(ExtendedCell currentRowCell) throws IOException {
689    Cell nextJoinedKv = joinedHeap.peek();
690    boolean matchCurrentRow =
691      nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell);
692    boolean matchAfterSeek = false;
693
694    // If the next value in the joined heap does not match the current row, try to seek to the
695    // correct row
696    if (!matchCurrentRow) {
697      ExtendedCell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell);
698      boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
699      matchAfterSeek = seekSuccessful && joinedHeap.peek() != null
700        && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell);
701    }
702
703    return matchCurrentRow || matchAfterSeek;
704  }
705
706  /**
707   * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines both
708   * filterRow & filterRow({@code List<KeyValue> kvs}) functions. While 0.94 code or older, it may
709   * not implement hasFilterRow as HBase-6429 expects because 0.94 hasFilterRow() only returns true
710   * when filterRow({@code List<KeyValue> kvs}) is overridden not the filterRow(). Therefore, the
711   * filterRow() will be skipped.
712   */
713  private boolean filterRow() throws IOException {
714    // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
715    // filterRowCells(List<Cell> kvs) so we skip that scenario here.
716    return filter != null && (!filter.hasFilterRow()) && filter.filterRow();
717  }
718
719  private boolean filterRowKey(Cell current) throws IOException {
720    return filter != null && filter.filterRowKey(current);
721  }
722
723  /**
724   * A mocked list implementation - discards all updates.
725   */
726  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
727
728    @Override
729    public void add(int index, Cell element) {
730      // do nothing
731    }
732
733    @Override
734    public boolean addAll(int index, Collection<? extends Cell> c) {
735      return false; // this list is never changed as a result of an update
736    }
737
738    @Override
739    public KeyValue get(int index) {
740      throw new UnsupportedOperationException();
741    }
742
743    @Override
744    public int size() {
745      return 0;
746    }
747  };
748
749  protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
750    assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
751
752    // Enable skipping row mode, which disables limits and skips tracking progress for all
753    // but block size. We keep tracking block size because skipping a row in this way
754    // might involve reading blocks along the way.
755    scannerContext.setSkippingRow(true);
756
757    Cell next;
758    while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
759      // Check for thread interrupt status in case we have been signaled from
760      // #interruptRegionOperation.
761      region.checkInterrupt();
762      this.storeHeap.next(MOCKED_LIST, scannerContext);
763    }
764
765    scannerContext.setSkippingRow(false);
766    resetFilters();
767
768    // Calling the hook in CP which allows it to do a fast forward
769    return this.region.getCoprocessorHost() == null
770      || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
771  }
772
773  protected boolean shouldStop(Cell currentRowCell) {
774    if (currentRowCell == null) {
775      return true;
776    }
777    if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {
778      return false;
779    }
780    int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length);
781    return c > 0 || (c == 0 && !includeStopRow);
782  }
783
784  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
785      justification = "this method is only called inside close which is synchronized")
786  private void closeInternal() {
787    if (storeHeap != null) {
788      storeHeap.close();
789      storeHeap = null;
790    }
791    if (joinedHeap != null) {
792      joinedHeap.close();
793      joinedHeap = null;
794    }
795    // no need to synchronize here.
796    scannerReadPoints.remove(this);
797    this.filterClosed = true;
798  }
799
  /**
   * Closes this scanner. The work is done by {@code closeInternal()}, run through
   * {@code TraceUtil.trace} with a span supplied by
   * {@code region.createRegionSpan("RegionScanner.close")}. The method is synchronized, which is
   * what {@code closeInternal()} relies on.
   */
  @Override
  public synchronized void close() {
    TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
  }
804
805  @Override
806  public synchronized boolean reseek(byte[] row) throws IOException {
807    return TraceUtil.trace(() -> {
808      if (row == null) {
809        throw new IllegalArgumentException("Row cannot be null.");
810      }
811      boolean result = false;
812      region.startRegionOperation();
813      ExtendedCell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
814      try {
815        // use request seek to make use of the lazy seek option. See HBASE-5520
816        result = this.storeHeap.requestSeek(kv, true, true);
817        if (this.joinedHeap != null) {
818          result = this.joinedHeap.requestSeek(kv, true, true) || result;
819        }
820      } finally {
821        region.closeRegionOperation();
822      }
823      return result;
824    }, () -> region.createRegionSpan("RegionScanner.reseek"));
825  }
826
827  @Override
828  public void shipped() throws IOException {
829    if (storeHeap != null) {
830      storeHeap.shipped();
831    }
832    if (joinedHeap != null) {
833      joinedHeap.shipped();
834    }
835  }
836
837  @Override
838  public void run() throws IOException {
839    // This is the RPC callback method executed. We do the close in of the scanner in this
840    // callback
841    this.close();
842  }
843}