View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.io;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.lang.reflect.Constructor;
25  import java.lang.reflect.InvocationTargetException;
26  import java.nio.ByteBuffer;
27  
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.Tag;
30  import org.apache.hadoop.hbase.io.util.Dictionary;
31  import org.apache.hadoop.hbase.io.util.StreamUtils;
32  import org.apache.hadoop.hbase.util.ByteBufferUtils;
33  import org.apache.hadoop.hbase.util.Bytes;
34  import org.apache.hadoop.io.IOUtils;
35  
36  /**
37   * Context that holds the dictionary for Tag compression and doing the compress/uncompress. This
38   * will be used for compressing tags while writing into HFiles and WALs.
39   */
40  @InterfaceAudience.Private
41  public class TagCompressionContext {
42    private final Dictionary tagDict;
43  
44    public TagCompressionContext(Class<? extends Dictionary> dictType, int dictCapacity)
45        throws SecurityException, NoSuchMethodException, InstantiationException,
46        IllegalAccessException, InvocationTargetException {
47      Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
48      tagDict = dictConstructor.newInstance();
49      tagDict.init(dictCapacity);
50    }
51  
52    public void clear() {
53      tagDict.clear();
54    }
55  
56    /**
57     * Compress tags one by one and writes to the OutputStream.
58     * @param out Stream to which the compressed tags to be written
59     * @param in Source where tags are available
60     * @param offset Offset for the tags bytes
61     * @param length Length of all tag bytes
62     * @throws IOException
63     */
64    public void compressTags(OutputStream out, byte[] in, int offset, int length)
65        throws IOException {
66      int pos = offset;
67      int endOffset = pos + length;
68      assert pos < endOffset;
69      while (pos < endOffset) {
70        int tagLen = Bytes.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
71        pos += Tag.TAG_LENGTH_SIZE;
72        write(in, pos, tagLen, out);
73        pos += tagLen;
74      }
75    }
76  
77    /**
78     * Compress tags one by one and writes to the OutputStream.
79     * @param out Stream to which the compressed tags to be written
80     * @param in Source buffer where tags are available
81     * @param length Length of all tag bytes
82     * @throws IOException
83     */
84    public void compressTags(OutputStream out, ByteBuffer in, int length) throws IOException {
85      if (in.hasArray()) {
86        compressTags(out, in.array(), in.arrayOffset() + in.position(), length);
87        ByteBufferUtils.skip(in, length);
88      } else {
89        byte[] tagBuf = new byte[length];
90        in.get(tagBuf);
91        compressTags(out, tagBuf, 0, length);
92      }
93    }
94  
95    /**
96     * Uncompress tags from the InputStream and writes to the destination array.
97     * @param src Stream where the compressed tags are available
98     * @param dest Destination array where to write the uncompressed tags
99     * @param offset Offset in destination where tags to be written
100    * @param length Length of all tag bytes
101    * @throws IOException
102    */
103   public void uncompressTags(InputStream src, byte[] dest, int offset, int length)
104       throws IOException {
105     int endOffset = offset + length;
106     while (offset < endOffset) {
107       byte status = (byte) src.read();
108       if (status == Dictionary.NOT_IN_DICTIONARY) {
109         int tagLen = StreamUtils.readRawVarint32(src);
110         offset = Bytes.putAsShort(dest, offset, tagLen);
111         IOUtils.readFully(src, dest, offset, tagLen);
112         tagDict.addEntry(dest, offset, tagLen);
113         offset += tagLen;
114       } else {
115         short dictIdx = StreamUtils.toShort(status, (byte) src.read());
116         byte[] entry = tagDict.getEntry(dictIdx);
117         if (entry == null) {
118           throw new IOException("Missing dictionary entry for index " + dictIdx);
119         }
120         offset = Bytes.putAsShort(dest, offset, entry.length);
121         System.arraycopy(entry, 0, dest, offset, entry.length);
122         offset += entry.length;
123       }
124     }
125   }
126 
127   /**
128    * Uncompress tags from the input ByteBuffer and writes to the destination array.
129    * @param src Buffer where the compressed tags are available
130    * @param dest Destination array where to write the uncompressed tags
131    * @param offset Offset in destination where tags to be written
132    * @param length Length of all tag bytes
133    * @return bytes count read from source to uncompress all tags.
134    * @throws IOException
135    */
136   public int uncompressTags(ByteBuffer src, byte[] dest, int offset, int length)
137       throws IOException {
138     int srcBeginPos = src.position();
139     int endOffset = offset + length;
140     while (offset < endOffset) {
141       byte status = src.get();
142       int tagLen;
143       if (status == Dictionary.NOT_IN_DICTIONARY) {
144         tagLen = StreamUtils.readRawVarint32(src);
145         offset = Bytes.putAsShort(dest, offset, tagLen);
146         src.get(dest, offset, tagLen);
147         tagDict.addEntry(dest, offset, tagLen);
148         offset += tagLen;
149       } else {
150         short dictIdx = StreamUtils.toShort(status, src.get());
151         byte[] entry = tagDict.getEntry(dictIdx);
152         if (entry == null) {
153           throw new IOException("Missing dictionary entry for index " + dictIdx);
154         }
155         tagLen = entry.length;
156         offset = Bytes.putAsShort(dest, offset, tagLen);
157         System.arraycopy(entry, 0, dest, offset, tagLen);
158         offset += tagLen;
159       }
160     }
161     return src.position() - srcBeginPos;
162   }
163 
164   /**
165    * Uncompress tags from the InputStream and writes to the destination buffer.
166    * @param src Stream where the compressed tags are available
167    * @param dest Destination buffer where to write the uncompressed tags
168    * @param length Length of all tag bytes
169    * @throws IOException
170    */
171   public void uncompressTags(InputStream src, ByteBuffer dest, int length) throws IOException {
172     if (dest.hasArray()) {
173       uncompressTags(src, dest.array(), dest.arrayOffset() + dest.position(), length);
174     } else {
175       byte[] tagBuf = new byte[length];
176       uncompressTags(src, tagBuf, 0, length);
177       dest.put(tagBuf);
178     }
179   }
180 
181   private void write(byte[] data, int offset, int length, OutputStream out) throws IOException {
182     short dictIdx = Dictionary.NOT_IN_DICTIONARY;
183     if (tagDict != null) {
184       dictIdx = tagDict.findEntry(data, offset, length);
185     }
186     if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
187       out.write(Dictionary.NOT_IN_DICTIONARY);
188       StreamUtils.writeRawVInt32(out, length);
189       out.write(data, offset, length);
190     } else {
191       StreamUtils.writeShort(out, dictIdx);
192     }
193   }
194 }