/*
 * Copyright 2011 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KeyComparator;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.regionserver.MemStore;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.Compressor;

/**
 * Writes version 1 HFiles. Mainly used for testing backwards-compatibility.
 */
public class HFileWriterV1 extends AbstractHFileWriter {

  /** Meta data block name for bloom filter parameters. */
  static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META";

  /** Meta data block name for bloom filter bits. */
  public static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";

  private static final Log LOG = LogFactory.getLog(HFileWriterV1.class);

  // A stream made per block written.
  private DataOutputStream out;

  // Offset where the current block began.
  private long blockBegin;

  // First keys of every block.
  private ArrayList<byte[]> blockKeys = new ArrayList<byte[]>();

  // Block offset in backing stream.
  private ArrayList<Long> blockOffsets = new ArrayList<Long>();

  // Raw (decompressed) data size.
  private ArrayList<Integer> blockDataSizes = new ArrayList<Integer>();

  private Compressor compressor;

  // Additional byte array output stream used to fill block cache
  private ByteArrayOutputStream baos;
  private DataOutputStream baosDos;
  private int blockNumber = 0;

  static class WriterFactoryV1 extends HFile.WriterFactory {
    WriterFactoryV1(Configuration conf, CacheConfig cacheConf) {
      super(conf, cacheConf);
    }

    @Override
    public Writer createWriter(FileSystem fs, Path path,
        FSDataOutputStream ostream, int blockSize,
        Algorithm compressAlgo, HFileDataBlockEncoder dataBlockEncoder,
        KeyComparator comparator, final ChecksumType checksumType,
        final int bytesPerChecksum, boolean includeMVCCReadpoint) throws IOException {
      // version 1 does not implement checksums
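      // The checksumType, bytesPerChecksum, and includeMVCCReadpoint arguments are
      // accepted only to satisfy the factory interface and are ignored below.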
      return new HFileWriterV1(conf, cacheConf, fs, path, ostream, blockSize,
          compressAlgo, dataBlockEncoder, comparator);
    }
  }

  /**
   * Constructor that takes a path and, if no output stream is supplied,
   * creates one; the stream is closed when the writer is closed.
   */
  public HFileWriterV1(Configuration conf, CacheConfig cacheConf,
      FileSystem fs, Path path, FSDataOutputStream ostream,
      int blockSize, Compression.Algorithm compress,
      HFileDataBlockEncoder blockEncoder,
      final KeyComparator comparator) throws IOException {
    super(cacheConf, ostream == null ? createOutputStream(conf, fs, path) : ostream, path,
        blockSize, compress, blockEncoder, comparator);
    SchemaMetrics.configureGlobally(conf);
  }

  /**
   * If at a block boundary, finishes the current block and opens a new one.
   *
   * @throws IOException
   */
  private void checkBlockBoundary() throws IOException {
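    // out.size() is the number of uncompressed bytes written to the current
    // block; keep appending until it reaches the configured block size.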
    if (this.out != null && this.out.size() < blockSize)
      return;
    finishBlock();
    newBlock();
  }

  /**
   * Cleans up the current block, if one is open.
   *
   * @throws IOException
   */
  private void finishBlock() throws IOException {
    if (this.out == null)
      return;
    long startTimeNs = System.nanoTime();

    int size = releaseCompressingStream(this.out);
    this.out = null;
    blockKeys.add(firstKeyInBlock);
    blockOffsets.add(Long.valueOf(blockBegin));
    blockDataSizes.add(Integer.valueOf(size));
    this.totalUncompressedBytes += size;

    HFile.offerWriteLatency(System.nanoTime() - startTimeNs);

    if (cacheConf.shouldCacheDataOnWrite()) {
      baosDos.flush();
      // we do not do data block encoding on disk for HFile v1
      byte[] bytes = baos.toByteArray();
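      // Wrap the uncompressed block contents from the side buffer in an HFileBlock
      // so it can be cached. The on-disk size is derived from the stream position
      // delta (the compressed bytes actually written); v1 blocks carry no checksums,
      // so dummy checksum parameters are used.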
      HFileBlock block = new HFileBlock(BlockType.DATA,
          (int) (outputStream.getPos() - blockBegin), bytes.length, -1,
          ByteBuffer.wrap(bytes, 0, bytes.length), HFileBlock.FILL_HEADER,
          blockBegin, MemStore.NO_PERSISTENT_TS,
          HFileBlock.MINOR_VERSION_NO_CHECKSUM,        // minor version
          0,                                         // bytesPerChecksum
          ChecksumType.NULL.getCode(),               // checksum type
          (int) (outputStream.getPos() - blockBegin) +
          HFileBlock.HEADER_SIZE_NO_CHECKSUM);       // onDiskDataSizeWithHeader

      block = blockEncoder.diskToCacheFormat(block, false);
      passSchemaMetricsTo(block);
      cacheConf.getBlockCache().cacheBlock(
          new BlockCacheKey(name, blockBegin, DataBlockEncoding.NONE,
              block.getBlockType()), block);
      baosDos.close();
    }
    blockNumber++;
  }

  /**
   * Ready a new block for writing.
   *
   * @throws IOException
   */
  private void newBlock() throws IOException {
    // This is where the next block begins.
    blockBegin = outputStream.getPos();
    this.out = getCompressingStream();
    BlockType.DATA.write(out);
    firstKeyInBlock = null;
    if (cacheConf.shouldCacheDataOnWrite()) {
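      // Mirror the uncompressed block contents into a side buffer, prefixed with a
      // dummy v2-style header, so finishBlock() can wrap it as an HFileBlock and
      // hand it to the block cache.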
      this.baos = new ByteArrayOutputStream();
      this.baosDos = new DataOutputStream(baos);
      baosDos.write(HFileBlock.DUMMY_HEADER_NO_CHECKSUM);
    }
  }

  /**
   * Sets up a compressor and creates a compression stream on top of
   * this.outputStream. One is obtained per block written.
   *
   * @return a compressing stream; if compression is 'none', the returned stream
   *         does not compress
   *
   * @throws IOException
   *
   * @see #releaseCompressingStream(DataOutputStream)
   */
  private DataOutputStream getCompressingStream() throws IOException {
    this.compressor = compressAlgo.getCompressor();
    // Get a new DOS compression stream. In tfile, the DOS is not closed, just
    // finished, and that seems to be fine over there. TODO: Check there is no
    // memory retention of the DOS. Should we disable 'flush' on the DOS as the
    // BCFile over in tfile does? It wants flushes not to go through to the
    // underlying compressed stream; the compressed downstream should be flushed
    // only when done. But it looks like when we call flush in here, it is a
    // legitimate flush that should go through to the compressor.
    OutputStream os = this.compressAlgo.createCompressionStream(
        this.outputStream, this.compressor, 0);
    return new DataOutputStream(os);
  }

  /**
   * Releases the block compressor and the compressing stream obtained from
   * {@link #getCompressingStream()}.
   *
   * @param dos
   *
   * @return how much was written on this stream since it was taken out
   *
   * @see #getCompressingStream()
   *
   * @throws IOException
   */
  private int releaseCompressingStream(final DataOutputStream dos)
      throws IOException {
    dos.flush();
    this.compressAlgo.returnCompressor(this.compressor);
    this.compressor = null;
    return dos.size();
  }

  /**
   * Add a meta block to the end of the file. Call before close(). Metadata
   * blocks are expensive: fill one with a bunch of serialized data rather than
   * writing one metadata block per metadata instance. If the metadata is small,
   * consider adding it to the file info using {@link #appendFileInfo(byte[], byte[])}.
   *
   * @param metaBlockName
   *          name of the block
   * @param content
   *          will call readFields to get data later (DO NOT REUSE)
   */
  public void appendMetaBlock(String metaBlockName, Writable content) {
    byte[] key = Bytes.toBytes(metaBlockName);
    int i;
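    // Insert into the position that keeps metaNames sorted by key, so the meta
    // block index can later be written in key order.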
    for (i = 0; i < metaNames.size(); ++i) {
      // stop when the current key is greater than our own
      byte[] cur = metaNames.get(i);
      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
          key.length) > 0) {
        break;
      }
    }
    metaNames.add(i, key);
    metaData.add(i, content);
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param kv
   *          KeyValue to add. Cannot be null or empty.
   * @throws IOException
   */
  public void append(final KeyValue kv) throws IOException {
    append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
        kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param key
   *          Key to add. Cannot be null or empty.
   * @param value
   *          Value to add. Cannot be null or empty.
   * @throws IOException
   */
  public void append(final byte[] key, final byte[] value) throws IOException {
    append(key, 0, key.length, value, 0, value.length);
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param key
   * @param koffset
   * @param klength
   * @param value
   * @param voffset
   * @param vlength
   * @throws IOException
   */
  private void append(final byte[] key, final int koffset, final int klength,
      final byte[] value, final int voffset, final int vlength)
      throws IOException {
    boolean dupKey = checkKey(key, koffset, klength);
    checkValue(value, voffset, vlength);
    if (!dupKey) {
      checkBlockBoundary();
    }
    // Write length of key and value and then actual key and value bytes.
    this.out.writeInt(klength);
    totalKeyLength += klength;
    this.out.writeInt(vlength);
    totalValueLength += vlength;
    this.out.write(key, koffset, klength);
    this.out.write(value, voffset, vlength);
    // Are we the first key in this block?
    if (this.firstKeyInBlock == null) {
      // Copy the key.
      this.firstKeyInBlock = new byte[klength];
      System.arraycopy(key, koffset, this.firstKeyInBlock, 0, klength);
    }
    this.lastKeyBuffer = key;
    this.lastKeyOffset = koffset;
    this.lastKeyLength = klength;
    this.entryCount++;
    // If we are pre-caching blocks on write, fill byte array stream
    if (cacheConf.shouldCacheDataOnWrite()) {
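      // Mirror the exact record bytes so the cached copy matches the
      // uncompressed on-disk block contents.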
      this.baosDos.writeInt(klength);
      this.baosDos.writeInt(vlength);
      this.baosDos.write(key, koffset, klength);
      this.baosDos.write(value, voffset, vlength);
    }
  }

  public void close() throws IOException {
    if (this.outputStream == null) {
      return;
    }
    // Save data block encoder metadata in the file info.
    blockEncoder.saveMetadata(this);
    // Write out the end of the data blocks, then write the meta data blocks,
    // followed by the file info, the data block index, and the meta block index.

    finishBlock();

    FixedFileTrailer trailer = new FixedFileTrailer(1,
                                 HFileBlock.MINOR_VERSION_NO_CHECKSUM);

    // Write out the metadata blocks if any.
    ArrayList<Long> metaOffsets = null;
    ArrayList<Integer> metaDataSizes = null;
    if (metaNames.size() > 0) {
      metaOffsets = new ArrayList<Long>(metaNames.size());
      metaDataSizes = new ArrayList<Integer>(metaNames.size());
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long curPos = outputStream.getPos();
        metaOffsets.add(curPos);
        // write the metadata content
        DataOutputStream dos = getCompressingStream();
        BlockType.META.write(dos);
        metaData.get(i).write(dos);
        int size = releaseCompressingStream(dos);
        // store the metadata size
        metaDataSizes.add(size);
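        // Note: dos.size() counts bytes written before compression, so the
        // recorded meta block sizes are uncompressed sizes, as the version 1
        // meta index expects.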
      }
    }

    writeFileInfo(trailer, outputStream);

    // Write the data block index.
    trailer.setLoadOnOpenOffset(writeBlockIndex(this.outputStream,
        this.blockKeys, this.blockOffsets, this.blockDataSizes));
    LOG.info("Wrote a version 1 block index with " + this.blockKeys.size()
        + " keys");

    if (metaNames.size() > 0) {
      // Write the meta index.
      writeBlockIndex(this.outputStream, metaNames, metaOffsets, metaDataSizes);
    }

    // Now finish off the trailer.
    trailer.setDataIndexCount(blockKeys.size());

    finishClose(trailer);
  }

  @Override
  protected void finishFileInfo() throws IOException {
    super.finishFileInfo();

    // In version 1, we store comparator name in the file info.
    fileInfo.append(FileInfo.COMPARATOR,
        Bytes.toBytes(comparator.getClass().getName()), false);
  }

  @Override
  public void addInlineBlockWriter(InlineBlockWriter bloomWriter) {
    // Inline blocks only exist in HFile format version 2.
    throw new UnsupportedOperationException();
  }

  /**
   * Version 1 general Bloom filters are stored in two meta blocks with two different
   * keys.
   */
  @Override
  public void addGeneralBloomFilter(BloomFilterWriter bfw) {
    appendMetaBlock(BLOOM_FILTER_META_KEY,
        bfw.getMetaWriter());
    Writable dataWriter = bfw.getDataWriter();
    if (dataWriter != null) {
      appendMetaBlock(BLOOM_FILTER_DATA_KEY, dataWriter);
    }
  }

  @Override
  public void addDeleteFamilyBloomFilter(BloomFilterWriter bfw)
  throws IOException {
    throw new IOException("Delete Bloom filter is not supported in HFile V1");
  }

  /**
   * Write out the index in the version 1 format. This conforms to the legacy
   * version 1 format, but can still be read by
   * {@link HFileBlockIndex.BlockIndexReader#readRootIndex(java.io.DataInputStream,
   * int)}.
   *
   * @param out the stream to write to
   * @param keys
   * @param offsets
   * @param uncompressedSizes in contrast with the version 2 root index format,
   *          the sizes stored in version 1 are uncompressed sizes
   * @return the offset in the stream at which the index starts
   * @throws IOException
   */
  private static long writeBlockIndex(final FSDataOutputStream out,
      final List<byte[]> keys, final List<Long> offsets,
      final List<Integer> uncompressedSizes) throws IOException {
    long pos = out.getPos();
    // Don't write an index if nothing in the index.
    if (keys.size() > 0) {
      BlockType.INDEX_V1.write(out);
      // Write the index.
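      // Each entry is: 8-byte block offset, 4-byte uncompressed block size,
      // then the block's first key as a vint-length-prefixed byte array
      // (Bytes.writeByteArray).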
      for (int i = 0; i < keys.size(); ++i) {
        out.writeLong(offsets.get(i).longValue());
        out.writeInt(uncompressedSizes.get(i).intValue());
        byte[] key = keys.get(i);
        Bytes.writeByteArray(out, key);
      }
    }
    return pos;
  }

}