1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.io.encoding;
18
19 import java.io.DataInputStream;
20 import java.io.DataOutputStream;
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23
24 import org.apache.hadoop.hbase.classification.InterfaceAudience;
25 import org.apache.hadoop.hbase.Cell;
26 import org.apache.hadoop.hbase.CellUtil;
27 import org.apache.hadoop.hbase.KeyValue;
28 import org.apache.hadoop.hbase.KeyValueUtil;
29 import org.apache.hadoop.hbase.KeyValue.KVComparator;
30 import org.apache.hadoop.hbase.util.ByteBufferUtils;
31 import org.apache.hadoop.hbase.util.Bytes;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 @InterfaceAudience.Private
55 public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
56 static final int FLAG_SAME_KEY_LENGTH = 1;
57 static final int FLAG_SAME_VALUE_LENGTH = 1 << 1;
58 static final int FLAG_SAME_TYPE = 1 << 2;
59 static final int FLAG_TIMESTAMP_IS_DIFF = 1 << 3;
60 static final int MASK_TIMESTAMP_LENGTH = (1 << 4) | (1 << 5) | (1 << 6);
61 static final int SHIFT_TIMESTAMP_LENGTH = 4;
62 static final int FLAG_TIMESTAMP_SIGN = 1 << 7;
63
64 protected static class DiffCompressionState extends CompressionState {
65 long timestamp;
66 byte[] familyNameWithSize;
67
68 @Override
69 protected void readTimestamp(ByteBuffer in) {
70 timestamp = in.getLong();
71 }
72
73 @Override
74 void copyFrom(CompressionState state) {
75 super.copyFrom(state);
76 DiffCompressionState state2 = (DiffCompressionState) state;
77 timestamp = state2.timestamp;
78 }
79 }
80
81 private void uncompressSingleKeyValue(DataInputStream source,
82 ByteBuffer buffer,
83 DiffCompressionState state)
84 throws IOException, EncoderBufferTooSmallException {
85
86 if (state.isFirst()) {
87 state.familyLength = source.readByte();
88 state.familyNameWithSize =
89 new byte[(state.familyLength & 0xff) + KeyValue.FAMILY_LENGTH_SIZE];
90 state.familyNameWithSize[0] = state.familyLength;
91 int read = source.read(state.familyNameWithSize, KeyValue.FAMILY_LENGTH_SIZE,
92 state.familyLength);
93 assert read == state.familyLength;
94 }
95
96
97 byte flag = source.readByte();
98
99
100 int keyLength;
101 int valueLength;
102 if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
103 keyLength = state.keyLength;
104 } else {
105 keyLength = ByteBufferUtils.readCompressedInt(source);
106 }
107 if ((flag & FLAG_SAME_VALUE_LENGTH) != 0) {
108 valueLength = state.valueLength;
109 } else {
110 valueLength = ByteBufferUtils.readCompressedInt(source);
111 }
112 int commonPrefix = ByteBufferUtils.readCompressedInt(source);
113
114
115 int keyOffset = buffer.position();
116 ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);
117 buffer.putInt(keyLength);
118 buffer.putInt(valueLength);
119
120
121 if (commonPrefix > 0) {
122 ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.prevOffset
123 + KeyValue.ROW_OFFSET, commonPrefix);
124 }
125
126
127 int keyRestLength;
128 if (state.isFirst() || commonPrefix <
129 state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
130
131 short rowLength;
132 int rowRestLength;
133
134
135 if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) {
136
137 ByteBufferUtils.copyFromStreamToBuffer(buffer, source,
138 KeyValue.ROW_LENGTH_SIZE - commonPrefix);
139 ByteBufferUtils.skip(buffer, -KeyValue.ROW_LENGTH_SIZE);
140 rowLength = buffer.getShort();
141 rowRestLength = rowLength;
142 } else {
143
144 rowLength = buffer.getShort(keyOffset + KeyValue.ROW_OFFSET);
145 rowRestLength = rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
146 }
147
148
149 ByteBufferUtils.copyFromStreamToBuffer(buffer, source, rowRestLength);
150 state.rowLength = rowLength;
151
152
153 buffer.put(state.familyNameWithSize);
154
155 keyRestLength = keyLength - rowLength -
156 state.familyNameWithSize.length -
157 (KeyValue.ROW_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
158 } else {
159
160 keyRestLength = keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
161 }
162
163 ByteBufferUtils.copyFromStreamToBuffer(buffer, source, keyRestLength);
164
165
166 int timestampFitsInBytes =
167 ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
168 long timestamp = ByteBufferUtils.readLong(source, timestampFitsInBytes);
169 if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
170 timestamp = -timestamp;
171 }
172 if ((flag & FLAG_TIMESTAMP_IS_DIFF) != 0) {
173 timestamp = state.timestamp - timestamp;
174 }
175 buffer.putLong(timestamp);
176
177
178 byte type;
179 if ((flag & FLAG_SAME_TYPE) != 0) {
180 type = state.type;
181 } else {
182 type = source.readByte();
183 }
184 buffer.put(type);
185
186
187 ByteBufferUtils.copyFromStreamToBuffer(buffer, source, valueLength);
188
189 state.keyLength = keyLength;
190 state.valueLength = valueLength;
191 state.prevOffset = keyOffset;
192 state.timestamp = timestamp;
193 state.type = type;
194
195 }
196
197 @Override
198 public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
199 DataOutputStream out) throws IOException {
200 EncodingState state = encodingContext.getEncodingState();
201 int size = compressSingleKeyValue(out, cell, state.prevCell);
202 size += afterEncodingKeyValue(cell, out, encodingContext);
203 state.prevCell = cell;
204 return size;
205 }
206
207 private int compressSingleKeyValue(DataOutputStream out, Cell cell, Cell prevCell)
208 throws IOException {
209 byte flag = 0;
210 int kLength = KeyValueUtil.keyLength(cell);
211 int vLength = cell.getValueLength();
212
213 long timestamp;
214 long diffTimestamp = 0;
215 int diffTimestampFitsInBytes = 0;
216 int timestampFitsInBytes;
217 int commonPrefix = 0;
218
219 if (prevCell == null) {
220 timestamp = cell.getTimestamp();
221 if (timestamp < 0) {
222 flag |= FLAG_TIMESTAMP_SIGN;
223 timestamp = -timestamp;
224 }
225 timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
226 flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
227
228 byte familyLength = cell.getFamilyLength();
229 out.write(familyLength);
230 out.write(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
231 } else {
232
233 int preKeyLength = KeyValueUtil.keyLength(prevCell);
234 commonPrefix = CellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, false);
235 if (kLength == preKeyLength) {
236 flag |= FLAG_SAME_KEY_LENGTH;
237 }
238 if (vLength == prevCell.getValueLength()) {
239 flag |= FLAG_SAME_VALUE_LENGTH;
240 }
241 if (cell.getTypeByte() == prevCell.getTypeByte()) {
242 flag |= FLAG_SAME_TYPE;
243 }
244
245 timestamp = cell.getTimestamp();
246 diffTimestamp = prevCell.getTimestamp() - timestamp;
247 boolean negativeTimestamp = timestamp < 0;
248 if (negativeTimestamp) {
249 timestamp = -timestamp;
250 }
251 timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
252 boolean minusDiffTimestamp = diffTimestamp < 0;
253 if (minusDiffTimestamp) {
254 diffTimestamp = -diffTimestamp;
255 }
256 diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
257 if (diffTimestampFitsInBytes < timestampFitsInBytes) {
258 flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
259 flag |= FLAG_TIMESTAMP_IS_DIFF;
260 if (minusDiffTimestamp) {
261 flag |= FLAG_TIMESTAMP_SIGN;
262 }
263 } else {
264 flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
265 if (negativeTimestamp) {
266 flag |= FLAG_TIMESTAMP_SIGN;
267 }
268 }
269 }
270 out.write(flag);
271 if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
272 ByteBufferUtils.putCompressedInt(out, kLength);
273 }
274 if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
275 ByteBufferUtils.putCompressedInt(out, vLength);
276 }
277 ByteBufferUtils.putCompressedInt(out, commonPrefix);
278 short rLen = cell.getRowLength();
279 if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
280
281
282 CellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
283 out.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
284 } else {
285
286
287
288
289 int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
290 - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
291 out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonQualPrefix,
292 cell.getQualifierLength() - commonQualPrefix);
293 }
294 if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
295 ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
296 } else {
297 ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
298 }
299
300 if ((flag & FLAG_SAME_TYPE) == 0) {
301 out.write(cell.getTypeByte());
302 }
303 out.write(cell.getValueArray(), cell.getValueOffset(), vLength);
304 return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
305 }
306
307 @Override
308 public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
309 block.mark();
310 block.position(Bytes.SIZEOF_INT);
311 byte familyLength = block.get();
312 ByteBufferUtils.skip(block, familyLength);
313 byte flag = block.get();
314 int keyLength = ByteBufferUtils.readCompressedInt(block);
315 ByteBufferUtils.readCompressedInt(block);
316 ByteBufferUtils.readCompressedInt(block);
317 ByteBuffer result = ByteBuffer.allocate(keyLength);
318
319
320 assert !(result.isDirect());
321 int pos = result.arrayOffset();
322 block.get(result.array(), pos, Bytes.SIZEOF_SHORT);
323 pos += Bytes.SIZEOF_SHORT;
324 short rowLength = result.getShort();
325 block.get(result.array(), pos, rowLength);
326 pos += rowLength;
327
328
329 int savePosition = block.position();
330 block.position(Bytes.SIZEOF_INT);
331 block.get(result.array(), pos, familyLength + Bytes.SIZEOF_BYTE);
332 pos += familyLength + Bytes.SIZEOF_BYTE;
333
334
335 block.position(savePosition);
336 int qualifierLength =
337 keyLength - pos + result.arrayOffset() - KeyValue.TIMESTAMP_TYPE_SIZE;
338 block.get(result.array(), pos, qualifierLength);
339 pos += qualifierLength;
340
341
342 int timestampFitInBytes =
343 ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
344 long timestamp = ByteBufferUtils.readLong(block, timestampFitInBytes);
345 if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
346 timestamp = -timestamp;
347 }
348 result.putLong(pos, timestamp);
349 pos += Bytes.SIZEOF_LONG;
350 block.get(result.array(), pos, Bytes.SIZEOF_BYTE);
351
352 block.reset();
353 return result;
354 }
355
356 @Override
357 public String toString() {
358 return DiffKeyDeltaEncoder.class.getSimpleName();
359 }
360
361 protected static class DiffSeekerState extends SeekerState {
362 private int rowLengthWithSize;
363 private long timestamp;
364
365 @Override
366 protected void copyFromNext(SeekerState that) {
367 super.copyFromNext(that);
368 DiffSeekerState other = (DiffSeekerState) that;
369 rowLengthWithSize = other.rowLengthWithSize;
370 timestamp = other.timestamp;
371 }
372 }
373
374 @Override
375 public EncodedSeeker createSeeker(KVComparator comparator,
376 HFileBlockDecodingContext decodingCtx) {
377 return new BufferedEncodedSeeker<DiffSeekerState>(comparator, decodingCtx) {
378 private byte[] familyNameWithSize;
379 private static final int TIMESTAMP_WITH_TYPE_LENGTH =
380 Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;
381
382 private void decode(boolean isFirst) {
383 byte flag = currentBuffer.get();
384 byte type = 0;
385 if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
386 if (!isFirst) {
387 type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE];
388 }
389 current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
390 }
391 if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
392 current.valueLength =
393 ByteBufferUtils.readCompressedInt(currentBuffer);
394 }
395 current.lastCommonPrefix =
396 ByteBufferUtils.readCompressedInt(currentBuffer);
397
398 current.ensureSpaceForKey();
399
400 if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
401
402
403
404 currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
405 Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
406 current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
407 Bytes.SIZEOF_SHORT;
408
409
410 currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
411 current.rowLengthWithSize - Bytes.SIZEOF_SHORT);
412
413
414 System.arraycopy(familyNameWithSize, 0, current.keyBuffer,
415 current.rowLengthWithSize, familyNameWithSize.length);
416
417
418 currentBuffer.get(current.keyBuffer,
419 current.rowLengthWithSize + familyNameWithSize.length,
420 current.keyLength - current.rowLengthWithSize -
421 familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
422 } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
423
424
425
426
427 currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
428 current.rowLengthWithSize - current.lastCommonPrefix);
429
430
431 currentBuffer.get(current.keyBuffer,
432 current.rowLengthWithSize + familyNameWithSize.length,
433 current.keyLength - current.rowLengthWithSize -
434 familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
435 } else {
436
437 currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
438 current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH -
439 current.lastCommonPrefix);
440 }
441
442
443 int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH;
444 int timestampFitInBytes = 1 +
445 ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
446 long timestampOrDiff =
447 ByteBufferUtils.readLong(currentBuffer, timestampFitInBytes);
448 if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
449 timestampOrDiff = -timestampOrDiff;
450 }
451 if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
452 current.timestamp = timestampOrDiff;
453 } else {
454 current.timestamp = current.timestamp - timestampOrDiff;
455 }
456 Bytes.putLong(current.keyBuffer, pos, current.timestamp);
457 pos += Bytes.SIZEOF_LONG;
458
459
460 if ((flag & FLAG_SAME_TYPE) == 0) {
461 currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
462 } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
463 current.keyBuffer[pos] = type;
464 }
465
466 current.valueOffset = currentBuffer.position();
467 ByteBufferUtils.skip(currentBuffer, current.valueLength);
468
469 if (includesTags()) {
470 decodeTags();
471 }
472 if (includesMvcc()) {
473 current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
474 } else {
475 current.memstoreTS = 0;
476 }
477 current.nextKvOffset = currentBuffer.position();
478 }
479
480 @Override
481 protected void decodeFirst() {
482 ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
483
484
485 byte familyNameLength = currentBuffer.get();
486 familyNameWithSize = new byte[familyNameLength + Bytes.SIZEOF_BYTE];
487 familyNameWithSize[0] = familyNameLength;
488 currentBuffer.get(familyNameWithSize, Bytes.SIZEOF_BYTE,
489 familyNameLength);
490 decode(true);
491 }
492
493 @Override
494 protected void decodeNext() {
495 decode(false);
496 }
497
498 @Override
499 protected DiffSeekerState createSeekerState() {
500 return new DiffSeekerState();
501 }
502 };
503 }
504
505 @Override
506 protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
507 int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
508 int decompressedSize = source.readInt();
509 ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
510 allocateHeaderLength);
511 buffer.position(allocateHeaderLength);
512 DiffCompressionState state = new DiffCompressionState();
513 while (source.available() > skipLastBytes) {
514 uncompressSingleKeyValue(source, buffer, state);
515 afterDecodingKeyValue(source, buffer, decodingCtx);
516 }
517
518 if (source.available() != skipLastBytes) {
519 throw new IllegalStateException("Read too much bytes.");
520 }
521
522 return buffer;
523 }
524 }