/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.HISTORICAL_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_FILE_REFS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.StoreEngine.STORE_ENGINE_CLASS_KEY;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.util.BloomContext;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RowBloomContext;
import org.apache.hadoop.hbase.util.RowColBloomContext;
import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * A StoreFile writer. Use this to read/write HBase Store Files. It is package local because it is
 * an implementation detail of the HBase regionserver.
 */
@InterfaceAudience.Private
public class StoreFileWriter implements CellSink, ShipperListener {
  private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName());
  public static final String ENABLE_HISTORICAL_COMPACTION_FILES =
    "hbase.enable.historical.compaction.files";
  public static final boolean DEFAULT_ENABLE_HISTORICAL_COMPACTION_FILES = false;
  private static final Pattern dash = Pattern.compile("-");
  private SingleStoreFileWriter liveFileWriter;
  private SingleStoreFileWriter historicalFileWriter;
  private final FileSystem fs;
  private final Path historicalFilePath;
  private final Configuration conf;
  private final CacheConfig cacheConf;
  private final BloomType bloomType;
  private final long maxKeys;
  private final InetSocketAddress[] favoredNodes;
  private final HFileContext fileContext;
  private final boolean shouldDropCacheBehind;
  private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;
  private final CellComparator comparator;
  private ExtendedCell lastCell;
  // The first (latest) delete family marker of the current row
  private ExtendedCell deleteFamily;
  // The list of delete family version markers of the current row
  private List<ExtendedCell> deleteFamilyVersionList = new ArrayList<>();
  // The first (latest) delete column marker of the current column
  private ExtendedCell deleteColumn;
  // The list of delete column version markers of the current column
  private List<ExtendedCell> deleteColumnVersionList = new ArrayList<>();
  // The live put cell count for the current column
  private int livePutCellCount;
  private final int maxVersions;
  private final boolean newVersionBehavior;

  /**
   * Creates an HFile.Writer that also writes helpful metadata.
   * @param fs                     file system to write to
   * @param liveFilePath           the name of the live file to create
   * @param historicalFilePath     the name of the historical file to create
   * @param conf                   user configuration
   * @param bloomType              bloom filter setting
   * @param maxKeys                the expected maximum number of keys to be added. Was used for
   *                               Bloom filter size in {@link HFile} format version 1.
   * @param favoredNodes           an array of favored nodes or possibly null
   * @param fileContext            The HFile context
   * @param shouldDropCacheBehind  Drop pages written to page cache after writing the store file.
   * @param compactedFilesSupplier Returns the {@link HStore} compacted files which are not yet
   *                               archived
   * @param comparator             Cell comparator
   * @param maxVersions            max cell versions
   * @param newVersionBehavior     enable new version behavior
   * @throws IOException problem writing to FS
   */
  private StoreFileWriter(FileSystem fs, Path liveFilePath, Path historicalFilePath,
    final Configuration conf, CacheConfig cacheConf, BloomType bloomType, long maxKeys,
    InetSocketAddress[] favoredNodes, HFileContext fileContext, boolean shouldDropCacheBehind,
    Supplier<Collection<HStoreFile>> compactedFilesSupplier, CellComparator comparator,
    int maxVersions, boolean newVersionBehavior) throws IOException {
    this.fs = fs;
    this.historicalFilePath = historicalFilePath;
    this.conf = conf;
    this.cacheConf = cacheConf;
    this.bloomType = bloomType;
    this.maxKeys = maxKeys;
    this.favoredNodes = favoredNodes;
    this.fileContext = fileContext;
    this.shouldDropCacheBehind = shouldDropCacheBehind;
    this.compactedFilesSupplier = compactedFilesSupplier;
    this.comparator = comparator;
    this.maxVersions = maxVersions;
    this.newVersionBehavior = newVersionBehavior;
    liveFileWriter = new SingleStoreFileWriter(fs, liveFilePath, conf, cacheConf, bloomType,
      maxKeys, favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
  }
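
  // Dual (live/historical) file writing is opt-in. A minimal configuration sketch, for
  // illustration only:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean(StoreFileWriter.ENABLE_HISTORICAL_COMPACTION_FILES, true);
  //
  // As the check below enforces, the flag only takes effect with the default store engine and
  // compactor (DefaultStoreEngine and DefaultCompactor).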

  public static boolean shouldEnableHistoricalCompactionFiles(Configuration conf) {
    if (
      conf.getBoolean(ENABLE_HISTORICAL_COMPACTION_FILES,
        DEFAULT_ENABLE_HISTORICAL_COMPACTION_FILES)
    ) {
      // Historical compaction files are supported only for the default store engine with the
      // default compactor.
      String storeEngine = conf.get(STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
      if (!storeEngine.equals(DefaultStoreEngine.class.getName())) {
        LOG.warn("Historical compaction file generation is ignored for " + storeEngine
          + ". hbase.enable.historical.compaction.files can be set to true only for the "
          + "default compaction (DefaultStoreEngine and DefaultCompactor)");
        return false;
      }
      String compactor = conf.get(DEFAULT_COMPACTOR_CLASS_KEY, DefaultCompactor.class.getName());
      if (!compactor.equals(DefaultCompactor.class.getName())) {
        LOG.warn("Historical compaction file generation is ignored for " + compactor
          + ". hbase.enable.historical.compaction.files can be set to true only for the "
          + "default compaction (DefaultStoreEngine and DefaultCompactor)");
        return false;
      }
      return true;
    }
    return false;
  }

  public long getPos() throws IOException {
    return liveFileWriter.getPos();
  }

  /**
   * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
    throws IOException {
    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction);
    }
  }

  /**
   * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @param storeFiles      The compacted store files to generate this new file
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
    final Collection<HStoreFile> storeFiles) throws IOException {
    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction, storeFiles);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction, storeFiles);
    }
  }

  /**
   * Writes metadata. Call before {@link #close()} since it is written as metadata to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @param mobCellsCount   The number of mob cells.
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
    final long mobCellsCount) throws IOException {
    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction, mobCellsCount);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction, mobCellsCount);
    }
  }

  /**
   * Appends MOB-specific metadata (even if it is empty)
   * @param mobRefSet - original table -> set of MOB file names
   * @throws IOException problem writing to FS
   */
  public void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
    liveFileWriter.appendMobMetadata(mobRefSet);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendMobMetadata(mobRefSet);
    }
  }

  /**
   * Add TimestampRange and earliest put timestamp to Metadata
   */
  public void appendTrackedTimestampsToMetadata() throws IOException {
    // TODO: The StoreFileReader always converts the byte[] to TimeRange
    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
    liveFileWriter.appendTrackedTimestampsToMetadata();
    if (historicalFileWriter != null) {
      historicalFileWriter.appendTrackedTimestampsToMetadata();
    }
  }

  @Override
  public void beforeShipped() throws IOException {
    liveFileWriter.beforeShipped();
    if (historicalFileWriter != null) {
      historicalFileWriter.beforeShipped();
    }
  }

  public Path getPath() {
    return liveFileWriter.getPath();
  }

  public List<Path> getPaths() {
    if (historicalFileWriter == null) {
      return Lists.newArrayList(liveFileWriter.getPath());
    }
    return Lists.newArrayList(liveFileWriter.getPath(), historicalFileWriter.getPath());
  }

  public boolean hasGeneralBloom() {
    return liveFileWriter.hasGeneralBloom();
  }

  /**
   * For unit testing only.
   * @return the Bloom filter used by this writer.
   */
  BloomFilterWriter getGeneralBloomWriter() {
    return liveFileWriter.generalBloomFilterWriter;
  }

  public void close() throws IOException {
    liveFileWriter.appendFileInfo(HISTORICAL_KEY, Bytes.toBytes(false));
    liveFileWriter.close();
    if (historicalFileWriter != null) {
      historicalFileWriter.appendFileInfo(HISTORICAL_KEY, Bytes.toBytes(true));
      historicalFileWriter.close();
    }
  }

  public void appendFileInfo(byte[] key, byte[] value) throws IOException {
    liveFileWriter.appendFileInfo(key, value);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendFileInfo(key, value);
    }
  }

  /**
   * For use in testing.
   */
  HFile.Writer getLiveFileWriter() {
    return liveFileWriter.getHFileWriter();
  }

  /**
   * @param dir Directory to create file in.
   * @return random filename inside passed <code>dir</code>
   */
  public static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
    if (!fs.getFileStatus(dir).isDirectory()) {
      throw new IOException("Expecting " + dir.toString() + " to be a directory");
    }
    return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll(""));
  }

  private SingleStoreFileWriter getHistoricalFileWriter() throws IOException {
    if (historicalFileWriter == null) {
      historicalFileWriter =
        new SingleStoreFileWriter(fs, historicalFilePath, conf, cacheConf, bloomType, maxKeys,
          favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
    }
    return historicalFileWriter;
  }

  private void initRowState() {
    deleteFamily = null;
    deleteFamilyVersionList.clear();
    lastCell = null;
  }

  private void initColumnState() {
    livePutCellCount = 0;
    deleteColumn = null;
    deleteColumnVersionList.clear();
  }
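
  // Explanatory note (summarizing the four isDeletedBy* checks below): delete-family and
  // delete-column markers mask any cell with an older timestamp, while the *version* markers
  // only mask cells with exactly the same timestamp. On a timestamp tie, the old version
  // behavior treats the cell as deleted unconditionally; the new version behavior additionally
  // requires the cell's sequence id to be lower than the marker's.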

  private boolean isDeletedByDeleteFamily(ExtendedCell cell) {
    return deleteFamily != null && (deleteFamily.getTimestamp() > cell.getTimestamp()
      || (deleteFamily.getTimestamp() == cell.getTimestamp()
        && (!newVersionBehavior || cell.getSequenceId() < deleteFamily.getSequenceId())));
  }

  private boolean isDeletedByDeleteFamilyVersion(ExtendedCell cell) {
    for (ExtendedCell deleteFamilyVersion : deleteFamilyVersionList) {
      if (
        deleteFamilyVersion.getTimestamp() == cell.getTimestamp()
          && (!newVersionBehavior || cell.getSequenceId() < deleteFamilyVersion.getSequenceId())
      ) {
        return true;
      }
    }
    return false;
  }

  private boolean isDeletedByDeleteColumn(ExtendedCell cell) {
    return deleteColumn != null && (deleteColumn.getTimestamp() > cell.getTimestamp()
      || (deleteColumn.getTimestamp() == cell.getTimestamp()
        && (!newVersionBehavior || cell.getSequenceId() < deleteColumn.getSequenceId())));
  }

  private boolean isDeletedByDeleteColumnVersion(ExtendedCell cell) {
    for (ExtendedCell deleteColumnVersion : deleteColumnVersionList) {
      if (
        deleteColumnVersion.getTimestamp() == cell.getTimestamp()
          && (!newVersionBehavior || cell.getSequenceId() < deleteColumnVersion.getSequenceId())
      ) {
        return true;
      }
    }
    return false;
  }

  private boolean isDeleted(ExtendedCell cell) {
    return isDeletedByDeleteFamily(cell) || isDeletedByDeleteColumn(cell)
      || isDeletedByDeleteFamilyVersion(cell) || isDeletedByDeleteColumnVersion(cell);
  }
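
  // appendCell below routes each cell between the live and historical writers when dual file
  // writing is enabled (a summary of the routing rules, not an exhaustive spec): the live file
  // keeps the newest delete markers and up to maxVersions live (undeleted) put cells per column;
  // cells masked by a delete marker, and older put versions, go to the historical file. Cells
  // are assumed to arrive in comparator order, so per-row and per-column state is reset at row
  // and column boundaries.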

  private void appendCell(ExtendedCell cell) throws IOException {
    if (lastCell == null || !CellUtil.matchingColumn(lastCell, cell)) {
      initColumnState();
    }
    if (cell.getType() == Cell.Type.DeleteFamily) {
      if (deleteFamily == null) {
        deleteFamily = cell;
        liveFileWriter.append(cell);
      } else {
        getHistoricalFileWriter().append(cell);
      }
    } else if (cell.getType() == Cell.Type.DeleteFamilyVersion) {
      if (!isDeletedByDeleteFamily(cell)) {
        deleteFamilyVersionList.add(cell);
        if (deleteFamily != null && deleteFamily.getTimestamp() == cell.getTimestamp()) {
          // This means both the delete-family and delete-family-version markers have the same
          // timestamp but the sequence id of the delete-family-version marker is higher than
          // that of the delete-family marker. In this case, there is no need to add the
          // delete-family-version marker to the live version file. This case happens only with
          // the new version behavior.
          getHistoricalFileWriter().append(cell);
        } else {
          liveFileWriter.append(cell);
        }
      } else {
        getHistoricalFileWriter().append(cell);
      }
    } else if (cell.getType() == Cell.Type.DeleteColumn) {
      if (!isDeletedByDeleteFamily(cell) && deleteColumn == null) {
        deleteColumn = cell;
        liveFileWriter.append(cell);
      } else {
        getHistoricalFileWriter().append(cell);
      }
    } else if (cell.getType() == Cell.Type.Delete) {
      if (!isDeletedByDeleteFamily(cell) && deleteColumn == null) {
        deleteColumnVersionList.add(cell);
        if (deleteFamily != null && deleteFamily.getTimestamp() == cell.getTimestamp()) {
          // This means both the delete-family and delete-column-version markers have the same
          // timestamp but the sequence id of the delete-column-version marker is higher than
          // that of the delete-family marker. In this case, there is no need to add the
          // delete-column-version marker to the live version file. This case happens only with
          // the new version behavior.
          getHistoricalFileWriter().append(cell);
        } else {
          liveFileWriter.append(cell);
        }
      } else {
        getHistoricalFileWriter().append(cell);
      }
    } else if (cell.getType() == Cell.Type.Put) {
      if (livePutCellCount < maxVersions) {
        // This is a live put cell (i.e., one of the latest versions) of a column. Is it deleted?
        if (!isDeleted(cell)) {
          liveFileWriter.append(cell);
          livePutCellCount++;
        } else {
          // It is deleted
          getHistoricalFileWriter().append(cell);
          if (newVersionBehavior) {
            // Deleted versions count toward the total version count under the new version
            // behavior
            livePutCellCount++;
          }
        }
      } else {
        // It is an older put cell
        getHistoricalFileWriter().append(cell);
      }
    }
    lastCell = cell;
  }

  @Override
  public void appendAll(List<ExtendedCell> cellList) throws IOException {
    if (historicalFilePath == null) {
      // Dual writing is not enabled and all cells are written to one file. We use
      // the live version file in this case.
      for (ExtendedCell cell : cellList) {
        liveFileWriter.append(cell);
      }
      return;
    }
    if (cellList.isEmpty()) {
      return;
    }
    if (lastCell != null && comparator.compareRows(lastCell, cellList.get(0)) != 0) {
      // It is a new row and thus time to reset the state
      initRowState();
    }
    for (ExtendedCell cell : cellList) {
      appendCell(cell);
    }
  }

  @Override
  public void append(ExtendedCell cell) throws IOException {
    if (historicalFilePath == null) {
      // Dual writing is not enabled and all cells are written to one file. We use
      // the live version file in this case.
      liveFileWriter.append(cell);
      return;
    }
    appendCell(cell);
  }
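
  // Wraps a single HFile.Writer plus its Bloom filters and metadata; a StoreFileWriter fans out
  // to one (live) or two (live + historical) of these.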

  private static class SingleStoreFileWriter {
    private final BloomFilterWriter generalBloomFilterWriter;
    private final BloomFilterWriter deleteFamilyBloomFilterWriter;
    private final BloomType bloomType;
    private byte[] bloomParam = null;
    private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    private long deleteFamilyCnt = 0;
    private BloomContext bloomContext = null;
    private BloomContext deleteFamilyBloomContext = null;
    private final TimeRangeTracker timeRangeTracker;
    private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;

    private HFile.Writer writer;

    /**
     * Creates an HFile.Writer that also writes helpful metadata.
     * @param fs                     file system to write to
     * @param path                   file name to create
     * @param conf                   user configuration
     * @param bloomType              bloom filter setting
     * @param maxKeys                the expected maximum number of keys to be added. Was used
     *                               for Bloom filter size in {@link HFile} format version 1.
     * @param favoredNodes           an array of favored nodes or possibly null
     * @param fileContext            The HFile context
     * @param shouldDropCacheBehind  Drop pages written to page cache after writing the store
     *                               file.
     * @param compactedFilesSupplier Returns the {@link HStore} compacted files which are not
     *                               yet archived
     * @throws IOException problem writing to FS
     */
    private SingleStoreFileWriter(FileSystem fs, Path path, final Configuration conf,
      CacheConfig cacheConf, BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes,
      HFileContext fileContext, boolean shouldDropCacheBehind,
      Supplier<Collection<HStoreFile>> compactedFilesSupplier) throws IOException {
      this.compactedFilesSupplier = compactedFilesSupplier;
      this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
      // TODO: Change all writers to be specifically created for compaction context
      writer =
        HFile.getWriterFactory(conf, cacheConf).withPath(fs, path).withFavoredNodes(favoredNodes)
          .withFileContext(fileContext).withShouldDropCacheBehind(shouldDropCacheBehind).create();

      generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf,
        bloomType, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);

      if (generalBloomFilterWriter != null) {
        this.bloomType = bloomType;
        this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: "
            + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH
              ? Bytes.toInt(bloomParam)
              : Bytes.toStringBinary(bloomParam))
            + ", " + generalBloomFilterWriter.getClass().getSimpleName());
        }
        // init bloom context
        switch (bloomType) {
          case ROW:
            bloomContext =
              new RowBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
            break;
          case ROWCOL:
            bloomContext =
              new RowColBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
            break;
          case ROWPREFIX_FIXED_LENGTH:
            bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter,
              fileContext.getCellComparator(), Bytes.toInt(bloomParam));
            break;
          default:
            throw new IOException(
              "Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL or ROWPREFIX expected)");
        }
      } else {
        // Not using Bloom filters.
        this.bloomType = BloomType.NONE;
      }

      // initialize delete family Bloom filter when there is NO RowCol Bloom filter
      if (this.bloomType != BloomType.ROWCOL) {
        this.deleteFamilyBloomFilterWriter = BloomFilterFactory.createDeleteBloomAtWrite(conf,
          cacheConf, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
        deleteFamilyBloomContext =
          new RowBloomContext(deleteFamilyBloomFilterWriter, fileContext.getCellComparator());
      } else {
        deleteFamilyBloomFilterWriter = null;
      }
      if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) {
        LOG.trace("Delete Family Bloom filter type for " + path + ": "
          + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
      }
    }

    private long getPos() throws IOException {
      return ((HFileWriterImpl) writer).getPos();
    }

    /**
     * Writes metadata. Call before {@link #close()} since it is written as metadata to this
     * file.
     * @param maxSequenceId   Maximum sequence id.
     * @param majorCompaction True if this file is product of a major compaction
     * @throws IOException problem writing to FS
     */
    private void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
      throws IOException {
      appendMetadata(maxSequenceId, majorCompaction, Collections.emptySet());
    }

    /**
     * Writes metadata. Call before {@link #close()} since it is written as metadata to this
     * file.
     * @param maxSequenceId   Maximum sequence id.
     * @param majorCompaction True if this file is product of a major compaction
     * @param storeFiles      The compacted store files to generate this new file
     * @throws IOException problem writing to FS
     */
    private void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
      final Collection<HStoreFile> storeFiles) throws IOException {
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
      writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
      writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles));
      appendTrackedTimestampsToMetadata();
    }
So if file E 620 * compacted to new file F, will add E to F's compacted files first, then add E's compacted 621 * files: A, B, C, D to it. And no need to add D's compacted file, as D's compacted files has 622 * been in E's compacted files, too. See HBASE-20724 for more details. 623 * @param storeFiles The compacted store files to generate this new file 624 * @return bytes of CompactionEventTracker 625 */ 626 private byte[] toCompactionEventTrackerBytes(Collection<HStoreFile> storeFiles) { 627 Set<String> notArchivedCompactedStoreFiles = this.compactedFilesSupplier.get().stream() 628 .map(sf -> sf.getPath().getName()).collect(Collectors.toSet()); 629 Set<String> compactedStoreFiles = new HashSet<>(); 630 for (HStoreFile storeFile : storeFiles) { 631 compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName()); 632 for (String csf : storeFile.getCompactedStoreFiles()) { 633 if (notArchivedCompactedStoreFiles.contains(csf)) { 634 compactedStoreFiles.add(csf); 635 } 636 } 637 } 638 return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles); 639 } 640 641 /** 642 * Writes meta data. Call before {@link #close()} since its written as meta data to this file. 643 * @param maxSequenceId Maximum sequence id. 644 * @param majorCompaction True if this file is product of a major compaction 645 * @param mobCellsCount The number of mob cells. 646 * @throws IOException problem writing to FS 647 */ 648 private void appendMetadata(final long maxSequenceId, final boolean majorCompaction, 649 final long mobCellsCount) throws IOException { 650 writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); 651 writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); 652 writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount)); 653 appendTrackedTimestampsToMetadata(); 654 } 655 656 /** 657 * Appends MOB - specific metadata (even if it is empty) 658 * @param mobRefSet - original table -> set of MOB file names 659 * @throws IOException problem writing to FS 660 */ 661 private void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException { 662 writer.appendFileInfo(MOB_FILE_REFS, MobUtils.serializeMobFileRefs(mobRefSet)); 663 } 664 665 /** 666 * Add TimestampRange and earliest put timestamp to Metadata 667 */ 668 private void appendTrackedTimestampsToMetadata() throws IOException { 669 // TODO: The StoreFileReader always converts the byte[] to TimeRange 670 // via TimeRangeTracker, so we should write the serialization data of TimeRange directly. 671 appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker)); 672 appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs)); 673 } 674 675 /** 676 * Record the earlest Put timestamp. If the timeRangeTracker is not set, update TimeRangeTracker 677 * to include the timestamp of this key 678 */ 679 private void trackTimestamps(final ExtendedCell cell) { 680 if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) { 681 earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp()); 682 } 683 timeRangeTracker.includeTimestamp(cell); 684 } 685 686 private void appendGeneralBloomfilter(final ExtendedCell cell) throws IOException { 687 if (this.generalBloomFilterWriter != null) { 688 /* 689 * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue. 690 * png Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp 3 Types of 691 * Filtering: 1. Row = Row 2. RowCol = Row + Qualifier 3. 

    private void appendDeleteFamilyBloomFilter(final ExtendedCell cell) throws IOException {
      if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) {
        return;
      }

      // increase the number of delete family markers in the store file
      deleteFamilyCnt++;
      if (this.deleteFamilyBloomFilterWriter != null) {
        deleteFamilyBloomContext.writeBloom(cell);
      }
    }

    private void append(final ExtendedCell cell) throws IOException {
      appendGeneralBloomfilter(cell);
      appendDeleteFamilyBloomFilter(cell);
      writer.append(cell);
      trackTimestamps(cell);
    }

    private void beforeShipped() throws IOException {
      // For now these writers will always be of type ShipperListener.
      // TODO: Change all writers to be specifically created for compaction context
      writer.beforeShipped();
      if (generalBloomFilterWriter != null) {
        generalBloomFilterWriter.beforeShipped();
      }
      if (deleteFamilyBloomFilterWriter != null) {
        deleteFamilyBloomFilterWriter.beforeShipped();
      }
    }

    private Path getPath() {
      return this.writer.getPath();
    }

    private boolean hasGeneralBloom() {
      return this.generalBloomFilterWriter != null;
    }

    /**
     * For unit testing only.
     * @return the Bloom filter used by this writer.
     */
    BloomFilterWriter getGeneralBloomWriter() {
      return generalBloomFilterWriter;
    }

    private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
      boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
      if (haveBloom) {
        bfw.compactBloom();
      }
      return haveBloom;
    }

    private boolean closeGeneralBloomFilter() throws IOException {
      boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);

      // add the general Bloom filter writer and append file info
      if (hasGeneralBloom) {
        writer.addGeneralBloomFilter(generalBloomFilterWriter);
        writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
        if (bloomParam != null) {
          writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam);
        }
        bloomContext.addLastBloomKey(writer);
      }
      return hasGeneralBloom;
    }

    private boolean closeDeleteFamilyBloomFilter() throws IOException {
      boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);

      // add the delete family Bloom filter writer
      if (hasDeleteFamilyBloom) {
        writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
      }

      // append file info about the number of delete family kvs
      // even if there is no delete family Bloom.
      writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt));

      return hasDeleteFamilyBloom;
    }

    private void close() throws IOException {
      boolean hasGeneralBloom = this.closeGeneralBloomFilter();
      boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();

      writer.close();

      // Log final Bloom filter statistics. This needs to be done after close()
      // because compound Bloom filters might be finalized as part of closing.
      if (LOG.isTraceEnabled()) {
        LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and "
          + (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " was added to HFile "
          + getPath());
      }
    }

    private void appendFileInfo(byte[] key, byte[] value) throws IOException {
      writer.appendFileInfo(key, value);
    }

    /**
     * For use in testing.
     */
    private HFile.Writer getHFileWriter() {
      return writer;
    }
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
      justification = "Will not overflow")
  public static class Builder {
    private final Configuration conf;
    private final CacheConfig cacheConf;
    private final FileSystem fs;

    private BloomType bloomType = BloomType.NONE;
    private long maxKeyCount = 0;
    private Path dir;
    private Path liveFilePath;
    private Path historicalFilePath;

    private InetSocketAddress[] favoredNodes;
    private HFileContext fileContext;
    private boolean shouldDropCacheBehind;
    private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
    private String fileStoragePolicy;
    // This is used to track the creation of the StoreFileWriter, mainly used for the SFT
    // implementation where we will write store files directly to the final place, instead of
    // writing a tmp file first. Under this scenario, we will have a background task to purge the
    // store files which are not recorded in the SFT, but the newly created store file writers
    // are not tracked in SFT yet, so here we need to record them and treat them specially.
    private Consumer<Path> writerCreationTracker;
    private int maxVersions;
    private boolean newVersionBehavior;
    private CellComparator comparator;
    private boolean isCompaction;

    public Builder(Configuration conf, CacheConfig cacheConf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = cacheConf;
      this.fs = fs;
    }

    /**
     * Creates Builder with cache configuration disabled
     */
    public Builder(Configuration conf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = CacheConfig.DISABLED;
      this.fs = fs;
    }

    /**
     * Use either this method or {@link #withFilePath}, but not both.
     * @param dir Path to column family directory. The directory is created if it does not
     *            exist. The file is given a unique name within this directory.
     * @return this (for chained invocation)
     */
    public Builder withOutputDir(Path dir) {
      Preconditions.checkNotNull(dir);
      this.dir = dir;
      return this;
    }

    /**
     * Use either this method or {@link #withOutputDir}, but not both.
     * @param filePath the StoreFile path to write
     * @return this (for chained invocation)
     */
    public Builder withFilePath(Path filePath) {
      Preconditions.checkNotNull(filePath);
      this.liveFilePath = filePath;
      return this;
    }

    /**
     * @param favoredNodes an array of favored nodes or possibly null
     * @return this (for chained invocation)
     */
    public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) {
      this.favoredNodes = favoredNodes;
      return this;
    }

    public Builder withBloomType(BloomType bloomType) {
      Preconditions.checkNotNull(bloomType);
      this.bloomType = bloomType;
      return this;
    }

    /**
     * @param maxKeyCount estimated maximum number of keys we expect to add
     * @return this (for chained invocation)
     */
    public Builder withMaxKeyCount(long maxKeyCount) {
      this.maxKeyCount = maxKeyCount;
      return this;
    }

    public Builder withFileContext(HFileContext fileContext) {
      this.fileContext = fileContext;
      return this;
    }

    public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
      this.shouldDropCacheBehind = shouldDropCacheBehind;
      return this;
    }

    public Builder
      withCompactedFilesSupplier(Supplier<Collection<HStoreFile>> compactedFilesSupplier) {
      this.compactedFilesSupplier = compactedFilesSupplier;
      return this;
    }

    public Builder withFileStoragePolicy(String fileStoragePolicy) {
      this.fileStoragePolicy = fileStoragePolicy;
      return this;
    }

    public Builder withWriterCreationTracker(Consumer<Path> writerCreationTracker) {
      this.writerCreationTracker = writerCreationTracker;
      return this;
    }

    public Builder withMaxVersions(int maxVersions) {
      this.maxVersions = maxVersions;
      return this;
    }

    public Builder withNewVersionBehavior(boolean newVersionBehavior) {
      this.newVersionBehavior = newVersionBehavior;
      return this;
    }

    public Builder withCellComparator(CellComparator comparator) {
      this.comparator = comparator;
      return this;
    }

    public Builder withIsCompaction(boolean isCompaction) {
      this.isCompaction = isCompaction;
      return this;
    }
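
    // A hypothetical usage sketch (illustrative only; names like regionDir, estimatedKeys and
    // ctx are assumed to exist in the caller's scope):
    //
    //   StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
    //     .withOutputDir(new Path(regionDir, "cf"))
    //     .withBloomType(BloomType.ROW)
    //     .withMaxKeyCount(estimatedKeys)
    //     .withFileContext(ctx)
    //     .build();
    //   // append cells, then appendMetadata(...) and close()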

    /**
     * Create a store file writer. The client is responsible for closing the file when done. If
     * metadata is to be added, add it BEFORE closing using
     * {@link StoreFileWriter#appendMetadata}.
     */
    public StoreFileWriter build() throws IOException {
      if ((dir == null ? 0 : 1) + (liveFilePath == null ? 0 : 1) != 1) {
        throw new IllegalArgumentException("Either specify parent directory or file path");
      }

      if (dir == null) {
        dir = liveFilePath.getParent();
      }

      if (!fs.exists(dir)) {
        // Handle permission for non-HDFS filesystem properly
        // See HBASE-17710
        HRegionFileSystem.mkdirs(fs, conf, dir);
      }

      // set block storage policy for temp path
      String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY);
      if (null == policyName) {
        policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY);
      }
      CommonFSUtils.setStoragePolicy(this.fs, dir, policyName);

      if (liveFilePath == null) {
        // The stored file and related blocks will use the directory-based storage policy,
        // because HDFS DistributedFileSystem does not support creating files with a storage
        // policy before version 3.3.0 (see HDFS-13209). Using a child dir here makes the stored
        // files satisfy the specific storage policy when writing, so as to avoid later data
        // movement. We don't want to change the whole temp dir to 'fileStoragePolicy'.
        if (!Strings.isNullOrEmpty(fileStoragePolicy)) {
          dir = new Path(dir, HConstants.STORAGE_POLICY_PREFIX + fileStoragePolicy);
          if (!fs.exists(dir)) {
            HRegionFileSystem.mkdirs(fs, conf, dir);
            LOG.info(
              "Create tmp dir " + dir.toString() + " with storage policy: " + fileStoragePolicy);
          }
          CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy);
        }
        liveFilePath = getUniqueFile(fs, dir);
        if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
          bloomType = BloomType.NONE;
        }
      }

      if (isCompaction && shouldEnableHistoricalCompactionFiles(conf)) {
        historicalFilePath = getUniqueFile(fs, dir);
      }

      // Make sure we call this before actually creating the writer. In fact, it is not a big
      // deal to even add a nonexistent file to the tracker, as we will never try to delete it
      // and we will clean the tracker up after compaction. But if the file cleaner finds the
      // file before we have recorded it, it may accidentally delete the file and cause problems.
      if (writerCreationTracker != null) {
        writerCreationTracker.accept(liveFilePath);
        if (historicalFilePath != null) {
          writerCreationTracker.accept(historicalFilePath);
        }
      }
      return new StoreFileWriter(fs, liveFilePath, historicalFilePath, conf, cacheConf, bloomType,
        maxKeyCount, favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier,
        comparator, maxVersions, newVersionBehavior);
    }
  }
}