/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
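 * <p>
 * An illustrative sketch (assumed usage, not code from this class) of how a region scanner is
 * typically driven; the scanner returned by {@code HRegion#getScanner(Scan)} is a
 * RegionScannerImpl:
 *
 * <pre>{@code
 * try (RegionScanner scanner = region.getScanner(new Scan())) {
 *   List<Cell> cells = new ArrayList<>();
 *   boolean moreRows;
 *   do {
 *     cells.clear();
 *     // next() fills in one row at a time, subject to the default batch limit
 *     moreRows = scanner.next(cells);
 *     // ... process cells ...
 *   } while (moreRows);
 * }
 * }</pre>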
 */
@InterfaceAudience.Private
class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {

  private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class);

  // Package local for testability
  KeyValueHeap storeHeap = null;

  /**
   * Heap of key-values that are not essential for the provided filters and are thus read on demand,
   * if on-demand column family loading is enabled.
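   * <p>
   * On-demand column family loading is enabled via
   * {@link Scan#setLoadColumnFamiliesOnDemand(boolean)}; families that the filter reports as
   * non-essential via {@code Filter#isFamilyEssential(byte[])} end up in this heap rather than in
   * the store heap (see {@code initializeScanners}).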
   */
  KeyValueHeap joinedHeap = null;

  /**
   * If the joined heap data gathering is interrupted due to scan limits, this will contain the row
   * for which we are populating the values.
   */
  protected Cell joinedContinuationRow = null;
  private boolean filterClosed = false;

  protected final byte[] stopRow;
  protected final boolean includeStopRow;
  protected final HRegion region;
  protected final CellComparator comparator;

  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  private final long readPt;
  private final long maxResultSize;
  private final ScannerContext defaultScannerContext;
  private final FilterWrapper filter;
  private final String operationId;

  private RegionServerServices rsServices;

  @Override
  public RegionInfo getRegionInfo() {
    return region.getRegionInfo();
  }

  private static boolean hasNonce(HRegion region, long nonce) {
    RegionServerServices rsServices = region.getRegionServerServices();
    return nonce != HConstants.NO_NONCE && rsServices != null
      && rsServices.getNonceManager() != null;
  }

  RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
    long nonceGroup, long nonce) throws IOException {
    this.region = region;
    this.maxResultSize = scan.getMaxResultSize();
    if (scan.hasFilter()) {
      this.filter = new FilterWrapper(scan.getFilter());
    } else {
      this.filter = null;
    }
    this.comparator = region.getCellComparator();
    /**
     * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
     * scanner context that can be used to enforce the batch limit in the event that a
     * ScannerContext is not specified during an invocation of next/nextRaw
     */
    defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
    this.stopRow = scan.getStopRow();
    this.includeStopRow = scan.includeStopRow();
    this.operationId = scan.getId();

    // Take the smallest-read-point calculation lock so that nobody computes
    // getSmallestReadPoint before scannerReadPoints is updated.
    IsolationLevel isolationLevel = scan.getIsolationLevel();
    long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan);
    this.scannerReadPoints = region.scannerReadPoints;
    this.rsServices = region.getRegionServerServices();
    region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    try {
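      // Choose the read point in priority order: an mvcc read point carried on the Scan itself,
      // then one recovered from the nonce manager for this operation, and finally one derived
      // from the scan's isolation level.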
      if (mvccReadPoint > 0) {
        this.readPt = mvccReadPoint;
      } else if (hasNonce(region, nonce)) {
        this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
      } else {
        this.readPt = region.getReadPoint(isolationLevel);
      }
      scannerReadPoints.put(this, this.readPt);
    } finally {
      region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    }
    initializeScanners(scan, additionalScanners);
  }

  private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
    throws IOException {
    // Here we separate all scanners into two lists - scanners that provide data required
    // by the filter to operate (the scanners list) and all others (the joinedScanners list).
    List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());
    List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());
    // Store all already instantiated scanners for exception handling
    List<KeyValueScanner> instantiatedScanners = new ArrayList<>();
    // handle additionalScanners
    if (additionalScanners != null && !additionalScanners.isEmpty()) {
      scanners.addAll(additionalScanners);
      instantiatedScanners.addAll(additionalScanners);
    }

    try {
      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
        HStore store = region.getStore(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
        instantiatedScanners.add(scanner);
        if (
          this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
            || this.filter.isFamilyEssential(entry.getKey())
        ) {
          scanners.add(scanner);
        } else {
          joinedScanners.add(scanner);
        }
      }
      initializeKVHeap(scanners, joinedScanners, region);
    } catch (Throwable t) {
      throw handleException(instantiatedScanners, t);
    }
  }

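  /**
   * Create the store heap (and the joined heap when on-demand column family loading splits the
   * scanners). Kept as a protected hook so that subclasses, e.g. a reversed scanner
   * implementation, can substitute their own heap implementations.
   */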
  protected void initializeKVHeap(List<KeyValueScanner> scanners,
    List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
    this.storeHeap = new KeyValueHeap(scanners, comparator);
    if (!joinedScanners.isEmpty()) {
      this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
    }
  }

  private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) {
    // Remove the scanner read point before throwing the exception.
    scannerReadPoints.remove(this);
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
      if (joinedHeap != null) {
        joinedHeap.close();
        joinedHeap = null;
      }
    } else {
      // close all already instantiated scanners before throwing the exception
      for (KeyValueScanner scanner : instantiatedScanners) {
        scanner.close();
      }
    }
    return t instanceof IOException ? (IOException) t : new IOException(t);
  }

  @Override
  public long getMaxResultSize() {
    return maxResultSize;
  }

  @Override
  public long getMvccReadPoint() {
    return this.readPt;
  }

  @Override
  public int getBatch() {
    return this.defaultScannerContext.getBatchLimit();
  }

  @Override
  public String getOperationId() {
    return operationId;
  }

  /**
   * Reset the filter, if any.
   * @throws IOException in case a filter raises an I/O exception.
   */
  protected final void resetFilters() throws IOException {
    if (filter != null) {
      filter.reset();
    }
  }

  @Override
  public boolean next(List<Cell> outResults) throws IOException {
    // apply the batching limit by default
    return next(outResults, defaultScannerContext);
  }

  @Override
  public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)
    throws IOException {
    if (this.filterClosed) {
      throw new UnknownScannerException("Scanner was closed (timed out?) "
        + "after we renewed it. Could be caused by a very slow scanner "
        + "or a lengthy garbage collection");
    }
    region.startRegionOperation(Operation.SCAN);
    try {
      return nextRaw(outResults, scannerContext);
    } finally {
      region.closeRegionOperation(Operation.SCAN);
    }
  }

  @Override
  public boolean nextRaw(List<Cell> outResults) throws IOException {
    // Use the RegionScanner's context by default
    return nextRaw(outResults, defaultScannerContext);
  }

  @Override
  public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext) throws IOException {
    if (storeHeap == null) {
      // scanner is closed
      throw new UnknownScannerException("Scanner was closed");
    }
    boolean moreValues = false;
    if (outResults.isEmpty()) {
      // Usually outResults is empty. This is true when next is called
      // to handle a scan or get operation.
      moreValues = nextInternal(outResults, scannerContext);
    } else {
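      // nextInternal requires an empty result list (it checks this with a Precondition), so
      // collect into a temporary list first and then append to the caller's list.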
      List<Cell> tmpList = new ArrayList<>();
      moreValues = nextInternal(tmpList, scannerContext);
      outResults.addAll(tmpList);
    }

    region.addReadRequestsCount(1);
    if (region.getMetrics() != null) {
      region.getMetrics().updateReadRequestCount();
    }

    // If the size limit was reached it means a partial Result is being returned. Returning a
    // partial Result means that we should not reset the filters; filters should only be reset in
    // between rows
    if (!scannerContext.mayHaveMoreCellsInRow()) {
      resetFilters();
    }

    if (isFilterDoneInternal()) {
      moreValues = false;
    }
    return moreValues;
  }

  /** Returns true if more cells exist after this batch, false if scanner is done */
  private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)
    throws IOException {
    assert joinedContinuationRow != null;
    boolean moreValues =
      populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow);

    if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
      // We are done with this row, reset the continuation.
      joinedContinuationRow = null;
    }
    // As the data is obtained from two independent heaps, we need to
    // ensure that the result list is sorted, because Result relies on that.
    results.sort(comparator);
    return moreValues;
  }

  /**
   * Fetches cells for the current row into the results list, until the next row, the batchLimit
   * (if not -1), or the remainingResultSize (if not -1) is reached.
   * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before
   *             the call.
   * @return state of the last call to {@link KeyValueHeap#next()}
   */
  private boolean populateResult(List<Cell> results, KeyValueHeap heap,
    ScannerContext scannerContext, Cell currentRowCell) throws IOException {
    Cell nextKv;
    boolean moreCellsInRow = false;
    boolean tmpKeepProgress = scannerContext.getKeepProgress();
    // Scanning between column families and thus the scope is between cells
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;
    do {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // We want to maintain any progress that is made towards the limits while scanning across
      // different column families. To do this, we toggle the keep progress flag on during calls
      // to the StoreScanner to ensure that any progress made thus far is not wiped away.
      scannerContext.setKeepProgress(true);
      heap.next(results, scannerContext);
      scannerContext.setKeepProgress(tmpKeepProgress);

      nextKv = heap.peek();
      moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
      if (!moreCellsInRow) {
        incrementCountOfRowsScannedMetric(scannerContext);
      }
      if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
        return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
      } else if (scannerContext.checkSizeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      } else if (scannerContext.checkTimeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      }
    } while (moreCellsInRow);
    return nextKv != null;
  }

  /**
   * Based on the nextKv in the heap, and the current row, decide whether or not there are more
   * cells to be read in the heap. If the row of the nextKv in the heap matches the current row then
   * there are more cells to be read in the row.
   * @return true When there are more cells in the row to be read
   */
  private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
    return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);
  }

  /** Returns true if a filter rules that the scanner is done. */
  @Override
  public synchronized boolean isFilterDone() throws IOException {
    return isFilterDoneInternal();
  }

  private boolean isFilterDoneInternal() throws IOException {
    return this.filter != null && this.filter.filterAllRemaining();
  }

  private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException {
    if (rpcCall.isPresent()) {
      // If a user specifies a too-restrictive or too-slow scanner, the
      // client might time out and disconnect while the server side
      // is still processing the request. We should abort aggressively
      // in that case.
      long afterTime = rpcCall.get().disconnectSince();
      if (afterTime >= 0) {
        throw new CallerDisconnectedException(
          "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this
            + " after " + afterTime + " ms, since " + "caller disconnected");
      }
    }
  }

  private void resetProgress(ScannerContext scannerContext, int initialBatchProgress,
    long initialSizeProgress, long initialHeapSizeProgress) {
    // Starting to scan a new row. Reset the scanner progress according to whether or not
    // progress should be kept.
    if (scannerContext.getKeepProgress()) {
      // Progress should be kept. Reset to initial values seen at start of method invocation.
      scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
    } else {
      scannerContext.clearProgress();
    }
  }

  private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
    throws IOException {
    Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list");
    Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null");
    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();

    // Save the initial progress from the Scanner context in these local variables. The progress
    // may need to be reset a few times if rows are being filtered out so we save the initial
    // progress.
    int initialBatchProgress = scannerContext.getBatchProgress();
    long initialSizeProgress = scannerContext.getDataSizeProgress();
    long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();

    // Used to check time limit
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;

    // The loop here is used only when, at some point during the next call, we determine that due
    // to the effects of filters or otherwise we have an empty row in the result. Then we loop and
    // try again. Otherwise, we must get out on the first iteration via return: "true" if there's
    // more data to read, "false" if there isn't (storeHeap is at a stop row, and joinedHeap has no
    // more data to read for the last row, joinedContinuationRow, if set).
    while (true) {
      resetProgress(scannerContext, initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
      checkClientDisconnect(rpcCall);

      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // Let's see what we have in the storeHeap.
      Cell current = this.storeHeap.peek();

      boolean shouldStop = shouldStop(current);
      // When hasFilterRow is true, all the cells for a particular row must be read before a
      // filtering decision can be made. This means that filters for which hasFilterRow is true
      // run the risk of encountering out-of-memory errors when applied to a table that has very
      // large rows.
      boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

      // If filter#hasFilterRow is true, partial results are not allowed since allowing them
      // would prevent the filters from being evaluated. Thus, if it is true, change the
      // scope of any limits that could potentially create partial results to
      // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row
      if (hasFilterRow) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
            + "formed. Changing scope of limits that may create partials");
        }
        scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
        scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
        limitScope = LimitScope.BETWEEN_ROWS;
      }

      if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
        if (hasFilterRow) {
          throw new IncompatibleFilterException(
            "Filter whose hasFilterRow() returns true is incompatible with scans that must "
              + "stop mid-row because of a limit. ScannerContext:" + scannerContext);
        }
        return true;
      }

      // Check if we were getting data from the joinedHeap and hit the limit.
      // If not, then it's the main path - getting results from storeHeap.
      if (joinedContinuationRow == null) {
        // First, check if we are at a stop row. If so, there are no more results.
        if (shouldStop) {
          if (hasFilterRow) {
            filter.filterRowCells(results);
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Check if rowkey filter wants to exclude this row. If so, loop to next.
        // Technically, if we hit limits before on this row, we don't need this call.
        if (filterRowKey(current)) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          // early check, see HBASE-16296
          if (isFilterDoneInternal()) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          // Typically the count of rows scanned is incremented inside #populateResult. However,
          // here we are filtering a row based purely on its row key, preventing us from calling
          // #populateResult. Thus, perform the necessary increment of the rows scanned metric here.
          incrementCountOfRowsScannedMetric(scannerContext);
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          results.clear();

          // Read nothing as the rowkey was filtered, but still need to check time limit
          if (scannerContext.checkTimeLimit(limitScope)) {
            return true;
          }
          continue;
        }

        // Ok, we are good, let's try to get some results from the main heap.
        populateResult(results, this.storeHeap, scannerContext, current);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          if (hasFilterRow) {
            throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                + "stop mid-row because of a limit. ScannerContext:" + scannerContext);
          }
          return true;
        }

        // Check for thread interrupt status in case we have been signaled from
        // #interruptRegionOperation.
        region.checkInterrupt();

        Cell nextKv = this.storeHeap.peek();
        shouldStop = shouldStop(nextKv);
        // Remember whether the row was empty before filters were applied to it.
        final boolean isEmptyRow = results.isEmpty();

        // We have the part of the row necessary for filtering (all of it, usually).
        // First filter with the filterRow(List).
        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
        if (hasFilterRow) {
          ret = filter.filterRowCellsWithRet(results);

          // We don't know how the results have changed after being filtered. Must set progress
          // according to contents of results now.
          if (scannerContext.getKeepProgress()) {
            scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
              initialHeapSizeProgress);
          } else {
            scannerContext.clearProgress();
          }
          scannerContext.incrementBatchProgress(results.size());
          for (Cell cell : results) {
            scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
              cell.heapSize());
          }
        }

        if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          results.clear();
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // This row was totally filtered out. If this is NOT the last row,
          // we should continue on; otherwise, there is nothing else to do.
          if (!shouldStop) {
            // Read nothing as the cells were filtered, but still need to check the time limit
            if (scannerContext.checkTimeLimit(limitScope)) {
              return true;
            }
            continue;
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Ok, we are done with storeHeap for this row.
        // Now we may need to fetch additional, non-essential data for the row.
        // These values are not needed for the filter to work, so we postpone their
        // fetch to (possibly) reduce the amount of data loaded from disk.
        if (this.joinedHeap != null) {
          boolean mayHaveData = joinedHeapMayHaveData(current);
          if (mayHaveData) {
            joinedContinuationRow = current;
            populateFromJoinedHeap(results, scannerContext);

            if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
              return true;
            }
          }
        }
      } else {
        // Populating from the joined heap was stopped by limits, populate some more.
        populateFromJoinedHeap(results, scannerContext);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          return true;
        }
      }
      // We may have just called populateFromJoinedHeap and hit the limits. If that is
      // the case, we need to call it again on the next next() invocation.
      if (joinedContinuationRow != null) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // Finally, we are done with both joinedHeap and storeHeap.
      // Double check to prevent empty rows from appearing in result. It could be
      // the case when SingleColumnValueExcludeFilter is used.
      if (results.isEmpty()) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        boolean moreRows = nextRow(scannerContext, current);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        if (!shouldStop) {
          continue;
        }
      }

      if (shouldStop) {
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      } else {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }
    }
  }

  private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
    region.filteredReadRequestsCount.increment();
    if (region.getMetrics() != null) {
      region.getMetrics().updateFilteredRecords();
    }

    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
  }

  private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();
  }

  /** Returns true when the joined heap may have data for the current row */
  private boolean joinedHeapMayHaveData(Cell currentRowCell) throws IOException {
    Cell nextJoinedKv = joinedHeap.peek();
    boolean matchCurrentRow =
      nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell);
    boolean matchAfterSeek = false;

    // If the next value in the joined heap does not match the current row, try to seek to the
    // correct row
    if (!matchCurrentRow) {
      Cell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell);
      boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
      matchAfterSeek = seekSuccessful && joinedHeap.peek() != null
        && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell);
    }

    return matchCurrentRow || matchAfterSeek;
  }

  /**
   * This method maintains backward compatibility with filters written for HBase 0.94 and earlier.
   * HBASE-6429 combined the filterRow() and filterRow({@code List<KeyValue> kvs}) methods. Filters
   * written for 0.94 or older may not implement hasFilterRow() as HBASE-6429 expects, because in
   * 0.94 hasFilterRow() returned true only when filterRow({@code List<KeyValue> kvs}) was
   * overridden, not filterRow(). For such filters, filterRow() would otherwise be skipped, so it
   * is invoked here when hasFilterRow() is false.
   */
  private boolean filterRow() throws IOException {
    // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
    // filterRowCells(List<Cell> kvs) so we skip that scenario here.
    return filter != null && (!filter.hasFilterRow()) && filter.filterRow();
  }

  private boolean filterRowKey(Cell current) throws IOException {
    return filter != null && filter.filterRowKey(current);
  }

  /**
   * A mocked list implementation - discards all updates.
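   * Used from {@link #nextRow(ScannerContext, Cell)} to advance the store heap past the remaining
   * cells of the current row without accumulating them in memory.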
   */
  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {

    @Override
    public void add(int index, Cell element) {
      // do nothing
    }

    @Override
    public boolean addAll(int index, Collection<? extends Cell> c) {
      return false; // this list is never changed as a result of an update
    }

    @Override
    public KeyValue get(int index) {
      throw new UnsupportedOperationException();
    }

    @Override
    public int size() {
      return 0;
    }
  };

  protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
    assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
    Cell next;
    while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();
      this.storeHeap.next(MOCKED_LIST);
    }
    resetFilters();

    // Call the coprocessor hook, which allows it to do a fast forward.
    return this.region.getCoprocessorHost() == null
      || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
  }

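  /**
   * Returns true if the scan should stop: either the heap is exhausted (null current cell), or the
   * current row sorts past the stop row, or equals it when the stop row is exclusive.
   */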
  protected boolean shouldStop(Cell currentRowCell) {
    if (currentRowCell == null) {
      return true;
    }
    if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {
      return false;
    }
    int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length);
    return c > 0 || (c == 0 && !includeStopRow);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "this method is only called inside close which is synchronized")
  private void closeInternal() {
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
    }
    if (joinedHeap != null) {
      joinedHeap.close();
      joinedHeap = null;
    }
    // no need to synchronize here.
    scannerReadPoints.remove(this);
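    // Mark the scanner closed; subsequent next() calls will fail with UnknownScannerException.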
    this.filterClosed = true;
  }

  @Override
  public synchronized void close() {
    TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
  }

  @Override
  public synchronized boolean reseek(byte[] row) throws IOException {
    return TraceUtil.trace(() -> {
      if (row == null) {
        throw new IllegalArgumentException("Row cannot be null.");
      }
      boolean result = false;
      region.startRegionOperation();
      Cell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
      try {
        // use request seek to make use of the lazy seek option. See HBASE-5520
        result = this.storeHeap.requestSeek(kv, true, true);
        if (this.joinedHeap != null) {
          result = this.joinedHeap.requestSeek(kv, true, true) || result;
        }
      } finally {
        region.closeRegionOperation();
      }
      return result;
    }, () -> region.createRegionSpan("RegionScanner.reseek"));
  }

  @Override
  public void shipped() throws IOException {
    if (storeHeap != null) {
      storeHeap.shipped();
    }
    if (joinedHeap != null) {
      joinedHeap.shipped();
    }
  }

  @Override
  public void run() throws IOException {
    // This is the RPC callback method that gets executed; we close the scanner in this callback.
    this.close();
  }
}