001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.regionserver; 021 022import java.io.FileNotFoundException; 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.List; 028import java.util.Optional; 029import java.util.PriorityQueue; 030import java.util.concurrent.atomic.LongAdder; 031 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.CellComparator; 035import org.apache.hadoop.hbase.HBaseInterfaceAudience; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.PrivateCellUtil; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.apache.yetus.audience.InterfaceStability; 040import org.apache.hadoop.hbase.client.Scan; 041import org.apache.hadoop.hbase.io.TimeRange; 042import org.apache.hadoop.hbase.io.hfile.HFileScanner; 043import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 044 045/** 046 * KeyValueScanner adaptor over the Reader. It also provides hooks into 047 * bloom filter things. 048 */ 049@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX) 050@InterfaceStability.Evolving 051public class StoreFileScanner implements KeyValueScanner { 052 // the reader it comes from: 053 private final StoreFileReader reader; 054 private final HFileScanner hfs; 055 private Cell cur = null; 056 private boolean closed = false; 057 058 private boolean realSeekDone; 059 private boolean delayedReseek; 060 private Cell delayedSeekKV; 061 062 private final boolean enforceMVCC; 063 private final boolean hasMVCCInfo; 064 // A flag represents whether could stop skipping KeyValues for MVCC 065 // if have encountered the next row. Only used for reversed scan 066 private boolean stopSkippingKVsIfNextRow = false; 067 068 private static LongAdder seekCount; 069 070 private final boolean canOptimizeForNonNullColumn; 071 072 private final long readPt; 073 074 // Order of this scanner relative to other scanners when duplicate key-value is found. 075 // Higher values means scanner has newer data. 076 private final long scannerOrder; 077 078 /** 079 * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} 080 * @param useMVCC If true, scanner will filter out updates with MVCC larger than {@code readPt}. 081 * @param readPt MVCC value to use to filter out the updates newer than this scanner. 082 * @param hasMVCC Set to true if underlying store file reader has MVCC info. 083 * @param scannerOrder Order of the scanner relative to other scanners. See 084 * {@link KeyValueScanner#getScannerOrder()}. 085 * @param canOptimizeForNonNullColumn {@code true} if we can make sure there is no null column, 086 * otherwise {@code false}. This is a hint for optimization. 087 */ 088 public StoreFileScanner(StoreFileReader reader, HFileScanner hfs, boolean useMVCC, 089 boolean hasMVCC, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) { 090 this.readPt = readPt; 091 this.reader = reader; 092 this.hfs = hfs; 093 this.enforceMVCC = useMVCC; 094 this.hasMVCCInfo = hasMVCC; 095 this.scannerOrder = scannerOrder; 096 this.canOptimizeForNonNullColumn = canOptimizeForNonNullColumn; 097 this.reader.incrementRefCount(); 098 } 099 100 boolean isPrimaryReplica() { 101 return reader.isPrimaryReplicaReader(); 102 } 103 104 /** 105 * Return an array of scanners corresponding to the given set of store files. 106 */ 107 public static List<StoreFileScanner> getScannersForStoreFiles(Collection<HStoreFile> files, 108 boolean cacheBlocks, boolean usePread, long readPt) throws IOException { 109 return getScannersForStoreFiles(files, cacheBlocks, usePread, false, false, readPt); 110 } 111 112 /** 113 * Return an array of scanners corresponding to the given set of store files. 114 */ 115 public static List<StoreFileScanner> getScannersForStoreFiles(Collection<HStoreFile> files, 116 boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean useDropBehind, 117 long readPt) throws IOException { 118 return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, useDropBehind, null, 119 readPt); 120 } 121 122 /** 123 * Return an array of scanners corresponding to the given set of store files, And set the 124 * ScanQueryMatcher for each store file scanner for further optimization 125 */ 126 public static List<StoreFileScanner> getScannersForStoreFiles(Collection<HStoreFile> files, 127 boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean canUseDrop, 128 ScanQueryMatcher matcher, long readPt) throws IOException { 129 if (files.isEmpty()) { 130 return Collections.emptyList(); 131 } 132 List<StoreFileScanner> scanners = new ArrayList<>(files.size()); 133 boolean canOptimizeForNonNullColumn = matcher != null ? !matcher.hasNullColumnInQuery() : false; 134 PriorityQueue<HStoreFile> sortedFiles = 135 new PriorityQueue<>(files.size(), StoreFileComparators.SEQ_ID); 136 for (HStoreFile file : files) { 137 // The sort function needs metadata so we need to open reader first before sorting the list. 138 file.initReader(); 139 sortedFiles.add(file); 140 } 141 boolean succ = false; 142 try { 143 for (int i = 0, n = files.size(); i < n; i++) { 144 HStoreFile sf = sortedFiles.remove(); 145 StoreFileScanner scanner; 146 if (usePread) { 147 scanner = sf.getPreadScanner(cacheBlocks, readPt, i, canOptimizeForNonNullColumn); 148 } else { 149 scanner = sf.getStreamScanner(canUseDrop, cacheBlocks, isCompaction, readPt, i, 150 canOptimizeForNonNullColumn); 151 } 152 scanners.add(scanner); 153 } 154 succ = true; 155 } finally { 156 if (!succ) { 157 for (StoreFileScanner scanner : scanners) { 158 scanner.close(); 159 } 160 } 161 } 162 return scanners; 163 } 164 165 /** 166 * Get scanners for compaction. We will create a separated reader for each store file to avoid 167 * contention with normal read request. 168 */ 169 public static List<StoreFileScanner> getScannersForCompaction(Collection<HStoreFile> files, 170 boolean canUseDropBehind, long readPt) throws IOException { 171 List<StoreFileScanner> scanners = new ArrayList<>(files.size()); 172 List<HStoreFile> sortedFiles = new ArrayList<>(files); 173 Collections.sort(sortedFiles, StoreFileComparators.SEQ_ID); 174 boolean succ = false; 175 try { 176 for (int i = 0, n = sortedFiles.size(); i < n; i++) { 177 scanners.add( 178 sortedFiles.get(i).getStreamScanner(canUseDropBehind, false, true, readPt, i, false)); 179 } 180 succ = true; 181 } finally { 182 if (!succ) { 183 for (StoreFileScanner scanner : scanners) { 184 scanner.close(); 185 } 186 } 187 } 188 return scanners; 189 } 190 191 @Override 192 public String toString() { 193 return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]"; 194 } 195 196 @Override 197 public Cell peek() { 198 return cur; 199 } 200 201 @Override 202 public Cell next() throws IOException { 203 Cell retKey = cur; 204 205 try { 206 // only seek if we aren't at the end. cur == null implies 'end'. 207 if (cur != null) { 208 hfs.next(); 209 setCurrentCell(hfs.getCell()); 210 if (hasMVCCInfo || this.reader.isBulkLoaded()) { 211 skipKVsNewerThanReadpoint(); 212 } 213 } 214 } catch (FileNotFoundException e) { 215 throw e; 216 } catch(IOException e) { 217 throw new IOException("Could not iterate " + this, e); 218 } 219 return retKey; 220 } 221 222 @Override 223 public boolean seek(Cell key) throws IOException { 224 if (seekCount != null) seekCount.increment(); 225 226 try { 227 try { 228 if(!seekAtOrAfter(hfs, key)) { 229 this.cur = null; 230 return false; 231 } 232 233 setCurrentCell(hfs.getCell()); 234 235 if (!hasMVCCInfo && this.reader.isBulkLoaded()) { 236 return skipKVsNewerThanReadpoint(); 237 } else { 238 return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint(); 239 } 240 } finally { 241 realSeekDone = true; 242 } 243 } catch (FileNotFoundException e) { 244 throw e; 245 } catch (IOException ioe) { 246 throw new IOException("Could not seek " + this + " to key " + key, ioe); 247 } 248 } 249 250 @Override 251 public boolean reseek(Cell key) throws IOException { 252 if (seekCount != null) seekCount.increment(); 253 254 try { 255 try { 256 if (!reseekAtOrAfter(hfs, key)) { 257 this.cur = null; 258 return false; 259 } 260 setCurrentCell(hfs.getCell()); 261 262 if (!hasMVCCInfo && this.reader.isBulkLoaded()) { 263 return skipKVsNewerThanReadpoint(); 264 } else { 265 return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint(); 266 } 267 } finally { 268 realSeekDone = true; 269 } 270 } catch (FileNotFoundException e) { 271 throw e; 272 } catch (IOException ioe) { 273 throw new IOException("Could not reseek " + this + " to key " + key, 274 ioe); 275 } 276 } 277 278 protected void setCurrentCell(Cell newVal) throws IOException { 279 this.cur = newVal; 280 if (this.cur != null && this.reader.isBulkLoaded() && !this.reader.isSkipResetSeqId()) { 281 PrivateCellUtil.setSequenceId(cur, this.reader.getSequenceID()); 282 } 283 } 284 285 protected boolean skipKVsNewerThanReadpoint() throws IOException { 286 // We want to ignore all key-values that are newer than our current 287 // readPoint 288 Cell startKV = cur; 289 while(enforceMVCC 290 && cur != null 291 && (cur.getSequenceId() > readPt)) { 292 boolean hasNext = hfs.next(); 293 setCurrentCell(hfs.getCell()); 294 if (hasNext && this.stopSkippingKVsIfNextRow 295 && getComparator().compareRows(cur, startKV) > 0) { 296 return false; 297 } 298 } 299 300 if (cur == null) { 301 return false; 302 } 303 304 return true; 305 } 306 307 @Override 308 public void close() { 309 if (closed) return; 310 cur = null; 311 this.hfs.close(); 312 if (this.reader != null) { 313 this.reader.readCompleted(); 314 } 315 closed = true; 316 } 317 318 /** 319 * 320 * @param s 321 * @param k 322 * @return false if not found or if k is after the end. 323 * @throws IOException 324 */ 325 public static boolean seekAtOrAfter(HFileScanner s, Cell k) 326 throws IOException { 327 int result = s.seekTo(k); 328 if(result < 0) { 329 if (result == HConstants.INDEX_KEY_MAGIC) { 330 // using faked key 331 return true; 332 } 333 // Passed KV is smaller than first KV in file, work from start of file 334 return s.seekTo(); 335 } else if(result > 0) { 336 // Passed KV is larger than current KV in file, if there is a next 337 // it is the "after", if not then this scanner is done. 338 return s.next(); 339 } 340 // Seeked to the exact key 341 return true; 342 } 343 344 static boolean reseekAtOrAfter(HFileScanner s, Cell k) 345 throws IOException { 346 //This function is similar to seekAtOrAfter function 347 int result = s.reseekTo(k); 348 if (result <= 0) { 349 if (result == HConstants.INDEX_KEY_MAGIC) { 350 // using faked key 351 return true; 352 } 353 // If up to now scanner is not seeked yet, this means passed KV is smaller 354 // than first KV in file, and it is the first time we seek on this file. 355 // So we also need to work from the start of file. 356 if (!s.isSeeked()) { 357 return s.seekTo(); 358 } 359 return true; 360 } 361 // passed KV is larger than current KV in file, if there is a next 362 // it is after, if not then this scanner is done. 363 return s.next(); 364 } 365 366 /** 367 * @see KeyValueScanner#getScannerOrder() 368 */ 369 @Override 370 public long getScannerOrder() { 371 return scannerOrder; 372 } 373 374 /** 375 * Pretend we have done a seek but don't do it yet, if possible. The hope is 376 * that we find requested columns in more recent files and won't have to seek 377 * in older files. Creates a fake key/value with the given row/column and the 378 * highest (most recent) possible timestamp we might get from this file. When 379 * users of such "lazy scanner" need to know the next KV precisely (e.g. when 380 * this scanner is at the top of the heap), they run {@link #enforceSeek()}. 381 * <p> 382 * Note that this function does guarantee that the current KV of this scanner 383 * will be advanced to at least the given KV. Because of this, it does have 384 * to do a real seek in cases when the seek timestamp is older than the 385 * highest timestamp of the file, e.g. when we are trying to seek to the next 386 * row/column and use OLDEST_TIMESTAMP in the seek key. 387 */ 388 @Override 389 public boolean requestSeek(Cell kv, boolean forward, boolean useBloom) 390 throws IOException { 391 if (kv.getFamilyLength() == 0) { 392 useBloom = false; 393 } 394 395 boolean haveToSeek = true; 396 if (useBloom) { 397 // check ROWCOL Bloom filter first. 398 if (reader.getBloomFilterType() == BloomType.ROWCOL) { 399 haveToSeek = reader.passesGeneralRowColBloomFilter(kv); 400 } else if (canOptimizeForNonNullColumn 401 && ((PrivateCellUtil.isDeleteFamily(kv) 402 || PrivateCellUtil.isDeleteFamilyVersion(kv)))) { 403 // if there is no such delete family kv in the store file, 404 // then no need to seek. 405 haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(), kv.getRowOffset(), 406 kv.getRowLength()); 407 } 408 } 409 410 delayedReseek = forward; 411 delayedSeekKV = kv; 412 413 if (haveToSeek) { 414 // This row/column might be in this store file (or we did not use the 415 // Bloom filter), so we still need to seek. 416 realSeekDone = false; 417 long maxTimestampInFile = reader.getMaxTimestamp(); 418 long seekTimestamp = kv.getTimestamp(); 419 if (seekTimestamp > maxTimestampInFile) { 420 // Create a fake key that is not greater than the real next key. 421 // (Lower timestamps correspond to higher KVs.) 422 // To understand this better, consider that we are asked to seek to 423 // a higher timestamp than the max timestamp in this file. We know that 424 // the next point when we have to consider this file again is when we 425 // pass the max timestamp of this file (with the same row/column). 426 setCurrentCell(PrivateCellUtil.createFirstOnRowColTS(kv, maxTimestampInFile)); 427 } else { 428 // This will be the case e.g. when we need to seek to the next 429 // row/column, and we don't know exactly what they are, so we set the 430 // seek key's timestamp to OLDEST_TIMESTAMP to skip the rest of this 431 // row/column. 432 enforceSeek(); 433 } 434 return cur != null; 435 } 436 437 // Multi-column Bloom filter optimization. 438 // Create a fake key/value, so that this scanner only bubbles up to the top 439 // of the KeyValueHeap in StoreScanner after we scanned this row/column in 440 // all other store files. The query matcher will then just skip this fake 441 // key/value and the store scanner will progress to the next column. This 442 // is obviously not a "real real" seek, but unlike the fake KV earlier in 443 // this method, we want this to be propagated to ScanQueryMatcher. 444 setCurrentCell(PrivateCellUtil.createLastOnRowCol(kv)); 445 446 realSeekDone = true; 447 return true; 448 } 449 450 StoreFileReader getReader() { 451 return reader; 452 } 453 454 CellComparator getComparator() { 455 return reader.getComparator(); 456 } 457 458 @Override 459 public boolean realSeekDone() { 460 return realSeekDone; 461 } 462 463 @Override 464 public void enforceSeek() throws IOException { 465 if (realSeekDone) 466 return; 467 468 if (delayedReseek) { 469 reseek(delayedSeekKV); 470 } else { 471 seek(delayedSeekKV); 472 } 473 } 474 475 @Override 476 public boolean isFileScanner() { 477 return true; 478 } 479 480 @Override 481 public Path getFilePath() { 482 return reader.getHFileReader().getPath(); 483 } 484 485 // Test methods 486 static final long getSeekCount() { 487 return seekCount.sum(); 488 } 489 490 static final void instrument() { 491 seekCount = new LongAdder(); 492 } 493 494 @Override 495 public boolean shouldUseScanner(Scan scan, HStore store, long oldestUnexpiredTS) { 496 // if the file has no entries, no need to validate or create a scanner. 497 byte[] cf = store.getColumnFamilyDescriptor().getName(); 498 TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf); 499 if (timeRange == null) { 500 timeRange = scan.getTimeRange(); 501 } 502 return reader.passesTimerangeFilter(timeRange, oldestUnexpiredTS) && reader 503 .passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, scan.getFamilyMap().get(cf)); 504 } 505 506 @Override 507 public boolean seekToPreviousRow(Cell originalKey) throws IOException { 508 try { 509 try { 510 boolean keepSeeking = false; 511 Cell key = originalKey; 512 do { 513 Cell seekKey = PrivateCellUtil.createFirstOnRow(key); 514 if (seekCount != null) seekCount.increment(); 515 if (!hfs.seekBefore(seekKey)) { 516 this.cur = null; 517 return false; 518 } 519 Cell curCell = hfs.getCell(); 520 Cell firstKeyOfPreviousRow = PrivateCellUtil.createFirstOnRow(curCell); 521 522 if (seekCount != null) seekCount.increment(); 523 if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) { 524 this.cur = null; 525 return false; 526 } 527 528 setCurrentCell(hfs.getCell()); 529 this.stopSkippingKVsIfNextRow = true; 530 boolean resultOfSkipKVs; 531 try { 532 resultOfSkipKVs = skipKVsNewerThanReadpoint(); 533 } finally { 534 this.stopSkippingKVsIfNextRow = false; 535 } 536 if (!resultOfSkipKVs 537 || getComparator().compareRows(cur, firstKeyOfPreviousRow) > 0) { 538 keepSeeking = true; 539 key = firstKeyOfPreviousRow; 540 continue; 541 } else { 542 keepSeeking = false; 543 } 544 } while (keepSeeking); 545 return true; 546 } finally { 547 realSeekDone = true; 548 } 549 } catch (FileNotFoundException e) { 550 throw e; 551 } catch (IOException ioe) { 552 throw new IOException("Could not seekToPreviousRow " + this + " to key " 553 + originalKey, ioe); 554 } 555 } 556 557 @Override 558 public boolean seekToLastRow() throws IOException { 559 Optional<byte[]> lastRow = reader.getLastRowKey(); 560 if (!lastRow.isPresent()) { 561 return false; 562 } 563 Cell seekKey = PrivateCellUtil.createFirstOnRow(lastRow.get()); 564 if (seek(seekKey)) { 565 return true; 566 } else { 567 return seekToPreviousRow(seekKey); 568 } 569 } 570 571 @Override 572 public boolean backwardSeek(Cell key) throws IOException { 573 seek(key); 574 if (cur == null 575 || getComparator().compareRows(cur, key) > 0) { 576 return seekToPreviousRow(key); 577 } 578 return true; 579 } 580 581 @Override 582 public Cell getNextIndexedKey() { 583 return hfs.getNextIndexedKey(); 584 } 585 586 @Override 587 public void shipped() throws IOException { 588 this.hfs.shipped(); 589 } 590}