001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.NavigableSet; 025import java.util.Optional; 026import java.util.concurrent.CountDownLatch; 027import java.util.concurrent.locks.ReentrantLock; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellComparator; 030import org.apache.hadoop.hbase.CellUtil; 031import org.apache.hadoop.hbase.DoNotRetryIOException; 032import org.apache.hadoop.hbase.ExtendedCell; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.KeyValue; 035import org.apache.hadoop.hbase.KeyValueUtil; 036import org.apache.hadoop.hbase.PrivateCellUtil; 037import org.apache.hadoop.hbase.PrivateConstants; 038import org.apache.hadoop.hbase.client.IsolationLevel; 039import org.apache.hadoop.hbase.client.Scan; 040import org.apache.hadoop.hbase.executor.ExecutorService; 041import org.apache.hadoop.hbase.filter.Filter; 042import org.apache.hadoop.hbase.ipc.RpcCall; 043import org.apache.hadoop.hbase.ipc.RpcServer; 044import 
org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 045import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; 046import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler; 047import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher; 048import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 049import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.yetus.audience.InterfaceAudience; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 056import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 057 058/** 059 * Scanner scans both the memstore and the Store. Coalesce KeyValue stream into List<KeyValue> 060 * for a single row. 061 * <p> 062 * The implementation is not thread safe. So there will be no race between next and close. The only 063 * exception is updateReaders, it will be called in the memstore flush thread to indicate that there 064 * is a flush. 
065 */ 066@InterfaceAudience.Private 067public class StoreScanner extends NonReversedNonLazyKeyValueScanner 068 implements KeyValueScanner, InternalScanner, ChangedReadersObserver { 069 private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class); 070 // In unit tests, the store could be null 071 protected final HStore store; 072 private final CellComparator comparator; 073 private ScanQueryMatcher matcher; 074 protected KeyValueHeap heap; 075 private boolean cacheBlocks; 076 077 private long countPerRow = 0; 078 private int storeLimit = -1; 079 private int storeOffset = 0; 080 081 // Used to indicate that the scanner has closed (see HBASE-1107) 082 private volatile boolean closing = false; 083 private final boolean get; 084 private final boolean explicitColumnQuery; 085 private final boolean useRowColBloom; 086 /** 087 * A flag that enables StoreFileScanner parallel-seeking 088 */ 089 private boolean parallelSeekEnabled = false; 090 private ExecutorService executor; 091 private final Scan scan; 092 private final long oldestUnexpiredTS; 093 private final long now; 094 private final int minVersions; 095 private final long maxRowSize; 096 private final long cellsPerHeartbeatCheck; 097 long memstoreOnlyReads; 098 long mixedReads; 099 100 // 1) Collects all the KVHeap that are eagerly getting closed during the 101 // course of a scan 102 // 2) Collects the unused memstore scanners. If we close the memstore scanners 103 // before sending data to client, the chunk may be reclaimed by other 104 // updates and the data will be corrupt. 105 private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>(); 106 107 /** 108 * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not KVs skipped via 109 * seeking to next row/column. TODO: estimate them? 
110 */ 111 private long kvsScanned = 0; 112 private ExtendedCell prevCell = null; 113 114 private final long preadMaxBytes; 115 private long bytesRead; 116 117 /** We don't ever expect to change this, the constant is just for clarity. */ 118 static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true; 119 public static final String STORESCANNER_PARALLEL_SEEK_ENABLE = 120 "hbase.storescanner.parallel.seek.enable"; 121 122 /** Used during unit testing to ensure that lazy seek does save seek ops */ 123 private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT; 124 125 /** 126 * The number of cells scanned in between timeout checks. Specifying a larger value means that 127 * timeout checks will occur less frequently. Specifying a small value will lead to more frequent 128 * timeout checks. 129 */ 130 public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 131 "hbase.cells.scanned.per.heartbeat.check"; 132 133 /** 134 * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}. 135 */ 136 public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000; 137 138 /** 139 * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we scanned 140 * reaches this limit, we will reopen the scanner with stream. The default value is 4 times of 141 * block size for this store. If configured with a value <0, for all scans with ReadType DEFAULT, 142 * we will open scanner with stream mode itself. 143 */ 144 public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes"; 145 146 private final Scan.ReadType readType; 147 148 // A flag whether use pread for scan 149 // it maybe changed if we use Scan.ReadType.DEFAULT and we have read lots of data. 
150 private boolean scanUsePread; 151 // Indicates whether there was flush during the course of the scan 152 private volatile boolean flushed = false; 153 // generally we get one file from a flush 154 private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1); 155 // Since CompactingMemstore is now default, we get three memstore scanners from a flush 156 private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3); 157 // The current list of scanners 158 final List<KeyValueScanner> currentScanners = new ArrayList<>(); 159 // flush update lock 160 private final ReentrantLock flushLock = new ReentrantLock(); 161 // lock for closing. 162 private final ReentrantLock closeLock = new ReentrantLock(); 163 164 protected final long readPt; 165 private boolean topChanged = false; 166 167 /** An internal constructor. */ 168 private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt, 169 boolean cacheBlocks, ScanType scanType) { 170 this.readPt = readPt; 171 this.store = store; 172 this.cacheBlocks = cacheBlocks; 173 this.comparator = Preconditions.checkNotNull(scanInfo.getComparator()); 174 get = scan.isGetScan(); 175 explicitColumnQuery = numColumns > 0; 176 this.scan = scan; 177 this.now = EnvironmentEdgeManager.currentTime(); 178 this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl(); 179 this.minVersions = scanInfo.getMinVersions(); 180 181 // We look up row-column Bloom filters for multi-column queries as part of 182 // the seek operation. However, we also look the row-column Bloom filter 183 // for multi-row (non-"get") scans because this is not done in 184 // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>). 
185 this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1) && (store == null 186 || store.getColumnFamilyDescriptor().getBloomFilterType() == BloomType.ROWCOL); 187 this.maxRowSize = scanInfo.getTableMaxRowSize(); 188 this.preadMaxBytes = scanInfo.getPreadMaxBytes(); 189 if (get) { 190 this.readType = Scan.ReadType.PREAD; 191 this.scanUsePread = true; 192 } else if (scanType != ScanType.USER_SCAN) { 193 // For compaction scanners never use Pread as already we have stream based scanners on the 194 // store files to be compacted 195 this.readType = Scan.ReadType.STREAM; 196 this.scanUsePread = false; 197 } else { 198 if (scan.getReadType() == Scan.ReadType.DEFAULT) { 199 if (scanInfo.isUsePread()) { 200 this.readType = Scan.ReadType.PREAD; 201 } else if (this.preadMaxBytes < 0) { 202 this.readType = Scan.ReadType.STREAM; 203 } else { 204 this.readType = Scan.ReadType.DEFAULT; 205 } 206 } else { 207 this.readType = scan.getReadType(); 208 } 209 // Always start with pread unless user specific stream. Will change to stream later if 210 // readType is default if the scan keeps running for a long time. 211 this.scanUsePread = this.readType != Scan.ReadType.STREAM; 212 } 213 this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck(); 214 // Parallel seeking is on if the config allows and more there is more than one store file. 215 if (store != null && store.getStorefilesCount() > 1) { 216 RegionServerServices rsService = store.getHRegion().getRegionServerServices(); 217 if (rsService != null && scanInfo.isParallelSeekEnabled()) { 218 this.parallelSeekEnabled = true; 219 this.executor = rsService.getExecutorService(); 220 } 221 } 222 } 223 224 private void addCurrentScanners(List<? 
extends KeyValueScanner> scanners) { 225 this.currentScanners.addAll(scanners); 226 } 227 228 private static boolean isOnlyLatestVersionScan(Scan scan) { 229 // No need to check for Scan#getMaxVersions because live version files generated by store file 230 // writer retains max versions specified in ColumnFamilyDescriptor for the given CF 231 return !scan.isRaw() && scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP; 232 } 233 234 /** 235 * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we are not in a 236 * compaction. 237 * @param store who we scan 238 * @param scan the spec 239 * @param columns which columns we are scanning 240 */ 241 public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns, 242 long readPt) throws IOException { 243 this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt, scan.getCacheBlocks(), 244 ScanType.USER_SCAN); 245 if (columns != null && scan.isRaw()) { 246 throw new DoNotRetryIOException("Cannot specify any column for a raw scan"); 247 } 248 matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, 249 store.getCoprocessorHost()); 250 251 store.addChangedReaderObserver(this); 252 253 List<KeyValueScanner> scanners = null; 254 try { 255 // Pass columns to try to filter out unnecessary StoreFiles. 256 scanners = selectScannersFrom(store, 257 store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(), 258 scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt, 259 isOnlyLatestVersionScan(scan))); 260 261 // Seek all scanners to the start of the Row (or if the exact matching row 262 // key does not exist, then to the start of the next matching Row). 263 // Always check bloom filter to optimize the top row seek for delete 264 // family marker. 
265 seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally, 266 parallelSeekEnabled); 267 268 // set storeLimit 269 this.storeLimit = scan.getMaxResultsPerColumnFamily(); 270 271 // set rowOffset 272 this.storeOffset = scan.getRowOffsetPerColumnFamily(); 273 addCurrentScanners(scanners); 274 // Combine all seeked scanners with a heap 275 resetKVHeap(scanners, comparator); 276 } catch (IOException e) { 277 clearAndClose(scanners); 278 // remove us from the HStore#changedReaderObservers here or we'll have no chance to 279 // and might cause memory leak 280 store.deleteChangedReaderObserver(this); 281 throw e; 282 } 283 } 284 285 // a dummy scan instance for compaction. 286 private static final Scan SCAN_FOR_COMPACTION = new Scan(); 287 288 /** 289 * Used for store file compaction and memstore compaction. 290 * <p> 291 * Opens a scanner across specified StoreFiles/MemStoreSegments. 292 * @param store who we scan 293 * @param scanners ancillary scanners 294 * @param smallestReadPoint the readPoint that we should use for tracking versions 295 */ 296 public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners, 297 ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { 298 this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null); 299 } 300 301 /** 302 * Used for compactions that drop deletes from a limited range of rows. 303 * <p> 304 * Opens a scanner across specified StoreFiles. 305 * @param store who we scan 306 * @param scanners ancillary scanners 307 * @param smallestReadPoint the readPoint that we should use for tracking versions 308 * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW. 309 * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW. 310 */ 311 public StoreScanner(HStore store, ScanInfo scanInfo, List<? 
extends KeyValueScanner> scanners, 312 long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) 313 throws IOException { 314 this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, 315 earliestPutTs, dropDeletesFromRow, dropDeletesToRow); 316 } 317 318 private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners, 319 ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, 320 byte[] dropDeletesToRow) throws IOException { 321 this(store, SCAN_FOR_COMPACTION, scanInfo, 0, 322 store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType); 323 assert scanType != ScanType.USER_SCAN; 324 matcher = 325 CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs, 326 oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost()); 327 328 // Filter the list of scanners using Bloom filters, time range, TTL, etc. 329 scanners = selectScannersFrom(store, scanners); 330 331 // Seek all scanners to the initial key 332 seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); 333 addCurrentScanners(scanners); 334 // Combine all seeked scanners with a heap 335 resetKVHeap(scanners, comparator); 336 } 337 338 private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners) 339 throws IOException { 340 // Seek all scanners to the initial key 341 seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); 342 addCurrentScanners(scanners); 343 resetKVHeap(scanners, comparator); 344 } 345 346 // For mob compaction only as we do not have a Store instance when doing mob compaction. 347 public StoreScanner(ScanInfo scanInfo, ScanType scanType, 348 List<? 
extends KeyValueScanner> scanners) throws IOException { 349 this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType); 350 assert scanType != ScanType.USER_SCAN; 351 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L, 352 oldestUnexpiredTS, now, null, null, null); 353 seekAllScanner(scanInfo, scanners); 354 } 355 356 // Used to instantiate a scanner for user scan in test 357 StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, 358 List<? extends KeyValueScanner> scanners, ScanType scanType) throws IOException { 359 // 0 is passed as readpoint because the test bypasses Store 360 this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(), 361 scanType); 362 if (scanType == ScanType.USER_SCAN) { 363 this.matcher = 364 UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null); 365 } else { 366 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 367 PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null); 368 } 369 seekAllScanner(scanInfo, scanners); 370 } 371 372 // Used to instantiate a scanner for user scan in test 373 StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, 374 List<? extends KeyValueScanner> scanners) throws IOException { 375 // 0 is passed as readpoint because the test bypasses Store 376 this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(), 377 ScanType.USER_SCAN); 378 this.matcher = 379 UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null); 380 seekAllScanner(scanInfo, scanners); 381 } 382 383 // Used to instantiate a scanner for compaction in test 384 StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType, 385 List<? extends KeyValueScanner> scanners) throws IOException { 386 // 0 is passed as readpoint because the test bypasses Store 387 this(null, maxVersions > 0 ? 
new Scan().readVersions(maxVersions) : SCAN_FOR_COMPACTION, 388 scanInfo, 0, 0L, false, scanType); 389 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 390 PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null); 391 seekAllScanner(scanInfo, scanners); 392 } 393 394 boolean isScanUsePread() { 395 return this.scanUsePread; 396 } 397 398 /** 399 * Seek the specified scanners with the given key 400 * @param isLazy true if using lazy seek 401 * @param isParallelSeek true if using parallel seek 402 */ 403 protected void seekScanners(List<? extends KeyValueScanner> scanners, ExtendedCell seekKey, 404 boolean isLazy, boolean isParallelSeek) throws IOException { 405 // Seek all scanners to the start of the Row (or if the exact matching row 406 // key does not exist, then to the start of the next matching Row). 407 // Always check bloom filter to optimize the top row seek for delete 408 // family marker. 409 if (isLazy) { 410 for (KeyValueScanner scanner : scanners) { 411 scanner.requestSeek(seekKey, false, true); 412 } 413 } else { 414 if (!isParallelSeek) { 415 long totalScannersSoughtBytes = 0; 416 for (KeyValueScanner scanner : scanners) { 417 if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) { 418 throw new RowTooBigException( 419 "Max row size allowed: " + maxRowSize + ", but row is bigger than that"); 420 } 421 scanner.seek(seekKey); 422 Cell c = scanner.peek(); 423 if (c != null) { 424 totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c); 425 } 426 } 427 } else { 428 parallelSeek(scanners, seekKey); 429 } 430 } 431 } 432 433 protected void resetKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator) 434 throws IOException { 435 // Combine all seeked scanners with a heap 436 heap = newKVHeap(scanners, comparator); 437 } 438 439 protected KeyValueHeap newKVHeap(List<? 
extends KeyValueScanner> scanners, 440 CellComparator comparator) throws IOException { 441 return new KeyValueHeap(scanners, comparator); 442 } 443 444 /** 445 * Filters the given list of scanners using Bloom filter, time range, and TTL. 446 * <p> 447 * Will be overridden by testcase so declared as protected. 448 */ 449 protected List<KeyValueScanner> selectScannersFrom(HStore store, 450 List<? extends KeyValueScanner> allScanners) { 451 boolean memOnly; 452 boolean filesOnly; 453 if (scan instanceof InternalScan) { 454 InternalScan iscan = (InternalScan) scan; 455 memOnly = iscan.isCheckOnlyMemStore(); 456 filesOnly = iscan.isCheckOnlyStoreFiles(); 457 } else { 458 memOnly = false; 459 filesOnly = false; 460 } 461 462 List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size()); 463 464 // We can only exclude store files based on TTL if minVersions is set to 0. 465 // Otherwise, we might have to return KVs that have technically expired. 466 long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE; 467 468 // include only those scan files which pass all filters 469 for (KeyValueScanner kvs : allScanners) { 470 boolean isFile = kvs.isFileScanner(); 471 if ((!isFile && filesOnly) || (isFile && memOnly)) { 472 kvs.close(); 473 continue; 474 } 475 476 if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) { 477 scanners.add(kvs); 478 } else { 479 kvs.close(); 480 } 481 } 482 return scanners; 483 } 484 485 @Override 486 public ExtendedCell peek() { 487 return heap != null ? heap.peek() : null; 488 } 489 490 @Override 491 public KeyValue next() { 492 // throw runtime exception perhaps? 493 throw new RuntimeException("Never call StoreScanner.next()"); 494 } 495 496 @Override 497 public void close() { 498 close(true); 499 } 500 501 private void close(boolean withDelayedScannersClose) { 502 closeLock.lock(); 503 // If the closeLock is acquired then any subsequent updateReaders() 504 // call is ignored. 
505 try { 506 if (this.closing) { 507 return; 508 } 509 if (withDelayedScannersClose) { 510 this.closing = true; 511 } 512 // For mob compaction, we do not have a store. 513 if (this.store != null) { 514 this.store.deleteChangedReaderObserver(this); 515 } 516 if (withDelayedScannersClose) { 517 clearAndClose(scannersForDelayedClose); 518 clearAndClose(memStoreScannersAfterFlush); 519 clearAndClose(flushedstoreFileScanners); 520 if (this.heap != null) { 521 this.heap.close(); 522 this.currentScanners.clear(); 523 this.heap = null; // CLOSED! 524 } 525 } else { 526 if (this.heap != null) { 527 this.scannersForDelayedClose.add(this.heap); 528 this.currentScanners.clear(); 529 this.heap = null; 530 } 531 } 532 } finally { 533 closeLock.unlock(); 534 } 535 } 536 537 @Override 538 public boolean seek(ExtendedCell key) throws IOException { 539 if (checkFlushed()) { 540 reopenAfterFlush(); 541 } 542 return this.heap.seek(key); 543 } 544 545 /** 546 * Get the next row of values from this Store. 547 * @return true if there are more rows, false if scanner is done 548 */ 549 @Override 550 public boolean next(List<? super ExtendedCell> outResult, ScannerContext scannerContext) 551 throws IOException { 552 if (scannerContext == null) { 553 throw new IllegalArgumentException("Scanner context cannot be null"); 554 } 555 if (checkFlushed() && reopenAfterFlush()) { 556 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 557 } 558 559 // if the heap was left null, then the scanners had previously run out anyways, close and 560 // return. 
561 if (this.heap == null) { 562 // By this time partial close should happened because already heap is null 563 close(false);// Do all cleanup except heap.close() 564 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 565 } 566 567 ExtendedCell cell = this.heap.peek(); 568 if (cell == null) { 569 close(false);// Do all cleanup except heap.close() 570 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 571 } 572 573 // only call setRow if the row changes; avoids confusing the query matcher 574 // if scanning intra-row 575 576 // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing 577 // rows. Else it is possible we are still traversing the same row so we must perform the row 578 // comparison. 579 if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) { 580 this.countPerRow = 0; 581 matcher.setToNewRow(cell); 582 } 583 584 // Clear progress away unless invoker has indicated it should be kept. 585 if (!scannerContext.getKeepProgress() && !scannerContext.getSkippingRow()) { 586 scannerContext.clearProgress(); 587 } 588 589 Optional<RpcCall> rpcCall = 590 matcher.isUserScan() ? RpcServer.getCurrentCall() : Optional.empty(); 591 592 int count = 0; 593 long totalBytesRead = 0; 594 // track the cells for metrics only if it is a user read request. 595 boolean onlyFromMemstore = matcher.isUserScan(); 596 try { 597 LOOP: do { 598 // Update and check the time limit based on the configured value of cellsPerTimeoutCheck 599 // Or if the preadMaxBytes is reached and we may want to return so we can switch to stream 600 // in 601 // the shipped method below. 
602 if ( 603 kvsScanned % cellsPerHeartbeatCheck == 0 604 || (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) 605 ) { 606 if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { 607 return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); 608 } 609 } 610 // Do object compare - we set prevKV from the same heap. 611 if (prevCell != cell) { 612 ++kvsScanned; 613 } 614 checkScanOrder(prevCell, cell, comparator); 615 int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell); 616 bytesRead += cellSize; 617 if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) { 618 // return immediately if we want to switch from pread to stream. We need this because we 619 // can 620 // only switch in the shipped method, if user use a filter to filter out everything and 621 // rpc 622 // timeout is very large then the shipped method will never be called until the whole scan 623 // is finished, but at that time we have already scan all the data... 624 // See HBASE-20457 for more details. 625 // And there is still a scenario that can not be handled. If we have a very large row, 626 // which 627 // have millions of qualifiers, and filter.filterRow is used, then even if we set the flag 628 // here, we still need to scan all the qualifiers before returning... 
629 scannerContext.returnImmediately(); 630 } 631 632 heap.recordBlockSize(blockSize -> { 633 if (rpcCall.isPresent()) { 634 rpcCall.get().incrementBlockBytesScanned(blockSize); 635 } 636 scannerContext.incrementBlockProgress(blockSize); 637 }); 638 639 prevCell = cell; 640 scannerContext.setLastPeekedCell(cell); 641 topChanged = false; 642 ScanQueryMatcher.MatchCode qcode = matcher.match(cell); 643 switch (qcode) { 644 case INCLUDE: 645 case INCLUDE_AND_SEEK_NEXT_ROW: 646 case INCLUDE_AND_SEEK_NEXT_COL: 647 Filter f = matcher.getFilter(); 648 if (f != null) { 649 Cell transformedCell = f.transformCell(cell); 650 // fast path, most filters just return the same cell instance 651 if (transformedCell != cell) { 652 if (transformedCell instanceof ExtendedCell) { 653 cell = (ExtendedCell) transformedCell; 654 } else { 655 throw new DoNotRetryIOException("Incorrect filter implementation, " 656 + "the Cell returned by transformCell is not an ExtendedCell. Filter class: " 657 + f.getClass().getName()); 658 } 659 } 660 } 661 this.countPerRow++; 662 663 // add to results only if we have skipped #storeOffset kvs 664 // also update metric accordingly 665 if (this.countPerRow > storeOffset) { 666 outResult.add(cell); 667 668 // Update local tracking information 669 count++; 670 totalBytesRead += cellSize; 671 672 /** 673 * Increment the metric if all the cells are from memstore. 
If not we will account it 674 * for mixed reads 675 */ 676 onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore(); 677 // Update the progress of the scanner context 678 scannerContext.incrementSizeProgress(cellSize, cell.heapSize()); 679 scannerContext.incrementBatchProgress(1); 680 681 if (matcher.isUserScan() && totalBytesRead > maxRowSize) { 682 String message = "Max row size allowed: " + maxRowSize 683 + ", but the row is bigger than that, the row info: " 684 + CellUtil.toString(cell, false) + ", already have process row cells = " 685 + outResult.size() + ", it belong to region = " 686 + store.getHRegion().getRegionInfo().getRegionNameAsString(); 687 LOG.warn(message); 688 throw new RowTooBigException(message); 689 } 690 691 if (storeLimit > -1 && this.countPerRow >= (storeLimit + storeOffset)) { 692 // do what SEEK_NEXT_ROW does. 693 if (!matcher.moreRowsMayExistAfter(cell)) { 694 close(false);// Do all cleanup except heap.close() 695 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 696 } 697 matcher.clearCurrentRow(); 698 seekToNextRow(cell); 699 break LOOP; 700 } 701 } 702 703 if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { 704 if (!matcher.moreRowsMayExistAfter(cell)) { 705 close(false);// Do all cleanup except heap.close() 706 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 707 } 708 matcher.clearCurrentRow(); 709 seekOrSkipToNextRow(cell); 710 } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { 711 seekOrSkipToNextColumn(cell); 712 } else { 713 this.heap.next(); 714 } 715 716 if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) { 717 break LOOP; 718 } 719 if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { 720 break LOOP; 721 } 722 continue; 723 724 case DONE: 725 // Optimization for Gets! If DONE, no more to get on this row, early exit! 726 if (get) { 727 // Then no more to this row... exit. 
728 close(false);// Do all cleanup except heap.close() 729 // update metric 730 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 731 } 732 matcher.clearCurrentRow(); 733 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 734 735 case DONE_SCAN: 736 close(false);// Do all cleanup except heap.close() 737 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 738 739 case SEEK_NEXT_ROW: 740 // This is just a relatively simple end of scan fix, to short-cut end 741 // us if there is an endKey in the scan. 742 if (!matcher.moreRowsMayExistAfter(cell)) { 743 close(false);// Do all cleanup except heap.close() 744 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 745 } 746 matcher.clearCurrentRow(); 747 seekOrSkipToNextRow(cell); 748 NextState stateAfterSeekNextRow = needToReturn(outResult); 749 if (stateAfterSeekNextRow != null) { 750 return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues(); 751 } 752 break; 753 754 case SEEK_NEXT_COL: 755 seekOrSkipToNextColumn(cell); 756 NextState stateAfterSeekNextColumn = needToReturn(outResult); 757 if (stateAfterSeekNextColumn != null) { 758 return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues(); 759 } 760 break; 761 762 case SKIP: 763 this.heap.next(); 764 break; 765 766 case SEEK_NEXT_USING_HINT: 767 ExtendedCell nextKV = matcher.getNextKeyHint(cell); 768 if (nextKV != null) { 769 int difference = comparator.compare(nextKV, cell); 770 if ( 771 ((!scan.isReversed() && difference > 0) || (scan.isReversed() && difference < 0)) 772 ) { 773 seekAsDirection(nextKV); 774 NextState stateAfterSeekByHint = needToReturn(outResult); 775 if (stateAfterSeekByHint != null) { 776 return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues(); 777 } 778 break; 779 } 780 } 781 heap.next(); 782 break; 783 784 default: 785 throw new RuntimeException("UNEXPECTED"); 786 } 787 788 // 
One last chance to break due to size limit. The INCLUDE* cases above already check 789 // limit and continue. For the various filtered cases, we need to check because block 790 // size limit may have been exceeded even if we don't add cells to result list. 791 if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { 792 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 793 } 794 } while ((cell = this.heap.peek()) != null); 795 796 if (count > 0) { 797 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 798 } 799 800 // No more keys 801 close(false);// Do all cleanup except heap.close() 802 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 803 } finally { 804 // increment only if we have some result 805 if (count > 0 && matcher.isUserScan()) { 806 // if true increment memstore metrics, if not the mixed one 807 updateMetricsStore(onlyFromMemstore); 808 } 809 } 810 } 811 812 private void updateMetricsStore(boolean memstoreRead) { 813 if (store != null) { 814 store.updateMetricsStore(memstoreRead); 815 } else { 816 // for testing. 817 if (memstoreRead) { 818 memstoreOnlyReads++; 819 } else { 820 mixedReads++; 821 } 822 } 823 } 824 825 /** 826 * If the top cell won't be flushed into disk, the new top cell may be changed after 827 * #reopenAfterFlush. Because the older top cell only exist in the memstore scanner but the 828 * memstore scanner is replaced by hfile scanner after #reopenAfterFlush. If the row of top cell 829 * is changed, we should return the current cells. Otherwise, we may return the cells across 830 * different rows. 831 * @param outResult the cells which are visible for user scan 832 * @return null is the top cell doesn't change. Otherwise, the NextState to return 833 */ 834 private NextState needToReturn(List<? super ExtendedCell> outResult) { 835 if (!outResult.isEmpty() && topChanged) { 836 return heap.peek() == null ? 
NextState.NO_MORE_VALUES : NextState.MORE_VALUES; 837 } 838 return null; 839 } 840 841 private void seekOrSkipToNextRow(ExtendedCell cell) throws IOException { 842 // If it is a Get Scan, then we know that we are done with this row; there are no more 843 // rows beyond the current one: don't try to optimize. 844 if (!get) { 845 if (trySkipToNextRow(cell)) { 846 return; 847 } 848 } 849 seekToNextRow(cell); 850 } 851 852 private void seekOrSkipToNextColumn(ExtendedCell cell) throws IOException { 853 if (!trySkipToNextColumn(cell)) { 854 seekAsDirection(matcher.getKeyForNextColumn(cell)); 855 } 856 } 857 858 /** 859 * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109). 860 * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row, or seek to an 861 * arbitrary seek key. This method decides whether a seek is the most efficient _actual_ way to 862 * get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, SKIP inside the 863 * current, loaded block). It does this by looking at the next indexed key of the current HFile. 864 * This key is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible 865 * key on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work 866 * with the current Cell but compare as though it were a seek key; see down in 867 * matcher.compareKeyForNextRow, etc). If the compare gets us onto the next block we *_SEEK, 868 * otherwise we just SKIP to the next requested cell. 869 * <p> 870 * Other notes: 871 * <ul> 872 * <li>Rows can straddle block boundaries</li> 873 * <li>Versions of columns can straddle block boundaries (i.e. 
column C1 at T1 might be in a
   * different block than column C1 at T2)</li>
   * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a few
   * SKIPs...</li>
   * <li>We want to SEEK when the chance is high that we'll be able to seek past many Cells,
   * especially if we know we need to go to the next block.</li>
   * </ul>
   * <p>
   * A good proxy (best effort) to determine whether SKIP is better than SEEK is whether we'll
   * likely end up seeking to the next block (or past the next block) to get our next column.
   * Example:
   *
   * <pre>
   * |    BLOCK 1              |     BLOCK 2                   |
   * |  r1/c1, r1/c2, r1/c3    |    r1/c4, r1/c5, r2/c1        |
   *                                ^            ^
   *                                |            |
   *                         Next Index Key  SEEK_NEXT_ROW (before r2/c1)
   *
   *
   * |    BLOCK 1                       |     BLOCK 2                      |
   * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |    r1/c1/t2, r1/c1/T1, r1/c2/T3  |
   *                                         ^                  ^
   *                                         |                  |
   *                                  Next Index Key      SEEK_NEXT_COL
   * </pre>
   *
   * Now imagine we want columns c1 and c3 (see first diagram above), the 'Next Index Key' of r1/c4
   * is > r1/c3 so we should seek to get to the c1 on the next row, r2. In the second case, say we
   * only want one version of c1, after we have it, a SEEK_NEXT_COL will be issued to get to c2.
   * Looking at the 'Next Index Key', it would land us in the next block, so we should SEEK. In
   * other scenarios where the SEEK will not land us in the next block, it is very likely better to
   * issue a series of SKIPs.
906 * @param cell current cell 907 * @return true means skip to next row, false means not 908 */ 909 protected boolean trySkipToNextRow(ExtendedCell cell) throws IOException { 910 ExtendedCell nextCell = null; 911 // used to guard against a changed next indexed key by doing a identity comparison 912 // when the identity changes we need to compare the bytes again 913 ExtendedCell previousIndexedKey = null; 914 do { 915 ExtendedCell nextIndexedKey = getNextIndexedKey(); 916 if ( 917 nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY 918 && (nextIndexedKey == previousIndexedKey 919 || matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) 920 ) { 921 this.heap.next(); 922 ++kvsScanned; 923 previousIndexedKey = nextIndexedKey; 924 } else { 925 return false; 926 } 927 } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell)); 928 return true; 929 } 930 931 /** 932 * See {@link #trySkipToNextRow(ExtendedCell)} 933 * @param cell current cell 934 * @return true means skip to next column, false means not 935 */ 936 protected boolean trySkipToNextColumn(ExtendedCell cell) throws IOException { 937 ExtendedCell nextCell = null; 938 // used to guard against a changed next indexed key by doing a identity comparison 939 // when the identity changes we need to compare the bytes again 940 ExtendedCell previousIndexedKey = null; 941 do { 942 ExtendedCell nextIndexedKey = getNextIndexedKey(); 943 if ( 944 nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY 945 && (nextIndexedKey == previousIndexedKey 946 || matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) 947 ) { 948 this.heap.next(); 949 ++kvsScanned; 950 previousIndexedKey = nextIndexedKey; 951 } else { 952 return false; 953 } 954 } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell)); 955 // We need this check because it may happen that the new scanner that we get 956 // during heap.next() is 
requiring reseek due of fake KV previously generated for 957 // ROWCOL bloom filter optimization. See HBASE-19863 for more details 958 if ( 959 useRowColBloom && nextCell != null && cell.getTimestamp() == PrivateConstants.OLDEST_TIMESTAMP 960 ) { 961 return false; 962 } 963 return true; 964 } 965 966 @Override 967 public long getReadPoint() { 968 return this.readPt; 969 } 970 971 private static void clearAndClose(List<KeyValueScanner> scanners) { 972 if (scanners == null) { 973 return; 974 } 975 for (KeyValueScanner s : scanners) { 976 s.close(); 977 } 978 scanners.clear(); 979 } 980 981 // Implementation of ChangedReadersObserver 982 @Override 983 public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners) 984 throws IOException { 985 if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) { 986 return; 987 } 988 boolean updateReaders = false; 989 flushLock.lock(); 990 try { 991 if (!closeLock.tryLock()) { 992 // The reason for doing this is that when the current store scanner does not retrieve 993 // any new cells, then the scanner is considered to be done. The heap of this scanner 994 // is not closed till the shipped() call is completed. Hence in that case if at all 995 // the partial close (close (false)) has been called before updateReaders(), there is no 996 // need for the updateReaders() to happen. 997 LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders"); 998 // no lock acquired. 999 clearAndClose(memStoreScanners); 1000 return; 1001 } 1002 // lock acquired 1003 updateReaders = true; 1004 if (this.closing) { 1005 LOG.debug("StoreScanner already closing. There is no need to updateReaders"); 1006 clearAndClose(memStoreScanners); 1007 return; 1008 } 1009 flushed = true; 1010 final boolean isCompaction = false; 1011 boolean usePread = get || scanUsePread; 1012 // SEE HBASE-19468 where the flushed files are getting compacted even before a scanner 1013 // calls next(). 
So its better we create scanners here rather than next() call. Ensure 1014 // these scanners are properly closed() whether or not the scan is completed successfully 1015 // Eagerly creating scanners so that we have the ref counting ticking on the newly created 1016 // store files. In case of stream scanners this eager creation does not induce performance 1017 // penalty because in scans (that uses stream scanners) the next() call is bound to happen. 1018 List<KeyValueScanner> scanners = 1019 store.getScanners(sfs, cacheBlocks, get, usePread, isCompaction, matcher, 1020 scan.getStartRow(), scan.getStopRow(), this.readPt, false, isOnlyLatestVersionScan(scan)); 1021 flushedstoreFileScanners.addAll(scanners); 1022 if (!CollectionUtils.isEmpty(memStoreScanners)) { 1023 clearAndClose(memStoreScannersAfterFlush); 1024 memStoreScannersAfterFlush.addAll(memStoreScanners); 1025 } 1026 } finally { 1027 flushLock.unlock(); 1028 if (updateReaders) { 1029 closeLock.unlock(); 1030 } 1031 } 1032 // Let the next() call handle re-creating and seeking 1033 } 1034 1035 /** Returns if top of heap has changed (and KeyValueHeap has to try the next KV) */ 1036 protected final boolean reopenAfterFlush() throws IOException { 1037 // here we can make sure that we have a Store instance so no null check on store. 1038 ExtendedCell lastTop = heap.peek(); 1039 // When we have the scan object, should we not pass it to getScanners() to get a limited set of 1040 // scanners? 
We did so in the constructor and we could have done it now by storing the scan 1041 // object from the constructor 1042 List<KeyValueScanner> scanners; 1043 flushLock.lock(); 1044 try { 1045 List<KeyValueScanner> allScanners = 1046 new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size()); 1047 allScanners.addAll(flushedstoreFileScanners); 1048 allScanners.addAll(memStoreScannersAfterFlush); 1049 scanners = selectScannersFrom(store, allScanners); 1050 // Clear the current set of flushed store files scanners so that they don't get added again 1051 flushedstoreFileScanners.clear(); 1052 memStoreScannersAfterFlush.clear(); 1053 } finally { 1054 flushLock.unlock(); 1055 } 1056 1057 // Seek the new scanners to the last key 1058 seekScanners(scanners, lastTop, false, parallelSeekEnabled); 1059 // remove the older memstore scanner 1060 for (int i = currentScanners.size() - 1; i >= 0; i--) { 1061 if (!currentScanners.get(i).isFileScanner()) { 1062 scannersForDelayedClose.add(currentScanners.remove(i)); 1063 } else { 1064 // we add the memstore scanner to the end of currentScanners 1065 break; 1066 } 1067 } 1068 // add the newly created scanners on the flushed files and the current active memstore scanner 1069 addCurrentScanners(scanners); 1070 // Combine all seeked scanners with a heap 1071 resetKVHeap(this.currentScanners, store.getComparator()); 1072 resetQueryMatcher(lastTop); 1073 if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) { 1074 LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString() 1075 + ",and after = " + heap.peek()); 1076 topChanged = true; 1077 } else { 1078 topChanged = false; 1079 } 1080 return topChanged; 1081 } 1082 1083 private void resetQueryMatcher(ExtendedCell lastTopKey) { 1084 // Reset the state of the Query Matcher and set to top row. 
1085 // Only reset and call setRow if the row changes; avoids confusing the 1086 // query matcher if scanning intra-row. 1087 ExtendedCell cell = heap.peek(); 1088 if (cell == null) { 1089 cell = lastTopKey; 1090 } 1091 if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) { 1092 this.countPerRow = 0; 1093 // The setToNewRow will call reset internally 1094 matcher.setToNewRow(cell); 1095 } 1096 } 1097 1098 /** 1099 * Check whether scan as expected order 1100 */ 1101 protected void checkScanOrder(Cell prevKV, Cell kv, CellComparator comparator) 1102 throws IOException { 1103 // Check that the heap gives us KVs in an increasing order. 1104 assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 1105 : "Key " + prevKV + " followed by a smaller key " + kv + " in cf " + store; 1106 } 1107 1108 protected boolean seekToNextRow(ExtendedCell c) throws IOException { 1109 return reseek(PrivateCellUtil.createLastOnRow(c)); 1110 } 1111 1112 /** 1113 * Do a reseek in a normal StoreScanner(scan forward) 1114 * @return true if scanner has values left, false if end of scanner 1115 */ 1116 protected boolean seekAsDirection(ExtendedCell kv) throws IOException { 1117 return reseek(kv); 1118 } 1119 1120 @Override 1121 public boolean reseek(ExtendedCell kv) throws IOException { 1122 if (checkFlushed()) { 1123 reopenAfterFlush(); 1124 } 1125 if (explicitColumnQuery && lazySeekEnabledGlobally) { 1126 return heap.requestSeek(kv, true, useRowColBloom); 1127 } 1128 return heap.reseek(kv); 1129 } 1130 1131 void trySwitchToStreamRead() { 1132 if ( 1133 readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null 1134 || bytesRead < preadMaxBytes 1135 ) { 1136 return; 1137 } 1138 LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead, 1139 this.store.getColumnFamilyName()); 1140 scanUsePread = false; 1141 ExtendedCell lastTop = heap.peek(); 1142 List<KeyValueScanner> memstoreScanners = new 
ArrayList<>(); 1143 List<KeyValueScanner> scannersToClose = new ArrayList<>(); 1144 for (KeyValueScanner kvs : currentScanners) { 1145 if (!kvs.isFileScanner()) { 1146 // collect memstorescanners here 1147 memstoreScanners.add(kvs); 1148 } else { 1149 scannersToClose.add(kvs); 1150 } 1151 } 1152 List<KeyValueScanner> fileScanners = null; 1153 List<KeyValueScanner> newCurrentScanners; 1154 KeyValueHeap newHeap; 1155 try { 1156 // We must have a store instance here so no null check 1157 // recreate the scanners on the current file scanners 1158 fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, matcher, 1159 scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), 1160 readPt, false); 1161 if (fileScanners == null) { 1162 return; 1163 } 1164 seekScanners(fileScanners, lastTop, false, parallelSeekEnabled); 1165 newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size()); 1166 newCurrentScanners.addAll(fileScanners); 1167 newCurrentScanners.addAll(memstoreScanners); 1168 newHeap = newKVHeap(newCurrentScanners, comparator); 1169 } catch (Exception e) { 1170 LOG.warn("failed to switch to stream read", e); 1171 if (fileScanners != null) { 1172 fileScanners.forEach(KeyValueScanner::close); 1173 } 1174 return; 1175 } 1176 currentScanners.clear(); 1177 addCurrentScanners(newCurrentScanners); 1178 this.heap = newHeap; 1179 resetQueryMatcher(lastTop); 1180 scannersToClose.forEach(KeyValueScanner::close); 1181 } 1182 1183 protected final boolean checkFlushed() { 1184 // check the var without any lock. Suppose even if we see the old 1185 // value here still it is ok to continue because we will not be resetting 1186 // the heap but will continue with the referenced memstore's snapshot. 
For compactions 1187 // any way we don't need the updateReaders at all to happen as we still continue with 1188 // the older files 1189 if (flushed) { 1190 // If there is a flush and the current scan is notified on the flush ensure that the 1191 // scan's heap gets reset and we do a seek on the newly flushed file. 1192 if (this.closing) { 1193 return false; 1194 } 1195 // reset the flag 1196 flushed = false; 1197 return true; 1198 } 1199 return false; 1200 } 1201 1202 /** 1203 * Seek storefiles in parallel to optimize IO latency as much as possible 1204 * @param scanners the list {@link KeyValueScanner}s to be read from 1205 * @param kv the KeyValue on which the operation is being requested 1206 */ 1207 private void parallelSeek(final List<? extends KeyValueScanner> scanners, final ExtendedCell kv) 1208 throws IOException { 1209 if (scanners.isEmpty()) return; 1210 int storeFileScannerCount = scanners.size(); 1211 CountDownLatch latch = new CountDownLatch(storeFileScannerCount); 1212 List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount); 1213 for (KeyValueScanner scanner : scanners) { 1214 if (scanner instanceof StoreFileScanner) { 1215 ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, this.readPt, latch); 1216 executor.submit(seekHandler); 1217 handlers.add(seekHandler); 1218 } else { 1219 scanner.seek(kv); 1220 latch.countDown(); 1221 } 1222 } 1223 1224 try { 1225 latch.await(); 1226 } catch (InterruptedException ie) { 1227 throw (InterruptedIOException) new InterruptedIOException().initCause(ie); 1228 } 1229 1230 for (ParallelSeekHandler handler : handlers) { 1231 if (handler.getErr() != null) { 1232 throw new IOException(handler.getErr()); 1233 } 1234 } 1235 } 1236 1237 /** 1238 * Used in testing. 
1239 * @return all scanners in no particular order 1240 */ 1241 List<KeyValueScanner> getAllScannersForTesting() { 1242 List<KeyValueScanner> allScanners = new ArrayList<>(); 1243 KeyValueScanner current = heap.getCurrentForTesting(); 1244 if (current != null) allScanners.add(current); 1245 for (KeyValueScanner scanner : heap.getHeap()) 1246 allScanners.add(scanner); 1247 return allScanners; 1248 } 1249 1250 static void enableLazySeekGlobally(boolean enable) { 1251 lazySeekEnabledGlobally = enable; 1252 } 1253 1254 /** Returns The estimated number of KVs seen by this scanner (includes some skipped KVs). */ 1255 public long getEstimatedNumberOfKvsScanned() { 1256 return this.kvsScanned; 1257 } 1258 1259 @Override 1260 public ExtendedCell getNextIndexedKey() { 1261 return this.heap.getNextIndexedKey(); 1262 } 1263 1264 @Override 1265 public void shipped() throws IOException { 1266 if (prevCell != null) { 1267 // Do the copy here so that in case the prevCell ref is pointing to the previous 1268 // blocks we can safely release those blocks. 1269 // This applies to blocks that are got from Bucket cache, L1 cache and the blocks 1270 // fetched from HDFS. Copying this would ensure that we let go the references to these 1271 // blocks so that they can be GCed safely(in case of bucket cache) 1272 prevCell = KeyValueUtil.toNewKeyCell(this.prevCell); 1273 } 1274 matcher.beforeShipped(); 1275 // There wont be further fetch of Cells from these scanners. Just close. 1276 clearAndClose(scannersForDelayedClose); 1277 if (this.heap != null) { 1278 this.heap.shipped(); 1279 // When switching from pread to stream, we will open a new scanner for each store file, but 1280 // the old scanner may still track the HFileBlocks we have scanned but not sent back to client 1281 // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others 1282 // before we serialize and send it back to client. 
The HFileBlocks will be released in shipped 1283 // method, so we here will also open new scanners and close old scanners in shipped method. 1284 // See HBASE-18055 for more details. 1285 trySwitchToStreamRead(); 1286 } 1287 } 1288}