/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.SecureRandom;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.crypto.Encryptor;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;

import com.google.common.base.Preconditions;

/**
 * A default implementation of {@link HFileBlockEncodingContext}. It will
 * compress the data section as one continuous buffer.
 *
 * @see HFileBlockDefaultDecodingContext for the decompression part
 */
@InterfaceAudience.Private
public class HFileBlockDefaultEncodingContext implements
    HFileBlockEncodingContext {
  /** Final on-disk bytes (header plus compressed/encrypted payload) of the last block */
  private byte[] onDiskBytesWithHeader;
  /** Type of the block most recently finished via {@link #postEncoding(BlockType)} */
  private BlockType blockType;
  /** The data block encoding in use, or {@link DataBlockEncoding#NONE} */
  private final DataBlockEncoding encodingAlgo;

  /** Placeholder header written at the start of each block; the block writer fills in the real header later */
  private byte[] dummyHeader;

  // Compression state

  /** Compressor, which is also reused between consecutive blocks. */
  private Compressor compressor;
  /** Compression output stream */
  private CompressionOutputStream compressionStream;
  /** Underlying stream to write compressed bytes to */
  private ByteArrayOutputStream compressedByteStream;

  /** Metadata (compression, encryption, etc.) for the HFile being written */
  private HFileContext fileContext;
  /** Optional context for compressing cell tags; set by the encoder when tags are present */
  private TagCompressionContext tagCompressionContext;

  // Encryption state

  /** Underlying stream to write encrypted bytes to */
  private ByteArrayOutputStream cryptoByteStream;
  /** Initialization vector */
  private byte[] iv;

  /** Encoder state carried across calls while a single block is being encoded */
  private EncodingState encoderState;

  /**
   * @param encoding encoding used
   * @param headerBytes dummy header bytes
   * @param fileContext HFile metadata
   */
  public HFileBlockDefaultEncodingContext(DataBlockEncoding encoding, byte[] headerBytes,
      HFileContext fileContext) {
    this.encodingAlgo = encoding;
    this.fileContext = fileContext;
    Compression.Algorithm compressionAlgorithm =
        fileContext.getCompression() == null ? NONE : fileContext.getCompression();
    if (compressionAlgorithm != NONE) {
      compressor = compressionAlgorithm.getCompressor();
      compressedByteStream = new ByteArrayOutputStream();
      try {
        compressionStream =
            compressionAlgorithm.createPlainCompressionStream(
                compressedByteStream, compressor);
      } catch (IOException e) {
        throw new RuntimeException(
            "Could not create compression stream for algorithm "
                + compressionAlgorithm, e);
      }
    }

    Encryption.Context cryptoContext = fileContext.getEncryptionContext();
    if (cryptoContext != Encryption.Context.NONE) {
      cryptoByteStream = new ByteArrayOutputStream();
      // Start from a random IV; it is incremented before each block is
      // encrypted, so the same key/IV pair is never reused within the file
      iv = new byte[cryptoContext.getCipher().getIvLength()];
      new SecureRandom().nextBytes(iv);
    }

    dummyHeader = Preconditions.checkNotNull(headerBytes,
      "Please pass HConstants.HFILEBLOCK_DUMMY_HEADER instead of null for param headerBytes");
  }
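
  // A minimal construction sketch (an illustration, not code from this class):
  // a block writer typically builds one context per file and reuses it for every
  // block. HFileContextBuilder, Compression.Algorithm.GZ and
  // HConstants.HFILEBLOCK_DUMMY_HEADER are assumed from the surrounding HBase APIs.
  //
  //   HFileContext meta = new HFileContextBuilder()
  //       .withCompression(Compression.Algorithm.GZ)
  //       .build();
  //   HFileBlockEncodingContext ctx = new HFileBlockDefaultEncodingContext(
  //       DataBlockEncoding.PREFIX, HConstants.HFILEBLOCK_DUMMY_HEADER, meta);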

  /**
   * Prepare to start a new encoding.
   * @param out the stream the encoded block will be written to
   * @throws IOException if the encoding id cannot be written to the stream
   */
  public void prepareEncoding(DataOutputStream out) throws IOException {
    // An encoded block begins with the id of the DataBlockEncoding in use,
    // so the reader knows which decoder to apply
    if (encodingAlgo != null && encodingAlgo != DataBlockEncoding.NONE) {
      encodingAlgo.writeIdInBytes(out);
    }
  }
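
  // Sketch of how a writer drives a block through this context (hedged: the
  // real driver is HFileBlock.Writer, which manages the streams and headers
  // itself; 'ctx', 'out' and 'uncompressedBytesWithHeader' are hypothetical):
  //
  //   ctx.prepareEncoding(out);                  // writes the encoding id first
  //   // ... encoder writes the block's cells to 'out' ...
  //   ctx.postEncoding(BlockType.ENCODED_DATA);
  //   byte[] onDisk = ctx.compressAndEncrypt(uncompressedBytesWithHeader);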

  @Override
  public void postEncoding(BlockType blockType) throws IOException {
    this.blockType = blockType;
  }

  @Override
  public byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader) throws IOException {
    compressAfterEncoding(uncompressedBytesWithHeader, dummyHeader);
    return onDiskBytesWithHeader;
  }

  /**
   * Compresses and, if configured, encrypts the encoded block, producing the
   * final on-disk bytes.
   * @param uncompressedBytesWithHeader the encoded block, prefixed with a dummy header
   * @param headerBytes the dummy header bytes, which are skipped when compressing/encrypting
   * @throws IOException if compression or encryption fails
   */
  protected void compressAfterEncoding(byte[] uncompressedBytesWithHeader, byte[] headerBytes)
      throws IOException {
    Encryption.Context cryptoContext = fileContext.getEncryptionContext();
    if (cryptoContext != Encryption.Context.NONE) {

      // Encrypted block format:
      // +--------------------------+
      // | byte iv length           |
      // +--------------------------+
      // | iv data ...              |
      // +--------------------------+
      // | encrypted block data ... |
      // +--------------------------+

      cryptoByteStream.reset();
      // Write the block header (plaintext)
      cryptoByteStream.write(headerBytes);

      InputStream in;
      int plaintextLength;
      // Run any compression before encryption
      if (fileContext.getCompression() != Compression.Algorithm.NONE) {
        compressedByteStream.reset();
        compressionStream.resetState();
        compressionStream.write(uncompressedBytesWithHeader,
            headerBytes.length, uncompressedBytesWithHeader.length - headerBytes.length);
        compressionStream.flush();
        compressionStream.finish();
        byte[] plaintext = compressedByteStream.toByteArray();
        plaintextLength = plaintext.length;
        in = new ByteArrayInputStream(plaintext);
      } else {
        plaintextLength = uncompressedBytesWithHeader.length - headerBytes.length;
        in = new ByteArrayInputStream(uncompressedBytesWithHeader,
          headerBytes.length, plaintextLength);
      }

      if (plaintextLength > 0) {
        // Set up the cipher
        Cipher cipher = cryptoContext.getCipher();
        Encryptor encryptor = cipher.getEncryptor();
        encryptor.setKey(cryptoContext.getKey());

        // Set up the IV
        int ivLength = iv.length;
        Preconditions.checkState(ivLength <= Byte.MAX_VALUE, "IV length out of range");
        cryptoByteStream.write(ivLength);
        if (ivLength > 0) {
          // Increment the IV so each block is encrypted under a distinct IV,
          // and record it in the block so the reader can decrypt
          Encryption.incrementIv(iv);
          encryptor.setIv(iv);
          cryptoByteStream.write(iv);
        }

        // Encrypt the data
        Encryption.encrypt(cryptoByteStream, in, encryptor);

        onDiskBytesWithHeader = cryptoByteStream.toByteArray();
      } else {
        // Nothing to encrypt: record a zero IV length and leave the payload empty
        cryptoByteStream.write(0);
        onDiskBytesWithHeader = cryptoByteStream.toByteArray();
      }
    } else {
      if (this.fileContext.getCompression() != NONE) {
        // No encryption: compress everything after the header in one shot
        compressedByteStream.reset();
        compressedByteStream.write(headerBytes);
        compressionStream.resetState();
        compressionStream.write(uncompressedBytesWithHeader,
          headerBytes.length, uncompressedBytesWithHeader.length - headerBytes.length);
        compressionStream.flush();
        compressionStream.finish();
        onDiskBytesWithHeader = compressedByteStream.toByteArray();
      } else {
        // Neither compression nor encryption: pass the block through unchanged
        onDiskBytesWithHeader = uncompressedBytesWithHeader;
      }
    }
  }
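
  // Read-side sketch of the encrypted layout written above (an illustration
  // only; the real counterpart is HFileBlockDefaultDecodingContext, and
  // IOUtils here is assumed to be Hadoop's org.apache.hadoop.io.IOUtils).
  // After the plaintext block header, a reader would see:
  //
  //   int ivLength = in.read();                  // single IV-length byte
  //   byte[] iv = new byte[ivLength];
  //   IOUtils.readFully(in, iv, 0, ivLength);    // per-block IV, if any
  //   // the remaining bytes are the (possibly compressed) ciphertext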

  @Override
  public BlockType getBlockType() {
    return blockType;
  }

  /**
   * Returns the compressor this writer uses to compress blocks to the
   * compressor pool.
   */
  @Override
  public void close() {
    if (compressor != null) {
      this.fileContext.getCompression().returnCompressor(compressor);
      compressor = null;
    }
  }
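
  // Callers are expected to close the context once writing is done so the
  // pooled Compressor is returned; a sketch under that assumption:
  //
  //   try {
  //     // ... encode, compress and write blocks ...
  //   } finally {
  //     ctx.close();
  //   }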

  @Override
  public DataBlockEncoding getDataBlockEncoding() {
    return this.encodingAlgo;
  }

  @Override
  public HFileContext getHFileContext() {
    return this.fileContext;
  }

  public TagCompressionContext getTagCompressionContext() {
    return tagCompressionContext;
  }

  public void setTagCompressionContext(TagCompressionContext tagCompressionContext) {
    this.tagCompressionContext = tagCompressionContext;
  }

  @Override
  public EncodingState getEncodingState() {
    return this.encoderState;
  }

  @Override
  public void setEncodingState(EncodingState state) {
    this.encoderState = state;
  }
}