/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;

import org.apache.yetus.audience.InterfaceAudience;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;


/**
 * A Store data file.  Stores usually have one or more of these files.  They
 * are produced by flushing the memstore to disk.  To
 * create, instantiate a writer using {@link StoreFileWriter.Builder}
 * and append data. Be sure to add any metadata before calling close on the
 * Writer (Use the appendMetadata convenience methods). On close, a StoreFile
 * is sitting in the Filesystem.  To refer to it, create a StoreFile instance
 * passing filesystem and path.  To read, call {@link #initReader()}
 * <p>StoreFiles may also reference store files in another Store.
 *
 * The reason for this weird pattern where you use a different instance for the
 * writer and a reader is that we write once but read a lot more.
 */
@InterfaceAudience.Private
public class HStoreFile implements StoreFile, StoreFileReader.Listener {

  private static final Logger LOG = LoggerFactory.getLogger(HStoreFile.class);

  /** Configuration key read in the constructor to decide the readahead argument used on open. */
  public static final String STORE_FILE_READER_NO_READAHEAD = "hbase.store.reader.no-readahead";

  private static final boolean DEFAULT_STORE_FILE_READER_NO_READAHEAD = false;

  // Keys for fileinfo values in HFile

  /** Max Sequence ID in FileInfo */
  public static final byte[] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");

  /** Major compaction flag in FileInfo */
  public static final byte[] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");

  /** Minor compaction flag in FileInfo */
  public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
      Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");

  /** Bloom filter Type in FileInfo */
  public static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");

  /** Delete Family Count in FileInfo */
  public static final byte[] DELETE_FAMILY_COUNT = Bytes.toBytes("DELETE_FAMILY_COUNT");

  /** Last Bloom filter key in FileInfo */
  public static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");

  /** Key for Timerange information in metadata */
  public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");

  /** Key for timestamp of earliest-put in metadata */
  public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");

  /** Key for the number of mob cells in metadata */
  public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT");

  /** Meta key set when store file is a result of a bulk load */
  public static final byte[] BULKLOAD_TASK_KEY = Bytes.toBytes("BULKLOAD_SOURCE_TASK");

  /** Meta key holding the bulk load timestamp; see {@link #getBulkLoadTimestamp()}. */
  public static final byte[] BULKLOAD_TIME_KEY = Bytes.toBytes("BULKLOAD_TIMESTAMP");

  /**
   * Key for skipping resetting sequence id in metadata. For bulk loaded hfiles, the scanner resets
   * the cell seqId with the latest one, if this metadata is set as true, the reset is skipped.
   */
  public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");

  private final StoreFileInfo fileInfo;
  private final FileSystem fs;

  // Block cache configuration and reference.
  private final CacheConfig cacheConf;

  // Counter that is incremented every time a scanner is created on the
  // store file. It is decremented when the scan on the store file is
  // done.
  private final AtomicInteger refCount = new AtomicInteger(0);

  // Set implementation must be of concurrent type
  @VisibleForTesting
  final Set<StoreFileReader> streamReaders;

  private final boolean noReadahead;

  private final boolean primaryReplica;

  // Indicates if the file got compacted
  private volatile boolean compactedAway = false;

  // Keys for metadata stored in backing HFile.
  // Set when we obtain a Reader.
  private long sequenceid = -1;

  // max of the MemstoreTS in the KV's in this store
  // Set when we obtain a Reader.
  private long maxMemstoreTS = -1;

  // firstKey, lastkey and cellComparator will be set when openReader.
  private Optional<Cell> firstKey;

  private Optional<Cell> lastKey;

  private CellComparator comparator;

  /**
   * @return the cache configuration this store file was constructed with.
   */
  public CacheConfig getCacheConf() {
    return cacheConf;
  }

  /**
   * @return the first key captured when the reader was opened; the field is unassigned (null)
   *         until then.
   */
  @Override
  public Optional<Cell> getFirstKey() {
    return firstKey;
  }

  /**
   * @return the last key captured when the reader was opened; the field is unassigned (null)
   *         until then.
   */
  @Override
  public Optional<Cell> getLastKey() {
    return lastKey;
  }

  /**
   * @return the comparator captured when the reader was opened; null until then.
   */
  @Override
  public CellComparator getComparator() {
    return comparator;
  }

  /**
   * @return the value stored under {@code HFile.Writer.MAX_MEMSTORE_TS_KEY} in the file metadata,
   *         or -1 if the reader has not been opened or the key is absent.
   */
  @Override
  public long getMaxMemStoreTS() {
    return maxMemstoreTS;
  }

  // If true, this file was product of a major compaction.  Its then set
  // whenever you get a Reader.
  private AtomicBoolean majorCompaction = null;

  // If true, this file should not be included in minor compactions.
  // It's set whenever you get a Reader.
  private boolean excludeFromMinorCompaction = false;

  /**
   * Map of the metadata entries in the corresponding HFile. Populated when Reader is opened
   * after which it is not modified again.
   */
  private Map<byte[], byte[]> metadataMap;

  // StoreFile.Reader
  private volatile StoreFileReader reader;

  /**
   * Bloom filter type specified in column family configuration. Does not
   * necessarily correspond to the Bloom filter type present in the HFile.
   */
  private final BloomType cfBloomType;

  /**
   * Constructor. Builds a {@link StoreFileInfo} for the given path and delegates to
   * {@link #HStoreFile(FileSystem, StoreFileInfo, Configuration, CacheConfig, BloomType, boolean)}.
   * No reader is opened here; call {@link #initReader()} to load the reader and its indices,
   * which may allocate a substantial amount of ram depending on the underlying files (10-20MB?).
   * @param fs The current file system to use.
   * @param p The path of the file.
   * @param conf The current configuration.
   * @param cacheConf The cache configuration and block cache reference.
   * @param cfBloomType The bloom type to use for this store file as specified by column family
   *          configuration. This may or may not be the same as the Bloom filter type actually
   *          present in the HFile, because column family configuration might change. If this is
   *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
   * @throws IOException if creating the {@link StoreFileInfo} for {@code p} fails.
   */
  public HStoreFile(FileSystem fs, Path p, Configuration conf, CacheConfig cacheConf,
      BloomType cfBloomType, boolean primaryReplica) throws IOException {
    this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType, primaryReplica);
  }

  /**
   * Constructor. Only records its arguments and derived settings; no reader is opened here.
   * Call {@link #initReader()} to load the reader and its indices, which may allocate a
   * substantial amount of ram depending on the underlying files (10-20MB?).
   * @param fs The current file system to use.
   * @param fileInfo The store file information.
   * @param conf The current configuration.
   * @param cacheConf The cache configuration and block cache reference.
   * @param cfBloomType The bloom type to use for this store file as specified by column
   *          family configuration. This may or may not be the same as the Bloom filter type
   *          actually present in the HFile, because column family configuration might change. If
   *          this is {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
   */
  public HStoreFile(FileSystem fs, StoreFileInfo fileInfo, Configuration conf,
      CacheConfig cacheConf, BloomType cfBloomType, boolean primaryReplica) {
    this.streamReaders = ConcurrentHashMap.newKeySet();
    this.fs = fs;
    this.fileInfo = fileInfo;
    this.cacheConf = cacheConf;
    this.noReadahead =
        conf.getBoolean(STORE_FILE_READER_NO_READAHEAD, DEFAULT_STORE_FILE_READER_NO_READAHEAD);
    if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
      this.cfBloomType = cfBloomType;
    } else {
      // Read the path straight from fileInfo rather than through the overridable getPath(),
      // so no overridable method runs while the object is still being constructed.
      LOG.info("Ignoring bloom filter check for file {}: cfBloomType={} (disabled in config)",
        fileInfo.getPath(), cfBloomType);
      this.cfBloomType = BloomType.NONE;
    }
    this.primaryReplica = primaryReplica;
  }

  /**
   * @return the {@link StoreFileInfo} this store file was constructed with.
   */
  public StoreFileInfo getFileInfo() {
    return this.fileInfo;
  }

  @Override
  public Path getPath() {
    return this.fileInfo.getPath();
  }

  /**
   * @return a {@link Path} built from the URL-encoded (UTF-8) string form of {@link #getPath()}.
   */
  @Override
  public Path getEncodedPath() {
    try {
      return new Path(URLEncoder.encode(fileInfo.getPath().toString(), HConstants.UTF8_ENCODING));
    } catch (UnsupportedEncodingException ex) {
      throw new RuntimeException("URLEncoder doesn't support UTF-8", ex);
    }
  }

  /**
   * @return {@link #getPath()} qualified against the file system's URI and working directory.
   */
  @Override
  public Path getQualifiedPath() {
    return this.fileInfo.getPath().makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  @Override
  public boolean isReference() {
    return this.fileInfo.isReference();
  }

  @Override
  public boolean isHFile() {
    return StoreFileInfo.isHFile(this.fileInfo.getPath());
  }

  /**
   * @return the major compaction flag read from the file metadata when the reader was opened.
   * @throws NullPointerException if the flag has not been set yet, i.e. the reader was never
   *           opened.
   */
  @Override
  public boolean isMajorCompactionResult() {
    if (this.majorCompaction == null) {
      throw new NullPointerException("This has not been set yet");
    }
    return this.majorCompaction.get();
  }

  @Override
  public boolean excludeFromMinorCompaction() {
    return this.excludeFromMinorCompaction;
  }

  /**
   * @return the sequence id computed when the reader was opened, or -1 before that.
   */
  @Override
  public long getMaxSequenceId() {
    return this.sequenceid;
  }

  /**
   * Same as {@link #getModificationTimestamp()}; both spellings are implemented and this one
   * simply delegates.
   */
  @Override
  public long getModificationTimeStamp() throws IOException {
    return getModificationTimestamp();
  }

  @Override
  public long getModificationTimestamp() throws IOException {
    return fileInfo.getModificationTime();
  }

  /**
   * Only used by the Striped Compaction Policy
   * @param key the metadata key to look up
   * @return value associated with the metadata key
   * @throws NullPointerException if the reader has not been opened yet (the metadata map is only
   *           populated on open).
   */
  public byte[] getMetadataValue(byte[] key) {
    return metadataMap.get(key);
  }

  /**
   * @return true if the file name contains {@code "SeqId_"}, or if the metadata (once loaded)
   *         contains {@link #BULKLOAD_TIME_KEY}.
   */
  @Override
  public boolean isBulkLoadResult() {
    boolean bulkLoadedHFile = this.getPath().getName().contains("SeqId_");
    return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY));
  }

  public boolean isCompactedAway() {
    return compactedAway;
  }

  @VisibleForTesting
  public int getRefCount() {
    return refCount.get();
  }

  /**
   * @return true if the file is still used in reads
   */
  public boolean isReferencedInReads() {
    int rc = refCount.get();
    assert rc >= 0; // we should not go negative.
    return rc > 0;
  }

  /**
   * @return the long stored under {@link #BULKLOAD_TIME_KEY} in the file metadata, or empty if
   *         the key is absent or the metadata has not been loaded yet (reader never opened).
   */
  @Override
  public OptionalLong getBulkLoadTimestamp() {
    // Guard against the not-yet-opened state the same way isBulkLoadResult() does, instead of
    // failing with a NullPointerException.
    Map<byte[], byte[]> metadata = this.metadataMap;
    if (metadata == null) {
      return OptionalLong.empty();
    }
    byte[] bulkLoadTimestamp = metadata.get(BULKLOAD_TIME_KEY);
    return bulkLoadTimestamp == null ? OptionalLong.empty()
        : OptionalLong.of(Bytes.toLong(bulkLoadTimestamp));
  }

  /**
   * @return the cached value of HDFS blocks distribution. The cached value is calculated when store
   *         file is opened.
   */
  public HDFSBlocksDistribution getHDFSBlockDistribution() {
    return this.fileInfo.getHDFSBlockDistribution();
  }

  /**
   * Opens the reader on this store file and populates the state derived from the file metadata
   * (sequence id, max memstore TS, compaction flags, bloom filters, time range, first/last key
   * and comparator). Called by {@link #initReader()}.
   * @throws IOException if opening the reader or reading the file metadata fails.
   * @throws IllegalAccessError if a reader is already open.
   * @see #closeStoreFile(boolean)
   */
  private void open() throws IOException {
    if (this.reader != null) {
      throw new IllegalAccessError("Already open");
    }

    // Open the StoreFile.Reader
    // NOTE(review): the trailing 'true' is 'false' for stream readers (see createStreamReader);
    // its exact meaning is defined by StoreFileInfo#open -- confirm there.
    this.reader = fileInfo.open(this.fs, this.cacheConf, false, noReadahead ? 0L : -1L,
      primaryReplica, refCount, true);

    // Load up indices and fileinfo. This also loads Bloom filter type.
    metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());

    // Read in our metadata.
    byte[] b = metadataMap.get(MAX_SEQ_ID_KEY);
    if (b != null) {
      // By convention, if halfhfile, top half has a sequence number > bottom
      // half. Thats why we add one in below. Its done for case the two halves
      // are ever merged back together --rare.  Without it, on open of store,
      // since store files are distinguished by sequence id, the one half would
      // subsume the other.
      this.sequenceid = Bytes.toLong(b);
      if (fileInfo.isTopReference()) {
        this.sequenceid += 1;
      }
    }

    if (isBulkLoadResult()) {
      // generate the sequenceId from the fileName
      // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
      String fileName = this.getPath().getName();
      // Use lastIndexOf() to get the last, most recent bulk load seqId.
      int startPos = fileName.lastIndexOf("SeqId_");
      if (startPos != -1) {
        // 6 == "SeqId_".length(); the id runs up to the next underscore.
        this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
          fileName.indexOf('_', startPos + 6)));
        // Handle reference files as done above.
        if (fileInfo.isTopReference()) {
          this.sequenceid += 1;
        }
      }
      // SKIP_RESET_SEQ_ID only works in bulk loaded file.
      // In mob compaction, the hfile where the cells contain the path of a new mob file is bulk
      // loaded to hbase, these cells have the same seqIds with the old ones. We do not want
      // to reset new seqIds for them since this might make a mess of the visibility of cells that
      // have the same row key but different seqIds.
      boolean skipResetSeqId = isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID));
      if (skipResetSeqId) {
        // increase the seqId when it is a bulk loaded file from mob compaction.
        this.sequenceid += 1;
      }
      this.reader.setSkipResetSeqId(skipResetSeqId);
      this.reader.setBulkLoaded(true);
    }
    this.reader.setSequenceID(this.sequenceid);

    b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
    if (b != null) {
      this.maxMemstoreTS = Bytes.toLong(b);
    }

    b = metadataMap.get(MAJOR_COMPACTION_KEY);
    if (b != null) {
      boolean mc = Bytes.toBoolean(b);
      if (this.majorCompaction == null) {
        this.majorCompaction = new AtomicBoolean(mc);
      } else {
        this.majorCompaction.set(mc);
      }
    } else {
      // Presume it is not major compacted if it doesn't explicity say so
      // HFileOutputFormat explicitly sets the major compacted key.
      this.majorCompaction = new AtomicBoolean(false);
    }

    b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
    this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));

    BloomType hfileBloomType = reader.getBloomFilterType();
    if (cfBloomType != BloomType.NONE) {
      reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
      if (hfileBloomType != cfBloomType) {
        LOG.info("HFile Bloom filter type for {}: {}, but {} specified in column family "
          + "configuration", reader.getHFileReader().getName(), hfileBloomType, cfBloomType);
      }
    } else if (hfileBloomType != BloomType.NONE) {
      LOG.info("Bloom filter turned off by CF config for {}",
        reader.getHFileReader().getName());
    }

    // load delete family bloom filter
    reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);

    try {
      byte[] data = metadataMap.get(TIMERANGE_KEY);
      this.reader.timeRange = data == null ? null : TimeRangeTracker.parseFrom(data).toTimeRange();
    } catch (IllegalArgumentException e) {
      LOG.error("Error reading timestamp range data from meta -- " +
          "proceeding without", e);
      this.reader.timeRange = null;
    }
    // initialize so we can reuse them after reader closed.
    firstKey = reader.getFirstKey();
    lastKey = reader.getLastKey();
    comparator = reader.getComparator();
  }

  /**
   * Initialize the reader used for pread. Does nothing if a reader is already present. If
   * opening fails, the store file is closed (best effort, a failure to close is only logged)
   * and the original exception is rethrown.
   * @throws IOException if opening the reader fails.
   */
  public void initReader() throws IOException {
    if (reader == null) {
      // Re-check under the same monitor closeStoreFile() uses, so two concurrent callers cannot
      // both run open() (which would leak a reader or trip its "Already open" check).
      // NOTE(review): open() publishes 'reader' before it finishes populating the other fields,
      // so a concurrent caller taking the unsynchronized fast path may still observe partially
      // initialized metadata; this lock only prevents a double open.
      synchronized (this) {
        if (reader == null) {
          try {
            open();
          } catch (Exception e) {
            try {
              boolean evictOnClose = cacheConf == null || cacheConf.shouldEvictOnClose();
              this.closeStoreFile(evictOnClose);
            } catch (IOException ee) {
              LOG.warn("failed to close reader", ee);
            }
            throw e;
          }
        }
      }
    }
  }

  /**
   * Opens a new reader for streaming reads. Ensures the pread reader is initialized first and
   * copies its fields into the new reader.
   * @param canUseDropBehind passed through to {@link StoreFileInfo}'s open call.
   * @return the newly opened reader; the caller is responsible for tracking and closing it.
   * @throws IOException if initializing the pread reader or opening the new reader fails.
   */
  private StoreFileReader createStreamReader(boolean canUseDropBehind) throws IOException {
    initReader();
    StoreFileReader streamReader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind, -1L,
      primaryReplica, refCount, false);
    streamReader.copyFields(this.reader);
    return streamReader;
  }

  /**
   * Get a scanner which uses pread.
   * <p>
   * Must be called after initReader.
   */
  public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder,
      boolean canOptimizeForNonNullColumn) {
    return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder,
      canOptimizeForNonNullColumn);
  }

  /**
   * Get a scanner which uses streaming read.
   * <p>
   * Opens a dedicated stream reader for the scanner, registers this store file as that reader's
   * listener, and tracks the reader in {@code streamReaders} until it reports being closed.
   * @throws IOException if the stream reader cannot be created.
   */
  public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
      boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
      throws IOException {
    StoreFileReader streamReader = createStreamReader(canUseDropBehind);
    streamReader.setListener(this);
    StoreFileScanner sfScanner = streamReader.getStoreFileScanner(cacheBlocks, false,
      isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn);
    //Add reader once the scanner is created
    streamReaders.add(streamReader);
    return sfScanner;
  }

  /**
   * @return Current reader. Must call initReader first else returns null.
   * @see #initReader()
   */
  public StoreFileReader getReader() {
    return this.reader;
  }

  /**
   * Closes the pread reader, if any, and then all stream readers.
   * @param evictOnClose whether to evict blocks belonging to this file
   * @throws IOException if closing a reader fails.
   */
  public synchronized void closeStoreFile(boolean evictOnClose) throws IOException {
    if (this.reader != null) {
      this.reader.close(evictOnClose);
      this.reader = null;
    }
    closeStreamReaders(evictOnClose);
  }

  /**
   * Closes every tracked stream reader and verifies that none remain afterwards.
   * @param evictOnClose whether to evict blocks belonging to this file
   * @throws IOException if closing a reader fails.
   * @throws IllegalStateException if stream readers are still registered after closing.
   */
  public void closeStreamReaders(boolean evictOnClose) throws IOException {
    synchronized (this) {
      for (StoreFileReader entry : streamReaders) {
        //closing the reader will remove itself from streamReaders thanks to the Listener
        entry.close(evictOnClose);
      }
      int size = streamReaders.size();
      Preconditions.checkState(size == 0,
          "There are still streamReaders post close: " + size);
    }
  }

  /**
   * Delete this file. Closes all readers first, then recursively deletes the path.
   * @throws IOException if closing the readers or deleting the file fails.
   */
  public void deleteStoreFile() throws IOException {
    boolean evictOnClose = cacheConf == null || cacheConf.shouldEvictOnClose();
    closeStoreFile(evictOnClose);
    this.fs.delete(getPath(), true);
  }

  public void markCompactedAway() {
    this.compactedAway = true;
  }

  @Override
  public String toString() {
    return this.fileInfo.toString();
  }

  /**
   * @return a description with the path, reference and bulk-load status, either the bulk load
   *         timestamp or the max sequence id, and the major compaction flag.
   * @throws NullPointerException if the major compaction flag has not been set yet, see
   *           {@link #isMajorCompactionResult()}.
   */
  @Override
  public String toStringDetailed() {
    boolean bulkLoaded = isBulkLoadResult();
    StringBuilder sb = new StringBuilder();
    sb.append(this.getPath().toString());
    sb.append(", isReference=").append(isReference());
    sb.append(", isBulkLoadResult=").append(bulkLoaded);
    if (bulkLoaded) {
      sb.append(", bulkLoadTS=");
      OptionalLong bulkLoadTS = getBulkLoadTimestamp();
      if (bulkLoadTS.isPresent()) {
        sb.append(bulkLoadTS.getAsLong());
      } else {
        sb.append("NotPresent");
      }
    } else {
      sb.append(", seqid=").append(getMaxSequenceId());
    }
    sb.append(", majorCompaction=").append(isMajorCompactionResult());

    return sb.toString();
  }

  /**
   * Gets whether to skip resetting the sequence id for cells.
   * @param skipResetSeqId The byte array of boolean.
   * @return Whether to skip resetting the sequence id; false when the array is null or not
   *         exactly one byte long.
   */
  private boolean isSkipResetSeqId(byte[] skipResetSeqId) {
    if (skipResetSeqId != null && skipResetSeqId.length == 1) {
      return Bytes.toBoolean(skipResetSeqId);
    }
    return false;
  }

  /**
   * @return the minimum of the reader's time range, or empty if the reader has no time range.
   *         Requires an open reader.
   */
  @Override
  public OptionalLong getMinimumTimestamp() {
    TimeRange tr = getReader().timeRange;
    return tr != null ? OptionalLong.of(tr.getMin()) : OptionalLong.empty();
  }

  /**
   * @return the maximum of the reader's time range, or empty if the reader has no time range.
   *         Requires an open reader.
   */
  @Override
  public OptionalLong getMaximumTimestamp() {
    TimeRange tr = getReader().timeRange;
    return tr != null ? OptionalLong.of(tr.getMax()) : OptionalLong.empty();
  }

  /**
   * Listener callback: stops tracking a stream reader once it has been closed.
   */
  @Override
  public void storeFileReaderClosed(StoreFileReader reader) {
    streamReaders.remove(reader);
  }
}