001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;
034
035/**
036 * Context that holds the dictionary for Tag compression and doing the compress/uncompress. This
037 * will be used for compressing tags while writing into HFiles and WALs.
038 */
039@InterfaceAudience.Private
040public class TagCompressionContext {
041  private final Dictionary tagDict;
042
043  public TagCompressionContext(Class<? extends Dictionary> dictType, int dictCapacity)
044    throws SecurityException, NoSuchMethodException, InstantiationException, IllegalAccessException,
045    InvocationTargetException {
046    Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
047    tagDict = dictConstructor.newInstance();
048    tagDict.init(dictCapacity);
049  }
050
051  public void clear() {
052    tagDict.clear();
053  }
054
055  /**
056   * Compress tags one by one and writes to the OutputStream.
057   * @param out    Stream to which the compressed tags to be written
058   * @param in     Source where tags are available
059   * @param offset Offset for the tags bytes
060   * @param length Length of all tag bytes n
061   */
062  public void compressTags(OutputStream out, byte[] in, int offset, int length) throws IOException {
063    int pos = offset;
064    int endOffset = pos + length;
065    assert pos < endOffset;
066    while (pos < endOffset) {
067      int tagLen = Bytes.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
068      pos += Tag.TAG_LENGTH_SIZE;
069      Dictionary.write(out, in, pos, tagLen, tagDict);
070      pos += tagLen;
071    }
072  }
073
074  /**
075   * Compress tags one by one and writes to the OutputStream.
076   * @param out    Stream to which the compressed tags to be written
077   * @param in     Source buffer where tags are available
078   * @param offset Offset for the tags byte buffer
079   * @param length Length of all tag bytes n
080   */
081  public void compressTags(OutputStream out, ByteBuffer in, int offset, int length)
082    throws IOException {
083    if (in.hasArray()) {
084      compressTags(out, in.array(), offset, length);
085    } else {
086      int pos = offset;
087      int endOffset = pos + length;
088      assert pos < endOffset;
089      while (pos < endOffset) {
090        int tagLen = ByteBufferUtils.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
091        pos += Tag.TAG_LENGTH_SIZE;
092        Dictionary.write(out, in, pos, tagLen, tagDict);
093        pos += tagLen;
094      }
095    }
096  }
097
098  /**
099   * Uncompress tags from the InputStream and writes to the destination array.
100   * @param src    Stream where the compressed tags are available
101   * @param dest   Destination array where to write the uncompressed tags
102   * @param offset Offset in destination where tags to be written
103   * @param length Length of all tag bytes n
104   */
105  public void uncompressTags(InputStream src, byte[] dest, int offset, int length)
106    throws IOException {
107    int endOffset = offset + length;
108    while (offset < endOffset) {
109      byte status = (byte) src.read();
110      if (status == Dictionary.NOT_IN_DICTIONARY) {
111        int tagLen = StreamUtils.readRawVarint32(src);
112        offset = Bytes.putAsShort(dest, offset, tagLen);
113        IOUtils.readFully(src, dest, offset, tagLen);
114        tagDict.addEntry(dest, offset, tagLen);
115        offset += tagLen;
116      } else {
117        short dictIdx = StreamUtils.toShort(status, (byte) src.read());
118        byte[] entry = tagDict.getEntry(dictIdx);
119        if (entry == null) {
120          throw new IOException("Missing dictionary entry for index " + dictIdx);
121        }
122        offset = Bytes.putAsShort(dest, offset, entry.length);
123        System.arraycopy(entry, 0, dest, offset, entry.length);
124        offset += entry.length;
125      }
126    }
127  }
128
129  /**
130   * Uncompress tags from the input ByteBuffer and writes to the destination array.
131   * @param src    Buffer where the compressed tags are available
132   * @param dest   Destination array where to write the uncompressed tags
133   * @param offset Offset in destination where tags to be written
134   * @param length Length of all tag bytes
135   * @return bytes count read from source to uncompress all tags. n
136   */
137  public int uncompressTags(ByteBuff src, byte[] dest, int offset, int length) throws IOException {
138    int srcBeginPos = src.position();
139    int endOffset = offset + length;
140    while (offset < endOffset) {
141      byte status = src.get();
142      int tagLen;
143      if (status == Dictionary.NOT_IN_DICTIONARY) {
144        tagLen = StreamUtils.readRawVarint32(src);
145        offset = Bytes.putAsShort(dest, offset, tagLen);
146        src.get(dest, offset, tagLen);
147        tagDict.addEntry(dest, offset, tagLen);
148        offset += tagLen;
149      } else {
150        short dictIdx = StreamUtils.toShort(status, src.get());
151        byte[] entry = tagDict.getEntry(dictIdx);
152        if (entry == null) {
153          throw new IOException("Missing dictionary entry for index " + dictIdx);
154        }
155        tagLen = entry.length;
156        offset = Bytes.putAsShort(dest, offset, tagLen);
157        System.arraycopy(entry, 0, dest, offset, tagLen);
158        offset += tagLen;
159      }
160    }
161    return src.position() - srcBeginPos;
162  }
163
164  /**
165   * Uncompress tags from the InputStream and writes to the destination buffer.
166   * @param src    Stream where the compressed tags are available
167   * @param dest   Destination buffer where to write the uncompressed tags
168   * @param length Length of all tag bytes
169   * @throws IOException when the dictionary does not have the entry
170   */
171  public void uncompressTags(InputStream src, ByteBuffer dest, int length) throws IOException {
172    if (dest.hasArray()) {
173      uncompressTags(src, dest.array(), dest.arrayOffset() + dest.position(), length);
174    } else {
175      byte[] tagBuf = new byte[length];
176      uncompressTags(src, tagBuf, 0, length);
177      dest.put(tagBuf);
178    }
179  }
180}