/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;
import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Common functionality needed by all versions of {@link HFile} writers.
 */
@InterfaceAudience.Private
public class HFileWriterImpl implements HFile.Writer {
  private static final Logger LOG = LoggerFactory.getLogger(HFileWriterImpl.class);

  private static final long UNSET = -1;

  /** If this feature is enabled, pre-calculate the encoded data size before the real encoding happens */
  public static final String UNIFIED_ENCODED_BLOCKSIZE_RATIO =
    "hbase.writer.unified.encoded.blocksize.ratio";

  /** Block size limit after encoding, used to unify the encoded block cache entry size */
  private final int encodedBlockSizeLimit;

  /** The Cell previously appended. Becomes the last cell in the file. */
  protected ExtendedCell lastCell = null;

  /** FileSystem stream to write into. */
  protected FSDataOutputStream outputStream;

  /** True if we opened the <code>outputStream</code> (and so will close it). */
  protected final boolean closeOutputStream;

  /** A "file info" block: a key-value map of file-wide metadata. */
  protected HFileInfo fileInfo = new HFileInfo();

  /** Total # of key/value entries, i.e. how many times append() was called. */
  protected long entryCount = 0;

  /** Used for calculating the average key length. */
  protected long totalKeyLength = 0;

  /** Used for calculating the average value length. */
  protected long totalValueLength = 0;

  /** Len of the biggest cell. */
  protected long lenOfBiggestCell = 0;
  /** Key of the biggest cell. */
  protected byte[] keyOfBiggestCell;

  /** Total uncompressed bytes, maybe calculate a compression ratio later. */
  protected long totalUncompressedBytes = 0;

  /** Meta block names. */
  protected List<byte[]> metaNames = new ArrayList<>();

  /** {@link Writable}s representing meta block data. */
  protected List<Writable> metaData = new ArrayList<>();

  /**
   * First cell in a block. This reference should be short-lived since we write hfiles in a burst.
   */
  protected ExtendedCell firstCellInBlock = null;

  /** May be null if we were passed a stream. */
  protected final Path path;

  protected final Configuration conf;

  /** Cache configuration for caching data on write. */
  protected final CacheConfig cacheConf;

  public void setTimeRangeTrackerForTiering(Supplier<TimeRangeTracker> timeRangeTrackerForTiering) {
    this.timeRangeTrackerForTiering = timeRangeTrackerForTiering;
  }

  private Supplier<TimeRangeTracker> timeRangeTrackerForTiering;

  /**
   * Name for this object used when logging or in toString. Is either the result of a toString on
   * the stream or else the name of the passed file Path.
   */
  protected final String name;

  /**
   * The data block encoding which will be used. {@link NoOpDataBlockEncoder#INSTANCE} if there is
   * no encoding.
   */
  protected final HFileDataBlockEncoder blockEncoder;

  protected final HFileIndexBlockEncoder indexBlockEncoder;

  protected final HFileContext hFileContext;

  private int maxTagsLength = 0;

  /** KeyValue version in FileInfo */
  public static final byte[] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");

  /** Version for KeyValue which includes memstore timestamp */
  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  /** Inline block writers for multi-level block index and compound Blooms. */
  private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<>();

  /** block writer */
  protected HFileBlock.Writer blockWriter;

  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

  /** The offset of the first data block or -1 if the file is empty. */
  private long firstDataBlockOffset = UNSET;

  /** The offset of the last data block or UNSET (-1) if the file is empty. */
  protected long lastDataBlockOffset = UNSET;

  /**
   * The last(stop) Cell of the previous data block. This reference should be short-lived since we
   * write hfiles in a burst.
   */
  private ExtendedCell lastCellOfPreviousBlock = null;

  /** Additional data items to be written to the "load-on-open" section. */
  private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<>();

  protected long maxMemstoreTS = 0;

  private final TimeRangeTracker timeRangeTracker;
  private long earliestPutTs = HConstants.LATEST_TIMESTAMP;

  public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
    FSDataOutputStream outputStream, HFileContext fileContext) {
    this.outputStream = outputStream;
    this.path = path;
    this.name = path != null ? path.getName() : outputStream.toString();
    this.hFileContext = fileContext;
    // TODO: Move this back to upper layer
    this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
    this.timeRangeTrackerForTiering = () -> this.timeRangeTracker;
    DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
    if (encoding != DataBlockEncoding.NONE) {
      this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
    } else {
      this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
    }
    IndexBlockEncoding indexBlockEncoding = hFileContext.getIndexBlockEncoding();
    if (indexBlockEncoding != IndexBlockEncoding.NONE) {
      this.indexBlockEncoder = new HFileIndexBlockEncoderImpl(indexBlockEncoding);
    } else {
      this.indexBlockEncoder = NoOpIndexBlockEncoder.INSTANCE;
    }
    closeOutputStream = path != null;
    this.cacheConf = cacheConf;
    this.conf = conf;
    float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 0f);
    this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio);

    finishInit(conf);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Writer" + (path != null ? " for " + path : "") + " initialized with cacheConf: "
        + cacheConf + " fileContext: " + fileContext);
    }
  }
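
  /*
   * Illustrative usage sketch (not part of the original source): callers normally obtain an
   * instance of this writer through the HFile.WriterFactory rather than invoking the constructor
   * directly, roughly along these lines (the local variable names are hypothetical):
   *
   *   HFileContext context = new HFileContextBuilder().withBlockSize(64 * 1024).build();
   *   HFile.Writer writer = HFile.getWriterFactory(conf, cacheConf)
   *     .withPath(fs, path)
   *     .withFileContext(context)
   *     .create();
   *   writer.append(cell); // cells must arrive in comparator order
   *   writer.close();
   */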

  /**
   * Add to the file info. All added key/value pairs can be obtained using
   * {@link HFile.Reader#getHFileInfo()}.
   * @param k Key
   * @param v Value
   * @throws IOException in case the key or the value are invalid
   */
  @Override
  public void appendFileInfo(final byte[] k, final byte[] v) throws IOException {
    fileInfo.append(k, v, true);
  }

  /**
   * Sets the file info offset in the trailer, finishes up populating fields in the file info, and
   * writes the file info into the given data output. The reason the data output is not always
   * {@link #outputStream} is that we store file info as a block in version 2.
   * @param trailer fixed file trailer
   * @param out     the data output to write the file info to
   */
  protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
    throws IOException {
    trailer.setFileInfoOffset(outputStream.getPos());
    finishFileInfo();
    long startTime = EnvironmentEdgeManager.currentTime();
    fileInfo.write(out);
    HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);
  }

  public long getPos() throws IOException {
    return outputStream.getPos();
  }

  /**
   * Checks that the given Cell's key does not violate the key order.
   * @param cell Cell whose key to check.
   * @return true if the key is a duplicate of the previous key
   * @throws IOException if the key or the key order is wrong
   */
  protected boolean checkKey(final Cell cell) throws IOException {
    boolean isDuplicateKey = false;

    if (cell == null) {
      throw new IOException("Key cannot be null or empty");
    }
    if (lastCell != null) {
      int keyComp = PrivateCellUtil.compareKeyIgnoresMvcc(this.hFileContext.getCellComparator(),
        lastCell, cell);
      if (keyComp > 0) {
        String message = getLexicalErrorMessage(cell);
        throw new IOException(message);
      } else if (keyComp == 0) {
        isDuplicateKey = true;
      }
    }
    return isDuplicateKey;
  }

  private String getLexicalErrorMessage(Cell cell) {
    StringBuilder sb = new StringBuilder();
    sb.append("Added a key not lexically larger than previous. Current cell = ");
    sb.append(cell);
    sb.append(", lastCell = ");
    sb.append(lastCell);
    // file context includes HFile path and optionally table and CF of file being written
    sb.append(", fileContext=");
    sb.append(hFileContext);
    return sb.toString();
  }

  /** Checks the given value for validity. */
  protected void checkValue(final byte[] value, final int offset, final int length)
    throws IOException {
    if (value == null) {
      throw new IOException("Value cannot be null");
    }
  }

  /** Returns Path or null if we were passed a stream rather than a Path. */
  @Override
  public Path getPath() {
    return path;
  }

  @Override
  public String toString() {
    return "writer=" + (path != null ? path.toString() : null) + ", name=" + name + ", compression="
      + hFileContext.getCompression().getName();
  }

  public static Compression.Algorithm compressionByName(String algoName) {
    if (algoName == null) {
      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    return Compression.getCompressionAlgorithmByName(algoName);
  }

  /** A helper method to create HFile output streams in constructors */
  protected static FSDataOutputStream createOutputStream(Configuration conf, FileSystem fs,
    Path path, InetSocketAddress[] favoredNodes) throws IOException {
    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    return FSUtils.create(conf, fs, path, perms, favoredNodes);
  }

  /** Additional initialization steps */
  protected void finishInit(final Configuration conf) {
    if (blockWriter != null) {
      throw new IllegalStateException("finishInit called twice");
    }
    blockWriter =
      new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator(),
        conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10));
    // Data block index writer
    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
      cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null, indexBlockEncoder);
    dataBlockIndexWriter.setMaxChunkSize(HFileBlockIndex.getMaxChunkSize(conf));
    dataBlockIndexWriter.setMinIndexNumEntries(HFileBlockIndex.getMinIndexNumEntries(conf));
    inlineBlockWriters.add(dataBlockIndexWriter);

    // Meta data block index writer
    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
    LOG.trace("Initialized with {}", cacheConf);
  }
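
  /*
   * Illustrative note (not part of the original source): with the default
   * hbase.writer.unified.encoded.blocksize.ratio of 0, checkBlockBoundary() below finishes a data
   * block once either its encoded or unencoded size reaches hFileContext.getBlocksize() (commonly
   * 64 KB). If the ratio is set to, say, 0.5 with a 64 KB block size, encodedBlockSizeLimit
   * becomes 32 KB and only the encoded size is checked, so encoded blocks end up with a roughly
   * uniform cache entry size.
   */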

  /**
   * At a block boundary, writes all the inline blocks and opens a new block.
   */
  protected void checkBlockBoundary() throws IOException {
    boolean shouldFinishBlock = false;
    // This means hbase.writer.unified.encoded.blocksize.ratio was set to something different from
    // 0 and we should use the encoding ratio
    if (encodedBlockSizeLimit > 0) {
      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit;
    } else {
      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize()
        || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize();
    }
    shouldFinishBlock &= blockWriter.checkBoundariesWithPredicate();
    if (shouldFinishBlock) {
      finishBlock();
      writeInlineBlocks(false);
      newBlock();
    }
  }

  /** Clean up the data block that is currently being written. */
  private void finishBlock() throws IOException {
    if (!blockWriter.isWriting() || blockWriter.blockSizeWritten() == 0) {
      return;
    }

    // Update the first data block offset if UNSET; used when scanning.
    if (firstDataBlockOffset == UNSET) {
      firstDataBlockOffset = outputStream.getPos();
    }
    // Update the last data block offset each time through here.
    lastDataBlockOffset = outputStream.getPos();
    blockWriter.writeHeaderAndData(outputStream);
    int onDiskSize = blockWriter.getOnDiskSizeWithHeader();
    ExtendedCell indexEntry =
      getMidpoint(this.hFileContext.getCellComparator(), lastCellOfPreviousBlock, firstCellInBlock);
    dataBlockIndexWriter.addEntry(PrivateCellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
      lastDataBlockOffset, onDiskSize);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    if (cacheConf.shouldCacheDataOnWrite()) {
      doCacheOnWrite(lastDataBlockOffset);
    }
  }
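
  /*
   * Illustrative example (not part of the original source): if the previous block ends on row
   * "apple" and the next block starts on row "banana", getMidpoint() below produces a fake
   * first-on-row cell for the single-byte row "b". The index entry for the new block can then use
   * "b" as its key instead of the full first key, keeping the block index small while still
   * sorting after every key of the previous block and at or before the first key of the new one.
   */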

  /**
   * Try to return a Cell that falls between <code>left</code> and <code>right</code> but that is
   * shorter; i.e. takes up less space. This trick is used when building the HFile block index. It
   * is an optimization. It does not always work. In this case we'll just return the
   * <code>right</code> cell.
   * @return A cell that sorts between <code>left</code> and <code>right</code>.
   */
  public static ExtendedCell getMidpoint(final CellComparator comparator, final ExtendedCell left,
    final ExtendedCell right) {
    if (right == null) {
      throw new IllegalArgumentException("right cell can not be null");
    }
    if (left == null) {
      return right;
    }
    // If Cells from meta table, don't mess around. meta table Cells have schema
    // (table,startrow,hash) so can't be treated as plain byte arrays. Just skip
    // out without trying to do this optimization.
    if (comparator instanceof MetaCellComparator) {
      return right;
    }
    byte[] midRow;
    boolean bufferBacked =
      left instanceof ByteBufferExtendedCell && right instanceof ByteBufferExtendedCell;
    if (bufferBacked) {
      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getRowByteBuffer(),
        ((ByteBufferExtendedCell) left).getRowPosition(), left.getRowLength(),
        ((ByteBufferExtendedCell) right).getRowByteBuffer(),
        ((ByteBufferExtendedCell) right).getRowPosition(), right.getRowLength());
    } else {
      midRow = getMinimumMidpointArray(left.getRowArray(), left.getRowOffset(), left.getRowLength(),
        right.getRowArray(), right.getRowOffset(), right.getRowLength());
    }
    if (midRow != null) {
      return PrivateCellUtil.createFirstOnRow(midRow);
    }
    // Rows are same. Compare on families.
    if (bufferBacked) {
      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getFamilyByteBuffer(),
        ((ByteBufferExtendedCell) left).getFamilyPosition(), left.getFamilyLength(),
        ((ByteBufferExtendedCell) right).getFamilyByteBuffer(),
        ((ByteBufferExtendedCell) right).getFamilyPosition(), right.getFamilyLength());
    } else {
      midRow = getMinimumMidpointArray(left.getFamilyArray(), left.getFamilyOffset(),
        left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
        right.getFamilyLength());
    }
    if (midRow != null) {
      return PrivateCellUtil.createFirstOnRowFamily(right, midRow, 0, midRow.length);
    }
    // Families are same. Compare on qualifiers.
    if (bufferBacked) {
      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getQualifierByteBuffer(),
        ((ByteBufferExtendedCell) left).getQualifierPosition(), left.getQualifierLength(),
        ((ByteBufferExtendedCell) right).getQualifierByteBuffer(),
        ((ByteBufferExtendedCell) right).getQualifierPosition(), right.getQualifierLength());
    } else {
      midRow = getMinimumMidpointArray(left.getQualifierArray(), left.getQualifierOffset(),
        left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
        right.getQualifierLength());
    }
    if (midRow != null) {
      return PrivateCellUtil.createFirstOnRowCol(right, midRow, 0, midRow.length);
    }
    // No opportunity for optimization. Just return right key.
    return right;
  }
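
  /*
   * Illustrative example (not part of the original source) for the byte-level helper below: for
   * left = {'a','a','b','x'} and right = {'a','a','d','d'} the arrays first differ at index 2, so
   * the result is the left prefix up to and including that index with the last byte incremented,
   * i.e. {'a','a','c'}, which sorts strictly between the two inputs. If left is a strict prefix of
   * right (e.g. "aaa" vs "aaad"), the result is the shared prefix followed by a 0x00 byte; if the
   * two are identical, null is returned.
   */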

  /**
   * Try to get a byte array that falls between left and right and is as short as possible in
   * lexicographical order.
   * @return a new array that is between left and right and minimally sized, or null if
   *         left == right.
   */
  private static byte[] getMinimumMidpointArray(final byte[] leftArray, final int leftOffset,
    final int leftLength, final byte[] rightArray, final int rightOffset, final int rightLength) {
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    for (; diffIdx < minLength; diffIdx++) {
      byte leftByte = leftArray[leftOffset + diffIdx];
      byte rightByte = rightArray[rightOffset + diffIdx];
      if ((leftByte & 0xff) > (rightByte & 0xff)) {
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right="
          + Bytes.toStringBinary(rightArray, rightOffset, rightLength));
      } else if (leftByte != rightByte) {
        break;
      }
    }
    if (diffIdx == minLength) {
      if (leftLength > rightLength) {
        // right is prefix of left
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right="
          + Bytes.toStringBinary(rightArray, rightOffset, rightLength));
      } else if (leftLength < rightLength) {
        // left is prefix of right.
        byte[] minimumMidpointArray = new byte[minLength + 1];
        System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, minLength + 1);
        minimumMidpointArray[minLength] = 0x00;
        return minimumMidpointArray;
      } else {
        // left == right
        return null;
      }
    }
    // Note that left[diffIdx] can never be equal to 0xff since left < right
    byte[] minimumMidpointArray = new byte[diffIdx + 1];
    System.arraycopy(leftArray, leftOffset, minimumMidpointArray, 0, diffIdx + 1);
    minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1);
    return minimumMidpointArray;
  }

  /**
   * Try to create a new byte array that falls between left and right and is as short as possible
   * in lexicographical order.
   * @return a new array that is between left and right and minimally sized, or null if
   *         left == right.
   */
  private static byte[] getMinimumMidpointArray(ByteBuffer left, int leftOffset, int leftLength,
    ByteBuffer right, int rightOffset, int rightLength) {
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    for (; diffIdx < minLength; diffIdx++) {
      int leftByte = ByteBufferUtils.toByte(left, leftOffset + diffIdx);
      int rightByte = ByteBufferUtils.toByte(right, rightOffset + diffIdx);
      if ((leftByte & 0xff) > (rightByte & 0xff)) {
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right="
          + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength));
      } else if (leftByte != rightByte) {
        break;
      }
    }
    if (diffIdx == minLength) {
      if (leftLength > rightLength) {
        // right is prefix of left
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right="
          + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength));
      } else if (leftLength < rightLength) {
        // left is prefix of right.
        byte[] minimumMidpointArray = new byte[minLength + 1];
        ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, right, rightOffset, 0,
          minLength + 1);
        minimumMidpointArray[minLength] = 0x00;
        return minimumMidpointArray;
      } else {
        // left == right
        return null;
      }
    }
    // Note that left[diffIdx] can never be equal to 0xff since left < right
    byte[] minimumMidpointArray = new byte[diffIdx + 1];
    ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, left, leftOffset, 0, diffIdx + 1);
    minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1);
    return minimumMidpointArray;
  }

  /** Gives inline block writers an opportunity to contribute blocks. */
  private void writeInlineBlocks(boolean closing) throws IOException {
    for (InlineBlockWriter ibw : inlineBlockWriters) {
      while (ibw.shouldWriteBlock(closing)) {
        long offset = outputStream.getPos();
        boolean cacheThisBlock = ibw.getCacheOnWrite();
        ibw.writeInlineBlock(blockWriter.startWriting(ibw.getInlineBlockType()));
        blockWriter.writeHeaderAndData(outputStream);
        ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(),
          blockWriter.getUncompressedSizeWithoutHeader());
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        if (cacheThisBlock) {
          doCacheOnWrite(offset);
        }
      }
    }
  }

  /**
   * Caches the last written HFile block.
   * @param offset the offset of the block we want to cache. Used to determine the cache key.
   */
  private void doCacheOnWrite(long offset) {
    cacheConf.getBlockCache().ifPresent(cache -> {
      HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
      BlockCacheKey key = buildCacheBlockKey(offset, cacheFormatBlock.getBlockType());
      if (!shouldCacheBlock(cache, key)) {
        return;
      }
      try {
        cache.cacheBlock(key, cacheFormatBlock, cacheConf.isInMemory(), true);
      } finally {
        // refCnt auto-increases when the block is added to the cache, see RAMCache#putIfAbsent
        cacheFormatBlock.release();
      }
    });
  }

  private BlockCacheKey buildCacheBlockKey(long offset, BlockType blockType) {
    if (path != null) {
      return new BlockCacheKey(path, offset, true, blockType);
    }
    return new BlockCacheKey(name, offset, true, blockType);
  }

  private boolean shouldCacheBlock(BlockCache cache, BlockCacheKey key) {
    Optional<Boolean> result =
      cache.shouldCacheBlock(key, timeRangeTrackerForTiering.get().getMax(), conf);
    return result.orElse(true);
  }

  /**
   * Ready a new block for writing.
   */
  protected void newBlock() throws IOException {
    // This is where the next block begins.
    blockWriter.startWriting(BlockType.DATA);
    firstCellInBlock = null;
    if (lastCell != null) {
      lastCellOfPreviousBlock = lastCell;
    }
  }

  /**
   * Add a meta block to the end of the file. Call before close(). Metadata blocks are expensive.
   * Fill one with a bunch of serialized data rather than do a metadata block per metadata
   * instance. If metadata is small, consider adding to file info using
   * {@link #appendFileInfo(byte[], byte[])}.
   * @param metaBlockName name of the block
   * @param content       will call readFields to get data later (DO NOT REUSE)
   */
  @Override
  public void appendMetaBlock(String metaBlockName, Writable content) {
    byte[] key = Bytes.toBytes(metaBlockName);
    int i;
    for (i = 0; i < metaNames.size(); ++i) {
      // stop when the current key is greater than our own
      byte[] cur = metaNames.get(i);
      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0, key.length) > 0) {
        break;
      }
    }
    metaNames.add(i, key);
    metaData.add(i, content);
  }
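
  /*
   * Descriptive note (added for clarity, not part of the original source): close() below finishes
   * the file in on-disk order: the current data block is flushed, any remaining inline blocks
   * (index chunks, Bloom filter chunks) are written, then the optional meta blocks, then the
   * "load-on-open" section (the data block index levels and root, the meta block index, the file
   * info block, and Bloom filter metadata), and finally the fixed file trailer.
   */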

  @Override
  public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // Save data block encoder metadata in the file info.
    blockEncoder.saveMetadata(this);
    // Save index block encoder metadata in the file info.
    indexBlockEncoder.saveMetadata(this);
    // Write out the end of the data blocks, then write meta data blocks,
    // followed by fileinfo, data block index and meta block index.

    finishBlock();
    writeInlineBlocks(true);

    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());

    // Write out the metadata blocks if any.
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long offset = outputStream.getPos();
        // write the metadata content
        DataOutputStream dos = blockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        blockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        // Add the new meta block to the meta index.
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
          blockWriter.getOnDiskSizeWithHeader());
      }
    }

    // Load-on-open section.

    // Data block index.
    //
    // In version 2, this section of the file starts with the root level data
    // block index. We call a function that writes intermediate-level blocks
    // first, then root level, and returns the offset of the root level block
    // index.

    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    // Meta block index.
    metaBlockIndexWriter.writeSingleLevelIndex(blockWriter.startWriting(BlockType.ROOT_INDEX),
      "meta");
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    if (this.hFileContext.isIncludesMvcc()) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }

    // File info
    writeFileInfo(trailer, blockWriter.startWriting(BlockType.FILE_INFO));
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
    for (BlockWritable w : additionalLoadOnOpenData) {
      blockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    }

    // Now finish off the trailer.
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    trailer.setUncompressedDataIndexSize(dataBlockIndexWriter.getTotalUncompressedSize());
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    trailer.setComparatorClass(this.hFileContext.getCellComparator().getClass());
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());

    finishClose(trailer);

    blockWriter.release();
  }

  @Override
  public void addInlineBlockWriter(InlineBlockWriter ibw) {
    inlineBlockWriters.add(ibw);
  }

  @Override
  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
  }

  @Override
  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
  }

  private void addBloomFilter(final BloomFilterWriter bfw, final BlockType blockType) {
    if (bfw.getKeyCount() <= 0) {
      return;
    }

    if (
      blockType != BlockType.GENERAL_BLOOM_META && blockType != BlockType.DELETE_FAMILY_BLOOM_META
    ) {
      throw new RuntimeException("Block Type: " + blockType.toString() + " is not supported");
    }
    additionalLoadOnOpenData.add(new BlockWritable() {
      @Override
      public BlockType getBlockType() {
        return blockType;
      }

      @Override
      public void writeToBlock(DataOutput out) throws IOException {
        bfw.getMetaWriter().write(out);
        Writable dataWriter = bfw.getDataWriter();
        if (dataWriter != null) {
          dataWriter.write(out);
        }
      }
    });
  }

  @Override
  public HFileContext getFileContext() {
    return hFileContext;
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the Comparator passed
   * on construction.
   * @param cell Cell to add. Cannot be empty nor null.
   */
  @Override
  public void append(final ExtendedCell cell) throws IOException {
    // checkKey uses comparator to check we are writing in order.
    boolean dupKey = checkKey(cell);
    if (!dupKey) {
      checkBlockBoundary();
    }

    if (!blockWriter.isWriting()) {
      newBlock();
    }

    blockWriter.write(cell);

    totalKeyLength += PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
    totalValueLength += cell.getValueLength();
    if (lenOfBiggestCell < PrivateCellUtil.estimatedSerializedSizeOf(cell)) {
      lenOfBiggestCell = PrivateCellUtil.estimatedSerializedSizeOf(cell);
      keyOfBiggestCell = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(cell);
    }
    // Are we the first key in this block?
    if (firstCellInBlock == null) {
      // If cell is big, block will be closed and this firstCellInBlock reference will only last
      // a short while.
      firstCellInBlock = cell;
    }

    // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinitely?
    lastCell = cell;
    entryCount++;
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
    int tagsLength = cell.getTagsLength();
    if (tagsLength > this.maxTagsLength) {
      this.maxTagsLength = tagsLength;
    }

    trackTimestamps(cell);
  }

  @Override
  public void beforeShipped() throws IOException {
    this.blockWriter.beforeShipped();
    // Copy the cells we still reference so they do not point at buffers that may be reused.
    if (this.lastCell != null) {
      this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);
    }
    if (this.firstCellInBlock != null) {
      this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock);
    }
    if (this.lastCellOfPreviousBlock != null) {
      this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock);
    }
  }

  public Cell getLastCell() {
    return lastCell;
  }

  protected void finishFileInfo() throws IOException {
    if (lastCell != null) {
      // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
      // byte buffer. Won't take a tuple.
      byte[] lastKey = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
      fileInfo.append(HFileInfo.LASTKEY, lastKey, false);
    }

    // Average key length.
    int avgKeyLen = entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
    fileInfo.append(HFileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
    fileInfo.append(HFileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
      false);

    // Average value length.
    int avgValueLen = entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
    fileInfo.append(HFileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);

    // Biggest cell.
    if (keyOfBiggestCell != null) {
      fileInfo.append(HFileInfo.KEY_OF_BIGGEST_CELL, keyOfBiggestCell, false);
      fileInfo.append(HFileInfo.LEN_OF_BIGGEST_CELL, Bytes.toBytes(lenOfBiggestCell), false);
      LOG.debug("Len of the biggest cell in {} is {}, key is {}",
        this.getPath() == null ? "" : this.getPath().toString(), lenOfBiggestCell,
        CellUtil.toString(new KeyValue.KeyOnlyKeyValue(keyOfBiggestCell), false));
    }
"" : this.getPath().toString(), lenOfBiggestCell, 855 CellUtil.toString(new KeyValue.KeyOnlyKeyValue(keyOfBiggestCell), false)); 856 } 857 858 if (hFileContext.isIncludesTags()) { 859 // When tags are not being written in this file, MAX_TAGS_LEN is excluded 860 // from the FileInfo 861 fileInfo.append(HFileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false); 862 boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE) 863 && hFileContext.isCompressTags(); 864 fileInfo.append(HFileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false); 865 } 866 } 867 868 protected int getMajorVersion() { 869 return 3; 870 } 871 872 protected int getMinorVersion() { 873 return HFileReaderImpl.MAX_MINOR_VERSION; 874 } 875 876 protected void finishClose(FixedFileTrailer trailer) throws IOException { 877 // Write out encryption metadata before finalizing if we have a valid crypto context 878 Encryption.Context cryptoContext = hFileContext.getEncryptionContext(); 879 if (cryptoContext != Encryption.Context.NONE) { 880 // Wrap the context's key and write it as the encryption metadata, the wrapper includes 881 // all information needed for decryption 882 trailer.setEncryptionKey(EncryptionUtil.wrapKey( 883 cryptoContext.getConf(), cryptoContext.getConf() 884 .get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName()), 885 cryptoContext.getKey())); 886 } 887 // Now we can finish the close 888 trailer.setMetaIndexCount(metaNames.size()); 889 trailer.setTotalUncompressedBytes(totalUncompressedBytes + trailer.getTrailerSize()); 890 trailer.setEntryCount(entryCount); 891 trailer.setCompressionCodec(hFileContext.getCompression()); 892 893 long startTime = EnvironmentEdgeManager.currentTime(); 894 trailer.serialize(outputStream); 895 HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime); 896 897 if (closeOutputStream) { 898 outputStream.close(); 899 outputStream = null; 900 } 901 } 902 903 /** 904 * Add TimestampRange and earliest put timestamp to Metadata 905 */ 906 public void appendTrackedTimestampsToMetadata() throws IOException { 907 // TODO: The StoreFileReader always converts the byte[] to TimeRange 908 // via TimeRangeTracker, so we should write the serialization data of TimeRange directly. 909 appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker)); 910 appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs)); 911 } 912 913 public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker) 914 throws IOException { 915 // TODO: The StoreFileReader always converts the byte[] to TimeRange 916 // via TimeRangeTracker, so we should write the serialization data of TimeRange directly. 917 appendFileInfo(CUSTOM_TIERING_TIME_RANGE, TimeRangeTracker.toByteArray(timeRangeTracker)); 918 } 919 920 /** 921 * Record the earliest Put timestamp. If the timeRangeTracker is not set, update TimeRangeTracker 922 * to include the timestamp of this key 923 */ 924 private void trackTimestamps(final ExtendedCell cell) { 925 if (KeyValue.Type.Put == KeyValue.Type.codeToType(cell.getTypeByte())) { 926 earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp()); 927 } 928 timeRangeTracker.includeTimestamp(cell); 929 } 930}