001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.AbstractList;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.NavigableSet;
029import java.util.Optional;
030import java.util.Set;
031import java.util.concurrent.ConcurrentHashMap;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellComparator;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.ExtendedCell;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.KeyValue;
039import org.apache.hadoop.hbase.PrivateCellUtil;
040import org.apache.hadoop.hbase.UnknownScannerException;
041import org.apache.hadoop.hbase.client.ClientInternalHelper;
042import org.apache.hadoop.hbase.client.IsolationLevel;
043import org.apache.hadoop.hbase.client.RegionInfo;
044import org.apache.hadoop.hbase.client.Scan;
045import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
046import org.apache.hadoop.hbase.filter.FilterWrapper;
047import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
048import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
049import org.apache.hadoop.hbase.ipc.RpcCall;
050import org.apache.hadoop.hbase.ipc.RpcCallback;
051import org.apache.hadoop.hbase.ipc.RpcServer;
052import org.apache.hadoop.hbase.regionserver.Region.Operation;
053import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
054import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
055import org.apache.hadoop.hbase.trace.TraceUtil;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.yetus.audience.InterfaceAudience;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
062
063/**
064 * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
065 */
066@InterfaceAudience.Private
067public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
068
  private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class);

  // Package local for testability
  KeyValueHeap storeHeap = null;

  /**
   * Heap of key-values that are not essential for the provided filters and are thus read on demand,
   * if on-demand column family loading is enabled.
   */
  KeyValueHeap joinedHeap = null;

  /**
   * If the joined heap data gathering is interrupted due to scan limits, this will contain the row
   * for which we are populating the values.
   */
  protected ExtendedCell joinedContinuationRow = null;
  // When true, next() fails with UnknownScannerException; presumably set on close — TODO confirm
  // against the (unseen) close path.
  private boolean filterClosed = false;

  // Scan upper bound; whether it is inclusive is governed by includeStopRow.
  protected final byte[] stopRow;
  protected final boolean includeStopRow;
  protected final HRegion region;
  protected final CellComparator comparator;

  // Shared registry (owned by the region) of open scanners and their MVCC read points; entries
  // are added in the constructor and removed in handleException.
  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  // MVCC read point this scanner observes; fixed once in the constructor.
  private final long readPt;
  private final long maxResultSize;
  // Context enforcing the scan's batch limit when callers do not supply their own.
  private final ScannerContext defaultScannerContext;
  // Wrapped scan filter, or null when the scan has none.
  private final FilterWrapper filter;
  // Id carried by the Scan (scan.getId()); may be null.
  private final String operationId;

  private RegionServerServices rsServices;

  // Paths of files read by this scanner; not referenced in this chunk — presumably populated and
  // consumed elsewhere in the file. TODO confirm.
  private final Set<Path> filesRead = new HashSet<>();
103
104  @Override
105  public RegionInfo getRegionInfo() {
106    return region.getRegionInfo();
107  }
108
109  private static boolean hasNonce(HRegion region, long nonce) {
110    RegionServerServices rsServices = region.getRegionServerServices();
111    return nonce != HConstants.NO_NONCE && rsServices != null
112      && rsServices.getNonceManager() != null;
113  }
114
  /**
   * Creates a scanner over {@code region}, combining one scanner per requested column family plus
   * any caller-supplied scanners, and registers this scanner's MVCC read point with the region.
   * @param scan               client scan specification (filters, families, batch, limits)
   * @param additionalScanners extra scanners merged into the store heap; may be null
   * @param region             region whose stores are scanned
   * @param nonceGroup         nonce group of the originating operation, if any
   * @param nonce              operation nonce, or {@link HConstants#NO_NONCE}
   * @throws IOException if the per-store scanners cannot be created
   */
  RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
    long nonceGroup, long nonce) throws IOException {
    this.region = region;
    this.maxResultSize = scan.getMaxResultSize();
    if (scan.hasFilter()) {
      this.filter = new FilterWrapper(scan.getFilter());
    } else {
      this.filter = null;
    }
    this.comparator = region.getCellComparator();
    /**
     * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
     * scanner context that can be used to enforce the batch limit in the event that a
     * ScannerContext is not specified during an invocation of next/nextRaw
     */
    defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
    this.stopRow = scan.getStopRow();
    this.includeStopRow = scan.includeStopRow();
    this.operationId = scan.getId();

    // synchronize on scannerReadPoints so that nobody calculates
    // getSmallestReadPoint, before scannerReadPoints is updated.
    IsolationLevel isolationLevel = scan.getIsolationLevel();
    long mvccReadPoint = ClientInternalHelper.getMvccReadPoint(scan);
    this.scannerReadPoints = region.scannerReadPoints;
    this.rsServices = region.getRegionServerServices();
    region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    try {
      // Pick the read point in priority order: one carried by the scan itself, one recorded for
      // the operation's nonce, else the region's current read point for the isolation level.
      if (mvccReadPoint > 0) {
        this.readPt = mvccReadPoint;
      } else if (hasNonce(region, nonce)) {
        this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
      } else {
        this.readPt = region.getReadPoint(isolationLevel);
      }
      // Register while still holding the lock so smallest-read-point calculations see us.
      scannerReadPoints.put(this, this.readPt);
    } finally {
      region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    }
    initializeScanners(scan, additionalScanners);
  }
156
157  public ScannerContext getContext() {
158    return defaultScannerContext;
159  }
160
  /**
   * Creates one scanner per requested column family and partitions them into those the filter
   * needs to operate (store heap) and all others (joined heap, read on demand). On failure, every
   * scanner instantiated so far is closed by {@link #handleException}.
   * @throws IOException if a store scanner cannot be created or heap initialization fails
   */
  private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
    throws IOException {
    // Here we separate all scanners into two lists - scanner that provide data required
    // by the filter to operate (scanners list) and all others (joinedScanners list).
    List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());
    List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());
    // Store all already instantiated scanners for exception handling
    List<KeyValueScanner> instantiatedScanners = new ArrayList<>();
    // handle additionalScanners
    if (additionalScanners != null && !additionalScanners.isEmpty()) {
      scanners.addAll(additionalScanners);
      instantiatedScanners.addAll(additionalScanners);
    }

    try {
      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
        HStore store = region.getStore(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
        instantiatedScanners.add(scanner);
        // A family goes to the lazy joined heap only when on-demand loading is enabled and the
        // filter explicitly marks the family non-essential.
        if (
          this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
            || this.filter.isFamilyEssential(entry.getKey())
        ) {
          scanners.add(scanner);
        } else {
          joinedScanners.add(scanner);
        }
      }
      initializeKVHeap(scanners, joinedScanners, region);
    } catch (Throwable t) {
      throw handleException(instantiatedScanners, t);
    }
  }
194
195  protected void initializeKVHeap(List<KeyValueScanner> scanners,
196    List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
197    this.storeHeap = new KeyValueHeap(scanners, comparator);
198    if (!joinedScanners.isEmpty()) {
199      this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
200    }
201  }
202
203  private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) {
204    // remove scaner read point before throw the exception
205    scannerReadPoints.remove(this);
206    if (storeHeap != null) {
207      storeHeap.close();
208      storeHeap = null;
209      if (joinedHeap != null) {
210        joinedHeap.close();
211        joinedHeap = null;
212      }
213    } else {
214      // close all already instantiated scanners before throwing the exception
215      for (KeyValueScanner scanner : instantiatedScanners) {
216        scanner.close();
217      }
218    }
219    return t instanceof IOException ? (IOException) t : new IOException(t);
220  }
221
222  @Override
223  public long getMaxResultSize() {
224    return maxResultSize;
225  }
226
227  @Override
228  public long getMvccReadPoint() {
229    return this.readPt;
230  }
231
232  @Override
233  public int getBatch() {
234    return this.defaultScannerContext.getBatchLimit();
235  }
236
237  @Override
238  public String getOperationId() {
239    return operationId;
240  }
241
242  /**
243   * Reset both the filter and the old filter.
244   * @throws IOException in case a filter raises an I/O exception.
245   */
246  protected final void resetFilters() throws IOException {
247    if (filter != null) {
248      filter.reset();
249    }
250  }
251
252  @Override
253  public boolean next(List<? super ExtendedCell> outResults) throws IOException {
254    // apply the batching limit by default
255    return next(outResults, defaultScannerContext);
256  }
257
258  @Override
259  public synchronized boolean next(List<? super ExtendedCell> outResults,
260    ScannerContext scannerContext) throws IOException {
261    if (this.filterClosed) {
262      throw new UnknownScannerException("Scanner was closed (timed out?) "
263        + "after we renewed it. Could be caused by a very slow scanner "
264        + "or a lengthy garbage collection");
265    }
266    region.startRegionOperation(Operation.SCAN);
267    try {
268      return nextRaw(outResults, scannerContext);
269    } finally {
270      region.closeRegionOperation(Operation.SCAN);
271    }
272  }
273
274  @Override
275  public boolean nextRaw(List<? super ExtendedCell> outResults) throws IOException {
276    // Use the RegionScanner's context by default
277    return nextRaw(outResults, defaultScannerContext);
278  }
279
280  @Override
281  public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext scannerContext)
282    throws IOException {
283    if (storeHeap == null) {
284      // scanner is closed
285      throw new UnknownScannerException("Scanner was closed");
286    }
287    boolean moreValues = false;
288    if (outResults.isEmpty()) {
289      // Usually outResults is empty. This is true when next is called
290      // to handle scan or get operation.
291      moreValues = nextInternal(outResults, scannerContext);
292    } else {
293      List<ExtendedCell> tmpList = new ArrayList<>();
294      moreValues = nextInternal(tmpList, scannerContext);
295      outResults.addAll(tmpList);
296    }
297    region.addReadRequestsCount(1);
298    if (region.getMetrics() != null) {
299      region.getMetrics().updateReadRequestCount();
300    }
301
302    // If the size limit was reached it means a partial Result is being returned. Returning a
303    // partial Result means that we should not reset the filters; filters should only be reset in
304    // between rows
305    if (!scannerContext.mayHaveMoreCellsInRow()) {
306      resetFilters();
307    }
308
309    if (isFilterDoneInternal()) {
310      moreValues = false;
311    }
312    return moreValues;
313  }
314
  /**
   * Continues gathering the non-essential (joined heap) cells for the row recorded in
   * {@link #joinedContinuationRow}, merging them into {@code results}.
   * Returns true if more cells exist after this batch, false if scanner is done.
   */
  private boolean populateFromJoinedHeap(List<? super ExtendedCell> results,
    ScannerContext scannerContext) throws IOException {
    assert joinedContinuationRow != null;
    boolean moreValues =
      populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow);

    if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
      // We are done with this row, reset the continuation.
      joinedContinuationRow = null;
    }
    // As the data is obtained from two independent heaps, we need to
    // ensure that result list is sorted, because Result relies on that.
    ((List<Cell>) results).sort(comparator);
    return moreValues;
  }
331
332  /**
333   * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is
334   * reached, or remainingResultSize (if not -1) is reaced
335   * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.
336   * @return state of last call to {@link KeyValueHeap#next()}
337   */
338  private boolean populateResult(List<? super ExtendedCell> results, KeyValueHeap heap,
339    ScannerContext scannerContext, ExtendedCell currentRowCell) throws IOException {
340    Cell nextKv;
341    boolean moreCellsInRow = false;
342    boolean tmpKeepProgress = scannerContext.getKeepProgress();
343    // Scanning between column families and thus the scope is between cells
344    LimitScope limitScope = LimitScope.BETWEEN_CELLS;
345    do {
346      // Check for thread interrupt status in case we have been signaled from
347      // #interruptRegionOperation.
348      region.checkInterrupt();
349
350      // We want to maintain any progress that is made towards the limits while scanning across
351      // different column families. To do this, we toggle the keep progress flag on during calls
352      // to the StoreScanner to ensure that any progress made thus far is not wiped away.
353      scannerContext.setKeepProgress(true);
354      heap.next(results, scannerContext);
355      scannerContext.setKeepProgress(tmpKeepProgress);
356
357      nextKv = heap.peek();
358      moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
359      if (!moreCellsInRow) {
360        incrementCountOfRowsScannedMetric(scannerContext);
361      }
362      if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
363        return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
364      } else if (scannerContext.checkSizeLimit(limitScope)) {
365        ScannerContext.NextState state =
366          moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
367        return scannerContext.setScannerState(state).hasMoreValues();
368      } else if (scannerContext.checkTimeLimit(limitScope)) {
369        ScannerContext.NextState state =
370          moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
371        return scannerContext.setScannerState(state).hasMoreValues();
372      }
373    } while (moreCellsInRow);
374    return nextKv != null;
375  }
376
377  /**
378   * Based on the nextKv in the heap, and the current row, decide whether or not there are more
379   * cells to be read in the heap. If the row of the nextKv in the heap matches the current row then
380   * there are more cells to be read in the row.
381   * @return true When there are more cells in the row to be read
382   */
383  private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
384    return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);
385  }
386
387  /** Returns True if a filter rules the scanner is over, done. */
388  @Override
389  public synchronized boolean isFilterDone() throws IOException {
390    return isFilterDoneInternal();
391  }
392
393  private boolean isFilterDoneInternal() throws IOException {
394    return this.filter != null && this.filter.filterAllRemaining();
395  }
396
397  private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException {
398    if (rpcCall.isPresent()) {
399      // If a user specifies a too-restrictive or too-slow scanner, the
400      // client might time out and disconnect while the server side
401      // is still processing the request. We should abort aggressively
402      // in that case.
403      long afterTime = rpcCall.get().disconnectSince();
404      if (afterTime >= 0) {
405        throw new CallerDisconnectedException(
406          "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this
407            + " after " + afterTime + " ms, since " + "caller disconnected");
408      }
409    }
410  }
411
412  private void resetProgress(ScannerContext scannerContext, int initialBatchProgress,
413    long initialSizeProgress, long initialHeapSizeProgress) {
414    // Starting to scan a new row. Reset the scanner progress according to whether or not
415    // progress should be kept.
416    if (scannerContext.getKeepProgress()) {
417      // Progress should be kept. Reset to initial values seen at start of method invocation.
418      scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
419        initialHeapSizeProgress);
420    } else {
421      scannerContext.clearProgress();
422    }
423  }
424
  /**
   * Core row-fetch loop shared by next/nextRaw. Fills {@code results} with the cells of (at most)
   * one row, honoring the limits in {@code scannerContext}, pulling essential cells from the
   * store heap and non-essential ones from the joined heap.
   * @return true if the caller may keep scanning, false when the scan is exhausted
   */
  private boolean nextInternal(List<? super ExtendedCell> results, ScannerContext scannerContext)
    throws IOException {
    Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list");
    Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null");
    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();

    // Save the initial progress from the Scanner context in these local variables. The progress
    // may need to be reset a few times if rows are being filtered out so we save the initial
    // progress.
    int initialBatchProgress = scannerContext.getBatchProgress();
    long initialSizeProgress = scannerContext.getDataSizeProgress();
    long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();

    // Used to check time limit
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;

    // The loop here is used only when at some point during the next we determine
    // that due to effects of filters or otherwise, we have an empty row in the result.
    // Then we loop and try again. Otherwise, we must get out on the first iteration via return,
    // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
    // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow).
    while (true) {
      resetProgress(scannerContext, initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
      checkClientDisconnect(rpcCall);

      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // Let's see what we have in the storeHeap.
      ExtendedCell current = this.storeHeap.peek();

      boolean shouldStop = shouldStop(current);
      // When has filter row is true it means that all the cells for a particular row must be
      // read before a filtering decision can be made. This means that filters where hasFilterRow
      // run the risk of encountering out of memory errors in the case that they are applied to a
      // table that has very large rows.
      boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

      // If filter#hasFilterRow is true, partial results are not allowed since allowing them
      // would prevent the filters from being evaluated. Thus, if it is true, change the
      // scope of any limits that could potentially create partial results to
      // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row
      if (hasFilterRow) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
            + " formed. Changing scope of limits that may create partials");
        }
        scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
        scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
        limitScope = LimitScope.BETWEEN_ROWS;
      }

      if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
        if (hasFilterRow) {
          throw new IncompatibleFilterException(
            "Filter whose hasFilterRow() returns true is incompatible with scans that must "
              + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
        }
        return true;
      }

      // Check if we were getting data from the joinedHeap and hit the limit.
      // If not, then it's main path - getting results from storeHeap.
      if (joinedContinuationRow == null) {
        // First, check if we are at a stop row. If so, there are no more results.
        if (shouldStop) {
          if (hasFilterRow) {
            filter.filterRowCells((List<Cell>) results);
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Check if rowkey filter wants to exclude this row. If so, loop to next.
        // Technically, if we hit limits before on this row, we don't need this call.
        if (filterRowKey(current)) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          // early check, see HBASE-16296
          if (isFilterDoneInternal()) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          // Typically the count of rows scanned is incremented inside #populateResult. However,
          // here we are filtering a row based purely on its row key, preventing us from calling
          // #populateResult. Thus, perform the necessary increment here to rows scanned metric
          incrementCountOfRowsScannedMetric(scannerContext);
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          results.clear();

          // Read nothing as the rowkey was filtered, but still need to check time limit
          // We also check size limit because we might have read blocks in getting to this point.
          if (scannerContext.checkAnyLimitReached(limitScope)) {
            return true;
          }
          continue;
        }

        // Ok, we are good, let's try to get some results from the main heap.
        populateResult(results, this.storeHeap, scannerContext, current);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          if (hasFilterRow) {
            throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
          }
          return true;
        }

        // Check for thread interrupt status in case we have been signaled from
        // #interruptRegionOperation.
        region.checkInterrupt();

        Cell nextKv = this.storeHeap.peek();
        shouldStop = shouldStop(nextKv);
        // save that the row was empty before filters applied to it.
        final boolean isEmptyRow = results.isEmpty();

        // We have the part of the row necessary for filtering (all of it, usually).
        // First filter with the filterRow(List).
        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
        if (hasFilterRow) {
          ret = filter.filterRowCellsWithRet((List<Cell>) results);

          // We don't know how the results have changed after being filtered. Must set progress
          // according to contents of results now.
          if (scannerContext.getKeepProgress()) {
            scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
              initialHeapSizeProgress);
          } else {
            scannerContext.clearProgress();
          }
          scannerContext.incrementBatchProgress(results.size());
          for (ExtendedCell cell : (List<ExtendedCell>) results) {
            scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
              cell.heapSize());
          }
        }

        if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          results.clear();
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // This row was totally filtered out, if this is NOT the last row,
          // we should continue on. Otherwise, nothing else to do.
          if (!shouldStop) {
            // Read nothing as the cells was filtered, but still need to check time limit.
            // We also check size limit because we might have read blocks in getting to this point.
            if (scannerContext.checkAnyLimitReached(limitScope)) {
              return true;
            }
            continue;
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Ok, we are done with storeHeap for this row.
        // Now we may need to fetch additional, non-essential data into row.
        // These values are not needed for filter to work, so we postpone their
        // fetch to (possibly) reduce amount of data loads from disk.
        if (this.joinedHeap != null) {
          boolean mayHaveData = joinedHeapMayHaveData(current);
          if (mayHaveData) {
            joinedContinuationRow = current;
            populateFromJoinedHeap(results, scannerContext);

            if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
              return true;
            }
          }
        }
      } else {
        // Populating from the joined heap was stopped by limits, populate some more.
        populateFromJoinedHeap(results, scannerContext);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          return true;
        }
      }
      // We may have just called populateFromJoinedMap and hit the limits. If that is
      // the case, we need to call it again on the next next() invocation.
      if (joinedContinuationRow != null) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // Finally, we are done with both joinedHeap and storeHeap.
      // Double check to prevent empty rows from appearing in result. It could be
      // the case when SingleColumnValueExcludeFilter is used.
      if (results.isEmpty()) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        boolean moreRows = nextRow(scannerContext, current);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        if (!shouldStop) {
          // We check size limit because we might have read blocks in the nextRow call above, or
          // in the call populateResults call. Only scans with hasFilterRow should reach this point,
          // and for those scans which filter row _cells_ this is the only place we can actually
          // enforce that the scan does not exceed limits since it bypasses all other checks above.
          if (scannerContext.checkSizeLimit(limitScope)) {
            return true;
          }
          continue;
        }
      }

      if (shouldStop) {
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      } else {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }
    }
  }
643
644  private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
645    region.filteredReadRequestsCount.increment();
646    if (region.getMetrics() != null) {
647      region.getMetrics().updateFilteredRecords();
648    }
649
650    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
651      return;
652    }
653
654    scannerContext.getMetrics()
655      .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME, 1);
656  }
657
658  private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
659    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
660      return;
661    }
662
663    scannerContext.getMetrics()
664      .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 1);
665  }
666
667  /** Returns true when the joined heap may have data for the current row */
668  private boolean joinedHeapMayHaveData(ExtendedCell currentRowCell) throws IOException {
669    Cell nextJoinedKv = joinedHeap.peek();
670    boolean matchCurrentRow =
671      nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell);
672    boolean matchAfterSeek = false;
673
674    // If the next value in the joined heap does not match the current row, try to seek to the
675    // correct row
676    if (!matchCurrentRow) {
677      ExtendedCell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell);
678      boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
679      matchAfterSeek = seekSuccessful && joinedHeap.peek() != null
680        && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell);
681    }
682
683    return matchCurrentRow || matchAfterSeek;
684  }
685
686  /**
687   * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines both
688   * filterRow & filterRow({@code List<KeyValue> kvs}) functions. While 0.94 code or older, it may
689   * not implement hasFilterRow as HBase-6429 expects because 0.94 hasFilterRow() only returns true
690   * when filterRow({@code List<KeyValue> kvs}) is overridden not the filterRow(). Therefore, the
691   * filterRow() will be skipped.
692   */
693  private boolean filterRow() throws IOException {
694    // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
695    // filterRowCells(List<Cell> kvs) so we skip that scenario here.
696    return filter != null && (!filter.hasFilterRow()) && filter.filterRow();
697  }
698
699  private boolean filterRowKey(Cell current) throws IOException {
700    return filter != null && filter.filterRowKey(current);
701  }
702
703  /**
704   * A mocked list implementation - discards all updates.
705   */
706  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
707
708    @Override
709    public void add(int index, Cell element) {
710      // do nothing
711    }
712
713    @Override
714    public boolean addAll(int index, Collection<? extends Cell> c) {
715      return false; // this list is never changed as a result of an update
716    }
717
718    @Override
719    public KeyValue get(int index) {
720      throw new UnsupportedOperationException();
721    }
722
723    @Override
724    public int size() {
725      return 0;
726    }
727  };
728
729  protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
730    assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
731
732    // Enable skipping row mode, which disables limits and skips tracking progress for all
733    // but block size. We keep tracking block size because skipping a row in this way
734    // might involve reading blocks along the way.
735    scannerContext.setSkippingRow(true);
736
737    Cell next;
738    while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
739      // Check for thread interrupt status in case we have been signaled from
740      // #interruptRegionOperation.
741      region.checkInterrupt();
742      this.storeHeap.next(MOCKED_LIST, scannerContext);
743    }
744
745    scannerContext.setSkippingRow(false);
746    resetFilters();
747
748    // Calling the hook in CP which allows it to do a fast forward
749    return this.region.getCoprocessorHost() == null
750      || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
751  }
752
753  protected boolean shouldStop(Cell currentRowCell) {
754    if (currentRowCell == null) {
755      return true;
756    }
757    if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {
758      return false;
759    }
760    int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length);
761    return c > 0 || (c == 0 && !includeStopRow);
762  }
763
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "this method is only called inside close which is synchronized")
  private void closeInternal() {
    // Close each heap, harvesting the set of store files it read before dropping the
    // reference. Nulling the fields makes a second close a no-op.
    if (storeHeap != null) {
      storeHeap.close();
      filesRead.addAll(storeHeap.getFilesRead());
      storeHeap = null;
    }
    if (joinedHeap != null) {
      joinedHeap.close();
      filesRead.addAll(joinedHeap.getFilesRead());
      joinedHeap = null;
    }
    // no need to synchronize here.
    scannerReadPoints.remove(this);
    // Mark the scanner closed; presumably checked by later operations to reject use after
    // close — confirm against the field's other readers.
    this.filterClosed = true;
  }
781
782  @Override
783  public synchronized void close() {
784    TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
785  }
786
787  /**
788   * Returns the set of store file paths that were successfully read by this scanner. Populated at
789   * close from the underlying store heap and joined heap (if any).
790   */
791  @Override
792  public Set<Path> getFilesRead() {
793    return Collections.unmodifiableSet(filesRead);
794  }
795
796  @Override
797  public synchronized boolean reseek(byte[] row) throws IOException {
798    return TraceUtil.trace(() -> {
799      if (row == null) {
800        throw new IllegalArgumentException("Row cannot be null.");
801      }
802      boolean result = false;
803      region.startRegionOperation();
804      ExtendedCell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
805      try {
806        // use request seek to make use of the lazy seek option. See HBASE-5520
807        result = this.storeHeap.requestSeek(kv, true, true);
808        if (this.joinedHeap != null) {
809          result = this.joinedHeap.requestSeek(kv, true, true) || result;
810        }
811      } finally {
812        region.closeRegionOperation();
813      }
814      return result;
815    }, () -> region.createRegionSpan("RegionScanner.reseek"));
816  }
817
818  @Override
819  public void shipped() throws IOException {
820    if (storeHeap != null) {
821      storeHeap.shipped();
822    }
823    if (joinedHeap != null) {
824      joinedHeap.shipped();
825    }
826  }
827
  @Override
  public void run() throws IOException {
    // Executed as the RPC callback: closing the scanner is all this callback does.
    this.close();
  }
834}