/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl.MetaCellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Common functionality needed by all versions of {@link HFile} writers.
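 *
 * <p>A minimal usage sketch (illustrative only; in practice writers are obtained through the
 * {@code HFile.WriterFactory} rather than constructed directly, and the names below are the
 * caller's own):
 * <pre>{@code
 * HFileContext context = new HFileContextBuilder().withBlockSize(64 * 1024).build();
 * HFile.Writer writer = HFile.getWriterFactory(conf, cacheConf)
 *     .withPath(fs, path)
 *     .withFileContext(context)
 *     .create();
 * try {
 *   writer.append(cell); // cells must be appended in comparator order
 * } finally {
 *   writer.close();
 * }
 * }</pre>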
 */
@InterfaceAudience.Private
public class HFileWriterImpl implements HFile.Writer {
  private static final Logger LOG = LoggerFactory.getLogger(HFileWriterImpl.class);

  private static final long UNSET = -1;

  /**
   * If this feature is enabled, pre-calculate the encoded data size before the real encoding
   * happens.
   */
  public static final String UNIFIED_ENCODED_BLOCKSIZE_RATIO =
      "hbase.writer.unified.encoded.blocksize.ratio";

  /** Block size limit after encoding, used to unify the size of encoded block cache entries. */
  private final int encodedBlockSizeLimit;

  /** The Cell previously appended. Becomes the last cell in the file. */
  protected Cell lastCell = null;

  /** FileSystem stream to write into. */
  protected FSDataOutputStream outputStream;

  /** True if we opened the <code>outputStream</code> (and so will close it). */
  protected final boolean closeOutputStream;

  /** A "file info" block: a key-value map of file-wide metadata. */
  protected FileInfo fileInfo = new HFile.FileInfo();

  /** Total # of key/value entries, i.e. how many times append() was called. */
  protected long entryCount = 0;

  /** Used for calculating the average key length. */
  protected long totalKeyLength = 0;

  /** Used for calculating the average value length. */
  protected long totalValueLength = 0;

  /** Total uncompressed bytes; we may calculate a compression ratio from this later. */
  protected long totalUncompressedBytes = 0;

  /** Key comparator. Used to ensure we write in order. */
  protected final CellComparator comparator;

  /** Meta block names. */
  protected List<byte[]> metaNames = new ArrayList<>();

  /** {@link Writable}s representing meta block data. */
  protected List<Writable> metaData = new ArrayList<>();

  /**
   * First cell in a block.
   * This reference should be short-lived since we write hfiles in a burst.
   */
  protected Cell firstCellInBlock = null;

  /** May be null if we were passed a stream. */
  protected final Path path;

  /** Cache configuration for caching data on write. */
  protected final CacheConfig cacheConf;

  /**
   * Name for this object used when logging or in toString. Either the result
   * of calling toString on the stream, or else the name of the Path passed in.
   */
  protected final String name;

  /**
   * The data block encoding which will be used.
   * {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding.
   */
  protected final HFileDataBlockEncoder blockEncoder;

  protected final HFileContext hFileContext;

  private int maxTagsLength = 0;

  /** KeyValue version in FileInfo */
  public static final byte[] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");

  /** Version for KeyValue which includes memstore timestamp */
  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  /** Inline block writers for multi-level block index and compound Blooms. */
  private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<>();

  /** Block writer. */
  protected HFileBlock.Writer blockWriter;

  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

  /** The offset of the first data block or -1 if the file is empty. */
  private long firstDataBlockOffset = UNSET;

  /** The offset of the last data block or -1 if the file is empty. */
  protected long lastDataBlockOffset = UNSET;

  /**
   * The last (stop) Cell of the previous data block.
   * This reference should be short-lived since we write hfiles in a burst.
   */
  private Cell lastCellOfPreviousBlock = null;

  /** Additional data items to be written to the "load-on-open" section. */
  private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<>();

  protected long maxMemstoreTS = 0;

  public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
      FSDataOutputStream outputStream,
      CellComparator comparator, HFileContext fileContext) {
    this.outputStream = outputStream;
    this.path = path;
    this.name = path != null ? path.getName() : outputStream.toString();
    this.hFileContext = fileContext;
    DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
    if (encoding != DataBlockEncoding.NONE) {
      this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
    } else {
      this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
    }
    this.comparator = comparator != null ? comparator : CellComparator.getInstance();

    closeOutputStream = path != null;
    this.cacheConf = cacheConf;
    float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 1f);
    this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio);
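    // Illustrative example: with a 64 KB block size and a configured ratio of 0.5f, encoded
    // blocks are closed once roughly 32 KB of encoded data has been written; the default
    // ratio of 1f leaves the limit equal to the configured block size.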
    finishInit(conf);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Writer" + (path != null ? " for " + path : "") +
        " initialized with cacheConf: " + cacheConf +
        " comparator: " + comparator.getClass().getSimpleName() +
        " fileContext: " + fileContext);
    }
  }

  /**
   * Add to the file info. All added key/value pairs can be obtained using
   * {@link HFile.Reader#loadFileInfo()}.
   *
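   * <p>For example (illustrative; the key name and value are the caller's own):
   * <pre>{@code
   * writer.appendFileInfo(Bytes.toBytes("MY_KEY"), Bytes.toBytes("my value"));
   * }</pre>
   *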
   * @param k Key
   * @param v Value
   * @throws IOException in case the key or the value is invalid
   */
  @Override
  public void appendFileInfo(final byte[] k, final byte[] v)
      throws IOException {
    fileInfo.append(k, v, true);
  }

  /**
   * Sets the file info offset in the trailer, finishes up populating fields in
   * the file info, and writes the file info into the given data output. The
   * reason the data output is not always {@link #outputStream} is that we store
   * file info as a block in version 2.
   *
   * @param trailer fixed file trailer
   * @param out the data output to write the file info to
   * @throws IOException
   */
  protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
      throws IOException {
    trailer.setFileInfoOffset(outputStream.getPos());
    finishFileInfo();
    long startTime = System.currentTimeMillis();
    fileInfo.write(out);
    HFile.updateWriteLatency(System.currentTimeMillis() - startTime);
  }

  /**
   * Checks that the given Cell's key does not violate the key order.
   *
   * @param cell Cell whose key to check.
   * @return true if the key is a duplicate of the previous one
   * @throws IOException if the key or the key order is wrong
   */
  protected boolean checkKey(final Cell cell) throws IOException {
    boolean isDuplicateKey = false;

    if (cell == null) {
      throw new IOException("Key cannot be null or empty");
    }
    if (lastCell != null) {
      int keyComp = PrivateCellUtil.compareKeyIgnoresMvcc(comparator, lastCell, cell);

      if (keyComp > 0) {
        throw new IOException("Added a key not lexically larger than"
            + " previous. Current cell = " + cell + ", lastCell = " + lastCell);
      } else if (keyComp == 0) {
        isDuplicateKey = true;
      }
    }
    return isDuplicateKey;
  }

  /** Checks the given value for validity. */
  protected void checkValue(final byte[] value, final int offset,
      final int length) throws IOException {
    if (value == null) {
      throw new IOException("Value cannot be null");
    }
  }

  /**
   * @return Path or null if we were passed a stream rather than a Path.
   */
  @Override
  public Path getPath() {
    return path;
  }

  @Override
  public String toString() {
    return "writer=" + (path != null ? path.toString() : null) + ", name="
        + name + ", compression=" + hFileContext.getCompression().getName();
  }

  public static Compression.Algorithm compressionByName(String algoName) {
    if (algoName == null) {
      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    return Compression.getCompressionAlgorithmByName(algoName);
  }

  /** A helper method to create HFile output streams in constructors. */
  protected static FSDataOutputStream createOutputStream(Configuration conf,
      FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
    FsPermission perms = FSUtils.getFilePermissions(fs, conf,
        HConstants.DATA_FILE_UMASK_KEY);
    return FSUtils.create(conf, fs, path, perms, favoredNodes);
  }

  /** Additional initialization steps. */
  protected void finishInit(final Configuration conf) {
    if (blockWriter != null) {
      throw new IllegalStateException("finishInit called twice");
    }

    blockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);

    // Data block index writer
    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
        cacheIndexesOnWrite ? cacheConf : null,
        cacheIndexesOnWrite ? name : null);
    dataBlockIndexWriter.setMaxChunkSize(
        HFileBlockIndex.getMaxChunkSize(conf));
    dataBlockIndexWriter.setMinIndexNumEntries(
        HFileBlockIndex.getMinIndexNumEntries(conf));
    inlineBlockWriters.add(dataBlockIndexWriter);

    // Meta data block index writer
    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
    if (LOG.isTraceEnabled()) {
      LOG.trace("Initialized with " + cacheConf);
    }
  }

  /**
   * At a block boundary, writes all the inline blocks and opens a new block.
   *
   * @throws IOException
   */
  protected void checkBlockBoundary() throws IOException {
    // For encoders like prefixTree, the encoded size is not available, so we have to compare
    // both the encoded size and the unencoded size against the block size limit.
    if (blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit
        || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize()) {
      finishBlock();
      writeInlineBlocks(false);
      newBlock();
    }
  }

  /** Clean up the data block that is currently being written. */
  private void finishBlock() throws IOException {
    if (!blockWriter.isWriting() || blockWriter.blockSizeWritten() == 0) {
      return;
    }

    // Update the first data block offset if UNSET; used when scanning.
    if (firstDataBlockOffset == UNSET) {
      firstDataBlockOffset = outputStream.getPos();
    }
    // Update the last data block offset each time through here.
    lastDataBlockOffset = outputStream.getPos();
    blockWriter.writeHeaderAndData(outputStream);
    int onDiskSize = blockWriter.getOnDiskSizeWithHeader();
    Cell indexEntry =
      getMidpoint(this.comparator, lastCellOfPreviousBlock, firstCellInBlock);
    dataBlockIndexWriter.addEntry(PrivateCellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
      lastDataBlockOffset, onDiskSize);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    if (cacheConf.shouldCacheDataOnWrite()) {
      doCacheOnWrite(lastDataBlockOffset);
    }
  }

  /**
   * Try to return a Cell that falls between <code>left</code> and
   * <code>right</code> but that is shorter; i.e. takes up less space. This
   * trick is used when building the HFile block index. It's an optimization. It
   * does not always work; in that case we just return the <code>right</code> cell.
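   * <p>For example (rows only, illustrative): with left row {@code "abc"} and right row
   * {@code "xyz"}, the rows first differ at index 0 and {@code 'a' + 1 = 'b' < 'x'}, so the
   * single-byte row {@code "b"} is returned; it sorts between the two rows and is shorter
   * than either.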
   *
   * @param comparator
   *          Comparator to use.
   * @param left the left cell; may be null, in which case <code>right</code> is returned
   * @param right the right cell; must not be null
   * @return A cell that sorts between <code>left</code> and <code>right</code>.
   */
  public static Cell getMidpoint(final CellComparator comparator, final Cell left,
      final Cell right) {
    // TODO: Redo so only a single pass over the arrays rather than one to
    // compare and then a second composing midpoint.
    if (right == null) {
      throw new IllegalArgumentException("right cell cannot be null");
    }
    if (left == null) {
      return right;
    }
    // If Cells from meta table, don't mess around. meta table Cells have schema
    // (table,startrow,hash) so can't be treated as plain byte arrays. Just skip
    // out without trying to do this optimization.
    if (comparator instanceof MetaCellComparator) {
      return right;
    }
    int diff = comparator.compareRows(left, right);
    if (diff > 0) {
      throw new IllegalArgumentException("Left row sorts after right row; left="
          + CellUtil.getCellKeyAsString(left) + ", right=" + CellUtil.getCellKeyAsString(right));
    }
    byte[] midRow;
    boolean bufferBacked = left instanceof ByteBufferExtendedCell
        && right instanceof ByteBufferExtendedCell;
    if (diff < 0) {
      // Left row is < right row.
      if (bufferBacked) {
        midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getRowByteBuffer(),
            ((ByteBufferExtendedCell) left).getRowPosition(), left.getRowLength(),
            ((ByteBufferExtendedCell) right).getRowByteBuffer(),
            ((ByteBufferExtendedCell) right).getRowPosition(), right.getRowLength());
      } else {
        midRow = getMinimumMidpointArray(left.getRowArray(), left.getRowOffset(),
            left.getRowLength(), right.getRowArray(), right.getRowOffset(), right.getRowLength());
      }
      // If midRow is null, just return 'right'. Can't do optimization.
      if (midRow == null) {
        return right;
      }
      return PrivateCellUtil.createFirstOnRow(midRow);
    }
    // Rows are same. Compare on families.
    diff = comparator.compareFamilies(left, right);
    if (diff > 0) {
      throw new IllegalArgumentException("Left family sorts after right family; left="
          + CellUtil.getCellKeyAsString(left) + ", right=" + CellUtil.getCellKeyAsString(right));
    }
    if (diff < 0) {
      if (bufferBacked) {
        midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getFamilyByteBuffer(),
            ((ByteBufferExtendedCell) left).getFamilyPosition(), left.getFamilyLength(),
            ((ByteBufferExtendedCell) right).getFamilyByteBuffer(),
            ((ByteBufferExtendedCell) right).getFamilyPosition(), right.getFamilyLength());
      } else {
        midRow = getMinimumMidpointArray(left.getFamilyArray(), left.getFamilyOffset(),
            left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
            right.getFamilyLength());
      }
      // If midRow is null, just return 'right'. Can't do optimization.
      if (midRow == null) {
        return right;
      }
      // Return new Cell where we use right row and then a mid sort family.
      return PrivateCellUtil.createFirstOnRowFamily(right, midRow, 0, midRow.length);
    }
    // Families are same. Compare on qualifiers.
    diff = comparator.compareQualifiers(left, right);
    if (diff > 0) {
      throw new IllegalArgumentException("Left qualifier sorts after right qualifier; left="
          + CellUtil.getCellKeyAsString(left) + ", right=" + CellUtil.getCellKeyAsString(right));
    }
    if (diff < 0) {
      if (bufferBacked) {
        midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getQualifierByteBuffer(),
            ((ByteBufferExtendedCell) left).getQualifierPosition(), left.getQualifierLength(),
            ((ByteBufferExtendedCell) right).getQualifierByteBuffer(),
            ((ByteBufferExtendedCell) right).getQualifierPosition(), right.getQualifierLength());
      } else {
        midRow = getMinimumMidpointArray(left.getQualifierArray(), left.getQualifierOffset(),
            left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
            right.getQualifierLength());
      }
      // If midRow is null, just return 'right'. Can't do optimization.
      if (midRow == null) {
        return right;
      }
      // Return new Cell where we use right row and family and then a mid sort qualifier.
      return PrivateCellUtil.createFirstOnRowCol(right, midRow, 0, midRow.length);
    }
    // No opportunity for optimization. Just return right key.
    return right;
  }

  /**
   * @param leftArray the array holding the left key
   * @param leftOffset offset of the left key in <code>leftArray</code>
   * @param leftLength length of the left key
   * @param rightArray the array holding the right key
   * @param rightOffset offset of the right key in <code>rightArray</code>
   * @param rightLength length of the right key
   * @return A new array that sorts between left and right and is minimally
   *         sized, or null as an indicator that we could not create a
   *         midpoint.
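   *         <p>For example (illustrative): left {@code "the quick"} and right {@code "the who"}
   *         share the prefix {@code "the "} and {@code 'q' + 1 = 'r' < 'w'}, so
   *         {@code "the r"} is returned.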
   */
  private static byte[] getMinimumMidpointArray(final byte[] leftArray, final int leftOffset,
      final int leftLength, final byte[] rightArray, final int rightOffset, final int rightLength) {
    // rows are different
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    while (diffIdx < minLength
        && leftArray[leftOffset + diffIdx] == rightArray[rightOffset + diffIdx]) {
      diffIdx++;
    }
    byte[] minimumMidpointArray = null;
    if (diffIdx >= minLength) {
      // leftKey's row is a prefix of rightKey's.
      minimumMidpointArray = new byte[diffIdx + 1];
      System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, diffIdx + 1);
    } else {
      int diffByte = leftArray[leftOffset + diffIdx];
      if ((0xff & diffByte) < 0xff && (diffByte + 1) < (rightArray[rightOffset + diffIdx] & 0xff)) {
        minimumMidpointArray = new byte[diffIdx + 1];
        System.arraycopy(leftArray, leftOffset, minimumMidpointArray, 0, diffIdx);
        minimumMidpointArray[diffIdx] = (byte) (diffByte + 1);
      } else {
        minimumMidpointArray = new byte[diffIdx + 1];
        System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, diffIdx + 1);
      }
    }
    return minimumMidpointArray;
  }

  private static byte[] getMinimumMidpointArray(ByteBuffer left, int leftOffset, int leftLength,
      ByteBuffer right, int rightOffset, int rightLength) {
    // rows are different
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    while (diffIdx < minLength && ByteBufferUtils.toByte(left,
        leftOffset + diffIdx) == ByteBufferUtils.toByte(right, rightOffset + diffIdx)) {
      diffIdx++;
    }
    byte[] minMidpoint = null;
    if (diffIdx >= minLength) {
      // leftKey's row is a prefix of rightKey's.
      minMidpoint = new byte[diffIdx + 1];
      ByteBufferUtils.copyFromBufferToArray(minMidpoint, right, rightOffset, 0, diffIdx + 1);
    } else {
      int diffByte = ByteBufferUtils.toByte(left, leftOffset + diffIdx);
      if ((0xff & diffByte) < 0xff
          && (diffByte + 1) < (ByteBufferUtils.toByte(right, rightOffset + diffIdx) & 0xff)) {
        minMidpoint = new byte[diffIdx + 1];
        ByteBufferUtils.copyFromBufferToArray(minMidpoint, left, leftOffset, 0, diffIdx);
        minMidpoint[diffIdx] = (byte) (diffByte + 1);
      } else {
        minMidpoint = new byte[diffIdx + 1];
        ByteBufferUtils.copyFromBufferToArray(minMidpoint, right, rightOffset, 0, diffIdx + 1);
      }
    }
    return minMidpoint;
  }

  /** Gives inline block writers an opportunity to contribute blocks. */
  private void writeInlineBlocks(boolean closing) throws IOException {
    for (InlineBlockWriter ibw : inlineBlockWriters) {
      while (ibw.shouldWriteBlock(closing)) {
        long offset = outputStream.getPos();
        boolean cacheThisBlock = ibw.getCacheOnWrite();
        ibw.writeInlineBlock(blockWriter.startWriting(
            ibw.getInlineBlockType()));
        blockWriter.writeHeaderAndData(outputStream);
        ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(),
            blockWriter.getUncompressedSizeWithoutHeader());
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        if (cacheThisBlock) {
          doCacheOnWrite(offset);
        }
      }
    }
  }

  /**
   * Caches the last written HFile block.
   * @param offset the offset of the block we want to cache. Used to determine
   *          the cache key.
   */
  private void doCacheOnWrite(long offset) {
    HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
    cacheConf.getBlockCache().cacheBlock(
        new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
        cacheFormatBlock);
  }

  /**
   * Ready a new block for writing.
   *
   * @throws IOException
   */
  protected void newBlock() throws IOException {
    // This is where the next block begins.
    blockWriter.startWriting(BlockType.DATA);
    firstCellInBlock = null;
    if (lastCell != null) {
      lastCellOfPreviousBlock = lastCell;
    }
  }

  /**
   * Add a meta block to the end of the file. Call before close(). Metadata
   * blocks are expensive. Fill one with a bunch of serialized data rather than
   * writing a metadata block per metadata instance. If metadata is small, consider
   * adding it to the file info using {@link #appendFileInfo(byte[], byte[])}.
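   *
   * <p>For example (illustrative; the block name and Writable are the caller's own):
   * <pre>{@code
   * writer.appendMetaBlock("MY_METADATA", myWritable);
   * }</pre>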
   *
   * @param metaBlockName
   *          name of the block
   * @param content
   *          will call readFields to get data later (DO NOT REUSE)
   */
  @Override
  public void appendMetaBlock(String metaBlockName, Writable content) {
    byte[] key = Bytes.toBytes(metaBlockName);
    int i;
    for (i = 0; i < metaNames.size(); ++i) {
      // stop when the current key is greater than our own
      byte[] cur = metaNames.get(i);
      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
          key.length) > 0) {
        break;
      }
    }
    metaNames.add(i, key);
    metaData.add(i, content);
  }

  @Override
  public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // Save data block encoder metadata in the file info.
    blockEncoder.saveMetadata(this);
    // Write out the end of the data blocks, then write the meta data blocks,
    // followed by the file info, data block index and meta block index.
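    //
    // The resulting on-disk layout is, simplified (see the HFile class javadoc for the
    // authoritative description of the format):
    //   data blocks | meta blocks | load-on-open section (root data index, meta index,
    //   file info, Bloom filter metadata) | fixed file trailer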

    finishBlock();
    writeInlineBlocks(true);

    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());

    // Write out the metadata blocks if any.
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long offset = outputStream.getPos();
        // write the metadata content
        DataOutputStream dos = blockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        blockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        // Add the new meta block to the meta index.
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
            blockWriter.getOnDiskSizeWithHeader());
      }
    }

    // Load-on-open section.

    // Data block index.
    //
    // In version 2, this section of the file starts with the root level data
    // block index. We call a function that writes intermediate-level blocks
    // first, then root level, and returns the offset of the root level block
    // index.

    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    // Meta block index.
    metaBlockIndexWriter.writeSingleLevelIndex(blockWriter.startWriting(
        BlockType.ROOT_INDEX), "meta");
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    if (this.hFileContext.isIncludesMvcc()) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }

    // File info
    writeFileInfo(trailer, blockWriter.startWriting(BlockType.FILE_INFO));
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
    for (BlockWritable w : additionalLoadOnOpenData) {
      blockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    }

    // Now finish off the trailer.
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    trailer.setUncompressedDataIndexSize(
        dataBlockIndexWriter.getTotalUncompressedSize());
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    trailer.setComparatorClass(comparator.getClass());
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());

    finishClose(trailer);

    blockWriter.release();
  }

  @Override
  public void addInlineBlockWriter(InlineBlockWriter ibw) {
    inlineBlockWriters.add(ibw);
  }

  @Override
  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
  }

  @Override
  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
  }

  private void addBloomFilter(final BloomFilterWriter bfw,
      final BlockType blockType) {
    if (bfw.getKeyCount() <= 0) {
      return;
    }

    if (blockType != BlockType.GENERAL_BLOOM_META &&
        blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
      throw new RuntimeException("Block Type: " + blockType.toString() +
          " is not supported");
    }
    additionalLoadOnOpenData.add(new BlockWritable() {
      @Override
      public BlockType getBlockType() {
        return blockType;
      }

      @Override
      public void writeToBlock(DataOutput out) throws IOException {
        bfw.getMetaWriter().write(out);
        Writable dataWriter = bfw.getDataWriter();
        if (dataWriter != null) {
          dataWriter.write(out);
        }
      }
    });
  }

  @Override
  public HFileContext getFileContext() {
    return hFileContext;
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param cell
   *          Cell to add. Cannot be null or empty.
   * @throws IOException
   */
  @Override
  public void append(final Cell cell) throws IOException {
    // checkKey uses comparator to check we are writing in order.
    boolean dupKey = checkKey(cell);
    if (!dupKey) {
      checkBlockBoundary();
    }

    if (!blockWriter.isWriting()) {
      newBlock();
    }

    blockWriter.write(cell);

    totalKeyLength += PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
    totalValueLength += cell.getValueLength();

    // Are we the first key in this block?
    if (firstCellInBlock == null) {
      // If cell is big, block will be closed and this firstCellInBlock reference will only last
      // a short while.
      firstCellInBlock = cell;
    }

    // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinitely?
    lastCell = cell;
    entryCount++;
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
    int tagsLength = cell.getTagsLength();
    if (tagsLength > this.maxTagsLength) {
      this.maxTagsLength = tagsLength;
    }
  }

  @Override
  public void beforeShipped() throws IOException {
    // Clone the cells we are holding on to, so they no longer reference buffers
    // that may be reclaimed after shipping.
    if (this.lastCell != null) {
      this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);
    }
    if (this.firstCellInBlock != null) {
      this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock);
    }
    if (this.lastCellOfPreviousBlock != null) {
      this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock);
    }
  }

  @VisibleForTesting
  public Cell getLastCell() {
    return lastCell;
  }

  protected void finishFileInfo() throws IOException {
    if (lastCell != null) {
      // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
      // byte buffer. Won't take a tuple.
      byte[] lastKey = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
      fileInfo.append(FileInfo.LASTKEY, lastKey, false);
    }

    // Average key length.
    int avgKeyLen =
        entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
    fileInfo.append(FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
    fileInfo.append(FileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
      false);

    // Average value length.
    int avgValueLen =
        entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
    fileInfo.append(FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
    if (hFileContext.isIncludesTags()) {
      // When tags are not being written in this file, MAX_TAGS_LEN is excluded
      // from the FileInfo
      fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
      boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
        && hFileContext.isCompressTags();
      fileInfo.append(FileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
    }
  }

  protected int getMajorVersion() {
    return 3;
  }

  protected int getMinorVersion() {
    return HFileReaderImpl.MAX_MINOR_VERSION;
  }

  protected void finishClose(FixedFileTrailer trailer) throws IOException {
    // Write out encryption metadata before finalizing if we have a valid crypto context
    Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
    if (cryptoContext != Encryption.Context.NONE) {
      // Wrap the context's key and write it as the encryption metadata; the wrapper includes
      // all information needed for decryption.
      trailer.setEncryptionKey(EncryptionUtil.wrapKey(cryptoContext.getConf(),
        cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
          User.getCurrent().getShortName()),
        cryptoContext.getKey()));
    }
    // Now we can finish the close
    trailer.setMetaIndexCount(metaNames.size());
    trailer.setTotalUncompressedBytes(totalUncompressedBytes + trailer.getTrailerSize());
    trailer.setEntryCount(entryCount);
    trailer.setCompressionCodec(hFileContext.getCompression());

    long startTime = System.currentTimeMillis();
    trailer.serialize(outputStream);
    HFile.updateWriteLatency(System.currentTimeMillis() - startTime);

    if (closeOutputStream) {
      outputStream.close();
      outputStream = null;
    }
  }
}