001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.encoding;
019
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
044
045/**
046 * Encapsulates a data block compressed using a particular encoding algorithm. Useful for testing
047 * and benchmarking. This is used only in testing.
048 */
049@InterfaceAudience.Private
050public class EncodedDataBlock {
051  private byte[] rawKVs;
052  private ByteBuffer rawBuffer;
053  private DataBlockEncoder dataBlockEncoder;
054
055  private byte[] cachedEncodedData;
056
057  private final HFileBlockEncodingContext encodingCtx;
058  private HFileContext meta;
059
060  private final DataBlockEncoding encoding;
061  private final Configuration conf;
062
063  // The is for one situation that there are some cells includes tags and others are not.
064  // isTagsLenZero stores if cell tags length is zero before doing encoding since we need
065  // to check cell tags length is zero or not after decoding.
066  // Encoders ROW_INDEX_V1 would abandon tags segment if tags is 0 after decode cells to
067  // byte array, other encoders won't do that. So we have to find a way to add tagsLen zero
068  // in the decoded byte array.
069  private List<Boolean> isTagsLenZero = new ArrayList<>();
070
071  /**
072   * Create a buffer which will be encoded using dataBlockEncoder.
073   * @param conf             store configuration
074   * @param dataBlockEncoder Algorithm used for compression.
075   * @param encoding         encoding type used
076   * @param rawKVs           raw KVs
077   * @param meta             hfile context
078   */
079  public EncodedDataBlock(Configuration conf, DataBlockEncoder dataBlockEncoder,
080    DataBlockEncoding encoding, byte[] rawKVs, HFileContext meta) {
081    Preconditions.checkNotNull(encoding, "Cannot create encoded data block with null encoder");
082    this.dataBlockEncoder = dataBlockEncoder;
083    this.encoding = encoding;
084    encodingCtx = dataBlockEncoder.newDataBlockEncodingContext(conf, encoding,
085      HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
086    this.rawKVs = rawKVs;
087    this.meta = meta;
088    this.conf = conf;
089  }
090
091  /**
092   * Provides access to compressed value.
093   * @param headerSize header size of the block.
094   * @return Forwards sequential iterator.
095   */
096  public Iterator<Cell> getIterator(int headerSize) {
097    final int rawSize = rawKVs.length;
098    byte[] encodedDataWithHeader = getEncodedData();
099    int bytesToSkip = headerSize + Bytes.SIZEOF_SHORT;
100    ByteArrayInputStream bais = new ByteArrayInputStream(encodedDataWithHeader, bytesToSkip,
101      encodedDataWithHeader.length - bytesToSkip);
102    final DataInputStream dis = new DataInputStream(bais);
103
104    return new Iterator<Cell>() {
105      private ByteBuffer decompressedData = null;
106      private Iterator<Boolean> it = isTagsLenZero.iterator();
107
108      @Override
109      public boolean hasNext() {
110        if (decompressedData == null) {
111          return rawSize > 0;
112        }
113        return decompressedData.hasRemaining();
114      }
115
116      @Override
117      public Cell next() {
118        if (decompressedData == null) {
119          try {
120            decompressedData = dataBlockEncoder.decodeKeyValues(dis,
121              dataBlockEncoder.newDataBlockDecodingContext(conf, meta));
122          } catch (IOException e) {
123            throw new RuntimeException("Problem with data block encoder, "
124              + "most likely it requested more bytes than are available.", e);
125          }
126          decompressedData.rewind();
127        }
128        int offset = decompressedData.position();
129        int klen = decompressedData.getInt();
130        int vlen = decompressedData.getInt();
131        int tagsLen = 0;
132        ByteBufferUtils.skip(decompressedData, klen + vlen);
133        // Read the tag length in case when stream contain tags
134        if (meta.isIncludesTags()) {
135          boolean noTags = true;
136          if (it.hasNext()) {
137            noTags = it.next();
138          }
139          // ROW_INDEX_V1 will not put tagsLen back in cell if it is zero, there is no need
140          // to read short here.
141          if (!(encoding.equals(DataBlockEncoding.ROW_INDEX_V1) && noTags)) {
142            tagsLen = ((decompressedData.get() & 0xff) << 8) ^ (decompressedData.get() & 0xff);
143            ByteBufferUtils.skip(decompressedData, tagsLen);
144          }
145        }
146        KeyValue kv =
147          new KeyValue(decompressedData.array(), decompressedData.arrayOffset() + offset,
148            (int) KeyValue.getKeyValueDataStructureSize(klen, vlen, tagsLen));
149        if (meta.isIncludesMvcc()) {
150          long mvccVersion = ByteBufferUtils.readVLong(decompressedData);
151          kv.setSequenceId(mvccVersion);
152        }
153        return kv;
154      }
155
156      @Override
157      public void remove() {
158        throw new NotImplementedException("remove() is not supported!");
159      }
160
161      @Override
162      public String toString() {
163        return "Iterator of: " + dataBlockEncoder.getClass().getName();
164      }
165
166    };
167  }
168
169  /**
170   * Find the size of minimal buffer that could store compressed data.
171   * @return Size in bytes of compressed data.
172   */
173  public int getSize() {
174    return getEncodedData().length;
175  }
176
177  /**
178   * Find the size of compressed data assuming that buffer will be compressed using given algorithm.
179   * @param algo        compression algorithm
180   * @param compressor  compressor already requested from codec
181   * @param inputBuffer Array to be compressed.
182   * @param offset      Offset to beginning of the data.
183   * @param length      Length to be compressed.
184   * @return Size of compressed data in bytes.
185   */
186  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION",
187      justification = "No sure what findbugs wants but looks to me like no NPE")
188  public static int getCompressedSize(Algorithm algo, Compressor compressor, byte[] inputBuffer,
189    int offset, int length) throws IOException {
190
191    // Create streams
192    // Storing them so we can close them
193    final IOUtils.NullOutputStream nullOutputStream = new IOUtils.NullOutputStream();
194    final DataOutputStream compressedStream = new DataOutputStream(nullOutputStream);
195    OutputStream compressingStream = null;
196
197    try {
198      if (compressor != null) {
199        compressor.reset();
200      }
201
202      compressingStream = algo.createCompressionStream(compressedStream, compressor, 0);
203
204      compressingStream.write(inputBuffer, offset, length);
205      compressingStream.flush();
206
207      return compressedStream.size();
208    } finally {
209      nullOutputStream.close();
210      compressedStream.close();
211      if (compressingStream != null) {
212        compressingStream.close();
213      }
214    }
215  }
216
217  /**
218   * Estimate size after second stage of compression (e.g. LZO).
219   * @param comprAlgo  compression algorithm to be used for compression
220   * @param compressor compressor corresponding to the given compression algorithm
221   * @return Size after second stage of compression.
222   */
223  public int getEncodedCompressedSize(Algorithm comprAlgo, Compressor compressor)
224    throws IOException {
225    byte[] compressedBytes = getEncodedData();
226    return getCompressedSize(comprAlgo, compressor, compressedBytes, 0, compressedBytes.length);
227  }
228
229  /** Returns encoded data with header */
230  private byte[] getEncodedData() {
231    if (cachedEncodedData != null) {
232      return cachedEncodedData;
233    }
234    cachedEncodedData = encodeData();
235    return cachedEncodedData;
236  }
237
238  private ByteBuffer getUncompressedBuffer() {
239    if (rawBuffer == null || rawBuffer.limit() < rawKVs.length) {
240      rawBuffer = ByteBuffer.wrap(rawKVs);
241    }
242    return rawBuffer;
243  }
244
245  /**
246   * Do the encoding, but do not cache the encoded data.
247   * @return encoded data block with header and checksum
248   */
249  public byte[] encodeData() {
250    ByteArrayOutputStream baos = new ByteArrayOutputStream();
251    byte[] baosBytes = null;
252    try {
253      baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
254      DataOutputStream out = new DataOutputStream(baos);
255      this.dataBlockEncoder.startBlockEncoding(encodingCtx, out);
256      ByteBuffer in = getUncompressedBuffer();
257      in.rewind();
258      int klength, vlength;
259      int tagsLength = 0;
260      long memstoreTS = 0L;
261      KeyValue kv = null;
262      while (in.hasRemaining()) {
263        int kvOffset = in.position();
264        klength = in.getInt();
265        vlength = in.getInt();
266        ByteBufferUtils.skip(in, klength + vlength);
267        if (this.meta.isIncludesTags()) {
268          tagsLength = ((in.get() & 0xff) << 8) ^ (in.get() & 0xff);
269          ByteBufferUtils.skip(in, tagsLength);
270          this.isTagsLenZero.add(tagsLength == 0);
271        }
272        if (this.meta.isIncludesMvcc()) {
273          memstoreTS = ByteBufferUtils.readVLong(in);
274        }
275        kv = new KeyValue(in.array(), in.arrayOffset() + kvOffset,
276          (int) KeyValue.getKeyValueDataStructureSize(klength, vlength, tagsLength));
277        kv.setSequenceId(memstoreTS);
278        this.dataBlockEncoder.encode(kv, encodingCtx, out);
279      }
280      // Below depends on BAOS internal behavior. toByteArray makes a copy of bytes so far.
281      baos.flush();
282      baosBytes = baos.toByteArray();
283      this.dataBlockEncoder.endBlockEncoding(encodingCtx, out, baosBytes);
284      // In endBlockEncoding(encodingCtx, out, baosBytes), Encoder ROW_INDEX_V1 write integer in
285      // out while the others write integer in baosBytes(byte array). We need to add
286      // baos.toByteArray() after endBlockEncoding again to make sure the integer writes in
287      // outputstream with Encoder ROW_INDEX_V1 dump to byte array (baosBytes).
288      // The if branch is necessary because Encoders excepts ROW_INDEX_V1 write integer in
289      // baosBytes directly, without if branch and do toByteArray() again, baosBytes won't
290      // contains the integer wrotten in endBlockEncoding.
291      if (this.encoding.equals(DataBlockEncoding.ROW_INDEX_V1)) {
292        baosBytes = baos.toByteArray();
293      }
294    } catch (IOException e) {
295      throw new RuntimeException(String.format("Bug in encoding part of algorithm %s. "
296        + "Probably it requested more bytes than are available.", toString()), e);
297    }
298    return baosBytes;
299  }
300
301  @Override
302  public String toString() {
303    return encoding.name();
304  }
305}