/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
 */
@InterfaceAudience.Private
class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {

  private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class);

  // Package local for testability
  KeyValueHeap storeHeap = null;

  /**
   * Heap of key-values that are not essential for the provided filters and are thus read on
   * demand, if on-demand column family loading is enabled.
   */
  KeyValueHeap joinedHeap = null;

  /**
   * If the joined heap data gathering is interrupted due to scan limits, this will contain the
   * row for which we are populating the values.
   */
  protected Cell joinedContinuationRow = null;
  private boolean filterClosed = false;

  protected final byte[] stopRow;
  protected final boolean includeStopRow;
  protected final HRegion region;
  protected final CellComparator comparator;

  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  private final long readPt;
  private final long maxResultSize;
  private final ScannerContext defaultScannerContext;
  private final FilterWrapper filter;
  private final String operationId;

  private RegionServerServices rsServices;

  @Override
  public RegionInfo getRegionInfo() {
    return region.getRegionInfo();
  }

  private static boolean hasNonce(HRegion region, long nonce) {
    RegionServerServices rsServices = region.getRegionServerServices();
    return nonce != HConstants.NO_NONCE && rsServices != null
      && rsServices.getNonceManager() != null;
  }
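
  // A hedged usage sketch (illustrative only, not part of this class): callers normally
  // obtain an instance through HRegion#getScanner(Scan) and drive it until next() returns
  // false, roughly:
  //
  //   RegionScanner scanner = region.getScanner(new Scan());
  //   List<Cell> cells = new ArrayList<>();
  //   try {
  //     boolean more;
  //     do {
  //       more = scanner.next(cells);
  //       // ... consume cells ...
  //       cells.clear();
  //     } while (more);
  //   } finally {
  //     scanner.close();
  //   }
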
  RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
    long nonceGroup, long nonce) throws IOException {
    this.region = region;
    this.maxResultSize = scan.getMaxResultSize();
    if (scan.hasFilter()) {
      this.filter = new FilterWrapper(scan.getFilter());
    } else {
      this.filter = null;
    }
    this.comparator = region.getCellComparator();
    /**
     * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
     * scanner context that can be used to enforce the batch limit in the event that a
     * ScannerContext is not specified during an invocation of next/nextRaw
     */
    defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
    this.stopRow = scan.getStopRow();
    this.includeStopRow = scan.includeStopRow();
    this.operationId = scan.getId();

    // Synchronize on scannerReadPoints so that nobody calculates getSmallestReadPoint
    // before scannerReadPoints is updated.
    IsolationLevel isolationLevel = scan.getIsolationLevel();
    long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan);
    this.scannerReadPoints = region.scannerReadPoints;
    this.rsServices = region.getRegionServerServices();
    region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    try {
      if (mvccReadPoint > 0) {
        this.readPt = mvccReadPoint;
      } else if (hasNonce(region, nonce)) {
        this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
      } else {
        this.readPt = region.getReadPoint(isolationLevel);
      }
      scannerReadPoints.put(this, this.readPt);
    } finally {
      region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    }
    initializeScanners(scan, additionalScanners);
  }
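
  // Hedged note on the read point chosen above: cells with an MVCC sequence id greater than
  // readPt stay invisible for this scanner's entire lifetime, which is what gives a
  // long-running scan a consistent snapshot. The isolation level only changes how readPt is
  // picked inside region.getReadPoint(isolationLevel), and registering the scanner in
  // scannerReadPoints keeps getSmallestReadPoint from advancing past versions this scanner
  // may still need.
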
  private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
    throws IOException {
    // Here we separate all scanners into two lists - scanners that provide data required
    // by the filter to operate (scanners list) and all others (joinedScanners list).
    List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());
    List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());
    // Store all already instantiated scanners for exception handling
    List<KeyValueScanner> instantiatedScanners = new ArrayList<>();
    // handle additionalScanners
    if (additionalScanners != null && !additionalScanners.isEmpty()) {
      scanners.addAll(additionalScanners);
      instantiatedScanners.addAll(additionalScanners);
    }

    try {
      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
        HStore store = region.getStore(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
        instantiatedScanners.add(scanner);
        if (
          this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
            || this.filter.isFamilyEssential(entry.getKey())
        ) {
          scanners.add(scanner);
        } else {
          joinedScanners.add(scanner);
        }
      }
      initializeKVHeap(scanners, joinedScanners, region);
    } catch (Throwable t) {
      throw handleException(instantiatedScanners, t);
    }
  }

  protected void initializeKVHeap(List<KeyValueScanner> scanners,
    List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
    this.storeHeap = new KeyValueHeap(scanners, comparator);
    if (!joinedScanners.isEmpty()) {
      this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
    }
  }
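
  // Hedged example of the essential/joined split above, assuming the standard client API
  // (the family names, qualifier, and filter are illustrative only):
  //
  //   Scan scan = new Scan();
  //   scan.setLoadColumnFamiliesOnDemand(true);
  //   scan.setFilter(new SingleColumnValueFilter(
  //     Bytes.toBytes("meta"), Bytes.toBytes("flag"), CompareOperator.EQUAL, value));
  //
  // Only the family the filter declares essential ("meta" here) feeds the storeHeap; the
  // scanners for all other families land in joinedHeap and are read only for rows the
  // filter accepts.
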
  private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) {
    // remove the scanner read point before throwing the exception
    scannerReadPoints.remove(this);
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
      if (joinedHeap != null) {
        joinedHeap.close();
        joinedHeap = null;
      }
    } else {
      // close all already instantiated scanners before throwing the exception
      for (KeyValueScanner scanner : instantiatedScanners) {
        scanner.close();
      }
    }
    return t instanceof IOException ? (IOException) t : new IOException(t);
  }

  @Override
  public long getMaxResultSize() {
    return maxResultSize;
  }

  @Override
  public long getMvccReadPoint() {
    return this.readPt;
  }

  @Override
  public int getBatch() {
    return this.defaultScannerContext.getBatchLimit();
  }

  @Override
  public String getOperationId() {
    return operationId;
  }

  /**
   * Reset the filter, if any.
   * @throws IOException in case a filter raises an I/O exception.
   */
  protected final void resetFilters() throws IOException {
    if (filter != null) {
      filter.reset();
    }
  }

  @Override
  public boolean next(List<Cell> outResults) throws IOException {
    // apply the batching limit by default
    return next(outResults, defaultScannerContext);
  }

  @Override
  public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)
    throws IOException {
    if (this.filterClosed) {
      throw new UnknownScannerException("Scanner was closed (timed out?) "
        + "after we renewed it. Could be caused by a very slow scanner "
        + "or a lengthy garbage collection");
    }
    region.startRegionOperation(Operation.SCAN);
    try {
      return nextRaw(outResults, scannerContext);
    } finally {
      region.closeRegionOperation(Operation.SCAN);
    }
  }

  @Override
  public boolean nextRaw(List<Cell> outResults) throws IOException {
    // Use the RegionScanner's context by default
    return nextRaw(outResults, defaultScannerContext);
  }

  @Override
  public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext) throws IOException {
    if (storeHeap == null) {
      // scanner is closed
      throw new UnknownScannerException("Scanner was closed");
    }
    boolean moreValues = false;
    if (outResults.isEmpty()) {
      // Usually outResults is empty. This is true when next is called
      // to handle scan or get operation.
      moreValues = nextInternal(outResults, scannerContext);
    } else {
      List<Cell> tmpList = new ArrayList<>();
      moreValues = nextInternal(tmpList, scannerContext);
      outResults.addAll(tmpList);
    }

    region.addReadRequestsCount(1);
    if (region.getMetrics() != null) {
      region.getMetrics().updateReadRequestCount();
    }

    // If the size limit was reached it means a partial Result is being returned. Returning a
    // partial Result means that we should not reset the filters; filters should only be reset
    // in between rows.
    if (!scannerContext.mayHaveMoreCellsInRow()) {
      resetFilters();
    }

    if (isFilterDoneInternal()) {
      moreValues = false;
    }
    return moreValues;
  }
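
  // Hedged note on the resetFilters() call above: when a size or time limit fires mid-row,
  // scannerContext.mayHaveMoreCellsInRow() is true and the cells returned so far form a
  // partial row (surfaced client-side as Result#mayHaveMoreCellsInRow). Filter state must
  // span every partial of a row, so the filters are reset only once the row is complete.
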
  /** Returns true if more cells exist after this batch, false if scanner is done */
  private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)
    throws IOException {
    assert joinedContinuationRow != null;
    boolean moreValues =
      populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow);

    if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
      // We are done with this row, reset the continuation.
      joinedContinuationRow = null;
    }
    // As the data is obtained from two independent heaps, we need to
    // ensure that result list is sorted, because Result relies on that.
    results.sort(comparator);
    return moreValues;
  }

  /**
   * Fetches cells for the current row into the results list, until the next row, the batchLimit
   * (if not -1), or the remainingResultSize (if not -1) is reached.
   * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before
   *             the call.
   * @return state of last call to {@link KeyValueHeap#next()}
   */
  private boolean populateResult(List<Cell> results, KeyValueHeap heap,
    ScannerContext scannerContext, Cell currentRowCell) throws IOException {
    Cell nextKv;
    boolean moreCellsInRow = false;
    boolean tmpKeepProgress = scannerContext.getKeepProgress();
    // Scanning between column families and thus the scope is between cells
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;
    do {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // We want to maintain any progress that is made towards the limits while scanning across
      // different column families. To do this, we toggle the keep progress flag on during calls
      // to the StoreScanner to ensure that any progress made thus far is not wiped away.
      scannerContext.setKeepProgress(true);
      heap.next(results, scannerContext);
      scannerContext.setKeepProgress(tmpKeepProgress);

      nextKv = heap.peek();
      moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
      if (!moreCellsInRow) {
        incrementCountOfRowsScannedMetric(scannerContext);
      }
      if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
        return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
      } else if (scannerContext.checkSizeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      } else if (scannerContext.checkTimeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      }
    } while (moreCellsInRow);
    return nextKv != null;
  }

  /**
   * Based on the nextKv in the heap and the current row, decide whether or not there are more
   * cells to be read in the heap. If the row of the nextKv in the heap matches the current row,
   * then there are more cells to be read in the row.
   * @return true when there are more cells in the row to be read
   */
  private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
    return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);
  }

  /** Returns true when a filter rules that the scan is done. */
  @Override
  public synchronized boolean isFilterDone() throws IOException {
    return isFilterDoneInternal();
  }

  private boolean isFilterDoneInternal() throws IOException {
    return this.filter != null && this.filter.filterAllRemaining();
  }

  private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException {
    if (rpcCall.isPresent()) {
      // If a user specifies a too-restrictive or too-slow scanner, the
      // client might time out and disconnect while the server side
      // is still processing the request. We should abort aggressively
      // in that case.
      long afterTime = rpcCall.get().disconnectSince();
      if (afterTime >= 0) {
        throw new CallerDisconnectedException(
          "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this
            + " after " + afterTime + " ms, since " + "caller disconnected");
      }
    }
  }

  private void resetProgress(ScannerContext scannerContext, int initialBatchProgress,
    long initialSizeProgress, long initialHeapSizeProgress) {
    // Starting to scan a new row. Reset the scanner progress according to whether or not
    // progress should be kept.
    if (scannerContext.getKeepProgress()) {
      // Progress should be kept. Reset to initial values seen at start of method invocation.
      scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
    } else {
      scannerContext.clearProgress();
    }
  }
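
  // Hedged overview of nextInternal below: each iteration of its while (true) loop handles
  // one candidate row. The normal path is: check the stop row, maybe reject the row by its
  // key, drain the storeHeap for the row, apply row-level filters, then optionally pull
  // non-essential families from the joinedHeap. The loop only repeats when a row was
  // filtered out entirely and no limit has been reached; every other outcome returns with
  // the scanner state recorded in scannerContext.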
  private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
    throws IOException {
    Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list");
    Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null");
    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();

    // Save the initial progress from the Scanner context in these local variables. The progress
    // may need to be reset a few times if rows are being filtered out, so we save the initial
    // progress.
    int initialBatchProgress = scannerContext.getBatchProgress();
    long initialSizeProgress = scannerContext.getDataSizeProgress();
    long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();

    // Used to check time limit
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;

    // The loop here is used only when at some point during the next we determine
    // that due to effects of filters or otherwise, we have an empty row in the result.
    // Then we loop and try again. Otherwise, we must get out on the first iteration via return,
    // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
    // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow).
    while (true) {
      resetProgress(scannerContext, initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
      checkClientDisconnect(rpcCall);

      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // Let's see what we have in the storeHeap.
      Cell current = this.storeHeap.peek();

      boolean shouldStop = shouldStop(current);
      // When hasFilterRow is true, it means that all the cells for a particular row must be
      // read before a filtering decision can be made. This means that filters where hasFilterRow
      // is true run the risk of encountering out-of-memory errors when they are applied to a
      // table that has very large rows.
      boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

      // If filter#hasFilterRow is true, partial results are not allowed since allowing them
      // would prevent the filters from being evaluated. Thus, if it is true, change the
      // scope of any limits that could potentially create partial results to
      // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row
      if (hasFilterRow) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
            + " formed. Changing scope of limits that may create partials");
        }
        scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
        scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
        limitScope = LimitScope.BETWEEN_ROWS;
      }

      if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
        if (hasFilterRow) {
          throw new IncompatibleFilterException(
            "Filter whose hasFilterRow() returns true is incompatible with scans that must "
              + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
        }
        return true;
      }
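
      // Hedged walk-through of the two branches below: on a fresh row we drain the storeHeap
      // first and only touch the joinedHeap if the row survives filtering. If a limit fires
      // while draining the joinedHeap, joinedContinuationRow stays set, and the next
      // invocation resumes in the else branch instead of re-reading the storeHeap.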
      // Check if we were getting data from the joinedHeap and hit the limit.
      // If not, then it's the main path - getting results from the storeHeap.
      if (joinedContinuationRow == null) {
        // First, check if we are at a stop row. If so, there are no more results.
        if (shouldStop) {
          if (hasFilterRow) {
            filter.filterRowCells(results);
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Check if the rowkey filter wants to exclude this row. If so, loop to next.
        // Technically, if we hit limits before on this row, we don't need this call.
        if (filterRowKey(current)) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          // early check, see HBASE-16296
          if (isFilterDoneInternal()) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          // Typically the count of rows scanned is incremented inside #populateResult. However,
          // here we are filtering a row based purely on its row key, preventing us from calling
          // #populateResult. Thus, perform the necessary increment here to the rows scanned
          // metric.
          incrementCountOfRowsScannedMetric(scannerContext);
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          results.clear();

          // Read nothing as the rowkey was filtered, but still need to check the time limit.
          // We also check the size limit because we might have read blocks in getting here.
          if (scannerContext.checkAnyLimitReached(limitScope)) {
            return true;
          }
          continue;
        }

        // Ok, we are good, let's try to get some results from the main heap.
        populateResult(results, this.storeHeap, scannerContext, current);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          if (hasFilterRow) {
            throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
          }
          return true;
        }

        // Check for thread interrupt status in case we have been signaled from
        // #interruptRegionOperation.
        region.checkInterrupt();

        Cell nextKv = this.storeHeap.peek();
        shouldStop = shouldStop(nextKv);
        // Save whether the row was empty before filters were applied to it.
        final boolean isEmptyRow = results.isEmpty();

        // We have the part of the row necessary for filtering (all of it, usually).
        // First filter with the filterRow(List).
        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
        if (hasFilterRow) {
          ret = filter.filterRowCellsWithRet(results);

          // We don't know how the results have changed after being filtered. Must set progress
          // according to contents of results now.
          if (scannerContext.getKeepProgress()) {
            scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
              initialHeapSizeProgress);
          } else {
            scannerContext.clearProgress();
          }
          scannerContext.incrementBatchProgress(results.size());
          for (Cell cell : results) {
            scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
              cell.heapSize());
          }
        }

        if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          results.clear();
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // This row was totally filtered out. If this is NOT the last row,
          // we should continue on. Otherwise, nothing else to do.
          if (!shouldStop) {
            // Read nothing as the cells were filtered, but still need to check the time limit.
            // We also check the size limit because we might have read blocks in getting here.
            if (scannerContext.checkAnyLimitReached(limitScope)) {
              return true;
            }
            continue;
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Ok, we are done with the storeHeap for this row.
        // Now we may need to fetch additional, non-essential data into the row.
        // These values are not needed for the filter to work, so we postpone their
        // fetch to (possibly) reduce the amount of data loaded from disk.
        if (this.joinedHeap != null) {
          boolean mayHaveData = joinedHeapMayHaveData(current);
          if (mayHaveData) {
            joinedContinuationRow = current;
            populateFromJoinedHeap(results, scannerContext);

            if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
              return true;
            }
          }
        }
      } else {
        // Populating from the joined heap was stopped by limits, populate some more.
        populateFromJoinedHeap(results, scannerContext);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          return true;
        }
      }
      // We may have just called populateFromJoinedHeap and hit the limits. If that is
      // the case, we need to call it again on the next next() invocation.
      if (joinedContinuationRow != null) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // Finally, we are done with both joinedHeap and storeHeap.
      // Double check to prevent empty rows from appearing in the result. It could be
      // the case when SingleColumnValueExcludeFilter is used.
      if (results.isEmpty()) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        boolean moreRows = nextRow(scannerContext, current);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        if (!shouldStop) {
          // We check the size limit because we might have read blocks in the nextRow call above,
          // or in the populateResult call. Only scans with hasFilterRow should reach this point,
          // and for those scans which filter row _cells_ this is the only place we can actually
          // enforce that the scan does not exceed limits since it bypasses all other checks above.
          if (scannerContext.checkSizeLimit(limitScope)) {
            return true;
          }
          continue;
        }
      }

      if (shouldStop) {
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      } else {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }
    }
  }
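
  // Hedged note: the per-scanner counters below are recorded only when the client opted into
  // scan metrics (e.g. via Scan#setScanMetricsEnabled(true)), which is when
  // scannerContext.isTrackingMetrics() returns true; the region-level filtered-rows counter
  // is updated unconditionally.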
  private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
    region.filteredReadRequestsCount.increment();
    if (region.getMetrics() != null) {
      region.getMetrics().updateFilteredRecords();
    }

    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
  }

  private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();
  }

  /** Returns true when the joined heap may have data for the current row */
  private boolean joinedHeapMayHaveData(Cell currentRowCell) throws IOException {
    Cell nextJoinedKv = joinedHeap.peek();
    boolean matchCurrentRow =
      nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell);
    boolean matchAfterSeek = false;

    // If the next value in the joined heap does not match the current row, try to seek to the
    // correct row
    if (!matchCurrentRow) {
      Cell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell);
      boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
      matchAfterSeek = seekSuccessful && joinedHeap.peek() != null
        && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell);
    }

    return matchCurrentRow || matchAfterSeek;
  }

  /**
   * This method maintains backward compatibility with 0.94 filters. HBASE-6429 combined the
   * filterRow() and filterRow({@code List<KeyValue> kvs}) methods. Code written against 0.94 or
   * older may not implement hasFilterRow() as HBASE-6429 expects, because in 0.94 hasFilterRow()
   * returned true only when filterRow({@code List<KeyValue> kvs}) was overridden, not
   * filterRow(). Without this check, such a filter's filterRow() would be skipped.
   */
  private boolean filterRow() throws IOException {
    // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
    // filterRowCells(List<Cell> kvs) so we skip that scenario here.
    return filter != null && (!filter.hasFilterRow()) && filter.filterRow();
  }

  private boolean filterRowKey(Cell current) throws IOException {
    return filter != null && filter.filterRowKey(current);
  }

  /**
   * A mocked list implementation - discards all updates. Used by nextRow to run the heap past
   * the remaining cells of a row without accumulating them.
   */
  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {

    @Override
    public void add(int index, Cell element) {
      // do nothing
    }

    @Override
    public boolean addAll(int index, Collection<? extends Cell> c) {
      return false; // this list is never changed as a result of an update
    }

    @Override
    public KeyValue get(int index) {
      throw new UnsupportedOperationException();
    }

    @Override
    public int size() {
      return 0;
    }
  };

  protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
    assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";

    // Enable skipping row mode, which disables limits and skips tracking progress for all
    // but block size. We keep tracking block size because skipping a row in this way
    // might involve reading blocks along the way.
    scannerContext.setSkippingRow(true);

    Cell next;
    while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();
      this.storeHeap.next(MOCKED_LIST, scannerContext);
    }

    scannerContext.setSkippingRow(false);
    resetFilters();

    // Calling the hook in CP which allows it to do a fast forward
    return this.region.getCoprocessorHost() == null
      || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
  }
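
  // Hedged example of the stop-row contract checked below: with stopRow = "r2" and
  // includeStopRow == false (an exclusive stop row), rows "r1" and "r10" are returned (both
  // compare lower than "r2" byte-wise), while reaching "r2" ends the scan; with
  // includeStopRow == true, "r2" itself is returned as well.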
  protected boolean shouldStop(Cell currentRowCell) {
    if (currentRowCell == null) {
      return true;
    }
    if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {
      return false;
    }
    int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length);
    return c > 0 || (c == 0 && !includeStopRow);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "this method is only called inside close which is synchronized")
  private void closeInternal() {
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
    }
    if (joinedHeap != null) {
      joinedHeap.close();
      joinedHeap = null;
    }
    // no need to synchronize here.
    scannerReadPoints.remove(this);
    this.filterClosed = true;
  }

  @Override
  public synchronized void close() {
    TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
  }

  @Override
  public synchronized boolean reseek(byte[] row) throws IOException {
    return TraceUtil.trace(() -> {
      if (row == null) {
        throw new IllegalArgumentException("Row cannot be null.");
      }
      boolean result = false;
      region.startRegionOperation();
      Cell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
      try {
        // use request seek to make use of the lazy seek option. See HBASE-5520
        result = this.storeHeap.requestSeek(kv, true, true);
        if (this.joinedHeap != null) {
          result = this.joinedHeap.requestSeek(kv, true, true) || result;
        }
      } finally {
        region.closeRegionOperation();
      }
      return result;
    }, () -> region.createRegionSpan("RegionScanner.reseek"));
  }

  @Override
  public void shipped() throws IOException {
    if (storeHeap != null) {
      storeHeap.shipped();
    }
    if (joinedHeap != null) {
      joinedHeap.shipped();
    }
  }

  @Override
  public void run() throws IOException {
    // This is the RPC callback method. We close the scanner in this callback.
    this.close();
  }
}