/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

/**
 * KeyValueScanner adaptor over the Reader. It also provides hooks into bloom filter things.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
@InterfaceStability.Evolving
public class StoreFileScanner implements KeyValueScanner {
  // the reader it comes from:
  private final StoreFileReader reader;
  private final HFileScanner hfs;
  private Cell cur = null;
  private boolean closed = false;

  private boolean realSeekDone;
  private boolean delayedReseek;
  private Cell delayedSeekKV;

  private final boolean enforceMVCC;
  private final boolean hasMVCCInfo;
  // A flag that indicates whether we can stop skipping KeyValues for MVCC
  // once we have encountered the next row. Only used for reversed scans.
  private boolean stopSkippingKVsIfNextRow = false;

  private static LongAdder seekCount;

  private final boolean canOptimizeForNonNullColumn;

  private final long readPt;

  // Order of this scanner relative to other scanners when a duplicate key-value is found.
  // Higher values mean the scanner holds newer data.
  private final long scannerOrder;

  /**
   * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}.
   * @param useMVCC                     If true, the scanner will filter out updates with MVCC
   *                                    larger than {@code readPt}.
   * @param readPt                      MVCC value to use to filter out the updates newer than this
   *                                    scanner.
   * @param hasMVCC                     Set to true if the underlying store file reader has MVCC
   *                                    info.
   * @param scannerOrder                Order of the scanner relative to other scanners. See
   *                                    {@link KeyValueScanner#getScannerOrder()}.
   * @param canOptimizeForNonNullColumn {@code true} if we can make sure there is no null column,
   *                                    otherwise {@code false}. This is a hint for optimization.
   */
  public StoreFileScanner(StoreFileReader reader, HFileScanner hfs, boolean useMVCC,
    boolean hasMVCC, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) {
    this.readPt = readPt;
    this.reader = reader;
    this.hfs = hfs;
    this.enforceMVCC = useMVCC;
    this.hasMVCCInfo = hasMVCC;
    this.scannerOrder = scannerOrder;
    this.canOptimizeForNonNullColumn = canOptimizeForNonNullColumn;
    this.reader.incrementRefCount();
  }
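  // Note (added): in normal operation these scanners are not created via the constructor above;
  // they are obtained through the static factory methods that follow, which open each store
  // file's reader and create a pread or stream scanner per file.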
  /**
   * Return a list of scanners corresponding to the given set of store files.
   */
  public static List<StoreFileScanner> getScannersForStoreFiles(Collection<HStoreFile> files,
    boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean useDropBehind, long readPt)
    throws IOException {
    return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, useDropBehind, null,
      readPt);
  }

  /**
   * Return a list of scanners corresponding to the given set of store files, and set the
   * ScanQueryMatcher for each store file scanner for further optimization.
   */
  public static List<StoreFileScanner> getScannersForStoreFiles(Collection<HStoreFile> files,
    boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean canUseDrop,
    ScanQueryMatcher matcher, long readPt) throws IOException {
    if (files.isEmpty()) {
      return Collections.emptyList();
    }
    List<StoreFileScanner> scanners = new ArrayList<>(files.size());
    boolean canOptimizeForNonNullColumn = matcher != null ? !matcher.hasNullColumnInQuery() : false;
    PriorityQueue<HStoreFile> sortedFiles =
      new PriorityQueue<>(files.size(), StoreFileComparators.SEQ_ID);
    for (HStoreFile file : files) {
      // The sort function needs metadata so we need to open the reader first before sorting the
      // list.
      file.initReader();
      sortedFiles.add(file);
    }
    boolean succ = false;
    try {
      for (int i = 0, n = files.size(); i < n; i++) {
        HStoreFile sf = sortedFiles.remove();
        StoreFileScanner scanner;
        if (usePread) {
          scanner = sf.getPreadScanner(cacheBlocks, readPt, i, canOptimizeForNonNullColumn);
        } else {
          scanner = sf.getStreamScanner(canUseDrop, cacheBlocks, isCompaction, readPt, i,
            canOptimizeForNonNullColumn);
        }
        scanners.add(scanner);
      }
      succ = true;
    } finally {
      if (!succ) {
        for (StoreFileScanner scanner : scanners) {
          scanner.close();
        }
      }
    }
    return scanners;
  }

  /**
   * Get scanners for compaction. We will create a separate reader for each store file to avoid
   * contention with normal read requests.
   */
  public static List<StoreFileScanner> getScannersForCompaction(Collection<HStoreFile> files,
    boolean canUseDropBehind, long readPt) throws IOException {
    List<StoreFileScanner> scanners = new ArrayList<>(files.size());
    List<HStoreFile> sortedFiles = new ArrayList<>(files);
    Collections.sort(sortedFiles, StoreFileComparators.SEQ_ID);
    boolean succ = false;
    try {
      for (int i = 0, n = sortedFiles.size(); i < n; i++) {
        scanners.add(
          sortedFiles.get(i).getStreamScanner(canUseDropBehind, false, true, readPt, i, false));
      }
      succ = true;
    } finally {
      if (!succ) {
        for (StoreFileScanner scanner : scanners) {
          scanner.close();
        }
      }
    }
    return scanners;
  }
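  // Hedged usage sketch (not part of the original file; storeFiles, startKey and readPt are
  // placeholders): a read path would typically obtain one pread scanner per store file, seek
  // them all, and close them when done, e.g.:
  //
  //   List<StoreFileScanner> scanners =
  //     StoreFileScanner.getScannersForStoreFiles(storeFiles, true, true, false, false, readPt);
  //   try {
  //     for (StoreFileScanner s : scanners) {
  //       s.seek(startKey);
  //     }
  //     // ... merge cells across scanners ...
  //   } finally {
  //     scanners.forEach(StoreFileScanner::close);
  //   }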
  @Override
  public String toString() {
    return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
  }

  @Override
  public Cell peek() {
    return cur;
  }

  @Override
  public Cell next() throws IOException {
    Cell retKey = cur;

    try {
      // only seek if we aren't at the end. cur == null implies 'end'.
      if (cur != null) {
        hfs.next();
        setCurrentCell(hfs.getCell());
        if (hasMVCCInfo || this.reader.isBulkLoaded()) {
          skipKVsNewerThanReadpoint();
        }
      }
    } catch (FileNotFoundException e) {
      throw e;
    } catch (IOException e) {
      throw new IOException("Could not iterate " + this, e);
    }
    return retKey;
  }

  @Override
  public boolean seek(Cell key) throws IOException {
    if (seekCount != null) seekCount.increment();

    try {
      try {
        if (!seekAtOrAfter(hfs, key)) {
          this.cur = null;
          return false;
        }

        setCurrentCell(hfs.getCell());

        if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
          return skipKVsNewerThanReadpoint();
        } else {
          return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
        }
      } finally {
        realSeekDone = true;
      }
    } catch (FileNotFoundException e) {
      throw e;
    } catch (IOException ioe) {
      throw new IOException("Could not seek " + this + " to key " + key, ioe);
    }
  }

  @Override
  public boolean reseek(Cell key) throws IOException {
    if (seekCount != null) seekCount.increment();

    try {
      try {
        if (!reseekAtOrAfter(hfs, key)) {
          this.cur = null;
          return false;
        }
        setCurrentCell(hfs.getCell());

        if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
          return skipKVsNewerThanReadpoint();
        } else {
          return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
        }
      } finally {
        realSeekDone = true;
      }
    } catch (FileNotFoundException e) {
      throw e;
    } catch (IOException ioe) {
      throw new IOException("Could not reseek " + this + " to key " + key, ioe);
    }
  }

  protected void setCurrentCell(Cell newVal) throws IOException {
    this.cur = newVal;
    if (this.cur != null && this.reader.isBulkLoaded() && !this.reader.isSkipResetSeqId()) {
      PrivateCellUtil.setSequenceId(cur, this.reader.getSequenceID());
    }
  }
  protected boolean skipKVsNewerThanReadpoint() throws IOException {
    // We want to ignore all key-values that are newer than our current
    // readPoint
    Cell startKV = cur;
    while (enforceMVCC && cur != null && (cur.getSequenceId() > readPt)) {
      boolean hasNext = hfs.next();
      setCurrentCell(hfs.getCell());
      if (
        hasNext && this.stopSkippingKVsIfNextRow && getComparator().compareRows(cur, startKV) > 0
      ) {
        return false;
      }
    }

    if (cur == null) {
      return false;
    }

    return true;
  }
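  // Illustrative note (added): with enforceMVCC == true and, say, readPt == 10, any cell whose
  // sequence id is 11 or higher is skipped above, so the scan only observes edits that were
  // already visible when its read point was taken.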
  @Override
  public void close() {
    if (closed) return;
    cur = null;
    this.hfs.close();
    if (this.reader != null) {
      this.reader.readCompleted();
    }
    closed = true;
  }

  /**
   * Seek the given {@link HFileScanner} to the cell at or after the given key.
   * @return false if not found or if k is after the end.
   */
  public static boolean seekAtOrAfter(HFileScanner s, Cell k) throws IOException {
    int result = s.seekTo(k);
    if (result < 0) {
      if (result == HConstants.INDEX_KEY_MAGIC) {
        // using faked key
        return true;
      }
      // Passed KV is smaller than first KV in file, work from start of file
      return s.seekTo();
    } else if (result > 0) {
      // Passed KV is larger than current KV in file, if there is a next
      // it is the "after", if not then this scanner is done.
      return s.next();
    }
    // Seeked to the exact key
    return true;
  }

  static boolean reseekAtOrAfter(HFileScanner s, Cell k) throws IOException {
    // This function is similar to seekAtOrAfter function
    int result = s.reseekTo(k);
    if (result <= 0) {
      if (result == HConstants.INDEX_KEY_MAGIC) {
        // using faked key
        return true;
      }
      // If up to now scanner is not seeked yet, this means passed KV is smaller
      // than first KV in file, and it is the first time we seek on this file.
      // So we also need to work from the start of file.
      if (!s.isSeeked()) {
        return s.seekTo();
      }
      return true;
    }
    // passed KV is larger than current KV in file, if there is a next
    // it is after, if not then this scanner is done.
    return s.next();
  }

  /**
   * @see KeyValueScanner#getScannerOrder()
   */
  @Override
  public long getScannerOrder() {
    return scannerOrder;
  }

  /**
   * Pretend we have done a seek but don't do it yet, if possible. The hope is that we find
   * requested columns in more recent files and won't have to seek in older files. Creates a fake
   * key/value with the given row/column and the highest (most recent) possible timestamp we might
   * get from this file. When users of such a "lazy scanner" need to know the next KV precisely
   * (e.g. when this scanner is at the top of the heap), they run {@link #enforceSeek()}.
   * <p>
   * Note that this function does guarantee that the current KV of this scanner will be advanced to
   * at least the given KV. Because of this, it does have to do a real seek in cases when the seek
   * timestamp is older than the highest timestamp of the file, e.g. when we are trying to seek to
   * the next row/column and use OLDEST_TIMESTAMP in the seek key.
   */
  @Override
  public boolean requestSeek(Cell kv, boolean forward, boolean useBloom) throws IOException {
    if (kv.getFamilyLength() == 0) {
      useBloom = false;
    }

    boolean haveToSeek = true;
    if (useBloom) {
      // check ROWCOL Bloom filter first.
      if (reader.getBloomFilterType() == BloomType.ROWCOL) {
        haveToSeek = reader.passesGeneralRowColBloomFilter(kv);
      } else if (
        canOptimizeForNonNullColumn
          && ((PrivateCellUtil.isDeleteFamily(kv) || PrivateCellUtil.isDeleteFamilyVersion(kv)))
      ) {
        // if there is no such delete family kv in the store file,
        // then no need to seek.
        haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(), kv.getRowOffset(),
          kv.getRowLength());
      }
    }

    delayedReseek = forward;
    delayedSeekKV = kv;

    if (haveToSeek) {
      // This row/column might be in this store file (or we did not use the
      // Bloom filter), so we still need to seek.
      realSeekDone = false;
      long maxTimestampInFile = reader.getMaxTimestamp();
      long seekTimestamp = kv.getTimestamp();
      if (seekTimestamp > maxTimestampInFile) {
        // Create a fake key that is not greater than the real next key.
        // (Lower timestamps correspond to higher KVs.)
        // To understand this better, consider that we are asked to seek to
        // a higher timestamp than the max timestamp in this file. We know that
        // the next point when we have to consider this file again is when we
        // pass the max timestamp of this file (with the same row/column).
        setCurrentCell(PrivateCellUtil.createFirstOnRowColTS(kv, maxTimestampInFile));
      } else {
        // This will be the case e.g. when we need to seek to the next
        // row/column, and we don't know exactly what they are, so we set the
        // seek key's timestamp to OLDEST_TIMESTAMP to skip the rest of this
        // row/column.
        enforceSeek();
      }
      return cur != null;
    }

    // Multi-column Bloom filter optimization.
    // Create a fake key/value, so that this scanner only bubbles up to the top
    // of the KeyValueHeap in StoreScanner after we scanned this row/column in
    // all other store files. The query matcher will then just skip this fake
    // key/value and the store scanner will progress to the next column. This
    // is obviously not a real seek, but unlike the fake KV earlier in this
    // method, we want this one to be propagated to ScanQueryMatcher.
    setCurrentCell(PrivateCellUtil.createLastOnRowCol(kv));

    realSeekDone = true;
    return true;
  }
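  // Hedged usage sketch (not part of the original file; "scanner" and "seekKey" are placeholders):
  // a heap-based consumer of the lazy-seek protocol above would typically call requestSeek()
  // first and only force the delayed seek when this scanner actually has to produce its next
  // cell, e.g.:
  //
  //   scanner.requestSeek(seekKey, true, true); // may only install a fake current cell
  //   if (!scanner.realSeekDone()) {
  //     scanner.enforceSeek();                  // performs the delayed seek/reseek
  //   }
  //   Cell next = scanner.peek();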
  StoreFileReader getReader() {
    return reader;
  }

  CellComparator getComparator() {
    return reader.getComparator();
  }

  @Override
  public boolean realSeekDone() {
    return realSeekDone;
  }

  @Override
  public void enforceSeek() throws IOException {
    if (realSeekDone) return;

    if (delayedReseek) {
      reseek(delayedSeekKV);
    } else {
      seek(delayedSeekKV);
    }
  }

  @Override
  public boolean isFileScanner() {
    return true;
  }

  @Override
  public Path getFilePath() {
    return reader.getHFileReader().getPath();
  }

  // Test methods
  static final long getSeekCount() {
    return seekCount.sum();
  }

  static final void instrument() {
    seekCount = new LongAdder();
  }

  @Override
  public boolean shouldUseScanner(Scan scan, HStore store, long oldestUnexpiredTS) {
    // if the file has no entries, no need to validate or create a scanner.
    byte[] cf = store.getColumnFamilyDescriptor().getName();
    TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
    if (timeRange == null) {
      timeRange = scan.getTimeRange();
    }
    return reader.passesTimerangeFilter(timeRange, oldestUnexpiredTS)
      && reader.passesKeyRangeFilter(scan)
      && reader.passesBloomFilter(scan, scan.getFamilyMap().get(cf));
  }
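  // Reversed-scan support (added note): the methods below implement backward traversal of this
  // file. seekToPreviousRow() temporarily sets stopSkippingKVsIfNextRow so that MVCC filtering in
  // skipKVsNewerThanReadpoint() does not run past the target row, and backwardSeek() falls back
  // to seekToPreviousRow() when the forward seek overshoots the requested row.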
  @Override
  public boolean seekToPreviousRow(Cell originalKey) throws IOException {
    try {
      try {
        boolean keepSeeking = false;
        Cell key = originalKey;
        do {
          Cell seekKey = PrivateCellUtil.createFirstOnRow(key);
          if (seekCount != null) seekCount.increment();
          if (!hfs.seekBefore(seekKey)) {
            this.cur = null;
            return false;
          }
          Cell curCell = hfs.getCell();
          Cell firstKeyOfPreviousRow = PrivateCellUtil.createFirstOnRow(curCell);

          if (seekCount != null) seekCount.increment();
          if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
            this.cur = null;
            return false;
          }

          setCurrentCell(hfs.getCell());
          this.stopSkippingKVsIfNextRow = true;
          boolean resultOfSkipKVs;
          try {
            resultOfSkipKVs = skipKVsNewerThanReadpoint();
          } finally {
            this.stopSkippingKVsIfNextRow = false;
          }
          if (!resultOfSkipKVs || getComparator().compareRows(cur, firstKeyOfPreviousRow) > 0) {
            keepSeeking = true;
            key = firstKeyOfPreviousRow;
            continue;
          } else {
            keepSeeking = false;
          }
        } while (keepSeeking);
        return true;
      } finally {
        realSeekDone = true;
      }
    } catch (FileNotFoundException e) {
      throw e;
    } catch (IOException ioe) {
      throw new IOException("Could not seekToPreviousRow " + this + " to key " + originalKey, ioe);
    }
  }

  @Override
  public boolean seekToLastRow() throws IOException {
    Optional<byte[]> lastRow = reader.getLastRowKey();
    if (!lastRow.isPresent()) {
      return false;
    }
    Cell seekKey = PrivateCellUtil.createFirstOnRow(lastRow.get());
    if (seek(seekKey)) {
      return true;
    } else {
      return seekToPreviousRow(seekKey);
    }
  }

  @Override
  public boolean backwardSeek(Cell key) throws IOException {
    seek(key);
    if (cur == null || getComparator().compareRows(cur, key) > 0) {
      return seekToPreviousRow(key);
    }
    return true;
  }

  @Override
  public Cell getNextIndexedKey() {
    return hfs.getNextIndexedKey();
  }

  @Override
  public void shipped() throws IOException {
    this.hfs.shipped();
  }
}