/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.HISTORICAL_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_FILE_REFS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.StoreEngine.STORE_ENGINE_CLASS_KEY;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.util.BloomContext;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RowBloomContext;
import org.apache.hadoop.hbase.util.RowColBloomContext;
import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
 * A StoreFile writer. Use this to write HBase Store Files. It is package local because it is an
 * implementation detail of the HBase regionserver.
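 * <p>
 * A minimal usage sketch (illustrative only; {@code conf}, {@code cacheConf}, {@code fs},
 * {@code familyDir}, {@code hFileContext} and the cells are assumed to come from the caller):
 *
 * <pre>{@code
 * StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
 *   .withOutputDir(familyDir) // or withFilePath(path), but not both
 *   .withBloomType(BloomType.ROW)
 *   .withMaxKeyCount(estimatedKeyCount)
 *   .withFileContext(hFileContext)
 *   .build();
 * try {
 *   writer.append(cell); // cells must arrive in comparator order
 *   writer.appendMetadata(maxSequenceId, false); // metadata must be added before close()
 * } finally {
 *   writer.close();
 * }
 * }</pre>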
 */
@InterfaceAudience.Private
public class StoreFileWriter implements CellSink, ShipperListener {
  private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName());
  public static final String ENABLE_HISTORICAL_COMPACTION_FILES =
    "hbase.enable.historical.compaction.files";
  public static final boolean DEFAULT_ENABLE_HISTORICAL_COMPACTION_FILES = false;
  private static final Pattern dash = Pattern.compile("-");
  private SingleStoreFileWriter liveFileWriter;
  private SingleStoreFileWriter historicalFileWriter;
  private final FileSystem fs;
  private final Path historicalFilePath;
  private final Configuration conf;
  private final CacheConfig cacheConf;
  private final BloomType bloomType;
  private final long maxKeys;
  private final InetSocketAddress[] favoredNodes;
  private final HFileContext fileContext;
  private final boolean shouldDropCacheBehind;
  private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;
  private final CellComparator comparator;
  private ExtendedCell lastCell;
  // The first (latest) delete family marker of the current row
  private ExtendedCell deleteFamily;
  // The list of delete family version markers of the current row
  private List<ExtendedCell> deleteFamilyVersionList = new ArrayList<>();
  // The first (latest) delete column marker of the current column
  private ExtendedCell deleteColumn;
  // The list of delete column version markers of the current column
  private List<ExtendedCell> deleteColumnVersionList = new ArrayList<>();
  // The live put cell count for the current column
  private int livePutCellCount;
  private final int maxVersions;
  private final boolean newVersionBehavior;
  /**
   * Creates an HFile.Writer that also writes helpful metadata.
   * @param fs                     file system to write to
   * @param liveFilePath           the name of the live file to create
   * @param historicalFilePath     the name of the historical file to create
   * @param conf                   user configuration
   * @param bloomType              bloom filter setting
   * @param maxKeys                the expected maximum number of keys to be added. Was used for
   *                               Bloom filter size in {@link HFile} format version 1.
   * @param favoredNodes           an array of favored nodes or possibly null
   * @param fileContext            The HFile context
   * @param shouldDropCacheBehind  Drop pages written to page cache after writing the store file.
   * @param compactedFilesSupplier Returns the {@link HStore} compacted files which are not yet
   *                               archived
   * @param comparator             Cell comparator
   * @param maxVersions            max cell versions
   * @param newVersionBehavior     enable new version behavior
   * @throws IOException problem writing to FS
   */
  private StoreFileWriter(FileSystem fs, Path liveFilePath, Path historicalFilePath,
    final Configuration conf, CacheConfig cacheConf, BloomType bloomType, long maxKeys,
    InetSocketAddress[] favoredNodes, HFileContext fileContext, boolean shouldDropCacheBehind,
    Supplier<Collection<HStoreFile>> compactedFilesSupplier, CellComparator comparator,
    int maxVersions, boolean newVersionBehavior) throws IOException {
    this.fs = fs;
    this.historicalFilePath = historicalFilePath;
    this.conf = conf;
    this.cacheConf = cacheConf;
    this.bloomType = bloomType;
    this.maxKeys = maxKeys;
    this.favoredNodes = favoredNodes;
    this.fileContext = fileContext;
    this.shouldDropCacheBehind = shouldDropCacheBehind;
    this.compactedFilesSupplier = compactedFilesSupplier;
    this.comparator = comparator;
    this.maxVersions = maxVersions;
    this.newVersionBehavior = newVersionBehavior;
    liveFileWriter = new SingleStoreFileWriter(fs, liveFilePath, conf, cacheConf, bloomType,
      maxKeys, favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
  }

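  /**
   * Returns whether historical compaction file generation should be enabled for this store.
   * Dual-file output is honored only with the default store engine and the default compactor;
   * otherwise a warning is logged and the feature is treated as disabled.
   * <p>
   * A hedged configuration sketch (illustrative only; beyond the keys checked below, nothing else
   * is required):
   *
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.setBoolean(StoreFileWriter.ENABLE_HISTORICAL_COMPACTION_FILES, true);
   * // true only when DefaultStoreEngine and DefaultCompactor are (implicitly or explicitly) used
   * boolean dualFiles = StoreFileWriter.shouldEnableHistoricalCompactionFiles(conf);
   * }</pre>
   */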
  public static boolean shouldEnableHistoricalCompactionFiles(Configuration conf) {
    if (
      conf.getBoolean(ENABLE_HISTORICAL_COMPACTION_FILES,
        DEFAULT_ENABLE_HISTORICAL_COMPACTION_FILES)
    ) {
      // Historical compaction files are supported only for default store engine with
      // default compactor.
      String storeEngine = conf.get(STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
      if (!storeEngine.equals(DefaultStoreEngine.class.getName())) {
        LOG.warn("Historical compaction file generation is ignored for " + storeEngine
          + ". hbase.enable.historical.compaction.files can be set to true only for the "
          + "default compaction (DefaultStoreEngine and DefaultCompactor)");
        return false;
      }
      String compactor = conf.get(DEFAULT_COMPACTOR_CLASS_KEY, DefaultCompactor.class.getName());
      if (!compactor.equals(DefaultCompactor.class.getName())) {
        LOG.warn("Historical compaction file generation is ignored for " + compactor
          + ". hbase.enable.historical.compaction.files can be set to true only for the "
          + "default compaction (DefaultStoreEngine and DefaultCompactor)");
        return false;
      }
      return true;
    }
    return false;
  }

  public long getPos() throws IOException {
    return liveFileWriter.getPos();
  }

  /**
   * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
    throws IOException {
    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction);
    }
  }

  /**
   * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @param storeFiles      The compacted store files to generate this new file
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
    final Collection<HStoreFile> storeFiles) throws IOException {
    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction, storeFiles);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction, storeFiles);
    }
  }

  /**
   * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @param mobCellsCount   The number of mob cells.
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
    final long mobCellsCount) throws IOException {
    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction, mobCellsCount);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction, mobCellsCount);
    }
  }

  /**
   * Appends MOB-specific metadata (even if it is empty)
   * @param mobRefSet original table -> set of MOB file names
   * @throws IOException problem writing to FS
   */
  public void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
    liveFileWriter.appendMobMetadata(mobRefSet);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendMobMetadata(mobRefSet);
    }
  }

  /**
   * Add TimestampRange and earliest put timestamp to Metadata
   */
  public void appendTrackedTimestampsToMetadata() throws IOException {
    // TODO: The StoreFileReader always converts the byte[] to TimeRange
    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
    liveFileWriter.appendTrackedTimestampsToMetadata();
    if (historicalFileWriter != null) {
      historicalFileWriter.appendTrackedTimestampsToMetadata();
    }
  }

  @Override
  public void beforeShipped() throws IOException {
    liveFileWriter.beforeShipped();
    if (historicalFileWriter != null) {
      historicalFileWriter.beforeShipped();
    }
  }

  public Path getPath() {
    return liveFileWriter.getPath();
  }

  public List<Path> getPaths() {
    if (historicalFileWriter == null) {
      return Lists.newArrayList(liveFileWriter.getPath());
    }
    return Lists.newArrayList(liveFileWriter.getPath(), historicalFileWriter.getPath());
  }

  public boolean hasGeneralBloom() {
    return liveFileWriter.hasGeneralBloom();
  }

  /**
   * For unit testing only.
   * @return the Bloom filter used by this writer.
   */
  BloomFilterWriter getGeneralBloomWriter() {
    return liveFileWriter.generalBloomFilterWriter;
  }

  public void close() throws IOException {
    liveFileWriter.appendFileInfo(HISTORICAL_KEY, Bytes.toBytes(false));
    liveFileWriter.close();
    if (historicalFileWriter != null) {
      historicalFileWriter.appendFileInfo(HISTORICAL_KEY, Bytes.toBytes(true));
      historicalFileWriter.close();
    }
  }

  public void appendFileInfo(byte[] key, byte[] value) throws IOException {
    liveFileWriter.appendFileInfo(key, value);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendFileInfo(key, value);
    }
  }

  /**
   * For use in testing.
   */
  HFile.Writer getLiveFileWriter() {
    return liveFileWriter.getHFileWriter();
  }

  /**
   * @param dir Directory to create file in.
   * @return random filename inside passed <code>dir</code>
   */
  public static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
    if (!fs.getFileStatus(dir).isDirectory()) {
      throw new IOException("Expecting " + dir.toString() + " to be a directory");
    }
    return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll(""));
  }

  private SingleStoreFileWriter getHistoricalFileWriter() throws IOException {
    if (historicalFileWriter == null) {
      historicalFileWriter =
        new SingleStoreFileWriter(fs, historicalFilePath, conf, cacheConf, bloomType, maxKeys,
          favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
    }
    return historicalFileWriter;
  }

  private void initRowState() {
    deleteFamily = null;
    deleteFamilyVersionList.clear();
    lastCell = null;
  }

  private void initColumnState() {
    livePutCellCount = 0;
    deleteColumn = null;
    deleteColumnVersionList.clear();
  }

  private boolean isDeletedByDeleteFamily(ExtendedCell cell) {
    return deleteFamily != null && (deleteFamily.getTimestamp() > cell.getTimestamp()
      || (deleteFamily.getTimestamp() == cell.getTimestamp()
        && (!newVersionBehavior || cell.getSequenceId() < deleteFamily.getSequenceId())));
  }

  private boolean isDeletedByDeleteFamilyVersion(ExtendedCell cell) {
    for (ExtendedCell deleteFamilyVersion : deleteFamilyVersionList) {
      if (
        deleteFamilyVersion.getTimestamp() == cell.getTimestamp()
          && (!newVersionBehavior || cell.getSequenceId() < deleteFamilyVersion.getSequenceId())
      ) {
        return true;
      }
    }
    return false;
  }

  private boolean isDeletedByDeleteColumn(ExtendedCell cell) {
    return deleteColumn != null && (deleteColumn.getTimestamp() > cell.getTimestamp()
      || (deleteColumn.getTimestamp() == cell.getTimestamp()
        && (!newVersionBehavior || cell.getSequenceId() < deleteColumn.getSequenceId())));
  }

  private boolean isDeletedByDeleteColumnVersion(ExtendedCell cell) {
    for (ExtendedCell deleteColumnVersion : deleteColumnVersionList) {
      if (
        deleteColumnVersion.getTimestamp() == cell.getTimestamp()
          && (!newVersionBehavior || cell.getSequenceId() < deleteColumnVersion.getSequenceId())
      ) {
        return true;
      }
    }
    return false;
  }

  private boolean isDeleted(ExtendedCell cell) {
    return isDeletedByDeleteFamily(cell) || isDeletedByDeleteColumn(cell)
      || isDeletedByDeleteFamilyVersion(cell) || isDeletedByDeleteColumnVersion(cell);
  }

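  /**
   * Routes a single cell to the live or historical file writer when dual writing is enabled.
   * Roughly: the first (latest) delete markers and up to {@code maxVersions} live (not deleted)
   * put cells per column go to the live file, while redundant delete markers, deleted put cells
   * and older put versions go to the historical file. This summary is descriptive only; the
   * branches below are authoritative.
   */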
  private void appendCell(ExtendedCell cell) throws IOException {
    if ((lastCell == null || !CellUtil.matchingColumn(lastCell, cell))) {
      initColumnState();
    }
    if (cell.getType() == Cell.Type.DeleteFamily) {
      if (deleteFamily == null) {
        deleteFamily = cell;
        liveFileWriter.append(cell);
      } else {
        getHistoricalFileWriter().append(cell);
      }
    } else if (cell.getType() == Cell.Type.DeleteFamilyVersion) {
      if (!isDeletedByDeleteFamily(cell)) {
        deleteFamilyVersionList.add(cell);
        if (deleteFamily != null && deleteFamily.getTimestamp() == cell.getTimestamp()) {
          // This means both the delete-family and delete-family-version markers have the same
          // timestamp but the sequence id of delete-family-version marker is higher than that of
          // the delete-family marker. In this case, there is no need to add the
          // delete-family-version marker to the live version file. This case happens only with
          // the new version behavior.
          getHistoricalFileWriter().append(cell);
        } else {
          liveFileWriter.append(cell);
        }
      } else {
        getHistoricalFileWriter().append(cell);
      }
    } else if (cell.getType() == Cell.Type.DeleteColumn) {
      if (!isDeletedByDeleteFamily(cell) && deleteColumn == null) {
        deleteColumn = cell;
        liveFileWriter.append(cell);
      } else {
        getHistoricalFileWriter().append(cell);
      }
    } else if (cell.getType() == Cell.Type.Delete) {
      if (!isDeletedByDeleteFamily(cell) && deleteColumn == null) {
        deleteColumnVersionList.add(cell);
        if (deleteFamily != null && deleteFamily.getTimestamp() == cell.getTimestamp()) {
          // This means both the delete-family and delete-column-version markers have the same
          // timestamp but the sequence id of delete-column-version marker is higher than that of
          // the delete-family marker. In this case, there is no need to add the
          // delete-column-version marker to the live version file. This case happens only with
          // the new version behavior.
          getHistoricalFileWriter().append(cell);
        } else {
          liveFileWriter.append(cell);
        }
      } else {
        getHistoricalFileWriter().append(cell);
      }
    } else if (cell.getType() == Cell.Type.Put) {
      if (livePutCellCount < maxVersions) {
        // This is a live put cell (i.e., the latest version) of a column. Is it deleted?
        if (!isDeleted(cell)) {
          liveFileWriter.append(cell);
          livePutCellCount++;
        } else {
          // It is deleted
          getHistoricalFileWriter().append(cell);
          if (newVersionBehavior) {
            // Deleted versions count toward the total version count when newVersionBehavior
            // is enabled
            livePutCellCount++;
          }
        }
      } else {
        // It is an older put cell
        getHistoricalFileWriter().append(cell);
      }
    }
    lastCell = cell;
  }

  @Override
  public void appendAll(List<ExtendedCell> cellList) throws IOException {
    if (historicalFilePath == null) {
      // Dual writing is not enabled and all cells are written to one file. We use
      // the live version file in this case
      for (ExtendedCell cell : cellList) {
        liveFileWriter.append(cell);
      }
      return;
    }
    if (cellList.isEmpty()) {
      return;
    }
    if (lastCell != null && comparator.compareRows(lastCell, cellList.get(0)) != 0) {
      // It is a new row and thus time to reset the state
      initRowState();
    }
    for (ExtendedCell cell : cellList) {
      appendCell(cell);
    }
  }

  @Override
  public void append(ExtendedCell cell) throws IOException {
    if (historicalFilePath == null) {
      // Dual writing is not enabled and all cells are written to one file. We use
      // the live version file in this case
      liveFileWriter.append(cell);
      return;
    }
    appendCell(cell);
  }

  private static class SingleStoreFileWriter {
    private final BloomFilterWriter generalBloomFilterWriter;
    private final BloomFilterWriter deleteFamilyBloomFilterWriter;
    private final BloomType bloomType;
    private byte[] bloomParam = null;
    private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    private long deleteFamilyCnt = 0;
    private BloomContext bloomContext = null;
    private BloomContext deleteFamilyBloomContext = null;
    private final TimeRangeTracker timeRangeTracker;
    private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;

    private HFile.Writer writer;

    /**
     * Creates an HFile.Writer that also writes helpful metadata.
     * @param fs                     file system to write to
     * @param path                   file name to create
     * @param conf                   user configuration
     * @param bloomType              bloom filter setting
     * @param maxKeys                the expected maximum number of keys to be added. Was used for
     *                               Bloom filter size in {@link HFile} format version 1.
     * @param favoredNodes           an array of favored nodes or possibly null
     * @param fileContext            The HFile context
     * @param shouldDropCacheBehind  Drop pages written to page cache after writing the store file.
     * @param compactedFilesSupplier Returns the {@link HStore} compacted files which are not yet
     *                               archived
     * @throws IOException problem writing to FS
     */
    private SingleStoreFileWriter(FileSystem fs, Path path, final Configuration conf,
      CacheConfig cacheConf, BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes,
      HFileContext fileContext, boolean shouldDropCacheBehind,
      Supplier<Collection<HStoreFile>> compactedFilesSupplier) throws IOException {
      this.compactedFilesSupplier = compactedFilesSupplier;
      this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
      // TODO : Change all writers to be specifically created for compaction context
      writer =
        HFile.getWriterFactory(conf, cacheConf).withPath(fs, path).withFavoredNodes(favoredNodes)
          .withFileContext(fileContext).withShouldDropCacheBehind(shouldDropCacheBehind).create();

      generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf,
        bloomType, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);

      if (generalBloomFilterWriter != null) {
        this.bloomType = bloomType;
        this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: "
            + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH
              ? Bytes.toInt(bloomParam)
              : Bytes.toStringBinary(bloomParam))
            + ", " + generalBloomFilterWriter.getClass().getSimpleName());
        }
        // init bloom context
        switch (bloomType) {
          case ROW:
            bloomContext =
              new RowBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
            break;
          case ROWCOL:
            bloomContext =
              new RowColBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
            break;
          case ROWPREFIX_FIXED_LENGTH:
            bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter,
              fileContext.getCellComparator(), Bytes.toInt(bloomParam));
            break;
          default:
            throw new IOException(
              "Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL or ROWPREFIX expected)");
        }
      } else {
        // Not using Bloom filters.
        this.bloomType = BloomType.NONE;
      }

      // initialize delete family Bloom filter when there is NO RowCol Bloom filter
      if (this.bloomType != BloomType.ROWCOL) {
        this.deleteFamilyBloomFilterWriter = BloomFilterFactory.createDeleteBloomAtWrite(conf,
          cacheConf, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
        deleteFamilyBloomContext =
          new RowBloomContext(deleteFamilyBloomFilterWriter, fileContext.getCellComparator());
      } else {
        deleteFamilyBloomFilterWriter = null;
      }
      if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) {
        LOG.trace("Delete Family Bloom filter type for " + path + ": "
          + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
      }
    }

    private long getPos() throws IOException {
      return ((HFileWriterImpl) writer).getPos();
    }

    /**
     * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
     * @param maxSequenceId   Maximum sequence id.
     * @param majorCompaction True if this file is product of a major compaction
     * @throws IOException problem writing to FS
     */
    private void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
      throws IOException {
      appendMetadata(maxSequenceId, majorCompaction, Collections.emptySet());
    }

    /**
     * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
     * @param maxSequenceId   Maximum sequence id.
     * @param majorCompaction True if this file is product of a major compaction
     * @param storeFiles      The compacted store files to generate this new file
     * @throws IOException problem writing to FS
     */
    private void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
      final Collection<HStoreFile> storeFiles) throws IOException {
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
      writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
      writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles));
      appendTrackedTimestampsToMetadata();
    }

    /**
     * Used when writing {@link HStoreFile#COMPACTION_EVENT_KEY} to the new file's file info. The
     * names of the compacted store files are needed. If a compacted store file is itself the
     * result of a compaction, its own compacted files which are not yet archived are needed too,
     * but there is no need to add compacted files recursively. For example, if files A, B and C
     * are compacted into a new file D, and D is then compacted into a new file E, then A, B, C
     * and D are written to file E's compacted files. If E is later compacted into a new file F,
     * E is added to F's compacted files first, followed by E's compacted files A, B, C and D.
     * There is no need to add D's compacted files, as they are already contained in E's compacted
     * files. See HBASE-20724 for more details.
     * @param storeFiles The compacted store files to generate this new file
     * @return bytes of CompactionEventTracker
     */
    private byte[] toCompactionEventTrackerBytes(Collection<HStoreFile> storeFiles) {
      Set<String> notArchivedCompactedStoreFiles = this.compactedFilesSupplier.get().stream()
        .map(sf -> sf.getPath().getName()).collect(Collectors.toSet());
      Set<String> compactedStoreFiles = new HashSet<>();
      for (HStoreFile storeFile : storeFiles) {
        compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName());
        for (String csf : storeFile.getCompactedStoreFiles()) {
          if (notArchivedCompactedStoreFiles.contains(csf)) {
            compactedStoreFiles.add(csf);
          }
        }
      }
      return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles);
    }

    /**
     * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
     * @param maxSequenceId   Maximum sequence id.
     * @param majorCompaction True if this file is product of a major compaction
     * @param mobCellsCount   The number of mob cells.
     * @throws IOException problem writing to FS
     */
    private void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
      final long mobCellsCount) throws IOException {
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
      writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
      writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
      appendTrackedTimestampsToMetadata();
    }

    /**
     * Appends MOB-specific metadata (even if it is empty)
     * @param mobRefSet original table -> set of MOB file names
     * @throws IOException problem writing to FS
     */
    private void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
      writer.appendFileInfo(MOB_FILE_REFS, MobUtils.serializeMobFileRefs(mobRefSet));
    }

    /**
     * Add TimestampRange and earliest put timestamp to Metadata
     */
    private void appendTrackedTimestampsToMetadata() throws IOException {
      // TODO: The StoreFileReader always converts the byte[] to TimeRange
      // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
      appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker));
      appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
    }

    /**
     * Record the earliest Put timestamp and update the TimeRangeTracker to include the timestamp
     * of this cell.
     */
    private void trackTimestamps(final ExtendedCell cell) {
      if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
        earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
      }
      timeRangeTracker.includeTimestamp(cell);
    }

    private void appendGeneralBloomfilter(final ExtendedCell cell) throws IOException {
      if (this.generalBloomFilterWriter != null) {
        /*
         * See http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
         * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp
         * Three types of filtering:
         * 1. Row = Row
         * 2. RowCol = Row + Qualifier
         * 3. RowPrefixFixedLength = fixed-length row prefix
         */
        bloomContext.writeBloom(cell);
      }
    }

    private void appendDeleteFamilyBloomFilter(final ExtendedCell cell) throws IOException {
      if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) {
        return;
      }

      // increment the count of delete family cells in the store file
      deleteFamilyCnt++;
      if (this.deleteFamilyBloomFilterWriter != null) {
        deleteFamilyBloomContext.writeBloom(cell);
      }
    }

    private void append(final ExtendedCell cell) throws IOException {
      appendGeneralBloomfilter(cell);
      appendDeleteFamilyBloomFilter(cell);
      writer.append(cell);
      trackTimestamps(cell);
    }

    private void beforeShipped() throws IOException {
      // For now these writers will always implement ShipperListener.
      // TODO : Change all writers to be specifically created for compaction context
      writer.beforeShipped();
      if (generalBloomFilterWriter != null) {
        generalBloomFilterWriter.beforeShipped();
      }
      if (deleteFamilyBloomFilterWriter != null) {
        deleteFamilyBloomFilterWriter.beforeShipped();
      }
    }

    private Path getPath() {
      return this.writer.getPath();
    }

    private boolean hasGeneralBloom() {
      return this.generalBloomFilterWriter != null;
    }

    /**
     * For unit testing only.
     * @return the Bloom filter used by this writer.
     */
    BloomFilterWriter getGeneralBloomWriter() {
      return generalBloomFilterWriter;
    }

    private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
      boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
      if (haveBloom) {
        bfw.compactBloom();
      }
      return haveBloom;
    }

    private boolean closeGeneralBloomFilter() throws IOException {
      boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);

      // add the general Bloom filter writer and append file info
      if (hasGeneralBloom) {
        writer.addGeneralBloomFilter(generalBloomFilterWriter);
        writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
        if (bloomParam != null) {
          writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam);
        }
        bloomContext.addLastBloomKey(writer);
      }
      return hasGeneralBloom;
    }

    private boolean closeDeleteFamilyBloomFilter() throws IOException {
      boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);

      // add the delete family Bloom filter writer
      if (hasDeleteFamilyBloom) {
        writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
      }

      // append file info about the number of delete family kvs
      // even if there is no delete family Bloom.
      writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt));

      return hasDeleteFamilyBloom;
    }

    private void close() throws IOException {
      boolean hasGeneralBloom = this.closeGeneralBloomFilter();
      boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();

      writer.close();

      // Log final Bloom filter statistics. This needs to be done after close()
      // because compound Bloom filters might be finalized as part of closing.
      if (LOG.isTraceEnabled()) {
        LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and "
          + (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " was added to HFile "
          + getPath());
      }
    }

    private void appendFileInfo(byte[] key, byte[] value) throws IOException {
      writer.appendFileInfo(key, value);
    }

    /**
     * For use in testing.
     */
    private HFile.Writer getHFileWriter() {
      return writer;
    }
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
      justification = "Will not overflow")
  public static class Builder {
    private final Configuration conf;
    private final CacheConfig cacheConf;
    private final FileSystem fs;

    private BloomType bloomType = BloomType.NONE;
    private long maxKeyCount = 0;
    private Path dir;
    private Path liveFilePath;
    private Path historicalFilePath;

    private InetSocketAddress[] favoredNodes;
    private HFileContext fileContext;
    private boolean shouldDropCacheBehind;
    private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
    private String fileStoragePolicy;
    // This is used to track the creation of the StoreFileWriter, mainly for the SFT
    // implementation, where we write store files directly to the final place instead of writing
    // a tmp file first. Under this scenario, a background task purges the store files which are
    // not recorded in the SFT, but newly created store files are not yet tracked in the SFT, so
    // we need to record them here and treat them specially.
    private Consumer<Path> writerCreationTracker;
    private int maxVersions;
    private boolean newVersionBehavior;
    private CellComparator comparator;
    private boolean isCompaction;

    public Builder(Configuration conf, CacheConfig cacheConf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = cacheConf;
      this.fs = fs;
    }

    /**
     * Creates Builder with cache configuration disabled
     */
    public Builder(Configuration conf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = CacheConfig.DISABLED;
      this.fs = fs;
    }

    /**
     * Use either this method or {@link #withFilePath}, but not both.
     * @param dir Path to column family directory. The directory is created if it does not exist.
     *            The file is given a unique name within this directory.
     * @return this (for chained invocation)
     */
    public Builder withOutputDir(Path dir) {
      Preconditions.checkNotNull(dir);
      this.dir = dir;
      return this;
    }

    /**
     * Use either this method or {@link #withOutputDir}, but not both.
     * @param filePath the StoreFile path to write
     * @return this (for chained invocation)
     */
    public Builder withFilePath(Path filePath) {
      Preconditions.checkNotNull(filePath);
      this.liveFilePath = filePath;
      return this;
    }

    /**
     * @param favoredNodes an array of favored nodes or possibly null
     * @return this (for chained invocation)
     */
    public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) {
      this.favoredNodes = favoredNodes;
      return this;
    }

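    /**
     * Sets the Bloom filter type for the new store file. For
     * {@code BloomType.ROWPREFIX_FIXED_LENGTH}, the fixed prefix length must also be present in
     * the configuration. A hedged sketch (the value is illustrative only):
     *
     * <pre>{@code
     * conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, "8"); // fixed row-prefix length in bytes
     * builder.withBloomType(BloomType.ROWPREFIX_FIXED_LENGTH);
     * }</pre>
     */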
    public Builder withBloomType(BloomType bloomType) {
      Preconditions.checkNotNull(bloomType);
      this.bloomType = bloomType;
      return this;
    }

    /**
     * @param maxKeyCount estimated maximum number of keys we expect to add
     * @return this (for chained invocation)
     */
    public Builder withMaxKeyCount(long maxKeyCount) {
      this.maxKeyCount = maxKeyCount;
      return this;
    }

    public Builder withFileContext(HFileContext fileContext) {
      this.fileContext = fileContext;
      return this;
    }

    public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
      this.shouldDropCacheBehind = shouldDropCacheBehind;
      return this;
    }

    public Builder
      withCompactedFilesSupplier(Supplier<Collection<HStoreFile>> compactedFilesSupplier) {
      this.compactedFilesSupplier = compactedFilesSupplier;
      return this;
    }

    public Builder withFileStoragePolicy(String fileStoragePolicy) {
      this.fileStoragePolicy = fileStoragePolicy;
      return this;
    }

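    /**
     * Sets a callback invoked with the path of every store file this writer creates, so that
     * callers (for example, StoreFileTracker-based cleanup) can record in-progress files. A
     * minimal usage sketch, assuming a caller-side concurrent set named {@code inProgress}
     * (hypothetical):
     *
     * <pre>{@code
     * Set<Path> inProgress = ConcurrentHashMap.newKeySet();
     * builder.withWriterCreationTracker(inProgress::add);
     * }</pre>
     */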
    public Builder withWriterCreationTracker(Consumer<Path> writerCreationTracker) {
      this.writerCreationTracker = writerCreationTracker;
      return this;
    }

    public Builder withMaxVersions(int maxVersions) {
      this.maxVersions = maxVersions;
      return this;
    }

    public Builder withNewVersionBehavior(boolean newVersionBehavior) {
      this.newVersionBehavior = newVersionBehavior;
      return this;
    }

    public Builder withCellComparator(CellComparator comparator) {
      this.comparator = comparator;
      return this;
    }

    public Builder withIsCompaction(boolean isCompaction) {
      this.isCompaction = isCompaction;
      return this;
    }

    /**
     * Create a store file writer. The client is responsible for closing the file when done. If
     * metadata is to be added, add it BEFORE closing, using
     * {@link StoreFileWriter#appendMetadata}.
     */
    public StoreFileWriter build() throws IOException {
      if ((dir == null ? 0 : 1) + (liveFilePath == null ? 0 : 1) != 1) {
        throw new IllegalArgumentException(
          "Specify either a parent directory or a file path, but not both");
      }

      if (dir == null) {
        dir = liveFilePath.getParent();
      }

      if (!fs.exists(dir)) {
        // Handle permission for non-HDFS filesystem properly
        // See HBASE-17710
        HRegionFileSystem.mkdirs(fs, conf, dir);
      }

      // set block storage policy for temp path
      String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY);
      if (null == policyName) {
        policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY);
      }
      CommonFSUtils.setStoragePolicy(this.fs, dir, policyName);

      if (liveFilePath == null) {
        // The stored file and related blocks will use the directory-based storage policy, because
        // HDFS DistributedFileSystem did not support creating files with a storage policy before
        // version 3.3.0 (see HDFS-13209). Using a child dir here makes the stored files satisfy
        // the specific storage policy when writing, so as to avoid later data movement. We don't
        // want to change the whole temp dir to 'fileStoragePolicy'.
        if (!Strings.isNullOrEmpty(fileStoragePolicy)) {
          dir = new Path(dir, HConstants.STORAGE_POLICY_PREFIX + fileStoragePolicy);
          if (!fs.exists(dir)) {
            HRegionFileSystem.mkdirs(fs, conf, dir);
            LOG.info(
              "Created tmp dir " + dir.toString() + " with storage policy: " + fileStoragePolicy);
          }
          CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy);
        }
        liveFilePath = getUniqueFile(fs, dir);
        if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
          bloomType = BloomType.NONE;
        }
      }

      if (isCompaction && shouldEnableHistoricalCompactionFiles(conf)) {
        historicalFilePath = getUniqueFile(fs, dir);
      }

      // Make sure we call this before actually creating the writer. In fact, it is not a big deal
      // to add a nonexistent file to the tracker, as we will never try to delete it and we will
      // clean the tracker up after compaction. But if the file cleaner finds the file before we
      // have recorded it, it may accidentally delete the file and cause problems.
      if (writerCreationTracker != null) {
        writerCreationTracker.accept(liveFilePath);
        if (historicalFilePath != null) {
          writerCreationTracker.accept(historicalFilePath);
        }
      }
      return new StoreFileWriter(fs, liveFilePath, historicalFilePath, conf, cacheConf, bloomType,
        maxKeyCount, favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier,
        comparator, maxVersions, newVersionBehavior);
    }
  }
}