/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Common functionality needed by all versions of {@link HFile} writers.
 */
@InterfaceAudience.Private
public class HFileWriterImpl implements HFile.Writer {
  private static final Logger LOG = LoggerFactory.getLogger(HFileWriterImpl.class);

  private static final long UNSET = -1;

  /**
   * If this feature is enabled, pre-calculate the encoded data size before the real encoding
   * happens.
   */
  public static final String UNIFIED_ENCODED_BLOCKSIZE_RATIO =
      "hbase.writer.unified.encoded.blocksize.ratio";

  /** Block size limit after encoding, used to unify the encoded block cache entry size. */
  private final int encodedBlockSizeLimit;

  /** The Cell previously appended. Becomes the last cell in the file. */
  protected Cell lastCell = null;

  /** FileSystem stream to write into. */
  protected FSDataOutputStream outputStream;

  /** True if we opened the <code>outputStream</code> (and so will close it). */
  protected final boolean closeOutputStream;

  /** A "file info" block: a key-value map of file-wide metadata. */
  protected HFileInfo fileInfo = new HFileInfo();

  /** Total # of key/value entries, i.e. how many times append() was called. */
  protected long entryCount = 0;

  /** Used for calculating the average key length. */
  protected long totalKeyLength = 0;

  /** Used for calculating the average value length. */
  protected long totalValueLength = 0;

  /** Total uncompressed bytes, maybe calculate a compression ratio later. */
  protected long totalUncompressedBytes = 0;

  /** Meta block names. */
  protected List<byte[]> metaNames = new ArrayList<>();

  /** {@link Writable}s representing meta block data. */
  protected List<Writable> metaData = new ArrayList<>();

  /**
   * First cell in a block.
   * This reference should be short-lived since we write hfiles in a burst.
   */
  protected Cell firstCellInBlock = null;

  /** May be null if we were passed a stream. */
  protected final Path path;

  /** Cache configuration for caching data on write. */
  protected final CacheConfig cacheConf;

  /**
   * Name for this object used when logging or in toString. Is either
   * the result of a toString on stream or else name of passed file Path.
   */
  protected final String name;

  /**
   * The data block encoding which will be used.
   * {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding.
   */
  protected final HFileDataBlockEncoder blockEncoder;

  protected final HFileContext hFileContext;

  private int maxTagsLength = 0;

  /** KeyValue version in FileInfo */
  public static final byte[] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");

  /** Version for KeyValue which includes memstore timestamp */
  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  /** Inline block writers for multi-level block index and compound Blooms. */
  private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<>();

  /** block writer */
  protected HFileBlock.Writer blockWriter;

  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

  /** The offset of the first data block or -1 (UNSET) if the file is empty. */
  private long firstDataBlockOffset = UNSET;

  /** The offset of the last data block or -1 (UNSET) if the file is empty. */
  protected long lastDataBlockOffset = UNSET;

  /**
   * The last (stop) Cell of the previous data block.
   * This reference should be short-lived since we write hfiles in a burst.
   */
  private Cell lastCellOfPreviousBlock = null;

  /** Additional data items to be written to the "load-on-open" section. */
  private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<>();

  protected long maxMemstoreTS = 0;

  public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
      FSDataOutputStream outputStream, HFileContext fileContext) {
    this.outputStream = outputStream;
    this.path = path;
    this.name = path != null ? path.getName() : outputStream.toString();
    this.hFileContext = fileContext;
    DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
    if (encoding != DataBlockEncoding.NONE) {
      this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
    } else {
      this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
    }
    closeOutputStream = path != null;
    this.cacheConf = cacheConf;
    float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 1f);
    this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio);
    finishInit(conf);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Writer" + (path != null ? " for " + path : "") +
          " initialized with cacheConf: " + cacheConf +
          " fileContext: " + fileContext);
    }
  }

  /**
   * Add to the file info. All added key/value pairs can be obtained using
   * {@link HFile.Reader#getHFileInfo()}.
   * @param k Key
   * @param v Value
   * @throws IOException in case the key or the value are invalid
   */
  @Override
  public void appendFileInfo(final byte[] k, final byte[] v) throws IOException {
    fileInfo.append(k, v, true);
  }

  /**
   * Sets the file info offset in the trailer, finishes up populating fields in the file info,
   * and writes the file info into the given data output. The reason the data output is not
   * always {@link #outputStream} is that we store file info as a block in version 2.
   * @param trailer fixed file trailer
   * @param out the data output to write the file info to
   */
  protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
      throws IOException {
    trailer.setFileInfoOffset(outputStream.getPos());
    finishFileInfo();
    long startTime = System.currentTimeMillis();
    fileInfo.write(out);
    HFile.updateWriteLatency(System.currentTimeMillis() - startTime);
  }

  /**
   * Checks that the given Cell's key does not violate the key order.
   * @param cell Cell whose key to check.
   * @return true if the key is a duplicate of the previously appended key
   * @throws IOException if the key or the key order is wrong
   */
  protected boolean checkKey(final Cell cell) throws IOException {
    boolean isDuplicateKey = false;

    if (cell == null) {
      throw new IOException("Key cannot be null or empty");
    }
    if (lastCell != null) {
      int keyComp = PrivateCellUtil.compareKeyIgnoresMvcc(this.hFileContext.getCellComparator(),
          lastCell, cell);
      if (keyComp > 0) {
        String message = getLexicalErrorMessage(cell);
        throw new IOException(message);
      } else if (keyComp == 0) {
        isDuplicateKey = true;
      }
    }
    return isDuplicateKey;
  }

  private String getLexicalErrorMessage(Cell cell) {
    StringBuilder sb = new StringBuilder();
    sb.append("Added a key not lexically larger than previous. Current cell = ");
    sb.append(cell);
    sb.append(", lastCell = ");
    sb.append(lastCell);
    // file context includes HFile path and optionally table and CF of file being written
    sb.append(", fileContext=");
    sb.append(hFileContext);
    return sb.toString();
  }

  /** Checks the given value for validity. */
  protected void checkValue(final byte[] value, final int offset, final int length)
      throws IOException {
    if (value == null) {
      throw new IOException("Value cannot be null");
    }
  }

  /**
   * @return Path or null if we were passed a stream rather than a Path.
   */
  @Override
  public Path getPath() {
    return path;
  }

  @Override
  public String toString() {
    return "writer=" + (path != null ? path.toString() : null) + ", name=" + name
        + ", compression=" + hFileContext.getCompression().getName();
  }

  public static Compression.Algorithm compressionByName(String algoName) {
    if (algoName == null) {
      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    return Compression.getCompressionAlgorithmByName(algoName);
  }

  /** A helper method to create HFile output streams in constructors */
  protected static FSDataOutputStream createOutputStream(Configuration conf, FileSystem fs,
      Path path, InetSocketAddress[] favoredNodes) throws IOException {
    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf,
        HConstants.DATA_FILE_UMASK_KEY);
    return FSUtils.create(conf, fs, path, perms, favoredNodes);
  }

  /** Additional initialization steps */
  protected void finishInit(final Configuration conf) {
    if (blockWriter != null) {
      throw new IllegalStateException("finishInit called twice");
    }
    blockWriter = new HFileBlock.Writer(blockEncoder, hFileContext,
        cacheConf.getByteBuffAllocator());
    // Data block index writer
    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
        cacheIndexesOnWrite ? cacheConf : null,
        cacheIndexesOnWrite ? name : null);
    dataBlockIndexWriter.setMaxChunkSize(HFileBlockIndex.getMaxChunkSize(conf));
    dataBlockIndexWriter.setMinIndexNumEntries(HFileBlockIndex.getMinIndexNumEntries(conf));
    inlineBlockWriters.add(dataBlockIndexWriter);

    // Meta data block index writer
    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
    LOG.trace("Initialized with {}", cacheConf);
  }

  /**
   * At a block boundary, write all the inline blocks and open a new block.
   */
  protected void checkBlockBoundary() throws IOException {
    // For an encoder like prefixTree, the encoded size is not available, so we have to compare
    // both the encoded and the unencoded size against the block size limit.
    if (blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit
        || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize()) {
      finishBlock();
      writeInlineBlocks(false);
      newBlock();
    }
  }

  /** Clean up the data block that is currently being written. */
  private void finishBlock() throws IOException {
    if (!blockWriter.isWriting() || blockWriter.blockSizeWritten() == 0) {
      return;
    }

    // Update the first data block offset if UNSET; used when scanning.
    if (firstDataBlockOffset == UNSET) {
      firstDataBlockOffset = outputStream.getPos();
    }
    // Update the last data block offset each time through here.
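    // getPos() at this point is the start offset of the block we are about to flush; it is used
    // for this block's index entry below and is recorded in the trailer as the last data block
    // offset when the file is closed.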
    lastDataBlockOffset = outputStream.getPos();
    blockWriter.writeHeaderAndData(outputStream);
    int onDiskSize = blockWriter.getOnDiskSizeWithHeader();
    Cell indexEntry = getMidpoint(this.hFileContext.getCellComparator(), lastCellOfPreviousBlock,
        firstCellInBlock);
    dataBlockIndexWriter.addEntry(PrivateCellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
        lastDataBlockOffset, onDiskSize);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    if (cacheConf.shouldCacheDataOnWrite()) {
      doCacheOnWrite(lastDataBlockOffset);
    }
  }

  /**
   * Try to return a Cell that falls between <code>left</code> and <code>right</code> but that is
   * shorter; i.e. takes up less space. This trick is used when building the HFile block index.
   * It's an optimization; it does not always work, in which case we just return the
   * <code>right</code> cell.
   * @return A cell that sorts between <code>left</code> and <code>right</code>.
   */
  public static Cell getMidpoint(final CellComparator comparator, final Cell left,
      final Cell right) {
    // TODO: Redo so only a single pass over the arrays rather than one to
    // compare and then a second composing midpoint.
    if (right == null) {
      throw new IllegalArgumentException("right cell can not be null");
    }
    if (left == null) {
      return right;
    }
    // If Cells from meta table, don't mess around. meta table Cells have schema
    // (table,startrow,hash) so can't be treated as plain byte arrays. Just skip
    // out without trying to do this optimization.
    if (comparator instanceof MetaCellComparator) {
      return right;
    }
    int diff = comparator.compareRows(left, right);
    if (diff > 0) {
      throw new IllegalArgumentException("Left row sorts after right row; left="
          + CellUtil.getCellKeyAsString(left) + ", right=" + CellUtil.getCellKeyAsString(right));
    }
    byte[] midRow;
    boolean bufferBacked = left instanceof ByteBufferExtendedCell
        && right instanceof ByteBufferExtendedCell;
    if (diff < 0) {
      // Left row is < right row.
      if (bufferBacked) {
        midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getRowByteBuffer(),
            ((ByteBufferExtendedCell) left).getRowPosition(), left.getRowLength(),
            ((ByteBufferExtendedCell) right).getRowByteBuffer(),
            ((ByteBufferExtendedCell) right).getRowPosition(), right.getRowLength());
      } else {
        midRow = getMinimumMidpointArray(left.getRowArray(), left.getRowOffset(),
            left.getRowLength(), right.getRowArray(), right.getRowOffset(), right.getRowLength());
      }
      // If midRow is null, just return 'right'. Can't do optimization.
      if (midRow == null) {
        return right;
      }
      return PrivateCellUtil.createFirstOnRow(midRow);
    }
    // Rows are same. Compare on families.
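    // For illustration: rows "aaa" and "bbb" take the row branch above and yield the one-byte
    // midpoint "b", which sorts after "aaa" and not after "bbb". We only fall through to this
    // point when the two rows are identical.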
    diff = comparator.compareFamilies(left, right);
    if (diff > 0) {
      throw new IllegalArgumentException("Left family sorts after right family; left="
          + CellUtil.getCellKeyAsString(left) + ", right=" + CellUtil.getCellKeyAsString(right));
    }
    if (diff < 0) {
      if (bufferBacked) {
        midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getFamilyByteBuffer(),
            ((ByteBufferExtendedCell) left).getFamilyPosition(), left.getFamilyLength(),
            ((ByteBufferExtendedCell) right).getFamilyByteBuffer(),
            ((ByteBufferExtendedCell) right).getFamilyPosition(), right.getFamilyLength());
      } else {
        midRow = getMinimumMidpointArray(left.getFamilyArray(), left.getFamilyOffset(),
            left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
            right.getFamilyLength());
      }
      // If midRow is null, just return 'right'. Can't do optimization.
      if (midRow == null) {
        return right;
      }
      // Return new Cell where we use right row and then a mid sort family.
      return PrivateCellUtil.createFirstOnRowFamily(right, midRow, 0, midRow.length);
    }
    // Families are same. Compare on qualifiers.
    diff = comparator.compareQualifiers(left, right);
    if (diff > 0) {
      throw new IllegalArgumentException("Left qualifier sorts after right qualifier; left="
          + CellUtil.getCellKeyAsString(left) + ", right=" + CellUtil.getCellKeyAsString(right));
    }
    if (diff < 0) {
      if (bufferBacked) {
        midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getQualifierByteBuffer(),
            ((ByteBufferExtendedCell) left).getQualifierPosition(), left.getQualifierLength(),
            ((ByteBufferExtendedCell) right).getQualifierByteBuffer(),
            ((ByteBufferExtendedCell) right).getQualifierPosition(), right.getQualifierLength());
      } else {
        midRow = getMinimumMidpointArray(left.getQualifierArray(), left.getQualifierOffset(),
            left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
            right.getQualifierLength());
      }
      // If midRow is null, just return 'right'. Can't do optimization.
      if (midRow == null) {
        return right;
      }
      // Return new Cell where we use right row and family and then a mid sort qualifier.
      return PrivateCellUtil.createFirstOnRowCol(right, midRow, 0, midRow.length);
    }
    // No opportunity for optimization. Just return right key.
    return right;
  }

  /**
   * @return a new, minimally sized array that sorts between <code>left</code> and
   *         <code>right</code>, or null to indicate that no such midpoint could be created.
   */
  private static byte[] getMinimumMidpointArray(final byte[] leftArray, final int leftOffset,
      final int leftLength, final byte[] rightArray, final int rightOffset,
      final int rightLength) {
    // rows are different
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    while (diffIdx < minLength
        && leftArray[leftOffset + diffIdx] == rightArray[rightOffset + diffIdx]) {
      diffIdx++;
    }
    byte[] minimumMidpointArray = null;
    if (diffIdx >= minLength) {
      // leftKey's row is prefix of rightKey's.
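      // For example, left "abc" against right "abcde" gives diffIdx == 3, so the midpoint becomes
      // right's first four bytes, "abcd", which sorts after "abc" and not after "abcde".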
      minimumMidpointArray = new byte[diffIdx + 1];
      System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, diffIdx + 1);
    } else {
      int diffByte = leftArray[leftOffset + diffIdx];
      if ((0xff & diffByte) < 0xff && (diffByte + 1) < (rightArray[rightOffset + diffIdx] & 0xff)) {
        minimumMidpointArray = new byte[diffIdx + 1];
        System.arraycopy(leftArray, leftOffset, minimumMidpointArray, 0, diffIdx);
        minimumMidpointArray[diffIdx] = (byte) (diffByte + 1);
      } else {
        minimumMidpointArray = new byte[diffIdx + 1];
        System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, diffIdx + 1);
      }
    }
    return minimumMidpointArray;
  }

  private static byte[] getMinimumMidpointArray(ByteBuffer left, int leftOffset, int leftLength,
      ByteBuffer right, int rightOffset, int rightLength) {
    // rows are different
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    while (diffIdx < minLength && ByteBufferUtils.toByte(left,
        leftOffset + diffIdx) == ByteBufferUtils.toByte(right, rightOffset + diffIdx)) {
      diffIdx++;
    }
    byte[] minMidpoint = null;
    if (diffIdx >= minLength) {
      // leftKey's row is prefix of rightKey's.
      minMidpoint = new byte[diffIdx + 1];
      ByteBufferUtils.copyFromBufferToArray(minMidpoint, right, rightOffset, 0, diffIdx + 1);
    } else {
      int diffByte = ByteBufferUtils.toByte(left, leftOffset + diffIdx);
      if ((0xff & diffByte) < 0xff
          && (diffByte + 1) < (ByteBufferUtils.toByte(right, rightOffset + diffIdx) & 0xff)) {
        minMidpoint = new byte[diffIdx + 1];
        ByteBufferUtils.copyFromBufferToArray(minMidpoint, left, leftOffset, 0, diffIdx);
        minMidpoint[diffIdx] = (byte) (diffByte + 1);
      } else {
        minMidpoint = new byte[diffIdx + 1];
        ByteBufferUtils.copyFromBufferToArray(minMidpoint, right, rightOffset, 0, diffIdx + 1);
      }
    }
    return minMidpoint;
  }

  /** Gives inline block writers an opportunity to contribute blocks. */
  private void writeInlineBlocks(boolean closing) throws IOException {
    for (InlineBlockWriter ibw : inlineBlockWriters) {
      while (ibw.shouldWriteBlock(closing)) {
        long offset = outputStream.getPos();
        boolean cacheThisBlock = ibw.getCacheOnWrite();
        ibw.writeInlineBlock(blockWriter.startWriting(ibw.getInlineBlockType()));
        blockWriter.writeHeaderAndData(outputStream);
        ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(),
            blockWriter.getUncompressedSizeWithoutHeader());
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        if (cacheThisBlock) {
          doCacheOnWrite(offset);
        }
      }
    }
  }

  /**
   * Caches the last written HFile block.
   * @param offset the offset of the block we want to cache. Used to determine the cache key.
   */
  private void doCacheOnWrite(long offset) {
    cacheConf.getBlockCache().ifPresent(cache -> {
      HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
      try {
        cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
            cacheFormatBlock);
      } finally {
        // refCnt will auto-increase when the block is added to the cache; see RAMCache#putIfAbsent
        cacheFormatBlock.release();
      }
    });
  }

  /**
   * Ready a new block for writing.
   */
  protected void newBlock() throws IOException {
    // This is where the next block begins.
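    // The cell that ended the previous block is remembered below as lastCellOfPreviousBlock so
    // that finishBlock() can later compute a short midpoint index key between it and the first
    // cell of this new block.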
    blockWriter.startWriting(BlockType.DATA);
    firstCellInBlock = null;
    if (lastCell != null) {
      lastCellOfPreviousBlock = lastCell;
    }
  }

  /**
   * Add a meta block to the end of the file. Call before close(). Metadata blocks are expensive.
   * Fill one with a bunch of serialized data rather than do a metadata block per metadata
   * instance. If metadata is small, consider adding to file info using
   * {@link #appendFileInfo(byte[], byte[])}
   * @param metaBlockName name of the block
   * @param content will call readFields to get data later (DO NOT REUSE)
   */
  @Override
  public void appendMetaBlock(String metaBlockName, Writable content) {
    byte[] key = Bytes.toBytes(metaBlockName);
    int i;
    for (i = 0; i < metaNames.size(); ++i) {
      // stop when the current key is greater than our own
      byte[] cur = metaNames.get(i);
      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0, key.length) > 0) {
        break;
      }
    }
    metaNames.add(i, key);
    metaData.add(i, content);
  }

  @Override
  public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // Save data block encoder metadata in the file info.
    blockEncoder.saveMetadata(this);
    // Write out the end of the data blocks, then write meta data blocks,
    // followed by fileinfo, data block index and meta block index.

    finishBlock();
    writeInlineBlocks(true);

    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());

    // Write out the metadata blocks if any.
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long offset = outputStream.getPos();
        // write the metadata content
        DataOutputStream dos = blockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        blockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        // Add the new meta block to the meta index.
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
            blockWriter.getOnDiskSizeWithHeader());
      }
    }

    // Load-on-open section.

    // Data block index.
    //
    // In version 2, this section of the file starts with the root level data
    // block index. We call a function that writes intermediate-level blocks
    // first, then root level, and returns the offset of the root level block
    // index.

    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    // Meta block index.
    metaBlockIndexWriter.writeSingleLevelIndex(blockWriter.startWriting(BlockType.ROOT_INDEX),
        "meta");
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    if (this.hFileContext.isIncludesMvcc()) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }

    // File info
    writeFileInfo(trailer, blockWriter.startWriting(BlockType.FILE_INFO));
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
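    // These writables were queued in additionalLoadOnOpenData by addBloomFilter(); each one is
    // written out as its own block in the load-on-open section.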
    for (BlockWritable w : additionalLoadOnOpenData) {
      blockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    }

    // Now finish off the trailer.
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    trailer.setUncompressedDataIndexSize(dataBlockIndexWriter.getTotalUncompressedSize());
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    trailer.setComparatorClass(this.hFileContext.getCellComparator().getClass());
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());

    finishClose(trailer);

    blockWriter.release();
  }

  @Override
  public void addInlineBlockWriter(InlineBlockWriter ibw) {
    inlineBlockWriters.add(ibw);
  }

  @Override
  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
  }

  @Override
  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
  }

  private void addBloomFilter(final BloomFilterWriter bfw, final BlockType blockType) {
    if (bfw.getKeyCount() <= 0) {
      return;
    }

    if (blockType != BlockType.GENERAL_BLOOM_META
        && blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
      throw new RuntimeException("Block Type: " + blockType.toString() + " is not supported");
    }
    additionalLoadOnOpenData.add(new BlockWritable() {
      @Override
      public BlockType getBlockType() {
        return blockType;
      }

      @Override
      public void writeToBlock(DataOutput out) throws IOException {
        bfw.getMetaWriter().write(out);
        Writable dataWriter = bfw.getDataWriter();
        if (dataWriter != null) {
          dataWriter.write(out);
        }
      }
    });
  }

  @Override
  public HFileContext getFileContext() {
    return hFileContext;
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the Comparator passed
   * on construction.
   * @param cell Cell to add. Cannot be empty nor null.
   */
  @Override
  public void append(final Cell cell) throws IOException {
    // checkKey uses comparator to check we are writing in order.
    boolean dupKey = checkKey(cell);
    if (!dupKey) {
      checkBlockBoundary();
    }

    if (!blockWriter.isWriting()) {
      newBlock();
    }

    blockWriter.write(cell);

    totalKeyLength += PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
    totalValueLength += cell.getValueLength();

    // Are we the first key in this block?
    if (firstCellInBlock == null) {
      // If cell is big, block will be closed and this firstCellInBlock reference will only last
      // a short while.
      firstCellInBlock = cell;
    }

    // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinitely?
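    // beforeShipped() limits how long this reference lives: it swaps lastCell (and the other cell
    // references held by this writer) for compact key-only copies.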
    lastCell = cell;
    entryCount++;
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
    int tagsLength = cell.getTagsLength();
    if (tagsLength > this.maxTagsLength) {
      this.maxTagsLength = tagsLength;
    }
  }

  @Override
  public void beforeShipped() throws IOException {
    this.blockWriter.beforeShipped();
    // Copy the cells we still hold references to; the originals may be backed by buffers
    // that are reused once the current batch has been shipped.
    if (this.lastCell != null) {
      this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);
    }
    if (this.firstCellInBlock != null) {
      this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock);
    }
    if (this.lastCellOfPreviousBlock != null) {
      this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock);
    }
  }

  public Cell getLastCell() {
    return lastCell;
  }

  protected void finishFileInfo() throws IOException {
    if (lastCell != null) {
      // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
      // byte buffer. Won't take a tuple.
      byte[] lastKey = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
      fileInfo.append(HFileInfo.LASTKEY, lastKey, false);
    }

    // Average key length.
    int avgKeyLen = entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
    fileInfo.append(HFileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
    fileInfo.append(HFileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
        false);

    // Average value length.
    int avgValueLen = entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
    fileInfo.append(HFileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
    if (hFileContext.isIncludesTags()) {
      // When tags are not being written in this file, MAX_TAGS_LEN is excluded
      // from the FileInfo
      fileInfo.append(HFileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
      boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
          && hFileContext.isCompressTags();
      fileInfo.append(HFileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
    }
  }

  protected int getMajorVersion() {
    return 3;
  }

  protected int getMinorVersion() {
    return HFileReaderImpl.MAX_MINOR_VERSION;
  }

  protected void finishClose(FixedFileTrailer trailer) throws IOException {
    // Write out encryption metadata before finalizing if we have a valid crypto context
    Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
    if (cryptoContext != Encryption.Context.NONE) {
      // Wrap the context's key and write it as the encryption metadata, the wrapper includes
      // all information needed for decryption
      trailer.setEncryptionKey(EncryptionUtil.wrapKey(cryptoContext.getConf(),
          cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
              User.getCurrent().getShortName()),
          cryptoContext.getKey()));
    }
    // Now we can finish the close
    trailer.setMetaIndexCount(metaNames.size());
    trailer.setTotalUncompressedBytes(totalUncompressedBytes + trailer.getTrailerSize());
    trailer.setEntryCount(entryCount);
    trailer.setCompressionCodec(hFileContext.getCompression());

    long startTime = System.currentTimeMillis();
    trailer.serialize(outputStream);
    HFile.updateWriteLatency(System.currentTimeMillis() - startTime);

    if (closeOutputStream) {
      outputStream.close();
      outputStream = null;
    }
  }
}