001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;
021import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
022import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
023import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
024
025import java.io.DataOutput;
026import java.io.DataOutputStream;
027import java.io.IOException;
028import java.net.InetSocketAddress;
029import java.nio.ByteBuffer;
030import java.util.ArrayList;
031import java.util.List;
032import java.util.Optional;
033import java.util.function.Supplier;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FSDataOutputStream;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.fs.permission.FsPermission;
039import org.apache.hadoop.hbase.ByteBufferExtendedCell;
040import org.apache.hadoop.hbase.Cell;
041import org.apache.hadoop.hbase.CellComparator;
042import org.apache.hadoop.hbase.CellUtil;
043import org.apache.hadoop.hbase.ExtendedCell;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.KeyValue;
046import org.apache.hadoop.hbase.KeyValueUtil;
047import org.apache.hadoop.hbase.MetaCellComparator;
048import org.apache.hadoop.hbase.PrivateCellUtil;
049import org.apache.hadoop.hbase.io.compress.Compression;
050import org.apache.hadoop.hbase.io.crypto.Encryption;
051import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
052import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
053import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
054import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
055import org.apache.hadoop.hbase.security.EncryptionUtil;
056import org.apache.hadoop.hbase.security.User;
057import org.apache.hadoop.hbase.util.BloomFilterWriter;
058import org.apache.hadoop.hbase.util.ByteBufferUtils;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.CommonFSUtils;
061import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
062import org.apache.hadoop.hbase.util.FSUtils;
063import org.apache.hadoop.io.Writable;
064import org.apache.yetus.audience.InterfaceAudience;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068/**
069 * Common functionality needed by all versions of {@link HFile} writers.
070 */
071@InterfaceAudience.Private
072public class HFileWriterImpl implements HFile.Writer {
073  private static final Logger LOG = LoggerFactory.getLogger(HFileWriterImpl.class);
074
075  private static final long UNSET = -1;
076
077  /** if this feature is enabled, preCalculate encoded data size before real encoding happens */
078  public static final String UNIFIED_ENCODED_BLOCKSIZE_RATIO =
079    "hbase.writer.unified.encoded.blocksize.ratio";
080
081  /** Block size limit after encoding, used to unify encoded block Cache entry size */
082  private final int encodedBlockSizeLimit;
083
084  /** The Cell previously appended. Becomes the last cell in the file. */
085  protected ExtendedCell lastCell = null;
086
087  /** FileSystem stream to write into. */
088  protected FSDataOutputStream outputStream;
089
090  /** True if we opened the <code>outputStream</code> (and so will close it). */
091  protected final boolean closeOutputStream;
092
093  /** A "file info" block: a key-value map of file-wide metadata. */
094  protected HFileInfo fileInfo = new HFileInfo();
095
096  /** Total # of key/value entries, i.e. how many times add() was called. */
097  protected long entryCount = 0;
098
099  /** Used for calculating the average key length. */
100  protected long totalKeyLength = 0;
101
102  /** Used for calculating the average value length. */
103  protected long totalValueLength = 0;
104
105  /** Len of the biggest cell. */
106  protected long lenOfBiggestCell = 0;
107  /** Key of the biggest cell. */
108  protected byte[] keyOfBiggestCell;
109
110  /** Total uncompressed bytes, maybe calculate a compression ratio later. */
111  protected long totalUncompressedBytes = 0;
112
113  /** Meta block names. */
114  protected List<byte[]> metaNames = new ArrayList<>();
115
116  /** {@link Writable}s representing meta block data. */
117  protected List<Writable> metaData = new ArrayList<>();
118
119  /**
120   * First cell in a block. This reference should be short-lived since we write hfiles in a burst.
121   */
122  protected ExtendedCell firstCellInBlock = null;
123
124  /** May be null if we were passed a stream. */
125  protected final Path path;
126
127  protected final Configuration conf;
128
129  /** Cache configuration for caching data on write. */
130  protected final CacheConfig cacheConf;
131
132  public void setTimeRangeTrackerForTiering(Supplier<TimeRangeTracker> timeRangeTrackerForTiering) {
133    this.timeRangeTrackerForTiering = timeRangeTrackerForTiering;
134  }
135
136  private Supplier<TimeRangeTracker> timeRangeTrackerForTiering;
137
138  /**
139   * Name for this object used when logging or in toString. Is either the result of a toString on
140   * stream or else name of passed file Path.
141   */
142  protected final String name;
143
144  /**
145   * The data block encoding which will be used. {@link NoOpDataBlockEncoder#INSTANCE} if there is
146   * no encoding.
147   */
148  protected final HFileDataBlockEncoder blockEncoder;
149
150  protected final HFileIndexBlockEncoder indexBlockEncoder;
151
152  protected final HFileContext hFileContext;
153
154  private int maxTagsLength = 0;
155
156  /** KeyValue version in FileInfo */
157  public static final byte[] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");
158
159  /** Version for KeyValue which includes memstore timestamp */
160  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;
161
162  /** Inline block writers for multi-level block index and compound Blooms. */
163  private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<>();
164
165  /** block writer */
166  protected HFileBlock.Writer blockWriter;
167
168  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
169  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;
170
171  /** The offset of the first data block or -1 if the file is empty. */
172  private long firstDataBlockOffset = UNSET;
173
  /** The offset of the last data block or -1 if the file is empty. */
175  protected long lastDataBlockOffset = UNSET;
176
177  /**
178   * The last(stop) Cell of the previous data block. This reference should be short-lived since we
179   * write hfiles in a burst.
180   */
181  private ExtendedCell lastCellOfPreviousBlock = null;
182
183  /** Additional data items to be written to the "load-on-open" section. */
184  private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<>();
185
186  protected long maxMemstoreTS = 0;
187
188  private final TimeRangeTracker timeRangeTracker;
189  private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
190
191  public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
192    FSDataOutputStream outputStream, HFileContext fileContext) {
193    this.outputStream = outputStream;
194    this.path = path;
195    this.name = path != null ? path.getName() : outputStream.toString();
196    this.hFileContext = fileContext;
197    // TODO: Move this back to upper layer
198    this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
199    this.timeRangeTrackerForTiering = () -> this.timeRangeTracker;
200    DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
201    if (encoding != DataBlockEncoding.NONE) {
202      this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
203    } else {
204      this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
205    }
206    IndexBlockEncoding indexBlockEncoding = hFileContext.getIndexBlockEncoding();
207    if (indexBlockEncoding != IndexBlockEncoding.NONE) {
208      this.indexBlockEncoder = new HFileIndexBlockEncoderImpl(indexBlockEncoding);
209    } else {
210      this.indexBlockEncoder = NoOpIndexBlockEncoder.INSTANCE;
211    }
212    closeOutputStream = path != null;
213    this.cacheConf = cacheConf;
214    this.conf = conf;
215    float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 0f);
216    this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio);
217
218    finishInit(conf);
219    if (LOG.isTraceEnabled()) {
220      LOG.trace("Writer" + (path != null ? " for " + path : "") + " initialized with cacheConf: "
221        + cacheConf + " fileContext: " + fileContext);
222    }
223  }
224
225  /**
226   * Add to the file info. All added key/value pairs can be obtained using
227   * {@link HFile.Reader#getHFileInfo()}.
228   * @param k Key
229   * @param v Value
230   * @throws IOException in case the key or the value are invalid
231   */
232  @Override
233  public void appendFileInfo(final byte[] k, final byte[] v) throws IOException {
234    fileInfo.append(k, v, true);
235  }
236
237  /**
238   * Sets the file info offset in the trailer, finishes up populating fields in the file info, and
239   * writes the file info into the given data output. The reason the data output is not always
240   * {@link #outputStream} is that we store file info as a block in version 2.
241   * @param trailer fixed file trailer
242   * @param out     the data output to write the file info to
243   */
244  protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
245    throws IOException {
246    trailer.setFileInfoOffset(outputStream.getPos());
247    finishFileInfo();
248    long startTime = EnvironmentEdgeManager.currentTime();
249    fileInfo.write(out);
250    HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);
251  }
252
253  public long getPos() throws IOException {
254    return outputStream.getPos();
255
256  }
257
258  /**
259   * Checks that the given Cell's key does not violate the key order.
260   * @param cell Cell whose key to check.
261   * @return true if the key is duplicate
262   * @throws IOException if the key or the key order is wrong
263   */
264  protected boolean checkKey(final Cell cell) throws IOException {
265    boolean isDuplicateKey = false;
266
267    if (cell == null) {
268      throw new IOException("Key cannot be null or empty");
269    }
270    if (lastCell != null) {
271      int keyComp = PrivateCellUtil.compareKeyIgnoresMvcc(this.hFileContext.getCellComparator(),
272        lastCell, cell);
273      if (keyComp > 0) {
274        String message = getLexicalErrorMessage(cell);
275        throw new IOException(message);
276      } else if (keyComp == 0) {
277        isDuplicateKey = true;
278      }
279    }
280    return isDuplicateKey;
281  }
282
283  private String getLexicalErrorMessage(Cell cell) {
284    StringBuilder sb = new StringBuilder();
285    sb.append("Added a key not lexically larger than previous. Current cell = ");
286    sb.append(cell);
287    sb.append(", lastCell = ");
288    sb.append(lastCell);
289    // file context includes HFile path and optionally table and CF of file being written
290    sb.append("fileContext=");
291    sb.append(hFileContext);
292    return sb.toString();
293  }
294
295  /** Checks the given value for validity. */
296  protected void checkValue(final byte[] value, final int offset, final int length)
297    throws IOException {
298    if (value == null) {
299      throw new IOException("Value cannot be null");
300    }
301  }
302
303  /** Returns Path or null if we were passed a stream rather than a Path. */
304  @Override
305  public Path getPath() {
306    return path;
307  }
308
309  @Override
310  public String toString() {
311    return "writer=" + (path != null ? path.toString() : null) + ", name=" + name + ", compression="
312      + hFileContext.getCompression().getName();
313  }
314
315  public static Compression.Algorithm compressionByName(String algoName) {
316    if (algoName == null) {
317      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
318    }
319    return Compression.getCompressionAlgorithmByName(algoName);
320  }
321
322  /** A helper method to create HFile output streams in constructors */
323  protected static FSDataOutputStream createOutputStream(Configuration conf, FileSystem fs,
324    Path path, InetSocketAddress[] favoredNodes) throws IOException {
325    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
326    return FSUtils.create(conf, fs, path, perms, favoredNodes);
327  }
328
329  /** Additional initialization steps */
330  protected void finishInit(final Configuration conf) {
331    if (blockWriter != null) {
332      throw new IllegalStateException("finishInit called twice");
333    }
334    blockWriter =
335      new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator(),
336        conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10));
337    // Data block index writer
338    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
339    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
340      cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null, indexBlockEncoder);
341    dataBlockIndexWriter.setMaxChunkSize(HFileBlockIndex.getMaxChunkSize(conf));
342    dataBlockIndexWriter.setMinIndexNumEntries(HFileBlockIndex.getMinIndexNumEntries(conf));
343    inlineBlockWriters.add(dataBlockIndexWriter);
344
345    // Meta data block index writer
346    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
347    LOG.trace("Initialized with {}", cacheConf);
348  }
349
350  /**
351   * At a block boundary, write all the inline blocks and opens new block.
352   */
353  protected void checkBlockBoundary() throws IOException {
354    boolean shouldFinishBlock = false;
355    // This means hbase.writer.unified.encoded.blocksize.ratio was set to something different from 0
356    // and we should use the encoding ratio
357    if (encodedBlockSizeLimit > 0) {
358      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit;
359    } else {
360      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize()
361        || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize();
362    }
363    shouldFinishBlock &= blockWriter.checkBoundariesWithPredicate();
364    if (shouldFinishBlock) {
365      finishBlock();
366      writeInlineBlocks(false);
367      newBlock();
368    }
369  }
370
371  /** Clean up the data block that is currently being written. */
372  private void finishBlock() throws IOException {
373    if (!blockWriter.isWriting() || blockWriter.blockSizeWritten() == 0) {
374      return;
375    }
376
377    // Update the first data block offset if UNSET; used scanning.
378    if (firstDataBlockOffset == UNSET) {
379      firstDataBlockOffset = outputStream.getPos();
380    }
381    // Update the last data block offset each time through here.
382    lastDataBlockOffset = outputStream.getPos();
383    blockWriter.writeHeaderAndData(outputStream);
384    int onDiskSize = blockWriter.getOnDiskSizeWithHeader();
385    ExtendedCell indexEntry =
386      getMidpoint(this.hFileContext.getCellComparator(), lastCellOfPreviousBlock, firstCellInBlock);
387    dataBlockIndexWriter.addEntry(PrivateCellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
388      lastDataBlockOffset, onDiskSize);
389    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
390    if (cacheConf.shouldCacheDataOnWrite()) {
391      doCacheOnWrite(lastDataBlockOffset);
392    }
393  }
394
395  /**
396   * Try to return a Cell that falls between <code>left</code> and <code>right</code> but that is
397   * shorter; i.e. takes up less space. This trick is used building HFile block index. Its an
398   * optimization. It does not always work. In this case we'll just return the <code>right</code>
399   * cell.
400   * @return A cell that sorts between <code>left</code> and <code>right</code>.
401   */
402  public static ExtendedCell getMidpoint(final CellComparator comparator, final ExtendedCell left,
403    final ExtendedCell right) {
404    if (right == null) {
405      throw new IllegalArgumentException("right cell can not be null");
406    }
407    if (left == null) {
408      return right;
409    }
410    // If Cells from meta table, don't mess around. meta table Cells have schema
411    // (table,startrow,hash) so can't be treated as plain byte arrays. Just skip
412    // out without trying to do this optimization.
413    if (comparator instanceof MetaCellComparator) {
414      return right;
415    }
416    byte[] midRow;
417    boolean bufferBacked =
418      left instanceof ByteBufferExtendedCell && right instanceof ByteBufferExtendedCell;
419    if (bufferBacked) {
420      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getRowByteBuffer(),
421        ((ByteBufferExtendedCell) left).getRowPosition(), left.getRowLength(),
422        ((ByteBufferExtendedCell) right).getRowByteBuffer(),
423        ((ByteBufferExtendedCell) right).getRowPosition(), right.getRowLength());
424    } else {
425      midRow = getMinimumMidpointArray(left.getRowArray(), left.getRowOffset(), left.getRowLength(),
426        right.getRowArray(), right.getRowOffset(), right.getRowLength());
427    }
428    if (midRow != null) {
429      return PrivateCellUtil.createFirstOnRow(midRow);
430    }
431    // Rows are same. Compare on families.
432    if (bufferBacked) {
433      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getFamilyByteBuffer(),
434        ((ByteBufferExtendedCell) left).getFamilyPosition(), left.getFamilyLength(),
435        ((ByteBufferExtendedCell) right).getFamilyByteBuffer(),
436        ((ByteBufferExtendedCell) right).getFamilyPosition(), right.getFamilyLength());
437    } else {
438      midRow = getMinimumMidpointArray(left.getFamilyArray(), left.getFamilyOffset(),
439        left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
440        right.getFamilyLength());
441    }
442    if (midRow != null) {
443      return PrivateCellUtil.createFirstOnRowFamily(right, midRow, 0, midRow.length);
444    }
445    // Families are same. Compare on qualifiers.
446    if (bufferBacked) {
447      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getQualifierByteBuffer(),
448        ((ByteBufferExtendedCell) left).getQualifierPosition(), left.getQualifierLength(),
449        ((ByteBufferExtendedCell) right).getQualifierByteBuffer(),
450        ((ByteBufferExtendedCell) right).getQualifierPosition(), right.getQualifierLength());
451    } else {
452      midRow = getMinimumMidpointArray(left.getQualifierArray(), left.getQualifierOffset(),
453        left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
454        right.getQualifierLength());
455    }
456    if (midRow != null) {
457      return PrivateCellUtil.createFirstOnRowCol(right, midRow, 0, midRow.length);
458    }
459    // No opportunity for optimization. Just return right key.
460    return right;
461  }
462
463  /**
464   * Try to get a byte array that falls between left and right as short as possible with
465   * lexicographical order;
466   * @return Return a new array that is between left and right and minimally sized else just return
467   *         null if left == right.
468   */
469  private static byte[] getMinimumMidpointArray(final byte[] leftArray, final int leftOffset,
470    final int leftLength, final byte[] rightArray, final int rightOffset, final int rightLength) {
471    int minLength = leftLength < rightLength ? leftLength : rightLength;
472    int diffIdx = 0;
473    for (; diffIdx < minLength; diffIdx++) {
474      byte leftByte = leftArray[leftOffset + diffIdx];
475      byte rightByte = rightArray[rightOffset + diffIdx];
476      if ((leftByte & 0xff) > (rightByte & 0xff)) {
477        throw new IllegalArgumentException("Left byte array sorts after right row; left="
478          + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right="
479          + Bytes.toStringBinary(rightArray, rightOffset, rightLength));
480      } else if (leftByte != rightByte) {
481        break;
482      }
483    }
484    if (diffIdx == minLength) {
485      if (leftLength > rightLength) {
486        // right is prefix of left
487        throw new IllegalArgumentException("Left byte array sorts after right row; left="
488          + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right="
489          + Bytes.toStringBinary(rightArray, rightOffset, rightLength));
490      } else if (leftLength < rightLength) {
491        // left is prefix of right.
492        byte[] minimumMidpointArray = new byte[minLength + 1];
493        System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, minLength + 1);
494        minimumMidpointArray[minLength] = 0x00;
495        return minimumMidpointArray;
496      } else {
497        // left == right
498        return null;
499      }
500    }
501    // Note that left[diffIdx] can never be equal to 0xff since left < right
502    byte[] minimumMidpointArray = new byte[diffIdx + 1];
503    System.arraycopy(leftArray, leftOffset, minimumMidpointArray, 0, diffIdx + 1);
504    minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1);
505    return minimumMidpointArray;
506  }
507
508  /**
509   * Try to create a new byte array that falls between left and right as short as possible with
510   * lexicographical order.
511   * @return Return a new array that is between left and right and minimally sized else just return
512   *         null if left == right.
513   */
514  private static byte[] getMinimumMidpointArray(ByteBuffer left, int leftOffset, int leftLength,
515    ByteBuffer right, int rightOffset, int rightLength) {
516    int minLength = leftLength < rightLength ? leftLength : rightLength;
517    int diffIdx = 0;
518    for (; diffIdx < minLength; diffIdx++) {
519      int leftByte = ByteBufferUtils.toByte(left, leftOffset + diffIdx);
520      int rightByte = ByteBufferUtils.toByte(right, rightOffset + diffIdx);
521      if ((leftByte & 0xff) > (rightByte & 0xff)) {
522        throw new IllegalArgumentException("Left byte array sorts after right row; left="
523          + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right="
524          + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength));
525      } else if (leftByte != rightByte) {
526        break;
527      }
528    }
529    if (diffIdx == minLength) {
530      if (leftLength > rightLength) {
531        // right is prefix of left
532        throw new IllegalArgumentException("Left byte array sorts after right row; left="
533          + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right="
534          + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength));
535      } else if (leftLength < rightLength) {
536        // left is prefix of right.
537        byte[] minimumMidpointArray = new byte[minLength + 1];
538        ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, right, rightOffset, 0,
539          minLength + 1);
540        minimumMidpointArray[minLength] = 0x00;
541        return minimumMidpointArray;
542      } else {
543        // left == right
544        return null;
545      }
546    }
547    // Note that left[diffIdx] can never be equal to 0xff since left < right
548    byte[] minimumMidpointArray = new byte[diffIdx + 1];
549    ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, left, leftOffset, 0, diffIdx + 1);
550    minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1);
551    return minimumMidpointArray;
552  }
553
554  /** Gives inline block writers an opportunity to contribute blocks. */
555  private void writeInlineBlocks(boolean closing) throws IOException {
556    for (InlineBlockWriter ibw : inlineBlockWriters) {
557      while (ibw.shouldWriteBlock(closing)) {
558        long offset = outputStream.getPos();
559        boolean cacheThisBlock = ibw.getCacheOnWrite();
560        ibw.writeInlineBlock(blockWriter.startWriting(ibw.getInlineBlockType()));
561        blockWriter.writeHeaderAndData(outputStream);
562        ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(),
563          blockWriter.getUncompressedSizeWithoutHeader());
564        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
565
566        if (cacheThisBlock) {
567          doCacheOnWrite(offset);
568        }
569      }
570    }
571  }
572
573  /**
574   * Caches the last written HFile block.
575   * @param offset the offset of the block we want to cache. Used to determine the cache key.
576   */
577  private void doCacheOnWrite(long offset) {
578    cacheConf.getBlockCache().ifPresent(cache -> {
579      HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
580      BlockCacheKey key = buildCacheBlockKey(offset, cacheFormatBlock.getBlockType());
581      if (!shouldCacheBlock(cache, key)) {
582        return;
583      }
584      try {
585        cache.cacheBlock(key, cacheFormatBlock, cacheConf.isInMemory(), true);
586      } finally {
587        // refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
588        cacheFormatBlock.release();
589      }
590    });
591  }
592
593  private BlockCacheKey buildCacheBlockKey(long offset, BlockType blockType) {
594    if (path != null) {
595      return new BlockCacheKey(path, offset, true, blockType);
596    }
597    return new BlockCacheKey(name, offset, true, blockType);
598  }
599
600  private boolean shouldCacheBlock(BlockCache cache, BlockCacheKey key) {
601    Optional<Boolean> result =
602      cache.shouldCacheBlock(key, timeRangeTrackerForTiering.get().getMax(), conf);
603    return result.orElse(true);
604  }
605
606  /**
607   * Ready a new block for writing.
608   */
609  protected void newBlock() throws IOException {
610    // This is where the next block begins.
611    blockWriter.startWriting(BlockType.DATA);
612    firstCellInBlock = null;
613    if (lastCell != null) {
614      lastCellOfPreviousBlock = lastCell;
615    }
616  }
617
618  /**
619   * Add a meta block to the end of the file. Call before close(). Metadata blocks are expensive.
620   * Fill one with a bunch of serialized data rather than do a metadata block per metadata instance.
621   * If metadata is small, consider adding to file info using
622   * {@link #appendFileInfo(byte[], byte[])} name of the block will call readFields to get data
623   * later (DO NOT REUSE)
624   */
625  @Override
626  public void appendMetaBlock(String metaBlockName, Writable content) {
627    byte[] key = Bytes.toBytes(metaBlockName);
628    int i;
629    for (i = 0; i < metaNames.size(); ++i) {
630      // stop when the current key is greater than our own
631      byte[] cur = metaNames.get(i);
632      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0, key.length) > 0) {
633        break;
634      }
635    }
636    metaNames.add(i, key);
637    metaData.add(i, content);
638  }
639
640  @Override
641  public void close() throws IOException {
642    if (outputStream == null) {
643      return;
644    }
645    // Save data block encoder metadata in the file info.
646    blockEncoder.saveMetadata(this);
647    // Save index block encoder metadata in the file info.
648    indexBlockEncoder.saveMetadata(this);
649    // Write out the end of the data blocks, then write meta data blocks.
650    // followed by fileinfo, data block index and meta block index.
651
652    finishBlock();
653    writeInlineBlocks(true);
654
655    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());
656
657    // Write out the metadata blocks if any.
658    if (!metaNames.isEmpty()) {
659      for (int i = 0; i < metaNames.size(); ++i) {
660        // store the beginning offset
661        long offset = outputStream.getPos();
662        // write the metadata content
663        DataOutputStream dos = blockWriter.startWriting(BlockType.META);
664        metaData.get(i).write(dos);
665
666        blockWriter.writeHeaderAndData(outputStream);
667        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
668
669        // Add the new meta block to the meta index.
670        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
671          blockWriter.getOnDiskSizeWithHeader());
672      }
673    }
674
675    // Load-on-open section.
676
677    // Data block index.
678    //
679    // In version 2, this section of the file starts with the root level data
680    // block index. We call a function that writes intermediate-level blocks
681    // first, then root level, and returns the offset of the root level block
682    // index.
683
684    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
685    trailer.setLoadOnOpenOffset(rootIndexOffset);
686
687    // Meta block index.
688    metaBlockIndexWriter.writeSingleLevelIndex(blockWriter.startWriting(BlockType.ROOT_INDEX),
689      "meta");
690    blockWriter.writeHeaderAndData(outputStream);
691    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
692
693    if (this.hFileContext.isIncludesMvcc()) {
694      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
695      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
696    }
697
698    // File info
699    writeFileInfo(trailer, blockWriter.startWriting(BlockType.FILE_INFO));
700    blockWriter.writeHeaderAndData(outputStream);
701    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
702
703    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
704    for (BlockWritable w : additionalLoadOnOpenData) {
705      blockWriter.writeBlock(w, outputStream);
706      totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
707    }
708
709    // Now finish off the trailer.
710    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
711    trailer.setUncompressedDataIndexSize(dataBlockIndexWriter.getTotalUncompressedSize());
712    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
713    trailer.setLastDataBlockOffset(lastDataBlockOffset);
714    trailer.setComparatorClass(this.hFileContext.getCellComparator().getClass());
715    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());
716
717    finishClose(trailer);
718
719    blockWriter.release();
720  }
721
722  @Override
723  public void addInlineBlockWriter(InlineBlockWriter ibw) {
724    inlineBlockWriters.add(ibw);
725  }
726
727  @Override
728  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
729    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
730  }
731
732  @Override
733  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
734    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
735  }
736
737  private void addBloomFilter(final BloomFilterWriter bfw, final BlockType blockType) {
738    if (bfw.getKeyCount() <= 0) {
739      return;
740    }
741
742    if (
743      blockType != BlockType.GENERAL_BLOOM_META && blockType != BlockType.DELETE_FAMILY_BLOOM_META
744    ) {
745      throw new RuntimeException("Block Type: " + blockType.toString() + "is not supported");
746    }
747    additionalLoadOnOpenData.add(new BlockWritable() {
748      @Override
749      public BlockType getBlockType() {
750        return blockType;
751      }
752
753      @Override
754      public void writeToBlock(DataOutput out) throws IOException {
755        bfw.getMetaWriter().write(out);
756        Writable dataWriter = bfw.getDataWriter();
757        if (dataWriter != null) {
758          dataWriter.write(out);
759        }
760      }
761    });
762  }
763
764  @Override
765  public HFileContext getFileContext() {
766    return hFileContext;
767  }
768
769  /**
770   * Add key/value to file. Keys must be added in an order that agrees with the Comparator passed on
771   * construction. Cell to add. Cannot be empty nor null.
772   */
773  @Override
774  public void append(final ExtendedCell cell) throws IOException {
775    // checkKey uses comparator to check we are writing in order.
776    boolean dupKey = checkKey(cell);
777    if (!dupKey) {
778      checkBlockBoundary();
779    }
780
781    if (!blockWriter.isWriting()) {
782      newBlock();
783    }
784
785    blockWriter.write(cell);
786
787    totalKeyLength += PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
788    totalValueLength += cell.getValueLength();
789    if (lenOfBiggestCell < PrivateCellUtil.estimatedSerializedSizeOf(cell)) {
790      lenOfBiggestCell = PrivateCellUtil.estimatedSerializedSizeOf(cell);
791      keyOfBiggestCell = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(cell);
792    }
793    // Are we the first key in this block?
794    if (firstCellInBlock == null) {
795      // If cell is big, block will be closed and this firstCellInBlock reference will only last
796      // a short while.
797      firstCellInBlock = cell;
798    }
799
800    // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinitely?
801    lastCell = cell;
802    entryCount++;
803    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
804    int tagsLength = cell.getTagsLength();
805    if (tagsLength > this.maxTagsLength) {
806      this.maxTagsLength = tagsLength;
807    }
808
809    trackTimestamps(cell);
810  }
811
812  @Override
813  public void beforeShipped() throws IOException {
814    this.blockWriter.beforeShipped();
815    // Add clone methods for every cell
816    if (this.lastCell != null) {
817      this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);
818    }
819    if (this.firstCellInBlock != null) {
820      this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock);
821    }
822    if (this.lastCellOfPreviousBlock != null) {
823      this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock);
824    }
825  }
826
827  public Cell getLastCell() {
828    return lastCell;
829  }
830
831  protected void finishFileInfo() throws IOException {
832    if (lastCell != null) {
833      // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
834      // byte buffer. Won't take a tuple.
835      byte[] lastKey = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
836      fileInfo.append(HFileInfo.LASTKEY, lastKey, false);
837    }
838
839    // Average key length.
840    int avgKeyLen = entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
841    fileInfo.append(HFileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
842    fileInfo.append(HFileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
843      false);
844
845    // Average value length.
846    int avgValueLen = entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
847    fileInfo.append(HFileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
848
849    // Biggest cell.
850    if (keyOfBiggestCell != null) {
851      fileInfo.append(HFileInfo.KEY_OF_BIGGEST_CELL, keyOfBiggestCell, false);
852      fileInfo.append(HFileInfo.LEN_OF_BIGGEST_CELL, Bytes.toBytes(lenOfBiggestCell), false);
853      LOG.debug("Len of the biggest cell in {} is {}, key is {}",
854        this.getPath() == null ? "" : this.getPath().toString(), lenOfBiggestCell,
855        CellUtil.toString(new KeyValue.KeyOnlyKeyValue(keyOfBiggestCell), false));
856    }
857
858    if (hFileContext.isIncludesTags()) {
859      // When tags are not being written in this file, MAX_TAGS_LEN is excluded
860      // from the FileInfo
861      fileInfo.append(HFileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
862      boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
863        && hFileContext.isCompressTags();
864      fileInfo.append(HFileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
865    }
866  }
867
  /**
   * Returns the major version number used by this writer, which is the constant {@code 3}.
   * Protected, so a subclass may report a different value.
   */
  protected int getMajorVersion() {
    return 3;
  }
871
  /**
   * Returns the minor version number used by this writer, taken from
   * {@code HFileReaderImpl.MAX_MINOR_VERSION}. Protected, so a subclass may report a different
   * value.
   */
  protected int getMinorVersion() {
    return HFileReaderImpl.MAX_MINOR_VERSION;
  }
875
876  protected void finishClose(FixedFileTrailer trailer) throws IOException {
877    // Write out encryption metadata before finalizing if we have a valid crypto context
878    Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
879    if (cryptoContext != Encryption.Context.NONE) {
880      // Wrap the context's key and write it as the encryption metadata, the wrapper includes
881      // all information needed for decryption
882      trailer.setEncryptionKey(EncryptionUtil.wrapKey(
883        cryptoContext.getConf(), cryptoContext.getConf()
884          .get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName()),
885        cryptoContext.getKey()));
886    }
887    // Now we can finish the close
888    trailer.setMetaIndexCount(metaNames.size());
889    trailer.setTotalUncompressedBytes(totalUncompressedBytes + trailer.getTrailerSize());
890    trailer.setEntryCount(entryCount);
891    trailer.setCompressionCodec(hFileContext.getCompression());
892
893    long startTime = EnvironmentEdgeManager.currentTime();
894    trailer.serialize(outputStream);
895    HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);
896
897    if (closeOutputStream) {
898      outputStream.close();
899      outputStream = null;
900    }
901  }
902
903  /**
904   * Add TimestampRange and earliest put timestamp to Metadata
905   */
906  public void appendTrackedTimestampsToMetadata() throws IOException {
907    // TODO: The StoreFileReader always converts the byte[] to TimeRange
908    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
909    appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker));
910    appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
911  }
912
913  public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
914    throws IOException {
915    // TODO: The StoreFileReader always converts the byte[] to TimeRange
916    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
917    appendFileInfo(CUSTOM_TIERING_TIME_RANGE, TimeRangeTracker.toByteArray(timeRangeTracker));
918  }
919
920  /**
921   * Record the earliest Put timestamp. If the timeRangeTracker is not set, update TimeRangeTracker
922   * to include the timestamp of this key
923   */
924  private void trackTimestamps(final ExtendedCell cell) {
925    if (KeyValue.Type.Put == KeyValue.Type.codeToType(cell.getTypeByte())) {
926      earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
927    }
928    timeRangeTracker.includeTimestamp(cell);
929  }
930}