1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.types;
19
20 import org.apache.hadoop.hbase.classification.InterfaceAudience;
21 import org.apache.hadoop.hbase.classification.InterfaceStability;
22 import org.apache.hadoop.hbase.util.Bytes;
23 import org.apache.hadoop.hbase.util.Order;
24 import org.apache.hadoop.hbase.util.PositionedByteRange;
25 import org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange;
26
27
28
29
30
31
32
33 @InterfaceAudience.Public
34 @InterfaceStability.Evolving
35 public class TerminatedWrapper<T> implements DataType<T> {
36
37 protected final DataType<T> wrapped;
38 protected final byte[] term;
39
40
41
42
43
44 public TerminatedWrapper(DataType<T> wrapped, byte[] term) {
45 if (null == term || term.length == 0)
46 throw new IllegalArgumentException("terminator must be non-null and non-empty.");
47 this.wrapped = wrapped;
48 wrapped.getOrder().apply(term);
49 this.term = term;
50 }
51
52
53
54
55
56
57
58 public TerminatedWrapper(DataType<T> wrapped, String term) {
59 this(wrapped, Bytes.toBytes(term));
60 }
61
62 @Override
63 public boolean isOrderPreserving() { return wrapped.isOrderPreserving(); }
64
65 @Override
66 public Order getOrder() { return wrapped.getOrder(); }
67
68 @Override
69 public boolean isNullable() { return wrapped.isNullable(); }
70
71 @Override
72 public boolean isSkippable() { return true; }
73
74 @Override
75 public int encodedLength(T val) {
76 return wrapped.encodedLength(val) + term.length;
77 }
78
79 @Override
80 public Class<T> encodedClass() { return wrapped.encodedClass(); }
81
82
83
84
85
86 protected int terminatorPosition(PositionedByteRange src) {
87 byte[] a = src.getBytes();
88 final int offset = src.getOffset();
89 int i;
90 SKIP: for (i = src.getPosition(); i < src.getLength(); i++) {
91 if (a[offset + i] != term[0]) continue;
92 int j;
93 for (j = 1; j < term.length && offset + j < src.getLength(); j++) {
94 if (a[offset + i + j] != term[j]) continue SKIP;
95 }
96 if (j == term.length) return i;
97 }
98 return -1;
99 }
100
101
102
103
104
105
106
107 @Override
108 public int skip(PositionedByteRange src) {
109 if (wrapped.isSkippable()) {
110 int ret = wrapped.skip(src);
111 src.setPosition(src.getPosition() + term.length);
112 return ret + term.length;
113 } else {
114
115 final int start = src.getPosition();
116 int skipped = terminatorPosition(src);
117 if (-1 == skipped) throw new IllegalArgumentException("Terminator sequence not found.");
118 skipped += term.length;
119 src.setPosition(skipped);
120 return skipped - start;
121 }
122 }
123
124 @Override
125 public T decode(PositionedByteRange src) {
126 if (wrapped.isSkippable()) {
127 T ret = wrapped.decode(src);
128 src.setPosition(src.getPosition() + term.length);
129 return ret;
130 } else {
131
132 int term = terminatorPosition(src);
133 if (-1 == term) throw new IllegalArgumentException("Terminator sequence not found.");
134 byte[] b = new byte[term - src.getPosition()];
135 src.get(b);
136
137 T ret = wrapped.decode(new SimplePositionedMutableByteRange(b));
138 src.get(this.term);
139 return ret;
140 }
141 }
142
143
144
145
146
147
148 @Override
149 public int encode(PositionedByteRange dst, T val) {
150 final int start = dst.getPosition();
151 int written = wrapped.encode(dst, val);
152 PositionedByteRange b = dst.shallowCopy();
153 b.setLength(dst.getPosition());
154 b.setPosition(start);
155 if (-1 != terminatorPosition(b)) {
156 dst.setPosition(start);
157 throw new IllegalArgumentException("Encoded value contains terminator sequence.");
158 }
159 dst.put(term);
160 return written + term.length;
161 }
162 }