001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.NavigableSet; 026import java.util.concurrent.CountDownLatch; 027import java.util.concurrent.locks.ReentrantLock; 028 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.CellComparator; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.DoNotRetryIOException; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.PrivateCellUtil; 035import org.apache.hadoop.hbase.KeyValue; 036import org.apache.hadoop.hbase.KeyValueUtil; 037import org.apache.hadoop.hbase.client.IsolationLevel; 038import org.apache.hadoop.hbase.client.Scan; 039import org.apache.hadoop.hbase.executor.ExecutorService; 040import org.apache.hadoop.hbase.filter.Filter; 041import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 042import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; 043import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler; 044import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher; 045import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 046import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher; 047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 048import org.apache.yetus.audience.InterfaceAudience; 049 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 054import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 055import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 056 057/** 058 * Scanner scans both the memstore and the Store. Coalesce KeyValue stream into List<KeyValue> 059 * for a single row. 060 * <p> 061 * The implementation is not thread safe. So there will be no race between next and close. The only 062 * exception is updateReaders, it will be called in the memstore flush thread to indicate that there 063 * is a flush. 064 */ 065@InterfaceAudience.Private 066public class StoreScanner extends NonReversedNonLazyKeyValueScanner 067 implements KeyValueScanner, InternalScanner, ChangedReadersObserver { 068 private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class); 069 // In unit tests, the store could be null 070 protected final HStore store; 071 private final CellComparator comparator; 072 private ScanQueryMatcher matcher; 073 protected KeyValueHeap heap; 074 private boolean cacheBlocks; 075 076 private long countPerRow = 0; 077 private int storeLimit = -1; 078 private int storeOffset = 0; 079 080 // Used to indicate that the scanner has closed (see HBASE-1107) 081 private volatile boolean closing = false; 082 private final boolean get; 083 private final boolean explicitColumnQuery; 084 private final boolean useRowColBloom; 085 /** 086 * A flag that enables StoreFileScanner parallel-seeking 087 */ 088 private boolean parallelSeekEnabled = false; 089 private ExecutorService executor; 090 private final Scan scan; 091 private final long oldestUnexpiredTS; 092 private final long now; 093 private final int minVersions; 094 private final long maxRowSize; 095 private final long cellsPerHeartbeatCheck; 096 @VisibleForTesting 097 long memstoreOnlyReads; 098 @VisibleForTesting 099 long mixedReads; 100 101 // 1) Collects all the KVHeap that are eagerly getting closed during the 102 // course of a scan 103 // 2) Collects the unused memstore scanners. If we close the memstore scanners 104 // before sending data to client, the chunk may be reclaimed by other 105 // updates and the data will be corrupt. 106 private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>(); 107 108 /** 109 * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not 110 * KVs skipped via seeking to next row/column. TODO: estimate them? 111 */ 112 private long kvsScanned = 0; 113 private Cell prevCell = null; 114 115 private final long preadMaxBytes; 116 private long bytesRead; 117 118 /** We don't ever expect to change this, the constant is just for clarity. */ 119 static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true; 120 public static final String STORESCANNER_PARALLEL_SEEK_ENABLE = 121 "hbase.storescanner.parallel.seek.enable"; 122 123 /** Used during unit testing to ensure that lazy seek does save seek ops */ 124 private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT; 125 126 /** 127 * The number of cells scanned in between timeout checks. Specifying a larger value means that 128 * timeout checks will occur less frequently. Specifying a small value will lead to more frequent 129 * timeout checks. 130 */ 131 public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 132 "hbase.cells.scanned.per.heartbeat.check"; 133 134 /** 135 * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}. 136 */ 137 public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000; 138 139 /** 140 * If the read type if Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we scanned 141 * reaches this limit, we will reopen the scanner with stream. The default value is 4 times of 142 * block size for this store. 143 */ 144 public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes"; 145 146 private final Scan.ReadType readType; 147 148 // A flag whether use pread for scan 149 // it maybe changed if we use Scan.ReadType.DEFAULT and we have read lots of data. 150 private boolean scanUsePread; 151 // Indicates whether there was flush during the course of the scan 152 private volatile boolean flushed = false; 153 // generally we get one file from a flush 154 private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1); 155 // Since CompactingMemstore is now default, we get three memstore scanners from a flush 156 private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3); 157 // The current list of scanners 158 @VisibleForTesting 159 final List<KeyValueScanner> currentScanners = new ArrayList<>(); 160 // flush update lock 161 private final ReentrantLock flushLock = new ReentrantLock(); 162 // lock for closing. 163 private final ReentrantLock closeLock = new ReentrantLock(); 164 165 protected final long readPt; 166 private boolean topChanged = false; 167 168 /** An internal constructor. */ 169 private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, 170 int numColumns, long readPt, boolean cacheBlocks, ScanType scanType) { 171 this.readPt = readPt; 172 this.store = store; 173 this.cacheBlocks = cacheBlocks; 174 this.comparator = Preconditions.checkNotNull(scanInfo.getComparator()); 175 get = scan.isGetScan(); 176 explicitColumnQuery = numColumns > 0; 177 this.scan = scan; 178 this.now = EnvironmentEdgeManager.currentTime(); 179 this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl(); 180 this.minVersions = scanInfo.getMinVersions(); 181 182 // We look up row-column Bloom filters for multi-column queries as part of 183 // the seek operation. However, we also look the row-column Bloom filter 184 // for multi-row (non-"get") scans because this is not done in 185 // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>). 186 this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1); 187 this.maxRowSize = scanInfo.getTableMaxRowSize(); 188 if (get) { 189 this.readType = Scan.ReadType.PREAD; 190 this.scanUsePread = true; 191 } else if (scanType != ScanType.USER_SCAN) { 192 // For compaction scanners never use Pread as already we have stream based scanners on the 193 // store files to be compacted 194 this.readType = Scan.ReadType.STREAM; 195 this.scanUsePread = false; 196 } else { 197 if (scan.getReadType() == Scan.ReadType.DEFAULT) { 198 this.readType = scanInfo.isUsePread() ? Scan.ReadType.PREAD : Scan.ReadType.DEFAULT; 199 } else { 200 this.readType = scan.getReadType(); 201 } 202 // Always start with pread unless user specific stream. Will change to stream later if 203 // readType is default if the scan keeps running for a long time. 204 this.scanUsePread = this.readType != Scan.ReadType.STREAM; 205 } 206 this.preadMaxBytes = scanInfo.getPreadMaxBytes(); 207 this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck(); 208 // Parallel seeking is on if the config allows and more there is more than one store file. 209 if (store != null && store.getStorefilesCount() > 1) { 210 RegionServerServices rsService = store.getHRegion().getRegionServerServices(); 211 if (rsService != null && scanInfo.isParallelSeekEnabled()) { 212 this.parallelSeekEnabled = true; 213 this.executor = rsService.getExecutorService(); 214 } 215 } 216 } 217 218 private void addCurrentScanners(List<? extends KeyValueScanner> scanners) { 219 this.currentScanners.addAll(scanners); 220 } 221 222 /** 223 * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we 224 * are not in a compaction. 225 * 226 * @param store who we scan 227 * @param scan the spec 228 * @param columns which columns we are scanning 229 * @throws IOException 230 */ 231 public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns, 232 long readPt) throws IOException { 233 this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt, 234 scan.getCacheBlocks(), ScanType.USER_SCAN); 235 if (columns != null && scan.isRaw()) { 236 throw new DoNotRetryIOException("Cannot specify any column for a raw scan"); 237 } 238 matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, 239 store.getCoprocessorHost()); 240 241 store.addChangedReaderObserver(this); 242 243 List<KeyValueScanner> scanners = null; 244 try { 245 // Pass columns to try to filter out unnecessary StoreFiles. 246 scanners = selectScannersFrom(store, 247 store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(), 248 scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt)); 249 250 // Seek all scanners to the start of the Row (or if the exact matching row 251 // key does not exist, then to the start of the next matching Row). 252 // Always check bloom filter to optimize the top row seek for delete 253 // family marker. 254 seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally, 255 parallelSeekEnabled); 256 257 // set storeLimit 258 this.storeLimit = scan.getMaxResultsPerColumnFamily(); 259 260 // set rowOffset 261 this.storeOffset = scan.getRowOffsetPerColumnFamily(); 262 addCurrentScanners(scanners); 263 // Combine all seeked scanners with a heap 264 resetKVHeap(scanners, comparator); 265 } catch (IOException e) { 266 clearAndClose(scanners); 267 // remove us from the HStore#changedReaderObservers here or we'll have no chance to 268 // and might cause memory leak 269 store.deleteChangedReaderObserver(this); 270 throw e; 271 } 272 } 273 274 // a dummy scan instance for compaction. 275 private static final Scan SCAN_FOR_COMPACTION = new Scan(); 276 277 /** 278 * Used for store file compaction and memstore compaction. 279 * <p> 280 * Opens a scanner across specified StoreFiles/MemStoreSegments. 281 * @param store who we scan 282 * @param scanners ancillary scanners 283 * @param smallestReadPoint the readPoint that we should use for tracking versions 284 */ 285 public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners, 286 ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { 287 this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null); 288 } 289 290 /** 291 * Used for compactions that drop deletes from a limited range of rows. 292 * <p> 293 * Opens a scanner across specified StoreFiles. 294 * @param store who we scan 295 * @param scanners ancillary scanners 296 * @param smallestReadPoint the readPoint that we should use for tracking versions 297 * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW. 298 * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW. 299 */ 300 public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners, 301 long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, 302 byte[] dropDeletesToRow) throws IOException { 303 this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, 304 earliestPutTs, dropDeletesFromRow, dropDeletesToRow); 305 } 306 307 private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners, 308 ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, 309 byte[] dropDeletesToRow) throws IOException { 310 this(store, SCAN_FOR_COMPACTION, scanInfo, 0, 311 store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType); 312 assert scanType != ScanType.USER_SCAN; 313 matcher = 314 CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs, 315 oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost()); 316 317 // Filter the list of scanners using Bloom filters, time range, TTL, etc. 318 scanners = selectScannersFrom(store, scanners); 319 320 // Seek all scanners to the initial key 321 seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); 322 addCurrentScanners(scanners); 323 // Combine all seeked scanners with a heap 324 resetKVHeap(scanners, comparator); 325 } 326 327 private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners) 328 throws IOException { 329 // Seek all scanners to the initial key 330 seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); 331 addCurrentScanners(scanners); 332 resetKVHeap(scanners, comparator); 333 } 334 335 // For mob compaction only as we do not have a Store instance when doing mob compaction. 336 public StoreScanner(ScanInfo scanInfo, ScanType scanType, 337 List<? extends KeyValueScanner> scanners) throws IOException { 338 this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType); 339 assert scanType != ScanType.USER_SCAN; 340 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L, 341 oldestUnexpiredTS, now, null, null, null); 342 seekAllScanner(scanInfo, scanners); 343 } 344 345 // Used to instantiate a scanner for user scan in test 346 @VisibleForTesting 347 StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, 348 List<? extends KeyValueScanner> scanners) throws IOException { 349 // 0 is passed as readpoint because the test bypasses Store 350 this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, 351 scan.getCacheBlocks(), ScanType.USER_SCAN); 352 this.matcher = 353 UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null); 354 seekAllScanner(scanInfo, scanners); 355 } 356 357 // Used to instantiate a scanner for user scan in test 358 @VisibleForTesting 359 StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, 360 List<? extends KeyValueScanner> scanners, ScanType scanType) throws IOException { 361 // 0 is passed as readpoint because the test bypasses Store 362 this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(), 363 scanType); 364 if (scanType == ScanType.USER_SCAN) { 365 this.matcher = 366 UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null); 367 } else { 368 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 369 HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null); 370 } 371 seekAllScanner(scanInfo, scanners); 372 } 373 374 // Used to instantiate a scanner for compaction in test 375 @VisibleForTesting 376 StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType, 377 List<? extends KeyValueScanner> scanners) throws IOException { 378 // 0 is passed as readpoint because the test bypasses Store 379 this(null, maxVersions > 0 ? new Scan().readVersions(maxVersions) 380 : SCAN_FOR_COMPACTION, scanInfo, 0, 0L, false, scanType); 381 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 382 HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null); 383 seekAllScanner(scanInfo, scanners); 384 } 385 386 @VisibleForTesting 387 boolean isScanUsePread() { 388 return this.scanUsePread; 389 } 390 /** 391 * Seek the specified scanners with the given key 392 * @param scanners 393 * @param seekKey 394 * @param isLazy true if using lazy seek 395 * @param isParallelSeek true if using parallel seek 396 * @throws IOException 397 */ 398 protected void seekScanners(List<? extends KeyValueScanner> scanners, 399 Cell seekKey, boolean isLazy, boolean isParallelSeek) 400 throws IOException { 401 // Seek all scanners to the start of the Row (or if the exact matching row 402 // key does not exist, then to the start of the next matching Row). 403 // Always check bloom filter to optimize the top row seek for delete 404 // family marker. 405 if (isLazy) { 406 for (KeyValueScanner scanner : scanners) { 407 scanner.requestSeek(seekKey, false, true); 408 } 409 } else { 410 if (!isParallelSeek) { 411 long totalScannersSoughtBytes = 0; 412 for (KeyValueScanner scanner : scanners) { 413 if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) { 414 throw new RowTooBigException("Max row size allowed: " + maxRowSize 415 + ", but row is bigger than that"); 416 } 417 scanner.seek(seekKey); 418 Cell c = scanner.peek(); 419 if (c != null) { 420 totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c); 421 } 422 } 423 } else { 424 parallelSeek(scanners, seekKey); 425 } 426 } 427 } 428 429 @VisibleForTesting 430 protected void resetKVHeap(List<? extends KeyValueScanner> scanners, 431 CellComparator comparator) throws IOException { 432 // Combine all seeked scanners with a heap 433 heap = newKVHeap(scanners, comparator); 434 } 435 436 protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners, 437 CellComparator comparator) throws IOException { 438 return new KeyValueHeap(scanners, comparator); 439 } 440 441 /** 442 * Filters the given list of scanners using Bloom filter, time range, and TTL. 443 * <p> 444 * Will be overridden by testcase so declared as protected. 445 */ 446 @VisibleForTesting 447 protected List<KeyValueScanner> selectScannersFrom(HStore store, 448 List<? extends KeyValueScanner> allScanners) { 449 boolean memOnly; 450 boolean filesOnly; 451 if (scan instanceof InternalScan) { 452 InternalScan iscan = (InternalScan) scan; 453 memOnly = iscan.isCheckOnlyMemStore(); 454 filesOnly = iscan.isCheckOnlyStoreFiles(); 455 } else { 456 memOnly = false; 457 filesOnly = false; 458 } 459 460 List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size()); 461 462 // We can only exclude store files based on TTL if minVersions is set to 0. 463 // Otherwise, we might have to return KVs that have technically expired. 464 long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE; 465 466 // include only those scan files which pass all filters 467 for (KeyValueScanner kvs : allScanners) { 468 boolean isFile = kvs.isFileScanner(); 469 if ((!isFile && filesOnly) || (isFile && memOnly)) { 470 continue; 471 } 472 473 if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) { 474 scanners.add(kvs); 475 } else { 476 kvs.close(); 477 } 478 } 479 return scanners; 480 } 481 482 @Override 483 public Cell peek() { 484 return heap != null ? heap.peek() : null; 485 } 486 487 @Override 488 public KeyValue next() { 489 // throw runtime exception perhaps? 490 throw new RuntimeException("Never call StoreScanner.next()"); 491 } 492 493 @Override 494 public void close() { 495 close(true); 496 } 497 498 private void close(boolean withDelayedScannersClose) { 499 closeLock.lock(); 500 // If the closeLock is acquired then any subsequent updateReaders() 501 // call is ignored. 502 try { 503 if (this.closing) { 504 return; 505 } 506 if (withDelayedScannersClose) { 507 this.closing = true; 508 } 509 // For mob compaction, we do not have a store. 510 if (this.store != null) { 511 this.store.deleteChangedReaderObserver(this); 512 } 513 if (withDelayedScannersClose) { 514 clearAndClose(scannersForDelayedClose); 515 clearAndClose(memStoreScannersAfterFlush); 516 clearAndClose(flushedstoreFileScanners); 517 if (this.heap != null) { 518 this.heap.close(); 519 this.currentScanners.clear(); 520 this.heap = null; // CLOSED! 521 } 522 } else { 523 if (this.heap != null) { 524 this.scannersForDelayedClose.add(this.heap); 525 this.currentScanners.clear(); 526 this.heap = null; 527 } 528 } 529 } finally { 530 closeLock.unlock(); 531 } 532 } 533 534 @Override 535 public boolean seek(Cell key) throws IOException { 536 if (checkFlushed()) { 537 reopenAfterFlush(); 538 } 539 return this.heap.seek(key); 540 } 541 542 /** 543 * Get the next row of values from this Store. 544 * @param outResult 545 * @param scannerContext 546 * @return true if there are more rows, false if scanner is done 547 */ 548 @Override 549 public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException { 550 if (scannerContext == null) { 551 throw new IllegalArgumentException("Scanner context cannot be null"); 552 } 553 if (checkFlushed() && reopenAfterFlush()) { 554 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 555 } 556 557 // if the heap was left null, then the scanners had previously run out anyways, close and 558 // return. 559 if (this.heap == null) { 560 // By this time partial close should happened because already heap is null 561 close(false);// Do all cleanup except heap.close() 562 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 563 } 564 565 Cell cell = this.heap.peek(); 566 if (cell == null) { 567 close(false);// Do all cleanup except heap.close() 568 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 569 } 570 571 // only call setRow if the row changes; avoids confusing the query matcher 572 // if scanning intra-row 573 574 // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing 575 // rows. Else it is possible we are still traversing the same row so we must perform the row 576 // comparison. 577 if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) { 578 this.countPerRow = 0; 579 matcher.setToNewRow(cell); 580 } 581 582 // Clear progress away unless invoker has indicated it should be kept. 583 if (!scannerContext.getKeepProgress()) { 584 scannerContext.clearProgress(); 585 } 586 587 int count = 0; 588 long totalBytesRead = 0; 589 boolean onlyFromMemstore = matcher.isUserScan(); 590 try { 591 LOOP: do { 592 // Update and check the time limit based on the configured value of cellsPerTimeoutCheck 593 // Or if the preadMaxBytes is reached and we may want to return so we can switch to stream 594 // in 595 // the shipped method below. 596 if (kvsScanned % cellsPerHeartbeatCheck == 0 597 || (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)) { 598 if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { 599 return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); 600 } 601 } 602 // Do object compare - we set prevKV from the same heap. 603 if (prevCell != cell) { 604 ++kvsScanned; 605 } 606 checkScanOrder(prevCell, cell, comparator); 607 int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell); 608 bytesRead += cellSize; 609 if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) { 610 // return immediately if we want to switch from pread to stream. We need this because we 611 // can 612 // only switch in the shipped method, if user use a filter to filter out everything and 613 // rpc 614 // timeout is very large then the shipped method will never be called until the whole scan 615 // is finished, but at that time we have already scan all the data... 616 // See HBASE-20457 for more details. 617 // And there is still a scenario that can not be handled. If we have a very large row, 618 // which 619 // have millions of qualifiers, and filter.filterRow is used, then even if we set the flag 620 // here, we still need to scan all the qualifiers before returning... 621 scannerContext.returnImmediately(); 622 } 623 prevCell = cell; 624 scannerContext.setLastPeekedCell(cell); 625 topChanged = false; 626 ScanQueryMatcher.MatchCode qcode = matcher.match(cell); 627 switch (qcode) { 628 case INCLUDE: 629 case INCLUDE_AND_SEEK_NEXT_ROW: 630 case INCLUDE_AND_SEEK_NEXT_COL: 631 632 Filter f = matcher.getFilter(); 633 if (f != null) { 634 cell = f.transformCell(cell); 635 } 636 this.countPerRow++; 637 if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) { 638 // do what SEEK_NEXT_ROW does. 639 if (!matcher.moreRowsMayExistAfter(cell)) { 640 close(false);// Do all cleanup except heap.close() 641 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 642 } 643 matcher.clearCurrentRow(); 644 seekToNextRow(cell); 645 break LOOP; 646 } 647 648 // add to results only if we have skipped #storeOffset kvs 649 // also update metric accordingly 650 if (this.countPerRow > storeOffset) { 651 outResult.add(cell); 652 653 // Update local tracking information 654 count++; 655 totalBytesRead += cellSize; 656 657 /** 658 * Increment the metric if all the cells are from memstore. 659 * If not we will account it for mixed reads 660 */ 661 onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore(); 662 // Update the progress of the scanner context 663 scannerContext.incrementSizeProgress(cellSize, cell.heapSize()); 664 scannerContext.incrementBatchProgress(1); 665 666 if (matcher.isUserScan() && totalBytesRead > maxRowSize) { 667 String message = "Max row size allowed: " + maxRowSize 668 + ", but the row is bigger than that, the row info: " 669 + CellUtil.toString(cell, false) + ", already have process row cells = " 670 + outResult.size() + ", it belong to region = " 671 + store.getHRegion().getRegionInfo().getRegionNameAsString(); 672 LOG.warn(message); 673 throw new RowTooBigException(message); 674 } 675 } 676 677 if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { 678 if (!matcher.moreRowsMayExistAfter(cell)) { 679 close(false);// Do all cleanup except heap.close() 680 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 681 } 682 matcher.clearCurrentRow(); 683 seekOrSkipToNextRow(cell); 684 } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { 685 seekOrSkipToNextColumn(cell); 686 } else { 687 this.heap.next(); 688 } 689 690 if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) { 691 break LOOP; 692 } 693 if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { 694 break LOOP; 695 } 696 continue; 697 698 case DONE: 699 // Optimization for Gets! If DONE, no more to get on this row, early exit! 700 if (get) { 701 // Then no more to this row... exit. 702 close(false);// Do all cleanup except heap.close() 703 // update metric 704 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 705 } 706 matcher.clearCurrentRow(); 707 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 708 709 case DONE_SCAN: 710 close(false);// Do all cleanup except heap.close() 711 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 712 713 case SEEK_NEXT_ROW: 714 // This is just a relatively simple end of scan fix, to short-cut end 715 // us if there is an endKey in the scan. 716 if (!matcher.moreRowsMayExistAfter(cell)) { 717 close(false);// Do all cleanup except heap.close() 718 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 719 } 720 matcher.clearCurrentRow(); 721 seekOrSkipToNextRow(cell); 722 NextState stateAfterSeekNextRow = needToReturn(outResult); 723 if (stateAfterSeekNextRow != null) { 724 return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues(); 725 } 726 break; 727 728 case SEEK_NEXT_COL: 729 seekOrSkipToNextColumn(cell); 730 NextState stateAfterSeekNextColumn = needToReturn(outResult); 731 if (stateAfterSeekNextColumn != null) { 732 return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues(); 733 } 734 break; 735 736 case SKIP: 737 this.heap.next(); 738 break; 739 740 case SEEK_NEXT_USING_HINT: 741 Cell nextKV = matcher.getNextKeyHint(cell); 742 if (nextKV != null && comparator.compare(nextKV, cell) > 0) { 743 seekAsDirection(nextKV); 744 NextState stateAfterSeekByHint = needToReturn(outResult); 745 if (stateAfterSeekByHint != null) { 746 return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues(); 747 } 748 } else { 749 heap.next(); 750 } 751 break; 752 753 default: 754 throw new RuntimeException("UNEXPECTED"); 755 } 756 } while ((cell = this.heap.peek()) != null); 757 758 if (count > 0) { 759 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 760 } 761 762 // No more keys 763 close(false);// Do all cleanup except heap.close() 764 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 765 } finally { 766 // increment only if we have some result 767 if (count > 0 && matcher.isUserScan()) { 768 // if true increment memstore metrics, if not the mixed one 769 updateMetricsStore(onlyFromMemstore); 770 } 771 } 772 } 773 774 private void updateMetricsStore(boolean memstoreRead) { 775 if (store != null) { 776 store.updateMetricsStore(memstoreRead); 777 } else { 778 // for testing. 779 if (memstoreRead) { 780 memstoreOnlyReads++; 781 } else { 782 mixedReads++; 783 } 784 } 785 } 786 787 /** 788 * If the top cell won't be flushed into disk, the new top cell may be 789 * changed after #reopenAfterFlush. Because the older top cell only exist 790 * in the memstore scanner but the memstore scanner is replaced by hfile 791 * scanner after #reopenAfterFlush. If the row of top cell is changed, 792 * we should return the current cells. Otherwise, we may return 793 * the cells across different rows. 794 * @param outResult the cells which are visible for user scan 795 * @return null is the top cell doesn't change. Otherwise, the NextState 796 * to return 797 */ 798 private NextState needToReturn(List<Cell> outResult) { 799 if (!outResult.isEmpty() && topChanged) { 800 return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES; 801 } 802 return null; 803 } 804 805 private void seekOrSkipToNextRow(Cell cell) throws IOException { 806 // If it is a Get Scan, then we know that we are done with this row; there are no more 807 // rows beyond the current one: don't try to optimize. 808 if (!get) { 809 if (trySkipToNextRow(cell)) { 810 return; 811 } 812 } 813 seekToNextRow(cell); 814 } 815 816 private void seekOrSkipToNextColumn(Cell cell) throws IOException { 817 if (!trySkipToNextColumn(cell)) { 818 seekAsDirection(matcher.getKeyForNextColumn(cell)); 819 } 820 } 821 822 /** 823 * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109). 824 * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row, 825 * or seek to an arbitrary seek key. This method decides whether a seek is the most efficient 826 * _actual_ way to get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, 827 * SKIP inside the current, loaded block). 828 * It does this by looking at the next indexed key of the current HFile. This key 829 * is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible key 830 * on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work with 831 * the current Cell but compare as though it were a seek key; see down in 832 * matcher.compareKeyForNextRow, etc). If the compare gets us onto the 833 * next block we *_SEEK, otherwise we just SKIP to the next requested cell. 834 * 835 * <p>Other notes: 836 * <ul> 837 * <li>Rows can straddle block boundaries</li> 838 * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a 839 * different block than column C1 at T2)</li> 840 * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a 841 * few SKIPs...</li> 842 * <li>We want to SEEK when the chance is high that we'll be able to seek 843 * past many Cells, especially if we know we need to go to the next block.</li> 844 * </ul> 845 * <p>A good proxy (best effort) to determine whether SKIP is better than SEEK is whether 846 * we'll likely end up seeking to the next block (or past the next block) to get our next column. 847 * Example: 848 * <pre> 849 * | BLOCK 1 | BLOCK 2 | 850 * | r1/c1, r1/c2, r1/c3 | r1/c4, r1/c5, r2/c1 | 851 * ^ ^ 852 * | | 853 * Next Index Key SEEK_NEXT_ROW (before r2/c1) 854 * 855 * 856 * | BLOCK 1 | BLOCK 2 | 857 * | r1/c1/t5, r1/c1/t4, r1/c1/t3 | r1/c1/t2, r1/c1/T1, r1/c2/T3 | 858 * ^ ^ 859 * | | 860 * Next Index Key SEEK_NEXT_COL 861 * </pre> 862 * Now imagine we want columns c1 and c3 (see first diagram above), the 'Next Index Key' of r1/c4 863 * is > r1/c3 so we should seek to get to the c1 on the next row, r2. In second case, say we only 864 * want one version of c1, after we have it, a SEEK_COL will be issued to get to c2. Looking at 865 * the 'Next Index Key', it would land us in the next block, so we should SEEK. In other scenarios 866 * where the SEEK will not land us in the next block, it is very likely better to issues a series 867 * of SKIPs. 868 * @param cell current cell 869 * @return true means skip to next row, false means not 870 */ 871 @VisibleForTesting 872 protected boolean trySkipToNextRow(Cell cell) throws IOException { 873 Cell nextCell = null; 874 // used to guard against a changed next indexed key by doing a identity comparison 875 // when the identity changes we need to compare the bytes again 876 Cell previousIndexedKey = null; 877 do { 878 Cell nextIndexedKey = getNextIndexedKey(); 879 if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY && 880 (nextIndexedKey == previousIndexedKey || 881 matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0)) { 882 this.heap.next(); 883 ++kvsScanned; 884 previousIndexedKey = nextIndexedKey; 885 } else { 886 return false; 887 } 888 } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell)); 889 return true; 890 } 891 892 /** 893 * See {@link org.apache.hadoop.hbase.regionserver.StoreScanner#trySkipToNextRow(Cell)} 894 * @param cell current cell 895 * @return true means skip to next column, false means not 896 */ 897 @VisibleForTesting 898 protected boolean trySkipToNextColumn(Cell cell) throws IOException { 899 Cell nextCell = null; 900 // used to guard against a changed next indexed key by doing a identity comparison 901 // when the identity changes we need to compare the bytes again 902 Cell previousIndexedKey = null; 903 do { 904 Cell nextIndexedKey = getNextIndexedKey(); 905 if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY && 906 (nextIndexedKey == previousIndexedKey || 907 matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0)) { 908 this.heap.next(); 909 ++kvsScanned; 910 previousIndexedKey = nextIndexedKey; 911 } else { 912 return false; 913 } 914 } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell)); 915 // We need this check because it may happen that the new scanner that we get 916 // during heap.next() is requiring reseek due of fake KV previously generated for 917 // ROWCOL bloom filter optimization. See HBASE-19863 for more details 918 if (nextCell != null && matcher.compareKeyForNextColumn(nextCell, cell) < 0) { 919 return false; 920 } 921 return true; 922 } 923 924 @Override 925 public long getReadPoint() { 926 return this.readPt; 927 } 928 929 private static void clearAndClose(List<KeyValueScanner> scanners) { 930 if (scanners == null) { 931 return; 932 } 933 for (KeyValueScanner s : scanners) { 934 s.close(); 935 } 936 scanners.clear(); 937 } 938 939 // Implementation of ChangedReadersObserver 940 @Override 941 public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners) 942 throws IOException { 943 if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) { 944 return; 945 } 946 boolean updateReaders = false; 947 flushLock.lock(); 948 try { 949 if (!closeLock.tryLock()) { 950 // The reason for doing this is that when the current store scanner does not retrieve 951 // any new cells, then the scanner is considered to be done. The heap of this scanner 952 // is not closed till the shipped() call is completed. Hence in that case if at all 953 // the partial close (close (false)) has been called before updateReaders(), there is no 954 // need for the updateReaders() to happen. 955 LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders"); 956 // no lock acquired. 957 clearAndClose(memStoreScanners); 958 return; 959 } 960 // lock acquired 961 updateReaders = true; 962 if (this.closing) { 963 LOG.debug("StoreScanner already closing. There is no need to updateReaders"); 964 clearAndClose(memStoreScanners); 965 return; 966 } 967 flushed = true; 968 final boolean isCompaction = false; 969 boolean usePread = get || scanUsePread; 970 // SEE HBASE-19468 where the flushed files are getting compacted even before a scanner 971 // calls next(). So its better we create scanners here rather than next() call. Ensure 972 // these scanners are properly closed() whether or not the scan is completed successfully 973 // Eagerly creating scanners so that we have the ref counting ticking on the newly created 974 // store files. In case of stream scanners this eager creation does not induce performance 975 // penalty because in scans (that uses stream scanners) the next() call is bound to happen. 976 List<KeyValueScanner> scanners = store.getScanners(sfs, cacheBlocks, get, usePread, 977 isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false); 978 flushedstoreFileScanners.addAll(scanners); 979 if (!CollectionUtils.isEmpty(memStoreScanners)) { 980 clearAndClose(memStoreScannersAfterFlush); 981 memStoreScannersAfterFlush.addAll(memStoreScanners); 982 } 983 } finally { 984 flushLock.unlock(); 985 if (updateReaders) { 986 closeLock.unlock(); 987 } 988 } 989 // Let the next() call handle re-creating and seeking 990 } 991 992 /** 993 * @return if top of heap has changed (and KeyValueHeap has to try the next KV) 994 */ 995 protected final boolean reopenAfterFlush() throws IOException { 996 // here we can make sure that we have a Store instance so no null check on store. 997 Cell lastTop = heap.peek(); 998 // When we have the scan object, should we not pass it to getScanners() to get a limited set of 999 // scanners? We did so in the constructor and we could have done it now by storing the scan 1000 // object from the constructor 1001 List<KeyValueScanner> scanners; 1002 flushLock.lock(); 1003 try { 1004 List<KeyValueScanner> allScanners = 1005 new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size()); 1006 allScanners.addAll(flushedstoreFileScanners); 1007 allScanners.addAll(memStoreScannersAfterFlush); 1008 scanners = selectScannersFrom(store, allScanners); 1009 // Clear the current set of flushed store files scanners so that they don't get added again 1010 flushedstoreFileScanners.clear(); 1011 memStoreScannersAfterFlush.clear(); 1012 } finally { 1013 flushLock.unlock(); 1014 } 1015 1016 // Seek the new scanners to the last key 1017 seekScanners(scanners, lastTop, false, parallelSeekEnabled); 1018 // remove the older memstore scanner 1019 for (int i = currentScanners.size() - 1; i >=0; i--) { 1020 if (!currentScanners.get(i).isFileScanner()) { 1021 scannersForDelayedClose.add(currentScanners.remove(i)); 1022 } else { 1023 // we add the memstore scanner to the end of currentScanners 1024 break; 1025 } 1026 } 1027 // add the newly created scanners on the flushed files and the current active memstore scanner 1028 addCurrentScanners(scanners); 1029 // Combine all seeked scanners with a heap 1030 resetKVHeap(this.currentScanners, store.getComparator()); 1031 resetQueryMatcher(lastTop); 1032 if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) { 1033 LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString() + 1034 ",and after = " + heap.peek()); 1035 topChanged = true; 1036 } else { 1037 topChanged = false; 1038 } 1039 return topChanged; 1040 } 1041 1042 private void resetQueryMatcher(Cell lastTopKey) { 1043 // Reset the state of the Query Matcher and set to top row. 1044 // Only reset and call setRow if the row changes; avoids confusing the 1045 // query matcher if scanning intra-row. 1046 Cell cell = heap.peek(); 1047 if (cell == null) { 1048 cell = lastTopKey; 1049 } 1050 if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) { 1051 this.countPerRow = 0; 1052 // The setToNewRow will call reset internally 1053 matcher.setToNewRow(cell); 1054 } 1055 } 1056 1057 /** 1058 * Check whether scan as expected order 1059 * @param prevKV 1060 * @param kv 1061 * @param comparator 1062 * @throws IOException 1063 */ 1064 protected void checkScanOrder(Cell prevKV, Cell kv, 1065 CellComparator comparator) throws IOException { 1066 // Check that the heap gives us KVs in an increasing order. 1067 assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 : "Key " 1068 + prevKV + " followed by a smaller key " + kv + " in cf " + store; 1069 } 1070 1071 protected boolean seekToNextRow(Cell c) throws IOException { 1072 return reseek(PrivateCellUtil.createLastOnRow(c)); 1073 } 1074 1075 /** 1076 * Do a reseek in a normal StoreScanner(scan forward) 1077 * @param kv 1078 * @return true if scanner has values left, false if end of scanner 1079 * @throws IOException 1080 */ 1081 protected boolean seekAsDirection(Cell kv) 1082 throws IOException { 1083 return reseek(kv); 1084 } 1085 1086 @Override 1087 public boolean reseek(Cell kv) throws IOException { 1088 if (checkFlushed()) { 1089 reopenAfterFlush(); 1090 } 1091 if (explicitColumnQuery && lazySeekEnabledGlobally) { 1092 return heap.requestSeek(kv, true, useRowColBloom); 1093 } 1094 return heap.reseek(kv); 1095 } 1096 1097 @VisibleForTesting 1098 void trySwitchToStreamRead() { 1099 if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || 1100 heap.peek() == null || bytesRead < preadMaxBytes) { 1101 return; 1102 } 1103 LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead, 1104 this.store.getColumnFamilyName()); 1105 scanUsePread = false; 1106 Cell lastTop = heap.peek(); 1107 List<KeyValueScanner> memstoreScanners = new ArrayList<>(); 1108 List<KeyValueScanner> scannersToClose = new ArrayList<>(); 1109 for (KeyValueScanner kvs : currentScanners) { 1110 if (!kvs.isFileScanner()) { 1111 // collect memstorescanners here 1112 memstoreScanners.add(kvs); 1113 } else { 1114 scannersToClose.add(kvs); 1115 } 1116 } 1117 List<KeyValueScanner> fileScanners = null; 1118 List<KeyValueScanner> newCurrentScanners; 1119 KeyValueHeap newHeap; 1120 try { 1121 // We must have a store instance here so no null check 1122 // recreate the scanners on the current file scanners 1123 fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, 1124 matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), 1125 scan.includeStopRow(), readPt, false); 1126 if (fileScanners == null) { 1127 return; 1128 } 1129 seekScanners(fileScanners, lastTop, false, parallelSeekEnabled); 1130 newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size()); 1131 newCurrentScanners.addAll(fileScanners); 1132 newCurrentScanners.addAll(memstoreScanners); 1133 newHeap = newKVHeap(newCurrentScanners, comparator); 1134 } catch (Exception e) { 1135 LOG.warn("failed to switch to stream read", e); 1136 if (fileScanners != null) { 1137 fileScanners.forEach(KeyValueScanner::close); 1138 } 1139 return; 1140 } 1141 currentScanners.clear(); 1142 addCurrentScanners(newCurrentScanners); 1143 this.heap = newHeap; 1144 resetQueryMatcher(lastTop); 1145 scannersToClose.forEach(KeyValueScanner::close); 1146 } 1147 1148 protected final boolean checkFlushed() { 1149 // check the var without any lock. Suppose even if we see the old 1150 // value here still it is ok to continue because we will not be resetting 1151 // the heap but will continue with the referenced memstore's snapshot. For compactions 1152 // any way we don't need the updateReaders at all to happen as we still continue with 1153 // the older files 1154 if (flushed) { 1155 // If there is a flush and the current scan is notified on the flush ensure that the 1156 // scan's heap gets reset and we do a seek on the newly flushed file. 1157 if (this.closing) { 1158 return false; 1159 } 1160 // reset the flag 1161 flushed = false; 1162 return true; 1163 } 1164 return false; 1165 } 1166 1167 1168 /** 1169 * Seek storefiles in parallel to optimize IO latency as much as possible 1170 * @param scanners the list {@link KeyValueScanner}s to be read from 1171 * @param kv the KeyValue on which the operation is being requested 1172 * @throws IOException 1173 */ 1174 private void parallelSeek(final List<? extends KeyValueScanner> 1175 scanners, final Cell kv) throws IOException { 1176 if (scanners.isEmpty()) return; 1177 int storeFileScannerCount = scanners.size(); 1178 CountDownLatch latch = new CountDownLatch(storeFileScannerCount); 1179 List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount); 1180 for (KeyValueScanner scanner : scanners) { 1181 if (scanner instanceof StoreFileScanner) { 1182 ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, 1183 this.readPt, latch); 1184 executor.submit(seekHandler); 1185 handlers.add(seekHandler); 1186 } else { 1187 scanner.seek(kv); 1188 latch.countDown(); 1189 } 1190 } 1191 1192 try { 1193 latch.await(); 1194 } catch (InterruptedException ie) { 1195 throw (InterruptedIOException)new InterruptedIOException().initCause(ie); 1196 } 1197 1198 for (ParallelSeekHandler handler : handlers) { 1199 if (handler.getErr() != null) { 1200 throw new IOException(handler.getErr()); 1201 } 1202 } 1203 } 1204 1205 /** 1206 * Used in testing. 1207 * @return all scanners in no particular order 1208 */ 1209 @VisibleForTesting 1210 List<KeyValueScanner> getAllScannersForTesting() { 1211 List<KeyValueScanner> allScanners = new ArrayList<>(); 1212 KeyValueScanner current = heap.getCurrentForTesting(); 1213 if (current != null) 1214 allScanners.add(current); 1215 for (KeyValueScanner scanner : heap.getHeap()) 1216 allScanners.add(scanner); 1217 return allScanners; 1218 } 1219 1220 static void enableLazySeekGlobally(boolean enable) { 1221 lazySeekEnabledGlobally = enable; 1222 } 1223 1224 /** 1225 * @return The estimated number of KVs seen by this scanner (includes some skipped KVs). 1226 */ 1227 public long getEstimatedNumberOfKvsScanned() { 1228 return this.kvsScanned; 1229 } 1230 1231 @Override 1232 public Cell getNextIndexedKey() { 1233 return this.heap.getNextIndexedKey(); 1234 } 1235 1236 @Override 1237 public void shipped() throws IOException { 1238 if (prevCell != null) { 1239 // Do the copy here so that in case the prevCell ref is pointing to the previous 1240 // blocks we can safely release those blocks. 1241 // This applies to blocks that are got from Bucket cache, L1 cache and the blocks 1242 // fetched from HDFS. Copying this would ensure that we let go the references to these 1243 // blocks so that they can be GCed safely(in case of bucket cache) 1244 prevCell = KeyValueUtil.toNewKeyCell(this.prevCell); 1245 } 1246 matcher.beforeShipped(); 1247 // There wont be further fetch of Cells from these scanners. Just close. 1248 clearAndClose(scannersForDelayedClose); 1249 if (this.heap != null) { 1250 this.heap.shipped(); 1251 // When switching from pread to stream, we will open a new scanner for each store file, but 1252 // the old scanner may still track the HFileBlocks we have scanned but not sent back to client 1253 // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others 1254 // before we serialize and send it back to client. The HFileBlocks will be released in shipped 1255 // method, so we here will also open new scanners and close old scanners in shipped method. 1256 // See HBASE-18055 for more details. 1257 trySwitchToStreamRead(); 1258 } 1259 } 1260}