001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.OutputStream;
023import java.lang.reflect.Constructor;
024import java.lang.reflect.InvocationTargetException;
025import java.nio.ByteBuffer;
026import org.apache.hadoop.hbase.Tag;
027import org.apache.hadoop.hbase.io.util.Dictionary;
028import org.apache.hadoop.hbase.io.util.StreamUtils;
029import org.apache.hadoop.hbase.nio.ByteBuff;
030import org.apache.hadoop.hbase.util.ByteBufferUtils;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.hadoop.io.IOUtils;
033import org.apache.yetus.audience.InterfaceAudience;
034
035/**
036 * Context that holds the dictionary for Tag compression and doing the compress/uncompress. This
037 * will be used for compressing tags while writing into HFiles and WALs.
038 */
039@InterfaceAudience.Private
040public class TagCompressionContext {
041  private final Dictionary tagDict;
042
043  public TagCompressionContext(Class<? extends Dictionary> dictType, int dictCapacity)
044    throws SecurityException, NoSuchMethodException, InstantiationException, IllegalAccessException,
045    InvocationTargetException {
046    Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
047    tagDict = dictConstructor.newInstance();
048    tagDict.init(dictCapacity);
049  }
050
051  public void clear() {
052    tagDict.clear();
053  }
054
055  /**
056   * Compress tags one by one and writes to the OutputStream.
057   * @param out    Stream to which the compressed tags to be written
058   * @param in     Source where tags are available
059   * @param offset Offset for the tags bytes
060   * @param length Length of all tag bytes
061   */
062  public void compressTags(OutputStream out, byte[] in, int offset, int length) throws IOException {
063    int pos = offset;
064    int endOffset = pos + length;
065    assert pos < endOffset;
066    while (pos < endOffset) {
067      int tagLen = Bytes.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
068      pos += Tag.TAG_LENGTH_SIZE;
069      Dictionary.write(out, in, pos, tagLen, tagDict);
070      pos += tagLen;
071    }
072  }
073
074  /**
075   * Compress tags one by one and writes to the OutputStream.
076   * @param out    Stream to which the compressed tags to be written
077   * @param in     Source buffer where tags are available
078   * @param offset Offset for the tags byte buffer
079   * @param length Length of all tag bytes
080   */
081  public void compressTags(OutputStream out, ByteBuffer in, int offset, int length)
082    throws IOException {
083    if (in.hasArray()) {
084      // Offset we are given is relative to ByteBuffer#arrayOffset
085      compressTags(out, in.array(), in.arrayOffset() + offset, length);
086    } else {
087      int pos = offset;
088      int endOffset = pos + length;
089      assert pos < endOffset;
090      while (pos < endOffset) {
091        int tagLen = ByteBufferUtils.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
092        pos += Tag.TAG_LENGTH_SIZE;
093        Dictionary.write(out, in, pos, tagLen, tagDict);
094        pos += tagLen;
095      }
096    }
097  }
098
099  /**
100   * Uncompress tags from the InputStream and writes to the destination array.
101   * @param src    Stream where the compressed tags are available
102   * @param dest   Destination array where to write the uncompressed tags
103   * @param offset Offset in destination where tags to be written
104   * @param length Length of all tag bytes
105   */
106  public void uncompressTags(InputStream src, byte[] dest, int offset, int length)
107    throws IOException {
108    int endOffset = offset + length;
109    while (offset < endOffset) {
110      byte status = StreamUtils.readByte(src);
111      if (status == Dictionary.NOT_IN_DICTIONARY) {
112        int tagLen = StreamUtils.readRawVarint32(src);
113        offset = Bytes.putAsShort(dest, offset, tagLen);
114        IOUtils.readFully(src, dest, offset, tagLen);
115        tagDict.addEntry(dest, offset, tagLen);
116        offset += tagLen;
117      } else {
118        short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(src));
119        byte[] entry = tagDict.getEntry(dictIdx);
120        if (entry == null) {
121          throw new IOException("Missing dictionary entry for index " + dictIdx);
122        }
123        offset = Bytes.putAsShort(dest, offset, entry.length);
124        System.arraycopy(entry, 0, dest, offset, entry.length);
125        offset += entry.length;
126      }
127    }
128  }
129
130  /**
131   * Uncompress tags from the input ByteBuffer and writes to the destination array.
132   * @param src    Buffer where the compressed tags are available
133   * @param dest   Destination array where to write the uncompressed tags
134   * @param offset Offset in destination where tags to be written
135   * @param length Length of all tag bytes
136   * @return bytes count read from source to uncompress all tags.
137   */
138  public int uncompressTags(ByteBuff src, byte[] dest, int offset, int length) throws IOException {
139    int srcBeginPos = src.position();
140    int endOffset = offset + length;
141    while (offset < endOffset) {
142      byte status = src.get();
143      int tagLen;
144      if (status == Dictionary.NOT_IN_DICTIONARY) {
145        tagLen = StreamUtils.readRawVarint32(src);
146        offset = Bytes.putAsShort(dest, offset, tagLen);
147        src.get(dest, offset, tagLen);
148        tagDict.addEntry(dest, offset, tagLen);
149        offset += tagLen;
150      } else {
151        short dictIdx = StreamUtils.toShort(status, src.get());
152        byte[] entry = tagDict.getEntry(dictIdx);
153        if (entry == null) {
154          throw new IOException("Missing dictionary entry for index " + dictIdx);
155        }
156        tagLen = entry.length;
157        offset = Bytes.putAsShort(dest, offset, tagLen);
158        System.arraycopy(entry, 0, dest, offset, tagLen);
159        offset += tagLen;
160      }
161    }
162    return src.position() - srcBeginPos;
163  }
164
165  /**
166   * Uncompress tags from the InputStream and writes to the destination buffer.
167   * @param src    Stream where the compressed tags are available
168   * @param dest   Destination buffer where to write the uncompressed tags
169   * @param length Length of all tag bytes
170   * @throws IOException when the dictionary does not have the entry
171   */
172  public void uncompressTags(InputStream src, ByteBuffer dest, int length) throws IOException {
173    if (dest.hasArray()) {
174      uncompressTags(src, dest.array(), dest.arrayOffset() + dest.position(), length);
175    } else {
176      byte[] tagBuf = new byte[length];
177      uncompressTags(src, tagBuf, 0, length);
178      dest.put(tagBuf);
179    }
180  }
181}