/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.OptionalInt;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Scanner scans both the memstore and the Store. Coalesce KeyValue stream into
 * List&lt;KeyValue&gt; for a single row.
 * <p>
 * The implementation is not thread safe, so there will be no race between next and close. The
 * only exception is updateReaders, which will be called in the memstore flush thread to indicate
 * that there is a flush.
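 * <p>
 * A rough sketch of the read path, for orientation only: a user scan builds a StoreScanner over
 * the memstore scanners plus the selected StoreFile scanners, merges them through a
 * {@link KeyValueHeap}, and repeatedly calls {@link #next(List, ScannerContext)} to coalesce the
 * cells of one row at a time.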
 */
@InterfaceAudience.Private
public class StoreScanner extends NonReversedNonLazyKeyValueScanner
    implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class);
  // In unit tests, the store could be null
  protected final HStore store;
  private final CellComparator comparator;
  private ScanQueryMatcher matcher;
  protected KeyValueHeap heap;
  private boolean cacheBlocks;

  private long countPerRow = 0;
  private int storeLimit = -1;
  private int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107)
  private volatile boolean closing = false;
  private final boolean get;
  private final boolean explicitColumnQuery;
  private final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  private boolean parallelSeekEnabled = false;
  private ExecutorService executor;
  private final Scan scan;
  private final long oldestUnexpiredTS;
  private final long now;
  private final int minVersions;
  private final long maxRowSize;
  private final long cellsPerHeartbeatCheck;

  // 1) Collects all the KVHeap that are eagerly getting closed during the
  //    course of a scan
  // 2) Collects the unused memstore scanners. If we close the memstore scanners
  //    before sending data to client, the chunk may be reclaimed by other
  //    updates and the data will be corrupt.
  private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>();

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
   * KVs skipped via seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  private Cell prevCell = null;

  private final long preadMaxBytes;
  private long bytesRead;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
      "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;

  /**
   * The number of cells scanned in between timeout checks. Specifying a larger value means that
   * timeout checks will occur less frequently. Specifying a small value will lead to more
   * frequent timeout checks.
   */
  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
      "hbase.cells.scanned.per.heartbeat.check";

  /**
   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
   */
  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

  /**
   * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we
   * scanned reach this limit, we will reopen the scanner with stream. The default value is 4
   * times the block size for this store.
   */
  public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes";

  private final Scan.ReadType readType;
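
  // A rough lifecycle note, inferred from the code below: with Scan.ReadType.DEFAULT a scan
  // starts with positional reads (pread) and, once bytesRead exceeds preadMaxBytes, is reopened
  // with streaming readers in trySwitchToStreamRead().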
  // A flag for whether to use pread for the scan; it may be changed if we use
  // Scan.ReadType.DEFAULT and we have read lots of data.
  private boolean scanUsePread;
  // Indicates whether there was a flush during the course of the scan
  private volatile boolean flushed = false;
  // generally we get one file from a flush
  private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1);
  // Since CompactingMemstore is now default, we get three memstore scanners from a flush
  private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
  // The current list of scanners
  @VisibleForTesting
  final List<KeyValueScanner> currentScanners = new ArrayList<>();
  // flush update lock
  private final ReentrantLock flushLock = new ReentrantLock();
  // lock for closing.
  private final ReentrantLock closeLock = new ReentrantLock();

  protected final long readPt;
  private boolean topChanged = false;

  /** An internal constructor. */
  private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt,
      boolean cacheBlocks, ScanType scanType) {
    this.readPt = readPt;
    this.store = store;
    this.cacheBlocks = cacheBlocks;
    this.comparator = Preconditions.checkNotNull(scanInfo.getComparator());
    get = scan.isGetScan();
    explicitColumnQuery = numColumns > 0;
    this.scan = scan;
    this.now = EnvironmentEdgeManager.currentTime();
    this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl();
    this.minVersions = scanInfo.getMinVersions();

    // We look up row-column Bloom filters for multi-column queries as part of
    // the seek operation. However, we also look up the row-column Bloom filter
    // for multi-row (non-"get") scans because this is not done in
    // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
    this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1);
    this.maxRowSize = scanInfo.getTableMaxRowSize();
    if (get) {
      this.readType = Scan.ReadType.PREAD;
      this.scanUsePread = true;
    } else if (scanType != ScanType.USER_SCAN) {
      // Compaction scanners never use pread, as we already have stream based scanners on the
      // store files to be compacted
      this.readType = Scan.ReadType.STREAM;
      this.scanUsePread = false;
    } else {
      if (scan.getReadType() == Scan.ReadType.DEFAULT) {
        this.readType = scanInfo.isUsePread() ? Scan.ReadType.PREAD : Scan.ReadType.DEFAULT;
      } else {
        this.readType = scan.getReadType();
      }
      // Always start with pread unless the user specified stream. Will change to stream later if
      // readType is default and the scan keeps running for a long time.
      this.scanUsePread = this.readType != Scan.ReadType.STREAM;
    }
    this.preadMaxBytes = scanInfo.getPreadMaxBytes();
    this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
    // Parallel seeking is on if the config allows it and there is more than one store file.
    if (store != null && store.getStorefilesCount() > 1) {
      RegionServerServices rsService = store.getHRegion().getRegionServerServices();
      if (rsService != null && scanInfo.isParallelSeekEnabled()) {
        this.parallelSeekEnabled = true;
        this.executor = rsService.getExecutorService();
      }
    }
  }

  private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
    this.currentScanners.addAll(scanners);
  }

  /**
   * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
   * are not in a compaction.
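   * <p>
   * A hypothetical call site, shown only to illustrate the parameters (the variable names here
   * are illustrative, not part of the API):
   * <pre>
   *   NavigableSet&lt;byte[]&gt; columns = ...; // columns requested by the client, may be null
   *   StoreScanner scanner = new StoreScanner(store, scanInfo, scan, columns, readPoint);
   * </pre>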
   *
   * @param store who we scan
   * @param scan the spec
   * @param columns which columns we are scanning
   * @throws IOException
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
      long readPt) throws IOException {
    this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt,
        scan.getCacheBlocks(), ScanType.USER_SCAN);
    if (columns != null && scan.isRaw()) {
      throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
    }
    matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
        store.getCoprocessorHost());

    store.addChangedReaderObserver(this);

    try {
      // Pass columns to try to filter out unnecessary StoreFiles.
      List<KeyValueScanner> scanners = selectScannersFrom(store,
        store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
          scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt));

      // Seek all scanners to the start of the Row (or if the exact matching row
      // key does not exist, then to the start of the next matching Row).
      // Always check bloom filter to optimize the top row seek for delete
      // family marker.
      seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
        parallelSeekEnabled);

      // set storeLimit
      this.storeLimit = scan.getMaxResultsPerColumnFamily();

      // set rowOffset
      this.storeOffset = scan.getRowOffsetPerColumnFamily();
      addCurrentScanners(scanners);
      // Combine all seeked scanners with a heap
      resetKVHeap(scanners, comparator);
    } catch (IOException e) {
      // Remove us from the HStore#changedReaderObservers here or we'll have no chance to later,
      // which could cause a memory leak.
      store.deleteChangedReaderObserver(this);
      throw e;
    }
  }

  // a dummy scan instance for compaction.
  private static final Scan SCAN_FOR_COMPACTION = new Scan();

  /**
   * Used for store file compaction and memstore compaction.
   * <p>
   * Opens a scanner across specified StoreFiles/MemStoreSegments.
   * @param store who we scan
   * @param scanners ancillary scanners
   * @param smallestReadPoint the readPoint that we should use for tracking versions
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
    this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
  }

  /**
   * Used for compactions that drop deletes from a limited range of rows.
   * <p>
   * Opens a scanner across specified StoreFiles.
   * @param store who we scan
   * @param scanners ancillary scanners
   * @param smallestReadPoint the readPoint that we should use for tracking versions
   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
   * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
      long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
      byte[] dropDeletesToRow) throws IOException {
    this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
  }

  private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
      ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
      byte[] dropDeletesToRow) throws IOException {
    this(store, SCAN_FOR_COMPACTION, scanInfo, 0,
        store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType);
    assert scanType != ScanType.USER_SCAN;
    matcher =
        CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs,
          oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow,
          store.getCoprocessorHost());

    // Filter the list of scanners using Bloom filters, time range, TTL, etc.
    scanners = selectScannersFrom(store, scanners);

    // Seek all scanners to the initial key
    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
    addCurrentScanners(scanners);
    // Combine all seeked scanners with a heap
    resetKVHeap(scanners, comparator);
  }

  private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners)
      throws IOException {
    // Seek all scanners to the initial key
    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
    addCurrentScanners(scanners);
    resetKVHeap(scanners, comparator);
  }

  // For mob compaction only as we do not have a Store instance when doing mob compaction.
  public StoreScanner(ScanInfo scanInfo, ScanType scanType,
      List<? extends KeyValueScanner> scanners) throws IOException {
    this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
    assert scanType != ScanType.USER_SCAN;
    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L,
        oldestUnexpiredTS, now, null, null, null);
    seekAllScanner(scanInfo, scanners);
  }

  // Used to instantiate a scanner for user scan in test
  @VisibleForTesting
  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
      List<? extends KeyValueScanner> scanners) throws IOException {
    // 0 is passed as readpoint because the test bypasses Store
    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L,
        scan.getCacheBlocks(), ScanType.USER_SCAN);
    this.matcher =
        UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
    seekAllScanner(scanInfo, scanners);
  }

  // Used to instantiate a scanner for compaction in test
  @VisibleForTesting
  StoreScanner(ScanInfo scanInfo, OptionalInt maxVersions, ScanType scanType,
      List<? extends KeyValueScanner> scanners) throws IOException {
    // 0 is passed as readpoint because the test bypasses Store
    this(null, maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt())
        : SCAN_FOR_COMPACTION, scanInfo, 0, 0L, false, scanType);
    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
        HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
    seekAllScanner(scanInfo, scanners);
  }

  @VisibleForTesting
  boolean isScanUsePread() {
    return this.scanUsePread;
  }

  /**
   * Seek the specified scanners with the given key
   * @param scanners
   * @param seekKey
   * @param isLazy true if using lazy seek
   * @param isParallelSeek true if using parallel seek
   * @throws IOException
   */
  protected void seekScanners(List<? extends KeyValueScanner> scanners,
      Cell seekKey, boolean isLazy, boolean isParallelSeek) throws IOException {
    // Seek all scanners to the start of the Row (or if the exact matching row
    // key does not exist, then to the start of the next matching Row).
    // Always check bloom filter to optimize the top row seek for delete
    // family marker.
    if (isLazy) {
      for (KeyValueScanner scanner : scanners) {
        scanner.requestSeek(seekKey, false, true);
      }
    } else {
      if (!isParallelSeek) {
        long totalScannersSoughtBytes = 0;
        for (KeyValueScanner scanner : scanners) {
          if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) {
            throw new RowTooBigException("Max row size allowed: " + maxRowSize
                + ", but row is bigger than that");
          }
          scanner.seek(seekKey);
          Cell c = scanner.peek();
          if (c != null) {
            totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c);
          }
        }
      } else {
        parallelSeek(scanners, seekKey);
      }
    }
  }

  @VisibleForTesting
  protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
      CellComparator comparator) throws IOException {
    // Combine all seeked scanners with a heap
    heap = newKVHeap(scanners, comparator);
  }

  protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners,
      CellComparator comparator) throws IOException {
    return new KeyValueHeap(scanners, comparator);
  }

  /**
   * Filters the given list of scanners using Bloom filter, time range, and TTL.
   * <p>
   * Will be overridden by testcase so declared as protected.
   */
  @VisibleForTesting
  protected List<KeyValueScanner> selectScannersFrom(HStore store,
      List<? extends KeyValueScanner> allScanners) {
    boolean memOnly;
    boolean filesOnly;
    if (scan instanceof InternalScan) {
      InternalScan iscan = (InternalScan) scan;
      memOnly = iscan.isCheckOnlyMemStore();
      filesOnly = iscan.isCheckOnlyStoreFiles();
    } else {
      memOnly = false;
      filesOnly = false;
    }

    List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size());

    // We can only exclude store files based on TTL if minVersions is set to 0.
    // Otherwise, we might have to return KVs that have technically expired.
    long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE;

    // include only those scan files which pass all filters
    for (KeyValueScanner kvs : allScanners) {
      boolean isFile = kvs.isFileScanner();
      if ((!isFile && filesOnly) || (isFile && memOnly)) {
        continue;
      }

      if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
        scanners.add(kvs);
      } else {
        kvs.close();
      }
    }
    return scanners;
  }

  @Override
  public Cell peek() {
    return heap != null ? heap.peek() : null;
  }

  @Override
  public KeyValue next() {
    // throw runtime exception perhaps?
    throw new RuntimeException("Never call StoreScanner.next()");
  }

  @Override
  public void close() {
    close(true);
  }

  private void close(boolean withDelayedScannersClose) {
    closeLock.lock();
    // If the closeLock is acquired then any subsequent updateReaders()
    // call is ignored.
    try {
      if (this.closing) {
        return;
      }
      if (withDelayedScannersClose) {
        this.closing = true;
      }
      // For mob compaction, we do not have a store.
      if (this.store != null) {
        this.store.deleteChangedReaderObserver(this);
      }
      if (withDelayedScannersClose) {
        clearAndClose(scannersForDelayedClose);
        clearAndClose(memStoreScannersAfterFlush);
        clearAndClose(flushedstoreFileScanners);
        if (this.heap != null) {
          this.heap.close();
          this.currentScanners.clear();
          this.heap = null; // CLOSED!
        }
      } else {
        if (this.heap != null) {
          this.scannersForDelayedClose.add(this.heap);
          this.currentScanners.clear();
          this.heap = null;
        }
      }
    } finally {
      closeLock.unlock();
    }
  }

  @Override
  public boolean seek(Cell key) throws IOException {
    if (checkFlushed()) {
      reopenAfterFlush();
    }
    return this.heap.seek(key);
  }

  /**
   * Get the next row of values from this Store.
   * @param outResult
   * @param scannerContext
   * @return true if there are more rows, false if scanner is done
   */
  @Override
  public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
    if (scannerContext == null) {
      throw new IllegalArgumentException("Scanner context cannot be null");
    }
    if (checkFlushed() && reopenAfterFlush()) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // if the heap was left null, then the scanners had previously run out anyways, close and
    // return.
    if (this.heap == null) {
      // By this time the partial close should have happened because the heap is already null
      close(false); // Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    Cell cell = this.heap.peek();
    if (cell == null) {
      close(false); // Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    // only call setRow if the row changes; avoids confusing the query matcher
    // if scanning intra-row

    // If no limits exist in the scope LimitScope.BETWEEN_CELLS then we are sure we are changing
    // rows. Else it is possible we are still traversing the same row so we must perform the row
    // comparison.
    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) {
      this.countPerRow = 0;
      matcher.setToNewRow(cell);
    }

    // Clear progress away unless invoker has indicated it should be kept.
    if (!scannerContext.getKeepProgress()) {
      scannerContext.clearProgress();
    }

    int count = 0;
    long totalBytesRead = 0;

    LOOP: do {
      // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
      if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
        if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
          return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
        }
      }
      // Do object compare - we set prevKV from the same heap.
      if (prevCell != cell) {
        ++kvsScanned;
      }
      checkScanOrder(prevCell, cell, comparator);
      int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
      bytesRead += cellSize;
      prevCell = cell;
      scannerContext.setLastPeekedCell(cell);
      topChanged = false;
      ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
      switch (qcode) {
        case INCLUDE:
        case INCLUDE_AND_SEEK_NEXT_ROW:
        case INCLUDE_AND_SEEK_NEXT_COL:

          Filter f = matcher.getFilter();
          if (f != null) {
            cell = f.transformCell(cell);
          }

          this.countPerRow++;
          if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) {
            // do what SEEK_NEXT_ROW does.
            if (!matcher.moreRowsMayExistAfter(cell)) {
              close(false); // Do all cleanup except heap.close()
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            matcher.clearCurrentRow();
            seekToNextRow(cell);
            break LOOP;
          }

          // add to results only if we have skipped #storeOffset kvs
          // also update metric accordingly
          if (this.countPerRow > storeOffset) {
            outResult.add(cell);

            // Update local tracking information
            count++;
            totalBytesRead += cellSize;

            // Update the progress of the scanner context
            scannerContext.incrementSizeProgress(cellSize,
              PrivateCellUtil.estimatedSizeOfCell(cell));
            scannerContext.incrementBatchProgress(1);

            if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
              throw new RowTooBigException(
                  "Max row size allowed: " + maxRowSize + ", but the row is bigger than that.");
            }
          }

          if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
            if (!matcher.moreRowsMayExistAfter(cell)) {
              close(false); // Do all cleanup except heap.close()
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            matcher.clearCurrentRow();
            seekOrSkipToNextRow(cell);
          } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
            seekOrSkipToNextColumn(cell);
          } else {
            this.heap.next();
          }

          if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          continue;

        case DONE:
          // Optimization for Gets! If DONE, no more to get on this row, early exit!
          if (get) {
            // Then no more to this row... exit.
            close(false); // Do all cleanup except heap.close()
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          matcher.clearCurrentRow();
          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();

        case DONE_SCAN:
          close(false); // Do all cleanup except heap.close()
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();

        case SEEK_NEXT_ROW:
          // This is just a relatively simple end of scan fix, to short-cut end
          // us if there is an endKey in the scan.
          if (!matcher.moreRowsMayExistAfter(cell)) {
            close(false); // Do all cleanup except heap.close()
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          matcher.clearCurrentRow();
          seekOrSkipToNextRow(cell);
          NextState stateAfterSeekNextRow = needToReturn(outResult);
          if (stateAfterSeekNextRow != null) {
            return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
          }
          break;

        case SEEK_NEXT_COL:
          seekOrSkipToNextColumn(cell);
          NextState stateAfterSeekNextColumn = needToReturn(outResult);
          if (stateAfterSeekNextColumn != null) {
            return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
          }
          break;

        case SKIP:
          this.heap.next();
          break;

        case SEEK_NEXT_USING_HINT:
          Cell nextKV = matcher.getNextKeyHint(cell);
          if (nextKV != null && comparator.compare(nextKV, cell) > 0) {
            seekAsDirection(nextKV);
            NextState stateAfterSeekByHint = needToReturn(outResult);
            if (stateAfterSeekByHint != null) {
              return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
            }
          } else {
            heap.next();
          }
          break;

        default:
          throw new RuntimeException("UNEXPECTED");
      }
    } while ((cell = this.heap.peek()) != null);

    if (count > 0) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // No more keys
    close(false); // Do all cleanup except heap.close()
    return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
  }

  /**
   * If the top cell won't be flushed into disk, the new top cell may be
   * changed after #reopenAfterFlush, because the older top cell only exists
   * in the memstore scanner but the memstore scanner is replaced by an hfile
   * scanner after #reopenAfterFlush. If the row of the top cell is changed,
   * we should return the current cells. Otherwise, we may return
   * cells across different rows.
   * @param outResult the cells which are visible for user scan
   * @return null if the top cell doesn't change. Otherwise, the NextState
   *         to return
   */
  private NextState needToReturn(List<Cell> outResult) {
    if (!outResult.isEmpty() && topChanged) {
      return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
    }
    return null;
  }

  private void seekOrSkipToNextRow(Cell cell) throws IOException {
    // If it is a Get Scan, then we know that we are done with this row; there are no more
    // rows beyond the current one: don't try to optimize.
    if (!get) {
      if (trySkipToNextRow(cell)) {
        return;
      }
    }
    seekToNextRow(cell);
  }

  private void seekOrSkipToNextColumn(Cell cell) throws IOException {
    if (!trySkipToNextColumn(cell)) {
      seekAsDirection(matcher.getKeyForNextColumn(cell));
    }
  }

  /**
   * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).
   * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row,
   * or seek to an arbitrary seek key. This method decides whether a seek is the most efficient
   * _actual_ way to get us to the requested cell (SEEKs are more expensive than SKIP, SKIP,
   * SKIP inside the current, loaded block).
   * It does this by looking at the next indexed key of the current HFile. This key
   * is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible key
   * on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work
   * with the current Cell but compare as though it were a seek key; see down in
   * matcher.compareKeyForNextRow, etc). If the compare gets us onto the
   * next block we SEEK, otherwise we just SKIP to the next requested cell.
   *
   * <p>Other notes:
   * <ul>
   * <li>Rows can straddle block boundaries</li>
   * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a
   * different block than column C1 at T2)</li>
   * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a
   * few SKIPs...</li>
   * <li>We want to SEEK when the chance is high that we'll be able to seek
   * past many Cells, especially if we know we need to go to the next block.</li>
   * </ul>
   * <p>A good proxy (best effort) to determine whether SKIP is better than SEEK is whether
   * we'll likely end up seeking to the next block (or past the next block) to get our next
   * column. Example:
   * <pre>
   * |    BLOCK 1            |     BLOCK 2                   |
   * |  r1/c1, r1/c2, r1/c3  |  r1/c4, r1/c5, r2/c1          |
   *                                  ^         ^
   *                                  |         |
   *                         Next Index Key   SEEK_NEXT_ROW (before r2/c1)
   *
   *
   * |    BLOCK 1                       |     BLOCK 2                      |
   * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |  r1/c1/t2, r1/c1/T1, r1/c2/T3    |
   *                                            ^              ^
   *                                            |              |
   *                                   Next Index Key        SEEK_NEXT_COL
   * </pre>
   * Now imagine we want columns c1 and c3 (see first diagram above): the 'Next Index Key' of
   * r1/c4 is &gt; r1/c3, so we should seek to get to the c1 on the next row, r2. In the second
   * case, say we only want one version of c1; after we have it, a SEEK_COL will be issued to get
   * to c2. Looking at the 'Next Index Key', it would land us in the next block, so we should
   * SEEK. In other scenarios where the SEEK will not land us in the next block, it is very
   * likely better to issue a series of SKIPs.
   * @param cell current cell
   * @return true means skip to next row, false means not
   */
  @VisibleForTesting
  protected boolean trySkipToNextRow(Cell cell) throws IOException {
    Cell nextCell = null;
    // used to guard against a changed next indexed key by doing an identity comparison
    // when the identity changes we need to compare the bytes again
    Cell previousIndexedKey = null;
    do {
      Cell nextIndexedKey = getNextIndexedKey();
      if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY &&
          (nextIndexedKey == previousIndexedKey ||
          matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0)) {
        this.heap.next();
        ++kvsScanned;
        previousIndexedKey = nextIndexedKey;
      } else {
        return false;
      }
    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell));
    return true;
  }

  /**
   * See {@link org.apache.hadoop.hbase.regionserver.StoreScanner#trySkipToNextRow(Cell)}
   * @param cell current cell
   * @return true means skip to next column, false means not
   */
  @VisibleForTesting
  protected boolean trySkipToNextColumn(Cell cell) throws IOException {
    Cell nextCell = null;
    // used to guard against a changed next indexed key by doing an identity comparison
    // when the identity changes we need to compare the bytes again
    Cell previousIndexedKey = null;
    do {
      Cell nextIndexedKey = getNextIndexedKey();
      if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY &&
          (nextIndexedKey == previousIndexedKey ||
          matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0)) {
        this.heap.next();
        ++kvsScanned;
        previousIndexedKey = nextIndexedKey;
      } else {
        return false;
      }
    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell));
    // We need this check because it may happen that the new scanner that we get
    // during heap.next() requires a reseek due to a fake KV previously generated for
    // the ROWCOL bloom filter optimization. See HBASE-19863 for more details.
    if (nextCell != null && matcher.compareKeyForNextColumn(nextCell, cell) < 0) {
      return false;
    }
    return true;
  }

  @Override
  public long getReadPoint() {
    return this.readPt;
  }

  private static void clearAndClose(List<KeyValueScanner> scanners) {
    for (KeyValueScanner s : scanners) {
      s.close();
    }
    scanners.clear();
  }

  // Implementation of ChangedReadersObserver
  @Override
  public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners)
      throws IOException {
    if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) {
      return;
    }
    boolean updateReaders = false;
    flushLock.lock();
    try {
      if (!closeLock.tryLock()) {
        // The reason for doing this is that when the current store scanner does not retrieve
        // any new cells, then the scanner is considered to be done. The heap of this scanner
        // is not closed till the shipped() call is completed. Hence in that case if at all
        // the partial close (close(false)) has been called before updateReaders(), there is no
        // need for the updateReaders() to happen.
        LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
        // no lock acquired.
        return;
      }
      // lock acquired
      updateReaders = true;
      if (this.closing) {
        LOG.debug("StoreScanner already closing. There is no need to updateReaders");
        return;
      }
      flushed = true;
      final boolean isCompaction = false;
      boolean usePread = get || scanUsePread;
      // See HBASE-19468 where the flushed files are getting compacted even before a scanner
      // calls next(). So it's better we create scanners here rather than in the next() call.
      // Ensure these scanners are properly closed() whether or not the scan is completed
      // successfully. Eagerly creating scanners so that we have the ref counting ticking on the
      // newly created store files. In case of stream scanners this eager creation does not
      // induce a performance penalty because in scans (that use stream scanners) the next() call
      // is bound to happen.
      List<KeyValueScanner> scanners = store.getScanners(sfs, cacheBlocks, get, usePread,
        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false);
      flushedstoreFileScanners.addAll(scanners);
      if (!CollectionUtils.isEmpty(memStoreScanners)) {
        clearAndClose(memStoreScannersAfterFlush);
        memStoreScannersAfterFlush.addAll(memStoreScanners);
      }
    } finally {
      flushLock.unlock();
      if (updateReaders) {
        closeLock.unlock();
      }
    }
    // Let the next() call handle re-creating and seeking
  }
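
  // A sketch of the flush hand-off, based on the methods above and below: updateReaders() runs
  // in the flush thread, stashes scanners on the newly flushed files plus the new memstore
  // scanners, and sets the flushed flag; the next seek()/reseek()/next() call on the scan thread
  // notices the flag via checkFlushed() and calls reopenAfterFlush() to rebuild the heap on top
  // of the stashed scanners.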

  /**
   * @return if top of heap has changed (and KeyValueHeap has to try the next KV)
   */
  protected final boolean reopenAfterFlush() throws IOException {
    // here we can make sure that we have a Store instance so no null check on store.
    Cell lastTop = heap.peek();
    // When we have the scan object, should we not pass it to getScanners() to get a limited set
    // of scanners? We did so in the constructor and we could have done it now by storing the
    // scan object from the constructor
    List<KeyValueScanner> scanners;
    flushLock.lock();
    try {
      List<KeyValueScanner> allScanners =
          new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());
      allScanners.addAll(flushedstoreFileScanners);
      allScanners.addAll(memStoreScannersAfterFlush);
      scanners = selectScannersFrom(store, allScanners);
      // Clear the current set of flushed store files scanners so that they don't get added again
      flushedstoreFileScanners.clear();
      memStoreScannersAfterFlush.clear();
    } finally {
      flushLock.unlock();
    }

    // Seek the new scanners to the last key
    seekScanners(scanners, lastTop, false, parallelSeekEnabled);
    // remove the older memstore scanner
    for (int i = currentScanners.size() - 1; i >= 0; i--) {
      if (!currentScanners.get(i).isFileScanner()) {
        scannersForDelayedClose.add(currentScanners.remove(i));
      } else {
        // we add the memstore scanner to the end of currentScanners
        break;
      }
    }
    // add the newly created scanners on the flushed files and the current active memstore scanner
    addCurrentScanners(scanners);
    // Combine all seeked scanners with a heap
    resetKVHeap(this.currentScanners, store.getComparator());
    resetQueryMatcher(lastTop);
    if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
      LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString() +
          ", and after = " + heap.peek());
      topChanged = true;
    } else {
      topChanged = false;
    }
    return topChanged;
  }

  private void resetQueryMatcher(Cell lastTopKey) {
    // Reset the state of the Query Matcher and set to top row.
    // Only reset and call setRow if the row changes; avoids confusing the
    // query matcher if scanning intra-row.
    Cell cell = heap.peek();
    if (cell == null) {
      cell = lastTopKey;
    }
    if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {
      this.countPerRow = 0;
      // The setToNewRow will call reset internally
      matcher.setToNewRow(cell);
    }
  }

  /**
   * Check whether the scan is in the expected order
   * @param prevKV
   * @param kv
   * @param comparator
   * @throws IOException
   */
  protected void checkScanOrder(Cell prevKV, Cell kv,
      CellComparator comparator) throws IOException {
    // Check that the heap gives us KVs in an increasing order.
    assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 : "Key "
        + prevKV + " followed by a smaller key " + kv + " in cf " + store;
  }

  protected boolean seekToNextRow(Cell c) throws IOException {
    return reseek(PrivateCellUtil.createLastOnRow(c));
  }

  /**
   * Do a reseek in a normal StoreScanner (scan forward)
   * @param kv
   * @return true if scanner has values left, false if end of scanner
   * @throws IOException
   */
  protected boolean seekAsDirection(Cell kv) throws IOException {
    return reseek(kv);
  }

  @Override
  public boolean reseek(Cell kv) throws IOException {
    if (checkFlushed()) {
      reopenAfterFlush();
    }
    if (explicitColumnQuery && lazySeekEnabledGlobally) {
      return heap.requestSeek(kv, true, useRowColBloom);
    }
    return heap.reseek(kv);
  }

  @VisibleForTesting
  void trySwitchToStreamRead() {
    if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing ||
        heap.peek() == null || bytesRead < preadMaxBytes) {
      return;
    }
    LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead,
        this.store.getColumnFamilyName());
    scanUsePread = false;
    Cell lastTop = heap.peek();
    List<KeyValueScanner> memstoreScanners = new ArrayList<>();
    List<KeyValueScanner> scannersToClose = new ArrayList<>();
    for (KeyValueScanner kvs : currentScanners) {
      if (!kvs.isFileScanner()) {
        // collect memstore scanners here
        memstoreScanners.add(kvs);
      } else {
        scannersToClose.add(kvs);
      }
    }
    List<KeyValueScanner> fileScanners = null;
    List<KeyValueScanner> newCurrentScanners;
    KeyValueHeap newHeap;
    try {
      // We must have a store instance here so no null check
      // recreate the scanners on the current file scanners
      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false,
        matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(),
        scan.includeStopRow(), readPt, false);
      if (fileScanners == null) {
        return;
      }
      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
      newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
      newCurrentScanners.addAll(fileScanners);
      newCurrentScanners.addAll(memstoreScanners);
      newHeap = newKVHeap(newCurrentScanners, comparator);
    } catch (Exception e) {
      LOG.warn("failed to switch to stream read", e);
      if (fileScanners != null) {
        fileScanners.forEach(KeyValueScanner::close);
      }
      return;
    }
    currentScanners.clear();
    addCurrentScanners(newCurrentScanners);
    this.heap = newHeap;
    resetQueryMatcher(lastTop);
    scannersToClose.forEach(KeyValueScanner::close);
  }

  protected final boolean checkFlushed() {
    // check the var without any lock. Suppose even if we see the old
    // value here still it is ok to continue because we will not be resetting
    // the heap but will continue with the referenced memstore's snapshot. For compactions
    // any way we don't need the updateReaders at all to happen as we still continue with
    // the older files
    if (flushed) {
      // If there is a flush and the current scan is notified on the flush ensure that the
      // scan's heap gets reset and we do a seek on the newly flushed file.
      if (this.closing) {
        return false;
      }
      // reset the flag
      flushed = false;
      return true;
    }
    return false;
  }

  /**
   * @see KeyValueScanner#getScannerOrder()
   */
  @Override
  public long getScannerOrder() {
    return 0;
  }

  /**
   * Seek storefiles in parallel to optimize IO latency as much as possible
   * @param scanners the list of {@link KeyValueScanner}s to be read from
   * @param kv the KeyValue on which the operation is being requested
   * @throws IOException
   */
  private void parallelSeek(final List<? extends KeyValueScanner> scanners, final Cell kv)
      throws IOException {
    if (scanners.isEmpty()) return;
    int storeFileScannerCount = scanners.size();
    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
    List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount);
    for (KeyValueScanner scanner : scanners) {
      if (scanner instanceof StoreFileScanner) {
        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
          this.readPt, latch);
        executor.submit(seekHandler);
        handlers.add(seekHandler);
      } else {
        scanner.seek(kv);
        latch.countDown();
      }
    }

    try {
      latch.await();
    } catch (InterruptedException ie) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
    }

    for (ParallelSeekHandler handler : handlers) {
      if (handler.getErr() != null) {
        throw new IOException(handler.getErr());
      }
    }
  }

  /**
   * Used in testing.
   * @return all scanners in no particular order
   */
  @VisibleForTesting
  List<KeyValueScanner> getAllScannersForTesting() {
    List<KeyValueScanner> allScanners = new ArrayList<>();
    KeyValueScanner current = heap.getCurrentForTesting();
    if (current != null)
      allScanners.add(current);
    for (KeyValueScanner scanner : heap.getHeap())
      allScanners.add(scanner);
    return allScanners;
  }

  static void enableLazySeekGlobally(boolean enable) {
    lazySeekEnabledGlobally = enable;
  }

  /**
   * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
   */
  public long getEstimatedNumberOfKvsScanned() {
    return this.kvsScanned;
  }

  @Override
  public Cell getNextIndexedKey() {
    return this.heap.getNextIndexedKey();
  }
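
  // Note (interpretation, not normative): shipped() is called once the cells collected so far
  // have been handed off to the RPC layer, so it is a safe point to copy prevCell, release the
  // delayed-close scanners, and possibly switch from pread to stream read.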

  @Override
  public void shipped() throws IOException {
    if (prevCell != null) {
      // Do the copy here so that in case the prevCell ref is pointing to the previous
      // blocks we can safely release those blocks.
      // This applies to blocks obtained from the bucket cache, the L1 cache and blocks fetched
      // from HDFS. Copying this would ensure that we let go of the references to these blocks
      // so that they can be GCed safely (in the case of the bucket cache).
      prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
    }
    matcher.beforeShipped();
    // There won't be further fetches of Cells from these scanners. Just close.
    clearAndClose(scannersForDelayedClose);
    if (this.heap != null) {
      this.heap.shipped();
      // When switching from pread to stream, we will open a new scanner for each store file, but
      // the old scanner may still track the HFileBlocks we have scanned but not sent back to the
      // client yet. If we close the scanner immediately then the HFileBlocks may be messed up by
      // others before we serialize and send it back to the client. The HFileBlocks will be
      // released in the shipped method, so here we will also open new scanners and close old
      // scanners in the shipped method. See HBASE-18055 for more details.
      trySwitchToStreamRead();
    }
  }
}