001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;
021import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
022import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
023import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
024
025import java.io.DataOutput;
026import java.io.DataOutputStream;
027import java.io.IOException;
028import java.net.InetSocketAddress;
029import java.nio.ByteBuffer;
030import java.security.Key;
031import java.util.ArrayList;
032import java.util.List;
033import java.util.Optional;
034import java.util.function.Supplier;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FSDataOutputStream;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.fs.permission.FsPermission;
040import org.apache.hadoop.hbase.ByteBufferExtendedCell;
041import org.apache.hadoop.hbase.Cell;
042import org.apache.hadoop.hbase.CellComparator;
043import org.apache.hadoop.hbase.CellUtil;
044import org.apache.hadoop.hbase.ExtendedCell;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.KeyValue;
047import org.apache.hadoop.hbase.KeyValueUtil;
048import org.apache.hadoop.hbase.MetaCellComparator;
049import org.apache.hadoop.hbase.PrivateCellUtil;
050import org.apache.hadoop.hbase.io.compress.Compression;
051import org.apache.hadoop.hbase.io.crypto.Encryption;
052import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
053import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
054import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
055import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
056import org.apache.hadoop.hbase.security.EncryptionUtil;
057import org.apache.hadoop.hbase.security.User;
058import org.apache.hadoop.hbase.util.BloomFilterWriter;
059import org.apache.hadoop.hbase.util.ByteBufferUtils;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.hbase.util.CommonFSUtils;
062import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
063import org.apache.hadoop.hbase.util.FSUtils;
064import org.apache.hadoop.io.Writable;
065import org.apache.yetus.audience.InterfaceAudience;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069/**
070 * Common functionality needed by all versions of {@link HFile} writers.
071 */
072@InterfaceAudience.Private
073public class HFileWriterImpl implements HFile.Writer {
  private static final Logger LOG = LoggerFactory.getLogger(HFileWriterImpl.class);

  /** Sentinel offset meaning "no data block has been written yet" (see firstDataBlockOffset). */
  private static final long UNSET = -1;

  /**
   * If this feature is enabled, the encoded data size is pre-calculated before the real encoding
   * happens. The value is read as a float (default 0) and multiplied by the HFile block size to
   * obtain {@link #encodedBlockSizeLimit}; a non-positive result disables the feature.
   */
  public static final String UNIFIED_ENCODED_BLOCKSIZE_RATIO =
    "hbase.writer.unified.encoded.blocksize.ratio";

  /**
   * Block size limit after encoding, used to unify the encoded block cache entry size. When greater
   * than 0 it replaces the plain block size as the data block boundary criterion.
   */
  private final int encodedBlockSizeLimit;

  /** The Cell previously appended. Becomes the last cell in the file. */
  protected ExtendedCell lastCell = null;

  /** FileSystem stream to write into. */
  protected FSDataOutputStream outputStream;

  /**
   * True if we opened the <code>outputStream</code> (and so will close it). The constructor sets
   * this to {@code path != null}.
   */
  protected final boolean closeOutputStream;

  /** A "file info" block: a key-value map of file-wide metadata, written out on close. */
  protected HFileInfo fileInfo = new HFileInfo();

  /** Total # of key/value entries, i.e. how many times {@code append()} was called. */
  protected long entryCount = 0;

  /** Used for calculating the average key length. */
  protected long totalKeyLength = 0;

  /** Used for calculating the average value length. */
  protected long totalValueLength = 0;

  /** Len of the biggest cell. */
  protected long lenOfBiggestCell = 0;
  /** Key of the biggest cell. */
  protected byte[] keyOfBiggestCell;

  /**
   * Total uncompressed bytes (including block headers) of every block written so far; may be used
   * to calculate a compression ratio later.
   */
  protected long totalUncompressedBytes = 0;

  /** Meta block names, kept sorted; index-aligned with {@link #metaData}. */
  protected List<byte[]> metaNames = new ArrayList<>();

  /** {@link Writable}s representing meta block data; index-aligned with {@link #metaNames}. */
  protected List<Writable> metaData = new ArrayList<>();

  /**
   * First cell in a block. This reference should be short-lived since we write hfiles in a burst.
   */
  protected ExtendedCell firstCellInBlock = null;

  /** May be null if we were passed a stream. */
  protected final Path path;

  /** Configuration passed at construction; also handed to the block cache's caching check. */
  protected final Configuration conf;

  /** Cache configuration for caching data on write. */
  protected final CacheConfig cacheConf;
132
133  public void setTimeRangeTrackerForTiering(Supplier<TimeRangeTracker> timeRangeTrackerForTiering) {
134    this.timeRangeTrackerForTiering = timeRangeTrackerForTiering;
135  }
136
  /**
   * Supplies the tracker whose max timestamp is passed to the block cache when deciding whether to
   * cache a written block. The constructor defaults it to return {@link #timeRangeTracker}.
   */
  private Supplier<TimeRangeTracker> timeRangeTrackerForTiering;

  /**
   * Name for this object used when logging or in toString, and as the file name in block cache
   * keys. It is either the result of a toString on the stream or the name of the passed file Path.
   */
  protected final String name;

  /**
   * The data block encoding which will be used. {@link NoOpDataBlockEncoder#INSTANCE} if there is
   * no encoding.
   */
  protected final HFileDataBlockEncoder blockEncoder;

  /** The index block encoder; {@link NoOpIndexBlockEncoder#INSTANCE} if there is no encoding. */
  protected final HFileIndexBlockEncoder indexBlockEncoder;

  /** Supplies block size, encodings, compression, comparator and MVCC settings. */
  protected final HFileContext hFileContext;

  private int maxTagsLength = 0;

  /** KeyValue version in FileInfo */
  public static final byte[] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");

  /** Version for KeyValue which includes memstore timestamp */
  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  /** Inline block writers for multi-level block index and compound Blooms. */
  private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<>();

  /** Block writer used for every block type (data, inline, meta, index, file info). */
  protected HFileBlock.Writer blockWriter;

  /** Data block index writer; also registered as an inline block writer in finishInit. */
  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
  /** Index of the meta blocks, written as a single-level index on close. */
  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

  /** The offset of the first data block or -1 ({@link #UNSET}) if the file is empty. */
  private long firstDataBlockOffset = UNSET;

  /** The offset of the last data block or -1 ({@link #UNSET}) if the file is empty. */
  protected long lastDataBlockOffset = UNSET;

  /**
   * The last(stop) Cell of the previous data block. This reference should be short-lived since we
   * write hfiles in a burst.
   */
  private ExtendedCell lastCellOfPreviousBlock = null;

  /** Additional data items to be written to the "load-on-open" section. */
  private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<>();

  /** Written to the file info as MAX_MEMSTORE_TS_KEY when the file context includes MVCC. */
  protected long maxMemstoreTS = 0;

  /** Non-synchronized tracker created by the constructor. */
  private final TimeRangeTracker timeRangeTracker;
  // NOTE(review): updated outside this view; presumably tracks the earliest Put timestamp.
  private long earliestPutTs = HConstants.LATEST_TIMESTAMP;

  /** Name of the path's grandparent directory; used in block cache keys. Null without a path. */
  private String regionName;

  /** Name of the path's parent directory; used in block cache keys. Null without a path. */
  private String familyName;
195
196  public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
197    FSDataOutputStream outputStream, HFileContext fileContext) {
198    this.outputStream = outputStream;
199    this.path = path;
200    if (path != null) {
201      this.name = path.getName();
202      this.regionName = path.getParent().getParent().getName();
203      this.familyName = path.getParent().getName();
204    } else {
205      this.name = outputStream.toString();
206    }
207    this.hFileContext = fileContext;
208    // TODO: Move this back to upper layer
209    this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
210    this.timeRangeTrackerForTiering = () -> this.timeRangeTracker;
211    DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
212    if (encoding != DataBlockEncoding.NONE) {
213      this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
214    } else {
215      this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
216    }
217    IndexBlockEncoding indexBlockEncoding = hFileContext.getIndexBlockEncoding();
218    if (indexBlockEncoding != IndexBlockEncoding.NONE) {
219      this.indexBlockEncoder = new HFileIndexBlockEncoderImpl(indexBlockEncoding);
220    } else {
221      this.indexBlockEncoder = NoOpIndexBlockEncoder.INSTANCE;
222    }
223    closeOutputStream = path != null;
224    this.cacheConf = cacheConf;
225    this.conf = conf;
226    float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 0f);
227    this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio);
228
229    finishInit(conf);
230    if (LOG.isTraceEnabled()) {
231      LOG.trace("Writer" + (path != null ? " for " + path : "") + " initialized with cacheConf: "
232        + cacheConf + " fileContext: " + fileContext);
233    }
234  }
235
236  /**
237   * Add to the file info. All added key/value pairs can be obtained using
238   * {@link HFile.Reader#getHFileInfo()}.
239   * @param k Key
240   * @param v Value
241   * @throws IOException in case the key or the value are invalid
242   */
243  @Override
244  public void appendFileInfo(final byte[] k, final byte[] v) throws IOException {
245    fileInfo.append(k, v, true);
246  }
247
248  /**
249   * Sets the file info offset in the trailer, finishes up populating fields in the file info, and
250   * writes the file info into the given data output. The reason the data output is not always
251   * {@link #outputStream} is that we store file info as a block in version 2.
252   * @param trailer fixed file trailer
253   * @param out     the data output to write the file info to
254   */
255  protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
256    throws IOException {
257    trailer.setFileInfoOffset(outputStream.getPos());
258    finishFileInfo();
259    long startTime = EnvironmentEdgeManager.currentTime();
260    fileInfo.write(out);
261    HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);
262  }
263
264  public long getPos() throws IOException {
265    return outputStream.getPos();
266
267  }
268
269  /**
270   * Checks that the given Cell's key does not violate the key order.
271   * @param cell Cell whose key to check.
272   * @return true if the key is duplicate
273   * @throws IOException if the key or the key order is wrong
274   */
275  protected boolean checkKey(final Cell cell) throws IOException {
276    boolean isDuplicateKey = false;
277
278    if (cell == null) {
279      throw new IOException("Key cannot be null or empty");
280    }
281    if (lastCell != null) {
282      int keyComp = PrivateCellUtil.compareKeyIgnoresMvcc(this.hFileContext.getCellComparator(),
283        lastCell, cell);
284      if (keyComp > 0) {
285        String message = getLexicalErrorMessage(cell);
286        throw new IOException(message);
287      } else if (keyComp == 0) {
288        isDuplicateKey = true;
289      }
290    }
291    return isDuplicateKey;
292  }
293
294  private String getLexicalErrorMessage(Cell cell) {
295    StringBuilder sb = new StringBuilder();
296    sb.append("Added a key not lexically larger than previous. Current cell = ");
297    sb.append(cell);
298    sb.append(", lastCell = ");
299    sb.append(lastCell);
300    // file context includes HFile path and optionally table and CF of file being written
301    sb.append("fileContext=");
302    sb.append(hFileContext);
303    return sb.toString();
304  }
305
306  /** Checks the given value for validity. */
307  protected void checkValue(final byte[] value, final int offset, final int length)
308    throws IOException {
309    if (value == null) {
310      throw new IOException("Value cannot be null");
311    }
312  }
313
314  /** Returns Path or null if we were passed a stream rather than a Path. */
315  @Override
316  public Path getPath() {
317    return path;
318  }
319
320  @Override
321  public String toString() {
322    return "writer=" + (path != null ? path.toString() : null) + ", name=" + name + ", compression="
323      + hFileContext.getCompression().getName();
324  }
325
326  public static Compression.Algorithm compressionByName(String algoName) {
327    if (algoName == null) {
328      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
329    }
330    return Compression.getCompressionAlgorithmByName(algoName);
331  }
332
333  /** A helper method to create HFile output streams in constructors */
334  protected static FSDataOutputStream createOutputStream(Configuration conf, FileSystem fs,
335    Path path, InetSocketAddress[] favoredNodes) throws IOException {
336    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
337    return FSUtils.create(conf, fs, path, perms, favoredNodes);
338  }
339
340  /** Additional initialization steps */
341  protected void finishInit(final Configuration conf) {
342    if (blockWriter != null) {
343      throw new IllegalStateException("finishInit called twice");
344    }
345    blockWriter =
346      new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator(),
347        conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10));
348    // Data block index writer
349    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
350    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
351      cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null, indexBlockEncoder);
352    dataBlockIndexWriter.setMaxChunkSize(HFileBlockIndex.getMaxChunkSize(conf));
353    dataBlockIndexWriter.setMinIndexNumEntries(HFileBlockIndex.getMinIndexNumEntries(conf));
354    inlineBlockWriters.add(dataBlockIndexWriter);
355
356    // Meta data block index writer
357    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
358    LOG.trace("Initialized with {}", cacheConf);
359  }
360
361  /**
362   * At a block boundary, write all the inline blocks and opens new block.
363   */
364  protected void checkBlockBoundary() throws IOException {
365    boolean shouldFinishBlock = false;
366    // This means hbase.writer.unified.encoded.blocksize.ratio was set to something different from 0
367    // and we should use the encoding ratio
368    if (encodedBlockSizeLimit > 0) {
369      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit;
370    } else {
371      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize()
372        || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize();
373    }
374    shouldFinishBlock &= blockWriter.checkBoundariesWithPredicate();
375    if (shouldFinishBlock) {
376      finishBlock();
377      writeInlineBlocks(false);
378      newBlock();
379    }
380  }
381
382  /** Clean up the data block that is currently being written. */
383  private void finishBlock() throws IOException {
384    if (!blockWriter.isWriting() || blockWriter.blockSizeWritten() == 0) {
385      return;
386    }
387
388    // Update the first data block offset if UNSET; used scanning.
389    if (firstDataBlockOffset == UNSET) {
390      firstDataBlockOffset = outputStream.getPos();
391    }
392    // Update the last data block offset each time through here.
393    lastDataBlockOffset = outputStream.getPos();
394    blockWriter.writeHeaderAndData(outputStream);
395    int onDiskSize = blockWriter.getOnDiskSizeWithHeader();
396    ExtendedCell indexEntry =
397      getMidpoint(this.hFileContext.getCellComparator(), lastCellOfPreviousBlock, firstCellInBlock);
398    dataBlockIndexWriter.addEntry(PrivateCellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
399      lastDataBlockOffset, onDiskSize);
400    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
401    if (cacheConf.shouldCacheDataOnWrite()) {
402      doCacheOnWrite(lastDataBlockOffset);
403    }
404  }
405
406  /**
407   * Try to return a Cell that falls between <code>left</code> and <code>right</code> but that is
408   * shorter; i.e. takes up less space. This trick is used building HFile block index. Its an
409   * optimization. It does not always work. In this case we'll just return the <code>right</code>
410   * cell.
411   * @return A cell that sorts between <code>left</code> and <code>right</code>.
412   */
413  public static ExtendedCell getMidpoint(final CellComparator comparator, final ExtendedCell left,
414    final ExtendedCell right) {
415    if (right == null) {
416      throw new IllegalArgumentException("right cell can not be null");
417    }
418    if (left == null) {
419      return right;
420    }
421    // If Cells from meta table, don't mess around. meta table Cells have schema
422    // (table,startrow,hash) so can't be treated as plain byte arrays. Just skip
423    // out without trying to do this optimization.
424    if (comparator instanceof MetaCellComparator) {
425      return right;
426    }
427    byte[] midRow;
428    boolean bufferBacked =
429      left instanceof ByteBufferExtendedCell && right instanceof ByteBufferExtendedCell;
430    if (bufferBacked) {
431      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getRowByteBuffer(),
432        ((ByteBufferExtendedCell) left).getRowPosition(), left.getRowLength(),
433        ((ByteBufferExtendedCell) right).getRowByteBuffer(),
434        ((ByteBufferExtendedCell) right).getRowPosition(), right.getRowLength());
435    } else {
436      midRow = getMinimumMidpointArray(left.getRowArray(), left.getRowOffset(), left.getRowLength(),
437        right.getRowArray(), right.getRowOffset(), right.getRowLength());
438    }
439    if (midRow != null) {
440      return PrivateCellUtil.createFirstOnRow(midRow);
441    }
442    // Rows are same. Compare on families.
443    if (bufferBacked) {
444      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getFamilyByteBuffer(),
445        ((ByteBufferExtendedCell) left).getFamilyPosition(), left.getFamilyLength(),
446        ((ByteBufferExtendedCell) right).getFamilyByteBuffer(),
447        ((ByteBufferExtendedCell) right).getFamilyPosition(), right.getFamilyLength());
448    } else {
449      midRow = getMinimumMidpointArray(left.getFamilyArray(), left.getFamilyOffset(),
450        left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
451        right.getFamilyLength());
452    }
453    if (midRow != null) {
454      return PrivateCellUtil.createFirstOnRowFamily(right, midRow, 0, midRow.length);
455    }
456    // Families are same. Compare on qualifiers.
457    if (bufferBacked) {
458      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getQualifierByteBuffer(),
459        ((ByteBufferExtendedCell) left).getQualifierPosition(), left.getQualifierLength(),
460        ((ByteBufferExtendedCell) right).getQualifierByteBuffer(),
461        ((ByteBufferExtendedCell) right).getQualifierPosition(), right.getQualifierLength());
462    } else {
463      midRow = getMinimumMidpointArray(left.getQualifierArray(), left.getQualifierOffset(),
464        left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
465        right.getQualifierLength());
466    }
467    if (midRow != null) {
468      return PrivateCellUtil.createFirstOnRowCol(right, midRow, 0, midRow.length);
469    }
470    // No opportunity for optimization. Just return right key.
471    return right;
472  }
473
474  /**
475   * Try to get a byte array that falls between left and right as short as possible with
476   * lexicographical order;
477   * @return Return a new array that is between left and right and minimally sized else just return
478   *         null if left == right.
479   */
480  private static byte[] getMinimumMidpointArray(final byte[] leftArray, final int leftOffset,
481    final int leftLength, final byte[] rightArray, final int rightOffset, final int rightLength) {
482    int minLength = leftLength < rightLength ? leftLength : rightLength;
483    int diffIdx = 0;
484    for (; diffIdx < minLength; diffIdx++) {
485      byte leftByte = leftArray[leftOffset + diffIdx];
486      byte rightByte = rightArray[rightOffset + diffIdx];
487      if ((leftByte & 0xff) > (rightByte & 0xff)) {
488        throw new IllegalArgumentException("Left byte array sorts after right row; left="
489          + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right="
490          + Bytes.toStringBinary(rightArray, rightOffset, rightLength));
491      } else if (leftByte != rightByte) {
492        break;
493      }
494    }
495    if (diffIdx == minLength) {
496      if (leftLength > rightLength) {
497        // right is prefix of left
498        throw new IllegalArgumentException("Left byte array sorts after right row; left="
499          + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right="
500          + Bytes.toStringBinary(rightArray, rightOffset, rightLength));
501      } else if (leftLength < rightLength) {
502        // left is prefix of right.
503        byte[] minimumMidpointArray = new byte[minLength + 1];
504        System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, minLength + 1);
505        minimumMidpointArray[minLength] = 0x00;
506        return minimumMidpointArray;
507      } else {
508        // left == right
509        return null;
510      }
511    }
512    // Note that left[diffIdx] can never be equal to 0xff since left < right
513    byte[] minimumMidpointArray = new byte[diffIdx + 1];
514    System.arraycopy(leftArray, leftOffset, minimumMidpointArray, 0, diffIdx + 1);
515    minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1);
516    return minimumMidpointArray;
517  }
518
519  /**
520   * Try to create a new byte array that falls between left and right as short as possible with
521   * lexicographical order.
522   * @return Return a new array that is between left and right and minimally sized else just return
523   *         null if left == right.
524   */
525  private static byte[] getMinimumMidpointArray(ByteBuffer left, int leftOffset, int leftLength,
526    ByteBuffer right, int rightOffset, int rightLength) {
527    int minLength = leftLength < rightLength ? leftLength : rightLength;
528    int diffIdx = 0;
529    for (; diffIdx < minLength; diffIdx++) {
530      int leftByte = ByteBufferUtils.toByte(left, leftOffset + diffIdx);
531      int rightByte = ByteBufferUtils.toByte(right, rightOffset + diffIdx);
532      if ((leftByte & 0xff) > (rightByte & 0xff)) {
533        throw new IllegalArgumentException("Left byte array sorts after right row; left="
534          + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right="
535          + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength));
536      } else if (leftByte != rightByte) {
537        break;
538      }
539    }
540    if (diffIdx == minLength) {
541      if (leftLength > rightLength) {
542        // right is prefix of left
543        throw new IllegalArgumentException("Left byte array sorts after right row; left="
544          + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right="
545          + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength));
546      } else if (leftLength < rightLength) {
547        // left is prefix of right.
548        byte[] minimumMidpointArray = new byte[minLength + 1];
549        ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, right, rightOffset, 0,
550          minLength + 1);
551        minimumMidpointArray[minLength] = 0x00;
552        return minimumMidpointArray;
553      } else {
554        // left == right
555        return null;
556      }
557    }
558    // Note that left[diffIdx] can never be equal to 0xff since left < right
559    byte[] minimumMidpointArray = new byte[diffIdx + 1];
560    ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, left, leftOffset, 0, diffIdx + 1);
561    minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1);
562    return minimumMidpointArray;
563  }
564
565  /** Gives inline block writers an opportunity to contribute blocks. */
566  private void writeInlineBlocks(boolean closing) throws IOException {
567    for (InlineBlockWriter ibw : inlineBlockWriters) {
568      while (ibw.shouldWriteBlock(closing)) {
569        long offset = outputStream.getPos();
570        boolean cacheThisBlock = ibw.getCacheOnWrite();
571        ibw.writeInlineBlock(blockWriter.startWriting(ibw.getInlineBlockType()));
572        blockWriter.writeHeaderAndData(outputStream);
573        ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(),
574          blockWriter.getUncompressedSizeWithoutHeader());
575        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
576
577        if (cacheThisBlock) {
578          doCacheOnWrite(offset);
579        }
580      }
581    }
582  }
583
584  /**
585   * Caches the last written HFile block.
586   * @param offset the offset of the block we want to cache. Used to determine the cache key.
587   */
588  private void doCacheOnWrite(long offset) {
589    cacheConf.getBlockCache().ifPresent(cache -> {
590      HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
591      try {
592        BlockCacheKey key = buildCacheBlockKey(offset, cacheFormatBlock.getBlockType());
593        if (!shouldCacheBlock(cache, key)) {
594          return;
595        }
596        cache.cacheBlock(key, cacheFormatBlock, cacheConf.isInMemory(), true);
597      } finally {
598        // refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
599        cacheFormatBlock.release();
600      }
601    });
602  }
603
604  private BlockCacheKey buildCacheBlockKey(long offset, BlockType blockType) {
605    if (path != null) {
606      return new BlockCacheKey(name, familyName, regionName, offset, true, blockType, false);
607    }
608    return new BlockCacheKey(name, offset, true, blockType);
609  }
610
611  private boolean shouldCacheBlock(BlockCache cache, BlockCacheKey key) {
612    Optional<Boolean> result =
613      cache.shouldCacheBlock(key, timeRangeTrackerForTiering.get().getMax(), conf);
614    return result.orElse(true);
615  }
616
617  /**
618   * Ready a new block for writing.
619   */
620  protected void newBlock() throws IOException {
621    // This is where the next block begins.
622    blockWriter.startWriting(BlockType.DATA);
623    firstCellInBlock = null;
624    if (lastCell != null) {
625      lastCellOfPreviousBlock = lastCell;
626    }
627  }
628
629  /**
630   * Add a meta block to the end of the file. Call before close(). Metadata blocks are expensive.
631   * Fill one with a bunch of serialized data rather than do a metadata block per metadata instance.
632   * If metadata is small, consider adding to file info using
633   * {@link #appendFileInfo(byte[], byte[])} name of the block will call readFields to get data
634   * later (DO NOT REUSE)
635   */
636  @Override
637  public void appendMetaBlock(String metaBlockName, Writable content) {
638    byte[] key = Bytes.toBytes(metaBlockName);
639    int i;
640    for (i = 0; i < metaNames.size(); ++i) {
641      // stop when the current key is greater than our own
642      byte[] cur = metaNames.get(i);
643      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0, key.length) > 0) {
644        break;
645      }
646    }
647    metaNames.add(i, key);
648    metaData.add(i, content);
649  }
650
  /**
   * Finishes the file. Writes, in order:
   * <ol>
   * <li>the pending data block and any remaining inline blocks;</li>
   * <li>the meta blocks;</li>
   * <li>the load-on-open section: data block index, meta block index, file info, then any
   * additional load-on-open blocks such as Bloom filters;</li>
   * <li>the trailer, via {@code finishClose}.</li>
   * </ol>
   * Returns immediately when {@link #outputStream} is null.
   * @throws IOException if writing any part of the file fails
   */
  @Override
  public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // Save data block encoder metadata in the file info.
    blockEncoder.saveMetadata(this);
    // Save index block encoder metadata in the file info.
    indexBlockEncoder.saveMetadata(this);
    // Write out the last data block and remaining inline blocks, then the meta blocks, followed
    // by the data block index, meta block index and file info.

    finishBlock();
    writeInlineBlocks(true);

    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());

    // Write out the metadata blocks if any.
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long offset = outputStream.getPos();
        // write the metadata content
        DataOutputStream dos = blockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        blockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        // Add the new meta block to the meta index.
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
          blockWriter.getOnDiskSizeWithHeader());
      }
    }

    // Load-on-open section.

    // Data block index.
    //
    // In version 2, this section of the file starts with the root level data
    // block index. We call a function that writes intermediate-level blocks
    // first, then root level, and returns the offset of the root level block
    // index.

    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    // Meta block index.
    metaBlockIndexWriter.writeSingleLevelIndex(blockWriter.startWriting(BlockType.ROOT_INDEX),
      "meta");
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    // MVCC-aware files record the max memstore timestamp and the KeyValue version.
    if (this.hFileContext.isIncludesMvcc()) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }

    // File info
    writeFileInfo(trailer, blockWriter.startWriting(BlockType.FILE_INFO));
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
    for (BlockWritable w : additionalLoadOnOpenData) {
      blockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    }

    // Now finish off the trailer.
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    trailer.setUncompressedDataIndexSize(dataBlockIndexWriter.getTotalUncompressedSize());
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    trailer.setComparatorClass(this.hFileContext.getCellComparator().getClass());
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());

    finishClose(trailer);

    // NOTE(review): not in a finally block, so release() is skipped if any earlier step throws.
    blockWriter.release();
  }
732
733  @Override
734  public void addInlineBlockWriter(InlineBlockWriter ibw) {
735    inlineBlockWriters.add(ibw);
736  }
737
738  @Override
739  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
740    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
741  }
742
743  @Override
744  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
745    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
746  }
747
748  private void addBloomFilter(final BloomFilterWriter bfw, final BlockType blockType) {
749    if (bfw.getKeyCount() <= 0) {
750      return;
751    }
752
753    if (
754      blockType != BlockType.GENERAL_BLOOM_META && blockType != BlockType.DELETE_FAMILY_BLOOM_META
755    ) {
756      throw new RuntimeException("Block Type: " + blockType.toString() + "is not supported");
757    }
758    additionalLoadOnOpenData.add(new BlockWritable() {
759      @Override
760      public BlockType getBlockType() {
761        return blockType;
762      }
763
764      @Override
765      public void writeToBlock(DataOutput out) throws IOException {
766        bfw.getMetaWriter().write(out);
767        Writable dataWriter = bfw.getDataWriter();
768        if (dataWriter != null) {
769          dataWriter.write(out);
770        }
771      }
772    });
773  }
774
775  @Override
776  public HFileContext getFileContext() {
777    return hFileContext;
778  }
779
780  /**
781   * Add key/value to file. Keys must be added in an order that agrees with the Comparator passed on
782   * construction. Cell to add. Cannot be empty nor null.
783   */
784  @Override
785  public void append(final ExtendedCell cell) throws IOException {
786    // checkKey uses comparator to check we are writing in order.
787    boolean dupKey = checkKey(cell);
788    if (!dupKey) {
789      checkBlockBoundary();
790    }
791
792    if (!blockWriter.isWriting()) {
793      newBlock();
794    }
795
796    blockWriter.write(cell);
797
798    totalKeyLength += PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
799    totalValueLength += cell.getValueLength();
800    if (lenOfBiggestCell < PrivateCellUtil.estimatedSerializedSizeOf(cell)) {
801      lenOfBiggestCell = PrivateCellUtil.estimatedSerializedSizeOf(cell);
802      keyOfBiggestCell = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(cell);
803    }
804    // Are we the first key in this block?
805    if (firstCellInBlock == null) {
806      // If cell is big, block will be closed and this firstCellInBlock reference will only last
807      // a short while.
808      firstCellInBlock = cell;
809    }
810
811    // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinitely?
812    lastCell = cell;
813    entryCount++;
814    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
815    int tagsLength = cell.getTagsLength();
816    if (tagsLength > this.maxTagsLength) {
817      this.maxTagsLength = tagsLength;
818    }
819
820    trackTimestamps(cell);
821  }
822
823  @Override
824  public void beforeShipped() throws IOException {
825    this.blockWriter.beforeShipped();
826    // Add clone methods for every cell
827    if (this.lastCell != null) {
828      this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);
829    }
830    if (this.firstCellInBlock != null) {
831      this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock);
832    }
833    if (this.lastCellOfPreviousBlock != null) {
834      this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock);
835    }
836  }
837
838  public Cell getLastCell() {
839    return lastCell;
840  }
841
842  protected void finishFileInfo() throws IOException {
843    if (lastCell != null) {
844      // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
845      // byte buffer. Won't take a tuple.
846      byte[] lastKey = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
847      fileInfo.append(HFileInfo.LASTKEY, lastKey, false);
848    }
849
850    // Average key length.
851    int avgKeyLen = entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
852    fileInfo.append(HFileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
853    fileInfo.append(HFileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
854      false);
855
856    // Average value length.
857    int avgValueLen = entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
858    fileInfo.append(HFileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
859
860    // Biggest cell.
861    if (keyOfBiggestCell != null) {
862      fileInfo.append(HFileInfo.KEY_OF_BIGGEST_CELL, keyOfBiggestCell, false);
863      fileInfo.append(HFileInfo.LEN_OF_BIGGEST_CELL, Bytes.toBytes(lenOfBiggestCell), false);
864      LOG.debug("Len of the biggest cell in {} is {}, key is {}",
865        this.getPath() == null ? "" : this.getPath().toString(), lenOfBiggestCell,
866        CellUtil.toString(new KeyValue.KeyOnlyKeyValue(keyOfBiggestCell), false));
867    }
868
869    if (hFileContext.isIncludesTags()) {
870      // When tags are not being written in this file, MAX_TAGS_LEN is excluded
871      // from the FileInfo
872      fileInfo.append(HFileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
873      boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
874        && hFileContext.isCompressTags();
875      fileInfo.append(HFileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
876    }
877  }
878
879  protected int getMajorVersion() {
880    return 3;
881  }
882
883  protected int getMinorVersion() {
884    return HFileReaderImpl.MAX_MINOR_VERSION;
885  }
886
887  protected void finishClose(FixedFileTrailer trailer) throws IOException {
888    // Write out encryption metadata before finalizing if we have a valid crypto context
889    Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
890    if (cryptoContext != Encryption.Context.NONE) {
891      // key management is not yet implemented, so kekData is always null
892      // Use traditional encryption with master key
893      String wrapperSubject = cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
894        User.getCurrent().getShortName());
895      Key encKey = cryptoContext.getKey();
896
897      // Wrap the context's key and write it as the encryption metadata
898      if (encKey != null) {
899        byte[] wrappedKey =
900          EncryptionUtil.wrapKey(cryptoContext.getConf(), wrapperSubject, encKey, null);
901        trailer.setEncryptionKey(wrappedKey);
902      }
903
904      // Key management fields - not yet implemented, set to defaults
905      trailer.setKeyNamespace(null);
906      trailer.setKEKMetadata(null);
907      trailer.setKEKChecksum(0);
908    }
909    // Now we can finish the close
910    trailer.setMetaIndexCount(metaNames.size());
911    trailer.setTotalUncompressedBytes(totalUncompressedBytes + trailer.getTrailerSize());
912    trailer.setEntryCount(entryCount);
913    trailer.setCompressionCodec(hFileContext.getCompression());
914
915    long startTime = EnvironmentEdgeManager.currentTime();
916    trailer.serialize(outputStream);
917    HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);
918
919    if (closeOutputStream) {
920      outputStream.close();
921      outputStream = null;
922    }
923  }
924
925  /**
926   * Add TimestampRange and earliest put timestamp to Metadata
927   */
928  public void appendTrackedTimestampsToMetadata() throws IOException {
929    // TODO: The StoreFileReader always converts the byte[] to TimeRange
930    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
931    appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker));
932    appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
933  }
934
935  public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
936    throws IOException {
937    // TODO: The StoreFileReader always converts the byte[] to TimeRange
938    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
939    appendFileInfo(CUSTOM_TIERING_TIME_RANGE, TimeRangeTracker.toByteArray(timeRangeTracker));
940  }
941
942  /**
943   * Record the earliest Put timestamp. If the timeRangeTracker is not set, update TimeRangeTracker
944   * to include the timestamp of this key
945   */
946  private void trackTimestamps(final ExtendedCell cell) {
947    if (KeyValue.Type.Put == KeyValue.Type.codeToType(cell.getTypeByte())) {
948      earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
949    }
950    timeRangeTracker.includeTimestamp(cell);
951  }
952}