001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY; 021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; 022import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY; 023import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT; 024import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS; 025import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; 026import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY; 027import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT; 028import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY; 029 030import java.io.IOException; 031import java.net.InetSocketAddress; 032import java.util.Collection; 033import java.util.Collections; 034import java.util.HashSet; 035import java.util.Set; 036import java.util.UUID; 037import java.util.function.Supplier; 038import java.util.regex.Pattern; 039import java.util.stream.Collectors; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.hbase.Cell; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.KeyValue; 046import org.apache.hadoop.hbase.PrivateCellUtil; 047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 048import org.apache.hadoop.hbase.io.hfile.CacheConfig; 049import org.apache.hadoop.hbase.io.hfile.HFile; 050import org.apache.hadoop.hbase.io.hfile.HFileContext; 051import org.apache.hadoop.hbase.util.BloomContext; 052import org.apache.hadoop.hbase.util.BloomFilterFactory; 053import org.apache.hadoop.hbase.util.BloomFilterUtil; 054import org.apache.hadoop.hbase.util.BloomFilterWriter; 055import org.apache.hadoop.hbase.util.Bytes; 056import org.apache.hadoop.hbase.util.CommonFSUtils; 057import org.apache.hadoop.hbase.util.RowBloomContext; 058import org.apache.hadoop.hbase.util.RowColBloomContext; 059import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext; 060import org.apache.yetus.audience.InterfaceAudience; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 065import org.apache.hbase.thirdparty.com.google.common.base.Strings; 066import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap; 067 068import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 069 070/** 071 * A StoreFile writer. Use this to read/write HBase Store Files. It is package 072 * local because it is an implementation detail of the HBase regionserver. 073 */ 074@InterfaceAudience.Private 075public class StoreFileWriter implements CellSink, ShipperListener { 076 private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName()); 077 private static final Pattern dash = Pattern.compile("-"); 078 private final BloomFilterWriter generalBloomFilterWriter; 079 private final BloomFilterWriter deleteFamilyBloomFilterWriter; 080 private final BloomType bloomType; 081 private byte[] bloomParam = null; 082 private long earliestPutTs = HConstants.LATEST_TIMESTAMP; 083 private long deleteFamilyCnt = 0; 084 private BloomContext bloomContext = null; 085 private BloomContext deleteFamilyBloomContext = null; 086 private final TimeRangeTracker timeRangeTracker; 087 private final Supplier<Collection<HStoreFile>> compactedFilesSupplier; 088 089 protected HFile.Writer writer; 090 091 /** 092 * Creates an HFile.Writer that also write helpful meta data. 093 * 094 * @param fs file system to write to 095 * @param path file name to create 096 * @param conf user configuration 097 * @param bloomType bloom filter setting 098 * @param maxKeys the expected maximum number of keys to be added. Was used 099 * for Bloom filter size in {@link HFile} format version 1. 100 * @param favoredNodes an array of favored nodes or possibly null 101 * @param fileContext The HFile context 102 * @param shouldDropCacheBehind Drop pages written to page cache after writing the store file. 103 * @param compactedFilesSupplier Returns the {@link HStore} compacted files which not archived 104 * @throws IOException problem writing to FS 105 */ 106 private StoreFileWriter(FileSystem fs, Path path, final Configuration conf, CacheConfig cacheConf, 107 BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes, HFileContext fileContext, 108 boolean shouldDropCacheBehind, Supplier<Collection<HStoreFile>> compactedFilesSupplier) 109 throws IOException { 110 this.compactedFilesSupplier = compactedFilesSupplier; 111 this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC); 112 // TODO : Change all writers to be specifically created for compaction context 113 writer = HFile.getWriterFactory(conf, cacheConf) 114 .withPath(fs, path) 115 .withFavoredNodes(favoredNodes) 116 .withFileContext(fileContext) 117 .withShouldDropCacheBehind(shouldDropCacheBehind) 118 .create(); 119 120 generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite( 121 conf, cacheConf, bloomType, 122 (int) Math.min(maxKeys, Integer.MAX_VALUE), writer); 123 124 if (generalBloomFilterWriter != null) { 125 this.bloomType = bloomType; 126 this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf); 127 if (LOG.isTraceEnabled()) { 128 LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: " 129 + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH? 130 Bytes.toInt(bloomParam):Bytes.toStringBinary(bloomParam)) 131 + ", " + generalBloomFilterWriter.getClass().getSimpleName()); 132 } 133 // init bloom context 134 switch (bloomType) { 135 case ROW: 136 bloomContext = 137 new RowBloomContext(generalBloomFilterWriter, fileContext.getCellComparator()); 138 break; 139 case ROWCOL: 140 bloomContext = 141 new RowColBloomContext(generalBloomFilterWriter, fileContext.getCellComparator()); 142 break; 143 case ROWPREFIX_FIXED_LENGTH: 144 bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter, 145 fileContext.getCellComparator(), Bytes.toInt(bloomParam)); 146 break; 147 default: 148 throw new IOException( 149 "Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL or ROWPREFIX expected)"); 150 } 151 } else { 152 // Not using Bloom filters. 153 this.bloomType = BloomType.NONE; 154 } 155 156 // initialize delete family Bloom filter when there is NO RowCol Bloom 157 // filter 158 if (this.bloomType != BloomType.ROWCOL) { 159 this.deleteFamilyBloomFilterWriter = BloomFilterFactory 160 .createDeleteBloomAtWrite(conf, cacheConf, 161 (int) Math.min(maxKeys, Integer.MAX_VALUE), writer); 162 deleteFamilyBloomContext = 163 new RowBloomContext(deleteFamilyBloomFilterWriter, fileContext.getCellComparator()); 164 } else { 165 deleteFamilyBloomFilterWriter = null; 166 } 167 if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) { 168 LOG.trace("Delete Family Bloom filter type for " + path + ": " + 169 deleteFamilyBloomFilterWriter.getClass().getSimpleName()); 170 } 171 } 172 173 /** 174 * Writes meta data. 175 * Call before {@link #close()} since its written as meta data to this file. 176 * @param maxSequenceId Maximum sequence id. 177 * @param majorCompaction True if this file is product of a major compaction 178 * @throws IOException problem writing to FS 179 */ 180 public void appendMetadata(final long maxSequenceId, final boolean majorCompaction) 181 throws IOException { 182 appendMetadata(maxSequenceId, majorCompaction, Collections.emptySet()); 183 } 184 185 /** 186 * Writes meta data. 187 * Call before {@link #close()} since its written as meta data to this file. 188 * @param maxSequenceId Maximum sequence id. 189 * @param majorCompaction True if this file is product of a major compaction 190 * @param storeFiles The compacted store files to generate this new file 191 * @throws IOException problem writing to FS 192 */ 193 public void appendMetadata(final long maxSequenceId, final boolean majorCompaction, 194 final Collection<HStoreFile> storeFiles) throws IOException { 195 writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); 196 writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); 197 writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles)); 198 appendTrackedTimestampsToMetadata(); 199 } 200 201 /** 202 * Used when write {@link HStoreFile#COMPACTION_EVENT_KEY} to new file's file info. The compacted 203 * store files's name is needed. But if the compacted store file is a result of compaction, it's 204 * compacted files which still not archived is needed, too. And don't need to add compacted files 205 * recursively. If file A, B, C compacted to new file D, and file D compacted to new file E, will 206 * write A, B, C, D to file E's compacted files. So if file E compacted to new file F, will add E 207 * to F's compacted files first, then add E's compacted files: A, B, C, D to it. And no need to 208 * add D's compacted file, as D's compacted files has been in E's compacted files, too. 209 * See HBASE-20724 for more details. 210 * 211 * @param storeFiles The compacted store files to generate this new file 212 * @return bytes of CompactionEventTracker 213 */ 214 private byte[] toCompactionEventTrackerBytes(Collection<HStoreFile> storeFiles) { 215 Set<String> notArchivedCompactedStoreFiles = 216 this.compactedFilesSupplier.get().stream().map(sf -> sf.getPath().getName()) 217 .collect(Collectors.toSet()); 218 Set<String> compactedStoreFiles = new HashSet<>(); 219 for (HStoreFile storeFile : storeFiles) { 220 compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName()); 221 for (String csf : storeFile.getCompactedStoreFiles()) { 222 if (notArchivedCompactedStoreFiles.contains(csf)) { 223 compactedStoreFiles.add(csf); 224 } 225 } 226 } 227 return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles); 228 } 229 230 /** 231 * Writes meta data. 232 * Call before {@link #close()} since its written as meta data to this file. 233 * @param maxSequenceId Maximum sequence id. 234 * @param majorCompaction True if this file is product of a major compaction 235 * @param mobCellsCount The number of mob cells. 236 * @throws IOException problem writing to FS 237 */ 238 public void appendMetadata(final long maxSequenceId, final boolean majorCompaction, 239 final long mobCellsCount) throws IOException { 240 writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); 241 writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); 242 writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount)); 243 appendTrackedTimestampsToMetadata(); 244 } 245 246 /** 247 * Add TimestampRange and earliest put timestamp to Metadata 248 */ 249 public void appendTrackedTimestampsToMetadata() throws IOException { 250 // TODO: The StoreFileReader always converts the byte[] to TimeRange 251 // via TimeRangeTracker, so we should write the serialization data of TimeRange directly. 252 appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker)); 253 appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs)); 254 } 255 256 /** 257 * Record the earlest Put timestamp. 258 * 259 * If the timeRangeTracker is not set, 260 * update TimeRangeTracker to include the timestamp of this key 261 */ 262 public void trackTimestamps(final Cell cell) { 263 if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) { 264 earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp()); 265 } 266 timeRangeTracker.includeTimestamp(cell); 267 } 268 269 private void appendGeneralBloomfilter(final Cell cell) throws IOException { 270 if (this.generalBloomFilterWriter != null) { 271 /* 272 * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png 273 * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp 274 * 275 * 3 Types of Filtering: 276 * 1. Row = Row 277 * 2. RowCol = Row + Qualifier 278 * 3. RowPrefixFixedLength = Fixed Length Row Prefix 279 */ 280 bloomContext.writeBloom(cell); 281 } 282 } 283 284 private void appendDeleteFamilyBloomFilter(final Cell cell) 285 throws IOException { 286 if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) { 287 return; 288 } 289 290 // increase the number of delete family in the store file 291 deleteFamilyCnt++; 292 if (this.deleteFamilyBloomFilterWriter != null) { 293 deleteFamilyBloomContext.writeBloom(cell); 294 } 295 } 296 297 @Override 298 public void append(final Cell cell) throws IOException { 299 appendGeneralBloomfilter(cell); 300 appendDeleteFamilyBloomFilter(cell); 301 writer.append(cell); 302 trackTimestamps(cell); 303 } 304 305 @Override 306 public void beforeShipped() throws IOException { 307 // For now these writer will always be of type ShipperListener true. 308 // TODO : Change all writers to be specifically created for compaction context 309 writer.beforeShipped(); 310 if (generalBloomFilterWriter != null) { 311 generalBloomFilterWriter.beforeShipped(); 312 } 313 if (deleteFamilyBloomFilterWriter != null) { 314 deleteFamilyBloomFilterWriter.beforeShipped(); 315 } 316 } 317 318 public Path getPath() { 319 return this.writer.getPath(); 320 } 321 322 public boolean hasGeneralBloom() { 323 return this.generalBloomFilterWriter != null; 324 } 325 326 /** 327 * For unit testing only. 328 * 329 * @return the Bloom filter used by this writer. 330 */ 331 BloomFilterWriter getGeneralBloomWriter() { 332 return generalBloomFilterWriter; 333 } 334 335 private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException { 336 boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0); 337 if (haveBloom) { 338 bfw.compactBloom(); 339 } 340 return haveBloom; 341 } 342 343 private boolean closeGeneralBloomFilter() throws IOException { 344 boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter); 345 346 // add the general Bloom filter writer and append file info 347 if (hasGeneralBloom) { 348 writer.addGeneralBloomFilter(generalBloomFilterWriter); 349 writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString())); 350 if (bloomParam != null) { 351 writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam); 352 } 353 bloomContext.addLastBloomKey(writer); 354 } 355 return hasGeneralBloom; 356 } 357 358 private boolean closeDeleteFamilyBloomFilter() throws IOException { 359 boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter); 360 361 // add the delete family Bloom filter writer 362 if (hasDeleteFamilyBloom) { 363 writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter); 364 } 365 366 // append file info about the number of delete family kvs 367 // even if there is no delete family Bloom. 368 writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt)); 369 370 return hasDeleteFamilyBloom; 371 } 372 373 public void close() throws IOException { 374 boolean hasGeneralBloom = this.closeGeneralBloomFilter(); 375 boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter(); 376 377 writer.close(); 378 379 // Log final Bloom filter statistics. This needs to be done after close() 380 // because compound Bloom filters might be finalized as part of closing. 381 if (LOG.isTraceEnabled()) { 382 LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and " + 383 (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " was added to HFile " + 384 getPath()); 385 } 386 387 } 388 389 public void appendFileInfo(byte[] key, byte[] value) throws IOException { 390 writer.appendFileInfo(key, value); 391 } 392 393 /** For use in testing. 394 */ 395 HFile.Writer getHFileWriter() { 396 return writer; 397 } 398 399 /** 400 * @param dir Directory to create file in. 401 * @return random filename inside passed <code>dir</code> 402 */ 403 static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException { 404 if (!fs.getFileStatus(dir).isDirectory()) { 405 throw new IOException("Expecting " + dir.toString() + " to be a directory"); 406 } 407 return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll("")); 408 } 409 410 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ICAST_INTEGER_MULTIPLY_CAST_TO_LONG", 411 justification="Will not overflow") 412 public static class Builder { 413 private final Configuration conf; 414 private final CacheConfig cacheConf; 415 private final FileSystem fs; 416 417 private BloomType bloomType = BloomType.NONE; 418 private long maxKeyCount = 0; 419 private Path dir; 420 private Path filePath; 421 private InetSocketAddress[] favoredNodes; 422 private HFileContext fileContext; 423 private boolean shouldDropCacheBehind; 424 private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet(); 425 private String fileStoragePolicy; 426 427 public Builder(Configuration conf, CacheConfig cacheConf, 428 FileSystem fs) { 429 this.conf = conf; 430 this.cacheConf = cacheConf; 431 this.fs = fs; 432 } 433 434 /** 435 * Creates Builder with cache configuration disabled 436 */ 437 public Builder(Configuration conf, FileSystem fs) { 438 this.conf = conf; 439 this.cacheConf = CacheConfig.DISABLED; 440 this.fs = fs; 441 } 442 443 /** 444 * Use either this method or {@link #withFilePath}, but not both. 445 * @param dir Path to column family directory. The directory is created if 446 * does not exist. The file is given a unique name within this 447 * directory. 448 * @return this (for chained invocation) 449 */ 450 public Builder withOutputDir(Path dir) { 451 Preconditions.checkNotNull(dir); 452 this.dir = dir; 453 return this; 454 } 455 456 /** 457 * Use either this method or {@link #withOutputDir}, but not both. 458 * @param filePath the StoreFile path to write 459 * @return this (for chained invocation) 460 */ 461 public Builder withFilePath(Path filePath) { 462 Preconditions.checkNotNull(filePath); 463 this.filePath = filePath; 464 return this; 465 } 466 467 /** 468 * @param favoredNodes an array of favored nodes or possibly null 469 * @return this (for chained invocation) 470 */ 471 public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) { 472 this.favoredNodes = favoredNodes; 473 return this; 474 } 475 476 public Builder withBloomType(BloomType bloomType) { 477 Preconditions.checkNotNull(bloomType); 478 this.bloomType = bloomType; 479 return this; 480 } 481 482 /** 483 * @param maxKeyCount estimated maximum number of keys we expect to add 484 * @return this (for chained invocation) 485 */ 486 public Builder withMaxKeyCount(long maxKeyCount) { 487 this.maxKeyCount = maxKeyCount; 488 return this; 489 } 490 491 public Builder withFileContext(HFileContext fileContext) { 492 this.fileContext = fileContext; 493 return this; 494 } 495 496 public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind) { 497 this.shouldDropCacheBehind = shouldDropCacheBehind; 498 return this; 499 } 500 501 public Builder withCompactedFilesSupplier( 502 Supplier<Collection<HStoreFile>> compactedFilesSupplier) { 503 this.compactedFilesSupplier = compactedFilesSupplier; 504 return this; 505 } 506 507 public Builder withFileStoragePolicy(String fileStoragePolicy) { 508 this.fileStoragePolicy = fileStoragePolicy; 509 return this; 510 } 511 512 /** 513 * Create a store file writer. Client is responsible for closing file when 514 * done. If metadata, add BEFORE closing using 515 * {@link StoreFileWriter#appendMetadata}. 516 */ 517 public StoreFileWriter build() throws IOException { 518 if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) { 519 throw new IllegalArgumentException("Either specify parent directory " + 520 "or file path"); 521 } 522 523 if (dir == null) { 524 dir = filePath.getParent(); 525 } 526 527 if (!fs.exists(dir)) { 528 // Handle permission for non-HDFS filesystem properly 529 // See HBASE-17710 530 HRegionFileSystem.mkdirs(fs, conf, dir); 531 } 532 533 // set block storage policy for temp path 534 String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY); 535 if (null == policyName) { 536 policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY); 537 } 538 CommonFSUtils.setStoragePolicy(this.fs, dir, policyName); 539 540 if (filePath == null) { 541 // The stored file and related blocks will used the directory based StoragePolicy. 542 // Because HDFS DistributedFileSystem does not support create files with storage policy 543 // before version 3.3.0 (See HDFS-13209). Use child dir here is to make stored files 544 // satisfy the specific storage policy when writing. So as to avoid later data movement. 545 // We don't want to change whole temp dir to 'fileStoragePolicy'. 546 if (!Strings.isNullOrEmpty(fileStoragePolicy)) { 547 dir = new Path(dir, HConstants.STORAGE_POLICY_PREFIX + fileStoragePolicy); 548 if (!fs.exists(dir)) { 549 HRegionFileSystem.mkdirs(fs, conf, dir); 550 LOG.info( 551 "Create tmp dir " + dir.toString() + " with storage policy: " + fileStoragePolicy); 552 } 553 CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy); 554 } 555 filePath = getUniqueFile(fs, dir); 556 if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) { 557 bloomType = BloomType.NONE; 558 } 559 } 560 561 return new StoreFileWriter(fs, filePath, conf, cacheConf, bloomType, maxKeyCount, 562 favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier); 563 } 564 } 565}