/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
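 * <p>
 * Instances are not constructed directly; server-side code normally obtains one through
 * {@link HRegion#getScanner(Scan)}. A minimal usage sketch (illustrative only, error handling
 * omitted):
 *
 * <pre>
 * try (RegionScanner scanner = region.getScanner(new Scan())) {
 *   List&lt;Cell&gt; cells = new ArrayList&lt;&gt;();
 *   boolean moreRows;
 *   do {
 *     moreRows = scanner.next(cells); // enforces the default batch limit
 *     // process the cells for this row, then reuse the list
 *     cells.clear();
 *   } while (moreRows);
 * }
 * </pre>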
 */
@InterfaceAudience.Private
class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {

  private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class);

  // Package local for testability
  KeyValueHeap storeHeap = null;

  /**
   * Heap of key-values that are not essential for the provided filters and are thus read on demand,
   * if on-demand column family loading is enabled.
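   * See {@link Scan#setLoadColumnFamiliesOnDemand(boolean)} and
   * {@link org.apache.hadoop.hbase.filter.Filter#isFamilyEssential(byte[])}.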
   */
  KeyValueHeap joinedHeap = null;

  /**
   * If the joined heap data gathering is interrupted due to scan limits, this will contain the row
   * for which we are populating the values.
   */
  protected Cell joinedContinuationRow = null;
  private boolean filterClosed = false;

  protected final byte[] stopRow;
  protected final boolean includeStopRow;
  protected final HRegion region;
  protected final CellComparator comparator;

  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  private final long readPt;
  private final long maxResultSize;
  private final ScannerContext defaultScannerContext;
  private final FilterWrapper filter;
  private final String operationId;

  private RegionServerServices rsServices;

  @Override
  public RegionInfo getRegionInfo() {
    return region.getRegionInfo();
  }

  private static boolean hasNonce(HRegion region, long nonce) {
    RegionServerServices rsServices = region.getRegionServerServices();
    return nonce != HConstants.NO_NONCE && rsServices != null
      && rsServices.getNonceManager() != null;
  }

  RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
    long nonceGroup, long nonce) throws IOException {
    this.region = region;
    this.maxResultSize = scan.getMaxResultSize();
    if (scan.hasFilter()) {
      this.filter = new FilterWrapper(scan.getFilter());
    } else {
      this.filter = null;
    }
    this.comparator = region.getCellComparator();
    /**
     * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
     * scanner context that can be used to enforce the batch limit in the event that a
     * ScannerContext is not specified during an invocation of next/nextRaw
     */
    defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
    this.stopRow = scan.getStopRow();
    this.includeStopRow = scan.includeStopRow();
    this.operationId = scan.getId();

    // synchronize on scannerReadPoints so that nobody calculates
    // getSmallestReadPoint before scannerReadPoints is updated.
    IsolationLevel isolationLevel = scan.getIsolationLevel();
    long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan);
    this.scannerReadPoints = region.scannerReadPoints;
    this.rsServices = region.getRegionServerServices();
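    // Read point precedence: an mvcc read point carried on the Scan wins; otherwise, for a
    // replayed nonce-bearing operation, recover the read point recorded for that nonce;
    // otherwise take the region's read point for the requested isolation level.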
    synchronized (scannerReadPoints) {
      if (mvccReadPoint > 0) {
        this.readPt = mvccReadPoint;
      } else if (hasNonce(region, nonce)) {
        this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
      } else {
        this.readPt = region.getReadPoint(isolationLevel);
      }
      scannerReadPoints.put(this, this.readPt);
    }
    initializeScanners(scan, additionalScanners);
  }

  private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
    throws IOException {
    // Here we separate all scanners into two lists - scanners that provide data required
    // by the filter to operate (the scanners list) and all others (the joinedScanners list).
    List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());
    List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());
    // Store all already instantiated scanners for exception handling
    List<KeyValueScanner> instantiatedScanners = new ArrayList<>();
    // handle additionalScanners
    if (additionalScanners != null && !additionalScanners.isEmpty()) {
      scanners.addAll(additionalScanners);
      instantiatedScanners.addAll(additionalScanners);
    }

    try {
      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
        HStore store = region.getStore(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
        instantiatedScanners.add(scanner);
        if (
          this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
            || this.filter.isFamilyEssential(entry.getKey())
        ) {
          scanners.add(scanner);
        } else {
          joinedScanners.add(scanner);
        }
      }
      initializeKVHeap(scanners, joinedScanners, region);
    } catch (Throwable t) {
      throw handleException(instantiatedScanners, t);
    }
  }

  protected void initializeKVHeap(List<KeyValueScanner> scanners,
    List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
    this.storeHeap = new KeyValueHeap(scanners, comparator);
    if (!joinedScanners.isEmpty()) {
      this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
    }
  }

  private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) {
    // Remove the scanner read point before throwing the exception.
    scannerReadPoints.remove(this);
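    // Once scanners have been handed to a KeyValueHeap the heap owns them, so closing the heaps
    // below also closes the scanners they wrap.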
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
      if (joinedHeap != null) {
        joinedHeap.close();
        joinedHeap = null;
      }
    } else {
      // close all already instantiated scanners before throwing the exception
      for (KeyValueScanner scanner : instantiatedScanners) {
        scanner.close();
      }
    }
    return t instanceof IOException ? (IOException) t : new IOException(t);
  }

  @Override
  public long getMaxResultSize() {
    return maxResultSize;
  }

  @Override
  public long getMvccReadPoint() {
    return this.readPt;
  }

  @Override
  public int getBatch() {
    return this.defaultScannerContext.getBatchLimit();
  }

  @Override
  public String getOperationId() {
    return operationId;
  }

  /**
   * Reset the filter, if one is set.
   * @throws IOException in case a filter raises an I/O exception.
   */
  protected final void resetFilters() throws IOException {
    if (filter != null) {
      filter.reset();
    }
  }

  @Override
  public boolean next(List<Cell> outResults) throws IOException {
    // apply the batching limit by default
    return next(outResults, defaultScannerContext);
  }

  @Override
  public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)
    throws IOException {
    if (this.filterClosed) {
      throw new UnknownScannerException("Scanner was closed (timed out?) "
        + "after we renewed it. Could be caused by a very slow scanner "
        + "or a lengthy garbage collection");
    }
    region.startRegionOperation(Operation.SCAN);
    try {
      return nextRaw(outResults, scannerContext);
    } finally {
      region.closeRegionOperation(Operation.SCAN);
    }
  }

  @Override
  public boolean nextRaw(List<Cell> outResults) throws IOException {
    // Use the RegionScanner's context by default
    return nextRaw(outResults, defaultScannerContext);
  }

  @Override
  public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext) throws IOException {
    if (storeHeap == null) {
      // scanner is closed
      throw new UnknownScannerException("Scanner was closed");
    }
    boolean moreValues = false;
    if (outResults.isEmpty()) {
      // Usually outResults is empty. This is true when next is called
      // to handle a scan or get operation.
      moreValues = nextInternal(outResults, scannerContext);
    } else {
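      // nextInternal requires an empty list (see its Preconditions check), so gather the row's
      // cells into a temporary list and append them to the caller's list afterwards.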
      List<Cell> tmpList = new ArrayList<>();
      moreValues = nextInternal(tmpList, scannerContext);
      outResults.addAll(tmpList);
    }

    region.addReadRequestsCount(1);
    if (region.getMetrics() != null) {
      region.getMetrics().updateReadRequestCount();
    }

    // If the size limit was reached it means a partial Result is being returned. Returning a
    // partial Result means that we should not reset the filters; filters should only be reset in
    // between rows
    if (!scannerContext.mayHaveMoreCellsInRow()) {
      resetFilters();
    }

    if (isFilterDoneInternal()) {
      moreValues = false;
    }
    return moreValues;
  }

  /** Returns true if more cells exist after this batch, false if the scanner is done */
  private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)
    throws IOException {
    assert joinedContinuationRow != null;
    boolean moreValues =
      populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow);

    if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
      // We are done with this row, reset the continuation.
      joinedContinuationRow = null;
    }
    // As the data is obtained from two independent heaps, we need to
    // ensure that result list is sorted, because Result relies on that.
    results.sort(comparator);
    return moreValues;
  }

  /**
   * Fetches cells for the current row into the results list, until the next row is reached, the
   * batch limit (if not -1) is reached, or the remaining result size (if not -1) is reached.
   * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before
   *             the call.
   * @return state of the last call to {@link KeyValueHeap#next()}
   */
  private boolean populateResult(List<Cell> results, KeyValueHeap heap,
    ScannerContext scannerContext, Cell currentRowCell) throws IOException {
    Cell nextKv;
    boolean moreCellsInRow = false;
    boolean tmpKeepProgress = scannerContext.getKeepProgress();
    // Scanning between column families and thus the scope is between cells
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;
    do {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // We want to maintain any progress that is made towards the limits while scanning across
      // different column families. To do this, we toggle the keep progress flag on during calls
      // to the StoreScanner to ensure that any progress made thus far is not wiped away.
      scannerContext.setKeepProgress(true);
      heap.next(results, scannerContext);
      scannerContext.setKeepProgress(tmpKeepProgress);

      nextKv = heap.peek();
      moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
      if (!moreCellsInRow) {
        incrementCountOfRowsScannedMetric(scannerContext);
      }
      if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
        return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
      } else if (scannerContext.checkSizeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      } else if (scannerContext.checkTimeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      }
    } while (moreCellsInRow);
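    // A null nextKv means the heap is exhausted; otherwise the heap has advanced to the next row.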
    return nextKv != null;
  }

  /**
   * Based on the nextKv in the heap, and the current row, decide whether or not there are more
   * cells to be read in the heap. If the row of the nextKv in the heap matches the current row then
   * there are more cells to be read in the row.
   * @return true When there are more cells in the row to be read
   */
  private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
    return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);
  }

  /** Returns true if the filter rules that the scanner is done. */
  @Override
  public synchronized boolean isFilterDone() throws IOException {
    return isFilterDoneInternal();
  }

  private boolean isFilterDoneInternal() throws IOException {
    return this.filter != null && this.filter.filterAllRemaining();
  }

  private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException {
    if (rpcCall.isPresent()) {
      // If a user specifies a too-restrictive or too-slow scanner, the
      // client might time out and disconnect while the server side
      // is still processing the request. We should abort aggressively
      // in that case.
      long afterTime = rpcCall.get().disconnectSince();
      if (afterTime >= 0) {
        throw new CallerDisconnectedException(
          "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this
            + " after " + afterTime + " ms, since caller disconnected");
      }
    }
  }

  private void resetProgress(ScannerContext scannerContext, int initialBatchProgress,
    long initialSizeProgress, long initialHeapSizeProgress) {
    // Starting to scan a new row. Reset the scanner progress according to whether or not
    // progress should be kept.
    if (scannerContext.getKeepProgress()) {
      // Progress should be kept. Reset to initial values seen at start of method invocation.
      scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
    } else {
      scannerContext.clearProgress();
    }
  }

  private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
    throws IOException {
    Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list");
    Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null");
    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();

    // Save the initial progress from the Scanner context in these local variables. The progress
    // may need to be reset a few times if rows are being filtered out so we save the initial
    // progress.
    int initialBatchProgress = scannerContext.getBatchProgress();
    long initialSizeProgress = scannerContext.getDataSizeProgress();
    long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();

    // Used to check time limit
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;

    // The loop here is used only when, at some point during the next(), we determine that due to
    // the effects of filters or otherwise we have an empty row in the result. Then we loop and try
    // again. Otherwise, we must get out on the first iteration via return: "true" if there is more
    // data to read, "false" if there isn't (storeHeap is at a stop row, and joinedHeap has no more
    // data to read for the last row, joinedContinuationRow, if set).
    while (true) {
      resetProgress(scannerContext, initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
      checkClientDisconnect(rpcCall);

      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // Let's see what we have in the storeHeap.
      Cell current = this.storeHeap.peek();

      boolean shouldStop = shouldStop(current);
      // When hasFilterRow is true it means that all the cells for a particular row must be read
      // before a filtering decision can be made. This means that filters for which hasFilterRow
      // returns true run the risk of encountering out of memory errors when they are applied to
      // a table that has very large rows.
      boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

      // If filter#hasFilterRow is true, partial results are not allowed since allowing them
      // would prevent the filters from being evaluated. Thus, if it is true, change the
      // scope of any limits that could potentially create partial results to
      // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row
      if (hasFilterRow) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
            + "formed. Changing scope of limits that may create partials");
        }
        scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
        scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
        limitScope = LimitScope.BETWEEN_ROWS;
      }

      if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
        if (hasFilterRow) {
          throw new IncompatibleFilterException(
            "Filter whose hasFilterRow() returns true is incompatible with scans that must "
              + "stop mid-row because of a limit. ScannerContext: " + scannerContext);
        }
        return true;
      }

      // Check if we were getting data from the joinedHeap and hit the limit.
      // If not, then it's the main path - getting results from the storeHeap.
      if (joinedContinuationRow == null) {
        // First, check if we are at a stop row. If so, there are no more results.
        if (shouldStop) {
          if (hasFilterRow) {
            filter.filterRowCells(results);
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Check if rowkey filter wants to exclude this row. If so, loop to next.
        // Technically, if we hit limits before on this row, we don't need this call.
        if (filterRowKey(current)) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          // early check, see HBASE-16296
          if (isFilterDoneInternal()) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          // Typically the count of rows scanned is incremented inside #populateResult. However,
          // here we are filtering a row based purely on its row key, preventing us from calling
          // #populateResult. Thus, perform the necessary increment to the rows scanned metric here.
          incrementCountOfRowsScannedMetric(scannerContext);
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          results.clear();

          // Read nothing as the row key was filtered, but we still need to check the time limit
          if (scannerContext.checkTimeLimit(limitScope)) {
            return true;
          }
          continue;
        }

        // Ok, we are good, let's try to get some results from the main heap.
        populateResult(results, this.storeHeap, scannerContext, current);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          if (hasFilterRow) {
            throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                + "stop mid-row because of a limit. ScannerContext: " + scannerContext);
          }
          return true;
        }

        // Check for thread interrupt status in case we have been signaled from
        // #interruptRegionOperation.
        region.checkInterrupt();

        Cell nextKv = this.storeHeap.peek();
        shouldStop = shouldStop(nextKv);
        // Save whether the row was empty before filters were applied to it.
        final boolean isEmptyRow = results.isEmpty();

        // We have the part of the row necessary for filtering (all of it, usually).
        // First filter with the filterRow(List).
        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
        if (hasFilterRow) {
          ret = filter.filterRowCellsWithRet(results);

          // We don't know how the results have changed after being filtered. Must set progress
          // according to contents of results now.
          if (scannerContext.getKeepProgress()) {
            scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
              initialHeapSizeProgress);
          } else {
            scannerContext.clearProgress();
          }
          scannerContext.incrementBatchProgress(results.size());
          for (Cell cell : results) {
            scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
              cell.heapSize());
          }
        }

        if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          results.clear();
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // This row was totally filtered out. If this is NOT the last row,
          // we should continue on; otherwise there is nothing else to do.
          if (!shouldStop) {
            // Read nothing as the cells were filtered, but we still need to check the time limit
            if (scannerContext.checkTimeLimit(limitScope)) {
              return true;
            }
            continue;
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Ok, we are done with the storeHeap for this row.
        // Now we may need to fetch additional, non-essential data for the row.
        // These values are not needed for the filter to work, so we postpone their
        // fetch to (possibly) reduce the amount of data loaded from disk.
        if (this.joinedHeap != null) {
          boolean mayHaveData = joinedHeapMayHaveData(current);
          if (mayHaveData) {
            joinedContinuationRow = current;
            populateFromJoinedHeap(results, scannerContext);

            if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
              return true;
            }
          }
        }
      } else {
        // Populating from the joined heap was stopped by limits, populate some more.
        populateFromJoinedHeap(results, scannerContext);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          return true;
        }
      }
      // We may have just called populateFromJoinedHeap and hit the limits. If that is
      // the case, we need to call it again on the next next() invocation.
      if (joinedContinuationRow != null) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // Finally, we are done with both joinedHeap and storeHeap.
      // Double check to prevent empty rows from appearing in result. It could be
      // the case when SingleColumnValueExcludeFilter is used.
      if (results.isEmpty()) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        boolean moreRows = nextRow(scannerContext, current);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        if (!shouldStop) {
          continue;
        }
      }

      if (shouldStop) {
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      } else {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }
    }
  }

  private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
    region.filteredReadRequestsCount.increment();
    if (region.getMetrics() != null) {
      region.getMetrics().updateFilteredRecords();
    }

    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
  }

  private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();
  }

  /** Returns true when the joined heap may have data for the current row */
  private boolean joinedHeapMayHaveData(Cell currentRowCell) throws IOException {
    Cell nextJoinedKv = joinedHeap.peek();
    boolean matchCurrentRow =
      nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell);
    boolean matchAfterSeek = false;

    // If the next value in the joined heap does not match the current row, try to seek to the
    // correct row
    if (!matchCurrentRow) {
      Cell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell);
      boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
      matchAfterSeek = seekSuccessful && joinedHeap.peek() != null
        && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell);
    }

    return matchCurrentRow || matchAfterSeek;
  }

  /**
   * This function maintains backward compatibility for 0.94 filters. HBASE-6429 combines the
   * filterRow() and filterRow({@code List<KeyValue> kvs}) functions. Code from 0.94 or older may
   * not implement hasFilterRow() as HBASE-6429 expects, because in 0.94 hasFilterRow() only
   * returns true when filterRow({@code List<KeyValue> kvs}) is overridden, not filterRow(). In
   * that case, filterRow() would be skipped.
   */
  private boolean filterRow() throws IOException {
    // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
    // filterRowCells(List<Cell> kvs) so we skip that scenario here.
    return filter != null && (!filter.hasFilterRow()) && filter.filterRow();
  }

  private boolean filterRowKey(Cell current) throws IOException {
    return filter != null && filter.filterRowKey(current);
  }

  /**
   * A mocked list implementation that discards all updates.
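   * Used by {@link #nextRow(ScannerContext, Cell)} to skip the remaining cells of the current
   * row without accumulating them in memory.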
   */
  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {

    @Override
    public void add(int index, Cell element) {
      // do nothing
    }

    @Override
    public boolean addAll(int index, Collection<? extends Cell> c) {
      return false; // this list is never changed as a result of an update
    }

    @Override
    public KeyValue get(int index) {
      throw new UnsupportedOperationException();
    }

    @Override
    public int size() {
      return 0;
    }
  };

  protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
    assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
    Cell next;
    while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();
      this.storeHeap.next(MOCKED_LIST);
    }
    resetFilters();

    // Calling the hook in CP which allows it to do a fast forward
    return this.region.getCoprocessorHost() == null
      || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
  }

  protected boolean shouldStop(Cell currentRowCell) {
    if (currentRowCell == null) {
      return true;
    }
    if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {
      return false;
    }
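    // Stop once the current row sorts after the stop row, or equals it and the stop row is
    // exclusive.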
    int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length);
    return c > 0 || (c == 0 && !includeStopRow);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "this method is only called inside close which is synchronized")
  private void closeInternal() {
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
    }
    if (joinedHeap != null) {
      joinedHeap.close();
      joinedHeap = null;
    }
    // no need to synchronize here.
    scannerReadPoints.remove(this);
    this.filterClosed = true;
  }

  @Override
  public synchronized void close() {
    TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
  }

  @Override
  public synchronized boolean reseek(byte[] row) throws IOException {
    return TraceUtil.trace(() -> {
      if (row == null) {
        throw new IllegalArgumentException("Row cannot be null.");
      }
      boolean result = false;
      region.startRegionOperation();
      Cell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
      try {
        // use request seek to make use of the lazy seek option. See HBASE-5520
        result = this.storeHeap.requestSeek(kv, true, true);
        if (this.joinedHeap != null) {
          result = this.joinedHeap.requestSeek(kv, true, true) || result;
        }
      } finally {
        region.closeRegionOperation();
      }
      return result;
    }, () -> region.createRegionSpan("RegionScanner.reseek"));
  }

  @Override
  public void shipped() throws IOException {
    if (storeHeap != null) {
      storeHeap.shipped();
    }
    if (joinedHeap != null) {
      joinedHeap.shipped();
    }
  }

  @Override
  public void run() throws IOException {
    // This is the RPC callback method executed. We close the scanner in this callback.
    this.close();
  }
}