/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * A Store data file. Stores usually have one or more of these files. They
 * are produced by flushing the memstore to disk. To
 * create, instantiate a writer using {@link StoreFileWriter.Builder}
 * and append data. Be sure to add any metadata before calling close on the
 * Writer (Use the appendMetadata convenience methods). On close, a StoreFile
 * is sitting in the Filesystem. To refer to it, create a StoreFile instance
 * passing filesystem and path. To read, call {@link #initReader()}.
 * <p>StoreFiles may also reference store files in another Store.
 *
 * The reason for this weird pattern where you use a different instance for the
 * writer and a reader is that we write once but read a lot more.
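 *
 * <p>A minimal usage sketch (illustrative only; the builder options shown are a
 * subset and the variable names are hypothetical):
 * <pre>
 * StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
 *     .withFilePath(path)
 *     .build();
 * // ... append cells to the writer ...
 * writer.appendMetadata(maxSequenceId, false); // add metadata before close
 * writer.close();
 *
 * HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
 * sf.initReader(); // required before getReader() or the scanner methods
 * StoreFileReader reader = sf.getReader();
 * </pre>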
 */
@InterfaceAudience.Private
public class HStoreFile implements StoreFile {

  private static final Logger LOG = LoggerFactory.getLogger(HStoreFile.class.getName());

  public static final String STORE_FILE_READER_NO_READAHEAD = "hbase.store.reader.no-readahead";

  private static final boolean DEFAULT_STORE_FILE_READER_NO_READAHEAD = false;

  // Keys for fileinfo values in HFile

  /** Max Sequence ID in FileInfo */
  public static final byte[] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");

  /** Major compaction flag in FileInfo */
  public static final byte[] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");

  /** Minor compaction flag in FileInfo */
  public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
      Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");

  /**
   * Key for compaction event which contains the compacted storefiles in FileInfo
   */
  public static final byte[] COMPACTION_EVENT_KEY = Bytes.toBytes("COMPACTION_EVENT_KEY");

  /** Bloom filter Type in FileInfo */
  public static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");

  /** Bloom filter param in FileInfo */
  public static final byte[] BLOOM_FILTER_PARAM_KEY = Bytes.toBytes("BLOOM_FILTER_PARAM");

  /** Delete Family Count in FileInfo */
  public static final byte[] DELETE_FAMILY_COUNT = Bytes.toBytes("DELETE_FAMILY_COUNT");

  /** Last Bloom filter key in FileInfo */
  public static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");

  /** Key for Timerange information in metadata */
  public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");

  /** Key for timestamp of earliest-put in metadata */
  public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");

  /** Key for the number of mob cells in metadata */
  public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT");

  /** Meta key set when store file is a result of a bulk load */
  public static final byte[] BULKLOAD_TASK_KEY = Bytes.toBytes("BULKLOAD_SOURCE_TASK");
  public static final byte[] BULKLOAD_TIME_KEY = Bytes.toBytes("BULKLOAD_TIMESTAMP");

  /**
   * Key for skipping resetting sequence id in metadata. For bulk loaded hfiles, the scanner resets
   * the cell seqId with the latest one; if this metadata is set to true, the reset is skipped.
   */
  public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");

  private final StoreFileInfo fileInfo;
  private final FileSystem fs;

  // Block cache configuration and reference.
  private final CacheConfig cacheConf;

  // Counter that is incremented every time a scanner is created on the
  // store file. It is decremented when the scan on the store file is
  // done.
  private final AtomicInteger refCount = new AtomicInteger(0);

  private final boolean noReadahead;

  private final boolean primaryReplica;

  // Indicates if the file got compacted
  private volatile boolean compactedAway = false;

  // Keys for metadata stored in backing HFile.
  // Set when we obtain a Reader.
  private long sequenceid = -1;

  // Max of the MemstoreTS in the KVs in this store file.
  // Set when we obtain a Reader.
  private long maxMemstoreTS = -1;

  // firstKey, lastKey and cellComparator will be set when the reader is opened.
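  // They are cached here (at the end of open()) so that they remain available
  // even after the underlying reader has been closed.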
  private Optional<Cell> firstKey;

  private Optional<Cell> lastKey;

  private CellComparator comparator;

  public CacheConfig getCacheConf() {
    return cacheConf;
  }

  @Override
  public Optional<Cell> getFirstKey() {
    return firstKey;
  }

  @Override
  public Optional<Cell> getLastKey() {
    return lastKey;
  }

  @Override
  public CellComparator getComparator() {
    return comparator;
  }

  @Override
  public long getMaxMemStoreTS() {
    return maxMemstoreTS;
  }

  // If true, this file was a product of a major compaction. It's set
  // whenever you get a Reader.
  private AtomicBoolean majorCompaction = null;

  // If true, this file should not be included in minor compactions.
  // It's set whenever you get a Reader.
  private boolean excludeFromMinorCompaction = false;

  // This file was a product of these compacted store files
  private final Set<String> compactedStoreFiles = new HashSet<>();

  /**
   * Map of the metadata entries in the corresponding HFile. Populated when Reader is opened
   * after which it is not modified again.
   */
  private Map<byte[], byte[]> metadataMap;

  // StoreFile.Reader
  private volatile StoreFileReader reader;

  /**
   * Bloom filter type specified in column family configuration. Does not
   * necessarily correspond to the Bloom filter type present in the HFile.
   */
  private final BloomType cfBloomType;

  /**
   * Constructor, loads a reader and its indices, etc. May allocate a substantial amount of ram
   * depending on the underlying files (10-20MB?).
   * @param fs The current file system to use.
   * @param p The path of the file.
   * @param conf The current configuration.
   * @param cacheConf The cache configuration and block cache reference.
   * @param cfBloomType The bloom type to use for this store file as specified by column family
   *          configuration. This may or may not be the same as the Bloom filter type actually
   *          present in the HFile, because column family configuration might change. If this is
   *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
   */
  public HStoreFile(FileSystem fs, Path p, Configuration conf, CacheConfig cacheConf,
      BloomType cfBloomType, boolean primaryReplica) throws IOException {
    this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType, primaryReplica);
  }

  /**
   * Constructor, loads a reader and its indices, etc. May allocate a substantial amount of ram
   * depending on the underlying files (10-20MB?).
   * @param fs The current file system to use.
   * @param fileInfo The store file information.
   * @param conf The current configuration.
   * @param cacheConf The cache configuration and block cache reference.
   * @param cfBloomType The bloom type to use for this store file as specified by column
   *          family configuration. This may or may not be the same as the Bloom filter type
   *          actually present in the HFile, because column family configuration might change. If
   *          this is {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
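   *
   * <p>A minimal construction sketch, assuming a {@code StoreFileInfo} has already been
   * resolved for the path (variable names are illustrative):
   * <pre>
   * StoreFileInfo info = new StoreFileInfo(conf, fs, path);
   * HStoreFile sf = new HStoreFile(fs, info, conf, cacheConf, BloomType.ROW, true);
   * </pre>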
   */
  public HStoreFile(FileSystem fs, StoreFileInfo fileInfo, Configuration conf,
      CacheConfig cacheConf, BloomType cfBloomType, boolean primaryReplica) {
    this.fs = fs;
    this.fileInfo = fileInfo;
    this.cacheConf = cacheConf;
    this.noReadahead =
        conf.getBoolean(STORE_FILE_READER_NO_READAHEAD, DEFAULT_STORE_FILE_READER_NO_READAHEAD);
    if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
      this.cfBloomType = cfBloomType;
    } else {
      LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " + "cfBloomType=" +
          cfBloomType + " (disabled in config)");
      this.cfBloomType = BloomType.NONE;
    }
    this.primaryReplica = primaryReplica;
  }

  /**
   * @return the StoreFileInfo associated with this StoreFile.
   */
  public StoreFileInfo getFileInfo() {
    return this.fileInfo;
  }

  @Override
  public Path getPath() {
    return this.fileInfo.getPath();
  }

  @Override
  public Path getEncodedPath() {
    try {
      return new Path(URLEncoder.encode(fileInfo.getPath().toString(), HConstants.UTF8_ENCODING));
    } catch (UnsupportedEncodingException ex) {
      throw new RuntimeException("URLEncoder doesn't support UTF-8", ex);
    }
  }

  @Override
  public Path getQualifiedPath() {
    return this.fileInfo.getPath().makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  @Override
  public boolean isReference() {
    return this.fileInfo.isReference();
  }

  @Override
  public boolean isHFile() {
    return StoreFileInfo.isHFile(this.fileInfo.getPath());
  }

  @Override
  public boolean isMajorCompactionResult() {
    if (this.majorCompaction == null) {
      throw new NullPointerException("This has not been set yet");
    }
    return this.majorCompaction.get();
  }

  @Override
  public boolean excludeFromMinorCompaction() {
    return this.excludeFromMinorCompaction;
  }

  @Override
  public long getMaxSequenceId() {
    return this.sequenceid;
  }

  @Override
  public long getModificationTimeStamp() throws IOException {
    return getModificationTimestamp();
  }

  @Override
  public long getModificationTimestamp() throws IOException {
    return fileInfo.getModificationTime();
  }

  /**
   * Only used by the Striped Compaction Policy
   * @param key the metadata key to look up
   * @return value associated with the metadata key
   */
  public byte[] getMetadataValue(byte[] key) {
    return metadataMap.get(key);
  }

  @Override
  public boolean isBulkLoadResult() {
    // Bulk-loaded files either carry "SeqId_" in their name (the name is of the form
    // <randomName>_SeqId_<id-when-loaded>_, see open()) or record the bulk load
    // timestamp in their metadata.
    boolean bulkLoadedHFile = false;
    String fileName = this.getPath().getName();
    int startPos = fileName.indexOf("SeqId_");
    if (startPos != -1) {
      bulkLoadedHFile = true;
    }
    return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY));
  }

  public boolean isCompactedAway() {
    return compactedAway;
  }

  @VisibleForTesting
  public int getRefCount() {
    return refCount.get();
  }

  /**
   * @return true if the file is still used in reads
   */
  public boolean isReferencedInReads() {
    int rc = refCount.get();
    assert rc >= 0; // we should not go negative.
    return rc > 0;
  }

  @Override
  public OptionalLong getBulkLoadTimestamp() {
    byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY);
    return bulkLoadTimestamp == null ? OptionalLong.empty()
        : OptionalLong.of(Bytes.toLong(bulkLoadTimestamp));
  }
  /**
   * @return the cached value of HDFS blocks distribution. The cached value is calculated when
   *         the store file is opened.
   */
  public HDFSBlocksDistribution getHDFSBlockDistribution() {
    return this.fileInfo.getHDFSBlockDistribution();
  }

  /**
   * Opens reader on this store file. Called by Constructor.
   * @see #closeStoreFile(boolean)
   */
  private void open() throws IOException {
    if (this.reader != null) {
      throw new IllegalAccessError("Already open");
    }

    // Open the StoreFile.Reader
    this.reader = fileInfo.open(this.fs, this.cacheConf, false, noReadahead ? 0L : -1L,
      primaryReplica, refCount, true);

    // Load up indices and fileinfo. This also loads Bloom filter type.
    metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());

    // Read in our metadata.
    byte[] b = metadataMap.get(MAX_SEQ_ID_KEY);
    if (b != null) {
      // By convention, if this is a half hfile, the top half has a sequence number greater
      // than the bottom half. That's why we add one below. It's done in case the two halves
      // are ever merged back together (rare). Without it, on open of the store,
      // since store files are distinguished by sequence id, the one half would
      // subsume the other.
      this.sequenceid = Bytes.toLong(b);
      if (fileInfo.isTopReference()) {
        this.sequenceid += 1;
      }
    }

    if (isBulkLoadResult()) {
      // Generate the sequenceId from the fileName.
      // The fileName is of the form <randomName>_SeqId_<id-when-loaded>_; for example,
      // a file named "a1b2c3_SeqId_42_" yields sequenceid 42 (the name is illustrative).
      String fileName = this.getPath().getName();
      // Use lastIndexOf() to get the last, most recent bulk load seqId.
      int startPos = fileName.lastIndexOf("SeqId_");
      if (startPos != -1) {
        this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
          fileName.indexOf('_', startPos + 6)));
        // Handle reference files as done above.
        if (fileInfo.isTopReference()) {
          this.sequenceid += 1;
        }
      }
      // SKIP_RESET_SEQ_ID only works on a bulk loaded file.
      // In mob compaction, the hfile whose cells contain the path of a new mob file is bulk
      // loaded into hbase, and these cells have the same seqIds as the old ones. We do not want
      // to reset new seqIds for them since this might make a mess of the visibility of cells that
      // have the same row key but different seqIds.
      boolean skipResetSeqId = isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID));
      if (skipResetSeqId) {
        // Increase the seqId when it is a bulk loaded file from mob compaction.
        this.sequenceid += 1;
      }
      this.reader.setSkipResetSeqId(skipResetSeqId);
      this.reader.setBulkLoaded(true);
    }
    this.reader.setSequenceID(this.sequenceid);

    b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
    if (b != null) {
      this.maxMemstoreTS = Bytes.toLong(b);
    }

    b = metadataMap.get(MAJOR_COMPACTION_KEY);
    if (b != null) {
      boolean mc = Bytes.toBoolean(b);
      if (this.majorCompaction == null) {
        this.majorCompaction = new AtomicBoolean(mc);
      } else {
        this.majorCompaction.set(mc);
      }
    } else {
      // Presume it is not major compacted if it doesn't explicitly say so.
      // HFileOutputFormat explicitly sets the major compacted key.
      this.majorCompaction = new AtomicBoolean(false);
    }

    b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
    this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));

    BloomType hfileBloomType = reader.getBloomFilterType();
    if (cfBloomType != BloomType.NONE) {
      reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
      if (hfileBloomType != cfBloomType) {
        LOG.info("HFile Bloom filter type for "
            + reader.getHFileReader().getName() + ": " + hfileBloomType
            + ", but " + cfBloomType + " specified in column family "
            + "configuration");
      }
    } else if (hfileBloomType != BloomType.NONE) {
      LOG.info("Bloom filter turned off by CF config for "
          + reader.getHFileReader().getName());
    }

    // Load the delete family bloom filter.
    reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);

    try {
      byte[] data = metadataMap.get(TIMERANGE_KEY);
      this.reader.timeRange = data == null ? null : TimeRangeTracker.parseFrom(data).toTimeRange();
    } catch (IllegalArgumentException e) {
      LOG.error("Error reading timestamp range data from meta -- " +
          "proceeding without", e);
      this.reader.timeRange = null;
    }

    try {
      byte[] data = metadataMap.get(COMPACTION_EVENT_KEY);
      this.compactedStoreFiles.addAll(ProtobufUtil.toCompactedStoreFiles(data));
    } catch (IOException e) {
      LOG.error("Error reading compacted storefiles from meta data", e);
    }

    // Initialize these here so we can reuse them after the reader is closed.
    firstKey = reader.getFirstKey();
    lastKey = reader.getLastKey();
    comparator = reader.getComparator();
  }

  /**
   * Initialize the reader used for pread.
   */
  public void initReader() throws IOException {
    if (reader == null) {
      try {
        open();
      } catch (Exception e) {
        try {
          boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
          this.closeStoreFile(evictOnClose);
        } catch (IOException ee) {
          LOG.warn("failed to close reader", ee);
        }
        throw e;
      }
    }
  }

  private StoreFileReader createStreamReader(boolean canUseDropBehind) throws IOException {
    initReader();
    StoreFileReader reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind, -1L,
      primaryReplica, refCount, false);
    reader.copyFields(this.reader);
    return reader;
  }

  /**
   * Get a scanner which uses pread.
   * <p>
   * Must be called after initReader.
   */
  public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder,
      boolean canOptimizeForNonNullColumn) {
    return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder,
      canOptimizeForNonNullColumn);
  }

  /**
   * Get a scanner which uses streaming read.
   * <p>
   * Must be called after initReader.
   */
  public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
      boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
      throws IOException {
    return createStreamReader(canUseDropBehind)
        .getStoreFileScanner(cacheBlocks, false, isCompaction, readPt, scannerOrder,
          canOptimizeForNonNullColumn);
  }

  /**
   * @return Current reader. Must call initReader first, else returns null.
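   *         <p>A hedged usage sketch (the argument values passed to
   *         {@code getPreadScanner} are illustrative):
   *         <pre>
   *         sf.initReader();
   *         StoreFileScanner scanner = sf.getPreadScanner(true, Long.MAX_VALUE, 0, false);
   *         </pre>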
   * @see #initReader()
   */
  public StoreFileReader getReader() {
    return this.reader;
  }

  /**
   * @param evictOnClose whether to evict blocks belonging to this file
   */
  public synchronized void closeStoreFile(boolean evictOnClose) throws IOException {
    if (this.reader != null) {
      this.reader.close(evictOnClose);
      this.reader = null;
    }
  }

  /**
   * Delete this file.
   */
  public void deleteStoreFile() throws IOException {
    boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
    closeStoreFile(evictOnClose);
    this.fs.delete(getPath(), true);
  }

  public void markCompactedAway() {
    this.compactedAway = true;
  }

  @Override
  public String toString() {
    return this.fileInfo.toString();
  }

  @Override
  public String toStringDetailed() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getPath().toString());
    sb.append(", isReference=").append(isReference());
    sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
    if (isBulkLoadResult()) {
      sb.append(", bulkLoadTS=");
      OptionalLong bulkLoadTS = getBulkLoadTimestamp();
      if (bulkLoadTS.isPresent()) {
        sb.append(bulkLoadTS.getAsLong());
      } else {
        sb.append("NotPresent");
      }
    } else {
      sb.append(", seqid=").append(getMaxSequenceId());
    }
    sb.append(", majorCompaction=").append(isMajorCompactionResult());

    return sb.toString();
  }

  /**
   * Gets whether to skip resetting the sequence id for cells.
   * @param skipResetSeqId The byte array of a boolean.
   * @return Whether to skip resetting the sequence id.
   */
  private boolean isSkipResetSeqId(byte[] skipResetSeqId) {
    if (skipResetSeqId != null && skipResetSeqId.length == 1) {
      return Bytes.toBoolean(skipResetSeqId);
    }
    return false;
  }

  @Override
  public OptionalLong getMinimumTimestamp() {
    TimeRange tr = getReader().timeRange;
    return tr != null ? OptionalLong.of(tr.getMin()) : OptionalLong.empty();
  }

  @Override
  public OptionalLong getMaximumTimestamp() {
    TimeRange tr = getReader().timeRange;
    return tr != null ? OptionalLong.of(tr.getMax()) : OptionalLong.empty();
  }

  Set<String> getCompactedStoreFiles() {
    return Collections.unmodifiableSet(this.compactedStoreFiles);
  }
}