/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
 */
@InterfaceAudience.Private
public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {

  private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class);

  // Package local for testability
  KeyValueHeap storeHeap = null;

  /**
   * Heap of key-values that are not essential for the provided filters and are thus read on
   * demand, if on-demand column family loading is enabled.
   */
  KeyValueHeap joinedHeap = null;

  /**
   * If the joined heap data gathering is interrupted due to scan limits, this will contain the
   * row for which we are populating the values.
   */
  protected ExtendedCell joinedContinuationRow = null;
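  // Illustrative sketch (not part of the original source): the joined heap only comes into play
  // when the client opts into on-demand column family loading and the scan's filter declares
  // some families non-essential. Assuming a table with families "e" (consulted by the filter)
  // and "d" (not consulted), a client-side setup might look like:
  //
  //   Scan scan = new Scan();
  //   scan.addFamily(Bytes.toBytes("e"));
  //   scan.addFamily(Bytes.toBytes("d"));
  //   scan.setLoadColumnFamiliesOnDemand(true); // route non-essential families to joinedHeap
  //   scan.setFilter(filter); // a Filter whose isFamilyEssential(family) is false for "d"
  //
  // With that setup, initializeScanners below places the "d" scanner on joinedScanners, so its
  // data is fetched only for rows that survive the filter.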
  private boolean filterClosed = false;

  protected final byte[] stopRow;
  protected final boolean includeStopRow;
  protected final HRegion region;
  protected final CellComparator comparator;

  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  private final long readPt;
  private final long maxResultSize;
  private final ScannerContext defaultScannerContext;
  private final FilterWrapper filter;
  private final String operationId;

  private RegionServerServices rsServices;

  @Override
  public RegionInfo getRegionInfo() {
    return region.getRegionInfo();
  }

  private static boolean hasNonce(HRegion region, long nonce) {
    RegionServerServices rsServices = region.getRegionServerServices();
    return nonce != HConstants.NO_NONCE && rsServices != null
      && rsServices.getNonceManager() != null;
  }

  RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
    long nonceGroup, long nonce) throws IOException {
    this.region = region;
    this.maxResultSize = scan.getMaxResultSize();
    if (scan.hasFilter()) {
      this.filter = new FilterWrapper(scan.getFilter());
    } else {
      this.filter = null;
    }
    this.comparator = region.getCellComparator();
    /**
     * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
     * scanner context that can be used to enforce the batch limit in the event that a
     * ScannerContext is not specified during an invocation of next/nextRaw.
     */
    defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
    this.stopRow = scan.getStopRow();
    this.includeStopRow = scan.includeStopRow();
    this.operationId = scan.getId();

    // Synchronize on scannerReadPoints so that nobody calculates getSmallestReadPoint
    // before scannerReadPoints is updated.
    IsolationLevel isolationLevel = scan.getIsolationLevel();
    long mvccReadPoint = ClientInternalHelper.getMvccReadPoint(scan);
    this.scannerReadPoints = region.scannerReadPoints;
    this.rsServices = region.getRegionServerServices();
    region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    try {
      if (mvccReadPoint > 0) {
        this.readPt = mvccReadPoint;
      } else if (hasNonce(region, nonce)) {
        this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
      } else {
        this.readPt = region.getReadPoint(isolationLevel);
      }
      scannerReadPoints.put(this, this.readPt);
    } finally {
      region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    }
    initializeScanners(scan, additionalScanners);
  }

  public ScannerContext getContext() {
    return defaultScannerContext;
  }
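  // Illustrative sketch (not part of the original source): the defaultScannerContext built in
  // the constructor is what enforces Scan#setBatch for callers that use the plain next(List)
  // overload. For example, a client issuing:
  //
  //   Scan scan = new Scan();
  //   scan.setBatch(100); // at most 100 cells per next() call
  //
  // yields a defaultScannerContext whose batch limit causes next(outResults) to return true
  // mid-row (state BATCH_LIMIT_REACHED) once 100 cells of a wide row have been accumulated.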
  private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
    throws IOException {
    // Here we separate all scanners into two lists - scanners that provide data required
    // by the filter to operate (scanners list) and all others (joinedScanners list).
    List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());
    List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());
    // Store all already instantiated scanners for exception handling.
    List<KeyValueScanner> instantiatedScanners = new ArrayList<>();
    // Handle additionalScanners.
    if (additionalScanners != null && !additionalScanners.isEmpty()) {
      scanners.addAll(additionalScanners);
      instantiatedScanners.addAll(additionalScanners);
    }

    try {
      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
        HStore store = region.getStore(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
        instantiatedScanners.add(scanner);
        if (
          this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
            || this.filter.isFamilyEssential(entry.getKey())
        ) {
          scanners.add(scanner);
        } else {
          joinedScanners.add(scanner);
        }
      }
      initializeKVHeap(scanners, joinedScanners, region);
    } catch (Throwable t) {
      throw handleException(instantiatedScanners, t);
    }
  }

  protected void initializeKVHeap(List<KeyValueScanner> scanners,
    List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
    this.storeHeap = new KeyValueHeap(scanners, comparator);
    if (!joinedScanners.isEmpty()) {
      this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
    }
  }
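  // Illustrative sketch (not part of the original source): a filter opts a family out of the
  // essential list by overriding Filter#isFamilyEssential. A hypothetical filter like the one
  // below would send the "d" family's scanner to joinedScanners above:
  //
  //   public class EssentialOnlyFilter extends FilterBase {
  //     @Override
  //     public boolean isFamilyEssential(byte[] name) {
  //       return !Bytes.equals(name, Bytes.toBytes("d")); // "d" is loaded on demand
  //     }
  //     // ... the filtering callbacks consult only the essential families ...
  //   }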
  private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) {
    // Remove the scanner read point before throwing the exception.
    scannerReadPoints.remove(this);
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
      if (joinedHeap != null) {
        joinedHeap.close();
        joinedHeap = null;
      }
    } else {
      // Close all already instantiated scanners before throwing the exception.
      for (KeyValueScanner scanner : instantiatedScanners) {
        scanner.close();
      }
    }
    return t instanceof IOException ? (IOException) t : new IOException(t);
  }

  @Override
  public long getMaxResultSize() {
    return maxResultSize;
  }

  @Override
  public long getMvccReadPoint() {
    return this.readPt;
  }

  @Override
  public int getBatch() {
    return this.defaultScannerContext.getBatchLimit();
  }

  @Override
  public String getOperationId() {
    return operationId;
  }

  /**
   * Reset both the filter and the old filter.
   * @throws IOException in case a filter raises an I/O exception.
   */
  protected final void resetFilters() throws IOException {
    if (filter != null) {
      filter.reset();
    }
  }

  @Override
  public boolean next(List<? super ExtendedCell> outResults) throws IOException {
    // Apply the batching limit by default.
    return next(outResults, defaultScannerContext);
  }

  @Override
  public synchronized boolean next(List<? super ExtendedCell> outResults,
    ScannerContext scannerContext) throws IOException {
    if (this.filterClosed) {
      throw new UnknownScannerException("Scanner was closed (timed out?) "
        + "after we renewed it. Could be caused by a very slow scanner "
        + "or a lengthy garbage collection");
    }
    region.startRegionOperation(Operation.SCAN);
    try {
      return nextRaw(outResults, scannerContext);
    } finally {
      region.closeRegionOperation(Operation.SCAN);
    }
  }

  @Override
  public boolean nextRaw(List<? super ExtendedCell> outResults) throws IOException {
    // Use the RegionScanner's context by default.
    return nextRaw(outResults, defaultScannerContext);
  }

  @Override
  public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext scannerContext)
    throws IOException {
    if (storeHeap == null) {
      // Scanner is closed.
      throw new UnknownScannerException("Scanner was closed");
    }
    boolean moreValues = false;
    if (outResults.isEmpty()) {
      // Usually outResults is empty. This is true when next is called
      // to handle a scan or get operation.
      moreValues = nextInternal(outResults, scannerContext);
    } else {
      List<ExtendedCell> tmpList = new ArrayList<>();
      moreValues = nextInternal(tmpList, scannerContext);
      outResults.addAll(tmpList);
    }
    region.addReadRequestsCount(1);
    if (region.getMetrics() != null) {
      region.getMetrics().updateReadRequestCount();
    }

    // If the size limit was reached it means a partial Result is being returned. Returning a
    // partial Result means that we should not reset the filters; filters should only be reset
    // in between rows.
    if (!scannerContext.mayHaveMoreCellsInRow()) {
      resetFilters();
    }

    if (isFilterDoneInternal()) {
      moreValues = false;
    }
    return moreValues;
  }

  /** Returns true if more cells exist after this batch, false if scanner is done */
  private boolean populateFromJoinedHeap(List<? super ExtendedCell> results,
    ScannerContext scannerContext) throws IOException {
    assert joinedContinuationRow != null;
    boolean moreValues =
      populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow);

    if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
      // We are done with this row, reset the continuation.
      joinedContinuationRow = null;
    }
    // As the data is obtained from two independent heaps, we need to
    // ensure that the result list is sorted, because Result relies on that.
    ((List<Cell>) results).sort(comparator);
    return moreValues;
  }
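  // Illustrative sketch (not part of the original source): when populateResult below stops
  // mid-row on a size limit, the client observes a partial Result. Assuming a plain client-side
  // consumer, the flow looks roughly like:
  //
  //   Scan scan = new Scan();
  //   scan.setMaxResultSize(2 * 1024 * 1024); // server may return rows in size-bounded chunks
  //   try (ResultScanner rs = table.getScanner(scan)) {
  //     for (Result r : rs) {
  //       // With scan.setAllowPartialResults(true), r.mayHaveMoreCellsInRow() signals that the
  //       // row continues in the next Result; by default the client stitches rows back together.
  //     }
  //   }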
  /**
   * Fetches records with currentRow into the results list, until the next row, the batchLimit
   * (if not -1), or the remainingResultSize (if not -1) is reached.
   * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before
   *             this call.
   * @return state of last call to {@link KeyValueHeap#next()}
   */
  private boolean populateResult(List<? super ExtendedCell> results, KeyValueHeap heap,
    ScannerContext scannerContext, ExtendedCell currentRowCell) throws IOException {
    Cell nextKv;
    boolean moreCellsInRow = false;
    boolean tmpKeepProgress = scannerContext.getKeepProgress();
    // Scanning between column families and thus the scope is between cells.
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;
    do {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // We want to maintain any progress that is made towards the limits while scanning across
      // different column families. To do this, we toggle the keep progress flag on during calls
      // to the StoreScanner to ensure that any progress made thus far is not wiped away.
      scannerContext.setKeepProgress(true);
      heap.next(results, scannerContext);
      scannerContext.setKeepProgress(tmpKeepProgress);

      nextKv = heap.peek();
      moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
      if (!moreCellsInRow) {
        incrementCountOfRowsScannedMetric(scannerContext);
      }
      if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
        return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
      } else if (scannerContext.checkSizeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      } else if (scannerContext.checkTimeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      }
    } while (moreCellsInRow);
    return nextKv != null;
  }

  /**
   * Based on the nextKv in the heap and the current row, decide whether or not there are more
   * cells to be read in the heap. If the row of the nextKv in the heap matches the current row
   * then there are more cells to be read in the row.
   * @return true when there are more cells in the row to be read
   */
  private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
    return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);
  }

  /** Returns true if a filter rules that the scanner is done. */
  @Override
  public synchronized boolean isFilterDone() throws IOException {
    return isFilterDoneInternal();
  }

  private boolean isFilterDoneInternal() throws IOException {
    return this.filter != null && this.filter.filterAllRemaining();
  }

  private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException {
    if (rpcCall.isPresent()) {
      // If a user specifies a too-restrictive or too-slow scanner, the
      // client might time out and disconnect while the server side
      // is still processing the request. We should abort aggressively
      // in that case.
      long afterTime = rpcCall.get().disconnectSince();
      if (afterTime >= 0) {
        throw new CallerDisconnectedException(
          "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this
            + " after " + afterTime + " ms, since " + "caller disconnected");
      }
    }
  }

  private void resetProgress(ScannerContext scannerContext, int initialBatchProgress,
    long initialSizeProgress, long initialHeapSizeProgress) {
    // Starting to scan a new row. Reset the scanner progress according to whether or not
    // progress should be kept.
    if (scannerContext.getKeepProgress()) {
      // Progress should be kept. Reset to initial values seen at start of method invocation.
      scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
    } else {
      scannerContext.clearProgress();
    }
  }
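  // Illustrative sketch (not part of the original source): nextInternal below drives one call of
  // the typical server-side consumption loop, which in simplified form is:
  //
  //   List<ExtendedCell> cells = new ArrayList<>();
  //   boolean moreRows;
  //   do {
  //     cells.clear();
  //     moreRows = scanner.nextRaw(cells, scannerContext); // one row (or partial row) per call
  //     // ... serialize cells into the RPC response ...
  //   } while (moreRows && !responseFull);
  //
  // where scannerContext carries the batch/size/time limits negotiated with the client and
  // "responseFull" stands in for the RPC layer's own quota checks.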
  private boolean nextInternal(List<? super ExtendedCell> results, ScannerContext scannerContext)
    throws IOException {
    Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list");
    Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null");
    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();

    // Save the initial progress from the ScannerContext in these local variables. The progress
    // may need to be reset a few times if rows are being filtered out, so we save the initial
    // progress.
    int initialBatchProgress = scannerContext.getBatchProgress();
    long initialSizeProgress = scannerContext.getDataSizeProgress();
    long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();

    // Used to check the time limit.
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;

    // The loop here is used only when at some point during the next we determine
    // that due to effects of filters or otherwise, we have an empty row in the result.
    // Then we loop and try again. Otherwise, we must get out on the first iteration via return:
    // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
    // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow)).
    while (true) {
      resetProgress(scannerContext, initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
      checkClientDisconnect(rpcCall);

      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // Let's see what we have in the storeHeap.
      ExtendedCell current = this.storeHeap.peek();

      boolean shouldStop = shouldStop(current);
      // When hasFilterRow is true, it means that all the cells for a particular row must be read
      // before a filtering decision can be made. This means that filters where hasFilterRow is
      // true run the risk of encountering out-of-memory errors when they are applied to a table
      // that has very large rows.
      boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

      // If filter#hasFilterRow is true, partial results are not allowed since allowing them
      // would prevent the filters from being evaluated. Thus, if it is true, change the
      // scope of any limits that could potentially create partial results to
      // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row.
      if (hasFilterRow) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
            + " formed. Changing scope of limits that may create partials");
        }
        scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
        scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
        limitScope = LimitScope.BETWEEN_ROWS;
      }

      if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
        if (hasFilterRow) {
          throw new IncompatibleFilterException(
            "Filter whose hasFilterRow() returns true is incompatible with scans that must "
              + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
        }
        return true;
      }
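      // Illustrative sketch (not part of the original source): row-level filters are what
      // trigger the scope widening above. For example, a hypothetical count-limiting filter in
      // the style of PageFilter overrides both row hooks:
      //
      //   public class RowCountFilter extends FilterBase {
      //     private int seen = 0;
      //     @Override
      //     public boolean hasFilterRow() {
      //       return true; // forces whole-row evaluation; disables mid-row partials
      //     }
      //     @Override
      //     public boolean filterRow() {
      //       return ++seen > 10; // exclude every row after the first 10
      //     }
      //   }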
      // Check if we were getting data from the joinedHeap and hit the limit.
      // If not, then it's the main path - getting results from the storeHeap.
      if (joinedContinuationRow == null) {
        // First, check if we are at a stop row. If so, there are no more results.
        if (shouldStop) {
          if (hasFilterRow) {
            filter.filterRowCells((List<Cell>) results);
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Check if the rowkey filter wants to exclude this row. If so, loop to next.
        // Technically, if we hit limits before on this row, we don't need this call.
        if (filterRowKey(current)) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          // early check, see HBASE-16296
          if (isFilterDoneInternal()) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          // Typically the count of rows scanned is incremented inside #populateResult. However,
          // here we are filtering a row based purely on its row key, preventing us from calling
          // #populateResult. Thus, perform the necessary increment here for the rows scanned
          // metric.
          incrementCountOfRowsScannedMetric(scannerContext);
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          results.clear();

          // Read nothing as the rowkey was filtered, but still need to check the time limit.
          // We also check the size limit because we might have read blocks in getting to this
          // point.
          if (scannerContext.checkAnyLimitReached(limitScope)) {
            return true;
          }
          continue;
        }

        // Ok, we are good, let's try to get some results from the main heap.
        populateResult(results, this.storeHeap, scannerContext, current);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          if (hasFilterRow) {
            throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
          }
          return true;
        }

        // Check for thread interrupt status in case we have been signaled from
        // #interruptRegionOperation.
        region.checkInterrupt();

        Cell nextKv = this.storeHeap.peek();
        shouldStop = shouldStop(nextKv);
        // Save whether the row was empty before filters were applied to it.
        final boolean isEmptyRow = results.isEmpty();

        // We have the part of the row necessary for filtering (all of it, usually).
        // First filter with the filterRow(List).
        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
        if (hasFilterRow) {
          ret = filter.filterRowCellsWithRet((List<Cell>) results);

          // We don't know how the results have changed after being filtered. Must set progress
          // according to the contents of results now.
          if (scannerContext.getKeepProgress()) {
            scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
              initialHeapSizeProgress);
          } else {
            scannerContext.clearProgress();
          }
          scannerContext.incrementBatchProgress(results.size());
          for (ExtendedCell cell : (List<ExtendedCell>) results) {
            scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
              cell.heapSize());
          }
        }

        if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          results.clear();
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // This row was totally filtered out; if this is NOT the last row,
          // we should continue on. Otherwise, there is nothing else to do.
          if (!shouldStop) {
            // Read nothing as the cells were filtered, but still need to check the time limit.
            // We also check the size limit because we might have read blocks in getting to this
            // point.
            if (scannerContext.checkAnyLimitReached(limitScope)) {
              return true;
            }
            continue;
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Ok, we are done with the storeHeap for this row.
        // Now we may need to fetch additional, non-essential data into the row.
        // These values are not needed for the filter to work, so we postpone their
        // fetch to (possibly) reduce the amount of data loaded from disk.
        if (this.joinedHeap != null) {
          boolean mayHaveData = joinedHeapMayHaveData(current);
          if (mayHaveData) {
            joinedContinuationRow = current;
            populateFromJoinedHeap(results, scannerContext);

            if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
              return true;
            }
          }
        }
      } else {
        // Populating from the joined heap was stopped by limits, populate some more.
        populateFromJoinedHeap(results, scannerContext);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          return true;
        }
      }
      // We may have just called populateFromJoinedHeap and hit the limits. If that is
      // the case, we need to call it again on the next next() invocation.
      if (joinedContinuationRow != null) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // Finally, we are done with both the joinedHeap and the storeHeap.
      // Double check to prevent empty rows from appearing in the result. It could be
      // the case when SingleColumnValueExcludeFilter is used.
      if (results.isEmpty()) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        boolean moreRows = nextRow(scannerContext, current);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        if (!shouldStop) {
          // We check the size limit because we might have read blocks in the nextRow call above,
          // or in the populateResult call. Only scans with hasFilterRow should reach this point,
          // and for those scans which filter row _cells_ this is the only place we can actually
          // enforce that the scan does not exceed limits, since it bypasses all other checks
          // above.
          if (scannerContext.checkSizeLimit(limitScope)) {
            return true;
          }
          continue;
        }
      }

      if (shouldStop) {
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      } else {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }
    }
  }

  private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
    region.filteredReadRequestsCount.increment();
    if (region.getMetrics() != null) {
      region.getMetrics().updateFilteredRecords();
    }

    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics()
      .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME, 1);
  }

  private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics()
      .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 1);
  }

  /** Returns true when the joined heap may have data for the current row */
  private boolean joinedHeapMayHaveData(ExtendedCell currentRowCell) throws IOException {
    Cell nextJoinedKv = joinedHeap.peek();
    boolean matchCurrentRow =
      nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell);
    boolean matchAfterSeek = false;

    // If the next value in the joined heap does not match the current row, try to seek to the
    // correct row.
    if (!matchCurrentRow) {
      ExtendedCell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell);
      boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
      matchAfterSeek = seekSuccessful && joinedHeap.peek() != null
        && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell);
    }

    return matchCurrentRow || matchAfterSeek;
  }

  /**
   * This function is here to maintain backward compatibility for 0.94 filters. HBASE-6429
   * combines both the filterRow() and filterRow({@code List<KeyValue> kvs}) functions. Code from
   * 0.94 or older may not implement hasFilterRow() as HBASE-6429 expects, because in 0.94
   * hasFilterRow() only returns true when filterRow({@code List<KeyValue> kvs}) is overridden,
   * not when filterRow() is. For such filters, filterRow() would otherwise be skipped.
   */
  private boolean filterRow() throws IOException {
    // When hasFilterRow returns true, filter.filterRow() will be called automatically inside
    // filterRowCells(List<Cell> kvs), so we skip that scenario here.
    return filter != null && (!filter.hasFilterRow()) && filter.filterRow();
  }

  private boolean filterRowKey(Cell current) throws IOException {
    return filter != null && filter.filterRowKey(current);
  }
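  // Illustrative sketch (not part of the original source): the compatibility path above targets
  // legacy filters written in the pre-HBASE-6429 style, e.g. a hypothetical:
  //
  //   public class LegacyRowFilter extends FilterBase {
  //     private boolean excludeRow;
  //     @Override
  //     public boolean filterRow() {
  //       return excludeRow; // row-level decision, but hasFilterRow() is not overridden
  //     }
  //   }
  //
  // Because hasFilterRow() stays false for such a filter, filterRowCells(...) never invokes
  // filterRow() for it, and the explicit filterRow() check above is what honors its decision.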
  /**
   * A mocked list implementation - discards all updates.
   */
  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {

    @Override
    public void add(int index, Cell element) {
      // do nothing
    }

    @Override
    public boolean addAll(int index, Collection<? extends Cell> c) {
      return false; // this list is never changed as a result of an update
    }

    @Override
    public KeyValue get(int index) {
      throw new UnsupportedOperationException();
    }

    @Override
    public int size() {
      return 0;
    }
  };

  protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
    assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";

    // Enable skipping row mode, which disables limits and skips tracking progress for all
    // but block size. We keep tracking block size because skipping a row in this way
    // might involve reading blocks along the way.
    scannerContext.setSkippingRow(true);

    Cell next;
    while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();
      this.storeHeap.next(MOCKED_LIST, scannerContext);
    }

    scannerContext.setSkippingRow(false);
    resetFilters();

    // Calling the hook in the CP which allows it to do a fast forward.
    return this.region.getCoprocessorHost() == null
      || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
  }

  protected boolean shouldStop(Cell currentRowCell) {
    if (currentRowCell == null) {
      return true;
    }
    if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {
      return false;
    }
    int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length);
    return c > 0 || (c == 0 && !includeStopRow);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "this method is only called inside close which is synchronized")
  private void closeInternal() {
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
    }
    if (joinedHeap != null) {
      joinedHeap.close();
      joinedHeap = null;
    }
    // no need to synchronize here.
    scannerReadPoints.remove(this);
    this.filterClosed = true;
  }

  @Override
  public synchronized void close() {
    TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
  }

  @Override
  public synchronized boolean reseek(byte[] row) throws IOException {
    return TraceUtil.trace(() -> {
      if (row == null) {
        throw new IllegalArgumentException("Row cannot be null.");
      }
      boolean result = false;
      region.startRegionOperation();
      ExtendedCell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
      try {
        // use request seek to make use of the lazy seek option. See HBASE-5520
        result = this.storeHeap.requestSeek(kv, true, true);
        if (this.joinedHeap != null) {
          result = this.joinedHeap.requestSeek(kv, true, true) || result;
        }
      } finally {
        region.closeRegionOperation();
      }
      return result;
    }, () -> region.createRegionSpan("RegionScanner.reseek"));
  }

  @Override
  public void shipped() throws IOException {
    if (storeHeap != null) {
      storeHeap.shipped();
    }
    if (joinedHeap != null) {
      joinedHeap.shipped();
    }
  }

  @Override
  public void run() throws IOException {
    // This is the RPC callback method executed. We do the close of the scanner in this callback.
    this.close();
  }
}
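// Illustrative sketch (not part of the original source): a RegionScannerImpl is normally
// obtained through HRegion rather than constructed directly. Inside a coprocessor or other
// server-side code with access to a Region, usage looks roughly like:
//
//   Scan scan = new Scan().withStartRow(startRow).withStopRow(stopRow);
//   try (RegionScanner scanner = region.getScanner(scan)) {
//     List<Cell> cells = new ArrayList<>();
//     boolean moreRows;
//     do {
//       cells.clear();
//       moreRows = scanner.next(cells); // enforces the Scan's batch limit by default
//       // ... process one row's cells ...
//     } while (moreRows);
//   }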