/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
 */
@InterfaceAudience.Private
public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {

  private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class);

  // Package local for testability
  KeyValueHeap storeHeap = null;
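  /*
   * Added commentary (not part of the original source): the two heaps below implement
   * on-demand column family loading. A hedged sketch of the client-side setup that exercises
   * this path, where family/qualifier/value are placeholder byte[] values:
   *
   *   Scan scan = new Scan()
   *     .setFilter(new SingleColumnValueExcludeFilter(family, qualifier,
   *       CompareOperator.EQUAL, value))
   *     .setLoadColumnFamiliesOnDemand(true);
   *
   * Families the filter marks essential feed storeHeap; all remaining families feed
   * joinedHeap and are read only for rows that pass the filter.
   */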
  /**
   * Heap of key-values that are not essential for the provided filters and are thus read on
   * demand, if on-demand column family loading is enabled.
   */
  KeyValueHeap joinedHeap = null;

  /**
   * If the joined heap data gathering is interrupted due to scan limits, this will contain the row
   * for which we are populating the values.
   */
  protected ExtendedCell joinedContinuationRow = null;
  private boolean filterClosed = false;

  protected final byte[] stopRow;
  protected final boolean includeStopRow;
  protected final HRegion region;
  protected final CellComparator comparator;

  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  private final long readPt;
  private final long maxResultSize;
  private final ScannerContext defaultScannerContext;
  private final FilterWrapper filter;
  private final String operationId;

  private RegionServerServices rsServices;

  private ServerSideScanMetrics scannerInitMetrics = null;

  @Override
  public RegionInfo getRegionInfo() {
    return region.getRegionInfo();
  }

  private static boolean hasNonce(HRegion region, long nonce) {
    RegionServerServices rsServices = region.getRegionServerServices();
    return nonce != HConstants.NO_NONCE && rsServices != null
      && rsServices.getNonceManager() != null;
  }

  RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
    long nonceGroup, long nonce) throws IOException {
    this.region = region;
    this.maxResultSize = scan.getMaxResultSize();
    if (scan.hasFilter()) {
      this.filter = new FilterWrapper(scan.getFilter());
    } else {
      this.filter = null;
    }
    this.comparator = region.getCellComparator();
    /**
     * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
     * scanner context that can be used to enforce the batch limit in the event that a
     * ScannerContext is not specified during an invocation of next/nextRaw
     */
    defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
    this.stopRow = scan.getStopRow();
    this.includeStopRow = scan.includeStopRow();
    this.operationId = scan.getId();

    // synchronize on scannerReadPoints so that nobody calculates
    // getSmallestReadPoint before scannerReadPoints is updated.
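    // Added commentary: the read point is chosen in priority order -- (1) an explicit MVCC
    // read point carried on the Scan (used when a scanner is re-opened mid-scan so the view
    // stays consistent), (2) the MVCC number recorded for a nonce-tracked operation being
    // replayed, and (3) a fresh read point derived from the scan's isolation level.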
    IsolationLevel isolationLevel = scan.getIsolationLevel();
    long mvccReadPoint = ClientInternalHelper.getMvccReadPoint(scan);
    this.scannerReadPoints = region.scannerReadPoints;
    this.rsServices = region.getRegionServerServices();
    region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    try {
      if (mvccReadPoint > 0) {
        this.readPt = mvccReadPoint;
      } else if (hasNonce(region, nonce)) {
        this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
      } else {
        this.readPt = region.getReadPoint(isolationLevel);
      }
      scannerReadPoints.put(this, this.readPt);
    } finally {
      region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    }
    boolean isScanMetricsEnabled = scan.isScanMetricsEnabled();
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(isScanMetricsEnabled);
    if (isScanMetricsEnabled) {
      this.scannerInitMetrics = new ServerSideScanMetrics();
      ThreadLocalServerSideScanMetrics.reset();
    }
    initializeScanners(scan, additionalScanners);
    if (isScanMetricsEnabled) {
      ThreadLocalServerSideScanMetrics.populateServerSideScanMetrics(scannerInitMetrics);
    }
  }

  public ScannerContext getContext() {
    return defaultScannerContext;
  }

  private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
    throws IOException {
    // Here we separate all scanners into two lists - scanners that provide data required
    // by the filter to operate (scanners list) and all others (joinedScanners list).
    List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());
    List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());
    // Store all already instantiated scanners for exception handling
    List<KeyValueScanner> instantiatedScanners = new ArrayList<>();
    // handle additionalScanners
    if (additionalScanners != null && !additionalScanners.isEmpty()) {
      scanners.addAll(additionalScanners);
      instantiatedScanners.addAll(additionalScanners);
    }

    try {
      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
        HStore store = region.getStore(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
        instantiatedScanners.add(scanner);
        if (
          this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
            || this.filter.isFamilyEssential(entry.getKey())
        ) {
          scanners.add(scanner);
        } else {
          joinedScanners.add(scanner);
        }
      }
      initializeKVHeap(scanners, joinedScanners, region);
    } catch (Throwable t) {
      throw handleException(instantiatedScanners, t);
    }
  }

  protected void initializeKVHeap(List<KeyValueScanner> scanners,
    List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
    this.storeHeap = new KeyValueHeap(scanners, comparator);
    if (!joinedScanners.isEmpty()) {
      this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
    }
  }
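  /*
   * Added commentary: initializeKVHeap is protected so subclasses can substitute another heap
   * implementation. A minimal sketch, assuming a reversed-scan subclass along the lines of
   * HBase's ReversedRegionScannerImpl:
   *
   *   @Override
   *   protected void initializeKVHeap(List<KeyValueScanner> scanners,
   *       List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
   *     this.storeHeap = new ReversedKeyValueHeap(scanners, comparator);
   *     if (!joinedScanners.isEmpty()) {
   *       this.joinedHeap = new ReversedKeyValueHeap(joinedScanners, comparator);
   *     }
   *   }
   */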
  private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) {
    // remove the scanner read point before throwing the exception
    scannerReadPoints.remove(this);
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
      if (joinedHeap != null) {
        joinedHeap.close();
        joinedHeap = null;
      }
    } else {
      // close all already instantiated scanners before throwing the exception
      for (KeyValueScanner scanner : instantiatedScanners) {
        scanner.close();
      }
    }
    return t instanceof IOException ? (IOException) t : new IOException(t);
  }

  @Override
  public long getMaxResultSize() {
    return maxResultSize;
  }

  @Override
  public long getMvccReadPoint() {
    return this.readPt;
  }

  @Override
  public int getBatch() {
    return this.defaultScannerContext.getBatchLimit();
  }

  @Override
  public String getOperationId() {
    return operationId;
  }

  /**
   * Reset the filter, if any.
   * @throws IOException in case a filter raises an I/O exception.
   */
  protected final void resetFilters() throws IOException {
    if (filter != null) {
      filter.reset();
    }
  }

  @Override
  public boolean next(List<? super ExtendedCell> outResults) throws IOException {
    // apply the batching limit by default
    return next(outResults, defaultScannerContext);
  }

  @Override
  public synchronized boolean next(List<? super ExtendedCell> outResults,
    ScannerContext scannerContext) throws IOException {
    if (this.filterClosed) {
      throw new UnknownScannerException("Scanner was closed (timed out?) "
        + "after we renewed it. Could be caused by a very slow scanner "
        + "or a lengthy garbage collection");
    }
    region.startRegionOperation(Operation.SCAN);
    try {
      return nextRaw(outResults, scannerContext);
    } finally {
      region.closeRegionOperation(Operation.SCAN);
    }
  }

  @Override
  public boolean nextRaw(List<? super ExtendedCell> outResults) throws IOException {
    // Use the RegionScanner's context by default
    return nextRaw(outResults, defaultScannerContext);
  }

  @Override
  public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext scannerContext)
    throws IOException {
    if (storeHeap == null) {
      // scanner is closed
      throw new UnknownScannerException("Scanner was closed");
    }
    boolean moreValues = false;
    boolean isScanMetricsEnabled = scannerContext.isTrackingMetrics();
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(isScanMetricsEnabled);
    if (isScanMetricsEnabled) {
      ThreadLocalServerSideScanMetrics.reset();
      ServerSideScanMetrics scanMetrics = scannerContext.getMetrics();
      if (scannerInitMetrics != null) {
        scannerInitMetrics.getMetricsMap().forEach(scanMetrics::addToCounter);
        scannerInitMetrics = null;
      }
    }
    if (outResults.isEmpty()) {
      // Usually outResults is empty. This is true when next is called
      // to handle a scan or get operation.
      moreValues = nextInternal(outResults, scannerContext);
    } else {
      List<ExtendedCell> tmpList = new ArrayList<>();
      moreValues = nextInternal(tmpList, scannerContext);
      outResults.addAll(tmpList);
    }
    if (isScanMetricsEnabled) {
      ServerSideScanMetrics scanMetrics = scannerContext.getMetrics();
      ThreadLocalServerSideScanMetrics.populateServerSideScanMetrics(scanMetrics);
    }
    region.addReadRequestsCount(1);
    if (region.getMetrics() != null) {
      region.getMetrics().updateReadRequestCount();
    }

    // If the size limit was reached it means a partial Result is being returned. Returning a
    // partial Result means that we should not reset the filters; filters should only be reset
    // between rows.
    if (!scannerContext.mayHaveMoreCellsInRow()) {
      resetFilters();
    }

    if (isFilterDoneInternal()) {
      moreValues = false;
    }
    return moreValues;
  }

  /** Returns true if more cells exist after this batch, false if the scanner is done. */
  private boolean populateFromJoinedHeap(List<? super ExtendedCell> results,
    ScannerContext scannerContext) throws IOException {
    assert joinedContinuationRow != null;
    boolean moreValues =
      populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow);

    if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
      // We are done with this row, reset the continuation.
      joinedContinuationRow = null;
    }
    // As the data is obtained from two independent heaps, we need to
    // ensure that the result list is sorted, because Result relies on that.
    ((List<Cell>) results).sort(comparator);
    return moreValues;
  }

  /**
   * Fetches records with currentRow into the results list, until the next row, the batchLimit (if
   * not -1), or the remainingResultSize (if not -1) is reached.
   * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before
   *             the call.
   * @return state of the last call to {@link KeyValueHeap#next()}
   */
  private boolean populateResult(List<? super ExtendedCell> results, KeyValueHeap heap,
    ScannerContext scannerContext, ExtendedCell currentRowCell) throws IOException {
    Cell nextKv;
    boolean moreCellsInRow = false;
    boolean tmpKeepProgress = scannerContext.getKeepProgress();
    // Scanning between column families and thus the scope is between cells
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;
    do {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // We want to maintain any progress that is made towards the limits while scanning across
      // different column families. To do this, we toggle the keep progress flag on during calls
      // to the StoreScanner to ensure that any progress made thus far is not wiped away.
      scannerContext.setKeepProgress(true);
      heap.next(results, scannerContext);
      scannerContext.setKeepProgress(tmpKeepProgress);

      nextKv = heap.peek();
      moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
      if (!moreCellsInRow) {
        incrementCountOfRowsScannedMetric(scannerContext);
      }
      if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
        return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
      } else if (scannerContext.checkSizeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      } else if (scannerContext.checkTimeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      }
    } while (moreCellsInRow);
    return nextKv != null;
  }
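  /*
   * Added commentary: the mid-row NextState values returned above are what allow a client to
   * receive partial Results. A hedged sketch of the decision, under the ScannerContext
   * semantics used in this class:
   *
   *   moreCellsInRow && sizeLimitReached  -> SIZE_LIMIT_REACHED_MID_ROW (partial row)
   *   !moreCellsInRow && sizeLimitReached -> SIZE_LIMIT_REACHED (row boundary)
   *
   * Mid-row states can only fire while the limit scope is LimitScope.BETWEEN_CELLS; when a
   * row-level filter is present, nextInternal widens the scope to BETWEEN_ROWS so rows are
   * never cut in half before the filter sees them.
   */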
  /**
   * Based on the nextKv in the heap and the current row, decide whether or not there are more
   * cells to be read in the heap. If the row of the nextKv in the heap matches the current row,
   * then there are more cells to be read in the row.
   * @return true when there are more cells in the row to be read
   */
  private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
    return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);
  }

  /** Returns true if a filter rules that the scanner is done. */
  @Override
  public synchronized boolean isFilterDone() throws IOException {
    return isFilterDoneInternal();
  }

  private boolean isFilterDoneInternal() throws IOException {
    return this.filter != null && this.filter.filterAllRemaining();
  }

  private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException {
    if (rpcCall.isPresent()) {
      // If a user specifies a too-restrictive or too-slow scanner, the
      // client might time out and disconnect while the server side
      // is still processing the request. We should abort aggressively
      // in that case.
      long afterTime = rpcCall.get().disconnectSince();
      if (afterTime >= 0) {
        throw new CallerDisconnectedException(
          "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this
            + " after " + afterTime + " ms, since " + "caller disconnected");
      }
    }
  }

  private void resetProgress(ScannerContext scannerContext, int initialBatchProgress,
    long initialSizeProgress, long initialHeapSizeProgress) {
    // Starting to scan a new row. Reset the scanner progress according to whether or not
    // progress should be kept.
    if (scannerContext.getKeepProgress()) {
      // Progress should be kept. Reset to initial values seen at start of method invocation.
      scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
    } else {
      scannerContext.clearProgress();
    }
  }
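  /*
   * Added commentary: nextInternal below is the heart of the scanner. Per iteration it
   * (1) checks the stop row and filterRowKey so whole rows can be skipped cheaply,
   * (2) drains the current row from storeHeap via populateResult, (3) applies row-level
   * filters, and (4) optionally tops the row up from joinedHeap. The enclosing while(true)
   * only loops when a row was filtered out entirely; every other path returns.
   */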
  private boolean nextInternal(List<? super ExtendedCell> results, ScannerContext scannerContext)
    throws IOException {
    Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list");
    Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null");
    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();

    // Save the initial progress from the Scanner context in these local variables. The progress
    // may need to be reset a few times if rows are being filtered out, so we save the initial
    // progress.
    int initialBatchProgress = scannerContext.getBatchProgress();
    long initialSizeProgress = scannerContext.getDataSizeProgress();
    long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();

    // Used to check the time limit
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;

    // The loop here is used only when at some point during the next we determine
    // that due to effects of filters or otherwise, we have an empty row in the result.
    // Then we loop and try again. Otherwise, we must get out on the first iteration via return,
    // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
    // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow).
    while (true) {
      resetProgress(scannerContext, initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
      checkClientDisconnect(rpcCall);

      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // Let's see what we have in the storeHeap.
      ExtendedCell current = this.storeHeap.peek();

      boolean shouldStop = shouldStop(current);
      // When hasFilterRow is true it means that all the cells for a particular row must be
      // read before a filtering decision can be made. This means that filters for which
      // hasFilterRow() returns true run the risk of encountering out of memory errors in the
      // case that they are applied to a table that has very large rows.
      boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

      // If filter#hasFilterRow is true, partial results are not allowed since allowing them
      // would prevent the filters from being evaluated. Thus, if it is true, change the
      // scope of any limits that could potentially create partial results to
      // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row
      if (hasFilterRow) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
            + " formed. Changing scope of limits that may create partials");
        }
        scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
        scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
        limitScope = LimitScope.BETWEEN_ROWS;
      }

      if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
        if (hasFilterRow) {
          throw new IncompatibleFilterException(
            "Filter whose hasFilterRow() returns true is incompatible with scans that must "
              + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
        }
        return true;
      }

      // Check if we were getting data from the joinedHeap and hit the limit.
      // If not, then it's the main path - getting results from storeHeap.
      if (joinedContinuationRow == null) {
        // First, check if we are at a stop row. If so, there are no more results.
        if (shouldStop) {
          if (hasFilterRow) {
            filter.filterRowCells((List<Cell>) results);
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Check if the rowkey filter wants to exclude this row. If so, loop to next.
        // Technically, if we hit limits before on this row, we don't need this call.
        if (filterRowKey(current)) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          // early check, see HBASE-16296
          if (isFilterDoneInternal()) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          // Typically the count of rows scanned is incremented inside #populateResult. However,
          // here we are filtering a row based purely on its row key, preventing us from calling
          // #populateResult. Thus, perform the necessary increment on the rows scanned metric here
          incrementCountOfRowsScannedMetric(scannerContext);
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          results.clear();

          // Read nothing as the rowkey was filtered, but still need to check the time limit.
          // We also check the size limit because we might have read blocks in getting to this
          // point.
          if (scannerContext.checkAnyLimitReached(limitScope)) {
            return true;
          }
          continue;
        }

        // Ok, we are good, let's try to get some results from the main heap.
        populateResult(results, this.storeHeap, scannerContext, current);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          if (hasFilterRow) {
            throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
          }
          return true;
        }

        // Check for thread interrupt status in case we have been signaled from
        // #interruptRegionOperation.
        region.checkInterrupt();

        Cell nextKv = this.storeHeap.peek();
        shouldStop = shouldStop(nextKv);
        // save whether the row was empty before filters were applied to it.
        final boolean isEmptyRow = results.isEmpty();

        // We have the part of the row necessary for filtering (all of it, usually).
        // First filter with the filterRow(List).
        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
        if (hasFilterRow) {
          ret = filter.filterRowCellsWithRet((List<Cell>) results);

          // We don't know how the results have changed after being filtered. Must set progress
          // according to contents of results now.
          if (scannerContext.getKeepProgress()) {
            scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
              initialHeapSizeProgress);
          } else {
            scannerContext.clearProgress();
          }
          scannerContext.incrementBatchProgress(results.size());
          for (ExtendedCell cell : (List<ExtendedCell>) results) {
            scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
              cell.heapSize());
          }
        }

        if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          results.clear();
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // This row was totally filtered out. If this is NOT the last row,
          // we should continue on. Otherwise, nothing else to do.
          if (!shouldStop) {
            // Read nothing as the cells were filtered, but still need to check the time limit.
            // We also check the size limit because we might have read blocks in getting to this
            // point.
            if (scannerContext.checkAnyLimitReached(limitScope)) {
              return true;
            }
            continue;
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Ok, we are done with storeHeap for this row.
        // Now we may need to fetch additional, non-essential data into the row.
        // These values are not needed for the filter to work, so we postpone their
        // fetch to (possibly) reduce the amount of data loaded from disk.
        if (this.joinedHeap != null) {
          boolean mayHaveData = joinedHeapMayHaveData(current);
          if (mayHaveData) {
            joinedContinuationRow = current;
            populateFromJoinedHeap(results, scannerContext);

            if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
              return true;
            }
          }
        }
      } else {
        // Populating from the joined heap was stopped by limits, populate some more.
        populateFromJoinedHeap(results, scannerContext);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          return true;
        }
      }
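      // Added commentary: a hedged walk-through of the joined-heap hand-off above. For a row
      // "r1" whose essential cells pass the filter, joinedContinuationRow is set to r1's first
      // cell and the non-essential families are drained; if a size/time limit fires mid-drain,
      // the check below returns MORE_VALUES, and the following next() call re-enters the
      // else-branch above to finish r1 before storeHeap advances to another row.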
      // We may have just called populateFromJoinedHeap and hit the limits. If that is
      // the case, we need to call it again on the next next() invocation.
      if (joinedContinuationRow != null) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // Finally, we are done with both joinedHeap and storeHeap.
      // Double check to prevent empty rows from appearing in the result. It could be
      // the case when SingleColumnValueExcludeFilter is used.
      if (results.isEmpty()) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        boolean moreRows = nextRow(scannerContext, current);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        if (!shouldStop) {
          // We check the size limit because we might have read blocks in the nextRow call above,
          // or in the populateResult call. Only scans with hasFilterRow should reach this point,
          // and for those scans which filter row _cells_ this is the only place we can actually
          // enforce that the scan does not exceed limits, since it bypasses all other checks
          // above.
          if (scannerContext.checkSizeLimit(limitScope)) {
            return true;
          }
          continue;
        }
      }

      if (shouldStop) {
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      } else {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }
    }
  }

  private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
    region.filteredReadRequestsCount.increment();
    if (region.getMetrics() != null) {
      region.getMetrics().updateFilteredRecords();
    }

    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics()
      .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME, 1);
  }

  private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics()
      .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 1);
  }

  /** Returns true when the joined heap may have data for the current row. */
  private boolean joinedHeapMayHaveData(ExtendedCell currentRowCell) throws IOException {
    Cell nextJoinedKv = joinedHeap.peek();
    boolean matchCurrentRow =
      nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell);
    boolean matchAfterSeek = false;

    // If the next value in the joined heap does not match the current row, try to seek to the
    // correct row
    if (!matchCurrentRow) {
      ExtendedCell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell);
      boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
      matchAfterSeek = seekSuccessful && joinedHeap.peek() != null
        && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell);
    }

    return matchCurrentRow || matchAfterSeek;
  }
  /**
   * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines
   * both filterRow & filterRow({@code List<KeyValue> kvs}) functions. With 0.94 code or older, a
   * filter may not implement hasFilterRow as HBASE-6429 expects, because in 0.94 hasFilterRow()
   * only returns true when filterRow({@code List<KeyValue> kvs}) is overridden, not filterRow().
   * Therefore, the filterRow() call would be skipped.
   */
  private boolean filterRow() throws IOException {
    // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
    // filterRowCells(List<Cell> kvs) so we skip that scenario here.
    return filter != null && (!filter.hasFilterRow()) && filter.filterRow();
  }

  private boolean filterRowKey(Cell current) throws IOException {
    return filter != null && filter.filterRowKey(current);
  }

  /**
   * A mocked list implementation - discards all updates.
   */
  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {

    @Override
    public void add(int index, Cell element) {
      // do nothing
    }

    @Override
    public boolean addAll(int index, Collection<? extends Cell> c) {
      return false; // this list is never changed as a result of an update
    }

    @Override
    public KeyValue get(int index) {
      throw new UnsupportedOperationException();
    }

    @Override
    public int size() {
      return 0;
    }
  };

  protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
    assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";

    // Enable skipping row mode, which disables limits and skips tracking progress for all
    // but block size. We keep tracking block size because skipping a row in this way
    // might involve reading blocks along the way.
    scannerContext.setSkippingRow(true);

    Cell next;
    while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();
      this.storeHeap.next(MOCKED_LIST, scannerContext);
    }

    scannerContext.setSkippingRow(false);
    resetFilters();

    // Calling the hook in CP which allows it to do a fast forward
    return this.region.getCoprocessorHost() == null
      || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
  }

  protected boolean shouldStop(Cell currentRowCell) {
    if (currentRowCell == null) {
      return true;
    }
    if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {
      return false;
    }
    int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length);
    return c > 0 || (c == 0 && !includeStopRow);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "this method is only called inside close which is synchronized")
  private void closeInternal() {
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
    }
    if (joinedHeap != null) {
      joinedHeap.close();
      joinedHeap = null;
    }
    // no need to synchronize here.
    scannerReadPoints.remove(this);
    this.filterClosed = true;
  }

  @Override
  public synchronized void close() {
    TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
  }
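  /*
   * Added commentary: reseek below lets callers (typically coprocessors) skip forward within
   * an open scanner without paying a full seek per store file. A hedged usage sketch, where
   * nextRowKeyOfInterest is a hypothetical byte[] row key chosen by the caller:
   *
   *   if (scanner instanceof RegionScanner) {
   *     ((RegionScanner) scanner).reseek(nextRowKeyOfInterest);
   *   }
   */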
  @Override
  public synchronized boolean reseek(byte[] row) throws IOException {
    return TraceUtil.trace(() -> {
      if (row == null) {
        throw new IllegalArgumentException("Row cannot be null.");
      }
      boolean result = false;
      region.startRegionOperation();
      ExtendedCell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
      try {
        // use request seek to make use of the lazy seek option. See HBASE-5520
        result = this.storeHeap.requestSeek(kv, true, true);
        if (this.joinedHeap != null) {
          result = this.joinedHeap.requestSeek(kv, true, true) || result;
        }
      } finally {
        region.closeRegionOperation();
      }
      return result;
    }, () -> region.createRegionSpan("RegionScanner.reseek"));
  }

  @Override
  public void shipped() throws IOException {
    if (storeHeap != null) {
      storeHeap.shipped();
    }
    if (joinedHeap != null) {
      joinedHeap.shipped();
    }
  }

  @Override
  public void run() throws IOException {
    // This is the RPC callback method executed. We do the close of the scanner in this
    // callback.
    this.close();
  }
}