001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED; 021import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE; 022import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS; 023import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY; 024 025import java.io.DataOutput; 026import java.io.DataOutputStream; 027import java.io.IOException; 028import java.net.InetSocketAddress; 029import java.nio.ByteBuffer; 030import java.security.Key; 031import java.util.ArrayList; 032import java.util.List; 033import java.util.Optional; 034import java.util.function.Supplier; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.FSDataOutputStream; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.fs.permission.FsPermission; 040import org.apache.hadoop.hbase.ByteBufferExtendedCell; 041import org.apache.hadoop.hbase.Cell; 042import org.apache.hadoop.hbase.CellComparator; 043import org.apache.hadoop.hbase.CellUtil; 044import org.apache.hadoop.hbase.ExtendedCell; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.KeyValue; 047import org.apache.hadoop.hbase.KeyValueUtil; 048import org.apache.hadoop.hbase.MetaCellComparator; 049import org.apache.hadoop.hbase.PrivateCellUtil; 050import org.apache.hadoop.hbase.io.compress.Compression; 051import org.apache.hadoop.hbase.io.crypto.Encryption; 052import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 053import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding; 054import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable; 055import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; 056import org.apache.hadoop.hbase.security.EncryptionUtil; 057import org.apache.hadoop.hbase.security.User; 058import org.apache.hadoop.hbase.util.BloomFilterWriter; 059import org.apache.hadoop.hbase.util.ByteBufferUtils; 060import org.apache.hadoop.hbase.util.Bytes; 061import org.apache.hadoop.hbase.util.CommonFSUtils; 062import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 063import org.apache.hadoop.hbase.util.FSUtils; 064import org.apache.hadoop.io.Writable; 065import org.apache.yetus.audience.InterfaceAudience; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069/** 070 * Common functionality needed by all versions of {@link HFile} writers. 071 */ 072@InterfaceAudience.Private 073public class HFileWriterImpl implements HFile.Writer { 074 private static final Logger LOG = LoggerFactory.getLogger(HFileWriterImpl.class); 075 076 private static final long UNSET = -1; 077 078 /** if this feature is enabled, preCalculate encoded data size before real encoding happens */ 079 public static final String UNIFIED_ENCODED_BLOCKSIZE_RATIO = 080 "hbase.writer.unified.encoded.blocksize.ratio"; 081 082 /** Block size limit after encoding, used to unify encoded block Cache entry size */ 083 private final int encodedBlockSizeLimit; 084 085 /** The Cell previously appended. Becomes the last cell in the file. */ 086 protected ExtendedCell lastCell = null; 087 088 /** FileSystem stream to write into. */ 089 protected FSDataOutputStream outputStream; 090 091 /** True if we opened the <code>outputStream</code> (and so will close it). */ 092 protected final boolean closeOutputStream; 093 094 /** A "file info" block: a key-value map of file-wide metadata. */ 095 protected HFileInfo fileInfo = new HFileInfo(); 096 097 /** Total # of key/value entries, i.e. how many times add() was called. */ 098 protected long entryCount = 0; 099 100 /** Used for calculating the average key length. */ 101 protected long totalKeyLength = 0; 102 103 /** Used for calculating the average value length. */ 104 protected long totalValueLength = 0; 105 106 /** Len of the biggest cell. */ 107 protected long lenOfBiggestCell = 0; 108 /** Key of the biggest cell. */ 109 protected byte[] keyOfBiggestCell; 110 111 /** Total uncompressed bytes, maybe calculate a compression ratio later. */ 112 protected long totalUncompressedBytes = 0; 113 114 /** Meta block names. */ 115 protected List<byte[]> metaNames = new ArrayList<>(); 116 117 /** {@link Writable}s representing meta block data. */ 118 protected List<Writable> metaData = new ArrayList<>(); 119 120 /** 121 * First cell in a block. This reference should be short-lived since we write hfiles in a burst. 122 */ 123 protected ExtendedCell firstCellInBlock = null; 124 125 /** May be null if we were passed a stream. */ 126 protected final Path path; 127 128 protected final Configuration conf; 129 130 /** Cache configuration for caching data on write. */ 131 protected final CacheConfig cacheConf; 132 133 public void setTimeRangeTrackerForTiering(Supplier<TimeRangeTracker> timeRangeTrackerForTiering) { 134 this.timeRangeTrackerForTiering = timeRangeTrackerForTiering; 135 } 136 137 private Supplier<TimeRangeTracker> timeRangeTrackerForTiering; 138 139 /** 140 * Name for this object used when logging or in toString. Is either the result of a toString on 141 * stream or else name of passed file Path. 142 */ 143 protected final String name; 144 145 /** 146 * The data block encoding which will be used. {@link NoOpDataBlockEncoder#INSTANCE} if there is 147 * no encoding. 148 */ 149 protected final HFileDataBlockEncoder blockEncoder; 150 151 protected final HFileIndexBlockEncoder indexBlockEncoder; 152 153 protected final HFileContext hFileContext; 154 155 private int maxTagsLength = 0; 156 157 /** KeyValue version in FileInfo */ 158 public static final byte[] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION"); 159 160 /** Version for KeyValue which includes memstore timestamp */ 161 public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1; 162 163 /** Inline block writers for multi-level block index and compound Blooms. */ 164 private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<>(); 165 166 /** block writer */ 167 protected HFileBlock.Writer blockWriter; 168 169 private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter; 170 private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter; 171 172 /** The offset of the first data block or -1 if the file is empty. */ 173 private long firstDataBlockOffset = UNSET; 174 175 /** The offset of the last data block or 0 if the file is empty. */ 176 protected long lastDataBlockOffset = UNSET; 177 178 /** 179 * The last(stop) Cell of the previous data block. This reference should be short-lived since we 180 * write hfiles in a burst. 181 */ 182 private ExtendedCell lastCellOfPreviousBlock = null; 183 184 /** Additional data items to be written to the "load-on-open" section. */ 185 private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<>(); 186 187 protected long maxMemstoreTS = 0; 188 189 private final TimeRangeTracker timeRangeTracker; 190 private long earliestPutTs = HConstants.LATEST_TIMESTAMP; 191 192 private String regionName; 193 194 private String familyName; 195 196 public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path, 197 FSDataOutputStream outputStream, HFileContext fileContext) { 198 this.outputStream = outputStream; 199 this.path = path; 200 if (path != null) { 201 this.name = path.getName(); 202 this.regionName = path.getParent().getParent().getName(); 203 this.familyName = path.getParent().getName(); 204 } else { 205 this.name = outputStream.toString(); 206 } 207 this.hFileContext = fileContext; 208 // TODO: Move this back to upper layer 209 this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC); 210 this.timeRangeTrackerForTiering = () -> this.timeRangeTracker; 211 DataBlockEncoding encoding = hFileContext.getDataBlockEncoding(); 212 if (encoding != DataBlockEncoding.NONE) { 213 this.blockEncoder = new HFileDataBlockEncoderImpl(encoding); 214 } else { 215 this.blockEncoder = NoOpDataBlockEncoder.INSTANCE; 216 } 217 IndexBlockEncoding indexBlockEncoding = hFileContext.getIndexBlockEncoding(); 218 if (indexBlockEncoding != IndexBlockEncoding.NONE) { 219 this.indexBlockEncoder = new HFileIndexBlockEncoderImpl(indexBlockEncoding); 220 } else { 221 this.indexBlockEncoder = NoOpIndexBlockEncoder.INSTANCE; 222 } 223 closeOutputStream = path != null; 224 this.cacheConf = cacheConf; 225 this.conf = conf; 226 float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 0f); 227 this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio); 228 229 finishInit(conf); 230 if (LOG.isTraceEnabled()) { 231 LOG.trace("Writer" + (path != null ? " for " + path : "") + " initialized with cacheConf: " 232 + cacheConf + " fileContext: " + fileContext); 233 } 234 } 235 236 /** 237 * Add to the file info. All added key/value pairs can be obtained using 238 * {@link HFile.Reader#getHFileInfo()}. 239 * @param k Key 240 * @param v Value 241 * @throws IOException in case the key or the value are invalid 242 */ 243 @Override 244 public void appendFileInfo(final byte[] k, final byte[] v) throws IOException { 245 fileInfo.append(k, v, true); 246 } 247 248 /** 249 * Sets the file info offset in the trailer, finishes up populating fields in the file info, and 250 * writes the file info into the given data output. The reason the data output is not always 251 * {@link #outputStream} is that we store file info as a block in version 2. 252 * @param trailer fixed file trailer 253 * @param out the data output to write the file info to 254 */ 255 protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out) 256 throws IOException { 257 trailer.setFileInfoOffset(outputStream.getPos()); 258 finishFileInfo(); 259 long startTime = EnvironmentEdgeManager.currentTime(); 260 fileInfo.write(out); 261 HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime); 262 } 263 264 public long getPos() throws IOException { 265 return outputStream.getPos(); 266 267 } 268 269 /** 270 * Checks that the given Cell's key does not violate the key order. 271 * @param cell Cell whose key to check. 272 * @return true if the key is duplicate 273 * @throws IOException if the key or the key order is wrong 274 */ 275 protected boolean checkKey(final Cell cell) throws IOException { 276 boolean isDuplicateKey = false; 277 278 if (cell == null) { 279 throw new IOException("Key cannot be null or empty"); 280 } 281 if (lastCell != null) { 282 int keyComp = PrivateCellUtil.compareKeyIgnoresMvcc(this.hFileContext.getCellComparator(), 283 lastCell, cell); 284 if (keyComp > 0) { 285 String message = getLexicalErrorMessage(cell); 286 throw new IOException(message); 287 } else if (keyComp == 0) { 288 isDuplicateKey = true; 289 } 290 } 291 return isDuplicateKey; 292 } 293 294 private String getLexicalErrorMessage(Cell cell) { 295 StringBuilder sb = new StringBuilder(); 296 sb.append("Added a key not lexically larger than previous. Current cell = "); 297 sb.append(cell); 298 sb.append(", lastCell = "); 299 sb.append(lastCell); 300 // file context includes HFile path and optionally table and CF of file being written 301 sb.append("fileContext="); 302 sb.append(hFileContext); 303 return sb.toString(); 304 } 305 306 /** Checks the given value for validity. */ 307 protected void checkValue(final byte[] value, final int offset, final int length) 308 throws IOException { 309 if (value == null) { 310 throw new IOException("Value cannot be null"); 311 } 312 } 313 314 /** Returns Path or null if we were passed a stream rather than a Path. */ 315 @Override 316 public Path getPath() { 317 return path; 318 } 319 320 @Override 321 public String toString() { 322 return "writer=" + (path != null ? path.toString() : null) + ", name=" + name + ", compression=" 323 + hFileContext.getCompression().getName(); 324 } 325 326 public static Compression.Algorithm compressionByName(String algoName) { 327 if (algoName == null) { 328 return HFile.DEFAULT_COMPRESSION_ALGORITHM; 329 } 330 return Compression.getCompressionAlgorithmByName(algoName); 331 } 332 333 /** A helper method to create HFile output streams in constructors */ 334 protected static FSDataOutputStream createOutputStream(Configuration conf, FileSystem fs, 335 Path path, InetSocketAddress[] favoredNodes) throws IOException { 336 FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); 337 return FSUtils.create(conf, fs, path, perms, favoredNodes); 338 } 339 340 /** Additional initialization steps */ 341 protected void finishInit(final Configuration conf) { 342 if (blockWriter != null) { 343 throw new IllegalStateException("finishInit called twice"); 344 } 345 blockWriter = 346 new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator(), 347 conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10)); 348 // Data block index writer 349 boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); 350 dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter, 351 cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null, indexBlockEncoder); 352 dataBlockIndexWriter.setMaxChunkSize(HFileBlockIndex.getMaxChunkSize(conf)); 353 dataBlockIndexWriter.setMinIndexNumEntries(HFileBlockIndex.getMinIndexNumEntries(conf)); 354 inlineBlockWriters.add(dataBlockIndexWriter); 355 356 // Meta data block index writer 357 metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(); 358 LOG.trace("Initialized with {}", cacheConf); 359 } 360 361 /** 362 * At a block boundary, write all the inline blocks and opens new block. 363 */ 364 protected void checkBlockBoundary() throws IOException { 365 boolean shouldFinishBlock = false; 366 // This means hbase.writer.unified.encoded.blocksize.ratio was set to something different from 0 367 // and we should use the encoding ratio 368 if (encodedBlockSizeLimit > 0) { 369 shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit; 370 } else { 371 shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize() 372 || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize(); 373 } 374 shouldFinishBlock &= blockWriter.checkBoundariesWithPredicate(); 375 if (shouldFinishBlock) { 376 finishBlock(); 377 writeInlineBlocks(false); 378 newBlock(); 379 } 380 } 381 382 /** Clean up the data block that is currently being written. */ 383 private void finishBlock() throws IOException { 384 if (!blockWriter.isWriting() || blockWriter.blockSizeWritten() == 0) { 385 return; 386 } 387 388 // Update the first data block offset if UNSET; used scanning. 389 if (firstDataBlockOffset == UNSET) { 390 firstDataBlockOffset = outputStream.getPos(); 391 } 392 // Update the last data block offset each time through here. 393 lastDataBlockOffset = outputStream.getPos(); 394 blockWriter.writeHeaderAndData(outputStream); 395 int onDiskSize = blockWriter.getOnDiskSizeWithHeader(); 396 ExtendedCell indexEntry = 397 getMidpoint(this.hFileContext.getCellComparator(), lastCellOfPreviousBlock, firstCellInBlock); 398 dataBlockIndexWriter.addEntry(PrivateCellUtil.getCellKeySerializedAsKeyValueKey(indexEntry), 399 lastDataBlockOffset, onDiskSize); 400 totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader(); 401 if (cacheConf.shouldCacheDataOnWrite()) { 402 doCacheOnWrite(lastDataBlockOffset); 403 } 404 } 405 406 /** 407 * Try to return a Cell that falls between <code>left</code> and <code>right</code> but that is 408 * shorter; i.e. takes up less space. This trick is used building HFile block index. Its an 409 * optimization. It does not always work. In this case we'll just return the <code>right</code> 410 * cell. 411 * @return A cell that sorts between <code>left</code> and <code>right</code>. 412 */ 413 public static ExtendedCell getMidpoint(final CellComparator comparator, final ExtendedCell left, 414 final ExtendedCell right) { 415 if (right == null) { 416 throw new IllegalArgumentException("right cell can not be null"); 417 } 418 if (left == null) { 419 return right; 420 } 421 // If Cells from meta table, don't mess around. meta table Cells have schema 422 // (table,startrow,hash) so can't be treated as plain byte arrays. Just skip 423 // out without trying to do this optimization. 424 if (comparator instanceof MetaCellComparator) { 425 return right; 426 } 427 byte[] midRow; 428 boolean bufferBacked = 429 left instanceof ByteBufferExtendedCell && right instanceof ByteBufferExtendedCell; 430 if (bufferBacked) { 431 midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getRowByteBuffer(), 432 ((ByteBufferExtendedCell) left).getRowPosition(), left.getRowLength(), 433 ((ByteBufferExtendedCell) right).getRowByteBuffer(), 434 ((ByteBufferExtendedCell) right).getRowPosition(), right.getRowLength()); 435 } else { 436 midRow = getMinimumMidpointArray(left.getRowArray(), left.getRowOffset(), left.getRowLength(), 437 right.getRowArray(), right.getRowOffset(), right.getRowLength()); 438 } 439 if (midRow != null) { 440 return PrivateCellUtil.createFirstOnRow(midRow); 441 } 442 // Rows are same. Compare on families. 443 if (bufferBacked) { 444 midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getFamilyByteBuffer(), 445 ((ByteBufferExtendedCell) left).getFamilyPosition(), left.getFamilyLength(), 446 ((ByteBufferExtendedCell) right).getFamilyByteBuffer(), 447 ((ByteBufferExtendedCell) right).getFamilyPosition(), right.getFamilyLength()); 448 } else { 449 midRow = getMinimumMidpointArray(left.getFamilyArray(), left.getFamilyOffset(), 450 left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(), 451 right.getFamilyLength()); 452 } 453 if (midRow != null) { 454 return PrivateCellUtil.createFirstOnRowFamily(right, midRow, 0, midRow.length); 455 } 456 // Families are same. Compare on qualifiers. 457 if (bufferBacked) { 458 midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getQualifierByteBuffer(), 459 ((ByteBufferExtendedCell) left).getQualifierPosition(), left.getQualifierLength(), 460 ((ByteBufferExtendedCell) right).getQualifierByteBuffer(), 461 ((ByteBufferExtendedCell) right).getQualifierPosition(), right.getQualifierLength()); 462 } else { 463 midRow = getMinimumMidpointArray(left.getQualifierArray(), left.getQualifierOffset(), 464 left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(), 465 right.getQualifierLength()); 466 } 467 if (midRow != null) { 468 return PrivateCellUtil.createFirstOnRowCol(right, midRow, 0, midRow.length); 469 } 470 // No opportunity for optimization. Just return right key. 471 return right; 472 } 473 474 /** 475 * Try to get a byte array that falls between left and right as short as possible with 476 * lexicographical order; 477 * @return Return a new array that is between left and right and minimally sized else just return 478 * null if left == right. 479 */ 480 private static byte[] getMinimumMidpointArray(final byte[] leftArray, final int leftOffset, 481 final int leftLength, final byte[] rightArray, final int rightOffset, final int rightLength) { 482 int minLength = leftLength < rightLength ? leftLength : rightLength; 483 int diffIdx = 0; 484 for (; diffIdx < minLength; diffIdx++) { 485 byte leftByte = leftArray[leftOffset + diffIdx]; 486 byte rightByte = rightArray[rightOffset + diffIdx]; 487 if ((leftByte & 0xff) > (rightByte & 0xff)) { 488 throw new IllegalArgumentException("Left byte array sorts after right row; left=" 489 + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right=" 490 + Bytes.toStringBinary(rightArray, rightOffset, rightLength)); 491 } else if (leftByte != rightByte) { 492 break; 493 } 494 } 495 if (diffIdx == minLength) { 496 if (leftLength > rightLength) { 497 // right is prefix of left 498 throw new IllegalArgumentException("Left byte array sorts after right row; left=" 499 + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right=" 500 + Bytes.toStringBinary(rightArray, rightOffset, rightLength)); 501 } else if (leftLength < rightLength) { 502 // left is prefix of right. 503 byte[] minimumMidpointArray = new byte[minLength + 1]; 504 System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, minLength + 1); 505 minimumMidpointArray[minLength] = 0x00; 506 return minimumMidpointArray; 507 } else { 508 // left == right 509 return null; 510 } 511 } 512 // Note that left[diffIdx] can never be equal to 0xff since left < right 513 byte[] minimumMidpointArray = new byte[diffIdx + 1]; 514 System.arraycopy(leftArray, leftOffset, minimumMidpointArray, 0, diffIdx + 1); 515 minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1); 516 return minimumMidpointArray; 517 } 518 519 /** 520 * Try to create a new byte array that falls between left and right as short as possible with 521 * lexicographical order. 522 * @return Return a new array that is between left and right and minimally sized else just return 523 * null if left == right. 524 */ 525 private static byte[] getMinimumMidpointArray(ByteBuffer left, int leftOffset, int leftLength, 526 ByteBuffer right, int rightOffset, int rightLength) { 527 int minLength = leftLength < rightLength ? leftLength : rightLength; 528 int diffIdx = 0; 529 for (; diffIdx < minLength; diffIdx++) { 530 int leftByte = ByteBufferUtils.toByte(left, leftOffset + diffIdx); 531 int rightByte = ByteBufferUtils.toByte(right, rightOffset + diffIdx); 532 if ((leftByte & 0xff) > (rightByte & 0xff)) { 533 throw new IllegalArgumentException("Left byte array sorts after right row; left=" 534 + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right=" 535 + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength)); 536 } else if (leftByte != rightByte) { 537 break; 538 } 539 } 540 if (diffIdx == minLength) { 541 if (leftLength > rightLength) { 542 // right is prefix of left 543 throw new IllegalArgumentException("Left byte array sorts after right row; left=" 544 + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right=" 545 + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength)); 546 } else if (leftLength < rightLength) { 547 // left is prefix of right. 548 byte[] minimumMidpointArray = new byte[minLength + 1]; 549 ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, right, rightOffset, 0, 550 minLength + 1); 551 minimumMidpointArray[minLength] = 0x00; 552 return minimumMidpointArray; 553 } else { 554 // left == right 555 return null; 556 } 557 } 558 // Note that left[diffIdx] can never be equal to 0xff since left < right 559 byte[] minimumMidpointArray = new byte[diffIdx + 1]; 560 ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, left, leftOffset, 0, diffIdx + 1); 561 minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1); 562 return minimumMidpointArray; 563 } 564 565 /** Gives inline block writers an opportunity to contribute blocks. */ 566 private void writeInlineBlocks(boolean closing) throws IOException { 567 for (InlineBlockWriter ibw : inlineBlockWriters) { 568 while (ibw.shouldWriteBlock(closing)) { 569 long offset = outputStream.getPos(); 570 boolean cacheThisBlock = ibw.getCacheOnWrite(); 571 ibw.writeInlineBlock(blockWriter.startWriting(ibw.getInlineBlockType())); 572 blockWriter.writeHeaderAndData(outputStream); 573 ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(), 574 blockWriter.getUncompressedSizeWithoutHeader()); 575 totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader(); 576 577 if (cacheThisBlock) { 578 doCacheOnWrite(offset); 579 } 580 } 581 } 582 } 583 584 /** 585 * Caches the last written HFile block. 586 * @param offset the offset of the block we want to cache. Used to determine the cache key. 587 */ 588 private void doCacheOnWrite(long offset) { 589 cacheConf.getBlockCache().ifPresent(cache -> { 590 HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf); 591 try { 592 BlockCacheKey key = buildCacheBlockKey(offset, cacheFormatBlock.getBlockType()); 593 if (!shouldCacheBlock(cache, key)) { 594 return; 595 } 596 cache.cacheBlock(key, cacheFormatBlock, cacheConf.isInMemory(), true); 597 } finally { 598 // refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent 599 cacheFormatBlock.release(); 600 } 601 }); 602 } 603 604 private BlockCacheKey buildCacheBlockKey(long offset, BlockType blockType) { 605 if (path != null) { 606 return new BlockCacheKey(name, familyName, regionName, offset, true, blockType, false); 607 } 608 return new BlockCacheKey(name, offset, true, blockType); 609 } 610 611 private boolean shouldCacheBlock(BlockCache cache, BlockCacheKey key) { 612 Optional<Boolean> result = 613 cache.shouldCacheBlock(key, timeRangeTrackerForTiering.get().getMax(), conf); 614 return result.orElse(true); 615 } 616 617 /** 618 * Ready a new block for writing. 619 */ 620 protected void newBlock() throws IOException { 621 // This is where the next block begins. 622 blockWriter.startWriting(BlockType.DATA); 623 firstCellInBlock = null; 624 if (lastCell != null) { 625 lastCellOfPreviousBlock = lastCell; 626 } 627 } 628 629 /** 630 * Add a meta block to the end of the file. Call before close(). Metadata blocks are expensive. 631 * Fill one with a bunch of serialized data rather than do a metadata block per metadata instance. 632 * If metadata is small, consider adding to file info using 633 * {@link #appendFileInfo(byte[], byte[])} name of the block will call readFields to get data 634 * later (DO NOT REUSE) 635 */ 636 @Override 637 public void appendMetaBlock(String metaBlockName, Writable content) { 638 byte[] key = Bytes.toBytes(metaBlockName); 639 int i; 640 for (i = 0; i < metaNames.size(); ++i) { 641 // stop when the current key is greater than our own 642 byte[] cur = metaNames.get(i); 643 if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0, key.length) > 0) { 644 break; 645 } 646 } 647 metaNames.add(i, key); 648 metaData.add(i, content); 649 } 650 651 @Override 652 public void close() throws IOException { 653 if (outputStream == null) { 654 return; 655 } 656 // Save data block encoder metadata in the file info. 657 blockEncoder.saveMetadata(this); 658 // Save index block encoder metadata in the file info. 659 indexBlockEncoder.saveMetadata(this); 660 // Write out the end of the data blocks, then write meta data blocks. 661 // followed by fileinfo, data block index and meta block index. 662 663 finishBlock(); 664 writeInlineBlocks(true); 665 666 FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion()); 667 668 // Write out the metadata blocks if any. 669 if (!metaNames.isEmpty()) { 670 for (int i = 0; i < metaNames.size(); ++i) { 671 // store the beginning offset 672 long offset = outputStream.getPos(); 673 // write the metadata content 674 DataOutputStream dos = blockWriter.startWriting(BlockType.META); 675 metaData.get(i).write(dos); 676 677 blockWriter.writeHeaderAndData(outputStream); 678 totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader(); 679 680 // Add the new meta block to the meta index. 681 metaBlockIndexWriter.addEntry(metaNames.get(i), offset, 682 blockWriter.getOnDiskSizeWithHeader()); 683 } 684 } 685 686 // Load-on-open section. 687 688 // Data block index. 689 // 690 // In version 2, this section of the file starts with the root level data 691 // block index. We call a function that writes intermediate-level blocks 692 // first, then root level, and returns the offset of the root level block 693 // index. 694 695 long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream); 696 trailer.setLoadOnOpenOffset(rootIndexOffset); 697 698 // Meta block index. 699 metaBlockIndexWriter.writeSingleLevelIndex(blockWriter.startWriting(BlockType.ROOT_INDEX), 700 "meta"); 701 blockWriter.writeHeaderAndData(outputStream); 702 totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader(); 703 704 if (this.hFileContext.isIncludesMvcc()) { 705 appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS)); 706 appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE)); 707 } 708 709 // File info 710 writeFileInfo(trailer, blockWriter.startWriting(BlockType.FILE_INFO)); 711 blockWriter.writeHeaderAndData(outputStream); 712 totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader(); 713 714 // Load-on-open data supplied by higher levels, e.g. Bloom filters. 715 for (BlockWritable w : additionalLoadOnOpenData) { 716 blockWriter.writeBlock(w, outputStream); 717 totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader(); 718 } 719 720 // Now finish off the trailer. 721 trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels()); 722 trailer.setUncompressedDataIndexSize(dataBlockIndexWriter.getTotalUncompressedSize()); 723 trailer.setFirstDataBlockOffset(firstDataBlockOffset); 724 trailer.setLastDataBlockOffset(lastDataBlockOffset); 725 trailer.setComparatorClass(this.hFileContext.getCellComparator().getClass()); 726 trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries()); 727 728 finishClose(trailer); 729 730 blockWriter.release(); 731 } 732 733 @Override 734 public void addInlineBlockWriter(InlineBlockWriter ibw) { 735 inlineBlockWriters.add(ibw); 736 } 737 738 @Override 739 public void addGeneralBloomFilter(final BloomFilterWriter bfw) { 740 this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META); 741 } 742 743 @Override 744 public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) { 745 this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META); 746 } 747 748 private void addBloomFilter(final BloomFilterWriter bfw, final BlockType blockType) { 749 if (bfw.getKeyCount() <= 0) { 750 return; 751 } 752 753 if ( 754 blockType != BlockType.GENERAL_BLOOM_META && blockType != BlockType.DELETE_FAMILY_BLOOM_META 755 ) { 756 throw new RuntimeException("Block Type: " + blockType.toString() + "is not supported"); 757 } 758 additionalLoadOnOpenData.add(new BlockWritable() { 759 @Override 760 public BlockType getBlockType() { 761 return blockType; 762 } 763 764 @Override 765 public void writeToBlock(DataOutput out) throws IOException { 766 bfw.getMetaWriter().write(out); 767 Writable dataWriter = bfw.getDataWriter(); 768 if (dataWriter != null) { 769 dataWriter.write(out); 770 } 771 } 772 }); 773 } 774 775 @Override 776 public HFileContext getFileContext() { 777 return hFileContext; 778 } 779 780 /** 781 * Add key/value to file. Keys must be added in an order that agrees with the Comparator passed on 782 * construction. Cell to add. Cannot be empty nor null. 783 */ 784 @Override 785 public void append(final ExtendedCell cell) throws IOException { 786 // checkKey uses comparator to check we are writing in order. 787 boolean dupKey = checkKey(cell); 788 if (!dupKey) { 789 checkBlockBoundary(); 790 } 791 792 if (!blockWriter.isWriting()) { 793 newBlock(); 794 } 795 796 blockWriter.write(cell); 797 798 totalKeyLength += PrivateCellUtil.estimatedSerializedSizeOfKey(cell); 799 totalValueLength += cell.getValueLength(); 800 if (lenOfBiggestCell < PrivateCellUtil.estimatedSerializedSizeOf(cell)) { 801 lenOfBiggestCell = PrivateCellUtil.estimatedSerializedSizeOf(cell); 802 keyOfBiggestCell = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(cell); 803 } 804 // Are we the first key in this block? 805 if (firstCellInBlock == null) { 806 // If cell is big, block will be closed and this firstCellInBlock reference will only last 807 // a short while. 808 firstCellInBlock = cell; 809 } 810 811 // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinitely? 812 lastCell = cell; 813 entryCount++; 814 this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId()); 815 int tagsLength = cell.getTagsLength(); 816 if (tagsLength > this.maxTagsLength) { 817 this.maxTagsLength = tagsLength; 818 } 819 820 trackTimestamps(cell); 821 } 822 823 @Override 824 public void beforeShipped() throws IOException { 825 this.blockWriter.beforeShipped(); 826 // Add clone methods for every cell 827 if (this.lastCell != null) { 828 this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell); 829 } 830 if (this.firstCellInBlock != null) { 831 this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock); 832 } 833 if (this.lastCellOfPreviousBlock != null) { 834 this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock); 835 } 836 } 837 838 public Cell getLastCell() { 839 return lastCell; 840 } 841 842 protected void finishFileInfo() throws IOException { 843 if (lastCell != null) { 844 // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean 845 // byte buffer. Won't take a tuple. 846 byte[] lastKey = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell); 847 fileInfo.append(HFileInfo.LASTKEY, lastKey, false); 848 } 849 850 // Average key length. 851 int avgKeyLen = entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount); 852 fileInfo.append(HFileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false); 853 fileInfo.append(HFileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()), 854 false); 855 856 // Average value length. 857 int avgValueLen = entryCount == 0 ? 0 : (int) (totalValueLength / entryCount); 858 fileInfo.append(HFileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false); 859 860 // Biggest cell. 861 if (keyOfBiggestCell != null) { 862 fileInfo.append(HFileInfo.KEY_OF_BIGGEST_CELL, keyOfBiggestCell, false); 863 fileInfo.append(HFileInfo.LEN_OF_BIGGEST_CELL, Bytes.toBytes(lenOfBiggestCell), false); 864 LOG.debug("Len of the biggest cell in {} is {}, key is {}", 865 this.getPath() == null ? "" : this.getPath().toString(), lenOfBiggestCell, 866 CellUtil.toString(new KeyValue.KeyOnlyKeyValue(keyOfBiggestCell), false)); 867 } 868 869 if (hFileContext.isIncludesTags()) { 870 // When tags are not being written in this file, MAX_TAGS_LEN is excluded 871 // from the FileInfo 872 fileInfo.append(HFileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false); 873 boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE) 874 && hFileContext.isCompressTags(); 875 fileInfo.append(HFileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false); 876 } 877 } 878 879 protected int getMajorVersion() { 880 return 3; 881 } 882 883 protected int getMinorVersion() { 884 return HFileReaderImpl.MAX_MINOR_VERSION; 885 } 886 887 protected void finishClose(FixedFileTrailer trailer) throws IOException { 888 // Write out encryption metadata before finalizing if we have a valid crypto context 889 Encryption.Context cryptoContext = hFileContext.getEncryptionContext(); 890 if (cryptoContext != Encryption.Context.NONE) { 891 // key management is not yet implemented, so kekData is always null 892 // Use traditional encryption with master key 893 String wrapperSubject = cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, 894 User.getCurrent().getShortName()); 895 Key encKey = cryptoContext.getKey(); 896 897 // Wrap the context's key and write it as the encryption metadata 898 if (encKey != null) { 899 byte[] wrappedKey = 900 EncryptionUtil.wrapKey(cryptoContext.getConf(), wrapperSubject, encKey, null); 901 trailer.setEncryptionKey(wrappedKey); 902 } 903 904 // Key management fields - not yet implemented, set to defaults 905 trailer.setKeyNamespace(null); 906 trailer.setKEKMetadata(null); 907 trailer.setKEKChecksum(0); 908 } 909 // Now we can finish the close 910 trailer.setMetaIndexCount(metaNames.size()); 911 trailer.setTotalUncompressedBytes(totalUncompressedBytes + trailer.getTrailerSize()); 912 trailer.setEntryCount(entryCount); 913 trailer.setCompressionCodec(hFileContext.getCompression()); 914 915 long startTime = EnvironmentEdgeManager.currentTime(); 916 trailer.serialize(outputStream); 917 HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime); 918 919 if (closeOutputStream) { 920 outputStream.close(); 921 outputStream = null; 922 } 923 } 924 925 /** 926 * Add TimestampRange and earliest put timestamp to Metadata 927 */ 928 public void appendTrackedTimestampsToMetadata() throws IOException { 929 // TODO: The StoreFileReader always converts the byte[] to TimeRange 930 // via TimeRangeTracker, so we should write the serialization data of TimeRange directly. 931 appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker)); 932 appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs)); 933 } 934 935 public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker) 936 throws IOException { 937 // TODO: The StoreFileReader always converts the byte[] to TimeRange 938 // via TimeRangeTracker, so we should write the serialization data of TimeRange directly. 939 appendFileInfo(CUSTOM_TIERING_TIME_RANGE, TimeRangeTracker.toByteArray(timeRangeTracker)); 940 } 941 942 /** 943 * Record the earliest Put timestamp. If the timeRangeTracker is not set, update TimeRangeTracker 944 * to include the timestamp of this key 945 */ 946 private void trackTimestamps(final ExtendedCell cell) { 947 if (KeyValue.Type.Put == KeyValue.Type.codeToType(cell.getTypeByte())) { 948 earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp()); 949 } 950 timeRangeTracker.includeTimestamp(cell); 951 } 952}