/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
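 * <p>
 * Internally it drives a main {@link KeyValueHeap} over the essential column families and, when
 * on-demand column family loading is enabled, a joined heap over the non-essential ones. A
 * minimal read-loop sketch follows (illustrative only; the real driving code lives in the RPC
 * layer, and names like {@code cells} are placeholders):
 *
 * <pre>
 * {@code
 * try (RegionScanner scanner = region.getScanner(scan)) {
 *   List<Cell> cells = new ArrayList<>();
 *   boolean moreRows;
 *   do {
 *     cells.clear();
 *     moreRows = scanner.next(cells); // enforces the Scan's batch limit by default
 *     // consume the cells gathered for this invocation here
 *   } while (moreRows);
 * }
 * }
 * </pre>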
 */
@InterfaceAudience.Private
public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {

  private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class);

  // Package local for testability
  KeyValueHeap storeHeap = null;

  /**
   * Heap of key-values that are not essential for the provided filters and are thus read on demand,
   * if on-demand column family loading is enabled.
   */
  KeyValueHeap joinedHeap = null;

  /**
   * If the joined heap data gathering is interrupted due to scan limits, this will contain the row
   * for which we are populating the values.
   */
  protected ExtendedCell joinedContinuationRow = null;
  private boolean filterClosed = false;

  protected final byte[] stopRow;
  protected final boolean includeStopRow;
  protected final HRegion region;
  protected final CellComparator comparator;

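  /**
   * Shared with the owning HRegion; tracks each open scanner's read point so the region can
   * compute the smallest read point still in use (see HRegion#getSmallestReadPoint).
   */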
  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  private final long readPt;
  private final long maxResultSize;
  private final ScannerContext defaultScannerContext;
  private final FilterWrapper filter;
  private final String operationId;

  private RegionServerServices rsServices;

  @Override
  public RegionInfo getRegionInfo() {
    return region.getRegionInfo();
  }

  private static boolean hasNonce(HRegion region, long nonce) {
    RegionServerServices rsServices = region.getRegionServerServices();
    return nonce != HConstants.NO_NONCE && rsServices != null
      && rsServices.getNonceManager() != null;
  }

  RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
    long nonceGroup, long nonce) throws IOException {
    this.region = region;
    this.maxResultSize = scan.getMaxResultSize();
    if (scan.hasFilter()) {
      this.filter = new FilterWrapper(scan.getFilter());
    } else {
      this.filter = null;
    }
    this.comparator = region.getCellComparator();
    /**
     * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
     * scanner context that can be used to enforce the batch limit in the event that a
     * ScannerContext is not specified during an invocation of next/nextRaw
     */
    defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
    this.stopRow = scan.getStopRow();
    this.includeStopRow = scan.includeStopRow();
    this.operationId = scan.getId();

    // Take the recording lock so that nobody calculates getSmallestReadPoint before
    // scannerReadPoints is updated.
    IsolationLevel isolationLevel = scan.getIsolationLevel();
    long mvccReadPoint = ClientInternalHelper.getMvccReadPoint(scan);
    this.scannerReadPoints = region.scannerReadPoints;
    this.rsServices = region.getRegionServerServices();
    region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    try {
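      // Read-point precedence, as implemented below: an mvcc read point carried on the Scan
      // takes priority; otherwise, for a replayed nonce operation, the mvcc recorded by the
      // nonce manager; otherwise the region's read point for the requested isolation level.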
      if (mvccReadPoint > 0) {
        this.readPt = mvccReadPoint;
      } else if (hasNonce(region, nonce)) {
        this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
      } else {
        this.readPt = region.getReadPoint(isolationLevel);
      }
      scannerReadPoints.put(this, this.readPt);
    } finally {
      region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    }
    initializeScanners(scan, additionalScanners);
  }

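  /**
   * Returns the default ScannerContext, i.e. the context used when next/nextRaw is invoked
   * without an explicit one.
   */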
  public ScannerContext getContext() {
    return defaultScannerContext;
  }

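  /**
   * Separates the store scanners into the essential list that feeds {@link #storeHeap} and the
   * non-essential list that feeds {@link #joinedHeap}. An illustrative example (assuming the Scan
   * enables on-demand column family loading): with a SingleColumnValueFilter on family
   * {@code a}, the scanner for {@code a} is essential, while scanners for the other requested
   * families are joined and only consulted once a row passes the filter.
   */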
  private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
    throws IOException {
    // Here we separate all scanners into two lists - scanners that provide data required
    // by the filter to operate (scanners list) and all others (joinedScanners list).
    List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());
    List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());
    // Store all already instantiated scanners for exception handling
    List<KeyValueScanner> instantiatedScanners = new ArrayList<>();
    // handle additionalScanners
    if (additionalScanners != null && !additionalScanners.isEmpty()) {
      scanners.addAll(additionalScanners);
      instantiatedScanners.addAll(additionalScanners);
    }

    try {
      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
        HStore store = region.getStore(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
        instantiatedScanners.add(scanner);
        if (
          this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
            || this.filter.isFamilyEssential(entry.getKey())
        ) {
          scanners.add(scanner);
        } else {
          joinedScanners.add(scanner);
        }
      }
      initializeKVHeap(scanners, joinedScanners, region);
    } catch (Throwable t) {
      throw handleException(instantiatedScanners, t);
    }
  }

  protected void initializeKVHeap(List<KeyValueScanner> scanners,
    List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
    this.storeHeap = new KeyValueHeap(scanners, comparator);
    if (!joinedScanners.isEmpty()) {
      this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
    }
  }

  private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) {
    // remove the scanner read point before throwing the exception
    scannerReadPoints.remove(this);
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
      if (joinedHeap != null) {
        joinedHeap.close();
        joinedHeap = null;
      }
    } else {
      // close all already instantiated scanners before throwing the exception
      for (KeyValueScanner scanner : instantiatedScanners) {
        scanner.close();
      }
    }
    return t instanceof IOException ? (IOException) t : new IOException(t);
  }

  @Override
  public long getMaxResultSize() {
    return maxResultSize;
  }

  @Override
  public long getMvccReadPoint() {
    return this.readPt;
  }

  @Override
  public int getBatch() {
    return this.defaultScannerContext.getBatchLimit();
  }

  @Override
  public String getOperationId() {
    return operationId;
  }

  /**
   * Reset the filter, if one is set.
   * @throws IOException in case a filter raises an I/O exception.
   */
  protected final void resetFilters() throws IOException {
    if (filter != null) {
      filter.reset();
    }
  }

  @Override
  public boolean next(List<? super ExtendedCell> outResults) throws IOException {
    // apply the batching limit by default
    return next(outResults, defaultScannerContext);
  }

  @Override
  public synchronized boolean next(List<? super ExtendedCell> outResults,
    ScannerContext scannerContext) throws IOException {
    if (this.filterClosed) {
      throw new UnknownScannerException("Scanner was closed (timed out?) "
        + "after we renewed it. Could be caused by a very slow scanner "
        + "or a lengthy garbage collection");
    }
    region.startRegionOperation(Operation.SCAN);
    try {
      return nextRaw(outResults, scannerContext);
    } finally {
      region.closeRegionOperation(Operation.SCAN);
    }
  }

  @Override
  public boolean nextRaw(List<? super ExtendedCell> outResults) throws IOException {
    // Use the RegionScanner's context by default
    return nextRaw(outResults, defaultScannerContext);
  }

  @Override
  public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext scannerContext)
    throws IOException {
    if (storeHeap == null) {
      // scanner is closed
      throw new UnknownScannerException("Scanner was closed");
    }
    boolean moreValues = false;
    if (outResults.isEmpty()) {
      // Usually outResults is empty. This is the case when next is called
      // to handle a scan or get operation.
      moreValues = nextInternal(outResults, scannerContext);
    } else {
      List<ExtendedCell> tmpList = new ArrayList<>();
      moreValues = nextInternal(tmpList, scannerContext);
      outResults.addAll(tmpList);
    }

    region.addReadRequestsCount(1);
    if (region.getMetrics() != null) {
      region.getMetrics().updateReadRequestCount();
    }

    // If the size limit was reached it means a partial Result is being returned. Returning a
    // partial Result means that we should not reset the filters; filters should only be reset
    // in between rows.
    if (!scannerContext.mayHaveMoreCellsInRow()) {
      resetFilters();
    }

    if (isFilterDoneInternal()) {
      moreValues = false;
    }
    return moreValues;
  }

  /** Returns true if more cells exist after this batch, false if the scanner is done. */
  private boolean populateFromJoinedHeap(List<? super ExtendedCell> results,
    ScannerContext scannerContext) throws IOException {
    assert joinedContinuationRow != null;
    boolean moreValues =
      populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow);

    if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
      // We are done with this row, reset the continuation.
      joinedContinuationRow = null;
    }
    // As the data is obtained from two independent heaps, we need to
    // ensure that the result list is sorted, because Result relies on that.
    ((List<Cell>) results).sort(comparator);
    return moreValues;
  }

  /**
   * Fetches records for currentRow into the results list, until the next row, the batchLimit (if
   * not -1), or the remainingResultSize (if not -1) is reached.
   * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before
   *             this call.
   * @return state of the last call to {@link KeyValueHeap#next()}
   */
  private boolean populateResult(List<? super ExtendedCell> results, KeyValueHeap heap,
    ScannerContext scannerContext, ExtendedCell currentRowCell) throws IOException {
    Cell nextKv;
    boolean moreCellsInRow = false;
    boolean tmpKeepProgress = scannerContext.getKeepProgress();
    // Scanning between column families and thus the scope is between cells
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;
    do {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // We want to maintain any progress that is made towards the limits while scanning across
      // different column families. To do this, we toggle the keep progress flag on during calls
      // to the StoreScanner to ensure that any progress made thus far is not wiped away.
      scannerContext.setKeepProgress(true);
      heap.next(results, scannerContext);
      scannerContext.setKeepProgress(tmpKeepProgress);

      nextKv = heap.peek();
      moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
      if (!moreCellsInRow) {
        incrementCountOfRowsScannedMetric(scannerContext);
      }
      if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
        return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
      } else if (scannerContext.checkSizeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      } else if (scannerContext.checkTimeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      }
    } while (moreCellsInRow);
    return nextKv != null;
  }

  /**
   * Based on the nextKv in the heap and the current row, decide whether or not there are more
   * cells to be read in the heap. If the row of the nextKv in the heap matches the current row
   * then there are more cells to be read in the row.
   * @return true when there are more cells in the row to be read
   */
  private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
    return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);
  }

  /** Returns true if a filter has ruled that the scanner is done. */
  @Override
  public synchronized boolean isFilterDone() throws IOException {
    return isFilterDoneInternal();
  }

  private boolean isFilterDoneInternal() throws IOException {
    return this.filter != null && this.filter.filterAllRemaining();
  }

  private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException {
    if (rpcCall.isPresent()) {
      // If a user specifies a too-restrictive or too-slow scanner, the
      // client might time out and disconnect while the server side
      // is still processing the request. We should abort aggressively
      // in that case.
      long afterTime = rpcCall.get().disconnectSince();
      if (afterTime >= 0) {
        throw new CallerDisconnectedException(
          "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this
            + " after " + afterTime + " ms, since caller disconnected");
      }
    }
  }

  private void resetProgress(ScannerContext scannerContext, int initialBatchProgress,
    long initialSizeProgress, long initialHeapSizeProgress) {
    // Starting to scan a new row. Reset the scanner progress according to whether or not
    // progress should be kept.
    if (scannerContext.getKeepProgress()) {
      // Progress should be kept. Reset to the initial values seen at the start of the method
      // invocation.
      scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
    } else {
      scannerContext.clearProgress();
    }
  }

  private boolean nextInternal(List<? super ExtendedCell> results, ScannerContext scannerContext)
    throws IOException {
    Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list");
    Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null");
    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();

    // Save the initial progress from the Scanner context in these local variables. The progress
    // may need to be reset a few times if rows are being filtered out, so we save the initial
    // progress.
    int initialBatchProgress = scannerContext.getBatchProgress();
    long initialSizeProgress = scannerContext.getDataSizeProgress();
    long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();

    // Used to check the time limit
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;

    // The loop here is used only when, at some point during next(), we determine that due to
    // effects of filters or otherwise, we have an empty row in the result. Then we loop and try
    // again. Otherwise, we must get out on the first iteration via return: "true" if there's more
    // data to read, "false" if there isn't (storeHeap is at a stop row, and joinedHeap has no
    // more data to read for the last row, i.e. joinedContinuationRow if set).
    while (true) {
      resetProgress(scannerContext, initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
      checkClientDisconnect(rpcCall);

      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // Let's see what we have in the storeHeap.
      ExtendedCell current = this.storeHeap.peek();

      boolean shouldStop = shouldStop(current);
      // When hasFilterRow is true it means that all the cells for a particular row must be read
      // before a filtering decision can be made. This means that filters for which hasFilterRow
      // returns true run the risk of encountering out of memory errors when they are applied to
      // a table that has very large rows.
      boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

      // If filter#hasFilterRow is true, partial results are not allowed since allowing them
      // would prevent the filters from being evaluated. Thus, if it is true, change the
      // scope of any limits that could potentially create partial results to
      // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row
      if (hasFilterRow) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("filter#hasFilterRow is true which prevents partial results from being"
            + " formed. Changing scope of limits that may create partials");
        }
        scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
        scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
        limitScope = LimitScope.BETWEEN_ROWS;
      }

      if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
        if (hasFilterRow) {
          throw new IncompatibleFilterException(
            "Filter whose hasFilterRow() returns true is incompatible with scans that must"
              + " stop mid-row because of a limit. ScannerContext: " + scannerContext);
        }
        return true;
      }

      // Check if we were getting data from the joinedHeap and hit the limit.
      // If not, then it's the main path - getting results from the storeHeap.
      if (joinedContinuationRow == null) {
        // First, check if we are at a stop row. If so, there are no more results.
        if (shouldStop) {
          if (hasFilterRow) {
            filter.filterRowCells((List<Cell>) results);
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Check if the rowkey filter wants to exclude this row. If so, loop to next.
        // Technically, if we hit limits before on this row, we don't need this call.
        if (filterRowKey(current)) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          // early check, see HBASE-16296
          if (isFilterDoneInternal()) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          // Typically the count of rows scanned is incremented inside #populateResult. However,
          // here we are filtering a row based purely on its row key, preventing us from calling
          // #populateResult. Thus, perform the necessary increment of the rows scanned metric
          // here.
          incrementCountOfRowsScannedMetric(scannerContext);
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          results.clear();

          // Read nothing as the rowkey was filtered, but still need to check the time limit.
          // We also check the size limit because we might have read blocks in getting to this
          // point.
          if (scannerContext.checkAnyLimitReached(limitScope)) {
            return true;
          }
          continue;
        }

        // Ok, we are good, let's try to get some results from the main heap.
        populateResult(results, this.storeHeap, scannerContext, current);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          if (hasFilterRow) {
            throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scans that must"
                + " stop mid-row because of a limit. ScannerContext: " + scannerContext);
          }
          return true;
        }

        // Check for thread interrupt status in case we have been signaled from
        // #interruptRegionOperation.
        region.checkInterrupt();

        Cell nextKv = this.storeHeap.peek();
        shouldStop = shouldStop(nextKv);
        // save whether the row was empty before filters were applied to it.
        final boolean isEmptyRow = results.isEmpty();

        // We have the part of the row necessary for filtering (all of it, usually).
        // First filter with the filterRow(List).
        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
        if (hasFilterRow) {
          ret = filter.filterRowCellsWithRet((List<Cell>) results);

          // We don't know how the results have changed after being filtered. Must set progress
          // according to the contents of results now.
          if (scannerContext.getKeepProgress()) {
            scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
              initialHeapSizeProgress);
          } else {
            scannerContext.clearProgress();
          }
          scannerContext.incrementBatchProgress(results.size());
          for (ExtendedCell cell : (List<ExtendedCell>) results) {
            scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
              cell.heapSize());
          }
        }

        if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          results.clear();
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // This row was totally filtered out. If this is NOT the last row, we should continue
          // on; otherwise there is nothing else to do.
          if (!shouldStop) {
            // Read nothing as the cells were filtered, but still need to check the time limit.
            // We also check the size limit because we might have read blocks in getting to this
            // point.
            if (scannerContext.checkAnyLimitReached(limitScope)) {
              return true;
            }
            continue;
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Ok, we are done with the storeHeap for this row.
        // Now we may need to fetch additional, non-essential data into the row.
        // These values are not needed for the filter to work, so we postpone their
        // fetch to (possibly) reduce the amount of data loaded from disk.
        if (this.joinedHeap != null) {
          boolean mayHaveData = joinedHeapMayHaveData(current);
          if (mayHaveData) {
            joinedContinuationRow = current;
            populateFromJoinedHeap(results, scannerContext);

            if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
              return true;
            }
          }
        }
      } else {
        // Populating from the joined heap was stopped by limits, populate some more.
        populateFromJoinedHeap(results, scannerContext);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          return true;
        }
      }
      // We may have just called populateFromJoinedHeap and hit the limits. If that is
      // the case, we need to call it again on the next next() invocation.
      if (joinedContinuationRow != null) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // Finally, we are done with both the joinedHeap and the storeHeap.
      // Double check to prevent empty rows from appearing in the result. It could be
      // the case when SingleColumnValueExcludeFilter is used.
      if (results.isEmpty()) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        boolean moreRows = nextRow(scannerContext, current);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        if (!shouldStop) {
          // We check the size limit because we might have read blocks in the nextRow call above,
          // or in the populateResult call. Only scans with hasFilterRow should reach this point,
          // and for those scans which filter row _cells_ this is the only place we can actually
          // enforce that the scan does not exceed limits, since it bypasses all other checks
          // above.
          if (scannerContext.checkSizeLimit(limitScope)) {
            return true;
          }
          continue;
        }
      }

      if (shouldStop) {
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      } else {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }
    }
  }

  private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
    region.filteredReadRequestsCount.increment();
    if (region.getMetrics() != null) {
      region.getMetrics().updateFilteredRecords();
    }

    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics()
      .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME, 1);
  }

  private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics()
      .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 1);
  }

  /** Returns true when the joined heap may have data for the current row. */
  private boolean joinedHeapMayHaveData(ExtendedCell currentRowCell) throws IOException {
    Cell nextJoinedKv = joinedHeap.peek();
    boolean matchCurrentRow =
      nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell);
    boolean matchAfterSeek = false;

    // If the next value in the joined heap does not match the current row, try to seek to the
    // correct row
    if (!matchCurrentRow) {
      ExtendedCell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell);
      boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
      matchAfterSeek = seekSuccessful && joinedHeap.peek() != null
        && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell);
    }

    return matchCurrentRow || matchAfterSeek;
  }

  /**
   * This function is here to maintain backward compatibility for 0.94 filters. HBASE-6429
   * combined the filterRow() and filterRow({@code List<KeyValue> kvs}) functions. Code from 0.94
   * or older may not implement hasFilterRow() as HBASE-6429 expects, because in 0.94
   * hasFilterRow() only returns true when filterRow({@code List<KeyValue> kvs}) is overridden,
   * not filterRow(). In that case filterRow() would be skipped.
   */
  private boolean filterRow() throws IOException {
    // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
    // filterRowCells(List<Cell> kvs) so we skip that scenario here.
    return filter != null && (!filter.hasFilterRow()) && filter.filterRow();
  }

  private boolean filterRowKey(Cell current) throws IOException {
    return filter != null && filter.filterRowKey(current);
  }

  /**
   * A mocked list implementation that discards all updates. Used by {@link #nextRow} to skip over
   * the remaining cells of a row without accumulating them.
   */
  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {

    @Override
    public void add(int index, Cell element) {
      // do nothing
    }

    @Override
    public boolean addAll(int index, Collection<? extends Cell> c) {
      return false; // this list is never changed as a result of an update
    }

    @Override
    public KeyValue get(int index) {
      throw new UnsupportedOperationException();
    }

    @Override
    public int size() {
      return 0;
    }
  };

  protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
    assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";

    // Enable skipping row mode, which disables limits and skips tracking progress for all
    // but block size. We keep tracking block size because skipping a row in this way
    // might involve reading blocks along the way.
    scannerContext.setSkippingRow(true);

    Cell next;
    while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();
      this.storeHeap.next(MOCKED_LIST, scannerContext);
    }

    scannerContext.setSkippingRow(false);
    resetFilters();

    // Calling the hook in the CP which allows it to do a fast forward
    return this.region.getCoprocessorHost() == null
      || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
  }

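  /**
   * Returns true once the scan has reached or passed the stop row; a null current cell also means
   * stop. Illustrative example: with stopRow {@code r2} and includeStopRow false, the scan stops
   * as soon as the heap is positioned on row {@code r2}; with includeStopRow true, it stops only
   * after the cells of {@code r2} have been returned.
   */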
  protected boolean shouldStop(Cell currentRowCell) {
    if (currentRowCell == null) {
      return true;
    }
    if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {
      return false;
    }
    int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length);
    return c > 0 || (c == 0 && !includeStopRow);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "this method is only called inside close which is synchronized")
  private void closeInternal() {
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
    }
    if (joinedHeap != null) {
      joinedHeap.close();
      joinedHeap = null;
    }
    // no need to synchronize here.
    scannerReadPoints.remove(this);
    this.filterClosed = true;
  }

  @Override
  public synchronized void close() {
    TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
  }

  @Override
  public synchronized boolean reseek(byte[] row) throws IOException {
    return TraceUtil.trace(() -> {
      if (row == null) {
        throw new IllegalArgumentException("Row cannot be null.");
      }
      boolean result = false;
      region.startRegionOperation();
      ExtendedCell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
      try {
        // use request seek to make use of the lazy seek option. See HBASE-5520
        result = this.storeHeap.requestSeek(kv, true, true);
        if (this.joinedHeap != null) {
          result = this.joinedHeap.requestSeek(kv, true, true) || result;
        }
      } finally {
        region.closeRegionOperation();
      }
      return result;
    }, () -> region.createRegionSpan("RegionScanner.reseek"));
  }

  @Override
  public void shipped() throws IOException {
    if (storeHeap != null) {
      storeHeap.shipped();
    }
    if (joinedHeap != null) {
      joinedHeap.shipped();
    }
  }

  @Override
  public void run() throws IOException {
    // This is the RPC callback method that is executed; we close the scanner in this callback.
    this.close();
  }
}