001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with this 004 * work for additional information regarding copyright ownership. The ASF 005 * licenses this file to you under the Apache License, Version 2.0 (the 006 * "License"); you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 014 * License for the specific language governing permissions and limitations 015 * under the License. 016 */ 017package org.apache.hadoop.hbase.io.encoding; 018 019import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE; 020import java.io.ByteArrayInputStream; 021import java.io.DataOutputStream; 022import java.io.IOException; 023import java.io.InputStream; 024import java.security.SecureRandom; 025import org.apache.hadoop.hbase.io.ByteArrayOutputStream; 026import org.apache.hadoop.hbase.io.TagCompressionContext; 027import org.apache.hadoop.hbase.io.compress.Compression; 028import org.apache.hadoop.hbase.io.crypto.Cipher; 029import org.apache.hadoop.hbase.io.crypto.Encryption; 030import org.apache.hadoop.hbase.io.crypto.Encryptor; 031import org.apache.hadoop.hbase.io.hfile.BlockType; 032import org.apache.hadoop.hbase.io.hfile.HFileContext; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.hadoop.io.compress.CompressionOutputStream; 035import org.apache.hadoop.io.compress.Compressor; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 038 039/** 040 * A default implementation of {@link HFileBlockEncodingContext}. It will 041 * compress the data section as one continuous buffer. 042 * 043 * @see HFileBlockDefaultDecodingContext for the decompression part 044 * 045 */ 046@InterfaceAudience.Private 047public class HFileBlockDefaultEncodingContext implements HFileBlockEncodingContext { 048 private BlockType blockType; 049 private final DataBlockEncoding encodingAlgo; 050 051 private byte[] dummyHeader; 052 053 // Compression state 054 055 /** Compressor, which is also reused between consecutive blocks. */ 056 private Compressor compressor; 057 /** Compression output stream */ 058 private CompressionOutputStream compressionStream; 059 /** Underlying stream to write compressed bytes to */ 060 private ByteArrayOutputStream compressedByteStream; 061 062 private HFileContext fileContext; 063 private TagCompressionContext tagCompressionContext; 064 065 // Encryption state 066 067 /** Underlying stream to write encrypted bytes to */ 068 private ByteArrayOutputStream cryptoByteStream; 069 /** Initialization vector */ 070 private byte[] iv; 071 072 private EncodingState encoderState; 073 074 /** 075 * @param encoding encoding used 076 * @param headerBytes dummy header bytes 077 * @param fileContext HFile meta data 078 */ 079 public HFileBlockDefaultEncodingContext(DataBlockEncoding encoding, byte[] headerBytes, 080 HFileContext fileContext) { 081 this.encodingAlgo = encoding; 082 this.fileContext = fileContext; 083 Compression.Algorithm compressionAlgorithm = 084 fileContext.getCompression() == null ? NONE : fileContext.getCompression(); 085 if (compressionAlgorithm != NONE) { 086 compressor = compressionAlgorithm.getCompressor(); 087 compressedByteStream = new ByteArrayOutputStream(); 088 try { 089 compressionStream = 090 compressionAlgorithm.createPlainCompressionStream( 091 compressedByteStream, compressor); 092 } catch (IOException e) { 093 throw new RuntimeException( 094 "Could not create compression stream for algorithm " 095 + compressionAlgorithm, e); 096 } 097 } 098 099 Encryption.Context cryptoContext = fileContext.getEncryptionContext(); 100 if (cryptoContext != Encryption.Context.NONE) { 101 cryptoByteStream = new ByteArrayOutputStream(); 102 iv = new byte[cryptoContext.getCipher().getIvLength()]; 103 new SecureRandom().nextBytes(iv); 104 } 105 106 dummyHeader = Preconditions.checkNotNull(headerBytes, 107 "Please pass HConstants.HFILEBLOCK_DUMMY_HEADER instead of null for param headerBytes"); 108 } 109 110 /** 111 * prepare to start a new encoding. 112 */ 113 public void prepareEncoding(DataOutputStream out) throws IOException { 114 if (encodingAlgo != null && encodingAlgo != DataBlockEncoding.NONE) { 115 encodingAlgo.writeIdInBytes(out); 116 } 117 } 118 119 @Override 120 public void postEncoding(BlockType blockType) 121 throws IOException { 122 this.blockType = blockType; 123 } 124 125 @Override 126 public Bytes compressAndEncrypt(byte[] data, int offset, int length) throws IOException { 127 return compressAfterEncoding(data, offset, length, dummyHeader); 128 } 129 130 private Bytes compressAfterEncoding(byte[] uncompressedBytesWithHeaderBuffer, 131 int uncompressedBytesWithHeaderOffset, int uncompressedBytesWithHeaderLength, 132 byte[] headerBytes) 133 throws IOException { 134 Encryption.Context cryptoContext = fileContext.getEncryptionContext(); 135 if (cryptoContext != Encryption.Context.NONE) { 136 137 // Encrypted block format: 138 // +--------------------------+ 139 // | byte iv length | 140 // +--------------------------+ 141 // | iv data ... | 142 // +--------------------------+ 143 // | encrypted block data ... | 144 // +--------------------------+ 145 146 cryptoByteStream.reset(); 147 // Write the block header (plaintext) 148 cryptoByteStream.write(headerBytes); 149 150 InputStream in; 151 int plaintextLength; 152 // Run any compression before encryption 153 if (fileContext.getCompression() != Compression.Algorithm.NONE) { 154 compressedByteStream.reset(); 155 compressionStream.resetState(); 156 compressionStream.write(uncompressedBytesWithHeaderBuffer, 157 headerBytes.length + uncompressedBytesWithHeaderOffset, 158 uncompressedBytesWithHeaderLength - headerBytes.length); 159 compressionStream.flush(); 160 compressionStream.finish(); 161 byte[] plaintext = compressedByteStream.toByteArray(); 162 plaintextLength = plaintext.length; 163 in = new ByteArrayInputStream(plaintext); 164 } else { 165 plaintextLength = uncompressedBytesWithHeaderLength - headerBytes.length; 166 in = new ByteArrayInputStream(uncompressedBytesWithHeaderBuffer, 167 headerBytes.length + uncompressedBytesWithHeaderOffset, plaintextLength); 168 } 169 170 if (plaintextLength > 0) { 171 172 // Set up the cipher 173 Cipher cipher = cryptoContext.getCipher(); 174 Encryptor encryptor = cipher.getEncryptor(); 175 encryptor.setKey(cryptoContext.getKey()); 176 177 // Set up the IV 178 int ivLength = iv.length; 179 Preconditions.checkState(ivLength <= Byte.MAX_VALUE, "IV length out of range"); 180 cryptoByteStream.write(ivLength); 181 if (ivLength > 0) { 182 encryptor.setIv(iv); 183 cryptoByteStream.write(iv); 184 } 185 186 // Encrypt the data 187 Encryption.encrypt(cryptoByteStream, in, encryptor); 188 189 // Increment the IV given the final block size 190 Encryption.incrementIv(iv, 1 + (cryptoByteStream.size() / encryptor.getBlockSize())); 191 return new Bytes(cryptoByteStream.getBuffer(), 0, cryptoByteStream.size()); 192 } else { 193 194 cryptoByteStream.write(0); 195 return new Bytes(cryptoByteStream.getBuffer(), 0, cryptoByteStream.size()); 196 } 197 198 } else { 199 200 if (this.fileContext.getCompression() != NONE) { 201 compressedByteStream.reset(); 202 compressedByteStream.write(headerBytes); 203 compressionStream.resetState(); 204 compressionStream.write(uncompressedBytesWithHeaderBuffer, 205 headerBytes.length + uncompressedBytesWithHeaderOffset, uncompressedBytesWithHeaderLength 206 - headerBytes.length); 207 compressionStream.flush(); 208 compressionStream.finish(); 209 return new Bytes(compressedByteStream.getBuffer(), 0, compressedByteStream.size()); 210 } else { 211 return null; 212 } 213 } 214 } 215 216 @Override 217 public BlockType getBlockType() { 218 return blockType; 219 } 220 221 /** 222 * Releases the compressor this writer uses to compress blocks into the 223 * compressor pool. 224 */ 225 @Override 226 public void close() { 227 if (compressor != null) { 228 this.fileContext.getCompression().returnCompressor(compressor); 229 compressor = null; 230 } 231 } 232 233 @Override 234 public DataBlockEncoding getDataBlockEncoding() { 235 return this.encodingAlgo; 236 } 237 238 @Override 239 public HFileContext getHFileContext() { 240 return this.fileContext; 241 } 242 243 public TagCompressionContext getTagCompressionContext() { 244 return tagCompressionContext; 245 } 246 247 public void setTagCompressionContext(TagCompressionContext tagCompressionContext) { 248 this.tagCompressionContext = tagCompressionContext; 249 } 250 251 @Override 252 public EncodingState getEncodingState() { 253 return this.encoderState; 254 } 255 256 @Override 257 public void setEncodingState(EncodingState state) { 258 this.encoderState = state; 259 } 260}