001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations
015 * under the License.
016 */
017package org.apache.hadoop.hbase.io.encoding;
018
019import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
020
021import java.io.ByteArrayInputStream;
022import java.io.DataOutputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.security.SecureRandom;
026
027import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
028import org.apache.hadoop.hbase.io.TagCompressionContext;
029import org.apache.hadoop.hbase.io.compress.Compression;
030import org.apache.hadoop.hbase.io.crypto.Cipher;
031import org.apache.hadoop.hbase.io.crypto.Encryption;
032import org.apache.hadoop.hbase.io.crypto.Encryptor;
033import org.apache.hadoop.hbase.io.hfile.BlockType;
034import org.apache.hadoop.hbase.io.hfile.HFileContext;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.io.compress.CompressionOutputStream;
037import org.apache.hadoop.io.compress.Compressor;
038import org.apache.yetus.audience.InterfaceAudience;
039
040import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
041
042/**
043 * A default implementation of {@link HFileBlockEncodingContext}. It will
044 * compress the data section as one continuous buffer.
045 *
046 * @see HFileBlockDefaultDecodingContext for the decompression part
047 *
048 */
049@InterfaceAudience.Private
050public class HFileBlockDefaultEncodingContext implements
051    HFileBlockEncodingContext {
052  private BlockType blockType;
053  private final DataBlockEncoding encodingAlgo;
054
055  private byte[] dummyHeader;
056
057  // Compression state
058
059  /** Compressor, which is also reused between consecutive blocks. */
060  private Compressor compressor;
061  /** Compression output stream */
062  private CompressionOutputStream compressionStream;
063  /** Underlying stream to write compressed bytes to */
064  private ByteArrayOutputStream compressedByteStream;
065
066  private HFileContext fileContext;
067  private TagCompressionContext tagCompressionContext;
068
069  // Encryption state
070
071  /** Underlying stream to write encrypted bytes to */
072  private ByteArrayOutputStream cryptoByteStream;
073  /** Initialization vector */
074  private byte[] iv;
075
076  private EncodingState encoderState;
077
078  /**
079   * @param encoding encoding used
080   * @param headerBytes dummy header bytes
081   * @param fileContext HFile meta data
082   */
083  public HFileBlockDefaultEncodingContext(DataBlockEncoding encoding, byte[] headerBytes,
084      HFileContext fileContext) {
085    this.encodingAlgo = encoding;
086    this.fileContext = fileContext;
087    Compression.Algorithm compressionAlgorithm =
088        fileContext.getCompression() == null ? NONE : fileContext.getCompression();
089    if (compressionAlgorithm != NONE) {
090      compressor = compressionAlgorithm.getCompressor();
091      compressedByteStream = new ByteArrayOutputStream();
092      try {
093        compressionStream =
094            compressionAlgorithm.createPlainCompressionStream(
095                compressedByteStream, compressor);
096      } catch (IOException e) {
097        throw new RuntimeException(
098            "Could not create compression stream for algorithm "
099                + compressionAlgorithm, e);
100      }
101    }
102
103    Encryption.Context cryptoContext = fileContext.getEncryptionContext();
104    if (cryptoContext != Encryption.Context.NONE) {
105      cryptoByteStream = new ByteArrayOutputStream();
106      iv = new byte[cryptoContext.getCipher().getIvLength()];
107      new SecureRandom().nextBytes(iv);
108    }
109
110    dummyHeader = Preconditions.checkNotNull(headerBytes,
111      "Please pass HConstants.HFILEBLOCK_DUMMY_HEADER instead of null for param headerBytes");
112  }
113
114  /**
115   * prepare to start a new encoding.
116   * @throws IOException
117   */
118  public void prepareEncoding(DataOutputStream out) throws IOException {
119    if (encodingAlgo != null && encodingAlgo != DataBlockEncoding.NONE) {
120      encodingAlgo.writeIdInBytes(out);
121    }
122  }
123
124  @Override
125  public void postEncoding(BlockType blockType)
126      throws IOException {
127    this.blockType = blockType;
128  }
129
130  @Override
131  public Bytes compressAndEncrypt(byte[] data, int offset, int length) throws IOException {
132    return compressAfterEncoding(data, offset, length, dummyHeader);
133  }
134
135  private Bytes compressAfterEncoding(byte[] uncompressedBytesWithHeaderBuffer,
136        int uncompressedBytesWithHeaderOffset, int uncompressedBytesWithHeaderLength, byte[] headerBytes)
137      throws IOException {
138    Encryption.Context cryptoContext = fileContext.getEncryptionContext();
139    if (cryptoContext != Encryption.Context.NONE) {
140
141      // Encrypted block format:
142      // +--------------------------+
143      // | byte iv length           |
144      // +--------------------------+
145      // | iv data ...              |
146      // +--------------------------+
147      // | encrypted block data ... |
148      // +--------------------------+
149
150      cryptoByteStream.reset();
151      // Write the block header (plaintext)
152      cryptoByteStream.write(headerBytes);
153
154      InputStream in;
155      int plaintextLength;
156      // Run any compression before encryption
157      if (fileContext.getCompression() != Compression.Algorithm.NONE) {
158        compressedByteStream.reset();
159        compressionStream.resetState();
160        compressionStream.write(uncompressedBytesWithHeaderBuffer,
161            headerBytes.length + uncompressedBytesWithHeaderOffset, uncompressedBytesWithHeaderLength - headerBytes.length);
162        compressionStream.flush();
163        compressionStream.finish();
164        byte[] plaintext = compressedByteStream.toByteArray();
165        plaintextLength = plaintext.length;
166        in = new ByteArrayInputStream(plaintext);
167      } else {
168        plaintextLength = uncompressedBytesWithHeaderLength - headerBytes.length;
169        in = new ByteArrayInputStream(uncompressedBytesWithHeaderBuffer,
170          headerBytes.length + uncompressedBytesWithHeaderOffset, plaintextLength);
171      }
172
173      if (plaintextLength > 0) {
174
175        // Set up the cipher
176        Cipher cipher = cryptoContext.getCipher();
177        Encryptor encryptor = cipher.getEncryptor();
178        encryptor.setKey(cryptoContext.getKey());
179
180        // Set up the IV
181        int ivLength = iv.length;
182        Preconditions.checkState(ivLength <= Byte.MAX_VALUE, "IV length out of range");
183        cryptoByteStream.write(ivLength);
184        if (ivLength > 0) {
185          encryptor.setIv(iv);
186          cryptoByteStream.write(iv);
187        }
188
189        // Encrypt the data
190        Encryption.encrypt(cryptoByteStream, in, encryptor);
191
192        // Increment the IV given the final block size
193        Encryption.incrementIv(iv, 1 + (cryptoByteStream.size() / encryptor.getBlockSize()));
194        return new Bytes(cryptoByteStream.getBuffer(), 0, cryptoByteStream.size());
195      } else {
196
197        cryptoByteStream.write(0);
198        return new Bytes(cryptoByteStream.getBuffer(), 0, cryptoByteStream.size());
199      }
200
201    } else {
202
203      if (this.fileContext.getCompression() != NONE) {
204        compressedByteStream.reset();
205        compressedByteStream.write(headerBytes);
206        compressionStream.resetState();
207        compressionStream.write(uncompressedBytesWithHeaderBuffer,
208          headerBytes.length + uncompressedBytesWithHeaderOffset, uncompressedBytesWithHeaderLength
209              - headerBytes.length);
210        compressionStream.flush();
211        compressionStream.finish();
212        return new Bytes(compressedByteStream.getBuffer(), 0, compressedByteStream.size());
213      } else {
214        return null;
215      }
216    }
217  }
218
219  @Override
220  public BlockType getBlockType() {
221    return blockType;
222  }
223
224  /**
225   * Releases the compressor this writer uses to compress blocks into the
226   * compressor pool.
227   */
228  @Override
229  public void close() {
230    if (compressor != null) {
231      this.fileContext.getCompression().returnCompressor(compressor);
232      compressor = null;
233    }
234  }
235
236  @Override
237  public DataBlockEncoding getDataBlockEncoding() {
238    return this.encodingAlgo;
239  }
240
241  @Override
242  public HFileContext getHFileContext() {
243    return this.fileContext;
244  }
245
246  public TagCompressionContext getTagCompressionContext() {
247    return tagCompressionContext;
248  }
249
250  public void setTagCompressionContext(TagCompressionContext tagCompressionContext) {
251    this.tagCompressionContext = tagCompressionContext;
252  }
253
254  @Override
255  public EncodingState getEncodingState() {
256    return this.encoderState;
257  }
258
259  @Override
260  public void setEncodingState(EncodingState state) {
261    this.encoderState = state;
262  }
263}