1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.io;
20
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
35
36
37
38
39
40 @InterfaceAudience.Private
41 public class TagCompressionContext {
42 private final Dictionary tagDict;
43
44 public TagCompressionContext(Class<? extends Dictionary> dictType, int dictCapacity)
45 throws SecurityException, NoSuchMethodException, InstantiationException,
46 IllegalAccessException, InvocationTargetException {
47 Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
48 tagDict = dictConstructor.newInstance();
49 tagDict.init(dictCapacity);
50 }
51
52 public void clear() {
53 tagDict.clear();
54 }
55
56
57
58
59
60
61
62
63
64 public void compressTags(OutputStream out, byte[] in, int offset, int length)
65 throws IOException {
66 int pos = offset;
67 int endOffset = pos + length;
68 assert pos < endOffset;
69 while (pos < endOffset) {
70 int tagLen = Bytes.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
71 pos += Tag.TAG_LENGTH_SIZE;
72 write(in, pos, tagLen, out);
73 pos += tagLen;
74 }
75 }
76
77
78
79
80
81
82
83
84 public void compressTags(OutputStream out, ByteBuffer in, int length) throws IOException {
85 if (in.hasArray()) {
86 compressTags(out, in.array(), in.arrayOffset() + in.position(), length);
87 ByteBufferUtils.skip(in, length);
88 } else {
89 byte[] tagBuf = new byte[length];
90 in.get(tagBuf);
91 compressTags(out, tagBuf, 0, length);
92 }
93 }
94
95
96
97
98
99
100
101
102
103 public void uncompressTags(InputStream src, byte[] dest, int offset, int length)
104 throws IOException {
105 int endOffset = offset + length;
106 while (offset < endOffset) {
107 byte status = (byte) src.read();
108 if (status == Dictionary.NOT_IN_DICTIONARY) {
109 int tagLen = StreamUtils.readRawVarint32(src);
110 offset = Bytes.putAsShort(dest, offset, tagLen);
111 IOUtils.readFully(src, dest, offset, tagLen);
112 tagDict.addEntry(dest, offset, tagLen);
113 offset += tagLen;
114 } else {
115 short dictIdx = StreamUtils.toShort(status, (byte) src.read());
116 byte[] entry = tagDict.getEntry(dictIdx);
117 if (entry == null) {
118 throw new IOException("Missing dictionary entry for index " + dictIdx);
119 }
120 offset = Bytes.putAsShort(dest, offset, entry.length);
121 System.arraycopy(entry, 0, dest, offset, entry.length);
122 offset += entry.length;
123 }
124 }
125 }
126
127
128
129
130
131
132
133
134
135
136 public int uncompressTags(ByteBuffer src, byte[] dest, int offset, int length)
137 throws IOException {
138 int srcBeginPos = src.position();
139 int endOffset = offset + length;
140 while (offset < endOffset) {
141 byte status = src.get();
142 int tagLen;
143 if (status == Dictionary.NOT_IN_DICTIONARY) {
144 tagLen = StreamUtils.readRawVarint32(src);
145 offset = Bytes.putAsShort(dest, offset, tagLen);
146 src.get(dest, offset, tagLen);
147 tagDict.addEntry(dest, offset, tagLen);
148 offset += tagLen;
149 } else {
150 short dictIdx = StreamUtils.toShort(status, src.get());
151 byte[] entry = tagDict.getEntry(dictIdx);
152 if (entry == null) {
153 throw new IOException("Missing dictionary entry for index " + dictIdx);
154 }
155 tagLen = entry.length;
156 offset = Bytes.putAsShort(dest, offset, tagLen);
157 System.arraycopy(entry, 0, dest, offset, tagLen);
158 offset += tagLen;
159 }
160 }
161 return src.position() - srcBeginPos;
162 }
163
164
165
166
167
168
169
170
171 public void uncompressTags(InputStream src, ByteBuffer dest, int length) throws IOException {
172 if (dest.hasArray()) {
173 uncompressTags(src, dest.array(), dest.arrayOffset() + dest.position(), length);
174 } else {
175 byte[] tagBuf = new byte[length];
176 uncompressTags(src, tagBuf, 0, length);
177 dest.put(tagBuf);
178 }
179 }
180
181 private void write(byte[] data, int offset, int length, OutputStream out) throws IOException {
182 short dictIdx = Dictionary.NOT_IN_DICTIONARY;
183 if (tagDict != null) {
184 dictIdx = tagDict.findEntry(data, offset, length);
185 }
186 if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
187 out.write(Dictionary.NOT_IN_DICTIONARY);
188 StreamUtils.writeRawVInt32(out, length);
189 out.write(data, offset, length);
190 } else {
191 StreamUtils.writeShort(out, dictIdx);
192 }
193 }
194 }