View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.ByteArrayOutputStream;
23  import java.io.DataOutputStream;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.security.SecureRandom;
27  
28  import org.apache.hadoop.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.io.TagCompressionContext;
30  import org.apache.hadoop.hbase.io.compress.Compression;
31  import org.apache.hadoop.hbase.io.crypto.Cipher;
32  import org.apache.hadoop.hbase.io.crypto.Encryption;
33  import org.apache.hadoop.hbase.io.crypto.Encryptor;
34  import org.apache.hadoop.hbase.io.hfile.BlockType;
35  import org.apache.hadoop.hbase.io.hfile.HFileContext;
36  import org.apache.hadoop.io.compress.CompressionOutputStream;
37  import org.apache.hadoop.io.compress.Compressor;
38  
39  import com.google.common.base.Preconditions;
40  
41  /**
42   * A default implementation of {@link HFileBlockEncodingContext}. It will
43   * compress the data section as one continuous buffer.
44   *
45   * @see HFileBlockDefaultDecodingContext for the decompression part
46   *
47   */
48  @InterfaceAudience.Private
49  public class HFileBlockDefaultEncodingContext implements
50      HFileBlockEncodingContext {
51    private byte[] onDiskBytesWithHeader;
52    private BlockType blockType;
53    private final DataBlockEncoding encodingAlgo;
54  
55    private byte[] dummyHeader;
56  
57    // Compression state
58  
59    /** Compressor, which is also reused between consecutive blocks. */
60    private Compressor compressor;
61    /** Compression output stream */
62    private CompressionOutputStream compressionStream;
63    /** Underlying stream to write compressed bytes to */
64    private ByteArrayOutputStream compressedByteStream;
65  
66    private HFileContext fileContext;
67    private TagCompressionContext tagCompressionContext;
68  
69    // Encryption state
70  
71    /** Underlying stream to write encrypted bytes to */
72    private ByteArrayOutputStream cryptoByteStream;
73    /** Initialization vector */
74    private byte[] iv;
75  
76    private EncodingState encoderState;
77  
78    /**
79     * @param encoding encoding used
80     * @param headerBytes dummy header bytes
81     * @param fileContext HFile meta data
82     */
83    public HFileBlockDefaultEncodingContext(DataBlockEncoding encoding, byte[] headerBytes,
84        HFileContext fileContext) {
85      this.encodingAlgo = encoding;
86      this.fileContext = fileContext;
87      Compression.Algorithm compressionAlgorithm =
88          fileContext.getCompression() == null ? NONE : fileContext.getCompression();
89      if (compressionAlgorithm != NONE) {
90        compressor = compressionAlgorithm.getCompressor();
91        compressedByteStream = new ByteArrayOutputStream();
92        try {
93          compressionStream =
94              compressionAlgorithm.createPlainCompressionStream(
95                  compressedByteStream, compressor);
96        } catch (IOException e) {
97          throw new RuntimeException(
98              "Could not create compression stream for algorithm "
99                  + compressionAlgorithm, e);
100       }
101     }
102 
103     Encryption.Context cryptoContext = fileContext.getEncryptionContext();
104     if (cryptoContext != Encryption.Context.NONE) {
105       cryptoByteStream = new ByteArrayOutputStream();
106       iv = new byte[cryptoContext.getCipher().getIvLength()];
107       new SecureRandom().nextBytes(iv);
108     }
109 
110     dummyHeader = Preconditions.checkNotNull(headerBytes,
111       "Please pass HConstants.HFILEBLOCK_DUMMY_HEADER instead of null for param headerBytes");
112   }
113 
114   /**
115    * prepare to start a new encoding.
116    * @throws IOException
117    */
118   public void prepareEncoding(DataOutputStream out) throws IOException {
119     if (encodingAlgo != null && encodingAlgo != DataBlockEncoding.NONE) {
120       encodingAlgo.writeIdInBytes(out);
121     }
122   }
123 
124   @Override
125   public void postEncoding(BlockType blockType)
126       throws IOException {
127     this.blockType = blockType;
128   }
129 
130   @Override
131   public byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader) throws IOException {
132     compressAfterEncoding(uncompressedBytesWithHeader, dummyHeader);
133     return onDiskBytesWithHeader;
134   }
135 
136   /**
137    * @param uncompressedBytesWithHeader
138    * @param headerBytes
139    * @throws IOException
140    */
141   protected void compressAfterEncoding(byte[] uncompressedBytesWithHeader, byte[] headerBytes)
142       throws IOException {
143     Encryption.Context cryptoContext = fileContext.getEncryptionContext();
144     if (cryptoContext != Encryption.Context.NONE) {
145 
146       // Encrypted block format:
147       // +--------------------------+
148       // | byte iv length           |
149       // +--------------------------+
150       // | iv data ...              |
151       // +--------------------------+
152       // | encrypted block data ... |
153       // +--------------------------+
154 
155       cryptoByteStream.reset();
156       // Write the block header (plaintext)
157       cryptoByteStream.write(headerBytes);
158 
159       InputStream in;
160       int plaintextLength;
161       // Run any compression before encryption
162       if (fileContext.getCompression() != Compression.Algorithm.NONE) {
163         compressedByteStream.reset();
164         compressionStream.resetState();
165         compressionStream.write(uncompressedBytesWithHeader,
166             headerBytes.length, uncompressedBytesWithHeader.length - headerBytes.length);
167         compressionStream.flush();
168         compressionStream.finish();
169         byte[] plaintext = compressedByteStream.toByteArray();
170         plaintextLength = plaintext.length;
171         in = new ByteArrayInputStream(plaintext);
172       } else {
173         plaintextLength = uncompressedBytesWithHeader.length - headerBytes.length;
174         in = new ByteArrayInputStream(uncompressedBytesWithHeader,
175           headerBytes.length, plaintextLength);
176       }
177 
178       if (plaintextLength > 0) {
179 
180         // Set up the cipher
181         Cipher cipher = cryptoContext.getCipher();
182         Encryptor encryptor = cipher.getEncryptor();
183         encryptor.setKey(cryptoContext.getKey());
184 
185         // Set up the IV
186         int ivLength = iv.length;
187         Preconditions.checkState(ivLength <= Byte.MAX_VALUE, "IV length out of range");
188         cryptoByteStream.write(ivLength);
189         if (ivLength > 0) {
190           encryptor.setIv(iv);
191           cryptoByteStream.write(iv);
192         }
193 
194         // Encrypt the data
195         Encryption.encrypt(cryptoByteStream, in, encryptor);
196 
197         onDiskBytesWithHeader = cryptoByteStream.toByteArray();
198 
199         // Increment the IV given the final block size
200         Encryption.incrementIv(iv, 1 + (onDiskBytesWithHeader.length / encryptor.getBlockSize()));
201 
202       } else {
203 
204         cryptoByteStream.write(0);
205         onDiskBytesWithHeader = cryptoByteStream.toByteArray();
206 
207       }
208 
209     } else {
210 
211       if (this.fileContext.getCompression() != NONE) {
212         compressedByteStream.reset();
213         compressedByteStream.write(headerBytes);
214         compressionStream.resetState();
215         compressionStream.write(uncompressedBytesWithHeader,
216           headerBytes.length, uncompressedBytesWithHeader.length
217               - headerBytes.length);
218         compressionStream.flush();
219         compressionStream.finish();
220         onDiskBytesWithHeader = compressedByteStream.toByteArray();
221       } else {
222         onDiskBytesWithHeader = uncompressedBytesWithHeader;
223       }
224     }
225   }
226 
227   @Override
228   public BlockType getBlockType() {
229     return blockType;
230   }
231 
232   /**
233    * Releases the compressor this writer uses to compress blocks into the
234    * compressor pool.
235    */
236   @Override
237   public void close() {
238     if (compressor != null) {
239       this.fileContext.getCompression().returnCompressor(compressor);
240       compressor = null;
241     }
242   }
243 
244   @Override
245   public DataBlockEncoding getDataBlockEncoding() {
246     return this.encodingAlgo;
247   }
248 
249   @Override
250   public HFileContext getHFileContext() {
251     return this.fileContext;
252   }
253 
254   public TagCompressionContext getTagCompressionContext() {
255     return tagCompressionContext;
256   }
257 
258   public void setTagCompressionContext(TagCompressionContext tagCompressionContext) {
259     this.tagCompressionContext = tagCompressionContext;
260   }
261 
262   @Override
263   public EncodingState getEncodingState() {
264     return this.encoderState;
265   }
266 
267   @Override
268   public void setEncodingState(EncodingState state) {
269     this.encoderState = state;
270   }
271 }