001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.HashSet; 025import java.util.List; 026import java.util.NavigableSet; 027import java.util.Optional; 028import java.util.Set; 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.atomic.AtomicBoolean; 031import java.util.concurrent.locks.ReentrantLock; 032import java.util.function.IntConsumer; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.Cell; 035import org.apache.hadoop.hbase.CellComparator; 036import org.apache.hadoop.hbase.CellUtil; 037import org.apache.hadoop.hbase.DoNotRetryIOException; 038import org.apache.hadoop.hbase.ExtendedCell; 039import org.apache.hadoop.hbase.HBaseInterfaceAudience; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.KeyValue; 042import org.apache.hadoop.hbase.KeyValueUtil; 043import org.apache.hadoop.hbase.PrivateCellUtil; 044import org.apache.hadoop.hbase.PrivateConstants; 045import 
org.apache.hadoop.hbase.client.IsolationLevel; 046import org.apache.hadoop.hbase.client.Scan; 047import org.apache.hadoop.hbase.conf.ConfigKey; 048import org.apache.hadoop.hbase.executor.ExecutorService; 049import org.apache.hadoop.hbase.filter.Filter; 050import org.apache.hadoop.hbase.ipc.RpcCall; 051import org.apache.hadoop.hbase.ipc.RpcServer; 052import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 053import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; 054import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler; 055import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher; 056import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 057import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher; 058import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 059import org.apache.yetus.audience.InterfaceAudience; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 064import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 065 066/** 067 * Scanner scans both the memstore and the Store. Coalesce KeyValue stream into List<KeyValue> 068 * for a single row. 069 * <p> 070 * The implementation is not thread safe. So there will be no race between next and close. The only 071 * exception is updateReaders, it will be called in the memstore flush thread to indicate that there 072 * is a flush. 
 */
@InterfaceAudience.Private
public class StoreScanner extends NonReversedNonLazyKeyValueScanner
  implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class);
  // In unit tests, the store could be null
  protected final HStore store;
  private final CellComparator comparator;
  private ScanQueryMatcher matcher;
  // Heap over the current set of memstore/store file scanners; null once the scan is exhausted.
  protected KeyValueHeap heap;
  private boolean cacheBlocks;

  // Number of cells emitted (or matched) for the current row; reset on each new row.
  private long countPerRow = 0;
  // Max results per column family for this scan, or -1 for unlimited.
  private int storeLimit = -1;
  // Number of leading cells of the row to skip before emitting results.
  private int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107)
  private volatile boolean closing = false;
  private final boolean get;
  private final boolean explicitColumnQuery;
  private final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  private boolean parallelSeekEnabled = false;
  private ExecutorService executor;
  private final Scan scan;
  private final long oldestUnexpiredTS;
  private final long now;
  private final int minVersions;
  private final long maxRowSize;
  private final long cellsPerHeartbeatCheck;
  // Read metrics counted locally when there is no backing HStore (tests only); see
  // updateMetricsStore.
  long memstoreOnlyReads;
  long mixedReads;

  // 1) Collects all the KVHeap that are eagerly getting closed during the
  // course of a scan
  // 2) Collects the unused memstore scanners. If we close the memstore scanners
  // before sending data to client, the chunk may be reclaimed by other
  // updates and the data will be corrupt.
  private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>();

  // Tracks file paths successfully read (scanners closed) by this store scanner.
  private final Set<Path> filesRead = new HashSet<>();

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not KVs skipped via
   * seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  // Last cell returned from the heap; compared by identity to avoid double-counting kvsScanned.
  private ExtendedCell prevCell = null;

  private final long preadMaxBytes;
  private long bytesRead;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
    "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;

  /**
   * The number of cells scanned in between timeout checks. Specifying a larger value means that
   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
   * timeout checks.
   */
  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
    ConfigKey.LONG("hbase.cells.scanned.per.heartbeat.check");

  /**
   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
   */
  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

  /**
   * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we scanned
   * reaches this limit, we will reopen the scanner with stream. The default value is 4 times of
   * block size for this store. If configured with a value <0, for all scans with ReadType DEFAULT,
   * we will open scanner with stream mode itself.
   */
  public static final String STORESCANNER_PREAD_MAX_BYTES =
    ConfigKey.LONG("hbase.storescanner.pread.max.bytes");

  private final Scan.ReadType readType;

  // A flag whether use pread for scan
  // it maybe changed if we use Scan.ReadType.DEFAULT and we have read lots of data.
  private boolean scanUsePread;
  // Indicates whether there was flush during the course of the scan
  private volatile boolean flushed = false;
  // generally we get one file from a flush
  private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1);
  // Since CompactingMemstore is now default, we get three memstore scanners from a flush
  private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
  // The current list of scanners
  final List<KeyValueScanner> currentScanners = new ArrayList<>();
  // flush update lock
  private final ReentrantLock flushLock = new ReentrantLock();
  // lock for closing.
  private final ReentrantLock closeLock = new ReentrantLock();

  // MVCC read point this scanner operates at; fixed at construction time.
  protected final long readPt;
  private boolean topChanged = false;

  // These are used to verify the state of the scanner during testing.
  private static AtomicBoolean hasUpdatedReaders;
  private static AtomicBoolean hasSwitchedToStreamRead;

  /**
   * An internal constructor. Initializes everything that does not depend on which kind of scan
   * (user scan vs. compaction) this is; the public constructors set up the matcher and scanners.
   */
  private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt,
    boolean cacheBlocks, ScanType scanType) {
    this.readPt = readPt;
    this.store = store;
    this.cacheBlocks = cacheBlocks;
    this.comparator = Preconditions.checkNotNull(scanInfo.getComparator());
    get = scan.isGetScan();
    explicitColumnQuery = numColumns > 0;
    this.scan = scan;
    this.now = EnvironmentEdgeManager.currentTime();
    // Raw scans must see expired cells too, so the TTL cutoff is disabled (0L) for them.
    this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl();
    this.minVersions = scanInfo.getMinVersions();

    // We look up row-column Bloom filters for multi-column queries as part of
    // the seek operation. However, we also look the row-column Bloom filter
    // for multi-row (non-"get") scans because this is not done in
    // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
    this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1) && (store == null
      || store.getColumnFamilyDescriptor().getBloomFilterType() == BloomType.ROWCOL);
    this.maxRowSize = scanInfo.getTableMaxRowSize();
    this.preadMaxBytes = scanInfo.getPreadMaxBytes();
    if (get) {
      this.readType = Scan.ReadType.PREAD;
      this.scanUsePread = true;
    } else if (scanType != ScanType.USER_SCAN) {
      // For compaction scanners never use Pread as already we have stream based scanners on the
      // store files to be compacted
      this.readType = Scan.ReadType.STREAM;
      this.scanUsePread = false;
    } else {
      if (scan.getReadType() == Scan.ReadType.DEFAULT) {
        if (scanInfo.isUsePread()) {
          this.readType = Scan.ReadType.PREAD;
        } else if (this.preadMaxBytes < 0) {
          this.readType = Scan.ReadType.STREAM;
        } else {
          this.readType = Scan.ReadType.DEFAULT;
        }
      } else {
        this.readType = scan.getReadType();
      }
      // Always start with pread unless user specific stream. Will change to stream later if
      // readType is default if the scan keeps running for a long time.
      this.scanUsePread = this.readType != Scan.ReadType.STREAM;
    }
    this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
    // Parallel seeking is on if the config allows it and there is more than one store file.
    if (store != null && store.getStorefilesCount() > 1) {
      RegionServerServices rsService = store.getHRegion().getRegionServerServices();
      if (rsService != null && scanInfo.isParallelSeekEnabled()) {
        this.parallelSeekEnabled = true;
        this.executor = rsService.getExecutorService();
      }
    }
  }

  // Registers freshly opened scanners in the list backing the current KeyValueHeap.
  private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
    this.currentScanners.addAll(scanners);
  }

  private static boolean isOnlyLatestVersionScan(Scan scan) {
    // No need to check for Scan#getMaxVersions because live version files generated by store file
    // writer retains max versions specified in ColumnFamilyDescriptor for the given CF
    return !scan.isRaw() && scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP;
  }

  /**
   * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we are not in a
   * compaction.
   * @param store   who we scan
   * @param scan    the spec
   * @param columns which columns we are scanning
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
    long readPt) throws IOException {
    this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt, scan.getCacheBlocks(),
      ScanType.USER_SCAN);
    if (columns != null && scan.isRaw()) {
      throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
    }
    matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
      store.getCoprocessorHost());

    // Register for flush notifications so updateReaders() can swap in the flushed files.
    store.addChangedReaderObserver(this);

    List<KeyValueScanner> scanners = null;
    try {
      // Pass columns to try to filter out unnecessary StoreFiles.
      scanners = selectScannersFrom(store,
        store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
          scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt,
          isOnlyLatestVersionScan(scan)));

      // Seek all scanners to the start of the Row (or if the exact matching row
      // key does not exist, then to the start of the next matching Row).
      // Always check bloom filter to optimize the top row seek for delete
      // family marker.
      seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
        parallelSeekEnabled);

      // set storeLimit
      this.storeLimit = scan.getMaxResultsPerColumnFamily();

      // set rowOffset
      this.storeOffset = scan.getRowOffsetPerColumnFamily();
      addCurrentScanners(scanners);
      // Combine all seeked scanners with a heap
      resetKVHeap(scanners, comparator);
    } catch (IOException e) {
      clearAndClose(scanners, false); // do not track files when closing due to exception
      // remove us from the HStore#changedReaderObservers here or we'll have no chance to
      // and might cause memory leak
      store.deleteChangedReaderObserver(this);
      throw e;
    }
  }

  // a dummy scan instance for compaction.
  private static final Scan SCAN_FOR_COMPACTION = new Scan();

  /**
   * Used for store file compaction and memstore compaction.
   * <p>
   * Opens a scanner across specified StoreFiles/MemStoreSegments.
   * @param store             who we scan
   * @param scanners          ancillary scanners
   * @param smallestReadPoint the readPoint that we should use for tracking versions
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
    ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
    this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
  }

  /**
   * Used for compactions that drop deletes from a limited range of rows.
   * <p>
   * Opens a scanner across specified StoreFiles.
   * @param store              who we scan
   * @param scanners           ancillary scanners
   * @param smallestReadPoint  the readPoint that we should use for tracking versions
   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
   * @param dropDeletesToRow   The exclusive right bound of the range; can be EMPTY_END_ROW.
327 */ 328 public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners, 329 long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) 330 throws IOException { 331 this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, 332 earliestPutTs, dropDeletesFromRow, dropDeletesToRow); 333 } 334 335 private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners, 336 ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, 337 byte[] dropDeletesToRow) throws IOException { 338 this(store, SCAN_FOR_COMPACTION, scanInfo, 0, 339 store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType); 340 assert scanType != ScanType.USER_SCAN; 341 matcher = 342 CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs, 343 oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost()); 344 345 // Filter the list of scanners using Bloom filters, time range, TTL, etc. 346 scanners = selectScannersFrom(store, scanners); 347 348 // Seek all scanners to the initial key 349 seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); 350 addCurrentScanners(scanners); 351 // Combine all seeked scanners with a heap 352 resetKVHeap(scanners, comparator); 353 } 354 355 private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners) 356 throws IOException { 357 // Seek all scanners to the initial key 358 seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); 359 addCurrentScanners(scanners); 360 resetKVHeap(scanners, comparator); 361 } 362 363 // For mob compaction only as we do not have a Store instance when doing mob compaction. 364 public StoreScanner(ScanInfo scanInfo, ScanType scanType, 365 List<? 
extends KeyValueScanner> scanners) throws IOException { 366 this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType); 367 assert scanType != ScanType.USER_SCAN; 368 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L, 369 oldestUnexpiredTS, now, null, null, null); 370 seekAllScanner(scanInfo, scanners); 371 } 372 373 // Used to instantiate a scanner for user scan in test 374 StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, 375 List<? extends KeyValueScanner> scanners, ScanType scanType) throws IOException { 376 // 0 is passed as readpoint because the test bypasses Store 377 this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(), 378 scanType); 379 if (scanType == ScanType.USER_SCAN) { 380 this.matcher = 381 UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null); 382 } else { 383 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 384 PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null); 385 } 386 seekAllScanner(scanInfo, scanners); 387 } 388 389 // Used to instantiate a scanner for user scan in test 390 StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, 391 List<? extends KeyValueScanner> scanners) throws IOException { 392 // 0 is passed as readpoint because the test bypasses Store 393 this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(), 394 ScanType.USER_SCAN); 395 this.matcher = 396 UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null); 397 seekAllScanner(scanInfo, scanners); 398 } 399 400 // Used to instantiate a scanner for compaction in test 401 StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType, 402 List<? extends KeyValueScanner> scanners) throws IOException { 403 // 0 is passed as readpoint because the test bypasses Store 404 this(null, maxVersions > 0 ? 
new Scan().readVersions(maxVersions) : SCAN_FOR_COMPACTION, 405 scanInfo, 0, 0L, false, scanType); 406 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 407 PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null); 408 seekAllScanner(scanInfo, scanners); 409 } 410 411 boolean isScanUsePread() { 412 return this.scanUsePread; 413 } 414 415 /** 416 * Seek the specified scanners with the given key 417 * @param isLazy true if using lazy seek 418 * @param isParallelSeek true if using parallel seek 419 */ 420 protected void seekScanners(List<? extends KeyValueScanner> scanners, ExtendedCell seekKey, 421 boolean isLazy, boolean isParallelSeek) throws IOException { 422 // Seek all scanners to the start of the Row (or if the exact matching row 423 // key does not exist, then to the start of the next matching Row). 424 // Always check bloom filter to optimize the top row seek for delete 425 // family marker. 426 if (isLazy) { 427 for (KeyValueScanner scanner : scanners) { 428 scanner.requestSeek(seekKey, false, true); 429 } 430 } else { 431 if (!isParallelSeek) { 432 long totalScannersSoughtBytes = 0; 433 for (KeyValueScanner scanner : scanners) { 434 if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) { 435 throw new RowTooBigException( 436 "Max row size allowed: " + maxRowSize + ", but row is bigger than that"); 437 } 438 scanner.seek(seekKey); 439 Cell c = scanner.peek(); 440 if (c != null) { 441 totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c); 442 } 443 } 444 } else { 445 parallelSeek(scanners, seekKey); 446 } 447 } 448 } 449 450 protected void resetKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator) 451 throws IOException { 452 // Combine all seeked scanners with a heap 453 heap = newKVHeap(scanners, comparator); 454 } 455 456 protected KeyValueHeap newKVHeap(List<? 
extends KeyValueScanner> scanners, 457 CellComparator comparator) throws IOException { 458 return new KeyValueHeap(scanners, comparator); 459 } 460 461 /** 462 * Filters the given list of scanners using Bloom filter, time range, and TTL. 463 * <p> 464 * Will be overridden by testcase so declared as protected. 465 */ 466 protected List<KeyValueScanner> selectScannersFrom(HStore store, 467 List<? extends KeyValueScanner> allScanners) { 468 boolean memOnly; 469 boolean filesOnly; 470 if (scan instanceof InternalScan) { 471 InternalScan iscan = (InternalScan) scan; 472 memOnly = iscan.isCheckOnlyMemStore(); 473 filesOnly = iscan.isCheckOnlyStoreFiles(); 474 } else { 475 memOnly = false; 476 filesOnly = false; 477 } 478 List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size()); 479 480 // We can only exclude store files based on TTL if minVersions is set to 0. 481 // Otherwise, we might have to return KVs that have technically expired. 482 long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE; 483 484 // include only those scan files which pass all filters 485 for (KeyValueScanner kvs : allScanners) { 486 boolean isFile = kvs.isFileScanner(); 487 if ((!isFile && filesOnly) || (isFile && memOnly)) { 488 kvs.close(); 489 filesRead.addAll(kvs.getFilesRead()); 490 continue; 491 } 492 493 if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) { 494 scanners.add(kvs); 495 } else { 496 kvs.close(); 497 filesRead.addAll(kvs.getFilesRead()); 498 } 499 } 500 return scanners; 501 } 502 503 @Override 504 public ExtendedCell peek() { 505 return heap != null ? heap.peek() : null; 506 } 507 508 @Override 509 public KeyValue next() { 510 // throw runtime exception perhaps? 
511 throw new RuntimeException("Never call StoreScanner.next()"); 512 } 513 514 @Override 515 public void close() { 516 close(true); 517 } 518 519 private void close(boolean withDelayedScannersClose) { 520 closeLock.lock(); 521 // If the closeLock is acquired then any subsequent updateReaders() 522 // call is ignored. 523 try { 524 if (this.closing) { 525 return; 526 } 527 if (withDelayedScannersClose) { 528 this.closing = true; 529 } 530 // For mob compaction, we do not have a store. 531 if (this.store != null) { 532 this.store.deleteChangedReaderObserver(this); 533 } 534 if (withDelayedScannersClose) { 535 clearAndClose(scannersForDelayedClose); 536 clearAndClose(memStoreScannersAfterFlush); 537 clearAndClose(flushedstoreFileScanners); 538 if (this.heap != null) { 539 this.heap.close(); 540 this.filesRead.addAll(this.heap.getFilesRead()); 541 this.currentScanners.clear(); 542 this.heap = null; // CLOSED! 543 } 544 } else { 545 if (this.heap != null) { 546 this.scannersForDelayedClose.add(this.heap); 547 this.currentScanners.clear(); 548 this.heap = null; 549 } 550 } 551 } finally { 552 closeLock.unlock(); 553 } 554 } 555 556 @Override 557 public boolean seek(ExtendedCell key) throws IOException { 558 if (checkFlushed()) { 559 reopenAfterFlush(); 560 } 561 return this.heap.seek(key); 562 } 563 564 /** 565 * Get the next row of values from this Store. 566 * @return true if there are more rows, false if scanner is done 567 */ 568 @Override 569 public boolean next(List<? super ExtendedCell> outResult, ScannerContext scannerContext) 570 throws IOException { 571 if (scannerContext == null) { 572 throw new IllegalArgumentException("Scanner context cannot be null"); 573 } 574 if (checkFlushed() && reopenAfterFlush()) { 575 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 576 } 577 578 // if the heap was left null, then the scanners had previously run out anyways, close and 579 // return. 
580 if (this.heap == null) { 581 // By this time partial close should happened because already heap is null 582 close(false);// Do all cleanup except heap.close() 583 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 584 } 585 586 ExtendedCell cell = this.heap.peek(); 587 if (cell == null) { 588 close(false);// Do all cleanup except heap.close() 589 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 590 } 591 592 // only call setRow if the row changes; avoids confusing the query matcher 593 // if scanning intra-row 594 595 // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing 596 // rows. Else it is possible we are still traversing the same row so we must perform the row 597 // comparison. 598 if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) { 599 this.countPerRow = 0; 600 matcher.setToNewRow(cell); 601 } 602 603 // Clear progress away unless invoker has indicated it should be kept. 604 if (!scannerContext.getKeepProgress() && !scannerContext.getSkippingRow()) { 605 scannerContext.clearProgress(); 606 } 607 608 Optional<RpcCall> rpcCall = 609 matcher.isUserScan() ? RpcServer.getCurrentCall() : Optional.empty(); 610 // re-useable closure to avoid allocations 611 IntConsumer recordBlockSize = blockSize -> { 612 if (rpcCall.isPresent()) { 613 rpcCall.get().incrementBlockBytesScanned(blockSize); 614 } 615 scannerContext.incrementBlockProgress(blockSize); 616 }; 617 618 int count = 0; 619 long totalBytesRead = 0; 620 // track the cells for metrics only if it is a user read request. 621 boolean onlyFromMemstore = matcher.isUserScan(); 622 try { 623 LOOP: do { 624 // Update and check the time limit based on the configured value of cellsPerTimeoutCheck 625 // Or if the preadMaxBytes is reached and we may want to return so we can switch to stream 626 // in 627 // the shipped method below. 
628 if ( 629 kvsScanned % cellsPerHeartbeatCheck == 0 630 || (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) 631 ) { 632 if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { 633 return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); 634 } 635 } 636 // Do object compare - we set prevKV from the same heap. 637 if (prevCell != cell) { 638 ++kvsScanned; 639 } 640 checkScanOrder(prevCell, cell, comparator); 641 int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell); 642 bytesRead += cellSize; 643 if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) { 644 // return immediately if we want to switch from pread to stream. We need this because we 645 // can 646 // only switch in the shipped method, if user use a filter to filter out everything and 647 // rpc 648 // timeout is very large then the shipped method will never be called until the whole scan 649 // is finished, but at that time we have already scan all the data... 650 // See HBASE-20457 for more details. 651 // And there is still a scenario that can not be handled. If we have a very large row, 652 // which 653 // have millions of qualifiers, and filter.filterRow is used, then even if we set the flag 654 // here, we still need to scan all the qualifiers before returning... 
655 scannerContext.returnImmediately(); 656 } 657 658 heap.recordBlockSize(recordBlockSize); 659 660 prevCell = cell; 661 scannerContext.setLastPeekedCell(cell); 662 topChanged = false; 663 ScanQueryMatcher.MatchCode qcode = matcher.match(cell); 664 switch (qcode) { 665 case INCLUDE: 666 case INCLUDE_AND_SEEK_NEXT_ROW: 667 case INCLUDE_AND_SEEK_NEXT_COL: 668 Filter f = matcher.getFilter(); 669 if (f != null) { 670 Cell transformedCell = f.transformCell(cell); 671 // fast path, most filters just return the same cell instance 672 if (transformedCell != cell) { 673 if (transformedCell instanceof ExtendedCell) { 674 cell = (ExtendedCell) transformedCell; 675 } else { 676 throw new DoNotRetryIOException("Incorrect filter implementation, " 677 + "the Cell returned by transformCell is not an ExtendedCell. Filter class: " 678 + f.getClass().getName()); 679 } 680 } 681 } 682 this.countPerRow++; 683 684 // add to results only if we have skipped #storeOffset kvs 685 // also update metric accordingly 686 if (this.countPerRow > storeOffset) { 687 outResult.add(cell); 688 689 // Update local tracking information 690 count++; 691 totalBytesRead += cellSize; 692 693 /** 694 * Increment the metric if all the cells are from memstore. 
If not we will account it
             * for mixed reads
             */
            onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore();
            // Update the progress of the scanner context
            scannerContext.incrementSizeProgress(cellSize, cell.heapSize());
            scannerContext.incrementBatchProgress(1);

            // User scans enforce a hard cap on accumulated row size; abort with a
            // non-retriable error rather than buffering an oversized row.
            if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
              String message = "Max row size allowed: " + maxRowSize
                + ", but the row is bigger than that, the row info: "
                + CellUtil.toString(cell, false) + ", already have process row cells = "
                + outResult.size() + ", it belong to region = "
                + store.getHRegion().getRegionInfo().getRegionNameAsString();
              LOG.warn(message);
              throw new RowTooBigException(message);
            }

            // Per-store row limit (storeLimit + storeOffset) reached: behave like
            // SEEK_NEXT_ROW and move on to the next row.
            if (storeLimit > -1 && this.countPerRow >= (storeLimit + storeOffset)) {
              // do what SEEK_NEXT_ROW does.
              if (!matcher.moreRowsMayExistAfter(cell)) {
                close(false);// Do all cleanup except heap.close()
                return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
              }
              matcher.clearCurrentRow();
              seekToNextRow(cell);
              break LOOP;
            }
          }

          // Advance according to the matcher's hint for the INCLUDE* variants.
          if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
            if (!matcher.moreRowsMayExistAfter(cell)) {
              close(false);// Do all cleanup except heap.close()
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            matcher.clearCurrentRow();
            seekOrSkipToNextRow(cell);
          } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
            seekOrSkipToNextColumn(cell);
          } else {
            this.heap.next();
          }

          if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          continue;

        case DONE:
          // Optimization for Gets! If DONE, no more to get on this row, early exit!
          if (get) {
            // Then no more to this row... exit.
            close(false);// Do all cleanup except heap.close()
            // update metric
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          matcher.clearCurrentRow();
          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();

        case DONE_SCAN:
          close(false);// Do all cleanup except heap.close()
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();

        case SEEK_NEXT_ROW:
          // This is just a relatively simple end of scan fix, to short-cut end
          // us if there is an endKey in the scan.
          if (!matcher.moreRowsMayExistAfter(cell)) {
            close(false);// Do all cleanup except heap.close()
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          matcher.clearCurrentRow();
          seekOrSkipToNextRow(cell);
          // A flush during the seek may have changed the top row; if so, return now so we
          // don't hand back cells spanning different rows (see needToReturn()).
          NextState stateAfterSeekNextRow = needToReturn();
          if (stateAfterSeekNextRow != null) {
            return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
          }
          break;

        case SEEK_NEXT_COL:
          seekOrSkipToNextColumn(cell);
          NextState stateAfterSeekNextColumn = needToReturn();
          if (stateAfterSeekNextColumn != null) {
            return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
          }
          break;

        case SKIP:
          this.heap.next();
          break;

        case SEEK_NEXT_USING_HINT:
          ExtendedCell nextKV = matcher.getNextKeyHint(cell);
          if (nextKV != null) {
            int difference = comparator.compare(nextKV, cell);
            // Only seek when the hint actually moves us forward in scan direction;
            // otherwise fall through to a plain heap advance.
            if (
              ((!scan.isReversed() && difference > 0) || (scan.isReversed() && difference < 0))
            ) {
              seekAsDirection(nextKV);
              NextState stateAfterSeekByHint = needToReturn();
              if (stateAfterSeekByHint != null) {
                return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
              }
              break;
            }
          }
          heap.next();
          break;

        default:
          throw new RuntimeException("UNEXPECTED");
      }

      // One last chance to break due to size limit. The INCLUDE* cases above already check
      // limit and continue. For the various filtered cases, we need to check because block
      // size limit may have been exceeded even if we don't add cells to result list.
      if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }
    } while ((cell = this.heap.peek()) != null);

    if (count > 0) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // No more keys
    close(false);// Do all cleanup except heap.close()
    return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
  } finally {
    // increment only if we have some result
    if (count > 0 && matcher.isUserScan()) {
      // if true increment memstore metrics, if not the mixed one
      updateMetricsStore(onlyFromMemstore);
    }
  }
  }

  /**
   * Bump the memstore-only vs. mixed read metrics. Delegates to the store when one is
   * present; otherwise falls back to local counters (used by tests that construct a
   * scanner without a store).
   */
  private void updateMetricsStore(boolean memstoreRead) {
    if (store != null) {
      store.updateMetricsStore(memstoreRead);
    } else {
      // for testing.
      if (memstoreRead) {
        memstoreOnlyReads++;
      } else {
        mixedReads++;
      }
    }
  }

  /**
   * If the top cell won't be flushed into disk, the new top cell may be changed after
   * #reopenAfterFlush. Because the older top cell only exists in the memstore scanner but the
   * memstore scanner is replaced by hfile scanner after #reopenAfterFlush. If the row of top cell
   * is changed, we should return the current cells. Otherwise, we may return the cells across
   * different rows.
   * @return null if the top cell doesn't change. Otherwise, the NextState to return
   */
  private NextState needToReturn() {
    if (topChanged) {
      return heap.peek() == null ?
      NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
    }
    return null;
  }

  private void seekOrSkipToNextRow(ExtendedCell cell) throws IOException {
    // If it is a Get Scan, then we know that we are done with this row; there are no more
    // rows beyond the current one: don't try to optimize.
    if (!get) {
      if (trySkipToNextRow(cell)) {
        return;
      }
    }
    seekToNextRow(cell);
  }

  private void seekOrSkipToNextColumn(ExtendedCell cell) throws IOException {
    // Prefer cheap in-block SKIPs; fall back to a real SEEK when the next column is
    // likely past the current block (see trySkipToNextColumn javadoc).
    if (!trySkipToNextColumn(cell)) {
      seekAsDirection(matcher.getKeyForNextColumn(cell));
    }
  }

  /**
   * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).
   * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row, or seek to an
   * arbitrary seek key. This method decides whether a seek is the most efficient _actual_ way to
   * get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, SKIP inside the
   * current, loaded block). It does this by looking at the next indexed key of the current HFile.
   * This key is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible
   * key on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work
   * with the current Cell but compare as though it were a seek key; see down in
   * matcher.compareKeyForNextRow, etc). If the compare gets us onto the next block we *_SEEK,
   * otherwise we just SKIP to the next requested cell.
   * <p>
   * Other notes:
   * <ul>
   * <li>Rows can straddle block boundaries</li>
   * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a
   * different block than column C1 at T2)</li>
   * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a few
   * SKIPs...</li>
   * <li>We want to SEEK when the chance is high that we'll be able to seek past many Cells,
   * especially if we know we need to go to the next block.</li>
   * </ul>
   * <p>
   * A good proxy (best effort) to determine whether SKIP is better than SEEK is whether we'll
   * likely end up seeking to the next block (or past the next block) to get our next column.
   * Example:
   *
   * <pre>
   * |    BLOCK 1              |     BLOCK 2                   |
   * |  r1/c1, r1/c2, r1/c3    |    r1/c4, r1/c5, r2/c1        |
   *                                   ^         ^
   *                                   |         |
   *                           Next Index Key   SEEK_NEXT_ROW (before r2/c1)
   *
   *
   * |    BLOCK 1                       |     BLOCK 2                      |
   * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |    r1/c1/t2, r1/c1/T1, r1/c2/T3  |
   *                                       ^              ^
   *                                       |              |
   *                                 Next Index Key     SEEK_NEXT_COL
   * </pre>
   *
   * Now imagine we want columns c1 and c3 (see first diagram above), the 'Next Index Key' of r1/c4
   * is > r1/c3 so we should seek to get to the c1 on the next row, r2. In second case, say we only
   * want one version of c1, after we have it, a SEEK_COL will be issued to get to c2. Looking at
   * the 'Next Index Key', it would land us in the next block, so we should SEEK. In other scenarios
   * where the SEEK will not land us in the next block, it is very likely better to issue a series
   * of SKIPs.
   * @param cell current cell
   * @return true means skip to next row, false means not
   */
  protected boolean trySkipToNextRow(ExtendedCell cell) throws IOException {
    ExtendedCell nextCell = null;
    // used to guard against a changed next indexed key by doing an identity comparison
    // when the identity changes we need to compare the bytes again
    ExtendedCell previousIndexedKey = null;
    do {
      ExtendedCell nextIndexedKey = getNextIndexedKey();
      if (
        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
          && (nextIndexedKey == previousIndexedKey
            || matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0)
      ) {
        // Next indexed key is at/after the end of this row, so staying inside the
        // current block and SKIPping is cheaper than a SEEK.
        this.heap.next();
        ++kvsScanned;
        previousIndexedKey = nextIndexedKey;
      } else {
        return false;
      }
    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell));
    return true;
  }

  /**
   * See {@link #trySkipToNextRow(ExtendedCell)}
   * @param cell current cell
   * @return true means skip to next column, false means not
   */
  protected boolean trySkipToNextColumn(ExtendedCell cell) throws IOException {
    ExtendedCell nextCell = null;
    // used to guard against a changed next indexed key by doing an identity comparison
    // when the identity changes we need to compare the bytes again
    ExtendedCell previousIndexedKey = null;
    do {
      ExtendedCell nextIndexedKey = getNextIndexedKey();
      if (
        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
          && (nextIndexedKey == previousIndexedKey
            || matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0)
      ) {
        this.heap.next();
        ++kvsScanned;
        previousIndexedKey = nextIndexedKey;
      } else {
        return false;
      }
    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell));
    // We need this check because it may happen that the new scanner that we get
    // during heap.next() is
    // requiring reseek due to fake KV previously generated for
    // ROWCOL bloom filter optimization. See HBASE-19863 and HBASE-29907 for more details
    if (useRowColBloom && nextCell != null && matcher.compareKeyForNextColumn(nextCell, cell) < 0) {
      return false;
    }
    return true;
  }

  @Override
  public long getReadPoint() {
    return this.readPt;
  }

  // Close all scanners and record their files-read (delegates with trackFiles=true).
  private void clearAndClose(List<KeyValueScanner> scanners) {
    clearAndClose(scanners, true);
  }

  /**
   * Close every scanner in the list and clear it. When trackFiles is true, the files each
   * scanner read are accumulated into this.filesRead before closing.
   */
  private void clearAndClose(List<KeyValueScanner> scanners, boolean trackFiles) {
    if (scanners == null) {
      return;
    }
    for (KeyValueScanner s : scanners) {
      s.close();
      if (trackFiles) {
        this.filesRead.addAll(s.getFilesRead());
      }
    }
    scanners.clear();
  }

  // Implementation of ChangedReadersObserver
  @Override
  public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners)
    throws IOException {
    if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) {
      return;
    }
    // Tracks whether we actually acquired closeLock, so the finally block only
    // unlocks what we own.
    boolean updateReaders = false;
    flushLock.lock();
    try {
      if (!closeLock.tryLock()) {
        // The reason for doing this is that when the current store scanner does not retrieve
        // any new cells, then the scanner is considered to be done. The heap of this scanner
        // is not closed till the shipped() call is completed. Hence in that case if at all
        // the partial close (close (false)) has been called before updateReaders(), there is no
        // need for the updateReaders() to happen.
        LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
        // no lock acquired.
        clearAndClose(memStoreScanners);
        return;
      }
      // lock acquired
      updateReaders = true;
      if (this.closing) {
        LOG.debug("StoreScanner already closing. There is no need to updateReaders");
        clearAndClose(memStoreScanners);
        return;
      }
      // Mark the flush; the next()/reseek() path will notice via checkFlushed() and
      // call reopenAfterFlush().
      flushed = true;
      final boolean isCompaction = false;
      boolean usePread = get || scanUsePread;
      // SEE HBASE-19468 where the flushed files are getting compacted even before a scanner
      // calls next(). So its better we create scanners here rather than next() call. Ensure
      // these scanners are properly closed() whether or not the scan is completed successfully
      // Eagerly creating scanners so that we have the ref counting ticking on the newly created
      // store files. In case of stream scanners this eager creation does not induce performance
      // penalty because in scans (that uses stream scanners) the next() call is bound to happen.
      List<KeyValueScanner> scanners =
        store.getScanners(sfs, cacheBlocks, get, usePread, isCompaction, matcher,
          scan.getStartRow(), scan.getStopRow(), this.readPt, false, isOnlyLatestVersionScan(scan));
      flushedstoreFileScanners.addAll(scanners);
      if (!CollectionUtils.isEmpty(memStoreScanners)) {
        clearAndClose(memStoreScannersAfterFlush);
        memStoreScannersAfterFlush.addAll(memStoreScanners);
      }
    } finally {
      flushLock.unlock();
      if (updateReaders) {
        closeLock.unlock();
      }
      if (hasUpdatedReaders != null) {
        hasUpdatedReaders.set(true);
      }
    }
    // Let the next() call handle re-creating and seeking
  }

  /** Returns if top of heap has changed (and KeyValueHeap has to try the next KV) */
  protected final boolean reopenAfterFlush() throws IOException {
    // here we can make sure that we have a Store instance so no null check on store.
    ExtendedCell lastTop = heap.peek();
    // When we have the scan object, should we not pass it to getScanners() to get a limited set of
    // scanners? We did so in the constructor and we could have done it now by storing the scan
    // object from the constructor
    List<KeyValueScanner> scanners;
    flushLock.lock();
    try {
      List<KeyValueScanner> allScanners =
        new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());
      allScanners.addAll(flushedstoreFileScanners);
      allScanners.addAll(memStoreScannersAfterFlush);
      scanners = selectScannersFrom(store, allScanners);
      // Clear the current set of flushed store files scanners so that they don't get added again
      flushedstoreFileScanners.clear();
      memStoreScannersAfterFlush.clear();
    } finally {
      flushLock.unlock();
    }

    // Seek the new scanners to the last key
    seekScanners(scanners, lastTop, false, parallelSeekEnabled);
    // remove the older memstore scanner
    for (int i = currentScanners.size() - 1; i >= 0; i--) {
      if (!currentScanners.get(i).isFileScanner()) {
        scannersForDelayedClose.add(currentScanners.remove(i));
      } else {
        // we add the memstore scanner to the end of currentScanners
        break;
      }
    }
    // add the newly created scanners on the flushed files and the current active memstore scanner
    addCurrentScanners(scanners);
    // Combine all seeked scanners with a heap
    resetKVHeap(this.currentScanners, store.getComparator());
    resetQueryMatcher(lastTop);
    if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
      LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString()
        + ",and after = " + heap.peek());
      topChanged = true;
    } else {
      topChanged = false;
    }
    return topChanged;
  }

  private void resetQueryMatcher(ExtendedCell lastTopKey) {
    // Reset the state of the Query Matcher and set to top row.
    // Only reset and call setRow if the row changes; avoids confusing the
    // query matcher if scanning intra-row.
    ExtendedCell cell = heap.peek();
    if (cell == null) {
      cell = lastTopKey;
    }
    if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {
      this.countPerRow = 0;
      // The setToNewRow will call reset internally
      matcher.setToNewRow(cell);
    }
  }

  /**
   * Check whether scan as expected order
   */
  protected void checkScanOrder(Cell prevKV, Cell kv, CellComparator comparator)
    throws IOException {
    // Check that the heap gives us KVs in an increasing order.
    assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0
      : "Key " + prevKV + " followed by a smaller key " + kv + " in cf " + store;
  }

  // Reseek past the last possible cell of the given row.
  protected boolean seekToNextRow(ExtendedCell c) throws IOException {
    return reseek(PrivateCellUtil.createLastOnRow(c));
  }

  /**
   * Do a reseek in a normal StoreScanner(scan forward)
   * @return true if scanner has values left, false if end of scanner
   */
  protected boolean seekAsDirection(ExtendedCell kv) throws IOException {
    return reseek(kv);
  }

  @Override
  public boolean reseek(ExtendedCell kv) throws IOException {
    // If a flush happened since the last call, rebuild the heap over the flushed
    // files before seeking.
    if (checkFlushed()) {
      reopenAfterFlush();
    }
    if (explicitColumnQuery && lazySeekEnabledGlobally) {
      return heap.requestSeek(kv, true, useRowColBloom);
    }
    return heap.reseek(kv);
  }

  // Switch file scanners from pread to stream read once enough bytes have been
  // scanned; no-op unless preconditions hold.
  void trySwitchToStreamRead() {
    if (
      readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null
        || bytesRead < preadMaxBytes
    ) {
      return;
    }
    LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead,
      this.store.getColumnFamilyName());
    scanUsePread = false;
    ExtendedCell lastTop = heap.peek();
    List<KeyValueScanner> memstoreScanners = new ArrayList<>();
    List<KeyValueScanner> scannersToClose = new ArrayList<>();
    for (KeyValueScanner kvs : currentScanners) {
      if (!kvs.isFileScanner()) {
        // collect memstorescanners here
        memstoreScanners.add(kvs);
      } else {
        scannersToClose.add(kvs);
      }
    }
    List<KeyValueScanner> fileScanners = null;
    List<KeyValueScanner> newCurrentScanners;
    KeyValueHeap newHeap;
    try {
      // We must have a store instance here so no null check
      // recreate the scanners on the current file scanners
      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, matcher,
        scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
        readPt, false);
      if (fileScanners == null) {
        return;
      }
      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
      newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
      newCurrentScanners.addAll(fileScanners);
      newCurrentScanners.addAll(memstoreScanners);
      newHeap = newKVHeap(newCurrentScanners, comparator);
    } catch (Exception e) {
      // Best effort: on any failure keep the existing pread scanners in place and
      // clean up whatever new scanners were created.
      LOG.warn("failed to switch to stream read", e);
      if (fileScanners != null) {
        fileScanners.forEach(KeyValueScanner::close);
      }
      return;
    }
    currentScanners.clear();
    addCurrentScanners(newCurrentScanners);
    this.heap = newHeap;
    resetQueryMatcher(lastTop);
    for (KeyValueScanner scanner : scannersToClose) {
      scanner.close();
      this.filesRead.addAll(scanner.getFilesRead());
    }
    if (hasSwitchedToStreamRead != null) {
      hasSwitchedToStreamRead.set(true);
    }
  }

  protected final boolean checkFlushed() {
    // check the var without any lock. Suppose even if we see the old
    // value here still it is ok to continue because we will not be resetting
    // the heap but will continue with the referenced memstore's snapshot. For compactions
    // any way we don't need the updateReaders at all to happen as we still continue with
    // the older files
    if (flushed) {
      // If there is a flush and the current scan is notified on the flush ensure that the
      // scan's heap gets reset and we do a seek on the newly flushed file.
      if (this.closing) {
        return false;
      }
      // reset the flag
      flushed = false;
      return true;
    }
    return false;
  }

  /**
   * Seek storefiles in parallel to optimize IO latency as much as possible
   * @param scanners the list {@link KeyValueScanner}s to be read from
   * @param kv       the KeyValue on which the operation is being requested
   */
  private void parallelSeek(final List<? extends KeyValueScanner> scanners, final ExtendedCell kv)
    throws IOException {
    if (scanners.isEmpty()) return;
    int storeFileScannerCount = scanners.size();
    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
    List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount);
    for (KeyValueScanner scanner : scanners) {
      if (scanner instanceof StoreFileScanner) {
        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, this.readPt, latch);
        executor.submit(seekHandler);
        handlers.add(seekHandler);
      } else {
        // Non store file scanners are seeked inline on the caller thread.
        scanner.seek(kv);
        latch.countDown();
      }
    }

    try {
      latch.await();
    } catch (InterruptedException ie) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
    }

    // Propagate the first error any handler hit during its seek.
    for (ParallelSeekHandler handler : handlers) {
      if (handler.getErr() != null) {
        throw new IOException(handler.getErr());
      }
    }
  }

  /**
   * Used in testing.
   * @return all scanners in no particular order
   */
  List<KeyValueScanner> getAllScannersForTesting() {
    List<KeyValueScanner> allScanners = new ArrayList<>();
    KeyValueScanner current = heap.getCurrentForTesting();
    if (current != null) allScanners.add(current);
    for (KeyValueScanner scanner : heap.getHeap())
      allScanners.add(scanner);
    return allScanners;
  }

  // Test-only switch for the global lazy-seek optimization.
  static void enableLazySeekGlobally(boolean enable) {
    lazySeekEnabledGlobally = enable;
  }

  /** Returns the estimated number of KVs seen by this scanner (includes some skipped KVs). */
  public long getEstimatedNumberOfKvsScanned() {
    return this.kvsScanned;
  }

  /**
   * Returns the set of store file paths that were successfully read by this scanner. Populated at
   * close from the key-value heap and any closed child scanners.
   */
  @Override
  public Set<Path> getFilesRead() {
    return Collections.unmodifiableSet(filesRead);
  }

  @Override
  public ExtendedCell getNextIndexedKey() {
    return this.heap.getNextIndexedKey();
  }

  @Override
  public void shipped() throws IOException {
    if (prevCell != null) {
      // Do the copy here so that in case the prevCell ref is pointing to the previous
      // blocks we can safely release those blocks.
      // This applies to blocks that are got from Bucket cache, L1 cache and the blocks
      // fetched from HDFS. Copying this would ensure that we let go the references to these
      // blocks so that they can be GCed safely(in case of bucket cache)
      prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
    }
    matcher.beforeShipped();
    // There wont be further fetch of Cells from these scanners. Just close.
    clearAndClose(scannersForDelayedClose);
    if (this.heap != null) {
      this.heap.shipped();
      // When switching from pread to stream, we will open a new scanner for each store file, but
      // the old scanner may still track the HFileBlocks we have scanned but not sent back to client
      // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others
      // before we serialize and send it back to client. The HFileBlocks will be released in shipped
      // method, so we here will also open new scanners and close old scanners in shipped method.
      // See HBASE-18055 for more details.
      trySwitchToStreamRead();
    }
  }

  // Test-only instrumentation hooks below: enable, query and reset the static flags that
  // record whether updateReaders()/trySwitchToStreamRead() ran.
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
  static final void instrument() {
    hasUpdatedReaders = new AtomicBoolean(false);
    hasSwitchedToStreamRead = new AtomicBoolean(false);
  }

  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
  static final boolean hasUpdatedReaders() {
    return hasUpdatedReaders.get();
  }

  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
  static final boolean hasSwitchedToStreamRead() {
    return hasSwitchedToStreamRead.get();
  }

  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
  static final void resetHasUpdatedReaders() {
    hasUpdatedReaders.set(false);
  }

  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
  static final void resetHasSwitchedToStreamRead() {
    hasSwitchedToStreamRead.set(false);
  }
}