001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.NavigableSet; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.locks.ReentrantLock; 027import org.apache.hadoop.hbase.Cell; 028import org.apache.hadoop.hbase.CellComparator; 029import org.apache.hadoop.hbase.CellUtil; 030import org.apache.hadoop.hbase.DoNotRetryIOException; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.KeyValue; 033import org.apache.hadoop.hbase.KeyValueUtil; 034import org.apache.hadoop.hbase.PrivateCellUtil; 035import org.apache.hadoop.hbase.client.IsolationLevel; 036import org.apache.hadoop.hbase.client.Scan; 037import org.apache.hadoop.hbase.executor.ExecutorService; 038import org.apache.hadoop.hbase.filter.Filter; 039import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 040import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; 041import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler; 042import 
org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * Scans both the memstore and the store files of a single Store, coalescing the KeyValue stream
 * into a {@code List<KeyValue>} for a single row.
 * <p>
 * The implementation is not thread safe, so callers must not race {@code next} against
 * {@code close}. The only exception is {@code updateReaders}, which is called from the memstore
 * flush thread to indicate that a flush has happened.
 */
@InterfaceAudience.Private
public class StoreScanner extends NonReversedNonLazyKeyValueScanner
  implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class);
  // In unit tests (and for mob compaction) the store can be null.
  protected final HStore store;
  private final CellComparator comparator;
  // Decides, per cell, whether to include, skip or seek.
  private ScanQueryMatcher matcher;
  // Merges the currently selected scanners; set to null once this scanner has been closed.
  protected KeyValueHeap heap;
  private boolean cacheBlocks;

  // Number of cells included so far for the current row; reset when a new row starts.
  private long countPerRow = 0;
  // Taken from Scan#getMaxResultsPerColumnFamily(); -1 means "no limit".
  private int storeLimit = -1;
  // Taken from Scan#getRowOffsetPerColumnFamily(); cells up to this count are not returned.
  private int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107)
  private volatile boolean closing = false;
  // True when the scan is a Get (Scan#isGetScan()).
  private final boolean get;
  // True when the query names at least one column explicitly.
  private final boolean explicitColumnQuery;
  private final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  private boolean parallelSeekEnabled = false;
  // Only assigned when parallel seeking is enabled.
  private ExecutorService executor;
  private final Scan scan;
  // Cells older than this timestamp are expired; 0 for raw scans.
  private final long oldestUnexpiredTS;
  // Wall-clock time captured once when the scanner is constructed.
  private final long now;
  private final int minVersions;
  private final long maxRowSize;
  private final long cellsPerHeartbeatCheck;
  // Read counters that are only maintained here when there is no store (testing).
  long memstoreOnlyReads;
  long mixedReads;

  // 1) Collects all the KVHeap that are eagerly getting closed during the
  // course of a scan
  // 2) Collects the unused memstore scanners. If we close the memstore scanners
  // before sending data to client, the chunk may be reclaimed by other
  // updates and the data will be corrupt.
  private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>();

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not KVs skipped via
   * seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  // The cell handled by the previous loop iteration; compared by identity against the current one.
  private Cell prevCell = null;

  // Threshold of bytes read after which a DEFAULT read type scan wants to switch to stream.
  private final long preadMaxBytes;
  private long bytesRead;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
    "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;

  /**
   * The number of cells scanned in between timeout checks. Specifying a larger value means that
   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
   * timeout checks.
   */
  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
    "hbase.cells.scanned.per.heartbeat.check";

  /**
   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
   */
  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

  /**
   * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we scanned
   * reach this limit, we will reopen the scanner with stream. The default value is 4 times the
   * block size for this store. If configured with a value less than 0, all scans with ReadType
   * DEFAULT are opened in stream mode from the start.
   */
  public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes";

  private final Scan.ReadType readType;

  // Whether pread is currently used for the scan.
  // It may be changed if we use Scan.ReadType.DEFAULT and we have read lots of data.
  private boolean scanUsePread;
  // Indicates whether there was flush during the course of the scan
  private volatile boolean flushed = false;
  // generally we get one file from a flush
  private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1);
  // Since CompactingMemstore is now default, we get three memstore scanners from a flush
  private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
  // The current list of scanners
  final List<KeyValueScanner> currentScanners = new ArrayList<>();
  // flush update lock
  private final ReentrantLock flushLock = new ReentrantLock();
  // lock for closing.
  private final ReentrantLock closeLock = new ReentrantLock();

  // Read point this scanner was opened with.
  protected final long readPt;
  // Consulted by needToReturn(); cleared before every cell is matched in next().
  private boolean topChanged = false;

  /** An internal constructor.
*/
  private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt,
    boolean cacheBlocks, ScanType scanType) {
    // Shared initialisation for every public/test constructor: copies the scan parameters into
    // final fields and decides the read type. It does not create the matcher or open scanners.
    this.readPt = readPt;
    this.store = store;
    this.cacheBlocks = cacheBlocks;
    this.comparator = Preconditions.checkNotNull(scanInfo.getComparator());
    get = scan.isGetScan();
    explicitColumnQuery = numColumns > 0;
    this.scan = scan;
    this.now = EnvironmentEdgeManager.currentTime();
    // Raw scans see everything, so nothing counts as expired for them.
    this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl();
    this.minVersions = scanInfo.getMinVersions();

    // We look up row-column Bloom filters for multi-column queries as part of
    // the seek operation. However, we also look the row-column Bloom filter
    // for multi-row (non-"get") scans because this is not done in
    // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
    // NOTE(review): && binds tighter than ||, so the bloom-type check below only applies to the
    // single-column non-get case; with numColumns > 1 the flag is true regardless of bloom type.
    // Confirm this is intended before adding parentheses.
    this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1) && (store == null
      || store.getColumnFamilyDescriptor().getBloomFilterType() == BloomType.ROWCOL);
    this.maxRowSize = scanInfo.getTableMaxRowSize();
    this.preadMaxBytes = scanInfo.getPreadMaxBytes();
    if (get) {
      // Gets always use pread.
      this.readType = Scan.ReadType.PREAD;
      this.scanUsePread = true;
    } else if (scanType != ScanType.USER_SCAN) {
      // For compaction scanners never use Pread as already we have stream based scanners on the
      // store files to be compacted
      this.readType = Scan.ReadType.STREAM;
      this.scanUsePread = false;
    } else {
      if (scan.getReadType() == Scan.ReadType.DEFAULT) {
        // The user expressed no preference: let the store configuration decide.
        if (scanInfo.isUsePread()) {
          this.readType = Scan.ReadType.PREAD;
        } else if (this.preadMaxBytes < 0) {
          this.readType = Scan.ReadType.STREAM;
        } else {
          this.readType = Scan.ReadType.DEFAULT;
        }
      } else {
        this.readType = scan.getReadType();
      }
      // Always start with pread unless the user asked for stream. With read type DEFAULT we may
      // switch to stream later if the scan keeps running for a long time.
      this.scanUsePread = this.readType != Scan.ReadType.STREAM;
    }
    this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
    // Parallel seeking is on if the config allows it and there is more than one store file.
    if (store != null && store.getStorefilesCount() > 1) {
      RegionServerServices rsService = store.getHRegion().getRegionServerServices();
      if (rsService != null && scanInfo.isParallelSeekEnabled()) {
        this.parallelSeekEnabled = true;
        this.executor = rsService.getExecutorService();
      }
    }
  }

  // Records the given scanners as the ones currently backing this StoreScanner.
  private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
    this.currentScanners.addAll(scanners);
  }

  /**
   * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we are not in a
   * compaction.
   * @param store    who we scan
   * @param scanInfo store-level scan settings (comparator, TTL, versions, read type hints)
   * @param scan     the spec
   * @param columns  which columns we are scanning; may be null
   * @param readPt   the read point handed to the store when its scanners are opened
   * @throws IOException if the scanners cannot be opened or sought, or (as a
   *                     DoNotRetryIOException) if columns are given for a raw scan
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
    long readPt) throws IOException {
    this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt, scan.getCacheBlocks(),
      ScanType.USER_SCAN);
    if (columns != null && scan.isRaw()) {
      throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
    }
    matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
      store.getCoprocessorHost());

    // Register before opening scanners; the catch block below undoes this on failure.
    store.addChangedReaderObserver(this);

    List<KeyValueScanner> scanners = null;
    try {
      // Pass columns to try to filter out unnecessary StoreFiles.
      scanners = selectScannersFrom(store,
        store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
          scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt));

      // Seek all scanners to the start of the Row (or if the exact matching row
      // key does not exist, then to the start of the next matching Row).
      // Always check bloom filter to optimize the top row seek for delete
      // family marker.
      seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
        parallelSeekEnabled);

      // set storeLimit
      this.storeLimit = scan.getMaxResultsPerColumnFamily();

      // set rowOffset
      this.storeOffset = scan.getRowOffsetPerColumnFamily();
      addCurrentScanners(scanners);
      // Combine all seeked scanners with a heap
      resetKVHeap(scanners, comparator);
    } catch (IOException e) {
      // NOTE(review): scanners is still null here if store.getScanners() itself threw;
      // presumably clearAndClose tolerates null - confirm.
      clearAndClose(scanners);
      // Remove us from HStore#changedReaderObservers here, or we will never get the chance to
      // and might leak memory.
      store.deleteChangedReaderObserver(this);
      throw e;
    }
  }

  // a dummy scan instance for compaction.
  private static final Scan SCAN_FOR_COMPACTION = new Scan();

  /**
   * Used for store file compaction and memstore compaction.
   * <p>
   * Opens a scanner across specified StoreFiles/MemStoreSegments. Delegates to the private
   * compaction constructor with no drop-deletes row range.
   * @param store             who we scan
   * @param scanners          ancillary scanners
   * @param smallestReadPoint the readPoint that we should use for tracking versions
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
    ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
    this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
  }

  /**
   * Used for compactions that drop deletes from a limited range of rows.
   * <p>
   * Opens a scanner across specified StoreFiles.
   * @param store              who we scan
   * @param scanners           ancillary scanners
   * @param smallestReadPoint  the readPoint that we should use for tracking versions
   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
   * @param dropDeletesToRow   The exclusive right bound of the range; can be EMPTY_END_ROW.
*/
  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
    long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow)
    throws IOException {
    this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
      earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
  }

  // Common implementation behind the two public compaction constructors. Requires a non-null
  // store (it is dereferenced for the read point and the coprocessor host).
  private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
    ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
    byte[] dropDeletesToRow) throws IOException {
    this(store, SCAN_FOR_COMPACTION, scanInfo, 0,
      store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType);
    assert scanType != ScanType.USER_SCAN;
    matcher =
      CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs,
        oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());

    // Filter the list of scanners using Bloom filters, time range, TTL, etc.
    scanners = selectScannersFrom(store, scanners);

    // Seek all scanners to the initial key
    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
    addCurrentScanners(scanners);
    // Combine all seeked scanners with a heap
    resetKVHeap(scanners, comparator);
  }

  // Seeks the given scanners (eagerly, never lazily) to the matcher's start key, records them
  // as the current scanners and builds the heap. Unlike the store-backed constructors it does
  // not filter the scanners first. The scanInfo parameter is not used by the body.
  private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners)
    throws IOException {
    // Seek all scanners to the initial key
    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
    addCurrentScanners(scanners);
    resetKVHeap(scanners, comparator);
  }

  // For mob compaction only as we do not have a Store instance when doing mob compaction.
  public StoreScanner(ScanInfo scanInfo, ScanType scanType,
    List<? extends KeyValueScanner> scanners) throws IOException {
    this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
    assert scanType != ScanType.USER_SCAN;
    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L,
      oldestUnexpiredTS, now, null, null, null);
    seekAllScanner(scanInfo, scanners);
  }

  // Used to instantiate a scanner for user scan in test
  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
    List<? extends KeyValueScanner> scanners) throws IOException {
    // 0 is passed as readpoint because the test bypasses Store
    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
      ScanType.USER_SCAN);
    this.matcher =
      UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
    seekAllScanner(scanInfo, scanners);
  }

  // Used to instantiate a scanner in test with an explicit scan type: a user-scan matcher is
  // built for USER_SCAN, a compaction matcher for every other type.
  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
    List<? extends KeyValueScanner> scanners, ScanType scanType) throws IOException {
    // 0 is passed as readpoint because the test bypasses Store
    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
      scanType);
    if (scanType == ScanType.USER_SCAN) {
      this.matcher =
        UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
    } else {
      this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
        HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
    }
    seekAllScanner(scanInfo, scanners);
  }

  // Used to instantiate a scanner for compaction in test
  StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType,
    List<? extends KeyValueScanner> scanners) throws IOException {
    // 0 is passed as readpoint because the test bypasses Store
    this(null, maxVersions > 0 ? new Scan().readVersions(maxVersions) : SCAN_FOR_COMPACTION,
      scanInfo, 0, 0L, false, scanType);
    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
      HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
    seekAllScanner(scanInfo, scanners);
  }

  // Exposes whether the scanner is currently reading with pread.
  boolean isScanUsePread() {
    return this.scanUsePread;
  }

  /**
   * Seek the specified scanners with the given key.
   * <p>
   * With a lazy seek every scanner only gets a seek request. Otherwise the scanners are sought
   * either one by one or through {@code parallelSeek}. In the one-by-one case of a user scan, the
   * estimated sizes of the cells the scanners land on are summed up, and the seek is aborted with
   * a RowTooBigException once that sum has reached the maximum row size.
   * @param scanners       the scanners to seek
   * @param seekKey        the key to seek to
   * @param isLazy         true if using lazy seek
   * @param isParallelSeek true if using parallel seek
   * @throws IOException if a seek fails or the row-size limit is hit
   */
  protected void seekScanners(List<? extends KeyValueScanner> scanners, Cell seekKey,
    boolean isLazy, boolean isParallelSeek) throws IOException {
    // Seek all scanners to the start of the Row (or if the exact matching row
    // key does not exist, then to the start of the next matching Row).
    // Always check bloom filter to optimize the top row seek for delete
    // family marker.
    if (isLazy) {
      for (KeyValueScanner scanner : scanners) {
        scanner.requestSeek(seekKey, false, true);
      }
    } else {
      if (!isParallelSeek) {
        long totalScannersSoughtBytes = 0;
        for (KeyValueScanner scanner : scanners) {
          // Checked before each seek, so the limit is enforced against the scanners sought so far.
          if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) {
            throw new RowTooBigException(
              "Max row size allowed: " + maxRowSize + ", but row is bigger than that");
          }
          scanner.seek(seekKey);
          Cell c = scanner.peek();
          if (c != null) {
            totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c);
          }
        }
      } else {
        parallelSeek(scanners, seekKey);
      }
    }
  }

  // Replaces the current heap with a fresh one over the given (already sought) scanners.
  protected void resetKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
    throws IOException {
    // Combine all seeked scanners with a heap
    heap = newKVHeap(scanners, comparator);
  }

  // Factory for the heap; protected so subclasses can substitute their own heap type.
  protected KeyValueHeap newKVHeap(List<?
extends KeyValueScanner> scanners, 427 CellComparator comparator) throws IOException { 428 return new KeyValueHeap(scanners, comparator); 429 } 430 431 /** 432 * Filters the given list of scanners using Bloom filter, time range, and TTL. 433 * <p> 434 * Will be overridden by testcase so declared as protected. 435 */ 436 protected List<KeyValueScanner> selectScannersFrom(HStore store, 437 List<? extends KeyValueScanner> allScanners) { 438 boolean memOnly; 439 boolean filesOnly; 440 if (scan instanceof InternalScan) { 441 InternalScan iscan = (InternalScan) scan; 442 memOnly = iscan.isCheckOnlyMemStore(); 443 filesOnly = iscan.isCheckOnlyStoreFiles(); 444 } else { 445 memOnly = false; 446 filesOnly = false; 447 } 448 449 List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size()); 450 451 // We can only exclude store files based on TTL if minVersions is set to 0. 452 // Otherwise, we might have to return KVs that have technically expired. 453 long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE; 454 455 // include only those scan files which pass all filters 456 for (KeyValueScanner kvs : allScanners) { 457 boolean isFile = kvs.isFileScanner(); 458 if ((!isFile && filesOnly) || (isFile && memOnly)) { 459 kvs.close(); 460 continue; 461 } 462 463 if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) { 464 scanners.add(kvs); 465 } else { 466 kvs.close(); 467 } 468 } 469 return scanners; 470 } 471 472 @Override 473 public Cell peek() { 474 return heap != null ? heap.peek() : null; 475 } 476 477 @Override 478 public KeyValue next() { 479 // throw runtime exception perhaps? 480 throw new RuntimeException("Never call StoreScanner.next()"); 481 } 482 483 @Override 484 public void close() { 485 close(true); 486 } 487 488 private void close(boolean withDelayedScannersClose) { 489 closeLock.lock(); 490 // If the closeLock is acquired then any subsequent updateReaders() 491 // call is ignored. 
492 try { 493 if (this.closing) { 494 return; 495 } 496 if (withDelayedScannersClose) { 497 this.closing = true; 498 } 499 // For mob compaction, we do not have a store. 500 if (this.store != null) { 501 this.store.deleteChangedReaderObserver(this); 502 } 503 if (withDelayedScannersClose) { 504 clearAndClose(scannersForDelayedClose); 505 clearAndClose(memStoreScannersAfterFlush); 506 clearAndClose(flushedstoreFileScanners); 507 if (this.heap != null) { 508 this.heap.close(); 509 this.currentScanners.clear(); 510 this.heap = null; // CLOSED! 511 } 512 } else { 513 if (this.heap != null) { 514 this.scannersForDelayedClose.add(this.heap); 515 this.currentScanners.clear(); 516 this.heap = null; 517 } 518 } 519 } finally { 520 closeLock.unlock(); 521 } 522 } 523 524 @Override 525 public boolean seek(Cell key) throws IOException { 526 if (checkFlushed()) { 527 reopenAfterFlush(); 528 } 529 return this.heap.seek(key); 530 } 531 532 /** 533 * Get the next row of values from this Store. nn * @return true if there are more rows, false if 534 * scanner is done 535 */ 536 @Override 537 public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException { 538 if (scannerContext == null) { 539 throw new IllegalArgumentException("Scanner context cannot be null"); 540 } 541 if (checkFlushed() && reopenAfterFlush()) { 542 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 543 } 544 545 // if the heap was left null, then the scanners had previously run out anyways, close and 546 // return. 
547 if (this.heap == null) { 548 // By this time partial close should happened because already heap is null 549 close(false);// Do all cleanup except heap.close() 550 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 551 } 552 553 Cell cell = this.heap.peek(); 554 if (cell == null) { 555 close(false);// Do all cleanup except heap.close() 556 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 557 } 558 559 // only call setRow if the row changes; avoids confusing the query matcher 560 // if scanning intra-row 561 562 // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing 563 // rows. Else it is possible we are still traversing the same row so we must perform the row 564 // comparison. 565 if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) { 566 this.countPerRow = 0; 567 matcher.setToNewRow(cell); 568 } 569 570 // Clear progress away unless invoker has indicated it should be kept. 571 if (!scannerContext.getKeepProgress()) { 572 scannerContext.clearProgress(); 573 } 574 575 int count = 0; 576 long totalBytesRead = 0; 577 boolean onlyFromMemstore = matcher.isUserScan(); 578 try { 579 LOOP: do { 580 // Update and check the time limit based on the configured value of cellsPerTimeoutCheck 581 // Or if the preadMaxBytes is reached and we may want to return so we can switch to stream 582 // in 583 // the shipped method below. 584 if ( 585 kvsScanned % cellsPerHeartbeatCheck == 0 586 || (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) 587 ) { 588 if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { 589 return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); 590 } 591 } 592 // Do object compare - we set prevKV from the same heap. 
593 if (prevCell != cell) { 594 ++kvsScanned; 595 } 596 checkScanOrder(prevCell, cell, comparator); 597 int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell); 598 bytesRead += cellSize; 599 if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) { 600 // return immediately if we want to switch from pread to stream. We need this because we 601 // can 602 // only switch in the shipped method, if user use a filter to filter out everything and 603 // rpc 604 // timeout is very large then the shipped method will never be called until the whole scan 605 // is finished, but at that time we have already scan all the data... 606 // See HBASE-20457 for more details. 607 // And there is still a scenario that can not be handled. If we have a very large row, 608 // which 609 // have millions of qualifiers, and filter.filterRow is used, then even if we set the flag 610 // here, we still need to scan all the qualifiers before returning... 611 scannerContext.returnImmediately(); 612 } 613 prevCell = cell; 614 scannerContext.setLastPeekedCell(cell); 615 topChanged = false; 616 ScanQueryMatcher.MatchCode qcode = matcher.match(cell); 617 switch (qcode) { 618 case INCLUDE: 619 case INCLUDE_AND_SEEK_NEXT_ROW: 620 case INCLUDE_AND_SEEK_NEXT_COL: 621 Filter f = matcher.getFilter(); 622 if (f != null) { 623 cell = f.transformCell(cell); 624 } 625 this.countPerRow++; 626 627 // add to results only if we have skipped #storeOffset kvs 628 // also update metric accordingly 629 if (this.countPerRow > storeOffset) { 630 outResult.add(cell); 631 632 // Update local tracking information 633 count++; 634 totalBytesRead += cellSize; 635 636 /** 637 * Increment the metric if all the cells are from memstore. 
If not we will account it 638 * for mixed reads 639 */ 640 onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore(); 641 // Update the progress of the scanner context 642 scannerContext.incrementSizeProgress(cellSize, cell.heapSize()); 643 scannerContext.incrementBatchProgress(1); 644 645 if (matcher.isUserScan() && totalBytesRead > maxRowSize) { 646 String message = "Max row size allowed: " + maxRowSize 647 + ", but the row is bigger than that, the row info: " 648 + CellUtil.toString(cell, false) + ", already have process row cells = " 649 + outResult.size() + ", it belong to region = " 650 + store.getHRegion().getRegionInfo().getRegionNameAsString(); 651 LOG.warn(message); 652 throw new RowTooBigException(message); 653 } 654 655 if (storeLimit > -1 && this.countPerRow >= (storeLimit + storeOffset)) { 656 // do what SEEK_NEXT_ROW does. 657 if (!matcher.moreRowsMayExistAfter(cell)) { 658 close(false);// Do all cleanup except heap.close() 659 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 660 } 661 matcher.clearCurrentRow(); 662 seekToNextRow(cell); 663 break LOOP; 664 } 665 } 666 667 if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { 668 if (!matcher.moreRowsMayExistAfter(cell)) { 669 close(false);// Do all cleanup except heap.close() 670 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 671 } 672 matcher.clearCurrentRow(); 673 seekOrSkipToNextRow(cell); 674 } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { 675 seekOrSkipToNextColumn(cell); 676 } else { 677 this.heap.next(); 678 } 679 680 if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) { 681 break LOOP; 682 } 683 if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { 684 break LOOP; 685 } 686 continue; 687 688 case DONE: 689 // Optimization for Gets! If DONE, no more to get on this row, early exit! 690 if (get) { 691 // Then no more to this row... exit. 
692 close(false);// Do all cleanup except heap.close() 693 // update metric 694 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 695 } 696 matcher.clearCurrentRow(); 697 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 698 699 case DONE_SCAN: 700 close(false);// Do all cleanup except heap.close() 701 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 702 703 case SEEK_NEXT_ROW: 704 // This is just a relatively simple end of scan fix, to short-cut end 705 // us if there is an endKey in the scan. 706 if (!matcher.moreRowsMayExistAfter(cell)) { 707 close(false);// Do all cleanup except heap.close() 708 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 709 } 710 matcher.clearCurrentRow(); 711 seekOrSkipToNextRow(cell); 712 NextState stateAfterSeekNextRow = needToReturn(outResult); 713 if (stateAfterSeekNextRow != null) { 714 return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues(); 715 } 716 break; 717 718 case SEEK_NEXT_COL: 719 seekOrSkipToNextColumn(cell); 720 NextState stateAfterSeekNextColumn = needToReturn(outResult); 721 if (stateAfterSeekNextColumn != null) { 722 return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues(); 723 } 724 break; 725 726 case SKIP: 727 this.heap.next(); 728 break; 729 730 case SEEK_NEXT_USING_HINT: 731 Cell nextKV = matcher.getNextKeyHint(cell); 732 if (nextKV != null) { 733 int difference = comparator.compare(nextKV, cell); 734 if ( 735 ((!scan.isReversed() && difference > 0) || (scan.isReversed() && difference < 0)) 736 ) { 737 seekAsDirection(nextKV); 738 NextState stateAfterSeekByHint = needToReturn(outResult); 739 if (stateAfterSeekByHint != null) { 740 return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues(); 741 } 742 break; 743 } 744 } 745 heap.next(); 746 break; 747 748 default: 749 throw new RuntimeException("UNEXPECTED"); 750 } 751 } while ((cell 
= this.heap.peek()) != null);

      // The loop above ended because heap.peek() returned null.
      if (count > 0) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // No more keys
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    } finally {
      // increment only if we have some result
      if (count > 0 && matcher.isUserScan()) {
        // if true increment memstore metrics, if not the mixed one
        updateMetricsStore(onlyFromMemstore);
      }
    }
  }

  /**
   * Records one read against either the memstore-only or the mixed read metric.
   * <p>
   * When a store is attached the call is forwarded to it; otherwise this scanner's own
   * {@code memstoreOnlyReads} / {@code mixedReads} counters are incremented.
   * @param memstoreRead selects the metric: the memstore-only one when true, the mixed one when
   *                     false
   */
  private void updateMetricsStore(boolean memstoreRead) {
    if (store != null) {
      store.updateMetricsStore(memstoreRead);
    } else {
      // for testing.
      if (memstoreRead) {
        memstoreOnlyReads++;
      } else {
        mixedReads++;
      }
    }
  }

  /**
   * If the top cell won't be flushed into disk, the new top cell may be changed after
   * #reopenAfterFlush. Because the older top cell only exist in the memstore scanner but the
   * memstore scanner is replaced by hfile scanner after #reopenAfterFlush. If the row of top cell
   * is changed, we should return the current cells. Otherwise, we may return the cells across
   * different rows.
   * <p>
   * A state is only produced when {@code outResult} already holds cells and {@code topChanged} is
   * set; the state is then chosen by whether the heap still has a top cell.
   * @param outResult the cells which are visible for user scan
   * @return null if the top cell doesn't change. Otherwise, the NextState to return
   */
  private NextState needToReturn(List<Cell> outResult) {
    if (!outResult.isEmpty() && topChanged) {
      return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
    }
    return null;
  }

  /**
   * Moves past the row of {@code cell}.
   * <p>
   * For a non-Get scan {@link #trySkipToNextRow(Cell)} is attempted first and, if it returns true,
   * nothing else is done. In every other case {@link #seekToNextRow(Cell)} is called.
   * @param cell the current cell, whose row is being left
   */
  private void seekOrSkipToNextRow(Cell cell) throws IOException {
    // If it is a Get Scan, then we know that we are done with this row; there are no more
    // rows beyond the current one: don't try to optimize.
    if (!get) {
      if (trySkipToNextRow(cell)) {
        return;
      }
    }
    seekToNextRow(cell);
  }

  /**
   * Moves past the column of {@code cell}: {@link #trySkipToNextColumn(Cell)} is attempted first
   * and, only if it returns false, a seek to {@code matcher.getKeyForNextColumn(cell)} is issued
   * through {@link #seekAsDirection(Cell)}.
   * @param cell the current cell, whose column is being left
   */
  private void seekOrSkipToNextColumn(Cell cell) throws IOException {
    if (!trySkipToNextColumn(cell)) {
      seekAsDirection(matcher.getKeyForNextColumn(cell));
    }
  }

  /**
   * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).
   * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row, or seek to an
   * arbitrary seek key. This method decides whether a seek is the most efficient _actual_ way to
   * get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, SKIP inside the
   * current, loaded block). It does this by looking at the next indexed key of the current HFile.
   * This key is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible
   * key on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work
   * with the current Cell but compare as though it were a seek key; see down in
   * matcher.compareKeyForNextRow, etc). If the compare gets us onto the next block we *_SEEK,
   * otherwise we just SKIP to the next requested cell.
   * <p>
   * Other notes:
   * <ul>
   * <li>Rows can straddle block boundaries</li>
   * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a
   * different block than column C1 at T2)</li>
   * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a few
   * SKIPs...</li>
   * <li>We want to SEEK when the chance is high that we'll be able to seek past many Cells,
   * especially if we know we need to go to the next block.</li>
   * </ul>
   * <p>
   * A good proxy (best effort) to determine whether SKIP is better than SEEK is whether we'll
   * likely end up seeking to the next block (or past the next block) to get our next column.
   * Example:
   *
   * <pre>
   * |    BLOCK 1              |    BLOCK 2                    |
   * |  r1/c1, r1/c2, r1/c3    |    r1/c4, r1/c5, r2/c1        |
   *                                ^            ^
   *                                |            |
   *                        Next Index Key   SEEK_NEXT_ROW (before r2/c1)
   *
   *
   * |    BLOCK 1                       |    BLOCK 2                       |
   * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |    r1/c1/t2, r1/c1/T1, r1/c2/T3  |
   *                                         ^                  ^
   *                                         |                  |
   *                                 Next Index Key        SEEK_NEXT_COL
   * </pre>
   *
   * Now imagine we want columns c1 and c3 (see first diagram above), the 'Next Index Key' of r1/c4
   * is > r1/c3 so we should seek to get to the c1 on the next row, r2. In second case, say we only
   * want one version of c1, after we have it, a SEEK_COL will be issued to get to c2. Looking at
   * the 'Next Index Key', it would land us in the next block, so we should SEEK. In other scenarios
   * where the SEEK will not land us in the next block, it is very likely better to issue a series
   * of SKIPs.
   * <p>
   * Note that cells may already have been consumed from the heap (and counted in
   * {@code kvsScanned}) by the time this method gives up and returns false.
   * @param cell current cell
   * @return true if skipping moved the heap off the row of {@code cell} (or emptied it); false as
   *         soon as the next indexed key is missing or no longer justifies skipping, in which case
   *         the caller is expected to seek
   */
  protected boolean trySkipToNextRow(Cell cell) throws IOException {
    Cell nextCell = null;
    // used to guard against a changed next indexed key by doing a identity comparison
    // when the identity changes we need to compare the bytes again
    Cell previousIndexedKey = null;
    do {
      Cell nextIndexedKey = getNextIndexedKey();
      if (
        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
          && (nextIndexedKey == previousIndexedKey
            || matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0)
      ) {
        this.heap.next();
        ++kvsScanned;
        previousIndexedKey = nextIndexedKey;
      } else {
        return false;
      }
    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell));
    return true;
  }

  /**
   * Column-level counterpart of
   * {@link org.apache.hadoop.hbase.regionserver.StoreScanner#trySkipToNextRow(Cell)}: keeps
   * calling {@code heap.next()} while the next indexed key justifies skipping and the heap's top
   * cell is still in the same row and column as {@code cell}.
   * @param cell current cell
   * @return true if skipping moved the heap off the column of {@code cell} (or emptied it); false
   *         if the caller should seek instead
   */
  protected boolean trySkipToNextColumn(Cell cell) throws IOException {
    Cell nextCell = null;
    // used to guard against a changed next indexed key by doing a identity comparison
    // when the identity changes we need to compare the bytes again
    Cell previousIndexedKey = null;
    do {
      Cell nextIndexedKey = getNextIndexedKey();
      if (
        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
          && (nextIndexedKey == previousIndexedKey
            || matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0)
      ) {
        this.heap.next();
        ++kvsScanned;
        previousIndexedKey = nextIndexedKey;
      } else {
        return false;
      }
    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell));
    // We need this check because it may happen that the new scanner that we get
    // during heap.next() is requiring reseek due of fake KV previously generated for
    // ROWCOL bloom filter optimization. See HBASE-19863 for more details
    if (useRowColBloom && nextCell != null && matcher.compareKeyForNextColumn(nextCell, cell) < 0) {
      return false;
    }
    return true;
  }

  /** Returns the read point ({@code readPt}) held by this scanner. */
  @Override
  public long getReadPoint() {
    return this.readPt;
  }

  /**
   * Closes every scanner in the given list and then empties the list.
   * @param scanners the scanners to close; a null list is ignored
   */
  private static void clearAndClose(List<KeyValueScanner> scanners) {
    if (scanners == null) {
      return;
    }
    for (KeyValueScanner s : scanners) {
      s.close();
    }
    scanners.clear();
  }

  /**
   * Notifies this scanner that its set of readers changed.
   * <p>
   * Does nothing when both arguments are empty. Otherwise, under {@code flushLock}:
   * <ul>
   * <li>if {@code closeLock} cannot be taken immediately, or the scanner is already closing, the
   * passed memstore scanners are closed and nothing else is updated;</li>
   * <li>otherwise {@code flushed} is set, scanners over {@code sfs} are created eagerly and
   * appended to {@code flushedstoreFileScanners}, and, when {@code memStoreScanners} is non-empty,
   * it replaces the contents of {@code memStoreScannersAfterFlush} (the previous contents are
   * closed).</li>
   * </ul>
   * The heap itself is not touched here; that is left to a later call on the scanner.
   * @param sfs              the store files to create scanners for
   * @param memStoreScanners the memstore scanners to use after the change
   */
  // Implementation of ChangedReadersObserver
  @Override
  public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners)
    throws IOException {
    if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) {
      return;
    }
    // Tracks whether closeLock was actually acquired, so that finally only unlocks what is held.
    boolean updateReaders = false;
    flushLock.lock();
    try {
      if (!closeLock.tryLock()) {
        // The reason for doing this is that when the current store scanner does not retrieve
        // any new cells, then the scanner is considered to be done. The heap of this scanner
        // is not closed till the shipped() call is completed. Hence in that case if at all
        // the partial close (close (false)) has been called before updateReaders(), there is no
        // need for the updateReaders() to happen.
        LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
        // no lock acquired.
        clearAndClose(memStoreScanners);
        return;
      }
      // lock acquired
      updateReaders = true;
      if (this.closing) {
        LOG.debug("StoreScanner already closing. There is no need to updateReaders");
        clearAndClose(memStoreScanners);
        return;
      }
      flushed = true;
      final boolean isCompaction = false;
      boolean usePread = get || scanUsePread;
      // SEE HBASE-19468 where the flushed files are getting compacted even before a scanner
      // calls next(). So its better we create scanners here rather than next() call. Ensure
      // these scanners are properly closed() whether or not the scan is completed successfully
      // Eagerly creating scanners so that we have the ref counting ticking on the newly created
      // store files. In case of stream scanners this eager creation does not induce performance
      // penalty because in scans (that uses stream scanners) the next() call is bound to happen.
      List<KeyValueScanner> scanners = store.getScanners(sfs, cacheBlocks, get, usePread,
        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false);
      flushedstoreFileScanners.addAll(scanners);
      if (!CollectionUtils.isEmpty(memStoreScanners)) {
        clearAndClose(memStoreScannersAfterFlush);
        memStoreScannersAfterFlush.addAll(memStoreScanners);
      }
    } finally {
      flushLock.unlock();
      if (updateReaders) {
        closeLock.unlock();
      }
    }
    // Let the next() call handle re-creating and seeking
  }

  /**
   * Rebuilds the heap from the scanners collected by {@link #updateReaders(List, List)}.
   * <p>
   * The pending flushed-file and memstore scanners are drained under {@code flushLock}, seeked to
   * the previous top cell and added to {@code currentScanners}; the non-file scanners at the tail
   * of {@code currentScanners} are moved to {@code scannersForDelayedClose} rather than closed
   * here. {@code topChanged} is updated as a side effect.
   * @return true if the heap is now empty or its top cell is on a different row than before (the
   *         KeyValueHeap then has to try the next KV), false otherwise
   */
  protected final boolean reopenAfterFlush() throws IOException {
    // here we can make sure that we have a Store instance so no null check on store.
    Cell lastTop = heap.peek();
    // When we have the scan object, should we not pass it to getScanners() to get a limited set of
    // scanners? We did so in the constructor and we could have done it now by storing the scan
    // object from the constructor
    List<KeyValueScanner> scanners;
    flushLock.lock();
    try {
      List<KeyValueScanner> allScanners =
        new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());
      allScanners.addAll(flushedstoreFileScanners);
      allScanners.addAll(memStoreScannersAfterFlush);
      scanners = selectScannersFrom(store, allScanners);
      // Clear the current set of flushed store files scanners so that they don't get added again
      flushedstoreFileScanners.clear();
      memStoreScannersAfterFlush.clear();
    } finally {
      flushLock.unlock();
    }

    // Seek the new scanners to the last key
    seekScanners(scanners, lastTop, false, parallelSeekEnabled);
    // remove the older memstore scanner
    for (int i = currentScanners.size() - 1; i >= 0; i--) {
      if (!currentScanners.get(i).isFileScanner()) {
        scannersForDelayedClose.add(currentScanners.remove(i));
      } else {
        // we add the memstore scanner to the end of currentScanners
        break;
      }
    }
    // add the newly created scanners on the flushed files and the current active memstore scanner
    addCurrentScanners(scanners);
    // Combine all seeked scanners with a heap
    resetKVHeap(this.currentScanners, store.getComparator());
    resetQueryMatcher(lastTop);
    // Only rows are compared: a different cell on the same row does not count as a change.
    if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
      LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString()
        + ",and after = " + heap.peek());
      topChanged = true;
    } else {
      topChanged = false;
    }
    return topChanged;
  }

  /**
   * Re-points the query matcher at the row of the heap's top cell after the heap was rebuilt.
   * <p>
   * Nothing happens if the matcher already has a current row that matches; otherwise
   * {@code countPerRow} is zeroed and the matcher is moved to the new row.
   * @param lastTopKey the top cell from before the rebuild; used in place of the heap's top cell
   *                   when the heap is empty
   */
  private void resetQueryMatcher(Cell lastTopKey) {
    // Reset the state of the Query Matcher and set to top row.
    // Only reset and call setRow if the row changes; avoids confusing the
    // query matcher if scanning intra-row.
    Cell cell = heap.peek();
    if (cell == null) {
      cell = lastTopKey;
    }
    if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {
      this.countPerRow = 0;
      // The setToNewRow will call reset internally
      matcher.setToNewRow(cell);
    }
  }

  /**
   * Checks that cells are handed out in the expected order, i.e. that {@code prevKV} does not sort
   * after {@code kv}. The check is a Java {@code assert}, so it only runs with assertions enabled,
   * and it is skipped when {@code prevKV} or {@code comparator} is null.
   * @param prevKV     the previously seen cell, may be null
   * @param kv         the cell that follows {@code prevKV}
   * @param comparator the comparator defining the order, may be null
   */
  protected void checkScanOrder(Cell prevKV, Cell kv, CellComparator comparator)
    throws IOException {
    // Check that the heap gives us KVs in an increasing order.
    assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0
      : "Key " + prevKV + " followed by a smaller key " + kv + " in cf " + store;
  }

  /**
   * Reseeks to the key built by {@code PrivateCellUtil.createLastOnRow(c)}.
   * @param c a cell on the row to leave
   * @return the result of {@link #reseek(Cell)}
   */
  protected boolean seekToNextRow(Cell c) throws IOException {
    return reseek(PrivateCellUtil.createLastOnRow(c));
  }

  /**
   * Do a reseek in a normal StoreScanner(scan forward)
   * @param kv the key to seek to
   * @return true if scanner has values left, false if end of scanner
   */
  protected boolean seekAsDirection(Cell kv) throws IOException {
    return reseek(kv);
  }

  /**
   * Reseeks the heap to {@code kv}. If {@link #checkFlushed()} reports a pending flush
   * notification, the heap is first rebuilt via {@link #reopenAfterFlush()}.
   * <p>
   * For an explicit-column query with lazy seek enabled globally the seek is issued through
   * {@code heap.requestSeek(kv, true, useRowColBloom)}; otherwise through {@code heap.reseek(kv)}.
   * @param kv the key to seek to
   * @return the value returned by the heap's seek call
   */
  @Override
  public boolean reseek(Cell kv) throws IOException {
    if (checkFlushed()) {
      reopenAfterFlush();
    }
    if (explicitColumnQuery && lazySeekEnabledGlobally) {
      return heap.requestSeek(kv, true, useRowColBloom);
    }
    return heap.reseek(kv);
  }

  /**
   * Replaces the current file scanners with freshly recreated ones once enough bytes have been
   * read, leaving the memstore scanners in place.
   * <p>
   * Nothing is done unless the read type is {@code DEFAULT}, {@code scanUsePread} is still set,
   * the scanner is not closing, the heap has a top cell and {@code bytesRead} has reached
   * {@code preadMaxBytes}. {@code scanUsePread} is cleared before the switch is attempted, so a
   * failed or abandoned attempt is not retried by a later call.
   * <p>
   * If recreating or seeking the new scanners throws, the failure is logged, any new file scanners
   * are closed and the existing heap stays in use. On success the old file scanners are closed
   * only after the new heap has been installed.
   */
  void trySwitchToStreamRead() {
    if (
      readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null
        || bytesRead < preadMaxBytes
    ) {
      return;
    }
    LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead,
      this.store.getColumnFamilyName());
    scanUsePread = false;
    Cell lastTop = heap.peek();
    // Partition the current scanners: memstore scanners are carried over, file scanners replaced.
    List<KeyValueScanner> memstoreScanners = new ArrayList<>();
    List<KeyValueScanner> scannersToClose = new ArrayList<>();
    for (KeyValueScanner kvs : currentScanners) {
      if (!kvs.isFileScanner()) {
        // collect memstorescanners here
        memstoreScanners.add(kvs);
      } else {
        scannersToClose.add(kvs);
      }
    }
    List<KeyValueScanner> fileScanners = null;
    List<KeyValueScanner> newCurrentScanners;
    KeyValueHeap newHeap;
    try {
      // We must have a store instance here so no null check
      // recreate the scanners on the current file scanners
      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, matcher,
        scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
        readPt, false);
      if (fileScanners == null) {
        return;
      }
      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
      newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
      newCurrentScanners.addAll(fileScanners);
      newCurrentScanners.addAll(memstoreScanners);
      newHeap = newKVHeap(newCurrentScanners, comparator);
    } catch (Exception e) {
      // Best effort: keep scanning with the existing heap and release the half-built scanners.
      LOG.warn("failed to switch to stream read", e);
      if (fileScanners != null) {
        fileScanners.forEach(KeyValueScanner::close);
      }
      return;
    }
    currentScanners.clear();
    addCurrentScanners(newCurrentScanners);
    this.heap = newHeap;
    resetQueryMatcher(lastTop);
    scannersToClose.forEach(KeyValueScanner::close);
  }

  /**
   * Consumes the {@code flushed} notification.
   * @return true if {@code flushed} was set and the scanner is not closing, in which case the flag
   *         has been reset; false otherwise (when closing, the flag is left as it is)
   */
  protected final boolean checkFlushed() {
    // check the var without any lock. Suppose even if we see the old
    // value here still it is ok to continue because we will not be resetting
    // the heap but will continue with the referenced memstore's snapshot. For compactions
    // any way we don't need the updateReaders at all to happen as we still continue with
    // the older files
    if (flushed) {
      // If there is a flush and the current scan is notified on the flush ensure that the
      // scan's heap gets reset and we do a seek on the newly flushed file.
      if (this.closing) {
        return false;
      }
      // reset the flag
      flushed = false;
      return true;
    }
    return false;
  }

  /**
   * Seek storefiles in parallel to optimize IO latency as much as possible.
   * <p>
   * Each {@link StoreFileScanner} is seeked by a {@link ParallelSeekHandler} submitted to
   * {@code executor}; any other scanner is seeked inline on the calling thread. The method then
   * waits on a latch sized to the number of scanners (the handlers are handed the latch and are
   * presumably the ones counting it down) before checking the handlers for errors.
   * @param scanners the list {@link KeyValueScanner}s to be read from
   * @param kv       the KeyValue on which the operation is being requested
   * @throws IOException an {@link InterruptedIOException} if the wait is interrupted, or an
   *                     {@code IOException} wrapping the first error reported by a handler
   */
  private void parallelSeek(final List<? extends KeyValueScanner> scanners, final Cell kv)
    throws IOException {
    if (scanners.isEmpty()) return;
    int storeFileScannerCount = scanners.size();
    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
    List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount);
    for (KeyValueScanner scanner : scanners) {
      if (scanner instanceof StoreFileScanner) {
        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, this.readPt, latch);
        executor.submit(seekHandler);
        handlers.add(seekHandler);
      } else {
        // Seeked inline, so this scanner's share of the latch is released right away.
        scanner.seek(kv);
        latch.countDown();
      }
    }

    try {
      latch.await();
    } catch (InterruptedException ie) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
    }

    for (ParallelSeekHandler handler : handlers) {
      if (handler.getErr() != null) {
        throw new IOException(handler.getErr());
      }
    }
  }

  /**
   * Used in testing. Collects the heap's current scanner (when there is one) followed by the
   * scanners still queued in the heap.
   * @return all scanners in no particular order
   */
  List<KeyValueScanner> getAllScannersForTesting() {
    List<KeyValueScanner> allScanners = new ArrayList<>();
    KeyValueScanner current = heap.getCurrentForTesting();
    if (current != null) allScanners.add(current);
    for (KeyValueScanner scanner : heap.getHeap())
      allScanners.add(scanner);
    return allScanners;
  }

  /**
   * Sets the class-wide {@code lazySeekEnabledGlobally} flag, which {@link #reseek(Cell)} consults
   * when choosing how to seek the heap.
   * @param enable the new value of the flag
   */
  static void enableLazySeekGlobally(boolean enable) {
    lazySeekEnabledGlobally = enable;
  }

  /** Returns the estimated number of KVs seen by this scanner (includes some skipped KVs). */
  public long getEstimatedNumberOfKvsScanned() {
    return this.kvsScanned;
  }

  /** Returns the next indexed key as reported by the underlying heap. */
  @Override
  public Cell getNextIndexedKey() {
    return this.heap.getNextIndexedKey();
  }

  /**
   * Lets go of what this scanner was still holding for cells it has already handed out: the key of
   * {@code prevCell} is copied, the matcher is notified, the scanners parked in
   * {@code scannersForDelayedClose} are closed and, if a heap is present, {@code shipped()} is
   * forwarded to it before a switch to stream read is attempted.
   */
  @Override
  public void shipped() throws IOException {
    if (prevCell != null) {
      // Do the copy here so that in case the prevCell ref is pointing to the previous
      // blocks we can safely release those blocks.
      // This applies to blocks that are got from Bucket cache, L1 cache and the blocks
      // fetched from HDFS. Copying this would ensure that we let go the references to these
      // blocks so that they can be GCed safely(in case of bucket cache)
      prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
    }
    matcher.beforeShipped();
    // There wont be further fetch of Cells from these scanners. Just close.
    clearAndClose(scannersForDelayedClose);
    if (this.heap != null) {
      this.heap.shipped();
      // When switching from pread to stream, we will open a new scanner for each store file, but
      // the old scanner may still track the HFileBlocks we have scanned but not sent back to client
      // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others
      // before we serialize and send it back to client. The HFileBlocks will be released in shipped
      // method, so we here will also open new scanners and close old scanners in shipped method.
      // See HBASE-18055 for more details.
      trySwitchToStreamRead();
    }
  }
}