/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
 */
@InterfaceAudience.Private
public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {

  private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class);

  // Package local for testability
  KeyValueHeap storeHeap = null;

  /**
   * Heap of key-values that are not essential for the provided filters and are thus read on demand,
   * if on-demand column family loading is enabled.
   */
  KeyValueHeap joinedHeap = null;

  /**
   * If the joined heap data gathering is interrupted due to scan limits, this will contain the row
   * for which we are populating the values.
   */
  protected ExtendedCell joinedContinuationRow = null;
  // Set once this scanner has been closed; subsequent next() calls throw UnknownScannerException.
  private boolean filterClosed = false;

  protected final byte[] stopRow;
  protected final boolean includeStopRow;
  protected final HRegion region;
  protected final CellComparator comparator;

  // Shared with the owning HRegion; used when computing the smallest read point across scanners.
  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  private final long readPt;
  private final long maxResultSize;
  private final ScannerContext defaultScannerContext;
  private final FilterWrapper filter;
  private final String operationId;

  private RegionServerServices rsServices;

  // Store file paths read by this scanner; populated at close time, see closeInternal().
  private final Set<Path> filesRead = new HashSet<>();

  @Override
  public RegionInfo getRegionInfo() {
    return region.getRegionInfo();
  }

  /**
   * Returns true when a real nonce was supplied and this region server has a nonce manager, i.e.
   * the scan's MVCC read point can be recovered from the nonce's operation context.
   */
  private static boolean hasNonce(HRegion region, long nonce) {
    RegionServerServices rsServices = region.getRegionServerServices();
    return nonce != HConstants.NO_NONCE && rsServices != null
      && rsServices.getNonceManager() != null;
  }

  /**
   * @param scan               the client scan specification (filters, limits, families, ...)
   * @param additionalScanners extra scanners to merge into the store heap, may be null
   * @param region             the region this scanner reads from
   * @param nonceGroup         nonce group of the originating operation, if any
   * @param nonce              nonce of the originating operation, or HConstants.NO_NONCE
   * @throws IOException if the store scanners cannot be created
   */
  RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
    long nonceGroup, long nonce) throws IOException {
    this.region = region;
    this.maxResultSize = scan.getMaxResultSize();
    if (scan.hasFilter()) {
      this.filter = new FilterWrapper(scan.getFilter());
    } else {
      this.filter = null;
    }
    this.comparator = region.getCellComparator();
    /**
     * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
     * scanner context that can be used to enforce the batch limit in the event that a
     * ScannerContext is not specified during an invocation of next/nextRaw
     */
    defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
    this.stopRow = scan.getStopRow();
    this.includeStopRow = scan.includeStopRow();
    this.operationId = scan.getId();

    // synchronize on scannerReadPoints so that nobody calculates
    // getSmallestReadPoint, before scannerReadPoints is updated.
    IsolationLevel isolationLevel = scan.getIsolationLevel();
    long mvccReadPoint = ClientInternalHelper.getMvccReadPoint(scan);
    this.scannerReadPoints = region.scannerReadPoints;
    this.rsServices = region.getRegionServerServices();
    region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    try {
      if (mvccReadPoint > 0) {
        // Client supplied an explicit read point (e.g. scanner restart); honor it.
        this.readPt = mvccReadPoint;
      } else if (hasNonce(region, nonce)) {
        // Recover the read point recorded for the nonce'd operation.
        this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
      } else {
        this.readPt = region.getReadPoint(isolationLevel);
      }
      scannerReadPoints.put(this, this.readPt);
    } finally {
      region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    }
    initializeScanners(scan, additionalScanners);
  }

  /** Returns the default ScannerContext used when next/nextRaw is called without one. */
  public ScannerContext getContext() {
    return defaultScannerContext;
  }

  /**
   * Creates the per-family store scanners and splits them into the main (essential) heap and the
   * joined (non-essential, loaded on demand) heap. On any failure, all scanners instantiated so
   * far are closed before the exception is rethrown.
   */
  private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
    throws IOException {
    // Here we separate all scanners into two lists - scanner that provide data required
    // by the filter to operate (scanners list) and all others (joinedScanners list).
    List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());
    List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());
    // Store all already instantiated scanners for exception handling
    List<KeyValueScanner> instantiatedScanners = new ArrayList<>();
    // handle additionalScanners
    if (additionalScanners != null && !additionalScanners.isEmpty()) {
      scanners.addAll(additionalScanners);
      instantiatedScanners.addAll(additionalScanners);
    }

    try {
      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
        HStore store = region.getStore(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
        instantiatedScanners.add(scanner);
        if (
          this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
            || this.filter.isFamilyEssential(entry.getKey())
        ) {
          scanners.add(scanner);
        } else {
          joinedScanners.add(scanner);
        }
      }
      initializeKVHeap(scanners, joinedScanners, region);
    } catch (Throwable t) {
      throw handleException(instantiatedScanners, t);
    }
  }

  protected void initializeKVHeap(List<KeyValueScanner> scanners,
    List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
    this.storeHeap = new KeyValueHeap(scanners, comparator);
    if (!joinedScanners.isEmpty()) {
      this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
    }
  }

  /**
   * Cleanup on scanner-construction failure: deregisters the read point, closes whichever heaps or
   * standalone scanners already exist, and converts the cause into an IOException for rethrow.
   */
  private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) {
    // Remove the scanner read point before throwing the exception
    scannerReadPoints.remove(this);
    if (storeHeap != null) {
      // Heaps own their scanners, so closing the heaps closes everything.
      storeHeap.close();
      storeHeap = null;
      if (joinedHeap != null) {
        joinedHeap.close();
        joinedHeap = null;
      }
    } else {
      // close all already instantiated scanners before throwing the exception
      for (KeyValueScanner scanner : instantiatedScanners) {
        scanner.close();
      }
    }
    return t instanceof IOException ? (IOException) t : new IOException(t);
  }

  @Override
  public long getMaxResultSize() {
    return maxResultSize;
  }

  @Override
  public long getMvccReadPoint() {
    return this.readPt;
  }

  @Override
  public int getBatch() {
    return this.defaultScannerContext.getBatchLimit();
  }

  @Override
  public String getOperationId() {
    return operationId;
  }

  /**
   * Reset the (wrapped) filter, if any, so it can start evaluating a new row.
   * @throws IOException in case a filter raises an I/O exception.
   */
  protected final void resetFilters() throws IOException {
    if (filter != null) {
      filter.reset();
    }
  }

  @Override
  public boolean next(List<? super ExtendedCell> outResults) throws IOException {
    // apply the batching limit by default
    return next(outResults, defaultScannerContext);
  }

  @Override
  public synchronized boolean next(List<? super ExtendedCell> outResults,
    ScannerContext scannerContext) throws IOException {
    if (this.filterClosed) {
      throw new UnknownScannerException("Scanner was closed (timed out?) "
        + "after we renewed it. Could be caused by a very slow scanner "
        + "or a lengthy garbage collection");
    }
    region.startRegionOperation(Operation.SCAN);
    try {
      return nextRaw(outResults, scannerContext);
    } finally {
      region.closeRegionOperation(Operation.SCAN);
    }
  }

  @Override
  public boolean nextRaw(List<? super ExtendedCell> outResults) throws IOException {
    // Use the RegionScanner's context by default
    return nextRaw(outResults, defaultScannerContext);
  }

  @Override
  public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext scannerContext)
    throws IOException {
    if (storeHeap == null) {
      // scanner is closed
      throw new UnknownScannerException("Scanner was closed");
    }
    boolean moreValues = false;
    if (outResults.isEmpty()) {
      // Usually outResults is empty. This is true when next is called
      // to handle scan or get operation.
      moreValues = nextInternal(outResults, scannerContext);
    } else {
      // Caller passed a pre-populated list; collect into a temp list so nextInternal's
      // "results must start empty" precondition holds, then append.
      List<ExtendedCell> tmpList = new ArrayList<>();
      moreValues = nextInternal(tmpList, scannerContext);
      outResults.addAll(tmpList);
    }
    region.addReadRequestsCount(1);
    if (region.getMetrics() != null) {
      region.getMetrics().updateReadRequestCount();
    }

    // If the size limit was reached it means a partial Result is being returned. Returning a
    // partial Result means that we should not reset the filters; filters should only be reset in
    // between rows
    if (!scannerContext.mayHaveMoreCellsInRow()) {
      resetFilters();
    }

    if (isFilterDoneInternal()) {
      moreValues = false;
    }
    return moreValues;
  }

  /** Returns true if more cells exist after this batch, false if scanner is done */
  private boolean populateFromJoinedHeap(List<? super ExtendedCell> results,
    ScannerContext scannerContext) throws IOException {
    assert joinedContinuationRow != null;
    boolean moreValues =
      populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow);

    if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
      // We are done with this row, reset the continuation.
      joinedContinuationRow = null;
    }
    // As the data is obtained from two independent heaps, we need to
    // ensure that result list is sorted, because Result relies on that.
    ((List<Cell>) results).sort(comparator);
    return moreValues;
  }

  /**
   * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is
   * reached, or remainingResultSize (if not -1) is reached
   * @param heap           KeyValueHeap to fetch data from. It must be positioned on correct row
   *                       before call.
   * @param currentRowCell a cell on the row currently being populated
   * @return state of last call to {@link KeyValueHeap#next()}
   */
  private boolean populateResult(List<? super ExtendedCell> results, KeyValueHeap heap,
    ScannerContext scannerContext, ExtendedCell currentRowCell) throws IOException {
    Cell nextKv;
    boolean moreCellsInRow = false;
    boolean tmpKeepProgress = scannerContext.getKeepProgress();
    // Scanning between column families and thus the scope is between cells
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;
    do {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // We want to maintain any progress that is made towards the limits while scanning across
      // different column families. To do this, we toggle the keep progress flag on during calls
      // to the StoreScanner to ensure that any progress made thus far is not wiped away.
      scannerContext.setKeepProgress(true);
      heap.next(results, scannerContext);
      scannerContext.setKeepProgress(tmpKeepProgress);

      nextKv = heap.peek();
      moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
      if (!moreCellsInRow) {
        incrementCountOfRowsScannedMetric(scannerContext);
      }
      if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
        return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
      } else if (scannerContext.checkSizeLimit(limitScope)) {
        // MID_ROW variants signal a partial result: the row is not yet fully read.
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      } else if (scannerContext.checkTimeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      }
    } while (moreCellsInRow);
    return nextKv != null;
  }

  /**
   * Based on the nextKv in the heap, and the current row, decide whether or not there are more
   * cells to be read in the heap. If the row of the nextKv in the heap matches the current row
   * then there are more cells to be read in the row.
   * @return true When there are more cells in the row to be read
   */
  private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
    return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);
  }

  /** Returns True if a filter rules the scanner is over, done. */
  @Override
  public synchronized boolean isFilterDone() throws IOException {
    return isFilterDoneInternal();
  }

  private boolean isFilterDoneInternal() throws IOException {
    return this.filter != null && this.filter.filterAllRemaining();
  }

  /**
   * Aborts the scan with CallerDisconnectedException when the originating RPC client has
   * disconnected, so the server does not keep working for a caller that is gone.
   */
  private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException {
    if (rpcCall.isPresent()) {
      // If a user specifies a too-restrictive or too-slow scanner, the
      // client might time out and disconnect while the server side
      // is still processing the request. We should abort aggressively
      // in that case.
      long afterTime = rpcCall.get().disconnectSince();
      if (afterTime >= 0) {
        throw new CallerDisconnectedException(
          "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this
            + " after " + afterTime + " ms, since " + "caller disconnected");
      }
    }
  }

  /**
   * Re-arms the scanner progress counters at the start of a new row: either restores the initial
   * progress captured at entry to nextInternal (keepProgress mode) or clears progress entirely.
   */
  private void resetProgress(ScannerContext scannerContext, int initialBatchProgress,
    long initialSizeProgress, long initialHeapSizeProgress) {
    // Starting to scan a new row. Reset the scanner progress according to whether or not
    // progress should be kept.
    if (scannerContext.getKeepProgress()) {
      // Progress should be kept. Reset to initial values seen at start of method invocation.
      scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
    } else {
      scannerContext.clearProgress();
    }
  }

  /**
   * Core row-assembly loop: pulls cells for one row from the store heap (and, if configured, the
   * joined heap), applies row filters, honors batch/size/time limits, and sets the terminal
   * scanner state on the ScannerContext.
   * @param results        must be empty on entry; receives the cells of the next row (possibly a
   *                       partial row if a mid-row limit is hit)
   * @param scannerContext tracks limits and progress; never null
   * @return true if there may be more rows to read, false when the scan is exhausted
   */
  private boolean nextInternal(List<? super ExtendedCell> results, ScannerContext scannerContext)
    throws IOException {
    Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list");
    Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null");
    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();

    // Save the initial progress from the Scanner context in these local variables. The progress
    // may need to be reset a few times if rows are being filtered out so we save the initial
    // progress.
    int initialBatchProgress = scannerContext.getBatchProgress();
    long initialSizeProgress = scannerContext.getDataSizeProgress();
    long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();

    // Used to check time limit
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;

    // The loop here is used only when at some point during the next we determine
    // that due to effects of filters or otherwise, we have an empty row in the result.
    // Then we loop and try again. Otherwise, we must get out on the first iteration via return,
    // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
    // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow).
    while (true) {
      resetProgress(scannerContext, initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
      checkClientDisconnect(rpcCall);

      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // Let's see what we have in the storeHeap.
      ExtendedCell current = this.storeHeap.peek();

      boolean shouldStop = shouldStop(current);
      // When hasFilterRow is true it means that all the cells for a particular row must be
      // read before a filtering decision can be made. This means that filters where hasFilterRow
      // run the risk of encountering out of memory errors in the case that they are applied to a
      // table that has very large rows.
      boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

      // If filter#hasFilterRow is true, partial results are not allowed since allowing them
      // would prevent the filters from being evaluated. Thus, if it is true, change the
      // scope of any limits that could potentially create partial results to
      // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row
      if (hasFilterRow) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
            + " formed. Changing scope of limits that may create partials");
        }
        scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
        scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
        limitScope = LimitScope.BETWEEN_ROWS;
      }

      if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
        if (hasFilterRow) {
          throw new IncompatibleFilterException(
            "Filter whose hasFilterRow() returns true is incompatible with scans that must "
              + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
        }
        return true;
      }

      // Check if we were getting data from the joinedHeap and hit the limit.
      // If not, then it's main path - getting results from storeHeap.
      if (joinedContinuationRow == null) {
        // First, check if we are at a stop row. If so, there are no more results.
        if (shouldStop) {
          if (hasFilterRow) {
            filter.filterRowCells((List<Cell>) results);
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Check if rowkey filter wants to exclude this row. If so, loop to next.
        // Technically, if we hit limits before on this row, we don't need this call.
        if (filterRowKey(current)) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          // early check, see HBASE-16296
          if (isFilterDoneInternal()) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          // Typically the count of rows scanned is incremented inside #populateResult. However,
          // here we are filtering a row based purely on its row key, preventing us from calling
          // #populateResult. Thus, perform the necessary increment here to rows scanned metric
          incrementCountOfRowsScannedMetric(scannerContext);
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          results.clear();

          // Read nothing as the rowkey was filtered, but still need to check time limit
          // We also check size limit because we might have read blocks in getting to this point.
          if (scannerContext.checkAnyLimitReached(limitScope)) {
            return true;
          }
          continue;
        }

        // Ok, we are good, let's try to get some results from the main heap.
        populateResult(results, this.storeHeap, scannerContext, current);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          if (hasFilterRow) {
            throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
          }
          return true;
        }

        // Check for thread interrupt status in case we have been signaled from
        // #interruptRegionOperation.
        region.checkInterrupt();

        Cell nextKv = this.storeHeap.peek();
        shouldStop = shouldStop(nextKv);
        // save that the row was empty before filters applied to it.
        final boolean isEmptyRow = results.isEmpty();

        // We have the part of the row necessary for filtering (all of it, usually).
        // First filter with the filterRow(List).
        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
        if (hasFilterRow) {
          ret = filter.filterRowCellsWithRet((List<Cell>) results);

          // We don't know how the results have changed after being filtered. Must set progress
          // according to contents of results now.
          if (scannerContext.getKeepProgress()) {
            scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
              initialHeapSizeProgress);
          } else {
            scannerContext.clearProgress();
          }
          scannerContext.incrementBatchProgress(results.size());
          for (ExtendedCell cell : (List<ExtendedCell>) results) {
            scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
              cell.heapSize());
          }
        }

        if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          results.clear();
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // This row was totally filtered out, if this is NOT the last row,
          // we should continue on. Otherwise, nothing else to do.
          if (!shouldStop) {
            // Read nothing as the cells were filtered, but still need to check time limit.
            // We also check size limit because we might have read blocks in getting to this point.
            if (scannerContext.checkAnyLimitReached(limitScope)) {
              return true;
            }
            continue;
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Ok, we are done with storeHeap for this row.
        // Now we may need to fetch additional, non-essential data into row.
        // These values are not needed for filter to work, so we postpone their
        // fetch to (possibly) reduce amount of data loads from disk.
        if (this.joinedHeap != null) {
          boolean mayHaveData = joinedHeapMayHaveData(current);
          if (mayHaveData) {
            joinedContinuationRow = current;
            populateFromJoinedHeap(results, scannerContext);

            if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
              return true;
            }
          }
        }
      } else {
        // Populating from the joined heap was stopped by limits, populate some more.
        populateFromJoinedHeap(results, scannerContext);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          return true;
        }
      }
      // We may have just called populateFromJoinedMap and hit the limits. If that is
      // the case, we need to call it again on the next next() invocation.
      if (joinedContinuationRow != null) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // Finally, we are done with both joinedHeap and storeHeap.
      // Double check to prevent empty rows from appearing in result. It could be
      // the case when SingleColumnValueExcludeFilter is used.
      if (results.isEmpty()) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        boolean moreRows = nextRow(scannerContext, current);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        if (!shouldStop) {
          // We check size limit because we might have read blocks in the nextRow call above, or
          // in the populateResult call. Only scans with hasFilterRow should reach this point,
          // and for those scans which filter row _cells_ this is the only place we can actually
          // enforce that the scan does not exceed limits since it bypasses all other checks above.
          if (scannerContext.checkSizeLimit(limitScope)) {
            return true;
          }
          continue;
        }
      }

      if (shouldStop) {
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      } else {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }
    }
  }

  private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
    region.filteredReadRequestsCount.increment();
    if (region.getMetrics() != null) {
      region.getMetrics().updateFilteredRecords();
    }

    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics()
      .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME, 1);
  }

  private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics()
      .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 1);
  }

  /** Returns true when the joined heap may have data for the current row */
  private boolean joinedHeapMayHaveData(ExtendedCell currentRowCell) throws IOException {
    Cell nextJoinedKv = joinedHeap.peek();
    boolean matchCurrentRow =
      nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell);
    boolean matchAfterSeek = false;

    // If the next value in the joined heap does not match the current row, try to seek to the
    // correct row
    if (!matchCurrentRow) {
      ExtendedCell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell);
      boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
      matchAfterSeek = seekSuccessful && joinedHeap.peek() != null
        && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell);
    }

    return matchCurrentRow || matchAfterSeek;
  }

  /**
   * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines both
   * filterRow & filterRow({@code List<KeyValue> kvs}) functions. While 0.94 code or older, it may
   * not implement hasFilterRow as HBase-6429 expects because 0.94 hasFilterRow() only returns true
   * when filterRow({@code List<KeyValue> kvs}) is overridden not the filterRow(). Therefore, the
   * filterRow() will be skipped.
   */
  private boolean filterRow() throws IOException {
    // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
    // filterRowCells(List<Cell> kvs) so we skip that scenario here.
    return filter != null && (!filter.hasFilterRow()) && filter.filterRow();
  }

  /** Returns true when the row-key filter excludes the row that {@code current} belongs to. */
  private boolean filterRowKey(Cell current) throws IOException {
    return filter != null && filter.filterRowKey(current);
  }

  /**
   * A mocked list implementation - discards all updates. Used by nextRow to drain the remaining
   * cells of a skipped row without accumulating them in memory.
   */
  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {

    @Override
    public void add(int index, Cell element) {
      // do nothing
    }

    @Override
    public boolean addAll(int index, Collection<? extends Cell> c) {
      return false; // this list is never changed as a result of an update
    }

    @Override
    public KeyValue get(int index) {
      throw new UnsupportedOperationException();
    }

    @Override
    public int size() {
      return 0;
    }
  };

  /**
   * Advances the store heap past all remaining cells of {@code curRowCell}'s row, resets the
   * filters, and gives the coprocessor host a chance to fast-forward or terminate the scan.
   * @return false when the coprocessor hook signals the scan is done, true otherwise
   */
  protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
    assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";

    // Enable skipping row mode, which disables limits and skips tracking progress for all
    // but block size. We keep tracking block size because skipping a row in this way
    // might involve reading blocks along the way.
    scannerContext.setSkippingRow(true);

    Cell next;
    while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();
      this.storeHeap.next(MOCKED_LIST, scannerContext);
    }

    scannerContext.setSkippingRow(false);
    resetFilters();

    // Calling the hook in CP which allows it to do a fast forward
    return this.region.getCoprocessorHost() == null
      || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
  }

  /**
   * Returns true when the scan must stop: either the heap is exhausted ({@code currentRowCell} is
   * null) or the current row is at/past the configured stop row.
   */
  protected boolean shouldStop(Cell currentRowCell) {
    if (currentRowCell == null) {
      return true;
    }
    if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {
      return false;
    }
    int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length);
    return c > 0 || (c == 0 && !includeStopRow);
  }

  /**
   * Closes both heaps (recording the store files they read), deregisters this scanner's read
   * point, and marks the scanner closed.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "this method is only called inside close which is synchronized")
  private void closeInternal() {
    if (storeHeap != null) {
      storeHeap.close();
      filesRead.addAll(storeHeap.getFilesRead());
      storeHeap = null;
    }
    if (joinedHeap != null) {
      joinedHeap.close();
      filesRead.addAll(joinedHeap.getFilesRead());
      joinedHeap = null;
    }
    // no need to synchronize here.
    scannerReadPoints.remove(this);
    this.filterClosed = true;
  }

  @Override
  public synchronized void close() {
    TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
  }

  /**
   * Returns the set of store file paths that were successfully read by this scanner. Populated at
   * close from the underlying store heap and joined heap (if any).
   */
  @Override
  public Set<Path> getFilesRead() {
    return Collections.unmodifiableSet(filesRead);
  }

  /**
   * Repositions both heaps at the first cell of {@code row} using lazy request-seek.
   * @return true if either heap found data at or after the seek point
   */
  @Override
  public synchronized boolean reseek(byte[] row) throws IOException {
    return TraceUtil.trace(() -> {
      if (row == null) {
        throw new IllegalArgumentException("Row cannot be null.");
      }
      boolean result = false;
      region.startRegionOperation();
      ExtendedCell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
      try {
        // use request seek to make use of the lazy seek option. See HBASE-5520
        result = this.storeHeap.requestSeek(kv, true, true);
        if (this.joinedHeap != null) {
          result = this.joinedHeap.requestSeek(kv, true, true) || result;
        }
      } finally {
        region.closeRegionOperation();
      }
      return result;
    }, () -> region.createRegionSpan("RegionScanner.reseek"));
  }

  // Shipper callback: release/flip any resources the heaps held for the shipped cells.
  @Override
  public void shipped() throws IOException {
    if (storeHeap != null) {
      storeHeap.shipped();
    }
    if (joinedHeap != null) {
      joinedHeap.shipped();
    }
  }

  @Override
  public void run() throws IOException {
    // This is the RPC callback method executed. We close the scanner in this
    // callback
    this.close();
  }
}