001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY;
021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY;
022import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
023import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY;
024import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT;
025import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
026import static org.apache.hadoop.hbase.regionserver.HStoreFile.HISTORICAL_KEY;
027import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
028import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
029import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
030import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_FILE_REFS;
031import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
032import static org.apache.hadoop.hbase.regionserver.StoreEngine.STORE_ENGINE_CLASS_KEY;
033
034import java.io.IOException;
035import java.net.InetSocketAddress;
036import java.util.ArrayList;
037import java.util.Collection;
038import java.util.Collections;
039import java.util.HashSet;
040import java.util.List;
041import java.util.Set;
042import java.util.UUID;
043import java.util.function.Consumer;
044import java.util.function.Supplier;
045import java.util.regex.Pattern;
046import java.util.stream.Collectors;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.FileSystem;
049import org.apache.hadoop.fs.Path;
050import org.apache.hadoop.hbase.Cell;
051import org.apache.hadoop.hbase.CellComparator;
052import org.apache.hadoop.hbase.CellUtil;
053import org.apache.hadoop.hbase.HConstants;
054import org.apache.hadoop.hbase.KeyValue;
055import org.apache.hadoop.hbase.PrivateCellUtil;
056import org.apache.hadoop.hbase.TableName;
057import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
058import org.apache.hadoop.hbase.io.hfile.CacheConfig;
059import org.apache.hadoop.hbase.io.hfile.HFile;
060import org.apache.hadoop.hbase.io.hfile.HFileContext;
061import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
062import org.apache.hadoop.hbase.mob.MobUtils;
063import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
064import org.apache.hadoop.hbase.util.BloomContext;
065import org.apache.hadoop.hbase.util.BloomFilterFactory;
066import org.apache.hadoop.hbase.util.BloomFilterUtil;
067import org.apache.hadoop.hbase.util.BloomFilterWriter;
068import org.apache.hadoop.hbase.util.Bytes;
069import org.apache.hadoop.hbase.util.CommonFSUtils;
070import org.apache.hadoop.hbase.util.RowBloomContext;
071import org.apache.hadoop.hbase.util.RowColBloomContext;
072import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext;
073import org.apache.yetus.audience.InterfaceAudience;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
077import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
078import org.apache.hbase.thirdparty.com.google.common.base.Strings;
079import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
080import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
081
082import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
083
084/**
 * A StoreFile writer. Use this to write HBase Store Files. It is an implementation detail of the
 * HBase regionserver and is not intended for use outside of it.
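 * <p>
 * Typical usage (a minimal sketch; the variable names and configuration shown here are
 * illustrative, not the only way to obtain them):
 * </p>
 *
 * <pre>{@code
 * StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
 *   .withOutputDir(familyTempDir)
 *   .withBloomType(BloomType.ROW)
 *   .withMaxKeyCount(estimatedKeyCount)
 *   .withFileContext(hFileContext)
 *   .build();
 * try {
 *   for (Cell cell : cells) {
 *     writer.append(cell);
 *   }
 *   writer.appendMetadata(maxSequenceId, false);
 * } finally {
 *   writer.close();
 * }
 * }</pre>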
087 */
088@InterfaceAudience.Private
089public class StoreFileWriter implements CellSink, ShipperListener {
090  private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName());
091  public static final String ENABLE_HISTORICAL_COMPACTION_FILES =
092    "hbase.enable.historical.compaction.files";
093  public static final boolean DEFAULT_ENABLE_HISTORICAL_COMPACTION_FILES = false;
094  private static final Pattern dash = Pattern.compile("-");
095  private SingleStoreFileWriter liveFileWriter;
096  private SingleStoreFileWriter historicalFileWriter;
097  private final FileSystem fs;
098  private final Path historicalFilePath;
099  private final Configuration conf;
100  private final CacheConfig cacheConf;
101  private final BloomType bloomType;
102  private final long maxKeys;
103  private final InetSocketAddress[] favoredNodes;
104  private final HFileContext fileContext;
105  private final boolean shouldDropCacheBehind;
106  private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;
107  private final CellComparator comparator;
108  private Cell lastCell;
109  // The first (latest) delete family marker of the current row
110  private Cell deleteFamily;
111  // The list of delete family version markers of the current row
112  private List<Cell> deleteFamilyVersionList = new ArrayList<>();
113  // The first (latest) delete column marker of the current column
114  private Cell deleteColumn;
115  // The list of delete column version markers of the current column
116  private List<Cell> deleteColumnVersionList = new ArrayList<>();
117  // The live put cell count for the current column
118  private int livePutCellCount;
119  private final int maxVersions;
120  private final boolean newVersionBehavior;
121
122  /**
   * Creates an HFile.Writer that also writes helpful metadata.
124   * @param fs                     file system to write to
125   * @param liveFilePath           the name of the live file to create
   * @param historicalFilePath     the name of the historical file to create
127   * @param conf                   user configuration
128   * @param bloomType              bloom filter setting
129   * @param maxKeys                the expected maximum number of keys to be added. Was used for
130   *                               Bloom filter size in {@link HFile} format version 1.
131   * @param favoredNodes           an array of favored nodes or possibly null
132   * @param fileContext            The HFile context
133   * @param shouldDropCacheBehind  Drop pages written to page cache after writing the store file.
   * @param compactedFilesSupplier Returns the {@link HStore} compacted files which have not been
   *                               archived yet
135   * @param comparator             Cell comparator
136   * @param maxVersions            max cell versions
137   * @param newVersionBehavior     enable new version behavior
138   * @throws IOException problem writing to FS
139   */
140  private StoreFileWriter(FileSystem fs, Path liveFilePath, Path historicalFilePath,
141    final Configuration conf, CacheConfig cacheConf, BloomType bloomType, long maxKeys,
142    InetSocketAddress[] favoredNodes, HFileContext fileContext, boolean shouldDropCacheBehind,
143    Supplier<Collection<HStoreFile>> compactedFilesSupplier, CellComparator comparator,
144    int maxVersions, boolean newVersionBehavior) throws IOException {
145    this.fs = fs;
146    this.historicalFilePath = historicalFilePath;
147    this.conf = conf;
148    this.cacheConf = cacheConf;
149    this.bloomType = bloomType;
150    this.maxKeys = maxKeys;
151    this.favoredNodes = favoredNodes;
152    this.fileContext = fileContext;
153    this.shouldDropCacheBehind = shouldDropCacheBehind;
154    this.compactedFilesSupplier = compactedFilesSupplier;
155    this.comparator = comparator;
156    this.maxVersions = maxVersions;
157    this.newVersionBehavior = newVersionBehavior;
158    liveFileWriter = new SingleStoreFileWriter(fs, liveFilePath, conf, cacheConf, bloomType,
159      maxKeys, favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
160  }
161
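  /**
   * Returns whether historical compaction file generation is enabled, i.e.
   * {@link #ENABLE_HISTORICAL_COMPACTION_FILES} is set to true and the default store engine and
   * compactor are in use. A configuration sketch (for illustration only):
   *
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.setBoolean(StoreFileWriter.ENABLE_HISTORICAL_COMPACTION_FILES, true);
   * boolean enabled = StoreFileWriter.shouldEnableHistoricalCompactionFiles(conf);
   * }</pre>
   */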
162  public static boolean shouldEnableHistoricalCompactionFiles(Configuration conf) {
163    if (
164      conf.getBoolean(ENABLE_HISTORICAL_COMPACTION_FILES,
165        DEFAULT_ENABLE_HISTORICAL_COMPACTION_FILES)
166    ) {
167      // Historical compaction files are supported only for default store engine with
168      // default compactor.
169      String storeEngine = conf.get(STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
170      if (!storeEngine.equals(DefaultStoreEngine.class.getName())) {
171        LOG.warn("Historical compaction file generation is ignored for " + storeEngine
172          + ". hbase.enable.historical.compaction.files can be set to true only for the "
173          + "default compaction (DefaultStoreEngine and DefaultCompactor)");
174        return false;
175      }
176      String compactor = conf.get(DEFAULT_COMPACTOR_CLASS_KEY, DefaultCompactor.class.getName());
177      if (!compactor.equals(DefaultCompactor.class.getName())) {
178        LOG.warn("Historical compaction file generation is ignored for " + compactor
179          + ". hbase.enable.historical.compaction.files can be set to true only for the "
180          + "default compaction (DefaultStoreEngine and DefaultCompactor)");
181        return false;
182      }
183      return true;
184    }
185    return false;
186  }
187
188  public long getPos() throws IOException {
189    return liveFileWriter.getPos();
190  }
191
192  /**
   * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
194   * @param maxSequenceId   Maximum sequence id.
195   * @param majorCompaction True if this file is product of a major compaction
196   * @throws IOException problem writing to FS
197   */
198  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
199    throws IOException {
200    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction);
201    if (historicalFileWriter != null) {
202      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction);
203    }
204  }
205
206  /**
   * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
208   * @param maxSequenceId   Maximum sequence id.
209   * @param majorCompaction True if this file is product of a major compaction
210   * @param storeFiles      The compacted store files to generate this new file
211   * @throws IOException problem writing to FS
212   */
213  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
214    final Collection<HStoreFile> storeFiles) throws IOException {
215    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction, storeFiles);
216    if (historicalFileWriter != null) {
217      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction, storeFiles);
218    }
219  }
220
221  /**
   * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
223   * @param maxSequenceId   Maximum sequence id.
224   * @param majorCompaction True if this file is product of a major compaction
225   * @param mobCellsCount   The number of mob cells.
226   * @throws IOException problem writing to FS
227   */
228  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
229    final long mobCellsCount) throws IOException {
230    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction, mobCellsCount);
231    if (historicalFileWriter != null) {
232      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction, mobCellsCount);
233    }
234  }
235
236  /**
   * Appends MOB-specific metadata (even if it is empty).
   * @param mobRefSet original table -> set of MOB file names
239   * @throws IOException problem writing to FS
240   */
241  public void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
242    liveFileWriter.appendMobMetadata(mobRefSet);
243    if (historicalFileWriter != null) {
244      historicalFileWriter.appendMobMetadata(mobRefSet);
245    }
246  }
247
248  /**
   * Adds the TimeRange and earliest put timestamp to the metadata.
250   */
251  public void appendTrackedTimestampsToMetadata() throws IOException {
252    // TODO: The StoreFileReader always converts the byte[] to TimeRange
253    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
254    liveFileWriter.appendTrackedTimestampsToMetadata();
255    if (historicalFileWriter != null) {
256      historicalFileWriter.appendTrackedTimestampsToMetadata();
257    }
258  }
259
260  @Override
261  public void beforeShipped() throws IOException {
262    liveFileWriter.beforeShipped();
263    if (historicalFileWriter != null) {
264      historicalFileWriter.beforeShipped();
265    }
266  }
267
268  public Path getPath() {
269    return liveFileWriter.getPath();
270  }
271
272  public List<Path> getPaths() {
273    if (historicalFileWriter == null) {
274      return Lists.newArrayList(liveFileWriter.getPath());
275    }
276    return Lists.newArrayList(liveFileWriter.getPath(), historicalFileWriter.getPath());
277  }
278
279  public boolean hasGeneralBloom() {
280    return liveFileWriter.hasGeneralBloom();
281  }
282
283  /**
284   * For unit testing only.
285   * @return the Bloom filter used by this writer.
286   */
287  BloomFilterWriter getGeneralBloomWriter() {
288    return liveFileWriter.generalBloomFilterWriter;
289  }
290
291  public void close() throws IOException {
292    liveFileWriter.appendFileInfo(HISTORICAL_KEY, Bytes.toBytes(false));
293    liveFileWriter.close();
294    if (historicalFileWriter != null) {
295      historicalFileWriter.appendFileInfo(HISTORICAL_KEY, Bytes.toBytes(true));
296      historicalFileWriter.close();
297    }
298  }
299
300  public void appendFileInfo(byte[] key, byte[] value) throws IOException {
301    liveFileWriter.appendFileInfo(key, value);
302    if (historicalFileWriter != null) {
303      historicalFileWriter.appendFileInfo(key, value);
304    }
305  }
306
307  /**
308   * For use in testing.
309   */
310  HFile.Writer getLiveFileWriter() {
311    return liveFileWriter.getHFileWriter();
312  }
313
314  /**
315   * @param dir Directory to create file in.
316   * @return random filename inside passed <code>dir</code>
317   */
318  public static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
319    if (!fs.getFileStatus(dir).isDirectory()) {
320      throw new IOException("Expecting " + dir.toString() + " to be a directory");
321    }
322    return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll(""));
323  }
324
325  private SingleStoreFileWriter getHistoricalFileWriter() throws IOException {
326    if (historicalFileWriter == null) {
327      historicalFileWriter =
328        new SingleStoreFileWriter(fs, historicalFilePath, conf, cacheConf, bloomType, maxKeys,
329          favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
330    }
331    return historicalFileWriter;
332  }
333
334  private void initRowState() {
335    deleteFamily = null;
336    deleteFamilyVersionList.clear();
337    lastCell = null;
338  }
339
340  private void initColumnState() {
341    livePutCellCount = 0;
342    deleteColumn = null;
    deleteColumnVersionList.clear();
  }
346
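  // The helpers below decide whether a cell is masked by a delete marker already seen in the
  // current row/column: the marker masks the cell if its timestamp is strictly newer, or if the
  // timestamps are equal and either the old version behavior is in effect or the cell's sequence
  // id is lower than the marker's (new version behavior).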
347  private boolean isDeletedByDeleteFamily(Cell cell) {
348    return deleteFamily != null && (deleteFamily.getTimestamp() > cell.getTimestamp()
349      || (deleteFamily.getTimestamp() == cell.getTimestamp()
350        && (!newVersionBehavior || cell.getSequenceId() < deleteFamily.getSequenceId())));
351  }
352
353  private boolean isDeletedByDeleteFamilyVersion(Cell cell) {
354    for (Cell deleteFamilyVersion : deleteFamilyVersionList) {
355      if (
356        deleteFamilyVersion.getTimestamp() == cell.getTimestamp()
357          && (!newVersionBehavior || cell.getSequenceId() < deleteFamilyVersion.getSequenceId())
358      ) {
359        return true;
360      }
361    }
362    return false;
363  }
364
365  private boolean isDeletedByDeleteColumn(Cell cell) {
366    return deleteColumn != null && (deleteColumn.getTimestamp() > cell.getTimestamp()
367      || (deleteColumn.getTimestamp() == cell.getTimestamp()
368        && (!newVersionBehavior || cell.getSequenceId() < deleteColumn.getSequenceId())));
369  }
370
371  private boolean isDeletedByDeleteColumnVersion(Cell cell) {
372    for (Cell deleteColumnVersion : deleteColumnVersionList) {
373      if (
374        deleteColumnVersion.getTimestamp() == cell.getTimestamp()
375          && (!newVersionBehavior || cell.getSequenceId() < deleteColumnVersion.getSequenceId())
376      ) {
377        return true;
378      }
379    }
380    return false;
381  }
382
383  private boolean isDeleted(Cell cell) {
384    return isDeletedByDeleteFamily(cell) || isDeletedByDeleteColumn(cell)
385      || isDeletedByDeleteFamilyVersion(cell) || isDeletedByDeleteColumnVersion(cell);
386  }
387
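  /**
   * Appends a cell when dual file writing is enabled, routing it to either the live or the
   * historical file: the latest cell versions, and the delete markers that apply to them, go to
   * the live file, while deleted and older versions go to the historical file. Cells must arrive
   * in store (comparator) order, since the per-row and per-column state is reset on row and
   * column boundaries.
   */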
388  private void appendCell(Cell cell) throws IOException {
389    if ((lastCell == null || !CellUtil.matchingColumn(lastCell, cell))) {
390      initColumnState();
391    }
392    if (cell.getType() == Cell.Type.DeleteFamily) {
393      if (deleteFamily == null) {
394        deleteFamily = cell;
395        liveFileWriter.append(cell);
396      } else {
397        getHistoricalFileWriter().append(cell);
398      }
399    } else if (cell.getType() == Cell.Type.DeleteFamilyVersion) {
400      if (!isDeletedByDeleteFamily(cell)) {
401        deleteFamilyVersionList.add(cell);
        if (deleteFamily != null && deleteFamily.getTimestamp() == cell.getTimestamp()) {
          // This means both the delete-family and delete-family-version markers have the same
          // timestamp but the sequence id of the delete-family-version marker is higher than that
          // of the delete-family marker. In this case, there is no need to add the
          // delete-family-version marker to the live version file. This case happens only with
          // the new version behavior.
          getHistoricalFileWriter().append(cell);
        } else {
          liveFileWriter.append(cell);
        }
412      } else {
413        getHistoricalFileWriter().append(cell);
414      }
415    } else if (cell.getType() == Cell.Type.DeleteColumn) {
416      if (!isDeletedByDeleteFamily(cell) && deleteColumn == null) {
417        deleteColumn = cell;
418        liveFileWriter.append(cell);
419      } else {
420        getHistoricalFileWriter().append(cell);
421      }
422    } else if (cell.getType() == Cell.Type.Delete) {
423      if (!isDeletedByDeleteFamily(cell) && deleteColumn == null) {
424        deleteColumnVersionList.add(cell);
425        if (deleteFamily != null && deleteFamily.getTimestamp() == cell.getTimestamp()) {
426          // This means both the delete-family and delete-column-version markers have the same
427          // timestamp but the sequence id of delete-column-version marker is higher than that of
428          // the delete-family marker. In this case, there is no need to add the
429          // delete-column-version marker to the live version file. This case happens only with
430          // the new version behavior.
431          getHistoricalFileWriter().append(cell);
432        } else {
433          liveFileWriter.append(cell);
434        }
435      } else {
436        getHistoricalFileWriter().append(cell);
437      }
438    } else if (cell.getType() == Cell.Type.Put) {
439      if (livePutCellCount < maxVersions) {
440        // This is a live put cell (i.e., the latest version) of a column. Is it deleted?
441        if (!isDeleted(cell)) {
442          liveFileWriter.append(cell);
443          livePutCellCount++;
444        } else {
445          // It is deleted
446          getHistoricalFileWriter().append(cell);
447          if (newVersionBehavior) {
            // Deleted versions still count toward the total version count when the new version
            // behavior is enabled
449            livePutCellCount++;
450          }
451        }
452      } else {
453        // It is an older put cell
454        getHistoricalFileWriter().append(cell);
455      }
456    }
457    lastCell = cell;
458  }
459
460  @Override
461  public void appendAll(List<Cell> cellList) throws IOException {
462    if (historicalFilePath == null) {
463      // The dual writing is not enabled and all cells are written to one file. We use
464      // the live version file in this case
465      for (Cell cell : cellList) {
466        liveFileWriter.append(cell);
467      }
468      return;
469    }
470    if (cellList.isEmpty()) {
471      return;
472    }
473    if (lastCell != null && comparator.compareRows(lastCell, cellList.get(0)) != 0) {
474      // It is a new row and thus time to reset the state
475      initRowState();
476    }
477    for (Cell cell : cellList) {
478      appendCell(cell);
479    }
480  }
481
482  @Override
483  public void append(Cell cell) throws IOException {
484    if (historicalFilePath == null) {
485      // The dual writing is not enabled and all cells are written to one file. We use
486      // the live version file in this case
487      liveFileWriter.append(cell);
488      return;
489    }
490    appendCell(cell);
491  }
492
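  /**
   * Wraps a single {@link HFile.Writer} together with its Bloom filter and time range
   * bookkeeping. {@link StoreFileWriter} always creates one instance for the live file and, when
   * dual file writing is enabled, lazily creates a second one for the historical file.
   */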
493  private static class SingleStoreFileWriter {
494    private final BloomFilterWriter generalBloomFilterWriter;
495    private final BloomFilterWriter deleteFamilyBloomFilterWriter;
496    private final BloomType bloomType;
497    private byte[] bloomParam = null;
498    private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
499    private long deleteFamilyCnt = 0;
500    private BloomContext bloomContext = null;
501    private BloomContext deleteFamilyBloomContext = null;
502    private final TimeRangeTracker timeRangeTracker;
503    private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;
504
505    private HFile.Writer writer;
506
507    /**
     * Creates an HFile.Writer that also writes helpful metadata.
509     * @param fs                     file system to write to
510     * @param path                   file name to create
511     * @param conf                   user configuration
512     * @param bloomType              bloom filter setting
513     * @param maxKeys                the expected maximum number of keys to be added. Was used for
514     *                               Bloom filter size in {@link HFile} format version 1.
515     * @param favoredNodes           an array of favored nodes or possibly null
516     * @param fileContext            The HFile context
517     * @param shouldDropCacheBehind  Drop pages written to page cache after writing the store file.
     * @param compactedFilesSupplier Returns the {@link HStore} compacted files which have not
     *                               been archived yet
519     * @throws IOException problem writing to FS
520     */
521    private SingleStoreFileWriter(FileSystem fs, Path path, final Configuration conf,
522      CacheConfig cacheConf, BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes,
523      HFileContext fileContext, boolean shouldDropCacheBehind,
524      Supplier<Collection<HStoreFile>> compactedFilesSupplier) throws IOException {
525      this.compactedFilesSupplier = compactedFilesSupplier;
526      this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
527      // TODO : Change all writers to be specifically created for compaction context
528      writer =
529        HFile.getWriterFactory(conf, cacheConf).withPath(fs, path).withFavoredNodes(favoredNodes)
530          .withFileContext(fileContext).withShouldDropCacheBehind(shouldDropCacheBehind).create();
531
532      generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf,
533        bloomType, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
534
535      if (generalBloomFilterWriter != null) {
536        this.bloomType = bloomType;
537        this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf);
538        if (LOG.isTraceEnabled()) {
539          LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: "
540            + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH
541              ? Bytes.toInt(bloomParam)
542              : Bytes.toStringBinary(bloomParam))
543            + ", " + generalBloomFilterWriter.getClass().getSimpleName());
544        }
545        // init bloom context
546        switch (bloomType) {
547          case ROW:
548            bloomContext =
549              new RowBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
550            break;
551          case ROWCOL:
552            bloomContext =
553              new RowColBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
554            break;
555          case ROWPREFIX_FIXED_LENGTH:
556            bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter,
557              fileContext.getCellComparator(), Bytes.toInt(bloomParam));
558            break;
559          default:
560            throw new IOException(
561              "Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL or ROWPREFIX expected)");
562        }
563      } else {
564        // Not using Bloom filters.
565        this.bloomType = BloomType.NONE;
566      }
567
568      // initialize delete family Bloom filter when there is NO RowCol Bloom filter
569      if (this.bloomType != BloomType.ROWCOL) {
570        this.deleteFamilyBloomFilterWriter = BloomFilterFactory.createDeleteBloomAtWrite(conf,
571          cacheConf, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
572        deleteFamilyBloomContext =
573          new RowBloomContext(deleteFamilyBloomFilterWriter, fileContext.getCellComparator());
574      } else {
575        deleteFamilyBloomFilterWriter = null;
576      }
577      if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) {
578        LOG.trace("Delete Family Bloom filter type for " + path + ": "
579          + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
580      }
581    }
582
583    private long getPos() throws IOException {
584      return ((HFileWriterImpl) writer).getPos();
585    }
586
587    /**
     * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
589     * @param maxSequenceId   Maximum sequence id.
590     * @param majorCompaction True if this file is product of a major compaction
591     * @throws IOException problem writing to FS
592     */
593    private void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
594      throws IOException {
595      appendMetadata(maxSequenceId, majorCompaction, Collections.emptySet());
596    }
597
598    /**
     * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
600     * @param maxSequenceId   Maximum sequence id.
601     * @param majorCompaction True if this file is product of a major compaction
602     * @param storeFiles      The compacted store files to generate this new file
603     * @throws IOException problem writing to FS
604     */
605    private void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
606      final Collection<HStoreFile> storeFiles) throws IOException {
607      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
608      writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
609      writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles));
610      appendTrackedTimestampsToMetadata();
611    }
612
613    /**
     * Used when writing {@link HStoreFile#COMPACTION_EVENT_KEY} to the new file's file info. The
     * names of the compacted store files are needed. If a compacted store file is itself the
     * result of a compaction, its own compacted files that have not been archived yet are needed
     * too, but compacted files do not need to be added recursively. If files A, B and C are
     * compacted into a new file D, and D is then compacted into a new file E, we write A, B, C
     * and D into E's compacted files. So if E is later compacted into a new file F, we first add
     * E to F's compacted files, then add E's compacted files A, B, C and D. There is no need to
     * add D's compacted files, as they are already among E's compacted files. See HBASE-20724 for
     * more details.
622     * @param storeFiles The compacted store files to generate this new file
623     * @return bytes of CompactionEventTracker
624     */
625    private byte[] toCompactionEventTrackerBytes(Collection<HStoreFile> storeFiles) {
626      Set<String> notArchivedCompactedStoreFiles = this.compactedFilesSupplier.get().stream()
627        .map(sf -> sf.getPath().getName()).collect(Collectors.toSet());
628      Set<String> compactedStoreFiles = new HashSet<>();
629      for (HStoreFile storeFile : storeFiles) {
630        compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName());
631        for (String csf : storeFile.getCompactedStoreFiles()) {
632          if (notArchivedCompactedStoreFiles.contains(csf)) {
633            compactedStoreFiles.add(csf);
634          }
635        }
636      }
637      return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles);
638    }
639
640    /**
     * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
642     * @param maxSequenceId   Maximum sequence id.
643     * @param majorCompaction True if this file is product of a major compaction
644     * @param mobCellsCount   The number of mob cells.
645     * @throws IOException problem writing to FS
646     */
647    private void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
648      final long mobCellsCount) throws IOException {
649      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
650      writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
651      writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
652      appendTrackedTimestampsToMetadata();
653    }
654
655    /**
     * Appends MOB-specific metadata (even if it is empty).
     * @param mobRefSet original table -> set of MOB file names
658     * @throws IOException problem writing to FS
659     */
660    private void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
661      writer.appendFileInfo(MOB_FILE_REFS, MobUtils.serializeMobFileRefs(mobRefSet));
662    }
663
664    /**
     * Adds the TimeRange and earliest put timestamp to the metadata.
666     */
667    private void appendTrackedTimestampsToMetadata() throws IOException {
668      // TODO: The StoreFileReader always converts the byte[] to TimeRange
669      // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
670      appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker));
671      appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
672    }
673
674    /**
     * Records the earliest Put timestamp and updates the TimeRangeTracker to include the
     * timestamp of this cell.
677     */
678    private void trackTimestamps(final Cell cell) {
679      if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
680        earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
681      }
682      timeRangeTracker.includeTimestamp(cell);
683    }
684
685    private void appendGeneralBloomfilter(final Cell cell) throws IOException {
686      if (this.generalBloomFilterWriter != null) {
        /*
         * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
         * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp
         * Three types of filtering:
         * 1. Row = Row
         * 2. RowCol = Row + Qualifier
         * 3. RowPrefixFixedLength = fixed-length row prefix
         */
693        bloomContext.writeBloom(cell);
694      }
695    }
696
697    private void appendDeleteFamilyBloomFilter(final Cell cell) throws IOException {
698      if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) {
699        return;
700      }
701
702      // increase the number of delete family in the store file
703      deleteFamilyCnt++;
704      if (this.deleteFamilyBloomFilterWriter != null) {
705        deleteFamilyBloomContext.writeBloom(cell);
706      }
707    }
708
709    private void append(final Cell cell) throws IOException {
710      appendGeneralBloomfilter(cell);
711      appendDeleteFamilyBloomFilter(cell);
712      writer.append(cell);
713      trackTimestamps(cell);
714    }
715
716    private void beforeShipped() throws IOException {
      // For now these writers will always implement ShipperListener.
718      // TODO : Change all writers to be specifically created for compaction context
719      writer.beforeShipped();
720      if (generalBloomFilterWriter != null) {
721        generalBloomFilterWriter.beforeShipped();
722      }
723      if (deleteFamilyBloomFilterWriter != null) {
724        deleteFamilyBloomFilterWriter.beforeShipped();
725      }
726    }
727
728    private Path getPath() {
729      return this.writer.getPath();
730    }
731
732    private boolean hasGeneralBloom() {
733      return this.generalBloomFilterWriter != null;
734    }
735
736    /**
737     * For unit testing only.
738     * @return the Bloom filter used by this writer.
739     */
740    BloomFilterWriter getGeneralBloomWriter() {
741      return generalBloomFilterWriter;
742    }
743
744    private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
745      boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
746      if (haveBloom) {
747        bfw.compactBloom();
748      }
749      return haveBloom;
750    }
751
752    private boolean closeGeneralBloomFilter() throws IOException {
753      boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);
754
755      // add the general Bloom filter writer and append file info
756      if (hasGeneralBloom) {
757        writer.addGeneralBloomFilter(generalBloomFilterWriter);
758        writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
759        if (bloomParam != null) {
760          writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam);
761        }
762        bloomContext.addLastBloomKey(writer);
763      }
764      return hasGeneralBloom;
765    }
766
767    private boolean closeDeleteFamilyBloomFilter() throws IOException {
768      boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);
769
770      // add the delete family Bloom filter writer
771      if (hasDeleteFamilyBloom) {
772        writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
773      }
774
775      // append file info about the number of delete family kvs
776      // even if there is no delete family Bloom.
777      writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt));
778
779      return hasDeleteFamilyBloom;
780    }
781
782    private void close() throws IOException {
783      boolean hasGeneralBloom = this.closeGeneralBloomFilter();
784      boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();
785
786      writer.close();
787
788      // Log final Bloom filter statistics. This needs to be done after close()
789      // because compound Bloom filters might be finalized as part of closing.
790      if (LOG.isTraceEnabled()) {
791        LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and "
792          + (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " was added to HFile "
793          + getPath());
794      }
795
796    }
797
798    private void appendFileInfo(byte[] key, byte[] value) throws IOException {
799      writer.appendFileInfo(key, value);
800    }
801
802    /**
803     * For use in testing.
804     */
805    private HFile.Writer getHFileWriter() {
806      return writer;
807    }
808  }
809
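  /**
   * Builder for {@link StoreFileWriter}. Exactly one of {@link #withOutputDir(Path)} and
   * {@link #withFilePath(Path)} must be called before {@link #build()}.
   */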
810  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
811      justification = "Will not overflow")
812  public static class Builder {
813    private final Configuration conf;
814    private final CacheConfig cacheConf;
815    private final FileSystem fs;
816
817    private BloomType bloomType = BloomType.NONE;
818    private long maxKeyCount = 0;
819    private Path dir;
820    private Path liveFilePath;
821    private Path historicalFilePath;
822
823    private InetSocketAddress[] favoredNodes;
824    private HFileContext fileContext;
825    private boolean shouldDropCacheBehind;
826    private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
827    private String fileStoragePolicy;
    // This is used to track the creation of the StoreFileWriter, mainly for the SFT
    // implementation, where we write store files directly to the final place instead of writing
    // a tmp file first. In that scenario a background task purges store files that are not
    // recorded in the SFT; since newly created store files are not yet tracked there, we need to
    // record them here and treat them specially.
833    private Consumer<Path> writerCreationTracker;
834    private int maxVersions;
835    private boolean newVersionBehavior;
836    private CellComparator comparator;
837    private boolean isCompaction;
838
839    public Builder(Configuration conf, CacheConfig cacheConf, FileSystem fs) {
840      this.conf = conf;
841      this.cacheConf = cacheConf;
842      this.fs = fs;
843    }
844
845    /**
846     * Creates Builder with cache configuration disabled
847     */
848    public Builder(Configuration conf, FileSystem fs) {
849      this.conf = conf;
850      this.cacheConf = CacheConfig.DISABLED;
851      this.fs = fs;
852    }
853
854    /**
855     * Use either this method or {@link #withFilePath}, but not both.
     * @param dir Path to column family directory. The directory is created if it does not exist.
     *            The file is given a unique name within this directory.
858     * @return this (for chained invocation)
859     */
860    public Builder withOutputDir(Path dir) {
861      Preconditions.checkNotNull(dir);
862      this.dir = dir;
863      return this;
864    }
865
866    /**
867     * Use either this method or {@link #withOutputDir}, but not both.
868     * @param filePath the StoreFile path to write
869     * @return this (for chained invocation)
870     */
871    public Builder withFilePath(Path filePath) {
872      Preconditions.checkNotNull(filePath);
873      this.liveFilePath = filePath;
874      return this;
875    }
876
877    /**
878     * @param favoredNodes an array of favored nodes or possibly null
879     * @return this (for chained invocation)
880     */
881    public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) {
882      this.favoredNodes = favoredNodes;
883      return this;
884    }
885
886    public Builder withBloomType(BloomType bloomType) {
887      Preconditions.checkNotNull(bloomType);
888      this.bloomType = bloomType;
889      return this;
890    }
891
892    /**
893     * @param maxKeyCount estimated maximum number of keys we expect to add
894     * @return this (for chained invocation)
895     */
896    public Builder withMaxKeyCount(long maxKeyCount) {
897      this.maxKeyCount = maxKeyCount;
898      return this;
899    }
900
901    public Builder withFileContext(HFileContext fileContext) {
902      this.fileContext = fileContext;
903      return this;
904    }
905
906    public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
907      this.shouldDropCacheBehind = shouldDropCacheBehind;
908      return this;
909    }
910
911    public Builder
912      withCompactedFilesSupplier(Supplier<Collection<HStoreFile>> compactedFilesSupplier) {
913      this.compactedFilesSupplier = compactedFilesSupplier;
914      return this;
915    }
916
917    public Builder withFileStoragePolicy(String fileStoragePolicy) {
918      this.fileStoragePolicy = fileStoragePolicy;
919      return this;
920    }
921
922    public Builder withWriterCreationTracker(Consumer<Path> writerCreationTracker) {
923      this.writerCreationTracker = writerCreationTracker;
924      return this;
925    }
926
927    public Builder withMaxVersions(int maxVersions) {
928      this.maxVersions = maxVersions;
929      return this;
930    }
931
932    public Builder withNewVersionBehavior(boolean newVersionBehavior) {
933      this.newVersionBehavior = newVersionBehavior;
934      return this;
935    }
936
937    public Builder withCellComparator(CellComparator comparator) {
938      this.comparator = comparator;
939      return this;
940    }
941
942    public Builder withIsCompaction(boolean isCompaction) {
943      this.isCompaction = isCompaction;
944      return this;
945    }
946
947    /**
     * Create a store file writer. The client is responsible for closing the file when done. If
     * adding metadata, do so BEFORE closing, using {@link StoreFileWriter#appendMetadata}.
950     */
951    public StoreFileWriter build() throws IOException {
952      if ((dir == null ? 0 : 1) + (liveFilePath == null ? 0 : 1) != 1) {
        throw new IllegalArgumentException("Either specify parent directory or file path");
954      }
955
956      if (dir == null) {
957        dir = liveFilePath.getParent();
958      }
959
960      if (!fs.exists(dir)) {
961        // Handle permission for non-HDFS filesystem properly
962        // See HBASE-17710
963        HRegionFileSystem.mkdirs(fs, conf, dir);
964      }
965
966      // set block storage policy for temp path
967      String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY);
968      if (null == policyName) {
969        policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY);
970      }
971      CommonFSUtils.setStoragePolicy(this.fs, dir, policyName);
972
973      if (liveFilePath == null) {
        // The store file and its related blocks will use the directory-based StoragePolicy,
        // because HDFS DistributedFileSystem does not support creating files with a storage
        // policy before version 3.3.0 (see HDFS-13209). Using a child dir here makes the store
        // files satisfy the specific storage policy at write time and avoids later data movement.
        // We don't want to change the whole temp dir to 'fileStoragePolicy'.
979        if (!Strings.isNullOrEmpty(fileStoragePolicy)) {
980          dir = new Path(dir, HConstants.STORAGE_POLICY_PREFIX + fileStoragePolicy);
981          if (!fs.exists(dir)) {
982            HRegionFileSystem.mkdirs(fs, conf, dir);
983            LOG.info(
984              "Create tmp dir " + dir.toString() + " with storage policy: " + fileStoragePolicy);
985          }
986          CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy);
987        }
988        liveFilePath = getUniqueFile(fs, dir);
989        if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
990          bloomType = BloomType.NONE;
991        }
992      }
993
994      if (isCompaction && shouldEnableHistoricalCompactionFiles(conf)) {
995        historicalFilePath = getUniqueFile(fs, dir);
996      }
997
      // Make sure we call this before actually creating the writer. In fact, it is not a big deal
      // to add a nonexistent file to the tracker, as we will never try to delete it and we will
      // clean the tracker up after compaction. But if the file cleaner finds the file before we
      // have recorded it, it may accidentally delete the file and cause problems.
1003      if (writerCreationTracker != null) {
1004        writerCreationTracker.accept(liveFilePath);
1005        if (historicalFilePath != null) {
1006          writerCreationTracker.accept(historicalFilePath);
1007        }
1008      }
1009      return new StoreFileWriter(fs, liveFilePath, historicalFilePath, conf, cacheConf, bloomType,
1010        maxKeyCount, favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier,
1011        comparator, maxVersions, newVersionBehavior);
1012    }
1013  }
1014}