1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.wal;
19
20 import java.io.ByteArrayInputStream;
21 import java.io.ByteArrayOutputStream;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.OutputStream;
25 import java.security.SecureRandom;
26
27 import org.apache.commons.io.IOUtils;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.Cell;
31 import org.apache.hadoop.hbase.KeyValue;
32 import org.apache.hadoop.hbase.KeyValueUtil;
33 import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
34 import org.apache.hadoop.hbase.io.crypto.Decryptor;
35 import org.apache.hadoop.hbase.io.crypto.Encryption;
36 import org.apache.hadoop.hbase.io.crypto.Encryptor;
37 import org.apache.hadoop.hbase.io.util.StreamUtils;
38 import org.apache.hadoop.hbase.util.Bytes;
39
40
41
42
43 @InterfaceAudience.Private
44 public class SecureWALCellCodec extends WALCellCodec {
45
46 private Encryptor encryptor;
47 private Decryptor decryptor;
48
49 public SecureWALCellCodec(Configuration conf, CompressionContext compression) {
50 super(conf, compression);
51 }
52
53 public SecureWALCellCodec(Configuration conf, Encryptor encryptor) {
54 super(conf, null);
55 this.encryptor = encryptor;
56 }
57
58 public SecureWALCellCodec(Configuration conf, Decryptor decryptor) {
59 super(conf, null);
60 this.decryptor = decryptor;
61 }
62
63 static class EncryptedKvDecoder extends KeyValueCodecWithTags.KeyValueDecoder {
64
65 private Decryptor decryptor;
66 private byte[] iv;
67
68 public EncryptedKvDecoder(InputStream in) {
69 super(in);
70 }
71
72 public EncryptedKvDecoder(InputStream in, Decryptor decryptor) {
73 super(in);
74 this.decryptor = decryptor;
75 if (decryptor != null) {
76 this.iv = new byte[decryptor.getIvLength()];
77 }
78 }
79
80 @Override
81 protected Cell parseCell() throws IOException {
82 if (this.decryptor == null) {
83 return super.parseCell();
84 }
85 int ivLength = 0;
86
87 ivLength = StreamUtils.readRawVarint32(in);
88
89
90
91
92 if (ivLength != this.iv.length) {
93 throw new IOException("Incorrect IV length: expected=" + iv.length + " have=" +
94 ivLength);
95 }
96 IOUtils.readFully(in, this.iv);
97
98 int codedLength = StreamUtils.readRawVarint32(in);
99 byte[] codedBytes = new byte[codedLength];
100 IOUtils.readFully(in, codedBytes);
101
102 decryptor.setIv(iv);
103 decryptor.reset();
104
105 InputStream cin = decryptor.createDecryptionStream(new ByteArrayInputStream(codedBytes));
106
107
108
109 int keylength = StreamUtils.readRawVarint32(cin);
110 int vlength = StreamUtils.readRawVarint32(cin);
111 int tagsLength = StreamUtils.readRawVarint32(cin);
112 int length = 0;
113 if (tagsLength == 0) {
114 length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
115 } else {
116 length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
117 }
118
119 byte[] backingArray = new byte[length];
120 int pos = 0;
121 pos = Bytes.putInt(backingArray, pos, keylength);
122 pos = Bytes.putInt(backingArray, pos, vlength);
123
124
125 int elemLen = StreamUtils.readRawVarint32(cin);
126 pos = Bytes.putShort(backingArray, pos, (short)elemLen);
127 IOUtils.readFully(cin, backingArray, pos, elemLen);
128 pos += elemLen;
129
130 elemLen = StreamUtils.readRawVarint32(cin);
131 pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
132 IOUtils.readFully(cin, backingArray, pos, elemLen);
133 pos += elemLen;
134
135 elemLen = StreamUtils.readRawVarint32(cin);
136 IOUtils.readFully(cin, backingArray, pos, elemLen);
137 pos += elemLen;
138
139 IOUtils.readFully(cin, backingArray, pos, length - pos);
140 return new KeyValue(backingArray, 0, length);
141 }
142
143 }
144
145 static class EncryptedKvEncoder extends KeyValueCodecWithTags.KeyValueEncoder {
146
147 private Encryptor encryptor;
148 private final ThreadLocal<byte[]> iv = new ThreadLocal<byte[]>() {
149 @Override
150 protected byte[] initialValue() {
151 byte[] iv = new byte[encryptor.getIvLength()];
152 new SecureRandom().nextBytes(iv);
153 return iv;
154 }
155 };
156
157 protected byte[] nextIv() {
158 byte[] b = iv.get(), ret = new byte[b.length];
159 System.arraycopy(b, 0, ret, 0, b.length);
160 return ret;
161 }
162
163 protected void incrementIv(int v) {
164 Encryption.incrementIv(iv.get(), 1 + (v / encryptor.getBlockSize()));
165 }
166
167 public EncryptedKvEncoder(OutputStream os) {
168 super(os);
169 }
170
171 public EncryptedKvEncoder(OutputStream os, Encryptor encryptor) {
172 super(os);
173 this.encryptor = encryptor;
174 }
175
176 @Override
177 public void write(Cell cell) throws IOException {
178 if (encryptor == null) {
179 super.write(cell);
180 return;
181 }
182
183 byte[] iv = nextIv();
184 encryptor.setIv(iv);
185 encryptor.reset();
186
187
188
189
190
191 StreamUtils.writeRawVInt32(out, iv.length);
192 out.write(iv);
193
194
195
196 ByteArrayOutputStream baos = new ByteArrayOutputStream();
197 OutputStream cout = encryptor.createEncryptionStream(baos);
198
199 int tlen = cell.getTagsLength();
200
201 StreamUtils.writeRawVInt32(cout, KeyValueUtil.keyLength(cell));
202 StreamUtils.writeRawVInt32(cout, cell.getValueLength());
203
204 StreamUtils.writeRawVInt32(cout, tlen);
205
206
207 StreamUtils.writeRawVInt32(cout, cell.getRowLength());
208 cout.write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
209 StreamUtils.writeRawVInt32(cout, cell.getFamilyLength());
210 cout.write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
211 StreamUtils.writeRawVInt32(cout, cell.getQualifierLength());
212 cout.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
213
214 StreamUtils.writeLong(cout, cell.getTimestamp());
215 cout.write(cell.getTypeByte());
216 cout.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
217 if (tlen > 0) {
218 cout.write(cell.getTagsArray(), cell.getTagsOffset(), tlen);
219 }
220 cout.close();
221
222 StreamUtils.writeRawVInt32(out, baos.size());
223 baos.writeTo(out);
224
225
226 incrementIv(baos.size());
227 }
228
229 }
230
231 @Override
232 public Decoder getDecoder(InputStream is) {
233 return new EncryptedKvDecoder(is, decryptor);
234 }
235
236 @Override
237 public Encoder getEncoder(OutputStream os) {
238 return new EncryptedKvEncoder(os, encryptor);
239 }
240
241 public static WALCellCodec getCodec(Configuration conf, Encryptor encryptor) {
242 return new SecureWALCellCodec(conf, encryptor);
243 }
244
245 public static WALCellCodec getCodec(Configuration conf, Decryptor decryptor) {
246 return new SecureWALCellCodec(conf, decryptor);
247 }
248
249 }