/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.PrivateConstants;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigKey;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * Scanner that scans both the memstore and the Store, coalescing the KeyValue stream into a
 * {@code List<KeyValue>} for a single row.
 * <p>
 * The implementation is not thread safe, so there will be no race between next and close. The
 * only exception is updateReaders, which will be called in the memstore flush thread to indicate
 * that there has been a flush.
 */
@InterfaceAudience.Private
public class StoreScanner extends NonReversedNonLazyKeyValueScanner
  implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class);
  // In unit tests, the store could be null
  protected final HStore store;
  // Orders cells; also used to check scan order and to compare seek hints in next().
  private final CellComparator comparator;
  // Decides, cell by cell, whether to include, skip or seek; created by each outer constructor.
  private ScanQueryMatcher matcher;
  // Merges all current scanners; set to null once this scanner is (partially) closed.
  protected KeyValueHeap heap;
  // Block-cache flag handed to HStore#getScanners when scanners are opened.
  private boolean cacheBlocks;

  // Cells included for the current row so far; compared against storeOffset/storeLimit.
  private long countPerRow = 0;
  // Max cells returned per row for this family; -1 means no limit.
  private int storeLimit = -1;
  // Number of leading cells per row for this family to skip before returning any.
  private int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107)
  private volatile boolean closing = false;
  // True when the scan is a Get (single row).
  private final boolean get;
  // True when the caller asked for explicit columns.
  private final boolean explicitColumnQuery;
  private final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  private boolean parallelSeekEnabled = false;
  // Only set when parallel seeking is enabled; presumably consumed by parallelSeek (not shown).
  private ExecutorService executor;
  private final Scan scan;
  // Cells older than this timestamp are expired by TTL (0 for raw scans).
  private final long oldestUnexpiredTS;
  // Time captured when the scanner was constructed.
  private final long now;
  private final int minVersions;
  // A user scan that reads more bytes than this for a single row fails with RowTooBigException.
  private final long maxRowSize;
  // How many scanned cells between time-limit (heartbeat) checks in next().
  private final long cellsPerHeartbeatCheck;
  // Read-metric counters used only when there is no store (tests); see updateMetricsStore.
  long memstoreOnlyReads;
  long mixedReads;

  // 1) Collects all the KVHeap that are eagerly getting closed during the
  // course of a scan
  // 2) Collects the unused memstore scanners. If we close the memstore scanners
  // before sending data to client, the chunk may be reclaimed by other
  // updates and the data will be corrupt.
  private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>();

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not KVs skipped via
   * seeking to next row/column. TODO: estimate them? Also sets the cadence of the time-limit
   * check in next().
   */
  private long kvsScanned = 0;
  // Last cell examined by next(); used for the scan-order check and to avoid double counting.
  private ExtendedCell prevCell = null;

  // Threshold after which a DEFAULT read-type scan wants to stop using pread.
  private final long preadMaxBytes;
  // Running total of estimated bytes of cells visited by next().
  private long bytesRead;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
    "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;

  /**
   * The number of cells scanned in between timeout checks. Specifying a larger value means that
   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
   * timeout checks.
   */
  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
    ConfigKey.LONG("hbase.cells.scanned.per.heartbeat.check");

  /**
   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
   */
  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

  /**
   * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we scanned
   * reach this limit, we will reopen the scanner with stream. The default value is 4 times the
   * block size for this store. If configured with a value {@code < 0}, all scans with ReadType
   * DEFAULT will open their scanners in stream mode from the start.
   */
  public static final String STORESCANNER_PREAD_MAX_BYTES =
    ConfigKey.LONG("hbase.storescanner.pread.max.bytes");

  // Effective read type chosen in the constructor from the scan, scan type and ScanInfo.
  private final Scan.ReadType readType;

  // Whether this scan currently reads with pread. It may be changed if we use
  // Scan.ReadType.DEFAULT and have read lots of data.
152 private boolean scanUsePread; 153 // Indicates whether there was flush during the course of the scan 154 private volatile boolean flushed = false; 155 // generally we get one file from a flush 156 private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1); 157 // Since CompactingMemstore is now default, we get three memstore scanners from a flush 158 private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3); 159 // The current list of scanners 160 final List<KeyValueScanner> currentScanners = new ArrayList<>(); 161 // flush update lock 162 private final ReentrantLock flushLock = new ReentrantLock(); 163 // lock for closing. 164 private final ReentrantLock closeLock = new ReentrantLock(); 165 166 protected final long readPt; 167 private boolean topChanged = false; 168 169 /** An internal constructor. */ 170 private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt, 171 boolean cacheBlocks, ScanType scanType) { 172 this.readPt = readPt; 173 this.store = store; 174 this.cacheBlocks = cacheBlocks; 175 this.comparator = Preconditions.checkNotNull(scanInfo.getComparator()); 176 get = scan.isGetScan(); 177 explicitColumnQuery = numColumns > 0; 178 this.scan = scan; 179 this.now = EnvironmentEdgeManager.currentTime(); 180 this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl(); 181 this.minVersions = scanInfo.getMinVersions(); 182 183 // We look up row-column Bloom filters for multi-column queries as part of 184 // the seek operation. However, we also look the row-column Bloom filter 185 // for multi-row (non-"get") scans because this is not done in 186 // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>). 
187 this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1) && (store == null 188 || store.getColumnFamilyDescriptor().getBloomFilterType() == BloomType.ROWCOL); 189 this.maxRowSize = scanInfo.getTableMaxRowSize(); 190 this.preadMaxBytes = scanInfo.getPreadMaxBytes(); 191 if (get) { 192 this.readType = Scan.ReadType.PREAD; 193 this.scanUsePread = true; 194 } else if (scanType != ScanType.USER_SCAN) { 195 // For compaction scanners never use Pread as already we have stream based scanners on the 196 // store files to be compacted 197 this.readType = Scan.ReadType.STREAM; 198 this.scanUsePread = false; 199 } else { 200 if (scan.getReadType() == Scan.ReadType.DEFAULT) { 201 if (scanInfo.isUsePread()) { 202 this.readType = Scan.ReadType.PREAD; 203 } else if (this.preadMaxBytes < 0) { 204 this.readType = Scan.ReadType.STREAM; 205 } else { 206 this.readType = Scan.ReadType.DEFAULT; 207 } 208 } else { 209 this.readType = scan.getReadType(); 210 } 211 // Always start with pread unless user specific stream. Will change to stream later if 212 // readType is default if the scan keeps running for a long time. 213 this.scanUsePread = this.readType != Scan.ReadType.STREAM; 214 } 215 this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck(); 216 // Parallel seeking is on if the config allows and more there is more than one store file. 217 if (store != null && store.getStorefilesCount() > 1) { 218 RegionServerServices rsService = store.getHRegion().getRegionServerServices(); 219 if (rsService != null && scanInfo.isParallelSeekEnabled()) { 220 this.parallelSeekEnabled = true; 221 this.executor = rsService.getExecutorService(); 222 } 223 } 224 } 225 226 private void addCurrentScanners(List<? 
extends KeyValueScanner> scanners) { 227 this.currentScanners.addAll(scanners); 228 } 229 230 private static boolean isOnlyLatestVersionScan(Scan scan) { 231 // No need to check for Scan#getMaxVersions because live version files generated by store file 232 // writer retains max versions specified in ColumnFamilyDescriptor for the given CF 233 return !scan.isRaw() && scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP; 234 } 235 236 /** 237 * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we are not in a 238 * compaction. 239 * @param store who we scan 240 * @param scan the spec 241 * @param columns which columns we are scanning 242 */ 243 public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns, 244 long readPt) throws IOException { 245 this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt, scan.getCacheBlocks(), 246 ScanType.USER_SCAN); 247 if (columns != null && scan.isRaw()) { 248 throw new DoNotRetryIOException("Cannot specify any column for a raw scan"); 249 } 250 matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, 251 store.getCoprocessorHost()); 252 253 store.addChangedReaderObserver(this); 254 255 List<KeyValueScanner> scanners = null; 256 try { 257 // Pass columns to try to filter out unnecessary StoreFiles. 258 scanners = selectScannersFrom(store, 259 store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(), 260 scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt, 261 isOnlyLatestVersionScan(scan))); 262 263 // Seek all scanners to the start of the Row (or if the exact matching row 264 // key does not exist, then to the start of the next matching Row). 265 // Always check bloom filter to optimize the top row seek for delete 266 // family marker. 
267 seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally, 268 parallelSeekEnabled); 269 270 // set storeLimit 271 this.storeLimit = scan.getMaxResultsPerColumnFamily(); 272 273 // set rowOffset 274 this.storeOffset = scan.getRowOffsetPerColumnFamily(); 275 addCurrentScanners(scanners); 276 // Combine all seeked scanners with a heap 277 resetKVHeap(scanners, comparator); 278 } catch (IOException e) { 279 clearAndClose(scanners); 280 // remove us from the HStore#changedReaderObservers here or we'll have no chance to 281 // and might cause memory leak 282 store.deleteChangedReaderObserver(this); 283 throw e; 284 } 285 } 286 287 // a dummy scan instance for compaction. 288 private static final Scan SCAN_FOR_COMPACTION = new Scan(); 289 290 /** 291 * Used for store file compaction and memstore compaction. 292 * <p> 293 * Opens a scanner across specified StoreFiles/MemStoreSegments. 294 * @param store who we scan 295 * @param scanners ancillary scanners 296 * @param smallestReadPoint the readPoint that we should use for tracking versions 297 */ 298 public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners, 299 ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { 300 this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null); 301 } 302 303 /** 304 * Used for compactions that drop deletes from a limited range of rows. 305 * <p> 306 * Opens a scanner across specified StoreFiles. 307 * @param store who we scan 308 * @param scanners ancillary scanners 309 * @param smallestReadPoint the readPoint that we should use for tracking versions 310 * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW. 311 * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW. 312 */ 313 public StoreScanner(HStore store, ScanInfo scanInfo, List<? 
extends KeyValueScanner> scanners, 314 long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) 315 throws IOException { 316 this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, 317 earliestPutTs, dropDeletesFromRow, dropDeletesToRow); 318 } 319 320 private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners, 321 ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, 322 byte[] dropDeletesToRow) throws IOException { 323 this(store, SCAN_FOR_COMPACTION, scanInfo, 0, 324 store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType); 325 assert scanType != ScanType.USER_SCAN; 326 matcher = 327 CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs, 328 oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost()); 329 330 // Filter the list of scanners using Bloom filters, time range, TTL, etc. 331 scanners = selectScannersFrom(store, scanners); 332 333 // Seek all scanners to the initial key 334 seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); 335 addCurrentScanners(scanners); 336 // Combine all seeked scanners with a heap 337 resetKVHeap(scanners, comparator); 338 } 339 340 private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners) 341 throws IOException { 342 // Seek all scanners to the initial key 343 seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); 344 addCurrentScanners(scanners); 345 resetKVHeap(scanners, comparator); 346 } 347 348 // For mob compaction only as we do not have a Store instance when doing mob compaction. 349 public StoreScanner(ScanInfo scanInfo, ScanType scanType, 350 List<? 
extends KeyValueScanner> scanners) throws IOException { 351 this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType); 352 assert scanType != ScanType.USER_SCAN; 353 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L, 354 oldestUnexpiredTS, now, null, null, null); 355 seekAllScanner(scanInfo, scanners); 356 } 357 358 // Used to instantiate a scanner for user scan in test 359 StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, 360 List<? extends KeyValueScanner> scanners, ScanType scanType) throws IOException { 361 // 0 is passed as readpoint because the test bypasses Store 362 this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(), 363 scanType); 364 if (scanType == ScanType.USER_SCAN) { 365 this.matcher = 366 UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null); 367 } else { 368 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 369 PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null); 370 } 371 seekAllScanner(scanInfo, scanners); 372 } 373 374 // Used to instantiate a scanner for user scan in test 375 StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, 376 List<? extends KeyValueScanner> scanners) throws IOException { 377 // 0 is passed as readpoint because the test bypasses Store 378 this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(), 379 ScanType.USER_SCAN); 380 this.matcher = 381 UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null); 382 seekAllScanner(scanInfo, scanners); 383 } 384 385 // Used to instantiate a scanner for compaction in test 386 StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType, 387 List<? extends KeyValueScanner> scanners) throws IOException { 388 // 0 is passed as readpoint because the test bypasses Store 389 this(null, maxVersions > 0 ? 
new Scan().readVersions(maxVersions) : SCAN_FOR_COMPACTION, 390 scanInfo, 0, 0L, false, scanType); 391 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 392 PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null); 393 seekAllScanner(scanInfo, scanners); 394 } 395 396 boolean isScanUsePread() { 397 return this.scanUsePread; 398 } 399 400 /** 401 * Seek the specified scanners with the given key 402 * @param isLazy true if using lazy seek 403 * @param isParallelSeek true if using parallel seek 404 */ 405 protected void seekScanners(List<? extends KeyValueScanner> scanners, ExtendedCell seekKey, 406 boolean isLazy, boolean isParallelSeek) throws IOException { 407 // Seek all scanners to the start of the Row (or if the exact matching row 408 // key does not exist, then to the start of the next matching Row). 409 // Always check bloom filter to optimize the top row seek for delete 410 // family marker. 411 if (isLazy) { 412 for (KeyValueScanner scanner : scanners) { 413 scanner.requestSeek(seekKey, false, true); 414 } 415 } else { 416 if (!isParallelSeek) { 417 long totalScannersSoughtBytes = 0; 418 for (KeyValueScanner scanner : scanners) { 419 if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) { 420 throw new RowTooBigException( 421 "Max row size allowed: " + maxRowSize + ", but row is bigger than that"); 422 } 423 scanner.seek(seekKey); 424 Cell c = scanner.peek(); 425 if (c != null) { 426 totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c); 427 } 428 } 429 } else { 430 parallelSeek(scanners, seekKey); 431 } 432 } 433 } 434 435 protected void resetKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator) 436 throws IOException { 437 // Combine all seeked scanners with a heap 438 heap = newKVHeap(scanners, comparator); 439 } 440 441 protected KeyValueHeap newKVHeap(List<? 
extends KeyValueScanner> scanners, 442 CellComparator comparator) throws IOException { 443 return new KeyValueHeap(scanners, comparator); 444 } 445 446 /** 447 * Filters the given list of scanners using Bloom filter, time range, and TTL. 448 * <p> 449 * Will be overridden by testcase so declared as protected. 450 */ 451 protected List<KeyValueScanner> selectScannersFrom(HStore store, 452 List<? extends KeyValueScanner> allScanners) { 453 boolean memOnly; 454 boolean filesOnly; 455 if (scan instanceof InternalScan) { 456 InternalScan iscan = (InternalScan) scan; 457 memOnly = iscan.isCheckOnlyMemStore(); 458 filesOnly = iscan.isCheckOnlyStoreFiles(); 459 } else { 460 memOnly = false; 461 filesOnly = false; 462 } 463 464 List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size()); 465 466 // We can only exclude store files based on TTL if minVersions is set to 0. 467 // Otherwise, we might have to return KVs that have technically expired. 468 long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE; 469 470 // include only those scan files which pass all filters 471 for (KeyValueScanner kvs : allScanners) { 472 boolean isFile = kvs.isFileScanner(); 473 if ((!isFile && filesOnly) || (isFile && memOnly)) { 474 kvs.close(); 475 continue; 476 } 477 478 if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) { 479 scanners.add(kvs); 480 } else { 481 kvs.close(); 482 } 483 } 484 return scanners; 485 } 486 487 @Override 488 public ExtendedCell peek() { 489 return heap != null ? heap.peek() : null; 490 } 491 492 @Override 493 public KeyValue next() { 494 // throw runtime exception perhaps? 495 throw new RuntimeException("Never call StoreScanner.next()"); 496 } 497 498 @Override 499 public void close() { 500 close(true); 501 } 502 503 private void close(boolean withDelayedScannersClose) { 504 closeLock.lock(); 505 // If the closeLock is acquired then any subsequent updateReaders() 506 // call is ignored. 
507 try { 508 if (this.closing) { 509 return; 510 } 511 if (withDelayedScannersClose) { 512 this.closing = true; 513 } 514 // For mob compaction, we do not have a store. 515 if (this.store != null) { 516 this.store.deleteChangedReaderObserver(this); 517 } 518 if (withDelayedScannersClose) { 519 clearAndClose(scannersForDelayedClose); 520 clearAndClose(memStoreScannersAfterFlush); 521 clearAndClose(flushedstoreFileScanners); 522 if (this.heap != null) { 523 this.heap.close(); 524 this.currentScanners.clear(); 525 this.heap = null; // CLOSED! 526 } 527 } else { 528 if (this.heap != null) { 529 this.scannersForDelayedClose.add(this.heap); 530 this.currentScanners.clear(); 531 this.heap = null; 532 } 533 } 534 } finally { 535 closeLock.unlock(); 536 } 537 } 538 539 @Override 540 public boolean seek(ExtendedCell key) throws IOException { 541 if (checkFlushed()) { 542 reopenAfterFlush(); 543 } 544 return this.heap.seek(key); 545 } 546 547 /** 548 * Get the next row of values from this Store. 549 * @return true if there are more rows, false if scanner is done 550 */ 551 @Override 552 public boolean next(List<? super ExtendedCell> outResult, ScannerContext scannerContext) 553 throws IOException { 554 if (scannerContext == null) { 555 throw new IllegalArgumentException("Scanner context cannot be null"); 556 } 557 if (checkFlushed() && reopenAfterFlush()) { 558 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 559 } 560 561 // if the heap was left null, then the scanners had previously run out anyways, close and 562 // return. 
563 if (this.heap == null) { 564 // By this time partial close should happened because already heap is null 565 close(false);// Do all cleanup except heap.close() 566 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 567 } 568 569 ExtendedCell cell = this.heap.peek(); 570 if (cell == null) { 571 close(false);// Do all cleanup except heap.close() 572 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 573 } 574 575 // only call setRow if the row changes; avoids confusing the query matcher 576 // if scanning intra-row 577 578 // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing 579 // rows. Else it is possible we are still traversing the same row so we must perform the row 580 // comparison. 581 if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) { 582 this.countPerRow = 0; 583 matcher.setToNewRow(cell); 584 } 585 586 // Clear progress away unless invoker has indicated it should be kept. 587 if (!scannerContext.getKeepProgress() && !scannerContext.getSkippingRow()) { 588 scannerContext.clearProgress(); 589 } 590 591 Optional<RpcCall> rpcCall = 592 matcher.isUserScan() ? RpcServer.getCurrentCall() : Optional.empty(); 593 594 int count = 0; 595 long totalBytesRead = 0; 596 // track the cells for metrics only if it is a user read request. 597 boolean onlyFromMemstore = matcher.isUserScan(); 598 try { 599 LOOP: do { 600 // Update and check the time limit based on the configured value of cellsPerTimeoutCheck 601 // Or if the preadMaxBytes is reached and we may want to return so we can switch to stream 602 // in 603 // the shipped method below. 
604 if ( 605 kvsScanned % cellsPerHeartbeatCheck == 0 606 || (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) 607 ) { 608 if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { 609 return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); 610 } 611 } 612 // Do object compare - we set prevKV from the same heap. 613 if (prevCell != cell) { 614 ++kvsScanned; 615 } 616 checkScanOrder(prevCell, cell, comparator); 617 int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell); 618 bytesRead += cellSize; 619 if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) { 620 // return immediately if we want to switch from pread to stream. We need this because we 621 // can 622 // only switch in the shipped method, if user use a filter to filter out everything and 623 // rpc 624 // timeout is very large then the shipped method will never be called until the whole scan 625 // is finished, but at that time we have already scan all the data... 626 // See HBASE-20457 for more details. 627 // And there is still a scenario that can not be handled. If we have a very large row, 628 // which 629 // have millions of qualifiers, and filter.filterRow is used, then even if we set the flag 630 // here, we still need to scan all the qualifiers before returning... 
631 scannerContext.returnImmediately(); 632 } 633 634 heap.recordBlockSize(blockSize -> { 635 if (rpcCall.isPresent()) { 636 rpcCall.get().incrementBlockBytesScanned(blockSize); 637 } 638 scannerContext.incrementBlockProgress(blockSize); 639 }); 640 641 prevCell = cell; 642 scannerContext.setLastPeekedCell(cell); 643 topChanged = false; 644 ScanQueryMatcher.MatchCode qcode = matcher.match(cell); 645 switch (qcode) { 646 case INCLUDE: 647 case INCLUDE_AND_SEEK_NEXT_ROW: 648 case INCLUDE_AND_SEEK_NEXT_COL: 649 Filter f = matcher.getFilter(); 650 if (f != null) { 651 Cell transformedCell = f.transformCell(cell); 652 // fast path, most filters just return the same cell instance 653 if (transformedCell != cell) { 654 if (transformedCell instanceof ExtendedCell) { 655 cell = (ExtendedCell) transformedCell; 656 } else { 657 throw new DoNotRetryIOException("Incorrect filter implementation, " 658 + "the Cell returned by transformCell is not an ExtendedCell. Filter class: " 659 + f.getClass().getName()); 660 } 661 } 662 } 663 this.countPerRow++; 664 665 // add to results only if we have skipped #storeOffset kvs 666 // also update metric accordingly 667 if (this.countPerRow > storeOffset) { 668 outResult.add(cell); 669 670 // Update local tracking information 671 count++; 672 totalBytesRead += cellSize; 673 674 /** 675 * Increment the metric if all the cells are from memstore. 
If not we will account it 676 * for mixed reads 677 */ 678 onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore(); 679 // Update the progress of the scanner context 680 scannerContext.incrementSizeProgress(cellSize, cell.heapSize()); 681 scannerContext.incrementBatchProgress(1); 682 683 if (matcher.isUserScan() && totalBytesRead > maxRowSize) { 684 String message = "Max row size allowed: " + maxRowSize 685 + ", but the row is bigger than that, the row info: " 686 + CellUtil.toString(cell, false) + ", already have process row cells = " 687 + outResult.size() + ", it belong to region = " 688 + store.getHRegion().getRegionInfo().getRegionNameAsString(); 689 LOG.warn(message); 690 throw new RowTooBigException(message); 691 } 692 693 if (storeLimit > -1 && this.countPerRow >= (storeLimit + storeOffset)) { 694 // do what SEEK_NEXT_ROW does. 695 if (!matcher.moreRowsMayExistAfter(cell)) { 696 close(false);// Do all cleanup except heap.close() 697 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 698 } 699 matcher.clearCurrentRow(); 700 seekToNextRow(cell); 701 break LOOP; 702 } 703 } 704 705 if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { 706 if (!matcher.moreRowsMayExistAfter(cell)) { 707 close(false);// Do all cleanup except heap.close() 708 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 709 } 710 matcher.clearCurrentRow(); 711 seekOrSkipToNextRow(cell); 712 } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { 713 seekOrSkipToNextColumn(cell); 714 } else { 715 this.heap.next(); 716 } 717 718 if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) { 719 break LOOP; 720 } 721 if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { 722 break LOOP; 723 } 724 continue; 725 726 case DONE: 727 // Optimization for Gets! If DONE, no more to get on this row, early exit! 728 if (get) { 729 // Then no more to this row... exit. 
730 close(false);// Do all cleanup except heap.close() 731 // update metric 732 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 733 } 734 matcher.clearCurrentRow(); 735 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 736 737 case DONE_SCAN: 738 close(false);// Do all cleanup except heap.close() 739 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 740 741 case SEEK_NEXT_ROW: 742 // This is just a relatively simple end of scan fix, to short-cut end 743 // us if there is an endKey in the scan. 744 if (!matcher.moreRowsMayExistAfter(cell)) { 745 close(false);// Do all cleanup except heap.close() 746 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 747 } 748 matcher.clearCurrentRow(); 749 seekOrSkipToNextRow(cell); 750 NextState stateAfterSeekNextRow = needToReturn(outResult); 751 if (stateAfterSeekNextRow != null) { 752 return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues(); 753 } 754 break; 755 756 case SEEK_NEXT_COL: 757 seekOrSkipToNextColumn(cell); 758 NextState stateAfterSeekNextColumn = needToReturn(outResult); 759 if (stateAfterSeekNextColumn != null) { 760 return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues(); 761 } 762 break; 763 764 case SKIP: 765 this.heap.next(); 766 break; 767 768 case SEEK_NEXT_USING_HINT: 769 ExtendedCell nextKV = matcher.getNextKeyHint(cell); 770 if (nextKV != null) { 771 int difference = comparator.compare(nextKV, cell); 772 if ( 773 ((!scan.isReversed() && difference > 0) || (scan.isReversed() && difference < 0)) 774 ) { 775 seekAsDirection(nextKV); 776 NextState stateAfterSeekByHint = needToReturn(outResult); 777 if (stateAfterSeekByHint != null) { 778 return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues(); 779 } 780 break; 781 } 782 } 783 heap.next(); 784 break; 785 786 default: 787 throw new RuntimeException("UNEXPECTED"); 788 } 789 790 // 
One last chance to break due to size limit. The INCLUDE* cases above already check 791 // limit and continue. For the various filtered cases, we need to check because block 792 // size limit may have been exceeded even if we don't add cells to result list. 793 if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { 794 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 795 } 796 } while ((cell = this.heap.peek()) != null); 797 798 if (count > 0) { 799 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 800 } 801 802 // No more keys 803 close(false);// Do all cleanup except heap.close() 804 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 805 } finally { 806 // increment only if we have some result 807 if (count > 0 && matcher.isUserScan()) { 808 // if true increment memstore metrics, if not the mixed one 809 updateMetricsStore(onlyFromMemstore); 810 } 811 } 812 } 813 814 private void updateMetricsStore(boolean memstoreRead) { 815 if (store != null) { 816 store.updateMetricsStore(memstoreRead); 817 } else { 818 // for testing. 819 if (memstoreRead) { 820 memstoreOnlyReads++; 821 } else { 822 mixedReads++; 823 } 824 } 825 } 826 827 /** 828 * If the top cell won't be flushed into disk, the new top cell may be changed after 829 * #reopenAfterFlush. Because the older top cell only exist in the memstore scanner but the 830 * memstore scanner is replaced by hfile scanner after #reopenAfterFlush. If the row of top cell 831 * is changed, we should return the current cells. Otherwise, we may return the cells across 832 * different rows. 833 * @param outResult the cells which are visible for user scan 834 * @return null is the top cell doesn't change. Otherwise, the NextState to return 835 */ 836 private NextState needToReturn(List<? super ExtendedCell> outResult) { 837 if (!outResult.isEmpty() && topChanged) { 838 return heap.peek() == null ? 
NextState.NO_MORE_VALUES : NextState.MORE_VALUES; 839 } 840 return null; 841 } 842 843 private void seekOrSkipToNextRow(ExtendedCell cell) throws IOException { 844 // If it is a Get Scan, then we know that we are done with this row; there are no more 845 // rows beyond the current one: don't try to optimize. 846 if (!get) { 847 if (trySkipToNextRow(cell)) { 848 return; 849 } 850 } 851 seekToNextRow(cell); 852 } 853 854 private void seekOrSkipToNextColumn(ExtendedCell cell) throws IOException { 855 if (!trySkipToNextColumn(cell)) { 856 seekAsDirection(matcher.getKeyForNextColumn(cell)); 857 } 858 } 859 860 /** 861 * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109). 862 * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row, or seek to an 863 * arbitrary seek key. This method decides whether a seek is the most efficient _actual_ way to 864 * get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, SKIP inside the 865 * current, loaded block). It does this by looking at the next indexed key of the current HFile. 866 * This key is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible 867 * key on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work 868 * with the current Cell but compare as though it were a seek key; see down in 869 * matcher.compareKeyForNextRow, etc). If the compare gets us onto the next block we *_SEEK, 870 * otherwise we just SKIP to the next requested cell. 871 * <p> 872 * Other notes: 873 * <ul> 874 * <li>Rows can straddle block boundaries</li> 875 * <li>Versions of columns can straddle block boundaries (i.e. 
column C1 at T1 might be in a 876 * different block than column C1 at T2)</li> 877 * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a few 878 * SKIPs...</li> 879 * <li>We want to SEEK when the chance is high that we'll be able to seek past many Cells, 880 * especially if we know we need to go to the next block.</li> 881 * </ul> 882 * <p> 883 * A good proxy (best effort) to determine whether SKIP is better than SEEK is whether we'll 884 * likely end up seeking to the next block (or past the next block) to get our next column. 885 * Example: 886 * 887 * <pre> 888 * | BLOCK 1 | BLOCK 2 | 889 * | r1/c1, r1/c2, r1/c3 | r1/c4, r1/c5, r2/c1 | 890 * ^ ^ 891 * | | 892 * Next Index Key SEEK_NEXT_ROW (before r2/c1) 893 * 894 * 895 * | BLOCK 1 | BLOCK 2 | 896 * | r1/c1/t5, r1/c1/t4, r1/c1/t3 | r1/c1/t2, r1/c1/T1, r1/c2/T3 | 897 * ^ ^ 898 * | | 899 * Next Index Key SEEK_NEXT_COL 900 * </pre> 901 * 902 * Now imagine we want columns c1 and c3 (see first diagram above), the 'Next Index Key' of r1/c4 903 * is > r1/c3 so we should seek to get to the c1 on the next row, r2. In second case, say we only 904 * want one version of c1, after we have it, a SEEK_COL will be issued to get to c2. Looking at 905 * the 'Next Index Key', it would land us in the next block, so we should SEEK. In other scenarios 906 * where the SEEK will not land us in the next block, it is very likely better to issues a series 907 * of SKIPs. 
908 * @param cell current cell 909 * @return true means skip to next row, false means not 910 */ 911 protected boolean trySkipToNextRow(ExtendedCell cell) throws IOException { 912 ExtendedCell nextCell = null; 913 // used to guard against a changed next indexed key by doing a identity comparison 914 // when the identity changes we need to compare the bytes again 915 ExtendedCell previousIndexedKey = null; 916 do { 917 ExtendedCell nextIndexedKey = getNextIndexedKey(); 918 if ( 919 nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY 920 && (nextIndexedKey == previousIndexedKey 921 || matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) 922 ) { 923 this.heap.next(); 924 ++kvsScanned; 925 previousIndexedKey = nextIndexedKey; 926 } else { 927 return false; 928 } 929 } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell)); 930 return true; 931 } 932 933 /** 934 * See {@link #trySkipToNextRow(ExtendedCell)} 935 * @param cell current cell 936 * @return true means skip to next column, false means not 937 */ 938 protected boolean trySkipToNextColumn(ExtendedCell cell) throws IOException { 939 ExtendedCell nextCell = null; 940 // used to guard against a changed next indexed key by doing a identity comparison 941 // when the identity changes we need to compare the bytes again 942 ExtendedCell previousIndexedKey = null; 943 do { 944 ExtendedCell nextIndexedKey = getNextIndexedKey(); 945 if ( 946 nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY 947 && (nextIndexedKey == previousIndexedKey 948 || matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) 949 ) { 950 this.heap.next(); 951 ++kvsScanned; 952 previousIndexedKey = nextIndexedKey; 953 } else { 954 return false; 955 } 956 } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell)); 957 // We need this check because it may happen that the new scanner that we get 958 // during heap.next() is 
requiring reseek due of fake KV previously generated for 959 // ROWCOL bloom filter optimization. See HBASE-19863 for more details 960 if ( 961 useRowColBloom && nextCell != null && cell.getTimestamp() == PrivateConstants.OLDEST_TIMESTAMP 962 ) { 963 return false; 964 } 965 return true; 966 } 967 968 @Override 969 public long getReadPoint() { 970 return this.readPt; 971 } 972 973 private static void clearAndClose(List<KeyValueScanner> scanners) { 974 if (scanners == null) { 975 return; 976 } 977 for (KeyValueScanner s : scanners) { 978 s.close(); 979 } 980 scanners.clear(); 981 } 982 983 // Implementation of ChangedReadersObserver 984 @Override 985 public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners) 986 throws IOException { 987 if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) { 988 return; 989 } 990 boolean updateReaders = false; 991 flushLock.lock(); 992 try { 993 if (!closeLock.tryLock()) { 994 // The reason for doing this is that when the current store scanner does not retrieve 995 // any new cells, then the scanner is considered to be done. The heap of this scanner 996 // is not closed till the shipped() call is completed. Hence in that case if at all 997 // the partial close (close (false)) has been called before updateReaders(), there is no 998 // need for the updateReaders() to happen. 999 LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders"); 1000 // no lock acquired. 1001 clearAndClose(memStoreScanners); 1002 return; 1003 } 1004 // lock acquired 1005 updateReaders = true; 1006 if (this.closing) { 1007 LOG.debug("StoreScanner already closing. There is no need to updateReaders"); 1008 clearAndClose(memStoreScanners); 1009 return; 1010 } 1011 flushed = true; 1012 final boolean isCompaction = false; 1013 boolean usePread = get || scanUsePread; 1014 // SEE HBASE-19468 where the flushed files are getting compacted even before a scanner 1015 // calls next(). 
So its better we create scanners here rather than next() call. Ensure 1016 // these scanners are properly closed() whether or not the scan is completed successfully 1017 // Eagerly creating scanners so that we have the ref counting ticking on the newly created 1018 // store files. In case of stream scanners this eager creation does not induce performance 1019 // penalty because in scans (that uses stream scanners) the next() call is bound to happen. 1020 List<KeyValueScanner> scanners = 1021 store.getScanners(sfs, cacheBlocks, get, usePread, isCompaction, matcher, 1022 scan.getStartRow(), scan.getStopRow(), this.readPt, false, isOnlyLatestVersionScan(scan)); 1023 flushedstoreFileScanners.addAll(scanners); 1024 if (!CollectionUtils.isEmpty(memStoreScanners)) { 1025 clearAndClose(memStoreScannersAfterFlush); 1026 memStoreScannersAfterFlush.addAll(memStoreScanners); 1027 } 1028 } finally { 1029 flushLock.unlock(); 1030 if (updateReaders) { 1031 closeLock.unlock(); 1032 } 1033 } 1034 // Let the next() call handle re-creating and seeking 1035 } 1036 1037 /** Returns if top of heap has changed (and KeyValueHeap has to try the next KV) */ 1038 protected final boolean reopenAfterFlush() throws IOException { 1039 // here we can make sure that we have a Store instance so no null check on store. 1040 ExtendedCell lastTop = heap.peek(); 1041 // When we have the scan object, should we not pass it to getScanners() to get a limited set of 1042 // scanners? 
We did so in the constructor and we could have done it now by storing the scan 1043 // object from the constructor 1044 List<KeyValueScanner> scanners; 1045 flushLock.lock(); 1046 try { 1047 List<KeyValueScanner> allScanners = 1048 new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size()); 1049 allScanners.addAll(flushedstoreFileScanners); 1050 allScanners.addAll(memStoreScannersAfterFlush); 1051 scanners = selectScannersFrom(store, allScanners); 1052 // Clear the current set of flushed store files scanners so that they don't get added again 1053 flushedstoreFileScanners.clear(); 1054 memStoreScannersAfterFlush.clear(); 1055 } finally { 1056 flushLock.unlock(); 1057 } 1058 1059 // Seek the new scanners to the last key 1060 seekScanners(scanners, lastTop, false, parallelSeekEnabled); 1061 // remove the older memstore scanner 1062 for (int i = currentScanners.size() - 1; i >= 0; i--) { 1063 if (!currentScanners.get(i).isFileScanner()) { 1064 scannersForDelayedClose.add(currentScanners.remove(i)); 1065 } else { 1066 // we add the memstore scanner to the end of currentScanners 1067 break; 1068 } 1069 } 1070 // add the newly created scanners on the flushed files and the current active memstore scanner 1071 addCurrentScanners(scanners); 1072 // Combine all seeked scanners with a heap 1073 resetKVHeap(this.currentScanners, store.getComparator()); 1074 resetQueryMatcher(lastTop); 1075 if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) { 1076 LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString() 1077 + ",and after = " + heap.peek()); 1078 topChanged = true; 1079 } else { 1080 topChanged = false; 1081 } 1082 return topChanged; 1083 } 1084 1085 private void resetQueryMatcher(ExtendedCell lastTopKey) { 1086 // Reset the state of the Query Matcher and set to top row. 
1087 // Only reset and call setRow if the row changes; avoids confusing the 1088 // query matcher if scanning intra-row. 1089 ExtendedCell cell = heap.peek(); 1090 if (cell == null) { 1091 cell = lastTopKey; 1092 } 1093 if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) { 1094 this.countPerRow = 0; 1095 // The setToNewRow will call reset internally 1096 matcher.setToNewRow(cell); 1097 } 1098 } 1099 1100 /** 1101 * Check whether scan as expected order 1102 */ 1103 protected void checkScanOrder(Cell prevKV, Cell kv, CellComparator comparator) 1104 throws IOException { 1105 // Check that the heap gives us KVs in an increasing order. 1106 assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 1107 : "Key " + prevKV + " followed by a smaller key " + kv + " in cf " + store; 1108 } 1109 1110 protected boolean seekToNextRow(ExtendedCell c) throws IOException { 1111 return reseek(PrivateCellUtil.createLastOnRow(c)); 1112 } 1113 1114 /** 1115 * Do a reseek in a normal StoreScanner(scan forward) 1116 * @return true if scanner has values left, false if end of scanner 1117 */ 1118 protected boolean seekAsDirection(ExtendedCell kv) throws IOException { 1119 return reseek(kv); 1120 } 1121 1122 @Override 1123 public boolean reseek(ExtendedCell kv) throws IOException { 1124 if (checkFlushed()) { 1125 reopenAfterFlush(); 1126 } 1127 if (explicitColumnQuery && lazySeekEnabledGlobally) { 1128 return heap.requestSeek(kv, true, useRowColBloom); 1129 } 1130 return heap.reseek(kv); 1131 } 1132 1133 void trySwitchToStreamRead() { 1134 if ( 1135 readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null 1136 || bytesRead < preadMaxBytes 1137 ) { 1138 return; 1139 } 1140 LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead, 1141 this.store.getColumnFamilyName()); 1142 scanUsePread = false; 1143 ExtendedCell lastTop = heap.peek(); 1144 List<KeyValueScanner> memstoreScanners = new 
ArrayList<>(); 1145 List<KeyValueScanner> scannersToClose = new ArrayList<>(); 1146 for (KeyValueScanner kvs : currentScanners) { 1147 if (!kvs.isFileScanner()) { 1148 // collect memstorescanners here 1149 memstoreScanners.add(kvs); 1150 } else { 1151 scannersToClose.add(kvs); 1152 } 1153 } 1154 List<KeyValueScanner> fileScanners = null; 1155 List<KeyValueScanner> newCurrentScanners; 1156 KeyValueHeap newHeap; 1157 try { 1158 // We must have a store instance here so no null check 1159 // recreate the scanners on the current file scanners 1160 fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, matcher, 1161 scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), 1162 readPt, false); 1163 if (fileScanners == null) { 1164 return; 1165 } 1166 seekScanners(fileScanners, lastTop, false, parallelSeekEnabled); 1167 newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size()); 1168 newCurrentScanners.addAll(fileScanners); 1169 newCurrentScanners.addAll(memstoreScanners); 1170 newHeap = newKVHeap(newCurrentScanners, comparator); 1171 } catch (Exception e) { 1172 LOG.warn("failed to switch to stream read", e); 1173 if (fileScanners != null) { 1174 fileScanners.forEach(KeyValueScanner::close); 1175 } 1176 return; 1177 } 1178 currentScanners.clear(); 1179 addCurrentScanners(newCurrentScanners); 1180 this.heap = newHeap; 1181 resetQueryMatcher(lastTop); 1182 scannersToClose.forEach(KeyValueScanner::close); 1183 } 1184 1185 protected final boolean checkFlushed() { 1186 // check the var without any lock. Suppose even if we see the old 1187 // value here still it is ok to continue because we will not be resetting 1188 // the heap but will continue with the referenced memstore's snapshot. 
For compactions 1189 // any way we don't need the updateReaders at all to happen as we still continue with 1190 // the older files 1191 if (flushed) { 1192 // If there is a flush and the current scan is notified on the flush ensure that the 1193 // scan's heap gets reset and we do a seek on the newly flushed file. 1194 if (this.closing) { 1195 return false; 1196 } 1197 // reset the flag 1198 flushed = false; 1199 return true; 1200 } 1201 return false; 1202 } 1203 1204 /** 1205 * Seek storefiles in parallel to optimize IO latency as much as possible 1206 * @param scanners the list {@link KeyValueScanner}s to be read from 1207 * @param kv the KeyValue on which the operation is being requested 1208 */ 1209 private void parallelSeek(final List<? extends KeyValueScanner> scanners, final ExtendedCell kv) 1210 throws IOException { 1211 if (scanners.isEmpty()) return; 1212 int storeFileScannerCount = scanners.size(); 1213 CountDownLatch latch = new CountDownLatch(storeFileScannerCount); 1214 List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount); 1215 for (KeyValueScanner scanner : scanners) { 1216 if (scanner instanceof StoreFileScanner) { 1217 ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, this.readPt, latch); 1218 executor.submit(seekHandler); 1219 handlers.add(seekHandler); 1220 } else { 1221 scanner.seek(kv); 1222 latch.countDown(); 1223 } 1224 } 1225 1226 try { 1227 latch.await(); 1228 } catch (InterruptedException ie) { 1229 throw (InterruptedIOException) new InterruptedIOException().initCause(ie); 1230 } 1231 1232 for (ParallelSeekHandler handler : handlers) { 1233 if (handler.getErr() != null) { 1234 throw new IOException(handler.getErr()); 1235 } 1236 } 1237 } 1238 1239 /** 1240 * Used in testing. 
1241 * @return all scanners in no particular order 1242 */ 1243 List<KeyValueScanner> getAllScannersForTesting() { 1244 List<KeyValueScanner> allScanners = new ArrayList<>(); 1245 KeyValueScanner current = heap.getCurrentForTesting(); 1246 if (current != null) allScanners.add(current); 1247 for (KeyValueScanner scanner : heap.getHeap()) 1248 allScanners.add(scanner); 1249 return allScanners; 1250 } 1251 1252 static void enableLazySeekGlobally(boolean enable) { 1253 lazySeekEnabledGlobally = enable; 1254 } 1255 1256 /** Returns The estimated number of KVs seen by this scanner (includes some skipped KVs). */ 1257 public long getEstimatedNumberOfKvsScanned() { 1258 return this.kvsScanned; 1259 } 1260 1261 @Override 1262 public ExtendedCell getNextIndexedKey() { 1263 return this.heap.getNextIndexedKey(); 1264 } 1265 1266 @Override 1267 public void shipped() throws IOException { 1268 if (prevCell != null) { 1269 // Do the copy here so that in case the prevCell ref is pointing to the previous 1270 // blocks we can safely release those blocks. 1271 // This applies to blocks that are got from Bucket cache, L1 cache and the blocks 1272 // fetched from HDFS. Copying this would ensure that we let go the references to these 1273 // blocks so that they can be GCed safely(in case of bucket cache) 1274 prevCell = KeyValueUtil.toNewKeyCell(this.prevCell); 1275 } 1276 matcher.beforeShipped(); 1277 // There wont be further fetch of Cells from these scanners. Just close. 1278 clearAndClose(scannersForDelayedClose); 1279 if (this.heap != null) { 1280 this.heap.shipped(); 1281 // When switching from pread to stream, we will open a new scanner for each store file, but 1282 // the old scanner may still track the HFileBlocks we have scanned but not sent back to client 1283 // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others 1284 // before we serialize and send it back to client. 
The HFileBlocks will be released in shipped 1285 // method, so we here will also open new scanners and close old scanners in shipped method. 1286 // See HBASE-18055 for more details. 1287 trySwitchToStreamRead(); 1288 } 1289 } 1290}