/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * Scanner scans both the memstore and the Store. Coalesce KeyValue stream into List<KeyValue>
 * for a single row.
 * <p>
 * The implementation is not thread safe. So there will be no race between next and close. The
 * only exception is updateReaders, which will be called in the memstore flush thread to indicate
 * that there is a flush.
 */
@InterfaceAudience.Private
public class StoreScanner extends NonReversedNonLazyKeyValueScanner
    implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class);
  // In unit tests, the store could be null
  protected final HStore store;
  private final CellComparator comparator;
  private ScanQueryMatcher matcher;
  protected KeyValueHeap heap;
  private boolean cacheBlocks;

  private long countPerRow = 0;
  private int storeLimit = -1;
  private int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107)
  private volatile boolean closing = false;
  private final boolean get;
  private final boolean explicitColumnQuery;
  private final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  private boolean parallelSeekEnabled = false;
  private ExecutorService executor;
  private final Scan scan;
  private final long oldestUnexpiredTS;
  private final long now;
  private final int minVersions;
  private final long maxRowSize;
  private final long cellsPerHeartbeatCheck;

  // 1) Collects all the KVHeap that are eagerly getting closed during the
  //    course of a scan
  // 2) Collects the unused memstore scanners. If we close the memstore scanners
  //    before sending data to client, the chunk may be reclaimed by other
  //    updates and the data will be corrupt.
  private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>();

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
   * KVs skipped via seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  private Cell prevCell = null;

  private final long preadMaxBytes;
  private long bytesRead;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
      "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;

  /**
   * The number of cells scanned in between timeout checks. Specifying a larger value means that
   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
   * timeout checks.
   */
  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
      "hbase.cells.scanned.per.heartbeat.check";

  /**
   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
   */
  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

  /**
   * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we have
   * scanned reach this limit, we will reopen the scanner with stream. The default value is 4 times
   * the block size for this store.
   */
  public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes";
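
  // For illustration only: the two keys above are ordinary configuration properties, so they can
  // be tuned in hbase-site.xml. Example (hypothetical values, not recommendations):
  //
  //   <property>
  //     <name>hbase.cells.scanned.per.heartbeat.check</name>
  //     <value>10000</value>
  //   </property>
  //   <property>
  //     <name>hbase.storescanner.pread.max.bytes</name>
  //     <value>262144</value>
  //   </property>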

  private final Scan.ReadType readType;

  // A flag for whether to use pread for the scan; it may be changed if we use
  // Scan.ReadType.DEFAULT and we have read lots of data.
  private boolean scanUsePread;
  // Indicates whether there was a flush during the course of the scan
  private volatile boolean flushed = false;
  // generally we get one file from a flush
  private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1);
  // Since CompactingMemstore is now default, we get three memstore scanners from a flush
  private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
  // The current list of scanners
  @VisibleForTesting
  final List<KeyValueScanner> currentScanners = new ArrayList<>();
  // flush update lock
  private final ReentrantLock flushLock = new ReentrantLock();
  // lock for closing.
  private final ReentrantLock closeLock = new ReentrantLock();

  protected final long readPt;
  private boolean topChanged = false;

  /** An internal constructor. */
  private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo,
      int numColumns, long readPt, boolean cacheBlocks, ScanType scanType) {
    this.readPt = readPt;
    this.store = store;
    this.cacheBlocks = cacheBlocks;
    this.comparator = Preconditions.checkNotNull(scanInfo.getComparator());
    get = scan.isGetScan();
    explicitColumnQuery = numColumns > 0;
    this.scan = scan;
    this.now = EnvironmentEdgeManager.currentTime();
    this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl();
    this.minVersions = scanInfo.getMinVersions();

    // We look up row-column Bloom filters for multi-column queries as part of
    // the seek operation. However, we also look at the row-column Bloom filter
    // for multi-row (non-"get") scans because this is not done in
    // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
    this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1);
    this.maxRowSize = scanInfo.getTableMaxRowSize();
    if (get) {
      this.readType = Scan.ReadType.PREAD;
      this.scanUsePread = true;
    } else if (scanType != ScanType.USER_SCAN) {
      // Compaction scanners never use pread, as we already have stream based scanners on the
      // store files to be compacted
      this.readType = Scan.ReadType.STREAM;
      this.scanUsePread = false;
    } else {
      if (scan.getReadType() == Scan.ReadType.DEFAULT) {
        this.readType = scanInfo.isUsePread() ? Scan.ReadType.PREAD : Scan.ReadType.DEFAULT;
      } else {
        this.readType = scan.getReadType();
      }
      // Always start with pread unless the user specifies stream. Will change to stream later if
      // readType is default and the scan keeps running for a long time.
      this.scanUsePread = this.readType != Scan.ReadType.STREAM;
    }
    this.preadMaxBytes = scanInfo.getPreadMaxBytes();
    this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
    // Parallel seeking is on if the config allows it and there is more than one store file.
    if (store != null && store.getStorefilesCount() > 1) {
      RegionServerServices rsService = store.getHRegion().getRegionServerServices();
      if (rsService != null && scanInfo.isParallelSeekEnabled()) {
        this.parallelSeekEnabled = true;
        this.executor = rsService.getExecutorService();
      }
    }
  }

  private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
    this.currentScanners.addAll(scanners);
  }

  /**
   * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
   * are not in a compaction.
   *
   * @param store who we scan
   * @param scan the spec
   * @param columns which columns we are scanning
   * @throws IOException
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
      long readPt) throws IOException {
    this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt,
        scan.getCacheBlocks(), ScanType.USER_SCAN);
    if (columns != null && scan.isRaw()) {
      throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
    }
    matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
        store.getCoprocessorHost());

    store.addChangedReaderObserver(this);

    List<KeyValueScanner> scanners = null;
    try {
      // Pass columns to try to filter out unnecessary StoreFiles.
      scanners = selectScannersFrom(store,
        store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
          scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt));

      // Seek all scanners to the start of the Row (or if the exact matching row
      // key does not exist, then to the start of the next matching Row).
      // Always check bloom filter to optimize the top row seek for delete
      // family marker.
      seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
        parallelSeekEnabled);

      // set storeLimit
      this.storeLimit = scan.getMaxResultsPerColumnFamily();

      // set rowOffset
      this.storeOffset = scan.getRowOffsetPerColumnFamily();
      addCurrentScanners(scanners);
      // Combine all seeked scanners with a heap
      resetKVHeap(scanners, comparator);
    } catch (IOException e) {
      clearAndClose(scanners);
      // remove us from the HStore#changedReaderObservers here, or we'll have no chance to do so
      // later and might cause a memory leak
      store.deleteChangedReaderObserver(this);
      throw e;
    }
  }

  // a dummy scan instance for compaction.
  private static final Scan SCAN_FOR_COMPACTION = new Scan();

  /**
   * Used for store file compaction and memstore compaction.
   * <p>
   * Opens a scanner across specified StoreFiles/MemStoreSegments.
   * @param store who we scan
   * @param scanners ancillary scanners
   * @param smallestReadPoint the readPoint that we should use for tracking versions
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
    this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
  }

  /**
   * Used for compactions that drop deletes from a limited range of rows.
   * <p>
   * Opens a scanner across specified StoreFiles.
   * @param store who we scan
   * @param scanners ancillary scanners
   * @param smallestReadPoint the readPoint that we should use for tracking versions
   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
   * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
      long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
      byte[] dropDeletesToRow) throws IOException {
    this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
  }

  private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
      ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
      byte[] dropDeletesToRow) throws IOException {
    this(store, SCAN_FOR_COMPACTION, scanInfo, 0,
        store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType);
    assert scanType != ScanType.USER_SCAN;
    matcher =
        CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs,
          oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());

    // Filter the list of scanners using Bloom filters, time range, TTL, etc.
    scanners = selectScannersFrom(store, scanners);

    // Seek all scanners to the initial key
    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
    addCurrentScanners(scanners);
    // Combine all seeked scanners with a heap
    resetKVHeap(scanners, comparator);
  }

  private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners)
      throws IOException {
    // Seek all scanners to the initial key
    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
    addCurrentScanners(scanners);
    resetKVHeap(scanners, comparator);
  }

  // For mob compaction only as we do not have a Store instance when doing mob compaction.
  public StoreScanner(ScanInfo scanInfo, ScanType scanType,
      List<? extends KeyValueScanner> scanners) throws IOException {
    this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
    assert scanType != ScanType.USER_SCAN;
    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L,
        oldestUnexpiredTS, now, null, null, null);
    seekAllScanner(scanInfo, scanners);
  }

  // Used to instantiate a scanner for user scan in test
  @VisibleForTesting
  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
      List<? extends KeyValueScanner> scanners) throws IOException {
    // 0 is passed as readpoint because the test bypasses Store
    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L,
        scan.getCacheBlocks(), ScanType.USER_SCAN);
    this.matcher =
        UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
    seekAllScanner(scanInfo, scanners);
  }

  // Used to instantiate a scanner for compaction in test
  @VisibleForTesting
  StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType,
      List<? extends KeyValueScanner> scanners) throws IOException {
    // 0 is passed as readpoint because the test bypasses Store
    this(null, maxVersions > 0 ? new Scan().readVersions(maxVersions)
        : SCAN_FOR_COMPACTION, scanInfo, 0, 0L, false, scanType);
    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
        HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
    seekAllScanner(scanInfo, scanners);
  }

  @VisibleForTesting
  boolean isScanUsePread() {
    return this.scanUsePread;
  }

  /**
   * Seek the specified scanners with the given key
   * @param scanners
   * @param seekKey
   * @param isLazy true if using lazy seek
   * @param isParallelSeek true if using parallel seek
   * @throws IOException
   */
  protected void seekScanners(List<? extends KeyValueScanner> scanners,
      Cell seekKey, boolean isLazy, boolean isParallelSeek)
      throws IOException {
    // Seek all scanners to the start of the Row (or if the exact matching row
    // key does not exist, then to the start of the next matching Row).
    // Always check bloom filter to optimize the top row seek for delete
    // family marker.
    if (isLazy) {
      for (KeyValueScanner scanner : scanners) {
        scanner.requestSeek(seekKey, false, true);
      }
    } else {
      if (!isParallelSeek) {
        long totalScannersSoughtBytes = 0;
        for (KeyValueScanner scanner : scanners) {
          if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) {
            throw new RowTooBigException("Max row size allowed: " + maxRowSize
              + ", but row is bigger than that");
          }
          scanner.seek(seekKey);
          Cell c = scanner.peek();
          if (c != null) {
            totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c);
          }
        }
      } else {
        parallelSeek(scanners, seekKey);
      }
    }
  }

  @VisibleForTesting
  protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
      CellComparator comparator) throws IOException {
    // Combine all seeked scanners with a heap
    heap = newKVHeap(scanners, comparator);
  }

  protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners,
      CellComparator comparator) throws IOException {
    return new KeyValueHeap(scanners, comparator);
  }

  /**
   * Filters the given list of scanners using Bloom filter, time range, and TTL.
   * <p>
   * Will be overridden by testcase so declared as protected.
   */
  @VisibleForTesting
  protected List<KeyValueScanner> selectScannersFrom(HStore store,
      List<? extends KeyValueScanner> allScanners) {
    boolean memOnly;
    boolean filesOnly;
    if (scan instanceof InternalScan) {
      InternalScan iscan = (InternalScan) scan;
      memOnly = iscan.isCheckOnlyMemStore();
      filesOnly = iscan.isCheckOnlyStoreFiles();
    } else {
      memOnly = false;
      filesOnly = false;
    }

    List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size());

    // We can only exclude store files based on TTL if minVersions is set to 0.
    // Otherwise, we might have to return KVs that have technically expired.
    long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE;

    // include only those scan files which pass all filters
    for (KeyValueScanner kvs : allScanners) {
      boolean isFile = kvs.isFileScanner();
      if ((!isFile && filesOnly) || (isFile && memOnly)) {
        continue;
      }

      if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
        scanners.add(kvs);
      } else {
        kvs.close();
      }
    }
    return scanners;
  }

  @Override
  public Cell peek() {
    return heap != null ? heap.peek() : null;
  }

  @Override
  public KeyValue next() {
    // throw runtime exception perhaps?
    throw new RuntimeException("Never call StoreScanner.next()");
  }

  @Override
  public void close() {
    close(true);
  }

  private void close(boolean withDelayedScannersClose) {
    closeLock.lock();
    // If the closeLock is acquired then any subsequent updateReaders()
    // call is ignored.
    try {
      if (this.closing) {
        return;
      }
      if (withDelayedScannersClose) {
        this.closing = true;
      }
      // For mob compaction, we do not have a store.
      if (this.store != null) {
        this.store.deleteChangedReaderObserver(this);
      }
      if (withDelayedScannersClose) {
        clearAndClose(scannersForDelayedClose);
        clearAndClose(memStoreScannersAfterFlush);
        clearAndClose(flushedstoreFileScanners);
        if (this.heap != null) {
          this.heap.close();
          this.currentScanners.clear();
          this.heap = null; // CLOSED!
        }
      } else {
        if (this.heap != null) {
          this.scannersForDelayedClose.add(this.heap);
          this.currentScanners.clear();
          this.heap = null;
        }
      }
    } finally {
      closeLock.unlock();
    }
  }

  @Override
  public boolean seek(Cell key) throws IOException {
    if (checkFlushed()) {
      reopenAfterFlush();
    }
    return this.heap.seek(key);
  }

  /**
   * Get the next row of values from this Store.
   * @param outResult
   * @param scannerContext
   * @return true if there are more rows, false if scanner is done
   */
  @Override
  public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
    if (scannerContext == null) {
      throw new IllegalArgumentException("Scanner context cannot be null");
    }
    if (checkFlushed() && reopenAfterFlush()) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // if the heap was left null, then the scanners had previously run out anyways, close and
    // return.
    if (this.heap == null) {
      // By this time the partial close should have happened because the heap is already null
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    Cell cell = this.heap.peek();
    if (cell == null) {
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    // only call setRow if the row changes; avoids confusing the query matcher
    // if scanning intra-row

    // If no limits exist in the scope LimitScope.BETWEEN_CELLS then we are sure we are changing
    // rows. Else it is possible we are still traversing the same row so we must perform the row
    // comparison.
    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) {
      this.countPerRow = 0;
      matcher.setToNewRow(cell);
    }

    // Clear progress away unless invoker has indicated it should be kept.
    if (!scannerContext.getKeepProgress()) {
      scannerContext.clearProgress();
    }

    int count = 0;
    long totalBytesRead = 0;

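    // Main scan loop: peek the next cell from the heap, feed it to the ScanQueryMatcher and act
    // on the returned MatchCode. INCLUDE* codes copy the cell into outResult (subject to
    // storeOffset/storeLimit and maxRowSize), the SEEK_*/SKIP codes advance the heap either by
    // seeking or by skipping cell by cell (see seekOrSkipToNextRow/seekOrSkipToNextColumn), and
    // DONE/DONE_SCAN end the current row or the whole scan. Batch, size and time limits from the
    // ScannerContext are checked as we go.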
    LOOP: do {
      // Update and check the time limit based on the configured value of cellsPerTimeoutCheck.
      // Or, if preadMaxBytes is reached, we may want to return so we can switch to stream in the
      // shipped method below.
      if (kvsScanned % cellsPerHeartbeatCheck == 0 || (scanUsePread &&
          readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)) {
        if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
          return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
        }
      }
      // Do object compare - we set prevKV from the same heap.
      if (prevCell != cell) {
        ++kvsScanned;
      }
      checkScanOrder(prevCell, cell, comparator);
      int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
      bytesRead += cellSize;
      if (scanUsePread && readType == Scan.ReadType.DEFAULT &&
          bytesRead > preadMaxBytes) {
        // Return immediately if we want to switch from pread to stream. We need this because we
        // can only switch in the shipped method; if the user uses a filter that filters out
        // everything and the rpc timeout is very large, then the shipped method will never be
        // called until the whole scan is finished, but by that time we have already scanned all
        // the data... See HBASE-20457 for more details.
        // And there is still a scenario that cannot be handled: if we have a very large row with
        // millions of qualifiers, and filter.filterRow is used, then even if we set the flag here,
        // we still need to scan all the qualifiers before returning...
        scannerContext.returnImmediately();
      }
      prevCell = cell;
      scannerContext.setLastPeekedCell(cell);
      topChanged = false;
      ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
      switch (qcode) {
        case INCLUDE:
        case INCLUDE_AND_SEEK_NEXT_ROW:
        case INCLUDE_AND_SEEK_NEXT_COL:

          Filter f = matcher.getFilter();
          if (f != null) {
            cell = f.transformCell(cell);
          }

          this.countPerRow++;
          if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) {
            // do what SEEK_NEXT_ROW does.
            if (!matcher.moreRowsMayExistAfter(cell)) {
              close(false);// Do all cleanup except heap.close()
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            matcher.clearCurrentRow();
            seekToNextRow(cell);
            break LOOP;
          }

          // add to results only if we have skipped #storeOffset kvs
          // also update metric accordingly
          if (this.countPerRow > storeOffset) {
            outResult.add(cell);

            // Update local tracking information
            count++;
            totalBytesRead += cellSize;

            // Update the progress of the scanner context
            scannerContext.incrementSizeProgress(cellSize, cell.heapSize());
            scannerContext.incrementBatchProgress(1);

            if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
              throw new RowTooBigException(
                  "Max row size allowed: " + maxRowSize + ", but the row is bigger than that.");
            }
          }

          if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
            if (!matcher.moreRowsMayExistAfter(cell)) {
              close(false);// Do all cleanup except heap.close()
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            matcher.clearCurrentRow();
            seekOrSkipToNextRow(cell);
          } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
            seekOrSkipToNextColumn(cell);
          } else {
            this.heap.next();
          }

          if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          continue;

        case DONE:
          // Optimization for Gets! If DONE, no more to get on this row, early exit!
          if (get) {
            // Then no more to this row... exit.
            close(false);// Do all cleanup except heap.close()
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          matcher.clearCurrentRow();
          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();

        case DONE_SCAN:
          close(false);// Do all cleanup except heap.close()
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();

        case SEEK_NEXT_ROW:
          // This is just a relatively simple end-of-scan fix, to short-circuit
          // if there is an endKey in the scan.
          if (!matcher.moreRowsMayExistAfter(cell)) {
            close(false);// Do all cleanup except heap.close()
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          matcher.clearCurrentRow();
          seekOrSkipToNextRow(cell);
          NextState stateAfterSeekNextRow = needToReturn(outResult);
          if (stateAfterSeekNextRow != null) {
            return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
          }
          break;

        case SEEK_NEXT_COL:
          seekOrSkipToNextColumn(cell);
          NextState stateAfterSeekNextColumn = needToReturn(outResult);
          if (stateAfterSeekNextColumn != null) {
            return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
          }
          break;

        case SKIP:
          this.heap.next();
          break;

        case SEEK_NEXT_USING_HINT:
          Cell nextKV = matcher.getNextKeyHint(cell);
          if (nextKV != null && comparator.compare(nextKV, cell) > 0) {
            seekAsDirection(nextKV);
            NextState stateAfterSeekByHint = needToReturn(outResult);
            if (stateAfterSeekByHint != null) {
              return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
            }
          } else {
            heap.next();
          }
          break;

        default:
          throw new RuntimeException("UNEXPECTED");
      }
    } while ((cell = this.heap.peek()) != null);

    if (count > 0) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // No more keys
    close(false);// Do all cleanup except heap.close()
    return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
  }

  /**
   * If the top cell won't be flushed to disk, the new top cell may change after
   * #reopenAfterFlush, because the older top cell only exists in the memstore scanner and the
   * memstore scanner is replaced by an hfile scanner after #reopenAfterFlush. If the row of the
   * top cell changes, we should return the current cells; otherwise, we may return cells that
   * span different rows.
   * @param outResult the cells which are visible for user scan
   * @return null if the top cell doesn't change. Otherwise, the NextState to return
   */
  private NextState needToReturn(List<Cell> outResult) {
    if (!outResult.isEmpty() && topChanged) {
      return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
    }
    return null;
  }

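  // The following methods (seekOrSkipToNextRow, seekOrSkipToNextColumn, trySkipToNextRow and
  // trySkipToNextColumn) implement the SEEK vs SKIP optimization described in the javadoc of
  // trySkipToNextRow below (HBASE-13109): prefer cheap SKIPs within the current block over an
  // expensive SEEK when the next indexed key suggests the target cell is still in this block.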
  private void seekOrSkipToNextRow(Cell cell) throws IOException {
    // If it is a Get Scan, then we know that we are done with this row; there are no more
    // rows beyond the current one: don't try to optimize.
    if (!get) {
      if (trySkipToNextRow(cell)) {
        return;
      }
    }
    seekToNextRow(cell);
  }

  private void seekOrSkipToNextColumn(Cell cell) throws IOException {
    if (!trySkipToNextColumn(cell)) {
      seekAsDirection(matcher.getKeyForNextColumn(cell));
    }
  }

  /**
   * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).
   * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row,
   * or seek to an arbitrary seek key. This method decides whether a seek is the most efficient
   * _actual_ way to get us to the requested cell (SEEKs are more expensive than SKIP, SKIP,
   * SKIP inside the current, loaded block).
   * It does this by looking at the next indexed key of the current HFile. This key
   * is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible key
   * on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work with
   * the current Cell but compare as though it were a seek key; see down in
   * matcher.compareKeyForNextRow, etc). If the compare gets us onto the
   * next block we *_SEEK, otherwise we just SKIP to the next requested cell.
   *
   * <p>Other notes:
   * <ul>
   * <li>Rows can straddle block boundaries</li>
   * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a
   * different block than column C1 at T2)</li>
   * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a
   * few SKIPs...</li>
   * <li>We want to SEEK when the chance is high that we'll be able to seek
   * past many Cells, especially if we know we need to go to the next block.</li>
   * </ul>
   * <p>A good proxy (best effort) to determine whether SKIP is better than SEEK is whether
   * we'll likely end up seeking to the next block (or past the next block) to get our next
   * column. Example:
   * <pre>
   * |    BLOCK 1              |     BLOCK 2                   |
   * |  r1/c1, r1/c2, r1/c3    |    r1/c4, r1/c5, r2/c1        |
   *                                   ^         ^
   *                                   |         |
   *                           Next Index Key   SEEK_NEXT_ROW (before r2/c1)
   *
   *
   * |    BLOCK 1                       |     BLOCK 2                      |
   * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |    r1/c1/t2, r1/c1/T1, r1/c2/T3  |
   *                                            ^              ^
   *                                            |              |
   *                                    Next Index Key        SEEK_NEXT_COL
   * </pre>
   * Now imagine we want columns c1 and c3 (see the first diagram above): the 'Next Index Key' of
   * r1/c4 is > r1/c3, so we should seek to get to the c1 on the next row, r2. In the second case,
   * say we only want one version of c1; after we have it, a SEEK_COL will be issued to get to c2.
   * Looking at the 'Next Index Key', it would land us in the next block, so we should SEEK. In
   * other scenarios where the SEEK will not land us in the next block, it is very likely better
   * to issue a series of SKIPs.
   * @param cell current cell
   * @return true means skip to next row, false means not
   */
  @VisibleForTesting
  protected boolean trySkipToNextRow(Cell cell) throws IOException {
    Cell nextCell = null;
    // used to guard against a changed next indexed key by doing an identity comparison
    // when the identity changes we need to compare the bytes again
    Cell previousIndexedKey = null;
    do {
      Cell nextIndexedKey = getNextIndexedKey();
      if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY &&
          (nextIndexedKey == previousIndexedKey ||
          matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0)) {
        this.heap.next();
        ++kvsScanned;
        previousIndexedKey = nextIndexedKey;
      } else {
        return false;
      }
    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell));
    return true;
  }

  /**
   * See {@link org.apache.hadoop.hbase.regionserver.StoreScanner#trySkipToNextRow(Cell)}
   * @param cell current cell
   * @return true means skip to next column, false means not
   */
  @VisibleForTesting
  protected boolean trySkipToNextColumn(Cell cell) throws IOException {
    Cell nextCell = null;
    // used to guard against a changed next indexed key by doing an identity comparison
    // when the identity changes we need to compare the bytes again
    Cell previousIndexedKey = null;
    do {
      Cell nextIndexedKey = getNextIndexedKey();
      if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY &&
          (nextIndexedKey == previousIndexedKey ||
          matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0)) {
        this.heap.next();
        ++kvsScanned;
        previousIndexedKey = nextIndexedKey;
      } else {
        return false;
      }
    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell));
    // We need this check because it may happen that the new scanner that we get
    // during heap.next() requires a reseek due to a fake KV previously generated for
    // ROWCOL bloom filter optimization. See HBASE-19863 for more details
    if (nextCell != null && matcher.compareKeyForNextColumn(nextCell, cell) < 0) {
      return false;
    }
    return true;
  }

  @Override
  public long getReadPoint() {
    return this.readPt;
  }

  private static void clearAndClose(List<KeyValueScanner> scanners) {
    if (scanners == null) {
      return;
    }
    for (KeyValueScanner s : scanners) {
      s.close();
    }
    scanners.clear();
  }

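  // Flush handling overview: updateReaders() below is invoked by the memstore flush thread (see
  // ChangedReadersObserver). It only records scanners for the newly flushed files and the
  // remaining memstore segments and sets the 'flushed' flag; the scanning thread later observes
  // that flag via checkFlushed() (called from seek/reseek/next) and rebuilds the heap in
  // reopenAfterFlush().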
  // Implementation of ChangedReadersObserver
  @Override
  public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners)
      throws IOException {
    if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) {
      return;
    }
    boolean updateReaders = false;
    flushLock.lock();
    try {
      if (!closeLock.tryLock()) {
        // The reason for doing this is that when the current store scanner does not retrieve
        // any new cells, then the scanner is considered to be done. The heap of this scanner
        // is not closed till the shipped() call is completed. Hence in that case if at all
        // the partial close (close(false)) has been called before updateReaders(), there is no
        // need for the updateReaders() to happen.
        LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
        // no lock acquired.
        clearAndClose(memStoreScanners);
        return;
      }
      // lock acquired
      updateReaders = true;
      if (this.closing) {
        LOG.debug("StoreScanner already closing. There is no need to updateReaders");
        clearAndClose(memStoreScanners);
        return;
      }
      flushed = true;
      final boolean isCompaction = false;
      boolean usePread = get || scanUsePread;
      // SEE HBASE-19468 where the flushed files are getting compacted even before a scanner
      // calls next(). So it's better we create the scanners here rather than in the next() call.
      // Ensure these scanners are properly closed() whether or not the scan completes
      // successfully. Eagerly creating scanners so that we have the ref counting ticking on the
      // newly created store files. In case of stream scanners this eager creation does not induce
      // a performance penalty, because in scans (that use stream scanners) the next() call is
      // bound to happen.
      List<KeyValueScanner> scanners = store.getScanners(sfs, cacheBlocks, get, usePread,
        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false);
      flushedstoreFileScanners.addAll(scanners);
      if (!CollectionUtils.isEmpty(memStoreScanners)) {
        clearAndClose(memStoreScannersAfterFlush);
        memStoreScannersAfterFlush.addAll(memStoreScanners);
      }
    } finally {
      flushLock.unlock();
      if (updateReaders) {
        closeLock.unlock();
      }
    }
    // Let the next() call handle re-creating and seeking
  }

  /**
   * @return if top of heap has changed (and KeyValueHeap has to try the next KV)
   */
  protected final boolean reopenAfterFlush() throws IOException {
    // here we can make sure that we have a Store instance so no null check on store.
    Cell lastTop = heap.peek();
    // When we have the scan object, should we not pass it to getScanners() to get a limited set of
    // scanners? We did so in the constructor and we could have done it now by storing the scan
    // object from the constructor.
    List<KeyValueScanner> scanners;
    flushLock.lock();
    try {
      List<KeyValueScanner> allScanners =
          new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());
      allScanners.addAll(flushedstoreFileScanners);
      allScanners.addAll(memStoreScannersAfterFlush);
      scanners = selectScannersFrom(store, allScanners);
      // Clear the current set of flushed store file scanners so that they don't get added again
      flushedstoreFileScanners.clear();
      memStoreScannersAfterFlush.clear();
    } finally {
      flushLock.unlock();
    }

    // Seek the new scanners to the last key
    seekScanners(scanners, lastTop, false, parallelSeekEnabled);
    // remove the older memstore scanner
    for (int i = currentScanners.size() - 1; i >= 0; i--) {
      if (!currentScanners.get(i).isFileScanner()) {
        scannersForDelayedClose.add(currentScanners.remove(i));
      } else {
        // we add the memstore scanner to the end of currentScanners
        break;
      }
    }
    // add the newly created scanners on the flushed files and the current active memstore scanner
    addCurrentScanners(scanners);
    // Combine all seeked scanners with a heap
    resetKVHeap(this.currentScanners, store.getComparator());
    resetQueryMatcher(lastTop);
    if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
      LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString() +
          ", and after = " + heap.peek());
      topChanged = true;
    } else {
      topChanged = false;
    }
    return topChanged;
  }

  private void resetQueryMatcher(Cell lastTopKey) {
    // Reset the state of the Query Matcher and set to top row.
    // Only reset and call setRow if the row changes; avoids confusing the
    // query matcher if scanning intra-row.
    Cell cell = heap.peek();
    if (cell == null) {
      cell = lastTopKey;
    }
    if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {
      this.countPerRow = 0;
      // The setToNewRow will call reset internally
      matcher.setToNewRow(cell);
    }
  }

  /**
   * Check whether the scan proceeds in the expected order
   * @param prevKV
   * @param kv
   * @param comparator
   * @throws IOException
   */
  protected void checkScanOrder(Cell prevKV, Cell kv,
      CellComparator comparator) throws IOException {
    // Check that the heap gives us KVs in an increasing order.
    assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 : "Key "
        + prevKV + " followed by a smaller key " + kv + " in cf " + store;
  }

  protected boolean seekToNextRow(Cell c) throws IOException {
    return reseek(PrivateCellUtil.createLastOnRow(c));
  }

  /**
   * Do a reseek in a normal StoreScanner (scan forward)
   * @param kv
   * @return true if scanner has values left, false if end of scanner
   * @throws IOException
   */
  protected boolean seekAsDirection(Cell kv)
      throws IOException {
    return reseek(kv);
  }

  @Override
  public boolean reseek(Cell kv) throws IOException {
    if (checkFlushed()) {
      reopenAfterFlush();
    }
    if (explicitColumnQuery && lazySeekEnabledGlobally) {
      return heap.requestSeek(kv, true, useRowColBloom);
    }
    return heap.reseek(kv);
  }

  @VisibleForTesting
  void trySwitchToStreamRead() {
    if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing ||
        heap.peek() == null || bytesRead < preadMaxBytes) {
      return;
    }
    LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead,
        this.store.getColumnFamilyName());
    scanUsePread = false;
    Cell lastTop = heap.peek();
    List<KeyValueScanner> memstoreScanners = new ArrayList<>();
    List<KeyValueScanner> scannersToClose = new ArrayList<>();
    for (KeyValueScanner kvs : currentScanners) {
      if (!kvs.isFileScanner()) {
        // collect memstore scanners here
        memstoreScanners.add(kvs);
      } else {
        scannersToClose.add(kvs);
      }
    }
    List<KeyValueScanner> fileScanners = null;
    List<KeyValueScanner> newCurrentScanners;
    KeyValueHeap newHeap;
    try {
      // We must have a store instance here so no null check
      // recreate the scanners on the current file scanners
      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false,
        matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(),
        scan.includeStopRow(), readPt, false);
      if (fileScanners == null) {
        return;
      }
      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
      newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
      newCurrentScanners.addAll(fileScanners);
      newCurrentScanners.addAll(memstoreScanners);
      newHeap = newKVHeap(newCurrentScanners, comparator);
    } catch (Exception e) {
      LOG.warn("failed to switch to stream read", e);
      if (fileScanners != null) {
        fileScanners.forEach(KeyValueScanner::close);
      }
      return;
    }
    currentScanners.clear();
    addCurrentScanners(newCurrentScanners);
    this.heap = newHeap;
    resetQueryMatcher(lastTop);
    scannersToClose.forEach(KeyValueScanner::close);
  }

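  // Pread-to-stream overview: for Scan.ReadType.DEFAULT we start with pread; once bytesRead
  // exceeds preadMaxBytes, next() asks the ScannerContext to return immediately (HBASE-20457)
  // and shipped() then calls trySwitchToStreamRead() above, which recreates the store file
  // scanners in stream mode and swaps in a new heap.
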
  protected final boolean checkFlushed() {
    // Check the var without any lock. Even if we see the old value here, it is still ok to
    // continue, because we will not be resetting the heap but will continue with the referenced
    // memstore's snapshot. For compactions we don't need the updateReaders to happen at all, as
    // we still continue with the older files.
    if (flushed) {
      // If there is a flush and the current scan is notified on the flush, ensure that the
      // scan's heap gets reset and we do a seek on the newly flushed file.
      if (this.closing) {
        return false;
      }
      // reset the flag
      flushed = false;
      return true;
    }
    return false;
  }

  /**
   * Seek storefiles in parallel to optimize IO latency as much as possible
   * @param scanners the list of {@link KeyValueScanner}s to be read from
   * @param kv the KeyValue on which the operation is being requested
   * @throws IOException
   */
  private void parallelSeek(final List<? extends KeyValueScanner> scanners,
      final Cell kv) throws IOException {
    if (scanners.isEmpty()) return;
    int storeFileScannerCount = scanners.size();
    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
    List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount);
    for (KeyValueScanner scanner : scanners) {
      if (scanner instanceof StoreFileScanner) {
        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
            this.readPt, latch);
        executor.submit(seekHandler);
        handlers.add(seekHandler);
      } else {
        scanner.seek(kv);
        latch.countDown();
      }
    }

    try {
      latch.await();
    } catch (InterruptedException ie) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
    }

    for (ParallelSeekHandler handler : handlers) {
      if (handler.getErr() != null) {
        throw new IOException(handler.getErr());
      }
    }
  }

  /**
   * Used in testing.
   * @return all scanners in no particular order
   */
  @VisibleForTesting
  List<KeyValueScanner> getAllScannersForTesting() {
    List<KeyValueScanner> allScanners = new ArrayList<>();
    KeyValueScanner current = heap.getCurrentForTesting();
    if (current != null)
      allScanners.add(current);
    for (KeyValueScanner scanner : heap.getHeap())
      allScanners.add(scanner);
    return allScanners;
  }

  static void enableLazySeekGlobally(boolean enable) {
    lazySeekEnabledGlobally = enable;
  }

  /**
   * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
   */
  public long getEstimatedNumberOfKvsScanned() {
    return this.kvsScanned;
  }

  @Override
  public Cell getNextIndexedKey() {
    return this.heap.getNextIndexedKey();
  }

  @Override
  public void shipped() throws IOException {
    if (prevCell != null) {
      // Do the copy here so that in case the prevCell ref is pointing to the previous
      // blocks we can safely release those blocks.
      // This applies to blocks that are got from Bucket cache, L1 cache and the blocks
      // fetched from HDFS. Copying this would ensure that we let go the references to these
      // blocks so that they can be GCed safely (in case of bucket cache).
      prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
    }
    matcher.beforeShipped();
    // There won't be further fetches of Cells from these scanners. Just close.
    clearAndClose(scannersForDelayedClose);
    if (this.heap != null) {
      this.heap.shipped();
      // When switching from pread to stream, we will open a new scanner for each store file, but
      // the old scanner may still track the HFileBlocks we have scanned but not yet sent back to
      // the client. If we close the scanner immediately then the HFileBlocks may be messed up by
      // others before we serialize and send them back to the client. The HFileBlocks will be
      // released in the shipped method, so here we also open the new scanners and close the old
      // scanners in the shipped method. See HBASE-18055 for more details.
      trySwitchToStreamRead();
    }
  }
}