/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.BloomContext;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RowBloomContext;
import org.apache.hadoop.hbase.util.RowColBloomContext;
import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * A StoreFile writer. Use this to write HBase Store Files. It is package
 * local because it is an implementation detail of the HBase regionserver.
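 * <p>
 * A minimal usage sketch (the {@code conf}, {@code cacheConf}, {@code fs}, {@code familyDir},
 * {@code fileContext}, {@code estimatedKeys}, {@code cells} and {@code maxSeqId} names are
 * caller-supplied placeholders, not part of this class):
 * <pre>{@code
 * StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
 *     .withOutputDir(familyDir)          // or withFilePath(...), but not both
 *     .withBloomType(BloomType.ROW)
 *     .withMaxKeyCount(estimatedKeys)
 *     .withFileContext(fileContext)
 *     .build();
 * try {
 *   for (Cell cell : cells) {
 *     writer.append(cell);
 *   }
 *   writer.appendMetadata(maxSeqId, false); // metadata must be added before close()
 * } finally {
 *   writer.close();
 * }
 * }</pre>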
 */
@InterfaceAudience.Private
public class StoreFileWriter implements CellSink, ShipperListener {
  private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName());
  private static final Pattern dash = Pattern.compile("-");
  private final BloomFilterWriter generalBloomFilterWriter;
  private final BloomFilterWriter deleteFamilyBloomFilterWriter;
  private final BloomType bloomType;
  private byte[] bloomParam = null;
  private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
  private long deleteFamilyCnt = 0;
  private BloomContext bloomContext = null;
  private BloomContext deleteFamilyBloomContext = null;
  private final TimeRangeTracker timeRangeTracker;
  private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;

  protected HFile.Writer writer;

  /**
   * Creates an HFile.Writer that also writes helpful metadata.
   *
   * @param fs                     file system to write to
   * @param path                   file name to create
   * @param conf                   user configuration
   * @param cacheConf              cache configuration
   * @param bloomType              bloom filter setting
   * @param maxKeys                the expected maximum number of keys to be added. Was used
   *                               for Bloom filter size in {@link HFile} format version 1.
   * @param favoredNodes           an array of favored nodes or possibly null
   * @param fileContext            The HFile context
   * @param shouldDropCacheBehind  Drop pages written to page cache after writing the store file.
   * @param compactedFilesSupplier Returns the {@link HStore} compacted files which have not yet
   *                               been archived
   * @throws IOException problem writing to FS
   */
  private StoreFileWriter(FileSystem fs, Path path, final Configuration conf, CacheConfig cacheConf,
      BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes, HFileContext fileContext,
      boolean shouldDropCacheBehind, Supplier<Collection<HStoreFile>> compactedFilesSupplier)
        throws IOException {
    this.compactedFilesSupplier = compactedFilesSupplier;
    this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
    // TODO : Change all writers to be specifically created for compaction context
    writer = HFile.getWriterFactory(conf, cacheConf)
        .withPath(fs, path)
        .withFavoredNodes(favoredNodes)
        .withFileContext(fileContext)
        .withShouldDropCacheBehind(shouldDropCacheBehind)
        .create();

    generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
        conf, cacheConf, bloomType,
        (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);

    if (generalBloomFilterWriter != null) {
      this.bloomType = bloomType;
      this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: "
            + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH
                ? Bytes.toInt(bloomParam) : Bytes.toStringBinary(bloomParam))
            + ", " + generalBloomFilterWriter.getClass().getSimpleName());
      }
      // init bloom context
      switch (bloomType) {
        case ROW:
          bloomContext =
            new RowBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
          break;
        case ROWCOL:
          bloomContext =
            new RowColBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
          break;
        case ROWPREFIX_FIXED_LENGTH:
          bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter,
            fileContext.getCellComparator(), Bytes.toInt(bloomParam));
          break;
        default:
          throw new IOException(
              "Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL or ROWPREFIX expected)");
      }
    } else {
      // Not using Bloom filters.
      this.bloomType = BloomType.NONE;
    }

    // initialize delete family Bloom filter when there is NO RowCol Bloom
    // filter
    if (this.bloomType != BloomType.ROWCOL) {
      this.deleteFamilyBloomFilterWriter = BloomFilterFactory
          .createDeleteBloomAtWrite(conf, cacheConf,
              (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
      deleteFamilyBloomContext =
        new RowBloomContext(deleteFamilyBloomFilterWriter, fileContext.getCellComparator());
    } else {
      deleteFamilyBloomFilterWriter = null;
    }
    if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) {
      LOG.trace("Delete Family Bloom filter type for " + path + ": " +
          deleteFamilyBloomFilterWriter.getClass().getSimpleName());
    }
  }

  /**
   * Writes metadata.
   * Call before {@link #close()} since it is written as metadata to this file.
   * @param maxSequenceId Maximum sequence id.
   * @param majorCompaction True if this file is the product of a major compaction
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
      throws IOException {
    appendMetadata(maxSequenceId, majorCompaction, Collections.emptySet());
  }

  /**
   * Writes metadata.
   * Call before {@link #close()} since it is written as metadata to this file.
   * @param maxSequenceId Maximum sequence id.
   * @param majorCompaction True if this file is the product of a major compaction
   * @param storeFiles The compacted store files used to generate this new file
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
      final Collection<HStoreFile> storeFiles) throws IOException {
    writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
    writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
    writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles));
    appendTrackedTimestampsToMetadata();
  }

  /**
   * Used when writing {@link HStoreFile#COMPACTION_EVENT_KEY} to the new file's file info. The
   * names of the compacted store files are needed. If a compacted store file is itself the result
   * of a compaction, its own compacted files that have not yet been archived are needed too, but
   * there is no need to expand them recursively. If files A, B and C are compacted into new file
   * D, and file D is compacted into new file E, then A, B, C and D are written as file E's
   * compacted files. So if file E is compacted into new file F, E is added to F's compacted files
   * first, followed by E's compacted files A, B, C and D. There is no need to add D's compacted
   * files, as they are already contained in E's compacted files.
   * See HBASE-20724 for more details.
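   * <p>
   * For example (ignoring archival):
   * <pre>
   *   A, B, C  --compact-->  D    D's compacted files: {A, B, C}
   *   D        --compact-->  E    E's compacted files: {A, B, C, D}
   *   E        --compact-->  F    F's compacted files: {A, B, C, D, E}
   * </pre>
   * Inherited entries that have already been archived are dropped from the recorded set.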
   *
   * @param storeFiles The compacted store files used to generate this new file
   * @return bytes of CompactionEventTracker
   */
  private byte[] toCompactionEventTrackerBytes(Collection<HStoreFile> storeFiles) {
    Set<String> notArchivedCompactedStoreFiles =
        this.compactedFilesSupplier.get().stream().map(sf -> sf.getPath().getName())
            .collect(Collectors.toSet());
    Set<String> compactedStoreFiles = new HashSet<>();
    for (HStoreFile storeFile : storeFiles) {
      compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName());
      for (String csf : storeFile.getCompactedStoreFiles()) {
        if (notArchivedCompactedStoreFiles.contains(csf)) {
          compactedStoreFiles.add(csf);
        }
      }
    }
    return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles);
  }

  /**
   * Writes metadata.
   * Call before {@link #close()} since it is written as metadata to this file.
   * @param maxSequenceId Maximum sequence id.
   * @param majorCompaction True if this file is the product of a major compaction
   * @param mobCellsCount The number of mob cells.
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
      final long mobCellsCount) throws IOException {
    writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
    writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
    writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
    appendTrackedTimestampsToMetadata();
  }

  /**
   * Add TimestampRange and earliest put timestamp to Metadata
   */
  public void appendTrackedTimestampsToMetadata() throws IOException {
    // TODO: The StoreFileReader always converts the byte[] to TimeRange
    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
    appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker));
    appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
  }

  /**
   * Record the earliest Put timestamp.
   *
   * Also update the TimeRangeTracker to include the timestamp of this cell.
   */
  public void trackTimestamps(final Cell cell) {
    if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
      earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
    }
    timeRangeTracker.includeTimestamp(cell);
  }

  private void appendGeneralBloomfilter(final Cell cell) throws IOException {
    if (this.generalBloomFilterWriter != null) {
      /*
       * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
       * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp
       *
       * 3 Types of Filtering:
       *  1. Row = Row
       *  2. RowCol = Row + Qualifier
       *  3. RowPrefixFixedLength = Fixed Length Row Prefix
       */
      bloomContext.writeBloom(cell);
    }
  }

  private void appendDeleteFamilyBloomFilter(final Cell cell)
      throws IOException {
    if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) {
      return;
    }

    // increase the number of delete family cells in the store file
    deleteFamilyCnt++;
    if (this.deleteFamilyBloomFilterWriter != null) {
      deleteFamilyBloomContext.writeBloom(cell);
    }
  }

  @Override
  public void append(final Cell cell) throws IOException {
    appendGeneralBloomfilter(cell);
    appendDeleteFamilyBloomFilter(cell);
    writer.append(cell);
    trackTimestamps(cell);
  }

  @Override
  public void beforeShipped() throws IOException {
    // For now these writers will always act as ShipperListeners.
    // TODO : Change all writers to be specifically created for compaction context
    writer.beforeShipped();
    if (generalBloomFilterWriter != null) {
      generalBloomFilterWriter.beforeShipped();
    }
    if (deleteFamilyBloomFilterWriter != null) {
      deleteFamilyBloomFilterWriter.beforeShipped();
    }
  }

  public Path getPath() {
    return this.writer.getPath();
  }

  public boolean hasGeneralBloom() {
    return this.generalBloomFilterWriter != null;
  }

  /**
   * For unit testing only.
   *
   * @return the Bloom filter used by this writer.
   */
  BloomFilterWriter getGeneralBloomWriter() {
    return generalBloomFilterWriter;
  }

  private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
    boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
    if (haveBloom) {
      bfw.compactBloom();
    }
    return haveBloom;
  }

  private boolean closeGeneralBloomFilter() throws IOException {
    boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);

    // add the general Bloom filter writer and append file info
    if (hasGeneralBloom) {
      writer.addGeneralBloomFilter(generalBloomFilterWriter);
      writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
      if (bloomParam != null) {
        writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam);
      }
      bloomContext.addLastBloomKey(writer);
    }
    return hasGeneralBloom;
  }

  private boolean closeDeleteFamilyBloomFilter() throws IOException {
    boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);

    // add the delete family Bloom filter writer
    if (hasDeleteFamilyBloom) {
      writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
    }

    // append file info about the number of delete family kvs
    // even if there is no delete family Bloom.
    writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt));

    return hasDeleteFamilyBloom;
  }

  public void close() throws IOException {
    boolean hasGeneralBloom = this.closeGeneralBloomFilter();
    boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();

    writer.close();

    // Log final Bloom filter statistics. This needs to be done after close()
    // because compound Bloom filters might be finalized as part of closing.
    if (LOG.isTraceEnabled()) {
      LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and " +
        (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " was added to HFile " +
        getPath());
    }
  }

  public void appendFileInfo(byte[] key, byte[] value) throws IOException {
    writer.appendFileInfo(key, value);
  }

  /** For use in testing. */
  HFile.Writer getHFileWriter() {
    return writer;
  }

  /**
   * @param dir Directory to create file in.
   * @return random filename inside passed <code>dir</code>
   */
  static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
    if (!fs.getFileStatus(dir).isDirectory()) {
      throw new IOException("Expecting " + dir.toString() + " to be a directory");
    }
    return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll(""));
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
      justification="Will not overflow")
  public static class Builder {
    private final Configuration conf;
    private final CacheConfig cacheConf;
    private final FileSystem fs;

    private BloomType bloomType = BloomType.NONE;
    private long maxKeyCount = 0;
    private Path dir;
    private Path filePath;
    private InetSocketAddress[] favoredNodes;
    private HFileContext fileContext;
    private boolean shouldDropCacheBehind;
    private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
    private String fileStoragePolicy;

    public Builder(Configuration conf, CacheConfig cacheConf,
        FileSystem fs) {
      this.conf = conf;
      this.cacheConf = cacheConf;
      this.fs = fs;
    }

    /**
     * Creates Builder with cache configuration disabled
     */
    public Builder(Configuration conf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = CacheConfig.DISABLED;
      this.fs = fs;
    }

    /**
     * Use either this method or {@link #withFilePath}, but not both.
     * @param dir Path to column family directory. The directory is created if
     *          it does not exist. The file is given a unique name within this
     *          directory.
     * @return this (for chained invocation)
     */
    public Builder withOutputDir(Path dir) {
      Preconditions.checkNotNull(dir);
      this.dir = dir;
      return this;
    }

    /**
     * Use either this method or {@link #withOutputDir}, but not both.
     * @param filePath the StoreFile path to write
     * @return this (for chained invocation)
     */
    public Builder withFilePath(Path filePath) {
      Preconditions.checkNotNull(filePath);
      this.filePath = filePath;
      return this;
    }

    /**
     * @param favoredNodes an array of favored nodes or possibly null
     * @return this (for chained invocation)
     */
    public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) {
      this.favoredNodes = favoredNodes;
      return this;
    }

    public Builder withBloomType(BloomType bloomType) {
      Preconditions.checkNotNull(bloomType);
      this.bloomType = bloomType;
      return this;
    }

    /**
     * @param maxKeyCount estimated maximum number of keys we expect to add
     * @return this (for chained invocation)
     */
    public Builder withMaxKeyCount(long maxKeyCount) {
      this.maxKeyCount = maxKeyCount;
      return this;
    }

    public Builder withFileContext(HFileContext fileContext) {
      this.fileContext = fileContext;
      return this;
    }

    public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
      this.shouldDropCacheBehind = shouldDropCacheBehind;
      return this;
    }

    public Builder withCompactedFilesSupplier(
        Supplier<Collection<HStoreFile>> compactedFilesSupplier) {
      this.compactedFilesSupplier = compactedFilesSupplier;
      return this;
    }

    public Builder withFileStoragePolicy(String fileStoragePolicy) {
      this.fileStoragePolicy = fileStoragePolicy;
      return this;
    }

    /**
     * Create a store file writer. The client is responsible for closing the file when
     * done. If adding metadata, do so BEFORE closing, using
     * {@link StoreFileWriter#appendMetadata}.
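     * <p>
     * A brief sketch of the expected call order (the {@code cells} and {@code maxSeqId} values
     * are caller-supplied placeholders):
     * <pre>{@code
     * StoreFileWriter writer = builder.build();
     * try {
     *   for (Cell cell : cells) {
     *     writer.append(cell);
     *   }
     *   writer.appendMetadata(maxSeqId, false); // must happen before close()
     * } finally {
     *   writer.close();
     * }
     * }</pre>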
     */
    public StoreFileWriter build() throws IOException {
      if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
        throw new IllegalArgumentException("Either specify parent directory " +
            "or file path");
      }

      if (dir == null) {
        dir = filePath.getParent();
      }

      if (!fs.exists(dir)) {
        // Handle permission for non-HDFS filesystem properly
        // See HBASE-17710
        HRegionFileSystem.mkdirs(fs, conf, dir);
      }

      // set block storage policy for temp path
      String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY);
      if (null == policyName) {
        policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY);
      }
      CommonFSUtils.setStoragePolicy(this.fs, dir, policyName);

      if (filePath == null) {
        // The stored file and related blocks will use the directory-based StoragePolicy,
        // because HDFS DistributedFileSystem does not support creating files with a storage
        // policy before version 3.3.0 (see HDFS-13209). Using a child dir here makes the stored
        // files satisfy the specific storage policy at write time, avoiding later data movement.
        // We don't want to change the whole temp dir to 'fileStoragePolicy'.
        if (!Strings.isNullOrEmpty(fileStoragePolicy)) {
          dir = new Path(dir, HConstants.STORAGE_POLICY_PREFIX + fileStoragePolicy);
          if (!fs.exists(dir)) {
            HRegionFileSystem.mkdirs(fs, conf, dir);
            LOG.info(
              "Create tmp dir " + dir.toString() + " with storage policy: " + fileStoragePolicy);
          }
          CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy);
        }
        filePath = getUniqueFile(fs, dir);
        if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
          bloomType = BloomType.NONE;
        }
      }

      return new StoreFileWriter(fs, filePath, conf, cacheConf, bloomType, maxKeyCount,
          favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
    }
  }
}