View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.Compressor;

import com.google.common.base.Preconditions;
40  
41  /**
42   * Encapsulates a data block compressed using a particular encoding algorithm.
43   * Useful for testing and benchmarking.
44   */
45  @InterfaceAudience.Private
46  public class EncodedDataBlock {
47    private byte[] rawKVs;
48    private ByteBuffer rawBuffer;
49    private DataBlockEncoder dataBlockEncoder;
50  
51    private byte[] cachedEncodedData;
52  
53    private final HFileBlockEncodingContext encodingCtx;
54    private HFileContext meta;
55  
56    /**
57     * Create a buffer which will be encoded using dataBlockEncoder.
58     * @param dataBlockEncoder Algorithm used for compression.
59     * @param encoding encoding type used
60     * @param rawKVs
61     * @param meta
62     */
63    public EncodedDataBlock(DataBlockEncoder dataBlockEncoder, DataBlockEncoding encoding,
64        byte[] rawKVs, HFileContext meta) {
65      Preconditions.checkNotNull(encoding,
66          "Cannot create encoded data block with null encoder");
67      this.dataBlockEncoder = dataBlockEncoder;
68      encodingCtx = dataBlockEncoder.newDataBlockEncodingContext(encoding,
69          HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
70      this.rawKVs = rawKVs;
71      this.meta = meta;
72    }
73  
74    /**
75     * Provides access to compressed value.
76     * @param headerSize header size of the block.
77     * @return Forwards sequential iterator.
78     */
79    public Iterator<Cell> getIterator(int headerSize) {
80      final int rawSize = rawKVs.length;
81      byte[] encodedDataWithHeader = getEncodedData();
82      int bytesToSkip = headerSize + Bytes.SIZEOF_SHORT;
83      ByteArrayInputStream bais = new ByteArrayInputStream(encodedDataWithHeader,
84          bytesToSkip, encodedDataWithHeader.length - bytesToSkip);
85      final DataInputStream dis = new DataInputStream(bais);
86  
87      return new Iterator<Cell>() {
88        private ByteBuffer decompressedData = null;
89  
90        @Override
91        public boolean hasNext() {
92          if (decompressedData == null) {
93            return rawSize > 0;
94          }
95          return decompressedData.hasRemaining();
96        }
97  
98        @Override
99        public Cell next() {
100         if (decompressedData == null) {
101           try {
102             decompressedData = dataBlockEncoder.decodeKeyValues(dis, dataBlockEncoder
103                 .newDataBlockDecodingContext(meta));
104           } catch (IOException e) {
105             throw new RuntimeException("Problem with data block encoder, " +
106                 "most likely it requested more bytes than are available.", e);
107           }
108           decompressedData.rewind();
109         }
110         int offset = decompressedData.position();
111         int klen = decompressedData.getInt();
112         int vlen = decompressedData.getInt();
113         short tagsLen = 0;
114         ByteBufferUtils.skip(decompressedData, klen + vlen);
115         // Read the tag length in case when steam contain tags
116         if (meta.isIncludesTags()) {
117           tagsLen = decompressedData.getShort();
118           ByteBufferUtils.skip(decompressedData, tagsLen);
119         }
120         KeyValue kv = new KeyValue(decompressedData.array(), offset,
121             (int) KeyValue.getKeyValueDataStructureSize(klen, vlen, tagsLen));
122         if (meta.isIncludesMvcc()) {
123           long mvccVersion = ByteBufferUtils.readVLong(decompressedData);
124           kv.setMvccVersion(mvccVersion);
125         }
126         return kv;
127       }
128 
129       @Override
130       public void remove() {
131         throw new NotImplementedException("remove() is not supported!");
132       }
133 
134       @Override
135       public String toString() {
136         return "Iterator of: " + dataBlockEncoder.getClass().getName();
137       }
138 
139     };
140   }
141 
142   /**
143    * Find the size of minimal buffer that could store compressed data.
144    * @return Size in bytes of compressed data.
145    */
146   public int getSize() {
147     return getEncodedData().length;
148   }
149 
150   /**
151    * Find the size of compressed data assuming that buffer will be compressed
152    * using given algorithm.
153    * @param algo compression algorithm
154    * @param compressor compressor already requested from codec
155    * @param inputBuffer Array to be compressed.
156    * @param offset Offset to beginning of the data.
157    * @param length Length to be compressed.
158    * @return Size of compressed data in bytes.
159    * @throws IOException
160    */
161   public static int getCompressedSize(Algorithm algo, Compressor compressor,
162       byte[] inputBuffer, int offset, int length) throws IOException {
163     DataOutputStream compressedStream = new DataOutputStream(
164         new IOUtils.NullOutputStream());
165     if (compressor != null) {
166       compressor.reset();
167     }
168     OutputStream compressingStream = null;
169 
170     try {
171       compressingStream = algo.createCompressionStream(
172           compressedStream, compressor, 0);
173 
174       compressingStream.write(inputBuffer, offset, length);
175       compressingStream.flush();
176 
177       return compressedStream.size();
178     } finally {
179       if (compressingStream != null) compressingStream.close();
180     }
181   }
182 
183   /**
184    * Estimate size after second stage of compression (e.g. LZO).
185    * @param comprAlgo compression algorithm to be used for compression
186    * @param compressor compressor corresponding to the given compression
187    *          algorithm
188    * @return Size after second stage of compression.
189    */
190   public int getEncodedCompressedSize(Algorithm comprAlgo,
191       Compressor compressor) throws IOException {
192     byte[] compressedBytes = getEncodedData();
193     return getCompressedSize(comprAlgo, compressor, compressedBytes, 0,
194         compressedBytes.length);
195   }
196 
197   /** @return encoded data with header */
198   private byte[] getEncodedData() {
199     if (cachedEncodedData != null) {
200       return cachedEncodedData;
201     }
202     cachedEncodedData = encodeData();
203     return cachedEncodedData;
204   }
205 
206   private ByteBuffer getUncompressedBuffer() {
207     if (rawBuffer == null || rawBuffer.limit() < rawKVs.length) {
208       rawBuffer = ByteBuffer.wrap(rawKVs);
209     }
210     return rawBuffer;
211   }
212 
213   /**
214    * Do the encoding, but do not cache the encoded data.
215    * @return encoded data block with header and checksum
216    */
217   public byte[] encodeData() {
218     try {
219       this.dataBlockEncoder.encodeKeyValues(
220           getUncompressedBuffer(), encodingCtx);
221     } catch (IOException e) {
222       throw new RuntimeException(String.format(
223           "Bug in encoding part of algorithm %s. " +
224           "Probably it requested more bytes than are available.",
225           toString()), e);
226     }
227     return encodingCtx.getUncompressedBytesWithHeader();
228   }
229 
230   @Override
231   public String toString() {
232     return dataBlockEncoder.toString();
233   }
234 }