/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
 */
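 * <p>
 * A minimal usage sketch, illustrative only (it assumes the caller already holds an
 * {@link HRegion} reference and a configured {@link Scan}; error handling elided):
 *
 * <pre>{@code
 * try (RegionScanner scanner = region.getScanner(scan)) {
 *   List<Cell> cells = new ArrayList<>();
 *   boolean moreRows;
 *   do {
 *     // Fetch the cells of one row, subject to the default batch limit.
 *     moreRows = scanner.next(cells);
 *     // ... process cells ...
 *     cells.clear();
 *   } while (moreRows);
 * }
 * }</pre>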
 */
@InterfaceAudience.Private
class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {

  private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class);

  // Package local for testability
  KeyValueHeap storeHeap = null;

  /**
   * Heap of key-values that are not essential for the provided filters and are thus read on demand,
   * if on-demand column family loading is enabled.
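   * <p>
   * A sketch of a client-side scan that enables this path (illustrative only; it assumes a filter
   * that marks only some families as essential):
   *
   * <pre>{@code
   * Scan scan = new Scan().setLoadColumnFamiliesOnDemand(true);
   * scan.setFilter(filter); // filter.isFamilyEssential(cf) decides which heap a store feeds
   * }</pre>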
   */
  KeyValueHeap joinedHeap = null;

  /**
   * If the joined heap data gathering is interrupted due to scan limits, this will contain the row
   * for which we are populating the values.
   */
  protected Cell joinedContinuationRow = null;
  private boolean filterClosed = false;

  protected final byte[] stopRow;
  protected final boolean includeStopRow;
  protected final HRegion region;
  protected final CellComparator comparator;

  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  private final long readPt;
  private final long maxResultSize;
  private final ScannerContext defaultScannerContext;
  private final FilterWrapper filter;
  private final String operationId;

  private RegionServerServices rsServices;

  @Override
  public RegionInfo getRegionInfo() {
    return region.getRegionInfo();
  }

  private static boolean hasNonce(HRegion region, long nonce) {
    RegionServerServices rsServices = region.getRegionServerServices();
    return nonce != HConstants.NO_NONCE && rsServices != null
      && rsServices.getNonceManager() != null;
  }

  RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
    long nonceGroup, long nonce) throws IOException {
    this.region = region;
    this.maxResultSize = scan.getMaxResultSize();
    if (scan.hasFilter()) {
      this.filter = new FilterWrapper(scan.getFilter());
    } else {
      this.filter = null;
    }
    this.comparator = region.getCellComparator();
    /**
     * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
     * scanner context that can be used to enforce the batch limit in the event that a
     * ScannerContext is not specified during an invocation of next/nextRaw
     */
    defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
    this.stopRow = scan.getStopRow();
    this.includeStopRow = scan.includeStopRow();
    this.operationId = scan.getId();

    // Synchronize on scannerReadPoints so that nobody calculates
    // getSmallestReadPoint before scannerReadPoints is updated.
    IsolationLevel isolationLevel = scan.getIsolationLevel();
    long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan);
    this.scannerReadPoints = region.scannerReadPoints;
    this.rsServices = region.getRegionServerServices();
    synchronized (scannerReadPoints) {
      if (mvccReadPoint > 0) {
        this.readPt = mvccReadPoint;
      } else if (hasNonce(region, nonce)) {
        this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
      } else {
        this.readPt = region.getReadPoint(isolationLevel);
      }
      scannerReadPoints.put(this, this.readPt);
    }
    initializeScanners(scan, additionalScanners);
  }

  private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
    throws IOException {
    // Here we separate all scanners into two lists - scanners that provide data required
    // by the filter to operate (the scanners list) and all others (the joinedScanners list).
    List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());
    List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());
    // Store all already instantiated scanners for exception handling
    List<KeyValueScanner> instantiatedScanners = new ArrayList<>();
    // handle additionalScanners
    if (additionalScanners != null && !additionalScanners.isEmpty()) {
      scanners.addAll(additionalScanners);
      instantiatedScanners.addAll(additionalScanners);
    }

    try {
      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
        HStore store = region.getStore(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
        instantiatedScanners.add(scanner);
        if (
          this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
            || this.filter.isFamilyEssential(entry.getKey())
        ) {
          scanners.add(scanner);
        } else {
          joinedScanners.add(scanner);
        }
      }
      initializeKVHeap(scanners, joinedScanners, region);
    } catch (Throwable t) {
      throw handleException(instantiatedScanners, t);
    }
  }

  protected void initializeKVHeap(List<KeyValueScanner> scanners,
    List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
    this.storeHeap = new KeyValueHeap(scanners, comparator);
    if (!joinedScanners.isEmpty()) {
      this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
    }
  }

  private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) {
    // Remove the scanner read point before throwing the exception
    scannerReadPoints.remove(this);
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
      if (joinedHeap != null) {
        joinedHeap.close();
        joinedHeap = null;
      }
    } else {
      // close all already instantiated scanners before throwing the exception
      for (KeyValueScanner scanner : instantiatedScanners) {
        scanner.close();
      }
    }
    return t instanceof IOException ? (IOException) t : new IOException(t);
  }

  @Override
  public long getMaxResultSize() {
    return maxResultSize;
  }

  @Override
  public long getMvccReadPoint() {
    return this.readPt;
  }

  @Override
  public int getBatch() {
    return this.defaultScannerContext.getBatchLimit();
  }

  @Override
  public String getOperationId() {
    return operationId;
  }

  /**
   * Reset the filter, if any.
   * @throws IOException in case the filter raises an I/O exception.
   */
  protected final void resetFilters() throws IOException {
    if (filter != null) {
      filter.reset();
    }
  }

  @Override
  public boolean next(List<Cell> outResults) throws IOException {
    // apply the batching limit by default
    return next(outResults, defaultScannerContext);
  }

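  /**
   * A sketch of driving this method with an explicit context (illustrative only; the builder also
   * exposes other limits, this shows just the batch limit used elsewhere in this class):
   * <pre>{@code
   * // Hypothetical batch limit of 100 cells per call.
   * ScannerContext context = ScannerContext.newBuilder().setBatchLimit(100).build();
   * boolean moreRows = scanner.next(cells, context);
   * }</pre>
   */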
  @Override
  public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)
    throws IOException {
    if (this.filterClosed) {
      throw new UnknownScannerException("Scanner was closed (timed out?) "
        + "after we renewed it. Could be caused by a very slow scanner "
        + "or a lengthy garbage collection");
    }
    region.startRegionOperation(Operation.SCAN);
    try {
      return nextRaw(outResults, scannerContext);
    } finally {
      region.closeRegionOperation(Operation.SCAN);
    }
  }

  @Override
  public boolean nextRaw(List<Cell> outResults) throws IOException {
    // Use the RegionScanner's context by default
    return nextRaw(outResults, defaultScannerContext);
  }

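  /**
   * Unlike {@link #next(List, ScannerContext)}, nextRaw does not take the region operation lock
   * and is not synchronized; per the {@link RegionScanner} contract the caller handles both. A
   * sketch of the expected calling pattern (illustrative only, error handling elided):
   * <pre>{@code
   * region.startRegionOperation(Operation.SCAN);
   * try {
   *   synchronized (scanner) {
   *     moreRows = scanner.nextRaw(cells, scannerContext);
   *   }
   * } finally {
   *   region.closeRegionOperation(Operation.SCAN);
   * }
   * }</pre>
   */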
  @Override
  public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext) throws IOException {
    if (storeHeap == null) {
      // scanner is closed
      throw new UnknownScannerException("Scanner was closed");
    }
    boolean moreValues = false;
    if (outResults.isEmpty()) {
      // Usually outResults is empty. This is the case when next is called
      // to handle a scan or get operation.
      moreValues = nextInternal(outResults, scannerContext);
    } else {
      List<Cell> tmpList = new ArrayList<>();
      moreValues = nextInternal(tmpList, scannerContext);
      outResults.addAll(tmpList);
    }

    region.addReadRequestsCount(1);
    if (region.getMetrics() != null) {
      region.getMetrics().updateReadRequestCount();
    }

    // If the size limit was reached it means a partial Result is being returned. Returning a
    // partial Result means that we should not reset the filters; filters should only be reset in
    // between rows
    if (!scannerContext.mayHaveMoreCellsInRow()) {
      resetFilters();
    }

    if (isFilterDoneInternal()) {
      moreValues = false;
    }
    return moreValues;
  }

  /**
   * @return true if more cells exist after this batch, false if the scanner is done
   */
  private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)
    throws IOException {
    assert joinedContinuationRow != null;
    boolean moreValues =
      populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow);

    if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
      // We are done with this row, reset the continuation.
      joinedContinuationRow = null;
    }
    // As the data is obtained from two independent heaps, we need to
    // ensure that the result list is sorted, because Result relies on that.
    results.sort(comparator);
    return moreValues;
  }

  /**
   * Fetches records with the current row into the results list, until the next row, batchLimit
   * (if not -1), or remainingResultSize (if not -1) is reached.
   * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before
   *             this call.
   * @return state of the last call to {@link KeyValueHeap#next()}
   */
  private boolean populateResult(List<Cell> results, KeyValueHeap heap,
    ScannerContext scannerContext, Cell currentRowCell) throws IOException {
    Cell nextKv;
    boolean moreCellsInRow = false;
    boolean tmpKeepProgress = scannerContext.getKeepProgress();
    // Scanning between column families and thus the scope is between cells
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;
    do {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // We want to maintain any progress that is made towards the limits while scanning across
      // different column families. To do this, we toggle the keep progress flag on during calls
      // to the StoreScanner to ensure that any progress made thus far is not wiped away.
      scannerContext.setKeepProgress(true);
      heap.next(results, scannerContext);
      scannerContext.setKeepProgress(tmpKeepProgress);

      nextKv = heap.peek();
      moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
      if (!moreCellsInRow) {
        incrementCountOfRowsScannedMetric(scannerContext);
      }
      if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
        return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
      } else if (scannerContext.checkSizeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      } else if (scannerContext.checkTimeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      }
    } while (moreCellsInRow);
    return nextKv != null;
  }

  /**
   * Based on the nextKv in the heap and the current row, decide whether or not there are more
   * cells to be read in the heap. If the row of the nextKv in the heap matches the current row
   * then there are more cells to be read in the row.
   * @return true when there are more cells in the row to be read
   */
  private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
    return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);
  }

  /**
   * @return true if a filter rules that the scanner is done
   */
  @Override
  public synchronized boolean isFilterDone() throws IOException {
    return isFilterDoneInternal();
  }

  private boolean isFilterDoneInternal() throws IOException {
    return this.filter != null && this.filter.filterAllRemaining();
  }

  private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException {
    if (rpcCall.isPresent()) {
      // If a user specifies a too-restrictive or too-slow scanner, the
      // client might time out and disconnect while the server side
      // is still processing the request. We should abort aggressively
      // in that case.
      long afterTime = rpcCall.get().disconnectSince();
      if (afterTime >= 0) {
        throw new CallerDisconnectedException(
          "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this
            + " after " + afterTime + " ms, since caller disconnected");
      }
    }
  }

  private void resetProgress(ScannerContext scannerContext, int initialBatchProgress,
    long initialSizeProgress, long initialHeapSizeProgress) {
    // Starting to scan a new row. Reset the scanner progress according to whether or not
    // progress should be kept.
    if (scannerContext.getKeepProgress()) {
      // Progress should be kept. Reset to initial values seen at start of method invocation.
      scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
    } else {
      scannerContext.clearProgress();
    }
  }

  private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
    throws IOException {
    Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list");
    Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null");
    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();

    // Save the initial progress from the Scanner context in these local variables. The progress
    // may need to be reset a few times if rows are being filtered out so we save the initial
    // progress.
    int initialBatchProgress = scannerContext.getBatchProgress();
    long initialSizeProgress = scannerContext.getDataSizeProgress();
    long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();

    // Used to check time limit
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;

    // The loop here is used only when, at some point during the next call, we determine that due
    // to effects of filters or otherwise we have an empty row in the result. Then we loop and try
    // again. Otherwise we must get out on the first iteration via return: "true" if there's more
    // data to read, "false" if there isn't (the storeHeap is at a stop row and the joinedHeap has
    // no more data to read for the last row, i.e. joinedContinuationRow if set).
    while (true) {
      resetProgress(scannerContext, initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
      checkClientDisconnect(rpcCall);

      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // Let's see what we have in the storeHeap.
      Cell current = this.storeHeap.peek();

      boolean shouldStop = shouldStop(current);
      // When hasFilterRow is true it means that all the cells for a particular row must be read
      // before a filtering decision can be made. This means that filters where hasFilterRow is
      // true run the risk of encountering out-of-memory errors in the case that they are applied
      // to a table that has very large rows.
      boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

      // If filter#hasFilterRow is true, partial results are not allowed since allowing them
      // would prevent the filters from being evaluated. Thus, if it is true, change the
      // scope of any limits that could potentially create partial results to
      // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row
      if (hasFilterRow) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
            + "formed. Changing scope of limits that may create partials");
        }
        scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
        scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
        limitScope = LimitScope.BETWEEN_ROWS;
      }

      if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
        if (hasFilterRow) {
          throw new IncompatibleFilterException(
            "Filter whose hasFilterRow() returns true is incompatible with scans that must "
              + "stop mid-row because of a limit. ScannerContext:" + scannerContext);
        }
        return true;
      }

      // Check if we were getting data from the joinedHeap and hit the limit.
      // If not, then it's the main path - getting results from storeHeap.
      if (joinedContinuationRow == null) {
        // First, check if we are at a stop row. If so, there are no more results.
        if (shouldStop) {
          if (hasFilterRow) {
            filter.filterRowCells(results);
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Check if rowkey filter wants to exclude this row. If so, loop to next.
        // Technically, if we hit limits before on this row, we don't need this call.
        if (filterRowKey(current)) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          // early check, see HBASE-16296
          if (isFilterDoneInternal()) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          // Typically the count of rows scanned is incremented inside #populateResult. However,
          // here we are filtering a row based purely on its row key, preventing us from calling
          // #populateResult. Thus, increment the rows scanned metric here.
          incrementCountOfRowsScannedMetric(scannerContext);
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          results.clear();

          // Read nothing as the rowkey was filtered, but we still need to check the time limit
          if (scannerContext.checkTimeLimit(limitScope)) {
            return true;
          }
          continue;
        }

        // Ok, we are good, let's try to get some results from the main heap.
        populateResult(results, this.storeHeap, scannerContext, current);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          if (hasFilterRow) {
            throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                + "stop mid-row because of a limit. ScannerContext:" + scannerContext);
          }
          return true;
        }

        // Check for thread interrupt status in case we have been signaled from
        // #interruptRegionOperation.
        region.checkInterrupt();

        Cell nextKv = this.storeHeap.peek();
        shouldStop = shouldStop(nextKv);
        // Record whether the row was empty before filters were applied to it.
        final boolean isEmptyRow = results.isEmpty();

        // We have the part of the row necessary for filtering (all of it, usually).
        // First filter with the filterRow(List).
        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
        if (hasFilterRow) {
          ret = filter.filterRowCellsWithRet(results);

          // We don't know how the results have changed after being filtered. Must set progress
          // according to contents of results now.
          if (scannerContext.getKeepProgress()) {
            scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
              initialHeapSizeProgress);
          } else {
            scannerContext.clearProgress();
          }
          scannerContext.incrementBatchProgress(results.size());
          for (Cell cell : results) {
            scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
              cell.heapSize());
          }
        }

        if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          results.clear();
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // This row was totally filtered out, if this is NOT the last row,
          // we should continue on. Otherwise, nothing else to do.
          if (!shouldStop) {
            // Read nothing as the cells were filtered, but we still need to check the time limit
            if (scannerContext.checkTimeLimit(limitScope)) {
              return true;
            }
            continue;
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Ok, we are done with storeHeap for this row.
        // Now we may need to fetch additional, non-essential data into row.
        // These values are not needed for filter to work, so we postpone their
        // fetch to (possibly) reduce amount of data loads from disk.
        if (this.joinedHeap != null) {
          boolean mayHaveData = joinedHeapMayHaveData(current);
          if (mayHaveData) {
            joinedContinuationRow = current;
            populateFromJoinedHeap(results, scannerContext);

            if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
              return true;
            }
          }
        }
      } else {
        // Populating from the joined heap was stopped by limits, populate some more.
        populateFromJoinedHeap(results, scannerContext);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          return true;
        }
      }
      // We may have just called populateFromJoinedHeap and hit the limits. If that is
      // the case, we need to call it again on the next next() invocation.
      if (joinedContinuationRow != null) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // Finally, we are done with both joinedHeap and storeHeap.
      // Double check to prevent empty rows from appearing in result. It could be
      // the case when SingleColumnValueExcludeFilter is used.
      if (results.isEmpty()) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        boolean moreRows = nextRow(scannerContext, current);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        if (!shouldStop) {
          continue;
        }
      }

      if (shouldStop) {
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      } else {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }
    }
  }

  private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
    region.filteredReadRequestsCount.increment();
    if (region.getMetrics() != null) {
      region.getMetrics().updateFilteredRecords();
    }

    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
  }

  private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();
  }

  /**
   * @return true when the joined heap may have data for the current row
   */
  private boolean joinedHeapMayHaveData(Cell currentRowCell) throws IOException {
    Cell nextJoinedKv = joinedHeap.peek();
    boolean matchCurrentRow =
      nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell);
    boolean matchAfterSeek = false;

    // If the next value in the joined heap does not match the current row, try to seek to the
    // correct row
    if (!matchCurrentRow) {
      Cell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell);
      boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
      matchAfterSeek = seekSuccessful && joinedHeap.peek() != null
        && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell);
    }

    return matchCurrentRow || matchAfterSeek;
  }

  /**
   * This method maintains backward compatibility with 0.94 filters. HBASE-6429 combines the
   * filterRow() and filterRow({@code List<KeyValue> kvs}) functions. Code from 0.94 or older may
   * not implement hasFilterRow() as HBASE-6429 expects, because in 0.94 hasFilterRow() only
   * returns true when filterRow({@code List<KeyValue> kvs}) is overridden, not filterRow().
   * Therefore, filterRow() would otherwise be skipped.
   */
  private boolean filterRow() throws IOException {
    // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
    // filterRowCells(List<Cell> kvs) so we skip that scenario here.
    return filter != null && (!filter.hasFilterRow()) && filter.filterRow();
  }

  private boolean filterRowKey(Cell current) throws IOException {
    return filter != null && filter.filterRowKey(current);
  }

  /**
   * A mock list implementation that discards all updates.
   */
  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {

    @Override
    public void add(int index, Cell element) {
      // do nothing
    }

    @Override
    public boolean addAll(int index, Collection<? extends Cell> c) {
      return false; // this list is never changed as a result of an update
    }

    @Override
    public KeyValue get(int index) {
      throw new UnsupportedOperationException();
    }

    @Override
    public int size() {
      return 0;
    }
  };

  protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
    assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
    Cell next;
    while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();
      this.storeHeap.next(MOCKED_LIST);
    }
    resetFilters();

    // Calling the hook in CP which allows it to do a fast forward
    return this.region.getCoprocessorHost() == null
      || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
  }

  protected boolean shouldStop(Cell currentRowCell) {
    if (currentRowCell == null) {
      return true;
    }
    if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {
      return false;
    }
    int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length);
    return c > 0 || (c == 0 && !includeStopRow);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "this method is only called inside close which is synchronized")
  private void closeInternal() {
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
    }
    if (joinedHeap != null) {
      joinedHeap.close();
      joinedHeap = null;
    }
    // no need to synchronize here.
    scannerReadPoints.remove(this);
    this.filterClosed = true;
  }

  @Override
  public synchronized void close() {
    TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
  }

  @Override
  public synchronized boolean reseek(byte[] row) throws IOException {
    return TraceUtil.trace(() -> {
      if (row == null) {
        throw new IllegalArgumentException("Row cannot be null.");
      }
      boolean result = false;
      region.startRegionOperation();
      Cell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
      try {
        // use request seek to make use of the lazy seek option. See HBASE-5520
        result = this.storeHeap.requestSeek(kv, true, true);
        if (this.joinedHeap != null) {
          result = this.joinedHeap.requestSeek(kv, true, true) || result;
        }
      } finally {
        region.closeRegionOperation();
      }
      return result;
    }, () -> region.createRegionSpan("RegionScanner.reseek"));
  }

  @Override
  public void shipped() throws IOException {
    if (storeHeap != null) {
      storeHeap.shipped();
    }
    if (joinedHeap != null) {
      joinedHeap.shipped();
    }
  }

  @Override
  public void run() throws IOException {
    // This is the RPC callback method that is executed. We close the scanner in this callback.
    this.close();
  }
}