/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Common functionality needed by all versions of {@link HFile} writers.
 */
@InterfaceAudience.Private
public class HFileWriterImpl implements HFile.Writer {
  private static final Logger LOG = LoggerFactory.getLogger(HFileWriterImpl.class);

  private static final long UNSET = -1;

  /** If this feature is enabled, pre-calculate the encoded data size before the real encoding happens. */
  public static final String UNIFIED_ENCODED_BLOCKSIZE_RATIO =
    "hbase.writer.unified.encoded.blocksize.ratio";

  /** Block size limit after encoding, used to unify the cache entry size of encoded blocks. */
  private final int encodedBlockSizeLimit;

  /** The Cell previously appended. Becomes the last cell in the file. */
  protected Cell lastCell = null;

  /** FileSystem stream to write into. */
  protected FSDataOutputStream outputStream;

  /** True if we opened the <code>outputStream</code> (and so will close it). */
  protected final boolean closeOutputStream;

  /** A "file info" block: a key-value map of file-wide metadata. */
  protected HFileInfo fileInfo = new HFileInfo();

  /** Total # of key/value entries, i.e. how many times append() was called. */
  protected long entryCount = 0;

  /** Used for calculating the average key length. */
  protected long totalKeyLength = 0;

  /** Used for calculating the average value length. */
  protected long totalValueLength = 0;

  /** Total uncompressed bytes, maybe calculate a compression ratio later. */
  protected long totalUncompressedBytes = 0;

  /** Meta block names. */
  protected List<byte[]> metaNames = new ArrayList<>();

  /** {@link Writable}s representing meta block data. */
  protected List<Writable> metaData = new ArrayList<>();

  /**
   * First cell in a block. This reference should be short-lived since we write hfiles in a burst.
   */
  protected Cell firstCellInBlock = null;

  /** May be null if we were passed a stream. */
  protected final Path path;

  /** Cache configuration for caching data on write. */
  protected final CacheConfig cacheConf;

  /**
   * Name for this object used when logging or in toString. Is either the result of a toString on
   * stream or else name of passed file Path.
   */
  protected final String name;

  /**
   * The data block encoding which will be used. {@link NoOpDataBlockEncoder#INSTANCE} if there is
   * no encoding.
   */
  protected final HFileDataBlockEncoder blockEncoder;

  protected final HFileContext hFileContext;

  private int maxTagsLength = 0;

  /** KeyValue version in FileInfo */
  public static final byte[] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");

  /** Version for KeyValue which includes memstore timestamp */
  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  /** Inline block writers for multi-level block index and compound Blooms. */
  private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<>();

  /** block writer */
  protected HFileBlock.Writer blockWriter;

  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

  /** The offset of the first data block or -1 if the file is empty. */
  private long firstDataBlockOffset = UNSET;

  /** The offset of the last data block or -1 if the file is empty. */
  protected long lastDataBlockOffset = UNSET;

  /**
   * The last(stop) Cell of the previous data block. This reference should be short-lived since we
   * write hfiles in a burst.
   */
  private Cell lastCellOfPreviousBlock = null;

  /** Additional data items to be written to the "load-on-open" section. */
  private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<>();

  protected long maxMemstoreTS = 0;

  public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
    FSDataOutputStream outputStream, HFileContext fileContext) {
    this.outputStream = outputStream;
    this.path = path;
    this.name = path != null ? path.getName() : outputStream.toString();
    this.hFileContext = fileContext;
    DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
    if (encoding != DataBlockEncoding.NONE) {
      this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
    } else {
      this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
    }
    closeOutputStream = path != null;
    this.cacheConf = cacheConf;
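    // Illustrative note (assumption, not from the original source): with the common 64 KiB block
    // size and the default ratio of 1.0f below, encodedBlockSizeLimit works out to 65536 bytes, so
    // encoded and unencoded blocks are cut at roughly the same size; a ratio of 0.5f would close
    // encoded blocks once about 32 KiB of encoded data has been written.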
    float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 1f);
    this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio);
    finishInit(conf);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Writer" + (path != null ? " for " + path : "") + " initialized with cacheConf: "
        + cacheConf + " fileContext: " + fileContext);
    }
  }

  /**
   * Add to the file info. All added key/value pairs can be obtained using
   * {@link HFile.Reader#getHFileInfo()}.
   * @param k Key
   * @param v Value
   * @throws IOException in case the key or the value are invalid
   */
  @Override
  public void appendFileInfo(final byte[] k, final byte[] v) throws IOException {
    fileInfo.append(k, v, true);
  }

  /**
   * Sets the file info offset in the trailer, finishes up populating fields in the file info, and
   * writes the file info into the given data output. The reason the data output is not always
   * {@link #outputStream} is that we store file info as a block in version 2.
   * @param trailer fixed file trailer
   * @param out     the data output to write the file info to
   */
  protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
    throws IOException {
    trailer.setFileInfoOffset(outputStream.getPos());
    finishFileInfo();
    long startTime = EnvironmentEdgeManager.currentTime();
    fileInfo.write(out);
    HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);
  }

  public long getPos() throws IOException {
    return outputStream.getPos();
  }

  /**
   * Checks that the given Cell's key does not violate the key order.
   * @param cell Cell whose key to check.
   * @return true if the key is a duplicate of the previously appended cell's key
   * @throws IOException if the key or the key order is wrong
   */
  protected boolean checkKey(final Cell cell) throws IOException {
    boolean isDuplicateKey = false;

    if (cell == null) {
      throw new IOException("Key cannot be null or empty");
    }
    if (lastCell != null) {
      int keyComp = PrivateCellUtil.compareKeyIgnoresMvcc(this.hFileContext.getCellComparator(),
        lastCell, cell);
      if (keyComp > 0) {
        String message = getLexicalErrorMessage(cell);
        throw new IOException(message);
      } else if (keyComp == 0) {
        isDuplicateKey = true;
      }
    }
    return isDuplicateKey;
  }

  private String getLexicalErrorMessage(Cell cell) {
    StringBuilder sb = new StringBuilder();
    sb.append("Added a key not lexically larger than previous. Current cell = ");
    sb.append(cell);
    sb.append(", lastCell = ");
    sb.append(lastCell);
    // file context includes HFile path and optionally table and CF of file being written
    sb.append(", fileContext=");
    sb.append(hFileContext);
    return sb.toString();
  }

  /** Checks the given value for validity. */
  protected void checkValue(final byte[] value, final int offset, final int length)
    throws IOException {
    if (value == null) {
      throw new IOException("Value cannot be null");
    }
  }

  /**
   * @return Path or null if we were passed a stream rather than a Path.
   */
  @Override
  public Path getPath() {
    return path;
  }

  @Override
  public String toString() {
    return "writer=" + (path != null ? path.toString() : null) + ", name=" + name + ", compression="
      + hFileContext.getCompression().getName();
  }

  public static Compression.Algorithm compressionByName(String algoName) {
    if (algoName == null) {
      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    return Compression.getCompressionAlgorithmByName(algoName);
  }

  /** A helper method to create HFile output streams in constructors */
  protected static FSDataOutputStream createOutputStream(Configuration conf, FileSystem fs,
    Path path, InetSocketAddress[] favoredNodes) throws IOException {
    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    return FSUtils.create(conf, fs, path, perms, favoredNodes);
  }

  /** Additional initialization steps */
  protected void finishInit(final Configuration conf) {
    if (blockWriter != null) {
      throw new IllegalStateException("finishInit called twice");
    }
    blockWriter =
      new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator());
    // Data block index writer
    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
      cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null);
    dataBlockIndexWriter.setMaxChunkSize(HFileBlockIndex.getMaxChunkSize(conf));
    dataBlockIndexWriter.setMinIndexNumEntries(HFileBlockIndex.getMinIndexNumEntries(conf));
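    // Illustrative note (assumption, not from the original source): maxChunkSize caps how large an
    // individual index block may grow before it is written out as an inline leaf block, and
    // minIndexNumEntries guarantees each index block still covers a minimum number of entries;
    // together they determine when the data block index spills into multiple levels.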
    inlineBlockWriters.add(dataBlockIndexWriter);

    // Meta data block index writer
    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
    LOG.trace("Initialized with {}", cacheConf);
  }

  /**
   * At a block boundary, writes all the inline blocks and opens a new block.
   */
  protected void checkBlockBoundary() throws IOException {
    // For encoders like prefixTree, the encoded size is not available, so we have to compare both
    // the encoded size and the unencoded size against the block size limit.
    if (
      blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit
        || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize()
    ) {
      finishBlock();
      writeInlineBlocks(false);
      newBlock();
    }
  }

  /** Clean up the data block that is currently being written. */
  private void finishBlock() throws IOException {
    if (!blockWriter.isWriting() || blockWriter.blockSizeWritten() == 0) {
      return;
    }

    // Update the first data block offset if UNSET; used when scanning.
    if (firstDataBlockOffset == UNSET) {
      firstDataBlockOffset = outputStream.getPos();
    }
    // Update the last data block offset each time through here.
    lastDataBlockOffset = outputStream.getPos();
    blockWriter.writeHeaderAndData(outputStream);
    int onDiskSize = blockWriter.getOnDiskSizeWithHeader();
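    // Explanatory note (not in the original source): the index entry does not have to be an actual
    // key from the file. Any key that sorts after the last cell of the previous block and no later
    // than the first cell of this block will do, and getMidpoint below tries to produce the
    // shortest such key.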
    Cell indexEntry =
      getMidpoint(this.hFileContext.getCellComparator(), lastCellOfPreviousBlock, firstCellInBlock);
    dataBlockIndexWriter.addEntry(PrivateCellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
      lastDataBlockOffset, onDiskSize);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    if (cacheConf.shouldCacheDataOnWrite()) {
      doCacheOnWrite(lastDataBlockOffset);
    }
  }

  /**
   * Try to return a Cell that falls between <code>left</code> and <code>right</code> but that is
   * shorter, i.e. takes up less space. This trick is used when building the HFile block index. It
   * is an optimization: it does not always work, in which case we just return the
   * <code>right</code> cell.
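   * <p>
   * Illustrative example (not part of the original javadoc): if the previous block ends with row
   * "the quick brown fox" and the new block starts with row "the who", the index entry can use the
   * much shorter fake row "the r", which still sorts after the former and before the latter.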
   * @return A cell that sorts between <code>left</code> and <code>right</code>.
   */
  public static Cell getMidpoint(final CellComparator comparator, final Cell left,
    final Cell right) {
    if (right == null) {
      throw new IllegalArgumentException("right cell can not be null");
    }
    if (left == null) {
      return right;
    }
    // If Cells from meta table, don't mess around. meta table Cells have schema
    // (table,startrow,hash) so can't be treated as plain byte arrays. Just skip
    // out without trying to do this optimization.
    if (comparator instanceof MetaCellComparator) {
      return right;
    }
    byte[] midRow;
    boolean bufferBacked =
      left instanceof ByteBufferExtendedCell && right instanceof ByteBufferExtendedCell;
    if (bufferBacked) {
      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getRowByteBuffer(),
        ((ByteBufferExtendedCell) left).getRowPosition(), left.getRowLength(),
        ((ByteBufferExtendedCell) right).getRowByteBuffer(),
        ((ByteBufferExtendedCell) right).getRowPosition(), right.getRowLength());
    } else {
      midRow = getMinimumMidpointArray(left.getRowArray(), left.getRowOffset(), left.getRowLength(),
        right.getRowArray(), right.getRowOffset(), right.getRowLength());
    }
    if (midRow != null) {
      return PrivateCellUtil.createFirstOnRow(midRow);
    }
    // Rows are same. Compare on families.
    if (bufferBacked) {
      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getFamilyByteBuffer(),
        ((ByteBufferExtendedCell) left).getFamilyPosition(), left.getFamilyLength(),
        ((ByteBufferExtendedCell) right).getFamilyByteBuffer(),
        ((ByteBufferExtendedCell) right).getFamilyPosition(), right.getFamilyLength());
    } else {
      midRow = getMinimumMidpointArray(left.getFamilyArray(), left.getFamilyOffset(),
        left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
        right.getFamilyLength());
    }
    if (midRow != null) {
      return PrivateCellUtil.createFirstOnRowFamily(right, midRow, 0, midRow.length);
    }
    // Families are same. Compare on qualifiers.
    if (bufferBacked) {
      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getQualifierByteBuffer(),
        ((ByteBufferExtendedCell) left).getQualifierPosition(), left.getQualifierLength(),
        ((ByteBufferExtendedCell) right).getQualifierByteBuffer(),
        ((ByteBufferExtendedCell) right).getQualifierPosition(), right.getQualifierLength());
    } else {
      midRow = getMinimumMidpointArray(left.getQualifierArray(), left.getQualifierOffset(),
        left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
        right.getQualifierLength());
    }
    if (midRow != null) {
      return PrivateCellUtil.createFirstOnRowCol(right, midRow, 0, midRow.length);
    }
    // No opportunity for optimization. Just return right key.
    return right;
  }

  /**
   * Try to get a byte array that falls between left and right in lexicographical order and is as
   * short as possible.
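   * <p>
   * Illustrative example (assumption, not part of the original javadoc): for left {0x01, 0x02} and
   * right {0x01, 0x05} this returns {0x01, 0x03}; for left {0x01} and right {0x01, 0x07, 0x07},
   * where left is a prefix of right, it returns {0x01, 0x00}.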
   * @return Return a new array that is between left and right and minimally sized else just return
   *         null if left == right.
   */
  private static byte[] getMinimumMidpointArray(final byte[] leftArray, final int leftOffset,
    final int leftLength, final byte[] rightArray, final int rightOffset, final int rightLength) {
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    for (; diffIdx < minLength; diffIdx++) {
      byte leftByte = leftArray[leftOffset + diffIdx];
      byte rightByte = rightArray[rightOffset + diffIdx];
      if ((leftByte & 0xff) > (rightByte & 0xff)) {
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right="
          + Bytes.toStringBinary(rightArray, rightOffset, rightLength));
      } else if (leftByte != rightByte) {
        break;
      }
    }
    if (diffIdx == minLength) {
      if (leftLength > rightLength) {
        // right is prefix of left
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right="
          + Bytes.toStringBinary(rightArray, rightOffset, rightLength));
      } else if (leftLength < rightLength) {
        // left is prefix of right.
        byte[] minimumMidpointArray = new byte[minLength + 1];
        System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, minLength + 1);
        minimumMidpointArray[minLength] = 0x00;
        return minimumMidpointArray;
      } else {
        // left == right
        return null;
      }
    }
    // Note that left[diffIdx] can never be equal to 0xff since left < right
    byte[] minimumMidpointArray = new byte[diffIdx + 1];
    System.arraycopy(leftArray, leftOffset, minimumMidpointArray, 0, diffIdx + 1);
    minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1);
    return minimumMidpointArray;
  }

  /**
   * Try to create a new byte array that falls between left and right as short as possible with
   * lexicographical order.
   * @return Return a new array that is between left and right and minimally sized else just return
   *         null if left == right.
   */
  private static byte[] getMinimumMidpointArray(ByteBuffer left, int leftOffset, int leftLength,
    ByteBuffer right, int rightOffset, int rightLength) {
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    for (; diffIdx < minLength; diffIdx++) {
      int leftByte = ByteBufferUtils.toByte(left, leftOffset + diffIdx);
      int rightByte = ByteBufferUtils.toByte(right, rightOffset + diffIdx);
      if ((leftByte & 0xff) > (rightByte & 0xff)) {
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right="
          + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength));
      } else if (leftByte != rightByte) {
        break;
      }
    }
    if (diffIdx == minLength) {
      if (leftLength > rightLength) {
        // right is prefix of left
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right="
          + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength));
      } else if (leftLength < rightLength) {
        // left is prefix of right.
        byte[] minimumMidpointArray = new byte[minLength + 1];
        ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, right, rightOffset, 0,
          minLength + 1);
        minimumMidpointArray[minLength] = 0x00;
        return minimumMidpointArray;
      } else {
        // left == right
        return null;
      }
    }
    // Note that left[diffIdx] can never be equal to 0xff since left < right
    byte[] minimumMidpointArray = new byte[diffIdx + 1];
    ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, left, leftOffset, 0, diffIdx + 1);
    minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1);
    return minimumMidpointArray;
  }

  /** Gives inline block writers an opportunity to contribute blocks. */
  private void writeInlineBlocks(boolean closing) throws IOException {
    for (InlineBlockWriter ibw : inlineBlockWriters) {
      while (ibw.shouldWriteBlock(closing)) {
        long offset = outputStream.getPos();
        boolean cacheThisBlock = ibw.getCacheOnWrite();
        ibw.writeInlineBlock(blockWriter.startWriting(ibw.getInlineBlockType()));
        blockWriter.writeHeaderAndData(outputStream);
        ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(),
          blockWriter.getUncompressedSizeWithoutHeader());
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        if (cacheThisBlock) {
          doCacheOnWrite(offset);
        }
      }
    }
  }

  /**
   * Caches the last written HFile block.
   * @param offset the offset of the block we want to cache. Used to determine the cache key.
   */
  private void doCacheOnWrite(long offset) {
    cacheConf.getBlockCache().ifPresent(cache -> {
      HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
      try {
        cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
          cacheFormatBlock);
      } finally {
        // refCnt will auto-increase when the block is added to the cache; see RAMCache#putIfAbsent
        cacheFormatBlock.release();
      }
    });
  }

  /**
   * Ready a new block for writing.
   */
  protected void newBlock() throws IOException {
    // This is where the next block begins.
    blockWriter.startWriting(BlockType.DATA);
    firstCellInBlock = null;
    if (lastCell != null) {
      lastCellOfPreviousBlock = lastCell;
    }
  }

  /**
   * Add a meta block to the end of the file. Call before close(). Metadata blocks are expensive.
   * Fill one with a bunch of serialized data rather than do a metadata block per metadata instance.
   * If metadata is small, consider adding to file info using
   * {@link #appendFileInfo(byte[], byte[])}.
   * @param metaBlockName name of the block
   * @param content       will call readFields to get data later (DO NOT REUSE)
   */
  @Override
  public void appendMetaBlock(String metaBlockName, Writable content) {
    byte[] key = Bytes.toBytes(metaBlockName);
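    // Explanatory note (not in the original source): the meta block index written at close() is
    // searched by block name, so keep metaNames (and the parallel metaData list) sorted by
    // inserting each new entry at its ordered position.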
    int i;
    for (i = 0; i < metaNames.size(); ++i) {
      // stop when the current key is greater than our own
      byte[] cur = metaNames.get(i);
      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0, key.length) > 0) {
        break;
      }
    }
    metaNames.add(i, key);
    metaData.add(i, content);
  }

  @Override
  public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // Save data block encoder metadata in the file info.
    blockEncoder.saveMetadata(this);
    // Write out the end of the data blocks and the inline blocks, then write the meta data blocks,
    // followed by the data block index, the meta block index, the file info, any Bloom filter
    // metadata, and finally the fixed file trailer.

    finishBlock();
    writeInlineBlocks(true);

    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());

    // Write out the metadata blocks if any.
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long offset = outputStream.getPos();
        // write the metadata content
        DataOutputStream dos = blockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        blockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        // Add the new meta block to the meta index.
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
          blockWriter.getOnDiskSizeWithHeader());
      }
    }

    // Load-on-open section.

    // Data block index.
    //
    // In version 2, this section of the file starts with the root level data
    // block index. We call a function that writes intermediate-level blocks
    // first, then root level, and returns the offset of the root level block
    // index.

    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    // Meta block index.
    metaBlockIndexWriter.writeSingleLevelIndex(blockWriter.startWriting(BlockType.ROOT_INDEX),
      "meta");
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    if (this.hFileContext.isIncludesMvcc()) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }

    // File info
    writeFileInfo(trailer, blockWriter.startWriting(BlockType.FILE_INFO));
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
    for (BlockWritable w : additionalLoadOnOpenData) {
      blockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    }

    // Now finish off the trailer.
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    trailer.setUncompressedDataIndexSize(dataBlockIndexWriter.getTotalUncompressedSize());
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    trailer.setComparatorClass(this.hFileContext.getCellComparator().getClass());
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());

    finishClose(trailer);

    blockWriter.release();
  }

  @Override
  public void addInlineBlockWriter(InlineBlockWriter ibw) {
    inlineBlockWriters.add(ibw);
  }

  @Override
  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
  }

  @Override
  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
  }

  private void addBloomFilter(final BloomFilterWriter bfw, final BlockType blockType) {
    if (bfw.getKeyCount() <= 0) {
      return;
    }

    if (
      blockType != BlockType.GENERAL_BLOOM_META && blockType != BlockType.DELETE_FAMILY_BLOOM_META
    ) {
      throw new RuntimeException("Block Type: " + blockType.toString() + " is not supported");
    }
    additionalLoadOnOpenData.add(new BlockWritable() {
      @Override
      public BlockType getBlockType() {
        return blockType;
      }

      @Override
      public void writeToBlock(DataOutput out) throws IOException {
        bfw.getMetaWriter().write(out);
        Writable dataWriter = bfw.getDataWriter();
        if (dataWriter != null) {
          dataWriter.write(out);
        }
      }
    });
  }

  @Override
  public HFileContext getFileContext() {
    return hFileContext;
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the Comparator passed on
   * construction.
   * @param cell Cell to add. Cannot be empty nor null.
   */
  @Override
  public void append(final Cell cell) throws IOException {
    // checkKey uses comparator to check we are writing in order.
    boolean dupKey = checkKey(cell);
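    // Explanatory note (not in the original source): cells that differ only in MVCC/sequence id
    // compare as duplicates above, so a block boundary is only considered for a genuinely new key;
    // cells sharing the exact same key therefore never straddle a data block boundary.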
    if (!dupKey) {
      checkBlockBoundary();
    }

    if (!blockWriter.isWriting()) {
      newBlock();
    }

    blockWriter.write(cell);

    totalKeyLength += PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
    totalValueLength += cell.getValueLength();

    // Are we the first key in this block?
    if (firstCellInBlock == null) {
      // If cell is big, block will be closed and this firstCellInBlock reference will only last
      // a short while.
      firstCellInBlock = cell;
    }

    // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinitely?
    lastCell = cell;
    entryCount++;
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
    int tagsLength = cell.getTagsLength();
    if (tagsLength > this.maxTagsLength) {
      this.maxTagsLength = tagsLength;
    }
  }

  @Override
  public void beforeShipped() throws IOException {
    this.blockWriter.beforeShipped();
    // Clone the cells we still reference so they no longer point into buffers that may be reused
    // once the current batch has been shipped.
    if (this.lastCell != null) {
      this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);
    }
    if (this.firstCellInBlock != null) {
      this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock);
    }
    if (this.lastCellOfPreviousBlock != null) {
      this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock);
    }
  }

  public Cell getLastCell() {
    return lastCell;
  }

  protected void finishFileInfo() throws IOException {
    if (lastCell != null) {
      // Make a copy of the last key as a serialized byte array; the fileinfo map stores plain
      // byte arrays, not cell references.
      byte[] lastKey = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
      fileInfo.append(HFileInfo.LASTKEY, lastKey, false);
    }

    // Average key length.
    int avgKeyLen = entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
    fileInfo.append(HFileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
    fileInfo.append(HFileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
      false);

    // Average value length.
    int avgValueLen = entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
    fileInfo.append(HFileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
    if (hFileContext.isIncludesTags()) {
      // When tags are not being written in this file, MAX_TAGS_LEN is excluded
      // from the FileInfo
      fileInfo.append(HFileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
      boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
        && hFileContext.isCompressTags();
      fileInfo.append(HFileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
    }
  }

  protected int getMajorVersion() {
    return 3;
  }

  protected int getMinorVersion() {
    return HFileReaderImpl.MAX_MINOR_VERSION;
  }

  protected void finishClose(FixedFileTrailer trailer) throws IOException {
    // Write out encryption metadata before finalizing if we have a valid crypto context
    Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
    if (cryptoContext != Encryption.Context.NONE) {
      // Wrap the context's key and write it as the encryption metadata; the wrapper includes
      // all information needed for decryption.
      trailer.setEncryptionKey(EncryptionUtil.wrapKey(
        cryptoContext.getConf(), cryptoContext.getConf()
          .get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName()),
        cryptoContext.getKey()));
    }
    // Now we can finish the close
    trailer.setMetaIndexCount(metaNames.size());
    trailer.setTotalUncompressedBytes(totalUncompressedBytes + trailer.getTrailerSize());
    trailer.setEntryCount(entryCount);
    trailer.setCompressionCodec(hFileContext.getCompression());

    long startTime = EnvironmentEdgeManager.currentTime();
    trailer.serialize(outputStream);
    HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);

    if (closeOutputStream) {
      outputStream.close();
      outputStream = null;
    }
  }
}