/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.BloomContext;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RowBloomContext;
import org.apache.hadoop.hbase.util.RowColBloomContext;
import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
 * A StoreFile writer. Use this to write HBase Store Files. It is package
 * local because it is an implementation detail of the HBase regionserver.
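 * <p>
 * A minimal usage sketch (the variable names below are hypothetical; real call
 * sites live in the regionserver flush and compaction code):
 * <pre>
 * StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
 *     .withOutputDir(familyDir)           // or withFilePath(path), but not both
 *     .withBloomType(BloomType.ROW)
 *     .withMaxKeyCount(estimatedKeyCount)
 *     .withFileContext(hFileContext)
 *     .build();
 * try {
 *   for (Cell cell : cells) {
 *     writer.append(cell);                // also feeds Blooms and the time range tracker
 *   }
 *   // Metadata must be appended BEFORE close().
 *   writer.appendMetadata(maxSequenceId, majorCompaction);
 * } finally {
 *   writer.close();
 * }
 * </pre>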
 */
@InterfaceAudience.Private
public class StoreFileWriter implements CellSink, ShipperListener {
  private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName());
  private static final Pattern dash = Pattern.compile("-");
  private final BloomFilterWriter generalBloomFilterWriter;
  private final BloomFilterWriter deleteFamilyBloomFilterWriter;
  private final BloomType bloomType;
  private byte[] bloomParam = null;
  private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
  private long deleteFamilyCnt = 0;
  private BloomContext bloomContext = null;
  private BloomContext deleteFamilyBloomContext = null;
  private final TimeRangeTracker timeRangeTracker;
  private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;

  protected HFile.Writer writer;

  /**
   * Creates an HFile.Writer that also writes helpful metadata.
   *
   * @param fs                     file system to write to
   * @param path                   file name to create
   * @param conf                   user configuration
   * @param cacheConf              the cache configuration to use
   * @param bloomType              bloom filter setting
   * @param maxKeys                the expected maximum number of keys to be added. Was used
   *                               for Bloom filter size in {@link HFile} format version 1.
   * @param favoredNodes           an array of favored nodes or possibly null
   * @param fileContext            The HFile context
   * @param shouldDropCacheBehind  Drop pages written to page cache after writing the store file.
   * @param compactedFilesSupplier Returns the {@link HStore} compacted files that have not yet
   *                               been archived
   * @throws IOException problem writing to FS
   */
  private StoreFileWriter(FileSystem fs, Path path, final Configuration conf, CacheConfig cacheConf,
      BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes, HFileContext fileContext,
      boolean shouldDropCacheBehind, Supplier<Collection<HStoreFile>> compactedFilesSupplier)
        throws IOException {
    this.compactedFilesSupplier = compactedFilesSupplier;
    this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
    // TODO : Change all writers to be specifically created for compaction context
    writer = HFile.getWriterFactory(conf, cacheConf)
        .withPath(fs, path)
        .withFavoredNodes(favoredNodes)
        .withFileContext(fileContext)
        .withShouldDropCacheBehind(shouldDropCacheBehind)
        .create();

    generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
        conf, cacheConf, bloomType,
        (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);

    if (generalBloomFilterWriter != null) {
      this.bloomType = bloomType;
      this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: "
            + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH
                ? Bytes.toInt(bloomParam) : Bytes.toStringBinary(bloomParam))
            + ", " + generalBloomFilterWriter.getClass().getSimpleName());
      }
      // init bloom context
      switch (bloomType) {
        case ROW:
          bloomContext =
            new RowBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
          break;
        case ROWCOL:
          bloomContext =
            new RowColBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
          break;
        case ROWPREFIX_FIXED_LENGTH:
          bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter,
            fileContext.getCellComparator(), Bytes.toInt(bloomParam));
          break;
        default:
          throw new IOException("Invalid Bloom filter type: " + bloomType
              + " (ROW, ROWCOL or ROWPREFIX_FIXED_LENGTH expected)");
      }
    } else {
      // Not using Bloom filters.
      this.bloomType = BloomType.NONE;
    }

    // initialize delete family Bloom filter when there is NO RowCol Bloom
    // filter
    if (this.bloomType != BloomType.ROWCOL) {
      this.deleteFamilyBloomFilterWriter = BloomFilterFactory
          .createDeleteBloomAtWrite(conf, cacheConf,
              (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
      deleteFamilyBloomContext =
        new RowBloomContext(deleteFamilyBloomFilterWriter, fileContext.getCellComparator());
    } else {
      deleteFamilyBloomFilterWriter = null;
    }
    if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) {
      LOG.trace("Delete Family Bloom filter type for " + path + ": " +
          deleteFamilyBloomFilterWriter.getClass().getSimpleName());
    }
  }

  /**
   * Writes meta data.
   * Call before {@link #close()} since it is written as meta data to this file.
   * @param maxSequenceId Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
      throws IOException {
    appendMetadata(maxSequenceId, majorCompaction, Collections.emptySet());
  }

  /**
   * Writes meta data.
   * Call before {@link #close()} since it is written as meta data to this file.
   * @param maxSequenceId Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @param storeFiles The store files that were compacted to produce this new file
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
      final Collection<HStoreFile> storeFiles) throws IOException {
    writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
    writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
    writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles));
    appendTrackedTimestampsToMetadata();
  }

  /**
   * Used when writing {@link HStoreFile#COMPACTION_EVENT_KEY} to the new file's file info. We
   * record the names of the store files that were compacted into this file. If one of those input
   * files was itself produced by a compaction, we also record its not-yet-archived compacted
   * files, but there is no need to recurse any deeper. For example, if files A, B and C are
   * compacted into a new file D, and D is then compacted into a new file E, we write A, B, C and D
   * as E's compacted files. If E is later compacted into a new file F, we first add E to F's
   * compacted files and then add E's compacted files (A, B, C and D); D's own compacted files need
   * not be added since they are already contained in E's. See HBASE-20724 for more details, and
   * the sketch below.
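   * <p>
   * The same example as a sketch (assuming none of the input files have been archived yet):
   * <pre>
   * compact(A, B, C) = D      D's compacted files: {A, B, C}
   * compact(D)       = E      E's compacted files: {A, B, C, D}
   * compact(E)       = F      F's compacted files: {E, A, B, C, D}
   * </pre>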
   *
   * @param storeFiles The store files that were compacted to produce this new file
   * @return bytes of CompactionEventTracker
   */
  private byte[] toCompactionEventTrackerBytes(Collection<HStoreFile> storeFiles) {
    Set<String> notArchivedCompactedStoreFiles =
        this.compactedFilesSupplier.get().stream().map(sf -> sf.getPath().getName())
            .collect(Collectors.toSet());
    Set<String> compactedStoreFiles = new HashSet<>();
    for (HStoreFile storeFile : storeFiles) {
      compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName());
      for (String csf : storeFile.getCompactedStoreFiles()) {
        if (notArchivedCompactedStoreFiles.contains(csf)) {
          compactedStoreFiles.add(csf);
        }
      }
    }
    return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles);
  }

227
228  /**
229   * Writes meta data.
230   * Call before {@link #close()} since its written as meta data to this file.
231   * @param maxSequenceId Maximum sequence id.
232   * @param majorCompaction True if this file is product of a major compaction
233   * @param mobCellsCount The number of mob cells.
234   * @throws IOException problem writing to FS
235   */
236  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
237      final long mobCellsCount) throws IOException {
238    writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
239    writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
240    writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
241    appendTrackedTimestampsToMetadata();
242  }
243
244  /**
245   * Add TimestampRange and earliest put timestamp to Metadata
246   */
247  public void appendTrackedTimestampsToMetadata() throws IOException {
248    // TODO: The StoreFileReader always converts the byte[] to TimeRange
249    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
250    appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker));
251    appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
252  }
253
  /**
   * Record the earliest Put timestamp.
   *
   * If the cell is a Put, its timestamp is folded into the earliest put timestamp seen so
   * far; the TimeRangeTracker is always updated to include the timestamp of this cell.
   */
  public void trackTimestamps(final Cell cell) {
    if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
      earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
    }
    timeRangeTracker.includeTimestamp(cell);
  }

  private void appendGeneralBloomfilter(final Cell cell) throws IOException {
    if (this.generalBloomFilterWriter != null) {
      /*
       * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
       * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp
       *
       * 3 Types of Filtering:
       *  1. Row = Row
       *  2. RowCol = Row + Qualifier
       *  3. RowPrefixFixedLength = Fixed Length Row Prefix
       */
      bloomContext.writeBloom(cell);
    }
  }

  private void appendDeleteFamilyBloomFilter(final Cell cell)
      throws IOException {
    if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) {
      return;
    }

    // increase the count of delete family cells in the store file
    deleteFamilyCnt++;
    if (this.deleteFamilyBloomFilterWriter != null) {
      deleteFamilyBloomContext.writeBloom(cell);
    }
  }

  @Override
  public void append(final Cell cell) throws IOException {
    appendGeneralBloomfilter(cell);
    appendDeleteFamilyBloomFilter(cell);
    writer.append(cell);
    trackTimestamps(cell);
  }

  @Override
  public void beforeShipped() throws IOException {
    // For now these writers are always ShipperListener instances.
    // TODO : Change all writers to be specifically created for compaction context
    writer.beforeShipped();
    if (generalBloomFilterWriter != null) {
      generalBloomFilterWriter.beforeShipped();
    }
    if (deleteFamilyBloomFilterWriter != null) {
      deleteFamilyBloomFilterWriter.beforeShipped();
    }
  }

  public Path getPath() {
    return this.writer.getPath();
  }

  public boolean hasGeneralBloom() {
    return this.generalBloomFilterWriter != null;
  }

  /**
   * For unit testing only.
   *
   * @return the Bloom filter used by this writer.
   */
  BloomFilterWriter getGeneralBloomWriter() {
    return generalBloomFilterWriter;
  }

  private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
    boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
    if (haveBloom) {
      bfw.compactBloom();
    }
    return haveBloom;
  }

  private boolean closeGeneralBloomFilter() throws IOException {
    boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);

    // add the general Bloom filter writer and append file info
    if (hasGeneralBloom) {
      writer.addGeneralBloomFilter(generalBloomFilterWriter);
      writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
      if (bloomParam != null) {
        writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam);
      }
      bloomContext.addLastBloomKey(writer);
    }
    return hasGeneralBloom;
  }

  private boolean closeDeleteFamilyBloomFilter() throws IOException {
    boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);

    // add the delete family Bloom filter writer
    if (hasDeleteFamilyBloom) {
      writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
    }

    // append file info about the number of delete family kvs
    // even if there is no delete family Bloom.
    writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt));

    return hasDeleteFamilyBloom;
  }

  public void close() throws IOException {
    boolean hasGeneralBloom = this.closeGeneralBloomFilter();
    boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();

    writer.close();

    // Log final Bloom filter statistics. This needs to be done after close()
    // because compound Bloom filters might be finalized as part of closing.
    if (LOG.isTraceEnabled()) {
      LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and " +
        (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily was added to HFile " +
        getPath());
    }
  }

  public void appendFileInfo(byte[] key, byte[] value) throws IOException {
    writer.appendFileInfo(key, value);
  }

  /**
   * For use in testing.
   */
  HFile.Writer getHFileWriter() {
    return writer;
  }

  /**
   * @param dir Directory to create file in.
   * @return random filename inside passed <code>dir</code>
   */
  static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
    if (!fs.getFileStatus(dir).isDirectory()) {
      throw new IOException("Expecting " + dir.toString() + " to be a directory");
    }
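    // A random UUID with its dashes stripped, e.g. "38400000-8cf0-..." becomes "384000008cf0...".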
    return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll(""));
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
      justification="Will not overflow")
  public static class Builder {
    private final Configuration conf;
    private final CacheConfig cacheConf;
    private final FileSystem fs;

    private BloomType bloomType = BloomType.NONE;
    private long maxKeyCount = 0;
    private Path dir;
    private Path filePath;
    private InetSocketAddress[] favoredNodes;
    private HFileContext fileContext;
    private boolean shouldDropCacheBehind;
    private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();

    public Builder(Configuration conf, CacheConfig cacheConf,
        FileSystem fs) {
      this.conf = conf;
      this.cacheConf = cacheConf;
      this.fs = fs;
    }

    /**
     * Creates a Builder with cache configuration disabled.
     */
    public Builder(Configuration conf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = CacheConfig.DISABLED;
      this.fs = fs;
    }

    /**
     * Use either this method or {@link #withFilePath}, but not both.
     * @param dir Path to column family directory. The directory is created if
     *          it does not exist. The file is given a unique name within this
     *          directory.
     * @return this (for chained invocation)
     */
    public Builder withOutputDir(Path dir) {
      Preconditions.checkNotNull(dir);
      this.dir = dir;
      return this;
    }


    /**
     * Use either this method or {@link #withOutputDir}, but not both.
     * @param filePath the StoreFile path to write
     * @return this (for chained invocation)
     */
    public Builder withFilePath(Path filePath) {
      Preconditions.checkNotNull(filePath);
      this.filePath = filePath;
      return this;
    }

    /**
     * @param favoredNodes an array of favored nodes or possibly null
     * @return this (for chained invocation)
     */
    public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) {
      this.favoredNodes = favoredNodes;
      return this;
    }

    public Builder withBloomType(BloomType bloomType) {
      Preconditions.checkNotNull(bloomType);
      this.bloomType = bloomType;
      return this;
    }

    /**
     * @param maxKeyCount estimated maximum number of keys we expect to add
     * @return this (for chained invocation)
     */
    public Builder withMaxKeyCount(long maxKeyCount) {
      this.maxKeyCount = maxKeyCount;
      return this;
    }

    public Builder withFileContext(HFileContext fileContext) {
      this.fileContext = fileContext;
      return this;
    }

    public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
      this.shouldDropCacheBehind = shouldDropCacheBehind;
      return this;
    }

    public Builder withCompactedFilesSupplier(
        Supplier<Collection<HStoreFile>> compactedFilesSupplier) {
      this.compactedFilesSupplier = compactedFilesSupplier;
      return this;
    }

    /**
     * Create a store file writer. The caller is responsible for closing the file when
     * done. If adding metadata, add it BEFORE closing, using
     * {@link StoreFileWriter#appendMetadata}.
     */
    public StoreFileWriter build() throws IOException {
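      // Exactly one of the output targets may be set: either withOutputDir or withFilePath.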
      if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
        throw new IllegalArgumentException(
            "Specify exactly one of parent directory or file path");
      }

      if (dir == null) {
        dir = filePath.getParent();
      }

      if (!fs.exists(dir)) {
        // Handle permission for non-HDFS filesystem properly
        // See HBASE-17710
        HRegionFileSystem.mkdirs(fs, conf, dir);
      }

      // set block storage policy for temp path
      String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY);
      if (null == policyName) {
        policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY);
      }
      CommonFSUtils.setStoragePolicy(this.fs, dir, policyName);

      if (filePath == null) {
        filePath = getUniqueFile(fs, dir);
        if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
          bloomType = BloomType.NONE;
        }
      }

      return new StoreFileWriter(fs, filePath, conf, cacheConf, bloomType, maxKeyCount,
          favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
    }
  }
}