001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY; 021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; 022import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY; 023import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT; 024import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS; 025import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; 026import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY; 027import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT; 028import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY; 029 030import java.io.IOException; 031import java.net.InetSocketAddress; 032import java.util.Collection; 033import java.util.Collections; 034import java.util.HashSet; 035import java.util.Set; 036import java.util.UUID; 037import java.util.function.Supplier; 038import java.util.regex.Pattern; 039import java.util.stream.Collectors; 040 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.fs.FileSystem; 043import org.apache.hadoop.fs.Path; 044import org.apache.hadoop.hbase.Cell; 045import org.apache.hadoop.hbase.CellComparator; 046import org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.PrivateCellUtil; 048import org.apache.hadoop.hbase.KeyValue; 049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 050import org.apache.hadoop.hbase.io.hfile.CacheConfig; 051import org.apache.hadoop.hbase.io.hfile.HFile; 052import org.apache.hadoop.hbase.io.hfile.HFileContext; 053import org.apache.hadoop.hbase.util.BloomContext; 054import org.apache.hadoop.hbase.util.BloomFilterFactory; 055import org.apache.hadoop.hbase.util.BloomFilterUtil; 056import org.apache.hadoop.hbase.util.BloomFilterWriter; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.hadoop.hbase.util.FSUtils; 059import org.apache.hadoop.hbase.util.RowBloomContext; 060import org.apache.hadoop.hbase.util.RowColBloomContext; 061import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext; 062import org.apache.yetus.audience.InterfaceAudience; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 067 068import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 069 070/** 071 * A StoreFile writer. Use this to read/write HBase Store Files. It is package 072 * local because it is an implementation detail of the HBase regionserver. 073 */ 074@InterfaceAudience.Private 075public class StoreFileWriter implements CellSink, ShipperListener { 076 private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName()); 077 private static final Pattern dash = Pattern.compile("-"); 078 private final BloomFilterWriter generalBloomFilterWriter; 079 private final BloomFilterWriter deleteFamilyBloomFilterWriter; 080 private final BloomType bloomType; 081 private byte[] bloomParam = null; 082 private long earliestPutTs = HConstants.LATEST_TIMESTAMP; 083 private long deleteFamilyCnt = 0; 084 private BloomContext bloomContext = null; 085 private BloomContext deleteFamilyBloomContext = null; 086 private final TimeRangeTracker timeRangeTracker; 087 private final Supplier<Collection<HStoreFile>> compactedFilesSupplier; 088 089 protected HFile.Writer writer; 090 091 /** 092 * Creates an HFile.Writer that also write helpful meta data. 093 * 094 * @param fs file system to write to 095 * @param path file name to create 096 * @param conf user configuration 097 * @param comparator key comparator 098 * @param bloomType bloom filter setting 099 * @param maxKeys the expected maximum number of keys to be added. Was used 100 * for Bloom filter size in {@link HFile} format version 1. 101 * @param favoredNodes an array of favored nodes or possibly null 102 * @param fileContext The HFile context 103 * @param shouldDropCacheBehind Drop pages written to page cache after writing the store file. 104 * @param compactedFilesSupplier Returns the {@link HStore} compacted files which not archived 105 * @throws IOException problem writing to FS 106 */ 107 private StoreFileWriter(FileSystem fs, Path path, final Configuration conf, CacheConfig cacheConf, 108 final CellComparator comparator, BloomType bloomType, long maxKeys, 109 InetSocketAddress[] favoredNodes, HFileContext fileContext, boolean shouldDropCacheBehind, 110 Supplier<Collection<HStoreFile>> compactedFilesSupplier) throws IOException { 111 this.compactedFilesSupplier = compactedFilesSupplier; 112 this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC); 113 // TODO : Change all writers to be specifically created for compaction context 114 writer = HFile.getWriterFactory(conf, cacheConf) 115 .withPath(fs, path) 116 .withComparator(comparator) 117 .withFavoredNodes(favoredNodes) 118 .withFileContext(fileContext) 119 .withShouldDropCacheBehind(shouldDropCacheBehind) 120 .create(); 121 122 generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite( 123 conf, cacheConf, bloomType, 124 (int) Math.min(maxKeys, Integer.MAX_VALUE), writer); 125 126 if (generalBloomFilterWriter != null) { 127 this.bloomType = bloomType; 128 this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf); 129 if (LOG.isTraceEnabled()) { 130 LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: " 131 + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH? 132 Bytes.toInt(bloomParam):Bytes.toStringBinary(bloomParam)) 133 + ", " + generalBloomFilterWriter.getClass().getSimpleName()); 134 } 135 // init bloom context 136 switch (bloomType) { 137 case ROW: 138 bloomContext = new RowBloomContext(generalBloomFilterWriter, comparator); 139 break; 140 case ROWCOL: 141 bloomContext = new RowColBloomContext(generalBloomFilterWriter, comparator); 142 break; 143 case ROWPREFIX_FIXED_LENGTH: 144 bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter, comparator, 145 Bytes.toInt(bloomParam)); 146 break; 147 default: 148 throw new IOException( 149 "Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL or ROWPREFIX expected)"); 150 } 151 } else { 152 // Not using Bloom filters. 153 this.bloomType = BloomType.NONE; 154 } 155 156 // initialize delete family Bloom filter when there is NO RowCol Bloom 157 // filter 158 if (this.bloomType != BloomType.ROWCOL) { 159 this.deleteFamilyBloomFilterWriter = BloomFilterFactory 160 .createDeleteBloomAtWrite(conf, cacheConf, 161 (int) Math.min(maxKeys, Integer.MAX_VALUE), writer); 162 deleteFamilyBloomContext = new RowBloomContext(deleteFamilyBloomFilterWriter, comparator); 163 } else { 164 deleteFamilyBloomFilterWriter = null; 165 } 166 if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) { 167 LOG.trace("Delete Family Bloom filter type for " + path + ": " + 168 deleteFamilyBloomFilterWriter.getClass().getSimpleName()); 169 } 170 } 171 172 /** 173 * Writes meta data. 174 * Call before {@link #close()} since its written as meta data to this file. 175 * @param maxSequenceId Maximum sequence id. 176 * @param majorCompaction True if this file is product of a major compaction 177 * @throws IOException problem writing to FS 178 */ 179 public void appendMetadata(final long maxSequenceId, final boolean majorCompaction) 180 throws IOException { 181 appendMetadata(maxSequenceId, majorCompaction, Collections.emptySet()); 182 } 183 184 /** 185 * Writes meta data. 186 * Call before {@link #close()} since its written as meta data to this file. 187 * @param maxSequenceId Maximum sequence id. 188 * @param majorCompaction True if this file is product of a major compaction 189 * @param storeFiles The compacted store files to generate this new file 190 * @throws IOException problem writing to FS 191 */ 192 public void appendMetadata(final long maxSequenceId, final boolean majorCompaction, 193 final Collection<HStoreFile> storeFiles) throws IOException { 194 writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); 195 writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); 196 writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles)); 197 appendTrackedTimestampsToMetadata(); 198 } 199 200 /** 201 * Used when write {@link HStoreFile#COMPACTION_EVENT_KEY} to new file's file info. The compacted 202 * store files's name is needed. But if the compacted store file is a result of compaction, it's 203 * compacted files which still not archived is needed, too. And don't need to add compacted files 204 * recursively. If file A, B, C compacted to new file D, and file D compacted to new file E, will 205 * write A, B, C, D to file E's compacted files. So if file E compacted to new file F, will add E 206 * to F's compacted files first, then add E's compacted files: A, B, C, D to it. And no need to 207 * add D's compacted file, as D's compacted files has been in E's compacted files, too. 208 * See HBASE-20724 for more details. 209 * 210 * @param storeFiles The compacted store files to generate this new file 211 * @return bytes of CompactionEventTracker 212 */ 213 private byte[] toCompactionEventTrackerBytes(Collection<HStoreFile> storeFiles) { 214 Set<String> notArchivedCompactedStoreFiles = 215 this.compactedFilesSupplier.get().stream().map(sf -> sf.getPath().getName()) 216 .collect(Collectors.toSet()); 217 Set<String> compactedStoreFiles = new HashSet<>(); 218 for (HStoreFile storeFile : storeFiles) { 219 compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName()); 220 for (String csf : storeFile.getCompactedStoreFiles()) { 221 if (notArchivedCompactedStoreFiles.contains(csf)) { 222 compactedStoreFiles.add(csf); 223 } 224 } 225 } 226 return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles); 227 } 228 229 /** 230 * Writes meta data. 231 * Call before {@link #close()} since its written as meta data to this file. 232 * @param maxSequenceId Maximum sequence id. 233 * @param majorCompaction True if this file is product of a major compaction 234 * @param mobCellsCount The number of mob cells. 235 * @throws IOException problem writing to FS 236 */ 237 public void appendMetadata(final long maxSequenceId, final boolean majorCompaction, 238 final long mobCellsCount) throws IOException { 239 writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); 240 writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); 241 writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount)); 242 appendTrackedTimestampsToMetadata(); 243 } 244 245 /** 246 * Add TimestampRange and earliest put timestamp to Metadata 247 */ 248 public void appendTrackedTimestampsToMetadata() throws IOException { 249 // TODO: The StoreFileReader always converts the byte[] to TimeRange 250 // via TimeRangeTracker, so we should write the serialization data of TimeRange directly. 251 appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker)); 252 appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs)); 253 } 254 255 /** 256 * Record the earlest Put timestamp. 257 * 258 * If the timeRangeTracker is not set, 259 * update TimeRangeTracker to include the timestamp of this key 260 */ 261 public void trackTimestamps(final Cell cell) { 262 if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) { 263 earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp()); 264 } 265 timeRangeTracker.includeTimestamp(cell); 266 } 267 268 private void appendGeneralBloomfilter(final Cell cell) throws IOException { 269 if (this.generalBloomFilterWriter != null) { 270 /* 271 * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png 272 * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp 273 * 274 * 3 Types of Filtering: 275 * 1. Row = Row 276 * 2. RowCol = Row + Qualifier 277 * 3. RowPrefixFixedLength = Fixed Length Row Prefix 278 */ 279 bloomContext.writeBloom(cell); 280 } 281 } 282 283 private void appendDeleteFamilyBloomFilter(final Cell cell) 284 throws IOException { 285 if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) { 286 return; 287 } 288 289 // increase the number of delete family in the store file 290 deleteFamilyCnt++; 291 if (this.deleteFamilyBloomFilterWriter != null) { 292 deleteFamilyBloomContext.writeBloom(cell); 293 } 294 } 295 296 @Override 297 public void append(final Cell cell) throws IOException { 298 appendGeneralBloomfilter(cell); 299 appendDeleteFamilyBloomFilter(cell); 300 writer.append(cell); 301 trackTimestamps(cell); 302 } 303 304 @Override 305 public void beforeShipped() throws IOException { 306 // For now these writer will always be of type ShipperListener true. 307 // TODO : Change all writers to be specifically created for compaction context 308 writer.beforeShipped(); 309 if (generalBloomFilterWriter != null) { 310 generalBloomFilterWriter.beforeShipped(); 311 } 312 if (deleteFamilyBloomFilterWriter != null) { 313 deleteFamilyBloomFilterWriter.beforeShipped(); 314 } 315 } 316 317 public Path getPath() { 318 return this.writer.getPath(); 319 } 320 321 public boolean hasGeneralBloom() { 322 return this.generalBloomFilterWriter != null; 323 } 324 325 /** 326 * For unit testing only. 327 * 328 * @return the Bloom filter used by this writer. 329 */ 330 BloomFilterWriter getGeneralBloomWriter() { 331 return generalBloomFilterWriter; 332 } 333 334 private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException { 335 boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0); 336 if (haveBloom) { 337 bfw.compactBloom(); 338 } 339 return haveBloom; 340 } 341 342 private boolean closeGeneralBloomFilter() throws IOException { 343 boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter); 344 345 // add the general Bloom filter writer and append file info 346 if (hasGeneralBloom) { 347 writer.addGeneralBloomFilter(generalBloomFilterWriter); 348 writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString())); 349 if (bloomParam != null) { 350 writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam); 351 } 352 bloomContext.addLastBloomKey(writer); 353 } 354 return hasGeneralBloom; 355 } 356 357 private boolean closeDeleteFamilyBloomFilter() throws IOException { 358 boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter); 359 360 // add the delete family Bloom filter writer 361 if (hasDeleteFamilyBloom) { 362 writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter); 363 } 364 365 // append file info about the number of delete family kvs 366 // even if there is no delete family Bloom. 367 writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt)); 368 369 return hasDeleteFamilyBloom; 370 } 371 372 public void close() throws IOException { 373 boolean hasGeneralBloom = this.closeGeneralBloomFilter(); 374 boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter(); 375 376 writer.close(); 377 378 // Log final Bloom filter statistics. This needs to be done after close() 379 // because compound Bloom filters might be finalized as part of closing. 380 if (LOG.isTraceEnabled()) { 381 LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and " + 382 (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " was added to HFile " + 383 getPath()); 384 } 385 386 } 387 388 public void appendFileInfo(byte[] key, byte[] value) throws IOException { 389 writer.appendFileInfo(key, value); 390 } 391 392 /** For use in testing. 393 */ 394 HFile.Writer getHFileWriter() { 395 return writer; 396 } 397 398 /** 399 * @param fs 400 * @param dir Directory to create file in. 401 * @return random filename inside passed <code>dir</code> 402 */ 403 static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException { 404 if (!fs.getFileStatus(dir).isDirectory()) { 405 throw new IOException("Expecting " + dir.toString() + " to be a directory"); 406 } 407 return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll("")); 408 } 409 410 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ICAST_INTEGER_MULTIPLY_CAST_TO_LONG", 411 justification="Will not overflow") 412 public static class Builder { 413 private final Configuration conf; 414 private final CacheConfig cacheConf; 415 private final FileSystem fs; 416 417 private CellComparator comparator = CellComparator.getInstance(); 418 private BloomType bloomType = BloomType.NONE; 419 private long maxKeyCount = 0; 420 private Path dir; 421 private Path filePath; 422 private InetSocketAddress[] favoredNodes; 423 private HFileContext fileContext; 424 private boolean shouldDropCacheBehind; 425 private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet(); 426 427 public Builder(Configuration conf, CacheConfig cacheConf, 428 FileSystem fs) { 429 this.conf = conf; 430 this.cacheConf = cacheConf; 431 this.fs = fs; 432 } 433 434 /** 435 * Creates Builder with cache configuration disabled 436 */ 437 public Builder(Configuration conf, FileSystem fs) { 438 this.conf = conf; 439 this.cacheConf = CacheConfig.DISABLED; 440 this.fs = fs; 441 } 442 443 /** 444 * Use either this method or {@link #withFilePath}, but not both. 445 * @param dir Path to column family directory. The directory is created if 446 * does not exist. The file is given a unique name within this 447 * directory. 448 * @return this (for chained invocation) 449 */ 450 public Builder withOutputDir(Path dir) { 451 Preconditions.checkNotNull(dir); 452 this.dir = dir; 453 return this; 454 } 455 456 /** 457 * Use either this method or {@link #withOutputDir}, but not both. 458 * @param filePath the StoreFile path to write 459 * @return this (for chained invocation) 460 */ 461 public Builder withFilePath(Path filePath) { 462 Preconditions.checkNotNull(filePath); 463 this.filePath = filePath; 464 return this; 465 } 466 467 /** 468 * @param favoredNodes an array of favored nodes or possibly null 469 * @return this (for chained invocation) 470 */ 471 public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) { 472 this.favoredNodes = favoredNodes; 473 return this; 474 } 475 476 public Builder withComparator(CellComparator comparator) { 477 Preconditions.checkNotNull(comparator); 478 this.comparator = comparator; 479 return this; 480 } 481 482 public Builder withBloomType(BloomType bloomType) { 483 Preconditions.checkNotNull(bloomType); 484 this.bloomType = bloomType; 485 return this; 486 } 487 488 /** 489 * @param maxKeyCount estimated maximum number of keys we expect to add 490 * @return this (for chained invocation) 491 */ 492 public Builder withMaxKeyCount(long maxKeyCount) { 493 this.maxKeyCount = maxKeyCount; 494 return this; 495 } 496 497 public Builder withFileContext(HFileContext fileContext) { 498 this.fileContext = fileContext; 499 return this; 500 } 501 502 public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind) { 503 this.shouldDropCacheBehind = shouldDropCacheBehind; 504 return this; 505 } 506 507 public Builder withCompactedFilesSupplier( 508 Supplier<Collection<HStoreFile>> compactedFilesSupplier) { 509 this.compactedFilesSupplier = compactedFilesSupplier; 510 return this; 511 } 512 513 /** 514 * Create a store file writer. Client is responsible for closing file when 515 * done. If metadata, add BEFORE closing using 516 * {@link StoreFileWriter#appendMetadata}. 517 */ 518 public StoreFileWriter build() throws IOException { 519 if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) { 520 throw new IllegalArgumentException("Either specify parent directory " + 521 "or file path"); 522 } 523 524 if (dir == null) { 525 dir = filePath.getParent(); 526 } 527 528 if (!fs.exists(dir)) { 529 // Handle permission for non-HDFS filesystem properly 530 // See HBASE-17710 531 HRegionFileSystem.mkdirs(fs, conf, dir); 532 } 533 534 // set block storage policy for temp path 535 String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY); 536 if (null == policyName) { 537 policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY); 538 } 539 FSUtils.setStoragePolicy(this.fs, dir, policyName); 540 541 if (filePath == null) { 542 filePath = getUniqueFile(fs, dir); 543 if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) { 544 bloomType = BloomType.NONE; 545 } 546 } 547 548 if (comparator == null) { 549 comparator = CellComparator.getInstance(); 550 } 551 552 return new StoreFileWriter(fs, filePath, conf, cacheConf, comparator, bloomType, maxKeyCount, 553 favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier); 554 } 555 } 556}