/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
 * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
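 * <p>
 * Instances are normally obtained through {@code HRegion#getScanner(Scan)} rather than constructed
 * directly. A minimal usage sketch (error handling elided; a {@code List<Cell>} satisfies the
 * {@code List<? super ExtendedCell>} bound of {@code next}):
 *
 * <pre>{@code
 * try (RegionScanner scanner = region.getScanner(new Scan())) {
 *   List<Cell> cells = new ArrayList<>();
 *   boolean moreRows;
 *   do {
 *     moreRows = scanner.next(cells); // fills cells with the next row (or partial row)
 *     // process cells ...
 *     cells.clear();
 *   } while (moreRows);
 * }
 * }</pre>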
 */
@InterfaceAudience.Private
public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {

  private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class);

  // Package local for testability
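  // Heap over one scanner per store whose column family is essential for the filters (the main
  // data path), plus any additional scanners supplied at construction time.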
  KeyValueHeap storeHeap = null;

  /**
   * Heap of key-values that are not essential for the provided filters and are thus read on demand,
   * if on-demand column family loading is enabled.
   */
  KeyValueHeap joinedHeap = null;

  /**
   * If the joined heap data gathering is interrupted due to scan limits, this will contain the row
   * for which we are populating the values.
   */
  protected ExtendedCell joinedContinuationRow = null;
  private boolean filterClosed = false;

  protected final byte[] stopRow;
  protected final boolean includeStopRow;
  protected final HRegion region;
  protected final CellComparator comparator;

  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  private final long readPt;
  private final long maxResultSize;
  private final ScannerContext defaultScannerContext;
  private final FilterWrapper filter;
  private final String operationId;

  private RegionServerServices rsServices;

  @Override
  public RegionInfo getRegionInfo() {
    return region.getRegionInfo();
  }

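  /**
   * Returns true if the operation carries a real nonce and this region server has a nonce manager
   * to look the corresponding mvcc read point up in.
   */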
  private static boolean hasNonce(HRegion region, long nonce) {
    RegionServerServices rsServices = region.getRegionServerServices();
    return nonce != HConstants.NO_NONCE && rsServices != null
      && rsServices.getNonceManager() != null;
  }

  RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
    long nonceGroup, long nonce) throws IOException {
    this.region = region;
    this.maxResultSize = scan.getMaxResultSize();
    if (scan.hasFilter()) {
      this.filter = new FilterWrapper(scan.getFilter());
    } else {
      this.filter = null;
    }
    this.comparator = region.getCellComparator();
    /**
     * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
     * scanner context that can be used to enforce the batch limit in the event that a
     * ScannerContext is not specified during an invocation of next/nextRaw
     */
    defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
    this.stopRow = scan.getStopRow();
    this.includeStopRow = scan.includeStopRow();
    this.operationId = scan.getId();

    // Take the smallest-read-point calculation lock so that nobody computes
    // getSmallestReadPoint before scannerReadPoints is updated.
    IsolationLevel isolationLevel = scan.getIsolationLevel();
    long mvccReadPoint = ClientInternalHelper.getMvccReadPoint(scan);
    this.scannerReadPoints = region.scannerReadPoints;
    this.rsServices = region.getRegionServerServices();
    region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    try {
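      // Choose the read point: an mvcc read point carried on the Scan takes precedence, then the
      // mvcc recorded for a retried/replayed nonce operation, and otherwise the region's current
      // read point for the requested isolation level.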
      if (mvccReadPoint > 0) {
        this.readPt = mvccReadPoint;
      } else if (hasNonce(region, nonce)) {
        this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
      } else {
        this.readPt = region.getReadPoint(isolationLevel);
      }
      scannerReadPoints.put(this, this.readPt);
    } finally {
      region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    }
    initializeScanners(scan, additionalScanners);
  }

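  /** Returns the default ScannerContext, used when next/nextRaw is invoked without an explicit one. */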
  public ScannerContext getContext() {
    return defaultScannerContext;
  }

  private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
    throws IOException {
    // Here we separate all scanners into two lists - scanners that provide data required
    // by the filter to operate (the scanners list) and all others (the joinedScanners list).
    List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());
    List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());
    // Store all already instantiated scanners for exception handling
    List<KeyValueScanner> instantiatedScanners = new ArrayList<>();
    // handle additionalScanners
    if (additionalScanners != null && !additionalScanners.isEmpty()) {
      scanners.addAll(additionalScanners);
      instantiatedScanners.addAll(additionalScanners);
    }

    try {
      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
        HStore store = region.getStore(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
        instantiatedScanners.add(scanner);
        if (
          this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
            || this.filter.isFamilyEssential(entry.getKey())
        ) {
          scanners.add(scanner);
        } else {
          joinedScanners.add(scanner);
        }
      }
      initializeKVHeap(scanners, joinedScanners, region);
    } catch (Throwable t) {
      throw handleException(instantiatedScanners, t);
    }
  }

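  /**
   * Build the heaps: {@code storeHeap} over the essential-family scanners and, when on-demand
   * column family loading has set aside non-essential scanners, {@code joinedHeap} over those.
   */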
  protected void initializeKVHeap(List<KeyValueScanner> scanners,
    List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
    this.storeHeap = new KeyValueHeap(scanners, comparator);
    if (!joinedScanners.isEmpty()) {
      this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
    }
  }

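  /**
   * Clean up after a failure during scanner construction: drop this scanner's read point and close
   * whatever heaps or scanners were already instantiated, then return the cause wrapped as an
   * IOException so the caller can rethrow it.
   */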
  private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) {
    // remove the scanner read point before throwing the exception
    scannerReadPoints.remove(this);
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
      if (joinedHeap != null) {
        joinedHeap.close();
        joinedHeap = null;
      }
    } else {
      // close all already instantiated scanners before throwing the exception
      for (KeyValueScanner scanner : instantiatedScanners) {
        scanner.close();
      }
    }
    return t instanceof IOException ? (IOException) t : new IOException(t);
  }

  @Override
  public long getMaxResultSize() {
    return maxResultSize;
  }

  @Override
  public long getMvccReadPoint() {
    return this.readPt;
  }

  @Override
  public int getBatch() {
    return this.defaultScannerContext.getBatchLimit();
  }

  @Override
  public String getOperationId() {
    return operationId;
  }

  /**
   * Reset the filter, if one is set.
   * @throws IOException in case the filter raises an I/O exception.
   */
  protected final void resetFilters() throws IOException {
    if (filter != null) {
      filter.reset();
    }
  }

  @Override
  public boolean next(List<? super ExtendedCell> outResults) throws IOException {
    // apply the batching limit by default
    return next(outResults, defaultScannerContext);
  }

  @Override
  public synchronized boolean next(List<? super ExtendedCell> outResults,
    ScannerContext scannerContext) throws IOException {
    if (this.filterClosed) {
      throw new UnknownScannerException("Scanner was closed (timed out?) "
        + "after we renewed it. Could be caused by a very slow scanner "
        + "or a lengthy garbage collection");
    }
    region.startRegionOperation(Operation.SCAN);
    try {
      return nextRaw(outResults, scannerContext);
    } finally {
      region.closeRegionOperation(Operation.SCAN);
    }
  }

  @Override
  public boolean nextRaw(List<? super ExtendedCell> outResults) throws IOException {
    // Use the RegionScanner's context by default
    return nextRaw(outResults, defaultScannerContext);
  }

  @Override
  public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext scannerContext)
    throws IOException {
    if (storeHeap == null) {
      // scanner is closed
      throw new UnknownScannerException("Scanner was closed");
    }
    boolean moreValues = false;
    if (outResults.isEmpty()) {
      // Usually outResults is empty. This is the case when next() is called
      // to handle a scan or get operation.
      moreValues = nextInternal(outResults, scannerContext);
    } else {
      List<ExtendedCell> tmpList = new ArrayList<>();
      moreValues = nextInternal(tmpList, scannerContext);
      outResults.addAll(tmpList);
    }
    region.addReadRequestsCount(1);
    if (region.getMetrics() != null) {
      region.getMetrics().updateReadRequestCount();
    }

    // If the size limit was reached it means a partial Result is being returned. Returning a
    // partial Result means that we should not reset the filters; filters should only be reset in
    // between rows
    if (!scannerContext.mayHaveMoreCellsInRow()) {
      resetFilters();
    }

    if (isFilterDoneInternal()) {
      moreValues = false;
    }
    return moreValues;
  }

  /** Returns true if more cells exist after this batch, false if the scanner is done */
  private boolean populateFromJoinedHeap(List<? super ExtendedCell> results,
    ScannerContext scannerContext) throws IOException {
    assert joinedContinuationRow != null;
    boolean moreValues =
      populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow);

    if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
      // We are done with this row, reset the continuation.
      joinedContinuationRow = null;
    }
    // As the data is obtained from two independent heaps, we need to
    // ensure that the result list is sorted, because Result relies on that.
    ((List<Cell>) results).sort(comparator);
    return moreValues;
  }

  /**
   * Fetches records with currentRow into the results list, until the next row, the batchLimit (if
   * not -1), or the remainingResultSize (if not -1) is reached.
   * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before
   *             this call.
   * @return state of the last call to {@link KeyValueHeap#next()}
   */
  private boolean populateResult(List<? super ExtendedCell> results, KeyValueHeap heap,
    ScannerContext scannerContext, ExtendedCell currentRowCell) throws IOException {
    Cell nextKv;
    boolean moreCellsInRow = false;
    boolean tmpKeepProgress = scannerContext.getKeepProgress();
    // We are scanning across column families, thus the limit scope is between cells
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;
    do {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // We want to maintain any progress that is made towards the limits while scanning across
      // different column families. To do this, we toggle the keep progress flag on during calls
      // to the StoreScanner to ensure that any progress made thus far is not wiped away.
      scannerContext.setKeepProgress(true);
      heap.next(results, scannerContext);
      scannerContext.setKeepProgress(tmpKeepProgress);

      nextKv = heap.peek();
      moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
      if (!moreCellsInRow) {
        incrementCountOfRowsScannedMetric(scannerContext);
      }
      if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
        return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
      } else if (scannerContext.checkSizeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      } else if (scannerContext.checkTimeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      }
    } while (moreCellsInRow);
    return nextKv != null;
  }

  /**
   * Based on the nextKv in the heap, and the current row, decide whether or not there are more
   * cells to be read in the heap. If the row of the nextKv in the heap matches the current row then
   * there are more cells to be read in the row.
   * @return true When there are more cells in the row to be read
   */
  private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
    return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);
  }

  /** Returns true if a filter has ruled that the scanner is done, i.e. there are no more values. */
  @Override
  public synchronized boolean isFilterDone() throws IOException {
    return isFilterDoneInternal();
  }

  private boolean isFilterDoneInternal() throws IOException {
    return this.filter != null && this.filter.filterAllRemaining();
  }

  private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException {
    if (rpcCall.isPresent()) {
      // If a user specifies a too-restrictive or too-slow scanner, the
      // client might time out and disconnect while the server side
      // is still processing the request. We should abort aggressively
      // in that case.
      long afterTime = rpcCall.get().disconnectSince();
      if (afterTime >= 0) {
        throw new CallerDisconnectedException(
          "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this
            + " after " + afterTime + " ms, since caller disconnected");
      }
    }
  }

  private void resetProgress(ScannerContext scannerContext, int initialBatchProgress,
    long initialSizeProgress, long initialHeapSizeProgress) {
    // Starting to scan a new row. Reset the scanner progress according to whether or not
    // progress should be kept.
    if (scannerContext.getKeepProgress()) {
      // Progress should be kept. Reset to initial values seen at start of method invocation.
      scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
    } else {
      scannerContext.clearProgress();
    }
  }

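  /**
   * Core row-assembly loop behind next/nextRaw: pulls the current row's cells from the store heap
   * (and, with on-demand column family loading, the joined heap), applies row-level filters, and
   * reports via the ScannerContext state whether more values remain. Limits carried in the
   * ScannerContext may stop the loop mid-row.
   */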
  private boolean nextInternal(List<? super ExtendedCell> results, ScannerContext scannerContext)
    throws IOException {
    Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list");
    Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null");
    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();

    // Save the initial progress from the Scanner context in these local variables. The progress
    // may need to be reset a few times if rows are being filtered out so we save the initial
    // progress.
    int initialBatchProgress = scannerContext.getBatchProgress();
    long initialSizeProgress = scannerContext.getDataSizeProgress();
    long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();

    // Scope used when checking limits; widened to BETWEEN_ROWS below when hasFilterRow is true
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;

    // The loop here is used only when, at some point during the next() call, we determine
    // that due to the effects of filters or otherwise we have an empty row in the result.
    // Then we loop and try again. Otherwise, we must get out on the first iteration via return:
    // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
    // and joinedHeap has no more data to read for the last row, joinedContinuationRow, if set).
    while (true) {
      resetProgress(scannerContext, initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
      checkClientDisconnect(rpcCall);

      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // Let's see what we have in the storeHeap.
      ExtendedCell current = this.storeHeap.peek();

      boolean shouldStop = shouldStop(current);
      // When hasFilterRow is true it means that all the cells for a particular row must be
      // read before a filtering decision can be made. This means that filters where hasFilterRow
      // run the risk of encountering out of memory errors in the case that they are applied to a
      // table that has very large rows.
      boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

      // If filter#hasFilterRow is true, partial results are not allowed since allowing them
      // would prevent the filters from being evaluated. Thus, if it is true, change the
      // scope of any limits that could potentially create partial results to
      // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row
      if (hasFilterRow) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
            + "formed. Changing scope of limits that may create partials");
        }
        scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
        scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
        limitScope = LimitScope.BETWEEN_ROWS;
      }

      if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
        if (hasFilterRow) {
          throw new IncompatibleFilterException(
            "Filter whose hasFilterRow() returns true is incompatible with scans that must "
              + "stop mid-row because of a limit. ScannerContext:" + scannerContext);
        }
        return true;
      }

      // Check if we were getting data from the joinedHeap and hit the limit.
      // If not, then it's the main path - getting results from the storeHeap.
      if (joinedContinuationRow == null) {
        // First, check if we are at a stop row. If so, there are no more results.
        if (shouldStop) {
          if (hasFilterRow) {
            filter.filterRowCells((List<Cell>) results);
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Check if rowkey filter wants to exclude this row. If so, loop to next.
        // Technically, if we hit limits before on this row, we don't need this call.
        if (filterRowKey(current)) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          // early check, see HBASE-16296
          if (isFilterDoneInternal()) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          // Typically the count of rows scanned is incremented inside #populateResult. However,
          // here we are filtering a row based purely on its row key, preventing us from calling
          // #populateResult. Thus, perform the necessary increment of the rows scanned metric here
          incrementCountOfRowsScannedMetric(scannerContext);
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          results.clear();

          // Read nothing as the rowkey was filtered, but we still need to check the time limit.
          // We also check the size limit because we might have read blocks in getting to this point.
          if (scannerContext.checkAnyLimitReached(limitScope)) {
            return true;
          }
          continue;
        }

        // Ok, we are good, let's try to get some results from the main heap.
        populateResult(results, this.storeHeap, scannerContext, current);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          if (hasFilterRow) {
            throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                + "stop mid-row because of a limit. ScannerContext:" + scannerContext);
          }
          return true;
        }

        // Check for thread interrupt status in case we have been signaled from
        // #interruptRegionOperation.
        region.checkInterrupt();

        Cell nextKv = this.storeHeap.peek();
        shouldStop = shouldStop(nextKv);
        // save whether the row was empty before filters were applied to it
        final boolean isEmptyRow = results.isEmpty();

        // We have the part of the row necessary for filtering (all of it, usually).
        // First filter with the filterRow(List).
        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
        if (hasFilterRow) {
          ret = filter.filterRowCellsWithRet((List<Cell>) results);

          // We don't know how the results have changed after being filtered. Must set progress
          // according to contents of results now.
          if (scannerContext.getKeepProgress()) {
            scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
              initialHeapSizeProgress);
          } else {
            scannerContext.clearProgress();
          }
          scannerContext.incrementBatchProgress(results.size());
          for (ExtendedCell cell : (List<ExtendedCell>) results) {
            scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
              cell.heapSize());
          }
        }

        if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          results.clear();
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // This row was totally filtered out, if this is NOT the last row,
          // we should continue on. Otherwise, nothing else to do.
          if (!shouldStop) {
            // Read nothing as the cells were filtered, but we still need to check the time limit.
            // We also check the size limit because we might have read blocks in getting to this point.
            if (scannerContext.checkAnyLimitReached(limitScope)) {
              return true;
            }
            continue;
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Ok, we are done with the storeHeap for this row.
        // Now we may need to fetch additional, non-essential data for the row.
        // These values are not needed for the filter to work, so we postpone their
        // fetch to (possibly) reduce the amount of data loaded from disk.
        if (this.joinedHeap != null) {
          boolean mayHaveData = joinedHeapMayHaveData(current);
          if (mayHaveData) {
            joinedContinuationRow = current;
            populateFromJoinedHeap(results, scannerContext);

            if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
              return true;
            }
          }
        }
      } else {
        // Populating from the joined heap was stopped by limits, populate some more.
        populateFromJoinedHeap(results, scannerContext);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          return true;
        }
      }
      // We may have just called populateFromJoinedHeap and hit the limits. If that is
      // the case, we need to call it again on the next invocation of next().
      if (joinedContinuationRow != null) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // Finally, we are done with both joinedHeap and storeHeap.
      // Double check to prevent empty rows from appearing in the result. This can be
      // the case when, for example, SingleColumnValueExcludeFilter is used.
      if (results.isEmpty()) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        boolean moreRows = nextRow(scannerContext, current);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        if (!shouldStop) {
          // We check the size limit because we might have read blocks in the nextRow call above,
          // or in the populateResult call. Only scans with hasFilterRow should reach this point,
          // and for those scans which filter row _cells_ this is the only place we can actually
          // enforce that the scan does not exceed limits, since it bypasses all other checks above.
          if (scannerContext.checkSizeLimit(limitScope)) {
            return true;
          }
          continue;
        }
      }

      if (shouldStop) {
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      } else {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }
    }
  }

  private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
    region.filteredReadRequestsCount.increment();
    if (region.getMetrics() != null) {
      region.getMetrics().updateFilteredRecords();
    }

    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics()
      .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME, 1);
  }

  private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics()
      .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 1);
  }

  /** Returns true when the joined heap may have data for the current row */
  private boolean joinedHeapMayHaveData(ExtendedCell currentRowCell) throws IOException {
    Cell nextJoinedKv = joinedHeap.peek();
    boolean matchCurrentRow =
      nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell);
    boolean matchAfterSeek = false;

    // If the next value in the joined heap does not match the current row, try to seek to the
    // correct row
    if (!matchCurrentRow) {
      ExtendedCell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell);
      boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
      matchAfterSeek = seekSuccessful && joinedHeap.peek() != null
        && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell);
    }

    return matchCurrentRow || matchAfterSeek;
  }

  /**
   * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines the
   * filterRow() and filterRow({@code List<KeyValue> kvs}) functions. Code written for 0.94 or
   * older may not implement hasFilterRow() as HBASE-6429 expects, because in 0.94 hasFilterRow()
   * only returns true when filterRow({@code List<KeyValue> kvs}) is overridden, not when
   * filterRow() is. In that case filterRow() would otherwise be skipped, so we call it here.
   */
  private boolean filterRow() throws IOException {
    // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
    // filterRowCells(List<Cell> kvs) so we skip that scenario here.
    return filter != null && (!filter.hasFilterRow()) && filter.filterRow();
  }

  private boolean filterRowKey(Cell current) throws IOException {
    return filter != null && filter.filterRowKey(current);
  }

  /**
   * A mocked list implementation - discards all updates.
   */
  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {

    @Override
    public void add(int index, Cell element) {
      // do nothing
    }

    @Override
    public boolean addAll(int index, Collection<? extends Cell> c) {
      return false; // this list is never changed as a result of an update
    }

    @Override
    public KeyValue get(int index) {
      throw new UnsupportedOperationException();
    }

    @Override
    public int size() {
      return 0;
    }
  };

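  /**
   * Skip the remainder of the current row, discarding the cells into {@code MOCKED_LIST}, then
   * reset the filters and run the postScannerFilterRow coprocessor hook.
   * @return true if more rows are available, as reported by the coprocessor hook (always true when
   *         no coprocessor host is installed)
   */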
  protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
    assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";

    // Enable skipping row mode, which disables limits and skips tracking progress for all
    // but block size. We keep tracking block size because skipping a row in this way
    // might involve reading blocks along the way.
    scannerContext.setSkippingRow(true);

    Cell next;
    while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();
      this.storeHeap.next(MOCKED_LIST, scannerContext);
    }

    scannerContext.setSkippingRow(false);
    resetFilters();

    // Call the hook in the coprocessor, which allows it to do a fast forward
    return this.region.getCoprocessorHost() == null
      || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
  }

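  /**
   * Returns true if the scan should stop: either the heap is exhausted ({@code currentRowCell} is
   * null) or the current row is past the configured stop row (or at it, when the stop row is
   * exclusive).
   */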
  protected boolean shouldStop(Cell currentRowCell) {
    if (currentRowCell == null) {
      return true;
    }
    if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {
      return false;
    }
    int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length);
    return c > 0 || (c == 0 && !includeStopRow);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "this method is only called inside close which is synchronized")
  private void closeInternal() {
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
    }
    if (joinedHeap != null) {
      joinedHeap.close();
      joinedHeap = null;
    }
    // no need to synchronize here.
    scannerReadPoints.remove(this);
    this.filterClosed = true;
  }

  @Override
  public synchronized void close() {
    TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
  }

  @Override
  public synchronized boolean reseek(byte[] row) throws IOException {
    return TraceUtil.trace(() -> {
      if (row == null) {
        throw new IllegalArgumentException("Row cannot be null.");
      }
      boolean result = false;
      region.startRegionOperation();
      ExtendedCell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
      try {
        // use request seek to make use of the lazy seek option. See HBASE-5520
        result = this.storeHeap.requestSeek(kv, true, true);
        if (this.joinedHeap != null) {
          result = this.joinedHeap.requestSeek(kv, true, true) || result;
        }
      } finally {
        region.closeRegionOperation();
      }
      return result;
    }, () -> region.createRegionSpan("RegionScanner.reseek"));
  }

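  // Shipper callback: invoked once the current batch of results has been shipped to the client,
  // giving the underlying heaps a chance to release resources (e.g. block references) held for
  // the returned cells.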
  @Override
  public void shipped() throws IOException {
    if (storeHeap != null) {
      storeHeap.shipped();
    }
    if (joinedHeap != null) {
      joinedHeap.shipped();
    }
  }

  @Override
  public void run() throws IOException {
    // This is the RPC callback method that gets executed. We close the scanner in this
    // callback.
    this.close();
  }
}