/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.HISTORICAL_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_FILE_REFS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.StoreEngine.STORE_ENGINE_CLASS_KEY;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.util.BloomContext;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RowBloomContext;
import org.apache.hadoop.hbase.util.RowColBloomContext;
import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
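
// Illustrative usage sketch only, not part of the original source. It shows the typical
// builder-based construction of a StoreFileWriter using methods defined in this file; the
// variable names (conf, cacheConf, fs, familyDir, hFileContext, estimatedKeyCount, cell,
// maxSeqId, majorCompaction) are hypothetical placeholders supplied by the caller:
//
//   StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
//     .withOutputDir(familyDir)            // or withFilePath(...), but not both
//     .withBloomType(BloomType.ROW)
//     .withMaxKeyCount(estimatedKeyCount)
//     .withFileContext(hFileContext)
//     .build();
//   try {
//     writer.append(cell);                                 // append cells in sorted order
//     writer.appendMetadata(maxSeqId, majorCompaction);    // metadata must go in before close()
//   } finally {
//     writer.close();
//   }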
/**
 * A StoreFile writer. Use this to read/write HBase Store Files. It is package local because it is
 * an implementation detail of the HBase regionserver.
 */
@InterfaceAudience.Private
public class StoreFileWriter implements CellSink, ShipperListener {
  private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName());
  public static final String ENABLE_HISTORICAL_COMPACTION_FILES =
    "hbase.enable.historical.compaction.files";
  public static final boolean DEFAULT_ENABLE_HISTORICAL_COMPACTION_FILES = false;
  private static final Pattern dash = Pattern.compile("-");
  private SingleStoreFileWriter liveFileWriter;
  private SingleStoreFileWriter historicalFileWriter;
  private final FileSystem fs;
  private final Path historicalFilePath;
  private final Configuration conf;
  private final CacheConfig cacheConf;
  private final BloomType bloomType;
  private final long maxKeys;
  private final InetSocketAddress[] favoredNodes;
  private final HFileContext fileContext;
  private final boolean shouldDropCacheBehind;
  private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;
  private final CellComparator comparator;
  private ExtendedCell lastCell;
  // The first (latest) delete family marker of the current row
  private ExtendedCell deleteFamily;
  // The list of delete family version markers of the current row
  private List<ExtendedCell> deleteFamilyVersionList = new ArrayList<>();
  // The first (latest) delete column marker of the current column
  private ExtendedCell deleteColumn;
  // The list of delete column version markers of the current column
  private List<ExtendedCell> deleteColumnVersionList = new ArrayList<>();
  // The live put cell count for the current column
  private int livePutCellCount;
  private final int maxVersions;
  private final boolean newVersionBehavior;

  /**
   * Creates an HFile.Writer that also writes helpful meta data.
   * @param fs                     file system to write to
   * @param liveFilePath           the name of the live file to create
   * @param historicalFilePath     the name of the historical file to create
   * @param conf                   user configuration
   * @param bloomType              bloom filter setting
   * @param maxKeys                the expected maximum number of keys to be added. Was used for
   *                               Bloom filter size in {@link HFile} format version 1.
   * @param favoredNodes           an array of favored nodes or possibly null
   * @param fileContext            The HFile context
   * @param shouldDropCacheBehind  Drop pages written to page cache after writing the store file.
   * @param compactedFilesSupplier Returns the {@link HStore} compacted files which are not yet
   *                               archived
   * @param comparator             Cell comparator
   * @param maxVersions            max cell versions
   * @param newVersionBehavior     enable new version behavior
   * @throws IOException problem writing to FS
   */
  private StoreFileWriter(FileSystem fs, Path liveFilePath, Path historicalFilePath,
    final Configuration conf, CacheConfig cacheConf, BloomType bloomType, long maxKeys,
    InetSocketAddress[] favoredNodes, HFileContext fileContext, boolean shouldDropCacheBehind,
    Supplier<Collection<HStoreFile>> compactedFilesSupplier, CellComparator comparator,
    int maxVersions, boolean newVersionBehavior) throws IOException {
    this.fs = fs;
    this.historicalFilePath = historicalFilePath;
    this.conf = conf;
    this.cacheConf = cacheConf;
    this.bloomType = bloomType;
    this.maxKeys = maxKeys;
    this.favoredNodes = favoredNodes;
    this.fileContext = fileContext;
    this.shouldDropCacheBehind = shouldDropCacheBehind;
    this.compactedFilesSupplier = compactedFilesSupplier;
    this.comparator = comparator;
    this.maxVersions = maxVersions;
    this.newVersionBehavior = newVersionBehavior;
    liveFileWriter = new SingleStoreFileWriter(fs, liveFilePath, conf, cacheConf, bloomType,
      maxKeys, favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
  }

  public static boolean shouldEnableHistoricalCompactionFiles(Configuration conf) {
    if (
      conf.getBoolean(ENABLE_HISTORICAL_COMPACTION_FILES,
        DEFAULT_ENABLE_HISTORICAL_COMPACTION_FILES)
    ) {
      // Historical compaction files are supported only for the default store engine with the
      // default compactor.
      String storeEngine = conf.get(STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
      if (!storeEngine.equals(DefaultStoreEngine.class.getName())) {
        LOG.warn("Historical compaction file generation is ignored for " + storeEngine
          + ". hbase.enable.historical.compaction.files can be set to true only for the "
          + "default compaction (DefaultStoreEngine and DefaultCompactor)");
        return false;
      }
      String compactor = conf.get(DEFAULT_COMPACTOR_CLASS_KEY, DefaultCompactor.class.getName());
      if (!compactor.equals(DefaultCompactor.class.getName())) {
        LOG.warn("Historical compaction file generation is ignored for " + compactor
          + ". hbase.enable.historical.compaction.files can be set to true only for the "
          + "default compaction (DefaultStoreEngine and DefaultCompactor)");
        return false;
      }
      return true;
    }
    return false;
  }
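
  // A minimal sketch (not part of the original class) of how a caller might opt in to historical
  // compaction files. HBaseConfiguration.create() is simply the usual way to obtain a
  // Configuration; everything else relies only on constants and methods defined in this class:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean(StoreFileWriter.ENABLE_HISTORICAL_COMPACTION_FILES, true);
  //   // Returns true only when the default store engine and default compactor are in effect;
  //   // otherwise it logs a warning and the dual-file path stays disabled.
  //   boolean dualFiles = StoreFileWriter.shouldEnableHistoricalCompactionFiles(conf);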

  public long getPos() throws IOException {
    return liveFileWriter.getPos();
  }

  /**
   * Writes meta data. Call before {@link #close()} since it is written as meta data to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
    throws IOException {
    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction);
    }
  }

  /**
   * Writes meta data. Call before {@link #close()} since it is written as meta data to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @param storeFiles      The compacted store files to generate this new file
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
    final Collection<HStoreFile> storeFiles) throws IOException {
    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction, storeFiles);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction, storeFiles);
    }
  }

  /**
   * Writes meta data. Call before {@link #close()} since it is written as meta data to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @param mobCellsCount   The number of mob cells.
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
    final long mobCellsCount) throws IOException {
    liveFileWriter.appendMetadata(maxSequenceId, majorCompaction, mobCellsCount);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction, mobCellsCount);
    }
  }

  /**
   * Appends MOB-specific metadata (even if it is empty)
   * @param mobRefSet - original table -> set of MOB file names
   * @throws IOException problem writing to FS
   */
  public void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
    liveFileWriter.appendMobMetadata(mobRefSet);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendMobMetadata(mobRefSet);
    }
  }

  /**
   * Add TimestampRange and earliest put timestamp to Metadata
   */
  public void appendTrackedTimestampsToMetadata() throws IOException {
    // TODO: The StoreFileReader always converts the byte[] to TimeRange
    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
    liveFileWriter.appendTrackedTimestampsToMetadata();
    if (historicalFileWriter != null) {
      historicalFileWriter.appendTrackedTimestampsToMetadata();
    }
  }

  @Override
  public void beforeShipped() throws IOException {
    liveFileWriter.beforeShipped();
    if (historicalFileWriter != null) {
      historicalFileWriter.beforeShipped();
    }
  }

  public Path getPath() {
    return liveFileWriter.getPath();
  }

  public List<Path> getPaths() {
    if (historicalFileWriter == null) {
      return Lists.newArrayList(liveFileWriter.getPath());
    }
    return Lists.newArrayList(liveFileWriter.getPath(), historicalFileWriter.getPath());
  }

  public boolean hasGeneralBloom() {
    return liveFileWriter.hasGeneralBloom();
  }

  /**
   * For unit testing only.
   * @return the Bloom filter used by this writer.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
  public BloomFilterWriter getGeneralBloomWriter() {
    return liveFileWriter.generalBloomFilterWriter;
  }

  public void close() throws IOException {
    liveFileWriter.appendFileInfo(HISTORICAL_KEY, Bytes.toBytes(false));
    liveFileWriter.close();
    if (historicalFileWriter != null) {
      historicalFileWriter.appendFileInfo(HISTORICAL_KEY, Bytes.toBytes(true));
      historicalFileWriter.close();
    }
  }

  public void appendFileInfo(byte[] key, byte[] value) throws IOException {
    liveFileWriter.appendFileInfo(key, value);
    if (historicalFileWriter != null) {
      historicalFileWriter.appendFileInfo(key, value);
    }
  }

  /**
   * For use in testing.
   */
  HFile.Writer getLiveFileWriter() {
    return liveFileWriter.getHFileWriter();
  }

  /**
   * @param dir Directory to create file in.
   * @return random filename inside passed <code>dir</code>
   */
  public static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
    if (!fs.getFileStatus(dir).isDirectory()) {
      throw new IOException("Expecting " + dir.toString() + " to be a directory");
    }
    return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll(""));
  }

  private SingleStoreFileWriter getHistoricalFileWriter() throws IOException {
    if (historicalFileWriter == null) {
      historicalFileWriter =
        new SingleStoreFileWriter(fs, historicalFilePath, conf, cacheConf, bloomType, maxKeys,
          favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
    }
    return historicalFileWriter;
  }

  private void initRowState() {
    deleteFamily = null;
    deleteFamilyVersionList.clear();
    lastCell = null;
  }

  private void initColumnState() {
    livePutCellCount = 0;
    deleteColumn = null;
    deleteColumnVersionList.clear();
  }

  private boolean isDeletedByDeleteFamily(ExtendedCell cell) {
    return deleteFamily != null && (deleteFamily.getTimestamp() > cell.getTimestamp()
      || (deleteFamily.getTimestamp() == cell.getTimestamp()
        && (!newVersionBehavior || cell.getSequenceId() < deleteFamily.getSequenceId())));
  }

  private boolean isDeletedByDeleteFamilyVersion(ExtendedCell cell) {
    for (ExtendedCell deleteFamilyVersion : deleteFamilyVersionList) {
      if (
        deleteFamilyVersion.getTimestamp() == cell.getTimestamp()
          && (!newVersionBehavior || cell.getSequenceId() < deleteFamilyVersion.getSequenceId())
      ) {
        return true;
      }
    }
    return false;
  }

  private boolean isDeletedByDeleteColumn(ExtendedCell cell) {
    return deleteColumn != null && (deleteColumn.getTimestamp() > cell.getTimestamp()
      || (deleteColumn.getTimestamp() == cell.getTimestamp()
        && (!newVersionBehavior || cell.getSequenceId() < deleteColumn.getSequenceId())));
  }

  private boolean isDeletedByDeleteColumnVersion(ExtendedCell cell) {
    for (ExtendedCell deleteColumnVersion : deleteColumnVersionList) {
      if (
        deleteColumnVersion.getTimestamp() == cell.getTimestamp()
          && (!newVersionBehavior || cell.getSequenceId() < deleteColumnVersion.getSequenceId())
      ) {
        return true;
      }
    }
    return false;
  }

  private boolean isDeleted(ExtendedCell cell) {
    return isDeletedByDeleteFamily(cell) || isDeletedByDeleteColumn(cell)
      || isDeletedByDeleteFamilyVersion(cell) || isDeletedByDeleteColumnVersion(cell);
  }
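
  // Illustrative sketch only (not part of the original class): how appendCell() below routes the
  // cells of one column between the live and historical files during a dual-file compaction.
  // Assume maxVersions = 2, newVersionBehavior = false, and these cells arrive in store-file
  // order for a hypothetical row "r", qualifier "q":
  //
  //   Put          ts=300 -> live file (latest version, livePutCellCount becomes 1)
  //   DeleteColumn ts=250 -> live file (first delete-column marker for the column)
  //   Put          ts=200 -> historical file (livePutCellCount < maxVersions, but
  //                          isDeletedByDeleteColumn() is true since 200 < 250)
  //   Put          ts=100 -> historical file (also deleted by the delete-column marker)
  //
  // Readers of only the live file still see the newest versions and the delete markers that
  // affect them, while older or deleted versions are kept in the historical file.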
  private void appendCell(ExtendedCell cell) throws IOException {
    if ((lastCell == null || !CellUtil.matchingColumn(lastCell, cell))) {
      initColumnState();
    }
    if (cell.getType() == Cell.Type.DeleteFamily) {
      if (deleteFamily == null) {
        deleteFamily = cell;
        liveFileWriter.append(cell);
      } else {
        getHistoricalFileWriter().append(cell);
      }
    } else if (cell.getType() == Cell.Type.DeleteFamilyVersion) {
      if (!isDeletedByDeleteFamily(cell)) {
        deleteFamilyVersionList.add(cell);
        if (deleteFamily != null && deleteFamily.getTimestamp() == cell.getTimestamp()) {
          // This means both the delete-family and delete-family-version markers have the same
          // timestamp but the sequence id of delete-family-version marker is higher than that of
          // the delete-family marker. In this case, there is no need to add the
          // delete-family-version marker to the live version file. This case happens only with
          // the new version behavior.
          liveFileWriter.append(cell);
        } else {
          liveFileWriter.append(cell);
        }
      } else {
        getHistoricalFileWriter().append(cell);
      }
    } else if (cell.getType() == Cell.Type.DeleteColumn) {
      if (!isDeletedByDeleteFamily(cell) && deleteColumn == null) {
        deleteColumn = cell;
        liveFileWriter.append(cell);
      } else {
        getHistoricalFileWriter().append(cell);
      }
    } else if (cell.getType() == Cell.Type.Delete) {
      if (!isDeletedByDeleteFamily(cell) && deleteColumn == null) {
        deleteColumnVersionList.add(cell);
        if (deleteFamily != null && deleteFamily.getTimestamp() == cell.getTimestamp()) {
          // This means both the delete-family and delete-column-version markers have the same
          // timestamp but the sequence id of delete-column-version marker is higher than that of
          // the delete-family marker. In this case, there is no need to add the
          // delete-column-version marker to the live version file. This case happens only with
          // the new version behavior.
          getHistoricalFileWriter().append(cell);
        } else {
          liveFileWriter.append(cell);
        }
      } else {
        getHistoricalFileWriter().append(cell);
      }
    } else if (cell.getType() == Cell.Type.Put) {
      if (livePutCellCount < maxVersions) {
        // This is a live put cell (i.e., the latest version) of a column. Is it deleted?
        if (!isDeleted(cell)) {
          liveFileWriter.append(cell);
          livePutCellCount++;
        } else {
          // It is deleted
          getHistoricalFileWriter().append(cell);
          if (newVersionBehavior) {
            // Deleted versions are considered toward total version count when newVersionBehavior
            livePutCellCount++;
          }
        }
      } else {
        // It is an older put cell
        getHistoricalFileWriter().append(cell);
      }
    }
    lastCell = cell;
  }

  @Override
  public void appendAll(List<ExtendedCell> cellList) throws IOException {
    if (historicalFilePath == null) {
      // The dual writing is not enabled and all cells are written to one file. We use
      // the live version file in this case
      for (ExtendedCell cell : cellList) {
        liveFileWriter.append(cell);
      }
      return;
    }
    if (cellList.isEmpty()) {
      return;
    }
    if (lastCell != null && comparator.compareRows(lastCell, cellList.get(0)) != 0) {
      // It is a new row and thus time to reset the state
      initRowState();
    }
    for (ExtendedCell cell : cellList) {
      appendCell(cell);
    }
  }

  @Override
  public void append(ExtendedCell cell) throws IOException {
    if (historicalFilePath == null) {
      // The dual writing is not enabled and all cells are written to one file. We use
      // the live version file in this case
      liveFileWriter.append(cell);
      return;
    }
    appendCell(cell);
  }

  private static class SingleStoreFileWriter {
    private final BloomFilterWriter generalBloomFilterWriter;
    private final BloomFilterWriter deleteFamilyBloomFilterWriter;
    private final BloomType bloomType;
    private byte[] bloomParam = null;
    private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    private long deleteFamilyCnt = 0;
    private BloomContext bloomContext = null;
    private BloomContext deleteFamilyBloomContext = null;
    private final TimeRangeTracker timeRangeTracker;
    private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;

    private HFile.Writer writer;

    /**
     * Creates an HFile.Writer that also writes helpful meta data.
     * @param fs                     file system to write to
     * @param path                   file name to create
     * @param conf                   user configuration
     * @param bloomType              bloom filter setting
     * @param maxKeys                the expected maximum number of keys to be added. Was used for
     *                               Bloom filter size in {@link HFile} format version 1.
     * @param favoredNodes           an array of favored nodes or possibly null
     * @param fileContext            The HFile context
     * @param shouldDropCacheBehind  Drop pages written to page cache after writing the store
     *                               file.
     * @param compactedFilesSupplier Returns the {@link HStore} compacted files which are not yet
     *                               archived
     * @throws IOException problem writing to FS
     */
    private SingleStoreFileWriter(FileSystem fs, Path path, final Configuration conf,
      CacheConfig cacheConf, BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes,
      HFileContext fileContext, boolean shouldDropCacheBehind,
      Supplier<Collection<HStoreFile>> compactedFilesSupplier) throws IOException {
      this.compactedFilesSupplier = compactedFilesSupplier;
      this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
      // TODO : Change all writers to be specifically created for compaction context
      writer =
        HFile.getWriterFactory(conf, cacheConf).withPath(fs, path).withFavoredNodes(favoredNodes)
          .withFileContext(fileContext).withShouldDropCacheBehind(shouldDropCacheBehind).create();

      generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf,
        bloomType, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);

      if (generalBloomFilterWriter != null) {
        this.bloomType = bloomType;
        this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: "
            + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH
              ? Bytes.toInt(bloomParam)
              : Bytes.toStringBinary(bloomParam))
            + ", " + generalBloomFilterWriter.getClass().getSimpleName());
        }
        // init bloom context
        switch (bloomType) {
          case ROW:
            bloomContext =
              new RowBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
            break;
          case ROWCOL:
            bloomContext =
              new RowColBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
            break;
          case ROWPREFIX_FIXED_LENGTH:
            bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter,
              fileContext.getCellComparator(), Bytes.toInt(bloomParam));
            break;
          default:
            throw new IOException(
              "Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL or ROWPREFIX expected)");
        }
      } else {
        // Not using Bloom filters.
        this.bloomType = BloomType.NONE;
      }

      // initialize delete family Bloom filter when there is NO RowCol Bloom filter
      if (this.bloomType != BloomType.ROWCOL) {
        this.deleteFamilyBloomFilterWriter = BloomFilterFactory.createDeleteBloomAtWrite(conf,
          cacheConf, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
        deleteFamilyBloomContext =
          new RowBloomContext(deleteFamilyBloomFilterWriter, fileContext.getCellComparator());
      } else {
        deleteFamilyBloomFilterWriter = null;
      }
      if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) {
        LOG.trace("Delete Family Bloom filter type for " + path + ": "
          + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
      }
    }

    private long getPos() throws IOException {
      return ((HFileWriterImpl) writer).getPos();
    }

    /**
     * Writes meta data. Call before {@link #close()} since it is written as meta data to this
     * file.
     * @param maxSequenceId   Maximum sequence id.
     * @param majorCompaction True if this file is product of a major compaction
     * @throws IOException problem writing to FS
     */
    private void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
      throws IOException {
      appendMetadata(maxSequenceId, majorCompaction, Collections.emptySet());
    }

    /**
     * Writes meta data. Call before {@link #close()} since it is written as meta data to this
     * file.
     * @param maxSequenceId   Maximum sequence id.
     * @param majorCompaction True if this file is product of a major compaction
     * @param storeFiles      The compacted store files to generate this new file
     * @throws IOException problem writing to FS
     */
    private void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
      final Collection<HStoreFile> storeFiles) throws IOException {
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
      writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
      writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles));
      appendTrackedTimestampsToMetadata();
    }

    /**
     * Used when writing {@link HStoreFile#COMPACTION_EVENT_KEY} to the new file's file info. The
     * compacted store files' names are needed. If a compacted store file is itself a result of
     * compaction, its compacted files which are still not archived are needed, too; there is no
     * need to add compacted files recursively. If files A, B, C compacted to new file D, and file
     * D compacted to new file E, we write A, B, C, D to file E's compacted files. So if file E
     * compacted to new file F, we first add E to F's compacted files, then add E's compacted
     * files A, B, C, D to it. There is no need to add D's compacted files, as they are already in
     * E's compacted files. See HBASE-20724 for more details.
     * @param storeFiles The compacted store files to generate this new file
     * @return bytes of CompactionEventTracker
     */
    private byte[] toCompactionEventTrackerBytes(Collection<HStoreFile> storeFiles) {
      Set<String> notArchivedCompactedStoreFiles = this.compactedFilesSupplier.get().stream()
        .map(sf -> sf.getPath().getName()).collect(Collectors.toSet());
      Set<String> compactedStoreFiles = new HashSet<>();
      for (HStoreFile storeFile : storeFiles) {
        compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName());
        for (String csf : storeFile.getCompactedStoreFiles()) {
          if (notArchivedCompactedStoreFiles.contains(csf)) {
            compactedStoreFiles.add(csf);
          }
        }
      }
      return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles);
    }

    /**
     * Writes meta data. Call before {@link #close()} since it is written as meta data to this
     * file.
     * @param maxSequenceId   Maximum sequence id.
     * @param majorCompaction True if this file is product of a major compaction
     * @param mobCellsCount   The number of mob cells.
     * @throws IOException problem writing to FS
     */
    private void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
      final long mobCellsCount) throws IOException {
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
      writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
      writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
      appendTrackedTimestampsToMetadata();
    }

    /**
     * Appends MOB-specific metadata (even if it is empty)
     * @param mobRefSet - original table -> set of MOB file names
     * @throws IOException problem writing to FS
     */
    private void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
      writer.appendFileInfo(MOB_FILE_REFS, MobUtils.serializeMobFileRefs(mobRefSet));
    }

    /**
     * Add TimestampRange and earliest put timestamp to Metadata
     */
    private void appendTrackedTimestampsToMetadata() throws IOException {
      // TODO: The StoreFileReader always converts the byte[] to TimeRange
      // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
      appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker));
      appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
    }

    /**
     * Record the earliest Put timestamp. If the timeRangeTracker is not set, update
     * TimeRangeTracker to include the timestamp of this key
     */
    private void trackTimestamps(final ExtendedCell cell) {
      if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
        earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
      }
      timeRangeTracker.includeTimestamp(cell);
    }
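
    // A small illustration (not part of the original source) of which portion of a cell's key
    // feeds the general Bloom filter written by appendGeneralBloomfilter() below, for a
    // hypothetical cell with row "row1" and qualifier "q1":
    //
    //   ROW                    -> "row1"
    //   ROWCOL                 -> "row1" + "q1"
    //   ROWPREFIX_FIXED_LENGTH -> the first N bytes of "row1", where N is the fixed prefix
    //                             length carried in bloomParam
    //
    // The BloomContext chosen in the constructor above performs the corresponding extraction.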
    private void appendGeneralBloomfilter(final ExtendedCell cell) throws IOException {
      if (this.generalBloomFilterWriter != null) {
        /*
         * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.
         * png Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp 3 Types of
         * Filtering: 1. Row = Row 2. RowCol = Row + Qualifier 3. RowPrefixFixedLength = Fixed
         * Length Row Prefix
         */
        bloomContext.writeBloom(cell);
      }
    }

    private void appendDeleteFamilyBloomFilter(final ExtendedCell cell) throws IOException {
      if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) {
        return;
      }

      // increase the number of delete family in the store file
      deleteFamilyCnt++;
      if (this.deleteFamilyBloomFilterWriter != null) {
        deleteFamilyBloomContext.writeBloom(cell);
      }
    }

    private void append(final ExtendedCell cell) throws IOException {
      appendGeneralBloomfilter(cell);
      appendDeleteFamilyBloomFilter(cell);
      writer.append(cell);
      trackTimestamps(cell);
    }

    private void beforeShipped() throws IOException {
      // For now these writers will always be of type ShipperListener.
      // TODO : Change all writers to be specifically created for compaction context
      writer.beforeShipped();
      if (generalBloomFilterWriter != null) {
        generalBloomFilterWriter.beforeShipped();
      }
      if (deleteFamilyBloomFilterWriter != null) {
        deleteFamilyBloomFilterWriter.beforeShipped();
      }
    }

    private Path getPath() {
      return this.writer.getPath();
    }

    private boolean hasGeneralBloom() {
      return this.generalBloomFilterWriter != null;
    }

    /**
     * For unit testing only.
     * @return the Bloom filter used by this writer.
     */
    BloomFilterWriter getGeneralBloomWriter() {
      return generalBloomFilterWriter;
    }

    private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
      boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
      if (haveBloom) {
        bfw.compactBloom();
      }
      return haveBloom;
    }

    private boolean closeGeneralBloomFilter() throws IOException {
      boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);

      // add the general Bloom filter writer and append file info
      if (hasGeneralBloom) {
        writer.addGeneralBloomFilter(generalBloomFilterWriter);
        writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
        if (bloomParam != null) {
          writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam);
        }
        bloomContext.addLastBloomKey(writer);
      }
      return hasGeneralBloom;
    }

    private boolean closeDeleteFamilyBloomFilter() throws IOException {
      boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);

      // add the delete family Bloom filter writer
      if (hasDeleteFamilyBloom) {
        writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
      }

      // append file info about the number of delete family kvs
      // even if there is no delete family Bloom.
      writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt));

      return hasDeleteFamilyBloom;
    }

    private void close() throws IOException {
      boolean hasGeneralBloom = this.closeGeneralBloomFilter();
      boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();

      writer.close();

      // Log final Bloom filter statistics. This needs to be done after close()
      // because compound Bloom filters might be finalized as part of closing.
      if (LOG.isTraceEnabled()) {
        LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and "
          + (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " was added to HFile "
          + getPath());
      }
    }

    private void appendFileInfo(byte[] key, byte[] value) throws IOException {
      writer.appendFileInfo(key, value);
    }

    /**
     * For use in testing.
     */
    private HFile.Writer getHFileWriter() {
      return writer;
    }
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
      justification = "Will not overflow")
  public static class Builder {
    private final Configuration conf;
    private final CacheConfig cacheConf;
    private final FileSystem fs;

    private BloomType bloomType = BloomType.NONE;
    private long maxKeyCount = 0;
    private Path dir;
    private Path liveFilePath;
    private Path historicalFilePath;

    private InetSocketAddress[] favoredNodes;
    private HFileContext fileContext;
    private boolean shouldDropCacheBehind;
    private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
    private String fileStoragePolicy;
    // this is used to track the creation of the StoreFileWriter, mainly used for the SFT
    // implementation where we will write store files directly to the final place, instead of
    // writing a tmp file first. Under this scenario, we will have a background task to purge the
    // store files which are not recorded in the SFT, but for the newly created store file writer,
    // they are not tracked in SFT, so here we need to record them and treat them specially.
    private Consumer<Path> writerCreationTracker;
    private int maxVersions;
    private boolean newVersionBehavior;
    private CellComparator comparator;
    private boolean isCompaction;

    public Builder(Configuration conf, CacheConfig cacheConf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = cacheConf;
      this.fs = fs;
    }

    /**
     * Creates Builder with cache configuration disabled
     */
    public Builder(Configuration conf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = CacheConfig.DISABLED;
      this.fs = fs;
    }

    /**
     * Use either this method or {@link #withFilePath}, but not both.
     * @param dir Path to column family directory. The directory is created if it does not exist.
     *            The file is given a unique name within this directory.
     * @return this (for chained invocation)
     */
    public Builder withOutputDir(Path dir) {
      Preconditions.checkNotNull(dir);
      this.dir = dir;
      return this;
    }

    /**
     * Use either this method or {@link #withOutputDir}, but not both.
     * @param filePath the StoreFile path to write
     * @return this (for chained invocation)
     */
    public Builder withFilePath(Path filePath) {
      Preconditions.checkNotNull(filePath);
      this.liveFilePath = filePath;
      return this;
    }

    /**
     * @param favoredNodes an array of favored nodes or possibly null
     * @return this (for chained invocation)
     */
    public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) {
      this.favoredNodes = favoredNodes;
      return this;
    }

    public Builder withBloomType(BloomType bloomType) {
      Preconditions.checkNotNull(bloomType);
      this.bloomType = bloomType;
      return this;
    }

    /**
     * @param maxKeyCount estimated maximum number of keys we expect to add
     * @return this (for chained invocation)
     */
    public Builder withMaxKeyCount(long maxKeyCount) {
      this.maxKeyCount = maxKeyCount;
      return this;
    }

    public Builder withFileContext(HFileContext fileContext) {
      this.fileContext = fileContext;
      return this;
    }

    public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
      this.shouldDropCacheBehind = shouldDropCacheBehind;
      return this;
    }

    public Builder
      withCompactedFilesSupplier(Supplier<Collection<HStoreFile>> compactedFilesSupplier) {
      this.compactedFilesSupplier = compactedFilesSupplier;
      return this;
    }

    public Builder withFileStoragePolicy(String fileStoragePolicy) {
      this.fileStoragePolicy = fileStoragePolicy;
      return this;
    }

    public Builder withWriterCreationTracker(Consumer<Path> writerCreationTracker) {
      this.writerCreationTracker = writerCreationTracker;
      return this;
    }

    public Builder withMaxVersions(int maxVersions) {
      this.maxVersions = maxVersions;
      return this;
    }

    public Builder withNewVersionBehavior(boolean newVersionBehavior) {
      this.newVersionBehavior = newVersionBehavior;
      return this;
    }

    public Builder withCellComparator(CellComparator comparator) {
      this.comparator = comparator;
      return this;
    }

    public Builder withIsCompaction(boolean isCompaction) {
      this.isCompaction = isCompaction;
      return this;
    }

    /**
     * Create a store file writer. Client is responsible for closing file when done. If metadata,
     * add BEFORE closing using {@link StoreFileWriter#appendMetadata}.
     */
    public StoreFileWriter build() throws IOException {
      if ((dir == null ? 0 : 1) + (liveFilePath == null ? 0 : 1) != 1) {
        throw new IllegalArgumentException("Either specify parent directory " + "or file path");
      }

      if (dir == null) {
        dir = liveFilePath.getParent();
      }

      if (!fs.exists(dir)) {
        // Handle permission for non-HDFS filesystem properly
        // See HBASE-17710
        HRegionFileSystem.mkdirs(fs, conf, dir);
      }

      // set block storage policy for temp path
      String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY);
      if (null == policyName) {
        policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY);
      }
      CommonFSUtils.setStoragePolicy(this.fs, dir, policyName);

      if (liveFilePath == null) {
        // The stored file and related blocks will use the directory-based StoragePolicy, because
        // HDFS DistributedFileSystem does not support creating files with a storage policy before
        // version 3.3.0 (see HDFS-13209). Using a child dir here makes the stored files satisfy
        // the specific storage policy when writing, so as to avoid later data movement. We don't
        // want to change the whole temp dir to 'fileStoragePolicy'.
        if (!Strings.isNullOrEmpty(fileStoragePolicy)) {
          dir = new Path(dir, HConstants.STORAGE_POLICY_PREFIX + fileStoragePolicy);
          if (!fs.exists(dir)) {
            HRegionFileSystem.mkdirs(fs, conf, dir);
            LOG.info(
              "Create tmp dir " + dir.toString() + " with storage policy: " + fileStoragePolicy);
          }
          CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy);
        }
        liveFilePath = getUniqueFile(fs, dir);
        if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
          bloomType = BloomType.NONE;
        }
      }

      if (isCompaction && shouldEnableHistoricalCompactionFiles(conf)) {
        historicalFilePath = getUniqueFile(fs, dir);
      }

      // make sure we call this before actually creating the writer. In fact, it is not a big deal
      // to even add a nonexistent file to the tracker, as we will never try to delete it and we
      // will clean the tracker up after compaction. But if the file cleaner finds the file while
      // we haven't recorded it yet, it may accidentally delete the file and cause problems.
      if (writerCreationTracker != null) {
        writerCreationTracker.accept(liveFilePath);
        if (historicalFilePath != null) {
          writerCreationTracker.accept(historicalFilePath);
        }
      }
      return new StoreFileWriter(fs, liveFilePath, historicalFilePath, conf, cacheConf, bloomType,
        maxKeyCount, favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier,
        comparator, maxVersions, newVersionBehavior);
    }
  }
}