001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.encoding;
019
020import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
021
022import java.io.ByteArrayInputStream;
023import java.io.DataOutputStream;
024import java.io.IOException;
025import java.io.InputStream;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
028import org.apache.hadoop.hbase.io.TagCompressionContext;
029import org.apache.hadoop.hbase.io.compress.Compression;
030import org.apache.hadoop.hbase.io.crypto.Cipher;
031import org.apache.hadoop.hbase.io.crypto.Encryption;
032import org.apache.hadoop.hbase.io.crypto.Encryptor;
033import org.apache.hadoop.hbase.io.hfile.BlockType;
034import org.apache.hadoop.hbase.io.hfile.HFileContext;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.io.compress.CompressionOutputStream;
037import org.apache.hadoop.io.compress.Compressor;
038import org.apache.yetus.audience.InterfaceAudience;
039
040import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
041
042/**
043 * A default implementation of {@link HFileBlockEncodingContext}. It will compress the data section
044 * as one continuous buffer.
045 * @see HFileBlockDefaultDecodingContext for the decompression part
046 */
047@InterfaceAudience.Private
048public class HFileBlockDefaultEncodingContext implements HFileBlockEncodingContext {
049  private BlockType blockType;
050  private final DataBlockEncoding encodingAlgo;
051
052  private byte[] dummyHeader;
053
054  // Compression state
055
056  /** Compressor, which is also reused between consecutive blocks. */
057  private Compressor compressor;
058  /** Compression output stream */
059  private CompressionOutputStream compressionStream;
060  /** Underlying stream to write compressed bytes to */
061  private ByteArrayOutputStream compressedByteStream;
062
063  private HFileContext fileContext;
064  private TagCompressionContext tagCompressionContext;
065
066  // Encryption state
067
068  /** Underlying stream to write encrypted bytes to */
069  private ByteArrayOutputStream cryptoByteStream;
070  /** Initialization vector */
071  private byte[] iv;
072
073  private EncodingState encoderState;
074
075  /**
076   * @param conf        configuraton
077   * @param encoding    encoding used
078   * @param headerBytes dummy header bytes
079   * @param fileContext HFile meta data
080   */
081  public HFileBlockDefaultEncodingContext(Configuration conf, DataBlockEncoding encoding,
082    byte[] headerBytes, HFileContext fileContext) {
083    this.encodingAlgo = encoding;
084    this.fileContext = fileContext;
085    Compression.Algorithm compressionAlgorithm =
086      fileContext.getCompression() == null ? NONE : fileContext.getCompression();
087    if (compressionAlgorithm != NONE) {
088      if (compressor == null) {
089        compressor = compressionAlgorithm.getCompressor();
090        // Some algorithms don't return compressors and accept null as a valid parameter for
091        // same when creating compression streams. We can ignore these cases wrt reinit.
092        if (compressor != null) {
093          compressor.reinit(conf);
094        }
095      }
096      compressedByteStream = new ByteArrayOutputStream();
097      try {
098        compressionStream =
099          compressionAlgorithm.createPlainCompressionStream(compressedByteStream, compressor);
100      } catch (IOException e) {
101        throw new RuntimeException(
102          "Could not create compression stream for algorithm " + compressionAlgorithm, e);
103      }
104    }
105
106    Encryption.Context cryptoContext = fileContext.getEncryptionContext();
107    if (cryptoContext != Encryption.Context.NONE) {
108      cryptoByteStream = new ByteArrayOutputStream();
109      iv = new byte[cryptoContext.getCipher().getIvLength()];
110      Bytes.secureRandom(iv);
111    }
112
113    dummyHeader = Preconditions.checkNotNull(headerBytes,
114      "Please pass HConstants.HFILEBLOCK_DUMMY_HEADER instead of null for param headerBytes");
115  }
116
117  /**
118   * prepare to start a new encoding.
119   */
120  public void prepareEncoding(DataOutputStream out) throws IOException {
121    if (encodingAlgo != null && encodingAlgo != DataBlockEncoding.NONE) {
122      encodingAlgo.writeIdInBytes(out);
123    }
124  }
125
126  @Override
127  public void postEncoding(BlockType blockType) throws IOException {
128    this.blockType = blockType;
129  }
130
131  @Override
132  public Bytes compressAndEncrypt(byte[] data, int offset, int length) throws IOException {
133    return compressAfterEncoding(data, offset, length, dummyHeader);
134  }
135
136  private Bytes compressAfterEncoding(byte[] uncompressedBytesWithHeaderBuffer,
137    int uncompressedBytesWithHeaderOffset, int uncompressedBytesWithHeaderLength,
138    byte[] headerBytes) throws IOException {
139    Encryption.Context cryptoContext = fileContext.getEncryptionContext();
140    if (cryptoContext != Encryption.Context.NONE) {
141
142      // Encrypted block format:
143      // +--------------------------+
144      // | byte iv length |
145      // +--------------------------+
146      // | iv data ... |
147      // +--------------------------+
148      // | encrypted block data ... |
149      // +--------------------------+
150
151      cryptoByteStream.reset();
152      // Write the block header (plaintext)
153      cryptoByteStream.write(headerBytes);
154
155      InputStream in;
156      int plaintextLength;
157      // Run any compression before encryption
158      if (fileContext.getCompression() != Compression.Algorithm.NONE) {
159        compressedByteStream.reset();
160        compressionStream.resetState();
161        compressionStream.write(uncompressedBytesWithHeaderBuffer,
162          headerBytes.length + uncompressedBytesWithHeaderOffset,
163          uncompressedBytesWithHeaderLength - headerBytes.length);
164        compressionStream.flush();
165        compressionStream.finish();
166        byte[] plaintext = compressedByteStream.toByteArray();
167        plaintextLength = plaintext.length;
168        in = new ByteArrayInputStream(plaintext);
169      } else {
170        plaintextLength = uncompressedBytesWithHeaderLength - headerBytes.length;
171        in = new ByteArrayInputStream(uncompressedBytesWithHeaderBuffer,
172          headerBytes.length + uncompressedBytesWithHeaderOffset, plaintextLength);
173      }
174
175      if (plaintextLength > 0) {
176
177        // Set up the cipher
178        Cipher cipher = cryptoContext.getCipher();
179        Encryptor encryptor = cipher.getEncryptor();
180        encryptor.setKey(cryptoContext.getKey());
181
182        // Set up the IV
183        int ivLength = iv.length;
184        Preconditions.checkState(ivLength <= Byte.MAX_VALUE, "IV length out of range");
185        cryptoByteStream.write(ivLength);
186        if (ivLength > 0) {
187          encryptor.setIv(iv);
188          cryptoByteStream.write(iv);
189        }
190
191        // Encrypt the data
192        Encryption.encrypt(cryptoByteStream, in, encryptor);
193
194        // Increment the IV given the final block size
195        Encryption.incrementIv(iv, 1 + (cryptoByteStream.size() / encryptor.getBlockSize()));
196        return new Bytes(cryptoByteStream.getBuffer(), 0, cryptoByteStream.size());
197      } else {
198
199        cryptoByteStream.write(0);
200        return new Bytes(cryptoByteStream.getBuffer(), 0, cryptoByteStream.size());
201      }
202
203    } else {
204
205      if (this.fileContext.getCompression() != NONE) {
206        compressedByteStream.reset();
207        compressedByteStream.write(headerBytes);
208        compressionStream.resetState();
209        compressionStream.write(uncompressedBytesWithHeaderBuffer,
210          headerBytes.length + uncompressedBytesWithHeaderOffset,
211          uncompressedBytesWithHeaderLength - headerBytes.length);
212        compressionStream.flush();
213        compressionStream.finish();
214        return new Bytes(compressedByteStream.getBuffer(), 0, compressedByteStream.size());
215      } else {
216        return null;
217      }
218    }
219  }
220
221  @Override
222  public BlockType getBlockType() {
223    return blockType;
224  }
225
226  /**
227   * Releases the compressor this writer uses to compress blocks into the compressor pool.
228   */
229  @Override
230  public void close() {
231    if (compressor != null) {
232      this.fileContext.getCompression().returnCompressor(compressor);
233      compressor = null;
234    }
235  }
236
237  @Override
238  public DataBlockEncoding getDataBlockEncoding() {
239    return this.encodingAlgo;
240  }
241
242  @Override
243  public HFileContext getHFileContext() {
244    return this.fileContext;
245  }
246
247  public TagCompressionContext getTagCompressionContext() {
248    return tagCompressionContext;
249  }
250
251  public void setTagCompressionContext(TagCompressionContext tagCompressionContext) {
252    this.tagCompressionContext = tagCompressionContext;
253  }
254
255  @Override
256  public EncodingState getEncodingState() {
257    return this.encoderState;
258  }
259
260  @Override
261  public void setEncodingState(EncodingState state) {
262    this.encoderState = state;
263  }
264}