001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import java.io.ByteArrayInputStream;
021import java.io.ByteArrayOutputStream;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import java.security.SecureRandom;
026
027import org.apache.commons.io.IOUtils;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.PrivateCellUtil;
032import org.apache.hadoop.hbase.KeyValue;
033import org.apache.hadoop.hbase.KeyValueUtil;
034import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
035import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream;
036import org.apache.hadoop.hbase.io.crypto.Decryptor;
037import org.apache.hadoop.hbase.io.crypto.Encryption;
038import org.apache.hadoop.hbase.io.crypto.Encryptor;
039import org.apache.hadoop.hbase.io.util.StreamUtils;
040import org.apache.hadoop.hbase.util.Bytes;
041
042/**
043 * A WALCellCodec that encrypts the WALedits.
044 */
045@InterfaceAudience.Private
046public class SecureWALCellCodec extends WALCellCodec {
047
048  private Encryptor encryptor;
049  private Decryptor decryptor;
050
051  public SecureWALCellCodec(Configuration conf, CompressionContext compression) {
052    super(conf, compression);
053  }
054
055  public SecureWALCellCodec(Configuration conf, Encryptor encryptor) {
056    super(conf, null);
057    this.encryptor = encryptor;
058  }
059
060  public SecureWALCellCodec(Configuration conf, Decryptor decryptor) {
061    super(conf, null);
062    this.decryptor = decryptor;
063  }
064
065  static class EncryptedKvDecoder extends KeyValueCodecWithTags.KeyValueDecoder {
066
067    private Decryptor decryptor;
068    private byte[] iv;
069
070    public EncryptedKvDecoder(InputStream in) {
071      super(in);
072    }
073
074    public EncryptedKvDecoder(InputStream in, Decryptor decryptor) {
075      super(in);
076      this.decryptor = decryptor;
077      if (decryptor != null) {
078        this.iv = new byte[decryptor.getIvLength()];
079      }
080    }
081
082    @Override
083    protected Cell parseCell() throws IOException {
084      if (this.decryptor == null) {
085        return super.parseCell();
086      }
087      int ivLength = 0;
088
089      ivLength = StreamUtils.readRawVarint32(in);
090
091      // TODO: An IV length of 0 could signify an unwrapped cell, when the
092      // encoder supports that just read the remainder in directly
093
094      if (ivLength != this.iv.length) {
095        throw new IOException("Incorrect IV length: expected=" + iv.length + " have=" +
096          ivLength);
097      }
098      IOUtils.readFully(in, this.iv);
099
100      int codedLength = StreamUtils.readRawVarint32(in);
101      byte[] codedBytes = new byte[codedLength];
102      IOUtils.readFully(in, codedBytes);
103
104      decryptor.setIv(iv);
105      decryptor.reset();
106
107      InputStream cin = decryptor.createDecryptionStream(new ByteArrayInputStream(codedBytes));
108
109      // TODO: Add support for WAL compression
110
111      int keylength = StreamUtils.readRawVarint32(cin);
112      int vlength = StreamUtils.readRawVarint32(cin);
113      int tagsLength = StreamUtils.readRawVarint32(cin);
114      int length = 0;
115      if (tagsLength == 0) {
116        length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
117      } else {
118        length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
119      }
120
121      byte[] backingArray = new byte[length];
122      int pos = 0;
123      pos = Bytes.putInt(backingArray, pos, keylength);
124      pos = Bytes.putInt(backingArray, pos, vlength);
125
126      // Row
127      int elemLen = StreamUtils.readRawVarint32(cin);
128      pos = Bytes.putShort(backingArray, pos, (short)elemLen);
129      IOUtils.readFully(cin, backingArray, pos, elemLen);
130      pos += elemLen;
131      // Family
132      elemLen = StreamUtils.readRawVarint32(cin);
133      pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
134      IOUtils.readFully(cin, backingArray, pos, elemLen);
135      pos += elemLen;
136      // Qualifier
137      elemLen = StreamUtils.readRawVarint32(cin);
138      IOUtils.readFully(cin, backingArray, pos, elemLen);
139      pos += elemLen;
140      // Remainder
141      IOUtils.readFully(cin, backingArray, pos, length - pos);
142      return new KeyValue(backingArray, 0, length);
143    }
144
145  }
146
147  static class EncryptedKvEncoder extends KeyValueCodecWithTags.KeyValueEncoder {
148
149    private Encryptor encryptor;
150    private final ThreadLocal<byte[]> iv = new ThreadLocal<byte[]>() {
151      @Override
152      protected byte[] initialValue() {
153        byte[] iv = new byte[encryptor.getIvLength()];
154        new SecureRandom().nextBytes(iv);
155        return iv;
156      }
157    };
158
159    protected byte[] nextIv() {
160      byte[] b = iv.get(), ret = new byte[b.length];
161      System.arraycopy(b, 0, ret, 0, b.length);
162      return ret;
163    }
164
165    protected void incrementIv(int v) {
166      Encryption.incrementIv(iv.get(), 1 + (v / encryptor.getBlockSize()));
167    }
168
169    public EncryptedKvEncoder(OutputStream os) {
170      super(os);
171    }
172
173    public EncryptedKvEncoder(OutputStream os, Encryptor encryptor) {
174      super(os);
175      this.encryptor = encryptor;
176    }
177
178    @Override
179    public void write(Cell cell) throws IOException {
180      if (encryptor == null) {
181        super.write(cell);
182        return;
183      }
184
185      byte[] iv = nextIv();
186      encryptor.setIv(iv);
187      encryptor.reset();
188
189      // TODO: Check if this is a cell for an encrypted CF. If not, we can
190      // write a 0 here to signal an unwrapped cell and just dump the KV bytes
191      // afterward
192
193      StreamUtils.writeRawVInt32(out, iv.length);
194      out.write(iv);
195
196      // TODO: Add support for WAL compression
197
198      ByteArrayOutputStream baos = new ByteArrayOutputStream();
199      OutputStream cout = encryptor.createEncryptionStream(baos);
200      ByteBufferWriterOutputStream bos = new ByteBufferWriterOutputStream(cout);
201      int tlen = cell.getTagsLength();
202      // Write the KeyValue infrastructure as VInts.
203      StreamUtils.writeRawVInt32(bos, KeyValueUtil.keyLength(cell));
204      StreamUtils.writeRawVInt32(bos, cell.getValueLength());
205      // To support tags
206      StreamUtils.writeRawVInt32(bos, tlen);
207
208      // Write row, qualifier, and family
209      short rowLength = cell.getRowLength();
210      StreamUtils.writeRawVInt32(bos, rowLength);
211      PrivateCellUtil.writeRow(bos, cell, rowLength);
212      byte familyLength = cell.getFamilyLength();
213      StreamUtils.writeRawVInt32(bos, familyLength);
214      PrivateCellUtil.writeFamily(bos, cell, familyLength);
215      int qualifierLength = cell.getQualifierLength();
216      StreamUtils.writeRawVInt32(bos, qualifierLength);
217      PrivateCellUtil.writeQualifier(bos, cell, qualifierLength);
218      // Write the rest ie. ts, type, value and tags parts
219      StreamUtils.writeLong(bos, cell.getTimestamp());
220      bos.write(cell.getTypeByte());
221      PrivateCellUtil.writeValue(bos, cell, cell.getValueLength());
222      if (tlen > 0) {
223        PrivateCellUtil.writeTags(bos, cell, tlen);
224      }
225      bos.close();
226
227      StreamUtils.writeRawVInt32(out, baos.size());
228      baos.writeTo(out);
229
230      // Increment IV given the final payload length
231      incrementIv(baos.size());
232    }
233
234  }
235
236  @Override
237  public Decoder getDecoder(InputStream is) {
238    return new EncryptedKvDecoder(is, decryptor);
239  }
240
241  @Override
242  public Encoder getEncoder(OutputStream os) {
243    return new EncryptedKvEncoder(os, encryptor);
244  }
245
246  public static WALCellCodec getCodec(Configuration conf, Encryptor encryptor) {
247    return new SecureWALCellCodec(conf, encryptor);
248  }
249
250  public static WALCellCodec getCodec(Configuration conf, Decryptor decryptor) {
251    return new SecureWALCellCodec(conf, decryptor);
252  }
253
254}