/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * A Store data file. Stores usually have one or more of these files. They
 * are produced by flushing the memstore to disk. To
 * create, instantiate a writer using {@link StoreFileWriter.Builder}
 * and append data. Be sure to add any metadata before calling close on the
 * Writer (use the appendMetadata convenience methods). On close, a StoreFile
 * is sitting in the Filesystem. To refer to it, create a StoreFile instance
 * passing filesystem and path. To read, call {@link #initReader()}.
 * <p>StoreFiles may also reference store files in another Store.
 *
 * The reason for this pattern, where you use a different instance for the
 * writer and the reader, is that we write once but read a lot more.
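 * <p>A minimal usage sketch; the path, configuration objects, and cell source
 * below are illustrative assumptions, not part of this class:
 * <pre>{@code
 * StoreFileWriter w = new StoreFileWriter.Builder(conf, cacheConf, fs)
 *     .withFilePath(path)
 *     .build();
 * w.append(cell);                    // write data
 * w.appendMetadata(maxSeqId, false); // add metadata before close
 * w.close();
 *
 * HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
 * sf.initReader();                   // open the reader used for pread
 * StoreFileReader reader = sf.getReader();
 * }</pre>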
 */
@InterfaceAudience.Private
public class HStoreFile implements StoreFile {

  private static final Logger LOG = LoggerFactory.getLogger(HStoreFile.class.getName());

  // Keys for fileinfo values in HFile

  /** Max Sequence ID in FileInfo */
  public static final byte[] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");

  /** Major compaction flag in FileInfo */
  public static final byte[] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");

  /** Exclude-from-minor-compaction flag in FileInfo */
  public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
      Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");

  /**
   * Key for compaction event which contains the compacted storefiles in FileInfo
   */
  public static final byte[] COMPACTION_EVENT_KEY = Bytes.toBytes("COMPACTION_EVENT_KEY");

  /** Bloom filter Type in FileInfo */
  public static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");

  /** Bloom filter param in FileInfo */
  public static final byte[] BLOOM_FILTER_PARAM_KEY = Bytes.toBytes("BLOOM_FILTER_PARAM");

  /** Delete Family Count in FileInfo */
  public static final byte[] DELETE_FAMILY_COUNT = Bytes.toBytes("DELETE_FAMILY_COUNT");

  /** Last Bloom filter key in FileInfo */
  public static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");

  /** Key for Timerange information in metadata */
  public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");

  /** Key for timestamp of earliest-put in metadata */
  public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");

  /** Key for the number of mob cells in metadata */
  public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT");

  /** Meta key set when store file is a result of a bulk load */
  public static final byte[] BULKLOAD_TASK_KEY = Bytes.toBytes("BULKLOAD_SOURCE_TASK");
  public static final byte[] BULKLOAD_TIME_KEY = Bytes.toBytes("BULKLOAD_TIMESTAMP");

  /**
   * Key for skipping resetting sequence id in metadata. For bulk loaded hfiles the scanner
   * resets the cell seqId to the latest one; if this metadata is set to true, the reset is
   * skipped.
   */
  public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");
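  // Illustrative sketch of how the keys above round-trip (not invoked anywhere;
  // "writer" and "seqId" are hypothetical). A StoreFileWriter records an entry, e.g.
  //   writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(seqId));
  // and open() below reads it back out of the unmodifiable metadataMap, e.g.
  //   byte[] b = metadataMap.get(MAX_SEQ_ID_KEY);
  //   long maxSeqId = b == null ? -1 : Bytes.toLong(b);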
  private final StoreFileInfo fileInfo;

  // StoreFile.Reader
  private volatile StoreFileReader initialReader;

  // Block cache configuration and reference.
  private final CacheConfig cacheConf;

  // Indicates if the file got compacted
  private volatile boolean compactedAway = false;

  // Keys for metadata stored in backing HFile.
  // Set when we obtain a Reader.
  private long sequenceid = -1;

  // Max of the MemstoreTS in the KVs in this store.
  // Set when we obtain a Reader.
  private long maxMemstoreTS = -1;

  // firstKey, lastKey and cellComparator will be set when the reader is opened.
  private Optional<Cell> firstKey;

  private Optional<Cell> lastKey;

  private CellComparator comparator;

  public CacheConfig getCacheConf() {
    return this.cacheConf;
  }

  @Override
  public Optional<Cell> getFirstKey() {
    return firstKey;
  }

  @Override
  public Optional<Cell> getLastKey() {
    return lastKey;
  }

  @Override
  public CellComparator getComparator() {
    return comparator;
  }

  @Override
  public long getMaxMemStoreTS() {
    return maxMemstoreTS;
  }

  // If true, this file was a product of a major compaction.
  // It is set whenever you get a Reader.
  private AtomicBoolean majorCompaction = null;

  // If true, this file should not be included in minor compactions.
  // It is set whenever you get a Reader.
  private boolean excludeFromMinorCompaction = false;

  // This file was a product of these compacted store files
  private final Set<String> compactedStoreFiles = new HashSet<>();

  /**
   * Map of the metadata entries in the corresponding HFile. Populated when Reader is opened
   * after which it is not modified again.
   */
  private Map<byte[], byte[]> metadataMap;

  /**
   * Bloom filter type specified in column family configuration. Does not
   * necessarily correspond to the Bloom filter type present in the HFile.
   */
  private final BloomType cfBloomType;

  /**
   * Constructor. Opening the reader via {@link #initReader()} loads the file's indices etc.,
   * and may allocate a substantial amount of RAM depending on the underlying files (10-20MB?).
   * @param fs The current file system to use.
   * @param p The path of the file.
   * @param conf The current configuration.
   * @param cacheConf The cache configuration and block cache reference.
   * @param cfBloomType The bloom type to use for this store file as specified by column family
   *          configuration. This may or may not be the same as the Bloom filter type actually
   *          present in the HFile, because column family configuration might change. If this is
   *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
   * @throws IOException if the {@link StoreFileInfo} cannot be created
   */
  public HStoreFile(FileSystem fs, Path p, Configuration conf, CacheConfig cacheConf,
      BloomType cfBloomType, boolean primaryReplica) throws IOException {
    this(new StoreFileInfo(conf, fs, p, primaryReplica), cfBloomType, cacheConf);
  }

  /**
   * Constructor. Opening the reader via {@link #initReader()} loads the file's indices etc.,
   * and may allocate a substantial amount of RAM depending on the underlying files (10-20MB?).
   * @param fileInfo The store file information.
   * @param cfBloomType The bloom type to use for this store file as specified by column
   *          family configuration. This may or may not be the same as the Bloom filter type
   *          actually present in the HFile, because column family configuration might change. If
   *          this is {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @param cacheConf The cache configuration and block cache reference.
   */
  public HStoreFile(StoreFileInfo fileInfo, BloomType cfBloomType, CacheConfig cacheConf) {
    this.fileInfo = fileInfo;
    this.cacheConf = cacheConf;
    if (BloomFilterFactory.isGeneralBloomEnabled(fileInfo.getConf())) {
      this.cfBloomType = cfBloomType;
    } else {
      LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": cfBloomType=" +
          cfBloomType + " (disabled in config)");
      this.cfBloomType = BloomType.NONE;
    }
  }

  /**
   * @return the {@link StoreFileInfo} associated with this StoreFile.
   */
  public StoreFileInfo getFileInfo() {
    return this.fileInfo;
  }

  @Override
  public Path getPath() {
    return this.fileInfo.getPath();
  }

  @Override
  public Path getEncodedPath() {
    try {
      return new Path(URLEncoder.encode(fileInfo.getPath().toString(), HConstants.UTF8_ENCODING));
    } catch (UnsupportedEncodingException ex) {
      throw new RuntimeException("URLEncoder doesn't support UTF-8", ex);
    }
  }

  @Override
  public Path getQualifiedPath() {
    FileSystem fs = fileInfo.getFileSystem();
    return this.fileInfo.getPath().makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  @Override
  public boolean isReference() {
    return this.fileInfo.isReference();
  }

  @Override
  public boolean isHFile() {
    return StoreFileInfo.isHFile(this.fileInfo.getPath());
  }

  @Override
  public boolean isMajorCompactionResult() {
    if (this.majorCompaction == null) {
      throw new NullPointerException("This has not been set yet");
    }
    return this.majorCompaction.get();
  }

  @Override
  public boolean excludeFromMinorCompaction() {
    return this.excludeFromMinorCompaction;
  }

  @Override
  public long getMaxSequenceId() {
    return this.sequenceid;
  }

  @Override
  public long getModificationTimeStamp() throws IOException {
    return getModificationTimestamp();
  }

  @Override
  public long getModificationTimestamp() throws IOException {
    return fileInfo.getModificationTime();
  }

  /**
   * Only used by the Striped Compaction Policy.
   * @param key the key of the metadata entry
   * @return value associated with the metadata key
   */
  public byte[] getMetadataValue(byte[] key) {
    return metadataMap.get(key);
  }

  @Override
  public boolean isBulkLoadResult() {
    boolean bulkLoadedHFile = false;
    String fileName = this.getPath().getName();
    int startPos = fileName.indexOf("SeqId_");
    if (startPos != -1) {
      bulkLoadedHFile = true;
    }
    return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY));
  }

  public boolean isCompactedAway() {
    return compactedAway;
  }

  @VisibleForTesting
  public int getRefCount() {
    return fileInfo.refCount.get();
  }

  /**
   * @return true if the file is still used in reads
   */
  public boolean isReferencedInReads() {
    int rc = fileInfo.refCount.get();
    assert rc >= 0; // we should not go negative.
    return rc > 0;
  }

  @Override
  public OptionalLong getBulkLoadTimestamp() {
    byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY);
    return bulkLoadTimestamp == null ? OptionalLong.empty()
        : OptionalLong.of(Bytes.toLong(bulkLoadTimestamp));
  }

  /**
   * @return the cached value of HDFS blocks distribution. The cached value is calculated when
   *         the store file is opened.
   */
  public HDFSBlocksDistribution getHDFSBlockDistribution() {
    return this.fileInfo.getHDFSBlockDistribution();
  }

  /**
   * Opens reader on this store file. Called from {@link #initReader()}.
   * @see #closeStoreFile(boolean)
   */
  private void open() throws IOException {
    fileInfo.initHDFSBlocksDistribution();
    long readahead = fileInfo.isNoReadahead() ? 0L : -1L;
    ReaderContext context = fileInfo.createReaderContext(false, readahead, ReaderType.PREAD);
    fileInfo.initHFileInfo(context);
    StoreFileReader reader = fileInfo.preStoreFileReaderOpen(context, cacheConf);
    if (reader == null) {
      reader = fileInfo.createReader(context, cacheConf);
      fileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
    }
    this.initialReader = fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
    // Load up indices and fileinfo. This also loads the Bloom filter type.
    metadataMap = Collections.unmodifiableMap(initialReader.loadFileInfo());

    // Read in our metadata.
    byte[] b = metadataMap.get(MAX_SEQ_ID_KEY);
    if (b != null) {
      // By convention, if half-HFile, the top half has a sequence number greater than the
      // bottom half. That's why we add one below. It's done in case the two halves are ever
      // merged back together (rare). Without it, on open of a store, since store files are
      // distinguished by sequence id, the one half would subsume the other.
      this.sequenceid = Bytes.toLong(b);
      if (fileInfo.isTopReference()) {
        this.sequenceid += 1;
      }
    }
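    // Illustrative example (the file name is hypothetical): for a bulk-loaded file named
    // "7712b0ae4b1c4f4bb1f1e8d0f4f2e6a1_SeqId_205_", lastIndexOf("SeqId_") finds the
    // marker and the parse below extracts 205 as this file's sequence id.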
    if (isBulkLoadResult()) {
      // Generate the sequenceId from the fileName.
      // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
      String fileName = this.getPath().getName();
      // Use lastIndexOf() to get the last, most recent bulk load seqId.
      int startPos = fileName.lastIndexOf("SeqId_");
      if (startPos != -1) {
        this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
            fileName.indexOf('_', startPos + 6)));
        // Handle reference files as done above.
        if (fileInfo.isTopReference()) {
          this.sequenceid += 1;
        }
      }
      // SKIP_RESET_SEQ_ID only works on bulk loaded files.
      // In mob compaction, the hfile where the cells contain the path of a new mob file is bulk
      // loaded into hbase; these cells have the same seqIds as the old ones. We do not want to
      // reset new seqIds for them since this might make a mess of the visibility of cells that
      // have the same row key but different seqIds.
      boolean skipResetSeqId = isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID));
      if (skipResetSeqId) {
        // Increase the seqId when it is a bulk loaded file from mob compaction.
        this.sequenceid += 1;
      }
      initialReader.setSkipResetSeqId(skipResetSeqId);
      initialReader.setBulkLoaded(true);
    }
    initialReader.setSequenceID(this.sequenceid);

    b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
    if (b != null) {
      this.maxMemstoreTS = Bytes.toLong(b);
    }

    b = metadataMap.get(MAJOR_COMPACTION_KEY);
    if (b != null) {
      boolean mc = Bytes.toBoolean(b);
      if (this.majorCompaction == null) {
        this.majorCompaction = new AtomicBoolean(mc);
      } else {
        this.majorCompaction.set(mc);
      }
    } else {
      // Presume it is not major compacted if it doesn't explicitly say so.
      // HFileOutputFormat explicitly sets the major compacted key.
      this.majorCompaction = new AtomicBoolean(false);
    }

    b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
    this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));

    BloomType hfileBloomType = initialReader.getBloomFilterType();
    if (cfBloomType != BloomType.NONE) {
      initialReader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
      if (hfileBloomType != cfBloomType) {
        LOG.debug("HFile Bloom filter type for "
            + initialReader.getHFileReader().getName() + ": " + hfileBloomType
            + ", but " + cfBloomType + " specified in column family "
            + "configuration");
      }
    } else if (hfileBloomType != BloomType.NONE) {
      LOG.info("Bloom filter turned off by CF config for "
          + initialReader.getHFileReader().getName());
    }

    // Load the delete family bloom filter.
    initialReader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);

    try {
      byte[] data = metadataMap.get(TIMERANGE_KEY);
      initialReader.timeRange = data == null ? null :
          TimeRangeTracker.parseFrom(data).toTimeRange();
    } catch (IllegalArgumentException e) {
      LOG.error("Error reading timestamp range data from meta -- proceeding without", e);
      this.initialReader.timeRange = null;
    }

    try {
      byte[] data = metadataMap.get(COMPACTION_EVENT_KEY);
      this.compactedStoreFiles.addAll(ProtobufUtil.toCompactedStoreFiles(data));
    } catch (IOException e) {
      LOG.error("Error reading compacted storefiles from meta data", e);
    }

    // Initialize so we can reuse them after the reader is closed.
    firstKey = initialReader.getFirstKey();
    lastKey = initialReader.getLastKey();
    comparator = initialReader.getComparator();
  }
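  /*
   * Illustrative sketch (the variable name is hypothetical): once initReader() has run,
   * the metadata parsed in open() above backs the simple accessors, e.g.
   *
   *   hsf.initReader();
   *   long seqId = hsf.getMaxSequenceId();            // from MAX_SEQ_ID_KEY
   *   OptionalLong minTs = hsf.getMinimumTimestamp(); // from TIMERANGE_KEY
   */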
  /**
   * Initialize the reader used for pread.
   */
  public void initReader() throws IOException {
    if (initialReader == null) {
      synchronized (this) {
        if (initialReader == null) {
          try {
            open();
          } catch (Exception e) {
            try {
              boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
              this.closeStoreFile(evictOnClose);
            } catch (IOException ee) {
              LOG.warn("failed to close reader", ee);
            }
            throw e;
          }
        }
      }
    }
  }

  private StoreFileReader createStreamReader(boolean canUseDropBehind) throws IOException {
    initReader();
    final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction();
    ReaderContext context = fileInfo.createReaderContext(doDropBehind, -1, ReaderType.STREAM);
    StoreFileReader reader = fileInfo.preStoreFileReaderOpen(context, cacheConf);
    if (reader == null) {
      reader = fileInfo.createReader(context, cacheConf);
      // The stream reader needs to copy fields from the pread reader.
      reader.copyFields(initialReader);
    }
    return fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
  }

  /**
   * Get a scanner which uses pread.
   * <p>
   * Must be called after initReader.
   */
  public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder,
      boolean canOptimizeForNonNullColumn) {
    return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder,
        canOptimizeForNonNullColumn);
  }
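  /*
   * Illustrative sketch (variables hypothetical): the pread scanner above suits short,
   * random gets; the stream scanner below suits long sequential reads such as compactions.
   *
   *   sf.initReader();
   *   StoreFileScanner pread = sf.getPreadScanner(true, readPt, 0, false);
   *   StoreFileScanner stream = sf.getStreamScanner(true, false, true, readPt, 0, false);
   */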
  /**
   * Get a scanner which uses streaming read.
   * <p>
   * Must be called after initReader.
   */
  public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
      boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
      throws IOException {
    return createStreamReader(canUseDropBehind)
        .getStoreFileScanner(cacheBlocks, false, isCompaction, readPt, scannerOrder,
            canOptimizeForNonNullColumn);
  }

  /**
   * @return Current reader. Must call {@link #initReader()} first, else returns null.
   * @see #initReader()
   */
  public StoreFileReader getReader() {
    return this.initialReader;
  }

  /**
   * @param evictOnClose whether to evict blocks belonging to this file
   * @throws IOException if the reader cannot be closed
   */
  public synchronized void closeStoreFile(boolean evictOnClose) throws IOException {
    if (this.initialReader != null) {
      this.initialReader.close(evictOnClose);
      this.initialReader = null;
    }
  }

  /**
   * Delete this file.
   * @throws IOException if the file cannot be closed or deleted
   */
  public void deleteStoreFile() throws IOException {
    boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
    closeStoreFile(evictOnClose);
    this.fileInfo.getFileSystem().delete(getPath(), true);
  }

  public void markCompactedAway() {
    this.compactedAway = true;
  }

  @Override
  public String toString() {
    return this.fileInfo.toString();
  }

  @Override
  public String toStringDetailed() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getPath().toString());
    sb.append(", isReference=").append(isReference());
    sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
    if (isBulkLoadResult()) {
      sb.append(", bulkLoadTS=");
      OptionalLong bulkLoadTS = getBulkLoadTimestamp();
      if (bulkLoadTS.isPresent()) {
        sb.append(bulkLoadTS.getAsLong());
      } else {
        sb.append("NotPresent");
      }
    } else {
      sb.append(", seqid=").append(getMaxSequenceId());
    }
    sb.append(", majorCompaction=").append(isMajorCompactionResult());

    return sb.toString();
  }

  /**
   * Gets whether to skip resetting the sequence id for cells.
   * @param skipResetSeqId The serialized boolean value.
   * @return Whether to skip resetting the sequence id.
   */
  private boolean isSkipResetSeqId(byte[] skipResetSeqId) {
    if (skipResetSeqId != null && skipResetSeqId.length == 1) {
      return Bytes.toBoolean(skipResetSeqId);
    }
    return false;
  }

  @Override
  public OptionalLong getMinimumTimestamp() {
    TimeRange tr = getReader().timeRange;
    return tr != null ? OptionalLong.of(tr.getMin()) : OptionalLong.empty();
  }

  @Override
  public OptionalLong getMaximumTimestamp() {
    TimeRange tr = getReader().timeRange;
    return tr != null ? OptionalLong.of(tr.getMax()) : OptionalLong.empty();
  }

  Set<String> getCompactedStoreFiles() {
    return Collections.unmodifiableSet(this.compactedStoreFiles);
  }
}