/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
 */
@InterfaceAudience.Private
class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {

  private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class);

  // Package local for testability
  KeyValueHeap storeHeap = null;

  /**
   * Heap of key-values that are not essential for the provided filters and are thus read on
   * demand, if on-demand column family loading is enabled.
   */
  KeyValueHeap joinedHeap = null;

  /**
   * If the joined heap data gathering is interrupted due to scan limits, this will contain the
   * row for which we are populating the values.
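   * On the next invocation, {@link #populateFromJoinedHeap(List, ScannerContext)} resumes filling
   * in the non-essential cells for this row; it is cleared once the row is complete.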
   */
  protected ExtendedCell joinedContinuationRow = null;
  private boolean filterClosed = false;

  protected final byte[] stopRow;
  protected final boolean includeStopRow;
  protected final HRegion region;
  protected final CellComparator comparator;

  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  private final long readPt;
  private final long maxResultSize;
  private final ScannerContext defaultScannerContext;
  private final FilterWrapper filter;
  private final String operationId;

  private RegionServerServices rsServices;

  @Override
  public RegionInfo getRegionInfo() {
    return region.getRegionInfo();
  }

  private static boolean hasNonce(HRegion region, long nonce) {
    RegionServerServices rsServices = region.getRegionServerServices();
    return nonce != HConstants.NO_NONCE && rsServices != null
      && rsServices.getNonceManager() != null;
  }

  RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
    long nonceGroup, long nonce) throws IOException {
    this.region = region;
    this.maxResultSize = scan.getMaxResultSize();
    if (scan.hasFilter()) {
      this.filter = new FilterWrapper(scan.getFilter());
    } else {
      this.filter = null;
    }
    this.comparator = region.getCellComparator();
    /**
     * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
     * scanner context that can be used to enforce the batch limit in the event that a
     * ScannerContext is not specified during an invocation of next/nextRaw.
     */
    defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
    this.stopRow = scan.getStopRow();
    this.includeStopRow = scan.includeStopRow();
    this.operationId = scan.getId();

    // Take the read-point recording lock so that nobody calculates getSmallestReadPoint
    // before scannerReadPoints is updated.
    IsolationLevel isolationLevel = scan.getIsolationLevel();
    long mvccReadPoint = ClientInternalHelper.getMvccReadPoint(scan);
    this.scannerReadPoints = region.scannerReadPoints;
    this.rsServices = region.getRegionServerServices();
    region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    try {
      if (mvccReadPoint > 0) {
        this.readPt = mvccReadPoint;
      } else if (hasNonce(region, nonce)) {
        this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
      } else {
        this.readPt = region.getReadPoint(isolationLevel);
      }
      scannerReadPoints.put(this, this.readPt);
    } finally {
      region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    }
    initializeScanners(scan, additionalScanners);
  }

  private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
    throws IOException {
    // Here we separate all scanners into two lists - scanners that provide data required
    // by the filter to operate (the scanners list) and all others (the joinedScanners list).
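    // For example, with on-demand column family loading enabled and a SingleColumnValueFilter
    // on family 'cf1', only 'cf1' is essential: its scanner feeds the storeHeap, while scanners
    // for the remaining families go to the joinedHeap and are read only for matching rows.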
    List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());
    List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());
    // Store all already instantiated scanners for exception handling
    List<KeyValueScanner> instantiatedScanners = new ArrayList<>();
    // handle additionalScanners
    if (additionalScanners != null && !additionalScanners.isEmpty()) {
      scanners.addAll(additionalScanners);
      instantiatedScanners.addAll(additionalScanners);
    }

    try {
      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
        HStore store = region.getStore(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
        instantiatedScanners.add(scanner);
        if (
          this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
            || this.filter.isFamilyEssential(entry.getKey())
        ) {
          scanners.add(scanner);
        } else {
          joinedScanners.add(scanner);
        }
      }
      initializeKVHeap(scanners, joinedScanners, region);
    } catch (Throwable t) {
      throw handleException(instantiatedScanners, t);
    }
  }

  protected void initializeKVHeap(List<KeyValueScanner> scanners,
    List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
    this.storeHeap = new KeyValueHeap(scanners, comparator);
    if (!joinedScanners.isEmpty()) {
      this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
    }
  }

  private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) {
    // Remove the scanner read point before throwing the exception
    scannerReadPoints.remove(this);
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
      if (joinedHeap != null) {
        joinedHeap.close();
        joinedHeap = null;
      }
    } else {
      // close all already instantiated scanners before throwing the exception
      for (KeyValueScanner scanner : instantiatedScanners) {
        scanner.close();
      }
    }
    return t instanceof IOException ? (IOException) t : new IOException(t);
  }

  @Override
  public long getMaxResultSize() {
    return maxResultSize;
  }

  @Override
  public long getMvccReadPoint() {
    return this.readPt;
  }

  @Override
  public int getBatch() {
    return this.defaultScannerContext.getBatchLimit();
  }

  @Override
  public String getOperationId() {
    return operationId;
  }

  /**
   * Reset the filter, if any.
   * @throws IOException in case a filter raises an I/O exception.
   */
  protected final void resetFilters() throws IOException {
    if (filter != null) {
      filter.reset();
    }
  }

  @Override
  public boolean next(List<? super ExtendedCell> outResults) throws IOException {
    // apply the batching limit by default
    return next(outResults, defaultScannerContext);
  }

  @Override
  public synchronized boolean next(List<? super ExtendedCell> outResults,
    ScannerContext scannerContext) throws IOException {
    if (this.filterClosed) {
      throw new UnknownScannerException("Scanner was closed (timed out?) "
        + "after we renewed it. Could be caused by a very slow scanner "
        + "or a lengthy garbage collection");
    }
    region.startRegionOperation(Operation.SCAN);
    try {
      return nextRaw(outResults, scannerContext);
    } finally {
      region.closeRegionOperation(Operation.SCAN);
    }
  }

  @Override
  public boolean nextRaw(List<? super ExtendedCell> outResults) throws IOException {
    // Use the RegionScanner's context by default
    return nextRaw(outResults, defaultScannerContext);
  }

  @Override
  public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext scannerContext)
    throws IOException {
    if (storeHeap == null) {
      // scanner is closed
      throw new UnknownScannerException("Scanner was closed");
    }
    boolean moreValues = false;
    if (outResults.isEmpty()) {
      // Usually outResults is empty. This is true when next is called
      // to handle a scan or get operation.
      moreValues = nextInternal(outResults, scannerContext);
    } else {
      List<ExtendedCell> tmpList = new ArrayList<>();
      moreValues = nextInternal(tmpList, scannerContext);
      outResults.addAll(tmpList);
    }

    region.addReadRequestsCount(1);
    if (region.getMetrics() != null) {
      region.getMetrics().updateReadRequestCount();
    }

    // If the size limit was reached it means a partial Result is being returned. Returning a
    // partial Result means that we should not reset the filters; filters should only be reset
    // between rows.
    if (!scannerContext.mayHaveMoreCellsInRow()) {
      resetFilters();
    }

    if (isFilterDoneInternal()) {
      moreValues = false;
    }
    return moreValues;
  }

  /** Returns true if more cells exist after this batch, false if the scanner is done. */
  private boolean populateFromJoinedHeap(List<? super ExtendedCell> results,
    ScannerContext scannerContext) throws IOException {
    assert joinedContinuationRow != null;
    boolean moreValues =
      populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow);

    if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
      // We are done with this row, reset the continuation.
      joinedContinuationRow = null;
    }
    // As the data is obtained from two independent heaps, we need to
    // ensure that the result list is sorted, because Result relies on that.
    ((List<Cell>) results).sort(comparator);
    return moreValues;
  }

  /**
   * Fetches records with the current row into the results list, until the next row, the
   * batchLimit (if not -1), or the remainingResultSize (if not -1) is reached.
   * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before
   *             this call.
   * @return state of the last call to {@link KeyValueHeap#next()}
   */
  private boolean populateResult(List<? super ExtendedCell> results, KeyValueHeap heap,
    ScannerContext scannerContext, ExtendedCell currentRowCell) throws IOException {
    Cell nextKv;
    boolean moreCellsInRow = false;
    boolean tmpKeepProgress = scannerContext.getKeepProgress();
    // Scanning between column families and thus the scope is between cells
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;
    do {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // We want to maintain any progress that is made towards the limits while scanning across
      // different column families. To do this, we toggle the keep progress flag on during calls
      // to the StoreScanner to ensure that any progress made thus far is not wiped away.
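      // For example, if one family already contributed three cells toward a batch limit of five,
      // the next StoreScanner must resume from progress three rather than zero, or the row could
      // overshoot its batch limit. (Illustrative numbers, not values from the code.)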
      scannerContext.setKeepProgress(true);
      heap.next(results, scannerContext);
      scannerContext.setKeepProgress(tmpKeepProgress);

      nextKv = heap.peek();
      moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
      if (!moreCellsInRow) {
        incrementCountOfRowsScannedMetric(scannerContext);
      }
      if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
        return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
      } else if (scannerContext.checkSizeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      } else if (scannerContext.checkTimeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      }
    } while (moreCellsInRow);
    return nextKv != null;
  }

  /**
   * Based on the nextKv in the heap and the current row, decide whether or not there are more
   * cells to be read in the heap. If the row of the nextKv in the heap matches the current row,
   * then there are more cells to be read in the row.
   * @return true when there are more cells in the row to be read
   */
  private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
    return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);
  }

  /** Returns true if a filter rules that the scan is done. */
  @Override
  public synchronized boolean isFilterDone() throws IOException {
    return isFilterDoneInternal();
  }

  private boolean isFilterDoneInternal() throws IOException {
    return this.filter != null && this.filter.filterAllRemaining();
  }

  private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException {
    if (rpcCall.isPresent()) {
      // If a user specifies a too-restrictive or too-slow scanner, the
      // client might time out and disconnect while the server side
      // is still processing the request. We should abort aggressively
      // in that case.
      long afterTime = rpcCall.get().disconnectSince();
      if (afterTime >= 0) {
        throw new CallerDisconnectedException(
          "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this
            + " after " + afterTime + " ms, since " + "caller disconnected");
      }
    }
  }

  private void resetProgress(ScannerContext scannerContext, int initialBatchProgress,
    long initialSizeProgress, long initialHeapSizeProgress) {
    // Starting to scan a new row. Reset the scanner progress according to whether or not
    // progress should be kept.
    if (scannerContext.getKeepProgress()) {
      // Progress should be kept. Reset to initial values seen at start of method invocation.
      scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
    } else {
      scannerContext.clearProgress();
    }
  }
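
  /**
   * The core row-assembly loop behind next()/nextRaw(): peek the storeHeap, apply row-key and
   * row-level filters, optionally top the row up from the joinedHeap, and loop only when a row
   * was filtered out entirely. A rough sketch of the caller-side contract (illustrative only,
   * not an actual call site in this class):
   *
   * <pre>
   * List&lt;ExtendedCell&gt; cells = new ArrayList&lt;&gt;();
   * boolean more = scanner.nextRaw(cells, scannerContext);
   * // 'cells' now holds at most one row; if a mid-row limit was hit,
   * // scannerContext.mayHaveMoreCellsInRow() is true and the Result is partial.
   * </pre>
   */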
  private boolean nextInternal(List<? super ExtendedCell> results, ScannerContext scannerContext)
    throws IOException {
    Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list");
    Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null");
    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();

    // Save the initial progress from the ScannerContext in these local variables. The progress
    // may need to be reset a few times if rows are being filtered out, so we save the initial
    // progress.
    int initialBatchProgress = scannerContext.getBatchProgress();
    long initialSizeProgress = scannerContext.getDataSizeProgress();
    long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();

    // Used to check the time limit
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;

    // The loop here is used only when, at some point during the next, we determine
    // that due to effects of filters or otherwise we have an empty row in the result.
    // Then we loop and try again. Otherwise, we must get out on the first iteration via return:
    // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
    // and joinedHeap has no more data to read for the last row, if joinedContinuationRow is set).
    while (true) {
      resetProgress(scannerContext, initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
      checkClientDisconnect(rpcCall);

      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // Let's see what we have in the storeHeap.
      ExtendedCell current = this.storeHeap.peek();

      boolean shouldStop = shouldStop(current);
      // When hasFilterRow is true, it means that all the cells for a particular row must be
      // read before a filtering decision can be made. This means that filters where hasFilterRow
      // is true run the risk of encountering out-of-memory errors when they are applied to a
      // table that has very large rows.
      boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

      // If filter#hasFilterRow is true, partial results are not allowed since allowing them
      // would prevent the filters from being evaluated. Thus, if it is true, change the
      // scope of any limits that could potentially create partial results to
      // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row.
      if (hasFilterRow) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
            + "formed. Changing scope of limits that may create partials");
        }
        scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
        scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
        limitScope = LimitScope.BETWEEN_ROWS;
      }

      if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
        if (hasFilterRow) {
          throw new IncompatibleFilterException(
            "Filter whose hasFilterRow() returns true is incompatible with scans that must "
              + "stop mid-row because of a limit. ScannerContext:" + scannerContext);
        }
        return true;
      }

      // Check if we were getting data from the joinedHeap and hit the limit.
      // If not, then it's the main path - getting results from the storeHeap.
      if (joinedContinuationRow == null) {
        // First, check if we are at a stop row. If so, there are no more results.
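        // (shouldStop was computed from storeHeap.peek() above; an exhausted heap, i.e. a null
        // peek, counts as a stop just like reaching the configured stop row.)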
        if (shouldStop) {
          if (hasFilterRow) {
            filter.filterRowCells((List<Cell>) results);
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Check if the row-key filter wants to exclude this row. If so, loop to the next row.
        // Technically, if we hit limits before on this row, we don't need this call.
        if (filterRowKey(current)) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          // early check, see HBASE-16296
          if (isFilterDoneInternal()) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          // Typically the count of rows scanned is incremented inside #populateResult. However,
          // here we are filtering a row based purely on its row key, preventing us from calling
          // #populateResult. Thus, perform the necessary increment on the rows scanned metric
          // here.
          incrementCountOfRowsScannedMetric(scannerContext);
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          results.clear();

          // Read nothing as the rowkey was filtered, but still need to check the time limit.
          // We also check the size limit because we might have read blocks in getting to this
          // point.
          if (scannerContext.checkAnyLimitReached(limitScope)) {
            return true;
          }
          continue;
        }

        // Ok, we are good, let's try to get some results from the main heap.
        populateResult(results, this.storeHeap, scannerContext, current);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          if (hasFilterRow) {
            throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                + "stop mid-row because of a limit. ScannerContext:" + scannerContext);
          }
          return true;
        }

        // Check for thread interrupt status in case we have been signaled from
        // #interruptRegionOperation.
        region.checkInterrupt();

        Cell nextKv = this.storeHeap.peek();
        shouldStop = shouldStop(nextKv);
        // Save whether the row was empty before filters were applied to it.
        final boolean isEmptyRow = results.isEmpty();

        // We have the part of the row necessary for filtering (all of it, usually).
        // First filter with the filterRow(List).
        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
        if (hasFilterRow) {
          ret = filter.filterRowCellsWithRet((List<Cell>) results);

          // We don't know how the results have changed after being filtered. Must set progress
          // according to the contents of results now.
          if (scannerContext.getKeepProgress()) {
            scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
              initialHeapSizeProgress);
          } else {
            scannerContext.clearProgress();
          }
          scannerContext.incrementBatchProgress(results.size());
          for (ExtendedCell cell : (List<ExtendedCell>) results) {
            scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
              cell.heapSize());
          }
        }

        if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          results.clear();
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // This row was totally filtered out; if this is NOT the last row,
          // we should continue on.
          // Otherwise, nothing else to do.
          if (!shouldStop) {
            // Read nothing as the cells were filtered, but still need to check the time limit.
            // We also check the size limit because we might have read blocks in getting to this
            // point.
            if (scannerContext.checkAnyLimitReached(limitScope)) {
              return true;
            }
            continue;
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Ok, we are done with the storeHeap for this row.
        // Now we may need to fetch additional, non-essential data into the row.
        // These values are not needed for the filter to work, so we postpone their
        // fetch to (possibly) reduce the amount of data loaded from disk.
        if (this.joinedHeap != null) {
          boolean mayHaveData = joinedHeapMayHaveData(current);
          if (mayHaveData) {
            joinedContinuationRow = current;
            populateFromJoinedHeap(results, scannerContext);

            if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
              return true;
            }
          }
        }
      } else {
        // Populating from the joined heap was stopped by limits, populate some more.
        populateFromJoinedHeap(results, scannerContext);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          return true;
        }
      }
      // We may have just called populateFromJoinedHeap and hit the limits. If that is
      // the case, we need to call it again on the next next() invocation.
      if (joinedContinuationRow != null) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // Finally, we are done with both the joinedHeap and the storeHeap.
      // Double check to prevent empty rows from appearing in the result. It could be
      // the case when SingleColumnValueExcludeFilter is used.
      if (results.isEmpty()) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        boolean moreRows = nextRow(scannerContext, current);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        if (!shouldStop) {
          // We check the size limit because we might have read blocks in the nextRow call above,
          // or in the populateResult call. Only scans with hasFilterRow should reach this point,
          // and for those scans which filter row _cells_ this is the only place we can actually
          // enforce that the scan does not exceed limits, since it bypasses all other checks
          // above.
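          // (With hasFilterRow, the size limit's scope was widened to BETWEEN_ROWS earlier, so
          // this check stops the scan at a row boundary rather than producing a partial row.)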
          if (scannerContext.checkSizeLimit(limitScope)) {
            return true;
          }
          continue;
        }
      }

      if (shouldStop) {
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      } else {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }
    }
  }

  private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
    region.filteredReadRequestsCount.increment();
    if (region.getMetrics() != null) {
      region.getMetrics().updateFilteredRecords();
    }

    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
  }

  private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();
  }

  /** Returns true when the joined heap may have data for the current row. */
  private boolean joinedHeapMayHaveData(ExtendedCell currentRowCell) throws IOException {
    Cell nextJoinedKv = joinedHeap.peek();
    boolean matchCurrentRow =
      nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell);
    boolean matchAfterSeek = false;

    // If the next value in the joined heap does not match the current row, try to seek to the
    // correct row.
    if (!matchCurrentRow) {
      ExtendedCell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell);
      boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
      matchAfterSeek = seekSuccessful && joinedHeap.peek() != null
        && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell);
    }

    return matchCurrentRow || matchAfterSeek;
  }

  /**
   * This method maintains backward compatibility with 0.94 filters. HBASE-6429 combined the
   * filterRow() and filterRow({@code List<KeyValue> kvs}) methods. Code written for 0.94 or
   * older may not implement hasFilterRow() as HBASE-6429 expects, because in 0.94 hasFilterRow()
   * returned true only when filterRow({@code List<KeyValue> kvs}) was overridden, not
   * filterRow(); without this check, such a filter's filterRow() would be skipped.
   */
  private boolean filterRow() throws IOException {
    // When hasFilterRow returns true, filter.filterRow() will be called automatically inside
    // filterRowCells(List<Cell> kvs), so we skip that scenario here.
    return filter != null && (!filter.hasFilterRow()) && filter.filterRow();
  }

  private boolean filterRowKey(Cell current) throws IOException {
    return filter != null && filter.filterRowKey(current);
  }

  /**
   * A mocked list implementation - discards all updates.
   */
  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {

    @Override
    public void add(int index, Cell element) {
      // do nothing
    }

    @Override
    public boolean addAll(int index, Collection<? extends Cell> c) {
      return false; // this list is never changed as a result of an update
    }

    @Override
    public KeyValue get(int index) {
      throw new UnsupportedOperationException();
    }

    @Override
    public int size() {
      return 0;
    }
  };

  protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
    assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";

    // Enable skipping-row mode, which disables limits and skips tracking progress for all
    // but block size. We keep tracking block size because skipping a row in this way
    // might involve reading blocks along the way.
    scannerContext.setSkippingRow(true);

    Cell next;
    while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();
      this.storeHeap.next(MOCKED_LIST, scannerContext);
    }

    scannerContext.setSkippingRow(false);
    resetFilters();

    // Calling the hook in the CP which allows it to do a fast forward
    return this.region.getCoprocessorHost() == null
      || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
  }

  protected boolean shouldStop(Cell currentRowCell) {
    if (currentRowCell == null) {
      return true;
    }
    if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {
      return false;
    }
    int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length);
    return c > 0 || (c == 0 && !includeStopRow);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "this method is only called inside close which is synchronized")
  private void closeInternal() {
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
    }
    if (joinedHeap != null) {
      joinedHeap.close();
      joinedHeap = null;
    }
    // no need to synchronize here.
    scannerReadPoints.remove(this);
    this.filterClosed = true;
  }

  @Override
  public synchronized void close() {
    TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
  }

  @Override
  public synchronized boolean reseek(byte[] row) throws IOException {
    return TraceUtil.trace(() -> {
      if (row == null) {
        throw new IllegalArgumentException("Row cannot be null.");
      }
      boolean result = false;
      region.startRegionOperation();
      ExtendedCell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
      try {
        // use request seek to make use of the lazy seek option. See HBASE-5520
        result = this.storeHeap.requestSeek(kv, true, true);
        if (this.joinedHeap != null) {
          result = this.joinedHeap.requestSeek(kv, true, true) || result;
        }
      } finally {
        region.closeRegionOperation();
      }
      return result;
    }, () -> region.createRegionSpan("RegionScanner.reseek"));
  }

  @Override
  public void shipped() throws IOException {
    if (storeHeap != null) {
      storeHeap.shipped();
    }
    if (joinedHeap != null) {
      joinedHeap.shipped();
    }
  }

  @Override
  public void run() throws IOException {
    // This is the RPC callback method being executed; we close the scanner in this callback.
    this.close();
  }
}