/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Iterator;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Encapsulates a data block encoded using a particular encoding algorithm.
 * Used only for testing and benchmarking.
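 *
 * <p>A minimal usage sketch ({@code rawKVs} is a hypothetical byte array of
 * serialized KeyValues; the builder calls shown are illustrative):
 * <pre>{@code
 * HFileContext meta = new HFileContextBuilder().withIncludesMvcc(true).build();
 * DataBlockEncoding encoding = DataBlockEncoding.FAST_DIFF;
 * EncodedDataBlock block =
 *     new EncodedDataBlock(encoding.getEncoder(), encoding, rawKVs, meta);
 * int encodedSize = block.getSize();
 * }</pre>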
 */
@InterfaceAudience.Private
@VisibleForTesting
public class EncodedDataBlock {
  private byte[] rawKVs;
  private ByteBuffer rawBuffer;
  private DataBlockEncoder dataBlockEncoder;

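  /** Encoded data (with dummy header), lazily computed by getEncodedData() and cached. */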
  private byte[] cachedEncodedData;

  private final HFileBlockEncodingContext encodingCtx;
  private HFileContext meta;

  /**
   * Create a buffer which will be encoded using dataBlockEncoder.
   * @param dataBlockEncoder Algorithm used for encoding.
   * @param encoding encoding type used
   * @param rawKVs raw KeyValues to be encoded
   * @param meta HFile meta data
   */
  public EncodedDataBlock(DataBlockEncoder dataBlockEncoder, DataBlockEncoding encoding,
      byte[] rawKVs, HFileContext meta) {
    Preconditions.checkNotNull(encoding,
        "Cannot create encoded data block with null encoding");
    this.dataBlockEncoder = dataBlockEncoder;
    encodingCtx = dataBlockEncoder.newDataBlockEncodingContext(encoding,
        HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
    this.rawKVs = rawKVs;
    this.meta = meta;
  }

  /**
   * Provides access to the encoded data, decoded back into Cells.
   * @param headerSize header size of the block.
   * @return Forward-only sequential iterator over the Cells in the block.
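   *
   * <p>A minimal sketch of walking the decoded Cells (assumes the header size
   * passed in matches the block's actual header):
   * <pre>{@code
   * Iterator<Cell> it = block.getIterator(HConstants.HFILEBLOCK_HEADER_SIZE);
   * while (it.hasNext()) {
   *   Cell cell = it.next();
   *   // inspect the decoded cell...
   * }
   * }</pre>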
   */
  public Iterator<Cell> getIterator(int headerSize) {
    final int rawSize = rawKVs.length;
    byte[] encodedDataWithHeader = getEncodedData();
    int bytesToSkip = headerSize + Bytes.SIZEOF_SHORT;
    ByteArrayInputStream bais = new ByteArrayInputStream(encodedDataWithHeader,
        bytesToSkip, encodedDataWithHeader.length - bytesToSkip);
    final DataInputStream dis = new DataInputStream(bais);

    return new Iterator<Cell>() {
      private ByteBuffer decompressedData = null;

      @Override
      public boolean hasNext() {
        if (decompressedData == null) {
          return rawSize > 0;
        }
        return decompressedData.hasRemaining();
      }

      @Override
      public Cell next() {
        if (decompressedData == null) {
          try {
            decompressedData = dataBlockEncoder.decodeKeyValues(dis, dataBlockEncoder
                .newDataBlockDecodingContext(meta));
          } catch (IOException e) {
            throw new RuntimeException("Problem with data block encoder, " +
                "most likely it requested more bytes than are available.", e);
          }
          decompressedData.rewind();
        }
        int offset = decompressedData.position();
        int klen = decompressedData.getInt();
        int vlen = decompressedData.getInt();
        int tagsLen = 0;
        ByteBufferUtils.skip(decompressedData, klen + vlen);
        // Read the tags length when the stream contains tags
        if (meta.isIncludesTags()) {
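          // The tags length is serialized as a 2-byte big-endian unsigned short.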
          tagsLen = ((decompressedData.get() & 0xff) << 8) ^ (decompressedData.get() & 0xff);
          ByteBufferUtils.skip(decompressedData, tagsLen);
        }
        KeyValue kv = new KeyValue(decompressedData.array(), offset,
            (int) KeyValue.getKeyValueDataStructureSize(klen, vlen, tagsLen));
        if (meta.isIncludesMvcc()) {
          long mvccVersion = ByteBufferUtils.readVLong(decompressedData);
          kv.setSequenceId(mvccVersion);
        }
        return kv;
      }

      @Override
      public void remove() {
        throw new NotImplementedException("remove() is not supported!");
      }

      @Override
      public String toString() {
        return "Iterator of: " + dataBlockEncoder.getClass().getName();
      }

    };
  }

  /**
   * Find the size of the minimal buffer that could store the encoded data.
   * @return Size in bytes of the encoded data.
   */
  public int getSize() {
    return getEncodedData().length;
  }

  /**
   * Find the size of compressed data assuming that the buffer will be
   * compressed using the given algorithm.
   * @param algo compression algorithm
   * @param compressor compressor already requested from codec
   * @param inputBuffer Array to be compressed.
   * @param offset Offset to beginning of the data.
   * @param length Length to be compressed.
   * @return Size of compressed data in bytes.
   * @throws IOException if compression fails
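   *
   * <p>A hedged sketch ({@code data} is an illustrative byte array): sizing GZ
   * compression, with the Compressor borrowed from and returned to the
   * algorithm's pool.
   * <pre>{@code
   * Algorithm algo = Compression.Algorithm.GZ;
   * Compressor compressor = algo.getCompressor();
   * try {
   *   int size = EncodedDataBlock.getCompressedSize(algo, compressor, data, 0, data.length);
   * } finally {
   *   algo.returnCompressor(compressor);
   * }
   * }</pre>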
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
       justification="Not sure what findbugs wants, but looks to me like no NPE")
  public static int getCompressedSize(Algorithm algo, Compressor compressor,
      byte[] inputBuffer, int offset, int length) throws IOException {

    // Create streams
    // Storing them so we can close them
    final IOUtils.NullOutputStream nullOutputStream = new IOUtils.NullOutputStream();
    final DataOutputStream compressedStream = new DataOutputStream(nullOutputStream);
    OutputStream compressingStream = null;

    try {
      if (compressor != null) {
        compressor.reset();
      }

      compressingStream = algo.createCompressionStream(compressedStream, compressor, 0);

      compressingStream.write(inputBuffer, offset, length);
      compressingStream.flush();

      return compressedStream.size();
    } finally {
      nullOutputStream.close();
      compressedStream.close();
      // Guard against an NPE when createCompressionStream itself threw.
      if (compressingStream != null) {
        compressingStream.close();
      }
    }
  }

  /**
   * Estimate the size after a second stage of compression (e.g. LZO) is
   * applied on top of the encoding.
   * @param comprAlgo compression algorithm to be used for compression
   * @param compressor compressor corresponding to the given compression
   *          algorithm
   * @return Size after second stage of compression.
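   *
   * <p>A brief sketch ({@code compressor} assumed obtained from {@code comprAlgo}):
   * {@code int size = block.getEncodedCompressedSize(Compression.Algorithm.GZ, compressor);}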
   */
  public int getEncodedCompressedSize(Algorithm comprAlgo,
      Compressor compressor) throws IOException {
    byte[] compressedBytes = getEncodedData();
    return getCompressedSize(comprAlgo, compressor, compressedBytes, 0,
        compressedBytes.length);
  }

  /** @return encoded data with header */
  private byte[] getEncodedData() {
    if (cachedEncodedData != null) {
      return cachedEncodedData;
    }
    cachedEncodedData = encodeData();
    return cachedEncodedData;
  }

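  /** @return the raw KVs wrapped as a ByteBuffer, re-wrapping when the cached buffer is stale. */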
  private ByteBuffer getUncompressedBuffer() {
    if (rawBuffer == null || rawBuffer.limit() < rawKVs.length) {
      rawBuffer = ByteBuffer.wrap(rawKVs);
    }
    return rawBuffer;
  }

  /**
   * Do the encoding, but do not cache the encoded data.
   * @return encoded data block with header and checksum
   */
  public byte[] encodeData() {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    byte[] baosBytes = null;
    try {
      baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
      DataOutputStream out = new DataOutputStream(baos);
      this.dataBlockEncoder.startBlockEncoding(encodingCtx, out);
      ByteBuffer in = getUncompressedBuffer();
      in.rewind();
      int klength, vlength;
      int tagsLength = 0;
      long memstoreTS = 0L;
      KeyValue kv = null;
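      // Each raw KeyValue walked below is laid out as: 4-byte key length,
      // 4-byte value length, key bytes, value bytes, then optionally a 2-byte
      // tags length plus tag bytes, and optionally the memstore timestamp as
      // a variable-length long.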
      while (in.hasRemaining()) {
        int kvOffset = in.position();
        klength = in.getInt();
        vlength = in.getInt();
        ByteBufferUtils.skip(in, klength + vlength);
        if (this.meta.isIncludesTags()) {
          tagsLength = ((in.get() & 0xff) << 8) ^ (in.get() & 0xff);
          ByteBufferUtils.skip(in, tagsLength);
        }
        if (this.meta.isIncludesMvcc()) {
          memstoreTS = ByteBufferUtils.readVLong(in);
        }
        kv = new KeyValue(in.array(), kvOffset, (int) KeyValue.getKeyValueDataStructureSize(
            klength, vlength, tagsLength));
        kv.setSequenceId(memstoreTS);
        this.dataBlockEncoder.encode(kv, encodingCtx, out);
      }
      // Below depends on BAOS internal behavior. toByteArray makes a copy of bytes so far.
      baos.flush();
      baosBytes = baos.toByteArray();
      this.dataBlockEncoder.endBlockEncoding(encodingCtx, out, baosBytes);
    } catch (IOException e) {
      throw new RuntimeException(String.format(
          "Bug in encoding part of algorithm %s. " +
          "Probably it requested more bytes than are available.",
          toString()), e);
    }
    return baosBytes;
  }

  @Override
  public String toString() {
    return dataBlockEncoder.toString();
  }
}