/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;

import java.io.ByteArrayInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.SecureRandom;

import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.crypto.Encryptor;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * A default implementation of {@link HFileBlockEncodingContext}. It will
 * compress the data section as one continuous buffer.
 * <p>
 * The compressor, the compression stream and the intermediate byte streams are
 * created once in the constructor and reused for every block passed to
 * {@link #compressAndEncrypt(byte[], int, int)}. Call {@link #close()} to hand
 * the compressor back once the context is no longer needed.
 *
 * @see HFileBlockDefaultDecodingContext for the decompression part
 */
@InterfaceAudience.Private
public class HFileBlockDefaultEncodingContext implements
    HFileBlockEncodingContext {
  private BlockType blockType;
  private final DataBlockEncoding encodingAlgo;

  /** Header bytes written, uncompressed and unencrypted, in front of every produced block. */
  private final byte[] dummyHeader;

  // Compression state

  /** Compressor, which is also reused between consecutive blocks. */
  private Compressor compressor;
  /** Compression output stream */
  private CompressionOutputStream compressionStream;
  /** Underlying stream to write compressed bytes to */
  private ByteArrayOutputStream compressedByteStream;

  private final HFileContext fileContext;
  private TagCompressionContext tagCompressionContext;

  // Encryption state

  /** Underlying stream to write encrypted bytes to */
  private ByteArrayOutputStream cryptoByteStream;
  /** Initialization vector */
  private byte[] iv;

  private EncodingState encoderState;

  /**
   * Creates the context and, depending on {@code fileContext}, the compression and
   * encryption state it needs.
   *
   * @param encoding encoding used
   * @param headerBytes dummy header bytes; must not be null
   * @param fileContext HFile meta data
   * @throws NullPointerException if {@code headerBytes} is null
   * @throws RuntimeException if the compression stream cannot be created
   */
  public HFileBlockDefaultEncodingContext(DataBlockEncoding encoding, byte[] headerBytes,
      HFileContext fileContext) {
    // Validate first: failing after a compressor has been borrowed would leak it.
    this.dummyHeader = Preconditions.checkNotNull(headerBytes,
      "Please pass HConstants.HFILEBLOCK_DUMMY_HEADER instead of null for param headerBytes");
    this.encodingAlgo = encoding;
    this.fileContext = fileContext;

    Compression.Algorithm compressionAlgorithm = effectiveCompression();
    if (compressionAlgorithm != NONE) {
      compressor = compressionAlgorithm.getCompressor();
      compressedByteStream = new ByteArrayOutputStream();
      try {
        compressionStream =
          compressionAlgorithm.createPlainCompressionStream(compressedByteStream, compressor);
      } catch (IOException e) {
        // Nobody will be able to call close() on a half-built context, so give the
        // compressor back here before propagating the failure.
        if (compressor != null) {
          compressionAlgorithm.returnCompressor(compressor);
          compressor = null;
        }
        throw new RuntimeException(
          "Could not create compression stream for algorithm " + compressionAlgorithm, e);
      }
    }

    Encryption.Context cryptoContext = fileContext.getEncryptionContext();
    if (cryptoContext != Encryption.Context.NONE) {
      cryptoByteStream = new ByteArrayOutputStream();
      iv = new byte[cryptoContext.getCipher().getIvLength()];
      new SecureRandom().nextBytes(iv);
    }
  }

  /**
   * Prepare to start a new encoding. When an encoding other than
   * {@link DataBlockEncoding#NONE} is configured, it is asked to write its id to
   * {@code out}; otherwise nothing is written.
   *
   * @param out stream receiving the encoding id
   * @throws IOException if writing the id fails
   */
  public void prepareEncoding(DataOutputStream out) throws IOException {
    if (encodingAlgo != null && encodingAlgo != DataBlockEncoding.NONE) {
      encodingAlgo.writeIdInBytes(out);
    }
  }

  /**
   * Records the type of the block that was just encoded so that it can be read back
   * through {@link #getBlockType()}.
   */
  @Override
  public void postEncoding(BlockType blockType)
      throws IOException {
    this.blockType = blockType;
  }

  /**
   * Compresses and/or encrypts the given block using the dummy header supplied at
   * construction time.
   *
   * @param data buffer holding the header followed by the block payload
   * @param offset offset of the header inside {@code data}
   * @param length number of bytes (header included) to process
   * @return the processed bytes, or {@code null} when neither compression nor
   *         encryption is configured
   */
  @Override
  public Bytes compressAndEncrypt(byte[] data, int offset, int length) throws IOException {
    return compressAfterEncoding(data, offset, length, dummyHeader);
  }

  /**
   * Produces the on-disk form of a block: {@code headerBytes} verbatim, followed by the
   * payload after optional compression and optional encryption. The first
   * {@code headerBytes.length} bytes of the input range are skipped and replaced by
   * {@code headerBytes}.
   *
   * @return the processed bytes backed by one of this context's reusable buffers, or
   *         {@code null} when neither compression nor encryption is configured
   */
  private Bytes compressAfterEncoding(byte[] uncompressedBytesWithHeaderBuffer,
      int uncompressedBytesWithHeaderOffset, int uncompressedBytesWithHeaderLength,
      byte[] headerBytes) throws IOException {
    final int payloadOffset = headerBytes.length + uncompressedBytesWithHeaderOffset;
    final int payloadLength = uncompressedBytesWithHeaderLength - headerBytes.length;

    Encryption.Context cryptoContext = fileContext.getEncryptionContext();
    // Same null-tolerant check as the constructor, so a null algorithm never sends us
    // down the compressing branch with streams that were never created.
    final boolean compress = effectiveCompression() != NONE;

    if (cryptoContext == Encryption.Context.NONE) {
      if (!compress) {
        return null;
      }
      compressedByteStream.reset();
      compressedByteStream.write(headerBytes);
      compressPayload(uncompressedBytesWithHeaderBuffer, payloadOffset, payloadLength);
      return new Bytes(compressedByteStream.getBuffer(), 0, compressedByteStream.size());
    }

    // Encrypted block format (written after the plaintext block header):
    // +--------------------------+
    // | byte iv length           |
    // +--------------------------+
    // | iv data ...              |
    // +--------------------------+
    // | encrypted block data ... |
    // +--------------------------+

    cryptoByteStream.reset();
    // Write the block header (plaintext)
    cryptoByteStream.write(headerBytes);

    InputStream in;
    int plaintextLength;
    // Run any compression before encryption
    if (compress) {
      compressedByteStream.reset();
      compressPayload(uncompressedBytesWithHeaderBuffer, payloadOffset, payloadLength);
      byte[] plaintext = compressedByteStream.toByteArray();
      plaintextLength = plaintext.length;
      in = new ByteArrayInputStream(plaintext);
    } else {
      plaintextLength = payloadLength;
      in = new ByteArrayInputStream(uncompressedBytesWithHeaderBuffer, payloadOffset,
        plaintextLength);
    }

    if (plaintextLength > 0) {
      // Set up the cipher
      Cipher cipher = cryptoContext.getCipher();
      Encryptor encryptor = cipher.getEncryptor();
      encryptor.setKey(cryptoContext.getKey());

      // Set up the IV; its length is stored in a single byte
      int ivLength = iv.length;
      Preconditions.checkState(ivLength <= Byte.MAX_VALUE, "IV length out of range");
      cryptoByteStream.write(ivLength);
      if (ivLength > 0) {
        encryptor.setIv(iv);
        cryptoByteStream.write(iv);
      }

      // Encrypt the data
      Encryption.encrypt(cryptoByteStream, in, encryptor);

      // Increment the IV given the final block size
      Encryption.incrementIv(iv, 1 + (cryptoByteStream.size() / encryptor.getBlockSize()));
    } else {
      // Nothing to encrypt: record an IV length of zero and stop there
      cryptoByteStream.write(0);
    }
    return new Bytes(cryptoByteStream.getBuffer(), 0, cryptoByteStream.size());
  }

  /**
   * Runs {@code length} bytes of {@code buffer}, starting at {@code offset}, through the
   * reusable compression stream. The output is appended to {@link #compressedByteStream};
   * the caller is responsible for resetting that stream beforehand.
   */
  private void compressPayload(byte[] buffer, int offset, int length) throws IOException {
    compressionStream.resetState();
    compressionStream.write(buffer, offset, length);
    compressionStream.flush();
    compressionStream.finish();
  }

  /**
   * Returns the compression algorithm configured in the file context, treating a null
   * algorithm as {@code NONE}.
   */
  private Compression.Algorithm effectiveCompression() {
    Compression.Algorithm configured = fileContext.getCompression();
    return configured == null ? NONE : configured;
  }

  @Override
  public BlockType getBlockType() {
    return blockType;
  }

  /**
   * Releases the compressor this writer uses to compress blocks into the
   * compressor pool. Calling it again is a no-op because the reference is cleared.
   */
  @Override
  public void close() {
    if (compressor != null) {
      this.fileContext.getCompression().returnCompressor(compressor);
      compressor = null;
    }
  }

  @Override
  public DataBlockEncoding getDataBlockEncoding() {
    return this.encodingAlgo;
  }

  @Override
  public HFileContext getHFileContext() {
    return this.fileContext;
  }

  /** @return the tag compression context previously set, or null if none was set */
  public TagCompressionContext getTagCompressionContext() {
    return tagCompressionContext;
  }

  /** @param tagCompressionContext the tag compression context to keep on this context */
  public void setTagCompressionContext(TagCompressionContext tagCompressionContext) {
    this.tagCompressionContext = tagCompressionContext;
  }

  @Override
  public EncodingState getEncodingState() {
    return this.encoderState;
  }

  @Override
  public void setEncodingState(EncodingState state) {
    this.encoderState = state;
  }
}