/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_FILE_REFS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.BloomContext;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RowBloomContext;
import org.apache.hadoop.hbase.util.RowColBloomContext;
import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * A StoreFile writer. Use this to write HBase Store Files. It is package
 * local because it is an implementation detail of the HBase regionserver.
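 *
 * <p>A minimal usage sketch (the temp path, {@code expectedKeys} and {@code cells} below are
 * hypothetical; real call sites are the flush and compaction paths):
 *
 * <pre>
 * StoreFileWriter writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
 *     .withOutputDir(new Path("/hbase/.tmp/cf"))
 *     .withBloomType(BloomType.ROW)
 *     .withMaxKeyCount(expectedKeys)
 *     .withFileContext(fileContext)
 *     .build();
 * for (Cell cell : cells) {
 *   writer.append(cell);
 * }
 * writer.appendMetadata(maxSeqId, false); // add metadata BEFORE close()
 * writer.close();
 * </pre>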
 */
@InterfaceAudience.Private
public class StoreFileWriter implements CellSink, ShipperListener {
  private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName());
  private static final Pattern dash = Pattern.compile("-");
  private final BloomFilterWriter generalBloomFilterWriter;
  private final BloomFilterWriter deleteFamilyBloomFilterWriter;
  private final BloomType bloomType;
  private byte[] bloomParam = null;
  private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
  private long deleteFamilyCnt = 0;
  private BloomContext bloomContext = null;
  private BloomContext deleteFamilyBloomContext = null;
  private final TimeRangeTracker timeRangeTracker;
  private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;

  protected HFile.Writer writer;

  /**
   * Creates an HFile.Writer that also writes helpful metadata.
   *
   * @param fs                     file system to write to
   * @param path                   file name to create
   * @param conf                   user configuration
   * @param bloomType              bloom filter setting
   * @param maxKeys                the expected maximum number of keys to be added. Was used
   *                               for Bloom filter size in {@link HFile} format version 1.
   * @param favoredNodes           an array of favored nodes or possibly null
   * @param fileContext            The HFile context
   * @param shouldDropCacheBehind  Drop pages written to page cache after writing the store file.
   * @param compactedFilesSupplier Returns the {@link HStore} compacted files which have not yet
   *                               been archived
   * @throws IOException problem writing to FS
   */
  private StoreFileWriter(FileSystem fs, Path path, final Configuration conf, CacheConfig cacheConf,
      BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes, HFileContext fileContext,
      boolean shouldDropCacheBehind, Supplier<Collection<HStoreFile>> compactedFilesSupplier)
        throws IOException {
    this.compactedFilesSupplier = compactedFilesSupplier;
    this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
    // TODO : Change all writers to be specifically created for compaction context
    writer = HFile.getWriterFactory(conf, cacheConf)
        .withPath(fs, path)
        .withFavoredNodes(favoredNodes)
        .withFileContext(fileContext)
        .withShouldDropCacheBehind(shouldDropCacheBehind)
        .create();

    generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
        conf, cacheConf, bloomType,
        (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);

    if (generalBloomFilterWriter != null) {
      this.bloomType = bloomType;
      this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: "
            + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH
                ? Bytes.toInt(bloomParam) : Bytes.toStringBinary(bloomParam))
            + ", " + generalBloomFilterWriter.getClass().getSimpleName());
      }
      // init bloom context
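      // Each context turns an appended Cell into the key stored in the Bloom filter:
      // ROW uses the row bytes, ROWCOL uses row + qualifier, and
      // ROWPREFIX_FIXED_LENGTH uses the first bloomParam bytes of the row.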
      switch (bloomType) {
        case ROW:
          bloomContext =
            new RowBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
          break;
        case ROWCOL:
          bloomContext =
            new RowColBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
          break;
        case ROWPREFIX_FIXED_LENGTH:
          bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter,
            fileContext.getCellComparator(), Bytes.toInt(bloomParam));
          break;
        default:
          throw new IOException("Invalid Bloom filter type: " + bloomType
              + " (ROW, ROWCOL or ROWPREFIX_FIXED_LENGTH expected)");
      }
    } else {
      // Not using Bloom filters.
      this.bloomType = BloomType.NONE;
    }

    // initialize delete family Bloom filter when there is NO RowCol Bloom
    // filter
    if (this.bloomType != BloomType.ROWCOL) {
      this.deleteFamilyBloomFilterWriter = BloomFilterFactory
          .createDeleteBloomAtWrite(conf, cacheConf,
              (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
      deleteFamilyBloomContext =
        new RowBloomContext(deleteFamilyBloomFilterWriter, fileContext.getCellComparator());
    } else {
      deleteFamilyBloomFilterWriter = null;
    }
    if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) {
      LOG.trace("Delete Family Bloom filter type for " + path + ": " +
          deleteFamilyBloomFilterWriter.getClass().getSimpleName());
    }
  }

  public long getPos() throws IOException {
    return ((HFileWriterImpl) writer).getPos();
  }

  /**
   * Writes metadata.
   * Call this before {@link #close()} since the data is written as metadata to this file.
   * @param maxSequenceId Maximum sequence id.
   * @param majorCompaction True if this file is a product of a major compaction
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
      throws IOException {
    appendMetadata(maxSequenceId, majorCompaction, Collections.emptySet());
  }

  /**
   * Writes metadata.
   * Call this before {@link #close()} since the data is written as metadata to this file.
   * @param maxSequenceId Maximum sequence id.
   * @param majorCompaction True if this file is a product of a major compaction
   * @param storeFiles The compacted store files from which this new file was generated
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
      final Collection<HStoreFile> storeFiles) throws IOException {
    writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
    writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
    writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles));
    appendTrackedTimestampsToMetadata();
  }

  /**
   * Used when writing {@link HStoreFile#COMPACTION_EVENT_KEY} to the new file's file info. The
   * names of the compacted store files are needed. If a compacted store file is itself the result
   * of a compaction, its own compacted files that have not yet been archived are needed too, but
   * there is no need to collect compacted files recursively. If files A, B and C are compacted
   * into new file D, and file D is compacted into new file E, then A, B, C and D are written to
   * file E's compacted files. So if file E is compacted into new file F, E is added to F's
   * compacted files first, and then E's compacted files A, B, C and D are added. D's compacted
   * files need not be added, as they are already among E's compacted files.
   * See HBASE-20724 for more details.
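   *
   * <p>The same lineage, sketched (hypothetical file names):
   * <pre>
   * compact(A, B, C) -> D    D's compacted files: {A, B, C}
   * compact(D, ...)  -> E    E's compacted files: {D} plus D's, i.e. {A, B, C, D}
   * compact(E, ...)  -> F    F's compacted files: {E} plus E's, i.e. {A, B, C, D, E}
   * </pre>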
   *
   * @param storeFiles The compacted store files from which this new file was generated
   * @return bytes of CompactionEventTracker
   */
  private byte[] toCompactionEventTrackerBytes(Collection<HStoreFile> storeFiles) {
    Set<String> notArchivedCompactedStoreFiles =
        this.compactedFilesSupplier.get().stream().map(sf -> sf.getPath().getName())
            .collect(Collectors.toSet());
    Set<String> compactedStoreFiles = new HashSet<>();
    for (HStoreFile storeFile : storeFiles) {
      compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName());
      for (String csf : storeFile.getCompactedStoreFiles()) {
        if (notArchivedCompactedStoreFiles.contains(csf)) {
          compactedStoreFiles.add(csf);
        }
      }
    }
    return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles);
  }

  /**
   * Writes metadata.
   * Call this before {@link #close()} since the data is written as metadata to this file.
   * @param maxSequenceId Maximum sequence id.
   * @param majorCompaction True if this file is a product of a major compaction
   * @param mobCellsCount The number of mob cells.
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
      final long mobCellsCount) throws IOException {
    writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
    writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
    writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
    appendTrackedTimestampsToMetadata();
  }

  /**
   * Appends MOB-specific metadata (even if it is empty).
   * @param mobRefSet map from original table name to the set of referenced MOB file names
   * @throws IOException problem writing to FS
   */
  public void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
    writer.appendFileInfo(MOB_FILE_REFS, MobUtils.serializeMobFileRefs(mobRefSet));
  }

  /**
   * Add TimeRange and earliest put timestamp to the file metadata.
   */
  public void appendTrackedTimestampsToMetadata() throws IOException {
    // TODO: The StoreFileReader always converts the byte[] to TimeRange
    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
    appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker));
    appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
  }

  /**
   * Record the earliest Put timestamp.
   *
   * Also update the TimeRangeTracker to include the timestamp of this cell.
   */
  public void trackTimestamps(final Cell cell) {
    if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
      earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
    }
    timeRangeTracker.includeTimestamp(cell);
  }

  private void appendGeneralBloomfilter(final Cell cell) throws IOException {
    if (this.generalBloomFilterWriter != null) {
      /*
       * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
       * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp
       *
       * 3 Types of Filtering:
       *  1. Row = Row
       *  2. RowCol = Row + Qualifier
       *  3. RowPrefixFixedLength = Fixed Length Row Prefix
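       *     (for example, with a configured prefix length of 4, row "abcdefg"
       *     would contribute the Bloom key "abcd"; illustrative values only)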
       */
      bloomContext.writeBloom(cell);
    }
  }

  private void appendDeleteFamilyBloomFilter(final Cell cell)
      throws IOException {
    if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) {
      return;
    }

    // increment the count of delete family cells in the store file
    deleteFamilyCnt++;
    if (this.deleteFamilyBloomFilterWriter != null) {
      deleteFamilyBloomContext.writeBloom(cell);
    }
  }

  @Override
  public void append(final Cell cell) throws IOException {
    appendGeneralBloomfilter(cell);
    appendDeleteFamilyBloomFilter(cell);
    writer.append(cell);
    trackTimestamps(cell);
  }

  @Override
  public void beforeShipped() throws IOException {
    // For now these writers are always ShipperListeners.
    // TODO : Change all writers to be specifically created for compaction context
    writer.beforeShipped();
    if (generalBloomFilterWriter != null) {
      generalBloomFilterWriter.beforeShipped();
    }
    if (deleteFamilyBloomFilterWriter != null) {
      deleteFamilyBloomFilterWriter.beforeShipped();
    }
  }

  public Path getPath() {
    return this.writer.getPath();
  }

  public boolean hasGeneralBloom() {
    return this.generalBloomFilterWriter != null;
  }

  /**
   * For unit testing only.
   *
   * @return the Bloom filter used by this writer.
   */
  BloomFilterWriter getGeneralBloomWriter() {
    return generalBloomFilterWriter;
  }

  private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
    boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
    if (haveBloom) {
      bfw.compactBloom();
    }
    return haveBloom;
  }

  private boolean closeGeneralBloomFilter() throws IOException {
    boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);

    // add the general Bloom filter writer and append file info
    if (hasGeneralBloom) {
      writer.addGeneralBloomFilter(generalBloomFilterWriter);
      writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
      if (bloomParam != null) {
        writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam);
      }
      bloomContext.addLastBloomKey(writer);
    }
    return hasGeneralBloom;
  }

  private boolean closeDeleteFamilyBloomFilter() throws IOException {
    boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);

    // add the delete family Bloom filter writer
    if (hasDeleteFamilyBloom) {
      writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
    }

    // append file info about the number of delete family kvs
    // even if there is no delete family Bloom.
    writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt));

    return hasDeleteFamilyBloom;
  }

  public void close() throws IOException {
    boolean hasGeneralBloom = this.closeGeneralBloomFilter();
    boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();

    writer.close();

    // Log final Bloom filter statistics. This needs to be done after close()
    // because compound Bloom filters might be finalized as part of closing.
    if (LOG.isTraceEnabled()) {
      LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and " +
        (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " was added to HFile " +
        getPath());
    }
  }

  public void appendFileInfo(byte[] key, byte[] value) throws IOException {
    writer.appendFileInfo(key, value);
  }

  /**
   * For use in testing.
   */
  HFile.Writer getHFileWriter() {
    return writer;
  }

  /**
   * @param dir Directory to create file in.
   * @return random filename inside passed <code>dir</code>
   */
  static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
    if (!fs.getFileStatus(dir).isDirectory()) {
      throw new IOException("Expecting " + dir.toString() + " to be a directory");
    }
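    // Use a random UUID with the dashes stripped as the file name, e.g.
    // "3fa85f64-5717-4562-b3fc-2c963f66afa6" becomes "3fa85f6457174562b3fc2c963f66afa6"
    // (illustrative value).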
    return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll(""));
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
      justification="Will not overflow")
  public static class Builder {
    private final Configuration conf;
    private final CacheConfig cacheConf;
    private final FileSystem fs;

    private BloomType bloomType = BloomType.NONE;
    private long maxKeyCount = 0;
    private Path dir;
    private Path filePath;
    private InetSocketAddress[] favoredNodes;
    private HFileContext fileContext;
    private boolean shouldDropCacheBehind;
    private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
    private String fileStoragePolicy;

    public Builder(Configuration conf, CacheConfig cacheConf,
        FileSystem fs) {
      this.conf = conf;
      this.cacheConf = cacheConf;
      this.fs = fs;
    }

    /**
     * Creates a Builder with cache configuration disabled.
     */
    public Builder(Configuration conf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = CacheConfig.DISABLED;
      this.fs = fs;
    }

    /**
     * Use either this method or {@link #withFilePath}, but not both.
     * @param dir Path to column family directory. The directory is created if
     *          it does not exist. The file is given a unique name within this
     *          directory.
     * @return this (for chained invocation)
     */
    public Builder withOutputDir(Path dir) {
      Preconditions.checkNotNull(dir);
      this.dir = dir;
      return this;
    }

    /**
     * Use either this method or {@link #withOutputDir}, but not both.
     * @param filePath the StoreFile path to write
     * @return this (for chained invocation)
     */
    public Builder withFilePath(Path filePath) {
      Preconditions.checkNotNull(filePath);
      this.filePath = filePath;
      return this;
    }

    /**
     * @param favoredNodes an array of favored nodes or possibly null
     * @return this (for chained invocation)
     */
    public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) {
      this.favoredNodes = favoredNodes;
      return this;
    }

    public Builder withBloomType(BloomType bloomType) {
      Preconditions.checkNotNull(bloomType);
      this.bloomType = bloomType;
      return this;
    }

    /**
     * @param maxKeyCount estimated maximum number of keys we expect to add
     * @return this (for chained invocation)
     */
    public Builder withMaxKeyCount(long maxKeyCount) {
      this.maxKeyCount = maxKeyCount;
      return this;
    }

    public Builder withFileContext(HFileContext fileContext) {
      this.fileContext = fileContext;
      return this;
    }

    public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
      this.shouldDropCacheBehind = shouldDropCacheBehind;
      return this;
    }

    public Builder withCompactedFilesSupplier(
        Supplier<Collection<HStoreFile>> compactedFilesSupplier) {
      this.compactedFilesSupplier = compactedFilesSupplier;
      return this;
    }

    public Builder withFileStoragePolicy(String fileStoragePolicy) {
      this.fileStoragePolicy = fileStoragePolicy;
      return this;
    }

    /**
     * Create a store file writer. The caller is responsible for closing the file when
     * done. If metadata is desired, add it BEFORE closing, using
     * {@link StoreFileWriter#appendMetadata}.
     */
    public StoreFileWriter build() throws IOException {
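      // Exactly one of 'dir' (a unique file name is generated inside it) and
      // 'filePath' must have been supplied.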
      if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
        throw new IllegalArgumentException("Either specify parent directory " +
            "or file path");
      }

      if (dir == null) {
        dir = filePath.getParent();
      }

      if (!fs.exists(dir)) {
        // Handle permission for non-HDFS filesystem properly
        // See HBASE-17710
        HRegionFileSystem.mkdirs(fs, conf, dir);
      }

      // set block storage policy for temp path
      String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY);
      if (null == policyName) {
        policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY);
      }
      CommonFSUtils.setStoragePolicy(this.fs, dir, policyName);

      if (filePath == null) {
        // The stored file and its blocks will use the directory-based storage policy,
        // because HDFS DistributedFileSystem did not support creating files with a storage
        // policy before version 3.3.0 (see HDFS-13209). Writing into a child dir that
        // carries the desired policy makes the stored files satisfy it at write time,
        // avoiding later data movement. We don't want to change the whole temp dir to
        // 'fileStoragePolicy'.
        if (!Strings.isNullOrEmpty(fileStoragePolicy)) {
          dir = new Path(dir, HConstants.STORAGE_POLICY_PREFIX + fileStoragePolicy);
          if (!fs.exists(dir)) {
            HRegionFileSystem.mkdirs(fs, conf, dir);
            LOG.info(
              "Create tmp dir " + dir.toString() + " with storage policy: " + fileStoragePolicy);
          }
          CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy);
        }
        filePath = getUniqueFile(fs, dir);
        if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
          bloomType = BloomType.NONE;
        }
      }

      return new StoreFileWriter(fs, filePath, conf, cacheConf, bloomType, maxKeyCount,
          favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
    }
  }
}