001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; 021import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT; 022import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS; 023import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; 024import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY; 025import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT; 026import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY; 027 028import java.io.IOException; 029import java.net.InetSocketAddress; 030import java.util.UUID; 031import java.util.regex.Pattern; 032 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.Cell; 037import org.apache.hadoop.hbase.CellComparator; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.PrivateCellUtil; 040import org.apache.hadoop.hbase.KeyValue; 041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 042import org.apache.hadoop.hbase.io.hfile.CacheConfig; 043import org.apache.hadoop.hbase.io.hfile.HFile; 044import org.apache.hadoop.hbase.io.hfile.HFileContext; 045import org.apache.hadoop.hbase.util.BloomContext; 046import org.apache.hadoop.hbase.util.BloomFilterFactory; 047import org.apache.hadoop.hbase.util.BloomFilterWriter; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.FSUtils; 050import org.apache.hadoop.hbase.util.RowBloomContext; 051import org.apache.hadoop.hbase.util.RowColBloomContext; 052import org.apache.yetus.audience.InterfaceAudience; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 056 057/** 058 * A StoreFile writer. Use this to read/write HBase Store Files. It is package 059 * local because it is an implementation detail of the HBase regionserver. 060 */ 061@InterfaceAudience.Private 062public class StoreFileWriter implements CellSink, ShipperListener { 063 private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName()); 064 private static final Pattern dash = Pattern.compile("-"); 065 private final BloomFilterWriter generalBloomFilterWriter; 066 private final BloomFilterWriter deleteFamilyBloomFilterWriter; 067 private final BloomType bloomType; 068 private long earliestPutTs = HConstants.LATEST_TIMESTAMP; 069 private long deleteFamilyCnt = 0; 070 private BloomContext bloomContext = null; 071 private BloomContext deleteFamilyBloomContext = null; 072 private final TimeRangeTracker timeRangeTracker; 073 074 protected HFile.Writer writer; 075 076 /** 077 * Creates an HFile.Writer that also write helpful meta data. 078 * @param fs file system to write to 079 * @param path file name to create 080 * @param conf user configuration 081 * @param comparator key comparator 082 * @param bloomType bloom filter setting 083 * @param maxKeys the expected maximum number of keys to be added. Was used 084 * for Bloom filter size in {@link HFile} format version 1. 085 * @param favoredNodes 086 * @param fileContext - The HFile context 087 * @param shouldDropCacheBehind Drop pages written to page cache after writing the store file. 088 * @throws IOException problem writing to FS 089 */ 090 private StoreFileWriter(FileSystem fs, Path path, 091 final Configuration conf, 092 CacheConfig cacheConf, 093 final CellComparator comparator, BloomType bloomType, long maxKeys, 094 InetSocketAddress[] favoredNodes, HFileContext fileContext, 095 boolean shouldDropCacheBehind) 096 throws IOException { 097 this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC); 098 // TODO : Change all writers to be specifically created for compaction context 099 writer = HFile.getWriterFactory(conf, cacheConf) 100 .withPath(fs, path) 101 .withComparator(comparator) 102 .withFavoredNodes(favoredNodes) 103 .withFileContext(fileContext) 104 .withShouldDropCacheBehind(shouldDropCacheBehind) 105 .create(); 106 107 generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite( 108 conf, cacheConf, bloomType, 109 (int) Math.min(maxKeys, Integer.MAX_VALUE), writer); 110 111 if (generalBloomFilterWriter != null) { 112 this.bloomType = bloomType; 113 if (LOG.isTraceEnabled()) { 114 LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", " + 115 generalBloomFilterWriter.getClass().getSimpleName()); 116 } 117 // init bloom context 118 switch (bloomType) { 119 case ROW: 120 bloomContext = new RowBloomContext(generalBloomFilterWriter, comparator); 121 break; 122 case ROWCOL: 123 bloomContext = new RowColBloomContext(generalBloomFilterWriter, comparator); 124 break; 125 default: 126 throw new IOException( 127 "Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL expected)"); 128 } 129 } else { 130 // Not using Bloom filters. 131 this.bloomType = BloomType.NONE; 132 } 133 134 // initialize delete family Bloom filter when there is NO RowCol Bloom 135 // filter 136 if (this.bloomType != BloomType.ROWCOL) { 137 this.deleteFamilyBloomFilterWriter = BloomFilterFactory 138 .createDeleteBloomAtWrite(conf, cacheConf, 139 (int) Math.min(maxKeys, Integer.MAX_VALUE), writer); 140 deleteFamilyBloomContext = new RowBloomContext(deleteFamilyBloomFilterWriter, comparator); 141 } else { 142 deleteFamilyBloomFilterWriter = null; 143 } 144 if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) { 145 LOG.trace("Delete Family Bloom filter type for " + path + ": " + 146 deleteFamilyBloomFilterWriter.getClass().getSimpleName()); 147 } 148 } 149 150 /** 151 * Writes meta data. 152 * Call before {@link #close()} since its written as meta data to this file. 153 * @param maxSequenceId Maximum sequence id. 154 * @param majorCompaction True if this file is product of a major compaction 155 * @throws IOException problem writing to FS 156 */ 157 public void appendMetadata(final long maxSequenceId, final boolean majorCompaction) 158 throws IOException { 159 writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); 160 writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); 161 appendTrackedTimestampsToMetadata(); 162 } 163 164 /** 165 * Writes meta data. 166 * Call before {@link #close()} since its written as meta data to this file. 167 * @param maxSequenceId Maximum sequence id. 168 * @param majorCompaction True if this file is product of a major compaction 169 * @param mobCellsCount The number of mob cells. 170 * @throws IOException problem writing to FS 171 */ 172 public void appendMetadata(final long maxSequenceId, final boolean majorCompaction, 173 final long mobCellsCount) throws IOException { 174 writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); 175 writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); 176 writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount)); 177 appendTrackedTimestampsToMetadata(); 178 } 179 180 /** 181 * Add TimestampRange and earliest put timestamp to Metadata 182 */ 183 public void appendTrackedTimestampsToMetadata() throws IOException { 184 // TODO: The StoreFileReader always converts the byte[] to TimeRange 185 // via TimeRangeTracker, so we should write the serialization data of TimeRange directly. 186 appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker)); 187 appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs)); 188 } 189 190 /** 191 * Record the earlest Put timestamp. 192 * 193 * If the timeRangeTracker is not set, 194 * update TimeRangeTracker to include the timestamp of this key 195 */ 196 public void trackTimestamps(final Cell cell) { 197 if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) { 198 earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp()); 199 } 200 timeRangeTracker.includeTimestamp(cell); 201 } 202 203 private void appendGeneralBloomfilter(final Cell cell) throws IOException { 204 if (this.generalBloomFilterWriter != null) { 205 /* 206 * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png 207 * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp 208 * 209 * 2 Types of Filtering: 210 * 1. Row = Row 211 * 2. RowCol = Row + Qualifier 212 */ 213 bloomContext.writeBloom(cell); 214 } 215 } 216 217 private void appendDeleteFamilyBloomFilter(final Cell cell) 218 throws IOException { 219 if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) { 220 return; 221 } 222 223 // increase the number of delete family in the store file 224 deleteFamilyCnt++; 225 if (this.deleteFamilyBloomFilterWriter != null) { 226 deleteFamilyBloomContext.writeBloom(cell); 227 } 228 } 229 230 @Override 231 public void append(final Cell cell) throws IOException { 232 appendGeneralBloomfilter(cell); 233 appendDeleteFamilyBloomFilter(cell); 234 writer.append(cell); 235 trackTimestamps(cell); 236 } 237 238 @Override 239 public void beforeShipped() throws IOException { 240 // For now these writer will always be of type ShipperListener true. 241 // TODO : Change all writers to be specifically created for compaction context 242 writer.beforeShipped(); 243 if (generalBloomFilterWriter != null) { 244 generalBloomFilterWriter.beforeShipped(); 245 } 246 if (deleteFamilyBloomFilterWriter != null) { 247 deleteFamilyBloomFilterWriter.beforeShipped(); 248 } 249 } 250 251 public Path getPath() { 252 return this.writer.getPath(); 253 } 254 255 public boolean hasGeneralBloom() { 256 return this.generalBloomFilterWriter != null; 257 } 258 259 /** 260 * For unit testing only. 261 * 262 * @return the Bloom filter used by this writer. 263 */ 264 BloomFilterWriter getGeneralBloomWriter() { 265 return generalBloomFilterWriter; 266 } 267 268 private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException { 269 boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0); 270 if (haveBloom) { 271 bfw.compactBloom(); 272 } 273 return haveBloom; 274 } 275 276 private boolean closeGeneralBloomFilter() throws IOException { 277 boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter); 278 279 // add the general Bloom filter writer and append file info 280 if (hasGeneralBloom) { 281 writer.addGeneralBloomFilter(generalBloomFilterWriter); 282 writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString())); 283 bloomContext.addLastBloomKey(writer); 284 } 285 return hasGeneralBloom; 286 } 287 288 private boolean closeDeleteFamilyBloomFilter() throws IOException { 289 boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter); 290 291 // add the delete family Bloom filter writer 292 if (hasDeleteFamilyBloom) { 293 writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter); 294 } 295 296 // append file info about the number of delete family kvs 297 // even if there is no delete family Bloom. 298 writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt)); 299 300 return hasDeleteFamilyBloom; 301 } 302 303 public void close() throws IOException { 304 boolean hasGeneralBloom = this.closeGeneralBloomFilter(); 305 boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter(); 306 307 writer.close(); 308 309 // Log final Bloom filter statistics. This needs to be done after close() 310 // because compound Bloom filters might be finalized as part of closing. 311 if (LOG.isTraceEnabled()) { 312 LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and " + 313 (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " was added to HFile " + 314 getPath()); 315 } 316 317 } 318 319 public void appendFileInfo(byte[] key, byte[] value) throws IOException { 320 writer.appendFileInfo(key, value); 321 } 322 323 /** For use in testing. 324 */ 325 HFile.Writer getHFileWriter() { 326 return writer; 327 } 328 329 /** 330 * @param fs 331 * @param dir Directory to create file in. 332 * @return random filename inside passed <code>dir</code> 333 */ 334 static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException { 335 if (!fs.getFileStatus(dir).isDirectory()) { 336 throw new IOException("Expecting " + dir.toString() + " to be a directory"); 337 } 338 return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll("")); 339 } 340 341 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ICAST_INTEGER_MULTIPLY_CAST_TO_LONG", 342 justification="Will not overflow") 343 public static class Builder { 344 private final Configuration conf; 345 private final CacheConfig cacheConf; 346 private final FileSystem fs; 347 348 private CellComparator comparator = CellComparator.getInstance(); 349 private BloomType bloomType = BloomType.NONE; 350 private long maxKeyCount = 0; 351 private Path dir; 352 private Path filePath; 353 private InetSocketAddress[] favoredNodes; 354 private HFileContext fileContext; 355 private boolean shouldDropCacheBehind; 356 357 public Builder(Configuration conf, CacheConfig cacheConf, 358 FileSystem fs) { 359 this.conf = conf; 360 this.cacheConf = cacheConf; 361 this.fs = fs; 362 } 363 364 /** 365 * Creates Builder with cache configuration disabled 366 */ 367 public Builder(Configuration conf, FileSystem fs) { 368 this.conf = conf; 369 this.cacheConf = CacheConfig.DISABLED; 370 this.fs = fs; 371 } 372 373 /** 374 * Use either this method or {@link #withFilePath}, but not both. 375 * @param dir Path to column family directory. The directory is created if 376 * does not exist. The file is given a unique name within this 377 * directory. 378 * @return this (for chained invocation) 379 */ 380 public Builder withOutputDir(Path dir) { 381 Preconditions.checkNotNull(dir); 382 this.dir = dir; 383 return this; 384 } 385 386 /** 387 * Use either this method or {@link #withOutputDir}, but not both. 388 * @param filePath the StoreFile path to write 389 * @return this (for chained invocation) 390 */ 391 public Builder withFilePath(Path filePath) { 392 Preconditions.checkNotNull(filePath); 393 this.filePath = filePath; 394 return this; 395 } 396 397 /** 398 * @param favoredNodes an array of favored nodes or possibly null 399 * @return this (for chained invocation) 400 */ 401 public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) { 402 this.favoredNodes = favoredNodes; 403 return this; 404 } 405 406 public Builder withComparator(CellComparator comparator) { 407 Preconditions.checkNotNull(comparator); 408 this.comparator = comparator; 409 return this; 410 } 411 412 public Builder withBloomType(BloomType bloomType) { 413 Preconditions.checkNotNull(bloomType); 414 this.bloomType = bloomType; 415 return this; 416 } 417 418 /** 419 * @param maxKeyCount estimated maximum number of keys we expect to add 420 * @return this (for chained invocation) 421 */ 422 public Builder withMaxKeyCount(long maxKeyCount) { 423 this.maxKeyCount = maxKeyCount; 424 return this; 425 } 426 427 public Builder withFileContext(HFileContext fileContext) { 428 this.fileContext = fileContext; 429 return this; 430 } 431 432 public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind) { 433 this.shouldDropCacheBehind = shouldDropCacheBehind; 434 return this; 435 } 436 437 /** 438 * Create a store file writer. Client is responsible for closing file when 439 * done. If metadata, add BEFORE closing using 440 * {@link StoreFileWriter#appendMetadata}. 441 */ 442 public StoreFileWriter build() throws IOException { 443 if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) { 444 throw new IllegalArgumentException("Either specify parent directory " + 445 "or file path"); 446 } 447 448 if (dir == null) { 449 dir = filePath.getParent(); 450 } 451 452 if (!fs.exists(dir)) { 453 // Handle permission for non-HDFS filesystem properly 454 // See HBASE-17710 455 HRegionFileSystem.mkdirs(fs, conf, dir); 456 } 457 458 // set block storage policy for temp path 459 String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY); 460 if (null == policyName) { 461 policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY); 462 } 463 FSUtils.setStoragePolicy(this.fs, dir, policyName); 464 465 if (filePath == null) { 466 filePath = getUniqueFile(fs, dir); 467 if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) { 468 bloomType = BloomType.NONE; 469 } 470 } 471 472 if (comparator == null) { 473 comparator = CellComparator.getInstance(); 474 } 475 return new StoreFileWriter(fs, filePath, 476 conf, cacheConf, comparator, bloomType, maxKeyCount, favoredNodes, fileContext, 477 shouldDropCacheBehind); 478 } 479 } 480}