001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.NavigableSet; 025import java.util.Optional; 026import java.util.concurrent.CountDownLatch; 027import java.util.concurrent.locks.ReentrantLock; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellComparator; 030import org.apache.hadoop.hbase.CellUtil; 031import org.apache.hadoop.hbase.DoNotRetryIOException; 032import org.apache.hadoop.hbase.KeyValue; 033import org.apache.hadoop.hbase.KeyValueUtil; 034import org.apache.hadoop.hbase.PrivateCellUtil; 035import org.apache.hadoop.hbase.PrivateConstants; 036import org.apache.hadoop.hbase.client.IsolationLevel; 037import org.apache.hadoop.hbase.client.Scan; 038import org.apache.hadoop.hbase.executor.ExecutorService; 039import org.apache.hadoop.hbase.filter.Filter; 040import org.apache.hadoop.hbase.ipc.RpcCall; 041import org.apache.hadoop.hbase.ipc.RpcServer; 042import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 043import 
org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; 044import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler; 045import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher; 046import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 047import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher; 048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 049import org.apache.yetus.audience.InterfaceAudience; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 054import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 055 056/** 057 * Scanner scans both the memstore and the Store. Coalesce KeyValue stream into List<KeyValue> 058 * for a single row. 059 * <p> 060 * The implementation is not thread safe. So there will be no race between next and close. The only 061 * exception is updateReaders, it will be called in the memstore flush thread to indicate that there 062 * is a flush. 
063 */ 064@InterfaceAudience.Private 065public class StoreScanner extends NonReversedNonLazyKeyValueScanner 066 implements KeyValueScanner, InternalScanner, ChangedReadersObserver { 067 private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class); 068 // In unit tests, the store could be null 069 protected final HStore store; 070 private final CellComparator comparator; 071 private ScanQueryMatcher matcher; 072 protected KeyValueHeap heap; 073 private boolean cacheBlocks; 074 075 private long countPerRow = 0; 076 private int storeLimit = -1; 077 private int storeOffset = 0; 078 079 // Used to indicate that the scanner has closed (see HBASE-1107) 080 private volatile boolean closing = false; 081 private final boolean get; 082 private final boolean explicitColumnQuery; 083 private final boolean useRowColBloom; 084 /** 085 * A flag that enables StoreFileScanner parallel-seeking 086 */ 087 private boolean parallelSeekEnabled = false; 088 private ExecutorService executor; 089 private final Scan scan; 090 private final long oldestUnexpiredTS; 091 private final long now; 092 private final int minVersions; 093 private final long maxRowSize; 094 private final long cellsPerHeartbeatCheck; 095 long memstoreOnlyReads; 096 long mixedReads; 097 098 // 1) Collects all the KVHeap that are eagerly getting closed during the 099 // course of a scan 100 // 2) Collects the unused memstore scanners. If we close the memstore scanners 101 // before sending data to client, the chunk may be reclaimed by other 102 // updates and the data will be corrupt. 103 private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>(); 104 105 /** 106 * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not KVs skipped via 107 * seeking to next row/column. TODO: estimate them? 
108 */ 109 private long kvsScanned = 0; 110 private Cell prevCell = null; 111 112 private final long preadMaxBytes; 113 private long bytesRead; 114 115 /** We don't ever expect to change this, the constant is just for clarity. */ 116 static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true; 117 public static final String STORESCANNER_PARALLEL_SEEK_ENABLE = 118 "hbase.storescanner.parallel.seek.enable"; 119 120 /** Used during unit testing to ensure that lazy seek does save seek ops */ 121 private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT; 122 123 /** 124 * The number of cells scanned in between timeout checks. Specifying a larger value means that 125 * timeout checks will occur less frequently. Specifying a small value will lead to more frequent 126 * timeout checks. 127 */ 128 public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 129 "hbase.cells.scanned.per.heartbeat.check"; 130 131 /** 132 * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}. 133 */ 134 public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000; 135 136 /** 137 * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we scanned 138 * reaches this limit, we will reopen the scanner with stream. The default value is 4 times of 139 * block size for this store. If configured with a value <0, for all scans with ReadType DEFAULT, 140 * we will open scanner with stream mode itself. 141 */ 142 public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes"; 143 144 private final Scan.ReadType readType; 145 146 // A flag whether use pread for scan 147 // it maybe changed if we use Scan.ReadType.DEFAULT and we have read lots of data. 
148 private boolean scanUsePread; 149 // Indicates whether there was flush during the course of the scan 150 private volatile boolean flushed = false; 151 // generally we get one file from a flush 152 private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1); 153 // Since CompactingMemstore is now default, we get three memstore scanners from a flush 154 private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3); 155 // The current list of scanners 156 final List<KeyValueScanner> currentScanners = new ArrayList<>(); 157 // flush update lock 158 private final ReentrantLock flushLock = new ReentrantLock(); 159 // lock for closing. 160 private final ReentrantLock closeLock = new ReentrantLock(); 161 162 protected final long readPt; 163 private boolean topChanged = false; 164 165 /** An internal constructor. */ 166 private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt, 167 boolean cacheBlocks, ScanType scanType) { 168 this.readPt = readPt; 169 this.store = store; 170 this.cacheBlocks = cacheBlocks; 171 this.comparator = Preconditions.checkNotNull(scanInfo.getComparator()); 172 get = scan.isGetScan(); 173 explicitColumnQuery = numColumns > 0; 174 this.scan = scan; 175 this.now = EnvironmentEdgeManager.currentTime(); 176 this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl(); 177 this.minVersions = scanInfo.getMinVersions(); 178 179 // We look up row-column Bloom filters for multi-column queries as part of 180 // the seek operation. However, we also look the row-column Bloom filter 181 // for multi-row (non-"get") scans because this is not done in 182 // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>). 
183 this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1) && (store == null 184 || store.getColumnFamilyDescriptor().getBloomFilterType() == BloomType.ROWCOL); 185 this.maxRowSize = scanInfo.getTableMaxRowSize(); 186 this.preadMaxBytes = scanInfo.getPreadMaxBytes(); 187 if (get) { 188 this.readType = Scan.ReadType.PREAD; 189 this.scanUsePread = true; 190 } else if (scanType != ScanType.USER_SCAN) { 191 // For compaction scanners never use Pread as already we have stream based scanners on the 192 // store files to be compacted 193 this.readType = Scan.ReadType.STREAM; 194 this.scanUsePread = false; 195 } else { 196 if (scan.getReadType() == Scan.ReadType.DEFAULT) { 197 if (scanInfo.isUsePread()) { 198 this.readType = Scan.ReadType.PREAD; 199 } else if (this.preadMaxBytes < 0) { 200 this.readType = Scan.ReadType.STREAM; 201 } else { 202 this.readType = Scan.ReadType.DEFAULT; 203 } 204 } else { 205 this.readType = scan.getReadType(); 206 } 207 // Always start with pread unless user specific stream. Will change to stream later if 208 // readType is default if the scan keeps running for a long time. 209 this.scanUsePread = this.readType != Scan.ReadType.STREAM; 210 } 211 this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck(); 212 // Parallel seeking is on if the config allows and more there is more than one store file. 213 if (store != null && store.getStorefilesCount() > 1) { 214 RegionServerServices rsService = store.getHRegion().getRegionServerServices(); 215 if (rsService != null && scanInfo.isParallelSeekEnabled()) { 216 this.parallelSeekEnabled = true; 217 this.executor = rsService.getExecutorService(); 218 } 219 } 220 } 221 222 private void addCurrentScanners(List<? extends KeyValueScanner> scanners) { 223 this.currentScanners.addAll(scanners); 224 } 225 226 /** 227 * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we are not in a 228 * compaction. 
229 * @param store who we scan 230 * @param scan the spec 231 * @param columns which columns we are scanning 232 */ 233 public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns, 234 long readPt) throws IOException { 235 this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt, scan.getCacheBlocks(), 236 ScanType.USER_SCAN); 237 if (columns != null && scan.isRaw()) { 238 throw new DoNotRetryIOException("Cannot specify any column for a raw scan"); 239 } 240 matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, 241 store.getCoprocessorHost()); 242 243 store.addChangedReaderObserver(this); 244 245 List<KeyValueScanner> scanners = null; 246 try { 247 // Pass columns to try to filter out unnecessary StoreFiles. 248 scanners = selectScannersFrom(store, 249 store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(), 250 scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt)); 251 252 // Seek all scanners to the start of the Row (or if the exact matching row 253 // key does not exist, then to the start of the next matching Row). 254 // Always check bloom filter to optimize the top row seek for delete 255 // family marker. 256 seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally, 257 parallelSeekEnabled); 258 259 // set storeLimit 260 this.storeLimit = scan.getMaxResultsPerColumnFamily(); 261 262 // set rowOffset 263 this.storeOffset = scan.getRowOffsetPerColumnFamily(); 264 addCurrentScanners(scanners); 265 // Combine all seeked scanners with a heap 266 resetKVHeap(scanners, comparator); 267 } catch (IOException e) { 268 clearAndClose(scanners); 269 // remove us from the HStore#changedReaderObservers here or we'll have no chance to 270 // and might cause memory leak 271 store.deleteChangedReaderObserver(this); 272 throw e; 273 } 274 } 275 276 // a dummy scan instance for compaction. 
277 private static final Scan SCAN_FOR_COMPACTION = new Scan(); 278 279 /** 280 * Used for store file compaction and memstore compaction. 281 * <p> 282 * Opens a scanner across specified StoreFiles/MemStoreSegments. 283 * @param store who we scan 284 * @param scanners ancillary scanners 285 * @param smallestReadPoint the readPoint that we should use for tracking versions 286 */ 287 public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners, 288 ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { 289 this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null); 290 } 291 292 /** 293 * Used for compactions that drop deletes from a limited range of rows. 294 * <p> 295 * Opens a scanner across specified StoreFiles. 296 * @param store who we scan 297 * @param scanners ancillary scanners 298 * @param smallestReadPoint the readPoint that we should use for tracking versions 299 * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW. 300 * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW. 301 */ 302 public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners, 303 long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) 304 throws IOException { 305 this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, 306 earliestPutTs, dropDeletesFromRow, dropDeletesToRow); 307 } 308 309 private StoreScanner(HStore store, ScanInfo scanInfo, List<? 
extends KeyValueScanner> scanners, 310 ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, 311 byte[] dropDeletesToRow) throws IOException { 312 this(store, SCAN_FOR_COMPACTION, scanInfo, 0, 313 store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType); 314 assert scanType != ScanType.USER_SCAN; 315 matcher = 316 CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs, 317 oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost()); 318 319 // Filter the list of scanners using Bloom filters, time range, TTL, etc. 320 scanners = selectScannersFrom(store, scanners); 321 322 // Seek all scanners to the initial key 323 seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); 324 addCurrentScanners(scanners); 325 // Combine all seeked scanners with a heap 326 resetKVHeap(scanners, comparator); 327 } 328 329 private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners) 330 throws IOException { 331 // Seek all scanners to the initial key 332 seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); 333 addCurrentScanners(scanners); 334 resetKVHeap(scanners, comparator); 335 } 336 337 // For mob compaction only as we do not have a Store instance when doing mob compaction. 338 public StoreScanner(ScanInfo scanInfo, ScanType scanType, 339 List<? extends KeyValueScanner> scanners) throws IOException { 340 this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType); 341 assert scanType != ScanType.USER_SCAN; 342 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L, 343 oldestUnexpiredTS, now, null, null, null); 344 seekAllScanner(scanInfo, scanners); 345 } 346 347 // Used to instantiate a scanner for user scan in test 348 StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, 349 List<? 
extends KeyValueScanner> scanners, ScanType scanType) throws IOException { 350 // 0 is passed as readpoint because the test bypasses Store 351 this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(), 352 scanType); 353 if (scanType == ScanType.USER_SCAN) { 354 this.matcher = 355 UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null); 356 } else { 357 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 358 PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null); 359 } 360 seekAllScanner(scanInfo, scanners); 361 } 362 363 // Used to instantiate a scanner for user scan in test 364 StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, 365 List<? extends KeyValueScanner> scanners) throws IOException { 366 // 0 is passed as readpoint because the test bypasses Store 367 this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(), 368 ScanType.USER_SCAN); 369 this.matcher = 370 UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null); 371 seekAllScanner(scanInfo, scanners); 372 } 373 374 // Used to instantiate a scanner for compaction in test 375 StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType, 376 List<? extends KeyValueScanner> scanners) throws IOException { 377 // 0 is passed as readpoint because the test bypasses Store 378 this(null, maxVersions > 0 ? 
new Scan().readVersions(maxVersions) : SCAN_FOR_COMPACTION, 379 scanInfo, 0, 0L, false, scanType); 380 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 381 PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null); 382 seekAllScanner(scanInfo, scanners); 383 } 384 385 boolean isScanUsePread() { 386 return this.scanUsePread; 387 } 388 389 /** 390 * Seek the specified scanners with the given key 391 * @param isLazy true if using lazy seek 392 * @param isParallelSeek true if using parallel seek 393 */ 394 protected void seekScanners(List<? extends KeyValueScanner> scanners, Cell seekKey, 395 boolean isLazy, boolean isParallelSeek) throws IOException { 396 // Seek all scanners to the start of the Row (or if the exact matching row 397 // key does not exist, then to the start of the next matching Row). 398 // Always check bloom filter to optimize the top row seek for delete 399 // family marker. 400 if (isLazy) { 401 for (KeyValueScanner scanner : scanners) { 402 scanner.requestSeek(seekKey, false, true); 403 } 404 } else { 405 if (!isParallelSeek) { 406 long totalScannersSoughtBytes = 0; 407 for (KeyValueScanner scanner : scanners) { 408 if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) { 409 throw new RowTooBigException( 410 "Max row size allowed: " + maxRowSize + ", but row is bigger than that"); 411 } 412 scanner.seek(seekKey); 413 Cell c = scanner.peek(); 414 if (c != null) { 415 totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c); 416 } 417 } 418 } else { 419 parallelSeek(scanners, seekKey); 420 } 421 } 422 } 423 424 protected void resetKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator) 425 throws IOException { 426 // Combine all seeked scanners with a heap 427 heap = newKVHeap(scanners, comparator); 428 } 429 430 protected KeyValueHeap newKVHeap(List<? 
extends KeyValueScanner> scanners, 431 CellComparator comparator) throws IOException { 432 return new KeyValueHeap(scanners, comparator); 433 } 434 435 /** 436 * Filters the given list of scanners using Bloom filter, time range, and TTL. 437 * <p> 438 * Will be overridden by testcase so declared as protected. 439 */ 440 protected List<KeyValueScanner> selectScannersFrom(HStore store, 441 List<? extends KeyValueScanner> allScanners) { 442 boolean memOnly; 443 boolean filesOnly; 444 if (scan instanceof InternalScan) { 445 InternalScan iscan = (InternalScan) scan; 446 memOnly = iscan.isCheckOnlyMemStore(); 447 filesOnly = iscan.isCheckOnlyStoreFiles(); 448 } else { 449 memOnly = false; 450 filesOnly = false; 451 } 452 453 List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size()); 454 455 // We can only exclude store files based on TTL if minVersions is set to 0. 456 // Otherwise, we might have to return KVs that have technically expired. 457 long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE; 458 459 // include only those scan files which pass all filters 460 for (KeyValueScanner kvs : allScanners) { 461 boolean isFile = kvs.isFileScanner(); 462 if ((!isFile && filesOnly) || (isFile && memOnly)) { 463 kvs.close(); 464 continue; 465 } 466 467 if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) { 468 scanners.add(kvs); 469 } else { 470 kvs.close(); 471 } 472 } 473 return scanners; 474 } 475 476 @Override 477 public Cell peek() { 478 return heap != null ? heap.peek() : null; 479 } 480 481 @Override 482 public KeyValue next() { 483 // throw runtime exception perhaps? 484 throw new RuntimeException("Never call StoreScanner.next()"); 485 } 486 487 @Override 488 public void close() { 489 close(true); 490 } 491 492 private void close(boolean withDelayedScannersClose) { 493 closeLock.lock(); 494 // If the closeLock is acquired then any subsequent updateReaders() 495 // call is ignored. 
496 try { 497 if (this.closing) { 498 return; 499 } 500 if (withDelayedScannersClose) { 501 this.closing = true; 502 } 503 // For mob compaction, we do not have a store. 504 if (this.store != null) { 505 this.store.deleteChangedReaderObserver(this); 506 } 507 if (withDelayedScannersClose) { 508 clearAndClose(scannersForDelayedClose); 509 clearAndClose(memStoreScannersAfterFlush); 510 clearAndClose(flushedstoreFileScanners); 511 if (this.heap != null) { 512 this.heap.close(); 513 this.currentScanners.clear(); 514 this.heap = null; // CLOSED! 515 } 516 } else { 517 if (this.heap != null) { 518 this.scannersForDelayedClose.add(this.heap); 519 this.currentScanners.clear(); 520 this.heap = null; 521 } 522 } 523 } finally { 524 closeLock.unlock(); 525 } 526 } 527 528 @Override 529 public boolean seek(Cell key) throws IOException { 530 if (checkFlushed()) { 531 reopenAfterFlush(); 532 } 533 return this.heap.seek(key); 534 } 535 536 /** 537 * Get the next row of values from this Store. 538 * @return true if there are more rows, false if scanner is done 539 */ 540 @Override 541 public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException { 542 if (scannerContext == null) { 543 throw new IllegalArgumentException("Scanner context cannot be null"); 544 } 545 if (checkFlushed() && reopenAfterFlush()) { 546 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 547 } 548 549 // if the heap was left null, then the scanners had previously run out anyways, close and 550 // return. 
551 if (this.heap == null) { 552 // By this time partial close should happened because already heap is null 553 close(false);// Do all cleanup except heap.close() 554 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 555 } 556 557 Cell cell = this.heap.peek(); 558 if (cell == null) { 559 close(false);// Do all cleanup except heap.close() 560 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 561 } 562 563 // only call setRow if the row changes; avoids confusing the query matcher 564 // if scanning intra-row 565 566 // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing 567 // rows. Else it is possible we are still traversing the same row so we must perform the row 568 // comparison. 569 if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) { 570 this.countPerRow = 0; 571 matcher.setToNewRow(cell); 572 } 573 574 // Clear progress away unless invoker has indicated it should be kept. 575 if (!scannerContext.getKeepProgress() && !scannerContext.getSkippingRow()) { 576 scannerContext.clearProgress(); 577 } 578 579 Optional<RpcCall> rpcCall = 580 matcher.isUserScan() ? RpcServer.getCurrentCall() : Optional.empty(); 581 582 int count = 0; 583 long totalBytesRead = 0; 584 // track the cells for metrics only if it is a user read request. 585 boolean onlyFromMemstore = matcher.isUserScan(); 586 try { 587 LOOP: do { 588 // Update and check the time limit based on the configured value of cellsPerTimeoutCheck 589 // Or if the preadMaxBytes is reached and we may want to return so we can switch to stream 590 // in 591 // the shipped method below. 
592 if ( 593 kvsScanned % cellsPerHeartbeatCheck == 0 594 || (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) 595 ) { 596 if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { 597 return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); 598 } 599 } 600 // Do object compare - we set prevKV from the same heap. 601 if (prevCell != cell) { 602 ++kvsScanned; 603 } 604 checkScanOrder(prevCell, cell, comparator); 605 int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell); 606 bytesRead += cellSize; 607 if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) { 608 // return immediately if we want to switch from pread to stream. We need this because we 609 // can 610 // only switch in the shipped method, if user use a filter to filter out everything and 611 // rpc 612 // timeout is very large then the shipped method will never be called until the whole scan 613 // is finished, but at that time we have already scan all the data... 614 // See HBASE-20457 for more details. 615 // And there is still a scenario that can not be handled. If we have a very large row, 616 // which 617 // have millions of qualifiers, and filter.filterRow is used, then even if we set the flag 618 // here, we still need to scan all the qualifiers before returning... 
619 scannerContext.returnImmediately(); 620 } 621 622 heap.recordBlockSize(blockSize -> { 623 if (rpcCall.isPresent()) { 624 rpcCall.get().incrementBlockBytesScanned(blockSize); 625 } 626 scannerContext.incrementBlockProgress(blockSize); 627 }); 628 629 prevCell = cell; 630 scannerContext.setLastPeekedCell(cell); 631 topChanged = false; 632 ScanQueryMatcher.MatchCode qcode = matcher.match(cell); 633 switch (qcode) { 634 case INCLUDE: 635 case INCLUDE_AND_SEEK_NEXT_ROW: 636 case INCLUDE_AND_SEEK_NEXT_COL: 637 Filter f = matcher.getFilter(); 638 if (f != null) { 639 cell = f.transformCell(cell); 640 } 641 this.countPerRow++; 642 643 // add to results only if we have skipped #storeOffset kvs 644 // also update metric accordingly 645 if (this.countPerRow > storeOffset) { 646 outResult.add(cell); 647 648 // Update local tracking information 649 count++; 650 totalBytesRead += cellSize; 651 652 /** 653 * Increment the metric if all the cells are from memstore. If not we will account it 654 * for mixed reads 655 */ 656 onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore(); 657 // Update the progress of the scanner context 658 scannerContext.incrementSizeProgress(cellSize, cell.heapSize()); 659 scannerContext.incrementBatchProgress(1); 660 661 if (matcher.isUserScan() && totalBytesRead > maxRowSize) { 662 String message = "Max row size allowed: " + maxRowSize 663 + ", but the row is bigger than that, the row info: " 664 + CellUtil.toString(cell, false) + ", already have process row cells = " 665 + outResult.size() + ", it belong to region = " 666 + store.getHRegion().getRegionInfo().getRegionNameAsString(); 667 LOG.warn(message); 668 throw new RowTooBigException(message); 669 } 670 671 if (storeLimit > -1 && this.countPerRow >= (storeLimit + storeOffset)) { 672 // do what SEEK_NEXT_ROW does. 
673 if (!matcher.moreRowsMayExistAfter(cell)) { 674 close(false);// Do all cleanup except heap.close() 675 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 676 } 677 matcher.clearCurrentRow(); 678 seekToNextRow(cell); 679 break LOOP; 680 } 681 } 682 683 if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { 684 if (!matcher.moreRowsMayExistAfter(cell)) { 685 close(false);// Do all cleanup except heap.close() 686 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 687 } 688 matcher.clearCurrentRow(); 689 seekOrSkipToNextRow(cell); 690 } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { 691 seekOrSkipToNextColumn(cell); 692 } else { 693 this.heap.next(); 694 } 695 696 if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) { 697 break LOOP; 698 } 699 if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { 700 break LOOP; 701 } 702 continue; 703 704 case DONE: 705 // Optimization for Gets! If DONE, no more to get on this row, early exit! 706 if (get) { 707 // Then no more to this row... exit. 708 close(false);// Do all cleanup except heap.close() 709 // update metric 710 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 711 } 712 matcher.clearCurrentRow(); 713 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 714 715 case DONE_SCAN: 716 close(false);// Do all cleanup except heap.close() 717 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 718 719 case SEEK_NEXT_ROW: 720 // This is just a relatively simple end of scan fix, to short-cut end 721 // us if there is an endKey in the scan. 
722 if (!matcher.moreRowsMayExistAfter(cell)) { 723 close(false);// Do all cleanup except heap.close() 724 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 725 } 726 matcher.clearCurrentRow(); 727 seekOrSkipToNextRow(cell); 728 NextState stateAfterSeekNextRow = needToReturn(outResult); 729 if (stateAfterSeekNextRow != null) { 730 return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues(); 731 } 732 break; 733 734 case SEEK_NEXT_COL: 735 seekOrSkipToNextColumn(cell); 736 NextState stateAfterSeekNextColumn = needToReturn(outResult); 737 if (stateAfterSeekNextColumn != null) { 738 return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues(); 739 } 740 break; 741 742 case SKIP: 743 this.heap.next(); 744 break; 745 746 case SEEK_NEXT_USING_HINT: 747 Cell nextKV = matcher.getNextKeyHint(cell); 748 if (nextKV != null) { 749 int difference = comparator.compare(nextKV, cell); 750 if ( 751 ((!scan.isReversed() && difference > 0) || (scan.isReversed() && difference < 0)) 752 ) { 753 seekAsDirection(nextKV); 754 NextState stateAfterSeekByHint = needToReturn(outResult); 755 if (stateAfterSeekByHint != null) { 756 return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues(); 757 } 758 break; 759 } 760 } 761 heap.next(); 762 break; 763 764 default: 765 throw new RuntimeException("UNEXPECTED"); 766 } 767 768 // One last chance to break due to size limit. The INCLUDE* cases above already check 769 // limit and continue. For the various filtered cases, we need to check because block 770 // size limit may have been exceeded even if we don't add cells to result list. 
if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
        }
      } while ((cell = this.heap.peek()) != null);

      if (count > 0) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // No more keys
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    } finally {
      // increment only if we have some result
      if (count > 0 && matcher.isUserScan()) {
        // if true increment memstore metrics, if not the mixed one
        updateMetricsStore(onlyFromMemstore);
      }
    }
  }

  /**
   * Counts one read that produced results, classified as memstore-only or mixed.
   * <p>
   * When a {@code store} is attached the count is delegated to
   * {@code store.updateMetricsStore}; otherwise (the store-less path used by tests) the local
   * {@code memstoreOnlyReads} / {@code mixedReads} counters are incremented.
   * @param memstoreRead true to count a memstore-only read, false to count a mixed read
   */
  private void updateMetricsStore(boolean memstoreRead) {
    if (store != null) {
      store.updateMetricsStore(memstoreRead);
    } else {
      // for testing.
      if (memstoreRead) {
        memstoreOnlyReads++;
      } else {
        mixedReads++;
      }
    }
  }

  /**
   * If the top cell won't be flushed to disk, the top cell may change after #reopenAfterFlush.
   * This is because the older top cell only exists in the memstore scanner, and that memstore
   * scanner is replaced by an hfile scanner in #reopenAfterFlush. If the row of the top cell has
   * changed, we should return the cells collected so far; otherwise we may return cells that span
   * different rows.
   * @param outResult the cells which are visible for user scan
   * @return null if {@code outResult} is empty or the top cell did not change ({@code topChanged}
   *         is false). Otherwise the NextState to return: NO_MORE_VALUES if the heap is now empty,
   *         MORE_VALUES if it is not.
   */
  private NextState needToReturn(List<Cell> outResult) {
    if (!outResult.isEmpty() && topChanged) {
      return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
    }
    return null;
  }

  /**
   * Moves past the remainder of the row containing {@code cell}. It first tries to SKIP
   * cell-by-cell via {@link #trySkipToNextRow(Cell)}; if that declines (or this is a Get), it falls
   * back to {@link #seekToNextRow(Cell)}.
   * @param cell a cell on the row to leave
   */
  private void seekOrSkipToNextRow(Cell cell) throws IOException {
    // If it is a Get Scan, then we know that we are done with this row; there are no more
    // rows beyond the current one: don't try to optimize.
    if (!get) {
      if (trySkipToNextRow(cell)) {
        return;
      }
    }
    seekToNextRow(cell);
  }

  /**
   * Moves past the remainder of the column containing {@code cell}. It first tries to SKIP
   * cell-by-cell via {@link #trySkipToNextColumn(Cell)}; if that declines, it seeks (in scan
   * direction) to the key the matcher supplies for the next column.
   * @param cell a cell in the column to leave
   */
  private void seekOrSkipToNextColumn(Cell cell) throws IOException {
    if (!trySkipToNextColumn(cell)) {
      seekAsDirection(matcher.getKeyForNextColumn(cell));
    }
  }

  /**
   * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).
   * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row, or seek to an
   * arbitrary seek key. This method decides whether a seek is the most efficient _actual_ way to
   * get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, SKIP inside the
   * current, loaded block). It does this by looking at the next indexed key of the current HFile.
   * This key is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible
   * key on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work
   * with the current Cell but compare as though it were a seek key; see down in
   * matcher.compareKeyForNextRow, etc). If the compare gets us onto the next block we *_SEEK,
   * otherwise we just SKIP to the next requested cell.
   * <p>
   * Other notes:
   * <ul>
   * <li>Rows can straddle block boundaries</li>
   * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a
   * different block than column C1 at T2)</li>
   * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a few
   * SKIPs...</li>
   * <li>We want to SEEK when the chance is high that we'll be able to seek past many Cells,
   * especially if we know we need to go to the next block.</li>
   * </ul>
   * <p>
   * A good proxy (best effort) to determine whether SKIP is better than SEEK is whether we'll
   * likely end up seeking to the next block (or past the next block) to get our next column.
   * Example:
   *
   * <pre>
   * |    BLOCK 1              |     BLOCK 2                   |
   * |  r1/c1, r1/c2, r1/c3    |    r1/c4, r1/c5, r2/c1        |
   *                                   ^         ^
   *                                   |         |
   *                           Next Index Key   SEEK_NEXT_ROW (before r2/c1)
   *
   *
   * |    BLOCK 1                       |     BLOCK 2                      |
   * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |    r1/c1/t2, r1/c1/T1, r1/c2/T3  |
   *                                            ^              ^
   *                                            |              |
   *                                    Next Index Key        SEEK_NEXT_COL
   * </pre>
   *
   * Now imagine we want columns c1 and c3 (see the first diagram above). The 'Next Index Key' of
   * r1/c4 is > r1/c3, so we should seek to get to c1 on the next row, r2. In the second case, say
   * we only want one version of c1; after we have it, a SEEK_COL will be issued to get to c2.
   * Looking at the 'Next Index Key', it would land us in the next block, so we should SEEK. In
   * other scenarios where the SEEK will not land us in the next block, it is very likely better to
   * issue a series of SKIPs.
   * <p>
   * Note that when this returns false the heap may already have been advanced by some SKIPs; the
   * caller then SEEKs from that position.
   * @param cell current cell
   * @return true means skip to next row, false means not
   */
  protected boolean trySkipToNextRow(Cell cell) throws IOException {
    Cell nextCell = null;
    // used to guard against a changed next indexed key by doing an identity comparison
    // when the identity changes we need to compare the bytes again
    Cell previousIndexedKey = null;
    do {
      Cell nextIndexedKey = getNextIndexedKey();
      // SKIP only while the next indexed key sorts at or after the "next row" seek key for
      // this cell, i.e. the seek target is not past the current block; otherwise give up and
      // let the caller SEEK.
      if (
        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
          && (nextIndexedKey == previousIndexedKey
            || matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0)
      ) {
        this.heap.next();
        ++kvsScanned;
        previousIndexedKey = nextIndexedKey;
      } else {
        return false;
      }
    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell));
    return true;
  }

  /**
   * Column-level counterpart of
   * {@link org.apache.hadoop.hbase.regionserver.StoreScanner#trySkipToNextRow(Cell)}: SKIPs while
   * the next indexed key is at or after the "next column" seek key and the heap top is still in
   * the same row and column as {@code cell}.
   * @param cell current cell
   * @return true means skip to next column, false means not
   */
  protected boolean trySkipToNextColumn(Cell cell) throws IOException {
    Cell nextCell = null;
    // used to guard against a changed next indexed key by doing an identity comparison
    // when the identity changes we need to compare the bytes again
    Cell previousIndexedKey = null;
    do {
      Cell nextIndexedKey = getNextIndexedKey();
      if (
        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
          && (nextIndexedKey == previousIndexedKey
            || matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0)
      ) {
        this.heap.next();
        ++kvsScanned;
        previousIndexedKey = nextIndexedKey;
      } else {
        return false;
      }
    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell));
    // We need this check because it may happen that the new scanner that we get
    // during heap.next() is requiring reseek due of fake KV previously generated for
    // ROWCOL bloom filter optimization. See HBASE-19863 for more details
    if (
      useRowColBloom && nextCell != null && cell.getTimestamp() == PrivateConstants.OLDEST_TIMESTAMP
    ) {
      return false;
    }
    return true;
  }

  /** Returns {@code readPt}, the read point this scanner passes to the scanners it creates. */
  @Override
  public long getReadPoint() {
    return this.readPt;
  }

  /**
   * Closes every scanner in {@code scanners} and then clears the list. A null list is ignored.
   */
  private static void clearAndClose(List<KeyValueScanner> scanners) {
    if (scanners == null) {
      return;
    }
    for (KeyValueScanner s : scanners) {
      s.close();
    }
    scanners.clear();
  }

  /**
   * Notifies this scanner that the store's readers changed, e.g. from the memstore flush thread
   * after a flush.
   * <p>
   * Scanners over {@code sfs} are created eagerly here and added to
   * {@code flushedstoreFileScanners}. A non-empty {@code memStoreScanners} replaces (after
   * closing) the contents of {@code memStoreScannersAfterFlush}. {@code flushed} is then set, so
   * that a later {@link #checkFlushed()} triggers {@link #reopenAfterFlush()} to swap them in.
   * <p>
   * Nothing is recorded if the close lock cannot be acquired or {@code closing} is already set.
   * In that case the supplied memstore scanners are closed and the list is cleared.
   * <p>
   * The call is a no-op when both arguments are null or empty.
   * @param sfs the newly flushed store files
   * @param memStoreScanners memstore scanners to use after the flush
   */
  // Implementation of ChangedReadersObserver
  @Override
  public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners)
    throws IOException {
    if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) {
      return;
    }
    // true once closeLock has been acquired, so the finally block knows whether to release it
    boolean updateReaders = false;
    flushLock.lock();
    try {
      if (!closeLock.tryLock()) {
        // The reason for doing this is that when the current store scanner does not retrieve
        // any new cells, then the scanner is considered to be done. The heap of this scanner
        // is not closed till the shipped() call is completed. Hence in that case if at all
        // the partial close (close (false)) has been called before updateReaders(), there is no
        // need for the updateReaders() to happen.
        LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
        // no lock acquired.
        clearAndClose(memStoreScanners);
        return;
      }
      // lock acquired
      updateReaders = true;
      if (this.closing) {
        LOG.debug("StoreScanner already closing. There is no need to updateReaders");
        clearAndClose(memStoreScanners);
        return;
      }
      flushed = true;
      final boolean isCompaction = false;
      boolean usePread = get || scanUsePread;
      // SEE HBASE-19468 where the flushed files are getting compacted even before a scanner
      // calls next(). So its better we create scanners here rather than next() call. Ensure
      // these scanners are properly closed() whether or not the scan is completed successfully
      // Eagerly creating scanners so that we have the ref counting ticking on the newly created
      // store files. In case of stream scanners this eager creation does not induce performance
      // penalty because in scans (that uses stream scanners) the next() call is bound to happen.
      List<KeyValueScanner> scanners = store.getScanners(sfs, cacheBlocks, get, usePread,
        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false);
      flushedstoreFileScanners.addAll(scanners);
      if (!CollectionUtils.isEmpty(memStoreScanners)) {
        clearAndClose(memStoreScannersAfterFlush);
        memStoreScannersAfterFlush.addAll(memStoreScanners);
      }
    } finally {
      flushLock.unlock();
      if (updateReaders) {
        closeLock.unlock();
      }
    }
    // Let the next() call handle re-creating and seeking
  }

  /**
   * Swaps in the scanners recorded by {@link #updateReaders(List, List)}. Steps:
   * <ol>
   * <li>Seek them to the current heap top.</li>
   * <li>Move the trailing non-file (memstore) scanners of {@code currentScanners} to
   * {@code scannersForDelayedClose}; they are closed later in {@link #shipped()}.</li>
   * <li>Rebuild the heap and reset the query matcher.</li>
   * </ol>
   * @return true if the top of the heap moved to a different row or the heap became empty (also
   *         stored in {@code topChanged}); false otherwise
   */
  protected final boolean reopenAfterFlush() throws IOException {
    // here we can make sure that we have a Store instance so no null check on store.
    Cell lastTop = heap.peek();
    // When we have the scan object, should we not pass it to getScanners() to get a limited set of
    // scanners? We did so in the constructor and we could have done it now by storing the scan
    // object from the constructor
    List<KeyValueScanner> scanners;
    // flushLock guards the lists filled by updateReaders()
    flushLock.lock();
    try {
      List<KeyValueScanner> allScanners =
        new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());
      allScanners.addAll(flushedstoreFileScanners);
      allScanners.addAll(memStoreScannersAfterFlush);
      scanners = selectScannersFrom(store, allScanners);
      // Clear the current set of flushed store files scanners so that they don't get added again
      flushedstoreFileScanners.clear();
      memStoreScannersAfterFlush.clear();
    } finally {
      flushLock.unlock();
    }

    // Seek the new scanners to the last key
    seekScanners(scanners, lastTop, false, parallelSeekEnabled);
    // remove the older memstore scanner
    for (int i = currentScanners.size() - 1; i >= 0; i--) {
      if (!currentScanners.get(i).isFileScanner()) {
        scannersForDelayedClose.add(currentScanners.remove(i));
      } else {
        // we add the memstore scanner to the end of currentScanners
        break;
      }
    }
    // add the newly created scanners on the flushed files and the current active memstore scanner
    addCurrentScanners(scanners);
    // Combine all seeked scanners with a heap
    resetKVHeap(this.currentScanners, store.getComparator());
    resetQueryMatcher(lastTop);
    // NOTE(review): lastTop is taken from heap.peek() without a null check. If it was null and
    // the rebuilt heap is also empty, lastTop.toString() below would throw NPE. Confirm that
    // callers guarantee a non-empty heap on entry.
    if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
      LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString()
        + ",and after = " + heap.peek());
      topChanged = true;
    } else {
      topChanged = false;
    }
    return topChanged;
  }

  /**
   * Points the query matcher at the row of the current heap top, falling back to
   * {@code lastTopKey} when the heap is empty. {@code countPerRow} is zeroed and the matcher is
   * moved to the new row only when that row differs from the matcher's current row (or the
   * matcher has none).
   * @param lastTopKey the heap top captured before the heap was rebuilt
   */
  private void resetQueryMatcher(Cell lastTopKey) {
    // Reset the state of the Query Matcher and set to top row.
    // Only reset and call setRow if the row changes; avoids confusing the
    // query matcher if scanning intra-row.
    Cell cell = heap.peek();
    if (cell == null) {
      cell = lastTopKey;
    }
    if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {
      this.countPerRow = 0;
      // The setToNewRow will call reset internally
      matcher.setToNewRow(cell);
    }
  }

  /**
   * Checks that the scan delivers cells in the expected order. When Java assertions are enabled,
   * asserts that {@code kv} does not sort before {@code prevKV} under {@code comparator}. The check
   * is skipped if either {@code prevKV} or {@code comparator} is null.
   */
  protected void checkScanOrder(Cell prevKV, Cell kv, CellComparator comparator)
    throws IOException {
    // Check that the heap gives us KVs in an increasing order.
    assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0
      : "Key " + prevKV + " followed by a smaller key " + kv + " in cf " + store;
  }

  /**
   * Reseeks past the rest of the row of {@code c} by reseeking to the last possible cell on that
   * row.
   * @param c a cell on the row to leave
   * @return the result of {@link #reseek(Cell)}
   */
  protected boolean seekToNextRow(Cell c) throws IOException {
    return reseek(PrivateCellUtil.createLastOnRow(c));
  }

  /**
   * Do a reseek in a normal StoreScanner (scan forward).
   * @return true if scanner has values left, false if end of scanner
   */
  protected boolean seekAsDirection(Cell kv) throws IOException {
    return reseek(kv);
  }

  /**
   * Reseeks the heap to {@code kv}. If a flush was recorded (see {@link #checkFlushed()}), the
   * post-flush scanners are swapped in first. With an explicit column query and lazy seek enabled
   * globally, this issues {@code heap.requestSeek} (passing {@code useRowColBloom}); otherwise it
   * issues {@code heap.reseek}.
   */
  @Override
  public boolean reseek(Cell kv) throws IOException {
    if (checkFlushed()) {
      reopenAfterFlush();
    }
    if (explicitColumnQuery && lazySeekEnabledGlobally) {
      return heap.requestSeek(kv, true, useRowColBloom);
    }
    return heap.reseek(kv);
  }

  /**
   * Switches file reading from pread to stream read. The switch happens only when all of these
   * hold:
   * <ul>
   * <li>the read type is DEFAULT;</li>
   * <li>this scanner currently uses pread;</li>
   * <li>it is not closing;</li>
   * <li>the heap is non-empty;</li>
   * <li>at least {@code preadMaxBytes} have been read.</li>
   * </ul>
   * <p>
   * File scanners are recreated, seeked to the current top and combined with the existing
   * memstore scanners in a new heap. The old file scanners are closed only after the new heap is
   * installed. If recreation returns null or throws, the current heap is kept and any new file
   * scanners are closed.
   * <p>
   * NOTE: {@code scanUsePread} is cleared before the attempt, so a skipped or failed switch is not
   * retried.
   */
  void trySwitchToStreamRead() {
    if (
      readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null
        || bytesRead < preadMaxBytes
    ) {
      return;
    }
    LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead,
      this.store.getColumnFamilyName());
    scanUsePread = false;
    Cell lastTop = heap.peek();
    List<KeyValueScanner> memstoreScanners = new ArrayList<>();
    List<KeyValueScanner> scannersToClose = new ArrayList<>();
    for (KeyValueScanner kvs : currentScanners) {
      if (!kvs.isFileScanner()) {
        // collect memstorescanners here
        memstoreScanners.add(kvs);
      } else {
        scannersToClose.add(kvs);
      }
    }
    List<KeyValueScanner> fileScanners = null;
    List<KeyValueScanner> newCurrentScanners;
    KeyValueHeap newHeap;
    try {
      // We must have a store instance here so no null check
      // recreate the scanners on the current file scanners
      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, matcher,
        scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
        readPt, false);
      if (fileScanners == null) {
        return;
      }
      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
      newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
      newCurrentScanners.addAll(fileScanners);
      newCurrentScanners.addAll(memstoreScanners);
      newHeap = newKVHeap(newCurrentScanners, comparator);
    } catch (Exception e) {
      LOG.warn("failed to switch to stream read", e);
      if (fileScanners != null) {
        fileScanners.forEach(KeyValueScanner::close);
      }
      return;
    }
    currentScanners.clear();
    addCurrentScanners(newCurrentScanners);
    this.heap = newHeap;
    resetQueryMatcher(lastTop);
    scannersToClose.forEach(KeyValueScanner::close);
  }

  /**
   * Consumes the {@code flushed} flag set by {@link #updateReaders(List, List)}.
   * @return true if a flush was recorded and the scanner is not closing (the flag is then
   *         cleared); false otherwise (the flag is left untouched while closing)
   */
  protected final boolean checkFlushed() {
    // check the var without any lock. Suppose even if we see the old
    // value here still it is ok to continue because we will not be resetting
    // the heap but will continue with the referenced memstore's snapshot. For compactions
    // any way we don't need the updateReaders at all to happen as we still continue with
    // the older files
    if (flushed) {
      // If there is a flush and the current scan is notified on the flush ensure that the
      // scan's heap gets reset and we do a seek on the newly flushed file.
      if (this.closing) {
        return false;
      }
      // reset the flag
      flushed = false;
      return true;
    }
    return false;
  }

  /**
   * Seek storefiles in parallel to optimize IO latency as much as possible. Each
   * {@link StoreFileScanner} is seeked by a {@link ParallelSeekHandler} submitted to
   * {@code executor}; any other scanner is seeked inline on the calling thread. The method waits
   * for all seeks and rethrows the first handler error found as an IOException.
   * <p>
   * NOTE(review): on interrupt, the thread's interrupt status is not restored before the
   * InterruptedIOException is thrown. Also, if an inline {@code scanner.seek} throws, the
   * already-submitted handlers are not awaited.
   * @param scanners the list {@link KeyValueScanner}s to be read from
   * @param kv the KeyValue on which the operation is being requested
   */
  private void parallelSeek(final List<? extends KeyValueScanner> scanners, final Cell kv)
    throws IOException {
    if (scanners.isEmpty()) return;
    int storeFileScannerCount = scanners.size();
    // one count per scanner, whether it is seeked on the executor or inline
    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
    List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount);
    for (KeyValueScanner scanner : scanners) {
      if (scanner instanceof StoreFileScanner) {
        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, this.readPt, latch);
        executor.submit(seekHandler);
        handlers.add(seekHandler);
      } else {
        scanner.seek(kv);
        latch.countDown();
      }
    }

    try {
      latch.await();
    } catch (InterruptedException ie) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
    }

    for (ParallelSeekHandler handler : handlers) {
      if (handler.getErr() != null) {
        throw new IOException(handler.getErr());
      }
    }
  }

  /**
   * Used in testing.
   * @return all scanners in no particular order
   */
  List<KeyValueScanner> getAllScannersForTesting() {
    List<KeyValueScanner> allScanners = new ArrayList<>();
    KeyValueScanner current = heap.getCurrentForTesting();
    if (current != null) allScanners.add(current);
    for (KeyValueScanner scanner : heap.getHeap())
      allScanners.add(scanner);
    return allScanners;
  }

  /**
   * Sets the static {@code lazySeekEnabledGlobally} flag consulted by {@link #reseek(Cell)}. Being
   * static, it affects every StoreScanner.
   */
  static void enableLazySeekGlobally(boolean enable) {
    lazySeekEnabledGlobally = enable;
  }

  /** Returns The estimated number of KVs seen by this scanner (includes some skipped KVs). */
  public long getEstimatedNumberOfKvsScanned() {
    return this.kvsScanned;
  }

  /**
   * Returns the heap's next indexed key. This is the key consulted by
   * {@link #trySkipToNextRow(Cell)} and {@link #trySkipToNextColumn(Cell)} to choose between SKIP
   * and SEEK.
   */
  @Override
  public Cell getNextIndexedKey() {
    return this.heap.getNextIndexedKey();
  }

  /**
   * Releases references this scanner no longer needs once results have been shipped. Steps:
   * <ol>
   * <li>Replace {@code prevCell} with a fresh key-only copy.</li>
   * <li>Call {@code matcher.beforeShipped()}.</li>
   * <li>Close the scanners parked in {@code scannersForDelayedClose}.</li>
   * <li>Forward {@code shipped()} to the heap, if there is one, then try the pread-to-stream
   * switch.</li>
   * </ol>
   */
  @Override
  public void shipped() throws IOException {
    if (prevCell != null) {
      // Do the copy here so that in case the prevCell ref is pointing to the previous
      // blocks we can safely release those blocks.
      // This applies to blocks that are got from Bucket cache, L1 cache and the blocks
      // fetched from HDFS. Copying this would ensure that we let go the references to these
      // blocks so that they can be GCed safely(in case of bucket cache)
      prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
    }
    matcher.beforeShipped();
    // There wont be further fetch of Cells from these scanners. Just close.
    clearAndClose(scannersForDelayedClose);
    if (this.heap != null) {
      this.heap.shipped();
      // When switching from pread to stream, we will open a new scanner for each store file, but
      // the old scanner may still track the HFileBlocks we have scanned but not sent back to client
      // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others
      // before we serialize and send it back to client. The HFileBlocks will be released in shipped
      // method, so we here will also open new scanners and close old scanners in shipped method.
      // See HBASE-18055 for more details.
      trySwitchToStreamRead();
    }
  }
}