/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
 */
@InterfaceAudience.Private
public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {

  private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class);

  // Package local for testability
  KeyValueHeap storeHeap = null;

  /**
   * Heap of key-values that are not essential for the provided filters and are thus read on
   * demand, if on-demand column family loading is enabled.
   */
  KeyValueHeap joinedHeap = null;

  /**
   * If the joined heap data gathering is interrupted due to scan limits, this will contain the
   * row for which we are populating the values.
   */
  protected ExtendedCell joinedContinuationRow = null;
  private boolean filterClosed = false;

  protected final byte[] stopRow;
  protected final boolean includeStopRow;
  protected final HRegion region;
  protected final CellComparator comparator;

  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  private final long readPt;
  private final long maxResultSize;
  private final ScannerContext defaultScannerContext;
  private final FilterWrapper filter;
  private final String operationId;

  private RegionServerServices rsServices;
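
  // Lifecycle notes (editor's summary, derived from the code below): next() and close() are
  // synchronized, so a single RegionScannerImpl must not be driven by concurrent callers. On
  // construction the scanner registers its MVCC read point in scannerReadPoints; HRegion consults
  // that map when computing the smallest read point still in use, which keeps cell versions this
  // scanner may still need from being discarded by flushes and compactions while it is open.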

  @Override
  public RegionInfo getRegionInfo() {
    return region.getRegionInfo();
  }

  private static boolean hasNonce(HRegion region, long nonce) {
    RegionServerServices rsServices = region.getRegionServerServices();
    return nonce != HConstants.NO_NONCE && rsServices != null
      && rsServices.getNonceManager() != null;
  }

  RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
    long nonceGroup, long nonce) throws IOException {
    this.region = region;
    this.maxResultSize = scan.getMaxResultSize();
    if (scan.hasFilter()) {
      this.filter = new FilterWrapper(scan.getFilter());
    } else {
      this.filter = null;
    }
    this.comparator = region.getCellComparator();
    /**
     * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
     * scanner context that can be used to enforce the batch limit in the event that a
     * ScannerContext is not specified during an invocation of next/nextRaw
     */
    defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
    this.stopRow = scan.getStopRow();
    this.includeStopRow = scan.includeStopRow();
    this.operationId = scan.getId();

    // Synchronize on scannerReadPoints so that nobody calculates
    // getSmallestReadPoint before scannerReadPoints is updated.
    IsolationLevel isolationLevel = scan.getIsolationLevel();
    long mvccReadPoint = ClientInternalHelper.getMvccReadPoint(scan);
    this.scannerReadPoints = region.scannerReadPoints;
    this.rsServices = region.getRegionServerServices();
    region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    try {
      if (mvccReadPoint > 0) {
        this.readPt = mvccReadPoint;
      } else if (hasNonce(region, nonce)) {
        this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
      } else {
        this.readPt = region.getReadPoint(isolationLevel);
      }
      scannerReadPoints.put(this, this.readPt);
    } finally {
      region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    }
    initializeScanners(scan, additionalScanners);
  }

  public ScannerContext getContext() {
    return defaultScannerContext;
  }

  private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
    throws IOException {
    // Here we separate all scanners into two lists - scanners that provide data required
    // by the filter to operate (scanners list) and all others (joinedScanners list).
    List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());
    List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());
    // Store all already instantiated scanners for exception handling
    List<KeyValueScanner> instantiatedScanners = new ArrayList<>();
    // handle additionalScanners
    if (additionalScanners != null && !additionalScanners.isEmpty()) {
      scanners.addAll(additionalScanners);
      instantiatedScanners.addAll(additionalScanners);
    }

    try {
      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
        HStore store = region.getStore(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
        instantiatedScanners.add(scanner);
        if (
          this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
            || this.filter.isFamilyEssential(entry.getKey())
        ) {
          scanners.add(scanner);
        } else {
          joinedScanners.add(scanner);
        }
      }
      initializeKVHeap(scanners, joinedScanners, region);
    } catch (Throwable t) {
      throw handleException(instantiatedScanners, t);
    }
  }
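
  // On-demand column family loading (editor's sketch): with
  // Scan#setLoadColumnFamiliesOnDemand(true) and a filter whose isFamilyEssential() returns false
  // for some family, that family's scanner lands in joinedScanners above, and its cells are only
  // fetched for rows the filter accepts. For example, a SingleColumnValueFilter on family "cf1"
  // would typically mark a purely-payload family "cf2" as non-essential (family names here are
  // illustrative, not from this file).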

  protected void initializeKVHeap(List<KeyValueScanner> scanners,
    List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
    this.storeHeap = new KeyValueHeap(scanners, comparator);
    if (!joinedScanners.isEmpty()) {
      this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
    }
  }

  private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) {
    // remove the scanner read point before throwing the exception
    scannerReadPoints.remove(this);
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
      if (joinedHeap != null) {
        joinedHeap.close();
        joinedHeap = null;
      }
    } else {
      // close all already instantiated scanners before throwing the exception
      for (KeyValueScanner scanner : instantiatedScanners) {
        scanner.close();
      }
    }
    return t instanceof IOException ? (IOException) t : new IOException(t);
  }

  @Override
  public long getMaxResultSize() {
    return maxResultSize;
  }

  @Override
  public long getMvccReadPoint() {
    return this.readPt;
  }

  @Override
  public int getBatch() {
    return this.defaultScannerContext.getBatchLimit();
  }

  @Override
  public String getOperationId() {
    return operationId;
  }

  /**
   * Reset the filter, if any.
   * @throws IOException in case a filter raises an I/O exception.
   */
  protected final void resetFilters() throws IOException {
    if (filter != null) {
      filter.reset();
    }
  }

  @Override
  public boolean next(List<? super ExtendedCell> outResults) throws IOException {
    // apply the batching limit by default
    return next(outResults, defaultScannerContext);
  }

  @Override
  public synchronized boolean next(List<? super ExtendedCell> outResults,
    ScannerContext scannerContext) throws IOException {
    if (this.filterClosed) {
      throw new UnknownScannerException("Scanner was closed (timed out?) "
        + "after we renewed it. Could be caused by a very slow scanner "
        + "or a lengthy garbage collection");
    }
    region.startRegionOperation(Operation.SCAN);
    try {
      return nextRaw(outResults, scannerContext);
    } finally {
      region.closeRegionOperation(Operation.SCAN);
    }
  }
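
  // A minimal driving loop (editor's sketch; server-side code, with `scanner` being an open
  // RegionScannerImpl and error handling elided):
  //
  //   List<ExtendedCell> cells = new ArrayList<>();
  //   boolean more;
  //   do {
  //     cells.clear();
  //     more = scanner.next(cells); // enforces the Scan's batch limit by default
  //     // ... ship `cells` back to the client as (part of) a row ...
  //   } while (more);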

  @Override
  public boolean nextRaw(List<? super ExtendedCell> outResults) throws IOException {
    // Use the RegionScanner's context by default
    return nextRaw(outResults, defaultScannerContext);
  }

  @Override
  public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext scannerContext)
    throws IOException {
    if (storeHeap == null) {
      // scanner is closed
      throw new UnknownScannerException("Scanner was closed");
    }
    boolean moreValues = false;
    if (outResults.isEmpty()) {
      // Usually outResults is empty. This is true when next is called
      // to handle scan or get operation.
      moreValues = nextInternal(outResults, scannerContext);
    } else {
      List<ExtendedCell> tmpList = new ArrayList<>();
      moreValues = nextInternal(tmpList, scannerContext);
      outResults.addAll(tmpList);
    }

    region.addReadRequestsCount(1);
    if (region.getMetrics() != null) {
      region.getMetrics().updateReadRequestCount();
    }

    // If the size limit was reached it means a partial Result is being returned. Returning a
    // partial Result means that we should not reset the filters; filters should only be reset in
    // between rows
    if (!scannerContext.mayHaveMoreCellsInRow()) {
      resetFilters();
    }

    if (isFilterDoneInternal()) {
      moreValues = false;
    }
    return moreValues;
  }

  /** Returns true if more cells exist after this batch, false if scanner is done */
  private boolean populateFromJoinedHeap(List<? super ExtendedCell> results,
    ScannerContext scannerContext) throws IOException {
    assert joinedContinuationRow != null;
    boolean moreValues =
      populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow);

    if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
      // We are done with this row, reset the continuation.
      joinedContinuationRow = null;
    }
    // As the data is obtained from two independent heaps, we need to
    // ensure that result list is sorted, because Result relies on that.
    ((List<Cell>) results).sort(comparator);
    return moreValues;
  }

  /**
   * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is
   * reached, or remainingResultSize (if not -1) is reached.
   * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before
   *             the call.
   * @return state of last call to {@link KeyValueHeap#next()}
   */
  private boolean populateResult(List<? super ExtendedCell> results, KeyValueHeap heap,
    ScannerContext scannerContext, ExtendedCell currentRowCell) throws IOException {
    Cell nextKv;
    boolean moreCellsInRow = false;
    boolean tmpKeepProgress = scannerContext.getKeepProgress();
    // Scanning between column families and thus the scope is between cells
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;
    do {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // We want to maintain any progress that is made towards the limits while scanning across
      // different column families. To do this, we toggle the keep progress flag on during calls
      // to the StoreScanner to ensure that any progress made thus far is not wiped away.
      scannerContext.setKeepProgress(true);
      heap.next(results, scannerContext);
      scannerContext.setKeepProgress(tmpKeepProgress);

      nextKv = heap.peek();
      moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
      if (!moreCellsInRow) {
        incrementCountOfRowsScannedMetric(scannerContext);
      }
      if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
        return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
      } else if (scannerContext.checkSizeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      } else if (scannerContext.checkTimeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      }
    } while (moreCellsInRow);
    return nextKv != null;
  }
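
  // Limit semantics (editor's note): the *_MID_ROW states above signal that a limit fired while
  // more cells remain in the current row, i.e. the Result being assembled is partial; the plain
  // SIZE_LIMIT_REACHED / TIME_LIMIT_REACHED states mean the row boundary was reached first.
  // setScannerState(...).hasMoreValues() both records the state on the ScannerContext and
  // converts it into the boolean that next/nextRaw return.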

  /**
   * Based on the nextKv in the heap, and the current row, decide whether or not there are more
   * cells to be read in the heap. If the row of the nextKv in the heap matches the current row
   * then there are more cells to be read in the row.
   * @return true When there are more cells in the row to be read
   */
  private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
    return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);
  }

  /** Returns True if a filter rules the scanner is over, done. */
  @Override
  public synchronized boolean isFilterDone() throws IOException {
    return isFilterDoneInternal();
  }

  private boolean isFilterDoneInternal() throws IOException {
    return this.filter != null && this.filter.filterAllRemaining();
  }

  private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException {
    if (rpcCall.isPresent()) {
      // If a user specifies a too-restrictive or too-slow scanner, the
      // client might time out and disconnect while the server side
      // is still processing the request. We should abort aggressively
      // in that case.
      long afterTime = rpcCall.get().disconnectSince();
      if (afterTime >= 0) {
        throw new CallerDisconnectedException(
          "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this
            + " after " + afterTime + " ms, since " + "caller disconnected");
      }
    }
  }

  private void resetProgress(ScannerContext scannerContext, int initialBatchProgress,
    long initialSizeProgress, long initialHeapSizeProgress) {
    // Starting to scan a new row. Reset the scanner progress according to whether or not
    // progress should be kept.
    if (scannerContext.getKeepProgress()) {
      // Progress should be kept. Reset to initial values seen at start of method invocation.
      scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
    } else {
      scannerContext.clearProgress();
    }
  }
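
  // nextInternal (below) is the core row loop. Editor's summary of its contract, drawn from the
  // inline comments: it returns on the first iteration unless the current row was entirely
  // filtered out, in which case it loops on to the next row; the boolean it returns means "more
  // data may follow", and the precise reason is recorded on the ScannerContext as a NextState.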

  private boolean nextInternal(List<? super ExtendedCell> results, ScannerContext scannerContext)
    throws IOException {
    Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list");
    Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null");
    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();

    // Save the initial progress from the Scanner context in these local variables. The progress
    // may need to be reset a few times if rows are being filtered out so we save the initial
    // progress.
    int initialBatchProgress = scannerContext.getBatchProgress();
    long initialSizeProgress = scannerContext.getDataSizeProgress();
    long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();

    // Used to check time limit
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;

    // The loop here is used only when at some point during the next we determine
    // that due to effects of filters or otherwise, we have an empty row in the result.
    // Then we loop and try again. Otherwise, we must get out on the first iteration via return,
    // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
    // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow)).
    while (true) {
      resetProgress(scannerContext, initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
      checkClientDisconnect(rpcCall);

      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // Let's see what we have in the storeHeap.
      ExtendedCell current = this.storeHeap.peek();

      boolean shouldStop = shouldStop(current);
      // When hasFilterRow is true, it means that all the cells for a particular row must be read
      // before a filtering decision can be made. This means that filters where hasFilterRow is
      // true run the risk of encountering out of memory errors in the case that they are applied
      // to a table that has very large rows.
      boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

      // If filter#hasFilterRow is true, partial results are not allowed since allowing them
      // would prevent the filters from being evaluated. Thus, if it is true, change the
      // scope of any limits that could potentially create partial results to
      // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row
      if (hasFilterRow) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
            + " formed. Changing scope of limits that may create partials");
        }
        scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
        scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
        limitScope = LimitScope.BETWEEN_ROWS;
      }

      if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
        if (hasFilterRow) {
          throw new IncompatibleFilterException(
            "Filter whose hasFilterRow() returns true is incompatible with scans that must "
              + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
        }
        return true;
      }

      // Check if we were getting data from the joinedHeap and hit the limit.
      // If not, then it's main path - getting results from storeHeap.
      if (joinedContinuationRow == null) {
        // First, check if we are at a stop row. If so, there are no more results.
        if (shouldStop) {
          if (hasFilterRow) {
            filter.filterRowCells((List<Cell>) results);
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Check if rowkey filter wants to exclude this row. If so, loop to next.
        // Technically, if we hit limits before on this row, we don't need this call.
        if (filterRowKey(current)) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          // early check, see HBASE-16296
          if (isFilterDoneInternal()) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          // Typically the count of rows scanned is incremented inside #populateResult. However,
          // here we are filtering a row based purely on its row key, preventing us from calling
          // #populateResult. Thus, perform the necessary increment here to rows scanned metric
          incrementCountOfRowsScannedMetric(scannerContext);
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          results.clear();

          // Read nothing as the rowkey was filtered, but still need to check time limit.
          // We also check size limit because we might have read blocks in getting to this point.
          if (scannerContext.checkAnyLimitReached(limitScope)) {
            return true;
          }
          continue;
        }

        // Ok, we are good, let's try to get some results from the main heap.
        populateResult(results, this.storeHeap, scannerContext, current);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          if (hasFilterRow) {
            throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
          }
          return true;
        }

        // Check for thread interrupt status in case we have been signaled from
        // #interruptRegionOperation.
        region.checkInterrupt();

        Cell nextKv = this.storeHeap.peek();
        shouldStop = shouldStop(nextKv);
        // Save whether the row was empty before filters were applied to it.
        final boolean isEmptyRow = results.isEmpty();

        // We have the part of the row necessary for filtering (all of it, usually).
        // First filter with the filterRow(List).
        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
        if (hasFilterRow) {
          ret = filter.filterRowCellsWithRet((List<Cell>) results);

          // We don't know how the results have changed after being filtered. Must set progress
          // according to contents of results now.
          if (scannerContext.getKeepProgress()) {
            scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
              initialHeapSizeProgress);
          } else {
            scannerContext.clearProgress();
          }
          scannerContext.incrementBatchProgress(results.size());
          for (ExtendedCell cell : (List<ExtendedCell>) results) {
            scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
              cell.heapSize());
          }
        }

        if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          results.clear();
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // This row was totally filtered out; if this is NOT the last row,
          // we should continue on. Otherwise, nothing else to do.
          if (!shouldStop) {
            // Read nothing as the cells were filtered, but still need to check time limit.
            // We also check size limit because we might have read blocks in getting to this point.
            if (scannerContext.checkAnyLimitReached(limitScope)) {
              return true;
            }
            continue;
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Ok, we are done with storeHeap for this row.
        // Now we may need to fetch additional, non-essential data into row.
        // These values are not needed for filter to work, so we postpone their
        // fetch to (possibly) reduce amount of data loads from disk.
        if (this.joinedHeap != null) {
          boolean mayHaveData = joinedHeapMayHaveData(current);
          if (mayHaveData) {
            joinedContinuationRow = current;
            populateFromJoinedHeap(results, scannerContext);

            if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
              return true;
            }
          }
        }
      } else {
        // Populating from the joined heap was stopped by limits, populate some more.
        populateFromJoinedHeap(results, scannerContext);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          return true;
        }
      }
      // We may have just called populateFromJoinedHeap and hit the limits. If that is
      // the case, we need to call it again on the next next() invocation.
      if (joinedContinuationRow != null) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // Finally, we are done with both joinedHeap and storeHeap.
      // Double check to prevent empty rows from appearing in result. It could be
      // the case when SingleColumnValueExcludeFilter is used.
      if (results.isEmpty()) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        boolean moreRows = nextRow(scannerContext, current);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        if (!shouldStop) {
          // We check size limit because we might have read blocks in the nextRow call above, or
          // in the populateResult call. Only scans with hasFilterRow should reach this point,
          // and for those scans which filter row _cells_ this is the only place we can actually
          // enforce that the scan does not exceed limits, since it bypasses all other checks
          // above.
          if (scannerContext.checkSizeLimit(limitScope)) {
            return true;
          }
          continue;
        }
      }

      if (shouldStop) {
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      } else {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }
    }
  }

  private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
    region.filteredReadRequestsCount.increment();
    if (region.getMetrics() != null) {
      region.getMetrics().updateFilteredRecords();
    }

    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics()
      .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME, 1);
  }

  private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics()
      .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 1);
  }

  /** Returns true when the joined heap may have data for the current row */
  private boolean joinedHeapMayHaveData(ExtendedCell currentRowCell) throws IOException {
    Cell nextJoinedKv = joinedHeap.peek();
    boolean matchCurrentRow =
      nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell);
    boolean matchAfterSeek = false;

    // If the next value in the joined heap does not match the current row, try to seek to the
    // correct row
    if (!matchCurrentRow) {
      ExtendedCell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell);
      boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
      matchAfterSeek = seekSuccessful && joinedHeap.peek() != null
        && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell);
    }

    return matchCurrentRow || matchAfterSeek;
  }

  /**
   * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines
   * both filterRow and filterRow({@code List<KeyValue> kvs}) functions. With 0.94 code or older,
   * hasFilterRow may not be implemented as HBASE-6429 expects, because in 0.94 hasFilterRow()
   * only returns true when filterRow({@code List<KeyValue> kvs}) is overridden, not filterRow().
   * Therefore, the filterRow() will be skipped.
   */
  private boolean filterRow() throws IOException {
    // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
    // filterRowCells(List<Cell> kvs) so we skip that scenario here.
    return filter != null && (!filter.hasFilterRow()) && filter.filterRow();
  }

  private boolean filterRowKey(Cell current) throws IOException {
    return filter != null && filter.filterRowKey(current);
  }

  /**
   * A mocked list implementation - discards all updates.
   */
  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {

    @Override
    public void add(int index, Cell element) {
      // do nothing
    }

    @Override
    public boolean addAll(int index, Collection<? extends Cell> c) {
      return false; // this list is never changed as a result of an update
    }

    @Override
    public KeyValue get(int index) {
      throw new UnsupportedOperationException();
    }

    @Override
    public int size() {
      return 0;
    }
  };
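
  // nextRow (below) advances the store heap past every remaining cell of the current row. It
  // feeds those cells into MOCKED_LIST so they are never materialized; skipping-row mode on the
  // ScannerContext likewise suspends all limit tracking except block size, since even a skipped
  // row may pull blocks off disk (editor's summary of the inline comments that follow).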

  protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
    assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";

    // Enable skipping row mode, which disables limits and skips tracking progress for all
    // but block size. We keep tracking block size because skipping a row in this way
    // might involve reading blocks along the way.
    scannerContext.setSkippingRow(true);

    Cell next;
    while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();
      this.storeHeap.next(MOCKED_LIST, scannerContext);
    }

    scannerContext.setSkippingRow(false);
    resetFilters();

    // Calling the hook in CP which allows it to do a fast forward
    return this.region.getCoprocessorHost() == null
      || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
  }

  protected boolean shouldStop(Cell currentRowCell) {
    if (currentRowCell == null) {
      return true;
    }
    if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {
      return false;
    }
    int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length);
    return c > 0 || (c == 0 && !includeStopRow);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "this method is only called inside close which is synchronized")
  private void closeInternal() {
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
    }
    if (joinedHeap != null) {
      joinedHeap.close();
      joinedHeap = null;
    }
    // no need to synchronize here.
    scannerReadPoints.remove(this);
    this.filterClosed = true;
  }

  @Override
  public synchronized void close() {
    TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
  }

  @Override
  public synchronized boolean reseek(byte[] row) throws IOException {
    return TraceUtil.trace(() -> {
      if (row == null) {
        throw new IllegalArgumentException("Row cannot be null.");
      }
      boolean result = false;
      region.startRegionOperation();
      ExtendedCell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
      try {
        // use request seek to make use of the lazy seek option. See HBASE-5520
        result = this.storeHeap.requestSeek(kv, true, true);
        if (this.joinedHeap != null) {
          result = this.joinedHeap.requestSeek(kv, true, true) || result;
        }
      } finally {
        region.closeRegionOperation();
      }
      return result;
    }, () -> region.createRegionSpan("RegionScanner.reseek"));
  }

  @Override
  public void shipped() throws IOException {
    if (storeHeap != null) {
      storeHeap.shipped();
    }
    if (joinedHeap != null) {
      joinedHeap.shipped();
    }
  }

  @Override
  public void run() throws IOException {
    // This is the RPC callback method that gets executed; we close the scanner in this callback.
    this.close();
  }
}