/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;

import java.io.ByteArrayInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.crypto.Encryptor;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * A default implementation of {@link HFileBlockEncodingContext}. It will compress the data section
 * as one continuous buffer.
 * @see HFileBlockDefaultDecodingContext for the decompression part
 */
@InterfaceAudience.Private
public class HFileBlockDefaultEncodingContext implements HFileBlockEncodingContext {
  private BlockType blockType;
  private final DataBlockEncoding encodingAlgo;

  private final byte[] dummyHeader;

  // Compression state

  /**
   * Compression algorithm in effect for this context. A {@code null} algorithm reported by the
   * {@link HFileContext} is normalized to {@code NONE} once, here, so that every method of this
   * class agrees on whether compression is enabled.
   */
  private final Compression.Algorithm compressionAlgorithm;
  /** Compressor, which is also reused between consecutive blocks. */
  private Compressor compressor;
  /** Compression output stream */
  private CompressionOutputStream compressionStream;
  /** Underlying stream to write compressed bytes to */
  private ByteArrayOutputStream compressedByteStream;

  private final HFileContext fileContext;
  private TagCompressionContext tagCompressionContext;

  // Encryption state

  /** Underlying stream to write encrypted bytes to */
  private ByteArrayOutputStream cryptoByteStream;
  /** Initialization vector */
  private byte[] iv;

  private EncodingState encoderState;

  /**
   * Creates a context, setting up compression and encryption state as dictated by
   * {@code fileContext}.
   * @param conf        configuration, used to re-initialize the compressor when one is obtained
   * @param encoding    encoding used
   * @param headerBytes dummy header bytes; must not be null
   * @param fileContext HFile meta data
   * @throws NullPointerException if {@code headerBytes} is null
   * @throws RuntimeException     if the compression stream cannot be created
   */
  public HFileBlockDefaultEncodingContext(Configuration conf, DataBlockEncoding encoding,
    byte[] headerBytes, HFileContext fileContext) {
    // Validate first: failing after a compressor has been obtained would strand that compressor,
    // because the caller never receives an instance on which to call close().
    this.dummyHeader = Preconditions.checkNotNull(headerBytes,
      "Please pass HConstants.HFILEBLOCK_DUMMY_HEADER instead of null for param headerBytes");
    this.encodingAlgo = encoding;
    this.fileContext = fileContext;
    Compression.Algorithm configured = fileContext.getCompression();
    this.compressionAlgorithm = configured == null ? NONE : configured;
    if (compressionAlgorithm != NONE) {
      compressor = compressionAlgorithm.getCompressor();
      // Some algorithms don't return compressors and accept null as a valid parameter for
      // same when creating compression streams. We can ignore these cases wrt reinit.
      if (compressor != null) {
        compressor.reinit(conf);
      }
      compressedByteStream = new ByteArrayOutputStream();
      try {
        compressionStream =
          compressionAlgorithm.createPlainCompressionStream(compressedByteStream, compressor);
      } catch (IOException e) {
        // Construction is about to fail, so hand the compressor back instead of abandoning it.
        if (compressor != null) {
          compressionAlgorithm.returnCompressor(compressor);
          compressor = null;
        }
        throw new RuntimeException(
          "Could not create compression stream for algorithm " + compressionAlgorithm, e);
      }
    }

    Encryption.Context cryptoContext = fileContext.getEncryptionContext();
    if (cryptoContext != Encryption.Context.NONE) {
      cryptoByteStream = new ByteArrayOutputStream();
      iv = new byte[cryptoContext.getCipher().getIvLength()];
      Bytes.secureRandom(iv);
    }
  }

  /**
   * Prepare to start a new encoding. Writes the encoding id to {@code out} unless no encoding, or
   * {@link DataBlockEncoding#NONE}, is configured.
   * @param out stream that receives the encoding id
   * @throws IOException if writing the id fails
   */
  public void prepareEncoding(DataOutputStream out) throws IOException {
    if (encodingAlgo != null && encodingAlgo != DataBlockEncoding.NONE) {
      encodingAlgo.writeIdInBytes(out);
    }
  }

  /**
   * Records the type of the block that was just encoded.
   * @param blockType type later reported by {@link #getBlockType()}
   */
  @Override
  public void postEncoding(BlockType blockType) throws IOException {
    this.blockType = blockType;
  }

  /**
   * Compresses and/or encrypts the given range, using the dummy header supplied at construction
   * as the header.
   * @param data   buffer holding the header followed by the block data
   * @param offset offset of the header within {@code data}
   * @param length number of bytes in the range, header included
   * @return the processed bytes, or {@code null} when neither compression nor encryption is
   *         configured
   * @throws IOException if compression or encryption fails
   */
  @Override
  public Bytes compressAndEncrypt(byte[] data, int offset, int length) throws IOException {
    return compressAfterEncoding(data, offset, length, dummyHeader);
  }

  /**
   * Produces the on-disk form of a block: the header is copied through verbatim and the bytes
   * after it are compressed and/or encrypted according to the file context. The first
   * {@code headerBytes.length} bytes of the input range are skipped and replaced by
   * {@code headerBytes} in the output.
   * <p>
   * The returned {@link Bytes} wraps an internal buffer of this context that is reset by the next
   * call.
   * @return the processed bytes, or {@code null} when neither compression nor encryption is
   *         configured
   */
  private Bytes compressAfterEncoding(byte[] uncompressedBytesWithHeaderBuffer,
    int uncompressedBytesWithHeaderOffset, int uncompressedBytesWithHeaderLength,
    byte[] headerBytes) throws IOException {
    Encryption.Context cryptoContext = fileContext.getEncryptionContext();
    if (cryptoContext == Encryption.Context.NONE) {
      if (compressionAlgorithm == NONE) {
        return null;
      }
      compressBlockData(uncompressedBytesWithHeaderBuffer, uncompressedBytesWithHeaderOffset,
        uncompressedBytesWithHeaderLength, headerBytes, true);
      return new Bytes(compressedByteStream.getBuffer(), 0, compressedByteStream.size());
    }

    // Encrypted block format:
    // +--------------------------+
    // | byte iv length           |
    // +--------------------------+
    // | iv data ...              |
    // +--------------------------+
    // | encrypted block data ... |
    // +--------------------------+

    cryptoByteStream.reset();
    // Write the block header (plaintext)
    cryptoByteStream.write(headerBytes);

    InputStream in;
    int plaintextLength;
    // Run any compression before encryption
    if (compressionAlgorithm != NONE) {
      compressBlockData(uncompressedBytesWithHeaderBuffer, uncompressedBytesWithHeaderOffset,
        uncompressedBytesWithHeaderLength, headerBytes, false);
      // Read straight from the backing buffer; nothing writes to compressedByteStream while the
      // encryptor consumes it, so a defensive copy is unnecessary.
      plaintextLength = compressedByteStream.size();
      in = new ByteArrayInputStream(compressedByteStream.getBuffer(), 0, plaintextLength);
    } else {
      plaintextLength = uncompressedBytesWithHeaderLength - headerBytes.length;
      in = new ByteArrayInputStream(uncompressedBytesWithHeaderBuffer,
        headerBytes.length + uncompressedBytesWithHeaderOffset, plaintextLength);
    }

    if (plaintextLength <= 0) {
      // Nothing to encrypt: emit a zero IV-length byte and no payload.
      cryptoByteStream.write(0);
      return new Bytes(cryptoByteStream.getBuffer(), 0, cryptoByteStream.size());
    }

    // Set up the cipher
    Cipher cipher = cryptoContext.getCipher();
    Encryptor encryptor = cipher.getEncryptor();
    encryptor.setKey(cryptoContext.getKey());

    // Set up the IV
    int ivLength = iv.length;
    Preconditions.checkState(ivLength <= Byte.MAX_VALUE, "IV length out of range");
    cryptoByteStream.write(ivLength);
    if (ivLength > 0) {
      encryptor.setIv(iv);
      cryptoByteStream.write(iv);
    }

    // Encrypt the data
    Encryption.encrypt(cryptoByteStream, in, encryptor);

    // Increment the IV given the final block size
    Encryption.incrementIv(iv, 1 + (cryptoByteStream.size() / encryptor.getBlockSize()));
    return new Bytes(cryptoByteStream.getBuffer(), 0, cryptoByteStream.size());
  }

  /**
   * Compresses the bytes that follow the header into {@link #compressedByteStream}, discarding
   * whatever that stream held before.
   * @param buffer        buffer holding the header followed by the block data
   * @param offset        offset of the header within {@code buffer}
   * @param length        number of bytes in the range, header included
   * @param headerBytes   header; its length determines how many leading bytes are skipped
   * @param prependHeader whether to write {@code headerBytes} uncompressed ahead of the compressed
   *                      data
   */
  private void compressBlockData(byte[] buffer, int offset, int length, byte[] headerBytes,
    boolean prependHeader) throws IOException {
    compressedByteStream.reset();
    if (prependHeader) {
      compressedByteStream.write(headerBytes);
    }
    compressionStream.resetState();
    compressionStream.write(buffer, headerBytes.length + offset, length - headerBytes.length);
    compressionStream.flush();
    compressionStream.finish();
  }

  /**
   * @return the block type recorded by the most recent {@link #postEncoding(BlockType)} call
   */
  @Override
  public BlockType getBlockType() {
    return blockType;
  }

  /**
   * Releases the compressor this writer uses to compress blocks into the compressor pool.
   */
  @Override
  public void close() {
    if (compressor != null) {
      compressionAlgorithm.returnCompressor(compressor);
      // Clear the reference so that a repeated close() does not return it twice.
      compressor = null;
    }
  }

  /**
   * @return the encoding passed at construction
   */
  @Override
  public DataBlockEncoding getDataBlockEncoding() {
    return this.encodingAlgo;
  }

  /**
   * @return the HFile meta data passed at construction
   */
  @Override
  public HFileContext getHFileContext() {
    return this.fileContext;
  }

  /**
   * @return the tag compression context previously set, or {@code null} if none was set
   */
  public TagCompressionContext getTagCompressionContext() {
    return tagCompressionContext;
  }

  /**
   * @param tagCompressionContext tag compression context to associate with this encoding context
   */
  public void setTagCompressionContext(TagCompressionContext tagCompressionContext) {
    this.tagCompressionContext = tagCompressionContext;
  }

  /**
   * @return the encoder state previously set, or {@code null} if none was set
   */
  @Override
  public EncodingState getEncodingState() {
    return this.encoderState;
  }

  /**
   * @param state encoder state to associate with this encoding context
   */
  @Override
  public void setEncodingState(EncodingState state) {
    this.encoderState = state;
  }
}