/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;

/**
 * Writes HFile format version 2.
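 *
 * <p>A minimal usage sketch; {@code fs}, {@code path} and {@code kv} are
 * assumed to be in scope, and writers are normally obtained through
 * {@link HFile#getWriterFactory(Configuration, CacheConfig)} rather than by
 * constructing this class directly:
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * HFileContext context = new HFileContextBuilder()
 *     .withBlockSize(64 * 1024)   // target uncompressed data block size
 *     .build();
 * HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
 *     .withPath(fs, path)
 *     .withFileContext(context)
 *     .create();
 * writer.append(kv);              // KeyValues in comparator order
 * writer.close();
 * </pre>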
 */
@InterfaceAudience.Private
public class HFileWriterV2 extends AbstractHFileWriter {
  static final Log LOG = LogFactory.getLog(HFileWriterV2.class);

  /** Max memstore (mvcc) timestamp in FileInfo */
  public static final byte [] MAX_MEMSTORE_TS_KEY =
      Bytes.toBytes("MAX_MEMSTORE_TS_KEY");

  /** KeyValue version in FileInfo */
  public static final byte [] KEY_VALUE_VERSION =
      Bytes.toBytes("KEY_VALUE_VERSION");

  /** Version for KeyValue which includes memstore timestamp */
  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  /** Inline block writers for multi-level block index and compound Blooms. */
  private List<InlineBlockWriter> inlineBlockWriters =
      new ArrayList<InlineBlockWriter>();

  /** Unified version 2 block writer */
  protected HFileBlock.Writer fsBlockWriter;

  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

  /** The offset of the first data block or -1 if the file is empty. */
  private long firstDataBlockOffset = -1;

  /** The offset of the last data block or 0 if the file is empty. */
  protected long lastDataBlockOffset;

  /** The last (stop) key of the previous data block. */
  private byte[] lastKeyOfPreviousBlock = null;

  /** Additional data items to be written to the "load-on-open" section. */
  private List<BlockWritable> additionalLoadOnOpenData =
    new ArrayList<BlockWritable>();

  protected long maxMemstoreTS = 0;

  static class WriterFactoryV2 extends HFile.WriterFactory {
    WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
      super(conf, cacheConf);
    }

    @Override
    public Writer createWriter(FileSystem fs, Path path,
        FSDataOutputStream ostream,
        KVComparator comparator, HFileContext context) throws IOException {
      context.setIncludesTags(false); // HFile V2 does not deal with tags at all!
      return new HFileWriterV2(conf, cacheConf, fs, path, ostream,
          comparator, context);
    }
  }

  /** Constructor that takes a path, creating the output stream if none is supplied. */
  public HFileWriterV2(Configuration conf, CacheConfig cacheConf,
      FileSystem fs, Path path, FSDataOutputStream ostream,
      final KVComparator comparator, final HFileContext context) throws IOException {
    super(cacheConf,
        ostream == null ? createOutputStream(conf, fs, path, null) : ostream,
        path, comparator, context);
    finishInit(conf);
  }

  /** Additional initialization steps */
  protected void finishInit(final Configuration conf) {
    if (fsBlockWriter != null)
      throw new IllegalStateException("finishInit called twice");

    fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);

    // Data block index writer
    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
        cacheIndexesOnWrite ? cacheConf : null,
        cacheIndexesOnWrite ? name : null);
    dataBlockIndexWriter.setMaxChunkSize(
        HFileBlockIndex.getMaxChunkSize(conf));
    inlineBlockWriters.add(dataBlockIndexWriter);

    // Meta data block index writer
    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
    if (LOG.isTraceEnabled()) LOG.trace("Initialized with " + cacheConf);
  }

  /**
   * At a block boundary, writes all the inline blocks and opens a new block.
   *
   * @throws IOException
   */
  protected void checkBlockBoundary() throws IOException {
    if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
      return;

    finishBlock();
    writeInlineBlocks(false);
    newBlock();
  }

  /** Clean up the current data block */
  private void finishBlock() throws IOException {
    if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
      return;

    // Update the first data block offset for scanning.
    if (firstDataBlockOffset == -1) {
      firstDataBlockOffset = outputStream.getPos();
    }
    // Update the last data block offset
    lastDataBlockOffset = outputStream.getPos();
    fsBlockWriter.writeHeaderAndData(outputStream);
    int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();

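    // The index entry need not be the block's full first key: calcIndexKey
    // picks a key that sorts after the previous block's last key and no
    // later than this block's first key, which can shorten index entries.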
    byte[] indexKey = comparator.calcIndexKey(lastKeyOfPreviousBlock, firstKeyInBlock);
    dataBlockIndexWriter.addEntry(indexKey, lastDataBlockOffset, onDiskSize);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
    if (cacheConf.shouldCacheDataOnWrite()) {
      doCacheOnWrite(lastDataBlockOffset);
    }
  }

  /** Gives inline block writers an opportunity to contribute blocks. */
  private void writeInlineBlocks(boolean closing) throws IOException {
    for (InlineBlockWriter ibw : inlineBlockWriters) {
      while (ibw.shouldWriteBlock(closing)) {
        long offset = outputStream.getPos();
        boolean cacheThisBlock = ibw.getCacheOnWrite();
        ibw.writeInlineBlock(fsBlockWriter.startWriting(
            ibw.getInlineBlockType()));
        fsBlockWriter.writeHeaderAndData(outputStream);
        ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
            fsBlockWriter.getUncompressedSizeWithoutHeader());
        totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

        if (cacheThisBlock) {
          doCacheOnWrite(offset);
        }
      }
    }
  }

  /**
   * Caches the last written HFile block.
   * @param offset the offset of the block we want to cache. Used to determine
   *          the cache key.
   */
  private void doCacheOnWrite(long offset) {
    HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf);
    cacheConf.getBlockCache().cacheBlock(
        new BlockCacheKey(name, offset), cacheFormatBlock);
  }

  /**
   * Ready a new block for writing.
   *
   * @throws IOException
   */
  protected void newBlock() throws IOException {
    // This is where the next block begins.
    fsBlockWriter.startWriting(BlockType.DATA);
    firstKeyInBlock = null;
    if (lastKeyLength > 0) {
      lastKeyOfPreviousBlock = new byte[lastKeyLength];
      System.arraycopy(lastKeyBuffer, lastKeyOffset, lastKeyOfPreviousBlock, 0, lastKeyLength);
    }
  }

  /**
   * Add a meta block to the end of the file. Call before close(). Meta blocks
   * are expensive: fill one with a batch of serialized data rather than
   * writing one meta block per metadata instance. If the metadata is small,
   * consider adding it to the file info instead, via
   * {@link #appendFileInfo(byte[], byte[])}.
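   *
   * <p>Sketch (the names below are illustrative, not part of this API):
   * <pre>
   * writer.appendMetaBlock("BLOOM_META", bigBloomWritable);        // block-sized payload
   * writer.appendFileInfo(Bytes.toBytes("TIMERANGE"), smallBytes); // tiny item
   * </pre>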
   *
   * @param metaBlockName
   *          name of the block
   * @param content
   *          will call readFields to get data later (DO NOT REUSE)
   */
  @Override
  public void appendMetaBlock(String metaBlockName, Writable content) {
    byte[] key = Bytes.toBytes(metaBlockName);
    int i;
    for (i = 0; i < metaNames.size(); ++i) {
      // stop when the current key is greater than our own
      byte[] cur = metaNames.get(i);
      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
          key.length) > 0) {
        break;
      }
    }
    metaNames.add(i, key);
    metaData.add(i, content);
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param kv
   *          KeyValue to add. Must not be null or empty.
   * @throws IOException
   */
  @Override
  public void append(final KeyValue kv) throws IOException {
    byte[] key = kv.getBuffer();
    int koffset = kv.getKeyOffset();
    int klength = kv.getKeyLength();
    byte[] value = kv.getValueArray();
    int voffset = kv.getValueOffset();
    int vlength = kv.getValueLength();
    boolean dupKey = checkKey(key, koffset, klength);
    checkValue(value, voffset, vlength);
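    // Skip the boundary check on a duplicate key so that cells with the
    // same key are never split across two data blocks.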
    if (!dupKey) {
      checkBlockBoundary();
    }

    if (!fsBlockWriter.isWriting())
      newBlock();

    fsBlockWriter.write(kv);

    totalKeyLength += klength;
    totalValueLength += vlength;

    // Are we the first key in this block?
    if (firstKeyInBlock == null) {
      // Copy the key.
      firstKeyInBlock = new byte[klength];
      System.arraycopy(key, koffset, firstKeyInBlock, 0, klength);
    }

    lastKeyBuffer = key;
    lastKeyOffset = koffset;
    lastKeyLength = klength;
    entryCount++;
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMvccVersion());
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param key
   *          Key to add. Must not be null or empty.
   * @param value
   *          Value to add. Must not be null or empty.
   * @throws IOException
   */
  @Override
  public void append(final byte[] key, final byte[] value) throws IOException {
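    // Build the flat KeyValue layout by hand: a 4-byte key length and a
    // 4-byte value length, followed by the key bytes and the value bytes.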
    int kvlen = (int) KeyValue.getKeyValueDataStructureSize(key.length, value.length, 0);
    byte[] b = new byte[kvlen];
    int pos = 0;
    pos = Bytes.putInt(b, pos, key.length);
    pos = Bytes.putInt(b, pos, value.length);
    pos = Bytes.putBytes(b, pos, key, 0, key.length);
    Bytes.putBytes(b, pos, value, 0, value.length);
    append(new KeyValue(b, 0, kvlen));
  }

  @Override
  public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // Save data block encoder metadata in the file info.
    blockEncoder.saveMetadata(this);
    // Write out the end of the data blocks, then write meta data blocks,
    // followed by fileinfo, data block index and meta block index.

    finishBlock();
    writeInlineBlocks(true);

    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());

    // Write out the metadata blocks if any.
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long offset = outputStream.getPos();
        // write the metadata content
        DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        fsBlockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

        // Add the new meta block to the meta index.
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
            fsBlockWriter.getOnDiskSizeWithHeader());
      }
    }

    // Load-on-open section.
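    //
    // Sketch of the tail of the resulting v2 file, per the HFile v2 design:
    //   ... data blocks ... | meta blocks | root data index | meta index |
    //   file info | Bloom/other load-on-open data | fixed file trailer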

    // Data block index.
    //
    // In version 2, this section of the file starts with the root level data
    // block index. We call a function that writes intermediate-level blocks
    // first, then root level, and returns the offset of the root level block
    // index.

    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    // Meta block index.
    metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
        BlockType.ROOT_INDEX), "meta");
    fsBlockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

    if (this.hFileContext.isIncludesMvcc()) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }

    // File info
    writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
    fsBlockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
    for (BlockWritable w : additionalLoadOnOpenData) {
      fsBlockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
    }

    // Now finish off the trailer.
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    trailer.setUncompressedDataIndexSize(
        dataBlockIndexWriter.getTotalUncompressedSize());
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    trailer.setComparatorClass(comparator.getClass());
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());

    finishClose(trailer);

    fsBlockWriter.release();
  }

  @Override
  public void addInlineBlockWriter(InlineBlockWriter ibw) {
    inlineBlockWriters.add(ibw);
  }

  @Override
  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
  }

  @Override
  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
  }

  private void addBloomFilter(final BloomFilterWriter bfw,
      final BlockType blockType) {
    if (bfw.getKeyCount() <= 0)
      return;

    if (blockType != BlockType.GENERAL_BLOOM_META &&
        blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
      throw new RuntimeException("Block Type: " + blockType.toString() +
          " is not supported");
    }
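    // Register the Bloom payload for deferred writing: close() emits each
    // entry of additionalLoadOnOpenData as a block in the load-on-open
    // section, after the file info block.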
    additionalLoadOnOpenData.add(new BlockWritable() {
      @Override
      public BlockType getBlockType() {
        return blockType;
      }

      @Override
      public void writeToBlock(DataOutput out) throws IOException {
        bfw.getMetaWriter().write(out);
        Writable dataWriter = bfw.getDataWriter();
        if (dataWriter != null)
          dataWriter.write(out);
      }
    });
  }

  @Override
  public void append(byte[] key, byte[] value, byte[] tag) throws IOException {
    throw new UnsupportedOperationException("KV tags are supported only from HFile V3");
  }

  protected int getMajorVersion() {
    return 2;
  }

  protected int getMinorVersion() {
    return HFileReaderV2.MAX_MINOR_VERSION;
  }

  @Override
  public HFileContext getFileContext() {
    return hFileContext;
  }
}