001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.io;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.OutputStream;
024import java.lang.reflect.Constructor;
025import java.lang.reflect.InvocationTargetException;
026import java.nio.ByteBuffer;
027
028import org.apache.hadoop.hbase.Tag;
029import org.apache.hadoop.hbase.io.util.Dictionary;
030import org.apache.hadoop.hbase.io.util.StreamUtils;
031import org.apache.hadoop.hbase.nio.ByteBuff;
032import org.apache.hadoop.hbase.util.ByteBufferUtils;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.io.IOUtils;
035import org.apache.yetus.audience.InterfaceAudience;
036
037/**
038 * Context that holds the dictionary for Tag compression and doing the compress/uncompress. This
039 * will be used for compressing tags while writing into HFiles and WALs.
040 */
041@InterfaceAudience.Private
042public class TagCompressionContext {
043  private final Dictionary tagDict;
044
045  public TagCompressionContext(Class<? extends Dictionary> dictType, int dictCapacity)
046      throws SecurityException, NoSuchMethodException, InstantiationException,
047      IllegalAccessException, InvocationTargetException {
048    Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
049    tagDict = dictConstructor.newInstance();
050    tagDict.init(dictCapacity);
051  }
052
053  public void clear() {
054    tagDict.clear();
055  }
056
057  /**
058   * Compress tags one by one and writes to the OutputStream.
059   * @param out Stream to which the compressed tags to be written
060   * @param in Source where tags are available
061   * @param offset Offset for the tags bytes
062   * @param length Length of all tag bytes
063   * @throws IOException
064   */
065  public void compressTags(OutputStream out, byte[] in, int offset, int length)
066      throws IOException {
067    int pos = offset;
068    int endOffset = pos + length;
069    assert pos < endOffset;
070    while (pos < endOffset) {
071      int tagLen = Bytes.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
072      pos += Tag.TAG_LENGTH_SIZE;
073      Dictionary.write(out, in, pos, tagLen, tagDict);
074      pos += tagLen;
075    }
076  }
077
078  /**
079   * Compress tags one by one and writes to the OutputStream.
080   * @param out Stream to which the compressed tags to be written
081   * @param in Source buffer where tags are available
082   * @param offset Offset for the tags byte buffer
083   * @param length Length of all tag bytes
084   * @throws IOException
085   */
086  public void compressTags(OutputStream out, ByteBuffer in, int offset, int length)
087      throws IOException {
088    if (in.hasArray()) {
089      compressTags(out, in.array(), offset, length);
090    } else {
091      int pos = offset;
092      int endOffset = pos + length;
093      assert pos < endOffset;
094      while (pos < endOffset) {
095        int tagLen = ByteBufferUtils.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
096        pos += Tag.TAG_LENGTH_SIZE;
097        Dictionary.write(out, in, pos, tagLen, tagDict);
098        pos += tagLen;
099      }
100    }
101  }
102
103  /**
104   * Uncompress tags from the InputStream and writes to the destination array.
105   * @param src Stream where the compressed tags are available
106   * @param dest Destination array where to write the uncompressed tags
107   * @param offset Offset in destination where tags to be written
108   * @param length Length of all tag bytes
109   * @throws IOException
110   */
111  public void uncompressTags(InputStream src, byte[] dest, int offset, int length)
112      throws IOException {
113    int endOffset = offset + length;
114    while (offset < endOffset) {
115      byte status = (byte) src.read();
116      if (status == Dictionary.NOT_IN_DICTIONARY) {
117        int tagLen = StreamUtils.readRawVarint32(src);
118        offset = Bytes.putAsShort(dest, offset, tagLen);
119        IOUtils.readFully(src, dest, offset, tagLen);
120        tagDict.addEntry(dest, offset, tagLen);
121        offset += tagLen;
122      } else {
123        short dictIdx = StreamUtils.toShort(status, (byte) src.read());
124        byte[] entry = tagDict.getEntry(dictIdx);
125        if (entry == null) {
126          throw new IOException("Missing dictionary entry for index " + dictIdx);
127        }
128        offset = Bytes.putAsShort(dest, offset, entry.length);
129        System.arraycopy(entry, 0, dest, offset, entry.length);
130        offset += entry.length;
131      }
132    }
133  }
134
135  /**
136   * Uncompress tags from the input ByteBuffer and writes to the destination array.
137   * @param src Buffer where the compressed tags are available
138   * @param dest Destination array where to write the uncompressed tags
139   * @param offset Offset in destination where tags to be written
140   * @param length Length of all tag bytes
141   * @return bytes count read from source to uncompress all tags.
142   * @throws IOException
143   */
144  public int uncompressTags(ByteBuff src, byte[] dest, int offset, int length)
145      throws IOException {
146    int srcBeginPos = src.position();
147    int endOffset = offset + length;
148    while (offset < endOffset) {
149      byte status = src.get();
150      int tagLen;
151      if (status == Dictionary.NOT_IN_DICTIONARY) {
152        tagLen = StreamUtils.readRawVarint32(src);
153        offset = Bytes.putAsShort(dest, offset, tagLen);
154        src.get(dest, offset, tagLen);
155        tagDict.addEntry(dest, offset, tagLen);
156        offset += tagLen;
157      } else {
158        short dictIdx = StreamUtils.toShort(status, src.get());
159        byte[] entry = tagDict.getEntry(dictIdx);
160        if (entry == null) {
161          throw new IOException("Missing dictionary entry for index " + dictIdx);
162        }
163        tagLen = entry.length;
164        offset = Bytes.putAsShort(dest, offset, tagLen);
165        System.arraycopy(entry, 0, dest, offset, tagLen);
166        offset += tagLen;
167      }
168    }
169    return src.position() - srcBeginPos;
170  }
171
172  /**
173   * Uncompress tags from the InputStream and writes to the destination buffer.
174   * @param src Stream where the compressed tags are available
175   * @param dest Destination buffer where to write the uncompressed tags
176   * @param length Length of all tag bytes
177   * @throws IOException when the dictionary does not have the entry
178   */
179  public void uncompressTags(InputStream src, ByteBuffer dest, int length) throws IOException {
180    if (dest.hasArray()) {
181      uncompressTags(src, dest.array(), dest.arrayOffset() + dest.position(), length);
182    } else {
183      byte[] tagBuf = new byte[length];
184      uncompressTags(src, tagBuf, 0, length);
185      dest.put(tagBuf);
186    }
187  }
188}