/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
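 * <p>
 * A minimal usage sketch (the {@code scanner} instance and {@code process} helper are
 * hypothetical; the loop shape follows the {@link RegionScanner} contract, where
 * {@code next(List)} returns false once the scanner is exhausted):
 *
 * <pre>{@code
 * List<Cell> cells = new ArrayList<>();
 * boolean moreRows;
 * do {
 *   cells.clear();
 *   moreRows = scanner.next(cells); // fetches the next row's worth of cells
 *   for (Cell cell : cells) {
 *     process(cell); // hypothetical per-cell handler
 *   }
 * } while (moreRows);
 * scanner.close();
 * }</pre>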
 */
@InterfaceAudience.Private
class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {

  private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class);

  // Package local for testability
  KeyValueHeap storeHeap = null;

  /**
   * Heap of key-values that are not essential for the provided filters and are thus read on demand,
   * if on-demand column family loading is enabled.
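   * <p>
   * A hedged sketch of a scan that can populate this heap (the family and qualifier names are
   * hypothetical; {@code SingleColumnValueFilter} marks only the family it inspects as essential
   * via {@code Filter#isFamilyEssential}):
   *
   * <pre>{@code
   * Scan scan = new Scan();
   * scan.setLoadColumnFamiliesOnDemand(true);
   * scan.setFilter(new SingleColumnValueFilter(Bytes.toBytes("meta"), Bytes.toBytes("flag"),
   *   CompareOperator.EQUAL, Bytes.toBytes("1")));
   * // Cells from the other, non-essential families are fetched through the joined heap
   * // only for rows the filter accepts.
   * }</pre>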
   */
  KeyValueHeap joinedHeap = null;

  /**
   * If the joined heap data gathering is interrupted due to scan limits, this will contain the row
   * for which we are populating the values.
   */
  protected Cell joinedContinuationRow = null;
  private boolean filterClosed = false;

  protected final byte[] stopRow;
  protected final boolean includeStopRow;
  protected final HRegion region;
  protected final CellComparator comparator;

  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  private final long readPt;
  private final long maxResultSize;
  private final ScannerContext defaultScannerContext;
  private final FilterWrapper filter;
  private final String operationId;

  private RegionServerServices rsServices;

  @Override
  public RegionInfo getRegionInfo() {
    return region.getRegionInfo();
  }

  private static boolean hasNonce(HRegion region, long nonce) {
    RegionServerServices rsServices = region.getRegionServerServices();
    return nonce != HConstants.NO_NONCE && rsServices != null
      && rsServices.getNonceManager() != null;
  }

  RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
    long nonceGroup, long nonce) throws IOException {
    this.region = region;
    this.maxResultSize = scan.getMaxResultSize();
    if (scan.hasFilter()) {
      this.filter = new FilterWrapper(scan.getFilter());
    } else {
      this.filter = null;
    }
    this.comparator = region.getCellComparator();
    /**
     * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
     * scanner context that can be used to enforce the batch limit in the event that a
     * ScannerContext is not specified during an invocation of next/nextRaw
     */
    defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
    this.stopRow = scan.getStopRow();
    this.includeStopRow = scan.includeStopRow();
    this.operationId = scan.getId();

    // Take the smallest-read-point calculation lock so that nobody calculates
    // getSmallestReadPoint before scannerReadPoints is updated.
    IsolationLevel isolationLevel = scan.getIsolationLevel();
    long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan);
    this.scannerReadPoints = region.scannerReadPoints;
    this.rsServices = region.getRegionServerServices();
    region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    try {
      if (mvccReadPoint > 0) {
        this.readPt = mvccReadPoint;
      } else if (hasNonce(region, nonce)) {
        this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
      } else {
        this.readPt = region.getReadPoint(isolationLevel);
      }
      scannerReadPoints.put(this, this.readPt);
    } finally {
      region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    }
    initializeScanners(scan, additionalScanners);
  }

  private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
    throws IOException {
    // Here we separate all scanners into two lists - scanners that provide data required
    // by the filter to operate (the scanners list) and all others (the joinedScanners list).
    List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());
    List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());
    // Store all already instantiated scanners for exception handling
    List<KeyValueScanner> instantiatedScanners = new ArrayList<>();
    // handle additionalScanners
    if (additionalScanners != null && !additionalScanners.isEmpty()) {
      scanners.addAll(additionalScanners);
      instantiatedScanners.addAll(additionalScanners);
    }

    try {
      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
        HStore store = region.getStore(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
        instantiatedScanners.add(scanner);
        if (
          this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
            || this.filter.isFamilyEssential(entry.getKey())
        ) {
          scanners.add(scanner);
        } else {
          joinedScanners.add(scanner);
        }
      }
      initializeKVHeap(scanners, joinedScanners, region);
    } catch (Throwable t) {
      throw handleException(instantiatedScanners, t);
    }
  }

  protected void initializeKVHeap(List<KeyValueScanner> scanners,
    List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
    this.storeHeap = new KeyValueHeap(scanners, comparator);
    if (!joinedScanners.isEmpty()) {
      this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
    }
  }

  private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) {
    // remove the scanner read point before throwing the exception
    scannerReadPoints.remove(this);
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
      if (joinedHeap != null) {
        joinedHeap.close();
        joinedHeap = null;
      }
    } else {
      // close all already instantiated scanners before throwing the exception
      for (KeyValueScanner scanner : instantiatedScanners) {
        scanner.close();
      }
    }
    return t instanceof IOException ? (IOException) t : new IOException(t);
  }

  @Override
  public long getMaxResultSize() {
    return maxResultSize;
  }

  @Override
  public long getMvccReadPoint() {
    return this.readPt;
  }

  @Override
  public int getBatch() {
    return this.defaultScannerContext.getBatchLimit();
  }

  @Override
  public String getOperationId() {
    return operationId;
  }

  /**
   * Reset the filter, if one is set.
   * @throws IOException in case a filter raises an I/O exception.
   */
  protected final void resetFilters() throws IOException {
    if (filter != null) {
      filter.reset();
    }
  }

  @Override
  public boolean next(List<Cell> outResults) throws IOException {
    // apply the batching limit by default
    return next(outResults, defaultScannerContext);
  }

  @Override
  public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)
    throws IOException {
    if (this.filterClosed) {
      throw new UnknownScannerException("Scanner was closed (timed out?) "
        + "after we renewed it. Could be caused by a very slow scanner "
        + "or a lengthy garbage collection");
    }
    region.startRegionOperation(Operation.SCAN);
    try {
      return nextRaw(outResults, scannerContext);
    } finally {
      region.closeRegionOperation(Operation.SCAN);
    }
  }

  @Override
  public boolean nextRaw(List<Cell> outResults) throws IOException {
    // Use the RegionScanner's context by default
    return nextRaw(outResults, defaultScannerContext);
  }

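  /**
   * A hedged sketch of the calling convention: unlike {@link #next(List, ScannerContext)} above,
   * nextRaw does not start a region operation itself, so callers are expected to bracket the call
   * the same way next does (the {@code scanner} and {@code cells} names are hypothetical):
   *
   * <pre>{@code
   * region.startRegionOperation(Operation.SCAN);
   * try {
   *   boolean moreRows = scanner.nextRaw(cells);
   * } finally {
   *   region.closeRegionOperation(Operation.SCAN);
   * }
   * }</pre>
   */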
  @Override
  public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext) throws IOException {
    if (storeHeap == null) {
      // scanner is closed
      throw new UnknownScannerException("Scanner was closed");
    }
    boolean moreValues = false;
    if (outResults.isEmpty()) {
      // Usually outResults is empty. This is true when next() is called
      // to handle a scan or get operation.
      moreValues = nextInternal(outResults, scannerContext);
    } else {
      List<Cell> tmpList = new ArrayList<>();
      moreValues = nextInternal(tmpList, scannerContext);
      outResults.addAll(tmpList);
    }

    region.addReadRequestsCount(1);
    if (region.getMetrics() != null) {
      region.getMetrics().updateReadRequestCount();
    }

    // If the size limit was reached it means a partial Result is being returned. Returning a
    // partial Result means that we should not reset the filters; filters should only be reset in
    // between rows
    if (!scannerContext.mayHaveMoreCellsInRow()) {
      resetFilters();
    }

    if (isFilterDoneInternal()) {
      moreValues = false;
    }
    return moreValues;
  }

  /** Returns true if more cells exist after this batch, false if scanner is done */
  private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)
    throws IOException {
    assert joinedContinuationRow != null;
    boolean moreValues =
      populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow);

    if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
      // We are done with this row, reset the continuation.
      joinedContinuationRow = null;
    }
    // As the data is obtained from two independent heaps, we need to
    // ensure that result list is sorted, because Result relies on that.
    results.sort(comparator);
    return moreValues;
  }

  /**
   * Fetches cells for the current row into the results list, until the next row is reached, the
   * batchLimit (if not -1) is reached, or the remainingResultSize (if not -1) is reached.
   * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before
   *             the call.
   * @return state of last call to {@link KeyValueHeap#next()}
   */
  private boolean populateResult(List<Cell> results, KeyValueHeap heap,
    ScannerContext scannerContext, Cell currentRowCell) throws IOException {
    Cell nextKv;
    boolean moreCellsInRow = false;
    boolean tmpKeepProgress = scannerContext.getKeepProgress();
    // Scanning between column families and thus the scope is between cells
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;
    do {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // We want to maintain any progress that is made towards the limits while scanning across
      // different column families. To do this, we toggle the keep progress flag on during calls
      // to the StoreScanner to ensure that any progress made thus far is not wiped away.
      scannerContext.setKeepProgress(true);
      heap.next(results, scannerContext);
      scannerContext.setKeepProgress(tmpKeepProgress);

      nextKv = heap.peek();
      moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
      if (!moreCellsInRow) {
        incrementCountOfRowsScannedMetric(scannerContext);
      }
      if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
        return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
      } else if (scannerContext.checkSizeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      } else if (scannerContext.checkTimeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      }
    } while (moreCellsInRow);
    return nextKv != null;
  }

  /**
   * Based on the nextKv in the heap, and the current row, decide whether or not there are more
   * cells to be read in the heap. If the row of the nextKv in the heap matches the current row then
   * there are more cells to be read in the row.
   * @return true When there are more cells in the row to be read
   */
  private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
    return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);
  }

  /** Returns true if a filter rules that the scanner is done. */
  @Override
  public synchronized boolean isFilterDone() throws IOException {
    return isFilterDoneInternal();
  }

  private boolean isFilterDoneInternal() throws IOException {
    return this.filter != null && this.filter.filterAllRemaining();
  }

  private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException {
    if (rpcCall.isPresent()) {
      // If a user specifies a too-restrictive or too-slow scanner, the
      // client might time out and disconnect while the server side
      // is still processing the request. We should abort aggressively
      // in that case.
      long afterTime = rpcCall.get().disconnectSince();
      if (afterTime >= 0) {
        throw new CallerDisconnectedException(
          "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this
            + " after " + afterTime + " ms, since caller disconnected");
      }
    }
  }

  private void resetProgress(ScannerContext scannerContext, int initialBatchProgress,
    long initialSizeProgress, long initialHeapSizeProgress) {
    // Starting to scan a new row. Reset the scanner progress according to whether or not
    // progress should be kept.
    if (scannerContext.getKeepProgress()) {
      // Progress should be kept. Reset to initial values seen at start of method invocation.
      scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
    } else {
      scannerContext.clearProgress();
    }
  }

  private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
    throws IOException {
    Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list");
    Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null");
    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();

    // Save the initial progress from the Scanner context in these local variables. The progress
    // may need to be reset a few times if rows are being filtered out so we save the initial
    // progress.
    int initialBatchProgress = scannerContext.getBatchProgress();
    long initialSizeProgress = scannerContext.getDataSizeProgress();
    long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();

    // Used to check time limit
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;

    // The loop here is used only when, at some point during the next call, we determine
    // (due to effects of filters or otherwise) that we have an empty row in the result.
    // Then we loop and try again. Otherwise, we must get out on the first iteration via return:
    // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
    // and joinedHeap has no more data to read for the last row, i.e. joinedContinuationRow if set).
    while (true) {
      resetProgress(scannerContext, initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
      checkClientDisconnect(rpcCall);

      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // Let's see what we have in the storeHeap.
      Cell current = this.storeHeap.peek();

      boolean shouldStop = shouldStop(current);
      // When hasFilterRow is true it means that all the cells for a particular row must be
      // read before a filtering decision can be made. This means that filters for which
      // hasFilterRow returns true run the risk of encountering out-of-memory errors when
      // they are applied to a table that has very large rows.
      boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

      // If filter#hasFilterRow is true, partial results are not allowed since allowing them
      // would prevent the filters from being evaluated. Thus, if it is true, change the
      // scope of any limits that could potentially create partial results to
      // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row
      if (hasFilterRow) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
            + "formed. Changing scope of limits that may create partials");
        }
        scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
        scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
        limitScope = LimitScope.BETWEEN_ROWS;
      }

      if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
        if (hasFilterRow) {
          throw new IncompatibleFilterException(
            "Filter whose hasFilterRow() returns true is incompatible with scans that must "
              + "stop mid-row because of a limit. ScannerContext:" + scannerContext);
        }
        return true;
      }

      // Check if we were getting data from the joinedHeap and hit the limit.
      // If not, then it's main path - getting results from storeHeap.
      if (joinedContinuationRow == null) {
        // First, check if we are at a stop row. If so, there are no more results.
        if (shouldStop) {
          if (hasFilterRow) {
            filter.filterRowCells(results);
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Check if rowkey filter wants to exclude this row. If so, loop to next.
        // Technically, if we hit limits before on this row, we don't need this call.
        if (filterRowKey(current)) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          // early check, see HBASE-16296
          if (isFilterDoneInternal()) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          // Typically the count of rows scanned is incremented inside #populateResult. However,
          // here we are filtering a row based purely on its row key, preventing us from calling
          // #populateResult. Thus, perform the necessary increment here to the rows scanned metric.
          incrementCountOfRowsScannedMetric(scannerContext);
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          results.clear();

          // Read nothing as the rowkey was filtered, but we still need to check the time limit.
          // We also check the size limit because we might have read blocks in getting to this point.
          if (scannerContext.checkAnyLimitReached(limitScope)) {
            return true;
          }
          continue;
        }

        // Ok, we are good, let's try to get some results from the main heap.
        populateResult(results, this.storeHeap, scannerContext, current);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          if (hasFilterRow) {
            throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                + "stop mid-row because of a limit. ScannerContext:" + scannerContext);
          }
          return true;
        }

        // Check for thread interrupt status in case we have been signaled from
        // #interruptRegionOperation.
        region.checkInterrupt();

        Cell nextKv = this.storeHeap.peek();
        shouldStop = shouldStop(nextKv);
        // Save whether the row was empty before filters were applied to it.
        final boolean isEmptyRow = results.isEmpty();

        // We have the part of the row necessary for filtering (all of it, usually).
        // First filter with the filterRow(List).
        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
        if (hasFilterRow) {
          ret = filter.filterRowCellsWithRet(results);

          // We don't know how the results have changed after being filtered. Must set progress
          // according to contents of results now.
          if (scannerContext.getKeepProgress()) {
            scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
              initialHeapSizeProgress);
          } else {
            scannerContext.clearProgress();
          }
          scannerContext.incrementBatchProgress(results.size());
          for (Cell cell : results) {
            scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
              cell.heapSize());
          }
        }

        if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          results.clear();
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // This row was totally filtered out. If this is NOT the last row,
          // we should continue on; otherwise, there is nothing else to do.
          if (!shouldStop) {
            // Read nothing as the cells were filtered, but we still need to check the time limit.
            // We also check the size limit because we might have read blocks in getting to this point.
            if (scannerContext.checkAnyLimitReached(limitScope)) {
              return true;
            }
            continue;
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Ok, we are done with storeHeap for this row.
        // Now we may need to fetch additional, non-essential data into row.
        // These values are not needed for filter to work, so we postpone their
        // fetch to (possibly) reduce amount of data loads from disk.
        if (this.joinedHeap != null) {
          boolean mayHaveData = joinedHeapMayHaveData(current);
          if (mayHaveData) {
            joinedContinuationRow = current;
            populateFromJoinedHeap(results, scannerContext);

            if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
              return true;
            }
          }
        }
      } else {
        // Populating from the joined heap was stopped by limits, populate some more.
        populateFromJoinedHeap(results, scannerContext);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          return true;
        }
      }
      // We may have just called populateFromJoinedHeap and hit the limits. If that is
      // the case, we need to call it again on the next next() invocation.
      if (joinedContinuationRow != null) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // Finally, we are done with both joinedHeap and storeHeap.
      // Double check to prevent empty rows from appearing in result. It could be
      // the case when SingleColumnValueExcludeFilter is used.
      if (results.isEmpty()) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        boolean moreRows = nextRow(scannerContext, current);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        if (!shouldStop) {
          // We check the size limit because we might have read blocks in the nextRow call above,
          // or in the populateResult call. Only scans with hasFilterRow should reach this point,
          // and for those scans which filter row _cells_ this is the only place we can actually
          // enforce that the scan does not exceed limits, since it bypasses all other checks above.
          if (scannerContext.checkSizeLimit(limitScope)) {
            return true;
          }
          continue;
        }
      }

      if (shouldStop) {
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      } else {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }
    }
  }

  private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
    region.filteredReadRequestsCount.increment();
    if (region.getMetrics() != null) {
      region.getMetrics().updateFilteredRecords();
    }

    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
  }

  private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();
  }

  /** Returns true when the joined heap may have data for the current row */
  private boolean joinedHeapMayHaveData(Cell currentRowCell) throws IOException {
    Cell nextJoinedKv = joinedHeap.peek();
    boolean matchCurrentRow =
      nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell);
    boolean matchAfterSeek = false;

    // If the next value in the joined heap does not match the current row, try to seek to the
    // correct row
    if (!matchCurrentRow) {
      Cell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell);
      boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
      matchAfterSeek = seekSuccessful && joinedHeap.peek() != null
        && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell);
    }

    return matchCurrentRow || matchAfterSeek;
  }

  /**
   * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines
   * both filterRow() and filterRow({@code List<KeyValue> kvs}). Code written for 0.94 or older
   * may not implement hasFilterRow() as HBASE-6429 expects, because in 0.94 hasFilterRow() only
   * returns true when filterRow({@code List<KeyValue> kvs}) is overridden, not filterRow().
   * Therefore, filterRow() would be skipped for such filters without this check.
   */
  private boolean filterRow() throws IOException {
    // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
    // filterRowCells(List<Cell> kvs) so we skip that scenario here.
    return filter != null && (!filter.hasFilterRow()) && filter.filterRow();
  }

  private boolean filterRowKey(Cell current) throws IOException {
    return filter != null && filter.filterRowKey(current);
  }

  /**
   * A mock list implementation that discards all updates. Used by
   * {@link #nextRow(ScannerContext, Cell)} to skip the remaining cells of a row without
   * accumulating them.
   */
  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {

    @Override
    public void add(int index, Cell element) {
      // do nothing
    }

    @Override
    public boolean addAll(int index, Collection<? extends Cell> c) {
      return false; // this list is never changed as a result of an update
    }

    @Override
    public KeyValue get(int index) {
      throw new UnsupportedOperationException();
    }

    @Override
    public int size() {
      return 0;
    }
  };

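  /**
   * Skips the remaining cells of the current row: advances the store heap past every cell that
   * matches {@code curRowCell}'s row (discarding them into {@link #MOCKED_LIST}), then resets the
   * filters and gives the coprocessor hook a chance to rule on whether scanning should continue.
   * @return true if more rows may be available, false if the coprocessor hook says to stop
   */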
  protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
    assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";

    // Enable skipping row mode, which disables limits and skips tracking progress for all
    // but block size. We keep tracking block size because skipping a row in this way
    // might involve reading blocks along the way.
    scannerContext.setSkippingRow(true);

    Cell next;
    while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();
      this.storeHeap.next(MOCKED_LIST, scannerContext);
    }

    scannerContext.setSkippingRow(false);
    resetFilters();

    // Call the hook in the CP, which allows it to do a fast forward
    return this.region.getCoprocessorHost() == null
      || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
  }

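  /**
   * Returns true if the scan should stop at {@code currentRowCell}: either the heap is exhausted
   * ({@code null}), or the cell's row is past the stop row, or equal to it when the stop row is
   * exclusive. An empty stop row means the scan runs to the end of the region.
   */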
  protected boolean shouldStop(Cell currentRowCell) {
    if (currentRowCell == null) {
      return true;
    }
    if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {
      return false;
    }
    int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length);
    return c > 0 || (c == 0 && !includeStopRow);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "this method is only called inside close which is synchronized")
  private void closeInternal() {
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
    }
    if (joinedHeap != null) {
      joinedHeap.close();
      joinedHeap = null;
    }
    // no need to synchronize here.
    scannerReadPoints.remove(this);
    this.filterClosed = true;
  }

  @Override
  public synchronized void close() {
    TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
  }

  @Override
  public synchronized boolean reseek(byte[] row) throws IOException {
    return TraceUtil.trace(() -> {
      if (row == null) {
        throw new IllegalArgumentException("Row cannot be null.");
      }
      boolean result = false;
      region.startRegionOperation();
      Cell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
      try {
        // use request seek to make use of the lazy seek option. See HBASE-5520
        result = this.storeHeap.requestSeek(kv, true, true);
        if (this.joinedHeap != null) {
          result = this.joinedHeap.requestSeek(kv, true, true) || result;
        }
      } finally {
        region.closeRegionOperation();
      }
      return result;
    }, () -> region.createRegionSpan("RegionScanner.reseek"));
  }

  @Override
  public void shipped() throws IOException {
    if (storeHeap != null) {
      storeHeap.shipped();
    }
    if (joinedHeap != null) {
      joinedHeap.shipped();
    }
  }

  @Override
  public void run() throws IOException {
    // This is the RPC callback method that gets executed. We close the scanner
    // in this callback.
    this.close();
  }
}