/*
 * Copyright 2011 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KeyComparator;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;

/**
 * Writes HFile format version 2.
 */
public class HFileWriterV2 extends AbstractHFileWriter {
  static final Log LOG = LogFactory.getLog(HFileWriterV2.class);

  /** Max memstore (mvcc) timestamp in FileInfo */
  public static final byte[] MAX_MEMSTORE_TS_KEY =
      Bytes.toBytes("MAX_MEMSTORE_TS_KEY");

  /** KeyValue version in FileInfo */
  public static final byte[] KEY_VALUE_VERSION =
      Bytes.toBytes("KEY_VALUE_VERSION");

  /** Version for KeyValue which includes memstore timestamp */
  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  /** Inline block writers for multi-level block index and compound Blooms. */
  private List<InlineBlockWriter> inlineBlockWriters =
      new ArrayList<InlineBlockWriter>();

  /** Unified version 2 block writer */
  private HFileBlock.Writer fsBlockWriter;

  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

  /** The offset of the first data block or -1 if the file is empty. */
  private long firstDataBlockOffset = -1;

  /** The offset of the last data block or 0 if the file is empty. */
  private long lastDataBlockOffset;

  /** Additional data items to be written to the "load-on-open" section. */
  private List<BlockWritable> additionalLoadOnOpenData =
      new ArrayList<BlockWritable>();

  /** Checksum related settings */
  private ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
  private int bytesPerChecksum = HFile.DEFAULT_BYTES_PER_CHECKSUM;

  private final boolean includeMemstoreTS;
  private long maxMemstoreTS = 0;

  private int minorVersion = HFileReaderV2.MAX_MINOR_VERSION;

  static class WriterFactoryV2 extends HFile.WriterFactory {
    WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
      super(conf, cacheConf);
    }

    @Override
    public Writer createWriter(FileSystem fs, Path path,
        FSDataOutputStream ostream, int blockSize,
        Compression.Algorithm compress, HFileDataBlockEncoder blockEncoder,
        final KeyComparator comparator, final ChecksumType checksumType,
        final int bytesPerChecksum, boolean includeMVCCReadpoint)
        throws IOException {
      return new HFileWriterV2(conf, cacheConf, fs, path, ostream, blockSize,
          compress, blockEncoder, comparator, checksumType, bytesPerChecksum,
          includeMVCCReadpoint);
    }
  }

  /**
   * Constructor that takes a path and an optional output stream; if the
   * stream is null, it is created (and later closed by this writer).
   */
  public HFileWriterV2(Configuration conf, CacheConfig cacheConf,
      FileSystem fs, Path path, FSDataOutputStream ostream, int blockSize,
      Compression.Algorithm compressAlgo, HFileDataBlockEncoder blockEncoder,
      final KeyComparator comparator, final ChecksumType checksumType,
      final int bytesPerChecksum, boolean includeMVCCReadpoint) throws IOException {
    super(cacheConf,
        ostream == null ? createOutputStream(conf, fs, path) : ostream,
        path, blockSize, compressAlgo, blockEncoder, comparator);
    SchemaMetrics.configureGlobally(conf);
    this.checksumType = checksumType;
    this.bytesPerChecksum = bytesPerChecksum;
    this.includeMemstoreTS = includeMVCCReadpoint;
    if (!conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, false)) {
      this.minorVersion = 0;
    }
    finishInit(conf);
  }
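
  // Illustrative usage sketch (an editor's addition, not part of the original
  // source); conf, cacheConf, fs, and path are assumed to exist in the
  // caller's scope, and the encoder/comparator/checksum choices are just one
  // plausible combination:
  //
  //   HFile.Writer writer = new HFileWriterV2(conf, cacheConf, fs, path,
  //       null /* create the stream */, 64 * 1024, Compression.Algorithm.GZ,
  //       NoOpDataBlockEncoder.INSTANCE, KeyValue.KEY_COMPARATOR,
  //       ChecksumType.CRC32, HFile.DEFAULT_BYTES_PER_CHECKSUM, true);
  //   writer.append(Bytes.toBytes("row-1"), Bytes.toBytes("value-1"));
  //   writer.close();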

  /** Additional initialization steps */
  private void finishInit(final Configuration conf) {
    if (fsBlockWriter != null)
      throw new IllegalStateException("finishInit called twice");

    // HFile filesystem-level (non-caching) block writer
    fsBlockWriter = new HFileBlock.Writer(compressAlgo, blockEncoder,
        includeMemstoreTS, minorVersion, checksumType, bytesPerChecksum);

    // Data block index writer
    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
        cacheIndexesOnWrite ? cacheConf.getBlockCache() : null,
        cacheIndexesOnWrite ? name : null);
    dataBlockIndexWriter.setMaxChunkSize(
        HFileBlockIndex.getMaxChunkSize(conf));
    inlineBlockWriters.add(dataBlockIndexWriter);

    // Meta data block index writer
    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
    LOG.debug("Initialized with " + cacheConf);

    if (isSchemaConfigured()) {
      schemaConfigurationChanged();
    }
  }

  @Override
  protected void schemaConfigurationChanged() {
    passSchemaMetricsTo(dataBlockIndexWriter);
    passSchemaMetricsTo(metaBlockIndexWriter);
  }

  /**
   * At a block boundary: finishes the current data block, writes any pending
   * inline blocks, and opens a new block.
   *
   * @throws IOException
   */
  private void checkBlockBoundary() throws IOException {
    if (fsBlockWriter.blockSizeWritten() < blockSize)
      return;

    finishBlock();
    writeInlineBlocks(false);
    newBlock();
  }

  /** Clean up the current block */
  private void finishBlock() throws IOException {
    if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
      return;

    long startTimeNs = System.nanoTime();

    // Update the first data block offset for scanning.
    if (firstDataBlockOffset == -1) {
      firstDataBlockOffset = outputStream.getPos();
    }

    // Update the last data block offset
    lastDataBlockOffset = outputStream.getPos();

    fsBlockWriter.writeHeaderAndData(outputStream);

    int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();
    dataBlockIndexWriter.addEntry(firstKeyInBlock, lastDataBlockOffset,
        onDiskSize);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

    HFile.offerWriteLatency(System.nanoTime() - startTimeNs);

    if (cacheConf.shouldCacheDataOnWrite()) {
      doCacheOnWrite(lastDataBlockOffset);
    }
  }

  /** Gives inline block writers an opportunity to contribute blocks. */
  private void writeInlineBlocks(boolean closing) throws IOException {
    for (InlineBlockWriter ibw : inlineBlockWriters) {
      while (ibw.shouldWriteBlock(closing)) {
        long offset = outputStream.getPos();
        boolean cacheThisBlock = ibw.cacheOnWrite();
        ibw.writeInlineBlock(fsBlockWriter.startWriting(
            ibw.getInlineBlockType()));
        fsBlockWriter.writeHeaderAndData(outputStream);
        ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
            fsBlockWriter.getUncompressedSizeWithoutHeader());
        totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

        if (cacheThisBlock) {
          doCacheOnWrite(offset);
        }
      }
    }
  }

  /**
   * Caches the last written HFile block.
   * @param offset the offset of the block we want to cache. Used to determine
   *          the cache key.
   */
  private void doCacheOnWrite(long offset) {
    // We don't cache-on-write data blocks on compaction, so assume this is not
    // a compaction.
    final boolean isCompaction = false;
    HFileBlock cacheFormatBlock = blockEncoder.diskToCacheFormat(
        fsBlockWriter.getBlockForCaching(), isCompaction);
    passSchemaMetricsTo(cacheFormatBlock);
    cacheConf.getBlockCache().cacheBlock(
        new BlockCacheKey(name, offset, blockEncoder.getEncodingInCache(),
            cacheFormatBlock.getBlockType()), cacheFormatBlock);
  }

  /**
   * Ready a new block for writing.
   *
   * @throws IOException
   */
  private void newBlock() throws IOException {
    // This is where the next block begins.
    fsBlockWriter.startWriting(BlockType.DATA);
    firstKeyInBlock = null;
  }

  /**
   * Add a meta block to the end of the file. Call before {@link #close()}.
   * Metadata blocks are expensive: fill one with many serialized values
   * rather than writing one block per value. If the metadata is small,
   * consider adding it to the file info via
   * {@link #appendFileInfo(byte[], byte[])} instead.
   *
   * @param metaBlockName
   *          name of the block
   * @param content
   *          the {@link Writable} whose {@code write} method will be invoked
   *          when the file is closed (DO NOT REUSE)
   */
  @Override
  public void appendMetaBlock(String metaBlockName, Writable content) {
    byte[] key = Bytes.toBytes(metaBlockName);
    int i;
    for (i = 0; i < metaNames.size(); ++i) {
      // stop when the current key is greater than our own
      byte[] cur = metaNames.get(i);
      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
          key.length) > 0) {
        break;
      }
    }
    metaNames.add(i, key);
    metaData.add(i, content);
  }
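
  // Sketch of a call (an editor's addition): the Writable's write() is
  // deferred until close(), so the caller must not mutate it in the meantime.
  // The block name and payload below are hypothetical.
  //
  //   writer.appendMetaBlock("MY_META", new BytesWritable(payload));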

  /**
   * Add key/value to file. Keys must be added in an order that agrees with
   * the Comparator passed on construction.
   *
   * @param kv
   *          KeyValue to add. Must not be null or empty.
   * @throws IOException
   */
  @Override
  public void append(final KeyValue kv) throws IOException {
    append(kv.getMemstoreTS(), kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
        kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMemstoreTS());
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with
   * the Comparator passed on construction.
   *
   * @param key
   *          Key to add. Must not be null or empty.
   * @param value
   *          Value to add. Must not be null or empty.
   * @throws IOException
   */
  @Override
  public void append(final byte[] key, final byte[] value) throws IOException {
    append(0, key, 0, key.length, value, 0, value.length);
  }
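
  // Ordering sketch (an editor's addition) covering both public overloads:
  // keys must arrive in comparator order, or checkKey() throws an
  // IOException.
  //
  //   writer.append(Bytes.toBytes("a"), Bytes.toBytes("1"));  // ok
  //   writer.append(Bytes.toBytes("b"), Bytes.toBytes("2"));  // ok
  //   writer.append(Bytes.toBytes("a"), Bytes.toBytes("3"));  // out of order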

  /**
   * Add key/value to file. Keys must be added in an order that agrees with
   * the Comparator passed on construction.
   *
   * @param memstoreTS the memstore (mvcc) timestamp to record, if enabled
   * @param key the key buffer
   * @param koffset offset of the key in the buffer
   * @param klength length of the key
   * @param value the value buffer
   * @param voffset offset of the value in the buffer
   * @param vlength length of the value
   * @throws IOException
   */
  private void append(final long memstoreTS, final byte[] key,
      final int koffset, final int klength, final byte[] value,
      final int voffset, final int vlength) throws IOException {
    boolean dupKey = checkKey(key, koffset, klength);
    checkValue(value, voffset, vlength);
    if (!dupKey) {
      checkBlockBoundary();
    }

    if (!fsBlockWriter.isWriting())
      newBlock();

    // Write length of key and value and then actual key and value bytes.
    // Additionally, we may also write down the memstoreTS.
    {
      DataOutputStream out = fsBlockWriter.getUserDataStream();
      out.writeInt(klength);
      totalKeyLength += klength;
      out.writeInt(vlength);
      totalValueLength += vlength;
      out.write(key, koffset, klength);
      out.write(value, voffset, vlength);
      if (this.includeMemstoreTS) {
        WritableUtils.writeVLong(out, memstoreTS);
      }
    }

    // Are we the first key in this block?
    if (firstKeyInBlock == null) {
      // Copy the key.
      firstKeyInBlock = new byte[klength];
      System.arraycopy(key, koffset, firstKeyInBlock, 0, klength);
    }

    lastKeyBuffer = key;
    lastKeyOffset = koffset;
    lastKeyLength = klength;
    entryCount++;
  }

  @Override
  public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // Save data block encoder metadata in the file info.
    blockEncoder.saveMetadata(this);
    // Finish the last data block, then write inline blocks, meta blocks, the
    // data block index, the meta block index, the file info, any additional
    // load-on-open data, and finally the trailer.

    finishBlock();
    writeInlineBlocks(true);

    FixedFileTrailer trailer = new FixedFileTrailer(2, minorVersion);

    // Write out the metadata blocks if any.
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long offset = outputStream.getPos();
        // write the metadata content
        DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        fsBlockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

        // Add the new meta block to the meta index.
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
            fsBlockWriter.getOnDiskSizeWithHeader());
      }
    }

    // Load-on-open section.
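    //
    // Layout note (an editor's addition): at this point the data blocks,
    // interleaved inline blocks (e.g. leaf-level index and compound Bloom
    // chunks), and meta blocks are on disk. The load-on-open section written
    // below consists of the root data block index, the meta block index, the
    // file info, and any additional load-on-open blocks (such as Bloom filter
    // metadata), followed by the fixed file trailer.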

    // Data block index.
    //
    // In version 2, this section of the file starts with the root level data
    // block index. We call a function that writes intermediate-level blocks
    // first, then root level, and returns the offset of the root level block
    // index.

    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    // Meta block index.
    metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
        BlockType.ROOT_INDEX), "meta");
    fsBlockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

    if (this.includeMemstoreTS) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }

    // File info
    writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
    fsBlockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
    for (BlockWritable w : additionalLoadOnOpenData) {
      fsBlockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
    }

    // Now finish off the trailer.
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    trailer.setUncompressedDataIndexSize(
        dataBlockIndexWriter.getTotalUncompressedSize());
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    trailer.setComparatorClass(comparator.getClass());
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());

    finishClose(trailer);

    fsBlockWriter.releaseCompressor();
  }

  @Override
  public void addInlineBlockWriter(InlineBlockWriter ibw) {
    inlineBlockWriters.add(ibw);
  }

  @Override
  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
  }

  @Override
  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
  }

  private void addBloomFilter(final BloomFilterWriter bfw,
      final BlockType blockType) {
    if (bfw.getKeyCount() <= 0)
      return;

    if (blockType != BlockType.GENERAL_BLOOM_META &&
        blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
      throw new RuntimeException("Block Type: " + blockType.toString() +
          " is not supported");
    }
    additionalLoadOnOpenData.add(new BlockWritable() {
      @Override
      public BlockType getBlockType() {
        return blockType;
      }

      @Override
      public void writeToBlock(DataOutput out) throws IOException {
        bfw.getMetaWriter().write(out);
        Writable dataWriter = bfw.getDataWriter();
        if (dataWriter != null)
          dataWriter.write(out);
      }
    });
  }
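
  // Sketch (an editor's addition) of how a Bloom filter typically reaches
  // this writer; BloomFilterFactory.createGeneralBloomAtWrite is the helper
  // used by StoreFile in this era of the codebase, but treat the exact
  // signature as an assumption.
  //
  //   BloomFilterWriter bloom = BloomFilterFactory.createGeneralBloomAtWrite(
  //       conf, cacheConf, bloomType, maxKeys, writer);
  //   // ... add keys to bloom while appending KeyValues to writer ...
  //   writer.addGeneralBloomFilter(bloom);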

}