001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY;
021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY;
022import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
023import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY;
024import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT;
025import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
026import static org.apache.hadoop.hbase.regionserver.HStoreFile.HISTORICAL_KEY;
027import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
028import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
029import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
030import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_FILE_REFS;
031import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
032import static org.apache.hadoop.hbase.regionserver.StoreEngine.STORE_ENGINE_CLASS_KEY;
033
034import java.io.IOException;
035import java.net.InetSocketAddress;
036import java.util.ArrayList;
037import java.util.Collection;
038import java.util.Collections;
039import java.util.HashSet;
040import java.util.List;
041import java.util.Set;
042import java.util.UUID;
043import java.util.function.Consumer;
044import java.util.function.Supplier;
045import java.util.regex.Pattern;
046import java.util.stream.Collectors;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.FileSystem;
049import org.apache.hadoop.fs.Path;
050import org.apache.hadoop.hbase.Cell;
051import org.apache.hadoop.hbase.CellComparator;
052import org.apache.hadoop.hbase.CellUtil;
053import org.apache.hadoop.hbase.ExtendedCell;
054import org.apache.hadoop.hbase.HBaseInterfaceAudience;
055import org.apache.hadoop.hbase.HConstants;
056import org.apache.hadoop.hbase.KeyValue;
057import org.apache.hadoop.hbase.PrivateCellUtil;
058import org.apache.hadoop.hbase.TableName;
059import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
060import org.apache.hadoop.hbase.io.hfile.CacheConfig;
061import org.apache.hadoop.hbase.io.hfile.HFile;
062import org.apache.hadoop.hbase.io.hfile.HFileContext;
063import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
064import org.apache.hadoop.hbase.mob.MobUtils;
065import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
066import org.apache.hadoop.hbase.util.BloomContext;
067import org.apache.hadoop.hbase.util.BloomFilterFactory;
068import org.apache.hadoop.hbase.util.BloomFilterUtil;
069import org.apache.hadoop.hbase.util.BloomFilterWriter;
070import org.apache.hadoop.hbase.util.Bytes;
071import org.apache.hadoop.hbase.util.CommonFSUtils;
072import org.apache.hadoop.hbase.util.RowBloomContext;
073import org.apache.hadoop.hbase.util.RowColBloomContext;
074import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext;
075import org.apache.yetus.audience.InterfaceAudience;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078
079import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
080import org.apache.hbase.thirdparty.com.google.common.base.Strings;
081import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
082import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
083
084import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
085
086/**
 * A StoreFile writer. Use this to write HBase Store Files. It is internal to the HBase
 * regionserver implementation.
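 * <p>
 * A minimal usage sketch (all identifiers below, such as {@code conf}, {@code familyDir},
 * {@code hfileContext} and {@code cell}, are illustrative placeholders rather than values defined
 * by this class):
 *
 * <pre>
 * StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
 *   .withOutputDir(familyDir)
 *   .withBloomType(BloomType.ROW)
 *   .withMaxKeyCount(estimatedKeyCount)
 *   .withFileContext(hfileContext)
 *   .build();
 * try {
 *   writer.append(cell);
 *   writer.appendMetadata(maxSequenceId, false);
 * } finally {
 *   writer.close();
 * }
 * </pre>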
089 */
090@InterfaceAudience.Private
091public class StoreFileWriter implements CellSink, ShipperListener {
092  private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName());
093  public static final String ENABLE_HISTORICAL_COMPACTION_FILES =
094    "hbase.enable.historical.compaction.files";
095  public static final boolean DEFAULT_ENABLE_HISTORICAL_COMPACTION_FILES = false;
096  private static final Pattern dash = Pattern.compile("-");
097  private SingleStoreFileWriter liveFileWriter;
098  private SingleStoreFileWriter historicalFileWriter;
099  private final FileSystem fs;
100  private final Path historicalFilePath;
101  private final Configuration conf;
102  private final CacheConfig cacheConf;
103  private final BloomType bloomType;
104  private final long maxKeys;
105  private final InetSocketAddress[] favoredNodes;
106  private final HFileContext fileContext;
107  private final boolean shouldDropCacheBehind;
108  private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;
109  private final CellComparator comparator;
110  private ExtendedCell lastCell;
111  // The first (latest) delete family marker of the current row
112  private ExtendedCell deleteFamily;
113  // The list of delete family version markers of the current row
114  private List<ExtendedCell> deleteFamilyVersionList = new ArrayList<>();
115  // The first (latest) delete column marker of the current column
116  private ExtendedCell deleteColumn;
117  // The list of delete column version markers of the current column
118  private List<ExtendedCell> deleteColumnVersionList = new ArrayList<>();
119  // The live put cell count for the current column
120  private int livePutCellCount;
121  private final int maxVersions;
122  private final boolean newVersionBehavior;
123
124  /**
   * Creates an HFile.Writer that also writes helpful metadata.
126   * @param fs                     file system to write to
127   * @param liveFilePath           the name of the live file to create
   * @param historicalFilePath     the name of the historical file to create
129   * @param conf                   user configuration
130   * @param bloomType              bloom filter setting
131   * @param maxKeys                the expected maximum number of keys to be added. Was used for
132   *                               Bloom filter size in {@link HFile} format version 1.
133   * @param favoredNodes           an array of favored nodes or possibly null
134   * @param fileContext            The HFile context
135   * @param shouldDropCacheBehind  Drop pages written to page cache after writing the store file.
   * @param compactedFilesSupplier Returns the {@link HStore} compacted files not yet archived
137   * @param comparator             Cell comparator
138   * @param maxVersions            max cell versions
139   * @param newVersionBehavior     enable new version behavior
140   * @throws IOException problem writing to FS
141   */
142  private StoreFileWriter(FileSystem fs, Path liveFilePath, Path historicalFilePath,
143    final Configuration conf, CacheConfig cacheConf, BloomType bloomType, long maxKeys,
144    InetSocketAddress[] favoredNodes, HFileContext fileContext, boolean shouldDropCacheBehind,
145    Supplier<Collection<HStoreFile>> compactedFilesSupplier, CellComparator comparator,
146    int maxVersions, boolean newVersionBehavior) throws IOException {
147    this.fs = fs;
148    this.historicalFilePath = historicalFilePath;
149    this.conf = conf;
150    this.cacheConf = cacheConf;
151    this.bloomType = bloomType;
152    this.maxKeys = maxKeys;
153    this.favoredNodes = favoredNodes;
154    this.fileContext = fileContext;
155    this.shouldDropCacheBehind = shouldDropCacheBehind;
156    this.compactedFilesSupplier = compactedFilesSupplier;
157    this.comparator = comparator;
158    this.maxVersions = maxVersions;
159    this.newVersionBehavior = newVersionBehavior;
160    liveFileWriter = new SingleStoreFileWriter(fs, liveFilePath, conf, cacheConf, bloomType,
161      maxKeys, favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
162  }
163
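  /**
   * Returns whether compactions should also generate historical compaction files, i.e., whether
   * cells should be split between a live version file and a historical file. This is gated on
   * {@link #ENABLE_HISTORICAL_COMPACTION_FILES} and is honored only with the default store engine
   * and the default compactor. A minimal sketch of enabling it (assuming the store engine and
   * compactor configuration are left at their defaults):
   *
   * <pre>
   * Configuration conf = HBaseConfiguration.create();
   * conf.setBoolean(StoreFileWriter.ENABLE_HISTORICAL_COMPACTION_FILES, true);
   * boolean dualFileWriting = StoreFileWriter.shouldEnableHistoricalCompactionFiles(conf); // true
   * </pre>
   */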
164  public static boolean shouldEnableHistoricalCompactionFiles(Configuration conf) {
165    if (
166      conf.getBoolean(ENABLE_HISTORICAL_COMPACTION_FILES,
167        DEFAULT_ENABLE_HISTORICAL_COMPACTION_FILES)
168    ) {
169      // Historical compaction files are supported only for default store engine with
170      // default compactor.
171      String storeEngine = conf.get(STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
172      if (!storeEngine.equals(DefaultStoreEngine.class.getName())) {
173        LOG.warn("Historical compaction file generation is ignored for " + storeEngine
174          + ". hbase.enable.historical.compaction.files can be set to true only for the "
175          + "default compaction (DefaultStoreEngine and DefaultCompactor)");
176        return false;
177      }
178      String compactor = conf.get(DEFAULT_COMPACTOR_CLASS_KEY, DefaultCompactor.class.getName());
179      if (!compactor.equals(DefaultCompactor.class.getName())) {
180        LOG.warn("Historical compaction file generation is ignored for " + compactor
181          + ". hbase.enable.historical.compaction.files can be set to true only for the "
182          + "default compaction (DefaultStoreEngine and DefaultCompactor)");
183        return false;
184      }
185      return true;
186    }
187    return false;
188  }
189
190  public long getPos() throws IOException {
191    return liveFileWriter.getPos();
192  }
193
194  /**
   * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
196   * @param maxSequenceId   Maximum sequence id.
197   * @param majorCompaction True if this file is product of a major compaction
198   * @throws IOException problem writing to FS
199   */
200  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
201    throws IOException {
202    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction);
203    if (historicalFileWriter != null) {
204      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction);
205    }
206  }
207
208  /**
   * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
210   * @param maxSequenceId   Maximum sequence id.
211   * @param majorCompaction True if this file is product of a major compaction
212   * @param storeFiles      The compacted store files to generate this new file
213   * @throws IOException problem writing to FS
214   */
215  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
216    final Collection<HStoreFile> storeFiles) throws IOException {
217    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction, storeFiles);
218    if (historicalFileWriter != null) {
219      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction, storeFiles);
220    }
221  }
222
223  /**
   * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
225   * @param maxSequenceId   Maximum sequence id.
226   * @param majorCompaction True if this file is product of a major compaction
227   * @param mobCellsCount   The number of mob cells.
228   * @throws IOException problem writing to FS
229   */
230  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
231    final long mobCellsCount) throws IOException {
232    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction, mobCellsCount);
233    if (historicalFileWriter != null) {
234      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction, mobCellsCount);
235    }
236  }
237
238  /**
   * Appends MOB-specific metadata (even if it is empty)
240   * @param mobRefSet - original table -> set of MOB file names
241   * @throws IOException problem writing to FS
242   */
243  public void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
244    liveFileWriter.appendMobMetadata(mobRefSet);
245    if (historicalFileWriter != null) {
246      historicalFileWriter.appendMobMetadata(mobRefSet);
247    }
248  }
249
250  /**
251   * Add TimestampRange and earliest put timestamp to Metadata
252   */
253  public void appendTrackedTimestampsToMetadata() throws IOException {
254    // TODO: The StoreFileReader always converts the byte[] to TimeRange
255    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
256    liveFileWriter.appendTrackedTimestampsToMetadata();
257    if (historicalFileWriter != null) {
258      historicalFileWriter.appendTrackedTimestampsToMetadata();
259    }
260  }
261
262  @Override
263  public void beforeShipped() throws IOException {
264    liveFileWriter.beforeShipped();
265    if (historicalFileWriter != null) {
266      historicalFileWriter.beforeShipped();
267    }
268  }
269
270  public Path getPath() {
271    return liveFileWriter.getPath();
272  }
273
274  public List<Path> getPaths() {
275    if (historicalFileWriter == null) {
276      return Lists.newArrayList(liveFileWriter.getPath());
277    }
278    return Lists.newArrayList(liveFileWriter.getPath(), historicalFileWriter.getPath());
279  }
280
281  public boolean hasGeneralBloom() {
282    return liveFileWriter.hasGeneralBloom();
283  }
284
285  /**
286   * For unit testing only.
287   * @return the Bloom filter used by this writer.
288   */
289  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
290  public BloomFilterWriter getGeneralBloomWriter() {
291    return liveFileWriter.generalBloomFilterWriter;
292  }
293
294  public void close() throws IOException {
295    liveFileWriter.appendFileInfo(HISTORICAL_KEY, Bytes.toBytes(false));
296    liveFileWriter.close();
297    if (historicalFileWriter != null) {
298      historicalFileWriter.appendFileInfo(HISTORICAL_KEY, Bytes.toBytes(true));
299      historicalFileWriter.close();
300    }
301  }
302
303  public void appendFileInfo(byte[] key, byte[] value) throws IOException {
304    liveFileWriter.appendFileInfo(key, value);
305    if (historicalFileWriter != null) {
306      historicalFileWriter.appendFileInfo(key, value);
307    }
308  }
309
310  /**
311   * For use in testing.
312   */
313  HFile.Writer getLiveFileWriter() {
314    return liveFileWriter.getHFileWriter();
315  }
316
317  /**
318   * @param dir Directory to create file in.
319   * @return random filename inside passed <code>dir</code>
320   */
321  public static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
322    if (!fs.getFileStatus(dir).isDirectory()) {
323      throw new IOException("Expecting " + dir.toString() + " to be a directory");
324    }
325    return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll(""));
326  }
327
328  private SingleStoreFileWriter getHistoricalFileWriter() throws IOException {
329    if (historicalFileWriter == null) {
330      historicalFileWriter =
331        new SingleStoreFileWriter(fs, historicalFilePath, conf, cacheConf, bloomType, maxKeys,
332          favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
333    }
334    return historicalFileWriter;
335  }
336
337  private void initRowState() {
338    deleteFamily = null;
339    deleteFamilyVersionList.clear();
340    lastCell = null;
341  }
342
  private void initColumnState() {
    livePutCellCount = 0;
    deleteColumn = null;
    deleteColumnVersionList.clear();
  }
349
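  // Masking rules applied by the helpers below: a delete-family or delete-column marker masks a
  // cell with an older timestamp, or with an equal timestamp when the legacy version behavior is
  // in effect or the cell's sequence id is lower than the marker's. The version-specific markers
  // (DeleteFamilyVersion and Delete) only mask cells with exactly matching timestamps, subject to
  // the same sequence id rule.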
350  private boolean isDeletedByDeleteFamily(ExtendedCell cell) {
351    return deleteFamily != null && (deleteFamily.getTimestamp() > cell.getTimestamp()
352      || (deleteFamily.getTimestamp() == cell.getTimestamp()
353        && (!newVersionBehavior || cell.getSequenceId() < deleteFamily.getSequenceId())));
354  }
355
356  private boolean isDeletedByDeleteFamilyVersion(ExtendedCell cell) {
357    for (ExtendedCell deleteFamilyVersion : deleteFamilyVersionList) {
358      if (
359        deleteFamilyVersion.getTimestamp() == cell.getTimestamp()
360          && (!newVersionBehavior || cell.getSequenceId() < deleteFamilyVersion.getSequenceId())
361      ) {
362        return true;
363      }
364    }
365    return false;
366  }
367
368  private boolean isDeletedByDeleteColumn(ExtendedCell cell) {
369    return deleteColumn != null && (deleteColumn.getTimestamp() > cell.getTimestamp()
370      || (deleteColumn.getTimestamp() == cell.getTimestamp()
371        && (!newVersionBehavior || cell.getSequenceId() < deleteColumn.getSequenceId())));
372  }
373
374  private boolean isDeletedByDeleteColumnVersion(ExtendedCell cell) {
375    for (ExtendedCell deleteColumnVersion : deleteColumnVersionList) {
376      if (
377        deleteColumnVersion.getTimestamp() == cell.getTimestamp()
378          && (!newVersionBehavior || cell.getSequenceId() < deleteColumnVersion.getSequenceId())
379      ) {
380        return true;
381      }
382    }
383    return false;
384  }
385
386  private boolean isDeleted(ExtendedCell cell) {
387    return isDeletedByDeleteFamily(cell) || isDeletedByDeleteColumn(cell)
388      || isDeletedByDeleteFamilyVersion(cell) || isDeletedByDeleteColumnVersion(cell);
389  }
390
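  // Routes a single cell to either the live or the historical file. The live file keeps the cell
  // versions that are still visible to reads (up to maxVersions per column, plus the delete
  // markers that are still in effect); cells that are masked by a delete marker or exceed
  // maxVersions go to the historical file.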
391  private void appendCell(ExtendedCell cell) throws IOException {
392    if ((lastCell == null || !CellUtil.matchingColumn(lastCell, cell))) {
393      initColumnState();
394    }
395    if (cell.getType() == Cell.Type.DeleteFamily) {
396      if (deleteFamily == null) {
397        deleteFamily = cell;
398        liveFileWriter.append(cell);
399      } else {
400        getHistoricalFileWriter().append(cell);
401      }
402    } else if (cell.getType() == Cell.Type.DeleteFamilyVersion) {
403      if (!isDeletedByDeleteFamily(cell)) {
        deleteFamilyVersionList.add(cell);
        // The delete-family-version marker is not masked by the delete-family marker, so it is
        // written to the live version file. With the new version behavior this includes the case
        // where both markers have the same timestamp and the delete-family-version marker has
        // the higher sequence id.
        liveFileWriter.append(cell);
415      } else {
416        getHistoricalFileWriter().append(cell);
417      }
418    } else if (cell.getType() == Cell.Type.DeleteColumn) {
419      if (!isDeletedByDeleteFamily(cell) && deleteColumn == null) {
420        deleteColumn = cell;
421        liveFileWriter.append(cell);
422      } else {
423        getHistoricalFileWriter().append(cell);
424      }
425    } else if (cell.getType() == Cell.Type.Delete) {
426      if (!isDeletedByDeleteFamily(cell) && deleteColumn == null) {
427        deleteColumnVersionList.add(cell);
428        if (deleteFamily != null && deleteFamily.getTimestamp() == cell.getTimestamp()) {
          // This means both the delete-family and delete-column-version markers have the same
          // timestamp but the sequence id of the delete-column-version marker is higher than
          // that of the delete-family marker. In this case, there is no need to add the
432          // delete-column-version marker to the live version file. This case happens only with
433          // the new version behavior.
434          getHistoricalFileWriter().append(cell);
435        } else {
436          liveFileWriter.append(cell);
437        }
438      } else {
439        getHistoricalFileWriter().append(cell);
440      }
441    } else if (cell.getType() == Cell.Type.Put) {
442      if (livePutCellCount < maxVersions) {
443        // This is a live put cell (i.e., the latest version) of a column. Is it deleted?
444        if (!isDeleted(cell)) {
445          liveFileWriter.append(cell);
446          livePutCellCount++;
447        } else {
448          // It is deleted
449          getHistoricalFileWriter().append(cell);
450          if (newVersionBehavior) {
            // Deleted versions still count toward the total version count when the new version
            // behavior is enabled
452            livePutCellCount++;
453          }
454        }
455      } else {
456        // It is an older put cell
457        getHistoricalFileWriter().append(cell);
458      }
459    }
460    lastCell = cell;
461  }
462
463  @Override
464  public void appendAll(List<ExtendedCell> cellList) throws IOException {
465    if (historicalFilePath == null) {
      // Dual writing is not enabled, so all cells are written to one file. We use the live
      // version file in this case.
468      for (ExtendedCell cell : cellList) {
469        liveFileWriter.append(cell);
470      }
471      return;
472    }
473    if (cellList.isEmpty()) {
474      return;
475    }
476    if (lastCell != null && comparator.compareRows(lastCell, cellList.get(0)) != 0) {
477      // It is a new row and thus time to reset the state
478      initRowState();
479    }
480    for (ExtendedCell cell : cellList) {
481      appendCell(cell);
482    }
483  }
484
485  @Override
486  public void append(ExtendedCell cell) throws IOException {
487    if (historicalFilePath == null) {
      // Dual writing is not enabled, so all cells are written to one file. We use the live
      // version file in this case.
490      liveFileWriter.append(cell);
491      return;
492    }
493    appendCell(cell);
494  }
495
496  private static class SingleStoreFileWriter {
497    private final BloomFilterWriter generalBloomFilterWriter;
498    private final BloomFilterWriter deleteFamilyBloomFilterWriter;
499    private final BloomType bloomType;
500    private byte[] bloomParam = null;
501    private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
502    private long deleteFamilyCnt = 0;
503    private BloomContext bloomContext = null;
504    private BloomContext deleteFamilyBloomContext = null;
505    private final TimeRangeTracker timeRangeTracker;
506    private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;
507
508    private HFile.Writer writer;
509
510    /**
     * Creates an HFile.Writer that also writes helpful metadata.
512     * @param fs                     file system to write to
513     * @param path                   file name to create
514     * @param conf                   user configuration
515     * @param bloomType              bloom filter setting
516     * @param maxKeys                the expected maximum number of keys to be added. Was used for
517     *                               Bloom filter size in {@link HFile} format version 1.
518     * @param favoredNodes           an array of favored nodes or possibly null
519     * @param fileContext            The HFile context
520     * @param shouldDropCacheBehind  Drop pages written to page cache after writing the store file.
     * @param compactedFilesSupplier Returns the {@link HStore} compacted files not yet archived
522     * @throws IOException problem writing to FS
523     */
524    private SingleStoreFileWriter(FileSystem fs, Path path, final Configuration conf,
525      CacheConfig cacheConf, BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes,
526      HFileContext fileContext, boolean shouldDropCacheBehind,
527      Supplier<Collection<HStoreFile>> compactedFilesSupplier) throws IOException {
528      this.compactedFilesSupplier = compactedFilesSupplier;
529      this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
530      // TODO : Change all writers to be specifically created for compaction context
531      writer =
532        HFile.getWriterFactory(conf, cacheConf).withPath(fs, path).withFavoredNodes(favoredNodes)
533          .withFileContext(fileContext).withShouldDropCacheBehind(shouldDropCacheBehind).create();
534
535      generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf,
536        bloomType, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
537
538      if (generalBloomFilterWriter != null) {
539        this.bloomType = bloomType;
540        this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf);
541        if (LOG.isTraceEnabled()) {
542          LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: "
543            + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH
544              ? Bytes.toInt(bloomParam)
545              : Bytes.toStringBinary(bloomParam))
546            + ", " + generalBloomFilterWriter.getClass().getSimpleName());
547        }
548        // init bloom context
549        switch (bloomType) {
550          case ROW:
551            bloomContext =
552              new RowBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
553            break;
554          case ROWCOL:
555            bloomContext =
556              new RowColBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
557            break;
558          case ROWPREFIX_FIXED_LENGTH:
559            bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter,
560              fileContext.getCellComparator(), Bytes.toInt(bloomParam));
561            break;
562          default:
            throw new IOException("Invalid Bloom filter type: " + bloomType
              + " (ROW, ROWCOL or ROWPREFIX_FIXED_LENGTH expected)");
565        }
566      } else {
567        // Not using Bloom filters.
568        this.bloomType = BloomType.NONE;
569      }
570
571      // initialize delete family Bloom filter when there is NO RowCol Bloom filter
572      if (this.bloomType != BloomType.ROWCOL) {
573        this.deleteFamilyBloomFilterWriter = BloomFilterFactory.createDeleteBloomAtWrite(conf,
574          cacheConf, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
575        deleteFamilyBloomContext =
576          new RowBloomContext(deleteFamilyBloomFilterWriter, fileContext.getCellComparator());
577      } else {
578        deleteFamilyBloomFilterWriter = null;
579      }
580      if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) {
581        LOG.trace("Delete Family Bloom filter type for " + path + ": "
582          + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
583      }
584    }
585
586    private long getPos() throws IOException {
587      return ((HFileWriterImpl) writer).getPos();
588    }
589
590    /**
     * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
592     * @param maxSequenceId   Maximum sequence id.
593     * @param majorCompaction True if this file is product of a major compaction
594     * @throws IOException problem writing to FS
595     */
596    private void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
597      throws IOException {
598      appendMetadata(maxSequenceId, majorCompaction, Collections.emptySet());
599    }
600
601    /**
     * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
603     * @param maxSequenceId   Maximum sequence id.
604     * @param majorCompaction True if this file is product of a major compaction
605     * @param storeFiles      The compacted store files to generate this new file
606     * @throws IOException problem writing to FS
607     */
608    private void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
609      final Collection<HStoreFile> storeFiles) throws IOException {
610      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
611      writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
612      writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles));
613      appendTrackedTimestampsToMetadata();
614    }
615
616    /**
     * Used when writing {@link HStoreFile#COMPACTION_EVENT_KEY} to the new file's file info. The
     * names of the compacted store files are needed. If a compacted store file is itself the
     * result of a compaction, its compacted files that are still not archived are needed too, but
     * there is no need to add compacted files recursively. If files A, B and C are compacted to
     * new file D, and file D is compacted to new file E, then A, B, C and D are written to file
     * E's compacted files. So if file E is compacted to new file F, E is added to F's compacted
     * files first, then E's compacted files A, B, C and D are added to it. There is no need to
     * add D's compacted files, as they are already included in E's compacted files. See
     * HBASE-20724 for more details.
625     * @param storeFiles The compacted store files to generate this new file
626     * @return bytes of CompactionEventTracker
627     */
628    private byte[] toCompactionEventTrackerBytes(Collection<HStoreFile> storeFiles) {
629      Set<String> notArchivedCompactedStoreFiles = this.compactedFilesSupplier.get().stream()
630        .map(sf -> sf.getPath().getName()).collect(Collectors.toSet());
631      Set<String> compactedStoreFiles = new HashSet<>();
632      for (HStoreFile storeFile : storeFiles) {
633        compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName());
634        for (String csf : storeFile.getCompactedStoreFiles()) {
635          if (notArchivedCompactedStoreFiles.contains(csf)) {
636            compactedStoreFiles.add(csf);
637          }
638        }
639      }
640      return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles);
641    }
642
643    /**
     * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
645     * @param maxSequenceId   Maximum sequence id.
646     * @param majorCompaction True if this file is product of a major compaction
647     * @param mobCellsCount   The number of mob cells.
648     * @throws IOException problem writing to FS
649     */
650    private void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
651      final long mobCellsCount) throws IOException {
652      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
653      writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
654      writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
655      appendTrackedTimestampsToMetadata();
656    }
657
658    /**
     * Appends MOB-specific metadata (even if it is empty)
660     * @param mobRefSet - original table -> set of MOB file names
661     * @throws IOException problem writing to FS
662     */
663    private void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
664      writer.appendFileInfo(MOB_FILE_REFS, MobUtils.serializeMobFileRefs(mobRefSet));
665    }
666
667    /**
668     * Add TimestampRange and earliest put timestamp to Metadata
669     */
670    private void appendTrackedTimestampsToMetadata() throws IOException {
671      // TODO: The StoreFileReader always converts the byte[] to TimeRange
672      // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
673      appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker));
674      appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
675    }
676
    /**
     * Record the earliest Put timestamp and update the TimeRangeTracker to include the timestamp
     * of this cell.
     */
681    private void trackTimestamps(final ExtendedCell cell) {
682      if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
683        earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
684      }
685      timeRangeTracker.includeTimestamp(cell);
686    }
687
688    private void appendGeneralBloomfilter(final ExtendedCell cell) throws IOException {
689      if (this.generalBloomFilterWriter != null) {
        /*
         * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
         * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp
         * Three types of filtering:
         * 1. Row = Row
         * 2. RowCol = Row + Qualifier
         * 3. RowPrefixFixedLength = Fixed Length Row Prefix
         */
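        // For example (row and qualifier values are illustrative): for a cell with row "r1" and
        // qualifier "q1", a ROW bloom adds "r1", a ROWCOL bloom adds the "r1"+"q1" composite, and
        // a ROWPREFIX_FIXED_LENGTH bloom with length 2 adds the first two bytes of the row.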
696        bloomContext.writeBloom(cell);
697      }
698    }
699
700    private void appendDeleteFamilyBloomFilter(final ExtendedCell cell) throws IOException {
701      if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) {
702        return;
703      }
704
      // increase the number of delete family markers in the store file
706      deleteFamilyCnt++;
707      if (this.deleteFamilyBloomFilterWriter != null) {
708        deleteFamilyBloomContext.writeBloom(cell);
709      }
710    }
711
712    private void append(final ExtendedCell cell) throws IOException {
713      appendGeneralBloomfilter(cell);
714      appendDeleteFamilyBloomFilter(cell);
715      writer.append(cell);
716      trackTimestamps(cell);
717    }
718
719    private void beforeShipped() throws IOException {
      // For now these writers will always implement ShipperListener.
721      // TODO : Change all writers to be specifically created for compaction context
722      writer.beforeShipped();
723      if (generalBloomFilterWriter != null) {
724        generalBloomFilterWriter.beforeShipped();
725      }
726      if (deleteFamilyBloomFilterWriter != null) {
727        deleteFamilyBloomFilterWriter.beforeShipped();
728      }
729    }
730
731    private Path getPath() {
732      return this.writer.getPath();
733    }
734
735    private boolean hasGeneralBloom() {
736      return this.generalBloomFilterWriter != null;
737    }
738
739    /**
740     * For unit testing only.
741     * @return the Bloom filter used by this writer.
742     */
743    BloomFilterWriter getGeneralBloomWriter() {
744      return generalBloomFilterWriter;
745    }
746
747    private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
748      boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
749      if (haveBloom) {
750        bfw.compactBloom();
751      }
752      return haveBloom;
753    }
754
755    private boolean closeGeneralBloomFilter() throws IOException {
756      boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);
757
758      // add the general Bloom filter writer and append file info
759      if (hasGeneralBloom) {
760        writer.addGeneralBloomFilter(generalBloomFilterWriter);
761        writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
762        if (bloomParam != null) {
763          writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam);
764        }
765        bloomContext.addLastBloomKey(writer);
766      }
767      return hasGeneralBloom;
768    }
769
770    private boolean closeDeleteFamilyBloomFilter() throws IOException {
771      boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);
772
773      // add the delete family Bloom filter writer
774      if (hasDeleteFamilyBloom) {
775        writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
776      }
777
778      // append file info about the number of delete family kvs
779      // even if there is no delete family Bloom.
780      writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt));
781
782      return hasDeleteFamilyBloom;
783    }
784
785    private void close() throws IOException {
786      boolean hasGeneralBloom = this.closeGeneralBloomFilter();
787      boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();
788
789      writer.close();
790
791      // Log final Bloom filter statistics. This needs to be done after close()
792      // because compound Bloom filters might be finalized as part of closing.
793      if (LOG.isTraceEnabled()) {
794        LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and "
795          + (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " was added to HFile "
796          + getPath());
797      }
799    }
800
801    private void appendFileInfo(byte[] key, byte[] value) throws IOException {
802      writer.appendFileInfo(key, value);
803    }
804
805    /**
806     * For use in testing.
807     */
808    private HFile.Writer getHFileWriter() {
809      return writer;
810    }
811  }
812
813  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
814      justification = "Will not overflow")
815  public static class Builder {
816    private final Configuration conf;
817    private final CacheConfig cacheConf;
818    private final FileSystem fs;
819
820    private BloomType bloomType = BloomType.NONE;
821    private long maxKeyCount = 0;
822    private Path dir;
823    private Path liveFilePath;
824    private Path historicalFilePath;
825
826    private InetSocketAddress[] favoredNodes;
827    private HFileContext fileContext;
828    private boolean shouldDropCacheBehind;
829    private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
830    private String fileStoragePolicy;
    // This is used to track the creation of the StoreFileWriter, mainly for the SFT
    // implementation, where we write store files directly to the final place instead of writing
    // a tmp file first. Under this scenario, a background task purges the store files which are
    // not recorded in the SFT, but newly created store files are not yet tracked in the SFT, so
    // we need to record them here and treat them specially.
836    private Consumer<Path> writerCreationTracker;
837    private int maxVersions;
838    private boolean newVersionBehavior;
839    private CellComparator comparator;
840    private boolean isCompaction;
841
842    public Builder(Configuration conf, CacheConfig cacheConf, FileSystem fs) {
843      this.conf = conf;
844      this.cacheConf = cacheConf;
845      this.fs = fs;
846    }
847
848    /**
849     * Creates Builder with cache configuration disabled
850     */
851    public Builder(Configuration conf, FileSystem fs) {
852      this.conf = conf;
853      this.cacheConf = CacheConfig.DISABLED;
854      this.fs = fs;
855    }
856
857    /**
858     * Use either this method or {@link #withFilePath}, but not both.
     * @param dir Path to column family directory. The directory is created if it does not exist.
     *            The file is given a unique name within this directory.
861     * @return this (for chained invocation)
862     */
863    public Builder withOutputDir(Path dir) {
864      Preconditions.checkNotNull(dir);
865      this.dir = dir;
866      return this;
867    }
868
869    /**
870     * Use either this method or {@link #withOutputDir}, but not both.
871     * @param filePath the StoreFile path to write
872     * @return this (for chained invocation)
873     */
874    public Builder withFilePath(Path filePath) {
875      Preconditions.checkNotNull(filePath);
876      this.liveFilePath = filePath;
877      return this;
878    }
879
880    /**
881     * @param favoredNodes an array of favored nodes or possibly null
882     * @return this (for chained invocation)
883     */
884    public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) {
885      this.favoredNodes = favoredNodes;
886      return this;
887    }
888
889    public Builder withBloomType(BloomType bloomType) {
890      Preconditions.checkNotNull(bloomType);
891      this.bloomType = bloomType;
892      return this;
893    }
894
895    /**
896     * @param maxKeyCount estimated maximum number of keys we expect to add
897     * @return this (for chained invocation)
898     */
899    public Builder withMaxKeyCount(long maxKeyCount) {
900      this.maxKeyCount = maxKeyCount;
901      return this;
902    }
903
904    public Builder withFileContext(HFileContext fileContext) {
905      this.fileContext = fileContext;
906      return this;
907    }
908
909    public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
910      this.shouldDropCacheBehind = shouldDropCacheBehind;
911      return this;
912    }
913
914    public Builder
915      withCompactedFilesSupplier(Supplier<Collection<HStoreFile>> compactedFilesSupplier) {
916      this.compactedFilesSupplier = compactedFilesSupplier;
917      return this;
918    }
919
920    public Builder withFileStoragePolicy(String fileStoragePolicy) {
921      this.fileStoragePolicy = fileStoragePolicy;
922      return this;
923    }
924
925    public Builder withWriterCreationTracker(Consumer<Path> writerCreationTracker) {
926      this.writerCreationTracker = writerCreationTracker;
927      return this;
928    }
929
930    public Builder withMaxVersions(int maxVersions) {
931      this.maxVersions = maxVersions;
932      return this;
933    }
934
935    public Builder withNewVersionBehavior(boolean newVersionBehavior) {
936      this.newVersionBehavior = newVersionBehavior;
937      return this;
938    }
939
940    public Builder withCellComparator(CellComparator comparator) {
941      this.comparator = comparator;
942      return this;
943    }
944
945    public Builder withIsCompaction(boolean isCompaction) {
946      this.isCompaction = isCompaction;
947      return this;
948    }
949
    /**
     * Create a store file writer. The client is responsible for closing the file when done. If
     * metadata is needed, add it BEFORE closing using {@link StoreFileWriter#appendMetadata}.
     */
954    public StoreFileWriter build() throws IOException {
955      if ((dir == null ? 0 : 1) + (liveFilePath == null ? 0 : 1) != 1) {
        throw new IllegalArgumentException("Either specify parent directory or file path");
957      }
958
959      if (dir == null) {
960        dir = liveFilePath.getParent();
961      }
962
963      if (!fs.exists(dir)) {
964        // Handle permission for non-HDFS filesystem properly
965        // See HBASE-17710
966        HRegionFileSystem.mkdirs(fs, conf, dir);
967      }
968
969      // set block storage policy for temp path
970      String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY);
971      if (null == policyName) {
972        policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY);
973      }
974      CommonFSUtils.setStoragePolicy(this.fs, dir, policyName);
975
976      if (liveFilePath == null) {
        // The stored file and related blocks will use the directory-based StoragePolicy, because
        // HDFS DistributedFileSystem does not support creating files with a storage policy before
        // version 3.3.0 (see HDFS-13209). Using a child dir here makes the stored files satisfy
        // the specific storage policy when writing, so as to avoid later data movement. We don't
        // want to change the whole temp dir to 'fileStoragePolicy'.
982        if (!Strings.isNullOrEmpty(fileStoragePolicy)) {
983          dir = new Path(dir, HConstants.STORAGE_POLICY_PREFIX + fileStoragePolicy);
984          if (!fs.exists(dir)) {
985            HRegionFileSystem.mkdirs(fs, conf, dir);
986            LOG.info(
987              "Create tmp dir " + dir.toString() + " with storage policy: " + fileStoragePolicy);
988          }
989          CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy);
990        }
991        liveFilePath = getUniqueFile(fs, dir);
992        if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
993          bloomType = BloomType.NONE;
994        }
995      }
996
997      if (isCompaction && shouldEnableHistoricalCompactionFiles(conf)) {
998        historicalFilePath = getUniqueFile(fs, dir);
999      }
1000
      // Make sure we call this before actually creating the writer. In fact, it is not a big
      // deal to add a nonexistent file to the tracker, as we will never try to delete it and we
      // will clean the tracker up after compaction. But if the file cleaner finds the file before
      // we have recorded it, it may accidentally delete the file and cause problems.
1006      if (writerCreationTracker != null) {
1007        writerCreationTracker.accept(liveFilePath);
1008        if (historicalFilePath != null) {
1009          writerCreationTracker.accept(historicalFilePath);
1010        }
1011      }
1012      return new StoreFileWriter(fs, liveFilePath, historicalFilePath, conf, cacheConf, bloomType,
1013        maxKeyCount, favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier,
1014        comparator, maxVersions, newVersionBehavior);
1015    }
1016  }
1017}