/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;

/**
 * Writes HFile format version 2.
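 *
 * <p>A minimal usage sketch, not authoritative: it assumes an existing
 * {@code Configuration conf}, {@code FileSystem fs}, {@code Path path} and a
 * comparator-ordered stream of {@code KeyValue}s. Callers normally go through
 * {@link HFile#getWriterFactory(Configuration, CacheConfig)} rather than
 * instantiating this class directly.
 *
 * <pre>
 * HFileContext context = new HFileContextBuilder()
 *     .withBlockSize(64 * 1024)          // data block size hint
 *     .build();
 * HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
 *     .withPath(fs, path)
 *     .withFileContext(context)
 *     .create();
 * try {
 *   writer.append(kv);                   // keys must arrive in comparator order
 * } finally {
 *   writer.close();                      // writes indexes, file info and trailer
 * }
 * </pre>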
 */
@InterfaceAudience.Private
public class HFileWriterV2 extends AbstractHFileWriter {
  static final Log LOG = LogFactory.getLog(HFileWriterV2.class);

  /** Max memstore (mvcc) timestamp in FileInfo */
  public static final byte [] MAX_MEMSTORE_TS_KEY =
      Bytes.toBytes("MAX_MEMSTORE_TS_KEY");

  /** KeyValue version in FileInfo */
  public static final byte [] KEY_VALUE_VERSION =
      Bytes.toBytes("KEY_VALUE_VERSION");

  /** Version for KeyValue which includes memstore timestamp */
  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  /** Inline block writers for multi-level block index and compound Blooms. */
  private List<InlineBlockWriter> inlineBlockWriters =
      new ArrayList<InlineBlockWriter>();

  /** Unified version 2 block writer */
  protected HFileBlock.Writer fsBlockWriter;

  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

  /** The offset of the first data block or -1 if the file is empty. */
  private long firstDataBlockOffset = -1;

  /** The offset of the last data block or 0 if the file is empty. */
  protected long lastDataBlockOffset;
  /** The last (stop) key of the previous data block. */
  private byte[] lastKeyOfPreviousBlock = null;

  /** Additional data items to be written to the "load-on-open" section. */
  private List<BlockWritable> additionalLoadOnOpenData =
      new ArrayList<BlockWritable>();

  protected long maxMemstoreTS = 0;

  static class WriterFactoryV2 extends HFile.WriterFactory {
    WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
      super(conf, cacheConf);
    }

    @Override
    public Writer createWriter(FileSystem fs, Path path,
        FSDataOutputStream ostream,
        KVComparator comparator, HFileContext context) throws IOException {
      context.setIncludesTags(false); // HFile V2 does not deal with tags at all!
      return new HFileWriterV2(conf, cacheConf, fs, path, ostream,
          comparator, context);
    }
  }

  /**
   * Constructor that takes a path, creating the output stream if one is not
   * supplied; a stream created here is closed by {@link #close()}.
   */
  public HFileWriterV2(Configuration conf, CacheConfig cacheConf,
      FileSystem fs, Path path, FSDataOutputStream ostream,
      final KVComparator comparator, final HFileContext context) throws IOException {
    super(cacheConf,
        ostream == null ? createOutputStream(conf, fs, path, null) : ostream,
        path, comparator, context);
    finishInit(conf);
  }

  /** Additional initialization steps */
  protected void finishInit(final Configuration conf) {
    if (fsBlockWriter != null)
      throw new IllegalStateException("finishInit called twice");

    fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);

    // Data block index writer
    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
        cacheIndexesOnWrite ? cacheConf.getBlockCache() : null,
        cacheIndexesOnWrite ? name : null);
    dataBlockIndexWriter.setMaxChunkSize(
        HFileBlockIndex.getMaxChunkSize(conf));
    inlineBlockWriters.add(dataBlockIndexWriter);

    // Meta data block index writer
    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
    if (LOG.isTraceEnabled()) LOG.trace("Initialized with " + cacheConf);
  }

  /**
   * At a block boundary, writes all pending inline blocks and opens a new
   * data block.
   *
   * @throws IOException
   */
  protected void checkBlockBoundary() throws IOException {
    if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
      return;

    finishBlock();
    writeInlineBlocks(false);
    newBlock();
  }

  /** Finishes up the current block: writes it out and records its index entry. */
  private void finishBlock() throws IOException {
    if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
      return;

    long startTimeNs = System.nanoTime();
    // Update the first data block offset for scanning.
    if (firstDataBlockOffset == -1) {
      firstDataBlockOffset = outputStream.getPos();
    }
    // Update the last data block offset
    lastDataBlockOffset = outputStream.getPos();
    fsBlockWriter.writeHeaderAndData(outputStream);
    int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();

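    // The index entry need not store the full first key of the new block:
    // calcIndexKey may return a shorter separator key that still sorts after
    // the last key of the previous block and not after this block's first key.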
    byte[] indexKey = comparator.calcIndexKey(lastKeyOfPreviousBlock, firstKeyInBlock);
    dataBlockIndexWriter.addEntry(indexKey, lastDataBlockOffset, onDiskSize);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
    HFile.offerWriteLatency(System.nanoTime() - startTimeNs);
    if (cacheConf.shouldCacheDataOnWrite()) {
      doCacheOnWrite(lastDataBlockOffset);
    }
  }

  /** Gives inline block writers an opportunity to contribute blocks. */
  private void writeInlineBlocks(boolean closing) throws IOException {
    for (InlineBlockWriter ibw : inlineBlockWriters) {
      while (ibw.shouldWriteBlock(closing)) {
        long offset = outputStream.getPos();
        boolean cacheThisBlock = ibw.getCacheOnWrite();
        ibw.writeInlineBlock(fsBlockWriter.startWriting(
            ibw.getInlineBlockType()));
        fsBlockWriter.writeHeaderAndData(outputStream);
        ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
            fsBlockWriter.getUncompressedSizeWithoutHeader());
        totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

        if (cacheThisBlock) {
          doCacheOnWrite(offset);
        }
      }
    }
  }

  /**
   * Caches the last written HFile block.
   * @param offset the offset of the block we want to cache. Used to determine
   *          the cache key.
   */
  private void doCacheOnWrite(long offset) {
    HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching();
    cacheConf.getBlockCache().cacheBlock(
        new BlockCacheKey(name, offset), cacheFormatBlock);
  }

  /**
   * Ready a new block for writing.
   *
   * @throws IOException
   */
  protected void newBlock() throws IOException {
    // This is where the next block begins.
    fsBlockWriter.startWriting(BlockType.DATA);
    firstKeyInBlock = null;
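    // Remember the last key of the block just finished; finishBlock() uses it
    // together with the first key of the new block to compute the index key.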
    if (lastKeyLength > 0) {
      lastKeyOfPreviousBlock = new byte[lastKeyLength];
      System.arraycopy(lastKeyBuffer, lastKeyOffset, lastKeyOfPreviousBlock, 0, lastKeyLength);
    }
  }

  /**
   * Add a meta block to the end of the file. Call before close(). Metadata
   * blocks are expensive: fill one with many serialized data items rather
   * than writing one metadata block per item. If the metadata is small,
   * consider adding it to the file info using
   * {@link #appendFileInfo(byte[], byte[])}.
   *
   * @param metaBlockName
   *          name of the block
   * @param content
   *          will call readFields to get data later (DO NOT REUSE)
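   *
   * <p>A hedged example; {@code stats} stands in for any application-defined
   * {@code Writable} payload and is not part of this API:
   *
   * <pre>
   * Writable stats = ...;                     // some serializable payload
   * writer.appendMetaBlock("MY_STATS", stats);
   * </pre>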
   */
  @Override
  public void appendMetaBlock(String metaBlockName, Writable content) {
    byte[] key = Bytes.toBytes(metaBlockName);
    int i;
    for (i = 0; i < metaNames.size(); ++i) {
      // stop when the current key is greater than our own
      byte[] cur = metaNames.get(i);
      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
          key.length) > 0) {
        break;
      }
    }
    metaNames.add(i, key);
    metaData.add(i, content);
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param kv
   *          KeyValue to add. Cannot be null or empty.
   * @throws IOException
   */
  @Override
  public void append(final KeyValue kv) throws IOException {
    byte[] key = kv.getBuffer();
    int koffset = kv.getKeyOffset();
    int klength = kv.getKeyLength();
    byte[] value = kv.getValueArray();
    int voffset = kv.getValueOffset();
    int vlength = kv.getValueLength();
    boolean dupKey = checkKey(key, koffset, klength);
    checkValue(value, voffset, vlength);
    if (!dupKey) {
      checkBlockBoundary();
    }

    if (!fsBlockWriter.isWriting())
      newBlock();

    fsBlockWriter.write(kv);

    totalKeyLength += klength;
    totalValueLength += vlength;

    // Are we the first key in this block?
    if (firstKeyInBlock == null) {
      // Copy the key.
      firstKeyInBlock = new byte[klength];
      System.arraycopy(key, koffset, firstKeyInBlock, 0, klength);
    }

    lastKeyBuffer = key;
    lastKeyOffset = koffset;
    lastKeyLength = klength;
    entryCount++;
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMvccVersion());
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param key
   *          Key to add. Cannot be null or empty.
   * @param value
   *          Value to add. Cannot be null or empty.
   * @throws IOException
   */
  @Override
  public void append(final byte[] key, final byte[] value) throws IOException {
    int kvlen = (int) KeyValue.getKeyValueDataStructureSize(key.length, value.length, 0);
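    // Serialize into the flat KeyValue layout: key length (int), value length
    // (int), key bytes, then value bytes. The trailing 0 above is the tags
    // length; HFile V2 never writes tags.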
    byte[] b = new byte[kvlen];
    int pos = 0;
    pos = Bytes.putInt(b, pos, key.length);
    pos = Bytes.putInt(b, pos, value.length);
    pos = Bytes.putBytes(b, pos, key, 0, key.length);
    Bytes.putBytes(b, pos, value, 0, value.length);
    append(new KeyValue(b, 0, kvlen));
  }

  @Override
  public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // Save data block encoder metadata in the file info.
    blockEncoder.saveMetadata(this);
    // Write out the end of the data blocks, then the meta data blocks,
    // followed by the data block index, meta block index, file info, and any
    // additional load-on-open data (such as Bloom filter metadata).

    finishBlock();
    writeInlineBlocks(true);

    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());

    // Write out the metadata blocks if any.
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long offset = outputStream.getPos();
        // write the metadata content
        DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        fsBlockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

        // Add the new meta block to the meta index.
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
            fsBlockWriter.getOnDiskSizeWithHeader());
      }
    }

    // Load-on-open section.

    // Data block index.
    //
    // In version 2, this section of the file starts with the root level data
    // block index. We call a function that writes intermediate-level blocks
    // first, then root level, and returns the offset of the root level block
    // index.

    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    // Meta block index.
    metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
        BlockType.ROOT_INDEX), "meta");
    fsBlockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

    if (this.hFileContext.isIncludesMvcc()) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }

    // File info
    writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
    fsBlockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
    for (BlockWritable w : additionalLoadOnOpenData) {
      fsBlockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
    }

    // Now finish off the trailer.
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    trailer.setUncompressedDataIndexSize(
        dataBlockIndexWriter.getTotalUncompressedSize());
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    trailer.setComparatorClass(comparator.getClass());
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());

    finishClose(trailer);

    fsBlockWriter.release();
  }

  @Override
  public void addInlineBlockWriter(InlineBlockWriter ibw) {
    inlineBlockWriters.add(ibw);
  }

  @Override
  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
  }

  @Override
  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
  }

  private void addBloomFilter(final BloomFilterWriter bfw,
      final BlockType blockType) {
    if (bfw.getKeyCount() <= 0)
      return;

    if (blockType != BlockType.GENERAL_BLOOM_META &&
        blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
      throw new RuntimeException("Block Type: " + blockType.toString()
          + " is not supported");
    }
    additionalLoadOnOpenData.add(new BlockWritable() {
      @Override
      public BlockType getBlockType() {
        return blockType;
      }

      @Override
      public void writeToBlock(DataOutput out) throws IOException {
        bfw.getMetaWriter().write(out);
        Writable dataWriter = bfw.getDataWriter();
        if (dataWriter != null)
          dataWriter.write(out);
      }
    });
  }

  @Override
  public void append(byte[] key, byte[] value, byte[] tag) throws IOException {
    throw new UnsupportedOperationException("KV tags are supported only from HFile V3");
  }

  protected int getMajorVersion() {
    return 2;
  }

  protected int getMinorVersion() {
    return HFileReaderV2.MAX_MINOR_VERSION;
  }

  @Override
  public HFileContext getFileContext() {
    return hFileContext;
  }
}