/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.CellComparatorImpl.MetaCellComparator;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Common functionality needed by all versions of {@link HFile} writers.
 */
@InterfaceAudience.Private
public class HFileWriterImpl implements HFile.Writer {
  private static final Logger LOG = LoggerFactory.getLogger(HFileWriterImpl.class);

  private static final long UNSET = -1;

  /** If this feature is enabled, pre-calculate the encoded data size before the real encoding happens. */
  public static final String UNIFIED_ENCODED_BLOCKSIZE_RATIO =
      "hbase.writer.unified.encoded.blocksize.ratio";

  /** Block size limit after encoding, used to unify the size of encoded block cache entries. */
  private final int encodedBlockSizeLimit;

  /** The Cell previously appended. Becomes the last cell in the file. */
  protected Cell lastCell = null;

  /** FileSystem stream to write into. */
  protected FSDataOutputStream outputStream;

  /** True if we opened the <code>outputStream</code> (and so will close it). */
  protected final boolean closeOutputStream;

  /** A "file info" block: a key-value map of file-wide metadata. */
  protected FileInfo fileInfo = new HFile.FileInfo();

  /** Total # of key/value entries, i.e. how many times append() was called. */
  protected long entryCount = 0;

  /** Used for calculating the average key length. */
  protected long totalKeyLength = 0;

  /** Used for calculating the average value length. */
  protected long totalValueLength = 0;

  /** Total uncompressed bytes, maybe calculate a compression ratio later. */
  protected long totalUncompressedBytes = 0;

  /** Key comparator. Used to ensure we write in order. */
  protected final CellComparator comparator;

  /** Meta block names. */
  protected List<byte[]> metaNames = new ArrayList<>();

  /** {@link Writable}s representing meta block data. */
  protected List<Writable> metaData = new ArrayList<>();

  /**
   * First cell in a block.
   * This reference should be short-lived since we write hfiles in a burst.
   */
  protected Cell firstCellInBlock = null;


  /** May be null if we were passed a stream. */
  protected final Path path;

  /** Cache configuration for caching data on write. */
  protected final CacheConfig cacheConf;

  /**
   * Name for this object used when logging or in toString. Is either
   * the result of a toString on stream or else name of passed file Path.
   */
  protected final String name;

  /**
   * The data block encoding which will be used.
   * {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding.
   */
  protected final HFileDataBlockEncoder blockEncoder;

  protected final HFileContext hFileContext;

  private int maxTagsLength = 0;

  /** KeyValue version in FileInfo */
  public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");

  /** Version for KeyValue which includes memstore timestamp */
  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  /** Inline block writers for multi-level block index and compound Blooms. */
  private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<>();

  /** Block writer */
  protected HFileBlock.Writer blockWriter;

  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

  /** The offset of the first data block or -1 (UNSET) if the file is empty. */
  private long firstDataBlockOffset = UNSET;

  /** The offset of the last data block or -1 (UNSET) if the file is empty. */
  protected long lastDataBlockOffset = UNSET;

  /**
   * The last(stop) Cell of the previous data block.
   * This reference should be short-lived since we write hfiles in a burst.
   */
  private Cell lastCellOfPreviousBlock = null;

  /** Additional data items to be written to the "load-on-open" section. */
  private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<>();

  protected long maxMemstoreTS = 0;

  public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
      FSDataOutputStream outputStream,
      CellComparator comparator, HFileContext fileContext) {
    this.outputStream = outputStream;
    this.path = path;
    this.name = path != null ? path.getName() : outputStream.toString();
    this.hFileContext = fileContext;
    DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
    if (encoding != DataBlockEncoding.NONE) {
      this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
    } else {
      this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
    }
    this.comparator = comparator != null ? comparator : CellComparator.getInstance();

    closeOutputStream = path != null;
    this.cacheConf = cacheConf;
    float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 1f);
    this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio);
    finishInit(conf);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Writer" + (path != null ? " for " + path : "") +
          " initialized with cacheConf: " + cacheConf +
          " comparator: " + comparator.getClass().getSimpleName() +
          " fileContext: " + fileContext);
    }
  }

  /**
   * Add to the file info. All added key/value pairs can be obtained using
   * {@link HFile.Reader#loadFileInfo()}.
   *
   * @param k Key
   * @param v Value
   * @throws IOException in case the key or the value are invalid
   */
  @Override
  public void appendFileInfo(final byte[] k, final byte[] v)
      throws IOException {
    fileInfo.append(k, v, true);
  }

  /**
   * Sets the file info offset in the trailer, finishes up populating fields in
   * the file info, and writes the file info into the given data output. The
   * reason the data output is not always {@link #outputStream} is that we store
   * file info as a block in version 2.
   *
   * @param trailer fixed file trailer
   * @param out the data output to write the file info to
   * @throws IOException
   */
  protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
      throws IOException {
    trailer.setFileInfoOffset(outputStream.getPos());
    finishFileInfo();
    long startTime = System.currentTimeMillis();
    fileInfo.write(out);
    HFile.updateWriteLatency(System.currentTimeMillis() - startTime);
  }

  /**
   * Checks that the given Cell's key does not violate the key order.
   *
   * @param cell Cell whose key to check.
   * @return true if the key is duplicate
   * @throws IOException if the key or the key order is wrong
   */
  protected boolean checkKey(final Cell cell) throws IOException {
    boolean isDuplicateKey = false;

    if (cell == null) {
      throw new IOException("Key cannot be null or empty");
    }
    if (lastCell != null) {
      int keyComp = PrivateCellUtil.compareKeyIgnoresMvcc(comparator, lastCell, cell);

      if (keyComp > 0) {
        throw new IOException("Added a key not lexically larger than"
            + " previous. Current cell = " + cell + ", lastCell = " + lastCell);
      } else if (keyComp == 0) {
        isDuplicateKey = true;
      }
    }
    return isDuplicateKey;
  }

  /** Checks the given value for validity. */
  protected void checkValue(final byte[] value, final int offset,
      final int length) throws IOException {
    if (value == null) {
      throw new IOException("Value cannot be null");
    }
  }

  /**
   * @return Path or null if we were passed a stream rather than a Path.
   */
  @Override
  public Path getPath() {
    return path;
  }

  @Override
  public String toString() {
    return "writer=" + (path != null ? path.toString() : null) + ", name="
        + name + ", compression=" + hFileContext.getCompression().getName();
  }

  public static Compression.Algorithm compressionByName(String algoName) {
    if (algoName == null)
      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
    return Compression.getCompressionAlgorithmByName(algoName);
  }

  /** A helper method to create HFile output streams in constructors */
  protected static FSDataOutputStream createOutputStream(Configuration conf,
      FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
    FsPermission perms = FSUtils.getFilePermissions(fs, conf,
        HConstants.DATA_FILE_UMASK_KEY);
    return FSUtils.create(conf, fs, path, perms, favoredNodes);
  }

  /** Additional initialization steps */
  protected void finishInit(final Configuration conf) {
    if (blockWriter != null) {
      throw new IllegalStateException("finishInit called twice");
    }

    blockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);

    // Data block index writer
    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
        cacheIndexesOnWrite ? cacheConf : null,
        cacheIndexesOnWrite ? name : null);
    dataBlockIndexWriter.setMaxChunkSize(
        HFileBlockIndex.getMaxChunkSize(conf));
    dataBlockIndexWriter.setMinIndexNumEntries(
        HFileBlockIndex.getMinIndexNumEntries(conf));
    inlineBlockWriters.add(dataBlockIndexWriter);

    // Meta data block index writer
    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
    if (LOG.isTraceEnabled()) LOG.trace("Initialized with " + cacheConf);
  }

  /**
   * At a block boundary, writes all the inline blocks and opens a new block.
   *
   * @throws IOException
   */
  protected void checkBlockBoundary() throws IOException {
    // For encoders like prefixTree, the encoded size is not available up front, so we have to
    // compare both the encoded size and the unencoded size to the blocksize limit.
    if (blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit
        || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize()) {
      finishBlock();
      writeInlineBlocks(false);
      newBlock();
    }
  }

  /** Clean up the data block that is currently being written. */
  private void finishBlock() throws IOException {
    if (!blockWriter.isWriting() || blockWriter.blockSizeWritten() == 0) return;

    // Update the first data block offset if UNSET; used when scanning.
    if (firstDataBlockOffset == UNSET) {
      firstDataBlockOffset = outputStream.getPos();
    }
    // Update the last data block offset each time through here.
    lastDataBlockOffset = outputStream.getPos();
    blockWriter.writeHeaderAndData(outputStream);
    int onDiskSize = blockWriter.getOnDiskSizeWithHeader();
    Cell indexEntry =
        getMidpoint(this.comparator, lastCellOfPreviousBlock, firstCellInBlock);
    dataBlockIndexWriter.addEntry(PrivateCellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
        lastDataBlockOffset, onDiskSize);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    if (cacheConf.shouldCacheDataOnWrite()) {
      doCacheOnWrite(lastDataBlockOffset);
    }
  }

  /**
   * Try to return a Cell that falls between <code>left</code> and
   * <code>right</code> but that is shorter; i.e. takes up less space. This
   * trick is used when building the HFile block index. It's an optimization. It
   * does not always work.
   * In this case we'll just return the <code>right</code> cell.
   *
   * @param comparator
   *          Comparator to use.
   * @param left
   * @param right
   * @return A cell that sorts between <code>left</code> and <code>right</code>.
   */
  public static Cell getMidpoint(final CellComparator comparator, final Cell left,
      final Cell right) {
    // TODO: Redo so only a single pass over the arrays rather than one to
    // compare and then a second composing midpoint.
    if (right == null) {
      throw new IllegalArgumentException("right cell can not be null");
    }
    if (left == null) {
      return right;
    }
    // If Cells from meta table, don't mess around. meta table Cells have schema
    // (table,startrow,hash) so can't be treated as plain byte arrays. Just skip
    // out without trying to do this optimization.
    if (comparator instanceof MetaCellComparator) {
      return right;
    }
    int diff = comparator.compareRows(left, right);
    if (diff > 0) {
      throw new IllegalArgumentException("Left row sorts after right row; left="
          + CellUtil.getCellKeyAsString(left) + ", right=" + CellUtil.getCellKeyAsString(right));
    }
    byte[] midRow;
    boolean bufferBacked = left instanceof ByteBufferExtendedCell
        && right instanceof ByteBufferExtendedCell;
    if (diff < 0) {
      // Left row is < right row.
      if (bufferBacked) {
        midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getRowByteBuffer(),
            ((ByteBufferExtendedCell) left).getRowPosition(), left.getRowLength(),
            ((ByteBufferExtendedCell) right).getRowByteBuffer(),
            ((ByteBufferExtendedCell) right).getRowPosition(), right.getRowLength());
      } else {
        midRow = getMinimumMidpointArray(left.getRowArray(), left.getRowOffset(),
            left.getRowLength(), right.getRowArray(), right.getRowOffset(), right.getRowLength());
      }
      // If midRow is null, just return 'right'. Can't do optimization.
      if (midRow == null) return right;
      return PrivateCellUtil.createFirstOnRow(midRow);
    }
    // Rows are same. Compare on families.
    diff = comparator.compareFamilies(left, right);
    if (diff > 0) {
      throw new IllegalArgumentException("Left family sorts after right family; left="
          + CellUtil.getCellKeyAsString(left) + ", right=" + CellUtil.getCellKeyAsString(right));
    }
    if (diff < 0) {
      if (bufferBacked) {
        midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getFamilyByteBuffer(),
            ((ByteBufferExtendedCell) left).getFamilyPosition(), left.getFamilyLength(),
            ((ByteBufferExtendedCell) right).getFamilyByteBuffer(),
            ((ByteBufferExtendedCell) right).getFamilyPosition(), right.getFamilyLength());
      } else {
        midRow = getMinimumMidpointArray(left.getFamilyArray(), left.getFamilyOffset(),
            left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
            right.getFamilyLength());
      }
      // If midRow is null, just return 'right'. Can't do optimization.
      if (midRow == null) return right;
      // Return new Cell where we use right row and then a mid sort family.
      return PrivateCellUtil.createFirstOnRowFamily(right, midRow, 0, midRow.length);
    }
    // Families are same. Compare on qualifiers.
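    // Illustrative example (not in the original source): with equal rows and families, a left
    // qualifier of "apple" and a right qualifier of "zebra" differ at the first byte, so
    // getMinimumMidpointArray can return the one-byte separator "b". The index entry then only
    // needs the first cell on (right's row, right's family, "b") rather than the full right key.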
    diff = comparator.compareQualifiers(left, right);
    if (diff > 0) {
      throw new IllegalArgumentException("Left qualifier sorts after right qualifier; left="
          + CellUtil.getCellKeyAsString(left) + ", right=" + CellUtil.getCellKeyAsString(right));
    }
    if (diff < 0) {
      if (bufferBacked) {
        midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getQualifierByteBuffer(),
            ((ByteBufferExtendedCell) left).getQualifierPosition(), left.getQualifierLength(),
            ((ByteBufferExtendedCell) right).getQualifierByteBuffer(),
            ((ByteBufferExtendedCell) right).getQualifierPosition(), right.getQualifierLength());
      } else {
        midRow = getMinimumMidpointArray(left.getQualifierArray(), left.getQualifierOffset(),
            left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
            right.getQualifierLength());
      }
      // If midRow is null, just return 'right'. Can't do optimization.
      if (midRow == null) return right;
      // Return new Cell where we use right row and family and then a mid sort qualifier.
      return PrivateCellUtil.createFirstOnRowCol(right, midRow, 0, midRow.length);
    }
    // No opportunity for optimization. Just return right key.
    return right;
  }

  /**
   * @param leftArray
   * @param leftOffset
   * @param leftLength
   * @param rightArray
   * @param rightOffset
   * @param rightLength
   * @return Return a new array that is between left and right and minimally
   *         sized, else just return null as an indicator that we could not
   *         create a midpoint.
   */
  private static byte[] getMinimumMidpointArray(final byte[] leftArray, final int leftOffset,
      final int leftLength, final byte[] rightArray, final int rightOffset, final int rightLength) {
    // rows are different
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    while (diffIdx < minLength
        && leftArray[leftOffset + diffIdx] == rightArray[rightOffset + diffIdx]) {
      diffIdx++;
    }
    byte[] minimumMidpointArray = null;
    if (diffIdx >= minLength) {
      // leftKey's row is prefix of rightKey's.
      minimumMidpointArray = new byte[diffIdx + 1];
      System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, diffIdx + 1);
    } else {
      int diffByte = leftArray[leftOffset + diffIdx];
      if ((0xff & diffByte) < 0xff && (diffByte + 1) < (rightArray[rightOffset + diffIdx] & 0xff)) {
        minimumMidpointArray = new byte[diffIdx + 1];
        System.arraycopy(leftArray, leftOffset, minimumMidpointArray, 0, diffIdx);
        minimumMidpointArray[diffIdx] = (byte) (diffByte + 1);
      } else {
        minimumMidpointArray = new byte[diffIdx + 1];
        System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, diffIdx + 1);
      }
    }
    return minimumMidpointArray;
  }

  private static byte[] getMinimumMidpointArray(ByteBuffer left, int leftOffset, int leftLength,
      ByteBuffer right, int rightOffset, int rightLength) {
    // rows are different
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    while (diffIdx < minLength && ByteBufferUtils.toByte(left,
        leftOffset + diffIdx) == ByteBufferUtils.toByte(right, rightOffset + diffIdx)) {
      diffIdx++;
    }
    byte[] minMidpoint = null;
    if (diffIdx >= minLength) {
      // leftKey's row is prefix of rightKey's.
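      // Illustrative example (not in the original source): if left is "abc" and right is "abcde",
      // the loop above stops at diffIdx == 3, and copying the first diffIdx + 1 = 4 bytes of
      // right yields "abcd", which sorts strictly between "abc" and "abcde".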
      minMidpoint = new byte[diffIdx + 1];
      ByteBufferUtils.copyFromBufferToArray(minMidpoint, right, rightOffset, 0, diffIdx + 1);
    } else {
      int diffByte = ByteBufferUtils.toByte(left, leftOffset + diffIdx);
      if ((0xff & diffByte) < 0xff
          && (diffByte + 1) < (ByteBufferUtils.toByte(right, rightOffset + diffIdx) & 0xff)) {
        minMidpoint = new byte[diffIdx + 1];
        ByteBufferUtils.copyFromBufferToArray(minMidpoint, left, leftOffset, 0, diffIdx);
        minMidpoint[diffIdx] = (byte) (diffByte + 1);
      } else {
        minMidpoint = new byte[diffIdx + 1];
        ByteBufferUtils.copyFromBufferToArray(minMidpoint, right, rightOffset, 0, diffIdx + 1);
      }
    }
    return minMidpoint;
  }

  /** Gives inline block writers an opportunity to contribute blocks. */
  private void writeInlineBlocks(boolean closing) throws IOException {
    for (InlineBlockWriter ibw : inlineBlockWriters) {
      while (ibw.shouldWriteBlock(closing)) {
        long offset = outputStream.getPos();
        boolean cacheThisBlock = ibw.getCacheOnWrite();
        ibw.writeInlineBlock(blockWriter.startWriting(
            ibw.getInlineBlockType()));
        blockWriter.writeHeaderAndData(outputStream);
        ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(),
            blockWriter.getUncompressedSizeWithoutHeader());
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        if (cacheThisBlock) {
          doCacheOnWrite(offset);
        }
      }
    }
  }

  /**
   * Caches the last written HFile block.
   * @param offset the offset of the block we want to cache. Used to determine
   *          the cache key.
   */
  private void doCacheOnWrite(long offset) {
    cacheConf.getBlockCache().ifPresent(cache -> {
      HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
      cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
          cacheFormatBlock);
    });
  }

  /**
   * Ready a new block for writing.
   *
   * @throws IOException
   */
  protected void newBlock() throws IOException {
    // This is where the next block begins.
    blockWriter.startWriting(BlockType.DATA);
    firstCellInBlock = null;
    if (lastCell != null) {
      lastCellOfPreviousBlock = lastCell;
    }
  }

  /**
   * Add a meta block to the end of the file. Call before close(). Metadata
   * blocks are expensive. Fill one with a bunch of serialized data rather than
   * do a metadata block per metadata instance. If metadata is small, consider
   * adding to file info using {@link #appendFileInfo(byte[], byte[])}
   *
   * @param metaBlockName
   *          name of the block
   * @param content
   *          will call readFields to get data later (DO NOT REUSE)
   */
  @Override
  public void appendMetaBlock(String metaBlockName, Writable content) {
    byte[] key = Bytes.toBytes(metaBlockName);
    int i;
    for (i = 0; i < metaNames.size(); ++i) {
      // stop when the current key is greater than our own
      byte[] cur = metaNames.get(i);
      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
          key.length) > 0) {
        break;
      }
    }
    metaNames.add(i, key);
    metaData.add(i, content);
  }

  @Override
  public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // Save data block encoder metadata in the file info.
    blockEncoder.saveMetadata(this);
    // Write out the end of the data blocks, then write meta data blocks,
    // followed by fileinfo, data block index and meta block index.
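    // The resulting on-disk order written below is: the final data block, any remaining inline
    // blocks (leaf-level index chunks, Bloom chunks), the meta blocks, the data block index
    // (intermediate-level blocks, then the root index), the meta block index, the file info
    // block, additional load-on-open data such as Bloom filter metadata, and finally the
    // fixed file trailer.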

    finishBlock();
    writeInlineBlocks(true);

    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());

    // Write out the metadata blocks if any.
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long offset = outputStream.getPos();
        // write the metadata content
        DataOutputStream dos = blockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        blockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        // Add the new meta block to the meta index.
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
            blockWriter.getOnDiskSizeWithHeader());
      }
    }

    // Load-on-open section.

    // Data block index.
    //
    // In version 2, this section of the file starts with the root level data
    // block index. We call a function that writes intermediate-level blocks
    // first, then root level, and returns the offset of the root level block
    // index.

    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    // Meta block index.
    metaBlockIndexWriter.writeSingleLevelIndex(blockWriter.startWriting(
        BlockType.ROOT_INDEX), "meta");
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    if (this.hFileContext.isIncludesMvcc()) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }

    // File info
    writeFileInfo(trailer, blockWriter.startWriting(BlockType.FILE_INFO));
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
    for (BlockWritable w : additionalLoadOnOpenData) {
      blockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    }

    // Now finish off the trailer.
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    trailer.setUncompressedDataIndexSize(
        dataBlockIndexWriter.getTotalUncompressedSize());
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    trailer.setComparatorClass(comparator.getClass());
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());


    finishClose(trailer);

    blockWriter.release();
  }

  @Override
  public void addInlineBlockWriter(InlineBlockWriter ibw) {
    inlineBlockWriters.add(ibw);
  }

  @Override
  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
  }

  @Override
  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
  }

  private void addBloomFilter(final BloomFilterWriter bfw,
      final BlockType blockType) {
    if (bfw.getKeyCount() <= 0)
      return;

    if (blockType != BlockType.GENERAL_BLOOM_META &&
        blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
      throw new RuntimeException("Block Type: " + blockType.toString() +
          " is not supported");
    }
    additionalLoadOnOpenData.add(new BlockWritable() {
      @Override
      public BlockType getBlockType() {
        return blockType;
      }

      @Override
      public void writeToBlock(DataOutput out) throws IOException {
        bfw.getMetaWriter().write(out);
        Writable dataWriter = bfw.getDataWriter();
        if (dataWriter != null)
          dataWriter.write(out);
      }
    });
  }

  @Override
  public HFileContext getFileContext() {
    return hFileContext;
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param cell
   *          Cell to add. Cannot be empty nor null.
   * @throws IOException
   */
  @Override
  public void append(final Cell cell) throws IOException {
    // checkKey uses comparator to check we are writing in order.
    boolean dupKey = checkKey(cell);
    if (!dupKey) {
      checkBlockBoundary();
    }

    if (!blockWriter.isWriting()) {
      newBlock();
    }

    blockWriter.write(cell);

    totalKeyLength += PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
    totalValueLength += cell.getValueLength();

    // Are we the first key in this block?
    if (firstCellInBlock == null) {
      // If cell is big, block will be closed and this firstCellInBlock reference will only last
      // a short while.
      firstCellInBlock = cell;
    }

    // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinitely?
    lastCell = cell;
    entryCount++;
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
    int tagsLength = cell.getTagsLength();
    if (tagsLength > this.maxTagsLength) {
      this.maxTagsLength = tagsLength;
    }
  }

  @Override
  public void beforeShipped() throws IOException {
    this.blockWriter.beforeShipped();
    // Clone the cells we still reference so they no longer point into buffers that may be
    // reclaimed once shipped.
    if (this.lastCell != null) {
      this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);
    }
    if (this.firstCellInBlock != null) {
      this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock);
    }
    if (this.lastCellOfPreviousBlock != null) {
      this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock);
    }
  }

  @VisibleForTesting
  public Cell getLastCell() {
    return lastCell;
  }

  protected void finishFileInfo() throws IOException {
    if (lastCell != null) {
      // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
      // byte buffer. Won't take a tuple.
      byte [] lastKey = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
      fileInfo.append(FileInfo.LASTKEY, lastKey, false);
    }

    // Average key length.
    int avgKeyLen =
        entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
    fileInfo.append(FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
    fileInfo.append(FileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
        false);

    // Average value length.
    int avgValueLen =
        entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
    fileInfo.append(FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
    if (hFileContext.isIncludesTags()) {
      // When tags are not being written in this file, MAX_TAGS_LEN is excluded
      // from the FileInfo
      fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
      boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
          && hFileContext.isCompressTags();
      fileInfo.append(FileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
    }
  }

  protected int getMajorVersion() {
    return 3;
  }

  protected int getMinorVersion() {
    return HFileReaderImpl.MAX_MINOR_VERSION;
  }

  protected void finishClose(FixedFileTrailer trailer) throws IOException {
    // Write out encryption metadata before finalizing if we have a valid crypto context
    Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
    if (cryptoContext != Encryption.Context.NONE) {
      // Wrap the context's key and write it as the encryption metadata, the wrapper includes
      // all information needed for decryption
      trailer.setEncryptionKey(EncryptionUtil.wrapKey(cryptoContext.getConf(),
          cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
              User.getCurrent().getShortName()),
          cryptoContext.getKey()));
    }
    // Now we can finish the close
    trailer.setMetaIndexCount(metaNames.size());
    trailer.setTotalUncompressedBytes(totalUncompressedBytes + trailer.getTrailerSize());
    trailer.setEntryCount(entryCount);
    trailer.setCompressionCodec(hFileContext.getCompression());

    long startTime = System.currentTimeMillis();
    trailer.serialize(outputStream);
    HFile.updateWriteLatency(System.currentTimeMillis() - startTime);

    if (closeOutputStream) {
      outputStream.close();
      outputStream = null;
    }
  }
}
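
// Usage sketch (illustrative, not part of the original file): callers normally obtain a writer
// through HFile.WriterFactory rather than constructing HFileWriterImpl directly. Assuming a
// Configuration 'conf', CacheConfig 'cacheConf', FileSystem 'fs', Path 'path' and a Cell 'cell'
// are in scope:
//
//   HFileContext context = new HFileContextBuilder().withBlockSize(64 * 1024).build();
//   HFile.Writer w = HFile.getWriterFactory(conf, cacheConf)
//       .withPath(fs, path)
//       .withFileContext(context)
//       .create();
//   // Cells must be appended in the order defined by the writer's comparator.
//   w.append(cell);
//   w.close();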