001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.AbstractList; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.NavigableSet; 029import java.util.Optional; 030import java.util.Set; 031import java.util.concurrent.ConcurrentHashMap; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.CellComparator; 035import org.apache.hadoop.hbase.CellUtil; 036import org.apache.hadoop.hbase.DoNotRetryIOException; 037import org.apache.hadoop.hbase.ExtendedCell; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.KeyValue; 040import org.apache.hadoop.hbase.PrivateCellUtil; 041import org.apache.hadoop.hbase.UnknownScannerException; 042import org.apache.hadoop.hbase.client.ClientInternalHelper; 043import org.apache.hadoop.hbase.client.IsolationLevel; 044import org.apache.hadoop.hbase.client.RegionInfo; 045import org.apache.hadoop.hbase.client.Scan; 046import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics; 047import org.apache.hadoop.hbase.filter.FilterWrapper; 048import org.apache.hadoop.hbase.filter.IncompatibleFilterException; 049import org.apache.hadoop.hbase.ipc.CallerDisconnectedException; 050import org.apache.hadoop.hbase.ipc.RpcCall; 051import org.apache.hadoop.hbase.ipc.RpcCallback; 052import org.apache.hadoop.hbase.ipc.RpcServer; 053import org.apache.hadoop.hbase.regionserver.Region.Operation; 054import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 055import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; 056import org.apache.hadoop.hbase.trace.TraceUtil; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.yetus.audience.InterfaceAudience; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 063 064/** 065 * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families). 066 */ 067@InterfaceAudience.Private 068public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback { 069 070 private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class); 071 072 // Package local for testability 073 KeyValueHeap storeHeap = null; 074 075 /** 076 * Heap of key-values that are not essential for the provided filters and are thus read on demand, 077 * if on-demand column family loading is enabled. 078 */ 079 KeyValueHeap joinedHeap = null; 080 081 /** 082 * If the joined heap data gathering is interrupted due to scan limits, this will contain the row 083 * for which we are populating the values. 084 */ 085 protected ExtendedCell joinedContinuationRow = null; 086 private boolean filterClosed = false; 087 088 protected final byte[] stopRow; 089 protected final boolean includeStopRow; 090 protected final boolean reversed; 091 protected final HRegion region; 092 protected final CellComparator comparator; 093 094 private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints; 095 096 private final long readPt; 097 private final long maxResultSize; 098 private final ScannerContext defaultScannerContext; 099 private final FilterWrapper filter; 100 private final String operationId; 101 102 private RegionServerServices rsServices; 103 104 private final Set<Path> filesRead = new HashSet<>(); 105 106 @Override 107 public RegionInfo getRegionInfo() { 108 return region.getRegionInfo(); 109 } 110 111 private static boolean hasNonce(HRegion region, long nonce) { 112 RegionServerServices rsServices = region.getRegionServerServices(); 113 return nonce != HConstants.NO_NONCE && rsServices != null 114 && rsServices.getNonceManager() != null; 115 } 116 117 RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region, 118 long nonceGroup, long nonce) throws IOException { 119 this.region = region; 120 this.maxResultSize = scan.getMaxResultSize(); 121 if (scan.hasFilter()) { 122 this.filter = new FilterWrapper(scan.getFilter()); 123 } else { 124 this.filter = null; 125 } 126 this.comparator = region.getCellComparator(); 127 /** 128 * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default 129 * scanner context that can be used to enforce the batch limit in the event that a 130 * ScannerContext is not specified during an invocation of next/nextRaw 131 */ 132 defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build(); 133 this.stopRow = scan.getStopRow(); 134 this.includeStopRow = scan.includeStopRow(); 135 this.reversed = scan.isReversed(); 136 this.operationId = scan.getId(); 137 138 // synchronize on scannerReadPoints so that nobody calculates 139 // getSmallestReadPoint, before scannerReadPoints is updated. 140 IsolationLevel isolationLevel = scan.getIsolationLevel(); 141 long mvccReadPoint = ClientInternalHelper.getMvccReadPoint(scan); 142 this.scannerReadPoints = region.scannerReadPoints; 143 this.rsServices = region.getRegionServerServices(); 144 region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK); 145 try { 146 if (mvccReadPoint > 0) { 147 this.readPt = mvccReadPoint; 148 } else if (hasNonce(region, nonce)) { 149 this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce); 150 } else { 151 this.readPt = region.getReadPoint(isolationLevel); 152 } 153 scannerReadPoints.put(this, this.readPt); 154 } finally { 155 region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK); 156 } 157 initializeScanners(scan, additionalScanners); 158 } 159 160 public ScannerContext getContext() { 161 return defaultScannerContext; 162 } 163 164 private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners) 165 throws IOException { 166 // Here we separate all scanners into two lists - scanner that provide data required 167 // by the filter to operate (scanners list) and all others (joinedScanners list). 168 List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size()); 169 List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size()); 170 // Store all already instantiated scanners for exception handling 171 List<KeyValueScanner> instantiatedScanners = new ArrayList<>(); 172 // handle additionalScanners 173 if (additionalScanners != null && !additionalScanners.isEmpty()) { 174 scanners.addAll(additionalScanners); 175 instantiatedScanners.addAll(additionalScanners); 176 } 177 178 try { 179 for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) { 180 HStore store = region.getStore(entry.getKey()); 181 KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt); 182 instantiatedScanners.add(scanner); 183 if ( 184 this.filter == null || !scan.doLoadColumnFamiliesOnDemand() 185 || this.filter.isFamilyEssential(entry.getKey()) 186 ) { 187 scanners.add(scanner); 188 } else { 189 joinedScanners.add(scanner); 190 } 191 } 192 initializeKVHeap(scanners, joinedScanners, region); 193 } catch (Throwable t) { 194 throw handleException(instantiatedScanners, t); 195 } 196 } 197 198 protected void initializeKVHeap(List<KeyValueScanner> scanners, 199 List<KeyValueScanner> joinedScanners, HRegion region) throws IOException { 200 this.storeHeap = new KeyValueHeap(scanners, comparator); 201 if (!joinedScanners.isEmpty()) { 202 this.joinedHeap = new KeyValueHeap(joinedScanners, comparator); 203 } 204 } 205 206 private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) { 207 // remove scaner read point before throw the exception 208 scannerReadPoints.remove(this); 209 if (storeHeap != null) { 210 storeHeap.close(); 211 storeHeap = null; 212 if (joinedHeap != null) { 213 joinedHeap.close(); 214 joinedHeap = null; 215 } 216 } else { 217 // close all already instantiated scanners before throwing the exception 218 for (KeyValueScanner scanner : instantiatedScanners) { 219 scanner.close(); 220 } 221 } 222 return t instanceof IOException ? (IOException) t : new IOException(t); 223 } 224 225 @Override 226 public long getMaxResultSize() { 227 return maxResultSize; 228 } 229 230 @Override 231 public long getMvccReadPoint() { 232 return this.readPt; 233 } 234 235 @Override 236 public int getBatch() { 237 return this.defaultScannerContext.getBatchLimit(); 238 } 239 240 @Override 241 public String getOperationId() { 242 return operationId; 243 } 244 245 /** 246 * Reset both the filter and the old filter. 247 * @throws IOException in case a filter raises an I/O exception. 248 */ 249 protected final void resetFilters() throws IOException { 250 if (filter != null) { 251 filter.reset(); 252 } 253 } 254 255 @Override 256 public boolean next(List<? super ExtendedCell> outResults) throws IOException { 257 // apply the batching limit by default 258 return next(outResults, defaultScannerContext); 259 } 260 261 @Override 262 public synchronized boolean next(List<? super ExtendedCell> outResults, 263 ScannerContext scannerContext) throws IOException { 264 if (this.filterClosed) { 265 throw new UnknownScannerException("Scanner was closed (timed out?) " 266 + "after we renewed it. Could be caused by a very slow scanner " 267 + "or a lengthy garbage collection"); 268 } 269 region.startRegionOperation(Operation.SCAN); 270 try { 271 return nextRaw(outResults, scannerContext); 272 } finally { 273 region.closeRegionOperation(Operation.SCAN); 274 } 275 } 276 277 @Override 278 public boolean nextRaw(List<? super ExtendedCell> outResults) throws IOException { 279 // Use the RegionScanner's context by default 280 return nextRaw(outResults, defaultScannerContext); 281 } 282 283 @Override 284 public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext scannerContext) 285 throws IOException { 286 if (storeHeap == null) { 287 // scanner is closed 288 throw new UnknownScannerException("Scanner was closed"); 289 } 290 boolean moreValues = false; 291 if (outResults.isEmpty()) { 292 // Usually outResults is empty. This is true when next is called 293 // to handle scan or get operation. 294 moreValues = nextInternal(outResults, scannerContext); 295 } else { 296 List<ExtendedCell> tmpList = new ArrayList<>(); 297 moreValues = nextInternal(tmpList, scannerContext); 298 outResults.addAll(tmpList); 299 } 300 region.addReadRequestsCount(1); 301 if (region.getMetrics() != null) { 302 region.getMetrics().updateReadRequestCount(); 303 } 304 305 // If the size limit was reached it means a partial Result is being returned. Returning a 306 // partial Result means that we should not reset the filters; filters should only be reset in 307 // between rows 308 if (!scannerContext.mayHaveMoreCellsInRow()) { 309 resetFilters(); 310 } 311 312 if (isFilterDoneInternal()) { 313 moreValues = false; 314 } 315 return moreValues; 316 } 317 318 /** Returns true if more cells exist after this batch, false if scanner is done */ 319 private boolean populateFromJoinedHeap(List<? super ExtendedCell> results, 320 ScannerContext scannerContext) throws IOException { 321 assert joinedContinuationRow != null; 322 boolean moreValues = 323 populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow); 324 325 if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) { 326 // We are done with this row, reset the continuation. 327 joinedContinuationRow = null; 328 } 329 // As the data is obtained from two independent heaps, we need to 330 // ensure that result list is sorted, because Result relies on that. 331 ((List<Cell>) results).sort(comparator); 332 return moreValues; 333 } 334 335 /** 336 * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is 337 * reached, or remainingResultSize (if not -1) is reaced 338 * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call. 339 * @return state of last call to {@link KeyValueHeap#next()} 340 */ 341 private boolean populateResult(List<? super ExtendedCell> results, KeyValueHeap heap, 342 ScannerContext scannerContext, ExtendedCell currentRowCell) throws IOException { 343 Cell nextKv; 344 boolean moreCellsInRow = false; 345 boolean tmpKeepProgress = scannerContext.getKeepProgress(); 346 // Scanning between column families and thus the scope is between cells 347 LimitScope limitScope = LimitScope.BETWEEN_CELLS; 348 do { 349 // Check for thread interrupt status in case we have been signaled from 350 // #interruptRegionOperation. 351 region.checkInterrupt(); 352 353 // We want to maintain any progress that is made towards the limits while scanning across 354 // different column families. To do this, we toggle the keep progress flag on during calls 355 // to the StoreScanner to ensure that any progress made thus far is not wiped away. 356 scannerContext.setKeepProgress(true); 357 heap.next(results, scannerContext); 358 scannerContext.setKeepProgress(tmpKeepProgress); 359 360 nextKv = heap.peek(); 361 moreCellsInRow = moreCellsInRow(nextKv, currentRowCell); 362 if (!moreCellsInRow) { 363 incrementCountOfRowsScannedMetric(scannerContext); 364 } 365 if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) { 366 return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues(); 367 } else if (scannerContext.checkSizeLimit(limitScope)) { 368 ScannerContext.NextState state = 369 moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED; 370 return scannerContext.setScannerState(state).hasMoreValues(); 371 } else if (scannerContext.checkTimeLimit(limitScope)) { 372 ScannerContext.NextState state = 373 moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED; 374 return scannerContext.setScannerState(state).hasMoreValues(); 375 } 376 } while (moreCellsInRow); 377 return nextKv != null; 378 } 379 380 /** 381 * Based on the nextKv in the heap, and the current row, decide whether or not there are more 382 * cells to be read in the heap. If the row of the nextKv in the heap matches the current row then 383 * there are more cells to be read in the row. 384 * @return true When there are more cells in the row to be read 385 */ 386 private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) { 387 return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell); 388 } 389 390 /** Returns True if a filter rules the scanner is over, done. */ 391 @Override 392 public synchronized boolean isFilterDone() throws IOException { 393 return isFilterDoneInternal(); 394 } 395 396 private boolean isFilterDoneInternal() throws IOException { 397 return this.filter != null && this.filter.filterAllRemaining(); 398 } 399 400 private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException { 401 if (rpcCall.isPresent()) { 402 // If a user specifies a too-restrictive or too-slow scanner, the 403 // client might time out and disconnect while the server side 404 // is still processing the request. We should abort aggressively 405 // in that case. 406 long afterTime = rpcCall.get().disconnectSince(); 407 if (afterTime >= 0) { 408 throw new CallerDisconnectedException( 409 "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this 410 + " after " + afterTime + " ms, since " + "caller disconnected"); 411 } 412 } 413 } 414 415 private void resetProgress(ScannerContext scannerContext, int initialBatchProgress, 416 long initialSizeProgress, long initialHeapSizeProgress) { 417 // Starting to scan a new row. Reset the scanner progress according to whether or not 418 // progress should be kept. 419 if (scannerContext.getKeepProgress()) { 420 // Progress should be kept. Reset to initial values seen at start of method invocation. 421 scannerContext.setProgress(initialBatchProgress, initialSizeProgress, 422 initialHeapSizeProgress); 423 } else { 424 scannerContext.clearProgress(); 425 } 426 } 427 428 private boolean nextInternal(List<? super ExtendedCell> results, ScannerContext scannerContext) 429 throws IOException { 430 Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list"); 431 Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null"); 432 Optional<RpcCall> rpcCall = RpcServer.getCurrentCall(); 433 434 // Save the initial progress from the Scanner context in these local variables. The progress 435 // may need to be reset a few times if rows are being filtered out so we save the initial 436 // progress. 437 int initialBatchProgress = scannerContext.getBatchProgress(); 438 long initialSizeProgress = scannerContext.getDataSizeProgress(); 439 long initialHeapSizeProgress = scannerContext.getHeapSizeProgress(); 440 441 // Used to check time limit 442 LimitScope limitScope = LimitScope.BETWEEN_CELLS; 443 444 // The loop here is used only when at some point during the next we determine 445 // that due to effects of filters or otherwise, we have an empty row in the result. 446 // Then we loop and try again. Otherwise, we must get out on the first iteration via return, 447 // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row, 448 // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow). 449 while (true) { 450 resetProgress(scannerContext, initialBatchProgress, initialSizeProgress, 451 initialHeapSizeProgress); 452 checkClientDisconnect(rpcCall); 453 454 // Check for thread interrupt status in case we have been signaled from 455 // #interruptRegionOperation. 456 region.checkInterrupt(); 457 458 // Let's see what we have in the storeHeap. 459 ExtendedCell current = this.storeHeap.peek(); 460 461 boolean shouldStop = shouldStop(current); 462 // When has filter row is true it means that the all the cells for a particular row must be 463 // read before a filtering decision can be made. This means that filters where hasFilterRow 464 // run the risk of enLongAddering out of memory errors in the case that they are applied to a 465 // table that has very large rows. 466 boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow(); 467 468 // If filter#hasFilterRow is true, partial results are not allowed since allowing them 469 // would prevent the filters from being evaluated. Thus, if it is true, change the 470 // scope of any limits that could potentially create partial results to 471 // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row 472 if (hasFilterRow) { 473 if (LOG.isTraceEnabled()) { 474 LOG.trace("filter#hasFilterRow is true which prevents partial results from being " 475 + " formed. Changing scope of limits that may create partials"); 476 } 477 scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS); 478 scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS); 479 limitScope = LimitScope.BETWEEN_ROWS; 480 } 481 482 if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { 483 if (hasFilterRow) { 484 throw new IncompatibleFilterException( 485 "Filter whose hasFilterRow() returns true is incompatible with scans that must " 486 + " stop mid-row because of a limit. ScannerContext:" + scannerContext); 487 } 488 return true; 489 } 490 491 // Check if we were getting data from the joinedHeap and hit the limit. 492 // If not, then it's main path - getting results from storeHeap. 493 if (joinedContinuationRow == null) { 494 // First, check if we are at a stop row. If so, there are no more results. 495 if (shouldStop) { 496 if (hasFilterRow) { 497 filter.filterRowCells((List<Cell>) results); 498 } 499 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 500 } 501 502 // Check if rowkey filter wants to exclude this row. If so, loop to next. 503 // Technically, if we hit limits before on this row, we don't need this call. 504 if (filterRowKey(current)) { 505 incrementCountOfRowsFilteredMetric(scannerContext); 506 // early check, see HBASE-16296 507 if (isFilterDoneInternal()) { 508 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 509 } 510 // HBASE-29974: ask the filter for a seek hint so we can jump directly past the rejected 511 // row instead of iterating through its cells one-by-one via nextRow(). 512 ExtendedCell rowHint = getHintForRejectedRow(current); 513 // Typically the count of rows scanned is incremented inside #populateResult. However, 514 // here we are filtering a row based purely on its row key, preventing us from calling 515 // #populateResult. Thus, perform the necessary increment here to rows scanned metric. 516 // Placed after getHintForRejectedRow so that a buggy filter throwing DNRIOE doesn't 517 // leave the metric incremented for a row that was never actually processed. 518 incrementCountOfRowsScannedMetric(scannerContext); 519 boolean moreRows = (rowHint != null) 520 ? nextRowViaHint(scannerContext, current, rowHint) 521 : nextRow(scannerContext, current); 522 if (!moreRows) { 523 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 524 } 525 results.clear(); 526 527 // Read nothing as the rowkey was filtered, but still need to check time limit 528 // We also check size limit because we might have read blocks in getting to this point. 529 if (scannerContext.checkAnyLimitReached(limitScope)) { 530 return true; 531 } 532 continue; 533 } 534 535 // Ok, we are good, let's try to get some results from the main heap. 536 populateResult(results, this.storeHeap, scannerContext, current); 537 if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) { 538 if (hasFilterRow) { 539 throw new IncompatibleFilterException( 540 "Filter whose hasFilterRow() returns true is incompatible with scans that must " 541 + " stop mid-row because of a limit. ScannerContext:" + scannerContext); 542 } 543 return true; 544 } 545 546 // Check for thread interrupt status in case we have been signaled from 547 // #interruptRegionOperation. 548 region.checkInterrupt(); 549 550 Cell nextKv = this.storeHeap.peek(); 551 shouldStop = shouldStop(nextKv); 552 // save that the row was empty before filters applied to it. 553 final boolean isEmptyRow = results.isEmpty(); 554 555 // We have the part of the row necessary for filtering (all of it, usually). 556 // First filter with the filterRow(List). 557 FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED; 558 if (hasFilterRow) { 559 ret = filter.filterRowCellsWithRet((List<Cell>) results); 560 561 // We don't know how the results have changed after being filtered. Must set progress 562 // according to contents of results now. 563 if (scannerContext.getKeepProgress()) { 564 scannerContext.setProgress(initialBatchProgress, initialSizeProgress, 565 initialHeapSizeProgress); 566 } else { 567 scannerContext.clearProgress(); 568 } 569 scannerContext.incrementBatchProgress(results.size()); 570 for (ExtendedCell cell : (List<ExtendedCell>) results) { 571 scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell), 572 cell.heapSize()); 573 } 574 } 575 576 if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) { 577 incrementCountOfRowsFilteredMetric(scannerContext); 578 results.clear(); 579 boolean moreRows = nextRow(scannerContext, current); 580 if (!moreRows) { 581 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 582 } 583 584 // This row was totally filtered out, if this is NOT the last row, 585 // we should continue on. Otherwise, nothing else to do. 586 if (!shouldStop) { 587 // Read nothing as the cells was filtered, but still need to check time limit. 588 // We also check size limit because we might have read blocks in getting to this point. 589 if (scannerContext.checkAnyLimitReached(limitScope)) { 590 return true; 591 } 592 continue; 593 } 594 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 595 } 596 597 // Ok, we are done with storeHeap for this row. 598 // Now we may need to fetch additional, non-essential data into row. 599 // These values are not needed for filter to work, so we postpone their 600 // fetch to (possibly) reduce amount of data loads from disk. 601 if (this.joinedHeap != null) { 602 boolean mayHaveData = joinedHeapMayHaveData(current); 603 if (mayHaveData) { 604 joinedContinuationRow = current; 605 populateFromJoinedHeap(results, scannerContext); 606 607 if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) { 608 return true; 609 } 610 } 611 } 612 } else { 613 // Populating from the joined heap was stopped by limits, populate some more. 614 populateFromJoinedHeap(results, scannerContext); 615 if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) { 616 return true; 617 } 618 } 619 // We may have just called populateFromJoinedMap and hit the limits. If that is 620 // the case, we need to call it again on the next next() invocation. 621 if (joinedContinuationRow != null) { 622 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 623 } 624 625 // Finally, we are done with both joinedHeap and storeHeap. 626 // Double check to prevent empty rows from appearing in result. It could be 627 // the case when SingleColumnValueExcludeFilter is used. 628 if (results.isEmpty()) { 629 incrementCountOfRowsFilteredMetric(scannerContext); 630 boolean moreRows = nextRow(scannerContext, current); 631 if (!moreRows) { 632 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 633 } 634 if (!shouldStop) { 635 // We check size limit because we might have read blocks in the nextRow call above, or 636 // in the call populateResults call. Only scans with hasFilterRow should reach this point, 637 // and for those scans which filter row _cells_ this is the only place we can actually 638 // enforce that the scan does not exceed limits since it bypasses all other checks above. 639 if (scannerContext.checkSizeLimit(limitScope)) { 640 return true; 641 } 642 continue; 643 } 644 } 645 646 if (shouldStop) { 647 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 648 } else { 649 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 650 } 651 } 652 } 653 654 private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) { 655 region.filteredReadRequestsCount.increment(); 656 if (region.getMetrics() != null) { 657 region.getMetrics().updateFilteredRecords(); 658 } 659 660 if (scannerContext == null || !scannerContext.isTrackingMetrics()) { 661 return; 662 } 663 664 scannerContext.getMetrics() 665 .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME, 1); 666 } 667 668 private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) { 669 if (scannerContext == null || !scannerContext.isTrackingMetrics()) { 670 return; 671 } 672 673 scannerContext.getMetrics() 674 .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 1); 675 } 676 677 /** Returns true when the joined heap may have data for the current row */ 678 private boolean joinedHeapMayHaveData(ExtendedCell currentRowCell) throws IOException { 679 Cell nextJoinedKv = joinedHeap.peek(); 680 boolean matchCurrentRow = 681 nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell); 682 boolean matchAfterSeek = false; 683 684 // If the next value in the joined heap does not match the current row, try to seek to the 685 // correct row 686 if (!matchCurrentRow) { 687 ExtendedCell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell); 688 boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true); 689 matchAfterSeek = seekSuccessful && joinedHeap.peek() != null 690 && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell); 691 } 692 693 return matchCurrentRow || matchAfterSeek; 694 } 695 696 /** 697 * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines both 698 * filterRow & filterRow({@code List<KeyValue> kvs}) functions. While 0.94 code or older, it may 699 * not implement hasFilterRow as HBase-6429 expects because 0.94 hasFilterRow() only returns true 700 * when filterRow({@code List<KeyValue> kvs}) is overridden not the filterRow(). Therefore, the 701 * filterRow() will be skipped. 702 */ 703 private boolean filterRow() throws IOException { 704 // when hasFilterRow returns true, filter.filterRow() will be called automatically inside 705 // filterRowCells(List<Cell> kvs) so we skip that scenario here. 706 return filter != null && (!filter.hasFilterRow()) && filter.filterRow(); 707 } 708 709 private boolean filterRowKey(Cell current) throws IOException { 710 return filter != null && filter.filterRowKey(current); 711 } 712 713 /** 714 * A mocked list implementation - discards all updates. 715 */ 716 private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() { 717 718 @Override 719 public void add(int index, Cell element) { 720 // do nothing 721 } 722 723 @Override 724 public boolean addAll(int index, Collection<? extends Cell> c) { 725 return false; // this list is never changed as a result of an update 726 } 727 728 @Override 729 public KeyValue get(int index) { 730 throw new UnsupportedOperationException(); 731 } 732 733 @Override 734 public int size() { 735 return 0; 736 } 737 }; 738 739 protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException { 740 assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read."; 741 742 // Enable skipping row mode, which disables limits and skips tracking progress for all 743 // but block size. We keep tracking block size because skipping a row in this way 744 // might involve reading blocks along the way. 745 scannerContext.setSkippingRow(true); 746 747 Cell next; 748 while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) { 749 // Check for thread interrupt status in case we have been signaled from 750 // #interruptRegionOperation. 751 region.checkInterrupt(); 752 this.storeHeap.next(MOCKED_LIST, scannerContext); 753 } 754 755 scannerContext.setSkippingRow(false); 756 resetFilters(); 757 758 // Calling the hook in CP which allows it to do a fast forward 759 return this.region.getCoprocessorHost() == null 760 || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell); 761 } 762 763 /** 764 * Fast-path alternative to {@link #nextRow} used when the filter has provided a seek hint via 765 * {@link org.apache.hadoop.hbase.filter.Filter#getHintForRejectedRow(Cell)}. Instead of iterating 766 * through every cell in the rejected row one-by-one, this method issues a single seek to jump 767 * directly to the filter's suggested position ({@code requestSeek} for forward scans, 768 * {@code backwardSeek} for reversed scans). 769 * <p> 770 * The skipping-row mode flag is set around the seek so that block-level size tracking continues 771 * to function (consistent with {@link #nextRow}), and the filter state is reset afterwards so the 772 * next row starts with a clean filter context. 773 * <p> 774 * <strong>Stop-row invariant:</strong> This method does not validate that {@code hint} falls 775 * within the scan's stop row. If the hint overshoots, the next iteration's 776 * {@link #shouldStop(Cell)} check catches it and returns NO_MORE_VALUES. One wasted seek may 777 * occur, but correctness is maintained. 778 * <p> 779 * <strong>Metrics note:</strong> The rows-scanned metric is incremented once by the caller for 780 * the rejected row. Rows physically skipped by the seek are not individually counted — this 781 * reflects the fact that no per-row work was done for those rows. 782 * <p> 783 * <strong>Coprocessor note:</strong> {@code postScannerFilterRow} is invoked once with 784 * {@code curRowCell}, not once per skipped row. Coprocessors counting filtered rows should be 785 * aware of this semantic when the hint path is used. 786 * @param scannerContext scanner context used for limit tracking 787 * @param curRowCell the first cell of the row that was rejected by {@code filterRowKey}; 788 * passed to the coprocessor hook for observability 789 * @param hint the validated {@link ExtendedCell} returned by the filter; the scanner 790 * will seek to this position 791 * @return {@code true} if scanning should continue, {@code false} if a coprocessor requests an 792 * early stop (mirrors the contract of {@link #nextRow}) 793 * @throws IOException if the seek or the coprocessor hook signals a failure 794 */ 795 private boolean nextRowViaHint(ScannerContext scannerContext, Cell curRowCell, ExtendedCell hint) 796 throws IOException { 797 assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read."; 798 799 int difference = comparator.compareRows(hint, curRowCell); 800 if ((!reversed && difference > 0) || (reversed && difference < 0)) { 801 scannerContext.setSkippingRow(true); 802 if (reversed) { 803 // ReversedKeyValueHeap does not support requestSeek; use backwardSeek 804 // to position at-or-before the hint within the target row. 805 // seekToPreviousRow would skip past the hint row entirely. 806 this.storeHeap.backwardSeek(hint); 807 } else { 808 this.storeHeap.requestSeek(hint, true, true); 809 } 810 scannerContext.setSkippingRow(false); 811 812 resetFilters(); 813 814 return this.region.getCoprocessorHost() == null 815 || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell); 816 } 817 818 return nextRow(scannerContext, curRowCell); 819 } 820 821 /** 822 * Asks the current {@link org.apache.hadoop.hbase.filter.FilterWrapper} for a seek hint to use 823 * after a row has been rejected by {@link #filterRowKey}. If the wrapped filter overrides 824 * {@link org.apache.hadoop.hbase.filter.Filter#getHintForRejectedRow(Cell)}, this returns its 825 * answer as an {@link ExtendedCell}; otherwise returns {@code null}. 826 * <p> 827 * The returned cell is validated to be an {@link ExtendedCell} because filters run on the server 828 * side and the scanner infrastructure requires {@code ExtendedCell} references. 829 * @param rowCell the first cell of the rejected row (same cell passed to {@code filterRowKey}) 830 * @return a validated {@link ExtendedCell} seek target, or {@code null} if the filter provides no 831 * hint 832 * @throws DoNotRetryIOException if the filter returns a non-{@link ExtendedCell} instance 833 * @throws IOException if the filter signals an I/O failure 834 */ 835 private ExtendedCell getHintForRejectedRow(Cell rowCell) throws IOException { 836 if (filter == null) { 837 return null; 838 } 839 Cell hint = filter.getHintForRejectedRow(rowCell); 840 if (hint == null) { 841 return null; 842 } 843 if (!(hint instanceof ExtendedCell)) { 844 throw new DoNotRetryIOException( 845 "Incorrect filter implementation: the Cell returned by getHintForRejectedRow " 846 + "is not an ExtendedCell. Filter class: " + filter.getClass().getName()); 847 } 848 return (ExtendedCell) hint; 849 } 850 851 protected boolean shouldStop(Cell currentRowCell) { 852 if (currentRowCell == null) { 853 return true; 854 } 855 if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) { 856 return false; 857 } 858 int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length); 859 return c > 0 || (c == 0 && !includeStopRow); 860 } 861 862 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", 863 justification = "this method is only called inside close which is synchronized") 864 private void closeInternal() { 865 if (storeHeap != null) { 866 storeHeap.close(); 867 filesRead.addAll(storeHeap.getFilesRead()); 868 storeHeap = null; 869 } 870 if (joinedHeap != null) { 871 joinedHeap.close(); 872 filesRead.addAll(joinedHeap.getFilesRead()); 873 joinedHeap = null; 874 } 875 // no need to synchronize here. 876 scannerReadPoints.remove(this); 877 this.filterClosed = true; 878 } 879 880 @Override 881 public synchronized void close() { 882 TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close")); 883 } 884 885 /** 886 * Returns the set of store file paths that were successfully read by this scanner. Populated at 887 * close from the underlying store heap and joined heap (if any). 888 */ 889 @Override 890 public Set<Path> getFilesRead() { 891 return Collections.unmodifiableSet(filesRead); 892 } 893 894 @Override 895 public synchronized boolean reseek(byte[] row) throws IOException { 896 return TraceUtil.trace(() -> { 897 if (row == null) { 898 throw new IllegalArgumentException("Row cannot be null."); 899 } 900 boolean result = false; 901 region.startRegionOperation(); 902 ExtendedCell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length); 903 try { 904 // use request seek to make use of the lazy seek option. See HBASE-5520 905 result = this.storeHeap.requestSeek(kv, true, true); 906 if (this.joinedHeap != null) { 907 result = this.joinedHeap.requestSeek(kv, true, true) || result; 908 } 909 } finally { 910 region.closeRegionOperation(); 911 } 912 return result; 913 }, () -> region.createRegionSpan("RegionScanner.reseek")); 914 } 915 916 @Override 917 public void shipped() throws IOException { 918 if (storeHeap != null) { 919 storeHeap.shipped(); 920 } 921 if (joinedHeap != null) { 922 joinedHeap.shipped(); 923 } 924 } 925 926 @Override 927 public void run() throws IOException { 928 // This is the RPC callback method executed. We do the close in of the scanner in this 929 // callback 930 this.close(); 931 } 932}