/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_FILE_REFS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.BloomContext;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RowBloomContext;
import org.apache.hadoop.hbase.util.RowColBloomContext;
import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * A StoreFile writer. Use this to write HBase Store Files. It is package local because it is an
 * implementation detail of the HBase regionserver.
 */
@InterfaceAudience.Private
public class StoreFileWriter implements CellSink, ShipperListener {
  private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName());
  private static final Pattern dash = Pattern.compile("-");
  private final BloomFilterWriter generalBloomFilterWriter;
  private final BloomFilterWriter deleteFamilyBloomFilterWriter;
  private final BloomType bloomType;
  private byte[] bloomParam = null;
  private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
  private long deleteFamilyCnt = 0;
  private BloomContext bloomContext = null;
  private BloomContext deleteFamilyBloomContext = null;
  private final TimeRangeTracker timeRangeTracker;
  private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;

  protected HFile.Writer writer;

  /**
   * Creates an HFile.Writer that also writes helpful metadata.
   * @param fs file system to write to
   * @param path file name to create
   * @param conf user configuration
   * @param bloomType bloom filter setting
   * @param maxKeys the expected maximum number of keys to be added. Was used for
   *          Bloom filter size in {@link HFile} format version 1.
   * @param favoredNodes an array of favored nodes or possibly null
   * @param fileContext the HFile context
   * @param shouldDropCacheBehind drop pages written to page cache after writing the store file
   * @param compactedFilesSupplier returns the {@link HStore} compacted files which are not yet
   *          archived
   * @throws IOException problem writing to FS
   */
  private StoreFileWriter(FileSystem fs, Path path, final Configuration conf, CacheConfig cacheConf,
    BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes, HFileContext fileContext,
    boolean shouldDropCacheBehind, Supplier<Collection<HStoreFile>> compactedFilesSupplier)
    throws IOException {
    this.compactedFilesSupplier = compactedFilesSupplier;
    this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
    // TODO : Change all writers to be specifically created for compaction context
    writer =
      HFile.getWriterFactory(conf, cacheConf).withPath(fs, path).withFavoredNodes(favoredNodes)
        .withFileContext(fileContext).withShouldDropCacheBehind(shouldDropCacheBehind).create();

    generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf,
      bloomType, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);

    if (generalBloomFilterWriter != null) {
      this.bloomType = bloomType;
      this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: "
          + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH
            ? Bytes.toInt(bloomParam)
            : Bytes.toStringBinary(bloomParam))
          + ", " + generalBloomFilterWriter.getClass().getSimpleName());
      }
      // init bloom context
      switch (bloomType) {
        case ROW:
          bloomContext =
            new RowBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
          break;
        case ROWCOL:
          bloomContext =
            new RowColBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
          break;
        case ROWPREFIX_FIXED_LENGTH:
          bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter,
            fileContext.getCellComparator(), Bytes.toInt(bloomParam));
          break;
        default:
          throw new IOException(
            "Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL or ROWPREFIX expected)");
      }
    } else {
      // Not using Bloom filters.
      this.bloomType = BloomType.NONE;
    }

    // initialize the delete family Bloom filter only when there is NO RowCol Bloom filter
    if (this.bloomType != BloomType.ROWCOL) {
      this.deleteFamilyBloomFilterWriter = BloomFilterFactory.createDeleteBloomAtWrite(conf,
        cacheConf, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
      deleteFamilyBloomContext =
        new RowBloomContext(deleteFamilyBloomFilterWriter, fileContext.getCellComparator());
    } else {
      deleteFamilyBloomFilterWriter = null;
    }
    if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) {
      LOG.trace("Delete Family Bloom filter type for " + path + ": "
        + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
    }
  }
  public long getPos() throws IOException {
    return ((HFileWriterImpl) writer).getPos();
  }

  /**
   * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
   * @param maxSequenceId Maximum sequence id.
   * @param majorCompaction True if this file is the product of a major compaction
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
    throws IOException {
    appendMetadata(maxSequenceId, majorCompaction, Collections.emptySet());
  }

  /**
   * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
   * @param maxSequenceId Maximum sequence id.
   * @param majorCompaction True if this file is the product of a major compaction
   * @param storeFiles The compacted store files used to generate this new file
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
    final Collection<HStoreFile> storeFiles) throws IOException {
    writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
    writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
    writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles));
    appendTrackedTimestampsToMetadata();
  }

  /**
   * Used when writing {@link HStoreFile#COMPACTION_EVENT_KEY} to the new file's file info. The
   * names of the compacted store files are needed. If a compacted store file is itself the result
   * of a compaction, its own compacted files that are still not archived are needed as well, but
   * compacted files do not need to be added recursively. If files A, B and C were compacted to new
   * file D, and file D was compacted to new file E, then A, B, C and D are written to file E's
   * compacted files. So when file E is compacted to new file F, E is added to F's compacted files
   * first, then E's compacted files (A, B, C and D) are added. There is no need to add D's
   * compacted files, as they are already contained in E's compacted files. See HBASE-20724 for
   * more details.
   * @param storeFiles The compacted store files used to generate this new file
   * @return bytes of CompactionEventTracker
   */
  private byte[] toCompactionEventTrackerBytes(Collection<HStoreFile> storeFiles) {
    Set<String> notArchivedCompactedStoreFiles = this.compactedFilesSupplier.get().stream()
      .map(sf -> sf.getPath().getName()).collect(Collectors.toSet());
    Set<String> compactedStoreFiles = new HashSet<>();
    for (HStoreFile storeFile : storeFiles) {
      compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName());
      for (String csf : storeFile.getCompactedStoreFiles()) {
        if (notArchivedCompactedStoreFiles.contains(csf)) {
          compactedStoreFiles.add(csf);
        }
      }
    }
    return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles);
  }
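  /*
   * A worked illustration of the tracking described above (hypothetical file names only):
   *   write D from {A, B, C} -> D's compacted files = {A, B, C}
   *   write E from {D}       -> E's compacted files = {D} plus the not-yet-archived members of
   *                             D's set, i.e. {D, A, B, C} if none have been archived
   *   write F from {E}       -> F's compacted files = {E} plus the not-yet-archived members of
   *                             E's set; D's own set is not walked again, since it is already
   *                             contained in E's set
   */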
  /**
   * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
   * @param maxSequenceId Maximum sequence id.
   * @param majorCompaction True if this file is the product of a major compaction
   * @param mobCellsCount The number of mob cells.
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
    final long mobCellsCount) throws IOException {
    writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
    writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
    writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
    appendTrackedTimestampsToMetadata();
  }

  /**
   * Appends MOB-specific metadata (even if it is empty).
   * @param mobRefSet original table -> set of MOB file names
   * @throws IOException problem writing to FS
   */
  public void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
    writer.appendFileInfo(MOB_FILE_REFS, MobUtils.serializeMobFileRefs(mobRefSet));
  }

  /**
   * Add TimestampRange and earliest put timestamp to metadata.
   */
  public void appendTrackedTimestampsToMetadata() throws IOException {
    // TODO: The StoreFileReader always converts the byte[] to TimeRange
    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
    appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker));
    appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
  }

  /**
   * Record the earliest Put timestamp. If the timeRangeTracker is not set, update TimeRangeTracker
   * to include the timestamp of this key.
   */
  public void trackTimestamps(final Cell cell) {
    if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
      earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
    }
    timeRangeTracker.includeTimestamp(cell);
  }

  private void appendGeneralBloomfilter(final Cell cell) throws IOException {
    if (this.generalBloomFilterWriter != null) {
      /*
       * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
       * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp
       * 3 Types of Filtering:
       *   1. Row = Row
       *   2. RowCol = Row + Qualifier
       *   3. RowPrefixFixedLength = Fixed Length Row Prefix
       */
      bloomContext.writeBloom(cell);
    }
  }
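  /*
   * Illustrative sketch of what each Bloom type keys on, for a hypothetical cell with row "row1",
   * family "f" and qualifier "q" (the family is not part of the key, since every cell in a store
   * file belongs to the same family):
   *   ROW                    -> "row1"
   *   ROWCOL                 -> "row1" + "q"
   *   ROWPREFIX_FIXED_LENGTH -> the first Bytes.toInt(bloomParam) bytes of "row1"
   */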
  private void appendDeleteFamilyBloomFilter(final Cell cell) throws IOException {
    if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) {
      return;
    }

    // increase the number of delete family markers in the store file
    deleteFamilyCnt++;
    if (this.deleteFamilyBloomFilterWriter != null) {
      deleteFamilyBloomContext.writeBloom(cell);
    }
  }

  @Override
  public void append(final Cell cell) throws IOException {
    appendGeneralBloomfilter(cell);
    appendDeleteFamilyBloomFilter(cell);
    writer.append(cell);
    trackTimestamps(cell);
  }

  @Override
  public void beforeShipped() throws IOException {
    // For now these writers will always be ShipperListeners.
    // TODO : Change all writers to be specifically created for compaction context
    writer.beforeShipped();
    if (generalBloomFilterWriter != null) {
      generalBloomFilterWriter.beforeShipped();
    }
    if (deleteFamilyBloomFilterWriter != null) {
      deleteFamilyBloomFilterWriter.beforeShipped();
    }
  }

  public Path getPath() {
    return this.writer.getPath();
  }

  public boolean hasGeneralBloom() {
    return this.generalBloomFilterWriter != null;
  }

  /**
   * For unit testing only.
   * @return the Bloom filter used by this writer.
   */
  BloomFilterWriter getGeneralBloomWriter() {
    return generalBloomFilterWriter;
  }

  private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
    boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
    if (haveBloom) {
      bfw.compactBloom();
    }
    return haveBloom;
  }

  private boolean closeGeneralBloomFilter() throws IOException {
    boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);

    // add the general Bloom filter writer and append file info
    if (hasGeneralBloom) {
      writer.addGeneralBloomFilter(generalBloomFilterWriter);
      writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
      if (bloomParam != null) {
        writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam);
      }
      bloomContext.addLastBloomKey(writer);
    }
    return hasGeneralBloom;
  }

  private boolean closeDeleteFamilyBloomFilter() throws IOException {
    boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);

    // add the delete family Bloom filter writer
    if (hasDeleteFamilyBloom) {
      writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
    }

    // append file info about the number of delete family kvs
    // even if there is no delete family Bloom.
    writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt));

    return hasDeleteFamilyBloom;
  }

  public void close() throws IOException {
    boolean hasGeneralBloom = this.closeGeneralBloomFilter();
    boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();

    writer.close();

    // Log final Bloom filter statistics. This needs to be done after close()
    // because compound Bloom filters might be finalized as part of closing.
    if (LOG.isTraceEnabled()) {
      LOG.trace(
        (hasGeneralBloom ? "" : "NO ") + "General Bloom and " + (hasDeleteFamilyBloom ? "" : "NO ")
          + "DeleteFamily" + " was added to HFile " + getPath());
    }

  }

  public void appendFileInfo(byte[] key, byte[] value) throws IOException {
    writer.appendFileInfo(key, value);
  }

  /**
   * For use in testing.
   */
  HFile.Writer getHFileWriter() {
    return writer;
  }

  /**
   * @param dir Directory to create file in.
   * @return random filename inside passed <code>dir</code>
   */
  static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
    if (!fs.getFileStatus(dir).isDirectory()) {
      throw new IOException("Expecting " + dir.toString() + " to be a directory");
    }
    return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll(""));
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
      justification = "Will not overflow")
  public static class Builder {
    private final Configuration conf;
    private final CacheConfig cacheConf;
    private final FileSystem fs;

    private BloomType bloomType = BloomType.NONE;
    private long maxKeyCount = 0;
    private Path dir;
    private Path filePath;
    private InetSocketAddress[] favoredNodes;
    private HFileContext fileContext;
    private boolean shouldDropCacheBehind;
    private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
    private String fileStoragePolicy;
    // This is used to track the creation of the StoreFileWriter, mainly for the SFT
    // implementation, where we write store files directly to the final place instead of
    // writing a tmp file first. Under this scenario, a background task purges the store files
    // which are not recorded in the SFT; newly created store file writers are not tracked in
    // the SFT yet, so here we need to record them and treat them specially.
    private Consumer<Path> writerCreationTracker;

    public Builder(Configuration conf, CacheConfig cacheConf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = cacheConf;
      this.fs = fs;
    }

    /**
     * Creates a Builder with cache configuration disabled.
     */
    public Builder(Configuration conf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = CacheConfig.DISABLED;
      this.fs = fs;
    }

    /**
     * Use either this method or {@link #withFilePath}, but not both.
     * @param dir Path to column family directory. The directory is created if it does not exist.
     *          The file is given a unique name within this directory.
     * @return this (for chained invocation)
     */
    public Builder withOutputDir(Path dir) {
      Preconditions.checkNotNull(dir);
      this.dir = dir;
      return this;
    }
    /**
     * Use either this method or {@link #withOutputDir}, but not both.
     * @param filePath the StoreFile path to write
     * @return this (for chained invocation)
     */
    public Builder withFilePath(Path filePath) {
      Preconditions.checkNotNull(filePath);
      this.filePath = filePath;
      return this;
    }

    /**
     * @param favoredNodes an array of favored nodes or possibly null
     * @return this (for chained invocation)
     */
    public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) {
      this.favoredNodes = favoredNodes;
      return this;
    }

    public Builder withBloomType(BloomType bloomType) {
      Preconditions.checkNotNull(bloomType);
      this.bloomType = bloomType;
      return this;
    }

    /**
     * @param maxKeyCount estimated maximum number of keys we expect to add
     * @return this (for chained invocation)
     */
    public Builder withMaxKeyCount(long maxKeyCount) {
      this.maxKeyCount = maxKeyCount;
      return this;
    }

    public Builder withFileContext(HFileContext fileContext) {
      this.fileContext = fileContext;
      return this;
    }

    public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
      this.shouldDropCacheBehind = shouldDropCacheBehind;
      return this;
    }

    public Builder
      withCompactedFilesSupplier(Supplier<Collection<HStoreFile>> compactedFilesSupplier) {
      this.compactedFilesSupplier = compactedFilesSupplier;
      return this;
    }

    public Builder withFileStoragePolicy(String fileStoragePolicy) {
      this.fileStoragePolicy = fileStoragePolicy;
      return this;
    }

    public Builder withWriterCreationTracker(Consumer<Path> writerCreationTracker) {
      this.writerCreationTracker = writerCreationTracker;
      return this;
    }
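    /*
     * A minimal usage sketch of this builder (illustrative only; conf, cacheConf, fs, familyDir,
     * hFileContext, estimatedKeys, maxSequenceId, majorCompaction and the cells are assumed to be
     * supplied by the caller):
     *
     *   StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
     *     .withOutputDir(familyDir)          // or .withFilePath(path), but not both
     *     .withBloomType(BloomType.ROW)
     *     .withMaxKeyCount(estimatedKeys)
     *     .withFileContext(hFileContext)
     *     .build();
     *   for (Cell cell : cells) {
     *     writer.append(cell);
     *   }
     *   writer.appendMetadata(maxSequenceId, majorCompaction); // metadata must go in before close()
     *   writer.close();
     */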
    /**
     * Create a store file writer. The client is responsible for closing the file when done. If
     * metadata is needed, add it BEFORE closing using {@link StoreFileWriter#appendMetadata}.
     */
    public StoreFileWriter build() throws IOException {
      if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
        throw new IllegalArgumentException("Either specify parent directory or file path");
      }

      if (dir == null) {
        dir = filePath.getParent();
      }

      if (!fs.exists(dir)) {
        // Handle permission for non-HDFS filesystem properly
        // See HBASE-17710
        HRegionFileSystem.mkdirs(fs, conf, dir);
      }

      // set block storage policy for temp path
      String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY);
      if (null == policyName) {
        policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY);
      }
      CommonFSUtils.setStoragePolicy(this.fs, dir, policyName);

      if (filePath == null) {
        // The stored file and related blocks will use the directory-based storage policy, because
        // HDFS DistributedFileSystem does not support creating files with a storage policy before
        // version 3.3.0 (see HDFS-13209). Using a child dir here makes the stored files satisfy
        // the specific storage policy when writing, so as to avoid later data movement. We don't
        // want to change the whole temp dir to 'fileStoragePolicy'.
        if (!Strings.isNullOrEmpty(fileStoragePolicy)) {
          dir = new Path(dir, HConstants.STORAGE_POLICY_PREFIX + fileStoragePolicy);
          if (!fs.exists(dir)) {
            HRegionFileSystem.mkdirs(fs, conf, dir);
            LOG.info(
              "Create tmp dir " + dir.toString() + " with storage policy: " + fileStoragePolicy);
          }
          CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy);
        }
        filePath = getUniqueFile(fs, dir);
        if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
          bloomType = BloomType.NONE;
        }
      }
      // Make sure we call this before actually creating the writer. In fact, it is not a big deal
      // to even add a nonexistent file to the tracker, as we will never try to delete it and we
      // will clean the tracker up after compaction. But if the file cleaner finds the file before
      // we have recorded it, it may accidentally delete the file and cause problems.
      if (writerCreationTracker != null) {
        writerCreationTracker.accept(filePath);
      }
      return new StoreFileWriter(fs, filePath, conf, cacheConf, bloomType, maxKeyCount,
        favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
    }
  }
}