/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;
import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.Key;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Common functionality needed by all versions of {@link HFile} writers.
 */
@InterfaceAudience.Private
public class HFileWriterImpl implements HFile.Writer {
  private static final Logger LOG = LoggerFactory.getLogger(HFileWriterImpl.class);

  private static final long UNSET = -1;

  /** If enabled, pre-calculates the encoded data size before the real encoding happens. */
  public static final String UNIFIED_ENCODED_BLOCKSIZE_RATIO =
    "hbase.writer.unified.encoded.blocksize.ratio";

  /** Block size limit after encoding, used to unify encoded block Cache entry size */
  private final int encodedBlockSizeLimit;

  /** The Cell previously appended. Becomes the last cell in the file. */
  protected ExtendedCell lastCell = null;

  /** FileSystem stream to write into. */
  protected FSDataOutputStream outputStream;

  /** True if we opened the <code>outputStream</code> (and so will close it). */
  protected final boolean closeOutputStream;

  /** A "file info" block: a key-value map of file-wide metadata. */
  protected HFileInfo fileInfo = new HFileInfo();

  /** Total # of key/value entries, i.e. how many times append() was called. */
  protected long entryCount = 0;

  /** Used for calculating the average key length. */
  protected long totalKeyLength = 0;

  /** Used for calculating the average value length. */
  protected long totalValueLength = 0;

  /** Len of the biggest cell. */
  protected long lenOfBiggestCell = 0;
  /** Key of the biggest cell. */
  protected byte[] keyOfBiggestCell;

  /** Total uncompressed bytes, maybe calculate a compression ratio later. */
  protected long totalUncompressedBytes = 0;

  /** Meta block names. */
  protected List<byte[]> metaNames = new ArrayList<>();

  /** {@link Writable}s representing meta block data. */
  protected List<Writable> metaData = new ArrayList<>();

  /**
   * First cell in a block. This reference should be short-lived since we write hfiles in a burst.
   */
  protected ExtendedCell firstCellInBlock = null;

  /** May be null if we were passed a stream. */
  protected final Path path;

  protected final Configuration conf;

  /** Cache configuration for caching data on write. */
  protected final CacheConfig cacheConf;

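  /**
   * Overrides the time range tracker consulted for time-based cache tiering decisions in
   * shouldCacheBlock. By default the writer's own tracker is used; multi-file writers that maintain
   * a custom tiering time range may supply their own.
   */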
  public void setTimeRangeTrackerForTiering(Supplier<TimeRangeTracker> timeRangeTrackerForTiering) {
    this.timeRangeTrackerForTiering = timeRangeTrackerForTiering;
  }

  private Supplier<TimeRangeTracker> timeRangeTrackerForTiering;

  /**
   * Name for this object used when logging or in toString. Either the result of calling toString on
   * the stream, or the name of the passed file Path.
   */
  protected final String name;

  /**
   * The data block encoding which will be used. {@link NoOpDataBlockEncoder#INSTANCE} if there is
   * no encoding.
   */
  protected final HFileDataBlockEncoder blockEncoder;

  protected final HFileIndexBlockEncoder indexBlockEncoder;

  protected final HFileContext hFileContext;

  private int maxTagsLength = 0;

  /** KeyValue version in FileInfo */
  public static final byte[] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");

  /** Version for KeyValue which includes memstore timestamp */
  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  /** Inline block writers for multi-level block index and compound Blooms. */
  private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<>();

  /** block writer */
  protected HFileBlock.Writer blockWriter;

  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

  /** The offset of the first data block or -1 if the file is empty. */
  private long firstDataBlockOffset = UNSET;

  /** The offset of the last data block or -1 if the file is empty. */
  protected long lastDataBlockOffset = UNSET;

  /**
   * The last(stop) Cell of the previous data block. This reference should be short-lived since we
   * write hfiles in a burst.
   */
  private ExtendedCell lastCellOfPreviousBlock = null;

  /** Additional data items to be written to the "load-on-open" section. */
  private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<>();

  protected long maxMemstoreTS = 0;

  private final TimeRangeTracker timeRangeTracker;
  private long earliestPutTs = HConstants.LATEST_TIMESTAMP;

  public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
    FSDataOutputStream outputStream, HFileContext fileContext) {
    this.outputStream = outputStream;
    this.path = path;
    this.name = path != null ? path.getName() : outputStream.toString();
    this.hFileContext = fileContext;
    // TODO: Move this back to upper layer
    this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
    this.timeRangeTrackerForTiering = () -> this.timeRangeTracker;
    DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
    if (encoding != DataBlockEncoding.NONE) {
      this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
    } else {
      this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
    }
    IndexBlockEncoding indexBlockEncoding = hFileContext.getIndexBlockEncoding();
    if (indexBlockEncoding != IndexBlockEncoding.NONE) {
      this.indexBlockEncoder = new HFileIndexBlockEncoderImpl(indexBlockEncoding);
    } else {
      this.indexBlockEncoder = NoOpIndexBlockEncoder.INSTANCE;
    }
    closeOutputStream = path != null;
    this.cacheConf = cacheConf;
    this.conf = conf;
    float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 0f);
    this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio);

    finishInit(conf);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Writer" + (path != null ? " for " + path : "") + " initialized with cacheConf: "
        + cacheConf + " fileContext: " + fileContext);
    }
  }
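
  // Illustrative usage sketch (assumes the standard HFile.WriterFactory API; fs, path and cell are
  // placeholders): writers are normally obtained through the factory rather than constructed
  // directly, e.g.
  //   HFileContext context = new HFileContextBuilder().withBlockSize(64 * 1024).build();
  //   HFile.Writer writer = HFile.getWriterFactoryNoCache(conf)
  //     .withPath(fs, path).withFileContext(context).create();
  //   writer.append(cell); // cells must be appended in comparator order
  //   writer.close();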

  /**
   * Add to the file info. All added key/value pairs can be obtained using
   * {@link HFile.Reader#getHFileInfo()}.
   * @param k Key
   * @param v Value
   * @throws IOException in case the key or the value are invalid
   */
  @Override
  public void appendFileInfo(final byte[] k, final byte[] v) throws IOException {
    fileInfo.append(k, v, true);
  }

  /**
   * Sets the file info offset in the trailer, finishes up populating fields in the file info, and
   * writes the file info into the given data output. The reason the data output is not always
   * {@link #outputStream} is that we store file info as a block in version 2.
   * @param trailer fixed file trailer
   * @param out     the data output to write the file info to
   */
  protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
    throws IOException {
    trailer.setFileInfoOffset(outputStream.getPos());
    finishFileInfo();
    long startTime = EnvironmentEdgeManager.currentTime();
    fileInfo.write(out);
    HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);
  }

  public long getPos() throws IOException {
    return outputStream.getPos();
  }

  /**
   * Checks that the given Cell's key does not violate the key order.
   * @param cell Cell whose key to check.
   * @return true if the key is duplicate
   * @throws IOException if the key or the key order is wrong
   */
  protected boolean checkKey(final Cell cell) throws IOException {
    boolean isDuplicateKey = false;

    if (cell == null) {
      throw new IOException("Key cannot be null or empty");
    }
    if (lastCell != null) {
      int keyComp = PrivateCellUtil.compareKeyIgnoresMvcc(this.hFileContext.getCellComparator(),
        lastCell, cell);
      if (keyComp > 0) {
        String message = getLexicalErrorMessage(cell);
        throw new IOException(message);
      } else if (keyComp == 0) {
        isDuplicateKey = true;
      }
    }
    return isDuplicateKey;
  }

  private String getLexicalErrorMessage(Cell cell) {
    StringBuilder sb = new StringBuilder();
    sb.append("Added a key not lexically larger than previous. Current cell = ");
    sb.append(cell);
    sb.append(", lastCell = ");
    sb.append(lastCell);
    // file context includes HFile path and optionally table and CF of file being written
    sb.append(", fileContext=");
    sb.append(hFileContext);
    return sb.toString();
  }

  /** Checks the given value for validity. */
  protected void checkValue(final byte[] value, final int offset, final int length)
    throws IOException {
    if (value == null) {
      throw new IOException("Value cannot be null");
    }
  }

  /** Returns Path or null if we were passed a stream rather than a Path. */
  @Override
  public Path getPath() {
    return path;
  }

  @Override
  public String toString() {
    return "writer=" + (path != null ? path.toString() : null) + ", name=" + name + ", compression="
      + hFileContext.getCompression().getName();
  }

  public static Compression.Algorithm compressionByName(String algoName) {
    if (algoName == null) {
      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    return Compression.getCompressionAlgorithmByName(algoName);
  }
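
  // For example (illustrative): compressionByName("gz") resolves to the GZ algorithm, while a null
  // name falls back to HFile.DEFAULT_COMPRESSION_ALGORITHM.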

  /** A helper method to create HFile output streams in constructors */
  protected static FSDataOutputStream createOutputStream(Configuration conf, FileSystem fs,
    Path path, InetSocketAddress[] favoredNodes) throws IOException {
    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    return FSUtils.create(conf, fs, path, perms, favoredNodes);
  }

  /** Additional initialization steps */
  protected void finishInit(final Configuration conf) {
    if (blockWriter != null) {
      throw new IllegalStateException("finishInit called twice");
    }
    blockWriter =
      new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator(),
        conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10));
    // Data block index writer
    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
      cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null, indexBlockEncoder);
    dataBlockIndexWriter.setMaxChunkSize(HFileBlockIndex.getMaxChunkSize(conf));
    dataBlockIndexWriter.setMinIndexNumEntries(HFileBlockIndex.getMinIndexNumEntries(conf));
    inlineBlockWriters.add(dataBlockIndexWriter);

    // Meta data block index writer
    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
    LOG.trace("Initialized with {}", cacheConf);
  }

  /**
   * At a block boundary, writes all the inline blocks and opens a new block.
   */
  protected void checkBlockBoundary() throws IOException {
    boolean shouldFinishBlock = false;
    // A non-zero hbase.writer.unified.encoded.blocksize.ratio was configured, so blocks are bounded
    // by their encoded size rather than the raw block size.
    if (encodedBlockSizeLimit > 0) {
      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit;
    } else {
      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize()
        || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize();
    }
    shouldFinishBlock &= blockWriter.checkBoundariesWithPredicate();
    if (shouldFinishBlock) {
      finishBlock();
      writeInlineBlocks(false);
      newBlock();
    }
  }

  /** Clean up the data block that is currently being written. */
  private void finishBlock() throws IOException {
    if (!blockWriter.isWriting() || blockWriter.blockSizeWritten() == 0) {
      return;
    }

    // Update the first data block offset if UNSET; used when scanning.
    if (firstDataBlockOffset == UNSET) {
      firstDataBlockOffset = outputStream.getPos();
    }
    // Update the last data block offset each time through here.
    lastDataBlockOffset = outputStream.getPos();
    blockWriter.writeHeaderAndData(outputStream);
    int onDiskSize = blockWriter.getOnDiskSizeWithHeader();
    ExtendedCell indexEntry =
      getMidpoint(this.hFileContext.getCellComparator(), lastCellOfPreviousBlock, firstCellInBlock);
    dataBlockIndexWriter.addEntry(PrivateCellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
      lastDataBlockOffset, onDiskSize);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    if (cacheConf.shouldCacheDataOnWrite()) {
      doCacheOnWrite(lastDataBlockOffset);
    }
  }

  /**
   * Try to return a Cell that falls between <code>left</code> and <code>right</code> but that is
   * shorter; i.e. takes up less space. This trick is used when building the HFile block index.
   * It's an optimization. It does not always work. In this case we'll just return the
   * <code>right</code> cell.
   * @return A cell that sorts between <code>left</code> and <code>right</code>.
   */
  public static ExtendedCell getMidpoint(final CellComparator comparator, final ExtendedCell left,
    final ExtendedCell right) {
    if (right == null) {
      throw new IllegalArgumentException("right cell can not be null");
    }
    if (left == null) {
      return right;
    }
    // If Cells from meta table, don't mess around. meta table Cells have schema
    // (table,startrow,hash) so can't be treated as plain byte arrays. Just skip
    // out without trying to do this optimization.
    if (comparator instanceof MetaCellComparator) {
      return right;
    }
    byte[] midRow;
    boolean bufferBacked =
      left instanceof ByteBufferExtendedCell && right instanceof ByteBufferExtendedCell;
    if (bufferBacked) {
      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getRowByteBuffer(),
        ((ByteBufferExtendedCell) left).getRowPosition(), left.getRowLength(),
        ((ByteBufferExtendedCell) right).getRowByteBuffer(),
        ((ByteBufferExtendedCell) right).getRowPosition(), right.getRowLength());
    } else {
      midRow = getMinimumMidpointArray(left.getRowArray(), left.getRowOffset(), left.getRowLength(),
        right.getRowArray(), right.getRowOffset(), right.getRowLength());
    }
    if (midRow != null) {
      return PrivateCellUtil.createFirstOnRow(midRow);
    }
    // Rows are same. Compare on families.
    if (bufferBacked) {
      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getFamilyByteBuffer(),
        ((ByteBufferExtendedCell) left).getFamilyPosition(), left.getFamilyLength(),
        ((ByteBufferExtendedCell) right).getFamilyByteBuffer(),
        ((ByteBufferExtendedCell) right).getFamilyPosition(), right.getFamilyLength());
    } else {
      midRow = getMinimumMidpointArray(left.getFamilyArray(), left.getFamilyOffset(),
        left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
        right.getFamilyLength());
    }
    if (midRow != null) {
      return PrivateCellUtil.createFirstOnRowFamily(right, midRow, 0, midRow.length);
    }
    // Families are same. Compare on qualifiers.
    if (bufferBacked) {
      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getQualifierByteBuffer(),
        ((ByteBufferExtendedCell) left).getQualifierPosition(), left.getQualifierLength(),
        ((ByteBufferExtendedCell) right).getQualifierByteBuffer(),
        ((ByteBufferExtendedCell) right).getQualifierPosition(), right.getQualifierLength());
    } else {
      midRow = getMinimumMidpointArray(left.getQualifierArray(), left.getQualifierOffset(),
        left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
        right.getQualifierLength());
    }
    if (midRow != null) {
      return PrivateCellUtil.createFirstOnRowCol(right, midRow, 0, midRow.length);
    }
    // No opportunity for optimization. Just return right key.
    return right;
  }
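
  // Worked example (illustrative rows): if the previous block's last cell has row "the quick" and
  // the new block's first cell has row "the who", the rows first differ at index 4 ('q' vs 'w'), so
  // getMinimumMidpointArray returns "the r" ("the q" with its last byte incremented). The block
  // index then stores the much shorter fake key FirstOnRow("the r") instead of the full first key
  // of the new block.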

  /**
   * Try to get a byte array that falls between left and right and is as short as possible in
   * lexicographic order.
   * @return a new, minimally sized array that sorts between left and right, or null if left equals
   *         right
   */
  private static byte[] getMinimumMidpointArray(final byte[] leftArray, final int leftOffset,
    final int leftLength, final byte[] rightArray, final int rightOffset, final int rightLength) {
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    for (; diffIdx < minLength; diffIdx++) {
      byte leftByte = leftArray[leftOffset + diffIdx];
      byte rightByte = rightArray[rightOffset + diffIdx];
      if ((leftByte & 0xff) > (rightByte & 0xff)) {
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right="
          + Bytes.toStringBinary(rightArray, rightOffset, rightLength));
      } else if (leftByte != rightByte) {
        break;
      }
    }
    if (diffIdx == minLength) {
      if (leftLength > rightLength) {
        // right is prefix of left
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right="
          + Bytes.toStringBinary(rightArray, rightOffset, rightLength));
      } else if (leftLength < rightLength) {
        // left is prefix of right.
        byte[] minimumMidpointArray = new byte[minLength + 1];
        System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, minLength + 1);
        minimumMidpointArray[minLength] = 0x00;
        return minimumMidpointArray;
      } else {
        // left == right
        return null;
      }
    }
    // Note that left[diffIdx] can never be equal to 0xff since left < right
    byte[] minimumMidpointArray = new byte[diffIdx + 1];
    System.arraycopy(leftArray, leftOffset, minimumMidpointArray, 0, diffIdx + 1);
    minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1);
    return minimumMidpointArray;
  }
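
  // Prefix case example (illustrative): for left "abc" and right "abcdef" the loop exhausts the
  // shorter array, so the method returns "abc" plus a trailing 0x00 byte, which sorts strictly
  // between "abc" and "abcdef".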

  /**
   * Try to create a new byte array that falls between left and right and is as short as possible
   * in lexicographic order.
   * @return a new, minimally sized array that sorts between left and right, or null if left equals
   *         right
   */
  private static byte[] getMinimumMidpointArray(ByteBuffer left, int leftOffset, int leftLength,
    ByteBuffer right, int rightOffset, int rightLength) {
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    for (; diffIdx < minLength; diffIdx++) {
      int leftByte = ByteBufferUtils.toByte(left, leftOffset + diffIdx);
      int rightByte = ByteBufferUtils.toByte(right, rightOffset + diffIdx);
      if ((leftByte & 0xff) > (rightByte & 0xff)) {
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right="
          + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength));
      } else if (leftByte != rightByte) {
        break;
      }
    }
    if (diffIdx == minLength) {
      if (leftLength > rightLength) {
        // right is prefix of left
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right="
          + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength));
      } else if (leftLength < rightLength) {
        // left is prefix of right.
        byte[] minimumMidpointArray = new byte[minLength + 1];
        ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, right, rightOffset, 0,
          minLength + 1);
        minimumMidpointArray[minLength] = 0x00;
        return minimumMidpointArray;
      } else {
        // left == right
        return null;
      }
    }
    // Note that left[diffIdx] can never be equal to 0xff since left < right
    byte[] minimumMidpointArray = new byte[diffIdx + 1];
    ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, left, leftOffset, 0, diffIdx + 1);
    minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1);
    return minimumMidpointArray;
  }

  /** Gives inline block writers an opportunity to contribute blocks. */
  private void writeInlineBlocks(boolean closing) throws IOException {
    for (InlineBlockWriter ibw : inlineBlockWriters) {
      while (ibw.shouldWriteBlock(closing)) {
        long offset = outputStream.getPos();
        boolean cacheThisBlock = ibw.getCacheOnWrite();
        ibw.writeInlineBlock(blockWriter.startWriting(ibw.getInlineBlockType()));
        blockWriter.writeHeaderAndData(outputStream);
        ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(),
          blockWriter.getUncompressedSizeWithoutHeader());
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        if (cacheThisBlock) {
          doCacheOnWrite(offset);
        }
      }
    }
  }

  /**
   * Caches the last written HFile block.
   * @param offset the offset of the block we want to cache. Used to determine the cache key.
   */
  private void doCacheOnWrite(long offset) {
    cacheConf.getBlockCache().ifPresent(cache -> {
      HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
      BlockCacheKey key = buildCacheBlockKey(offset, cacheFormatBlock.getBlockType());
      if (!shouldCacheBlock(cache, key)) {
        return;
      }
      try {
        cache.cacheBlock(key, cacheFormatBlock, cacheConf.isInMemory(), true);
      } finally {
        // refCnt is automatically increased when the block is added to the cache, see
        // RAMCache#putIfAbsent
        cacheFormatBlock.release();
      }
    });
  }

  private BlockCacheKey buildCacheBlockKey(long offset, BlockType blockType) {
    if (path != null) {
      return new BlockCacheKey(path, offset, true, blockType);
    }
    return new BlockCacheKey(name, offset, true, blockType);
  }

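  /**
   * Asks the configured block cache whether this block should be cached at all, passing the maximum
   * timestamp from the tiering tracker so caches that implement time-based tiering can skip blocks
   * holding only cold data. Defaults to caching when the cache expresses no preference.
   */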
  private boolean shouldCacheBlock(BlockCache cache, BlockCacheKey key) {
    Optional<Boolean> result =
      cache.shouldCacheBlock(key, timeRangeTrackerForTiering.get().getMax(), conf);
    return result.orElse(true);
  }

  /**
   * Ready a new block for writing.
   */
  protected void newBlock() throws IOException {
    // This is where the next block begins.
    blockWriter.startWriting(BlockType.DATA);
    firstCellInBlock = null;
    if (lastCell != null) {
      lastCellOfPreviousBlock = lastCell;
    }
  }

  /**
   * Add a meta block to the end of the file. Call before close(). Metadata blocks are expensive.
   * Fill one with a bunch of serialized data rather than do a metadata block per metadata instance.
   * If metadata is small, consider adding to file info using
   * {@link #appendFileInfo(byte[], byte[])}.
   * @param metaBlockName name of the block
   * @param content       will call readFields to get data later (DO NOT REUSE)
   */
  @Override
  public void appendMetaBlock(String metaBlockName, Writable content) {
    byte[] key = Bytes.toBytes(metaBlockName);
    int i;
    for (i = 0; i < metaNames.size(); ++i) {
      // stop when the current key is greater than our own
      byte[] cur = metaNames.get(i);
      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0, key.length) > 0) {
        break;
      }
    }
    metaNames.add(i, key);
    metaData.add(i, content);
  }
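
  // Illustrative usage (block name and Writable are placeholders): meta blocks are appended before
  // close() and can be read back by name on the reader side, e.g.
  //   writer.appendMetaBlock("MY_META", myWritable);
  //   ...
  //   reader.getMetaBlock("MY_META", false); // HFile.Reader side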

  @Override
  public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // Save data block encoder metadata in the file info.
    blockEncoder.saveMetadata(this);
    // Save index block encoder metadata in the file info.
    indexBlockEncoder.saveMetadata(this);
    // Write out the end of the data blocks, then write meta data blocks.
    // followed by fileinfo, data block index and meta block index.

    finishBlock();
    writeInlineBlocks(true);

    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());

    // Write out the metadata blocks if any.
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long offset = outputStream.getPos();
        // write the metadata content
        DataOutputStream dos = blockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        blockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        // Add the new meta block to the meta index.
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
          blockWriter.getOnDiskSizeWithHeader());
      }
    }

    // Load-on-open section.

    // Data block index.
    //
    // In version 2, this section of the file starts with the root level data
    // block index. We call a function that writes intermediate-level blocks
    // first, then root level, and returns the offset of the root level block
    // index.

    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    // Meta block index.
    metaBlockIndexWriter.writeSingleLevelIndex(blockWriter.startWriting(BlockType.ROOT_INDEX),
      "meta");
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    if (this.hFileContext.isIncludesMvcc()) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }

    // File info
    writeFileInfo(trailer, blockWriter.startWriting(BlockType.FILE_INFO));
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
    for (BlockWritable w : additionalLoadOnOpenData) {
      blockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    }

    // Now finish off the trailer.
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    trailer.setUncompressedDataIndexSize(dataBlockIndexWriter.getTotalUncompressedSize());
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    trailer.setComparatorClass(this.hFileContext.getCellComparator().getClass());
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());

    finishClose(trailer);

    blockWriter.release();
  }

  @Override
  public void addInlineBlockWriter(InlineBlockWriter ibw) {
    inlineBlockWriters.add(ibw);
  }

  @Override
  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
  }

  @Override
  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
  }

  private void addBloomFilter(final BloomFilterWriter bfw, final BlockType blockType) {
    if (bfw.getKeyCount() <= 0) {
      return;
    }

    if (
      blockType != BlockType.GENERAL_BLOOM_META && blockType != BlockType.DELETE_FAMILY_BLOOM_META
    ) {
      throw new RuntimeException("Block Type: " + blockType.toString() + " is not supported");
    }
    additionalLoadOnOpenData.add(new BlockWritable() {
      @Override
      public BlockType getBlockType() {
        return blockType;
      }

      @Override
      public void writeToBlock(DataOutput out) throws IOException {
        bfw.getMetaWriter().write(out);
        Writable dataWriter = bfw.getDataWriter();
        if (dataWriter != null) {
          dataWriter.write(out);
        }
      }
    });
  }

  @Override
  public HFileContext getFileContext() {
    return hFileContext;
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the Comparator passed on
   * construction.
   * @param cell Cell to add. Cannot be empty nor null.
   */
  @Override
  public void append(final ExtendedCell cell) throws IOException {
    // checkKey uses comparator to check we are writing in order.
    boolean dupKey = checkKey(cell);
    if (!dupKey) {
      checkBlockBoundary();
    }

    if (!blockWriter.isWriting()) {
      newBlock();
    }

    blockWriter.write(cell);

    totalKeyLength += PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
    totalValueLength += cell.getValueLength();
    if (lenOfBiggestCell < PrivateCellUtil.estimatedSerializedSizeOf(cell)) {
      lenOfBiggestCell = PrivateCellUtil.estimatedSerializedSizeOf(cell);
      keyOfBiggestCell = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(cell);
    }
    // Are we the first key in this block?
    if (firstCellInBlock == null) {
      // If cell is big, block will be closed and this firstCellInBlock reference will only last
      // a short while.
      firstCellInBlock = cell;
    }

    // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinitely?
    lastCell = cell;
    entryCount++;
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
    int tagsLength = cell.getTagsLength();
    if (tagsLength > this.maxTagsLength) {
      this.maxTagsLength = tagsLength;
    }

    trackTimestamps(cell);
  }
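
  // Ordering sketch (hypothetical cells): appends must be non-decreasing in key order, e.g.
  //   writer.append(cellRowA); // row "a"
  //   writer.append(cellRowB); // row "b" -- fine, sorts after "a"
  //   writer.append(cellRowA); // IOException: "Added a key not lexically larger than previous..."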

  @Override
  public void beforeShipped() throws IOException {
    this.blockWriter.beforeShipped();
    // Clone the cell references we keep across shipped() calls so they no longer point into
    // buffers that may be released after shipping.
    if (this.lastCell != null) {
      this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);
    }
    if (this.firstCellInBlock != null) {
      this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock);
    }
    if (this.lastCellOfPreviousBlock != null) {
      this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock);
    }
  }

  public Cell getLastCell() {
    return lastCell;
  }

  protected void finishFileInfo() throws IOException {
    if (lastCell != null) {
      // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
      // byte buffer. Won't take a tuple.
      byte[] lastKey = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
      fileInfo.append(HFileInfo.LASTKEY, lastKey, false);
    }

    // Average key length.
    int avgKeyLen = entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
    fileInfo.append(HFileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
    fileInfo.append(HFileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
      false);

    // Average value length.
    int avgValueLen = entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
    fileInfo.append(HFileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);

    // Biggest cell.
    if (keyOfBiggestCell != null) {
      fileInfo.append(HFileInfo.KEY_OF_BIGGEST_CELL, keyOfBiggestCell, false);
      fileInfo.append(HFileInfo.LEN_OF_BIGGEST_CELL, Bytes.toBytes(lenOfBiggestCell), false);
      LOG.debug("Len of the biggest cell in {} is {}, key is {}",
        this.getPath() == null ? "" : this.getPath().toString(), lenOfBiggestCell,
        CellUtil.toString(new KeyValue.KeyOnlyKeyValue(keyOfBiggestCell), false));
    }

    if (hFileContext.isIncludesTags()) {
      // When tags are not being written in this file, MAX_TAGS_LEN is excluded
      // from the FileInfo
      fileInfo.append(HFileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
      boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
        && hFileContext.isCompressTags();
      fileInfo.append(HFileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
    }
  }

  protected int getMajorVersion() {
    return 3;
  }

  protected int getMinorVersion() {
    return HFileReaderImpl.MAX_MINOR_VERSION;
  }

  protected void finishClose(FixedFileTrailer trailer) throws IOException {
    // Write out encryption metadata before finalizing if we have a valid crypto context
    Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
    if (cryptoContext != Encryption.Context.NONE) {
      // key management is not yet implemented, so kekData is always null
      // Use traditional encryption with master key
      String wrapperSubject = cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
        User.getCurrent().getShortName());
      Key encKey = cryptoContext.getKey();

      // Wrap the context's key and write it as the encryption metadata
      if (encKey != null) {
        byte[] wrappedKey =
          EncryptionUtil.wrapKey(cryptoContext.getConf(), wrapperSubject, encKey, null);
        trailer.setEncryptionKey(wrappedKey);
      }

      // Key management fields - not yet implemented, set to defaults
      trailer.setKeyNamespace(null);
      trailer.setKEKMetadata(null);
      trailer.setKEKChecksum(0);
    }
    // Now we can finish the close
    trailer.setMetaIndexCount(metaNames.size());
    trailer.setTotalUncompressedBytes(totalUncompressedBytes + trailer.getTrailerSize());
    trailer.setEntryCount(entryCount);
    trailer.setCompressionCodec(hFileContext.getCompression());

    long startTime = EnvironmentEdgeManager.currentTime();
    trailer.serialize(outputStream);
    HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);

    if (closeOutputStream) {
      outputStream.close();
      outputStream = null;
    }
  }

  /**
   * Add TimestampRange and earliest put timestamp to Metadata
   */
  public void appendTrackedTimestampsToMetadata() throws IOException {
    // TODO: The StoreFileReader always converts the byte[] to TimeRange
    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
    appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker));
    appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
  }

  public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
    throws IOException {
    // TODO: The StoreFileReader always converts the byte[] to TimeRange
    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
    appendFileInfo(CUSTOM_TIERING_TIME_RANGE, TimeRangeTracker.toByteArray(timeRangeTracker));
  }

  /**
   * Record the earliest Put timestamp and update the TimeRangeTracker to include the timestamp of
   * this cell.
   */
  private void trackTimestamps(final ExtendedCell cell) {
    if (KeyValue.Type.Put == KeyValue.Type.codeToType(cell.getTypeByte())) {
      earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
    }
    timeRangeTracker.includeTimestamp(cell);
  }
}