/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.BloomContext;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RowBloomContext;
import org.apache.hadoop.hbase.util.RowColBloomContext;
import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * A StoreFile writer. Use this to write HBase Store Files. It is package
 * local because it is an implementation detail of the HBase regionserver.
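 * <p>
 * A rough usage sketch, for orientation only; the {@code conf}, {@code cacheConf}, {@code fs},
 * {@code columnFamilyDir}, {@code fileContext}, {@code cells}, {@code estimatedKeyCount} and
 * {@code maxSequenceId} names below are hypothetical, and in practice the surrounding store and
 * compaction code drives this writer:
 * <pre>
 * StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
 *     .withOutputDir(columnFamilyDir)      // or withFilePath(...), but not both
 *     .withBloomType(BloomType.ROW)
 *     .withMaxKeyCount(estimatedKeyCount)
 *     .withFileContext(fileContext)
 *     .build();
 * for (Cell cell : cells) {
 *   writer.append(cell);                   // also feeds Bloom filters and the time range tracker
 * }
 * writer.appendMetadata(maxSequenceId, false);  // metadata must be added before close()
 * writer.close();
 * </pre>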
 */
@InterfaceAudience.Private
public class StoreFileWriter implements CellSink, ShipperListener {
  private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName());
  private static final Pattern dash = Pattern.compile("-");
  private final BloomFilterWriter generalBloomFilterWriter;
  private final BloomFilterWriter deleteFamilyBloomFilterWriter;
  private final BloomType bloomType;
  private byte[] bloomParam = null;
  private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
  private long deleteFamilyCnt = 0;
  private BloomContext bloomContext = null;
  private BloomContext deleteFamilyBloomContext = null;
  private final TimeRangeTracker timeRangeTracker;
  private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;

  protected HFile.Writer writer;

  /**
   * Creates an HFile.Writer that also writes helpful metadata.
   *
   * @param fs file system to write to
   * @param path file name to create
   * @param conf user configuration
   * @param bloomType bloom filter setting
   * @param maxKeys the expected maximum number of keys to be added. Was used
   *        for Bloom filter size in {@link HFile} format version 1.
   * @param favoredNodes an array of favored nodes or possibly null
   * @param fileContext The HFile context
   * @param shouldDropCacheBehind Drop pages written to page cache after writing the store file.
   * @param compactedFilesSupplier Returns the {@link HStore} compacted files which have not yet
   *        been archived
   * @throws IOException problem writing to FS
   */
  private StoreFileWriter(FileSystem fs, Path path, final Configuration conf, CacheConfig cacheConf,
      BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes, HFileContext fileContext,
      boolean shouldDropCacheBehind, Supplier<Collection<HStoreFile>> compactedFilesSupplier)
      throws IOException {
    this.compactedFilesSupplier = compactedFilesSupplier;
    this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
    // TODO : Change all writers to be specifically created for compaction context
    writer = HFile.getWriterFactory(conf, cacheConf)
        .withPath(fs, path)
        .withFavoredNodes(favoredNodes)
        .withFileContext(fileContext)
        .withShouldDropCacheBehind(shouldDropCacheBehind)
        .create();

    generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
        conf, cacheConf, bloomType,
        (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);

    if (generalBloomFilterWriter != null) {
      this.bloomType = bloomType;
      this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: "
            + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH
                ? Bytes.toInt(bloomParam) : Bytes.toStringBinary(bloomParam))
            + ", " + generalBloomFilterWriter.getClass().getSimpleName());
      }
      // init bloom context
      switch (bloomType) {
        case ROW:
          bloomContext =
            new RowBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
          break;
        case ROWCOL:
          bloomContext =
            new RowColBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
          break;
        case ROWPREFIX_FIXED_LENGTH:
          bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter,
            fileContext.getCellComparator(), Bytes.toInt(bloomParam));
          break;
        default:
          throw new IOException(
              "Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL or ROWPREFIX expected)");
      }
    } else {
      // Not using Bloom filters.
      this.bloomType = BloomType.NONE;
    }

    // initialize delete family Bloom filter when there is NO RowCol Bloom filter
    if (this.bloomType != BloomType.ROWCOL) {
      this.deleteFamilyBloomFilterWriter = BloomFilterFactory
          .createDeleteBloomAtWrite(conf, cacheConf,
              (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
      deleteFamilyBloomContext =
          new RowBloomContext(deleteFamilyBloomFilterWriter, fileContext.getCellComparator());
    } else {
      deleteFamilyBloomFilterWriter = null;
    }
    if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) {
      LOG.trace("Delete Family Bloom filter type for " + path + ": " +
          deleteFamilyBloomFilterWriter.getClass().getSimpleName());
    }
  }

  /**
   * Writes metadata.
   * Call before {@link #close()} since it is written as metadata to this file.
   * @param maxSequenceId Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
      throws IOException {
    appendMetadata(maxSequenceId, majorCompaction, Collections.emptySet());
  }

  /**
   * Writes metadata.
   * Call before {@link #close()} since it is written as metadata to this file.
   * @param maxSequenceId Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @param storeFiles The compacted store files used to generate this new file
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
      final Collection<HStoreFile> storeFiles) throws IOException {
    writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
    writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
    writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles));
    appendTrackedTimestampsToMetadata();
  }

  /**
   * Used when writing {@link HStoreFile#COMPACTION_EVENT_KEY} to the new file's file info. The
   * names of the compacted store files that produced this file are recorded. If a compacted
   * input file was itself the result of a compaction, its own compacted files that have not yet
   * been archived must be recorded as well, but there is no need to recurse any further. For
   * example, if files A, B and C are compacted to new file D, and D is then compacted to new
   * file E, then A, B, C and D are written as E's compacted files. So when E is compacted to new
   * file F, E is added to F's compacted files first, followed by E's compacted files A, B, C and
   * D. D's compacted files do not need to be added separately, since they are already contained
   * in E's compacted files. See HBASE-20724 for more details.
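   * <p>
   * As a sketch of that bookkeeping, using the hypothetical files named above and assuming none
   * of them have been archived yet:
   * <pre>
   *   A, B, C compact into D:  D records {A, B, C}
   *   D compacts into E:       E records {D} plus D's set {A, B, C} = {A, B, C, D}
   *   E compacts into F:       F records {E} plus E's set           = {A, B, C, D, E}
   * </pre>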
   *
   * @param storeFiles The compacted store files used to generate this new file
   * @return bytes of CompactionEventTracker
   */
  private byte[] toCompactionEventTrackerBytes(Collection<HStoreFile> storeFiles) {
    Set<String> notArchivedCompactedStoreFiles =
        this.compactedFilesSupplier.get().stream().map(sf -> sf.getPath().getName())
            .collect(Collectors.toSet());
    Set<String> compactedStoreFiles = new HashSet<>();
    for (HStoreFile storeFile : storeFiles) {
      compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName());
      for (String csf : storeFile.getCompactedStoreFiles()) {
        if (notArchivedCompactedStoreFiles.contains(csf)) {
          compactedStoreFiles.add(csf);
        }
      }
    }
    return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles);
  }

  /**
   * Writes metadata.
   * Call before {@link #close()} since it is written as metadata to this file.
   * @param maxSequenceId Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @param mobCellsCount The number of mob cells.
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
      final long mobCellsCount) throws IOException {
    writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
    writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
    writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
    appendTrackedTimestampsToMetadata();
  }

  /**
   * Add TimestampRange and earliest put timestamp to Metadata
   */
  public void appendTrackedTimestampsToMetadata() throws IOException {
    // TODO: The StoreFileReader always converts the byte[] to TimeRange
    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
    appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker));
    appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
  }

  /**
   * Record the earliest Put timestamp.
   *
   * Also updates the TimeRangeTracker to include the timestamp of this cell.
   */
  public void trackTimestamps(final Cell cell) {
    if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
      earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
    }
    timeRangeTracker.includeTimestamp(cell);
  }

  private void appendGeneralBloomfilter(final Cell cell) throws IOException {
    if (this.generalBloomFilterWriter != null) {
      /*
       * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
       * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp
       *
       * 3 Types of Filtering:
       *  1. Row = Row
       *  2. RowCol = Row + Qualifier
       *  3. RowPrefixFixedLength = Fixed Length Row Prefix
       */
      bloomContext.writeBloom(cell);
    }
  }

  private void appendDeleteFamilyBloomFilter(final Cell cell) throws IOException {
    if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) {
      return;
    }

    // increase the number of delete family in the store file
    deleteFamilyCnt++;
    if (this.deleteFamilyBloomFilterWriter != null) {
      deleteFamilyBloomContext.writeBloom(cell);
    }
  }

  @Override
  public void append(final Cell cell) throws IOException {
    appendGeneralBloomfilter(cell);
    appendDeleteFamilyBloomFilter(cell);
    writer.append(cell);
    trackTimestamps(cell);
  }

  @Override
  public void beforeShipped() throws IOException {
    // For now these writers will always be ShipperListeners.
    // TODO : Change all writers to be specifically created for compaction context
    writer.beforeShipped();
    if (generalBloomFilterWriter != null) {
      generalBloomFilterWriter.beforeShipped();
    }
    if (deleteFamilyBloomFilterWriter != null) {
      deleteFamilyBloomFilterWriter.beforeShipped();
    }
  }

  public Path getPath() {
    return this.writer.getPath();
  }

  public boolean hasGeneralBloom() {
    return this.generalBloomFilterWriter != null;
  }

  /**
   * For unit testing only.
   *
   * @return the Bloom filter used by this writer.
   */
  BloomFilterWriter getGeneralBloomWriter() {
    return generalBloomFilterWriter;
  }

  private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
    boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
    if (haveBloom) {
      bfw.compactBloom();
    }
    return haveBloom;
  }

  private boolean closeGeneralBloomFilter() throws IOException {
    boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);

    // add the general Bloom filter writer and append file info
    if (hasGeneralBloom) {
      writer.addGeneralBloomFilter(generalBloomFilterWriter);
      writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
      if (bloomParam != null) {
        writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam);
      }
      bloomContext.addLastBloomKey(writer);
    }
    return hasGeneralBloom;
  }

  private boolean closeDeleteFamilyBloomFilter() throws IOException {
    boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);

    // add the delete family Bloom filter writer
    if (hasDeleteFamilyBloom) {
      writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
    }

    // append file info about the number of delete family kvs
    // even if there is no delete family Bloom.
    writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt));

    return hasDeleteFamilyBloom;
  }

  public void close() throws IOException {
    boolean hasGeneralBloom = this.closeGeneralBloomFilter();
    boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();

    writer.close();

    // Log final Bloom filter statistics. This needs to be done after close()
    // because compound Bloom filters might be finalized as part of closing.
    if (LOG.isTraceEnabled()) {
      LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and " +
          (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " was added to HFile " +
          getPath());
    }
  }

  public void appendFileInfo(byte[] key, byte[] value) throws IOException {
    writer.appendFileInfo(key, value);
  }

  /**
   * For use in testing.
   */
  HFile.Writer getHFileWriter() {
    return writer;
  }

  /**
   * @param dir Directory to create file in.
   * @return random filename inside passed <code>dir</code>
   */
  static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
    if (!fs.getFileStatus(dir).isDirectory()) {
      throw new IOException("Expecting " + dir.toString() + " to be a directory");
    }
    return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll(""));
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
      justification="Will not overflow")
  public static class Builder {
    private final Configuration conf;
    private final CacheConfig cacheConf;
    private final FileSystem fs;

    private BloomType bloomType = BloomType.NONE;
    private long maxKeyCount = 0;
    private Path dir;
    private Path filePath;
    private InetSocketAddress[] favoredNodes;
    private HFileContext fileContext;
    private boolean shouldDropCacheBehind;
    private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();

    public Builder(Configuration conf, CacheConfig cacheConf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = cacheConf;
      this.fs = fs;
    }

    /**
     * Creates a Builder with cache configuration disabled.
     */
    public Builder(Configuration conf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = CacheConfig.DISABLED;
      this.fs = fs;
    }

    /**
     * Use either this method or {@link #withFilePath}, but not both.
     * @param dir Path to column family directory. The directory is created if it does not exist.
     *        The file is given a unique name within this directory.
     * @return this (for chained invocation)
     */
    public Builder withOutputDir(Path dir) {
      Preconditions.checkNotNull(dir);
      this.dir = dir;
      return this;
    }

    /**
     * Use either this method or {@link #withOutputDir}, but not both.
     * @param filePath the StoreFile path to write
     * @return this (for chained invocation)
     */
    public Builder withFilePath(Path filePath) {
      Preconditions.checkNotNull(filePath);
      this.filePath = filePath;
      return this;
    }

    /**
     * @param favoredNodes an array of favored nodes or possibly null
     * @return this (for chained invocation)
     */
    public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) {
      this.favoredNodes = favoredNodes;
      return this;
    }

    public Builder withBloomType(BloomType bloomType) {
      Preconditions.checkNotNull(bloomType);
      this.bloomType = bloomType;
      return this;
    }

    /**
     * @param maxKeyCount estimated maximum number of keys we expect to add
     * @return this (for chained invocation)
     */
    public Builder withMaxKeyCount(long maxKeyCount) {
      this.maxKeyCount = maxKeyCount;
      return this;
    }

    public Builder withFileContext(HFileContext fileContext) {
      this.fileContext = fileContext;
      return this;
    }

    public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
      this.shouldDropCacheBehind = shouldDropCacheBehind;
      return this;
    }

    public Builder withCompactedFilesSupplier(
        Supplier<Collection<HStoreFile>> compactedFilesSupplier) {
      this.compactedFilesSupplier = compactedFilesSupplier;
      return this;
    }

    /**
     * Create a store file writer. The client is responsible for closing the file when done.
     * If metadata is to be added, add it BEFORE closing using
     * {@link StoreFileWriter#appendMetadata}.
     */
    public StoreFileWriter build() throws IOException {
      if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
        throw new IllegalArgumentException("Either specify parent directory " +
            "or file path");
      }

      if (dir == null) {
        dir = filePath.getParent();
      }

      if (!fs.exists(dir)) {
        // Handle permission for non-HDFS filesystem properly
        // See HBASE-17710
        HRegionFileSystem.mkdirs(fs, conf, dir);
      }

      // set block storage policy for temp path
      String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY);
      if (null == policyName) {
        policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY);
      }
      CommonFSUtils.setStoragePolicy(this.fs, dir, policyName);

      if (filePath == null) {
        filePath = getUniqueFile(fs, dir);
        if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
          bloomType = BloomType.NONE;
        }
      }

      return new StoreFileWriter(fs, filePath, conf, cacheConf, bloomType, maxKeyCount,
          favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
    }
  }
}