001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import java.io.ByteArrayInputStream;
021import java.io.ByteArrayOutputStream;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import org.apache.commons.io.IOUtils;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.KeyValue;
029import org.apache.hadoop.hbase.KeyValueUtil;
030import org.apache.hadoop.hbase.PrivateCellUtil;
031import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
032import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream;
033import org.apache.hadoop.hbase.io.crypto.Decryptor;
034import org.apache.hadoop.hbase.io.crypto.Encryption;
035import org.apache.hadoop.hbase.io.crypto.Encryptor;
036import org.apache.hadoop.hbase.io.util.StreamUtils;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.yetus.audience.InterfaceAudience;
039
040/**
041 * A WALCellCodec that encrypts the WALedits.
042 */
043@InterfaceAudience.Private
044public class SecureWALCellCodec extends WALCellCodec {
045
046  private Encryptor encryptor;
047  private Decryptor decryptor;
048
049  public SecureWALCellCodec(Configuration conf, CompressionContext compression) {
050    super(conf, compression);
051  }
052
053  public SecureWALCellCodec(Configuration conf, Encryptor encryptor) {
054    super(conf, null);
055    this.encryptor = encryptor;
056  }
057
058  public SecureWALCellCodec(Configuration conf, Decryptor decryptor) {
059    super(conf, null);
060    this.decryptor = decryptor;
061  }
062
063  static class EncryptedKvDecoder extends KeyValueCodecWithTags.KeyValueDecoder {
064
065    private Decryptor decryptor;
066    private byte[] iv;
067
068    public EncryptedKvDecoder(InputStream in) {
069      super(in);
070    }
071
072    public EncryptedKvDecoder(InputStream in, Decryptor decryptor) {
073      super(in);
074      this.decryptor = decryptor;
075      if (decryptor != null) {
076        this.iv = new byte[decryptor.getIvLength()];
077      }
078    }
079
080    @Override
081    protected Cell parseCell() throws IOException {
082      if (this.decryptor == null) {
083        return super.parseCell();
084      }
085      int ivLength = 0;
086
087      ivLength = StreamUtils.readRawVarint32(in);
088
089      // TODO: An IV length of 0 could signify an unwrapped cell, when the
090      // encoder supports that just read the remainder in directly
091
092      if (ivLength != this.iv.length) {
093        throw new IOException("Incorrect IV length: expected=" + iv.length + " have=" + ivLength);
094      }
095      IOUtils.readFully(in, this.iv);
096
097      int codedLength = StreamUtils.readRawVarint32(in);
098      byte[] codedBytes = new byte[codedLength];
099      IOUtils.readFully(in, codedBytes);
100
101      decryptor.setIv(iv);
102      decryptor.reset();
103
104      InputStream cin = decryptor.createDecryptionStream(new ByteArrayInputStream(codedBytes));
105
106      // TODO: Add support for WAL compression
107
108      int keylength = StreamUtils.readRawVarint32(cin);
109      int vlength = StreamUtils.readRawVarint32(cin);
110      int tagsLength = StreamUtils.readRawVarint32(cin);
111      int length = 0;
112      if (tagsLength == 0) {
113        length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
114      } else {
115        length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
116      }
117
118      byte[] backingArray = new byte[length];
119      int pos = 0;
120      pos = Bytes.putInt(backingArray, pos, keylength);
121      pos = Bytes.putInt(backingArray, pos, vlength);
122
123      // Row
124      int elemLen = StreamUtils.readRawVarint32(cin);
125      pos = Bytes.putShort(backingArray, pos, (short) elemLen);
126      IOUtils.readFully(cin, backingArray, pos, elemLen);
127      pos += elemLen;
128      // Family
129      elemLen = StreamUtils.readRawVarint32(cin);
130      pos = Bytes.putByte(backingArray, pos, (byte) elemLen);
131      IOUtils.readFully(cin, backingArray, pos, elemLen);
132      pos += elemLen;
133      // Qualifier
134      elemLen = StreamUtils.readRawVarint32(cin);
135      IOUtils.readFully(cin, backingArray, pos, elemLen);
136      pos += elemLen;
137      // Remainder
138      IOUtils.readFully(cin, backingArray, pos, length - pos);
139      return new KeyValue(backingArray, 0, length);
140    }
141
142  }
143
144  static class EncryptedKvEncoder extends KeyValueCodecWithTags.KeyValueEncoder {
145
146    private Encryptor encryptor;
147    private final ThreadLocal<byte[]> iv = new ThreadLocal<byte[]>() {
148      @Override
149      protected byte[] initialValue() {
150        byte[] iv = new byte[encryptor.getIvLength()];
151        Bytes.secureRandom(iv);
152        return iv;
153      }
154    };
155
156    protected byte[] nextIv() {
157      byte[] b = iv.get(), ret = new byte[b.length];
158      System.arraycopy(b, 0, ret, 0, b.length);
159      return ret;
160    }
161
162    protected void incrementIv(int v) {
163      Encryption.incrementIv(iv.get(), 1 + (v / encryptor.getBlockSize()));
164    }
165
166    public EncryptedKvEncoder(OutputStream os) {
167      super(os);
168    }
169
170    public EncryptedKvEncoder(OutputStream os, Encryptor encryptor) {
171      super(os);
172      this.encryptor = encryptor;
173    }
174
175    @Override
176    public void write(Cell cell) throws IOException {
177      if (encryptor == null) {
178        super.write(cell);
179        return;
180      }
181
182      byte[] iv = nextIv();
183      encryptor.setIv(iv);
184      encryptor.reset();
185
186      // TODO: Check if this is a cell for an encrypted CF. If not, we can
187      // write a 0 here to signal an unwrapped cell and just dump the KV bytes
188      // afterward
189
190      StreamUtils.writeRawVInt32(out, iv.length);
191      out.write(iv);
192
193      // TODO: Add support for WAL compression
194
195      ByteArrayOutputStream baos = new ByteArrayOutputStream();
196      OutputStream cout = encryptor.createEncryptionStream(baos);
197      ByteBufferWriterOutputStream bos = new ByteBufferWriterOutputStream(cout);
198      int tlen = cell.getTagsLength();
199      // Write the KeyValue infrastructure as VInts.
200      StreamUtils.writeRawVInt32(bos, KeyValueUtil.keyLength(cell));
201      StreamUtils.writeRawVInt32(bos, cell.getValueLength());
202      // To support tags
203      StreamUtils.writeRawVInt32(bos, tlen);
204
205      // Write row, qualifier, and family
206      short rowLength = cell.getRowLength();
207      StreamUtils.writeRawVInt32(bos, rowLength);
208      PrivateCellUtil.writeRow(bos, cell, rowLength);
209      byte familyLength = cell.getFamilyLength();
210      StreamUtils.writeRawVInt32(bos, familyLength);
211      PrivateCellUtil.writeFamily(bos, cell, familyLength);
212      int qualifierLength = cell.getQualifierLength();
213      StreamUtils.writeRawVInt32(bos, qualifierLength);
214      PrivateCellUtil.writeQualifier(bos, cell, qualifierLength);
215      // Write the rest ie. ts, type, value and tags parts
216      StreamUtils.writeLong(bos, cell.getTimestamp());
217      bos.write(cell.getTypeByte());
218      PrivateCellUtil.writeValue(bos, cell, cell.getValueLength());
219      if (tlen > 0) {
220        PrivateCellUtil.writeTags(bos, cell, tlen);
221      }
222      bos.close();
223
224      StreamUtils.writeRawVInt32(out, baos.size());
225      baos.writeTo(out);
226
227      // Increment IV given the final payload length
228      incrementIv(baos.size());
229    }
230
231  }
232
233  @Override
234  public Decoder getDecoder(InputStream is) {
235    return new EncryptedKvDecoder(is, decryptor);
236  }
237
238  @Override
239  public Encoder getEncoder(OutputStream os) {
240    return new EncryptedKvEncoder(os, encryptor);
241  }
242
243  public static WALCellCodec getCodec(Configuration conf, Encryptor encryptor) {
244    return new SecureWALCellCodec(conf, encryptor);
245  }
246
247  public static WALCellCodec getCodec(Configuration conf, Decryptor decryptor) {
248    return new SecureWALCellCodec(conf, decryptor);
249  }
250
251}