001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations
015 * under the License.
016 */
017package org.apache.hadoop.hbase.io.encoding;
018
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
042
043/**
044 * Encapsulates a data block compressed using a particular encoding algorithm.
045 * Useful for testing and benchmarking.
046 * This is used only in testing.
047 */
048@InterfaceAudience.Private
049public class EncodedDataBlock {
050  private byte[] rawKVs;
051  private ByteBuffer rawBuffer;
052  private DataBlockEncoder dataBlockEncoder;
053
054  private byte[] cachedEncodedData;
055
056  private final HFileBlockEncodingContext encodingCtx;
057  private HFileContext meta;
058
059  private final DataBlockEncoding encoding;
060
061  // The is for one situation that there are some cells includes tags and others are not.
062  // isTagsLenZero stores if cell tags length is zero before doing encoding since we need
063  // to check cell tags length is zero or not after decoding.
064  // Encoders ROW_INDEX_V1 would abandon tags segment if tags is 0 after decode cells to
065  // byte array, other encoders won't do that. So we have to find a way to add tagsLen zero
066  // in the decoded byte array.
067  private List<Boolean> isTagsLenZero = new ArrayList<>();
068
069  /**
070   * Create a buffer which will be encoded using dataBlockEncoder.
071   * @param dataBlockEncoder Algorithm used for compression.
072   * @param encoding encoding type used
073   * @param rawKVs
074   * @param meta
075   */
076  public EncodedDataBlock(DataBlockEncoder dataBlockEncoder, DataBlockEncoding encoding,
077      byte[] rawKVs, HFileContext meta) {
078    Preconditions.checkNotNull(encoding,
079        "Cannot create encoded data block with null encoder");
080    this.dataBlockEncoder = dataBlockEncoder;
081    this.encoding = encoding;
082    encodingCtx = dataBlockEncoder.newDataBlockEncodingContext(encoding,
083        HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
084    this.rawKVs = rawKVs;
085    this.meta = meta;
086  }
087
088  /**
089   * Provides access to compressed value.
090   * @param headerSize header size of the block.
091   * @return Forwards sequential iterator.
092   */
093  public Iterator<Cell> getIterator(int headerSize) {
094    final int rawSize = rawKVs.length;
095    byte[] encodedDataWithHeader = getEncodedData();
096    int bytesToSkip = headerSize + Bytes.SIZEOF_SHORT;
097    ByteArrayInputStream bais = new ByteArrayInputStream(encodedDataWithHeader,
098        bytesToSkip, encodedDataWithHeader.length - bytesToSkip);
099    final DataInputStream dis = new DataInputStream(bais);
100
101    return new Iterator<Cell>() {
102      private ByteBuffer decompressedData = null;
103      private Iterator<Boolean> it = isTagsLenZero.iterator();
104
105      @Override
106      public boolean hasNext() {
107        if (decompressedData == null) {
108          return rawSize > 0;
109        }
110        return decompressedData.hasRemaining();
111      }
112
113      @Override
114      public Cell next() {
115        if (decompressedData == null) {
116          try {
117            decompressedData = dataBlockEncoder.decodeKeyValues(dis, dataBlockEncoder
118                .newDataBlockDecodingContext(meta));
119          } catch (IOException e) {
120            throw new RuntimeException("Problem with data block encoder, " +
121                "most likely it requested more bytes than are available.", e);
122          }
123          decompressedData.rewind();
124        }
125        int offset = decompressedData.position();
126        int klen = decompressedData.getInt();
127        int vlen = decompressedData.getInt();
128        int tagsLen = 0;
129        ByteBufferUtils.skip(decompressedData, klen + vlen);
130        // Read the tag length in case when stream contain tags
131        if (meta.isIncludesTags()) {
132          boolean noTags = true;
133          if (it.hasNext()) {
134            noTags = it.next();
135          }
136          // ROW_INDEX_V1 will not put tagsLen back in cell if it is zero, there is no need
137          // to read short here.
138          if (!(encoding.equals(DataBlockEncoding.ROW_INDEX_V1) && noTags)) {
139            tagsLen = ((decompressedData.get() & 0xff) << 8) ^ (decompressedData.get() & 0xff);
140            ByteBufferUtils.skip(decompressedData, tagsLen);
141          }
142        }
143        KeyValue kv = new KeyValue(decompressedData.array(), offset,
144            (int) KeyValue.getKeyValueDataStructureSize(klen, vlen, tagsLen));
145        if (meta.isIncludesMvcc()) {
146          long mvccVersion = ByteBufferUtils.readVLong(decompressedData);
147          kv.setSequenceId(mvccVersion);
148        }
149        return kv;
150      }
151
152      @Override
153      public void remove() {
154        throw new NotImplementedException("remove() is not supported!");
155      }
156
157      @Override
158      public String toString() {
159        return "Iterator of: " + dataBlockEncoder.getClass().getName();
160      }
161
162    };
163  }
164
165  /**
166   * Find the size of minimal buffer that could store compressed data.
167   * @return Size in bytes of compressed data.
168   */
169  public int getSize() {
170    return getEncodedData().length;
171  }
172
173  /**
174   * Find the size of compressed data assuming that buffer will be compressed
175   * using given algorithm.
176   * @param algo compression algorithm
177   * @param compressor compressor already requested from codec
178   * @param inputBuffer Array to be compressed.
179   * @param offset Offset to beginning of the data.
180   * @param length Length to be compressed.
181   * @return Size of compressed data in bytes.
182   * @throws IOException
183   */
184  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
185       justification="No sure what findbugs wants but looks to me like no NPE")
186  public static int getCompressedSize(Algorithm algo, Compressor compressor,
187      byte[] inputBuffer, int offset, int length) throws IOException {
188
189    // Create streams
190    // Storing them so we can close them
191    final IOUtils.NullOutputStream nullOutputStream = new IOUtils.NullOutputStream();
192    final DataOutputStream compressedStream = new DataOutputStream(nullOutputStream);
193    OutputStream compressingStream = null;
194
195
196    try {
197      if (compressor != null) {
198        compressor.reset();
199      }
200
201      compressingStream = algo.createCompressionStream(compressedStream, compressor, 0);
202
203      compressingStream.write(inputBuffer, offset, length);
204      compressingStream.flush();
205
206      return compressedStream.size();
207    } finally {
208      nullOutputStream.close();
209      compressedStream.close();
210      compressingStream.close();
211    }
212  }
213
214  /**
215   * Estimate size after second stage of compression (e.g. LZO).
216   * @param comprAlgo compression algorithm to be used for compression
217   * @param compressor compressor corresponding to the given compression
218   *          algorithm
219   * @return Size after second stage of compression.
220   */
221  public int getEncodedCompressedSize(Algorithm comprAlgo,
222      Compressor compressor) throws IOException {
223    byte[] compressedBytes = getEncodedData();
224    return getCompressedSize(comprAlgo, compressor, compressedBytes, 0,
225        compressedBytes.length);
226  }
227
228  /** @return encoded data with header */
229  private byte[] getEncodedData() {
230    if (cachedEncodedData != null) {
231      return cachedEncodedData;
232    }
233    cachedEncodedData = encodeData();
234    return cachedEncodedData;
235  }
236
237  private ByteBuffer getUncompressedBuffer() {
238    if (rawBuffer == null || rawBuffer.limit() < rawKVs.length) {
239      rawBuffer = ByteBuffer.wrap(rawKVs);
240    }
241    return rawBuffer;
242  }
243
244  /**
245   * Do the encoding, but do not cache the encoded data.
246   * @return encoded data block with header and checksum
247   */
248  public byte[] encodeData() {
249    ByteArrayOutputStream baos = new ByteArrayOutputStream();
250    byte [] baosBytes = null;
251    try {
252      baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
253      DataOutputStream out = new DataOutputStream(baos);
254      this.dataBlockEncoder.startBlockEncoding(encodingCtx, out);
255      ByteBuffer in = getUncompressedBuffer();
256      in.rewind();
257      int klength, vlength;
258      int tagsLength = 0;
259      long memstoreTS = 0L;
260      KeyValue kv = null;
261      while (in.hasRemaining()) {
262        int kvOffset = in.position();
263        klength = in.getInt();
264        vlength = in.getInt();
265        ByteBufferUtils.skip(in, klength + vlength);
266        if (this.meta.isIncludesTags()) {
267          tagsLength = ((in.get() & 0xff) << 8) ^ (in.get() & 0xff);
268          ByteBufferUtils.skip(in, tagsLength);
269          this.isTagsLenZero.add(tagsLength == 0);
270        }
271        if (this.meta.isIncludesMvcc()) {
272          memstoreTS = ByteBufferUtils.readVLong(in);
273        }
274        kv = new KeyValue(in.array(), kvOffset, (int) KeyValue.getKeyValueDataStructureSize(
275            klength, vlength, tagsLength));
276        kv.setSequenceId(memstoreTS);
277        this.dataBlockEncoder.encode(kv, encodingCtx, out);
278      }
279      // Below depends on BAOS internal behavior. toByteArray makes a copy of bytes so far.
280      baos.flush();
281      baosBytes = baos.toByteArray();
282      this.dataBlockEncoder.endBlockEncoding(encodingCtx, out, baosBytes);
283      // In endBlockEncoding(encodingCtx, out, baosBytes), Encoder ROW_INDEX_V1 write integer in
284      // out while the others write integer in baosBytes(byte array). We need to add
285      // baos.toByteArray() after endBlockEncoding again to make sure the integer writes in
286      // outputstream with Encoder ROW_INDEX_V1 dump to byte array (baosBytes).
287      // The if branch is necessary because Encoders excepts ROW_INDEX_V1 write integer in
288      // baosBytes directly, without if branch and do toByteArray() again, baosBytes won't
289      // contains the integer wrotten in endBlockEncoding.
290      if (this.encoding.equals(DataBlockEncoding.ROW_INDEX_V1)) {
291        baosBytes = baos.toByteArray();
292      }
293    } catch (IOException e) {
294      throw new RuntimeException(String.format(
295          "Bug in encoding part of algorithm %s. " +
296          "Probably it requested more bytes than are available.",
297          toString()), e);
298    }
299    return baosBytes;
300  }
301
302  @Override
303  public String toString() {
304    return encoding.name();
305  }
306}