/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.HISTORICAL_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_FILE_REFS;
import static org.apache.hadoop.hbase.regionserver.StoreEngine.STORE_ENGINE_CLASS_KEY;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.util.BloomContext;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RowBloomContext;
import org.apache.hadoop.hbase.util.RowColBloomContext;
import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
 * A StoreFile writer. Use this to write HBase Store Files. It is marked
 * {@code InterfaceAudience.Private} because it is an implementation detail of the HBase
 * regionserver.
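 * <p>
 * A minimal usage sketch (the builder options shown are illustrative; {@code familyDir},
 * {@code hFileContext} and {@code estimatedKeys} are placeholders supplied by the caller):
 *
 * <pre>
 * StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
 *   .withOutputDir(familyDir)            // or withFilePath(...), but not both
 *   .withBloomType(BloomType.ROW)
 *   .withMaxKeyCount(estimatedKeys)
 *   .withFileContext(hFileContext)
 *   .build();
 * try {
 *   writer.append(cell);                 // repeat for each cell
 *   writer.appendMetadata(maxSequenceId, majorCompaction);
 * } finally {
 *   writer.close();
 * }
 * </pre>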
 */
@InterfaceAudience.Private
public class StoreFileWriter implements CellSink, ShipperListener {
  private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName());
  public static final String ENABLE_HISTORICAL_COMPACTION_FILES =
    "hbase.enable.historical.compaction.files";
  public static final boolean DEFAULT_ENABLE_HISTORICAL_COMPACTION_FILES = false;
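  // When ENABLE_HISTORICAL_COMPACTION_FILES is set (and the store uses the default store engine
  // and compactor, see shouldEnableHistoricalCompactionFiles below), compactions write two files:
  // a "live" file holding the latest, non-deleted cell versions and a "historical" file holding
  // the remaining versions. The appendCell routing logic below decides which file each cell
  // belongs to.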
  private static final Pattern dash = Pattern.compile("-");
  private SingleStoreFileWriter liveFileWriter;
  private SingleStoreFileWriter historicalFileWriter;
  private final FileSystem fs;
  private final Path historicalFilePath;
  private final Configuration conf;
  private final CacheConfig cacheConf;
  private final BloomType bloomType;
  private final long maxKeys;
  private final InetSocketAddress[] favoredNodes;
  private final HFileContext fileContext;
  private final boolean shouldDropCacheBehind;
  private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;
  private final CellComparator comparator;
  private ExtendedCell lastCell;
  // The first (latest) delete family marker of the current row
  private ExtendedCell deleteFamily;
  // The list of delete family version markers of the current row
  private List<ExtendedCell> deleteFamilyVersionList = new ArrayList<>();
  // The first (latest) delete column marker of the current column
  private ExtendedCell deleteColumn;
  // The list of delete column version markers of the current column
  private List<ExtendedCell> deleteColumnVersionList = new ArrayList<>();
  // The live put cell count for the current column
  private int livePutCellCount;
  private final int maxVersions;
  private final boolean newVersionBehavior;

  /**
   * Creates an HFile.Writer that also writes helpful meta data.
   * @param fs                     file system to write to
   * @param liveFilePath           the name of the live file to create
   * @param historicalFilePath     the name of the historical file to create
   * @param conf                   user configuration
   * @param cacheConf              cache configuration
   * @param bloomType              bloom filter setting
   * @param maxKeys                the expected maximum number of keys to be added. Was used for
   *                               Bloom filter size in {@link HFile} format version 1.
   * @param favoredNodes           an array of favored nodes or possibly null
   * @param fileContext            The HFile context
   * @param shouldDropCacheBehind  Drop pages written to page cache after writing the store file.
   * @param compactedFilesSupplier Returns the {@link HStore} compacted files which have not yet
   *                               been archived
   * @param comparator             Cell comparator
   * @param maxVersions            max cell versions
   * @param newVersionBehavior     enable new version behavior
   * @throws IOException problem writing to FS
   */
  private StoreFileWriter(FileSystem fs, Path liveFilePath, Path historicalFilePath,
    final Configuration conf, CacheConfig cacheConf, BloomType bloomType, long maxKeys,
    InetSocketAddress[] favoredNodes, HFileContext fileContext, boolean shouldDropCacheBehind,
    Supplier<Collection<HStoreFile>> compactedFilesSupplier, CellComparator comparator,
    int maxVersions, boolean newVersionBehavior) throws IOException {
    this.fs = fs;
    this.historicalFilePath = historicalFilePath;
    this.conf = conf;
    this.cacheConf = cacheConf;
    this.bloomType = bloomType;
    this.maxKeys = maxKeys;
    this.favoredNodes = favoredNodes;
    this.fileContext = fileContext;
    this.shouldDropCacheBehind = shouldDropCacheBehind;
    this.compactedFilesSupplier = compactedFilesSupplier;
    this.comparator = comparator;
    this.maxVersions = maxVersions;
    this.newVersionBehavior = newVersionBehavior;
    liveFileWriter = new SingleStoreFileWriter(fs, liveFilePath, conf, cacheConf, bloomType,
      maxKeys, favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
  }

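  /**
   * Returns whether historical compaction output files should be written. This requires
   * {@value #ENABLE_HISTORICAL_COMPACTION_FILES} to be set to true and is honored only with the
   * default store engine and default compactor; for any other engine or compactor the setting is
   * ignored with a warning.
   * <p>
   * A minimal sketch of enabling the feature in a configuration (assuming the store otherwise
   * keeps the default engine and compactor):
   *
   * <pre>
   * conf.setBoolean(StoreFileWriter.ENABLE_HISTORICAL_COMPACTION_FILES, true);
   * </pre>
   */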
  public static boolean shouldEnableHistoricalCompactionFiles(Configuration conf) {
    if (
      conf.getBoolean(ENABLE_HISTORICAL_COMPACTION_FILES,
        DEFAULT_ENABLE_HISTORICAL_COMPACTION_FILES)
    ) {
      // Historical compaction files are supported only for default store engine with
      // default compactor.
      String storeEngine = conf.get(STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
      if (!storeEngine.equals(DefaultStoreEngine.class.getName())) {
        LOG.warn("Historical compaction file generation is ignored for " + storeEngine
          + ". hbase.enable.historical.compaction.files can be set to true only for the "
          + "default compaction (DefaultStoreEngine and DefaultCompactor)");
        return false;
      }
      String compactor = conf.get(DEFAULT_COMPACTOR_CLASS_KEY, DefaultCompactor.class.getName());
      if (!compactor.equals(DefaultCompactor.class.getName())) {
        LOG.warn("Historical compaction file generation is ignored for " + compactor
          + ". hbase.enable.historical.compaction.files can be set to true only for the "
          + "default compaction (DefaultStoreEngine and DefaultCompactor)");
        return false;
      }
      return true;
    }
    return false;
  }

  public long getPos() throws IOException {
    return liveFileWriter.getPos();
  }

  /**
   * Writes meta data. Call before {@link #close()} since it is written as meta data to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
    throws IOException {
    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction);
    }
  }

  /**
   * Writes meta data. Call before {@link #close()} since it is written as meta data to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @param storeFiles      The compacted store files to generate this new file
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
    final Collection<HStoreFile> storeFiles) throws IOException {
    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction, storeFiles);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction, storeFiles);
    }
  }

  /**
   * Writes meta data. Call before {@link #close()} since it is written as meta data to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @param mobCellsCount   The number of mob cells.
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
    final long mobCellsCount) throws IOException {
    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction, mobCellsCount);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction, mobCellsCount);
    }
  }

  /**
   * Appends MOB-specific metadata (even if it is empty)
   * @param mobRefSet original table -> set of MOB file names
   * @throws IOException problem writing to FS
   */
  public void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
    liveFileWriter.appendMobMetadata(mobRefSet);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendMobMetadata(mobRefSet);
    }
  }

  /**
   * Add TimestampRange and earliest put timestamp to Metadata
   */
  public void appendTrackedTimestampsToMetadata() throws IOException {
    // TODO: The StoreFileReader always converts the byte[] to TimeRange
    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
    liveFileWriter.appendTrackedTimestampsToMetadata();
    if (historicalFileWriter != null) {
      historicalFileWriter.appendTrackedTimestampsToMetadata();
    }
  }

  public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
    throws IOException {
    liveFileWriter.appendCustomCellTimestampsToMetadata(timeRangeTracker);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendCustomCellTimestampsToMetadata(timeRangeTracker);
    }
  }

  @Override
  public void beforeShipped() throws IOException {
    liveFileWriter.beforeShipped();
    if (historicalFileWriter != null) {
      historicalFileWriter.beforeShipped();
    }
  }

  public Path getPath() {
    return liveFileWriter.getPath();
  }

  public List<Path> getPaths() {
    if (historicalFileWriter == null) {
      return Lists.newArrayList(liveFileWriter.getPath());
    }
    return Lists.newArrayList(liveFileWriter.getPath(), historicalFileWriter.getPath());
  }

  public boolean hasGeneralBloom() {
    return liveFileWriter.hasGeneralBloom();
  }

  /**
   * For unit testing only.
   * @return the Bloom filter used by this writer.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
  public BloomFilterWriter getGeneralBloomWriter() {
    return liveFileWriter.generalBloomFilterWriter;
  }

  public void close() throws IOException {
    liveFileWriter.appendFileInfo(HISTORICAL_KEY, Bytes.toBytes(false));
    liveFileWriter.close();
    if (historicalFileWriter != null) {
      historicalFileWriter.appendFileInfo(HISTORICAL_KEY, Bytes.toBytes(true));
      historicalFileWriter.close();
    }
  }

  public void appendFileInfo(byte[] key, byte[] value) throws IOException {
    liveFileWriter.appendFileInfo(key, value);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendFileInfo(key, value);
    }
  }

  /**
   * For use in testing.
   */
  HFile.Writer getLiveFileWriter() {
    return liveFileWriter.getHFileWriter();
  }

  /**
   * @param dir Directory to create file in.
   * @return random filename inside passed <code>dir</code>
   */
  public static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
    if (!fs.getFileStatus(dir).isDirectory()) {
      throw new IOException("Expecting " + dir.toString() + " to be a directory");
    }
    return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll(""));
  }

  private SingleStoreFileWriter getHistoricalFileWriter() throws IOException {
    if (historicalFileWriter == null) {
      historicalFileWriter =
        new SingleStoreFileWriter(fs, historicalFilePath, conf, cacheConf, bloomType, maxKeys,
          favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
    }
    return historicalFileWriter;
  }

  private void initRowState() {
    deleteFamily = null;
    deleteFamilyVersionList.clear();
    lastCell = null;
  }

  private void initColumnState() {
    livePutCellCount = 0;
    deleteColumn = null;
    deleteColumnVersionList.clear();
  }

  private boolean isDeletedByDeleteFamily(ExtendedCell cell) {
    return deleteFamily != null && (deleteFamily.getTimestamp() > cell.getTimestamp()
      || (deleteFamily.getTimestamp() == cell.getTimestamp()
        && (!newVersionBehavior || cell.getSequenceId() < deleteFamily.getSequenceId())));
  }

  private boolean isDeletedByDeleteFamilyVersion(ExtendedCell cell) {
    for (ExtendedCell deleteFamilyVersion : deleteFamilyVersionList) {
      if (
        deleteFamilyVersion.getTimestamp() == cell.getTimestamp()
          && (!newVersionBehavior || cell.getSequenceId() < deleteFamilyVersion.getSequenceId())
      ) {
        return true;
      }
    }
    return false;
  }

  private boolean isDeletedByDeleteColumn(ExtendedCell cell) {
    return deleteColumn != null && (deleteColumn.getTimestamp() > cell.getTimestamp()
      || (deleteColumn.getTimestamp() == cell.getTimestamp()
        && (!newVersionBehavior || cell.getSequenceId() < deleteColumn.getSequenceId())));
  }

  private boolean isDeletedByDeleteColumnVersion(ExtendedCell cell) {
    for (ExtendedCell deleteColumnVersion : deleteColumnVersionList) {
      if (
        deleteColumnVersion.getTimestamp() == cell.getTimestamp()
          && (!newVersionBehavior || cell.getSequenceId() < deleteColumnVersion.getSequenceId())
      ) {
        return true;
      }
    }
    return false;
  }

  private boolean isDeleted(ExtendedCell cell) {
    return isDeletedByDeleteFamily(cell) || isDeletedByDeleteColumn(cell)
      || isDeletedByDeleteFamilyVersion(cell) || isDeletedByDeleteColumnVersion(cell);
  }

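  /**
   * Routes a single cell to the live or the historical file writer when dual file writing is
   * enabled. In short: the newest delete family/column markers and the newest (up to
   * {@code maxVersions}) non-deleted put cells go to the live file; redundant markers, deleted
   * cells and older put versions go to the historical file.
   */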
  private void appendCell(ExtendedCell cell) throws IOException {
    if ((lastCell == null || !CellUtil.matchingColumn(lastCell, cell))) {
      initColumnState();
    }
    if (cell.getType() == Cell.Type.DeleteFamily) {
      if (deleteFamily == null) {
        deleteFamily = cell;
        liveFileWriter.append(cell);
      } else {
        getHistoricalFileWriter().append(cell);
      }
    } else if (cell.getType() == Cell.Type.DeleteFamilyVersion) {
      if (!isDeletedByDeleteFamily(cell)) {
        deleteFamilyVersionList.add(cell);
        if (deleteFamily != null && deleteFamily.getTimestamp() == cell.getTimestamp()) {
          // This means both the delete-family and delete-family-version markers have the same
          // timestamp but the sequence id of delete-family-version marker is higher than that of
          // the delete-family marker. In this case, there is no need to add the
          // delete-family-version marker to the live version file. This case happens only with
          // the new version behavior.
          getHistoricalFileWriter().append(cell);
        } else {
          liveFileWriter.append(cell);
        }
      } else {
        getHistoricalFileWriter().append(cell);
      }
    } else if (cell.getType() == Cell.Type.DeleteColumn) {
      if (!isDeletedByDeleteFamily(cell) && deleteColumn == null) {
        deleteColumn = cell;
        liveFileWriter.append(cell);
      } else {
        getHistoricalFileWriter().append(cell);
      }
    } else if (cell.getType() == Cell.Type.Delete) {
      if (!isDeletedByDeleteFamily(cell) && deleteColumn == null) {
        deleteColumnVersionList.add(cell);
        if (deleteFamily != null && deleteFamily.getTimestamp() == cell.getTimestamp()) {
          // This means both the delete-family and delete-column-version markers have the same
          // timestamp but the sequence id of delete-column-version marker is higher than that of
          // the delete-family marker. In this case, there is no need to add the
          // delete-column-version marker to the live version file. This case happens only with
          // the new version behavior.
          getHistoricalFileWriter().append(cell);
        } else {
          liveFileWriter.append(cell);
        }
      } else {
        getHistoricalFileWriter().append(cell);
      }
    } else if (cell.getType() == Cell.Type.Put) {
      if (livePutCellCount < maxVersions) {
        // This is a live put cell (i.e., the latest version) of a column. Is it deleted?
        if (!isDeleted(cell)) {
          liveFileWriter.append(cell);
          livePutCellCount++;
        } else {
          // It is deleted
          getHistoricalFileWriter().append(cell);
          if (newVersionBehavior) {
            // Deleted versions still count toward the total version count when newVersionBehavior
            // is enabled
            livePutCellCount++;
          }
        }
      } else {
        // It is an older put cell
        getHistoricalFileWriter().append(cell);
      }
    }
    lastCell = cell;
  }

  @Override
  public void appendAll(List<ExtendedCell> cellList) throws IOException {
    if (historicalFilePath == null) {
      // The dual writing is not enabled and all cells are written to one file. We use
      // the live version file in this case
      for (ExtendedCell cell : cellList) {
        liveFileWriter.append(cell);
      }
      return;
    }
    if (cellList.isEmpty()) {
      return;
    }
    if (lastCell != null && comparator.compareRows(lastCell, cellList.get(0)) != 0) {
      // It is a new row and thus time to reset the state
      initRowState();
    }
    for (ExtendedCell cell : cellList) {
      appendCell(cell);
    }
  }

  @Override
  public void append(ExtendedCell cell) throws IOException {
    if (historicalFilePath == null) {
      // The dual writing is not enabled and all cells are written to one file. We use
      // the live version file in this case
      liveFileWriter.append(cell);
      return;
    }
    appendCell(cell);
  }

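  /**
   * Wraps a single {@link HFile.Writer} together with its general and delete family Bloom filter
   * writers. A {@link StoreFileWriter} always owns one of these for the live file and, when
   * historical compaction file generation is enabled, lazily creates a second one for the
   * historical file.
   */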
  private static class SingleStoreFileWriter {
    private final BloomFilterWriter generalBloomFilterWriter;
    private final BloomFilterWriter deleteFamilyBloomFilterWriter;
    private final BloomType bloomType;
    private byte[] bloomParam = null;
    private long deleteFamilyCnt = 0;
    private BloomContext bloomContext = null;
    private BloomContext deleteFamilyBloomContext = null;
    private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;

    private HFile.Writer writer;

    /**
     * Creates an HFile.Writer that also writes helpful meta data.
     * @param fs                     file system to write to
     * @param path                   file name to create
     * @param conf                   user configuration
     * @param cacheConf              cache configuration
     * @param bloomType              bloom filter setting
     * @param maxKeys                the expected maximum number of keys to be added. Was used for
     *                               Bloom filter size in {@link HFile} format version 1.
     * @param favoredNodes           an array of favored nodes or possibly null
     * @param fileContext            The HFile context
     * @param shouldDropCacheBehind  Drop pages written to page cache after writing the store file.
     * @param compactedFilesSupplier Returns the {@link HStore} compacted files which have not yet
     *                               been archived
     * @throws IOException problem writing to FS
     */
    private SingleStoreFileWriter(FileSystem fs, Path path, final Configuration conf,
      CacheConfig cacheConf, BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes,
      HFileContext fileContext, boolean shouldDropCacheBehind,
      Supplier<Collection<HStoreFile>> compactedFilesSupplier) throws IOException {
      this.compactedFilesSupplier = compactedFilesSupplier;
      // TODO : Change all writers to be specifically created for compaction context
      writer =
        HFile.getWriterFactory(conf, cacheConf).withPath(fs, path).withFavoredNodes(favoredNodes)
          .withFileContext(fileContext).withShouldDropCacheBehind(shouldDropCacheBehind).create();

      generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf,
        bloomType, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);

      if (generalBloomFilterWriter != null) {
        this.bloomType = bloomType;
        this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: "
            + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH
              ? Bytes.toInt(bloomParam)
              : Bytes.toStringBinary(bloomParam))
            + ", " + generalBloomFilterWriter.getClass().getSimpleName());
        }
        // init bloom context
        switch (bloomType) {
          case ROW:
            bloomContext =
              new RowBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
            break;
          case ROWCOL:
            bloomContext =
              new RowColBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
            break;
          case ROWPREFIX_FIXED_LENGTH:
            bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter,
              fileContext.getCellComparator(), Bytes.toInt(bloomParam));
            break;
          default:
            throw new IOException(
              "Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL or ROWPREFIX expected)");
        }
      } else {
        // Not using Bloom filters.
        this.bloomType = BloomType.NONE;
      }

      // initialize delete family Bloom filter when there is NO RowCol Bloom filter
      if (this.bloomType != BloomType.ROWCOL) {
        this.deleteFamilyBloomFilterWriter = BloomFilterFactory.createDeleteBloomAtWrite(conf,
          cacheConf, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
        deleteFamilyBloomContext =
          new RowBloomContext(deleteFamilyBloomFilterWriter, fileContext.getCellComparator());
      } else {
        deleteFamilyBloomFilterWriter = null;
      }
      if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) {
        LOG.trace("Delete Family Bloom filter type for " + path + ": "
          + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
      }
    }

    private long getPos() throws IOException {
      return ((HFileWriterImpl) writer).getPos();
    }

    /**
     * Writes meta data. Call before {@link #close()} since it is written as meta data to this file.
     * @param maxSequenceId   Maximum sequence id.
     * @param majorCompaction True if this file is product of a major compaction
     * @throws IOException problem writing to FS
     */
    private void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
      throws IOException {
      appendMetadata(maxSequenceId, majorCompaction, Collections.emptySet());
    }

    /**
     * Writes meta data. Call before {@link #close()} since it is written as meta data to this file.
     * @param maxSequenceId   Maximum sequence id.
     * @param majorCompaction True if this file is product of a major compaction
     * @param storeFiles      The compacted store files to generate this new file
     * @throws IOException problem writing to FS
     */
    private void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
      final Collection<HStoreFile> storeFiles) throws IOException {
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
      writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
      writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles));
      appendTrackedTimestampsToMetadata();
    }

    /**
     * Used when writing {@link HStoreFile#COMPACTION_EVENT_KEY} to the new file's file info. We
     * need the names of the compacted store files. If a compacted store file is itself the result
     * of a compaction, its compacted files which are still not archived are needed too, but there
     * is no need to add compacted files recursively. For example, if files A, B and C are
     * compacted into a new file D, and D is later compacted into a new file E, then A, B, C and D
     * are written as E's compacted files. If E is then compacted into a new file F, E is added to
     * F's compacted files first, followed by E's compacted files A, B, C and D. There is no need
     * to add D's compacted files, as they are already contained in E's compacted files. See
     * HBASE-20724 for more details.
     * @param storeFiles The compacted store files to generate this new file
     * @return bytes of CompactionEventTracker
     */
    private byte[] toCompactionEventTrackerBytes(Collection<HStoreFile> storeFiles) {
      Set<String> notArchivedCompactedStoreFiles = this.compactedFilesSupplier.get().stream()
        .map(sf -> sf.getPath().getName()).collect(Collectors.toSet());
      Set<String> compactedStoreFiles = new HashSet<>();
      for (HStoreFile storeFile : storeFiles) {
        compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName());
        for (String csf : storeFile.getCompactedStoreFiles()) {
          if (notArchivedCompactedStoreFiles.contains(csf)) {
            compactedStoreFiles.add(csf);
          }
        }
      }
      return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles);
    }

    /**
     * Writes meta data. Call before {@link #close()} since it is written as meta data to this file.
     * @param maxSequenceId   Maximum sequence id.
     * @param majorCompaction True if this file is product of a major compaction
     * @param mobCellsCount   The number of mob cells.
     * @throws IOException problem writing to FS
     */
    private void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
      final long mobCellsCount) throws IOException {
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
      writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
      writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
      appendTrackedTimestampsToMetadata();
    }

    /**
     * Appends MOB-specific metadata (even if it is empty)
     * @param mobRefSet original table -> set of MOB file names
     * @throws IOException problem writing to FS
     */
    private void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
      writer.appendFileInfo(MOB_FILE_REFS, MobUtils.serializeMobFileRefs(mobRefSet));
    }

    /**
     * Add TimestampRange and earliest put timestamp to Metadata
     */
    private void appendTrackedTimestampsToMetadata() throws IOException {
      writer.appendTrackedTimestampsToMetadata();
    }

    public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
      throws IOException {
      writer.appendCustomCellTimestampsToMetadata(timeRangeTracker);
    }

    private void appendGeneralBloomfilter(final ExtendedCell cell) throws IOException {
      if (this.generalBloomFilterWriter != null) {
        /*
         * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
         * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp
         * 3 Types of Filtering: 1. Row = Row 2. RowCol = Row + Qualifier
         * 3. RowPrefixFixedLength = Fixed Length Row Prefix
         */
        bloomContext.writeBloom(cell);
      }
    }

    private void appendDeleteFamilyBloomFilter(final ExtendedCell cell) throws IOException {
      if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) {
        return;
      }

      // increase the number of delete family markers in the store file
      deleteFamilyCnt++;
      if (this.deleteFamilyBloomFilterWriter != null) {
        deleteFamilyBloomContext.writeBloom(cell);
      }
    }

    private void append(final ExtendedCell cell) throws IOException {
      appendGeneralBloomfilter(cell);
      appendDeleteFamilyBloomFilter(cell);
      writer.append(cell);
    }

    private void beforeShipped() throws IOException {
      // For now these writers will always be ShipperListeners.
      // TODO : Change all writers to be specifically created for compaction context
      writer.beforeShipped();
      if (generalBloomFilterWriter != null) {
        generalBloomFilterWriter.beforeShipped();
      }
      if (deleteFamilyBloomFilterWriter != null) {
        deleteFamilyBloomFilterWriter.beforeShipped();
      }
    }

    private Path getPath() {
      return this.writer.getPath();
    }

    private boolean hasGeneralBloom() {
      return this.generalBloomFilterWriter != null;
    }

    /**
     * For unit testing only.
     * @return the Bloom filter used by this writer.
     */
    BloomFilterWriter getGeneralBloomWriter() {
      return generalBloomFilterWriter;
    }

    private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
      boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
      if (haveBloom) {
        bfw.compactBloom();
      }
      return haveBloom;
    }

    private boolean closeGeneralBloomFilter() throws IOException {
      boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);

      // add the general Bloom filter writer and append file info
      if (hasGeneralBloom) {
        writer.addGeneralBloomFilter(generalBloomFilterWriter);
        writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
        if (bloomParam != null) {
          writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam);
        }
        bloomContext.addLastBloomKey(writer);
      }
      return hasGeneralBloom;
    }

    private boolean closeDeleteFamilyBloomFilter() throws IOException {
      boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);

      // add the delete family Bloom filter writer
      if (hasDeleteFamilyBloom) {
        writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
      }

      // append file info about the number of delete family kvs
      // even if there is no delete family Bloom.
      writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt));

      return hasDeleteFamilyBloom;
    }

    private void close() throws IOException {
      boolean hasGeneralBloom = this.closeGeneralBloomFilter();
      boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();

      writer.close();

      // Log final Bloom filter statistics. This needs to be done after close()
      // because compound Bloom filters might be finalized as part of closing.
      if (LOG.isTraceEnabled()) {
        LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and "
          + (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " was added to HFile "
          + getPath());
      }
    }

    private void appendFileInfo(byte[] key, byte[] value) throws IOException {
      writer.appendFileInfo(key, value);
    }

    /**
     * For use in testing.
     */
    private HFile.Writer getHFileWriter() {
      return writer;
    }
  }

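  /**
   * Builder for {@link StoreFileWriter}. Configure the target location with either
   * {@link #withOutputDir(Path)} or {@link #withFilePath(Path)} (but not both), set any optional
   * parameters, then call {@link #build()}.
   */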
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
      justification = "Will not overflow")
  public static class Builder {
    private final Configuration conf;
    private final CacheConfig cacheConf;
    private final FileSystem fs;

    private BloomType bloomType = BloomType.NONE;
    private long maxKeyCount = 0;
    private Path dir;
    private Path liveFilePath;
    private Path historicalFilePath;

    private InetSocketAddress[] favoredNodes;
    private HFileContext fileContext;
    private boolean shouldDropCacheBehind;
    private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
    private String fileStoragePolicy;
    // This is used to track the creation of the StoreFileWriter, mainly for the SFT
    // implementation, where we write store files directly to the final place instead of
    // writing a tmp file first. Under this scenario, a background task purges the store files
    // which are not recorded in the SFT, but the files of a newly created store file writer are
    // not yet tracked in the SFT, so we need to record them here and treat them specially.
    private Consumer<Path> writerCreationTracker;
    private int maxVersions;
    private boolean newVersionBehavior;
    private CellComparator comparator;
    private boolean isCompaction;

    public Builder(Configuration conf, CacheConfig cacheConf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = cacheConf;
      this.fs = fs;
    }

    /**
     * Creates Builder with cache configuration disabled
     */
    public Builder(Configuration conf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = CacheConfig.DISABLED;
      this.fs = fs;
    }

    /**
     * Use either this method or {@link #withFilePath}, but not both.
     * @param dir Path to column family directory. The directory is created if it does not exist.
     *            The file is given a unique name within this directory.
     * @return this (for chained invocation)
     */
    public Builder withOutputDir(Path dir) {
      Preconditions.checkNotNull(dir);
      this.dir = dir;
      return this;
    }

    /**
     * Use either this method or {@link #withOutputDir}, but not both.
     * @param filePath the StoreFile path to write
     * @return this (for chained invocation)
     */
    public Builder withFilePath(Path filePath) {
      Preconditions.checkNotNull(filePath);
      this.liveFilePath = filePath;
      return this;
    }

    /**
     * @param favoredNodes an array of favored nodes or possibly null
     * @return this (for chained invocation)
     */
    public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) {
      this.favoredNodes = favoredNodes;
      return this;
    }

    public Builder withBloomType(BloomType bloomType) {
      Preconditions.checkNotNull(bloomType);
      this.bloomType = bloomType;
      return this;
    }

    /**
     * @param maxKeyCount estimated maximum number of keys we expect to add
     * @return this (for chained invocation)
     */
    public Builder withMaxKeyCount(long maxKeyCount) {
      this.maxKeyCount = maxKeyCount;
      return this;
    }

    public Builder withFileContext(HFileContext fileContext) {
      this.fileContext = fileContext;
      return this;
    }

    public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
      this.shouldDropCacheBehind = shouldDropCacheBehind;
      return this;
    }

    public Builder
      withCompactedFilesSupplier(Supplier<Collection<HStoreFile>> compactedFilesSupplier) {
      this.compactedFilesSupplier = compactedFilesSupplier;
      return this;
    }

    public Builder withFileStoragePolicy(String fileStoragePolicy) {
      this.fileStoragePolicy = fileStoragePolicy;
      return this;
    }

    public Builder withWriterCreationTracker(Consumer<Path> writerCreationTracker) {
      this.writerCreationTracker = writerCreationTracker;
      return this;
    }

    public Builder withMaxVersions(int maxVersions) {
      this.maxVersions = maxVersions;
      return this;
    }

    public Builder withNewVersionBehavior(boolean newVersionBehavior) {
      this.newVersionBehavior = newVersionBehavior;
      return this;
    }

    public Builder withCellComparator(CellComparator comparator) {
      this.comparator = comparator;
      return this;
    }

    public Builder withIsCompaction(boolean isCompaction) {
      this.isCompaction = isCompaction;
      return this;
    }

    /**
     * Create a store file writer. The caller is responsible for closing the file when done. If
     * metadata is needed, add it BEFORE closing, using {@link StoreFileWriter#appendMetadata}.
     */
    public StoreFileWriter build() throws IOException {
      if ((dir == null ? 0 : 1) + (liveFilePath == null ? 0 : 1) != 1) {
        throw new IllegalArgumentException("Either specify parent directory or file path");
      }

      if (dir == null) {
        dir = liveFilePath.getParent();
      }

      if (!fs.exists(dir)) {
        // Handle permission for non-HDFS filesystem properly
        // See HBASE-17710
        HRegionFileSystem.mkdirs(fs, conf, dir);
      }

      // set block storage policy for temp path
      String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY);
      if (null == policyName) {
        policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY);
      }
      CommonFSUtils.setStoragePolicy(this.fs, dir, policyName);

      if (liveFilePath == null) {
        // The stored file and related blocks will use the directory-based StoragePolicy, because
        // HDFS DistributedFileSystem does not support creating files with a storage policy before
        // version 3.3.0 (see HDFS-13209). Using a child dir here makes the stored files satisfy
        // the specific storage policy when writing, so as to avoid later data movement. We don't
        // want to change the whole temp dir to 'fileStoragePolicy'.
        if (!Strings.isNullOrEmpty(fileStoragePolicy)) {
          dir = new Path(dir, HConstants.STORAGE_POLICY_PREFIX + fileStoragePolicy);
          if (!fs.exists(dir)) {
            HRegionFileSystem.mkdirs(fs, conf, dir);
            LOG.info(
              "Create tmp dir " + dir.toString() + " with storage policy: " + fileStoragePolicy);
          }
          CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy);
        }
        liveFilePath = getUniqueFile(fs, dir);
        if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
          bloomType = BloomType.NONE;
        }
      }

      if (isCompaction && shouldEnableHistoricalCompactionFiles(conf)) {
        historicalFilePath = getUniqueFile(fs, dir);
      }

      // Make sure we call this before actually creating the writer. In fact, it is not a big deal
      // to add a nonexistent file to the tracker, as we will never try to delete it and we will
      // clean the tracker up after compaction anyway. But if the file cleaner finds the file
      // before we have recorded it, it may accidentally delete the file and cause problems.
      if (writerCreationTracker != null) {
        writerCreationTracker.accept(liveFilePath);
        if (historicalFilePath != null) {
          writerCreationTracker.accept(historicalFilePath);
        }
      }
      return new StoreFileWriter(fs, liveFilePath, historicalFilePath, conf, cacheConf, bloomType,
        maxKeyCount, favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier,
        comparator, maxVersions, newVersionBehavior);
    }
  }
}