/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * A Store data file. Stores usually have one or more of these files. They
 * are produced by flushing the memstore to disk. To
 * create, instantiate a writer using {@link StoreFileWriter.Builder}
 * and append data. Be sure to add any metadata before calling close on the
 * Writer (Use the appendMetadata convenience methods). On close, a StoreFile
 * is sitting in the Filesystem. To refer to it, create a StoreFile instance
 * passing filesystem and path. To read, call {@link #initReader()}.
 * <p>StoreFiles may also reference store files in another Store.
 *
 * <p>The reason for this pattern, where a different instance is used for the
 * writer and the reader, is that we write once but read a lot more.
 *
 * <p>Most metadata accessors ({@link #getMaxSequenceId()}, {@link #getFirstKey()},
 * {@link #isMajorCompactionResult()}, ...) only return meaningful values once
 * {@link #initReader()} has successfully opened the reader.
 */
@InterfaceAudience.Private
public class HStoreFile implements StoreFile {

  private static final Logger LOG = LoggerFactory.getLogger(HStoreFile.class);

  /**
   * Marker that precedes the sequence id in the name of a bulk loaded file. Such names have the
   * form {@code <randomName>_SeqId_<id-when-loaded>_}.
   */
  private static final String SEQ_ID_MARKER = "SeqId_";

  // Keys for fileinfo values in HFile

  /** Max Sequence ID in FileInfo */
  public static final byte[] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");

  /** Major compaction flag in FileInfo */
  public static final byte[] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");

  /** Minor compaction flag in FileInfo */
  public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
      Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");

  /**
   * Key for compaction event which contains the compacted storefiles in FileInfo
   */
  public static final byte[] COMPACTION_EVENT_KEY = Bytes.toBytes("COMPACTION_EVENT_KEY");

  /** Bloom filter Type in FileInfo */
  public static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");

  /** Bloom filter param in FileInfo */
  public static final byte[] BLOOM_FILTER_PARAM_KEY = Bytes.toBytes("BLOOM_FILTER_PARAM");

  /** Delete Family Count in FileInfo */
  public static final byte[] DELETE_FAMILY_COUNT = Bytes.toBytes("DELETE_FAMILY_COUNT");

  /** Last Bloom filter key in FileInfo */
  public static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");

  /** Key for Timerange information in metadata */
  public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");

  /** Key for timestamp of earliest-put in metadata */
  public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");

  /** Key for the number of mob cells in metadata */
  public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT");

  /** Meta key set when store file is a result of a bulk load */
  public static final byte[] BULKLOAD_TASK_KEY = Bytes.toBytes("BULKLOAD_SOURCE_TASK");
  public static final byte[] BULKLOAD_TIME_KEY = Bytes.toBytes("BULKLOAD_TIMESTAMP");

  /**
   * Key for skipping resetting sequence id in metadata. For bulk loaded hfiles, the scanner resets
   * the cell seqId with the latest one, if this metadata is set as true, the reset is skipped.
   */
  public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");

  private final StoreFileInfo fileInfo;

  // The pread reader. Null until initReader() succeeds and again after closeStoreFile().
  private volatile StoreFileReader initialReader;

  // Block cache configuration and reference.
  private final CacheConfig cacheConf;

  // Indicates if the file got compacted
  private volatile boolean compactedAway = false;

  // Keys for metadata stored in backing HFile.
  // Set when we obtain a Reader.
  private long sequenceid = -1;

  // max of the MemstoreTS in the KV's in this store
  // Set when we obtain a Reader.
  private long maxMemstoreTS = -1;

  // firstKey, lastKey and comparator are set when the reader is opened and are kept after the
  // reader is closed.
  private Optional<Cell> firstKey;

  private Optional<Cell> lastKey;

  private CellComparator comparator;

  /**
   * @return the cache configuration this store file was constructed with.
   */
  public CacheConfig getCacheConf() {
    return this.cacheConf;
  }

  @Override
  public Optional<Cell> getFirstKey() {
    return firstKey;
  }

  @Override
  public Optional<Cell> getLastKey() {
    return lastKey;
  }

  @Override
  public CellComparator getComparator() {
    return comparator;
  }

  @Override
  public long getMaxMemStoreTS() {
    return maxMemstoreTS;
  }

  // If true, this file was the product of a major compaction. It is null until the reader has
  // been opened, and is set whenever you get a Reader.
  private AtomicBoolean majorCompaction = null;

  // If true, this file should not be included in minor compactions.
  // It's set whenever you get a Reader.
  private boolean excludeFromMinorCompaction = false;

  // This file was product of these compacted store files
  private final Set<String> compactedStoreFiles = new HashSet<>();

  /**
   * Map of the metadata entries in the corresponding HFile. Populated when Reader is opened
   * after which it is not modified again. Null before the reader has been opened.
   */
  private Map<byte[], byte[]> metadataMap;

  /**
   * Bloom filter type specified in column family configuration. Does not
   * necessarily correspond to the Bloom filter type present in the HFile.
   */
  private final BloomType cfBloomType;

  /**
   * Creates a store file for the given path. This only builds the {@link StoreFileInfo} and
   * records the configuration; the reader is not opened until {@link #initReader()} is called.
   * @param fs The current file system to use.
   * @param p The path of the file.
   * @param conf The current configuration.
   * @param cacheConf The cache configuration and block cache reference.
   * @param cfBloomType The bloom type to use for this store file as specified by column family
   *          configuration. This may or may not be the same as the Bloom filter type actually
   *          present in the HFile, because column family configuration might change. If this is
   *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
   * @throws IOException if the {@link StoreFileInfo} cannot be created.
   */
  public HStoreFile(FileSystem fs, Path p, Configuration conf, CacheConfig cacheConf,
      BloomType cfBloomType, boolean primaryReplica) throws IOException {
    this(new StoreFileInfo(conf, fs, p, primaryReplica), cfBloomType, cacheConf);
  }

  /**
   * Creates a store file from an existing {@link StoreFileInfo}. The reader is not opened here;
   * call {@link #initReader()} before reading. If general Bloom filters are disabled in the
   * configuration carried by {@code fileInfo}, the effective Bloom type is forced to
   * {@link BloomType#NONE}.
   * @param fileInfo The store file information.
   * @param cfBloomType The bloom type to use for this store file as specified by column
   *          family configuration. This may or may not be the same as the Bloom filter type
   *          actually present in the HFile, because column family configuration might change. If
   *          this is {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @param cacheConf The cache configuration and block cache reference.
   */
  public HStoreFile(StoreFileInfo fileInfo, BloomType cfBloomType, CacheConfig cacheConf) {
    this.fileInfo = fileInfo;
    this.cacheConf = cacheConf;
    if (BloomFilterFactory.isGeneralBloomEnabled(fileInfo.getConf())) {
      this.cfBloomType = cfBloomType;
    } else {
      LOG.info("Ignoring bloom filter check for file {}: cfBloomType={} (disabled in config)",
        this.getPath(), cfBloomType);
      this.cfBloomType = BloomType.NONE;
    }
  }

  /**
   * @return the {@link StoreFileInfo} describing this store file, as supplied to (or created by)
   *         the constructor.
   */
  public StoreFileInfo getFileInfo() {
    return this.fileInfo;
  }

  @Override
  public Path getPath() {
    return this.fileInfo.getPath();
  }

  @Override
  public Path getEncodedPath() {
    try {
      return new Path(URLEncoder.encode(fileInfo.getPath().toString(), HConstants.UTF8_ENCODING));
    } catch (UnsupportedEncodingException ex) {
      throw new RuntimeException("URLEncoder doesn't support UTF-8", ex);
    }
  }

  @Override
  public Path getQualifiedPath() {
    FileSystem fs = fileInfo.getFileSystem();
    return this.fileInfo.getPath().makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  @Override
  public boolean isReference() {
    return this.fileInfo.isReference();
  }

  @Override
  public boolean isHFile() {
    return StoreFileInfo.isHFile(this.fileInfo.getPath());
  }

  /**
   * {@inheritDoc}
   * @throws NullPointerException if the flag has not been set yet, i.e. the reader has not been
   *           opened via {@link #initReader()}.
   */
  @Override
  public boolean isMajorCompactionResult() {
    if (this.majorCompaction == null) {
      throw new NullPointerException("This has not been set yet");
    }
    return this.majorCompaction.get();
  }

  @Override
  public boolean excludeFromMinorCompaction() {
    return this.excludeFromMinorCompaction;
  }

  @Override
  public long getMaxSequenceId() {
    return this.sequenceid;
  }

  /**
   * Delegates to {@link #getModificationTimestamp()}.
   */
  @Override
  public long getModificationTimeStamp() throws IOException {
    return getModificationTimestamp();
  }

  @Override
  public long getModificationTimestamp() throws IOException {
    return fileInfo.getModificationTime();
  }

  /**
   * Only used by the Striped Compaction Policy. Must be called after {@link #initReader()},
   * because the metadata map is only populated when the reader is opened.
   * @param key the metadata key to look up
   * @return value associated with the metadata key
   */
  public byte[] getMetadataValue(byte[] key) {
    return metadataMap.get(key);
  }

  /**
   * {@inheritDoc}
   * <p>A file counts as bulk loaded if its name contains the {@code SeqId_} marker or, once the
   * metadata has been loaded, if the metadata contains {@link #BULKLOAD_TIME_KEY}.
   */
  @Override
  public boolean isBulkLoadResult() {
    if (this.getPath().getName().contains(SEQ_ID_MARKER)) {
      return true;
    }
    // metadataMap is null until the reader has been opened.
    return metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY);
  }

  public boolean isCompactedAway() {
    return compactedAway;
  }

  public int getRefCount() {
    return fileInfo.refCount.get();
  }

  /**
   * @return true if the file is still used in reads
   */
  public boolean isReferencedInReads() {
    int rc = fileInfo.refCount.get();
    assert rc >= 0; // we should not go negative.
    return rc > 0;
  }

  /**
   * {@inheritDoc}
   * <p>Returns an empty value when the metadata has not been loaded yet (reader not opened) or
   * when the file carries no {@link #BULKLOAD_TIME_KEY} entry. This mirrors the null-tolerance
   * of {@link #isBulkLoadResult()}, which can already report {@code true} from the file name
   * alone before the reader is opened.
   */
  @Override
  public OptionalLong getBulkLoadTimestamp() {
    Map<byte[], byte[]> metadata = this.metadataMap;
    if (metadata == null) {
      return OptionalLong.empty();
    }
    byte[] bulkLoadTimestamp = metadata.get(BULKLOAD_TIME_KEY);
    return bulkLoadTimestamp == null ? OptionalLong.empty()
        : OptionalLong.of(Bytes.toLong(bulkLoadTimestamp));
  }

  /**
   * @return the cached value of HDFS blocks distribution. The cached value is calculated when store
   *         file is opened.
   */
  public HDFSBlocksDistribution getHDFSBlockDistribution() {
    return this.fileInfo.getHDFSBlockDistribution();
  }

  /**
   * Opens the pread reader on this store file and loads the file metadata into this object.
   * Called by {@link #initReader()} while holding this object's monitor.
   * @see #closeStoreFile(boolean)
   */
  private void open() throws IOException {
    fileInfo.initHDFSBlocksDistribution();
    long readahead = fileInfo.isNoReadahead() ? 0L : -1L;
    ReaderContext context = fileInfo.createReaderContext(false, readahead, ReaderType.PREAD);
    fileInfo.initHFileInfo(context);
    StoreFileReader reader = fileInfo.preStoreFileReaderOpen(context, cacheConf);
    if (reader == null) {
      reader = fileInfo.createReader(context, cacheConf);
      fileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
    }
    this.initialReader = fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
    // Load up indices and fileinfo. This also loads Bloom filter type.
    metadataMap = Collections.unmodifiableMap(initialReader.loadFileInfo());

    // Read in our metadata.
    byte[] b = metadataMap.get(MAX_SEQ_ID_KEY);
    if (b != null) {
      // By convention, if halfhfile, top half has a sequence number > bottom
      // half. That's why we add one below. It's done in case the two halves
      // are ever merged back together --rare. Without it, on open of store,
      // since store files are distinguished by sequence id, the one half would
      // subsume the other.
      this.sequenceid = Bytes.toLong(b);
      if (fileInfo.isTopReference()) {
        this.sequenceid += 1;
      }
    }

    if (isBulkLoadResult()) {
      // generate the sequenceId from the fileName
      // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
      String fileName = this.getPath().getName();
      // Use lastIndexOf() to get the last, most recent bulk load seqId.
      int startPos = fileName.lastIndexOf(SEQ_ID_MARKER);
      if (startPos != -1) {
        int idStart = startPos + SEQ_ID_MARKER.length();
        this.sequenceid =
          Long.parseLong(fileName.substring(idStart, fileName.indexOf('_', idStart)));
        // Handle reference files as done above.
        if (fileInfo.isTopReference()) {
          this.sequenceid += 1;
        }
      }
      // SKIP_RESET_SEQ_ID only works in bulk loaded file.
      // In mob compaction, the hfile where the cells contain the path of a new mob file is bulk
      // loaded to hbase, these cells have the same seqIds with the old ones. We do not want
      // to reset new seqIds for them since this might make a mess of the visibility of cells that
      // have the same row key but different seqIds.
      boolean skipResetSeqId = isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID));
      if (skipResetSeqId) {
        // increase the seqId when it is a bulk loaded file from mob compaction.
        this.sequenceid += 1;
      }
      initialReader.setSkipResetSeqId(skipResetSeqId);
      initialReader.setBulkLoaded(true);
    }
    initialReader.setSequenceID(this.sequenceid);

    b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
    if (b != null) {
      this.maxMemstoreTS = Bytes.toLong(b);
    }

    b = metadataMap.get(MAJOR_COMPACTION_KEY);
    if (b != null) {
      boolean mc = Bytes.toBoolean(b);
      if (this.majorCompaction == null) {
        this.majorCompaction = new AtomicBoolean(mc);
      } else {
        this.majorCompaction.set(mc);
      }
    } else {
      // Presume it is not major compacted if it doesn't explicitly say so.
      // HFileOutputFormat explicitly sets the major compacted key.
      this.majorCompaction = new AtomicBoolean(false);
    }

    b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
    this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));

    BloomType hfileBloomType = initialReader.getBloomFilterType();
    if (cfBloomType != BloomType.NONE) {
      initialReader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
      if (hfileBloomType != cfBloomType) {
        LOG.debug("HFile Bloom filter type for {}: {}, but {} specified in column family "
          + "configuration", initialReader.getHFileReader().getName(), hfileBloomType,
          cfBloomType);
      }
    } else if (hfileBloomType != BloomType.NONE) {
      LOG.info("Bloom filter turned off by CF config for {}",
        initialReader.getHFileReader().getName());
    }

    // load delete family bloom filter
    initialReader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);

    try {
      byte[] data = metadataMap.get(TIMERANGE_KEY);
      initialReader.timeRange = data == null ? null :
          TimeRangeTracker.parseFrom(data).toTimeRange();
    } catch (IllegalArgumentException e) {
      LOG.error("Error reading timestamp range data from meta -- proceeding without", e);
      this.initialReader.timeRange = null;
    }

    try {
      byte[] data = metadataMap.get(COMPACTION_EVENT_KEY);
      this.compactedStoreFiles.addAll(ProtobufUtil.toCompactedStoreFiles(data));
    } catch (IOException e) {
      LOG.error("Error reading compacted storefiles from meta data", e);
    }

    // initialize so we can reuse them after reader closed.
    firstKey = initialReader.getFirstKey();
    lastKey = initialReader.getLastKey();
    comparator = initialReader.getComparator();
  }

  /**
   * Initialize the reader used for pread. Does nothing if the reader is already open. If opening
   * fails, any partially opened reader is closed and the original exception is rethrown.
   */
  public void initReader() throws IOException {
    // Double-checked locking on the volatile initialReader field.
    if (initialReader == null) {
      synchronized (this) {
        if (initialReader == null) {
          try {
            open();
          } catch (Exception e) {
            try {
              this.closeStoreFile(shouldEvictOnClose());
            } catch (IOException ee) {
              LOG.warn("failed to close reader", ee);
            }
            throw e;
          }
        }
      }
    }
  }

  /**
   * @return whether blocks should be evicted when the reader is closed; defaults to true when no
   *         cache configuration is available.
   */
  private boolean shouldEvictOnClose() {
    return cacheConf == null || cacheConf.shouldEvictOnClose();
  }

  /**
   * Creates a new reader of type {@link ReaderType#STREAM}, making sure the pread reader has been
   * initialized first so that its fields can be copied over.
   */
  private StoreFileReader createStreamReader(boolean canUseDropBehind) throws IOException {
    initReader();
    final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction();
    ReaderContext context = fileInfo.createReaderContext(doDropBehind, -1, ReaderType.STREAM);
    StoreFileReader reader = fileInfo.preStoreFileReaderOpen(context, cacheConf);
    if (reader == null) {
      reader = fileInfo.createReader(context, cacheConf);
      // the stream reader needs to copy fields from the pread reader
      reader.copyFields(initialReader);
    }
    return fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
  }

  /**
   * Get a scanner which uses pread.
   * <p>
   * Must be called after initReader.
   */
  public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder,
      boolean canOptimizeForNonNullColumn) {
    return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder,
      canOptimizeForNonNullColumn);
  }

  /**
   * Get a scanner which uses streaming read.
   * <p>
   * Must be called after initReader.
   */
  public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
      boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
      throws IOException {
    return createStreamReader(canUseDropBehind)
        .getStoreFileScanner(cacheBlocks, false, isCompaction, readPt, scannerOrder,
          canOptimizeForNonNullColumn);
  }

  /**
   * @return Current reader. Must call initReader first else returns null.
   * @see #initReader()
   */
  public StoreFileReader getReader() {
    return this.initialReader;
  }

  /**
   * Closes the pread reader, if one is open, and clears the reference to it.
   * @param evictOnClose whether to evict blocks belonging to this file
   * @throws IOException if closing the reader fails
   */
  public synchronized void closeStoreFile(boolean evictOnClose) throws IOException {
    if (this.initialReader != null) {
      this.initialReader.close(evictOnClose);
      this.initialReader = null;
    }
  }

  /**
   * Delete this file. The reader is closed first.
   * @throws IOException if closing the reader or deleting the file fails
   */
  public void deleteStoreFile() throws IOException {
    closeStoreFile(shouldEvictOnClose());
    this.fileInfo.getFileSystem().delete(getPath(), true);
  }

  public void markCompactedAway() {
    this.compactedAway = true;
  }

  @Override
  public String toString() {
    return this.fileInfo.toString();
  }

  /**
   * {@inheritDoc}
   * <p>Calls {@link #isMajorCompactionResult()}, so it throws {@link NullPointerException} if the
   * major compaction flag has not been set yet.
   */
  @Override
  public String toStringDetailed() {
    boolean bulkLoaded = isBulkLoadResult();
    StringBuilder sb = new StringBuilder();
    sb.append(this.getPath().toString());
    sb.append(", isReference=").append(isReference());
    sb.append(", isBulkLoadResult=").append(bulkLoaded);
    if (bulkLoaded) {
      sb.append(", bulkLoadTS=");
      OptionalLong bulkLoadTS = getBulkLoadTimestamp();
      if (bulkLoadTS.isPresent()) {
        sb.append(bulkLoadTS.getAsLong());
      } else {
        sb.append("NotPresent");
      }
    } else {
      sb.append(", seqid=").append(getMaxSequenceId());
    }
    sb.append(", majorCompaction=").append(isMajorCompactionResult());

    return sb.toString();
  }

  /**
   * Gets whether to skip resetting the sequence id for cells.
   * @param skipResetSeqId The byte array of boolean.
   * @return Whether to skip resetting the sequence id. False when the array is null or is not
   *         exactly one byte long.
   */
  private boolean isSkipResetSeqId(byte[] skipResetSeqId) {
    if (skipResetSeqId != null && skipResetSeqId.length == 1) {
      return Bytes.toBoolean(skipResetSeqId);
    }
    return false;
  }

  /**
   * {@inheritDoc}
   * <p>Reads the time range from the current reader, so it must be called after
   * {@link #initReader()}.
   */
  @Override
  public OptionalLong getMinimumTimestamp() {
    TimeRange tr = getReader().timeRange;
    return tr != null ? OptionalLong.of(tr.getMin()) : OptionalLong.empty();
  }

  /**
   * {@inheritDoc}
   * <p>Reads the time range from the current reader, so it must be called after
   * {@link #initReader()}.
   */
  @Override
  public OptionalLong getMaximumTimestamp() {
    TimeRange tr = getReader().timeRange;
    return tr != null ? OptionalLong.of(tr.getMax()) : OptionalLong.empty();
  }

  /**
   * @return an unmodifiable view of the names of the store files this file was compacted from, as
   *         read from the {@link #COMPACTION_EVENT_KEY} metadata when the reader was opened.
   */
  Set<String> getCompactedStoreFiles() {
    return Collections.unmodifiableSet(this.compactedStoreFiles);
  }
}