001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;
021
022import java.io.DataOutput;
023import java.io.DataOutputStream;
024import java.io.IOException;
025import java.net.InetSocketAddress;
026import java.nio.ByteBuffer;
027import java.util.ArrayList;
028import java.util.List;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FSDataOutputStream;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.fs.permission.FsPermission;
034import org.apache.hadoop.hbase.ByteBufferExtendedCell;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.CellComparator;
037import org.apache.hadoop.hbase.CellUtil;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.KeyValue;
040import org.apache.hadoop.hbase.KeyValueUtil;
041import org.apache.hadoop.hbase.MetaCellComparator;
042import org.apache.hadoop.hbase.PrivateCellUtil;
043import org.apache.hadoop.hbase.io.compress.Compression;
044import org.apache.hadoop.hbase.io.crypto.Encryption;
045import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
046import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
047import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
048import org.apache.hadoop.hbase.security.EncryptionUtil;
049import org.apache.hadoop.hbase.security.User;
050import org.apache.hadoop.hbase.util.BloomFilterWriter;
051import org.apache.hadoop.hbase.util.ByteBufferUtils;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.CommonFSUtils;
054import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
055import org.apache.hadoop.hbase.util.FSUtils;
056import org.apache.hadoop.io.Writable;
057import org.apache.yetus.audience.InterfaceAudience;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061/**
062 * Common functionality needed by all versions of {@link HFile} writers.
063 */
064@InterfaceAudience.Private
065public class HFileWriterImpl implements HFile.Writer {
  private static final Logger LOG = LoggerFactory.getLogger(HFileWriterImpl.class);

  /** Sentinel for an offset that has not been recorded yet. */
  private static final long UNSET = -1;

  /**
   * Configuration key for a ratio applied to the configured block size. When
   * {@code (int) (blocksize * ratio)} is positive, the size check that decides when to finish a
   * data block compares only the encoded size written against that limit. Read with a default of
   * 0, which disables the feature.
   */
  public static final String UNIFIED_ENCODED_BLOCKSIZE_RATIO =
    "hbase.writer.unified.encoded.blocksize.ratio";

  /**
   * Block size limit after encoding, used to unify encoded block Cache entry size. Computed as
   * {@code (int) (blocksize * ratio)}; zero or negative means the feature is off.
   */
  private final int encodedBlockSizeLimit;

  /** The Cell previously appended. Becomes the last cell in the file. */
  protected Cell lastCell = null;

  /** FileSystem stream to write into. */
  protected FSDataOutputStream outputStream;

  /**
   * True if we opened the <code>outputStream</code> (and so will close it). Set in the constructor
   * to {@code path != null}.
   */
  protected final boolean closeOutputStream;

  /** A "file info" block: a key-value map of file-wide metadata. */
  protected HFileInfo fileInfo = new HFileInfo();

  /** Total # of key/value entries, i.e. how many times {@code append(Cell)} was called. */
  protected long entryCount = 0;

  /**
   * Sum of the estimated serialized key sizes of all appended cells; used for calculating the
   * average key length.
   */
  protected long totalKeyLength = 0;

  /** Sum of the value lengths of all appended cells; used for calculating the average value length. */
  protected long totalValueLength = 0;

  /** Estimated serialized size of the biggest cell appended so far. */
  protected long lenOfBiggestCell = 0;
  /** Key of the biggest cell, serialized as a KeyValue key. */
  protected byte[] keyOfBiggestCell;

  /**
   * Total uncompressed bytes (including block headers) of every block written; maybe used to
   * calculate a compression ratio later.
   */
  protected long totalUncompressedBytes = 0;

  /** Meta block names, kept sorted by appendMetaBlock and index-aligned with {@link #metaData}. */
  protected List<byte[]> metaNames = new ArrayList<>();

  /** {@link Writable}s representing meta block data, index-aligned with {@link #metaNames}. */
  protected List<Writable> metaData = new ArrayList<>();

  /**
   * First cell in the current block. This reference should be short-lived since we write hfiles in
   * a burst.
   */
  protected Cell firstCellInBlock = null;

  /** May be null if we were passed a stream. */
  protected final Path path;

  /** Cache configuration for caching data on write. */
  protected final CacheConfig cacheConf;

  /**
   * Name for this object used when logging or in toString. Is either the result of a toString on
   * stream or else name of passed file Path. Also used as the file name in the block cache keys
   * created for cache-on-write.
   */
  protected final String name;

  /**
   * The data block encoding which will be used. {@link NoOpDataBlockEncoder#INSTANCE} if there is
   * no encoding.
   */
  protected final HFileDataBlockEncoder blockEncoder;

  /** The index block encoder; {@link NoOpIndexBlockEncoder#INSTANCE} if there is no encoding. */
  protected final HFileIndexBlockEncoder indexBlockEncoder;

  /**
   * File context: supplies block size, data/index block encodings, compression, the cell comparator
   * and whether MVCC is included.
   */
  protected final HFileContext hFileContext;

  /** Largest tags length among appended cells. */
  private int maxTagsLength = 0;

  /** KeyValue version in FileInfo */
  public static final byte[] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");

  /** Version for KeyValue which includes memstore timestamp */
  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  /**
   * Inline block writers for multi-level block index and compound Blooms. They get a chance to
   * write blocks after each finished data block and once more when the file is closed.
   */
  private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<>();

  /** Block writer shared by data, inline, meta, index and file info blocks. */
  protected HFileBlock.Writer blockWriter;

  /**
   * Writer for the (possibly multi-level) data block index; also registered as an inline block
   * writer.
   */
  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
  /** Writer for the single-level meta block index. */
  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

  /** The offset of the first data block or {@link #UNSET} (-1) if the file is empty. */
  private long firstDataBlockOffset = UNSET;

  /**
   * The offset of the last data block or {@link #UNSET} (-1) if no data block has been written.
   */
  protected long lastDataBlockOffset = UNSET;

  /**
   * The last(stop) Cell of the previous data block. This reference should be short-lived since we
   * write hfiles in a burst.
   */
  private Cell lastCellOfPreviousBlock = null;

  /** Additional data items to be written to the "load-on-open" section. */
  private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<>();

  /**
   * Highest sequence id among appended cells; written to the file info on close when MVCC is
   * included.
   */
  protected long maxMemstoreTS = 0;
172
173  public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
174    FSDataOutputStream outputStream, HFileContext fileContext) {
175    this.outputStream = outputStream;
176    this.path = path;
177    this.name = path != null ? path.getName() : outputStream.toString();
178    this.hFileContext = fileContext;
179    DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
180    if (encoding != DataBlockEncoding.NONE) {
181      this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
182    } else {
183      this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
184    }
185    IndexBlockEncoding indexBlockEncoding = hFileContext.getIndexBlockEncoding();
186    if (indexBlockEncoding != IndexBlockEncoding.NONE) {
187      this.indexBlockEncoder = new HFileIndexBlockEncoderImpl(indexBlockEncoding);
188    } else {
189      this.indexBlockEncoder = NoOpIndexBlockEncoder.INSTANCE;
190    }
191    closeOutputStream = path != null;
192    this.cacheConf = cacheConf;
193    float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 0f);
194    this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio);
195
196    finishInit(conf);
197    if (LOG.isTraceEnabled()) {
198      LOG.trace("Writer" + (path != null ? " for " + path : "") + " initialized with cacheConf: "
199        + cacheConf + " fileContext: " + fileContext);
200    }
201  }
202
203  /**
204   * Add to the file info. All added key/value pairs can be obtained using
205   * {@link HFile.Reader#getHFileInfo()}.
206   * @param k Key
207   * @param v Value
208   * @throws IOException in case the key or the value are invalid
209   */
210  @Override
211  public void appendFileInfo(final byte[] k, final byte[] v) throws IOException {
212    fileInfo.append(k, v, true);
213  }
214
215  /**
216   * Sets the file info offset in the trailer, finishes up populating fields in the file info, and
217   * writes the file info into the given data output. The reason the data output is not always
218   * {@link #outputStream} is that we store file info as a block in version 2.
219   * @param trailer fixed file trailer
220   * @param out     the data output to write the file info to
221   */
222  protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
223    throws IOException {
224    trailer.setFileInfoOffset(outputStream.getPos());
225    finishFileInfo();
226    long startTime = EnvironmentEdgeManager.currentTime();
227    fileInfo.write(out);
228    HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);
229  }
230
231  public long getPos() throws IOException {
232    return outputStream.getPos();
233
234  }
235
236  /**
237   * Checks that the given Cell's key does not violate the key order.
238   * @param cell Cell whose key to check.
239   * @return true if the key is duplicate
240   * @throws IOException if the key or the key order is wrong
241   */
242  protected boolean checkKey(final Cell cell) throws IOException {
243    boolean isDuplicateKey = false;
244
245    if (cell == null) {
246      throw new IOException("Key cannot be null or empty");
247    }
248    if (lastCell != null) {
249      int keyComp = PrivateCellUtil.compareKeyIgnoresMvcc(this.hFileContext.getCellComparator(),
250        lastCell, cell);
251      if (keyComp > 0) {
252        String message = getLexicalErrorMessage(cell);
253        throw new IOException(message);
254      } else if (keyComp == 0) {
255        isDuplicateKey = true;
256      }
257    }
258    return isDuplicateKey;
259  }
260
261  private String getLexicalErrorMessage(Cell cell) {
262    StringBuilder sb = new StringBuilder();
263    sb.append("Added a key not lexically larger than previous. Current cell = ");
264    sb.append(cell);
265    sb.append(", lastCell = ");
266    sb.append(lastCell);
267    // file context includes HFile path and optionally table and CF of file being written
268    sb.append("fileContext=");
269    sb.append(hFileContext);
270    return sb.toString();
271  }
272
273  /** Checks the given value for validity. */
274  protected void checkValue(final byte[] value, final int offset, final int length)
275    throws IOException {
276    if (value == null) {
277      throw new IOException("Value cannot be null");
278    }
279  }
280
281  /** Returns Path or null if we were passed a stream rather than a Path. */
282  @Override
283  public Path getPath() {
284    return path;
285  }
286
287  @Override
288  public String toString() {
289    return "writer=" + (path != null ? path.toString() : null) + ", name=" + name + ", compression="
290      + hFileContext.getCompression().getName();
291  }
292
293  public static Compression.Algorithm compressionByName(String algoName) {
294    if (algoName == null) {
295      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
296    }
297    return Compression.getCompressionAlgorithmByName(algoName);
298  }
299
300  /** A helper method to create HFile output streams in constructors */
301  protected static FSDataOutputStream createOutputStream(Configuration conf, FileSystem fs,
302    Path path, InetSocketAddress[] favoredNodes) throws IOException {
303    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
304    return FSUtils.create(conf, fs, path, perms, favoredNodes);
305  }
306
307  /** Additional initialization steps */
308  protected void finishInit(final Configuration conf) {
309    if (blockWriter != null) {
310      throw new IllegalStateException("finishInit called twice");
311    }
312    blockWriter =
313      new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator(),
314        conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10));
315    // Data block index writer
316    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
317    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
318      cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null, indexBlockEncoder);
319    dataBlockIndexWriter.setMaxChunkSize(HFileBlockIndex.getMaxChunkSize(conf));
320    dataBlockIndexWriter.setMinIndexNumEntries(HFileBlockIndex.getMinIndexNumEntries(conf));
321    inlineBlockWriters.add(dataBlockIndexWriter);
322
323    // Meta data block index writer
324    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
325    LOG.trace("Initialized with {}", cacheConf);
326  }
327
328  /**
329   * At a block boundary, write all the inline blocks and opens new block.
330   */
331  protected void checkBlockBoundary() throws IOException {
332    boolean shouldFinishBlock = false;
333    // This means hbase.writer.unified.encoded.blocksize.ratio was set to something different from 0
334    // and we should use the encoding ratio
335    if (encodedBlockSizeLimit > 0) {
336      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit;
337    } else {
338      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize()
339        || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize();
340    }
341    shouldFinishBlock &= blockWriter.checkBoundariesWithPredicate();
342    if (shouldFinishBlock) {
343      finishBlock();
344      writeInlineBlocks(false);
345      newBlock();
346    }
347  }
348
349  /** Clean up the data block that is currently being written. */
350  private void finishBlock() throws IOException {
351    if (!blockWriter.isWriting() || blockWriter.blockSizeWritten() == 0) {
352      return;
353    }
354
355    // Update the first data block offset if UNSET; used scanning.
356    if (firstDataBlockOffset == UNSET) {
357      firstDataBlockOffset = outputStream.getPos();
358    }
359    // Update the last data block offset each time through here.
360    lastDataBlockOffset = outputStream.getPos();
361    blockWriter.writeHeaderAndData(outputStream);
362    int onDiskSize = blockWriter.getOnDiskSizeWithHeader();
363    Cell indexEntry =
364      getMidpoint(this.hFileContext.getCellComparator(), lastCellOfPreviousBlock, firstCellInBlock);
365    dataBlockIndexWriter.addEntry(PrivateCellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
366      lastDataBlockOffset, onDiskSize);
367    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
368    if (cacheConf.shouldCacheDataOnWrite()) {
369      doCacheOnWrite(lastDataBlockOffset);
370    }
371  }
372
373  /**
374   * Try to return a Cell that falls between <code>left</code> and <code>right</code> but that is
375   * shorter; i.e. takes up less space. This trick is used building HFile block index. Its an
376   * optimization. It does not always work. In this case we'll just return the <code>right</code>
377   * cell.
378   * @return A cell that sorts between <code>left</code> and <code>right</code>.
379   */
380  public static Cell getMidpoint(final CellComparator comparator, final Cell left,
381    final Cell right) {
382    if (right == null) {
383      throw new IllegalArgumentException("right cell can not be null");
384    }
385    if (left == null) {
386      return right;
387    }
388    // If Cells from meta table, don't mess around. meta table Cells have schema
389    // (table,startrow,hash) so can't be treated as plain byte arrays. Just skip
390    // out without trying to do this optimization.
391    if (comparator instanceof MetaCellComparator) {
392      return right;
393    }
394    byte[] midRow;
395    boolean bufferBacked =
396      left instanceof ByteBufferExtendedCell && right instanceof ByteBufferExtendedCell;
397    if (bufferBacked) {
398      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getRowByteBuffer(),
399        ((ByteBufferExtendedCell) left).getRowPosition(), left.getRowLength(),
400        ((ByteBufferExtendedCell) right).getRowByteBuffer(),
401        ((ByteBufferExtendedCell) right).getRowPosition(), right.getRowLength());
402    } else {
403      midRow = getMinimumMidpointArray(left.getRowArray(), left.getRowOffset(), left.getRowLength(),
404        right.getRowArray(), right.getRowOffset(), right.getRowLength());
405    }
406    if (midRow != null) {
407      return PrivateCellUtil.createFirstOnRow(midRow);
408    }
409    // Rows are same. Compare on families.
410    if (bufferBacked) {
411      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getFamilyByteBuffer(),
412        ((ByteBufferExtendedCell) left).getFamilyPosition(), left.getFamilyLength(),
413        ((ByteBufferExtendedCell) right).getFamilyByteBuffer(),
414        ((ByteBufferExtendedCell) right).getFamilyPosition(), right.getFamilyLength());
415    } else {
416      midRow = getMinimumMidpointArray(left.getFamilyArray(), left.getFamilyOffset(),
417        left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
418        right.getFamilyLength());
419    }
420    if (midRow != null) {
421      return PrivateCellUtil.createFirstOnRowFamily(right, midRow, 0, midRow.length);
422    }
423    // Families are same. Compare on qualifiers.
424    if (bufferBacked) {
425      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getQualifierByteBuffer(),
426        ((ByteBufferExtendedCell) left).getQualifierPosition(), left.getQualifierLength(),
427        ((ByteBufferExtendedCell) right).getQualifierByteBuffer(),
428        ((ByteBufferExtendedCell) right).getQualifierPosition(), right.getQualifierLength());
429    } else {
430      midRow = getMinimumMidpointArray(left.getQualifierArray(), left.getQualifierOffset(),
431        left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
432        right.getQualifierLength());
433    }
434    if (midRow != null) {
435      return PrivateCellUtil.createFirstOnRowCol(right, midRow, 0, midRow.length);
436    }
437    // No opportunity for optimization. Just return right key.
438    return right;
439  }
440
441  /**
442   * Try to get a byte array that falls between left and right as short as possible with
443   * lexicographical order;
444   * @return Return a new array that is between left and right and minimally sized else just return
445   *         null if left == right.
446   */
447  private static byte[] getMinimumMidpointArray(final byte[] leftArray, final int leftOffset,
448    final int leftLength, final byte[] rightArray, final int rightOffset, final int rightLength) {
449    int minLength = leftLength < rightLength ? leftLength : rightLength;
450    int diffIdx = 0;
451    for (; diffIdx < minLength; diffIdx++) {
452      byte leftByte = leftArray[leftOffset + diffIdx];
453      byte rightByte = rightArray[rightOffset + diffIdx];
454      if ((leftByte & 0xff) > (rightByte & 0xff)) {
455        throw new IllegalArgumentException("Left byte array sorts after right row; left="
456          + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right="
457          + Bytes.toStringBinary(rightArray, rightOffset, rightLength));
458      } else if (leftByte != rightByte) {
459        break;
460      }
461    }
462    if (diffIdx == minLength) {
463      if (leftLength > rightLength) {
464        // right is prefix of left
465        throw new IllegalArgumentException("Left byte array sorts after right row; left="
466          + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right="
467          + Bytes.toStringBinary(rightArray, rightOffset, rightLength));
468      } else if (leftLength < rightLength) {
469        // left is prefix of right.
470        byte[] minimumMidpointArray = new byte[minLength + 1];
471        System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, minLength + 1);
472        minimumMidpointArray[minLength] = 0x00;
473        return minimumMidpointArray;
474      } else {
475        // left == right
476        return null;
477      }
478    }
479    // Note that left[diffIdx] can never be equal to 0xff since left < right
480    byte[] minimumMidpointArray = new byte[diffIdx + 1];
481    System.arraycopy(leftArray, leftOffset, minimumMidpointArray, 0, diffIdx + 1);
482    minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1);
483    return minimumMidpointArray;
484  }
485
486  /**
487   * Try to create a new byte array that falls between left and right as short as possible with
488   * lexicographical order.
489   * @return Return a new array that is between left and right and minimally sized else just return
490   *         null if left == right.
491   */
492  private static byte[] getMinimumMidpointArray(ByteBuffer left, int leftOffset, int leftLength,
493    ByteBuffer right, int rightOffset, int rightLength) {
494    int minLength = leftLength < rightLength ? leftLength : rightLength;
495    int diffIdx = 0;
496    for (; diffIdx < minLength; diffIdx++) {
497      int leftByte = ByteBufferUtils.toByte(left, leftOffset + diffIdx);
498      int rightByte = ByteBufferUtils.toByte(right, rightOffset + diffIdx);
499      if ((leftByte & 0xff) > (rightByte & 0xff)) {
500        throw new IllegalArgumentException("Left byte array sorts after right row; left="
501          + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right="
502          + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength));
503      } else if (leftByte != rightByte) {
504        break;
505      }
506    }
507    if (diffIdx == minLength) {
508      if (leftLength > rightLength) {
509        // right is prefix of left
510        throw new IllegalArgumentException("Left byte array sorts after right row; left="
511          + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right="
512          + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength));
513      } else if (leftLength < rightLength) {
514        // left is prefix of right.
515        byte[] minimumMidpointArray = new byte[minLength + 1];
516        ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, right, rightOffset, 0,
517          minLength + 1);
518        minimumMidpointArray[minLength] = 0x00;
519        return minimumMidpointArray;
520      } else {
521        // left == right
522        return null;
523      }
524    }
525    // Note that left[diffIdx] can never be equal to 0xff since left < right
526    byte[] minimumMidpointArray = new byte[diffIdx + 1];
527    ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, left, leftOffset, 0, diffIdx + 1);
528    minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1);
529    return minimumMidpointArray;
530  }
531
532  /** Gives inline block writers an opportunity to contribute blocks. */
533  private void writeInlineBlocks(boolean closing) throws IOException {
534    for (InlineBlockWriter ibw : inlineBlockWriters) {
535      while (ibw.shouldWriteBlock(closing)) {
536        long offset = outputStream.getPos();
537        boolean cacheThisBlock = ibw.getCacheOnWrite();
538        ibw.writeInlineBlock(blockWriter.startWriting(ibw.getInlineBlockType()));
539        blockWriter.writeHeaderAndData(outputStream);
540        ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(),
541          blockWriter.getUncompressedSizeWithoutHeader());
542        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
543
544        if (cacheThisBlock) {
545          doCacheOnWrite(offset);
546        }
547      }
548    }
549  }
550
551  /**
552   * Caches the last written HFile block.
553   * @param offset the offset of the block we want to cache. Used to determine the cache key.
554   */
555  private void doCacheOnWrite(long offset) {
556    cacheConf.getBlockCache().ifPresent(cache -> {
557      HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
558      try {
559        cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
560          cacheFormatBlock, cacheConf.isInMemory(), true);
561      } finally {
562        // refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
563        cacheFormatBlock.release();
564      }
565    });
566  }
567
568  /**
569   * Ready a new block for writing.
570   */
571  protected void newBlock() throws IOException {
572    // This is where the next block begins.
573    blockWriter.startWriting(BlockType.DATA);
574    firstCellInBlock = null;
575    if (lastCell != null) {
576      lastCellOfPreviousBlock = lastCell;
577    }
578  }
579
580  /**
581   * Add a meta block to the end of the file. Call before close(). Metadata blocks are expensive.
582   * Fill one with a bunch of serialized data rather than do a metadata block per metadata instance.
583   * If metadata is small, consider adding to file info using
584   * {@link #appendFileInfo(byte[], byte[])} name of the block will call readFields to get data
585   * later (DO NOT REUSE)
586   */
587  @Override
588  public void appendMetaBlock(String metaBlockName, Writable content) {
589    byte[] key = Bytes.toBytes(metaBlockName);
590    int i;
591    for (i = 0; i < metaNames.size(); ++i) {
592      // stop when the current key is greater than our own
593      byte[] cur = metaNames.get(i);
594      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0, key.length) > 0) {
595        break;
596      }
597    }
598    metaNames.add(i, key);
599    metaData.add(i, content);
600  }
601
602  @Override
603  public void close() throws IOException {
604    if (outputStream == null) {
605      return;
606    }
607    // Save data block encoder metadata in the file info.
608    blockEncoder.saveMetadata(this);
609    // Save index block encoder metadata in the file info.
610    indexBlockEncoder.saveMetadata(this);
611    // Write out the end of the data blocks, then write meta data blocks.
612    // followed by fileinfo, data block index and meta block index.
613
614    finishBlock();
615    writeInlineBlocks(true);
616
617    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());
618
619    // Write out the metadata blocks if any.
620    if (!metaNames.isEmpty()) {
621      for (int i = 0; i < metaNames.size(); ++i) {
622        // store the beginning offset
623        long offset = outputStream.getPos();
624        // write the metadata content
625        DataOutputStream dos = blockWriter.startWriting(BlockType.META);
626        metaData.get(i).write(dos);
627
628        blockWriter.writeHeaderAndData(outputStream);
629        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
630
631        // Add the new meta block to the meta index.
632        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
633          blockWriter.getOnDiskSizeWithHeader());
634      }
635    }
636
637    // Load-on-open section.
638
639    // Data block index.
640    //
641    // In version 2, this section of the file starts with the root level data
642    // block index. We call a function that writes intermediate-level blocks
643    // first, then root level, and returns the offset of the root level block
644    // index.
645
646    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
647    trailer.setLoadOnOpenOffset(rootIndexOffset);
648
649    // Meta block index.
650    metaBlockIndexWriter.writeSingleLevelIndex(blockWriter.startWriting(BlockType.ROOT_INDEX),
651      "meta");
652    blockWriter.writeHeaderAndData(outputStream);
653    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
654
655    if (this.hFileContext.isIncludesMvcc()) {
656      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
657      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
658    }
659
660    // File info
661    writeFileInfo(trailer, blockWriter.startWriting(BlockType.FILE_INFO));
662    blockWriter.writeHeaderAndData(outputStream);
663    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
664
665    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
666    for (BlockWritable w : additionalLoadOnOpenData) {
667      blockWriter.writeBlock(w, outputStream);
668      totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
669    }
670
671    // Now finish off the trailer.
672    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
673    trailer.setUncompressedDataIndexSize(dataBlockIndexWriter.getTotalUncompressedSize());
674    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
675    trailer.setLastDataBlockOffset(lastDataBlockOffset);
676    trailer.setComparatorClass(this.hFileContext.getCellComparator().getClass());
677    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());
678
679    finishClose(trailer);
680
681    blockWriter.release();
682  }
683
684  @Override
685  public void addInlineBlockWriter(InlineBlockWriter ibw) {
686    inlineBlockWriters.add(ibw);
687  }
688
689  @Override
690  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
691    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
692  }
693
694  @Override
695  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
696    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
697  }
698
699  private void addBloomFilter(final BloomFilterWriter bfw, final BlockType blockType) {
700    if (bfw.getKeyCount() <= 0) {
701      return;
702    }
703
704    if (
705      blockType != BlockType.GENERAL_BLOOM_META && blockType != BlockType.DELETE_FAMILY_BLOOM_META
706    ) {
707      throw new RuntimeException("Block Type: " + blockType.toString() + "is not supported");
708    }
709    additionalLoadOnOpenData.add(new BlockWritable() {
710      @Override
711      public BlockType getBlockType() {
712        return blockType;
713      }
714
715      @Override
716      public void writeToBlock(DataOutput out) throws IOException {
717        bfw.getMetaWriter().write(out);
718        Writable dataWriter = bfw.getDataWriter();
719        if (dataWriter != null) {
720          dataWriter.write(out);
721        }
722      }
723    });
724  }
725
726  @Override
727  public HFileContext getFileContext() {
728    return hFileContext;
729  }
730
731  /**
732   * Add key/value to file. Keys must be added in an order that agrees with the Comparator passed on
733   * construction. Cell to add. Cannot be empty nor null.
734   */
735  @Override
736  public void append(final Cell cell) throws IOException {
737    // checkKey uses comparator to check we are writing in order.
738    boolean dupKey = checkKey(cell);
739    if (!dupKey) {
740      checkBlockBoundary();
741    }
742
743    if (!blockWriter.isWriting()) {
744      newBlock();
745    }
746
747    blockWriter.write(cell);
748
749    totalKeyLength += PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
750    totalValueLength += cell.getValueLength();
751    if (lenOfBiggestCell < PrivateCellUtil.estimatedSerializedSizeOf(cell)) {
752      lenOfBiggestCell = PrivateCellUtil.estimatedSerializedSizeOf(cell);
753      keyOfBiggestCell = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(cell);
754    }
755    // Are we the first key in this block?
756    if (firstCellInBlock == null) {
757      // If cell is big, block will be closed and this firstCellInBlock reference will only last
758      // a short while.
759      firstCellInBlock = cell;
760    }
761
762    // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinitely?
763    lastCell = cell;
764    entryCount++;
765    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
766    int tagsLength = cell.getTagsLength();
767    if (tagsLength > this.maxTagsLength) {
768      this.maxTagsLength = tagsLength;
769    }
770  }
771
772  @Override
773  public void beforeShipped() throws IOException {
774    this.blockWriter.beforeShipped();
775    // Add clone methods for every cell
776    if (this.lastCell != null) {
777      this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);
778    }
779    if (this.firstCellInBlock != null) {
780      this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock);
781    }
782    if (this.lastCellOfPreviousBlock != null) {
783      this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock);
784    }
785  }
786
787  public Cell getLastCell() {
788    return lastCell;
789  }
790
791  protected void finishFileInfo() throws IOException {
792    if (lastCell != null) {
793      // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
794      // byte buffer. Won't take a tuple.
795      byte[] lastKey = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
796      fileInfo.append(HFileInfo.LASTKEY, lastKey, false);
797    }
798
799    // Average key length.
800    int avgKeyLen = entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
801    fileInfo.append(HFileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
802    fileInfo.append(HFileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
803      false);
804
805    // Average value length.
806    int avgValueLen = entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
807    fileInfo.append(HFileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
808
809    // Biggest cell.
810    if (keyOfBiggestCell != null) {
811      fileInfo.append(HFileInfo.KEY_OF_BIGGEST_CELL, keyOfBiggestCell, false);
812      fileInfo.append(HFileInfo.LEN_OF_BIGGEST_CELL, Bytes.toBytes(lenOfBiggestCell), false);
813      LOG.debug("Len of the biggest cell in {} is {}, key is {}",
814        this.getPath() == null ? "" : this.getPath().toString(), lenOfBiggestCell,
815        CellUtil.toString(new KeyValue.KeyOnlyKeyValue(keyOfBiggestCell), false));
816    }
817
818    if (hFileContext.isIncludesTags()) {
819      // When tags are not being written in this file, MAX_TAGS_LEN is excluded
820      // from the FileInfo
821      fileInfo.append(HFileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
822      boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
823        && hFileContext.isCompressTags();
824      fileInfo.append(HFileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
825    }
826  }
827
828  protected int getMajorVersion() {
829    return 3;
830  }
831
832  protected int getMinorVersion() {
833    return HFileReaderImpl.MAX_MINOR_VERSION;
834  }
835
836  protected void finishClose(FixedFileTrailer trailer) throws IOException {
837    // Write out encryption metadata before finalizing if we have a valid crypto context
838    Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
839    if (cryptoContext != Encryption.Context.NONE) {
840      // Wrap the context's key and write it as the encryption metadata, the wrapper includes
841      // all information needed for decryption
842      trailer.setEncryptionKey(EncryptionUtil.wrapKey(
843        cryptoContext.getConf(), cryptoContext.getConf()
844          .get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName()),
845        cryptoContext.getKey()));
846    }
847    // Now we can finish the close
848    trailer.setMetaIndexCount(metaNames.size());
849    trailer.setTotalUncompressedBytes(totalUncompressedBytes + trailer.getTrailerSize());
850    trailer.setEntryCount(entryCount);
851    trailer.setCompressionCodec(hFileContext.getCompression());
852
853    long startTime = EnvironmentEdgeManager.currentTime();
854    trailer.serialize(outputStream);
855    HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);
856
857    if (closeOutputStream) {
858      outputStream.close();
859      outputStream = null;
860    }
861  }
862}