/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl.MetaCellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Common functionality needed by all versions of {@link HFile} writers.
 */
@InterfaceAudience.Private
public class HFileWriterImpl implements HFile.Writer {
  private static final Logger LOG = LoggerFactory.getLogger(HFileWriterImpl.class);

  private static final long UNSET = -1;

  /** If this feature is enabled, pre-calculate the encoded data size before the real encoding happens. */
  public static final String UNIFIED_ENCODED_BLOCKSIZE_RATIO =
      "hbase.writer.unified.encoded.blocksize.ratio";

  /** Block size limit after encoding, used to unify the encoded block cache entry size. */
  private final int encodedBlockSizeLimit;

  /** The Cell previously appended. Becomes the last cell in the file. */
  protected Cell lastCell = null;

  /** FileSystem stream to write into. */
  protected FSDataOutputStream outputStream;

  /** True if we opened the <code>outputStream</code> (and so will close it). */
  protected final boolean closeOutputStream;

  /** A "file info" block: a key-value map of file-wide metadata. */
  protected HFileInfo fileInfo = new HFileInfo();

  /** Total # of key/value entries, i.e. how many times append() was called. */
  protected long entryCount = 0;

  /** Used for calculating the average key length. */
  protected long totalKeyLength = 0;

  /** Used for calculating the average value length. */
  protected long totalValueLength = 0;

  /** Total uncompressed bytes, maybe calculate a compression ratio later. */
  protected long totalUncompressedBytes = 0;

  /** Meta block names. */
  protected List<byte[]> metaNames = new ArrayList<>();

  /** {@link Writable}s representing meta block data. */
  protected List<Writable> metaData = new ArrayList<>();

  /**
   * First cell in a block.
   * This reference should be short-lived since we write hfiles in a burst.
   */
  protected Cell firstCellInBlock = null;

  /** May be null if we were passed a stream. */
  protected final Path path;

  /** Cache configuration for caching data on write. */
  protected final CacheConfig cacheConf;

  /**
   * Name for this object used when logging or in toString. Is either
   * the result of a toString on stream or else name of passed file Path.
   */
  protected final String name;

  /**
   * The data block encoding which will be used.
   * {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding.
   */
  protected final HFileDataBlockEncoder blockEncoder;

  protected final HFileContext hFileContext;

  private int maxTagsLength = 0;

  /** KeyValue version in FileInfo */
  public static final byte[] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");

  /** Version for KeyValue which includes memstore timestamp */
  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  /** Inline block writers for multi-level block index and compound Blooms. */
  private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<>();

  /** block writer */
  protected HFileBlock.Writer blockWriter;

  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

  /** The offset of the first data block or -1 if the file is empty. */
  private long firstDataBlockOffset = UNSET;

  /** The offset of the last data block or -1 (UNSET) if the file is empty. */
  protected long lastDataBlockOffset = UNSET;

  /**
   * The last (stop) Cell of the previous data block.
   * This reference should be short-lived since we write hfiles in a burst.
   */
  private Cell lastCellOfPreviousBlock = null;

  /** Additional data items to be written to the "load-on-open" section. */
  private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<>();

  protected long maxMemstoreTS = 0;

  public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
      FSDataOutputStream outputStream, HFileContext fileContext) {
    this.outputStream = outputStream;
    this.path = path;
    this.name = path != null ? path.getName() : outputStream.toString();
    this.hFileContext = fileContext;
    DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
    if (encoding != DataBlockEncoding.NONE) {
      this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
    } else {
      this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
    }
    closeOutputStream = path != null;
    this.cacheConf = cacheConf;
    float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 1f);
    this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio);
    finishInit(conf);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Writer" + (path != null ? " for " + path : "") +
          " initialized with cacheConf: " + cacheConf +
          " fileContext: " + fileContext);
    }
  }
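
  // Worked example (illustrative values, not taken from the code above): with an hfile block
  // size of 65536 (64 KB) and hbase.writer.unified.encoded.blocksize.ratio set to 0.5, the
  // constructor computes
  //   encodedBlockSizeLimit = (int) (65536 * 0.5f) = 32768
  // so checkBlockBoundary() closes a block once either the encoded size reaches 32 KB or the
  // unencoded size reaches 64 KB. With the default ratio of 1.0 both thresholds equal the
  // configured block size.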

  /**
   * Add to the file info. All added key/value pairs can be obtained using
   * {@link HFile.Reader#getHFileInfo()}.
   *
   * @param k Key
   * @param v Value
   * @throws IOException in case the key or the value is invalid
   */
  @Override
  public void appendFileInfo(final byte[] k, final byte[] v)
      throws IOException {
    fileInfo.append(k, v, true);
  }

  /**
   * Sets the file info offset in the trailer, finishes up populating fields in
   * the file info, and writes the file info into the given data output. The
   * reason the data output is not always {@link #outputStream} is that we store
   * file info as a block in version 2.
   *
   * @param trailer fixed file trailer
   * @param out the data output to write the file info to
   */
  protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
      throws IOException {
    trailer.setFileInfoOffset(outputStream.getPos());
    finishFileInfo();
    long startTime = System.currentTimeMillis();
    fileInfo.write(out);
    HFile.updateWriteLatency(System.currentTimeMillis() - startTime);
  }

  /**
   * Checks that the given Cell's key does not violate the key order.
   *
   * @param cell Cell whose key to check.
   * @return true if the key is duplicate
   * @throws IOException if the key or the key order is wrong
   */
  protected boolean checkKey(final Cell cell) throws IOException {
    boolean isDuplicateKey = false;

    if (cell == null) {
      throw new IOException("Key cannot be null or empty");
    }
    if (lastCell != null) {
      int keyComp = PrivateCellUtil.compareKeyIgnoresMvcc(this.hFileContext.getCellComparator(),
          lastCell, cell);
      if (keyComp > 0) {
        String message = getLexicalErrorMessage(cell);
        throw new IOException(message);
      } else if (keyComp == 0) {
        isDuplicateKey = true;
      }
    }
    return isDuplicateKey;
  }

  private String getLexicalErrorMessage(Cell cell) {
    StringBuilder sb = new StringBuilder();
    sb.append("Added a key not lexically larger than previous. Current cell = ");
    sb.append(cell);
    sb.append(", lastCell = ");
    sb.append(lastCell);
    // file context includes HFile path and optionally table and CF of file being written
    sb.append(", fileContext=");
    sb.append(hFileContext);
    return sb.toString();
  }

  /** Checks the given value for validity. */
  protected void checkValue(final byte[] value, final int offset,
      final int length) throws IOException {
    if (value == null) {
      throw new IOException("Value cannot be null");
    }
  }
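
  // Example of the ordering contract enforced by checkKey() (illustrative keys): appending cells
  // keyed "r1" then "r2" is fine, appending "r2" then "r1" throws the IOException built above,
  // and appending the exact same key twice is allowed (checkKey() returns true and append()
  // merely skips the block-boundary check for the duplicate).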

  /**
   * @return Path or null if we were passed a stream rather than a Path.
   */
  @Override
  public Path getPath() {
    return path;
  }

  @Override
  public String toString() {
    return "writer=" + (path != null ? path.toString() : null) + ", name="
        + name + ", compression=" + hFileContext.getCompression().getName();
  }

  public static Compression.Algorithm compressionByName(String algoName) {
    if (algoName == null) {
      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    return Compression.getCompressionAlgorithmByName(algoName);
  }

  /** A helper method to create HFile output streams in constructors */
  protected static FSDataOutputStream createOutputStream(Configuration conf,
      FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf,
        HConstants.DATA_FILE_UMASK_KEY);
    return FSUtils.create(conf, fs, path, perms, favoredNodes);
  }

  /** Additional initialization steps */
  protected void finishInit(final Configuration conf) {
    if (blockWriter != null) {
      throw new IllegalStateException("finishInit called twice");
    }
    blockWriter = new HFileBlock.Writer(blockEncoder, hFileContext,
        cacheConf.getByteBuffAllocator());
    // Data block index writer
    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
        cacheIndexesOnWrite ? cacheConf : null,
        cacheIndexesOnWrite ? name : null);
    dataBlockIndexWriter.setMaxChunkSize(HFileBlockIndex.getMaxChunkSize(conf));
    dataBlockIndexWriter.setMinIndexNumEntries(HFileBlockIndex.getMinIndexNumEntries(conf));
    inlineBlockWriters.add(dataBlockIndexWriter);

    // Meta data block index writer
    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
    LOG.trace("Initialized with {}", cacheConf);
  }

  /**
   * At a block boundary, writes all the inline blocks and opens a new block.
   */
  protected void checkBlockBoundary() throws IOException {
    // For encoders like prefix-tree, the encoded size is not available, so we have to compare
    // both the encoded and unencoded sizes against the block size limit.
    if (blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit
        || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize()) {
      finishBlock();
      writeInlineBlocks(false);
      newBlock();
    }
  }

  /** Clean up the data block that is currently being written. */
  private void finishBlock() throws IOException {
    if (!blockWriter.isWriting() || blockWriter.blockSizeWritten() == 0) {
      return;
    }

    // Update the first data block offset if UNSET; used when scanning.
    if (firstDataBlockOffset == UNSET) {
      firstDataBlockOffset = outputStream.getPos();
    }
    // Update the last data block offset each time through here.
    lastDataBlockOffset = outputStream.getPos();
    blockWriter.writeHeaderAndData(outputStream);
    int onDiskSize = blockWriter.getOnDiskSizeWithHeader();
    Cell indexEntry =
        getMidpoint(this.hFileContext.getCellComparator(), lastCellOfPreviousBlock,
            firstCellInBlock);
    dataBlockIndexWriter.addEntry(PrivateCellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
        lastDataBlockOffset, onDiskSize);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    if (cacheConf.shouldCacheDataOnWrite()) {
      doCacheOnWrite(lastDataBlockOffset);
    }
  }
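
  // Worked example of the index-key shortening done in finishBlock() above (illustrative rows):
  // if the previous block's last cell has row "the quick brown fox" and the new block's first
  // cell has row "the who", getMidpoint() below can produce the shorter fake row "the r"
  // ("the " plus ('q' + 1)), which still sorts strictly between the two rows, so the block index
  // entry stays small without changing lookup results.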

  /**
   * Try to return a Cell that falls between <code>left</code> and <code>right</code> but that is
   * shorter, i.e. takes up less space. This trick is used when building the HFile block index.
   * It is an optimization. It does not always work; in that case we just return the
   * <code>right</code> cell.
   * @return A cell that sorts between <code>left</code> and <code>right</code>.
   */
  public static Cell getMidpoint(final CellComparator comparator, final Cell left,
      final Cell right) {
    // TODO: Redo so only a single pass over the arrays rather than one to
    // compare and then a second composing midpoint.
    if (right == null) {
      throw new IllegalArgumentException("right cell can not be null");
    }
    if (left == null) {
      return right;
    }
    // If Cells from meta table, don't mess around. meta table Cells have schema
    // (table,startrow,hash) so can't be treated as plain byte arrays. Just skip
    // out without trying to do this optimization.
    if (comparator instanceof MetaCellComparator) {
      return right;
    }
    int diff = comparator.compareRows(left, right);
    if (diff > 0) {
      throw new IllegalArgumentException("Left row sorts after right row; left="
          + CellUtil.getCellKeyAsString(left) + ", right=" + CellUtil.getCellKeyAsString(right));
    }
    byte[] midRow;
    boolean bufferBacked = left instanceof ByteBufferExtendedCell
        && right instanceof ByteBufferExtendedCell;
    if (diff < 0) {
      // Left row is < right row.
      if (bufferBacked) {
        midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getRowByteBuffer(),
            ((ByteBufferExtendedCell) left).getRowPosition(), left.getRowLength(),
            ((ByteBufferExtendedCell) right).getRowByteBuffer(),
            ((ByteBufferExtendedCell) right).getRowPosition(), right.getRowLength());
      } else {
        midRow = getMinimumMidpointArray(left.getRowArray(), left.getRowOffset(),
            left.getRowLength(), right.getRowArray(), right.getRowOffset(), right.getRowLength());
      }
      // If midRow is null, just return 'right'. Can't do optimization.
      if (midRow == null) {
        return right;
      }
      return PrivateCellUtil.createFirstOnRow(midRow);
    }
    // Rows are same. Compare on families.
    diff = comparator.compareFamilies(left, right);
    if (diff > 0) {
      throw new IllegalArgumentException("Left family sorts after right family; left="
          + CellUtil.getCellKeyAsString(left) + ", right=" + CellUtil.getCellKeyAsString(right));
    }
    if (diff < 0) {
      if (bufferBacked) {
        midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getFamilyByteBuffer(),
            ((ByteBufferExtendedCell) left).getFamilyPosition(), left.getFamilyLength(),
            ((ByteBufferExtendedCell) right).getFamilyByteBuffer(),
            ((ByteBufferExtendedCell) right).getFamilyPosition(), right.getFamilyLength());
      } else {
        midRow = getMinimumMidpointArray(left.getFamilyArray(), left.getFamilyOffset(),
            left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
            right.getFamilyLength());
      }
      // If midRow is null, just return 'right'. Can't do optimization.
      if (midRow == null) {
        return right;
      }
      // Return new Cell where we use right row and then a mid sort family.
      return PrivateCellUtil.createFirstOnRowFamily(right, midRow, 0, midRow.length);
    }
    // Families are same. Compare on qualifiers.
    diff = comparator.compareQualifiers(left, right);
    if (diff > 0) {
      throw new IllegalArgumentException("Left qualifier sorts after right qualifier; left="
          + CellUtil.getCellKeyAsString(left) + ", right=" + CellUtil.getCellKeyAsString(right));
    }
    if (diff < 0) {
      if (bufferBacked) {
        midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getQualifierByteBuffer(),
            ((ByteBufferExtendedCell) left).getQualifierPosition(), left.getQualifierLength(),
            ((ByteBufferExtendedCell) right).getQualifierByteBuffer(),
            ((ByteBufferExtendedCell) right).getQualifierPosition(), right.getQualifierLength());
      } else {
        midRow = getMinimumMidpointArray(left.getQualifierArray(), left.getQualifierOffset(),
            left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
            right.getQualifierLength());
      }
      // If midRow is null, just return 'right'. Can't do optimization.
      if (midRow == null) {
        return right;
      }
      // Return new Cell where we use right row and family and then a mid sort qualifier.
      return PrivateCellUtil.createFirstOnRowCol(right, midRow, 0, midRow.length);
    }
    // No opportunity for optimization. Just return right key.
    return right;
  }

  /**
   * @return a new array that sorts between <code>left</code> and <code>right</code> and is
   *         minimally sized, or null to indicate that no such midpoint could be created.
   */
  private static byte[] getMinimumMidpointArray(final byte[] leftArray, final int leftOffset,
      final int leftLength, final byte[] rightArray, final int rightOffset,
      final int rightLength) {
    // rows are different
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    while (diffIdx < minLength
        && leftArray[leftOffset + diffIdx] == rightArray[rightOffset + diffIdx]) {
      diffIdx++;
    }
    byte[] minimumMidpointArray = null;
    if (diffIdx >= minLength) {
      // leftKey's row is prefix of rightKey's.
      minimumMidpointArray = new byte[diffIdx + 1];
      System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, diffIdx + 1);
    } else {
      int diffByte = leftArray[leftOffset + diffIdx];
      if ((0xff & diffByte) < 0xff && (diffByte + 1) < (rightArray[rightOffset + diffIdx] & 0xff)) {
        minimumMidpointArray = new byte[diffIdx + 1];
        System.arraycopy(leftArray, leftOffset, minimumMidpointArray, 0, diffIdx);
        minimumMidpointArray[diffIdx] = (byte) (diffByte + 1);
      } else {
        minimumMidpointArray = new byte[diffIdx + 1];
        System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, diffIdx + 1);
      }
    }
    return minimumMidpointArray;
  }
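
  // Worked example for the prefix case above (illustrative rows): for left row "blog" and right
  // row "blogsite" the bytes only stop matching once the shorter row is exhausted
  // (diffIdx == minLength == 4), so the first diffIdx + 1 = 5 bytes of the right row are copied
  // out, giving "blogs", which sorts after "blog" and before "blogsite". The ByteBuffer variant
  // below applies the same logic to ByteBufferExtendedCell-backed cells without first copying
  // them into byte arrays.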

  private static byte[] getMinimumMidpointArray(ByteBuffer left, int leftOffset, int leftLength,
      ByteBuffer right, int rightOffset, int rightLength) {
    // rows are different
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    while (diffIdx < minLength && ByteBufferUtils.toByte(left,
        leftOffset + diffIdx) == ByteBufferUtils.toByte(right, rightOffset + diffIdx)) {
      diffIdx++;
    }
    byte[] minMidpoint = null;
    if (diffIdx >= minLength) {
      // leftKey's row is prefix of rightKey's.
      minMidpoint = new byte[diffIdx + 1];
      ByteBufferUtils.copyFromBufferToArray(minMidpoint, right, rightOffset, 0, diffIdx + 1);
    } else {
      int diffByte = ByteBufferUtils.toByte(left, leftOffset + diffIdx);
      if ((0xff & diffByte) < 0xff
          && (diffByte + 1) < (ByteBufferUtils.toByte(right, rightOffset + diffIdx) & 0xff)) {
        minMidpoint = new byte[diffIdx + 1];
        ByteBufferUtils.copyFromBufferToArray(minMidpoint, left, leftOffset, 0, diffIdx);
        minMidpoint[diffIdx] = (byte) (diffByte + 1);
      } else {
        minMidpoint = new byte[diffIdx + 1];
        ByteBufferUtils.copyFromBufferToArray(minMidpoint, right, rightOffset, 0, diffIdx + 1);
      }
    }
    return minMidpoint;
  }

  /** Gives inline block writers an opportunity to contribute blocks. */
  private void writeInlineBlocks(boolean closing) throws IOException {
    for (InlineBlockWriter ibw : inlineBlockWriters) {
      while (ibw.shouldWriteBlock(closing)) {
        long offset = outputStream.getPos();
        boolean cacheThisBlock = ibw.getCacheOnWrite();
        ibw.writeInlineBlock(blockWriter.startWriting(ibw.getInlineBlockType()));
        blockWriter.writeHeaderAndData(outputStream);
        ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(),
            blockWriter.getUncompressedSizeWithoutHeader());
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        if (cacheThisBlock) {
          doCacheOnWrite(offset);
        }
      }
    }
  }

  /**
   * Caches the last written HFile block.
   * @param offset the offset of the block we want to cache. Used to determine
   *          the cache key.
   */
  private void doCacheOnWrite(long offset) {
    cacheConf.getBlockCache().ifPresent(cache -> {
      HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
      try {
        cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
            cacheFormatBlock);
      } finally {
        // refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
        cacheFormatBlock.release();
      }
    });
  }

  /**
   * Ready a new block for writing.
   */
  protected void newBlock() throws IOException {
    // This is where the next block begins.
    blockWriter.startWriting(BlockType.DATA);
    firstCellInBlock = null;
    if (lastCell != null) {
      lastCellOfPreviousBlock = lastCell;
    }
  }
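
  // Write-path sketch (how the pieces above fit together): append() streams cells into the
  // current block through blockWriter; once checkBlockBoundary() sees the block is full it calls
  // finishBlock() to flush the block and add its index entry, writeInlineBlocks(false) to let
  // index and Bloom filter writers emit any pending inline blocks, and newBlock() to start the
  // next DATA block.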

  /**
   * Add a meta block to the end of the file. Call before close(). Metadata
   * blocks are expensive. Fill one with a bunch of serialized data rather than
   * do a metadata block per metadata instance. If metadata is small, consider
   * adding to file info using {@link #appendFileInfo(byte[], byte[])}
   *
   * @param metaBlockName name of the block
   * @param content will call readFields to get data later (DO NOT REUSE)
   */
  @Override
  public void appendMetaBlock(String metaBlockName, Writable content) {
    byte[] key = Bytes.toBytes(metaBlockName);
    int i;
    for (i = 0; i < metaNames.size(); ++i) {
      // stop when the current key is greater than our own
      byte[] cur = metaNames.get(i);
      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0, key.length) > 0) {
        break;
      }
    }
    metaNames.add(i, key);
    metaData.add(i, content);
  }

  @Override
  public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // Save data block encoder metadata in the file info.
    blockEncoder.saveMetadata(this);
    // Write out the end of the data blocks, then write the meta data blocks,
    // followed by the data block index, meta block index, file info and trailer.

    finishBlock();
    writeInlineBlocks(true);

    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());

    // Write out the metadata blocks if any.
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long offset = outputStream.getPos();
        // write the metadata content
        DataOutputStream dos = blockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        blockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        // Add the new meta block to the meta index.
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
            blockWriter.getOnDiskSizeWithHeader());
      }
    }

    // Load-on-open section.

    // Data block index.
    //
    // In version 2, this section of the file starts with the root level data
    // block index. We call a function that writes intermediate-level blocks
    // first, then root level, and returns the offset of the root level block
    // index.

    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    // Meta block index.
    metaBlockIndexWriter.writeSingleLevelIndex(blockWriter.startWriting(BlockType.ROOT_INDEX),
        "meta");
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    if (this.hFileContext.isIncludesMvcc()) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }

    // File info
    writeFileInfo(trailer, blockWriter.startWriting(BlockType.FILE_INFO));
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
    for (BlockWritable w : additionalLoadOnOpenData) {
      blockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    }

    // Now finish off the trailer.
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    trailer.setUncompressedDataIndexSize(dataBlockIndexWriter.getTotalUncompressedSize());
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    trailer.setComparatorClass(this.hFileContext.getCellComparator().getClass());
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());

    finishClose(trailer);

    blockWriter.release();
  }
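
  // Resulting file layout written by close(), in order: data blocks (with any inline index and
  // Bloom blocks interleaved), optional META blocks, then the load-on-open section (root data
  // block index, meta block index, file info, and Bloom filter metadata registered through
  // addBloomFilter()), and finally the fixed file trailer.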

  @Override
  public void addInlineBlockWriter(InlineBlockWriter ibw) {
    inlineBlockWriters.add(ibw);
  }

  @Override
  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
  }

  @Override
  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
  }

  private void addBloomFilter(final BloomFilterWriter bfw, final BlockType blockType) {
    if (bfw.getKeyCount() <= 0) {
      return;
    }

    if (blockType != BlockType.GENERAL_BLOOM_META &&
        blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
      throw new RuntimeException("Block Type: " + blockType.toString() + " is not supported");
    }
    additionalLoadOnOpenData.add(new BlockWritable() {
      @Override
      public BlockType getBlockType() {
        return blockType;
      }

      @Override
      public void writeToBlock(DataOutput out) throws IOException {
        bfw.getMetaWriter().write(out);
        Writable dataWriter = bfw.getDataWriter();
        if (dataWriter != null) {
          dataWriter.write(out);
        }
      }
    });
  }

  @Override
  public HFileContext getFileContext() {
    return hFileContext;
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param cell Cell to add. Cannot be empty nor null.
   */
  @Override
  public void append(final Cell cell) throws IOException {
    // checkKey uses comparator to check we are writing in order.
    boolean dupKey = checkKey(cell);
    if (!dupKey) {
      checkBlockBoundary();
    }

    if (!blockWriter.isWriting()) {
      newBlock();
    }

    blockWriter.write(cell);

    totalKeyLength += PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
    totalValueLength += cell.getValueLength();

    // Are we the first key in this block?
    if (firstCellInBlock == null) {
      // If cell is big, block will be closed and this firstCellInBlock reference will only last
      // a short while.
      firstCellInBlock = cell;
    }

    // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinitely?
    lastCell = cell;
    entryCount++;
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
    int tagsLength = cell.getTagsLength();
    if (tagsLength > this.maxTagsLength) {
      this.maxTagsLength = tagsLength;
    }
  }
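
  // Usage sketch (hypothetical caller, not part of this class): a writer is normally obtained
  // through the HFile.WriterFactory and fed cells in comparator order, e.g.
  //
  //   HFile.Writer w = HFile.getWriterFactory(conf, cacheConf)
  //       .withPath(fs, path)
  //       .withFileContext(fileContext)
  //       .create();
  //   for (Cell c : cellsSortedByComparator) {
  //     w.append(c);
  //   }
  //   w.close();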

  @Override
  public void beforeShipped() throws IOException {
    this.blockWriter.beforeShipped();
    // Clone the cells we are holding on to so we stop referencing buffers that may be reused
    // once the current batch has been shipped.
    if (this.lastCell != null) {
      this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);
    }
    if (this.firstCellInBlock != null) {
      this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock);
    }
    if (this.lastCellOfPreviousBlock != null) {
      this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock);
    }
  }

  @VisibleForTesting
  public Cell getLastCell() {
    return lastCell;
  }

  protected void finishFileInfo() throws IOException {
    if (lastCell != null) {
      // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
      // byte buffer. Won't take a tuple.
      byte[] lastKey = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
      fileInfo.append(HFileInfo.LASTKEY, lastKey, false);
    }

    // Average key length.
    int avgKeyLen = entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
    fileInfo.append(HFileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
    fileInfo.append(HFileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
        false);

    // Average value length.
    int avgValueLen = entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
    fileInfo.append(HFileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
    if (hFileContext.isIncludesTags()) {
      // When tags are not being written in this file, MAX_TAGS_LEN is excluded
      // from the FileInfo
      fileInfo.append(HFileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
      boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
          && hFileContext.isCompressTags();
      fileInfo.append(HFileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
    }
  }

  protected int getMajorVersion() {
    return 3;
  }

  protected int getMinorVersion() {
    return HFileReaderImpl.MAX_MINOR_VERSION;
  }

  protected void finishClose(FixedFileTrailer trailer) throws IOException {
    // Write out encryption metadata before finalizing if we have a valid crypto context
    Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
    if (cryptoContext != Encryption.Context.NONE) {
      // Wrap the context's key and write it as the encryption metadata; the wrapper includes
      // all information needed for decryption.
      trailer.setEncryptionKey(EncryptionUtil.wrapKey(cryptoContext.getConf(),
          cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
              User.getCurrent().getShortName()),
          cryptoContext.getKey()));
    }
    // Now we can finish the close
    trailer.setMetaIndexCount(metaNames.size());
    trailer.setTotalUncompressedBytes(totalUncompressedBytes + trailer.getTrailerSize());
    trailer.setEntryCount(entryCount);
    trailer.setCompressionCodec(hFileContext.getCompression());

    long startTime = System.currentTimeMillis();
    trailer.serialize(outputStream);
    HFile.updateWriteLatency(System.currentTimeMillis() - startTime);

    if (closeOutputStream) {
      outputStream.close();
      outputStream = null;
    }
  }
}