View Javadoc

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
17  package org.apache.hadoop.hbase.io.encoding;
18  
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.Compressor;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
42  
43  /**
44   * Encapsulates a data block compressed using a particular encoding algorithm.
45   * Useful for testing and benchmarking.
46   * This is used only in testing.
47   */
48  @InterfaceAudience.Private
49  @VisibleForTesting
50  public class EncodedDataBlock {
51    private byte[] rawKVs;
52    private ByteBuffer rawBuffer;
53    private DataBlockEncoder dataBlockEncoder;
54  
55    private byte[] cachedEncodedData;
56  
57    private final HFileBlockEncodingContext encodingCtx;
58    private HFileContext meta;
59  
60    /**
61     * Create a buffer which will be encoded using dataBlockEncoder.
62     * @param dataBlockEncoder Algorithm used for compression.
63     * @param encoding encoding type used
64     * @param rawKVs
65     * @param meta
66     */
67    public EncodedDataBlock(DataBlockEncoder dataBlockEncoder, DataBlockEncoding encoding,
68        byte[] rawKVs, HFileContext meta) {
69      Preconditions.checkNotNull(encoding,
70          "Cannot create encoded data block with null encoder");
71      this.dataBlockEncoder = dataBlockEncoder;
72      encodingCtx = dataBlockEncoder.newDataBlockEncodingContext(encoding,
73          HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
74      this.rawKVs = rawKVs;
75      this.meta = meta;
76    }
77  
78    /**
79     * Provides access to compressed value.
80     * @param headerSize header size of the block.
81     * @return Forwards sequential iterator.
82     */
83    public Iterator<Cell> getIterator(int headerSize) {
84      final int rawSize = rawKVs.length;
85      byte[] encodedDataWithHeader = getEncodedData();
86      int bytesToSkip = headerSize + Bytes.SIZEOF_SHORT;
87      ByteArrayInputStream bais = new ByteArrayInputStream(encodedDataWithHeader,
88          bytesToSkip, encodedDataWithHeader.length - bytesToSkip);
89      final DataInputStream dis = new DataInputStream(bais);
90  
91      return new Iterator<Cell>() {
92        private ByteBuffer decompressedData = null;
93  
94        @Override
95        public boolean hasNext() {
96          if (decompressedData == null) {
97            return rawSize > 0;
98          }
99          return decompressedData.hasRemaining();
100       }
101 
102       @Override
103       public Cell next() {
104         if (decompressedData == null) {
105           try {
106             decompressedData = dataBlockEncoder.decodeKeyValues(dis, dataBlockEncoder
107                 .newDataBlockDecodingContext(meta));
108           } catch (IOException e) {
109             throw new RuntimeException("Problem with data block encoder, " +
110                 "most likely it requested more bytes than are available.", e);
111           }
112           decompressedData.rewind();
113         }
114         int offset = decompressedData.position();
115         int klen = decompressedData.getInt();
116         int vlen = decompressedData.getInt();
117         int tagsLen = 0;
118         ByteBufferUtils.skip(decompressedData, klen + vlen);
119         // Read the tag length in case when steam contain tags
120         if (meta.isIncludesTags()) {
121           tagsLen = ((decompressedData.get() & 0xff) << 8) ^ (decompressedData.get() & 0xff);
122           ByteBufferUtils.skip(decompressedData, tagsLen);
123         }
124         KeyValue kv = new KeyValue(decompressedData.array(), offset,
125             (int) KeyValue.getKeyValueDataStructureSize(klen, vlen, tagsLen));
126         if (meta.isIncludesMvcc()) {
127           long mvccVersion = ByteBufferUtils.readVLong(decompressedData);
128           kv.setSequenceId(mvccVersion);
129         }
130         return kv;
131       }
132 
133       @Override
134       public void remove() {
135         throw new NotImplementedException("remove() is not supported!");
136       }
137 
138       @Override
139       public String toString() {
140         return "Iterator of: " + dataBlockEncoder.getClass().getName();
141       }
142 
143     };
144   }
145 
146   /**
147    * Find the size of minimal buffer that could store compressed data.
148    * @return Size in bytes of compressed data.
149    */
150   public int getSize() {
151     return getEncodedData().length;
152   }
153 
154   /**
155    * Find the size of compressed data assuming that buffer will be compressed
156    * using given algorithm.
157    * @param algo compression algorithm
158    * @param compressor compressor already requested from codec
159    * @param inputBuffer Array to be compressed.
160    * @param offset Offset to beginning of the data.
161    * @param length Length to be compressed.
162    * @return Size of compressed data in bytes.
163    * @throws IOException
164    */
165   public static int getCompressedSize(Algorithm algo, Compressor compressor,
166       byte[] inputBuffer, int offset, int length) throws IOException {
167     DataOutputStream compressedStream = new DataOutputStream(
168         new IOUtils.NullOutputStream());
169     if (compressor != null) {
170       compressor.reset();
171     }
172     OutputStream compressingStream = null;
173 
174     try {
175       compressingStream = algo.createCompressionStream(
176           compressedStream, compressor, 0);
177 
178       compressingStream.write(inputBuffer, offset, length);
179       compressingStream.flush();
180 
181       return compressedStream.size();
182     } finally {
183       if (compressingStream != null) compressingStream.close();
184     }
185   }
186 
187   /**
188    * Estimate size after second stage of compression (e.g. LZO).
189    * @param comprAlgo compression algorithm to be used for compression
190    * @param compressor compressor corresponding to the given compression
191    *          algorithm
192    * @return Size after second stage of compression.
193    */
194   public int getEncodedCompressedSize(Algorithm comprAlgo,
195       Compressor compressor) throws IOException {
196     byte[] compressedBytes = getEncodedData();
197     return getCompressedSize(comprAlgo, compressor, compressedBytes, 0,
198         compressedBytes.length);
199   }
200 
201   /** @return encoded data with header */
202   private byte[] getEncodedData() {
203     if (cachedEncodedData != null) {
204       return cachedEncodedData;
205     }
206     cachedEncodedData = encodeData();
207     return cachedEncodedData;
208   }
209 
210   private ByteBuffer getUncompressedBuffer() {
211     if (rawBuffer == null || rawBuffer.limit() < rawKVs.length) {
212       rawBuffer = ByteBuffer.wrap(rawKVs);
213     }
214     return rawBuffer;
215   }
216 
217   /**
218    * Do the encoding, but do not cache the encoded data.
219    * @return encoded data block with header and checksum
220    */
221   public byte[] encodeData() {
222     ByteArrayOutputStream baos = new ByteArrayOutputStream();
223     try {
224       baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
225       DataOutputStream out = new DataOutputStream(baos);
226       this.dataBlockEncoder.startBlockEncoding(encodingCtx, out);
227       ByteBuffer in = getUncompressedBuffer();
228       in.rewind();
229       int klength, vlength;
230       int tagsLength = 0;
231       long memstoreTS = 0L;
232       KeyValue kv = null;
233       while (in.hasRemaining()) {
234         int kvOffset = in.position();
235         klength = in.getInt();
236         vlength = in.getInt();
237         ByteBufferUtils.skip(in, klength + vlength);
238         if (this.meta.isIncludesTags()) {
239           tagsLength = ((in.get() & 0xff) << 8) ^ (in.get() & 0xff);
240           ByteBufferUtils.skip(in, tagsLength);
241         }
242         if (this.meta.isIncludesMvcc()) {
243           memstoreTS = ByteBufferUtils.readVLong(in);
244         }
245         kv = new KeyValue(in.array(), kvOffset, (int) KeyValue.getKeyValueDataStructureSize(
246             klength, vlength, tagsLength));
247         kv.setSequenceId(memstoreTS);
248         this.dataBlockEncoder.encode(kv, encodingCtx, out);
249       }
250       BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
251       baos.writeTo(stream);
252       this.dataBlockEncoder.endBlockEncoding(encodingCtx, out, stream.buf);
253     } catch (IOException e) {
254       throw new RuntimeException(String.format(
255           "Bug in encoding part of algorithm %s. " +
256           "Probably it requested more bytes than are available.",
257           toString()), e);
258     }
259     return baos.toByteArray();
260   }
261 
262   private static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
263     private byte[] buf;
264 
265     @Override
266     public void write(byte[] b, int off, int len) {
267       this.buf = b;
268     }
269   }
270 
271   @Override
272   public String toString() {
273     return dataBlockEncoder.toString();
274   }
275 }