001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.encoding;
019
020import java.io.DataInputStream;
021import java.io.IOException;
022import java.io.InputStream;
023import org.apache.commons.io.IOUtils;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.io.ByteBuffInputStream;
026import org.apache.hadoop.hbase.io.TagCompressionContext;
027import org.apache.hadoop.hbase.io.compress.ByteBuffDecompressor;
028import org.apache.hadoop.hbase.io.compress.CanReinit;
029import org.apache.hadoop.hbase.io.compress.Compression;
030import org.apache.hadoop.hbase.io.crypto.Cipher;
031import org.apache.hadoop.hbase.io.crypto.Decryptor;
032import org.apache.hadoop.hbase.io.crypto.Encryption;
033import org.apache.hadoop.hbase.io.hfile.HFileContext;
034import org.apache.hadoop.hbase.io.util.BlockIOUtils;
035import org.apache.hadoop.hbase.nio.ByteBuff;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.io.compress.Decompressor;
038import org.apache.yetus.audience.InterfaceAudience;
039
040/**
041 * A default implementation of {@link HFileBlockDecodingContext}. It assumes the block data section
042 * is compressed as a whole.
043 * @see HFileBlockDefaultEncodingContext for the default compression context
044 */
045@InterfaceAudience.Private
046public class HFileBlockDefaultDecodingContext implements HFileBlockDecodingContext {
047
048  private final Configuration conf;
049  private final HFileContext fileContext;
050  private TagCompressionContext tagCompressionContext;
051
052  public HFileBlockDefaultDecodingContext(Configuration conf, HFileContext fileContext) {
053    this.conf = conf;
054    this.fileContext = fileContext;
055  }
056
057  @Override
058  public void prepareDecoding(int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
059    ByteBuff blockBufferWithoutHeader, ByteBuff onDiskBlock) throws IOException {
060
061    // If possible, use the ByteBuffer decompression mechanism to avoid extra copies.
062    if (canDecompressViaByteBuff(blockBufferWithoutHeader, onDiskBlock)) {
063      decompressViaByteBuff(blockBufferWithoutHeader, onDiskBlock, onDiskSizeWithoutHeader);
064      return;
065    }
066
067    final ByteBuffInputStream byteBuffInputStream = new ByteBuffInputStream(onDiskBlock);
068    InputStream dataInputStream = new DataInputStream(byteBuffInputStream);
069
070    try {
071      Encryption.Context cryptoContext = fileContext.getEncryptionContext();
072      if (cryptoContext != Encryption.Context.NONE) {
073
074        Cipher cipher = cryptoContext.getCipher();
075        Decryptor decryptor = cipher.getDecryptor();
076        decryptor.setKey(cryptoContext.getKey());
077
078        // Encrypted block format:
079        // +--------------------------+
080        // | byte iv length |
081        // +--------------------------+
082        // | iv data ... |
083        // +--------------------------+
084        // | encrypted block data ... |
085        // +--------------------------+
086
087        int ivLength = dataInputStream.read();
088        if (ivLength > 0) {
089          byte[] iv = new byte[ivLength];
090          IOUtils.readFully(dataInputStream, iv);
091          decryptor.setIv(iv);
092          // All encrypted blocks will have a nonzero IV length. If we see an IV
093          // length of zero, this means the encoding context had 0 bytes of
094          // plaintext to encode.
095          decryptor.reset();
096          dataInputStream = decryptor.createDecryptionStream(dataInputStream);
097        }
098        onDiskSizeWithoutHeader -= Bytes.SIZEOF_BYTE + ivLength;
099      }
100
101      Compression.Algorithm compression = fileContext.getCompression();
102      if (compression != Compression.Algorithm.NONE) {
103        Decompressor decompressor = null;
104        try {
105          decompressor = compression.getDecompressor();
106          // Some algorithms don't return decompressors and accept null as a valid parameter for
107          // same when creating decompression streams. We can ignore these cases wrt reinit.
108          if (decompressor instanceof CanReinit) {
109            ((CanReinit) decompressor).reinit(conf);
110          }
111          try (InputStream is =
112            compression.createDecompressionStream(dataInputStream, decompressor, 0)) {
113            BlockIOUtils.readFullyWithHeapBuffer(is, blockBufferWithoutHeader,
114              uncompressedSizeWithoutHeader);
115          }
116        } finally {
117          if (decompressor != null) {
118            compression.returnDecompressor(decompressor);
119          }
120        }
121      } else {
122        BlockIOUtils.readFullyWithHeapBuffer(dataInputStream, blockBufferWithoutHeader,
123          onDiskSizeWithoutHeader);
124      }
125    } finally {
126      byteBuffInputStream.close();
127      dataInputStream.close();
128    }
129  }
130
131  /**
132   * When only decompression is needed (not decryption), and the input and output buffers are
133   * SingleByteBuffs, and the decompression algorithm supports it, we can do decompression without
134   * any intermediate heap buffers. Do not call unless you've checked
135   * {@link #canDecompressViaByteBuff} first.
136   */
137  private void decompressViaByteBuff(ByteBuff blockBufferWithoutHeader, ByteBuff onDiskBlock,
138    int onDiskSizeWithoutHeader) throws IOException {
139    Compression.Algorithm compression = fileContext.getCompression();
140    ByteBuffDecompressor decompressor = compression.getByteBuffDecompressor();
141    try {
142      if (decompressor instanceof CanReinit) {
143        ((CanReinit) decompressor).reinit(conf);
144      }
145      decompressor.decompress(blockBufferWithoutHeader, onDiskBlock, onDiskSizeWithoutHeader);
146    } finally {
147      compression.returnByteBuffDecompressor(decompressor);
148    }
149  }
150
151  private boolean canDecompressViaByteBuff(ByteBuff blockBufferWithoutHeader,
152    ByteBuff onDiskBlock) {
153    // Theoretically we can do ByteBuff decompression after doing streaming decryption, but the
154    // refactoring necessary to support this has not been attempted. For now, we skip ByteBuff
155    // decompression if the input is encrypted.
156    if (fileContext.getEncryptionContext() != Encryption.Context.NONE) {
157      return false;
158    } else if (!fileContext.getCompression().supportsByteBuffDecompression()) {
159      return false;
160    } else {
161      ByteBuffDecompressor decompressor = fileContext.getCompression().getByteBuffDecompressor();
162      try {
163        if (decompressor instanceof CanReinit) {
164          ((CanReinit) decompressor).reinit(conf);
165        }
166        // Even if we have a ByteBuffDecompressor, we still need to check if it can decompress
167        // our particular ByteBuffs
168        return decompressor.canDecompress(blockBufferWithoutHeader, onDiskBlock);
169      } finally {
170        fileContext.getCompression().returnByteBuffDecompressor(decompressor);
171      }
172    }
173  }
174
175  @Override
176  public HFileContext getHFileContext() {
177    return this.fileContext;
178  }
179
180  public TagCompressionContext getTagCompressionContext() {
181    return tagCompressionContext;
182  }
183
184  public void setTagCompressionContext(TagCompressionContext tagCompressionContext) {
185    this.tagCompressionContext = tagCompressionContext;
186  }
187}