/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl.MetaCellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Common functionality needed by all versions of {@link HFile} writers.
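 * <p>
 * A rough usage sketch; writers are normally obtained through the {@link HFile} writer
 * factory rather than constructed directly, and the {@code conf}, {@code cacheConf},
 * {@code fs}, {@code path}, {@code fileContext} and {@code cell} names below are illustrative:
 * <pre>{@code
 * HFile.Writer writer = HFile.getWriterFactory(conf, cacheConf)
 *     .withPath(fs, path)
 *     .withFileContext(fileContext)
 *     .create();
 * try {
 *   writer.append(cell); // cells must be appended in comparator order
 * } finally {
 *   writer.close();
 * }
 * }</pre>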
 */
@InterfaceAudience.Private
public class HFileWriterImpl implements HFile.Writer {
  private static final Logger LOG = LoggerFactory.getLogger(HFileWriterImpl.class);

  private static final long UNSET = -1;

  /** If enabled, pre-calculate the encoded data size before the real encoding happens. */
  public static final String UNIFIED_ENCODED_BLOCKSIZE_RATIO =
    "hbase.writer.unified.encoded.blocksize.ratio";

  /** Block size limit after encoding; used to unify the size of encoded block cache entries. */
  private final int encodedBlockSizeLimit;

  /** The Cell previously appended. Becomes the last cell in the file. */
  protected Cell lastCell = null;

  /** FileSystem stream to write into. */
  protected FSDataOutputStream outputStream;

  /** True if we opened the <code>outputStream</code> (and so will close it). */
  protected final boolean closeOutputStream;

  /** A "file info" block: a key-value map of file-wide metadata. */
  protected HFileInfo fileInfo = new HFileInfo();

  /** Total # of key/value entries, i.e. how many times append() was called. */
  protected long entryCount = 0;

  /** Used for calculating the average key length. */
  protected long totalKeyLength = 0;

  /** Used for calculating the average value length. */
  protected long totalValueLength = 0;

  /** Total uncompressed bytes, maybe calculate a compression ratio later. */
  protected long totalUncompressedBytes = 0;

  /** Meta block names. */
  protected List<byte[]> metaNames = new ArrayList<>();

  /** {@link Writable}s representing meta block data. */
  protected List<Writable> metaData = new ArrayList<>();

  /**
   * First cell in a block.
   * This reference should be short-lived since we write hfiles in a burst.
   */
  protected Cell firstCellInBlock = null;

  /** May be null if we were passed a stream. */
  protected final Path path;

  /** Cache configuration for caching data on write. */
  protected final CacheConfig cacheConf;

  /**
   * Name for this object used when logging or in toString. Is either
   * the result of a toString on stream or else name of passed file Path.
   */
  protected final String name;

  /**
   * The data block encoding which will be used.
   * {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding.
   */
  protected final HFileDataBlockEncoder blockEncoder;

  protected final HFileContext hFileContext;

  private int maxTagsLength = 0;

  /** KeyValue version in FileInfo */
  public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");

  /** Version for KeyValue which includes memstore timestamp */
  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  /** Inline block writers for multi-level block index and compound Blooms. */
  private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<>();

  /** block writer */
  protected HFileBlock.Writer blockWriter;

  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

  /** The offset of the first data block or -1 if the file is empty. */
  private long firstDataBlockOffset = UNSET;

  /** The offset of the last data block or -1 if the file is empty. */
  protected long lastDataBlockOffset = UNSET;

  /**
   * The last (stop) Cell of the previous data block.
   * This reference should be short-lived since we write hfiles in a burst.
   */
  private Cell lastCellOfPreviousBlock = null;

  /** Additional data items to be written to the "load-on-open" section. */
  private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<>();

  protected long maxMemstoreTS = 0;

  public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
      FSDataOutputStream outputStream, HFileContext fileContext) {
    this.outputStream = outputStream;
    this.path = path;
    this.name = path != null ? path.getName() : outputStream.toString();
    this.hFileContext = fileContext;
    DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
    if (encoding != DataBlockEncoding.NONE) {
      this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
    } else {
      this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
    }
    closeOutputStream = path != null;
    this.cacheConf = cacheConf;
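    // The encoded-size limit is a fraction of the configured block size; for example, with a
    // 64 KiB block size and a ratio of 0.5 the block is closed once roughly 32 KiB of encoded
    // data has been written. The default ratio of 1 keeps the encoded and unencoded limits equal.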
    float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 1f);
    this.encodedBlockSizeLimit = (int)(hFileContext.getBlocksize() * encodeBlockSizeRatio);
    finishInit(conf);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Writer" + (path != null ? " for " + path : "") +
        " initialized with cacheConf: " + cacheConf +
        " fileContext: " + fileContext);
    }
  }

  /**
   * Add to the file info. All added key/value pairs can be obtained using
   * {@link HFile.Reader#getHFileInfo()}.
   *
   * @param k Key
   * @param v Value
   * @throws IOException in case the key or the value is invalid
   */
  @Override
  public void appendFileInfo(final byte[] k, final byte[] v)
      throws IOException {
    fileInfo.append(k, v, true);
  }

  /**
   * Sets the file info offset in the trailer, finishes up populating fields in
   * the file info, and writes the file info into the given data output. The
   * reason the data output is not always {@link #outputStream} is that we store
   * file info as a block in version 2.
   *
   * @param trailer fixed file trailer
   * @param out the data output to write the file info to
   */
  protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
      throws IOException {
    trailer.setFileInfoOffset(outputStream.getPos());
    finishFileInfo();
    long startTime = System.currentTimeMillis();
    fileInfo.write(out);
    HFile.updateWriteLatency(System.currentTimeMillis() - startTime);
  }

  /**
   * Checks that the given Cell's key does not violate the key order.
   *
   * @param cell Cell whose key to check.
   * @return true if the key is duplicate
   * @throws IOException if the key or the key order is wrong
   */
  protected boolean checkKey(final Cell cell) throws IOException {
    boolean isDuplicateKey = false;

    if (cell == null) {
      throw new IOException("Key cannot be null or empty");
    }
    if (lastCell != null) {
      int keyComp = PrivateCellUtil.compareKeyIgnoresMvcc(this.hFileContext.getCellComparator(),
        lastCell, cell);
      if (keyComp > 0) {
        String message = getLexicalErrorMessage(cell);
        throw new IOException(message);
      } else if (keyComp == 0) {
        isDuplicateKey = true;
      }
    }
    return isDuplicateKey;
  }

  private String getLexicalErrorMessage(Cell cell) {
    StringBuilder sb = new StringBuilder();
    sb.append("Added a key not lexically larger than previous. Current cell = ");
    sb.append(cell);
    sb.append(", lastCell = ");
    sb.append(lastCell);
    // file context includes HFile path and optionally table and CF of file being written
    sb.append(", fileContext=");
    sb.append(hFileContext);
    return sb.toString();
  }

  /** Checks the given value for validity. */
  protected void checkValue(final byte[] value, final int offset,
      final int length) throws IOException {
    if (value == null) {
      throw new IOException("Value cannot be null");
    }
  }

  /**
   * @return Path or null if we were passed a stream rather than a Path.
   */
  @Override
  public Path getPath() {
    return path;
  }

  @Override
  public String toString() {
    return "writer=" + (path != null ? path.toString() : null) + ", name="
        + name + ", compression=" + hFileContext.getCompression().getName();
  }

  public static Compression.Algorithm compressionByName(String algoName) {
    if (algoName == null) {
      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    return Compression.getCompressionAlgorithmByName(algoName);
  }

  /** A helper method to create HFile output streams in constructors */
  protected static FSDataOutputStream createOutputStream(Configuration conf,
      FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf,
        HConstants.DATA_FILE_UMASK_KEY);
    return FSUtils.create(conf, fs, path, perms, favoredNodes);
  }

  /** Additional initialization steps */
  protected void finishInit(final Configuration conf) {
    if (blockWriter != null) {
      throw new IllegalStateException("finishInit called twice");
    }
    blockWriter = new HFileBlock.Writer(blockEncoder, hFileContext,
        cacheConf.getByteBuffAllocator());
    // Data block index writer
    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
        cacheIndexesOnWrite ? cacheConf : null,
        cacheIndexesOnWrite ? name : null);
    dataBlockIndexWriter.setMaxChunkSize(
        HFileBlockIndex.getMaxChunkSize(conf));
    dataBlockIndexWriter.setMinIndexNumEntries(
        HFileBlockIndex.getMinIndexNumEntries(conf));
    inlineBlockWriters.add(dataBlockIndexWriter);

    // Meta data block index writer
    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
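    // Unlike the data block index, the meta block index is always single-level; it is written
    // out via writeSingleLevelIndex() when the file is closed.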
    LOG.trace("Initialized with {}", cacheConf);
  }

  /**
   * At a block boundary, writes all the inline blocks and opens a new block.
   */
  protected void checkBlockBoundary() throws IOException {
    // For encoders like prefix-tree, the encoded size is not available up front, so we have to
    // compare both the encoded size and the unencoded size against the block size limit.
    if (blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit
        || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize()) {
      finishBlock();
      writeInlineBlocks(false);
      newBlock();
    }
  }

  /** Clean up the data block that is currently being written. */
  private void finishBlock() throws IOException {
    if (!blockWriter.isWriting() || blockWriter.blockSizeWritten() == 0) {
      return;
    }

    // Update the first data block offset if UNSET; used when scanning.
    if (firstDataBlockOffset == UNSET) {
      firstDataBlockOffset = outputStream.getPos();
    }
    // Update the last data block offset each time through here.
    lastDataBlockOffset = outputStream.getPos();
    blockWriter.writeHeaderAndData(outputStream);
    int onDiskSize = blockWriter.getOnDiskSizeWithHeader();
    Cell indexEntry =
      getMidpoint(this.hFileContext.getCellComparator(), lastCellOfPreviousBlock, firstCellInBlock);
    dataBlockIndexWriter.addEntry(PrivateCellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
      lastDataBlockOffset, onDiskSize);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    if (cacheConf.shouldCacheDataOnWrite()) {
      doCacheOnWrite(lastDataBlockOffset);
    }
  }

  /**
   * Try to return a Cell that falls between <code>left</code> and <code>right</code> but that
   * is shorter; i.e. takes up less space. This trick is used when building the HFile block
   * index. It is an optimization; it does not always work, in which case we just return the
   * <code>right</code> cell.
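   * <p>
   * For example (showing row keys as strings), if the last cell of the previous block has row
   * {@code "the quick"} and the first cell of the new block has row {@code "the who"}, the two
   * rows share the prefix {@code "the "} and the returned cell is a first-on-row cell for
   * {@code "the r"}, which sorts between the two rows and is shorter than {@code "the who"}.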
   * @return A cell that sorts between <code>left</code> and <code>right</code>.
   */
  public static Cell getMidpoint(final CellComparator comparator, final Cell left,
      final Cell right) {
    // TODO: Redo so only a single pass over the arrays rather than one to
    // compare and then a second composing midpoint.
    if (right == null) {
      throw new IllegalArgumentException("right cell can not be null");
    }
    if (left == null) {
      return right;
    }
    // If Cells from meta table, don't mess around. meta table Cells have schema
    // (table,startrow,hash) so can't be treated as plain byte arrays. Just skip
    // out without trying to do this optimization.
    if (comparator instanceof MetaCellComparator) {
      return right;
    }
    int diff = comparator.compareRows(left, right);
    if (diff > 0) {
      throw new IllegalArgumentException("Left row sorts after right row; left="
          + CellUtil.getCellKeyAsString(left) + ", right=" + CellUtil.getCellKeyAsString(right));
    }
    byte[] midRow;
    boolean bufferBacked = left instanceof ByteBufferExtendedCell
        && right instanceof ByteBufferExtendedCell;
    if (diff < 0) {
      // Left row is < right row.
      if (bufferBacked) {
        midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getRowByteBuffer(),
            ((ByteBufferExtendedCell) left).getRowPosition(), left.getRowLength(),
            ((ByteBufferExtendedCell) right).getRowByteBuffer(),
            ((ByteBufferExtendedCell) right).getRowPosition(), right.getRowLength());
      } else {
        midRow = getMinimumMidpointArray(left.getRowArray(), left.getRowOffset(),
            left.getRowLength(), right.getRowArray(), right.getRowOffset(), right.getRowLength());
      }
      // If midRow is null, just return 'right'. Can't do optimization.
      if (midRow == null) {
        return right;
      }
      return PrivateCellUtil.createFirstOnRow(midRow);
    }
    // Rows are same. Compare on families.
    diff = comparator.compareFamilies(left, right);
    if (diff > 0) {
      throw new IllegalArgumentException("Left family sorts after right family; left="
          + CellUtil.getCellKeyAsString(left) + ", right=" + CellUtil.getCellKeyAsString(right));
    }
    if (diff < 0) {
      if (bufferBacked) {
        midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getFamilyByteBuffer(),
            ((ByteBufferExtendedCell) left).getFamilyPosition(), left.getFamilyLength(),
            ((ByteBufferExtendedCell) right).getFamilyByteBuffer(),
            ((ByteBufferExtendedCell) right).getFamilyPosition(), right.getFamilyLength());
      } else {
        midRow = getMinimumMidpointArray(left.getFamilyArray(), left.getFamilyOffset(),
            left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
            right.getFamilyLength());
      }
      // If midRow is null, just return 'right'. Can't do optimization.
      if (midRow == null) {
        return right;
      }
      // Return new Cell where we use right row and then a mid sort family.
      return PrivateCellUtil.createFirstOnRowFamily(right, midRow, 0, midRow.length);
    }
    // Families are same. Compare on qualifiers.
    diff = comparator.compareQualifiers(left, right);
    if (diff > 0) {
      throw new IllegalArgumentException("Left qualifier sorts after right qualifier; left="
          + CellUtil.getCellKeyAsString(left) + ", right=" + CellUtil.getCellKeyAsString(right));
    }
    if (diff < 0) {
      if (bufferBacked) {
        midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getQualifierByteBuffer(),
            ((ByteBufferExtendedCell) left).getQualifierPosition(), left.getQualifierLength(),
            ((ByteBufferExtendedCell) right).getQualifierByteBuffer(),
            ((ByteBufferExtendedCell) right).getQualifierPosition(), right.getQualifierLength());
      } else {
        midRow = getMinimumMidpointArray(left.getQualifierArray(), left.getQualifierOffset(),
            left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
            right.getQualifierLength());
      }
      // If midRow is null, just return 'right'. Can't do optimization.
      if (midRow == null) {
        return right;
      }
      // Return new Cell where we use right row and family and then a mid sort qualifier.
      return PrivateCellUtil.createFirstOnRowCol(right, midRow, 0, midRow.length);
    }
    // No opportunity for optimization. Just return right key.
    return right;
  }

  /**
   * @return a new, minimally sized array that sorts between left and right, or null as an
   *         indicator that we could not create a midpoint
   */
  private static byte[] getMinimumMidpointArray(final byte[] leftArray, final int leftOffset,
      final int leftLength, final byte[] rightArray, final int rightOffset, final int rightLength) {
    // rows are different
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    while (diffIdx < minLength
        && leftArray[leftOffset + diffIdx] == rightArray[rightOffset + diffIdx]) {
      diffIdx++;
    }
    byte[] minimumMidpointArray = null;
    if (diffIdx >= minLength) {
      // leftKey's row is prefix of rightKey's.
      minimumMidpointArray = new byte[diffIdx + 1];
      System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, diffIdx + 1);
    } else {
      int diffByte = leftArray[leftOffset + diffIdx];
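      // Two cases: if the first differing byte on the left can be incremented without reaching
      // the corresponding byte on the right, the shared prefix plus that incremented byte is a
      // valid shorter separator; otherwise fall back to the right key's prefix, one byte past
      // the point of divergence.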
      if ((0xff & diffByte) < 0xff && (diffByte + 1) < (rightArray[rightOffset + diffIdx] & 0xff)) {
        minimumMidpointArray = new byte[diffIdx + 1];
        System.arraycopy(leftArray, leftOffset, minimumMidpointArray, 0, diffIdx);
        minimumMidpointArray[diffIdx] = (byte) (diffByte + 1);
      } else {
        minimumMidpointArray = new byte[diffIdx + 1];
        System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, diffIdx + 1);
      }
    }
    return minimumMidpointArray;
  }

  private static byte[] getMinimumMidpointArray(ByteBuffer left, int leftOffset, int leftLength,
      ByteBuffer right, int rightOffset, int rightLength) {
    // rows are different
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    while (diffIdx < minLength && ByteBufferUtils.toByte(left,
        leftOffset + diffIdx) == ByteBufferUtils.toByte(right, rightOffset + diffIdx)) {
      diffIdx++;
    }
    byte[] minMidpoint = null;
    if (diffIdx >= minLength) {
      // leftKey's row is prefix of rightKey's.
      minMidpoint = new byte[diffIdx + 1];
      ByteBufferUtils.copyFromBufferToArray(minMidpoint, right, rightOffset, 0, diffIdx + 1);
    } else {
      int diffByte = ByteBufferUtils.toByte(left, leftOffset + diffIdx);
      if ((0xff & diffByte) < 0xff
          && (diffByte + 1) < (ByteBufferUtils.toByte(right, rightOffset + diffIdx) & 0xff)) {
        minMidpoint = new byte[diffIdx + 1];
        ByteBufferUtils.copyFromBufferToArray(minMidpoint, left, leftOffset, 0, diffIdx);
        minMidpoint[diffIdx] = (byte) (diffByte + 1);
      } else {
        minMidpoint = new byte[diffIdx + 1];
        ByteBufferUtils.copyFromBufferToArray(minMidpoint, right, rightOffset, 0, diffIdx + 1);
      }
    }
    return minMidpoint;
  }

  /** Gives inline block writers an opportunity to contribute blocks. */
  private void writeInlineBlocks(boolean closing) throws IOException {
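    // The registered inline block writers are the multi-level data block index writer and any
    // compound Bloom filter writers added via addInlineBlockWriter(); each may emit zero or
    // more blocks at this point.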
    for (InlineBlockWriter ibw : inlineBlockWriters) {
      while (ibw.shouldWriteBlock(closing)) {
        long offset = outputStream.getPos();
        boolean cacheThisBlock = ibw.getCacheOnWrite();
        ibw.writeInlineBlock(blockWriter.startWriting(
            ibw.getInlineBlockType()));
        blockWriter.writeHeaderAndData(outputStream);
        ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(),
            blockWriter.getUncompressedSizeWithoutHeader());
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        if (cacheThisBlock) {
          doCacheOnWrite(offset);
        }
      }
    }
  }

  /**
   * Caches the last written HFile block.
   * @param offset the offset of the block we want to cache. Used to determine
   *          the cache key.
   */
  private void doCacheOnWrite(long offset) {
    cacheConf.getBlockCache().ifPresent(cache -> {
      HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
      try {
        cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
            cacheFormatBlock);
      } finally {
        // refCnt will auto-increase when the block is added to the cache, see RAMCache#putIfAbsent
        cacheFormatBlock.release();
      }
    });
  }

  /**
   * Ready a new block for writing.
   */
  protected void newBlock() throws IOException {
    // This is where the next block begins.
    blockWriter.startWriting(BlockType.DATA);
    firstCellInBlock = null;
    if (lastCell != null) {
      lastCellOfPreviousBlock = lastCell;
    }
  }

  /**
   * Add a meta block to the end of the file. Call before {@link #close()}. Metadata blocks are
   * expensive: fill one with a bunch of serialized data rather than writing a metadata block per
   * metadata instance. If the metadata is small, consider adding it to the file info using
   * {@link #appendFileInfo(byte[], byte[])}.
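   * <p>
   * A minimal sketch (the block name and the Writable instance here are illustrative):
   * <pre>{@code
   * writer.appendMetaBlock("MY_METADATA", myWritable);
   * }</pre>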
   *
   * @param metaBlockName
   *          name of the block
   * @param content
   *          will call readFields to get data later (DO NOT REUSE)
   */
  @Override
  public void appendMetaBlock(String metaBlockName, Writable content) {
    byte[] key = Bytes.toBytes(metaBlockName);
    int i;
    for (i = 0; i < metaNames.size(); ++i) {
      // stop when the current key is greater than our own
      byte[] cur = metaNames.get(i);
      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
          key.length) > 0) {
        break;
      }
    }
    metaNames.add(i, key);
    metaData.add(i, content);
  }

  @Override
  public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // Save data block encoder metadata in the file info.
    blockEncoder.saveMetadata(this);
    // Write out the end of the data blocks, then write the meta data blocks,
    // followed by the file info, data block index and meta block index.

    finishBlock();
    writeInlineBlocks(true);

    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());

    // Write out the metadata blocks if any.
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long offset = outputStream.getPos();
        // write the metadata content
        DataOutputStream dos = blockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        blockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        // Add the new meta block to the meta index.
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
            blockWriter.getOnDiskSizeWithHeader());
      }
    }

    // Load-on-open section.

    // Data block index.
    //
    // In version 2, this section of the file starts with the root level data
    // block index. We call a function that writes intermediate-level blocks
    // first, then root level, and returns the offset of the root level block
    // index.

    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    // Meta block index.
    metaBlockIndexWriter.writeSingleLevelIndex(blockWriter.startWriting(
        BlockType.ROOT_INDEX), "meta");
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    if (this.hFileContext.isIncludesMvcc()) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }

    // File info
    writeFileInfo(trailer, blockWriter.startWriting(BlockType.FILE_INFO));
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
    for (BlockWritable w : additionalLoadOnOpenData) {
      blockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    }

    // Now finish off the trailer.
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    trailer.setUncompressedDataIndexSize(
        dataBlockIndexWriter.getTotalUncompressedSize());
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    trailer.setComparatorClass(this.hFileContext.getCellComparator().getClass());
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());

    finishClose(trailer);

    blockWriter.release();
  }

  @Override
  public void addInlineBlockWriter(InlineBlockWriter ibw) {
    inlineBlockWriters.add(ibw);
  }

  @Override
  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
  }

  @Override
  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
  }

  private void addBloomFilter(final BloomFilterWriter bfw,
      final BlockType blockType) {
    if (bfw.getKeyCount() <= 0) {
      return;
    }

    if (blockType != BlockType.GENERAL_BLOOM_META &&
        blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
      throw new RuntimeException("Block Type: " + blockType.toString() +
          " is not supported");
    }
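    // The Bloom filter data is not written here; it is queued up and written out as part of the
    // load-on-open section when close() runs.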
    additionalLoadOnOpenData.add(new BlockWritable() {
      @Override
      public BlockType getBlockType() {
        return blockType;
      }

      @Override
      public void writeToBlock(DataOutput out) throws IOException {
        bfw.getMetaWriter().write(out);
        Writable dataWriter = bfw.getDataWriter();
        if (dataWriter != null) {
          dataWriter.write(out);
        }
      }
    });
  }

  @Override
  public HFileContext getFileContext() {
    return hFileContext;
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param cell
   *          Cell to add. Cannot be empty nor null.
   */
  @Override
  public void append(final Cell cell) throws IOException {
    // checkKey uses comparator to check we are writing in order.
    boolean dupKey = checkKey(cell);
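    // Only consider rolling over to a new block when the key changes, so that all cells sharing
    // the same key stay in the same data block.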
    if (!dupKey) {
      checkBlockBoundary();
    }

    if (!blockWriter.isWriting()) {
      newBlock();
    }

    blockWriter.write(cell);

    totalKeyLength += PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
    totalValueLength += cell.getValueLength();

    // Are we the first key in this block?
    if (firstCellInBlock == null) {
      // If cell is big, block will be closed and this firstCellInBlock reference will only last
      // a short while.
      firstCellInBlock = cell;
    }

    // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinitely?
    lastCell = cell;
    entryCount++;
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
    int tagsLength = cell.getTagsLength();
    if (tagsLength > this.maxTagsLength) {
      this.maxTagsLength = tagsLength;
    }
  }

  @Override
  public void beforeShipped() throws IOException {
    this.blockWriter.beforeShipped();
    // Clone the cells we are still holding on to so they do not reference buffers that may be
    // reused once the current batch is shipped.
    if (this.lastCell != null) {
      this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);
    }
    if (this.firstCellInBlock != null) {
      this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock);
    }
    if (this.lastCellOfPreviousBlock != null) {
      this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock);
    }
  }

  @VisibleForTesting
  public Cell getLastCell() {
    return lastCell;
  }

  protected void finishFileInfo() throws IOException {
    if (lastCell != null) {
      // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
      // byte buffer. Won't take a tuple.
      byte [] lastKey = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
      fileInfo.append(HFileInfo.LASTKEY, lastKey, false);
    }

    // Average key length.
    int avgKeyLen =
        entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
    fileInfo.append(HFileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
    fileInfo.append(HFileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
      false);

    // Average value length.
    int avgValueLen =
        entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
    fileInfo.append(HFileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
    if (hFileContext.isIncludesTags()) {
      // When tags are not being written in this file, MAX_TAGS_LEN is excluded
      // from the FileInfo
      fileInfo.append(HFileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
      boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
        && hFileContext.isCompressTags();
      fileInfo.append(HFileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
    }
  }

  protected int getMajorVersion() {
    return 3;
  }

  protected int getMinorVersion() {
    return HFileReaderImpl.MAX_MINOR_VERSION;
  }

  protected void finishClose(FixedFileTrailer trailer) throws IOException {
    // Write out encryption metadata before finalizing if we have a valid crypto context
    Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
    if (cryptoContext != Encryption.Context.NONE) {
      // Wrap the context's key and write it as the encryption metadata, the wrapper includes
      // all information needed for decryption
      trailer.setEncryptionKey(EncryptionUtil.wrapKey(cryptoContext.getConf(),
        cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
          User.getCurrent().getShortName()),
        cryptoContext.getKey()));
    }
    // Now we can finish the close
    trailer.setMetaIndexCount(metaNames.size());
    trailer.setTotalUncompressedBytes(totalUncompressedBytes + trailer.getTrailerSize());
    trailer.setEntryCount(entryCount);
    trailer.setCompressionCodec(hFileContext.getCompression());

    long startTime = System.currentTimeMillis();
    trailer.serialize(outputStream);
    HFile.updateWriteLatency(System.currentTimeMillis() - startTime);

    if (closeOutputStream) {
      outputStream.close();
      outputStream = null;
    }
  }
}