/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_FILE_REFS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.BloomContext;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RowBloomContext;
import org.apache.hadoop.hbase.util.RowColBloomContext;
import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * A StoreFile writer. Use this to write HBase Store Files. It is an internal implementation
 * detail of the HBase regionserver.
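 * <p>
 * A minimal usage sketch (illustrative only; the column family directory, file context, cells and
 * sequence id shown here are placeholders, not values defined by this class):
 *
 * <pre>
 * StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
 *   .withOutputDir(familyDir)               // or withFilePath(...) for an exact target path
 *   .withBloomType(BloomType.ROW)
 *   .withMaxKeyCount(estimatedKeys)
 *   .withFileContext(hFileContext)
 *   .build();
 * try {
 *   writer.append(cell);                    // append cells in sorted order
 *   writer.appendMetadata(maxSeqId, false); // metadata must be added BEFORE close()
 * } finally {
 *   writer.close();
 * }
 * </pre>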
 */
@InterfaceAudience.Private
public class StoreFileWriter implements CellSink, ShipperListener {
  private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName());
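  // Used to strip the dashes from a random UUID when building a unique store file name, see
  // getUniqueFile(FileSystem, Path) below.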
  private static final Pattern dash = Pattern.compile("-");
  private final BloomFilterWriter generalBloomFilterWriter;
  private final BloomFilterWriter deleteFamilyBloomFilterWriter;
  private final BloomType bloomType;
  private byte[] bloomParam = null;
  private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
  private long deleteFamilyCnt = 0;
  private BloomContext bloomContext = null;
  private BloomContext deleteFamilyBloomContext = null;
  private final TimeRangeTracker timeRangeTracker;
  private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;

  protected HFile.Writer writer;

  /**
   * Creates an HFile.Writer that also writes helpful meta data.
   * @param fs                     file system to write to
   * @param path                   file name to create
   * @param conf                   user configuration
   * @param cacheConf              cache configuration
   * @param bloomType              bloom filter setting
   * @param maxKeys                the expected maximum number of keys to be added. Was used for
   *                               Bloom filter size in {@link HFile} format version 1.
   * @param favoredNodes           an array of favored nodes or possibly null
   * @param fileContext            The HFile context
   * @param shouldDropCacheBehind  Drop pages written to page cache after writing the store file.
   * @param compactedFilesSupplier Returns the {@link HStore} compacted files which have not been
   *                               archived
   * @throws IOException problem writing to FS
   */
  private StoreFileWriter(FileSystem fs, Path path, final Configuration conf, CacheConfig cacheConf,
    BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes, HFileContext fileContext,
    boolean shouldDropCacheBehind, Supplier<Collection<HStoreFile>> compactedFilesSupplier)
    throws IOException {
    this.compactedFilesSupplier = compactedFilesSupplier;
    this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
    // TODO : Change all writers to be specifically created for compaction context
    writer =
      HFile.getWriterFactory(conf, cacheConf).withPath(fs, path).withFavoredNodes(favoredNodes)
        .withFileContext(fileContext).withShouldDropCacheBehind(shouldDropCacheBehind).create();

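    // The Bloom filter factory expects an int key count, so cap the caller's long estimate at
    // Integer.MAX_VALUE.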
    generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf,
      bloomType, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);

    if (generalBloomFilterWriter != null) {
      this.bloomType = bloomType;
      this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: "
          + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH
            ? Bytes.toInt(bloomParam)
            : Bytes.toStringBinary(bloomParam))
          + ", " + generalBloomFilterWriter.getClass().getSimpleName());
      }
      // init bloom context
      switch (bloomType) {
        case ROW:
          bloomContext =
            new RowBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
          break;
        case ROWCOL:
          bloomContext =
            new RowColBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
          break;
        case ROWPREFIX_FIXED_LENGTH:
          bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter,
            fileContext.getCellComparator(), Bytes.toInt(bloomParam));
          break;
        default:
          throw new IOException(
            "Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL or ROWPREFIX expected)");
      }
    } else {
      // Not using Bloom filters.
      this.bloomType = BloomType.NONE;
    }

    // initialize delete family Bloom filter when there is NO RowCol Bloom
    // filter
    if (this.bloomType != BloomType.ROWCOL) {
      this.deleteFamilyBloomFilterWriter = BloomFilterFactory.createDeleteBloomAtWrite(conf,
        cacheConf, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
      deleteFamilyBloomContext =
        new RowBloomContext(deleteFamilyBloomFilterWriter, fileContext.getCellComparator());
    } else {
      deleteFamilyBloomFilterWriter = null;
    }
    if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) {
      LOG.trace("Delete Family Bloom filter type for " + path + ": "
        + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
    }
  }

  public long getPos() throws IOException {
    return ((HFileWriterImpl) writer).getPos();
  }

  /**
   * Writes meta data. Call before {@link #close()} since it's written as meta data to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is the product of a major compaction
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
    throws IOException {
    appendMetadata(maxSequenceId, majorCompaction, Collections.emptySet());
  }

  /**
   * Writes meta data. Call before {@link #close()} since it's written as meta data to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is the product of a major compaction
   * @param storeFiles      The compacted store files used to generate this new file
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
    final Collection<HStoreFile> storeFiles) throws IOException {
    writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
    writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
    writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles));
    appendTrackedTimestampsToMetadata();
  }

  /**
   * Used when writing {@link HStoreFile#COMPACTION_EVENT_KEY} to the new file's file info. We
   * need the names of the compacted store files. If a compacted store file is itself the result
   * of a compaction, its own compacted files that have still not been archived are needed too,
   * but there is no need to add compacted files recursively. For example, if files A, B and C are
   * compacted into a new file D, and D is compacted into a new file E, then A, B, C and D are
   * written to E's compacted files. If E is later compacted into a new file F, E is added to F's
   * compacted files first, and then E's compacted files A, B, C and D are added. D's own
   * compacted files need not be added again, because they are already recorded in E's compacted
   * files. See HBASE-20724 for more details.
   * @param storeFiles The compacted store files used to generate this new file
   * @return bytes of CompactionEventTracker
   */
  private byte[] toCompactionEventTrackerBytes(Collection<HStoreFile> storeFiles) {
    Set<String> notArchivedCompactedStoreFiles = this.compactedFilesSupplier.get().stream()
      .map(sf -> sf.getPath().getName()).collect(Collectors.toSet());
    Set<String> compactedStoreFiles = new HashSet<>();
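    // Record each input store file itself, plus any of its own compacted-away files that have not
    // been archived yet; no recursion is needed, see the javadoc above.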
    for (HStoreFile storeFile : storeFiles) {
      compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName());
      for (String csf : storeFile.getCompactedStoreFiles()) {
        if (notArchivedCompactedStoreFiles.contains(csf)) {
          compactedStoreFiles.add(csf);
        }
      }
    }
    return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles);
  }

  /**
   * Writes meta data. Call before {@link #close()} since it's written as meta data to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is the product of a major compaction
   * @param mobCellsCount   The number of mob cells.
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
    final long mobCellsCount) throws IOException {
    writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
    writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
    writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
    appendTrackedTimestampsToMetadata();
  }

  /**
   * Appends MOB-specific metadata (even if it is empty)
   * @param mobRefSet original table -> set of MOB file names
   * @throws IOException problem writing to FS
   */
  public void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
    writer.appendFileInfo(MOB_FILE_REFS, MobUtils.serializeMobFileRefs(mobRefSet));
  }

  /**
   * Add TimeRange and earliest put timestamp to metadata
   */
  public void appendTrackedTimestampsToMetadata() throws IOException {
    // TODO: The StoreFileReader always converts the byte[] to TimeRange
    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
    appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker));
    appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
  }

  /**
   * Record the earliest Put timestamp and update the TimeRangeTracker to include the timestamp of
   * this cell
   */
  public void trackTimestamps(final Cell cell) {
    if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
      earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
    }
    timeRangeTracker.includeTimestamp(cell);
  }

  private void appendGeneralBloomfilter(final Cell cell) throws IOException {
    if (this.generalBloomFilterWriter != null) {
      /*
       * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
       * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp 3 Types of
       * Filtering: 1. Row = Row 2. RowCol = Row + Qualifier 3. RowPrefixFixedLength = Fixed Length
       * Row Prefix
       */
      bloomContext.writeBloom(cell);
    }
  }

  private void appendDeleteFamilyBloomFilter(final Cell cell) throws IOException {
    if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) {
      return;
    }

    // increase the number of delete family in the store file
    deleteFamilyCnt++;
    if (this.deleteFamilyBloomFilterWriter != null) {
      deleteFamilyBloomContext.writeBloom(cell);
    }
  }

  @Override
  public void append(final Cell cell) throws IOException {
    appendGeneralBloomfilter(cell);
    appendDeleteFamilyBloomFilter(cell);
    writer.append(cell);
    trackTimestamps(cell);
  }

  @Override
  public void beforeShipped() throws IOException {
    // For now these writers will always implement ShipperListener.
    // TODO : Change all writers to be specifically created for compaction context
    writer.beforeShipped();
    if (generalBloomFilterWriter != null) {
      generalBloomFilterWriter.beforeShipped();
    }
    if (deleteFamilyBloomFilterWriter != null) {
      deleteFamilyBloomFilterWriter.beforeShipped();
    }
  }

  public Path getPath() {
    return this.writer.getPath();
  }

  public boolean hasGeneralBloom() {
    return this.generalBloomFilterWriter != null;
  }

  /**
   * For unit testing only.
   * @return the Bloom filter used by this writer.
   */
  BloomFilterWriter getGeneralBloomWriter() {
    return generalBloomFilterWriter;
  }

  private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
    boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
    if (haveBloom) {
      bfw.compactBloom();
    }
    return haveBloom;
  }

  private boolean closeGeneralBloomFilter() throws IOException {
    boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);

    // add the general Bloom filter writer and append file info
    if (hasGeneralBloom) {
      writer.addGeneralBloomFilter(generalBloomFilterWriter);
      writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
      if (bloomParam != null) {
        writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam);
      }
      bloomContext.addLastBloomKey(writer);
    }
    return hasGeneralBloom;
  }

  private boolean closeDeleteFamilyBloomFilter() throws IOException {
    boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);

    // add the delete family Bloom filter writer
    if (hasDeleteFamilyBloom) {
      writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
    }

    // append file info about the number of delete family kvs
    // even if there is no delete family Bloom.
    writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt));

    return hasDeleteFamilyBloom;
  }

  public void close() throws IOException {
    boolean hasGeneralBloom = this.closeGeneralBloomFilter();
    boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();

    writer.close();

    // Log final Bloom filter statistics. This needs to be done after close()
    // because compound Bloom filters might be finalized as part of closing.
    if (LOG.isTraceEnabled()) {
      LOG.trace(
        (hasGeneralBloom ? "" : "NO ") + "General Bloom and " + (hasDeleteFamilyBloom ? "" : "NO ")
          + "DeleteFamily" + " was added to HFile " + getPath());
    }

  }

  public void appendFileInfo(byte[] key, byte[] value) throws IOException {
    writer.appendFileInfo(key, value);
  }

  /**
   * For use in testing.
   */
  HFile.Writer getHFileWriter() {
    return writer;
  }

  /**
   * @param dir Directory to create file in.
   * @return random filename inside passed <code>dir</code>
   */
  static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
    if (!fs.getFileStatus(dir).isDirectory()) {
      throw new IOException("Expecting " + dir.toString() + " to be a directory");
    }
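    // Strip the dashes so the generated file name is a plain 32-character hex string.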
    return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll(""));
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
      justification = "Will not overflow")
  public static class Builder {
    private final Configuration conf;
    private final CacheConfig cacheConf;
    private final FileSystem fs;

    private BloomType bloomType = BloomType.NONE;
    private long maxKeyCount = 0;
    private Path dir;
    private Path filePath;
    private InetSocketAddress[] favoredNodes;
    private HFileContext fileContext;
    private boolean shouldDropCacheBehind;
    private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
    private String fileStoragePolicy;
    // This is used to track the creation of the StoreFileWriter, mainly for the SFT
    // implementation, where we write store files directly to the final place instead of writing
    // a tmp file first. Under this scenario, a background task purges store files which are not
    // recorded in the SFT; newly created store file writers are not yet tracked in the SFT, so
    // we need to record their files here and treat them specially.
    private Consumer<Path> writerCreationTracker;

    public Builder(Configuration conf, CacheConfig cacheConf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = cacheConf;
      this.fs = fs;
    }

    /**
     * Creates Builder with cache configuration disabled
     */
    public Builder(Configuration conf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = CacheConfig.DISABLED;
      this.fs = fs;
    }

    /**
     * Use either this method or {@link #withFilePath}, but not both.
     * @param dir Path to column family directory. The directory is created if it does not exist.
     *            The file is given a unique name within this directory.
     * @return this (for chained invocation)
     */
    public Builder withOutputDir(Path dir) {
      Preconditions.checkNotNull(dir);
      this.dir = dir;
      return this;
    }

    /**
     * Use either this method or {@link #withOutputDir}, but not both.
     * @param filePath the StoreFile path to write
     * @return this (for chained invocation)
     */
    public Builder withFilePath(Path filePath) {
      Preconditions.checkNotNull(filePath);
      this.filePath = filePath;
      return this;
    }

    /**
     * @param favoredNodes an array of favored nodes or possibly null
     * @return this (for chained invocation)
     */
    public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) {
      this.favoredNodes = favoredNodes;
      return this;
    }

    public Builder withBloomType(BloomType bloomType) {
      Preconditions.checkNotNull(bloomType);
      this.bloomType = bloomType;
      return this;
    }

    /**
     * @param maxKeyCount estimated maximum number of keys we expect to add
     * @return this (for chained invocation)
     */
    public Builder withMaxKeyCount(long maxKeyCount) {
      this.maxKeyCount = maxKeyCount;
      return this;
    }

    public Builder withFileContext(HFileContext fileContext) {
      this.fileContext = fileContext;
      return this;
    }

    public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
      this.shouldDropCacheBehind = shouldDropCacheBehind;
      return this;
    }

    public Builder
      withCompactedFilesSupplier(Supplier<Collection<HStoreFile>> compactedFilesSupplier) {
      this.compactedFilesSupplier = compactedFilesSupplier;
      return this;
    }

    public Builder withFileStoragePolicy(String fileStoragePolicy) {
      this.fileStoragePolicy = fileStoragePolicy;
      return this;
    }

    public Builder withWriterCreationTracker(Consumer<Path> writerCreationTracker) {
      this.writerCreationTracker = writerCreationTracker;
      return this;
    }

    /**
     * Create a store file writer. The client is responsible for closing the file when done. If
     * you want to add metadata, add it BEFORE closing, using
     * {@link StoreFileWriter#appendMetadata}.
     */
    public StoreFileWriter build() throws IOException {
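      // Exactly one of 'dir' and 'filePath' must have been supplied; the sum below counts how
      // many of them were set.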
      if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
        throw new IllegalArgumentException("Either specify parent directory or file path");
      }

      if (dir == null) {
        dir = filePath.getParent();
      }

      if (!fs.exists(dir)) {
        // Handle permission for non-HDFS filesystem properly
        // See HBASE-17710
        HRegionFileSystem.mkdirs(fs, conf, dir);
      }

      // set block storage policy for temp path
      String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY);
      if (null == policyName) {
        policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY);
      }
      CommonFSUtils.setStoragePolicy(this.fs, dir, policyName);

      if (filePath == null) {
        // The store file and its blocks will use the directory-based storage policy. HDFS
        // DistributedFileSystem does not support creating files with a storage policy before
        // version 3.3.0 (see HDFS-13209), so we use a child dir here to make the store files
        // satisfy the requested storage policy at write time and avoid moving the data later.
        // We don't want to change the whole temp dir to 'fileStoragePolicy'.
        if (!Strings.isNullOrEmpty(fileStoragePolicy)) {
          dir = new Path(dir, HConstants.STORAGE_POLICY_PREFIX + fileStoragePolicy);
          if (!fs.exists(dir)) {
            HRegionFileSystem.mkdirs(fs, conf, dir);
            LOG.info(
              "Created tmp dir " + dir.toString() + " with storage policy: " + fileStoragePolicy);
          }
          CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy);
        }
        filePath = getUniqueFile(fs, dir);
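        // Honour the global switch: if general Bloom filters are disabled in the configuration,
        // ignore the requested Bloom type.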
        if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
          bloomType = BloomType.NONE;
        }
      }
      // Make sure we call this before actually creating the writer. In fact, it is not a big
      // deal to add a nonexistent file to the tracker, as we will never try to delete it and we
      // will clean the tracker up after compaction anyway. But if the file cleaner finds the file
      // before we have recorded it, it may accidentally delete the file and cause problems.
      if (writerCreationTracker != null) {
        writerCreationTracker.accept(filePath);
      }
      return new StoreFileWriter(fs, filePath, conf, cacheConf, bloomType, maxKeyCount,
        favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
    }
  }
}