/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
017package org.apache.hadoop.hbase.io.encoding;
018
019import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
020import java.io.ByteArrayInputStream;
021import java.io.DataOutputStream;
022import java.io.IOException;
023import java.io.InputStream;
024import java.security.SecureRandom;
025import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
026import org.apache.hadoop.hbase.io.TagCompressionContext;
027import org.apache.hadoop.hbase.io.compress.Compression;
028import org.apache.hadoop.hbase.io.crypto.Cipher;
029import org.apache.hadoop.hbase.io.crypto.Encryption;
030import org.apache.hadoop.hbase.io.crypto.Encryptor;
031import org.apache.hadoop.hbase.io.hfile.BlockType;
032import org.apache.hadoop.hbase.io.hfile.HFileContext;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.io.compress.CompressionOutputStream;
035import org.apache.hadoop.io.compress.Compressor;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
038
/**
 * A default implementation of {@link HFileBlockEncodingContext}. It will
 * compress the data section as one continuous buffer.
 *
 * @see HFileBlockDefaultDecodingContext for the decompression part
 */
046@InterfaceAudience.Private
047public class HFileBlockDefaultEncodingContext implements HFileBlockEncodingContext {
048  private BlockType blockType;
049  private final DataBlockEncoding encodingAlgo;
050
051  private byte[] dummyHeader;
052
053  // Compression state
054
055  /** Compressor, which is also reused between consecutive blocks. */
056  private Compressor compressor;
057  /** Compression output stream */
058  private CompressionOutputStream compressionStream;
059  /** Underlying stream to write compressed bytes to */
060  private ByteArrayOutputStream compressedByteStream;
061
062  private HFileContext fileContext;
063  private TagCompressionContext tagCompressionContext;
064
065  // Encryption state
066
067  /** Underlying stream to write encrypted bytes to */
068  private ByteArrayOutputStream cryptoByteStream;
069  /** Initialization vector */
070  private byte[] iv;
071
072  private EncodingState encoderState;
073
074  /**
075   * @param encoding encoding used
076   * @param headerBytes dummy header bytes
077   * @param fileContext HFile meta data
078   */
079  public HFileBlockDefaultEncodingContext(DataBlockEncoding encoding, byte[] headerBytes,
080      HFileContext fileContext) {
081    this.encodingAlgo = encoding;
082    this.fileContext = fileContext;
083    Compression.Algorithm compressionAlgorithm =
084        fileContext.getCompression() == null ? NONE : fileContext.getCompression();
085    if (compressionAlgorithm != NONE) {
086      compressor = compressionAlgorithm.getCompressor();
087      compressedByteStream = new ByteArrayOutputStream();
088      try {
089        compressionStream =
090            compressionAlgorithm.createPlainCompressionStream(
091                compressedByteStream, compressor);
092      } catch (IOException e) {
093        throw new RuntimeException(
094            "Could not create compression stream for algorithm "
095                + compressionAlgorithm, e);
096      }
097    }
098
099    Encryption.Context cryptoContext = fileContext.getEncryptionContext();
100    if (cryptoContext != Encryption.Context.NONE) {
101      cryptoByteStream = new ByteArrayOutputStream();
102      iv = new byte[cryptoContext.getCipher().getIvLength()];
103      new SecureRandom().nextBytes(iv);
104    }
105
106    dummyHeader = Preconditions.checkNotNull(headerBytes,
107      "Please pass HConstants.HFILEBLOCK_DUMMY_HEADER instead of null for param headerBytes");
108  }
109
110  /**
111   * prepare to start a new encoding.
112   */
113  public void prepareEncoding(DataOutputStream out) throws IOException {
114    if (encodingAlgo != null && encodingAlgo != DataBlockEncoding.NONE) {
115      encodingAlgo.writeIdInBytes(out);
116    }
117  }
118
119  @Override
120  public void postEncoding(BlockType blockType)
121      throws IOException {
122    this.blockType = blockType;
123  }
124
125  @Override
126  public Bytes compressAndEncrypt(byte[] data, int offset, int length) throws IOException {
127    return compressAfterEncoding(data, offset, length, dummyHeader);
128  }
129
130  private Bytes compressAfterEncoding(byte[] uncompressedBytesWithHeaderBuffer,
131        int uncompressedBytesWithHeaderOffset, int uncompressedBytesWithHeaderLength,
132        byte[] headerBytes)
133      throws IOException {
134    Encryption.Context cryptoContext = fileContext.getEncryptionContext();
135    if (cryptoContext != Encryption.Context.NONE) {
136
137      // Encrypted block format:
138      // +--------------------------+
139      // | byte iv length           |
140      // +--------------------------+
141      // | iv data ...              |
142      // +--------------------------+
143      // | encrypted block data ... |
144      // +--------------------------+
145
146      cryptoByteStream.reset();
147      // Write the block header (plaintext)
148      cryptoByteStream.write(headerBytes);
149
150      InputStream in;
151      int plaintextLength;
152      // Run any compression before encryption
153      if (fileContext.getCompression() != Compression.Algorithm.NONE) {
154        compressedByteStream.reset();
155        compressionStream.resetState();
156        compressionStream.write(uncompressedBytesWithHeaderBuffer,
157          headerBytes.length + uncompressedBytesWithHeaderOffset,
158          uncompressedBytesWithHeaderLength - headerBytes.length);
159        compressionStream.flush();
160        compressionStream.finish();
161        byte[] plaintext = compressedByteStream.toByteArray();
162        plaintextLength = plaintext.length;
163        in = new ByteArrayInputStream(plaintext);
164      } else {
165        plaintextLength = uncompressedBytesWithHeaderLength - headerBytes.length;
166        in = new ByteArrayInputStream(uncompressedBytesWithHeaderBuffer,
167          headerBytes.length + uncompressedBytesWithHeaderOffset, plaintextLength);
168      }
169
170      if (plaintextLength > 0) {
171
172        // Set up the cipher
173        Cipher cipher = cryptoContext.getCipher();
174        Encryptor encryptor = cipher.getEncryptor();
175        encryptor.setKey(cryptoContext.getKey());
176
177        // Set up the IV
178        int ivLength = iv.length;
179        Preconditions.checkState(ivLength <= Byte.MAX_VALUE, "IV length out of range");
180        cryptoByteStream.write(ivLength);
181        if (ivLength > 0) {
182          encryptor.setIv(iv);
183          cryptoByteStream.write(iv);
184        }
185
186        // Encrypt the data
187        Encryption.encrypt(cryptoByteStream, in, encryptor);
188
189        // Increment the IV given the final block size
190        Encryption.incrementIv(iv, 1 + (cryptoByteStream.size() / encryptor.getBlockSize()));
191        return new Bytes(cryptoByteStream.getBuffer(), 0, cryptoByteStream.size());
192      } else {
193
194        cryptoByteStream.write(0);
195        return new Bytes(cryptoByteStream.getBuffer(), 0, cryptoByteStream.size());
196      }
197
198    } else {
199
200      if (this.fileContext.getCompression() != NONE) {
201        compressedByteStream.reset();
202        compressedByteStream.write(headerBytes);
203        compressionStream.resetState();
204        compressionStream.write(uncompressedBytesWithHeaderBuffer,
205          headerBytes.length + uncompressedBytesWithHeaderOffset, uncompressedBytesWithHeaderLength
206              - headerBytes.length);
207        compressionStream.flush();
208        compressionStream.finish();
209        return new Bytes(compressedByteStream.getBuffer(), 0, compressedByteStream.size());
210      } else {
211        return null;
212      }
213    }
214  }
215
216  @Override
217  public BlockType getBlockType() {
218    return blockType;
219  }
220
221  /**
222   * Releases the compressor this writer uses to compress blocks into the
223   * compressor pool.
224   */
225  @Override
226  public void close() {
227    if (compressor != null) {
228      this.fileContext.getCompression().returnCompressor(compressor);
229      compressor = null;
230    }
231  }
232
233  @Override
234  public DataBlockEncoding getDataBlockEncoding() {
235    return this.encodingAlgo;
236  }
237
238  @Override
239  public HFileContext getHFileContext() {
240    return this.fileContext;
241  }
242
243  public TagCompressionContext getTagCompressionContext() {
244    return tagCompressionContext;
245  }
246
247  public void setTagCompressionContext(TagCompressionContext tagCompressionContext) {
248    this.tagCompressionContext = tagCompressionContext;
249  }
250
251  @Override
252  public EncodingState getEncodingState() {
253    return this.encoderState;
254  }
255
256  @Override
257  public void setEncodingState(EncodingState state) {
258    this.encoderState = state;
259  }
260}