View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.io;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.lang.reflect.Constructor;
25  import java.lang.reflect.InvocationTargetException;
26  import java.nio.ByteBuffer;
27  
28  import org.apache.hadoop.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.Tag;
30  import org.apache.hadoop.hbase.io.util.Dictionary;
31  import org.apache.hadoop.hbase.io.util.StreamUtils;
32  import org.apache.hadoop.hbase.util.ByteBufferUtils;
33  import org.apache.hadoop.hbase.util.Bytes;
34  import org.apache.hadoop.io.IOUtils;
35  
36  /**
37   * Context that holds the dictionary for Tag compression and doing the compress/uncompress. This
38   * will be used for compressing tags while writing into HFiles and WALs.
39   */
40  @InterfaceAudience.Private
41  public class TagCompressionContext {
42    private final Dictionary tagDict;
43  
44    public TagCompressionContext(Class<? extends Dictionary> dictType, int dictCapacity)
45        throws SecurityException, NoSuchMethodException, InstantiationException,
46        IllegalAccessException, InvocationTargetException {
47      Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
48      tagDict = dictConstructor.newInstance();
49      tagDict.init(dictCapacity);
50    }
51  
52    public void clear() {
53      tagDict.clear();
54    }
55  
56    /**
57     * Compress tags one by one and writes to the OutputStream.
58     * @param out Stream to which the compressed tags to be written
59     * @param in Source where tags are available
60     * @param offset Offset for the tags bytes
61     * @param length Length of all tag bytes
62     * @throws IOException
63     */
64    public void compressTags(OutputStream out, byte[] in, int offset, short length)
65        throws IOException {
66      int pos = offset;
67      int endOffset = pos + length;
68      assert pos < endOffset;
69      while (pos < endOffset) {
70        short tagLen = Bytes.toShort(in, pos);
71        pos += Tag.TAG_LENGTH_SIZE;
72        write(in, pos, tagLen, out);
73        pos += tagLen;
74      }
75    }
76  
77    /**
78     * Compress tags one by one and writes to the OutputStream.
79     * @param out Stream to which the compressed tags to be written
80     * @param in Source buffer where tags are available
81     * @param length Length of all tag bytes
82     * @throws IOException
83     */
84    public void compressTags(OutputStream out, ByteBuffer in, short length) throws IOException {
85      if (in.hasArray()) {
86        compressTags(out, in.array(), in.arrayOffset() + in.position(), length);
87        ByteBufferUtils.skip(in, length);
88      } else {
89        byte[] tagBuf = new byte[length];
90        in.get(tagBuf);
91        compressTags(out, tagBuf, 0, length);
92      }
93    }
94  
95    /**
96     * Uncompress tags from the InputStream and writes to the destination array.
97     * @param src Stream where the compressed tags are available
98     * @param dest Destination array where to write the uncompressed tags
99     * @param offset Offset in destination where tags to be written
100    * @param length Length of all tag bytes
101    * @throws IOException
102    */
103   public void uncompressTags(InputStream src, byte[] dest, int offset, short length)
104       throws IOException {
105     int endOffset = offset + length;
106     while (offset < endOffset) {
107       byte status = (byte) src.read();
108       if (status == Dictionary.NOT_IN_DICTIONARY) {
109         // We are writing short as tagLen. So can downcast this without any risk.
110         short tagLen = (short) StreamUtils.readRawVarint32(src);
111         offset = Bytes.putShort(dest, offset, tagLen);
112         IOUtils.readFully(src, dest, offset, tagLen);
113         tagDict.addEntry(dest, offset, tagLen);
114         offset += tagLen;
115       } else {
116         short dictIdx = StreamUtils.toShort(status, (byte) src.read());
117         byte[] entry = tagDict.getEntry(dictIdx);
118         if (entry == null) {
119           throw new IOException("Missing dictionary entry for index " + dictIdx);
120         }
121         offset = Bytes.putShort(dest, offset, (short) entry.length);
122         System.arraycopy(entry, 0, dest, offset, entry.length);
123         offset += entry.length;
124       }
125     }
126   }
127 
128   /**
129    * Uncompress tags from the input ByteBuffer and writes to the destination array.
130    * @param src Buffer where the compressed tags are available
131    * @param dest Destination array where to write the uncompressed tags
132    * @param offset Offset in destination where tags to be written
133    * @param length Length of all tag bytes
134    * @return bytes count read from source to uncompress all tags.
135    * @throws IOException
136    */
137   public int uncompressTags(ByteBuffer src, byte[] dest, int offset, int length)
138       throws IOException {
139     int srcBeginPos = src.position();
140     int endOffset = offset + length;
141     while (offset < endOffset) {
142       byte status = src.get();
143       short tagLen;
144       if (status == Dictionary.NOT_IN_DICTIONARY) {
145         // We are writing short as tagLen. So can downcast this without any risk.
146         tagLen = (short) StreamUtils.readRawVarint32(src);
147         offset = Bytes.putShort(dest, offset, tagLen);
148         src.get(dest, offset, tagLen);
149         tagDict.addEntry(dest, offset, tagLen);
150         offset += tagLen;
151       } else {
152         short dictIdx = StreamUtils.toShort(status, src.get());
153         byte[] entry = tagDict.getEntry(dictIdx);
154         if (entry == null) {
155           throw new IOException("Missing dictionary entry for index " + dictIdx);
156         }
157         tagLen = (short) entry.length;
158         offset = Bytes.putShort(dest, offset, tagLen);
159         System.arraycopy(entry, 0, dest, offset, tagLen);
160         offset += tagLen;
161       }
162     }
163     return src.position() - srcBeginPos;
164   }
165 
166   /**
167    * Uncompress tags from the InputStream and writes to the destination buffer.
168    * @param src Stream where the compressed tags are available
169    * @param dest Destination buffer where to write the uncompressed tags
170    * @param length Length of all tag bytes
171    * @throws IOException
172    */
173   public void uncompressTags(InputStream src, ByteBuffer dest, short length) throws IOException {
174     if (dest.hasArray()) {
175       uncompressTags(src, dest.array(), dest.arrayOffset() + dest.position(), length);
176     } else {
177       byte[] tagBuf = new byte[length];
178       uncompressTags(src, tagBuf, 0, length);
179       dest.put(tagBuf);
180     }
181   }
182 
183   private void write(byte[] data, int offset, short length, OutputStream out) throws IOException {
184     short dictIdx = Dictionary.NOT_IN_DICTIONARY;
185     if (tagDict != null) {
186       dictIdx = tagDict.findEntry(data, offset, length);
187     }
188     if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
189       out.write(Dictionary.NOT_IN_DICTIONARY);
190       StreamUtils.writeRawVInt32(out, length);
191       out.write(data, offset, length);
192     } else {
193       StreamUtils.writeShort(out, dictIdx);
194     }
195   }
196 }