/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;

/**
 * Writes HFile format version 2.
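 *
 * <p>A minimal usage sketch. Callers normally obtain a writer through the
 * {@code HFile.WriterFactory} builder rather than constructing this class
 * directly; the builder methods shown below are assumed from the surrounding
 * codebase:
 * <pre>
 * HFileContext context = new HFileContextBuilder().withBlockSize(64 * 1024).build();
 * HFile.Writer writer = HFile.getWriterFactory(conf, cacheConf)
 *     .withPath(fs, path)
 *     .withFileContext(context)
 *     .create();
 * writer.append(kv); // KeyValues must be appended in comparator order
 * writer.close();
 * </pre>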
 */
@InterfaceAudience.Private
public class HFileWriterV2 extends AbstractHFileWriter {
  static final Log LOG = LogFactory.getLog(HFileWriterV2.class);

  /** Max memstore (mvcc) timestamp in FileInfo */
  public static final byte [] MAX_MEMSTORE_TS_KEY =
      Bytes.toBytes("MAX_MEMSTORE_TS_KEY");

  /** KeyValue version in FileInfo */
  public static final byte [] KEY_VALUE_VERSION =
      Bytes.toBytes("KEY_VALUE_VERSION");

  /** Version for KeyValue which includes memstore timestamp */
  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  /** Inline block writers for multi-level block index and compound Blooms. */
  private List<InlineBlockWriter> inlineBlockWriters =
      new ArrayList<InlineBlockWriter>();

  /** Unified version 2 block writer */
  protected HFileBlock.Writer fsBlockWriter;

  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

  /** The offset of the first data block or -1 if the file is empty. */
  private long firstDataBlockOffset = -1;

  /** The offset of the last data block or 0 if the file is empty. */
  protected long lastDataBlockOffset;

  /** The last (stop) key of the previous data block. */
  private byte[] lastKeyOfPreviousBlock = null;

  /** Additional data items to be written to the "load-on-open" section. */
  private List<BlockWritable> additionalLoadOnOpenData =
      new ArrayList<BlockWritable>();

  protected long maxMemstoreTS = 0;

  static class WriterFactoryV2 extends HFile.WriterFactory {
    WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
      super(conf, cacheConf);
    }

    @Override
    public Writer createWriter(FileSystem fs, Path path,
        FSDataOutputStream ostream,
        KVComparator comparator, HFileContext context) throws IOException {
      context.setIncludesTags(false); // HFile V2 does not deal with tags at all!
      return new HFileWriterV2(conf, cacheConf, fs, path, ostream,
          comparator, context);
    }
  }

  /** Constructor that takes a path, creates and closes the output stream. */
  public HFileWriterV2(Configuration conf, CacheConfig cacheConf,
      FileSystem fs, Path path, FSDataOutputStream ostream,
      final KVComparator comparator, final HFileContext context) throws IOException {
    super(cacheConf,
        ostream == null ? createOutputStream(conf, fs, path, null) : ostream,
        path, comparator, context);
    finishInit(conf);
  }

  /** Additional initialization steps */
  protected void finishInit(final Configuration conf) {
    if (fsBlockWriter != null)
      throw new IllegalStateException("finishInit called twice");

    fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);

    // Data block index writer
    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
        cacheIndexesOnWrite ? cacheConf.getBlockCache() : null,
        cacheIndexesOnWrite ? name : null);
    dataBlockIndexWriter.setMaxChunkSize(
        HFileBlockIndex.getMaxChunkSize(conf));
    inlineBlockWriters.add(dataBlockIndexWriter);
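    // Registering the data block index writer as an inline block writer lets
    // leaf-level index chunks be flushed in between data blocks as the file
    // grows; only the intermediate and root levels are deferred to close().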

    // Meta data block index writer
    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
    if (LOG.isTraceEnabled()) LOG.trace("Initialized with " + cacheConf);
  }

  /**
   * At a block boundary, writes all the inline blocks and opens a new block.
   *
   * @throws IOException
   */
  protected void checkBlockBoundary() throws IOException {
    if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
      return;

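    // The current data block has reached its target size: flush it, give the
    // inline block writers a chance to emit any blocks that are ready, then
    // start a fresh data block.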
    finishBlock();
    writeInlineBlocks(false);
    newBlock();
  }

  /** Clean up the current block */
  private void finishBlock() throws IOException {
    if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
      return;

    long startTimeNs = System.nanoTime();
    // Update the first data block offset for scanning.
    if (firstDataBlockOffset == -1) {
      firstDataBlockOffset = outputStream.getPos();
    }
    // Update the last data block offset
    lastDataBlockOffset = outputStream.getPos();
    fsBlockWriter.writeHeaderAndData(outputStream);
    int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();

    byte[] indexKey = comparator.calcIndexKey(lastKeyOfPreviousBlock, firstKeyInBlock);
    dataBlockIndexWriter.addEntry(indexKey, lastDataBlockOffset, onDiskSize);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
    HFile.offerWriteLatency(System.nanoTime() - startTimeNs);
    if (cacheConf.shouldCacheDataOnWrite()) {
      doCacheOnWrite(lastDataBlockOffset);
    }
  }

  /** Gives inline block writers an opportunity to contribute blocks. */
  private void writeInlineBlocks(boolean closing) throws IOException {
    for (InlineBlockWriter ibw : inlineBlockWriters) {
      while (ibw.shouldWriteBlock(closing)) {
        long offset = outputStream.getPos();
        boolean cacheThisBlock = ibw.getCacheOnWrite();
        ibw.writeInlineBlock(fsBlockWriter.startWriting(
            ibw.getInlineBlockType()));
        fsBlockWriter.writeHeaderAndData(outputStream);
        ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
            fsBlockWriter.getUncompressedSizeWithoutHeader());
        totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

        if (cacheThisBlock) {
          doCacheOnWrite(offset);
        }
      }
    }
  }

  /**
   * Caches the last written HFile block.
   * @param offset the offset of the block we want to cache. Used to determine
   *          the cache key.
   */
  private void doCacheOnWrite(long offset) {
    HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching();
    cacheConf.getBlockCache().cacheBlock(
        new BlockCacheKey(name, offset, blockEncoder.getDataBlockEncoding(),
            cacheFormatBlock.getBlockType()), cacheFormatBlock);
  }

  /**
   * Ready a new block for writing.
   *
   * @throws IOException
   */
  protected void newBlock() throws IOException {
    // This is where the next block begins.
    fsBlockWriter.startWriting(BlockType.DATA);
    firstKeyInBlock = null;
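    // Remember the last key of the block just finished; calcIndexKey() uses it
    // as a lower bound to compute a shortened index key for the next block.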
    if (lastKeyLength > 0) {
      lastKeyOfPreviousBlock = new byte[lastKeyLength];
      System.arraycopy(lastKeyBuffer, lastKeyOffset, lastKeyOfPreviousBlock, 0, lastKeyLength);
    }
  }

  /**
   * Add a meta block to the end of the file. Call before close(). Metadata
   * blocks are expensive: fill one with a bunch of serialized data rather than
   * writing a metadata block per metadata instance. If metadata is small,
   * consider adding it to the file info using
   * {@link #appendFileInfo(byte[], byte[])}.
   *
   * @param metaBlockName
   *          name of the block
   * @param content
   *          will call readFields to get data later (DO NOT REUSE)
   */
  @Override
  public void appendMetaBlock(String metaBlockName, Writable content) {
    byte[] key = Bytes.toBytes(metaBlockName);
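    // Keep metaNames sorted (insertion sort on add) so the meta block index
    // can be written in key order at close time.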
    int i;
    for (i = 0; i < metaNames.size(); ++i) {
      // stop when the current key is greater than our own
      byte[] cur = metaNames.get(i);
      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
          key.length) > 0) {
        break;
      }
    }
    metaNames.add(i, key);
    metaData.add(i, content);
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param kv
   *          KeyValue to add. Cannot be empty nor null.
   * @throws IOException
   */
  @Override
  public void append(final KeyValue kv) throws IOException {
    append(kv.getMvccVersion(), kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
        kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMvccVersion());
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param key
   *          Key to add. Cannot be empty nor null.
   * @param value
   *          Value to add. Cannot be empty nor null.
   * @throws IOException
   */
  @Override
  public void append(final byte[] key, final byte[] value) throws IOException {
    append(0, key, 0, key.length, value, 0, value.length);
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param memstoreTS the memstore (mvcc) timestamp to record with the entry
   * @param key byte array holding the key
   * @param koffset offset of the key in its array
   * @param klength length of the key
   * @param value byte array holding the value
   * @param voffset offset of the value in its array
   * @param vlength length of the value
   * @throws IOException
   */
  protected void append(final long memstoreTS, final byte[] key, final int koffset,
      final int klength, final byte[] value, final int voffset, final int vlength)
      throws IOException {
    boolean dupKey = checkKey(key, koffset, klength);
    checkValue(value, voffset, vlength);
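    // Only consider rolling to a new block when the key differs from the
    // previous one; duplicate keys (multiple versions of a cell) must stay in
    // the same data block so a block boundary never splits a run of them.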
    if (!dupKey) {
      checkBlockBoundary();
    }

    if (!fsBlockWriter.isWriting())
      newBlock();

    // Write length of key and value and then actual key and value bytes.
    // Additionally, we may also write down the memstoreTS.
    {
      DataOutputStream out = fsBlockWriter.getUserDataStream();
      out.writeInt(klength);
      totalKeyLength += klength;
      out.writeInt(vlength);
      totalValueLength += vlength;
      out.write(key, koffset, klength);
      out.write(value, voffset, vlength);
      if (this.hFileContext.isIncludesMvcc()) {
        WritableUtils.writeVLong(out, memstoreTS);
      }
    }

    // Are we the first key in this block?
    if (firstKeyInBlock == null) {
      // Copy the key.
      firstKeyInBlock = new byte[klength];
      System.arraycopy(key, koffset, firstKeyInBlock, 0, klength);
    }

    lastKeyBuffer = key;
    lastKeyOffset = koffset;
    lastKeyLength = klength;
    entryCount++;
  }

  @Override
  public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // Save data block encoder metadata in the file info.
    blockEncoder.saveMetadata(this);
    // Write out the end of the data blocks, then write the meta data blocks,
    // followed by fileinfo, data block index and meta block index.

    finishBlock();
    writeInlineBlocks(true);

    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());

    // Write out the metadata blocks if any.
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long offset = outputStream.getPos();
        // write the metadata content
        DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        fsBlockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

        // Add the new meta block to the meta index.
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
            fsBlockWriter.getOnDiskSizeWithHeader());
      }
    }

    // Load-on-open section.

    // Data block index.
    //
    // In version 2, this section of the file starts with the root level data
    // block index. We call a function that writes intermediate-level blocks
    // first, then root level, and returns the offset of the root level block
    // index.

    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    // Meta block index.
    metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
        BlockType.ROOT_INDEX), "meta");
    fsBlockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

    if (this.hFileContext.isIncludesMvcc()) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }

    // File info
    writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
    fsBlockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
    for (BlockWritable w : additionalLoadOnOpenData) {
      fsBlockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
    }

    // Now finish off the trailer.
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    trailer.setUncompressedDataIndexSize(
        dataBlockIndexWriter.getTotalUncompressedSize());
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    trailer.setComparatorClass(comparator.getClass());
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());

    finishClose(trailer);

    fsBlockWriter.release();
  }

  @Override
  public void addInlineBlockWriter(InlineBlockWriter ibw) {
    inlineBlockWriters.add(ibw);
  }

  @Override
  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
  }

  @Override
  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
  }

  private void addBloomFilter(final BloomFilterWriter bfw,
      final BlockType blockType) {
    if (bfw.getKeyCount() <= 0)
      return;

    if (blockType != BlockType.GENERAL_BLOOM_META &&
        blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
      throw new RuntimeException("Block Type: " + blockType.toString() +
          " is not supported");
    }
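    // The Bloom filter's meta and data are not written here; they are queued
    // up and emitted into the load-on-open section during close().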
    additionalLoadOnOpenData.add(new BlockWritable() {
      @Override
      public BlockType getBlockType() {
        return blockType;
      }

      @Override
      public void writeToBlock(DataOutput out) throws IOException {
        bfw.getMetaWriter().write(out);
        Writable dataWriter = bfw.getDataWriter();
        if (dataWriter != null)
          dataWriter.write(out);
      }
    });
  }

  @Override
  public void append(byte[] key, byte[] value, byte[] tag) throws IOException {
    throw new UnsupportedOperationException("KV tags are supported only from HFile V3");
  }

  protected int getMajorVersion() {
    return 2;
  }

  protected int getMinorVersion() {
    return HFileReaderV2.MAX_MINOR_VERSION;
  }

  @Override
  public HFileContext getFileContext() {
    return hFileContext;
  }
}