/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;

/**
 * Writes HFile format version 2.
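 * <p>
 * A minimal usage sketch (hypothetical {@code conf}, {@code fs}, {@code path}
 * and {@code cell}; writers are normally obtained through
 * {@link HFile#getWriterFactory(Configuration, CacheConfig)} rather than
 * constructed directly):
 * <pre>
 * HFileContext context = new HFileContextBuilder()
 *     .withBlockSize(64 * 1024)
 *     .build();
 * HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
 *     .withPath(fs, path)
 *     .withFileContext(context)
 *     .create();
 * try {
 *   writer.append(cell); // cells must be appended in comparator order
 * } finally {
 *   writer.close();
 * }
 * </pre>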
 */
@InterfaceAudience.Private
public class HFileWriterV2 extends AbstractHFileWriter {
  static final Log LOG = LogFactory.getLog(HFileWriterV2.class);

  /** Max memstore (mvcc) timestamp in FileInfo */
  public static final byte [] MAX_MEMSTORE_TS_KEY =
      Bytes.toBytes("MAX_MEMSTORE_TS_KEY");

  /** KeyValue version in FileInfo */
  public static final byte [] KEY_VALUE_VERSION =
      Bytes.toBytes("KEY_VALUE_VERSION");

  /** Version for KeyValue which includes memstore timestamp */
  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  /** Inline block writers for multi-level block index and compound Blooms. */
  private List<InlineBlockWriter> inlineBlockWriters =
      new ArrayList<InlineBlockWriter>();

  /** Unified version 2 block writer */
  protected HFileBlock.Writer fsBlockWriter;

  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

  /** The offset of the first data block or -1 if the file is empty. */
  private long firstDataBlockOffset = -1;

  /** The offset of the last data block or 0 if the file is empty. */
  protected long lastDataBlockOffset;

  /**
   * The last (stop) Cell of the previous data block.
   * This reference should be short-lived since we write hfiles in a burst.
   */
  private Cell lastCellOfPreviousBlock = null;

  /** Additional data items to be written to the "load-on-open" section. */
  private List<BlockWritable> additionalLoadOnOpenData =
      new ArrayList<BlockWritable>();

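  /** Highest memstore timestamp (sequence id) seen among appended cells. */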
  protected long maxMemstoreTS = 0;

  static class WriterFactoryV2 extends HFile.WriterFactory {
    WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
      super(conf, cacheConf);
    }

    @Override
    public Writer createWriter(FileSystem fs, Path path,
        FSDataOutputStream ostream,
        KVComparator comparator, HFileContext context) throws IOException {
      context.setIncludesTags(false); // HFile V2 does not deal with tags at all!
      return new HFileWriterV2(conf, cacheConf, fs, path, ostream,
          comparator, context);
    }
  }

  /** Constructor that takes a path and creates the output stream if one is not supplied. */
  public HFileWriterV2(Configuration conf, CacheConfig cacheConf,
      FileSystem fs, Path path, FSDataOutputStream ostream,
      final KVComparator comparator, final HFileContext context) throws IOException {
    super(cacheConf,
        ostream == null ? createOutputStream(conf, fs, path, null) : ostream,
        path, comparator, context);
    finishInit(conf);
  }

  /** Additional initialization steps */
  protected void finishInit(final Configuration conf) {
    if (fsBlockWriter != null)
      throw new IllegalStateException("finishInit called twice");

    fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);

    // Data block index writer
    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
        cacheIndexesOnWrite ? cacheConf : null,
        cacheIndexesOnWrite ? name : null);
    dataBlockIndexWriter.setMaxChunkSize(
        HFileBlockIndex.getMaxChunkSize(conf));
    inlineBlockWriters.add(dataBlockIndexWriter);

    // Meta data block index writer
    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
    if (LOG.isTraceEnabled()) LOG.trace("Initialized with " + cacheConf);
  }

  /**
   * At a block boundary, writes all the inline blocks and opens a new block.
   *
   * @throws IOException
   */
  protected void checkBlockBoundary() throws IOException {
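    // Note: blockSizeWritten() reflects the uncompressed bytes buffered for
    // the current block; compression is only applied when the block is
    // finished, so on-disk blocks can end up smaller than the target size.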
    if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
      return;

    finishBlock();
    writeInlineBlocks(false);
    newBlock();
  }

  /** Clean up the current data block */
  private void finishBlock() throws IOException {
    if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
      return;

    // Update the first data block offset for scanning.
    if (firstDataBlockOffset == -1) {
      firstDataBlockOffset = outputStream.getPos();
    }
    // Update the last data block offset
    lastDataBlockOffset = outputStream.getPos();
    fsBlockWriter.writeHeaderAndData(outputStream);
    int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();

    // Rather than CellComparator, we should be making use of an Interface here with the
    // implementation class serialized out to the HFile metadata. TODO.
    Cell indexEntry = CellComparator.getMidpoint(lastCellOfPreviousBlock, firstCellInBlock);
    dataBlockIndexWriter.addEntry(CellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
      lastDataBlockOffset, onDiskSize);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
    if (cacheConf.shouldCacheDataOnWrite()) {
      doCacheOnWrite(lastDataBlockOffset);
    }
  }

  /** Gives inline block writers an opportunity to contribute blocks. */
  private void writeInlineBlocks(boolean closing) throws IOException {
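    // Each writer is polled until it declines to contribute another block;
    // on the closing pass, writers flush any inline data they still buffer.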
    for (InlineBlockWriter ibw : inlineBlockWriters) {
      while (ibw.shouldWriteBlock(closing)) {
        long offset = outputStream.getPos();
        boolean cacheThisBlock = ibw.getCacheOnWrite();
        ibw.writeInlineBlock(fsBlockWriter.startWriting(
            ibw.getInlineBlockType()));
        fsBlockWriter.writeHeaderAndData(outputStream);
        ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
            fsBlockWriter.getUncompressedSizeWithoutHeader());
        totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

        if (cacheThisBlock) {
          doCacheOnWrite(offset);
        }
      }
    }
  }

  /**
   * Caches the last written HFile block.
   * @param offset the offset of the block we want to cache. Used to determine
   *          the cache key.
   */
  private void doCacheOnWrite(long offset) {
    HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf);
    cacheConf.getBlockCache().cacheBlock(
        new BlockCacheKey(name, offset), cacheFormatBlock);
  }

  /**
   * Ready a new block for writing.
   *
   * @throws IOException
   */
  protected void newBlock() throws IOException {
    // This is where the next block begins.
    fsBlockWriter.startWriting(BlockType.DATA);
    firstCellInBlock = null;
    if (lastCell != null) {
      lastCellOfPreviousBlock = lastCell;
    }
  }

  /**
   * Add a meta block to the end of the file. Call before close(). Metadata
   * blocks are expensive: fill one with a batch of serialized data rather than
   * writing one metadata block per metadata instance. If the metadata is
   * small, consider adding it to the file info via
   * {@link #appendFileInfo(byte[], byte[])} instead.
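   * <p>
   * A minimal sketch (hypothetical block name and payload; any
   * {@link org.apache.hadoop.io.Writable} will do):
   * <pre>
   * writer.appendMetaBlock("MY_META", new BytesWritable(serializedBytes));
   * writer.close();
   * </pre>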
   *
   * @param metaBlockName
   *          name of the block
   * @param content
   *          will call readFields to get data later (DO NOT REUSE)
   */
  @Override
  public void appendMetaBlock(String metaBlockName, Writable content) {
    byte[] key = Bytes.toBytes(metaBlockName);
    int i;
    for (i = 0; i < metaNames.size(); ++i) {
      // stop when the current key is greater than our own
      byte[] cur = metaNames.get(i);
      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
          key.length) > 0) {
        break;
      }
    }
    metaNames.add(i, key);
    metaData.add(i, content);
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param cell Cell to add. Cannot be null or empty.
   * @throws IOException
   */
  @Override
  public void append(final Cell cell) throws IOException {
    byte[] value = cell.getValueArray();
    int voffset = cell.getValueOffset();
    int vlength = cell.getValueLength();
    // checkKey uses comparator to check we are writing in order.
    boolean dupKey = checkKey(cell);
    checkValue(value, voffset, vlength);
    if (!dupKey) {
      checkBlockBoundary();
    }

    if (!fsBlockWriter.isWriting())
      newBlock();

    fsBlockWriter.write(cell);

    totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
    totalValueLength += vlength;

    // Are we the first key in this block?
    if (firstCellInBlock == null) {
      // If the cell is big, the block will be closed soon and this
      // firstCellInBlock reference will only last a short while.
      firstCellInBlock = cell;
    }

    // TODO: What if the cell is 10MB and we write infrequently? We'll hold on
    // to the cell here indefinitely?
    lastCell = cell;
    entryCount++;
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
  }

  @Override
  public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // Save data block encoder metadata in the file info.
    blockEncoder.saveMetadata(this);
    // Write out the end of the data blocks and any remaining inline blocks,
    // then the meta blocks, the load-on-open section, and the trailer.
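    //
    // Resulting on-disk layout (HFile version 2):
    //   data blocks, with leaf-index and Bloom chunks interleaved
    //   meta blocks (if any)
    //   intermediate-level data index blocks (multi-level index only)
    //   load-on-open section: root data index, meta index, file info,
    //       Bloom filter metadata
    //   fixed file trailer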

    finishBlock();
    writeInlineBlocks(true);

    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());

    // Write out the metadata blocks if any.
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long offset = outputStream.getPos();
        // write the metadata content
        DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        fsBlockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

        // Add the new meta block to the meta index.
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
            fsBlockWriter.getOnDiskSizeWithHeader());
      }
    }

    // Load-on-open section.

    // Data block index.
    //
    // In version 2, this section of the file starts with the root level data
    // block index. We call a function that writes intermediate-level blocks
    // first, then root level, and returns the offset of the root level block
    // index.

    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    // Meta block index.
    metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
        BlockType.ROOT_INDEX), "meta");
    fsBlockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

    if (this.hFileContext.isIncludesMvcc()) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }

    // File info
    writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
    fsBlockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
    for (BlockWritable w : additionalLoadOnOpenData) {
      fsBlockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
    }

    // Now finish off the trailer.
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    trailer.setUncompressedDataIndexSize(
        dataBlockIndexWriter.getTotalUncompressedSize());
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    trailer.setComparatorClass(comparator.getClass());
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());

    finishClose(trailer);

    fsBlockWriter.release();
  }

  @Override
  public void addInlineBlockWriter(InlineBlockWriter ibw) {
    inlineBlockWriters.add(ibw);
  }

  @Override
  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
  }

  @Override
  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
  }

  private void addBloomFilter(final BloomFilterWriter bfw,
      final BlockType blockType) {
    if (bfw.getKeyCount() <= 0)
      return;

    if (blockType != BlockType.GENERAL_BLOOM_META &&
        blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
      throw new RuntimeException("Block Type: " + blockType.toString() +
          " is not supported");
    }
    additionalLoadOnOpenData.add(new BlockWritable() {
      @Override
      public BlockType getBlockType() {
        return blockType;
      }

      @Override
      public void writeToBlock(DataOutput out) throws IOException {
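        // The Bloom filter's meta is written first; the Bloom data, when
        // present, follows it within the same load-on-open block.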
        bfw.getMetaWriter().write(out);
        Writable dataWriter = bfw.getDataWriter();
        if (dataWriter != null)
          dataWriter.write(out);
      }
    });
  }

  protected int getMajorVersion() {
    return 2;
  }

  protected int getMinorVersion() {
    return HFileReaderV2.MAX_MINOR_VERSION;
  }

  @Override
  public HFileContext getFileContext() {
    return hFileContext;
  }
}