001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.AbstractList; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.List; 025import java.util.Map; 026import java.util.NavigableSet; 027import java.util.Optional; 028import java.util.concurrent.ConcurrentHashMap; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.CellComparator; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.KeyValue; 034import org.apache.hadoop.hbase.PrivateCellUtil; 035import org.apache.hadoop.hbase.UnknownScannerException; 036import org.apache.hadoop.hbase.client.IsolationLevel; 037import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor; 038import org.apache.hadoop.hbase.client.RegionInfo; 039import org.apache.hadoop.hbase.client.Scan; 040import org.apache.hadoop.hbase.filter.FilterWrapper; 041import org.apache.hadoop.hbase.filter.IncompatibleFilterException; 042import org.apache.hadoop.hbase.ipc.CallerDisconnectedException; 043import org.apache.hadoop.hbase.ipc.RpcCall; 044import org.apache.hadoop.hbase.ipc.RpcCallback; 045import org.apache.hadoop.hbase.ipc.RpcServer; 046import org.apache.hadoop.hbase.regionserver.Region.Operation; 047import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 048import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; 049import org.apache.hadoop.hbase.trace.TraceUtil; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.yetus.audience.InterfaceAudience; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 056 057/** 058 * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families). 059 */ 060@InterfaceAudience.Private 061class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback { 062 063 private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class); 064 065 // Package local for testability 066 KeyValueHeap storeHeap = null; 067 068 /** 069 * Heap of key-values that are not essential for the provided filters and are thus read on demand, 070 * if on-demand column family loading is enabled. 071 */ 072 KeyValueHeap joinedHeap = null; 073 074 /** 075 * If the joined heap data gathering is interrupted due to scan limits, this will contain the row 076 * for which we are populating the values. 077 */ 078 protected Cell joinedContinuationRow = null; 079 private boolean filterClosed = false; 080 081 protected final byte[] stopRow; 082 protected final boolean includeStopRow; 083 protected final HRegion region; 084 protected final CellComparator comparator; 085 086 private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints; 087 088 private final long readPt; 089 private final long maxResultSize; 090 private final ScannerContext defaultScannerContext; 091 private final FilterWrapper filter; 092 private final String operationId; 093 094 private RegionServerServices rsServices; 095 096 @Override 097 public RegionInfo getRegionInfo() { 098 return region.getRegionInfo(); 099 } 100 101 private static boolean hasNonce(HRegion region, long nonce) { 102 RegionServerServices rsServices = region.getRegionServerServices(); 103 return nonce != HConstants.NO_NONCE && rsServices != null 104 && rsServices.getNonceManager() != null; 105 } 106 107 RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region, 108 long nonceGroup, long nonce) throws IOException { 109 this.region = region; 110 this.maxResultSize = scan.getMaxResultSize(); 111 if (scan.hasFilter()) { 112 this.filter = new FilterWrapper(scan.getFilter()); 113 } else { 114 this.filter = null; 115 } 116 this.comparator = region.getCellComparator(); 117 /** 118 * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default 119 * scanner context that can be used to enforce the batch limit in the event that a 120 * ScannerContext is not specified during an invocation of next/nextRaw 121 */ 122 defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build(); 123 this.stopRow = scan.getStopRow(); 124 this.includeStopRow = scan.includeStopRow(); 125 this.operationId = scan.getId(); 126 127 // synchronize on scannerReadPoints so that nobody calculates 128 // getSmallestReadPoint, before scannerReadPoints is updated. 129 IsolationLevel isolationLevel = scan.getIsolationLevel(); 130 long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan); 131 this.scannerReadPoints = region.scannerReadPoints; 132 this.rsServices = region.getRegionServerServices(); 133 synchronized (scannerReadPoints) { 134 if (mvccReadPoint > 0) { 135 this.readPt = mvccReadPoint; 136 } else if (hasNonce(region, nonce)) { 137 this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce); 138 } else { 139 this.readPt = region.getReadPoint(isolationLevel); 140 } 141 scannerReadPoints.put(this, this.readPt); 142 } 143 initializeScanners(scan, additionalScanners); 144 } 145 146 private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners) 147 throws IOException { 148 // Here we separate all scanners into two lists - scanner that provide data required 149 // by the filter to operate (scanners list) and all others (joinedScanners list). 150 List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size()); 151 List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size()); 152 // Store all already instantiated scanners for exception handling 153 List<KeyValueScanner> instantiatedScanners = new ArrayList<>(); 154 // handle additionalScanners 155 if (additionalScanners != null && !additionalScanners.isEmpty()) { 156 scanners.addAll(additionalScanners); 157 instantiatedScanners.addAll(additionalScanners); 158 } 159 160 try { 161 for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) { 162 HStore store = region.getStore(entry.getKey()); 163 KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt); 164 instantiatedScanners.add(scanner); 165 if ( 166 this.filter == null || !scan.doLoadColumnFamiliesOnDemand() 167 || this.filter.isFamilyEssential(entry.getKey()) 168 ) { 169 scanners.add(scanner); 170 } else { 171 joinedScanners.add(scanner); 172 } 173 } 174 initializeKVHeap(scanners, joinedScanners, region); 175 } catch (Throwable t) { 176 throw handleException(instantiatedScanners, t); 177 } 178 } 179 180 protected void initializeKVHeap(List<KeyValueScanner> scanners, 181 List<KeyValueScanner> joinedScanners, HRegion region) throws IOException { 182 this.storeHeap = new KeyValueHeap(scanners, comparator); 183 if (!joinedScanners.isEmpty()) { 184 this.joinedHeap = new KeyValueHeap(joinedScanners, comparator); 185 } 186 } 187 188 private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) { 189 // remove scaner read point before throw the exception 190 scannerReadPoints.remove(this); 191 if (storeHeap != null) { 192 storeHeap.close(); 193 storeHeap = null; 194 if (joinedHeap != null) { 195 joinedHeap.close(); 196 joinedHeap = null; 197 } 198 } else { 199 // close all already instantiated scanners before throwing the exception 200 for (KeyValueScanner scanner : instantiatedScanners) { 201 scanner.close(); 202 } 203 } 204 return t instanceof IOException ? (IOException) t : new IOException(t); 205 } 206 207 @Override 208 public long getMaxResultSize() { 209 return maxResultSize; 210 } 211 212 @Override 213 public long getMvccReadPoint() { 214 return this.readPt; 215 } 216 217 @Override 218 public int getBatch() { 219 return this.defaultScannerContext.getBatchLimit(); 220 } 221 222 @Override 223 public String getOperationId() { 224 return operationId; 225 } 226 227 /** 228 * Reset both the filter and the old filter. 229 * @throws IOException in case a filter raises an I/O exception. 230 */ 231 protected final void resetFilters() throws IOException { 232 if (filter != null) { 233 filter.reset(); 234 } 235 } 236 237 @Override 238 public boolean next(List<Cell> outResults) throws IOException { 239 // apply the batching limit by default 240 return next(outResults, defaultScannerContext); 241 } 242 243 @Override 244 public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext) 245 throws IOException { 246 if (this.filterClosed) { 247 throw new UnknownScannerException("Scanner was closed (timed out?) " 248 + "after we renewed it. Could be caused by a very slow scanner " 249 + "or a lengthy garbage collection"); 250 } 251 region.startRegionOperation(Operation.SCAN); 252 try { 253 return nextRaw(outResults, scannerContext); 254 } finally { 255 region.closeRegionOperation(Operation.SCAN); 256 } 257 } 258 259 @Override 260 public boolean nextRaw(List<Cell> outResults) throws IOException { 261 // Use the RegionScanner's context by default 262 return nextRaw(outResults, defaultScannerContext); 263 } 264 265 @Override 266 public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext) throws IOException { 267 if (storeHeap == null) { 268 // scanner is closed 269 throw new UnknownScannerException("Scanner was closed"); 270 } 271 boolean moreValues = false; 272 if (outResults.isEmpty()) { 273 // Usually outResults is empty. This is true when next is called 274 // to handle scan or get operation. 275 moreValues = nextInternal(outResults, scannerContext); 276 } else { 277 List<Cell> tmpList = new ArrayList<>(); 278 moreValues = nextInternal(tmpList, scannerContext); 279 outResults.addAll(tmpList); 280 } 281 282 region.addReadRequestsCount(1); 283 if (region.getMetrics() != null) { 284 region.getMetrics().updateReadRequestCount(); 285 } 286 287 // If the size limit was reached it means a partial Result is being returned. Returning a 288 // partial Result means that we should not reset the filters; filters should only be reset in 289 // between rows 290 if (!scannerContext.mayHaveMoreCellsInRow()) { 291 resetFilters(); 292 } 293 294 if (isFilterDoneInternal()) { 295 moreValues = false; 296 } 297 return moreValues; 298 } 299 300 /** Returns true if more cells exist after this batch, false if scanner is done */ 301 private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext) 302 throws IOException { 303 assert joinedContinuationRow != null; 304 boolean moreValues = 305 populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow); 306 307 if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) { 308 // We are done with this row, reset the continuation. 309 joinedContinuationRow = null; 310 } 311 // As the data is obtained from two independent heaps, we need to 312 // ensure that result list is sorted, because Result relies on that. 313 results.sort(comparator); 314 return moreValues; 315 } 316 317 /** 318 * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is 319 * reached, or remainingResultSize (if not -1) is reaced 320 * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call. 321 * @return state of last call to {@link KeyValueHeap#next()} 322 */ 323 private boolean populateResult(List<Cell> results, KeyValueHeap heap, 324 ScannerContext scannerContext, Cell currentRowCell) throws IOException { 325 Cell nextKv; 326 boolean moreCellsInRow = false; 327 boolean tmpKeepProgress = scannerContext.getKeepProgress(); 328 // Scanning between column families and thus the scope is between cells 329 LimitScope limitScope = LimitScope.BETWEEN_CELLS; 330 do { 331 // Check for thread interrupt status in case we have been signaled from 332 // #interruptRegionOperation. 333 region.checkInterrupt(); 334 335 // We want to maintain any progress that is made towards the limits while scanning across 336 // different column families. To do this, we toggle the keep progress flag on during calls 337 // to the StoreScanner to ensure that any progress made thus far is not wiped away. 338 scannerContext.setKeepProgress(true); 339 heap.next(results, scannerContext); 340 scannerContext.setKeepProgress(tmpKeepProgress); 341 342 nextKv = heap.peek(); 343 moreCellsInRow = moreCellsInRow(nextKv, currentRowCell); 344 if (!moreCellsInRow) { 345 incrementCountOfRowsScannedMetric(scannerContext); 346 } 347 if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) { 348 return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues(); 349 } else if (scannerContext.checkSizeLimit(limitScope)) { 350 ScannerContext.NextState state = 351 moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED; 352 return scannerContext.setScannerState(state).hasMoreValues(); 353 } else if (scannerContext.checkTimeLimit(limitScope)) { 354 ScannerContext.NextState state = 355 moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED; 356 return scannerContext.setScannerState(state).hasMoreValues(); 357 } 358 } while (moreCellsInRow); 359 return nextKv != null; 360 } 361 362 /** 363 * Based on the nextKv in the heap, and the current row, decide whether or not there are more 364 * cells to be read in the heap. If the row of the nextKv in the heap matches the current row then 365 * there are more cells to be read in the row. 366 * @return true When there are more cells in the row to be read 367 */ 368 private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) { 369 return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell); 370 } 371 372 /** Returns True if a filter rules the scanner is over, done. */ 373 @Override 374 public synchronized boolean isFilterDone() throws IOException { 375 return isFilterDoneInternal(); 376 } 377 378 private boolean isFilterDoneInternal() throws IOException { 379 return this.filter != null && this.filter.filterAllRemaining(); 380 } 381 382 private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException { 383 if (rpcCall.isPresent()) { 384 // If a user specifies a too-restrictive or too-slow scanner, the 385 // client might time out and disconnect while the server side 386 // is still processing the request. We should abort aggressively 387 // in that case. 388 long afterTime = rpcCall.get().disconnectSince(); 389 if (afterTime >= 0) { 390 throw new CallerDisconnectedException( 391 "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this 392 + " after " + afterTime + " ms, since " + "caller disconnected"); 393 } 394 } 395 } 396 397 private void resetProgress(ScannerContext scannerContext, int initialBatchProgress, 398 long initialSizeProgress, long initialHeapSizeProgress) { 399 // Starting to scan a new row. Reset the scanner progress according to whether or not 400 // progress should be kept. 401 if (scannerContext.getKeepProgress()) { 402 // Progress should be kept. Reset to initial values seen at start of method invocation. 403 scannerContext.setProgress(initialBatchProgress, initialSizeProgress, 404 initialHeapSizeProgress); 405 } else { 406 scannerContext.clearProgress(); 407 } 408 } 409 410 private boolean nextInternal(List<Cell> results, ScannerContext scannerContext) 411 throws IOException { 412 Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list"); 413 Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null"); 414 Optional<RpcCall> rpcCall = RpcServer.getCurrentCall(); 415 416 // Save the initial progress from the Scanner context in these local variables. The progress 417 // may need to be reset a few times if rows are being filtered out so we save the initial 418 // progress. 419 int initialBatchProgress = scannerContext.getBatchProgress(); 420 long initialSizeProgress = scannerContext.getDataSizeProgress(); 421 long initialHeapSizeProgress = scannerContext.getHeapSizeProgress(); 422 423 // Used to check time limit 424 LimitScope limitScope = LimitScope.BETWEEN_CELLS; 425 426 // The loop here is used only when at some point during the next we determine 427 // that due to effects of filters or otherwise, we have an empty row in the result. 428 // Then we loop and try again. Otherwise, we must get out on the first iteration via return, 429 // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row, 430 // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow). 431 while (true) { 432 resetProgress(scannerContext, initialBatchProgress, initialSizeProgress, 433 initialHeapSizeProgress); 434 checkClientDisconnect(rpcCall); 435 436 // Check for thread interrupt status in case we have been signaled from 437 // #interruptRegionOperation. 438 region.checkInterrupt(); 439 440 // Let's see what we have in the storeHeap. 441 Cell current = this.storeHeap.peek(); 442 443 boolean shouldStop = shouldStop(current); 444 // When has filter row is true it means that the all the cells for a particular row must be 445 // read before a filtering decision can be made. This means that filters where hasFilterRow 446 // run the risk of enLongAddering out of memory errors in the case that they are applied to a 447 // table that has very large rows. 448 boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow(); 449 450 // If filter#hasFilterRow is true, partial results are not allowed since allowing them 451 // would prevent the filters from being evaluated. Thus, if it is true, change the 452 // scope of any limits that could potentially create partial results to 453 // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row 454 if (hasFilterRow) { 455 if (LOG.isTraceEnabled()) { 456 LOG.trace("filter#hasFilterRow is true which prevents partial results from being " 457 + " formed. Changing scope of limits that may create partials"); 458 } 459 scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS); 460 scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS); 461 limitScope = LimitScope.BETWEEN_ROWS; 462 } 463 464 if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { 465 if (hasFilterRow) { 466 throw new IncompatibleFilterException( 467 "Filter whose hasFilterRow() returns true is incompatible with scans that must " 468 + " stop mid-row because of a limit. ScannerContext:" + scannerContext); 469 } 470 return true; 471 } 472 473 // Check if we were getting data from the joinedHeap and hit the limit. 474 // If not, then it's main path - getting results from storeHeap. 475 if (joinedContinuationRow == null) { 476 // First, check if we are at a stop row. If so, there are no more results. 477 if (shouldStop) { 478 if (hasFilterRow) { 479 filter.filterRowCells(results); 480 } 481 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 482 } 483 484 // Check if rowkey filter wants to exclude this row. If so, loop to next. 485 // Technically, if we hit limits before on this row, we don't need this call. 486 if (filterRowKey(current)) { 487 incrementCountOfRowsFilteredMetric(scannerContext); 488 // early check, see HBASE-16296 489 if (isFilterDoneInternal()) { 490 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 491 } 492 // Typically the count of rows scanned is incremented inside #populateResult. However, 493 // here we are filtering a row based purely on its row key, preventing us from calling 494 // #populateResult. Thus, perform the necessary increment here to rows scanned metric 495 incrementCountOfRowsScannedMetric(scannerContext); 496 boolean moreRows = nextRow(scannerContext, current); 497 if (!moreRows) { 498 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 499 } 500 results.clear(); 501 502 // Read nothing as the rowkey was filtered, but still need to check time limit 503 if (scannerContext.checkTimeLimit(limitScope)) { 504 return true; 505 } 506 continue; 507 } 508 509 // Ok, we are good, let's try to get some results from the main heap. 510 populateResult(results, this.storeHeap, scannerContext, current); 511 if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) { 512 if (hasFilterRow) { 513 throw new IncompatibleFilterException( 514 "Filter whose hasFilterRow() returns true is incompatible with scans that must " 515 + " stop mid-row because of a limit. ScannerContext:" + scannerContext); 516 } 517 return true; 518 } 519 520 // Check for thread interrupt status in case we have been signaled from 521 // #interruptRegionOperation. 522 region.checkInterrupt(); 523 524 Cell nextKv = this.storeHeap.peek(); 525 shouldStop = shouldStop(nextKv); 526 // save that the row was empty before filters applied to it. 527 final boolean isEmptyRow = results.isEmpty(); 528 529 // We have the part of the row necessary for filtering (all of it, usually). 530 // First filter with the filterRow(List). 531 FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED; 532 if (hasFilterRow) { 533 ret = filter.filterRowCellsWithRet(results); 534 535 // We don't know how the results have changed after being filtered. Must set progress 536 // according to contents of results now. 537 if (scannerContext.getKeepProgress()) { 538 scannerContext.setProgress(initialBatchProgress, initialSizeProgress, 539 initialHeapSizeProgress); 540 } else { 541 scannerContext.clearProgress(); 542 } 543 scannerContext.incrementBatchProgress(results.size()); 544 for (Cell cell : results) { 545 scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell), 546 cell.heapSize()); 547 } 548 } 549 550 if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) { 551 incrementCountOfRowsFilteredMetric(scannerContext); 552 results.clear(); 553 boolean moreRows = nextRow(scannerContext, current); 554 if (!moreRows) { 555 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 556 } 557 558 // This row was totally filtered out, if this is NOT the last row, 559 // we should continue on. Otherwise, nothing else to do. 560 if (!shouldStop) { 561 // Read nothing as the cells was filtered, but still need to check time limit 562 if (scannerContext.checkTimeLimit(limitScope)) { 563 return true; 564 } 565 continue; 566 } 567 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 568 } 569 570 // Ok, we are done with storeHeap for this row. 571 // Now we may need to fetch additional, non-essential data into row. 572 // These values are not needed for filter to work, so we postpone their 573 // fetch to (possibly) reduce amount of data loads from disk. 574 if (this.joinedHeap != null) { 575 boolean mayHaveData = joinedHeapMayHaveData(current); 576 if (mayHaveData) { 577 joinedContinuationRow = current; 578 populateFromJoinedHeap(results, scannerContext); 579 580 if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) { 581 return true; 582 } 583 } 584 } 585 } else { 586 // Populating from the joined heap was stopped by limits, populate some more. 587 populateFromJoinedHeap(results, scannerContext); 588 if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) { 589 return true; 590 } 591 } 592 // We may have just called populateFromJoinedMap and hit the limits. If that is 593 // the case, we need to call it again on the next next() invocation. 594 if (joinedContinuationRow != null) { 595 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 596 } 597 598 // Finally, we are done with both joinedHeap and storeHeap. 599 // Double check to prevent empty rows from appearing in result. It could be 600 // the case when SingleColumnValueExcludeFilter is used. 601 if (results.isEmpty()) { 602 incrementCountOfRowsFilteredMetric(scannerContext); 603 boolean moreRows = nextRow(scannerContext, current); 604 if (!moreRows) { 605 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 606 } 607 if (!shouldStop) { 608 continue; 609 } 610 } 611 612 if (shouldStop) { 613 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 614 } else { 615 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 616 } 617 } 618 } 619 620 private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) { 621 region.filteredReadRequestsCount.increment(); 622 if (region.getMetrics() != null) { 623 region.getMetrics().updateFilteredRecords(); 624 } 625 626 if (scannerContext == null || !scannerContext.isTrackingMetrics()) { 627 return; 628 } 629 630 scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet(); 631 } 632 633 private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) { 634 if (scannerContext == null || !scannerContext.isTrackingMetrics()) { 635 return; 636 } 637 638 scannerContext.getMetrics().countOfRowsScanned.incrementAndGet(); 639 } 640 641 /** Returns true when the joined heap may have data for the current row */ 642 private boolean joinedHeapMayHaveData(Cell currentRowCell) throws IOException { 643 Cell nextJoinedKv = joinedHeap.peek(); 644 boolean matchCurrentRow = 645 nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell); 646 boolean matchAfterSeek = false; 647 648 // If the next value in the joined heap does not match the current row, try to seek to the 649 // correct row 650 if (!matchCurrentRow) { 651 Cell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell); 652 boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true); 653 matchAfterSeek = seekSuccessful && joinedHeap.peek() != null 654 && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell); 655 } 656 657 return matchCurrentRow || matchAfterSeek; 658 } 659 660 /** 661 * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines both 662 * filterRow & filterRow({@code List<KeyValue> kvs}) functions. While 0.94 code or older, it may 663 * not implement hasFilterRow as HBase-6429 expects because 0.94 hasFilterRow() only returns true 664 * when filterRow({@code List<KeyValue> kvs}) is overridden not the filterRow(). Therefore, the 665 * filterRow() will be skipped. 666 */ 667 private boolean filterRow() throws IOException { 668 // when hasFilterRow returns true, filter.filterRow() will be called automatically inside 669 // filterRowCells(List<Cell> kvs) so we skip that scenario here. 670 return filter != null && (!filter.hasFilterRow()) && filter.filterRow(); 671 } 672 673 private boolean filterRowKey(Cell current) throws IOException { 674 return filter != null && filter.filterRowKey(current); 675 } 676 677 /** 678 * A mocked list implementation - discards all updates. 679 */ 680 private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() { 681 682 @Override 683 public void add(int index, Cell element) { 684 // do nothing 685 } 686 687 @Override 688 public boolean addAll(int index, Collection<? extends Cell> c) { 689 return false; // this list is never changed as a result of an update 690 } 691 692 @Override 693 public KeyValue get(int index) { 694 throw new UnsupportedOperationException(); 695 } 696 697 @Override 698 public int size() { 699 return 0; 700 } 701 }; 702 703 protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException { 704 assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read."; 705 Cell next; 706 while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) { 707 // Check for thread interrupt status in case we have been signaled from 708 // #interruptRegionOperation. 709 region.checkInterrupt(); 710 this.storeHeap.next(MOCKED_LIST); 711 } 712 resetFilters(); 713 714 // Calling the hook in CP which allows it to do a fast forward 715 return this.region.getCoprocessorHost() == null 716 || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell); 717 } 718 719 protected boolean shouldStop(Cell currentRowCell) { 720 if (currentRowCell == null) { 721 return true; 722 } 723 if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) { 724 return false; 725 } 726 int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length); 727 return c > 0 || (c == 0 && !includeStopRow); 728 } 729 730 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", 731 justification = "this method is only called inside close which is synchronized") 732 private void closeInternal() { 733 if (storeHeap != null) { 734 storeHeap.close(); 735 storeHeap = null; 736 } 737 if (joinedHeap != null) { 738 joinedHeap.close(); 739 joinedHeap = null; 740 } 741 // no need to synchronize here. 742 scannerReadPoints.remove(this); 743 this.filterClosed = true; 744 } 745 746 @Override 747 public synchronized void close() { 748 TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close")); 749 } 750 751 @Override 752 public synchronized boolean reseek(byte[] row) throws IOException { 753 return TraceUtil.trace(() -> { 754 if (row == null) { 755 throw new IllegalArgumentException("Row cannot be null."); 756 } 757 boolean result = false; 758 region.startRegionOperation(); 759 Cell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length); 760 try { 761 // use request seek to make use of the lazy seek option. See HBASE-5520 762 result = this.storeHeap.requestSeek(kv, true, true); 763 if (this.joinedHeap != null) { 764 result = this.joinedHeap.requestSeek(kv, true, true) || result; 765 } 766 } finally { 767 region.closeRegionOperation(); 768 } 769 return result; 770 }, () -> region.createRegionSpan("RegionScanner.reseek")); 771 } 772 773 @Override 774 public void shipped() throws IOException { 775 if (storeHeap != null) { 776 storeHeap.shipped(); 777 } 778 if (joinedHeap != null) { 779 joinedHeap.shipped(); 780 } 781 } 782 783 @Override 784 public void run() throws IOException { 785 // This is the RPC callback method executed. We do the close in of the scanner in this 786 // callback 787 this.close(); 788 } 789}