View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.ByteArrayOutputStream;
23  import java.io.DataOutputStream;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.security.SecureRandom;
27  
28  import org.apache.hadoop.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.io.TagCompressionContext;
30  import org.apache.hadoop.hbase.io.compress.Compression;
31  import org.apache.hadoop.hbase.io.crypto.Cipher;
32  import org.apache.hadoop.hbase.io.crypto.Encryption;
33  import org.apache.hadoop.hbase.io.crypto.Encryptor;
34  import org.apache.hadoop.hbase.io.hfile.BlockType;
35  import org.apache.hadoop.hbase.io.hfile.HFileContext;
36  import org.apache.hadoop.io.compress.CompressionOutputStream;
37  import org.apache.hadoop.io.compress.Compressor;
38  
39  import com.google.common.base.Preconditions;
40  
41  /**
42   * A default implementation of {@link HFileBlockEncodingContext}. It will
43   * compress the data section as one continuous buffer.
44   *
45   * @see HFileBlockDefaultDecodingContext for the decompression part
46   *
47   */
48  @InterfaceAudience.Private
49  public class HFileBlockDefaultEncodingContext implements
50      HFileBlockEncodingContext {
51    private byte[] onDiskBytesWithHeader;
52    private byte[] uncompressedBytesWithHeader;
53    private BlockType blockType;
54    private final DataBlockEncoding encodingAlgo;
55  
56    private ByteArrayOutputStream encodedStream = new ByteArrayOutputStream();
57    private DataOutputStream dataOut = new DataOutputStream(encodedStream);
58  
59    private byte[] dummyHeader;
60  
61    // Compression state
62  
63    /** Compressor, which is also reused between consecutive blocks. */
64    private Compressor compressor;
65    /** Compression output stream */
66    private CompressionOutputStream compressionStream;
67    /** Underlying stream to write compressed bytes to */
68    private ByteArrayOutputStream compressedByteStream;
69  
70    private HFileContext fileContext;
71    private TagCompressionContext tagCompressionContext;
72  
73    // Encryption state
74  
75    /** Underlying stream to write encrypted bytes to */
76    private ByteArrayOutputStream cryptoByteStream;
77    /** Initialization vector */
78    private byte[] iv;
79  
80    /**
81     * @param encoding encoding used
82     * @param headerBytes dummy header bytes
83     * @param fileContext HFile meta data
84     */
85    public HFileBlockDefaultEncodingContext(DataBlockEncoding encoding, byte[] headerBytes,
86        HFileContext fileContext) {
87      this.encodingAlgo = encoding;
88      this.fileContext = fileContext;
89      Compression.Algorithm compressionAlgorithm =
90          fileContext.getCompression() == null ? NONE : fileContext.getCompression();
91      if (compressionAlgorithm != NONE) {
92        compressor = compressionAlgorithm.getCompressor();
93        compressedByteStream = new ByteArrayOutputStream();
94        try {
95          compressionStream =
96              compressionAlgorithm.createPlainCompressionStream(
97                  compressedByteStream, compressor);
98        } catch (IOException e) {
99          throw new RuntimeException(
100             "Could not create compression stream for algorithm "
101                 + compressionAlgorithm, e);
102       }
103     }
104 
105     Encryption.Context cryptoContext = fileContext.getEncryptionContext();
106     if (cryptoContext != Encryption.Context.NONE) {
107       cryptoByteStream = new ByteArrayOutputStream();
108       iv = new byte[cryptoContext.getCipher().getIvLength()];
109       new SecureRandom().nextBytes(iv);
110     }
111 
112     dummyHeader = Preconditions.checkNotNull(headerBytes,
113       "Please pass HConstants.HFILEBLOCK_DUMMY_HEADER instead of null for param headerBytes");
114   }
115 
116   @Override
117   public void setDummyHeader(byte[] headerBytes) {
118     dummyHeader = headerBytes;
119   }
120 
121   /**
122    * prepare to start a new encoding.
123    * @throws IOException
124    */
125   public void prepareEncoding() throws IOException {
126     encodedStream.reset();
127     dataOut.write(dummyHeader);
128     if (encodingAlgo != null
129         && encodingAlgo != DataBlockEncoding.NONE) {
130       encodingAlgo.writeIdInBytes(dataOut);
131     }
132   }
133 
134   @Override
135   public void postEncoding(BlockType blockType)
136       throws IOException {
137     dataOut.flush();
138     compressAfterEncodingWithBlockType(encodedStream.toByteArray(), blockType);
139     this.blockType = blockType;
140   }
141 
142   /**
143    * @param uncompressedBytesWithHeader
144    * @param blockType
145    * @throws IOException
146    */
147   public void compressAfterEncodingWithBlockType(byte[] uncompressedBytesWithHeader,
148       BlockType blockType) throws IOException {
149     compressAfterEncoding(uncompressedBytesWithHeader, blockType, dummyHeader);
150   }
151 
152   /**
153    * @param uncompressedBytesWithHeader
154    * @param blockType
155    * @param headerBytes
156    * @throws IOException
157    */
158   protected void compressAfterEncoding(byte[] uncompressedBytesWithHeader,
159       BlockType blockType, byte[] headerBytes) throws IOException {
160     this.uncompressedBytesWithHeader = uncompressedBytesWithHeader;
161 
162     Encryption.Context cryptoContext = fileContext.getEncryptionContext();
163     if (cryptoContext != Encryption.Context.NONE) {
164 
165       // Encrypted block format:
166       // +--------------------------+
167       // | byte iv length           |
168       // +--------------------------+
169       // | iv data ...              |
170       // +--------------------------+
171       // | encrypted block data ... |
172       // +--------------------------+
173 
174       cryptoByteStream.reset();
175       // Write the block header (plaintext)
176       cryptoByteStream.write(headerBytes);
177 
178       InputStream in;
179       int plaintextLength;
180       // Run any compression before encryption
181       if (fileContext.getCompression() != Compression.Algorithm.NONE) {
182         compressedByteStream.reset();
183         compressionStream.resetState();
184         compressionStream.write(uncompressedBytesWithHeader,
185             headerBytes.length, uncompressedBytesWithHeader.length - headerBytes.length);
186         compressionStream.flush();
187         compressionStream.finish();
188         byte[] plaintext = compressedByteStream.toByteArray();
189         plaintextLength = plaintext.length;
190         in = new ByteArrayInputStream(plaintext);
191       } else {
192         plaintextLength = uncompressedBytesWithHeader.length - headerBytes.length;
193         in = new ByteArrayInputStream(uncompressedBytesWithHeader,
194           headerBytes.length, plaintextLength);
195       }
196 
197       if (plaintextLength > 0) {
198 
199         // Set up the cipher
200         Cipher cipher = cryptoContext.getCipher();
201         Encryptor encryptor = cipher.getEncryptor();
202         encryptor.setKey(cryptoContext.getKey());
203 
204         // Set up the IV
205         int ivLength = iv.length;
206         Preconditions.checkState(ivLength <= Byte.MAX_VALUE, "IV length out of range");
207         cryptoByteStream.write(ivLength);
208         if (ivLength > 0) {
209           Encryption.incrementIv(iv);
210           encryptor.setIv(iv);
211           cryptoByteStream.write(iv);
212         }
213 
214         // Encrypt the data
215         Encryption.encrypt(cryptoByteStream, in, encryptor);
216 
217         onDiskBytesWithHeader = cryptoByteStream.toByteArray();
218 
219       } else {
220 
221         cryptoByteStream.write(0);
222         onDiskBytesWithHeader = cryptoByteStream.toByteArray();
223 
224       }
225 
226     } else {
227 
228       if (this.fileContext.getCompression() != NONE) {
229         compressedByteStream.reset();
230         compressedByteStream.write(headerBytes);
231         compressionStream.resetState();
232         compressionStream.write(uncompressedBytesWithHeader,
233           headerBytes.length, uncompressedBytesWithHeader.length
234               - headerBytes.length);
235         compressionStream.flush();
236         compressionStream.finish();
237         onDiskBytesWithHeader = compressedByteStream.toByteArray();
238       } else {
239         onDiskBytesWithHeader = uncompressedBytesWithHeader;
240       }
241 
242     }
243 
244     this.blockType = blockType;
245   }
246 
247   @Override
248   public byte[] getOnDiskBytesWithHeader() {
249     return onDiskBytesWithHeader;
250   }
251 
252   @Override
253   public byte[] getUncompressedBytesWithHeader() {
254     return uncompressedBytesWithHeader;
255   }
256 
257   @Override
258   public BlockType getBlockType() {
259     return blockType;
260   }
261 
262   /**
263    * Releases the compressor this writer uses to compress blocks into the
264    * compressor pool.
265    */
266   @Override
267   public void close() {
268     if (compressor != null) {
269       this.fileContext.getCompression().returnCompressor(compressor);
270       compressor = null;
271     }
272   }
273 
274   public DataOutputStream getOutputStreamForEncoder() {
275     return this.dataOut;
276   }
277 
278   @Override
279   public DataBlockEncoding getDataBlockEncoding() {
280     return this.encodingAlgo;
281   }
282 
283   @Override
284   public HFileContext getHFileContext() {
285     return this.fileContext;
286   }
287 
288   public TagCompressionContext getTagCompressionContext() {
289     return tagCompressionContext;
290   }
291 
292   public void setTagCompressionContext(TagCompressionContext tagCompressionContext) {
293     this.tagCompressionContext = tagCompressionContext;
294   }
295 }