/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;
import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.Key;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Common functionality needed by all versions of {@link HFile} writers.
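 * <p>
 * A minimal usage sketch (illustrative only; writers are normally obtained through the factory
 * rather than by instantiating this class directly, and {@code conf}, {@code cacheConf},
 * {@code fs}, {@code path} and {@code cell} below are assumed to exist in the caller):
 *
 * <pre>
 * HFileContext context = new HFileContextBuilder().withBlockSize(64 * 1024).build();
 * HFile.Writer writer =
 *   HFile.getWriterFactory(conf, cacheConf).withPath(fs, path).withFileContext(context).create();
 * try {
 *   writer.append(cell); // cells must be appended in comparator order
 * } finally {
 *   writer.close();
 * }
 * </pre>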
 */
@InterfaceAudience.Private
public class HFileWriterImpl implements HFile.Writer {
  private static final Logger LOG = LoggerFactory.getLogger(HFileWriterImpl.class);

  private static final long UNSET = -1;

  /** If enabled, pre-calculate the encoded data size before the real encoding happens. */
  public static final String UNIFIED_ENCODED_BLOCKSIZE_RATIO =
    "hbase.writer.unified.encoded.blocksize.ratio";

  /** Block size limit after encoding, used to unify encoded block cache entry size. */
  private final int encodedBlockSizeLimit;

  /** The Cell previously appended. Becomes the last cell in the file. */
  protected ExtendedCell lastCell = null;

  /** FileSystem stream to write into. */
  protected FSDataOutputStream outputStream;

  /** True if we opened the <code>outputStream</code> (and so will close it). */
  protected final boolean closeOutputStream;

  /** A "file info" block: a key-value map of file-wide metadata. */
  protected HFileInfo fileInfo = new HFileInfo();

  /** Total # of key/value entries, i.e. how many times add() was called. */
  protected long entryCount = 0;

  /** Used for calculating the average key length. */
  protected long totalKeyLength = 0;

  /** Used for calculating the average value length. */
  protected long totalValueLength = 0;

  /** Length of the biggest cell. */
  protected long lenOfBiggestCell = 0;
  /** Key of the biggest cell. */
  protected byte[] keyOfBiggestCell;

  /** Total uncompressed bytes, maybe calculate a compression ratio later. */
  protected long totalUncompressedBytes = 0;

  /** Meta block names. */
  protected List<byte[]> metaNames = new ArrayList<>();

  /** {@link Writable}s representing meta block data. */
  protected List<Writable> metaData = new ArrayList<>();

  /**
   * First cell in a block. This reference should be short-lived since we write hfiles in a burst.
   */
  protected ExtendedCell firstCellInBlock = null;

  /** May be null if we were passed a stream. */
  protected final Path path;

  protected final Configuration conf;

  /** Cache configuration for caching data on write. */
  protected final CacheConfig cacheConf;

  public void setTimeRangeTrackerForTiering(Supplier<TimeRangeTracker> timeRangeTrackerForTiering) {
    this.timeRangeTrackerForTiering = timeRangeTrackerForTiering;
  }

  private Supplier<TimeRangeTracker> timeRangeTrackerForTiering;

  /**
   * Name for this object used when logging or in toString. Is either the result of a toString on
   * stream or else name of passed file Path.
   */
  protected final String name;

  /**
   * The data block encoding which will be used. {@link NoOpDataBlockEncoder#INSTANCE} if there is
   * no encoding.
   */
  protected final HFileDataBlockEncoder blockEncoder;

  protected final HFileIndexBlockEncoder indexBlockEncoder;

  protected final HFileContext hFileContext;

  private int maxTagsLength = 0;

  /** KeyValue version in FileInfo */
  public static final byte[] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");

  /** Version for KeyValue which includes memstore timestamp */
  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  /**
   * Inline block writers for multi-level block index and compound Blooms.
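   * The data block index writer is added in {@link #finishInit(Configuration)}; additional writers
   * (e.g. compound Bloom filter writers) are registered through
   * {@link #addInlineBlockWriter(InlineBlockWriter)}. Each registered writer gets a chance to emit
   * blocks at every data block boundary and once more at close time.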
   */
  private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<>();

  /** block writer */
  protected HFileBlock.Writer blockWriter;

  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

  /** The offset of the first data block or -1 (UNSET) if the file is empty. */
  private long firstDataBlockOffset = UNSET;

  /** The offset of the last data block or -1 (UNSET) if the file is empty. */
  protected long lastDataBlockOffset = UNSET;

  /**
   * The last(stop) Cell of the previous data block. This reference should be short-lived since we
   * write hfiles in a burst.
   */
  private ExtendedCell lastCellOfPreviousBlock = null;

  /** Additional data items to be written to the "load-on-open" section. */
  private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<>();

  protected long maxMemstoreTS = 0;

  private final TimeRangeTracker timeRangeTracker;
  private long earliestPutTs = HConstants.LATEST_TIMESTAMP;

  public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
    FSDataOutputStream outputStream, HFileContext fileContext) {
    this.outputStream = outputStream;
    this.path = path;
    this.name = path != null ? path.getName() : outputStream.toString();
    this.hFileContext = fileContext;
    // TODO: Move this back to upper layer
    this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
    this.timeRangeTrackerForTiering = () -> this.timeRangeTracker;
    DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
    if (encoding != DataBlockEncoding.NONE) {
      this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
    } else {
      this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
    }
    IndexBlockEncoding indexBlockEncoding = hFileContext.getIndexBlockEncoding();
    if (indexBlockEncoding != IndexBlockEncoding.NONE) {
      this.indexBlockEncoder = new HFileIndexBlockEncoderImpl(indexBlockEncoding);
    } else {
      this.indexBlockEncoder = NoOpIndexBlockEncoder.INSTANCE;
    }
    closeOutputStream = path != null;
    this.cacheConf = cacheConf;
    this.conf = conf;
    float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 0f);
    this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio);

    finishInit(conf);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Writer" + (path != null ? " for " + path : "") + " initialized with cacheConf: "
        + cacheConf + " fileContext: " + fileContext);
    }
  }

  /**
   * Add to the file info. All added key/value pairs can be obtained using
   * {@link HFile.Reader#getHFileInfo()}.
   * @param k Key
   * @param v Value
   * @throws IOException in case the key or the value are invalid
   */
  @Override
  public void appendFileInfo(final byte[] k, final byte[] v) throws IOException {
    fileInfo.append(k, v, true);
  }

  /**
   * Sets the file info offset in the trailer, finishes up populating fields in the file info, and
   * writes the file info into the given data output. The reason the data output is not always
   * {@link #outputStream} is that we store file info as a block in version 2.
   * @param trailer fixed file trailer
   * @param out     the data output to write the file info to
   */
  protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
    throws IOException {
    trailer.setFileInfoOffset(outputStream.getPos());
    finishFileInfo();
    long startTime = EnvironmentEdgeManager.currentTime();
    fileInfo.write(out);
    HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);
  }

  public long getPos() throws IOException {
    return outputStream.getPos();
  }

  /**
   * Checks that the given Cell's key does not violate the key order.
   * @param cell Cell whose key to check.
   * @return true if the key is a duplicate of the previously appended key
   * @throws IOException if the key or the key order is wrong
   */
  protected boolean checkKey(final Cell cell) throws IOException {
    boolean isDuplicateKey = false;

    if (cell == null) {
      throw new IOException("Key cannot be null or empty");
    }
    if (lastCell != null) {
      int keyComp = PrivateCellUtil.compareKeyIgnoresMvcc(this.hFileContext.getCellComparator(),
        lastCell, cell);
      if (keyComp > 0) {
        String message = getLexicalErrorMessage(cell);
        throw new IOException(message);
      } else if (keyComp == 0) {
        isDuplicateKey = true;
      }
    }
    return isDuplicateKey;
  }

  private String getLexicalErrorMessage(Cell cell) {
    StringBuilder sb = new StringBuilder();
    sb.append("Added a key not lexically larger than previous. Current cell = ");
    sb.append(cell);
    sb.append(", lastCell = ");
    sb.append(lastCell);
    // file context includes HFile path and optionally table and CF of file being written
    sb.append(", fileContext=");
    sb.append(hFileContext);
    return sb.toString();
  }

  /** Checks the given value for validity. */
  protected void checkValue(final byte[] value, final int offset, final int length)
    throws IOException {
    if (value == null) {
      throw new IOException("Value cannot be null");
    }
  }

  /** Returns Path or null if we were passed a stream rather than a Path. */
  @Override
  public Path getPath() {
    return path;
  }

  @Override
  public String toString() {
    return "writer=" + (path != null ?
      path.toString() : null) + ", name=" + name + ", compression="
      + hFileContext.getCompression().getName();
  }

  public static Compression.Algorithm compressionByName(String algoName) {
    if (algoName == null) {
      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    return Compression.getCompressionAlgorithmByName(algoName);
  }

  /** A helper method to create HFile output streams in constructors */
  protected static FSDataOutputStream createOutputStream(Configuration conf, FileSystem fs,
    Path path, InetSocketAddress[] favoredNodes) throws IOException {
    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    return FSUtils.create(conf, fs, path, perms, favoredNodes);
  }

  /** Additional initialization steps */
  protected void finishInit(final Configuration conf) {
    if (blockWriter != null) {
      throw new IllegalStateException("finishInit called twice");
    }
    blockWriter =
      new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator(),
        conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10));
    // Data block index writer
    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
      cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null, indexBlockEncoder);
    dataBlockIndexWriter.setMaxChunkSize(HFileBlockIndex.getMaxChunkSize(conf));
    dataBlockIndexWriter.setMinIndexNumEntries(HFileBlockIndex.getMinIndexNumEntries(conf));
    inlineBlockWriters.add(dataBlockIndexWriter);

    // Meta data block index writer
    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
    LOG.trace("Initialized with {}", cacheConf);
  }

  /**
   * At a block boundary, writes all the inline blocks and opens a new block.
   */
  protected void checkBlockBoundary() throws IOException {
    boolean shouldFinishBlock = false;
    // This means hbase.writer.unified.encoded.blocksize.ratio was set to something different
    // from 0 and we should use the encoding ratio
    if (encodedBlockSizeLimit > 0) {
      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit;
    } else {
      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize()
        || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize();
    }
    shouldFinishBlock &= blockWriter.checkBoundariesWithPredicate();
    if (shouldFinishBlock) {
      finishBlock();
      writeInlineBlocks(false);
      newBlock();
    }
  }

  /** Clean up the data block that is currently being written. */
  private void finishBlock() throws IOException {
    if (!blockWriter.isWriting() || blockWriter.blockSizeWritten() == 0) {
      return;
    }

    // Update the first data block offset if UNSET; used when scanning.
    if (firstDataBlockOffset == UNSET) {
      firstDataBlockOffset = outputStream.getPos();
    }
    // Update the last data block offset each time through here.
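    // The offset recorded below is used both as the index entry's block offset and as the
    // cache-on-write key; the index entry's key is a shortened "fake" key from getMidpoint() that
    // still sorts between the previous block's last cell and this block's first cell.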
    lastDataBlockOffset = outputStream.getPos();
    blockWriter.writeHeaderAndData(outputStream);
    int onDiskSize = blockWriter.getOnDiskSizeWithHeader();
    ExtendedCell indexEntry =
      getMidpoint(this.hFileContext.getCellComparator(), lastCellOfPreviousBlock, firstCellInBlock);
    dataBlockIndexWriter.addEntry(PrivateCellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
      lastDataBlockOffset, onDiskSize);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    if (cacheConf.shouldCacheDataOnWrite()) {
      doCacheOnWrite(lastDataBlockOffset);
    }
  }

  /**
   * Try to return a Cell that falls between <code>left</code> and <code>right</code> but that is
   * shorter; i.e. takes up less space. This trick is used when building the HFile block index; it
   * is an optimization that does not always apply, in which case we just return the
   * <code>right</code> cell.
   * @return A cell that sorts between <code>left</code> and <code>right</code>.
   */
  public static ExtendedCell getMidpoint(final CellComparator comparator, final ExtendedCell left,
    final ExtendedCell right) {
    if (right == null) {
      throw new IllegalArgumentException("right cell can not be null");
    }
    if (left == null) {
      return right;
    }
    // If Cells from meta table, don't mess around. meta table Cells have schema
    // (table,startrow,hash) so can't be treated as plain byte arrays. Just skip
    // out without trying to do this optimization.
    if (comparator instanceof MetaCellComparator) {
      return right;
    }
    byte[] midRow;
    boolean bufferBacked =
      left instanceof ByteBufferExtendedCell && right instanceof ByteBufferExtendedCell;
    if (bufferBacked) {
      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getRowByteBuffer(),
        ((ByteBufferExtendedCell) left).getRowPosition(), left.getRowLength(),
        ((ByteBufferExtendedCell) right).getRowByteBuffer(),
        ((ByteBufferExtendedCell) right).getRowPosition(), right.getRowLength());
    } else {
      midRow = getMinimumMidpointArray(left.getRowArray(), left.getRowOffset(), left.getRowLength(),
        right.getRowArray(), right.getRowOffset(), right.getRowLength());
    }
    if (midRow != null) {
      return PrivateCellUtil.createFirstOnRow(midRow);
    }
    // Rows are same. Compare on families.
    if (bufferBacked) {
      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getFamilyByteBuffer(),
        ((ByteBufferExtendedCell) left).getFamilyPosition(), left.getFamilyLength(),
        ((ByteBufferExtendedCell) right).getFamilyByteBuffer(),
        ((ByteBufferExtendedCell) right).getFamilyPosition(), right.getFamilyLength());
    } else {
      midRow = getMinimumMidpointArray(left.getFamilyArray(), left.getFamilyOffset(),
        left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
        right.getFamilyLength());
    }
    if (midRow != null) {
      return PrivateCellUtil.createFirstOnRowFamily(right, midRow, 0, midRow.length);
    }
    // Families are same. Compare on qualifiers.
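    // Illustrative example of the shortening done by getMinimumMidpointArray (hypothetical
    // qualifiers): for left "count" and right "cz" the result is "cp", which sorts after "count"
    // and before "cz" while being shorter than the full right qualifier.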
    if (bufferBacked) {
      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getQualifierByteBuffer(),
        ((ByteBufferExtendedCell) left).getQualifierPosition(), left.getQualifierLength(),
        ((ByteBufferExtendedCell) right).getQualifierByteBuffer(),
        ((ByteBufferExtendedCell) right).getQualifierPosition(), right.getQualifierLength());
    } else {
      midRow = getMinimumMidpointArray(left.getQualifierArray(), left.getQualifierOffset(),
        left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
        right.getQualifierLength());
    }
    if (midRow != null) {
      return PrivateCellUtil.createFirstOnRowCol(right, midRow, 0, midRow.length);
    }
    // No opportunity for optimization. Just return right key.
    return right;
  }

  /**
   * Try to get a byte array that falls between left and right, as short as possible in
   * lexicographic order.
   * @return a new array that is between left and right and minimally sized, else null if
   *         left == right.
   */
  private static byte[] getMinimumMidpointArray(final byte[] leftArray, final int leftOffset,
    final int leftLength, final byte[] rightArray, final int rightOffset, final int rightLength) {
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    for (; diffIdx < minLength; diffIdx++) {
      byte leftByte = leftArray[leftOffset + diffIdx];
      byte rightByte = rightArray[rightOffset + diffIdx];
      if ((leftByte & 0xff) > (rightByte & 0xff)) {
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right="
          + Bytes.toStringBinary(rightArray, rightOffset, rightLength));
      } else if (leftByte != rightByte) {
        break;
      }
    }
    if (diffIdx == minLength) {
      if (leftLength > rightLength) {
        // right is prefix of left
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right="
          + Bytes.toStringBinary(rightArray, rightOffset, rightLength));
      } else if (leftLength < rightLength) {
        // left is prefix of right.
        byte[] minimumMidpointArray = new byte[minLength + 1];
        System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, minLength + 1);
        minimumMidpointArray[minLength] = 0x00;
        return minimumMidpointArray;
      } else {
        // left == right
        return null;
      }
    }
    // Note that left[diffIdx] can never be equal to 0xff since left < right
    byte[] minimumMidpointArray = new byte[diffIdx + 1];
    System.arraycopy(leftArray, leftOffset, minimumMidpointArray, 0, diffIdx + 1);
    minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1);
    return minimumMidpointArray;
  }

  /**
   * Try to create a new byte array that falls between left and right, as short as possible in
   * lexicographic order.
   * @return a new array that is between left and right and minimally sized, else null if
   *         left == right.
   */
  private static byte[] getMinimumMidpointArray(ByteBuffer left, int leftOffset, int leftLength,
    ByteBuffer right, int rightOffset, int rightLength) {
    int minLength = leftLength < rightLength ?
      leftLength : rightLength;
    int diffIdx = 0;
    for (; diffIdx < minLength; diffIdx++) {
      int leftByte = ByteBufferUtils.toByte(left, leftOffset + diffIdx);
      int rightByte = ByteBufferUtils.toByte(right, rightOffset + diffIdx);
      if ((leftByte & 0xff) > (rightByte & 0xff)) {
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right="
          + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength));
      } else if (leftByte != rightByte) {
        break;
      }
    }
    if (diffIdx == minLength) {
      if (leftLength > rightLength) {
        // right is prefix of left
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right="
          + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength));
      } else if (leftLength < rightLength) {
        // left is prefix of right.
        byte[] minimumMidpointArray = new byte[minLength + 1];
        ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, right, rightOffset, 0,
          minLength + 1);
        minimumMidpointArray[minLength] = 0x00;
        return minimumMidpointArray;
      } else {
        // left == right
        return null;
      }
    }
    // Note that left[diffIdx] can never be equal to 0xff since left < right
    byte[] minimumMidpointArray = new byte[diffIdx + 1];
    ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, left, leftOffset, 0, diffIdx + 1);
    minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1);
    return minimumMidpointArray;
  }

  /** Gives inline block writers an opportunity to contribute blocks. */
  private void writeInlineBlocks(boolean closing) throws IOException {
    for (InlineBlockWriter ibw : inlineBlockWriters) {
      while (ibw.shouldWriteBlock(closing)) {
        long offset = outputStream.getPos();
        boolean cacheThisBlock = ibw.getCacheOnWrite();
        ibw.writeInlineBlock(blockWriter.startWriting(ibw.getInlineBlockType()));
        blockWriter.writeHeaderAndData(outputStream);
        ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(),
          blockWriter.getUncompressedSizeWithoutHeader());
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        if (cacheThisBlock) {
          doCacheOnWrite(offset);
        }
      }
    }
  }

  /**
   * Caches the last written HFile block.
   * @param offset the offset of the block we want to cache. Used to determine the cache key.
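   *               Note that the block is only cached if the configured {@link BlockCache} agrees;
   *               see {@code shouldCacheBlock(BlockCache, BlockCacheKey)} below, which may factor
   *               in the custom tiering time range.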
   */
  private void doCacheOnWrite(long offset) {
    cacheConf.getBlockCache().ifPresent(cache -> {
      HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
      BlockCacheKey key = buildCacheBlockKey(offset, cacheFormatBlock.getBlockType());
      if (!shouldCacheBlock(cache, key)) {
        return;
      }
      try {
        cache.cacheBlock(key, cacheFormatBlock, cacheConf.isInMemory(), true);
      } finally {
        // refCnt will auto-increase when the block is added to the cache, see RAMCache#putIfAbsent
        cacheFormatBlock.release();
      }
    });
  }

  private BlockCacheKey buildCacheBlockKey(long offset, BlockType blockType) {
    if (path != null) {
      return new BlockCacheKey(path, offset, true, blockType);
    }
    return new BlockCacheKey(name, offset, true, blockType);
  }

  private boolean shouldCacheBlock(BlockCache cache, BlockCacheKey key) {
    Optional<Boolean> result =
      cache.shouldCacheBlock(key, timeRangeTrackerForTiering.get().getMax(), conf);
    return result.orElse(true);
  }

  /**
   * Ready a new block for writing.
   */
  protected void newBlock() throws IOException {
    // This is where the next block begins.
    blockWriter.startWriting(BlockType.DATA);
    firstCellInBlock = null;
    if (lastCell != null) {
      lastCellOfPreviousBlock = lastCell;
    }
  }

  /**
   * Add a meta block to the end of the file. Call before close(). Metadata blocks are expensive.
   * Fill one with a bunch of serialized data rather than do a metadata block per metadata
   * instance. If metadata is small, consider adding to file info using
   * {@link #appendFileInfo(byte[], byte[])}.
   * @param metaBlockName name of the block
   * @param content       will call readFields to get data later (DO NOT REUSE)
   */
  @Override
  public void appendMetaBlock(String metaBlockName, Writable content) {
    byte[] key = Bytes.toBytes(metaBlockName);
    int i;
    for (i = 0; i < metaNames.size(); ++i) {
      // stop when the current key is greater than our own
      byte[] cur = metaNames.get(i);
      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0, key.length) > 0) {
        break;
      }
    }
    metaNames.add(i, key);
    metaData.add(i, content);
  }

  @Override
  public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // Save data block encoder metadata in the file info.
    blockEncoder.saveMetadata(this);
    // Save index block encoder metadata in the file info.
    indexBlockEncoder.saveMetadata(this);
    // Write out the end of the data blocks, then write the meta data blocks,
    // followed by fileinfo, data block index and meta block index.

    finishBlock();
    writeInlineBlocks(true);

    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());

    // Write out the metadata blocks if any.
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long offset = outputStream.getPos();
        // write the metadata content
        DataOutputStream dos = blockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        blockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        // Add the new meta block to the meta index.
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
          blockWriter.getOnDiskSizeWithHeader());
      }
    }

    // Load-on-open section.

    // Data block index.
    //
    // In version 2, this section of the file starts with the root level data
    // block index. We call a function that writes intermediate-level blocks
    // first, then root level, and returns the offset of the root level block
    // index.

    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    // Meta block index.
    metaBlockIndexWriter.writeSingleLevelIndex(blockWriter.startWriting(BlockType.ROOT_INDEX),
      "meta");
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    if (this.hFileContext.isIncludesMvcc()) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }

    // File info
    writeFileInfo(trailer, blockWriter.startWriting(BlockType.FILE_INFO));
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
    for (BlockWritable w : additionalLoadOnOpenData) {
      blockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    }

    // Now finish off the trailer.
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    trailer.setUncompressedDataIndexSize(dataBlockIndexWriter.getTotalUncompressedSize());
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    trailer.setComparatorClass(this.hFileContext.getCellComparator().getClass());
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());

    finishClose(trailer);

    blockWriter.release();
  }

  @Override
  public void addInlineBlockWriter(InlineBlockWriter ibw) {
    inlineBlockWriters.add(ibw);
  }

  @Override
  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
  }

  @Override
  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
  }

  private void addBloomFilter(final BloomFilterWriter bfw, final BlockType blockType) {
    if (bfw.getKeyCount() <= 0) {
      return;
    }

    if (
      blockType != BlockType.GENERAL_BLOOM_META && blockType != BlockType.DELETE_FAMILY_BLOOM_META
    ) {
      throw new RuntimeException("Block Type: " + blockType.toString() + " is not supported");
    }
    additionalLoadOnOpenData.add(new BlockWritable() {
      @Override
      public BlockType getBlockType() {
        return blockType;
      }

      @Override
      public void writeToBlock(DataOutput out) throws IOException {
        bfw.getMetaWriter().write(out);
        Writable dataWriter = bfw.getDataWriter();
        if (dataWriter != null) {
          dataWriter.write(out);
        }
      }
    });
  }

  @Override
  public HFileContext getFileContext() {
    return hFileContext;
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the Comparator passed
   * on construction.
   * @param cell Cell to add. Cannot be empty nor null.
   */
  @Override
  public void append(final ExtendedCell cell) throws IOException {
    // checkKey uses comparator to check we are writing in order.
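    // Note: a block boundary is only considered when the key changes (dupKey == false), so all
    // cells that share the same key stay in the same data block.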
    boolean dupKey = checkKey(cell);
    if (!dupKey) {
      checkBlockBoundary();
    }

    if (!blockWriter.isWriting()) {
      newBlock();
    }

    blockWriter.write(cell);

    totalKeyLength += PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
    totalValueLength += cell.getValueLength();
    if (lenOfBiggestCell < PrivateCellUtil.estimatedSerializedSizeOf(cell)) {
      lenOfBiggestCell = PrivateCellUtil.estimatedSerializedSizeOf(cell);
      keyOfBiggestCell = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(cell);
    }
    // Are we the first key in this block?
    if (firstCellInBlock == null) {
      // If cell is big, block will be closed and this firstCellInBlock reference will only last
      // a short while.
      firstCellInBlock = cell;
    }

    // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinitely?
    lastCell = cell;
    entryCount++;
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
    int tagsLength = cell.getTagsLength();
    if (tagsLength > this.maxTagsLength) {
      this.maxTagsLength = tagsLength;
    }

    trackTimestamps(cell);
  }

  @Override
  public void beforeShipped() throws IOException {
    this.blockWriter.beforeShipped();
    // Clone the cells we are still holding on to, so they stop referencing buffers that may be
    // reused after shipping.
    if (this.lastCell != null) {
      this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);
    }
    if (this.firstCellInBlock != null) {
      this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock);
    }
    if (this.lastCellOfPreviousBlock != null) {
      this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock);
    }
  }

  public Cell getLastCell() {
    return lastCell;
  }

  protected void finishFileInfo() throws IOException {
    if (lastCell != null) {
      // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
      // byte buffer. Won't take a tuple.
      byte[] lastKey = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
      fileInfo.append(HFileInfo.LASTKEY, lastKey, false);
    }

    // Average key length.
    int avgKeyLen = entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
    fileInfo.append(HFileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
    fileInfo.append(HFileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
      false);

    // Average value length.
    int avgValueLen = entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
    fileInfo.append(HFileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);

    // Biggest cell.
    if (keyOfBiggestCell != null) {
      fileInfo.append(HFileInfo.KEY_OF_BIGGEST_CELL, keyOfBiggestCell, false);
      fileInfo.append(HFileInfo.LEN_OF_BIGGEST_CELL, Bytes.toBytes(lenOfBiggestCell), false);
      LOG.debug("Len of the biggest cell in {} is {}, key is {}",
        this.getPath() == null ? "" : this.getPath().toString(), lenOfBiggestCell,
        CellUtil.toString(new KeyValue.KeyOnlyKeyValue(keyOfBiggestCell), false));
    }

    if (hFileContext.isIncludesTags()) {
      // When tags are not being written in this file, MAX_TAGS_LEN is excluded
      // from the FileInfo
      fileInfo.append(HFileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
      boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
        && hFileContext.isCompressTags();
      fileInfo.append(HFileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
    }
  }

  protected int getMajorVersion() {
    return 3;
  }

  protected int getMinorVersion() {
    return HFileReaderImpl.MAX_MINOR_VERSION;
  }

  protected void finishClose(FixedFileTrailer trailer) throws IOException {
    // Write out encryption metadata before finalizing if we have a valid crypto context
    Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
    if (cryptoContext != Encryption.Context.NONE) {
      // key management is not yet implemented, so kekData is always null
      // Use traditional encryption with master key
      String wrapperSubject = cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
        User.getCurrent().getShortName());
      Key encKey = cryptoContext.getKey();

      // Wrap the context's key and write it as the encryption metadata
      if (encKey != null) {
        byte[] wrappedKey =
          EncryptionUtil.wrapKey(cryptoContext.getConf(), wrapperSubject, encKey, null);
        trailer.setEncryptionKey(wrappedKey);
      }

      // Key management fields - not yet implemented, set to defaults
      trailer.setKeyNamespace(null);
      trailer.setKEKMetadata(null);
      trailer.setKEKChecksum(0);
    }
    // Now we can finish the close
    trailer.setMetaIndexCount(metaNames.size());
    trailer.setTotalUncompressedBytes(totalUncompressedBytes + trailer.getTrailerSize());
    trailer.setEntryCount(entryCount);
    trailer.setCompressionCodec(hFileContext.getCompression());

    long startTime = EnvironmentEdgeManager.currentTime();
    trailer.serialize(outputStream);
    HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);

    if (closeOutputStream) {
      outputStream.close();
      outputStream = null;
    }
  }

  /**
   * Add TimestampRange and earliest put timestamp to Metadata
   */
  public void appendTrackedTimestampsToMetadata() throws IOException {
    // TODO: The StoreFileReader always converts the byte[] to TimeRange
    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
    appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker));
    appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
  }

  public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
    throws IOException {
    // TODO: The StoreFileReader always converts the byte[] to TimeRange
    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
    appendFileInfo(CUSTOM_TIERING_TIME_RANGE, TimeRangeTracker.toByteArray(timeRangeTracker));
  }

  /**
   * Record the earliest Put timestamp.
   * If the timeRangeTracker is not set, update TimeRangeTracker to include the timestamp of this
   * key.
   */
  private void trackTimestamps(final ExtendedCell cell) {
    if (KeyValue.Type.Put == KeyValue.Type.codeToType(cell.getTypeByte())) {
      earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
    }
    timeRangeTracker.includeTimestamp(cell);
  }
}