/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_FILE_REFS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.BloomContext;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RowBloomContext;
import org.apache.hadoop.hbase.util.RowColBloomContext;
import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * A StoreFile writer. Use this to write HBase Store Files. It is package local because it is
 * an implementation detail of the HBase regionserver.
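 * <p>
 * A minimal usage sketch (assuming the caller already has a {@code Configuration},
 * {@code CacheConfig}, {@code FileSystem} and {@code HFileContext} at hand; variable names are
 * illustrative only):
 *
 * <pre>
 * StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
 *   .withOutputDir(familyDir)
 *   .withBloomType(BloomType.ROW)
 *   .withMaxKeyCount(estimatedKeys)
 *   .withFileContext(hFileContext)
 *   .build();
 * for (Cell cell : cells) {
 *   writer.append(cell);
 * }
 * // metadata must be appended before close()
 * writer.appendMetadata(maxSequenceId, majorCompaction);
 * writer.close();
 * </pre>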
 */
@InterfaceAudience.Private
public class StoreFileWriter implements CellSink, ShipperListener {
  private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName());
  private static final Pattern dash = Pattern.compile("-");
  private final BloomFilterWriter generalBloomFilterWriter;
  private final BloomFilterWriter deleteFamilyBloomFilterWriter;
  private final BloomType bloomType;
  private byte[] bloomParam = null;
  private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
  private long deleteFamilyCnt = 0;
  private BloomContext bloomContext = null;
  private BloomContext deleteFamilyBloomContext = null;
  private final TimeRangeTracker timeRangeTracker;
  private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;

  protected HFile.Writer writer;

  /**
   * Creates an HFile.Writer that also writes helpful meta data.
   * @param fs                     file system to write to
   * @param path                   file name to create
   * @param conf                   user configuration
   * @param cacheConf              cache configuration
   * @param bloomType              bloom filter setting
   * @param maxKeys                the expected maximum number of keys to be added. Was used for
   *                               Bloom filter size in {@link HFile} format version 1.
   * @param favoredNodes           an array of favored nodes or possibly null
   * @param fileContext            The HFile context
   * @param shouldDropCacheBehind  Drop pages written to page cache after writing the store file.
   * @param compactedFilesSupplier Returns the {@link HStore} compacted files which have not been
   *                               archived
   * @throws IOException problem writing to FS
   */
  private StoreFileWriter(FileSystem fs, Path path, final Configuration conf, CacheConfig cacheConf,
    BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes, HFileContext fileContext,
    boolean shouldDropCacheBehind, Supplier<Collection<HStoreFile>> compactedFilesSupplier)
    throws IOException {
    this.compactedFilesSupplier = compactedFilesSupplier;
    this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
    // TODO : Change all writers to be specifically created for compaction context
    writer =
      HFile.getWriterFactory(conf, cacheConf).withPath(fs, path).withFavoredNodes(favoredNodes)
        .withFileContext(fileContext).withShouldDropCacheBehind(shouldDropCacheBehind).create();

    generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf,
      bloomType, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);

    if (generalBloomFilterWriter != null) {
      this.bloomType = bloomType;
      this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: "
          + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH
            ? Bytes.toInt(bloomParam)
            : Bytes.toStringBinary(bloomParam))
          + ", " + generalBloomFilterWriter.getClass().getSimpleName());
      }
      // init bloom context
      switch (bloomType) {
        case ROW:
          bloomContext =
            new RowBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
          break;
        case ROWCOL:
          bloomContext =
            new RowColBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
          break;
        case ROWPREFIX_FIXED_LENGTH:
          bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter,
            fileContext.getCellComparator(), Bytes.toInt(bloomParam));
          break;
        default:
          throw new IOException(
            "Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL or ROWPREFIX expected)");
      }
    } else {
      // Not using Bloom filters.
      this.bloomType = BloomType.NONE;
    }

    // initialize delete family Bloom filter when there is NO RowCol Bloom filter
    if (this.bloomType != BloomType.ROWCOL) {
      this.deleteFamilyBloomFilterWriter = BloomFilterFactory.createDeleteBloomAtWrite(conf,
        cacheConf, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
      deleteFamilyBloomContext =
        new RowBloomContext(deleteFamilyBloomFilterWriter, fileContext.getCellComparator());
    } else {
      deleteFamilyBloomFilterWriter = null;
    }
    if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) {
      LOG.trace("Delete Family Bloom filter type for " + path + ": "
        + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
    }
  }

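  /**
   * Returns the current write position of the underlying {@link HFileWriterImpl}.
   */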
  public long getPos() throws IOException {
    return ((HFileWriterImpl) writer).getPos();
  }

  /**
   * Writes meta data. Call before {@link #close()} since it is written as meta data to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is the product of a major compaction
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
    throws IOException {
    appendMetadata(maxSequenceId, majorCompaction, Collections.emptySet());
  }

  /**
   * Writes meta data. Call before {@link #close()} since it is written as meta data to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is the product of a major compaction
   * @param storeFiles      The compacted store files to generate this new file
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
    final Collection<HStoreFile> storeFiles) throws IOException {
    writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
    writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
    writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles));
    appendTrackedTimestampsToMetadata();
  }

  /**
   * Used when writing {@link HStoreFile#COMPACTION_EVENT_KEY} to the new file's file info. The
   * names of the compacted store files are needed. If a compacted store file is itself the result
   * of a compaction, its own compacted files that have not yet been archived are needed too, but
   * there is no need to add compacted files recursively. If files A, B and C are compacted into
   * new file D, and file D is compacted into new file E, then A, B, C and D are written to file
   * E's compacted files. So when file E is compacted into new file F, E is added to F's compacted
   * files first, then E's compacted files A, B, C and D are added. There is no need to add D's
   * compacted files, as they are already contained in E's compacted files. See HBASE-20724 for
   * more details.
   * @param storeFiles The compacted store files to generate this new file
   * @return bytes of CompactionEventTracker
   */
  private byte[] toCompactionEventTrackerBytes(Collection<HStoreFile> storeFiles) {
    Set<String> notArchivedCompactedStoreFiles = this.compactedFilesSupplier.get().stream()
      .map(sf -> sf.getPath().getName()).collect(Collectors.toSet());
    Set<String> compactedStoreFiles = new HashSet<>();
    for (HStoreFile storeFile : storeFiles) {
      compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName());
      for (String csf : storeFile.getCompactedStoreFiles()) {
        if (notArchivedCompactedStoreFiles.contains(csf)) {
          compactedStoreFiles.add(csf);
        }
      }
    }
    return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles);
  }

  /**
   * Writes meta data. Call before {@link #close()} since it is written as meta data to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is the product of a major compaction
   * @param mobCellsCount   The number of mob cells.
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
    final long mobCellsCount) throws IOException {
    writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
    writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
    writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
    appendTrackedTimestampsToMetadata();
  }

  /**
   * Appends MOB-specific metadata (even if it is empty).
   * @param mobRefSet mapping of original table name to the set of referenced MOB file names
   * @throws IOException problem writing to FS
   */
  public void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
    writer.appendFileInfo(MOB_FILE_REFS, MobUtils.serializeMobFileRefs(mobRefSet));
  }

  /**
   * Add the time range and the earliest put timestamp to the metadata.
   */
  public void appendTrackedTimestampsToMetadata() throws IOException {
    // TODO: The StoreFileReader always converts the byte[] to TimeRange
    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
    appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker));
    appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
  }

  /**
   * Record the earliest Put timestamp and update the TimeRangeTracker to include the timestamp of
   * this cell.
   */
  public void trackTimestamps(final Cell cell) {
    if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
      earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
    }
    timeRangeTracker.includeTimestamp(cell);
  }

  private void appendGeneralBloomfilter(final Cell cell) throws IOException {
    if (this.generalBloomFilterWriter != null) {
      /*
       * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
       * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp
       * 3 Types of Filtering:
       *   1. Row = Row
       *   2. RowCol = Row + Qualifier
       *   3. RowPrefixFixedLength = Fixed Length Row Prefix
       */
      bloomContext.writeBloom(cell);
    }
  }

  private void appendDeleteFamilyBloomFilter(final Cell cell) throws IOException {
    if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) {
      return;
    }

    // increase the number of delete family cells in the store file
    deleteFamilyCnt++;
    if (this.deleteFamilyBloomFilterWriter != null) {
      deleteFamilyBloomContext.writeBloom(cell);
    }
  }

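  /**
   * Appends a cell to the store file, feeding the general and delete family Bloom filters and
   * tracking its timestamp along the way.
   */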
  @Override
  public void append(final Cell cell) throws IOException {
    appendGeneralBloomfilter(cell);
    appendDeleteFamilyBloomFilter(cell);
    writer.append(cell);
    trackTimestamps(cell);
  }

  @Override
  public void beforeShipped() throws IOException {
    // For now these writers will always be ShipperListeners.
    // TODO : Change all writers to be specifically created for compaction context
    writer.beforeShipped();
    if (generalBloomFilterWriter != null) {
      generalBloomFilterWriter.beforeShipped();
    }
    if (deleteFamilyBloomFilterWriter != null) {
      deleteFamilyBloomFilterWriter.beforeShipped();
    }
  }

  public Path getPath() {
    return this.writer.getPath();
  }

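  /**
   * Returns true if a general Bloom filter is being written for this store file.
   */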
  public boolean hasGeneralBloom() {
    return this.generalBloomFilterWriter != null;
  }

  /**
   * For unit testing only.
   * @return the Bloom filter used by this writer.
   */
  BloomFilterWriter getGeneralBloomWriter() {
    return generalBloomFilterWriter;
  }

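  /**
   * Finalizes the given Bloom filter writer by compacting it, provided it contains at least one
   * key.
   * @return true if the Bloom filter has keys and was compacted, false otherwise
   */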
  private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
    boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
    if (haveBloom) {
      bfw.compactBloom();
    }
    return haveBloom;
  }

  private boolean closeGeneralBloomFilter() throws IOException {
    boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);

    // add the general Bloom filter writer and append file info
    if (hasGeneralBloom) {
      writer.addGeneralBloomFilter(generalBloomFilterWriter);
      writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
      if (bloomParam != null) {
        writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam);
      }
      bloomContext.addLastBloomKey(writer);
    }
    return hasGeneralBloom;
  }

  private boolean closeDeleteFamilyBloomFilter() throws IOException {
    boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);

    // add the delete family Bloom filter writer
    if (hasDeleteFamilyBloom) {
      writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
    }

    // append file info about the number of delete family kvs
    // even if there is no delete family Bloom.
    writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt));

    return hasDeleteFamilyBloom;
  }

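  /**
   * Finalizes both Bloom filters and closes the underlying HFile writer.
   */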
  public void close() throws IOException {
    boolean hasGeneralBloom = this.closeGeneralBloomFilter();
    boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();

    writer.close();

    // Log final Bloom filter statistics. This needs to be done after close()
    // because compound Bloom filters might be finalized as part of closing.
    if (LOG.isTraceEnabled()) {
      LOG.trace(
        (hasGeneralBloom ? "" : "NO ") + "General Bloom and " + (hasDeleteFamilyBloom ? "" : "NO ")
          + "DeleteFamily" + " was added to HFile " + getPath());
    }
  }

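  /**
   * Appends a key/value pair to the file info of the underlying HFile writer.
   */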
  public void appendFileInfo(byte[] key, byte[] value) throws IOException {
    writer.appendFileInfo(key, value);
  }

  /**
   * For use in testing.
   */
  HFile.Writer getHFileWriter() {
    return writer;
  }

  /**
   * @param fs  the file system the directory lives on
   * @param dir Directory to create file in.
   * @return random filename inside passed <code>dir</code>
   */
  public static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
    if (!fs.getFileStatus(dir).isDirectory()) {
      throw new IOException("Expecting " + dir.toString() + " to be a directory");
    }
    return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll(""));
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
      justification = "Will not overflow")
  public static class Builder {
    private final Configuration conf;
    private final CacheConfig cacheConf;
    private final FileSystem fs;

    private BloomType bloomType = BloomType.NONE;
    private long maxKeyCount = 0;
    private Path dir;
    private Path filePath;
    private InetSocketAddress[] favoredNodes;
    private HFileContext fileContext;
    private boolean shouldDropCacheBehind;
    private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
    private String fileStoragePolicy;
    // This is used to track the creation of the StoreFileWriter, mainly for the SFT
    // implementation, where we write store files directly to their final place instead of
    // writing a tmp file first. In that scenario a background task purges store files which are
    // not recorded in the SFT; since a newly created store file is not yet tracked in the SFT,
    // we need to record it here and treat it specially.
    private Consumer<Path> writerCreationTracker;

    public Builder(Configuration conf, CacheConfig cacheConf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = cacheConf;
      this.fs = fs;
    }

    /**
     * Creates Builder with cache configuration disabled
     */
    public Builder(Configuration conf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = CacheConfig.DISABLED;
      this.fs = fs;
    }

    /**
     * Use either this method or {@link #withFilePath}, but not both.
     * @param dir Path to column family directory. The directory is created if it does not exist.
     *            The file is given a unique name within this directory.
     * @return this (for chained invocation)
     */
    public Builder withOutputDir(Path dir) {
      Preconditions.checkNotNull(dir);
      this.dir = dir;
      return this;
    }

    /**
     * Use either this method or {@link #withOutputDir}, but not both.
     * @param filePath the StoreFile path to write
     * @return this (for chained invocation)
     */
    public Builder withFilePath(Path filePath) {
      Preconditions.checkNotNull(filePath);
      this.filePath = filePath;
      return this;
    }

    /**
     * @param favoredNodes an array of favored nodes or possibly null
     * @return this (for chained invocation)
     */
    public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) {
      this.favoredNodes = favoredNodes;
      return this;
    }

    public Builder withBloomType(BloomType bloomType) {
      Preconditions.checkNotNull(bloomType);
      this.bloomType = bloomType;
      return this;
    }

    /**
     * @param maxKeyCount estimated maximum number of keys we expect to add
     * @return this (for chained invocation)
     */
    public Builder withMaxKeyCount(long maxKeyCount) {
      this.maxKeyCount = maxKeyCount;
      return this;
    }

    public Builder withFileContext(HFileContext fileContext) {
      this.fileContext = fileContext;
      return this;
    }

    public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
      this.shouldDropCacheBehind = shouldDropCacheBehind;
      return this;
    }

    public Builder
      withCompactedFilesSupplier(Supplier<Collection<HStoreFile>> compactedFilesSupplier) {
      this.compactedFilesSupplier = compactedFilesSupplier;
      return this;
    }

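    /**
     * @param fileStoragePolicy storage policy to apply to the newly created file (see the storage
     *                          policy handling in {@link #build()})
     * @return this (for chained invocation)
     */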
    public Builder withFileStoragePolicy(String fileStoragePolicy) {
      this.fileStoragePolicy = fileStoragePolicy;
      return this;
    }

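    /**
     * @param writerCreationTracker callback invoked with the path of each newly created writer,
     *                              used to record store files that are not yet tracked by the SFT
     *                              (see the field comment above)
     * @return this (for chained invocation)
     */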
    public Builder withWriterCreationTracker(Consumer<Path> writerCreationTracker) {
      this.writerCreationTracker = writerCreationTracker;
      return this;
    }

    /**
     * Create a store file writer. Client is responsible for closing the file when done. If
     * metadata is to be added, add it BEFORE closing, using {@link StoreFileWriter#appendMetadata}.
     */
    public StoreFileWriter build() throws IOException {
      if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
        throw new IllegalArgumentException("Either specify parent directory or file path");
      }

      if (dir == null) {
        dir = filePath.getParent();
      }

      if (!fs.exists(dir)) {
        // Handle permission for non-HDFS filesystem properly
        // See HBASE-17710
        HRegionFileSystem.mkdirs(fs, conf, dir);
      }

      // set block storage policy for temp path
      String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY);
      if (null == policyName) {
        policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY);
      }
      CommonFSUtils.setStoragePolicy(this.fs, dir, policyName);

      if (filePath == null) {
        // The store file and its related blocks will use the directory-based storage policy,
        // because HDFS DistributedFileSystem does not support creating files with a storage
        // policy before version 3.3.0 (see HDFS-13209). Using a child dir here lets the store
        // files satisfy the requested storage policy while they are being written, avoiding later
        // data movement. We don't want to change the whole temp dir to 'fileStoragePolicy'.
        if (!Strings.isNullOrEmpty(fileStoragePolicy)) {
          dir = new Path(dir, HConstants.STORAGE_POLICY_PREFIX + fileStoragePolicy);
          if (!fs.exists(dir)) {
            HRegionFileSystem.mkdirs(fs, conf, dir);
            LOG.info(
              "Create tmp dir " + dir.toString() + " with storage policy: " + fileStoragePolicy);
          }
          CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy);
        }
        filePath = getUniqueFile(fs, dir);
        if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
          bloomType = BloomType.NONE;
        }
      }
      // Make sure we call this before actually creating the writer. In fact, it is not a big deal
      // to add a non-existent file to the tracker, as we will never try to delete it and we will
      // clean the tracker up after compaction. But if the file cleaner finds the file before we
      // have recorded it, it may accidentally delete the file and cause problems.
      if (writerCreationTracker != null) {
        writerCreationTracker.accept(filePath);
      }
      return new StoreFileWriter(fs, filePath, conf, cacheConf, bloomType, maxKeyCount,
        favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
    }
  }
}