/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.HISTORICAL_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_FILE_REFS;
import static org.apache.hadoop.hbase.regionserver.StoreEngine.STORE_ENGINE_CLASS_KEY;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.util.BloomContext;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RowBloomContext;
import org.apache.hadoop.hbase.util.RowColBloomContext;
import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * A StoreFile writer. Use this to read/write HBase Store Files. It is package local because it is
 * an implementation detail of the HBase regionserver.
 */
@InterfaceAudience.Private
public class StoreFileWriter implements CellSink, ShipperListener {
  private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName());
  public static final String ENABLE_HISTORICAL_COMPACTION_FILES =
    "hbase.enable.historical.compaction.files";
  public static final boolean DEFAULT_ENABLE_HISTORICAL_COMPACTION_FILES = false;
  private static final Pattern dash = Pattern.compile("-");
  private SingleStoreFileWriter liveFileWriter;
  private SingleStoreFileWriter historicalFileWriter;
  private final FileSystem fs;
  private final Path historicalFilePath;
  private final Configuration conf;
  private final CacheConfig cacheConf;
  private final BloomType bloomType;
  private final long maxKeys;
  private final InetSocketAddress[] favoredNodes;
  private final HFileContext fileContext;
  private final boolean shouldDropCacheBehind;
  private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;
  private final CellComparator comparator;
  private ExtendedCell lastCell;
  // The first (latest) delete family marker of the current row
  private ExtendedCell deleteFamily;
  // The list of delete family version markers of the current row
  private List<ExtendedCell> deleteFamilyVersionList = new ArrayList<>();
  // The first (latest) delete column marker of the current column
  private ExtendedCell deleteColumn;
  // The list of delete column version markers of the current column
  private List<ExtendedCell> deleteColumnVersionList = new ArrayList<>();
  // The live put cell count for the current column
  private int livePutCellCount;
  private final int maxVersions;
  private final boolean newVersionBehavior;

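  // Note on the dual-writer design (a summary of the logic below, not a specification): when
  // historical compaction files are enabled and this writer is used for compaction, cells are
  // split across two files. The "live" file keeps the latest cell versions and the delete
  // markers that are still in effect, while the "historical" file receives older or deleted
  // versions. The delete-marker and put-count fields above track the per-row/per-column state
  // needed to make that routing decision in appendCell().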
  /**
   * Creates an HFile.Writer that also writes helpful meta data.
   * @param fs                     file system to write to
   * @param liveFilePath           the name of the live file to create
   * @param historicalFilePath     the name of the historical file to create
   * @param conf                   user configuration
   * @param bloomType              bloom filter setting
   * @param maxKeys                the expected maximum number of keys to be added. Was used for
   *                               Bloom filter size in {@link HFile} format version 1.
   * @param favoredNodes           an array of favored nodes or possibly null
   * @param fileContext            The HFile context
   * @param shouldDropCacheBehind  Drop pages written to page cache after writing the store file.
   * @param compactedFilesSupplier Returns the {@link HStore} compacted files which have not been
   *                               archived
   * @param comparator             Cell comparator
   * @param maxVersions            max cell versions
   * @param newVersionBehavior     enable new version behavior
   * @throws IOException problem writing to FS
   */
  private StoreFileWriter(FileSystem fs, Path liveFilePath, Path historicalFilePath,
    final Configuration conf, CacheConfig cacheConf, BloomType bloomType, long maxKeys,
    InetSocketAddress[] favoredNodes, HFileContext fileContext, boolean shouldDropCacheBehind,
    Supplier<Collection<HStoreFile>> compactedFilesSupplier, CellComparator comparator,
    int maxVersions, boolean newVersionBehavior) throws IOException {
    this.fs = fs;
    this.historicalFilePath = historicalFilePath;
    this.conf = conf;
    this.cacheConf = cacheConf;
    this.bloomType = bloomType;
    this.maxKeys = maxKeys;
    this.favoredNodes = favoredNodes;
    this.fileContext = fileContext;
    this.shouldDropCacheBehind = shouldDropCacheBehind;
    this.compactedFilesSupplier = compactedFilesSupplier;
    this.comparator = comparator;
    this.maxVersions = maxVersions;
    this.newVersionBehavior = newVersionBehavior;
    liveFileWriter = new SingleStoreFileWriter(fs, liveFilePath, conf, cacheConf, bloomType,
      maxKeys, favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
  }

  public static boolean shouldEnableHistoricalCompactionFiles(Configuration conf) {
    if (
      conf.getBoolean(ENABLE_HISTORICAL_COMPACTION_FILES,
        DEFAULT_ENABLE_HISTORICAL_COMPACTION_FILES)
    ) {
      // Historical compaction files are supported only for the default store engine with
      // the default compactor.
      String storeEngine = conf.get(STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
      if (!storeEngine.equals(DefaultStoreEngine.class.getName())) {
        LOG.warn("Historical compaction file generation is ignored for " + storeEngine
          + ". hbase.enable.historical.compaction.files can be set to true only for the "
          + "default compaction (DefaultStoreEngine and DefaultCompactor)");
        return false;
      }
      String compactor = conf.get(DEFAULT_COMPACTOR_CLASS_KEY, DefaultCompactor.class.getName());
      if (!compactor.equals(DefaultCompactor.class.getName())) {
        LOG.warn("Historical compaction file generation is ignored for " + compactor
          + ". hbase.enable.historical.compaction.files can be set to true only for the "
          + "default compaction (DefaultStoreEngine and DefaultCompactor)");
        return false;
      }
      return true;
    }
    return false;
  }
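
  // Illustrative only (not part of the API): per the checks in
  // shouldEnableHistoricalCompactionFiles() above, dual (live + historical) file generation
  // takes effect only with the default store engine/compactor and the flag enabled, e.g.
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean(ENABLE_HISTORICAL_COMPACTION_FILES, true);
  //   assert StoreFileWriter.shouldEnableHistoricalCompactionFiles(conf);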

  public long getPos() throws IOException {
    return liveFileWriter.getPos();
  }

  /**
   * Writes meta data. Call before {@link #close()} since it is written as meta data to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
    throws IOException {
    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction);
    }
  }

  /**
   * Writes meta data. Call before {@link #close()} since it is written as meta data to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @param storeFiles      The compacted store files to generate this new file
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
    final Collection<HStoreFile> storeFiles) throws IOException {
    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction, storeFiles);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction, storeFiles);
    }
  }

  /**
   * Writes meta data. Call before {@link #close()} since it is written as meta data to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @param mobCellsCount   The number of mob cells.
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
    final long mobCellsCount) throws IOException {
    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction, mobCellsCount);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction, mobCellsCount);
    }
  }

  /**
   * Appends MOB-specific metadata (even if it is empty)
   * @param mobRefSet - original table -> set of MOB file names
   * @throws IOException problem writing to FS
   */
  public void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
    liveFileWriter.appendMobMetadata(mobRefSet);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendMobMetadata(mobRefSet);
    }
  }

  /**
   * Add TimestampRange and earliest put timestamp to Metadata
   */
  public void appendTrackedTimestampsToMetadata() throws IOException {
    // TODO: The StoreFileReader always converts the byte[] to TimeRange
    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
    liveFileWriter.appendTrackedTimestampsToMetadata();
    if (historicalFileWriter != null) {
      historicalFileWriter.appendTrackedTimestampsToMetadata();
    }
  }

  public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
    throws IOException {
    liveFileWriter.appendCustomCellTimestampsToMetadata(timeRangeTracker);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendCustomCellTimestampsToMetadata(timeRangeTracker);
    }
  }

  @Override
  public void beforeShipped() throws IOException {
    liveFileWriter.beforeShipped();
    if (historicalFileWriter != null) {
      historicalFileWriter.beforeShipped();
    }
  }

  public Path getPath() {
    return liveFileWriter.getPath();
  }

  public List<Path> getPaths() {
    if (historicalFileWriter == null) {
      return Lists.newArrayList(liveFileWriter.getPath());
    }
    return Lists.newArrayList(liveFileWriter.getPath(), historicalFileWriter.getPath());
  }

  public boolean hasGeneralBloom() {
    return liveFileWriter.hasGeneralBloom();
  }

  /**
   * For unit testing only.
   * @return the Bloom filter used by this writer.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
  public BloomFilterWriter getGeneralBloomWriter() {
    return liveFileWriter.generalBloomFilterWriter;
  }

  public void close() throws IOException {
    liveFileWriter.appendFileInfo(HISTORICAL_KEY, Bytes.toBytes(false));
    liveFileWriter.close();
    if (historicalFileWriter != null) {
      historicalFileWriter.appendFileInfo(HISTORICAL_KEY, Bytes.toBytes(true));
      historicalFileWriter.close();
    }
  }

  public void appendFileInfo(byte[] key, byte[] value) throws IOException {
    liveFileWriter.appendFileInfo(key, value);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendFileInfo(key, value);
    }
  }

  /**
   * For use in testing.
   */
  HFile.Writer getLiveFileWriter() {
    return liveFileWriter.getHFileWriter();
  }

  /**
   * @param dir Directory to create file in.
   * @return random filename inside passed <code>dir</code>
   */
  public static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
    if (!fs.getFileStatus(dir).isDirectory()) {
      throw new IOException("Expecting " + dir.toString() + " to be a directory");
    }
    return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll(""));
  }

  private SingleStoreFileWriter getHistoricalFileWriter() throws IOException {
    if (historicalFileWriter == null) {
      historicalFileWriter =
        new SingleStoreFileWriter(fs, historicalFilePath, conf, cacheConf, bloomType, maxKeys,
          favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
    }
    return historicalFileWriter;
  }

  private void initRowState() {
    deleteFamily = null;
    deleteFamilyVersionList.clear();
    lastCell = null;
  }

  private void initColumnState() {
    livePutCellCount = 0;
    deleteColumn = null;
    deleteColumnVersionList.clear();
  }

  private boolean isDeletedByDeleteFamily(ExtendedCell cell) {
    return deleteFamily != null && (deleteFamily.getTimestamp() > cell.getTimestamp()
      || (deleteFamily.getTimestamp() == cell.getTimestamp()
        && (!newVersionBehavior || cell.getSequenceId() < deleteFamily.getSequenceId())));
  }

  private boolean isDeletedByDeleteFamilyVersion(ExtendedCell cell) {
    for (ExtendedCell deleteFamilyVersion : deleteFamilyVersionList) {
      if (
        deleteFamilyVersion.getTimestamp() == cell.getTimestamp()
          && (!newVersionBehavior || cell.getSequenceId() < deleteFamilyVersion.getSequenceId())
      ) {
        return true;
      }
    }
    return false;
  }

  private boolean isDeletedByDeleteColumn(ExtendedCell cell) {
    return deleteColumn != null && (deleteColumn.getTimestamp() > cell.getTimestamp()
      || (deleteColumn.getTimestamp() == cell.getTimestamp()
        && (!newVersionBehavior || cell.getSequenceId() < deleteColumn.getSequenceId())));
  }

  private boolean isDeletedByDeleteColumnVersion(ExtendedCell cell) {
    for (ExtendedCell deleteColumnVersion : deleteColumnVersionList) {
      if (
        deleteColumnVersion.getTimestamp() == cell.getTimestamp()
          && (!newVersionBehavior || cell.getSequenceId() < deleteColumnVersion.getSequenceId())
      ) {
        return true;
      }
    }
    return false;
  }

  private boolean isDeleted(ExtendedCell cell) {
    return isDeletedByDeleteFamily(cell) || isDeletedByDeleteColumn(cell)
      || isDeletedByDeleteFamilyVersion(cell) || isDeletedByDeleteColumnVersion(cell);
  }

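  // Routing summary for the method below (derived from its branches, not an external spec):
  // - DeleteFamily: only the first (latest) marker of a row goes to the live file.
  // - DeleteFamilyVersion / DeleteColumn / Delete: go to the live file only if they are not
  //   already masked by a covering delete marker for the same row/column.
  // - Put: the newest maxVersions undeleted versions of a column go to the live file; deleted
  //   and older versions go to the historical file.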
  private void appendCell(ExtendedCell cell) throws IOException {
    if ((lastCell == null || !CellUtil.matchingColumn(lastCell, cell))) {
      initColumnState();
    }
    if (cell.getType() == Cell.Type.DeleteFamily) {
      if (deleteFamily == null) {
        deleteFamily = cell;
        liveFileWriter.append(cell);
      } else {
        getHistoricalFileWriter().append(cell);
      }
    } else if (cell.getType() == Cell.Type.DeleteFamilyVersion) {
      if (!isDeletedByDeleteFamily(cell)) {
        deleteFamilyVersionList.add(cell);
        if (deleteFamily != null && deleteFamily.getTimestamp() == cell.getTimestamp()) {
          // This means both the delete-family and delete-family-version markers have the same
          // timestamp but the sequence id of delete-family-version marker is higher than that of
          // the delete-family marker. In this case, there is no need to add the
          // delete-family-version marker to the live version file. This case happens only with
          // the new version behavior.
          liveFileWriter.append(cell);
        } else {
          liveFileWriter.append(cell);
        }
      } else {
        getHistoricalFileWriter().append(cell);
      }
    } else if (cell.getType() == Cell.Type.DeleteColumn) {
      if (!isDeletedByDeleteFamily(cell) && deleteColumn == null) {
        deleteColumn = cell;
        liveFileWriter.append(cell);
      } else {
        getHistoricalFileWriter().append(cell);
      }
    } else if (cell.getType() == Cell.Type.Delete) {
      if (!isDeletedByDeleteFamily(cell) && deleteColumn == null) {
        deleteColumnVersionList.add(cell);
        if (deleteFamily != null && deleteFamily.getTimestamp() == cell.getTimestamp()) {
          // This means both the delete-family and delete-column-version markers have the same
          // timestamp but the sequence id of delete-column-version marker is higher than that of
          // the delete-family marker. In this case, there is no need to add the
          // delete-column-version marker to the live version file. This case happens only with
          // the new version behavior.
          getHistoricalFileWriter().append(cell);
        } else {
          liveFileWriter.append(cell);
        }
      } else {
        getHistoricalFileWriter().append(cell);
      }
    } else if (cell.getType() == Cell.Type.Put) {
      if (livePutCellCount < maxVersions) {
        // This is a live put cell (i.e., the latest version) of a column. Is it deleted?
        if (!isDeleted(cell)) {
          liveFileWriter.append(cell);
          livePutCellCount++;
        } else {
          // It is deleted
          getHistoricalFileWriter().append(cell);
          if (newVersionBehavior) {
            // Deleted versions are counted toward the total version count when
            // newVersionBehavior is enabled
            livePutCellCount++;
          }
        }
      } else {
        // It is an older put cell
        getHistoricalFileWriter().append(cell);
      }
    }
    lastCell = cell;
  }

  @Override
  public void appendAll(List<ExtendedCell> cellList) throws IOException {
    if (historicalFilePath == null) {
      // Dual writing is not enabled and all cells are written to one file. We use
      // the live version file in this case
      for (ExtendedCell cell : cellList) {
        liveFileWriter.append(cell);
      }
      return;
    }
    if (cellList.isEmpty()) {
      return;
    }
    if (lastCell != null && comparator.compareRows(lastCell, cellList.get(0)) != 0) {
      // It is a new row and thus time to reset the state
      initRowState();
    }
    for (ExtendedCell cell : cellList) {
      appendCell(cell);
    }
  }

  @Override
  public void append(ExtendedCell cell) throws IOException {
    if (historicalFilePath == null) {
      // Dual writing is not enabled and all cells are written to one file. We use
      // the live version file in this case
      liveFileWriter.append(cell);
      return;
    }
    appendCell(cell);
  }

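  // SingleStoreFileWriter wraps one HFile.Writer together with its optional general and
  // delete-family Bloom filter writers. StoreFileWriter always owns one instance for the live
  // file and lazily creates a second one for the historical file when dual writing is enabled.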
  private static class SingleStoreFileWriter {
    private final BloomFilterWriter generalBloomFilterWriter;
    private final BloomFilterWriter deleteFamilyBloomFilterWriter;
    private final BloomType bloomType;
    private byte[] bloomParam = null;
    private long deleteFamilyCnt = 0;
    private BloomContext bloomContext = null;
    private BloomContext deleteFamilyBloomContext = null;
    private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;

    private HFile.Writer writer;

    /**
     * Creates an HFile.Writer that also writes helpful meta data.
     * @param fs                     file system to write to
     * @param path                   file name to create
     * @param conf                   user configuration
     * @param bloomType              bloom filter setting
     * @param maxKeys                the expected maximum number of keys to be added. Was used
     *                               for Bloom filter size in {@link HFile} format version 1.
     * @param favoredNodes           an array of favored nodes or possibly null
     * @param fileContext            The HFile context
     * @param shouldDropCacheBehind  Drop pages written to page cache after writing the store
     *                               file.
     * @param compactedFilesSupplier Returns the {@link HStore} compacted files which have not
     *                               been archived
     * @throws IOException problem writing to FS
     */
    private SingleStoreFileWriter(FileSystem fs, Path path, final Configuration conf,
      CacheConfig cacheConf, BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes,
      HFileContext fileContext, boolean shouldDropCacheBehind,
      Supplier<Collection<HStoreFile>> compactedFilesSupplier) throws IOException {
      this.compactedFilesSupplier = compactedFilesSupplier;
      // TODO : Change all writers to be specifically created for compaction context
      writer =
        HFile.getWriterFactory(conf, cacheConf).withPath(fs, path).withFavoredNodes(favoredNodes)
          .withFileContext(fileContext).withShouldDropCacheBehind(shouldDropCacheBehind).create();

      generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf,
        bloomType, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);

      if (generalBloomFilterWriter != null) {
        this.bloomType = bloomType;
        this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: "
            + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH
              ? Bytes.toInt(bloomParam)
              : Bytes.toStringBinary(bloomParam))
            + ", " + generalBloomFilterWriter.getClass().getSimpleName());
        }
        // init bloom context
        switch (bloomType) {
          case ROW:
            bloomContext =
              new RowBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
            break;
          case ROWCOL:
            bloomContext =
              new RowColBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
            break;
          case ROWPREFIX_FIXED_LENGTH:
            bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter,
              fileContext.getCellComparator(), Bytes.toInt(bloomParam));
            break;
          default:
            throw new IOException(
              "Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL or ROWPREFIX expected)");
        }
      } else {
        // Not using Bloom filters.
        this.bloomType = BloomType.NONE;
      }

      // initialize delete family Bloom filter when there is NO RowCol Bloom filter
      if (this.bloomType != BloomType.ROWCOL) {
        this.deleteFamilyBloomFilterWriter = BloomFilterFactory.createDeleteBloomAtWrite(conf,
          cacheConf, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
        deleteFamilyBloomContext =
          new RowBloomContext(deleteFamilyBloomFilterWriter, fileContext.getCellComparator());
      } else {
        deleteFamilyBloomFilterWriter = null;
      }
      if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) {
        LOG.trace("Delete Family Bloom filter type for " + path + ": "
          + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
      }
    }

    private long getPos() throws IOException {
      return ((HFileWriterImpl) writer).getPos();
    }

    /**
     * Writes meta data. Call before {@link #close()} since it is written as meta data to this
     * file.
     * @param maxSequenceId   Maximum sequence id.
     * @param majorCompaction True if this file is product of a major compaction
     * @throws IOException problem writing to FS
     */
    private void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
      throws IOException {
      appendMetadata(maxSequenceId, majorCompaction, Collections.emptySet());
    }

    /**
     * Writes meta data. Call before {@link #close()} since it is written as meta data to this
     * file.
     * @param maxSequenceId   Maximum sequence id.
     * @param majorCompaction True if this file is product of a major compaction
     * @param storeFiles      The compacted store files to generate this new file
     * @throws IOException problem writing to FS
     */
    private void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
      final Collection<HStoreFile> storeFiles) throws IOException {
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
      writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
      writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles));
      appendTrackedTimestampsToMetadata();
    }

    /**
     * Used when writing {@link HStoreFile#COMPACTION_EVENT_KEY} to the new file's file info. The
     * names of the compacted store files are needed. If a compacted store file is itself the
     * result of a compaction, its own compacted files that have not yet been archived are needed
     * too, but there is no need to add compacted files recursively. For example, if files A, B
     * and C are compacted to a new file D, and D is compacted to a new file E, then A, B, C and
     * D are written to E's compacted files.
     * So if file E is compacted to a new file F, E is added to F's compacted files first, and
     * then E's compacted files A, B, C and D are added. There is no need to add D's compacted
     * files, as they are already included in E's compacted files. See HBASE-20724 for more
     * details.
     * @param storeFiles The compacted store files to generate this new file
     * @return bytes of CompactionEventTracker
     */
    private byte[] toCompactionEventTrackerBytes(Collection<HStoreFile> storeFiles) {
      Set<String> notArchivedCompactedStoreFiles = this.compactedFilesSupplier.get().stream()
        .map(sf -> sf.getPath().getName()).collect(Collectors.toSet());
      Set<String> compactedStoreFiles = new HashSet<>();
      for (HStoreFile storeFile : storeFiles) {
        compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName());
        for (String csf : storeFile.getCompactedStoreFiles()) {
          if (notArchivedCompactedStoreFiles.contains(csf)) {
            compactedStoreFiles.add(csf);
          }
        }
      }
      return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles);
    }

    /**
     * Writes meta data. Call before {@link #close()} since it is written as meta data to this
     * file.
     * @param maxSequenceId   Maximum sequence id.
     * @param majorCompaction True if this file is product of a major compaction
     * @param mobCellsCount   The number of mob cells.
     * @throws IOException problem writing to FS
     */
    private void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
      final long mobCellsCount) throws IOException {
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
      writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
      writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
      appendTrackedTimestampsToMetadata();
    }

    /**
     * Appends MOB-specific metadata (even if it is empty)
     * @param mobRefSet - original table -> set of MOB file names
     * @throws IOException problem writing to FS
     */
    private void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
      writer.appendFileInfo(MOB_FILE_REFS, MobUtils.serializeMobFileRefs(mobRefSet));
    }

    /**
     * Add TimestampRange and earliest put timestamp to Metadata
     */
    private void appendTrackedTimestampsToMetadata() throws IOException {
      writer.appendTrackedTimestampsToMetadata();
    }

    public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
      throws IOException {
      writer.appendCustomCellTimestampsToMetadata(timeRangeTracker);
    }

    private void appendGeneralBloomfilter(final ExtendedCell cell) throws IOException {
      if (this.generalBloomFilterWriter != null) {
        /*
         * See http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
         * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp
         * 3 Types of Filtering:
         * 1. Row = Row
         * 2. RowCol = Row + Qualifier
         * 3. RowPrefixFixedLength = Fixed Length Row Prefix
         */
        bloomContext.writeBloom(cell);
      }
    }

    private void appendDeleteFamilyBloomFilter(final ExtendedCell cell) throws IOException {
      if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) {
        return;
      }

      // increase the number of delete family in the store file
      deleteFamilyCnt++;
      if (this.deleteFamilyBloomFilterWriter != null) {
        deleteFamilyBloomContext.writeBloom(cell);
      }
    }

    private void append(final ExtendedCell cell) throws IOException {
      appendGeneralBloomfilter(cell);
      appendDeleteFamilyBloomFilter(cell);
      writer.append(cell);
    }

    private void beforeShipped() throws IOException {
      // For now these writers will always implement ShipperListener.
      // TODO : Change all writers to be specifically created for compaction context
      writer.beforeShipped();
      if (generalBloomFilterWriter != null) {
        generalBloomFilterWriter.beforeShipped();
      }
      if (deleteFamilyBloomFilterWriter != null) {
        deleteFamilyBloomFilterWriter.beforeShipped();
      }
    }

    private Path getPath() {
      return this.writer.getPath();
    }

    private boolean hasGeneralBloom() {
      return this.generalBloomFilterWriter != null;
    }

    /**
     * For unit testing only.
     * @return the Bloom filter used by this writer.
     */
    BloomFilterWriter getGeneralBloomWriter() {
      return generalBloomFilterWriter;
    }

    private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
      boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
      if (haveBloom) {
        bfw.compactBloom();
      }
      return haveBloom;
    }

    private boolean closeGeneralBloomFilter() throws IOException {
      boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);

      // add the general Bloom filter writer and append file info
      if (hasGeneralBloom) {
        writer.addGeneralBloomFilter(generalBloomFilterWriter);
        writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
        if (bloomParam != null) {
          writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam);
        }
        bloomContext.addLastBloomKey(writer);
      }
      return hasGeneralBloom;
    }

    private boolean closeDeleteFamilyBloomFilter() throws IOException {
      boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);

      // add the delete family Bloom filter writer
      if (hasDeleteFamilyBloom) {
        writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
      }

      // append file info about the number of delete family kvs
      // even if there is no delete family Bloom.
      writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt));

      return hasDeleteFamilyBloom;
    }

    private void close() throws IOException {
      boolean hasGeneralBloom = this.closeGeneralBloomFilter();
      boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();

      writer.close();

      // Log final Bloom filter statistics. This needs to be done after close()
      // because compound Bloom filters might be finalized as part of closing.
      if (LOG.isTraceEnabled()) {
        LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and "
          + (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " was added to HFile "
          + getPath());
      }
    }

    private void appendFileInfo(byte[] key, byte[] value) throws IOException {
      writer.appendFileInfo(key, value);
    }

    /**
     * For use in testing.
     */
    private HFile.Writer getHFileWriter() {
      return writer;
    }
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
      justification = "Will not overflow")
  public static class Builder {
    private final Configuration conf;
    private final CacheConfig cacheConf;
    private final FileSystem fs;

    private BloomType bloomType = BloomType.NONE;
    private long maxKeyCount = 0;
    private Path dir;
    private Path liveFilePath;
    private Path historicalFilePath;

    private InetSocketAddress[] favoredNodes;
    private HFileContext fileContext;
    private boolean shouldDropCacheBehind;
    private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
    private String fileStoragePolicy;
    // This is used to track the creation of the StoreFileWriter, mainly for the SFT
    // implementation where we write store files directly to the final place instead of writing
    // a tmp file first. Under this scenario, a background task purges the store files which are
    // not recorded in the SFT, but newly created store file writers are not tracked in the SFT
    // yet, so here we need to record them and treat them specially.
    private Consumer<Path> writerCreationTracker;
    private int maxVersions;
    private boolean newVersionBehavior;
    private CellComparator comparator;
    private boolean isCompaction;

    public Builder(Configuration conf, CacheConfig cacheConf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = cacheConf;
      this.fs = fs;
    }

    /**
     * Creates Builder with cache configuration disabled
     */
    public Builder(Configuration conf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = CacheConfig.DISABLED;
      this.fs = fs;
    }

    /**
     * Use either this method or {@link #withFilePath}, but not both.
     * @param dir Path to column family directory. The directory is created if it does not exist.
     *            The file is given a unique name within this directory.
     * @return this (for chained invocation)
     */
    public Builder withOutputDir(Path dir) {
      Preconditions.checkNotNull(dir);
      this.dir = dir;
      return this;
    }

    /**
     * Use either this method or {@link #withOutputDir}, but not both.
     * @param filePath the StoreFile path to write
     * @return this (for chained invocation)
     */
    public Builder withFilePath(Path filePath) {
      Preconditions.checkNotNull(filePath);
      this.liveFilePath = filePath;
      return this;
    }

    /**
     * @param favoredNodes an array of favored nodes or possibly null
     * @return this (for chained invocation)
     */
    public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) {
      this.favoredNodes = favoredNodes;
      return this;
    }

    public Builder withBloomType(BloomType bloomType) {
      Preconditions.checkNotNull(bloomType);
      this.bloomType = bloomType;
      return this;
    }

    /**
     * @param maxKeyCount estimated maximum number of keys we expect to add
     * @return this (for chained invocation)
     */
    public Builder withMaxKeyCount(long maxKeyCount) {
      this.maxKeyCount = maxKeyCount;
      return this;
    }

    public Builder withFileContext(HFileContext fileContext) {
      this.fileContext = fileContext;
      return this;
    }

    public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
      this.shouldDropCacheBehind = shouldDropCacheBehind;
      return this;
    }

    public Builder
      withCompactedFilesSupplier(Supplier<Collection<HStoreFile>> compactedFilesSupplier) {
      this.compactedFilesSupplier = compactedFilesSupplier;
      return this;
    }

    public Builder withFileStoragePolicy(String fileStoragePolicy) {
      this.fileStoragePolicy = fileStoragePolicy;
      return this;
    }

    public Builder withWriterCreationTracker(Consumer<Path> writerCreationTracker) {
      this.writerCreationTracker = writerCreationTracker;
      return this;
    }

    public Builder withMaxVersions(int maxVersions) {
      this.maxVersions = maxVersions;
      return this;
    }

    public Builder withNewVersionBehavior(boolean newVersionBehavior) {
      this.newVersionBehavior = newVersionBehavior;
      return this;
    }

    public Builder withCellComparator(CellComparator comparator) {
      this.comparator = comparator;
      return this;
    }

    public Builder withIsCompaction(boolean isCompaction) {
      this.isCompaction = isCompaction;
      return this;
    }

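    // Illustrative only (the variable names below are assumptions, not part of this class):
    // a caller such as a flush or compaction typically builds a writer like
    //   StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
    //     .withOutputDir(familyDir)
    //     .withBloomType(BloomType.ROW)
    //     .withMaxKeyCount(estimatedKeys)
    //     .withFileContext(hFileContext)
    //     .build();
    // and must call appendMetadata(...) before close() when metadata is needed.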
    /**
     * Create a store file writer. Client is responsible for closing the file when done. If
     * metadata is needed, add it BEFORE closing using {@link StoreFileWriter#appendMetadata}.
     */
    public StoreFileWriter build() throws IOException {
      if ((dir == null ? 0 : 1) + (liveFilePath == null ? 0 : 1) != 1) {
        throw new IllegalArgumentException("Either specify parent directory or file path");
      }

      if (dir == null) {
        dir = liveFilePath.getParent();
      }

      if (!fs.exists(dir)) {
        // Handle permission for non-HDFS filesystem properly
        // See HBASE-17710
        HRegionFileSystem.mkdirs(fs, conf, dir);
      }

      // set block storage policy for temp path
      String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY);
      if (null == policyName) {
        policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY);
      }
      CommonFSUtils.setStoragePolicy(this.fs, dir, policyName);

      if (liveFilePath == null) {
        // The stored file and related blocks will use the directory based StoragePolicy, because
        // HDFS DistributedFileSystem does not support creating files with a storage policy
        // before version 3.3.0 (see HDFS-13209). Using a child dir here makes the stored files
        // satisfy the specific storage policy when writing, so as to avoid later data movement.
        // We don't want to change the whole temp dir to 'fileStoragePolicy'.
        if (!Strings.isNullOrEmpty(fileStoragePolicy)) {
          dir = new Path(dir, HConstants.STORAGE_POLICY_PREFIX + fileStoragePolicy);
          if (!fs.exists(dir)) {
            HRegionFileSystem.mkdirs(fs, conf, dir);
            LOG.info(
              "Create tmp dir " + dir.toString() + " with storage policy: " + fileStoragePolicy);
          }
          CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy);
        }
        liveFilePath = getUniqueFile(fs, dir);
        if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
          bloomType = BloomType.NONE;
        }
      }

      if (isCompaction && shouldEnableHistoricalCompactionFiles(conf)) {
        historicalFilePath = getUniqueFile(fs, dir);
      }

      // Make sure we call this before actually creating the writer. In fact, it is not a big
      // deal to add a nonexistent file to the tracker, as we will never try to delete it and we
      // will clean the tracker up after compaction. But if the file cleaner finds the file
      // before we have recorded it, it may accidentally delete the file and cause problems.
      if (writerCreationTracker != null) {
        writerCreationTracker.accept(liveFilePath);
        if (historicalFilePath != null) {
          writerCreationTracker.accept(historicalFilePath);
        }
      }
      return new StoreFileWriter(fs, liveFilePath, historicalFilePath, conf, cacheConf, bloomType,
        maxKeyCount, favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier,
        comparator, maxVersions, newVersionBehavior);
    }
  }
}