/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Common functionality needed by all versions of {@link HFile} writers.
 */
@InterfaceAudience.Private
public class HFileWriterImpl implements HFile.Writer {
  private static final Logger LOG = LoggerFactory.getLogger(HFileWriterImpl.class);

  private static final long UNSET = -1;

  /** If this feature is enabled, pre-calculate the encoded data size before real encoding. */
  public static final String UNIFIED_ENCODED_BLOCKSIZE_RATIO =
    "hbase.writer.unified.encoded.blocksize.ratio";

  /** Block size limit after encoding, used to unify encoded block cache entry sizes. */
  private final int encodedBlockSizeLimit;
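
  /*
   * An illustrative configuration sketch (hypothetical values, not a recommendation): with the
   * default block size of 64 KB and this ratio set to 0.5f, encodedBlockSizeLimit works out to
   * 32 KB, so an encoded data block is finished once roughly 32 KB of encoded bytes are written:
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setFloat(HFileWriterImpl.UNIFIED_ENCODED_BLOCKSIZE_RATIO, 0.5f);
   */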

  /** The Cell previously appended. Becomes the last cell in the file. */
  protected Cell lastCell = null;

  /** FileSystem stream to write into. */
  protected FSDataOutputStream outputStream;

  /** True if we opened the <code>outputStream</code> (and so will close it). */
  protected final boolean closeOutputStream;

  /** A "file info" block: a key-value map of file-wide metadata. */
  protected HFileInfo fileInfo = new HFileInfo();

  /** Total # of key/value entries, i.e. how many times append() was called. */
  protected long entryCount = 0;

  /** Used for calculating the average key length. */
  protected long totalKeyLength = 0;

  /** Used for calculating the average value length. */
  protected long totalValueLength = 0;

  /** Total uncompressed bytes, maybe calculate a compression ratio later. */
  protected long totalUncompressedBytes = 0;

  /** Meta block names. */
  protected List<byte[]> metaNames = new ArrayList<>();

  /** {@link Writable}s representing meta block data. */
  protected List<Writable> metaData = new ArrayList<>();

  /**
   * First cell in a block.
   * This reference should be short-lived since we write hfiles in a burst.
   */
  protected Cell firstCellInBlock = null;

  /** May be null if we were passed a stream. */
  protected final Path path;

  /** Cache configuration for caching data on write. */
  protected final CacheConfig cacheConf;

  /**
   * Name for this object used when logging or in toString. Either the result of a toString
   * on the stream or else the name of the passed file Path.
   */
  protected final String name;

  /**
   * The data block encoding which will be used.
   * {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding.
   */
  protected final HFileDataBlockEncoder blockEncoder;

  protected final HFileContext hFileContext;

  private int maxTagsLength = 0;

  /** KeyValue version in FileInfo */
  public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");

  /** Version for KeyValue which includes memstore timestamp */
  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  /** Inline block writers for multi-level block index and compound Blooms. */
  private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<>();

  /** Block writer. */
  protected HFileBlock.Writer blockWriter;

  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

  /** The offset of the first data block or -1 if the file is empty. */
  private long firstDataBlockOffset = UNSET;

  /** The offset of the last data block or -1 if the file is empty. */
  protected long lastDataBlockOffset = UNSET;

  /**
   * The last (stop) Cell of the previous data block.
   * This reference should be short-lived since we write hfiles in a burst.
   */
  private Cell lastCellOfPreviousBlock = null;

  /** Additional data items to be written to the "load-on-open" section. */
  private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<>();

  protected long maxMemstoreTS = 0;

  public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
      FSDataOutputStream outputStream, HFileContext fileContext) {
    this.outputStream = outputStream;
    this.path = path;
    this.name = path != null ? path.getName() : outputStream.toString();
    this.hFileContext = fileContext;
    DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
    if (encoding != DataBlockEncoding.NONE) {
      this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
    } else {
      this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
    }
    closeOutputStream = path != null;
    this.cacheConf = cacheConf;
    float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 1f);
    this.encodedBlockSizeLimit = (int)(hFileContext.getBlocksize() * encodeBlockSizeRatio);
    finishInit(conf);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Writer" + (path != null ? " for " + path : "") +
        " initialized with cacheConf: " + cacheConf +
        " fileContext: " + fileContext);
    }
  }

  /**
   * Add to the file info. All added key/value pairs can be obtained using
   * {@link HFile.Reader#getHFileInfo()}.
   *
   * @param k Key
   * @param v Value
   * @throws IOException in case the key or the value is invalid
   */
  @Override
  public void appendFileInfo(final byte[] k, final byte[] v)
      throws IOException {
    fileInfo.append(k, v, true);
  }
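
  /*
   * An illustrative use (key and value are examples only): callers typically tag the file with
   * small pieces of metadata before close(), e.g.
   *
   *   writer.appendFileInfo(Bytes.toBytes("MAJOR_COMPACTION_KEY"), Bytes.toBytes(true));
   *
   * The pairs land in the file info block and can be read back via HFile.Reader#getHFileInfo().
   */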

  /**
   * Sets the file info offset in the trailer, finishes up populating fields in
   * the file info, and writes the file info into the given data output. The
   * reason the data output is not always {@link #outputStream} is that we store
   * file info as a block in version 2.
   *
   * @param trailer fixed file trailer
   * @param out the data output to write the file info to
   */
  protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
      throws IOException {
    trailer.setFileInfoOffset(outputStream.getPos());
    finishFileInfo();
    long startTime = System.currentTimeMillis();
    fileInfo.write(out);
    HFile.updateWriteLatency(System.currentTimeMillis() - startTime);
  }

  /**
   * Checks that the given Cell's key does not violate the key order.
   *
   * @param cell Cell whose key to check.
   * @return true if the key is a duplicate
   * @throws IOException if the key or the key order is wrong
   */
  protected boolean checkKey(final Cell cell) throws IOException {
    boolean isDuplicateKey = false;

    if (cell == null) {
      throw new IOException("Key cannot be null or empty");
    }
    if (lastCell != null) {
      int keyComp = PrivateCellUtil.compareKeyIgnoresMvcc(this.hFileContext.getCellComparator(),
        lastCell, cell);
      if (keyComp > 0) {
        String message = getLexicalErrorMessage(cell);
        throw new IOException(message);
      } else if (keyComp == 0) {
        isDuplicateKey = true;
      }
    }
    return isDuplicateKey;
  }
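
  /*
   * For illustration (hypothetical cells): if the previous append wrote row "b" and the next cell
   * carries row "a", checkKey throws an IOException because the keys are out of order; a cell
   * with exactly the same key returns true, and append() then skips the block boundary check so
   * the duplicate stays in the current block.
   */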

  private String getLexicalErrorMessage(Cell cell) {
    StringBuilder sb = new StringBuilder();
    sb.append("Added a key not lexically larger than previous. Current cell = ");
    sb.append(cell);
    sb.append(", lastCell = ");
    sb.append(lastCell);
    // The file context includes the HFile path and optionally the table and CF of the file
    sb.append(", fileContext=");
    sb.append(hFileContext);
    return sb.toString();
  }

  /** Checks the given value for validity. */
  protected void checkValue(final byte[] value, final int offset,
      final int length) throws IOException {
    if (value == null) {
      throw new IOException("Value cannot be null");
    }
  }

  /**
   * @return Path or null if we were passed a stream rather than a Path.
   */
  @Override
  public Path getPath() {
    return path;
  }

  @Override
  public String toString() {
    return "writer=" + (path != null ? path.toString() : null) + ", name="
        + name + ", compression=" + hFileContext.getCompression().getName();
  }

  public static Compression.Algorithm compressionByName(String algoName) {
    if (algoName == null) {
      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    return Compression.getCompressionAlgorithmByName(algoName);
  }

  /** A helper method to create HFile output streams in constructors. */
  protected static FSDataOutputStream createOutputStream(Configuration conf,
      FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf,
        HConstants.DATA_FILE_UMASK_KEY);
    return FSUtils.create(conf, fs, path, perms, favoredNodes);
  }

  /** Additional initialization steps. */
  protected void finishInit(final Configuration conf) {
    if (blockWriter != null) {
      throw new IllegalStateException("finishInit called twice");
    }
    blockWriter = new HFileBlock.Writer(blockEncoder, hFileContext,
        cacheConf.getByteBuffAllocator());
    // Data block index writer
    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
        cacheIndexesOnWrite ? cacheConf : null,
        cacheIndexesOnWrite ? name : null);
    dataBlockIndexWriter.setMaxChunkSize(
        HFileBlockIndex.getMaxChunkSize(conf));
    dataBlockIndexWriter.setMinIndexNumEntries(
        HFileBlockIndex.getMinIndexNumEntries(conf));
    inlineBlockWriters.add(dataBlockIndexWriter);

    // Meta data block index writer
    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
    LOG.trace("Initialized with {}", cacheConf);
  }

  /**
   * At a block boundary, writes all the inline blocks and opens a new block.
   */
  protected void checkBlockBoundary() throws IOException {
    // For encoders like prefix tree, the encoded size is not available, so we have to compare
    // both the encoded and unencoded sizes against the block size limit.
    if (blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit
        || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize()) {
      finishBlock();
      writeInlineBlocks(false);
      newBlock();
    }
  }
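
  /*
   * A rough numeric illustration (hypothetical sizes): with a 64 KB block size and the ratio left
   * at its default of 1.0, a DATA block is finished as soon as either the encoded bytes or the
   * unencoded bytes written so far reach 64 KB, whichever limit is hit first.
   */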

  /** Clean up the data block that is currently being written. */
  private void finishBlock() throws IOException {
    if (!blockWriter.isWriting() || blockWriter.blockSizeWritten() == 0) {
      return;
    }

    // Update the first data block offset if UNSET; used when scanning.
    if (firstDataBlockOffset == UNSET) {
      firstDataBlockOffset = outputStream.getPos();
    }
    // Update the last data block offset each time through here.
    lastDataBlockOffset = outputStream.getPos();
    blockWriter.writeHeaderAndData(outputStream);
    int onDiskSize = blockWriter.getOnDiskSizeWithHeader();
    Cell indexEntry =
      getMidpoint(this.hFileContext.getCellComparator(), lastCellOfPreviousBlock, firstCellInBlock);
    dataBlockIndexWriter.addEntry(PrivateCellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
      lastDataBlockOffset, onDiskSize);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    if (cacheConf.shouldCacheDataOnWrite()) {
      doCacheOnWrite(lastDataBlockOffset);
    }
  }

  /**
   * Try to return a Cell that falls between <code>left</code> and
   * <code>right</code> but that is shorter; i.e. takes up less space. This
   * trick is used when building the HFile block index. It's an optimization. It
   * does not always work. In that case we'll just return the <code>right</code> cell.
   * @return A cell that sorts between <code>left</code> and <code>right</code>.
   */
  public static Cell getMidpoint(final CellComparator comparator, final Cell left,
      final Cell right) {
    // TODO: Redo so only a single pass over the arrays rather than one to
    // compare and then a second composing midpoint.
    if (right == null) {
      throw new IllegalArgumentException("right cell can not be null");
    }
    if (left == null) {
      return right;
    }
    // If Cells are from the meta table, don't mess around. Meta table Cells have a schema
    // (table,startrow,hash) so they can't be treated as plain byte arrays. Just skip out
    // without trying to do this optimization.
    if (comparator instanceof MetaCellComparator) {
      return right;
    }
    int diff = comparator.compareRows(left, right);
    if (diff > 0) {
      throw new IllegalArgumentException("Left row sorts after right row; left="
          + CellUtil.getCellKeyAsString(left) + ", right=" + CellUtil.getCellKeyAsString(right));
    }
    byte[] midRow;
    boolean bufferBacked = left instanceof ByteBufferExtendedCell
        && right instanceof ByteBufferExtendedCell;
    if (diff < 0) {
      // Left row is < right row.
      if (bufferBacked) {
        midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getRowByteBuffer(),
            ((ByteBufferExtendedCell) left).getRowPosition(), left.getRowLength(),
            ((ByteBufferExtendedCell) right).getRowByteBuffer(),
            ((ByteBufferExtendedCell) right).getRowPosition(), right.getRowLength());
      } else {
        midRow = getMinimumMidpointArray(left.getRowArray(), left.getRowOffset(),
            left.getRowLength(), right.getRowArray(), right.getRowOffset(), right.getRowLength());
      }
      // If midRow is null, just return 'right'. Can't do optimization.
      if (midRow == null) {
        return right;
      }
      return PrivateCellUtil.createFirstOnRow(midRow);
    }
    // Rows are the same. Compare on families.
    diff = comparator.compareFamilies(left, right);
    if (diff > 0) {
      throw new IllegalArgumentException("Left family sorts after right family; left="
          + CellUtil.getCellKeyAsString(left) + ", right=" + CellUtil.getCellKeyAsString(right));
    }
    if (diff < 0) {
      if (bufferBacked) {
        midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getFamilyByteBuffer(),
            ((ByteBufferExtendedCell) left).getFamilyPosition(), left.getFamilyLength(),
            ((ByteBufferExtendedCell) right).getFamilyByteBuffer(),
            ((ByteBufferExtendedCell) right).getFamilyPosition(), right.getFamilyLength());
      } else {
        midRow = getMinimumMidpointArray(left.getFamilyArray(), left.getFamilyOffset(),
            left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
            right.getFamilyLength());
      }
      // If midRow is null, just return 'right'. Can't do optimization.
      if (midRow == null) {
        return right;
      }
      // Return a new Cell where we use the right row and then a mid sort family.
      return PrivateCellUtil.createFirstOnRowFamily(right, midRow, 0, midRow.length);
    }
    // Families are the same. Compare on qualifiers.
    diff = comparator.compareQualifiers(left, right);
    if (diff > 0) {
      throw new IllegalArgumentException("Left qualifier sorts after right qualifier; left="
          + CellUtil.getCellKeyAsString(left) + ", right=" + CellUtil.getCellKeyAsString(right));
    }
    if (diff < 0) {
      if (bufferBacked) {
        midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getQualifierByteBuffer(),
            ((ByteBufferExtendedCell) left).getQualifierPosition(), left.getQualifierLength(),
            ((ByteBufferExtendedCell) right).getQualifierByteBuffer(),
            ((ByteBufferExtendedCell) right).getQualifierPosition(), right.getQualifierLength());
      } else {
        midRow = getMinimumMidpointArray(left.getQualifierArray(), left.getQualifierOffset(),
            left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
            right.getQualifierLength());
      }
      // If midRow is null, just return 'right'. Can't do optimization.
      if (midRow == null) {
        return right;
      }
      // Return a new Cell where we use the right row and family and then a mid sort qualifier.
      return PrivateCellUtil.createFirstOnRowCol(right, midRow, 0, midRow.length);
    }
    // No opportunity for optimization. Just return the right key.
    return right;
  }
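
  /*
   * A worked example (hypothetical row keys): for a previous-block last cell with row "abcdef"
   * and a new-block first cell with row "abzzzz", the rows first differ at index 2, and
   * 'c' + 1 = 'd' still sorts below 'z', so the block index entry becomes the first-on-row cell
   * for the short fake row "abd" instead of storing the full "abzzzz" key.
   */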

  /**
   * @return a new array that sorts between left and right and is minimally
   *         sized, or null as an indicator that we could not create a
   *         midpoint.
   */
  private static byte[] getMinimumMidpointArray(final byte[] leftArray, final int leftOffset,
      final int leftLength, final byte[] rightArray, final int rightOffset, final int rightLength) {
    // rows are different
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    while (diffIdx < minLength
        && leftArray[leftOffset + diffIdx] == rightArray[rightOffset + diffIdx]) {
      diffIdx++;
    }
    byte[] minimumMidpointArray = null;
    if (diffIdx >= minLength) {
      // leftKey's row is a prefix of rightKey's.
      minimumMidpointArray = new byte[diffIdx + 1];
      System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, diffIdx + 1);
    } else {
      int diffByte = leftArray[leftOffset + diffIdx];
      if ((0xff & diffByte) < 0xff && (diffByte + 1) < (rightArray[rightOffset + diffIdx] & 0xff)) {
        minimumMidpointArray = new byte[diffIdx + 1];
        System.arraycopy(leftArray, leftOffset, minimumMidpointArray, 0, diffIdx);
        minimumMidpointArray[diffIdx] = (byte) (diffByte + 1);
      } else {
        minimumMidpointArray = new byte[diffIdx + 1];
        System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, diffIdx + 1);
      }
    }
    return minimumMidpointArray;
  }
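
  /*
   * Another small example (hypothetical values): when the left key "abc" is a strict prefix of
   * the right key "abcdef", the loop runs off the end of the left key, so the method falls back
   * to copying the first diffIdx + 1 bytes of the right key and returns "abcd".
   */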

  private static byte[] getMinimumMidpointArray(ByteBuffer left, int leftOffset, int leftLength,
      ByteBuffer right, int rightOffset, int rightLength) {
    // rows are different
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    while (diffIdx < minLength && ByteBufferUtils.toByte(left,
        leftOffset + diffIdx) == ByteBufferUtils.toByte(right, rightOffset + diffIdx)) {
      diffIdx++;
    }
    byte[] minMidpoint = null;
    if (diffIdx >= minLength) {
      // leftKey's row is a prefix of rightKey's.
      minMidpoint = new byte[diffIdx + 1];
      ByteBufferUtils.copyFromBufferToArray(minMidpoint, right, rightOffset, 0, diffIdx + 1);
    } else {
      int diffByte = ByteBufferUtils.toByte(left, leftOffset + diffIdx);
      if ((0xff & diffByte) < 0xff
          && (diffByte + 1) < (ByteBufferUtils.toByte(right, rightOffset + diffIdx) & 0xff)) {
        minMidpoint = new byte[diffIdx + 1];
        ByteBufferUtils.copyFromBufferToArray(minMidpoint, left, leftOffset, 0, diffIdx);
        minMidpoint[diffIdx] = (byte) (diffByte + 1);
      } else {
        minMidpoint = new byte[diffIdx + 1];
        ByteBufferUtils.copyFromBufferToArray(minMidpoint, right, rightOffset, 0, diffIdx + 1);
      }
    }
    return minMidpoint;
  }

  /** Gives inline block writers an opportunity to contribute blocks. */
  private void writeInlineBlocks(boolean closing) throws IOException {
    for (InlineBlockWriter ibw : inlineBlockWriters) {
      while (ibw.shouldWriteBlock(closing)) {
        long offset = outputStream.getPos();
        boolean cacheThisBlock = ibw.getCacheOnWrite();
        ibw.writeInlineBlock(blockWriter.startWriting(
            ibw.getInlineBlockType()));
        blockWriter.writeHeaderAndData(outputStream);
        ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(),
            blockWriter.getUncompressedSizeWithoutHeader());
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        if (cacheThisBlock) {
          doCacheOnWrite(offset);
        }
      }
    }
  }

  /**
   * Caches the last written HFile block.
   * @param offset the offset of the block we want to cache. Used to determine
   *          the cache key.
   */
  private void doCacheOnWrite(long offset) {
    cacheConf.getBlockCache().ifPresent(cache -> {
      HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
      try {
        cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
            cacheFormatBlock);
      } finally {
        // The refCnt auto-increases when the block is added to the cache; see RAMCache#putIfAbsent
        cacheFormatBlock.release();
      }
    });
  }

  /**
   * Ready a new block for writing.
   */
  protected void newBlock() throws IOException {
    // This is where the next block begins.
    blockWriter.startWriting(BlockType.DATA);
    firstCellInBlock = null;
    if (lastCell != null) {
      lastCellOfPreviousBlock = lastCell;
    }
  }

  /**
   * Add a meta block to the end of the file. Call before close(). Metadata
   * blocks are expensive. Fill one with a bunch of serialized data rather than
   * writing a metadata block per metadata instance. If metadata is small, consider
   * adding it to the file info using {@link #appendFileInfo(byte[], byte[])}.
   *
   * @param metaBlockName
   *          name of the block
   * @param content
   *          will call readFields to get data later (DO NOT REUSE)
   */
  @Override
  public void appendMetaBlock(String metaBlockName, Writable content) {
    byte[] key = Bytes.toBytes(metaBlockName);
    int i;
    for (i = 0; i < metaNames.size(); ++i) {
      // stop when the current key is greater than our own
      byte[] cur = metaNames.get(i);
      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
          key.length) > 0) {
        break;
      }
    }
    metaNames.add(i, key);
    metaData.add(i, content);
  }
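
  /*
   * An illustrative call (block name and payload are made up): the insertion sort above keeps
   * metaNames ordered by Bytes.BYTES_RAWCOMPARATOR, so callers may add blocks in any order, e.g.
   *
   *   writer.appendMetaBlock("MY_METADATA", new Text("opaque serialized payload"));
   *
   * where org.apache.hadoop.io.Text is one convenient Writable implementation.
   */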

  @Override
  public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // Save data block encoder metadata in the file info.
    blockEncoder.saveMetadata(this);
    // Write out the end of the data blocks, then write the meta data blocks,
    // followed by the file info, data block index and meta block index.

    finishBlock();
    writeInlineBlocks(true);

    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());

    // Write out the metadata blocks if any.
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long offset = outputStream.getPos();
        // write the metadata content
        DataOutputStream dos = blockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        blockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        // Add the new meta block to the meta index.
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
            blockWriter.getOnDiskSizeWithHeader());
      }
    }

    // Load-on-open section.

    // Data block index.
    //
    // In version 2, this section of the file starts with the root level data
    // block index. We call a function that writes intermediate-level blocks
    // first, then root level, and returns the offset of the root level block
    // index.

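    /*
     * A rough sketch of the resulting file layout as written above and below (order only, sizes
     * vary):
     *
     *   [data blocks + inline blocks] [meta blocks] [intermediate index levels]
     *   [root data index] [meta index] [file info] [bloom metadata, if any] [trailer]
     *
     * The load-on-open offset recorded in the trailer points at the root data index.
     */
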
    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    // Meta block index.
    metaBlockIndexWriter.writeSingleLevelIndex(blockWriter.startWriting(
        BlockType.ROOT_INDEX), "meta");
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    if (this.hFileContext.isIncludesMvcc()) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }

    // File info
    writeFileInfo(trailer, blockWriter.startWriting(BlockType.FILE_INFO));
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
    for (BlockWritable w : additionalLoadOnOpenData) {
      blockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    }

    // Now finish off the trailer.
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    trailer.setUncompressedDataIndexSize(
        dataBlockIndexWriter.getTotalUncompressedSize());
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    trailer.setComparatorClass(this.hFileContext.getCellComparator().getClass());
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());

    finishClose(trailer);

    blockWriter.release();
  }

  @Override
  public void addInlineBlockWriter(InlineBlockWriter ibw) {
    inlineBlockWriters.add(ibw);
  }

  @Override
  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
  }

  @Override
  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
  }

  private void addBloomFilter(final BloomFilterWriter bfw,
      final BlockType blockType) {
    if (bfw.getKeyCount() <= 0) {
      return;
    }

    if (blockType != BlockType.GENERAL_BLOOM_META &&
        blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
      throw new RuntimeException("Block Type: " + blockType.toString() +
          " is not supported");
    }
    additionalLoadOnOpenData.add(new BlockWritable() {
      @Override
      public BlockType getBlockType() {
        return blockType;
      }

      @Override
      public void writeToBlock(DataOutput out) throws IOException {
        bfw.getMetaWriter().write(out);
        Writable dataWriter = bfw.getDataWriter();
        if (dataWriter != null) {
          dataWriter.write(out);
        }
      }
    });
  }

  @Override
  public HFileContext getFileContext() {
    return hFileContext;
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param cell
   *          Cell to add. Cannot be null or empty.
   */
  @Override
  public void append(final Cell cell) throws IOException {
    // checkKey uses the comparator to check we are writing in order.
    boolean dupKey = checkKey(cell);
    if (!dupKey) {
      checkBlockBoundary();
    }

    if (!blockWriter.isWriting()) {
      newBlock();
    }

    blockWriter.write(cell);

    totalKeyLength += PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
    totalValueLength += cell.getValueLength();

    // Are we the first key in this block?
    if (firstCellInBlock == null) {
      // If the cell is big, the block will be closed and this firstCellInBlock reference will
      // only last a short while.
      firstCellInBlock = cell;
    }

    // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinitely?
    lastCell = cell;
    entryCount++;
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
    int tagsLength = cell.getTagsLength();
    if (tagsLength > this.maxTagsLength) {
      this.maxTagsLength = tagsLength;
    }
  }
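
  /*
   * A minimal end-to-end sketch (paths and cells are hypothetical; production code normally goes
   * through HFile.getWriterFactory or StoreFileWriter rather than instantiating this class):
   *
   *   HFileContext context = new HFileContextBuilder().withBlockSize(64 * 1024).build();
   *   HFile.Writer w = HFile.getWriterFactory(conf, CacheConfig.DISABLED)
   *       .withPath(fs, new Path("/tmp/example.hfile"))
   *       .withFileContext(context)
   *       .create();
   *   w.append(cell1);   // cells must arrive in comparator order
   *   w.append(cell2);
   *   w.close();         // writes indexes, file info and the trailer
   */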

  @Override
  public void beforeShipped() throws IOException {
    this.blockWriter.beforeShipped();
    // Clone the cells we are still holding on to so they do not reference buffers that may be
    // reused after shipping.
    if (this.lastCell != null) {
      this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);
    }
    if (this.firstCellInBlock != null) {
      this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock);
    }
    if (this.lastCellOfPreviousBlock != null) {
      this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock);
    }
  }

  public Cell getLastCell() {
    return lastCell;
  }

  protected void finishFileInfo() throws IOException {
    if (lastCell != null) {
      // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
      // byte buffer. Won't take a tuple.
      byte [] lastKey = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
      fileInfo.append(HFileInfo.LASTKEY, lastKey, false);
    }

    // Average key length.
    int avgKeyLen =
        entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
    fileInfo.append(HFileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
    fileInfo.append(HFileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
      false);

    // Average value length.
    int avgValueLen =
        entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
    fileInfo.append(HFileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
    if (hFileContext.isIncludesTags()) {
      // When tags are not being written in this file, MAX_TAGS_LEN is excluded
      // from the FileInfo
      fileInfo.append(HFileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
      boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
        && hFileContext.isCompressTags();
      fileInfo.append(HFileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
    }
  }

  protected int getMajorVersion() {
    return 3;
  }

  protected int getMinorVersion() {
    return HFileReaderImpl.MAX_MINOR_VERSION;
  }

  protected void finishClose(FixedFileTrailer trailer) throws IOException {
    // Write out encryption metadata before finalizing if we have a valid crypto context
    Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
    if (cryptoContext != Encryption.Context.NONE) {
      // Wrap the context's key and write it as the encryption metadata; the wrapper includes
      // all information needed for decryption.
      trailer.setEncryptionKey(EncryptionUtil.wrapKey(cryptoContext.getConf(),
        cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
          User.getCurrent().getShortName()),
        cryptoContext.getKey()));
    }
    // Now we can finish the close
    trailer.setMetaIndexCount(metaNames.size());
    trailer.setTotalUncompressedBytes(totalUncompressedBytes + trailer.getTrailerSize());
    trailer.setEntryCount(entryCount);
    trailer.setCompressionCodec(hFileContext.getCompression());

    long startTime = System.currentTimeMillis();
    trailer.serialize(outputStream);
    HFile.updateWriteLatency(System.currentTimeMillis() - startTime);

    if (closeOutputStream) {
      outputStream.close();
      outputStream = null;
    }
  }
}