/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Encapsulates a data block compressed using a particular encoding algorithm.
 * Useful for testing and benchmarking; not used in production code.
 */
@InterfaceAudience.Private
@VisibleForTesting
public class EncodedDataBlock {
  private byte[] rawKVs;
  private ByteBuffer rawBuffer;
  private DataBlockEncoder dataBlockEncoder;

  private byte[] cachedEncodedData;

  private final HFileBlockEncodingContext encodingCtx;
  private HFileContext meta;

  private final DataBlockEncoding encoding;

  // This handles the case where some cells include tags and others do not.
  // isTagsLenZero records, per cell, whether the tags length was zero before encoding,
  // because we need to know whether it was zero after decoding as well.
  // The ROW_INDEX_V1 encoder drops the tags segment when the tags length is zero after
  // decoding cells to a byte array, while other encoders keep it. So we have to track
  // the zero tags lengths ourselves to interpret the decoded byte array correctly.
  private List<Boolean> isTagsLenZero = new ArrayList<>();

  /**
   * Create a buffer which will be encoded using dataBlockEncoder.
   * @param dataBlockEncoder Algorithm used for compression.
   * @param encoding encoding type used
   * @param rawKVs raw KeyValues to be encoded
   * @param meta HFile context for the block
   */
  public EncodedDataBlock(DataBlockEncoder dataBlockEncoder, DataBlockEncoding encoding,
      byte[] rawKVs, HFileContext meta) {
    Preconditions.checkNotNull(encoding,
        "Cannot create encoded data block with null encoder");
    this.dataBlockEncoder = dataBlockEncoder;
    this.encoding = encoding;
    encodingCtx = dataBlockEncoder.newDataBlockEncodingContext(encoding,
        HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
    this.rawKVs = rawKVs;
    this.meta = meta;
  }
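
  // A minimal construction sketch (illustrative only; "kvBytes" is a hypothetical byte
  // array of serialized KeyValues, and the HFileContext is assumed to match its layout):
  //
  //   HFileContext context = new HFileContextBuilder()
  //       .withIncludesMvcc(true).withIncludesTags(false).build();
  //   DataBlockEncoding encoding = DataBlockEncoding.PREFIX;
  //   EncodedDataBlock block =
  //       new EncodedDataBlock(encoding.getEncoder(), encoding, kvBytes, context);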

  /**
   * Provides an iterator over the cells decoded from the encoded data.
   * @param headerSize header size of the block.
   * @return Forward sequential iterator.
   */
  public Iterator<Cell> getIterator(int headerSize) {
    final int rawSize = rawKVs.length;
    byte[] encodedDataWithHeader = getEncodedData();
    int bytesToSkip = headerSize + Bytes.SIZEOF_SHORT;
    ByteArrayInputStream bais = new ByteArrayInputStream(encodedDataWithHeader,
        bytesToSkip, encodedDataWithHeader.length - bytesToSkip);
    final DataInputStream dis = new DataInputStream(bais);

    return new Iterator<Cell>() {
      private ByteBuffer decompressedData = null;
      private Iterator<Boolean> it = isTagsLenZero.iterator();

      @Override
      public boolean hasNext() {
        if (decompressedData == null) {
          return rawSize > 0;
        }
        return decompressedData.hasRemaining();
      }

      @Override
      public Cell next() {
        if (decompressedData == null) {
          try {
            decompressedData = dataBlockEncoder.decodeKeyValues(dis, dataBlockEncoder
                .newDataBlockDecodingContext(meta));
          } catch (IOException e) {
            throw new RuntimeException("Problem with data block encoder, " +
                "most likely it requested more bytes than are available.", e);
          }
          decompressedData.rewind();
        }
        int offset = decompressedData.position();
        int klen = decompressedData.getInt();
        int vlen = decompressedData.getInt();
        int tagsLen = 0;
        ByteBufferUtils.skip(decompressedData, klen + vlen);
        // Read the tags length when the stream contains tags
        if (meta.isIncludesTags()) {
          boolean noTags = true;
          if (it.hasNext()) {
            noTags = it.next();
          }
          // ROW_INDEX_V1 does not put the tags length back in the cell if it is zero, so
          // there is no need to read a short here.
          if (!(encoding.equals(DataBlockEncoding.ROW_INDEX_V1) && noTags)) {
            tagsLen = ((decompressedData.get() & 0xff) << 8) ^ (decompressedData.get() & 0xff);
            ByteBufferUtils.skip(decompressedData, tagsLen);
          }
        }
        KeyValue kv = new KeyValue(decompressedData.array(), offset,
            (int) KeyValue.getKeyValueDataStructureSize(klen, vlen, tagsLen));
        if (meta.isIncludesMvcc()) {
          long mvccVersion = ByteBufferUtils.readVLong(decompressedData);
          kv.setSequenceId(mvccVersion);
        }
        return kv;
      }

      @Override
      public void remove() {
        throw new NotImplementedException("remove() is not supported!");
      }

      @Override
      public String toString() {
        return "Iterator of: " + dataBlockEncoder.getClass().getName();
      }

    };
  }
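
  // A hedged sketch of consuming the iterator (assumes "block" was constructed as in the
  // sketch above and that the header size matches the HFile block format in use):
  //
  //   Iterator<Cell> cells = block.getIterator(HConstants.HFILEBLOCK_HEADER_SIZE);
  //   while (cells.hasNext()) {
  //     Cell cell = cells.next();
  //     // inspect the decoded cell, e.g. via CellUtil.cloneRow(cell)
  //   }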

  /**
   * Finds the size of the minimal buffer that could store the encoded data.
   * @return Size in bytes of the encoded data.
   */
  public int getSize() {
    return getEncodedData().length;
  }

  /**
   * Finds the size of compressed data assuming that the buffer will be compressed
   * using the given algorithm.
   * @param algo compression algorithm
   * @param compressor compressor already requested from codec
   * @param inputBuffer Array to be compressed.
   * @param offset Offset to beginning of the data.
   * @param length Length to be compressed.
   * @return Size of compressed data in bytes.
   * @throws IOException if the compression stream cannot be created or written to
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
       justification="Not sure what findbugs wants but looks to me like no NPE")
  public static int getCompressedSize(Algorithm algo, Compressor compressor,
      byte[] inputBuffer, int offset, int length) throws IOException {

    // Create streams
    // Storing them so we can close them
    final IOUtils.NullOutputStream nullOutputStream = new IOUtils.NullOutputStream();
    final DataOutputStream compressedStream = new DataOutputStream(nullOutputStream);
    OutputStream compressingStream = null;

    try {
      if (compressor != null) {
        compressor.reset();
      }

      compressingStream = algo.createCompressionStream(compressedStream, compressor, 0);

      compressingStream.write(inputBuffer, offset, length);
      compressingStream.flush();

      return compressedStream.size();
    } finally {
      nullOutputStream.close();
      compressedStream.close();
      // Guard against an NPE if createCompressionStream threw before assignment
      if (compressingStream != null) {
        compressingStream.close();
      }
    }
  }
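
  // Illustrative only: measuring the GZ-compressed size of a raw buffer ("data" is a
  // hypothetical byte array). Borrowing and returning the Compressor through
  // Compression.Algorithm is assumed to be available in this environment:
  //
  //   Algorithm algo = Compression.Algorithm.GZ;
  //   Compressor compressor = algo.getCompressor();
  //   try {
  //     int size = getCompressedSize(algo, compressor, data, 0, data.length);
  //   } finally {
  //     algo.returnCompressor(compressor);
  //   }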

  /**
   * Estimate size after second stage of compression (e.g. LZO).
   * @param comprAlgo compression algorithm to be used for compression
   * @param compressor compressor corresponding to the given compression
   *          algorithm
   * @return Size after second stage of compression.
   */
  public int getEncodedCompressedSize(Algorithm comprAlgo,
      Compressor compressor) throws IOException {
    byte[] compressedBytes = getEncodedData();
    return getCompressedSize(comprAlgo, compressor, compressedBytes, 0,
        compressedBytes.length);
  }

  /** @return encoded data with header */
  private byte[] getEncodedData() {
    if (cachedEncodedData != null) {
      return cachedEncodedData;
    }
    cachedEncodedData = encodeData();
    return cachedEncodedData;
  }

  private ByteBuffer getUncompressedBuffer() {
    if (rawBuffer == null || rawBuffer.limit() < rawKVs.length) {
      rawBuffer = ByteBuffer.wrap(rawKVs);
    }
    return rawBuffer;
  }

  /**
   * Do the encoding, but do not cache the encoded data.
   * @return encoded data block with header and checksum
   */
  public byte[] encodeData() {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    byte [] baosBytes = null;
    try {
      baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
      DataOutputStream out = new DataOutputStream(baos);
      this.dataBlockEncoder.startBlockEncoding(encodingCtx, out);
      ByteBuffer in = getUncompressedBuffer();
      in.rewind();
      int klength, vlength;
      int tagsLength = 0;
      long memstoreTS = 0L;
      KeyValue kv = null;
      while (in.hasRemaining()) {
        int kvOffset = in.position();
        klength = in.getInt();
        vlength = in.getInt();
        ByteBufferUtils.skip(in, klength + vlength);
        if (this.meta.isIncludesTags()) {
          tagsLength = ((in.get() & 0xff) << 8) ^ (in.get() & 0xff);
          ByteBufferUtils.skip(in, tagsLength);
          this.isTagsLenZero.add(tagsLength == 0);
        }
        if (this.meta.isIncludesMvcc()) {
          memstoreTS = ByteBufferUtils.readVLong(in);
        }
        kv = new KeyValue(in.array(), kvOffset, (int) KeyValue.getKeyValueDataStructureSize(
            klength, vlength, tagsLength));
        kv.setSequenceId(memstoreTS);
        this.dataBlockEncoder.encode(kv, encodingCtx, out);
      }
      // Below depends on BAOS internal behavior. toByteArray makes a copy of bytes so far.
      baos.flush();
      baosBytes = baos.toByteArray();
      this.dataBlockEncoder.endBlockEncoding(encodingCtx, out, baosBytes);
      // In endBlockEncoding(encodingCtx, out, baosBytes), the ROW_INDEX_V1 encoder writes an
      // integer to out while the other encoders write it into baosBytes (the byte array). We
      // need to call baos.toByteArray() again after endBlockEncoding to make sure the integer
      // written to the output stream by ROW_INDEX_V1 is dumped into the byte array (baosBytes).
      // The if branch is necessary because encoders other than ROW_INDEX_V1 write the integer
      // into baosBytes directly; without the branch, calling toByteArray() again would produce
      // a baosBytes that does not contain the integer written in endBlockEncoding.
      if (this.encoding.equals(DataBlockEncoding.ROW_INDEX_V1)) {
        baosBytes = baos.toByteArray();
      }
    } catch (IOException e) {
      throw new RuntimeException(String.format(
          "Bug in encoding part of algorithm %s. " +
          "Probably it requested more bytes than are available.",
          toString()), e);
    }
    return baosBytes;
  }
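
  // A small benchmarking sketch in the spirit of this class (hypothetical names "block",
  // "kvBytes", "algo" and "compressor" as in the sketches above): compare the raw size
  // against the encoded size and the encoded-then-compressed size.
  //
  //   int rawSize = kvBytes.length;
  //   int encodedSize = block.getSize();
  //   int encodedAndCompressedSize = block.getEncodedCompressedSize(algo, compressor);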

  @Override
  public String toString() {
    return encoding.name();
  }
}