001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.regionserver; 021 022import java.io.FileNotFoundException; 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.List; 028import java.util.Optional; 029import java.util.PriorityQueue; 030import java.util.concurrent.atomic.LongAdder; 031 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.CellComparator; 035import org.apache.hadoop.hbase.HBaseInterfaceAudience; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.PrivateCellUtil; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.apache.yetus.audience.InterfaceStability; 040import org.apache.hadoop.hbase.client.Scan; 041import org.apache.hadoop.hbase.io.TimeRange; 042import org.apache.hadoop.hbase.io.hfile.HFileScanner; 043import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 044 045/** 046 * KeyValueScanner adaptor over the Reader. It also provides hooks into 047 * bloom filter things. 048 */ 049@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX) 050@InterfaceStability.Evolving 051public class StoreFileScanner implements KeyValueScanner { 052 // the reader it comes from: 053 private final StoreFileReader reader; 054 private final HFileScanner hfs; 055 private Cell cur = null; 056 private boolean closed = false; 057 058 private boolean realSeekDone; 059 private boolean delayedReseek; 060 private Cell delayedSeekKV; 061 062 private final boolean enforceMVCC; 063 private final boolean hasMVCCInfo; 064 // A flag represents whether could stop skipping KeyValues for MVCC 065 // if have encountered the next row. Only used for reversed scan 066 private boolean stopSkippingKVsIfNextRow = false; 067 068 private static LongAdder seekCount; 069 070 private final boolean canOptimizeForNonNullColumn; 071 072 private final long readPt; 073 074 // Order of this scanner relative to other scanners when duplicate key-value is found. 075 // Higher values means scanner has newer data. 076 private final long scannerOrder; 077 078 /** 079 * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} 080 * @param useMVCC If true, scanner will filter out updates with MVCC larger than {@code readPt}. 081 * @param readPt MVCC value to use to filter out the updates newer than this scanner. 082 * @param hasMVCC Set to true if underlying store file reader has MVCC info. 083 * @param scannerOrder Order of the scanner relative to other scanners. See 084 * {@link KeyValueScanner#getScannerOrder()}. 085 * @param canOptimizeForNonNullColumn {@code true} if we can make sure there is no null column, 086 * otherwise {@code false}. This is a hint for optimization. 087 */ 088 public StoreFileScanner(StoreFileReader reader, HFileScanner hfs, boolean useMVCC, 089 boolean hasMVCC, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) { 090 this.readPt = readPt; 091 this.reader = reader; 092 this.hfs = hfs; 093 this.enforceMVCC = useMVCC; 094 this.hasMVCCInfo = hasMVCC; 095 this.scannerOrder = scannerOrder; 096 this.canOptimizeForNonNullColumn = canOptimizeForNonNullColumn; 097 this.reader.incrementRefCount(); 098 } 099 100 /** 101 * Return an array of scanners corresponding to the given set of store files. 102 */ 103 public static List<StoreFileScanner> getScannersForStoreFiles(Collection<HStoreFile> files, 104 boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean useDropBehind, 105 long readPt) throws IOException { 106 return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, useDropBehind, null, 107 readPt); 108 } 109 110 /** 111 * Return an array of scanners corresponding to the given set of store files, And set the 112 * ScanQueryMatcher for each store file scanner for further optimization 113 */ 114 public static List<StoreFileScanner> getScannersForStoreFiles(Collection<HStoreFile> files, 115 boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean canUseDrop, 116 ScanQueryMatcher matcher, long readPt) throws IOException { 117 if (files.isEmpty()) { 118 return Collections.emptyList(); 119 } 120 List<StoreFileScanner> scanners = new ArrayList<>(files.size()); 121 boolean canOptimizeForNonNullColumn = matcher != null ? !matcher.hasNullColumnInQuery() : false; 122 PriorityQueue<HStoreFile> sortedFiles = 123 new PriorityQueue<>(files.size(), StoreFileComparators.SEQ_ID); 124 for (HStoreFile file : files) { 125 // The sort function needs metadata so we need to open reader first before sorting the list. 126 file.initReader(); 127 sortedFiles.add(file); 128 } 129 boolean succ = false; 130 try { 131 for (int i = 0, n = files.size(); i < n; i++) { 132 HStoreFile sf = sortedFiles.remove(); 133 StoreFileScanner scanner; 134 if (usePread) { 135 scanner = sf.getPreadScanner(cacheBlocks, readPt, i, canOptimizeForNonNullColumn); 136 } else { 137 scanner = sf.getStreamScanner(canUseDrop, cacheBlocks, isCompaction, readPt, i, 138 canOptimizeForNonNullColumn); 139 } 140 scanners.add(scanner); 141 } 142 succ = true; 143 } finally { 144 if (!succ) { 145 for (StoreFileScanner scanner : scanners) { 146 scanner.close(); 147 } 148 } 149 } 150 return scanners; 151 } 152 153 /** 154 * Get scanners for compaction. We will create a separated reader for each store file to avoid 155 * contention with normal read request. 156 */ 157 public static List<StoreFileScanner> getScannersForCompaction(Collection<HStoreFile> files, 158 boolean canUseDropBehind, long readPt) throws IOException { 159 List<StoreFileScanner> scanners = new ArrayList<>(files.size()); 160 List<HStoreFile> sortedFiles = new ArrayList<>(files); 161 Collections.sort(sortedFiles, StoreFileComparators.SEQ_ID); 162 boolean succ = false; 163 try { 164 for (int i = 0, n = sortedFiles.size(); i < n; i++) { 165 scanners.add( 166 sortedFiles.get(i).getStreamScanner(canUseDropBehind, false, true, readPt, i, false)); 167 } 168 succ = true; 169 } finally { 170 if (!succ) { 171 for (StoreFileScanner scanner : scanners) { 172 scanner.close(); 173 } 174 } 175 } 176 return scanners; 177 } 178 179 @Override 180 public String toString() { 181 return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]"; 182 } 183 184 @Override 185 public Cell peek() { 186 return cur; 187 } 188 189 @Override 190 public Cell next() throws IOException { 191 Cell retKey = cur; 192 193 try { 194 // only seek if we aren't at the end. cur == null implies 'end'. 195 if (cur != null) { 196 hfs.next(); 197 setCurrentCell(hfs.getCell()); 198 if (hasMVCCInfo || this.reader.isBulkLoaded()) { 199 skipKVsNewerThanReadpoint(); 200 } 201 } 202 } catch (FileNotFoundException e) { 203 throw e; 204 } catch(IOException e) { 205 throw new IOException("Could not iterate " + this, e); 206 } 207 return retKey; 208 } 209 210 @Override 211 public boolean seek(Cell key) throws IOException { 212 if (seekCount != null) seekCount.increment(); 213 214 try { 215 try { 216 if(!seekAtOrAfter(hfs, key)) { 217 this.cur = null; 218 return false; 219 } 220 221 setCurrentCell(hfs.getCell()); 222 223 if (!hasMVCCInfo && this.reader.isBulkLoaded()) { 224 return skipKVsNewerThanReadpoint(); 225 } else { 226 return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint(); 227 } 228 } finally { 229 realSeekDone = true; 230 } 231 } catch (FileNotFoundException e) { 232 throw e; 233 } catch (IOException ioe) { 234 throw new IOException("Could not seek " + this + " to key " + key, ioe); 235 } 236 } 237 238 @Override 239 public boolean reseek(Cell key) throws IOException { 240 if (seekCount != null) seekCount.increment(); 241 242 try { 243 try { 244 if (!reseekAtOrAfter(hfs, key)) { 245 this.cur = null; 246 return false; 247 } 248 setCurrentCell(hfs.getCell()); 249 250 if (!hasMVCCInfo && this.reader.isBulkLoaded()) { 251 return skipKVsNewerThanReadpoint(); 252 } else { 253 return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint(); 254 } 255 } finally { 256 realSeekDone = true; 257 } 258 } catch (FileNotFoundException e) { 259 throw e; 260 } catch (IOException ioe) { 261 throw new IOException("Could not reseek " + this + " to key " + key, 262 ioe); 263 } 264 } 265 266 protected void setCurrentCell(Cell newVal) throws IOException { 267 this.cur = newVal; 268 if (this.cur != null && this.reader.isBulkLoaded() && !this.reader.isSkipResetSeqId()) { 269 PrivateCellUtil.setSequenceId(cur, this.reader.getSequenceID()); 270 } 271 } 272 273 protected boolean skipKVsNewerThanReadpoint() throws IOException { 274 // We want to ignore all key-values that are newer than our current 275 // readPoint 276 Cell startKV = cur; 277 while(enforceMVCC 278 && cur != null 279 && (cur.getSequenceId() > readPt)) { 280 boolean hasNext = hfs.next(); 281 setCurrentCell(hfs.getCell()); 282 if (hasNext && this.stopSkippingKVsIfNextRow 283 && getComparator().compareRows(cur, startKV) > 0) { 284 return false; 285 } 286 } 287 288 if (cur == null) { 289 return false; 290 } 291 292 return true; 293 } 294 295 @Override 296 public void close() { 297 if (closed) return; 298 cur = null; 299 this.hfs.close(); 300 if (this.reader != null) { 301 this.reader.readCompleted(); 302 } 303 closed = true; 304 } 305 306 /** 307 * 308 * @param s 309 * @param k 310 * @return false if not found or if k is after the end. 311 * @throws IOException 312 */ 313 public static boolean seekAtOrAfter(HFileScanner s, Cell k) 314 throws IOException { 315 int result = s.seekTo(k); 316 if(result < 0) { 317 if (result == HConstants.INDEX_KEY_MAGIC) { 318 // using faked key 319 return true; 320 } 321 // Passed KV is smaller than first KV in file, work from start of file 322 return s.seekTo(); 323 } else if(result > 0) { 324 // Passed KV is larger than current KV in file, if there is a next 325 // it is the "after", if not then this scanner is done. 326 return s.next(); 327 } 328 // Seeked to the exact key 329 return true; 330 } 331 332 static boolean reseekAtOrAfter(HFileScanner s, Cell k) 333 throws IOException { 334 //This function is similar to seekAtOrAfter function 335 int result = s.reseekTo(k); 336 if (result <= 0) { 337 if (result == HConstants.INDEX_KEY_MAGIC) { 338 // using faked key 339 return true; 340 } 341 // If up to now scanner is not seeked yet, this means passed KV is smaller 342 // than first KV in file, and it is the first time we seek on this file. 343 // So we also need to work from the start of file. 344 if (!s.isSeeked()) { 345 return s.seekTo(); 346 } 347 return true; 348 } 349 // passed KV is larger than current KV in file, if there is a next 350 // it is after, if not then this scanner is done. 351 return s.next(); 352 } 353 354 /** 355 * @see KeyValueScanner#getScannerOrder() 356 */ 357 @Override 358 public long getScannerOrder() { 359 return scannerOrder; 360 } 361 362 /** 363 * Pretend we have done a seek but don't do it yet, if possible. The hope is 364 * that we find requested columns in more recent files and won't have to seek 365 * in older files. Creates a fake key/value with the given row/column and the 366 * highest (most recent) possible timestamp we might get from this file. When 367 * users of such "lazy scanner" need to know the next KV precisely (e.g. when 368 * this scanner is at the top of the heap), they run {@link #enforceSeek()}. 369 * <p> 370 * Note that this function does guarantee that the current KV of this scanner 371 * will be advanced to at least the given KV. Because of this, it does have 372 * to do a real seek in cases when the seek timestamp is older than the 373 * highest timestamp of the file, e.g. when we are trying to seek to the next 374 * row/column and use OLDEST_TIMESTAMP in the seek key. 375 */ 376 @Override 377 public boolean requestSeek(Cell kv, boolean forward, boolean useBloom) 378 throws IOException { 379 if (kv.getFamilyLength() == 0) { 380 useBloom = false; 381 } 382 383 boolean haveToSeek = true; 384 if (useBloom) { 385 // check ROWCOL Bloom filter first. 386 if (reader.getBloomFilterType() == BloomType.ROWCOL) { 387 haveToSeek = reader.passesGeneralRowColBloomFilter(kv); 388 } else if (canOptimizeForNonNullColumn 389 && ((PrivateCellUtil.isDeleteFamily(kv) 390 || PrivateCellUtil.isDeleteFamilyVersion(kv)))) { 391 // if there is no such delete family kv in the store file, 392 // then no need to seek. 393 haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(), kv.getRowOffset(), 394 kv.getRowLength()); 395 } 396 } 397 398 delayedReseek = forward; 399 delayedSeekKV = kv; 400 401 if (haveToSeek) { 402 // This row/column might be in this store file (or we did not use the 403 // Bloom filter), so we still need to seek. 404 realSeekDone = false; 405 long maxTimestampInFile = reader.getMaxTimestamp(); 406 long seekTimestamp = kv.getTimestamp(); 407 if (seekTimestamp > maxTimestampInFile) { 408 // Create a fake key that is not greater than the real next key. 409 // (Lower timestamps correspond to higher KVs.) 410 // To understand this better, consider that we are asked to seek to 411 // a higher timestamp than the max timestamp in this file. We know that 412 // the next point when we have to consider this file again is when we 413 // pass the max timestamp of this file (with the same row/column). 414 setCurrentCell(PrivateCellUtil.createFirstOnRowColTS(kv, maxTimestampInFile)); 415 } else { 416 // This will be the case e.g. when we need to seek to the next 417 // row/column, and we don't know exactly what they are, so we set the 418 // seek key's timestamp to OLDEST_TIMESTAMP to skip the rest of this 419 // row/column. 420 enforceSeek(); 421 } 422 return cur != null; 423 } 424 425 // Multi-column Bloom filter optimization. 426 // Create a fake key/value, so that this scanner only bubbles up to the top 427 // of the KeyValueHeap in StoreScanner after we scanned this row/column in 428 // all other store files. The query matcher will then just skip this fake 429 // key/value and the store scanner will progress to the next column. This 430 // is obviously not a "real real" seek, but unlike the fake KV earlier in 431 // this method, we want this to be propagated to ScanQueryMatcher. 432 setCurrentCell(PrivateCellUtil.createLastOnRowCol(kv)); 433 434 realSeekDone = true; 435 return true; 436 } 437 438 StoreFileReader getReader() { 439 return reader; 440 } 441 442 CellComparator getComparator() { 443 return reader.getComparator(); 444 } 445 446 @Override 447 public boolean realSeekDone() { 448 return realSeekDone; 449 } 450 451 @Override 452 public void enforceSeek() throws IOException { 453 if (realSeekDone) 454 return; 455 456 if (delayedReseek) { 457 reseek(delayedSeekKV); 458 } else { 459 seek(delayedSeekKV); 460 } 461 } 462 463 @Override 464 public boolean isFileScanner() { 465 return true; 466 } 467 468 @Override 469 public Path getFilePath() { 470 return reader.getHFileReader().getPath(); 471 } 472 473 // Test methods 474 static final long getSeekCount() { 475 return seekCount.sum(); 476 } 477 478 static final void instrument() { 479 seekCount = new LongAdder(); 480 } 481 482 @Override 483 public boolean shouldUseScanner(Scan scan, HStore store, long oldestUnexpiredTS) { 484 // if the file has no entries, no need to validate or create a scanner. 485 byte[] cf = store.getColumnFamilyDescriptor().getName(); 486 TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf); 487 if (timeRange == null) { 488 timeRange = scan.getTimeRange(); 489 } 490 return reader.passesTimerangeFilter(timeRange, oldestUnexpiredTS) && reader 491 .passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, scan.getFamilyMap().get(cf)); 492 } 493 494 @Override 495 public boolean seekToPreviousRow(Cell originalKey) throws IOException { 496 try { 497 try { 498 boolean keepSeeking = false; 499 Cell key = originalKey; 500 do { 501 Cell seekKey = PrivateCellUtil.createFirstOnRow(key); 502 if (seekCount != null) seekCount.increment(); 503 if (!hfs.seekBefore(seekKey)) { 504 this.cur = null; 505 return false; 506 } 507 Cell curCell = hfs.getCell(); 508 Cell firstKeyOfPreviousRow = PrivateCellUtil.createFirstOnRow(curCell); 509 510 if (seekCount != null) seekCount.increment(); 511 if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) { 512 this.cur = null; 513 return false; 514 } 515 516 setCurrentCell(hfs.getCell()); 517 this.stopSkippingKVsIfNextRow = true; 518 boolean resultOfSkipKVs; 519 try { 520 resultOfSkipKVs = skipKVsNewerThanReadpoint(); 521 } finally { 522 this.stopSkippingKVsIfNextRow = false; 523 } 524 if (!resultOfSkipKVs 525 || getComparator().compareRows(cur, firstKeyOfPreviousRow) > 0) { 526 keepSeeking = true; 527 key = firstKeyOfPreviousRow; 528 continue; 529 } else { 530 keepSeeking = false; 531 } 532 } while (keepSeeking); 533 return true; 534 } finally { 535 realSeekDone = true; 536 } 537 } catch (FileNotFoundException e) { 538 throw e; 539 } catch (IOException ioe) { 540 throw new IOException("Could not seekToPreviousRow " + this + " to key " 541 + originalKey, ioe); 542 } 543 } 544 545 @Override 546 public boolean seekToLastRow() throws IOException { 547 Optional<byte[]> lastRow = reader.getLastRowKey(); 548 if (!lastRow.isPresent()) { 549 return false; 550 } 551 Cell seekKey = PrivateCellUtil.createFirstOnRow(lastRow.get()); 552 if (seek(seekKey)) { 553 return true; 554 } else { 555 return seekToPreviousRow(seekKey); 556 } 557 } 558 559 @Override 560 public boolean backwardSeek(Cell key) throws IOException { 561 seek(key); 562 if (cur == null 563 || getComparator().compareRows(cur, key) > 0) { 564 return seekToPreviousRow(key); 565 } 566 return true; 567 } 568 569 @Override 570 public Cell getNextIndexedKey() { 571 return hfs.getNextIndexedKey(); 572 } 573 574 @Override 575 public void shipped() throws IOException { 576 this.hfs.shipped(); 577 } 578}