001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.NavigableSet; 025import java.util.Optional; 026import java.util.concurrent.CountDownLatch; 027import java.util.concurrent.atomic.AtomicBoolean; 028import java.util.concurrent.locks.ReentrantLock; 029import java.util.function.IntConsumer; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellComparator; 032import org.apache.hadoop.hbase.CellUtil; 033import org.apache.hadoop.hbase.DoNotRetryIOException; 034import org.apache.hadoop.hbase.ExtendedCell; 035import org.apache.hadoop.hbase.HBaseInterfaceAudience; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.KeyValue; 038import org.apache.hadoop.hbase.KeyValueUtil; 039import org.apache.hadoop.hbase.PrivateCellUtil; 040import org.apache.hadoop.hbase.PrivateConstants; 041import org.apache.hadoop.hbase.client.IsolationLevel; 042import org.apache.hadoop.hbase.client.Scan; 043import org.apache.hadoop.hbase.conf.ConfigKey; 044import 
org.apache.hadoop.hbase.executor.ExecutorService; 045import org.apache.hadoop.hbase.filter.Filter; 046import org.apache.hadoop.hbase.ipc.RpcCall; 047import org.apache.hadoop.hbase.ipc.RpcServer; 048import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 049import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; 050import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler; 051import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher; 052import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 053import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher; 054import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 055import org.apache.yetus.audience.InterfaceAudience; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 060import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 061 062/** 063 * Scanner scans both the memstore and the Store. Coalesce KeyValue stream into List<KeyValue> 064 * for a single row. 065 * <p> 066 * The implementation is not thread safe. So there will be no race between next and close. The only 067 * exception is updateReaders, it will be called in the memstore flush thread to indicate that there 068 * is a flush. 
069 */ 070@InterfaceAudience.Private 071public class StoreScanner extends NonReversedNonLazyKeyValueScanner 072 implements KeyValueScanner, InternalScanner, ChangedReadersObserver { 073 private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class); 074 // In unit tests, the store could be null 075 protected final HStore store; 076 private final CellComparator comparator; 077 private ScanQueryMatcher matcher; 078 protected KeyValueHeap heap; 079 private boolean cacheBlocks; 080 081 private long countPerRow = 0; 082 private int storeLimit = -1; 083 private int storeOffset = 0; 084 085 // Used to indicate that the scanner has closed (see HBASE-1107) 086 private volatile boolean closing = false; 087 private final boolean get; 088 private final boolean explicitColumnQuery; 089 private final boolean useRowColBloom; 090 /** 091 * A flag that enables StoreFileScanner parallel-seeking 092 */ 093 private boolean parallelSeekEnabled = false; 094 private ExecutorService executor; 095 private final Scan scan; 096 private final long oldestUnexpiredTS; 097 private final long now; 098 private final int minVersions; 099 private final long maxRowSize; 100 private final long cellsPerHeartbeatCheck; 101 long memstoreOnlyReads; 102 long mixedReads; 103 104 // 1) Collects all the KVHeap that are eagerly getting closed during the 105 // course of a scan 106 // 2) Collects the unused memstore scanners. If we close the memstore scanners 107 // before sending data to client, the chunk may be reclaimed by other 108 // updates and the data will be corrupt. 109 private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>(); 110 111 /** 112 * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not KVs skipped via 113 * seeking to next row/column. TODO: estimate them? 
114 */ 115 private long kvsScanned = 0; 116 private ExtendedCell prevCell = null; 117 118 private final long preadMaxBytes; 119 private long bytesRead; 120 121 /** We don't ever expect to change this, the constant is just for clarity. */ 122 static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true; 123 public static final String STORESCANNER_PARALLEL_SEEK_ENABLE = 124 "hbase.storescanner.parallel.seek.enable"; 125 126 /** Used during unit testing to ensure that lazy seek does save seek ops */ 127 private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT; 128 129 /** 130 * The number of cells scanned in between timeout checks. Specifying a larger value means that 131 * timeout checks will occur less frequently. Specifying a small value will lead to more frequent 132 * timeout checks. 133 */ 134 public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 135 ConfigKey.LONG("hbase.cells.scanned.per.heartbeat.check"); 136 137 /** 138 * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}. 139 */ 140 public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000; 141 142 /** 143 * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we scanned 144 * reaches this limit, we will reopen the scanner with stream. The default value is 4 times of 145 * block size for this store. If configured with a value <0, for all scans with ReadType DEFAULT, 146 * we will open scanner with stream mode itself. 147 */ 148 public static final String STORESCANNER_PREAD_MAX_BYTES = 149 ConfigKey.LONG("hbase.storescanner.pread.max.bytes"); 150 151 private final Scan.ReadType readType; 152 153 // A flag whether use pread for scan 154 // it maybe changed if we use Scan.ReadType.DEFAULT and we have read lots of data. 
155 private boolean scanUsePread; 156 // Indicates whether there was flush during the course of the scan 157 private volatile boolean flushed = false; 158 // generally we get one file from a flush 159 private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1); 160 // Since CompactingMemstore is now default, we get three memstore scanners from a flush 161 private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3); 162 // The current list of scanners 163 final List<KeyValueScanner> currentScanners = new ArrayList<>(); 164 // flush update lock 165 private final ReentrantLock flushLock = new ReentrantLock(); 166 // lock for closing. 167 private final ReentrantLock closeLock = new ReentrantLock(); 168 169 protected final long readPt; 170 private boolean topChanged = false; 171 172 // These are used to verify the state of the scanner during testing. 173 private static AtomicBoolean hasUpdatedReaders; 174 private static AtomicBoolean hasSwitchedToStreamRead; 175 176 /** An internal constructor. */ 177 private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt, 178 boolean cacheBlocks, ScanType scanType) { 179 this.readPt = readPt; 180 this.store = store; 181 this.cacheBlocks = cacheBlocks; 182 this.comparator = Preconditions.checkNotNull(scanInfo.getComparator()); 183 get = scan.isGetScan(); 184 explicitColumnQuery = numColumns > 0; 185 this.scan = scan; 186 this.now = EnvironmentEdgeManager.currentTime(); 187 this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl(); 188 this.minVersions = scanInfo.getMinVersions(); 189 190 // We look up row-column Bloom filters for multi-column queries as part of 191 // the seek operation. However, we also look the row-column Bloom filter 192 // for multi-row (non-"get") scans because this is not done in 193 // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>). 
194 this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1) && (store == null 195 || store.getColumnFamilyDescriptor().getBloomFilterType() == BloomType.ROWCOL); 196 this.maxRowSize = scanInfo.getTableMaxRowSize(); 197 this.preadMaxBytes = scanInfo.getPreadMaxBytes(); 198 if (get) { 199 this.readType = Scan.ReadType.PREAD; 200 this.scanUsePread = true; 201 } else if (scanType != ScanType.USER_SCAN) { 202 // For compaction scanners never use Pread as already we have stream based scanners on the 203 // store files to be compacted 204 this.readType = Scan.ReadType.STREAM; 205 this.scanUsePread = false; 206 } else { 207 if (scan.getReadType() == Scan.ReadType.DEFAULT) { 208 if (scanInfo.isUsePread()) { 209 this.readType = Scan.ReadType.PREAD; 210 } else if (this.preadMaxBytes < 0) { 211 this.readType = Scan.ReadType.STREAM; 212 } else { 213 this.readType = Scan.ReadType.DEFAULT; 214 } 215 } else { 216 this.readType = scan.getReadType(); 217 } 218 // Always start with pread unless user specific stream. Will change to stream later if 219 // readType is default if the scan keeps running for a long time. 220 this.scanUsePread = this.readType != Scan.ReadType.STREAM; 221 } 222 this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck(); 223 // Parallel seeking is on if the config allows and more there is more than one store file. 224 if (store != null && store.getStorefilesCount() > 1) { 225 RegionServerServices rsService = store.getHRegion().getRegionServerServices(); 226 if (rsService != null && scanInfo.isParallelSeekEnabled()) { 227 this.parallelSeekEnabled = true; 228 this.executor = rsService.getExecutorService(); 229 } 230 } 231 } 232 233 private void addCurrentScanners(List<? 
extends KeyValueScanner> scanners) { 234 this.currentScanners.addAll(scanners); 235 } 236 237 private static boolean isOnlyLatestVersionScan(Scan scan) { 238 // No need to check for Scan#getMaxVersions because live version files generated by store file 239 // writer retains max versions specified in ColumnFamilyDescriptor for the given CF 240 return !scan.isRaw() && scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP; 241 } 242 243 /** 244 * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we are not in a 245 * compaction. 246 * @param store who we scan 247 * @param scan the spec 248 * @param columns which columns we are scanning 249 */ 250 public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns, 251 long readPt) throws IOException { 252 this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt, scan.getCacheBlocks(), 253 ScanType.USER_SCAN); 254 if (columns != null && scan.isRaw()) { 255 throw new DoNotRetryIOException("Cannot specify any column for a raw scan"); 256 } 257 matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, 258 store.getCoprocessorHost()); 259 260 store.addChangedReaderObserver(this); 261 262 List<KeyValueScanner> scanners = null; 263 try { 264 // Pass columns to try to filter out unnecessary StoreFiles. 265 scanners = selectScannersFrom(store, 266 store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(), 267 scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt, 268 isOnlyLatestVersionScan(scan))); 269 270 // Seek all scanners to the start of the Row (or if the exact matching row 271 // key does not exist, then to the start of the next matching Row). 272 // Always check bloom filter to optimize the top row seek for delete 273 // family marker. 
274 seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally, 275 parallelSeekEnabled); 276 277 // set storeLimit 278 this.storeLimit = scan.getMaxResultsPerColumnFamily(); 279 280 // set rowOffset 281 this.storeOffset = scan.getRowOffsetPerColumnFamily(); 282 addCurrentScanners(scanners); 283 // Combine all seeked scanners with a heap 284 resetKVHeap(scanners, comparator); 285 } catch (IOException e) { 286 clearAndClose(scanners); 287 // remove us from the HStore#changedReaderObservers here or we'll have no chance to 288 // and might cause memory leak 289 store.deleteChangedReaderObserver(this); 290 throw e; 291 } 292 } 293 294 // a dummy scan instance for compaction. 295 private static final Scan SCAN_FOR_COMPACTION = new Scan(); 296 297 /** 298 * Used for store file compaction and memstore compaction. 299 * <p> 300 * Opens a scanner across specified StoreFiles/MemStoreSegments. 301 * @param store who we scan 302 * @param scanners ancillary scanners 303 * @param smallestReadPoint the readPoint that we should use for tracking versions 304 */ 305 public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners, 306 ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { 307 this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null); 308 } 309 310 /** 311 * Used for compactions that drop deletes from a limited range of rows. 312 * <p> 313 * Opens a scanner across specified StoreFiles. 314 * @param store who we scan 315 * @param scanners ancillary scanners 316 * @param smallestReadPoint the readPoint that we should use for tracking versions 317 * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW. 318 * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW. 319 */ 320 public StoreScanner(HStore store, ScanInfo scanInfo, List<? 
extends KeyValueScanner> scanners, 321 long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) 322 throws IOException { 323 this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, 324 earliestPutTs, dropDeletesFromRow, dropDeletesToRow); 325 } 326 327 private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners, 328 ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, 329 byte[] dropDeletesToRow) throws IOException { 330 this(store, SCAN_FOR_COMPACTION, scanInfo, 0, 331 store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType); 332 assert scanType != ScanType.USER_SCAN; 333 matcher = 334 CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs, 335 oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost()); 336 337 // Filter the list of scanners using Bloom filters, time range, TTL, etc. 338 scanners = selectScannersFrom(store, scanners); 339 340 // Seek all scanners to the initial key 341 seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); 342 addCurrentScanners(scanners); 343 // Combine all seeked scanners with a heap 344 resetKVHeap(scanners, comparator); 345 } 346 347 private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners) 348 throws IOException { 349 // Seek all scanners to the initial key 350 seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); 351 addCurrentScanners(scanners); 352 resetKVHeap(scanners, comparator); 353 } 354 355 // For mob compaction only as we do not have a Store instance when doing mob compaction. 356 public StoreScanner(ScanInfo scanInfo, ScanType scanType, 357 List<? 
extends KeyValueScanner> scanners) throws IOException { 358 this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType); 359 assert scanType != ScanType.USER_SCAN; 360 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L, 361 oldestUnexpiredTS, now, null, null, null); 362 seekAllScanner(scanInfo, scanners); 363 } 364 365 // Used to instantiate a scanner for user scan in test 366 StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, 367 List<? extends KeyValueScanner> scanners, ScanType scanType) throws IOException { 368 // 0 is passed as readpoint because the test bypasses Store 369 this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(), 370 scanType); 371 if (scanType == ScanType.USER_SCAN) { 372 this.matcher = 373 UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null); 374 } else { 375 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 376 PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null); 377 } 378 seekAllScanner(scanInfo, scanners); 379 } 380 381 // Used to instantiate a scanner for user scan in test 382 StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, 383 List<? extends KeyValueScanner> scanners) throws IOException { 384 // 0 is passed as readpoint because the test bypasses Store 385 this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(), 386 ScanType.USER_SCAN); 387 this.matcher = 388 UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null); 389 seekAllScanner(scanInfo, scanners); 390 } 391 392 // Used to instantiate a scanner for compaction in test 393 StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType, 394 List<? extends KeyValueScanner> scanners) throws IOException { 395 // 0 is passed as readpoint because the test bypasses Store 396 this(null, maxVersions > 0 ? 
new Scan().readVersions(maxVersions) : SCAN_FOR_COMPACTION, 397 scanInfo, 0, 0L, false, scanType); 398 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 399 PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null); 400 seekAllScanner(scanInfo, scanners); 401 } 402 403 boolean isScanUsePread() { 404 return this.scanUsePread; 405 } 406 407 /** 408 * Seek the specified scanners with the given key 409 * @param isLazy true if using lazy seek 410 * @param isParallelSeek true if using parallel seek 411 */ 412 protected void seekScanners(List<? extends KeyValueScanner> scanners, ExtendedCell seekKey, 413 boolean isLazy, boolean isParallelSeek) throws IOException { 414 // Seek all scanners to the start of the Row (or if the exact matching row 415 // key does not exist, then to the start of the next matching Row). 416 // Always check bloom filter to optimize the top row seek for delete 417 // family marker. 418 if (isLazy) { 419 for (KeyValueScanner scanner : scanners) { 420 scanner.requestSeek(seekKey, false, true); 421 } 422 } else { 423 if (!isParallelSeek) { 424 long totalScannersSoughtBytes = 0; 425 for (KeyValueScanner scanner : scanners) { 426 if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) { 427 throw new RowTooBigException( 428 "Max row size allowed: " + maxRowSize + ", but row is bigger than that"); 429 } 430 scanner.seek(seekKey); 431 Cell c = scanner.peek(); 432 if (c != null) { 433 totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c); 434 } 435 } 436 } else { 437 parallelSeek(scanners, seekKey); 438 } 439 } 440 } 441 442 protected void resetKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator) 443 throws IOException { 444 // Combine all seeked scanners with a heap 445 heap = newKVHeap(scanners, comparator); 446 } 447 448 protected KeyValueHeap newKVHeap(List<? 
extends KeyValueScanner> scanners, 449 CellComparator comparator) throws IOException { 450 return new KeyValueHeap(scanners, comparator); 451 } 452 453 /** 454 * Filters the given list of scanners using Bloom filter, time range, and TTL. 455 * <p> 456 * Will be overridden by testcase so declared as protected. 457 */ 458 protected List<KeyValueScanner> selectScannersFrom(HStore store, 459 List<? extends KeyValueScanner> allScanners) { 460 boolean memOnly; 461 boolean filesOnly; 462 if (scan instanceof InternalScan) { 463 InternalScan iscan = (InternalScan) scan; 464 memOnly = iscan.isCheckOnlyMemStore(); 465 filesOnly = iscan.isCheckOnlyStoreFiles(); 466 } else { 467 memOnly = false; 468 filesOnly = false; 469 } 470 471 List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size()); 472 473 // We can only exclude store files based on TTL if minVersions is set to 0. 474 // Otherwise, we might have to return KVs that have technically expired. 475 long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE; 476 477 // include only those scan files which pass all filters 478 for (KeyValueScanner kvs : allScanners) { 479 boolean isFile = kvs.isFileScanner(); 480 if ((!isFile && filesOnly) || (isFile && memOnly)) { 481 kvs.close(); 482 continue; 483 } 484 485 if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) { 486 scanners.add(kvs); 487 } else { 488 kvs.close(); 489 } 490 } 491 return scanners; 492 } 493 494 @Override 495 public ExtendedCell peek() { 496 return heap != null ? heap.peek() : null; 497 } 498 499 @Override 500 public KeyValue next() { 501 // throw runtime exception perhaps? 502 throw new RuntimeException("Never call StoreScanner.next()"); 503 } 504 505 @Override 506 public void close() { 507 close(true); 508 } 509 510 private void close(boolean withDelayedScannersClose) { 511 closeLock.lock(); 512 // If the closeLock is acquired then any subsequent updateReaders() 513 // call is ignored. 
514 try { 515 if (this.closing) { 516 return; 517 } 518 if (withDelayedScannersClose) { 519 this.closing = true; 520 } 521 // For mob compaction, we do not have a store. 522 if (this.store != null) { 523 this.store.deleteChangedReaderObserver(this); 524 } 525 if (withDelayedScannersClose) { 526 clearAndClose(scannersForDelayedClose); 527 clearAndClose(memStoreScannersAfterFlush); 528 clearAndClose(flushedstoreFileScanners); 529 if (this.heap != null) { 530 this.heap.close(); 531 this.currentScanners.clear(); 532 this.heap = null; // CLOSED! 533 } 534 } else { 535 if (this.heap != null) { 536 this.scannersForDelayedClose.add(this.heap); 537 this.currentScanners.clear(); 538 this.heap = null; 539 } 540 } 541 } finally { 542 closeLock.unlock(); 543 } 544 } 545 546 @Override 547 public boolean seek(ExtendedCell key) throws IOException { 548 if (checkFlushed()) { 549 reopenAfterFlush(); 550 } 551 return this.heap.seek(key); 552 } 553 554 /** 555 * Get the next row of values from this Store. 556 * @return true if there are more rows, false if scanner is done 557 */ 558 @Override 559 public boolean next(List<? super ExtendedCell> outResult, ScannerContext scannerContext) 560 throws IOException { 561 if (scannerContext == null) { 562 throw new IllegalArgumentException("Scanner context cannot be null"); 563 } 564 if (checkFlushed() && reopenAfterFlush()) { 565 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 566 } 567 568 // if the heap was left null, then the scanners had previously run out anyways, close and 569 // return. 
570 if (this.heap == null) { 571 // By this time partial close should happened because already heap is null 572 close(false);// Do all cleanup except heap.close() 573 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 574 } 575 576 ExtendedCell cell = this.heap.peek(); 577 if (cell == null) { 578 close(false);// Do all cleanup except heap.close() 579 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 580 } 581 582 // only call setRow if the row changes; avoids confusing the query matcher 583 // if scanning intra-row 584 585 // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing 586 // rows. Else it is possible we are still traversing the same row so we must perform the row 587 // comparison. 588 if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) { 589 this.countPerRow = 0; 590 matcher.setToNewRow(cell); 591 } 592 593 // Clear progress away unless invoker has indicated it should be kept. 594 if (!scannerContext.getKeepProgress() && !scannerContext.getSkippingRow()) { 595 scannerContext.clearProgress(); 596 } 597 598 Optional<RpcCall> rpcCall = 599 matcher.isUserScan() ? RpcServer.getCurrentCall() : Optional.empty(); 600 // re-useable closure to avoid allocations 601 IntConsumer recordBlockSize = blockSize -> { 602 if (rpcCall.isPresent()) { 603 rpcCall.get().incrementBlockBytesScanned(blockSize); 604 } 605 scannerContext.incrementBlockProgress(blockSize); 606 }; 607 608 int count = 0; 609 long totalBytesRead = 0; 610 // track the cells for metrics only if it is a user read request. 611 boolean onlyFromMemstore = matcher.isUserScan(); 612 try { 613 LOOP: do { 614 // Update and check the time limit based on the configured value of cellsPerTimeoutCheck 615 // Or if the preadMaxBytes is reached and we may want to return so we can switch to stream 616 // in 617 // the shipped method below. 
618 if ( 619 kvsScanned % cellsPerHeartbeatCheck == 0 620 || (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) 621 ) { 622 if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { 623 return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); 624 } 625 } 626 // Do object compare - we set prevKV from the same heap. 627 if (prevCell != cell) { 628 ++kvsScanned; 629 } 630 checkScanOrder(prevCell, cell, comparator); 631 int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell); 632 bytesRead += cellSize; 633 if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) { 634 // return immediately if we want to switch from pread to stream. We need this because we 635 // can 636 // only switch in the shipped method, if user use a filter to filter out everything and 637 // rpc 638 // timeout is very large then the shipped method will never be called until the whole scan 639 // is finished, but at that time we have already scan all the data... 640 // See HBASE-20457 for more details. 641 // And there is still a scenario that can not be handled. If we have a very large row, 642 // which 643 // have millions of qualifiers, and filter.filterRow is used, then even if we set the flag 644 // here, we still need to scan all the qualifiers before returning... 
645 scannerContext.returnImmediately(); 646 } 647 648 heap.recordBlockSize(recordBlockSize); 649 650 prevCell = cell; 651 scannerContext.setLastPeekedCell(cell); 652 topChanged = false; 653 ScanQueryMatcher.MatchCode qcode = matcher.match(cell); 654 switch (qcode) { 655 case INCLUDE: 656 case INCLUDE_AND_SEEK_NEXT_ROW: 657 case INCLUDE_AND_SEEK_NEXT_COL: 658 Filter f = matcher.getFilter(); 659 if (f != null) { 660 Cell transformedCell = f.transformCell(cell); 661 // fast path, most filters just return the same cell instance 662 if (transformedCell != cell) { 663 if (transformedCell instanceof ExtendedCell) { 664 cell = (ExtendedCell) transformedCell; 665 } else { 666 throw new DoNotRetryIOException("Incorrect filter implementation, " 667 + "the Cell returned by transformCell is not an ExtendedCell. Filter class: " 668 + f.getClass().getName()); 669 } 670 } 671 } 672 this.countPerRow++; 673 674 // add to results only if we have skipped #storeOffset kvs 675 // also update metric accordingly 676 if (this.countPerRow > storeOffset) { 677 outResult.add(cell); 678 679 // Update local tracking information 680 count++; 681 totalBytesRead += cellSize; 682 683 /** 684 * Increment the metric if all the cells are from memstore. 
If not we will account it 685 * for mixed reads 686 */ 687 onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore(); 688 // Update the progress of the scanner context 689 scannerContext.incrementSizeProgress(cellSize, cell.heapSize()); 690 scannerContext.incrementBatchProgress(1); 691 692 if (matcher.isUserScan() && totalBytesRead > maxRowSize) { 693 String message = "Max row size allowed: " + maxRowSize 694 + ", but the row is bigger than that, the row info: " 695 + CellUtil.toString(cell, false) + ", already have process row cells = " 696 + outResult.size() + ", it belong to region = " 697 + store.getHRegion().getRegionInfo().getRegionNameAsString(); 698 LOG.warn(message); 699 throw new RowTooBigException(message); 700 } 701 702 if (storeLimit > -1 && this.countPerRow >= (storeLimit + storeOffset)) { 703 // do what SEEK_NEXT_ROW does. 704 if (!matcher.moreRowsMayExistAfter(cell)) { 705 close(false);// Do all cleanup except heap.close() 706 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 707 } 708 matcher.clearCurrentRow(); 709 seekToNextRow(cell); 710 break LOOP; 711 } 712 } 713 714 if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { 715 if (!matcher.moreRowsMayExistAfter(cell)) { 716 close(false);// Do all cleanup except heap.close() 717 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 718 } 719 matcher.clearCurrentRow(); 720 seekOrSkipToNextRow(cell); 721 } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { 722 seekOrSkipToNextColumn(cell); 723 } else { 724 this.heap.next(); 725 } 726 727 if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) { 728 break LOOP; 729 } 730 if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { 731 break LOOP; 732 } 733 continue; 734 735 case DONE: 736 // Optimization for Gets! If DONE, no more to get on this row, early exit! 737 if (get) { 738 // Then no more to this row... exit. 
739 close(false);// Do all cleanup except heap.close() 740 // update metric 741 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 742 } 743 matcher.clearCurrentRow(); 744 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 745 746 case DONE_SCAN: 747 close(false);// Do all cleanup except heap.close() 748 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 749 750 case SEEK_NEXT_ROW: 751 // This is just a relatively simple end of scan fix, to short-cut end 752 // us if there is an endKey in the scan. 753 if (!matcher.moreRowsMayExistAfter(cell)) { 754 close(false);// Do all cleanup except heap.close() 755 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 756 } 757 matcher.clearCurrentRow(); 758 seekOrSkipToNextRow(cell); 759 NextState stateAfterSeekNextRow = needToReturn(); 760 if (stateAfterSeekNextRow != null) { 761 return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues(); 762 } 763 break; 764 765 case SEEK_NEXT_COL: 766 seekOrSkipToNextColumn(cell); 767 NextState stateAfterSeekNextColumn = needToReturn(); 768 if (stateAfterSeekNextColumn != null) { 769 return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues(); 770 } 771 break; 772 773 case SKIP: 774 this.heap.next(); 775 break; 776 777 case SEEK_NEXT_USING_HINT: 778 ExtendedCell nextKV = matcher.getNextKeyHint(cell); 779 if (nextKV != null) { 780 int difference = comparator.compare(nextKV, cell); 781 if ( 782 ((!scan.isReversed() && difference > 0) || (scan.isReversed() && difference < 0)) 783 ) { 784 seekAsDirection(nextKV); 785 NextState stateAfterSeekByHint = needToReturn(); 786 if (stateAfterSeekByHint != null) { 787 return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues(); 788 } 789 break; 790 } 791 } 792 heap.next(); 793 break; 794 795 default: 796 throw new RuntimeException("UNEXPECTED"); 797 } 798 799 // One last chance to break 
due to size limit. The INCLUDE* cases above already check 800 // limit and continue. For the various filtered cases, we need to check because block 801 // size limit may have been exceeded even if we don't add cells to result list. 802 if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { 803 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 804 } 805 } while ((cell = this.heap.peek()) != null); 806 807 if (count > 0) { 808 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 809 } 810 811 // No more keys 812 close(false);// Do all cleanup except heap.close() 813 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 814 } finally { 815 // increment only if we have some result 816 if (count > 0 && matcher.isUserScan()) { 817 // if true increment memstore metrics, if not the mixed one 818 updateMetricsStore(onlyFromMemstore); 819 } 820 } 821 } 822 823 private void updateMetricsStore(boolean memstoreRead) { 824 if (store != null) { 825 store.updateMetricsStore(memstoreRead); 826 } else { 827 // for testing. 828 if (memstoreRead) { 829 memstoreOnlyReads++; 830 } else { 831 mixedReads++; 832 } 833 } 834 } 835 836 /** 837 * If the top cell won't be flushed into disk, the new top cell may be changed after 838 * #reopenAfterFlush. Because the older top cell only exist in the memstore scanner but the 839 * memstore scanner is replaced by hfile scanner after #reopenAfterFlush. If the row of top cell 840 * is changed, we should return the current cells. Otherwise, we may return the cells across 841 * different rows. 842 * @return null is the top cell doesn't change. Otherwise, the NextState to return 843 */ 844 private NextState needToReturn() { 845 if (topChanged) { 846 return heap.peek() == null ? 
NextState.NO_MORE_VALUES : NextState.MORE_VALUES; 847 } 848 return null; 849 } 850 851 private void seekOrSkipToNextRow(ExtendedCell cell) throws IOException { 852 // If it is a Get Scan, then we know that we are done with this row; there are no more 853 // rows beyond the current one: don't try to optimize. 854 if (!get) { 855 if (trySkipToNextRow(cell)) { 856 return; 857 } 858 } 859 seekToNextRow(cell); 860 } 861 862 private void seekOrSkipToNextColumn(ExtendedCell cell) throws IOException { 863 if (!trySkipToNextColumn(cell)) { 864 seekAsDirection(matcher.getKeyForNextColumn(cell)); 865 } 866 } 867 868 /** 869 * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109). 870 * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row, or seek to an 871 * arbitrary seek key. This method decides whether a seek is the most efficient _actual_ way to 872 * get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, SKIP inside the 873 * current, loaded block). It does this by looking at the next indexed key of the current HFile. 874 * This key is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible 875 * key on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work 876 * with the current Cell but compare as though it were a seek key; see down in 877 * matcher.compareKeyForNextRow, etc). If the compare gets us onto the next block we *_SEEK, 878 * otherwise we just SKIP to the next requested cell. 879 * <p> 880 * Other notes: 881 * <ul> 882 * <li>Rows can straddle block boundaries</li> 883 * <li>Versions of columns can straddle block boundaries (i.e. 
column C1 at T1 might be in a
   * different block than column C1 at T2)</li>
   * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a few
   * SKIPs...</li>
   * <li>We want to SEEK when the chance is high that we'll be able to seek past many Cells,
   * especially if we know we need to go to the next block.</li>
   * </ul>
   * <p>
   * A good proxy (best effort) to determine whether SKIP is better than SEEK is whether we'll
   * likely end up seeking to the next block (or past the next block) to get our next column.
   * Example:
   *
   * <pre>
   * |    BLOCK 1              |     BLOCK 2                   |
   * |  r1/c1, r1/c2, r1/c3    |    r1/c4, r1/c5, r2/c1        |
   *                                ^            ^
   *                                |            |
   *                         Next Index Key  SEEK_NEXT_ROW (before r2/c1)
   *
   *
   * |    BLOCK 1                       |     BLOCK 2                      |
   * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |    r1/c1/t2, r1/c1/T1, r1/c2/T3  |
   *                                         ^                  ^
   *                                         |                  |
   *                                  Next Index Key      SEEK_NEXT_COL
   * </pre>
   *
   * Now imagine we want columns c1 and c3 (see first diagram above), the 'Next Index Key' of r1/c4
   * is > r1/c3 so we should seek to get to the c1 on the next row, r2. In the second case, say we
   * only want one version of c1, after we have it, a SEEK_NEXT_COL will be issued to get to c2.
   * Looking at the 'Next Index Key', it would land us in the next block, so we should SEEK. In
   * other scenarios where the SEEK will not land us in the next block, it is very likely better to
   * issue a series of SKIPs.
* @param cell current cell; skipping continues only while the heap top stays on this cell's row
   * @return true if skipping moved the heap off the row of {@code cell} (or emptied the heap);
   *         false if skipping was given up and the caller should seek instead. Cells may already
   *         have been skipped by the time false is returned.
   * @throws IOException propagated from the calls made while skipping
   */
  protected boolean trySkipToNextRow(ExtendedCell cell) throws IOException {
    ExtendedCell nextCell = null;
    // used to guard against a changed next indexed key by doing an identity comparison
    // when the identity changes we need to compare the bytes again
    ExtendedCell previousIndexedKey = null;
    do {
      ExtendedCell nextIndexedKey = getNextIndexedKey();
      // Keep skipping only while there is a usable next indexed key and it is either the very
      // same object we already validated, or compares >= 0 against cell in the matcher's
      // next-row comparison.
      if (
        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
          && (nextIndexedKey == previousIndexedKey
            || matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0)
      ) {
        this.heap.next();
        ++kvsScanned;
        previousIndexedKey = nextIndexedKey;
      } else {
        return false;
      }
    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell));
    return true;
  }

  /**
   * Column-level counterpart of {@link #trySkipToNextRow(ExtendedCell)}: skips while the heap top
   * still has the same row and column as {@code cell}.
   * @param cell current cell
   * @return true means skip to next column, false means not (the caller should seek instead)
   * @throws IOException propagated from the calls made while skipping
   */
  protected boolean trySkipToNextColumn(ExtendedCell cell) throws IOException {
    ExtendedCell nextCell = null;
    // used to guard against a changed next indexed key by doing an identity comparison
    // when the identity changes we need to compare the bytes again
    ExtendedCell previousIndexedKey = null;
    do {
      ExtendedCell nextIndexedKey = getNextIndexedKey();
      if (
        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
          && (nextIndexedKey == previousIndexedKey
            || matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0)
      ) {
        this.heap.next();
        ++kvsScanned;
        previousIndexedKey = nextIndexedKey;
      } else {
        return false;
      }
    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell));
    // We need this check because it may happen that the new scanner that we get
    // during heap.next() requires a reseek because of a fake KV previously generated for
    // the ROWCOL bloom filter optimization. See HBASE-19863 for more details
    if (
      useRowColBloom && nextCell != null && cell.getTimestamp() == PrivateConstants.OLDEST_TIMESTAMP
    ) {
      return false;
    }
    return true;
  }

  /** Returns the read point held in {@code readPt}. */
  @Override
  public long getReadPoint() {
    return this.readPt;
  }

  /**
   * Closes every scanner in the list and then empties the list.
   * @param scanners scanners to close; {@code null} is accepted and ignored
   */
  private static void clearAndClose(List<KeyValueScanner> scanners) {
    if (scanners == null) {
      return;
    }
    for (KeyValueScanner s : scanners) {
      s.close();
    }
    scanners.clear();
  }

  // Implementation of ChangedReadersObserver
  /**
   * Records the outcome of a flush so that a later call on the scan path can rebuild the heap.
   * <p>
   * Nothing happens when both arguments are empty. Otherwise, while holding {@code flushLock}:
   * <ul>
   * <li>if {@code closeLock} cannot be taken, or the scanner is already closing, the supplied
   * memstore scanners are closed and the call returns without touching the scanner state;</li>
   * <li>otherwise {@code flushed} is set, scanners over {@code sfs} are opened eagerly and queued
   * in {@code flushedstoreFileScanners}, and the supplied memstore scanners replace whatever was
   * queued in {@code memStoreScannersAfterFlush} (the previously queued ones are closed).</li>
   * </ul>
   * The heap itself is not modified here; see {@link #reopenAfterFlush()}.
   * @param sfs              store files to open scanners on
   * @param memStoreScanners memstore scanners to use after the flush; this method takes over
   *                         closing them on the early-return paths
   * @throws IOException propagated from opening the store file scanners
   */
  @Override
  public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners)
    throws IOException {
    if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) {
      return;
    }
    // Tracks whether closeLock was actually acquired, so the finally block only unlocks it then.
    boolean updateReaders = false;
    flushLock.lock();
    try {
      if (!closeLock.tryLock()) {
        // The reason for doing this is that when the current store scanner does not retrieve
        // any new cells, then the scanner is considered to be done. The heap of this scanner
        // is not closed till the shipped() call is completed. Hence in that case if at all
        // the partial close (close (false)) has been called before updateReaders(), there is no
        // need for the updateReaders() to happen.
        LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
        // no lock acquired.
        clearAndClose(memStoreScanners);
        return;
      }
      // lock acquired
      updateReaders = true;
      if (this.closing) {
        LOG.debug("StoreScanner already closing. There is no need to updateReaders");
        clearAndClose(memStoreScanners);
        return;
      }
      flushed = true;
      final boolean isCompaction = false;
      boolean usePread = get || scanUsePread;
      // SEE HBASE-19468 where the flushed files are getting compacted even before a scanner
      // calls next(). So its better we create scanners here rather than next() call. Ensure
      // these scanners are properly closed() whether or not the scan is completed successfully
      // Eagerly creating scanners so that we have the ref counting ticking on the newly created
      // store files. In case of stream scanners this eager creation does not induce performance
      // penalty because in scans (that uses stream scanners) the next() call is bound to happen.
      List<KeyValueScanner> scanners =
        store.getScanners(sfs, cacheBlocks, get, usePread, isCompaction, matcher,
          scan.getStartRow(), scan.getStopRow(), this.readPt, false, isOnlyLatestVersionScan(scan));
      flushedstoreFileScanners.addAll(scanners);
      if (!CollectionUtils.isEmpty(memStoreScanners)) {
        // Only the most recent set of memstore scanners is kept; older queued ones are closed.
        clearAndClose(memStoreScannersAfterFlush);
        memStoreScannersAfterFlush.addAll(memStoreScanners);
      }
    } finally {
      flushLock.unlock();
      if (updateReaders) {
        closeLock.unlock();
      }
      // Test hook: set on every path that entered the try block, including the early returns.
      if (hasUpdatedReaders != null) {
        hasUpdatedReaders.set(true);
      }
    }
    // Let the next() call handle re-creating and seeking
  }

  /**
   * Rebuilds the heap from the scanners queued by
   * {@link #updateReaders(List, List)} and reports whether the row at the top of the heap changed.
   * <p>
   * The queued scanners are drained under {@code flushLock}, seeked to the previous top cell, the
   * trailing non-file scanners of {@code currentScanners} are moved to
   * {@code scannersForDelayedClose}, the new scanners are added and the heap and query matcher are
   * reset.
   * <p>
   * NOTE(review): the previous top cell is dereferenced when logging and is passed to the row
   * comparison, so this appears to assume the heap is non-empty on entry; confirm with callers.
   * @return true if the heap is now empty or its top is on a different row than before (the value
   *         is also stored in {@code topChanged}); false otherwise
   * @throws IOException propagated from seeking the new scanners or resetting the heap
   */
  protected final boolean reopenAfterFlush() throws IOException {
    // here we can make sure that we have a Store instance so no null check on store.
    ExtendedCell lastTop = heap.peek();
    // When we have the scan object, should we not pass it to getScanners() to get a limited set of
    // scanners? We did so in the constructor and we could have done it now by storing the scan
    // object from the constructor
    List<KeyValueScanner> scanners;
    flushLock.lock();
    try {
      List<KeyValueScanner> allScanners =
        new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());
      allScanners.addAll(flushedstoreFileScanners);
      allScanners.addAll(memStoreScannersAfterFlush);
      scanners = selectScannersFrom(store, allScanners);
      // Clear the current set of flushed store files scanners so that they don't get added again
      flushedstoreFileScanners.clear();
      memStoreScannersAfterFlush.clear();
    } finally {
      flushLock.unlock();
    }

    // Seek the new scanners to the last key
    seekScanners(scanners, lastTop, false, parallelSeekEnabled);
    // remove the older memstore scanner
    // (walks backwards and stops at the first file scanner, relying on the memstore scanners
    // being at the end of currentScanners)
    for (int i = currentScanners.size() - 1; i >= 0; i--) {
      if (!currentScanners.get(i).isFileScanner()) {
        // Not closed here; clearAndClose(scannersForDelayedClose) is called from shipped().
        scannersForDelayedClose.add(currentScanners.remove(i));
      } else {
        // we add the memstore scanner to the end of currentScanners
        break;
      }
    }
    // add the newly created scanners on the flushed files and the current active memstore scanner
    addCurrentScanners(scanners);
    // Combine all seeked scanners with a heap
    resetKVHeap(this.currentScanners, store.getComparator());
    resetQueryMatcher(lastTop);
    if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
      LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString()
        + ",and after = " + heap.peek());
      topChanged = true;
    } else {
      topChanged = false;
    }
    return topChanged;
  }

  /**
   * Re-aligns the query matcher with the current top of the heap after the heap was rebuilt.
   * @param lastTopKey the top cell from before the rebuild; used in place of the heap top when the
   *                   heap is now empty
   */
  private void resetQueryMatcher(ExtendedCell lastTopKey) {
    // Reset the state of the Query Matcher and set to top row.
    // Only reset and call setRow if the row changes; avoids confusing the
    // query matcher if scanning intra-row.
    ExtendedCell cell = heap.peek();
    if (cell == null) {
      cell = lastTopKey;
    }
    if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {
      this.countPerRow = 0;
      // The setToNewRow will call reset internally
      matcher.setToNewRow(cell);
    }
  }

  /**
   * Checks that the scan hands out cells in the expected order.
   * <p>
   * The check is a Java {@code assert}, so it only runs when assertions are enabled. It is skipped
   * when {@code prevKV} or {@code comparator} is null.
   * @param prevKV     the previously returned cell, may be null
   * @param kv         the cell that follows it
   * @param comparator comparator defining the order, may be null
   */
  protected void checkScanOrder(Cell prevKV, Cell kv, CellComparator comparator)
    throws IOException {
    // Check that the heap gives us KVs in an increasing order.
    assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0
      : "Key " + prevKV + " followed by a smaller key " + kv + " in cf " + store;
  }

  /**
   * Reseeks to the key that {@code PrivateCellUtil.createLastOnRow(c)} builds from {@code c}.
   * @param c a cell on the row to leave
   * @return the result of {@link #reseek(ExtendedCell)}
   */
  protected boolean seekToNextRow(ExtendedCell c) throws IOException {
    return reseek(PrivateCellUtil.createLastOnRow(c));
  }

  /**
   * Do a reseek in a normal StoreScanner(scan forward)
   * @param kv the key to reseek to
   * @return true if scanner has values left, false if end of scanner
   */
  protected boolean seekAsDirection(ExtendedCell kv) throws IOException {
    return reseek(kv);
  }

  /**
   * Reseeks the heap to {@code kv}, first rebuilding the heap if a flush has been flagged.
   * <p>
   * When the scan is an explicit-column query and lazy seek is enabled globally the seek goes
   * through {@code heap.requestSeek}; otherwise {@code heap.reseek} is used.
   * @param kv the key to reseek to
   * @return the value returned by the heap call used
   */
  @Override
  public boolean reseek(ExtendedCell kv) throws IOException {
    if (checkFlushed()) {
      reopenAfterFlush();
    }
    if (explicitColumnQuery && lazySeekEnabledGlobally) {
      return heap.requestSeek(kv, true, useRowColBloom);
    }
    return heap.reseek(kv);
  }

  /**
   * Replaces the current file scanners with scanners recreated by the store, once enough bytes
   * have been read. This is the switch from pread to stream read.
   * <p>
   * Does nothing unless the read type is {@code DEFAULT}, pread is currently in use, the scanner
   * is not closing, the heap is non-empty and {@code bytesRead} has reached
   * {@code preadMaxBytes}. On success the new file scanners plus the existing memstore scanners
   * form the new heap and the old file scanners are closed. Any exception during the switch is
   * logged and swallowed, and the freshly created scanners (if any) are closed.
   * <p>
   * NOTE(review): {@code scanUsePread} is set to false before the switch is attempted and is not
   * restored when the attempt returns early or fails; confirm this is intended.
   */
  void trySwitchToStreamRead() {
    if (
      readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null
        || bytesRead < preadMaxBytes
    ) {
      return;
    }
    LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead,
      this.store.getColumnFamilyName());
    scanUsePread = false;
    ExtendedCell lastTop = heap.peek();
    List<KeyValueScanner> memstoreScanners = new ArrayList<>();
    List<KeyValueScanner> scannersToClose = new ArrayList<>();
    for (KeyValueScanner kvs : currentScanners) {
      if (!kvs.isFileScanner()) {
        // collect memstorescanners here
        memstoreScanners.add(kvs);
      } else {
        scannersToClose.add(kvs);
      }
    }
    List<KeyValueScanner> fileScanners = null;
    List<KeyValueScanner> newCurrentScanners;
    KeyValueHeap newHeap;
    try {
      // We must have a store instance here so no null check
      // recreate the scanners on the current file scanners
      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, matcher,
        scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
        readPt, false);
      if (fileScanners == null) {
        return;
      }
      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
      newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
      newCurrentScanners.addAll(fileScanners);
      newCurrentScanners.addAll(memstoreScanners);
      newHeap = newKVHeap(newCurrentScanners, comparator);
    } catch (Exception e) {
      // Best effort: the existing heap and scanners are left as they were.
      LOG.warn("failed to switch to stream read", e);
      if (fileScanners != null) {
        fileScanners.forEach(KeyValueScanner::close);
      }
      return;
    }
    // Everything needed was built successfully; only now swap the live state.
    currentScanners.clear();
    addCurrentScanners(newCurrentScanners);
    this.heap = newHeap;
    resetQueryMatcher(lastTop);
    scannersToClose.forEach(KeyValueScanner::close);
    // Test hook
    if (hasSwitchedToStreamRead != null) {
      hasSwitchedToStreamRead.set(true);
    }
  }

  /**
   * Consumes the "a flush happened" flag.
   * @return true if a flush had been flagged and the scanner is not closing (the flag is cleared
   *         in that case); false if nothing was flagged, or if the scanner is closing (the flag is
   *         then left as it is)
   */
  protected final boolean checkFlushed() {
    // check the var without any lock. Suppose even if we see the old
    // value here still it is ok to continue because we will not be resetting
    // the heap but will continue with the referenced memstore's snapshot. For compactions
    // any way we don't need the updateReaders at all to happen as we still continue with
    // the older files
    if (flushed) {
      // If there is a flush and the current scan is notified on the flush ensure that the
      // scan's heap gets reset and we do a seek on the newly flushed file.
      if (this.closing) {
        return false;
      }
      // reset the flag
      flushed = false;
      return true;
    }
    return false;
  }

  /**
   * Seek storefiles in parallel to optimize IO latency as much as possible
   * @param scanners the list {@link KeyValueScanner}s to be read from
   * @param kv       the KeyValue on which the operation is being requested
   * @throws IOException if a scanner seeked inline fails, if a handler reported an error (the
   *                     first one found is wrapped), or as {@link InterruptedIOException} if the
   *                     wait for the handlers is interrupted
   */
  private void parallelSeek(final List<? extends KeyValueScanner> scanners, final ExtendedCell kv)
    throws IOException {
    if (scanners.isEmpty()) return;
    int storeFileScannerCount = scanners.size();
    // One latch count per scanner, whether it is seeked by a handler or inline.
    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
    List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount);
    for (KeyValueScanner scanner : scanners) {
      if (scanner instanceof StoreFileScanner) {
        // presumably the handler counts the latch down when it finishes -- confirm in
        // ParallelSeekHandler
        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, this.readPt, latch);
        executor.submit(seekHandler);
        handlers.add(seekHandler);
      } else {
        // Other scanners are seeked on the calling thread and accounted for immediately.
        scanner.seek(kv);
        latch.countDown();
      }
    }

    try {
      latch.await();
    } catch (InterruptedException ie) {
      // NOTE(review): the thread's interrupt status is not restored here; the interruption is
      // reported only through the thrown exception.
      throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
    }

    for (ParallelSeekHandler handler : handlers) {
      if (handler.getErr() != null) {
        throw new IOException(handler.getErr());
      }
    }
  }

  /**
   * Used in testing.
1255 * @return all scanners in no particular order 1256 */ 1257 List<KeyValueScanner> getAllScannersForTesting() { 1258 List<KeyValueScanner> allScanners = new ArrayList<>(); 1259 KeyValueScanner current = heap.getCurrentForTesting(); 1260 if (current != null) allScanners.add(current); 1261 for (KeyValueScanner scanner : heap.getHeap()) 1262 allScanners.add(scanner); 1263 return allScanners; 1264 } 1265 1266 static void enableLazySeekGlobally(boolean enable) { 1267 lazySeekEnabledGlobally = enable; 1268 } 1269 1270 /** Returns The estimated number of KVs seen by this scanner (includes some skipped KVs). */ 1271 public long getEstimatedNumberOfKvsScanned() { 1272 return this.kvsScanned; 1273 } 1274 1275 @Override 1276 public ExtendedCell getNextIndexedKey() { 1277 return this.heap.getNextIndexedKey(); 1278 } 1279 1280 @Override 1281 public void shipped() throws IOException { 1282 if (prevCell != null) { 1283 // Do the copy here so that in case the prevCell ref is pointing to the previous 1284 // blocks we can safely release those blocks. 1285 // This applies to blocks that are got from Bucket cache, L1 cache and the blocks 1286 // fetched from HDFS. Copying this would ensure that we let go the references to these 1287 // blocks so that they can be GCed safely(in case of bucket cache) 1288 prevCell = KeyValueUtil.toNewKeyCell(this.prevCell); 1289 } 1290 matcher.beforeShipped(); 1291 // There wont be further fetch of Cells from these scanners. Just close. 1292 clearAndClose(scannersForDelayedClose); 1293 if (this.heap != null) { 1294 this.heap.shipped(); 1295 // When switching from pread to stream, we will open a new scanner for each store file, but 1296 // the old scanner may still track the HFileBlocks we have scanned but not sent back to client 1297 // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others 1298 // before we serialize and send it back to client. 
The HFileBlocks will be released in shipped 1299 // method, so we here will also open new scanners and close old scanners in shipped method. 1300 // See HBASE-18055 for more details. 1301 trySwitchToStreamRead(); 1302 } 1303 } 1304 1305 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST) 1306 static final void instrument() { 1307 hasUpdatedReaders = new AtomicBoolean(false); 1308 hasSwitchedToStreamRead = new AtomicBoolean(false); 1309 } 1310 1311 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST) 1312 static final boolean hasUpdatedReaders() { 1313 return hasUpdatedReaders.get(); 1314 } 1315 1316 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST) 1317 static final boolean hasSwitchedToStreamRead() { 1318 return hasSwitchedToStreamRead.get(); 1319 } 1320 1321 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST) 1322 static final void resetHasUpdatedReaders() { 1323 hasUpdatedReaders.set(false); 1324 } 1325 1326 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST) 1327 static final void resetHasSwitchedToStreamRead() { 1328 hasSwitchedToStreamRead.set(false); 1329 } 1330}