001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.encoding;
019
020import java.io.ByteArrayInputStream;
021import java.io.ByteArrayOutputStream;
022import java.io.DataInputStream;
023import java.io.DataOutputStream;
024import java.io.IOException;
025import java.io.OutputStream;
026import java.nio.ByteBuffer;
027import java.util.ArrayList;
028import java.util.Iterator;
029import java.util.List;
030import org.apache.commons.lang3.NotImplementedException;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.KeyValue;
035import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
036import org.apache.hadoop.hbase.io.hfile.HFileContext;
037import org.apache.hadoop.hbase.util.ByteBufferUtils;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.io.IOUtils;
040import org.apache.hadoop.io.compress.Compressor;
041import org.apache.yetus.audience.InterfaceAudience;
042
043import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
044
045/**
046 * Encapsulates a data block compressed using a particular encoding algorithm. Useful for testing
047 * and benchmarking. This is used only in testing.
048 */
049@InterfaceAudience.Private
050public class EncodedDataBlock {
051  private byte[] rawKVs;
052  private ByteBuffer rawBuffer;
053  private DataBlockEncoder dataBlockEncoder;
054
055  private byte[] cachedEncodedData;
056
057  private final HFileBlockEncodingContext encodingCtx;
058  private HFileContext meta;
059
060  private final DataBlockEncoding encoding;
061  private final Configuration conf;
062
063  // The is for one situation that there are some cells includes tags and others are not.
064  // isTagsLenZero stores if cell tags length is zero before doing encoding since we need
065  // to check cell tags length is zero or not after decoding.
066  // Encoders ROW_INDEX_V1 would abandon tags segment if tags is 0 after decode cells to
067  // byte array, other encoders won't do that. So we have to find a way to add tagsLen zero
068  // in the decoded byte array.
069  private List<Boolean> isTagsLenZero = new ArrayList<>();
070
071  /**
072   * Create a buffer which will be encoded using dataBlockEncoder.
073   * @param conf             store configuration
074   * @param dataBlockEncoder Algorithm used for compression.
075   * @param encoding         encoding type used
076   * @param rawKVs           raw KVs
077   * @param meta             hfile context
078   */
079  public EncodedDataBlock(Configuration conf, DataBlockEncoder dataBlockEncoder,
080    DataBlockEncoding encoding, byte[] rawKVs, HFileContext meta) {
081    Preconditions.checkNotNull(encoding, "Cannot create encoded data block with null encoder");
082    this.dataBlockEncoder = dataBlockEncoder;
083    this.encoding = encoding;
084    encodingCtx = dataBlockEncoder.newDataBlockEncodingContext(conf, encoding,
085      HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
086    this.rawKVs = rawKVs;
087    this.meta = meta;
088    this.conf = conf;
089  }
090
091  /**
092   * Provides access to compressed value.
093   * @param headerSize header size of the block.
094   * @return Forwards sequential iterator.
095   */
096  public Iterator<Cell> getIterator(int headerSize) {
097    final int rawSize = rawKVs.length;
098    byte[] encodedDataWithHeader = getEncodedData();
099    int bytesToSkip = headerSize + Bytes.SIZEOF_SHORT;
100    ByteArrayInputStream bais = new ByteArrayInputStream(encodedDataWithHeader, bytesToSkip,
101      encodedDataWithHeader.length - bytesToSkip);
102    final DataInputStream dis = new DataInputStream(bais);
103
104    return new Iterator<Cell>() {
105      private ByteBuffer decompressedData = null;
106      private Iterator<Boolean> it = isTagsLenZero.iterator();
107
108      @Override
109      public boolean hasNext() {
110        if (decompressedData == null) {
111          return rawSize > 0;
112        }
113        return decompressedData.hasRemaining();
114      }
115
116      @Override
117      public Cell next() {
118        if (decompressedData == null) {
119          try {
120            decompressedData = dataBlockEncoder.decodeKeyValues(dis,
121              dataBlockEncoder.newDataBlockDecodingContext(conf, meta));
122          } catch (IOException e) {
123            throw new RuntimeException("Problem with data block encoder, "
124              + "most likely it requested more bytes than are available.", e);
125          }
126          decompressedData.rewind();
127        }
128        int offset = decompressedData.position();
129        int klen = decompressedData.getInt();
130        int vlen = decompressedData.getInt();
131        int tagsLen = 0;
132        ByteBufferUtils.skip(decompressedData, klen + vlen);
133        // Read the tag length in case when stream contain tags
134        if (meta.isIncludesTags()) {
135          boolean noTags = true;
136          if (it.hasNext()) {
137            noTags = it.next();
138          }
139          // ROW_INDEX_V1 will not put tagsLen back in cell if it is zero, there is no need
140          // to read short here.
141          if (!(encoding.equals(DataBlockEncoding.ROW_INDEX_V1) && noTags)) {
142            tagsLen = ((decompressedData.get() & 0xff) << 8) ^ (decompressedData.get() & 0xff);
143            ByteBufferUtils.skip(decompressedData, tagsLen);
144          }
145        }
146        KeyValue kv =
147          new KeyValue(decompressedData.array(), decompressedData.arrayOffset() + offset,
148            (int) KeyValue.getKeyValueDataStructureSize(klen, vlen, tagsLen));
149        if (meta.isIncludesMvcc()) {
150          long mvccVersion = ByteBufferUtils.readVLong(decompressedData);
151          kv.setSequenceId(mvccVersion);
152        }
153        return kv;
154      }
155
156      @Override
157      public void remove() {
158        throw new NotImplementedException("remove() is not supported!");
159      }
160
161      @Override
162      public String toString() {
163        return "Iterator of: " + dataBlockEncoder.getClass().getName();
164      }
165
166    };
167  }
168
169  /**
170   * Find the size of minimal buffer that could store compressed data.
171   * @return Size in bytes of compressed data.
172   */
173  public int getSize() {
174    return getEncodedData().length;
175  }
176
177  /**
178   * Find the size of compressed data assuming that buffer will be compressed using given algorithm.
179   * @param algo        compression algorithm
180   * @param compressor  compressor already requested from codec
181   * @param inputBuffer Array to be compressed.
182   * @param offset      Offset to beginning of the data.
183   * @param length      Length to be compressed.
184   * @return Size of compressed data in bytes. n
185   */
186  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION",
187      justification = "No sure what findbugs wants but looks to me like no NPE")
188  public static int getCompressedSize(Algorithm algo, Compressor compressor, byte[] inputBuffer,
189    int offset, int length) throws IOException {
190
191    // Create streams
192    // Storing them so we can close them
193    final IOUtils.NullOutputStream nullOutputStream = new IOUtils.NullOutputStream();
194    final DataOutputStream compressedStream = new DataOutputStream(nullOutputStream);
195    OutputStream compressingStream = null;
196
197    try {
198      if (compressor != null) {
199        compressor.reset();
200      }
201
202      compressingStream = algo.createCompressionStream(compressedStream, compressor, 0);
203
204      compressingStream.write(inputBuffer, offset, length);
205      compressingStream.flush();
206
207      return compressedStream.size();
208    } finally {
209      nullOutputStream.close();
210      compressedStream.close();
211      compressingStream.close();
212    }
213  }
214
215  /**
216   * Estimate size after second stage of compression (e.g. LZO).
217   * @param comprAlgo  compression algorithm to be used for compression
218   * @param compressor compressor corresponding to the given compression algorithm
219   * @return Size after second stage of compression.
220   */
221  public int getEncodedCompressedSize(Algorithm comprAlgo, Compressor compressor)
222    throws IOException {
223    byte[] compressedBytes = getEncodedData();
224    return getCompressedSize(comprAlgo, compressor, compressedBytes, 0, compressedBytes.length);
225  }
226
227  /** Returns encoded data with header */
228  private byte[] getEncodedData() {
229    if (cachedEncodedData != null) {
230      return cachedEncodedData;
231    }
232    cachedEncodedData = encodeData();
233    return cachedEncodedData;
234  }
235
236  private ByteBuffer getUncompressedBuffer() {
237    if (rawBuffer == null || rawBuffer.limit() < rawKVs.length) {
238      rawBuffer = ByteBuffer.wrap(rawKVs);
239    }
240    return rawBuffer;
241  }
242
243  /**
244   * Do the encoding, but do not cache the encoded data.
245   * @return encoded data block with header and checksum
246   */
247  public byte[] encodeData() {
248    ByteArrayOutputStream baos = new ByteArrayOutputStream();
249    byte[] baosBytes = null;
250    try {
251      baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
252      DataOutputStream out = new DataOutputStream(baos);
253      this.dataBlockEncoder.startBlockEncoding(encodingCtx, out);
254      ByteBuffer in = getUncompressedBuffer();
255      in.rewind();
256      int klength, vlength;
257      int tagsLength = 0;
258      long memstoreTS = 0L;
259      KeyValue kv = null;
260      while (in.hasRemaining()) {
261        int kvOffset = in.position();
262        klength = in.getInt();
263        vlength = in.getInt();
264        ByteBufferUtils.skip(in, klength + vlength);
265        if (this.meta.isIncludesTags()) {
266          tagsLength = ((in.get() & 0xff) << 8) ^ (in.get() & 0xff);
267          ByteBufferUtils.skip(in, tagsLength);
268          this.isTagsLenZero.add(tagsLength == 0);
269        }
270        if (this.meta.isIncludesMvcc()) {
271          memstoreTS = ByteBufferUtils.readVLong(in);
272        }
273        kv = new KeyValue(in.array(), in.arrayOffset() + kvOffset,
274          (int) KeyValue.getKeyValueDataStructureSize(klength, vlength, tagsLength));
275        kv.setSequenceId(memstoreTS);
276        this.dataBlockEncoder.encode(kv, encodingCtx, out);
277      }
278      // Below depends on BAOS internal behavior. toByteArray makes a copy of bytes so far.
279      baos.flush();
280      baosBytes = baos.toByteArray();
281      this.dataBlockEncoder.endBlockEncoding(encodingCtx, out, baosBytes);
282      // In endBlockEncoding(encodingCtx, out, baosBytes), Encoder ROW_INDEX_V1 write integer in
283      // out while the others write integer in baosBytes(byte array). We need to add
284      // baos.toByteArray() after endBlockEncoding again to make sure the integer writes in
285      // outputstream with Encoder ROW_INDEX_V1 dump to byte array (baosBytes).
286      // The if branch is necessary because Encoders excepts ROW_INDEX_V1 write integer in
287      // baosBytes directly, without if branch and do toByteArray() again, baosBytes won't
288      // contains the integer wrotten in endBlockEncoding.
289      if (this.encoding.equals(DataBlockEncoding.ROW_INDEX_V1)) {
290        baosBytes = baos.toByteArray();
291      }
292    } catch (IOException e) {
293      throw new RuntimeException(String.format("Bug in encoding part of algorithm %s. "
294        + "Probably it requested more bytes than are available.", toString()), e);
295    }
296    return baosBytes;
297  }
298
299  @Override
300  public String toString() {
301    return encoding.name();
302  }
303}