1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.wal;
19
20 import java.io.ByteArrayOutputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.hbase.Cell;
27 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
28 import org.apache.hadoop.hbase.KeyValue;
29 import org.apache.hadoop.hbase.KeyValueUtil;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.hbase.codec.BaseDecoder;
32 import org.apache.hadoop.hbase.codec.BaseEncoder;
33 import org.apache.hadoop.hbase.codec.Codec;
34 import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
35 import org.apache.hadoop.hbase.io.util.Dictionary;
36 import org.apache.hadoop.hbase.io.util.StreamUtils;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.hbase.util.ReflectionUtils;
39 import org.apache.hadoop.io.IOUtils;
40
41 import com.google.protobuf.ByteString;
42
43
44
45
46
47
48
49
50
51 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG})
52 public class WALCellCodec implements Codec {
53
54 public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";
55
56 protected final CompressionContext compression;
57 protected final ByteStringUncompressor statelessUncompressor = new ByteStringUncompressor() {
58 @Override
59 public byte[] uncompress(ByteString data, Dictionary dict) throws IOException {
60 return WALCellCodec.uncompressByteString(data, dict);
61 }
62 };
63
64
65
66
/**
 * Creates a codec without a compression context, so the encoder and decoder it hands out are
 * the uncompressed ones.
 */
public WALCellCodec() {
  // Same effect as assigning null directly: the two-argument constructor only stores the context.
  this(null, null);
}
70
71
72
73
74
75
76
77
/**
 * Creates a codec bound to the given compression context. This is the constructor shape
 * ({@code Configuration}, {@code CompressionContext}) that the {@code create} factories look up
 * reflectively.
 *
 * @param conf configuration; not read by this implementation
 * @param compression compression context to use, or {@code null} for uncompressed cells
 */
public WALCellCodec(Configuration conf, CompressionContext compression) {
  this.compression = compression;
}
81
82 public static Class<?> getWALCellCodecClass(Configuration conf) {
83 return conf.getClass(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class);
84 }
85
86
87
88
89
90
91
92
93
94
95
96
97
98 public static WALCellCodec create(Configuration conf, String cellCodecClsName,
99 CompressionContext compression) throws UnsupportedOperationException {
100 if (cellCodecClsName == null) {
101 cellCodecClsName = getWALCellCodecClass(conf).getName();
102 }
103 return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[]
104 { Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
105 }
106
107
108
109
110
111
112
113
114
115
116
117
118 public static WALCellCodec create(Configuration conf,
119 CompressionContext compression) throws UnsupportedOperationException {
120 String cellCodecClsName = getWALCellCodecClass(conf).getName();
121 return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[]
122 { Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
123 }
124
125 public interface ByteStringCompressor {
126 ByteString compress(byte[] data, Dictionary dict) throws IOException;
127 }
128
129 public interface ByteStringUncompressor {
130 byte[] uncompress(ByteString data, Dictionary dict) throws IOException;
131 }
132
133
134
135
136 static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {
137 public ByteString toByteString() {
138 return ByteString.copyFrom(this.buf, 0, this.count);
139 }
140
141 @Override
142 public ByteString compress(byte[] data, Dictionary dict) throws IOException {
143 writeCompressed(data, dict);
144 ByteString result = ByteString.copyFrom(this.buf, 0, this.count);
145 reset();
146 return result;
147 }
148
149 private void writeCompressed(byte[] data, Dictionary dict) throws IOException {
150 assert dict != null;
151 short dictIdx = dict.findEntry(data, 0, data.length);
152 if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
153 write(Dictionary.NOT_IN_DICTIONARY);
154 StreamUtils.writeRawVInt32(this, data.length);
155 write(data, 0, data.length);
156 } else {
157 StreamUtils.writeShort(this, dictIdx);
158 }
159 }
160 }
161
162 private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
163 InputStream in = bs.newInput();
164 byte status = (byte)in.read();
165 if (status == Dictionary.NOT_IN_DICTIONARY) {
166 byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
167 int bytesRead = in.read(arr);
168 if (bytesRead != arr.length) {
169 throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
170 }
171 if (dict != null) dict.addEntry(arr, 0, arr.length);
172 return arr;
173 } else {
174
175 short dictIdx = StreamUtils.toShort(status, (byte)in.read());
176 byte[] entry = dict.getEntry(dictIdx);
177 if (entry == null) {
178 throw new IOException("Missing dictionary entry for index " + dictIdx);
179 }
180 return entry;
181 }
182 }
183
184 static class CompressedKvEncoder extends BaseEncoder {
185 private final CompressionContext compression;
186 public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
187 super(out);
188 this.compression = compression;
189 }
190
191 @Override
192 public void write(Cell cell) throws IOException {
193
194 StreamUtils.writeRawVInt32(out, KeyValueUtil.keyLength(cell));
195 StreamUtils.writeRawVInt32(out, cell.getValueLength());
196
197 int tagsLength = cell.getTagsLength();
198 StreamUtils.writeRawVInt32(out, tagsLength);
199
200
201
202 write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), compression.rowDict);
203 write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
204 compression.familyDict);
205 write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
206 compression.qualifierDict);
207
208
209 StreamUtils.writeLong(out, cell.getTimestamp());
210 out.write(cell.getTypeByte());
211 out.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
212 if (tagsLength > 0) {
213 if (compression.tagCompressionContext != null) {
214
215 compression.tagCompressionContext.compressTags(out, cell.getTagsArray(),
216 cell.getTagsOffset(), tagsLength);
217 } else {
218
219
220 out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
221 }
222 }
223 }
224
225 private void write(byte[] data, int offset, int length, Dictionary dict) throws IOException {
226 short dictIdx = Dictionary.NOT_IN_DICTIONARY;
227 if (dict != null) {
228 dictIdx = dict.findEntry(data, offset, length);
229 }
230 if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
231 out.write(Dictionary.NOT_IN_DICTIONARY);
232 StreamUtils.writeRawVInt32(out, length);
233 out.write(data, offset, length);
234 } else {
235 StreamUtils.writeShort(out, dictIdx);
236 }
237 }
238 }
239
240 static class CompressedKvDecoder extends BaseDecoder {
241 private final CompressionContext compression;
242 public CompressedKvDecoder(InputStream in, CompressionContext compression) {
243 super(in);
244 this.compression = compression;
245 }
246
247 @Override
248 protected Cell parseCell() throws IOException {
249 int keylength = StreamUtils.readRawVarint32(in);
250 int vlength = StreamUtils.readRawVarint32(in);
251
252 int tagsLength = StreamUtils.readRawVarint32(in);
253 int length = 0;
254 if(tagsLength == 0) {
255 length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
256 } else {
257 length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
258 }
259
260 byte[] backingArray = new byte[length];
261 int pos = 0;
262 pos = Bytes.putInt(backingArray, pos, keylength);
263 pos = Bytes.putInt(backingArray, pos, vlength);
264
265
266 int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT, compression.rowDict);
267 checkLength(elemLen, Short.MAX_VALUE);
268 pos = Bytes.putShort(backingArray, pos, (short)elemLen);
269 pos += elemLen;
270
271
272 elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE, compression.familyDict);
273 checkLength(elemLen, Byte.MAX_VALUE);
274 pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
275 pos += elemLen;
276
277
278 elemLen = readIntoArray(backingArray, pos, compression.qualifierDict);
279 pos += elemLen;
280
281
282 int tsTypeValLen = length - pos;
283 if (tagsLength > 0) {
284 tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
285 }
286 IOUtils.readFully(in, backingArray, pos, tsTypeValLen);
287 pos += tsTypeValLen;
288
289
290 if (tagsLength > 0) {
291 pos = Bytes.putAsShort(backingArray, pos, tagsLength);
292 if (compression.tagCompressionContext != null) {
293 compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
294 } else {
295 IOUtils.readFully(in, backingArray, pos, tagsLength);
296 }
297 }
298 return new KeyValue(backingArray, 0, length);
299 }
300
301 private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
302 byte status = (byte)in.read();
303 if (status == Dictionary.NOT_IN_DICTIONARY) {
304
305
306 int length = StreamUtils.readRawVarint32(in);
307 IOUtils.readFully(in, to, offset, length);
308 dict.addEntry(to, offset, length);
309 return length;
310 } else {
311
312 short dictIdx = StreamUtils.toShort(status, (byte)in.read());
313 byte[] entry = dict.getEntry(dictIdx);
314 if (entry == null) {
315 throw new IOException("Missing dictionary entry for index " + dictIdx);
316 }
317
318 Bytes.putBytes(to, offset, entry, 0, entry.length);
319 return entry.length;
320 }
321 }
322
323 private static void checkLength(int len, int max) throws IOException {
324 if (len < 0 || len > max) {
325 throw new IOException("Invalid length for compresesed portion of keyvalue: " + len);
326 }
327 }
328 }
329
330 public static class EnsureKvEncoder extends BaseEncoder {
331 public EnsureKvEncoder(OutputStream out) {
332 super(out);
333 }
334 @Override
335 public void write(Cell cell) throws IOException {
336 checkFlushed();
337
338 KeyValueUtil.oswrite(cell, this.out, true);
339 }
340 }
341
342 @Override
343 public Decoder getDecoder(InputStream is) {
344 return (compression == null)
345 ? new KeyValueCodecWithTags.KeyValueDecoder(is) : new CompressedKvDecoder(is, compression);
346 }
347
348 @Override
349 public Encoder getEncoder(OutputStream os) {
350 return (compression == null)
351 ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, compression);
352 }
353
354 public ByteStringCompressor getByteStringCompressor() {
355
356 return new BaosAndCompressor();
357 }
358
359 public ByteStringUncompressor getByteStringUncompressor() {
360
361 return this.statelessUncompressor;
362 }
363 }