001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.IOException; 022import java.util.Collections; 023import java.util.Map; 024import java.util.Optional; 025import java.util.OptionalLong; 026import java.util.Set; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.atomic.AtomicBoolean; 029import java.util.concurrent.atomic.AtomicInteger; 030 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.Cell; 035import org.apache.hadoop.hbase.CellComparator; 036import org.apache.hadoop.hbase.HDFSBlocksDistribution; 037import org.apache.hadoop.hbase.io.TimeRange; 038import org.apache.hadoop.hbase.io.hfile.BlockType; 039import org.apache.hadoop.hbase.io.hfile.CacheConfig; 040import org.apache.hadoop.hbase.io.hfile.HFile; 041import org.apache.hadoop.hbase.util.BloomFilterFactory; 042import org.apache.hadoop.hbase.util.Bytes; 043 044import org.apache.yetus.audience.InterfaceAudience; 045 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 049import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 050 051 052/** 053 * A Store data file. Stores usually have one or more of these files. They 054 * are produced by flushing the memstore to disk. To 055 * create, instantiate a writer using {@link StoreFileWriter.Builder} 056 * and append data. Be sure to add any metadata before calling close on the 057 * Writer (Use the appendMetadata convenience methods). On close, a StoreFile 058 * is sitting in the Filesystem. To refer to it, create a StoreFile instance 059 * passing filesystem and path. To read, call {@link #initReader()} 060 * <p>StoreFiles may also reference store files in another Store. 061 * 062 * The reason for this weird pattern where you use a different instance for the 063 * writer and a reader is that we write once but read a lot more. 064 */ 065@InterfaceAudience.Private 066public class HStoreFile implements StoreFile, StoreFileReader.Listener { 067 068 private static final Logger LOG = LoggerFactory.getLogger(HStoreFile.class.getName()); 069 070 public static final String STORE_FILE_READER_NO_READAHEAD = "hbase.store.reader.no-readahead"; 071 072 private static final boolean DEFAULT_STORE_FILE_READER_NO_READAHEAD = false; 073 074 // Keys for fileinfo values in HFile 075 076 /** Max Sequence ID in FileInfo */ 077 public static final byte[] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY"); 078 079 /** Major compaction flag in FileInfo */ 080 public static final byte[] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY"); 081 082 /** Minor compaction flag in FileInfo */ 083 public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY = 084 Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION"); 085 086 /** Bloom filter Type in FileInfo */ 087 public static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE"); 088 089 /** Delete Family Count in FileInfo */ 090 public static final byte[] DELETE_FAMILY_COUNT = Bytes.toBytes("DELETE_FAMILY_COUNT"); 091 092 /** Last Bloom filter key in FileInfo */ 093 public static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY"); 094 095 /** Key for Timerange information in metadata */ 096 public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE"); 097 098 /** Key for timestamp of earliest-put in metadata */ 099 public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS"); 100 101 /** Key for the number of mob cells in metadata */ 102 public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT"); 103 104 /** Meta key set when store file is a result of a bulk load */ 105 public static final byte[] BULKLOAD_TASK_KEY = Bytes.toBytes("BULKLOAD_SOURCE_TASK"); 106 public static final byte[] BULKLOAD_TIME_KEY = Bytes.toBytes("BULKLOAD_TIMESTAMP"); 107 108 /** 109 * Key for skipping resetting sequence id in metadata. For bulk loaded hfiles, the scanner resets 110 * the cell seqId with the latest one, if this metadata is set as true, the reset is skipped. 111 */ 112 public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID"); 113 114 private final StoreFileInfo fileInfo; 115 private final FileSystem fs; 116 117 // Block cache configuration and reference. 118 private final CacheConfig cacheConf; 119 120 // Counter that is incremented every time a scanner is created on the 121 // store file. It is decremented when the scan on the store file is 122 // done. 123 private final AtomicInteger refCount = new AtomicInteger(0); 124 125 // Set implementation must be of concurrent type 126 @VisibleForTesting 127 final Set<StoreFileReader> streamReaders; 128 129 private final boolean noReadahead; 130 131 private final boolean primaryReplica; 132 133 // Indicates if the file got compacted 134 private volatile boolean compactedAway = false; 135 136 // Keys for metadata stored in backing HFile. 137 // Set when we obtain a Reader. 138 private long sequenceid = -1; 139 140 // max of the MemstoreTS in the KV's in this store 141 // Set when we obtain a Reader. 142 private long maxMemstoreTS = -1; 143 144 // firstKey, lastkey and cellComparator will be set when openReader. 145 private Optional<Cell> firstKey; 146 147 private Optional<Cell> lastKey; 148 149 private CellComparator comparator; 150 151 public CacheConfig getCacheConf() { 152 return cacheConf; 153 } 154 155 @Override 156 public Optional<Cell> getFirstKey() { 157 return firstKey; 158 } 159 160 @Override 161 public Optional<Cell> getLastKey() { 162 return lastKey; 163 } 164 165 @Override 166 public CellComparator getComparator() { 167 return comparator; 168 } 169 170 @Override 171 public long getMaxMemStoreTS() { 172 return maxMemstoreTS; 173 } 174 175 // If true, this file was product of a major compaction. Its then set 176 // whenever you get a Reader. 177 private AtomicBoolean majorCompaction = null; 178 179 // If true, this file should not be included in minor compactions. 180 // It's set whenever you get a Reader. 181 private boolean excludeFromMinorCompaction = false; 182 183 /** 184 * Map of the metadata entries in the corresponding HFile. Populated when Reader is opened 185 * after which it is not modified again. 186 */ 187 private Map<byte[], byte[]> metadataMap; 188 189 // StoreFile.Reader 190 private volatile StoreFileReader reader; 191 192 /** 193 * Bloom filter type specified in column family configuration. Does not 194 * necessarily correspond to the Bloom filter type present in the HFile. 195 */ 196 private final BloomType cfBloomType; 197 198 /** 199 * Constructor, loads a reader and it's indices, etc. May allocate a substantial amount of ram 200 * depending on the underlying files (10-20MB?). 201 * @param fs The current file system to use. 202 * @param p The path of the file. 203 * @param conf The current configuration. 204 * @param cacheConf The cache configuration and block cache reference. 205 * @param cfBloomType The bloom type to use for this store file as specified by column family 206 * configuration. This may or may not be the same as the Bloom filter type actually 207 * present in the HFile, because column family configuration might change. If this is 208 * {@link BloomType#NONE}, the existing Bloom filter is ignored. 209 * @param primaryReplica true if this is a store file for primary replica, otherwise false. 210 * @throws IOException 211 */ 212 public HStoreFile(FileSystem fs, Path p, Configuration conf, CacheConfig cacheConf, 213 BloomType cfBloomType, boolean primaryReplica) throws IOException { 214 this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType, primaryReplica); 215 } 216 217 /** 218 * Constructor, loads a reader and it's indices, etc. May allocate a substantial amount of ram 219 * depending on the underlying files (10-20MB?). 220 * @param fs fs The current file system to use. 221 * @param fileInfo The store file information. 222 * @param conf The current configuration. 223 * @param cacheConf The cache configuration and block cache reference. 224 * @param cfBloomType The bloom type to use for this store file as specified by column 225 * family configuration. This may or may not be the same as the Bloom filter type 226 * actually present in the HFile, because column family configuration might change. If 227 * this is {@link BloomType#NONE}, the existing Bloom filter is ignored. 228 * @param primaryReplica true if this is a store file for primary replica, otherwise false. 229 */ 230 public HStoreFile(FileSystem fs, StoreFileInfo fileInfo, Configuration conf, CacheConfig cacheConf, 231 BloomType cfBloomType, boolean primaryReplica) { 232 this.streamReaders = ConcurrentHashMap.newKeySet(); 233 this.fs = fs; 234 this.fileInfo = fileInfo; 235 this.cacheConf = cacheConf; 236 this.noReadahead = 237 conf.getBoolean(STORE_FILE_READER_NO_READAHEAD, DEFAULT_STORE_FILE_READER_NO_READAHEAD); 238 if (BloomFilterFactory.isGeneralBloomEnabled(conf)) { 239 this.cfBloomType = cfBloomType; 240 } else { 241 LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " + "cfBloomType=" + 242 cfBloomType + " (disabled in config)"); 243 this.cfBloomType = BloomType.NONE; 244 } 245 this.primaryReplica = primaryReplica; 246 } 247 248 /** 249 * @return the StoreFile object associated to this StoreFile. null if the StoreFile is not a 250 * reference. 251 */ 252 public StoreFileInfo getFileInfo() { 253 return this.fileInfo; 254 } 255 256 @Override 257 public Path getPath() { 258 return this.fileInfo.getPath(); 259 } 260 261 @Override 262 public Path getQualifiedPath() { 263 return this.fileInfo.getPath().makeQualified(fs.getUri(), fs.getWorkingDirectory()); 264 } 265 266 @Override 267 public boolean isReference() { 268 return this.fileInfo.isReference(); 269 } 270 271 @Override 272 public boolean isHFile() { 273 return StoreFileInfo.isHFile(this.fileInfo.getPath()); 274 } 275 276 @Override 277 public boolean isMajorCompactionResult() { 278 if (this.majorCompaction == null) { 279 throw new NullPointerException("This has not been set yet"); 280 } 281 return this.majorCompaction.get(); 282 } 283 284 @Override 285 public boolean excludeFromMinorCompaction() { 286 return this.excludeFromMinorCompaction; 287 } 288 289 @Override 290 public long getMaxSequenceId() { 291 return this.sequenceid; 292 } 293 294 @Override 295 public long getModificationTimeStamp() throws IOException { 296 return getModificationTimestamp(); 297 } 298 299 @Override 300 public long getModificationTimestamp() throws IOException { 301 return fileInfo.getModificationTime(); 302 } 303 304 /** 305 * Only used by the Striped Compaction Policy 306 * @param key 307 * @return value associated with the metadata key 308 */ 309 public byte[] getMetadataValue(byte[] key) { 310 return metadataMap.get(key); 311 } 312 313 @Override 314 public boolean isBulkLoadResult() { 315 boolean bulkLoadedHFile = false; 316 String fileName = this.getPath().getName(); 317 int startPos = fileName.indexOf("SeqId_"); 318 if (startPos != -1) { 319 bulkLoadedHFile = true; 320 } 321 return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY)); 322 } 323 324 public boolean isCompactedAway() { 325 return compactedAway; 326 } 327 328 @VisibleForTesting 329 public int getRefCount() { 330 return refCount.get(); 331 } 332 333 /** 334 * @return true if the file is still used in reads 335 */ 336 public boolean isReferencedInReads() { 337 int rc = refCount.get(); 338 assert rc >= 0; // we should not go negative. 339 return rc > 0; 340 } 341 342 @Override 343 public OptionalLong getBulkLoadTimestamp() { 344 byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY); 345 return bulkLoadTimestamp == null ? OptionalLong.empty() 346 : OptionalLong.of(Bytes.toLong(bulkLoadTimestamp)); 347 } 348 349 /** 350 * @return the cached value of HDFS blocks distribution. The cached value is calculated when store 351 * file is opened. 352 */ 353 public HDFSBlocksDistribution getHDFSBlockDistribution() { 354 return this.fileInfo.getHDFSBlockDistribution(); 355 } 356 357 /** 358 * Opens reader on this store file. Called by Constructor. 359 * @throws IOException 360 * @see #closeStoreFile(boolean) 361 */ 362 private void open() throws IOException { 363 if (this.reader != null) { 364 throw new IllegalAccessError("Already open"); 365 } 366 367 // Open the StoreFile.Reader 368 this.reader = fileInfo.open(this.fs, this.cacheConf, false, noReadahead ? 0L : -1L, 369 primaryReplica, refCount, true); 370 371 // Load up indices and fileinfo. This also loads Bloom filter type. 372 metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo()); 373 374 // Read in our metadata. 375 byte [] b = metadataMap.get(MAX_SEQ_ID_KEY); 376 if (b != null) { 377 // By convention, if halfhfile, top half has a sequence number > bottom 378 // half. Thats why we add one in below. Its done for case the two halves 379 // are ever merged back together --rare. Without it, on open of store, 380 // since store files are distinguished by sequence id, the one half would 381 // subsume the other. 382 this.sequenceid = Bytes.toLong(b); 383 if (fileInfo.isTopReference()) { 384 this.sequenceid += 1; 385 } 386 } 387 388 if (isBulkLoadResult()){ 389 // generate the sequenceId from the fileName 390 // fileName is of the form <randomName>_SeqId_<id-when-loaded>_ 391 String fileName = this.getPath().getName(); 392 // Use lastIndexOf() to get the last, most recent bulk load seqId. 393 int startPos = fileName.lastIndexOf("SeqId_"); 394 if (startPos != -1) { 395 this.sequenceid = Long.parseLong(fileName.substring(startPos + 6, 396 fileName.indexOf('_', startPos + 6))); 397 // Handle reference files as done above. 398 if (fileInfo.isTopReference()) { 399 this.sequenceid += 1; 400 } 401 } 402 // SKIP_RESET_SEQ_ID only works in bulk loaded file. 403 // In mob compaction, the hfile where the cells contain the path of a new mob file is bulk 404 // loaded to hbase, these cells have the same seqIds with the old ones. We do not want 405 // to reset new seqIds for them since this might make a mess of the visibility of cells that 406 // have the same row key but different seqIds. 407 boolean skipResetSeqId = isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID)); 408 if (skipResetSeqId) { 409 // increase the seqId when it is a bulk loaded file from mob compaction. 410 this.sequenceid += 1; 411 } 412 this.reader.setSkipResetSeqId(skipResetSeqId); 413 this.reader.setBulkLoaded(true); 414 } 415 this.reader.setSequenceID(this.sequenceid); 416 417 b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY); 418 if (b != null) { 419 this.maxMemstoreTS = Bytes.toLong(b); 420 } 421 422 b = metadataMap.get(MAJOR_COMPACTION_KEY); 423 if (b != null) { 424 boolean mc = Bytes.toBoolean(b); 425 if (this.majorCompaction == null) { 426 this.majorCompaction = new AtomicBoolean(mc); 427 } else { 428 this.majorCompaction.set(mc); 429 } 430 } else { 431 // Presume it is not major compacted if it doesn't explicity say so 432 // HFileOutputFormat explicitly sets the major compacted key. 433 this.majorCompaction = new AtomicBoolean(false); 434 } 435 436 b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY); 437 this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b)); 438 439 BloomType hfileBloomType = reader.getBloomFilterType(); 440 if (cfBloomType != BloomType.NONE) { 441 reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META); 442 if (hfileBloomType != cfBloomType) { 443 LOG.info("HFile Bloom filter type for " 444 + reader.getHFileReader().getName() + ": " + hfileBloomType 445 + ", but " + cfBloomType + " specified in column family " 446 + "configuration"); 447 } 448 } else if (hfileBloomType != BloomType.NONE) { 449 LOG.info("Bloom filter turned off by CF config for " 450 + reader.getHFileReader().getName()); 451 } 452 453 // load delete family bloom filter 454 reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META); 455 456 try { 457 byte[] data = metadataMap.get(TIMERANGE_KEY); 458 this.reader.timeRange = data == null ? null : TimeRangeTracker.parseFrom(data).toTimeRange(); 459 } catch (IllegalArgumentException e) { 460 LOG.error("Error reading timestamp range data from meta -- " + 461 "proceeding without", e); 462 this.reader.timeRange = null; 463 } 464 // initialize so we can reuse them after reader closed. 465 firstKey = reader.getFirstKey(); 466 lastKey = reader.getLastKey(); 467 comparator = reader.getComparator(); 468 } 469 470 /** 471 * Initialize the reader used for pread. 472 */ 473 public void initReader() throws IOException { 474 if (reader == null) { 475 try { 476 open(); 477 } catch (Exception e) { 478 try { 479 boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true; 480 this.closeStoreFile(evictOnClose); 481 } catch (IOException ee) { 482 LOG.warn("failed to close reader", ee); 483 } 484 throw e; 485 } 486 } 487 } 488 489 private StoreFileReader createStreamReader(boolean canUseDropBehind) throws IOException { 490 initReader(); 491 StoreFileReader reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind, -1L, 492 primaryReplica, refCount, false); 493 reader.copyFields(this.reader); 494 return reader; 495 } 496 497 /** 498 * Get a scanner which uses pread. 499 * <p> 500 * Must be called after initReader. 501 */ 502 public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder, 503 boolean canOptimizeForNonNullColumn) { 504 return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder, 505 canOptimizeForNonNullColumn); 506 } 507 508 /** 509 * Get a scanner which uses streaming read. 510 * <p> 511 * Must be called after initReader. 512 */ 513 public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks, 514 boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) 515 throws IOException { 516 StoreFileReader reader = createStreamReader(canUseDropBehind); 517 reader.setListener(this); 518 StoreFileScanner sfScanner = reader.getStoreFileScanner(cacheBlocks, false, 519 isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn); 520 //Add reader once the scanner is created 521 streamReaders.add(reader); 522 return sfScanner; 523 } 524 525 /** 526 * @return Current reader. Must call initReader first else returns null. 527 * @see #initReader() 528 */ 529 public StoreFileReader getReader() { 530 return this.reader; 531 } 532 533 /** 534 * @param evictOnClose whether to evict blocks belonging to this file 535 * @throws IOException 536 */ 537 public synchronized void closeStoreFile(boolean evictOnClose) throws IOException { 538 if (this.reader != null) { 539 this.reader.close(evictOnClose); 540 this.reader = null; 541 } 542 closeStreamReaders(evictOnClose); 543 } 544 545 public void closeStreamReaders(boolean evictOnClose) throws IOException { 546 synchronized (this) { 547 for (StoreFileReader entry : streamReaders) { 548 //closing the reader will remove itself from streamReaders thanks to the Listener 549 entry.close(evictOnClose); 550 } 551 int size = streamReaders.size(); 552 Preconditions.checkState(size == 0, 553 "There are still streamReaders post close: " + size); 554 } 555 } 556 557 /** 558 * Delete this file 559 * @throws IOException 560 */ 561 public void deleteStoreFile() throws IOException { 562 boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true; 563 closeStoreFile(evictOnClose); 564 this.fs.delete(getPath(), true); 565 } 566 567 public void markCompactedAway() { 568 this.compactedAway = true; 569 } 570 571 @Override 572 public String toString() { 573 return this.fileInfo.toString(); 574 } 575 576 @Override 577 public String toStringDetailed() { 578 StringBuilder sb = new StringBuilder(); 579 sb.append(this.getPath().toString()); 580 sb.append(", isReference=").append(isReference()); 581 sb.append(", isBulkLoadResult=").append(isBulkLoadResult()); 582 if (isBulkLoadResult()) { 583 sb.append(", bulkLoadTS="); 584 OptionalLong bulkLoadTS = getBulkLoadTimestamp(); 585 if (bulkLoadTS.isPresent()) { 586 sb.append(bulkLoadTS.getAsLong()); 587 } else { 588 sb.append("NotPresent"); 589 } 590 } else { 591 sb.append(", seqid=").append(getMaxSequenceId()); 592 } 593 sb.append(", majorCompaction=").append(isMajorCompactionResult()); 594 595 return sb.toString(); 596 } 597 598 /** 599 * Gets whether to skip resetting the sequence id for cells. 600 * @param skipResetSeqId The byte array of boolean. 601 * @return Whether to skip resetting the sequence id. 602 */ 603 private boolean isSkipResetSeqId(byte[] skipResetSeqId) { 604 if (skipResetSeqId != null && skipResetSeqId.length == 1) { 605 return Bytes.toBoolean(skipResetSeqId); 606 } 607 return false; 608 } 609 610 @Override 611 public OptionalLong getMinimumTimestamp() { 612 TimeRange tr = getReader().timeRange; 613 return tr != null ? OptionalLong.of(tr.getMin()) : OptionalLong.empty(); 614 } 615 616 @Override 617 public OptionalLong getMaximumTimestamp() { 618 TimeRange tr = getReader().timeRange; 619 return tr != null ? OptionalLong.of(tr.getMax()) : OptionalLong.empty(); 620 } 621 622 @Override 623 public void storeFileReaderClosed(StoreFileReader reader) { 624 streamReaders.remove(reader); 625 } 626}