View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver.wal;
19  
20  import java.io.ByteArrayInputStream;
21  import java.io.ByteArrayOutputStream;
22  import java.io.EOFException;
23  import java.io.IOException;
24  import java.io.InputStream;
25  import java.io.OutputStream;
26  import java.security.SecureRandom;
27  
28  import org.apache.commons.io.IOUtils;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.Cell;
31  import org.apache.hadoop.hbase.KeyValue;
32  import org.apache.hadoop.hbase.codec.KeyValueCodec;
33  import org.apache.hadoop.hbase.io.crypto.Decryptor;
34  import org.apache.hadoop.hbase.io.crypto.Encryptor;
35  import org.apache.hadoop.hbase.io.util.StreamUtils;
36  import org.apache.hadoop.hbase.util.Bytes;
37  
38  /**
39   * A WALCellCodec that encrypts the WALedits.
40   */
41  public class SecureWALCellCodec extends WALCellCodec {
42  
43    private static final SecureRandom RNG = new SecureRandom();
44  
45    private Encryptor encryptor;
46    private Decryptor decryptor;
47  
48    public SecureWALCellCodec(Configuration conf, Encryptor encryptor) {
49      super(conf, null);
50      this.encryptor = encryptor;
51    }
52  
53    public SecureWALCellCodec(Configuration conf, Decryptor decryptor) {
54      super(conf, null);
55      this.decryptor = decryptor;
56    }
57  
58    static class EncryptedKvDecoder extends KeyValueCodec.KeyValueDecoder {
59  
60      private Decryptor decryptor;
61      private byte[] iv;
62  
63      public EncryptedKvDecoder(InputStream in) {
64        super(in);
65      }
66  
67      public EncryptedKvDecoder(InputStream in, Decryptor decryptor) {
68        super(in);
69        this.decryptor = decryptor;
70        this.iv = new byte[decryptor.getIvLength()];
71      }
72  
73      @Override
74      protected Cell parseCell() throws IOException {
75        int ivLength = 0;
76        try {
77          ivLength = StreamUtils.readRawVarint32(in);
78        } catch (EOFException e) {
79          // EOF at start is OK
80          return null;
81        }
82  
83        // TODO: An IV length of 0 could signify an unwrapped cell, when the
84        // encoder supports that just read the remainder in directly
85  
86        if (ivLength != this.iv.length) {
87          throw new IOException("Incorrect IV length: expected=" + iv.length + " have=" +
88            ivLength);
89        }
90        IOUtils.readFully(in, this.iv);
91  
92        int codedLength = StreamUtils.readRawVarint32(in);
93        byte[] codedBytes = new byte[codedLength];
94        IOUtils.readFully(in, codedBytes);
95  
96        decryptor.setIv(iv);
97        decryptor.reset();
98  
99        InputStream cin = decryptor.createDecryptionStream(new ByteArrayInputStream(codedBytes));
100 
101       // TODO: Add support for WAL compression
102 
103       int keylength = StreamUtils.readRawVarint32(cin);
104       int vlength = StreamUtils.readRawVarint32(cin);
105       int tagsLength = StreamUtils.readRawVarint32(cin);
106       int length = 0;
107       if (tagsLength == 0) {
108         length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
109       } else {
110         length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
111       }
112 
113       byte[] backingArray = new byte[length];
114       int pos = 0;
115       pos = Bytes.putInt(backingArray, pos, keylength);
116       pos = Bytes.putInt(backingArray, pos, vlength);
117 
118       // Row
119       int elemLen = StreamUtils.readRawVarint32(cin);
120       pos = Bytes.putShort(backingArray, pos, (short)elemLen);
121       IOUtils.readFully(cin, backingArray, pos, elemLen);
122       pos += elemLen;
123       // Family
124       elemLen = StreamUtils.readRawVarint32(cin);
125       pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
126       IOUtils.readFully(cin, backingArray, pos, elemLen);
127       pos += elemLen;
128       // Qualifier
129       elemLen = StreamUtils.readRawVarint32(cin);
130       IOUtils.readFully(cin, backingArray, pos, elemLen);
131       pos += elemLen;
132       // Remainder
133       IOUtils.readFully(cin, backingArray, pos, length - pos);
134       return new KeyValue(backingArray, 0, length);
135     }
136 
137   }
138 
139   static class EncryptedKvEncoder extends KeyValueCodec.KeyValueEncoder {
140 
141     private Encryptor encryptor;
142     private byte[] iv;
143 
144     public EncryptedKvEncoder(OutputStream os) {
145       super(os);
146     }
147 
148     public EncryptedKvEncoder(OutputStream os, Encryptor encryptor) {
149       super(os);
150       this.encryptor = encryptor;
151       iv = new byte[encryptor.getIvLength()];
152     }
153 
154     @Override
155     public void write(Cell cell) throws IOException {
156       if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
157 
158       KeyValue kv = (KeyValue)cell;
159       byte[] kvBuffer = kv.getBuffer();
160       int offset = kv.getOffset();
161 
162       RNG.nextBytes(iv);
163       encryptor.setIv(iv);
164       encryptor.reset();
165 
166       // TODO: Check if this is a cell for an encrypted CF. If not, we can
167       // write a 0 here to signal an unwrapped cell and just dump the KV bytes
168       // afterward
169 
170       StreamUtils.writeRawVInt32(out, iv.length);
171       out.write(iv);
172 
173       // TODO: Add support for WAL compression
174 
175       ByteArrayOutputStream baos = new ByteArrayOutputStream();
176       OutputStream cout = encryptor.createEncryptionStream(baos);
177 
178       // Write the KeyValue infrastructure as VInts.
179       StreamUtils.writeRawVInt32(cout, kv.getKeyLength());
180       StreamUtils.writeRawVInt32(cout, kv.getValueLength());
181       // To support tags
182       StreamUtils.writeRawVInt32(cout, kv.getTagsLength());
183 
184       // Write row, qualifier, and family
185       StreamUtils.writeRawVInt32(cout, kv.getRowLength());
186       cout.write(kvBuffer, kv.getRowOffset(), kv.getRowLength());
187       StreamUtils.writeRawVInt32(cout, kv.getFamilyLength());
188       cout.write(kvBuffer, kv.getFamilyOffset(), kv.getFamilyLength());
189       StreamUtils.writeRawVInt32(cout, kv.getQualifierLength());
190       cout.write(kvBuffer, kv.getQualifierOffset(), kv.getQualifierLength());
191       // Write the rest
192       int pos = kv.getTimestampOffset();
193       int remainingLength = kv.getLength() + offset - pos;
194       cout.write(kvBuffer, pos, remainingLength);
195       cout.close();
196 
197       StreamUtils.writeRawVInt32(out, baos.size());
198       baos.writeTo(out);
199     }
200 
201   }
202 
203   @Override
204   public Decoder getDecoder(InputStream is) {
205     return new EncryptedKvDecoder(is, decryptor);
206   }
207 
208   @Override
209   public Encoder getEncoder(OutputStream os) {
210     return new EncryptedKvEncoder(os, encryptor);
211   }
212 
213   public static WALCellCodec getCodec(Configuration conf, Encryptor encryptor) {
214     return new SecureWALCellCodec(conf, encryptor);
215   }
216 
217   public static WALCellCodec getCodec(Configuration conf, Decryptor decryptor) {
218     return new SecureWALCellCodec(conf, decryptor);
219   }
220 
221 }