001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import java.io.ByteArrayInputStream; 021import java.io.ByteArrayOutputStream; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.OutputStream; 025import java.security.SecureRandom; 026 027import org.apache.commons.io.IOUtils; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.PrivateCellUtil; 032import org.apache.hadoop.hbase.KeyValue; 033import org.apache.hadoop.hbase.KeyValueUtil; 034import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; 035import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream; 036import org.apache.hadoop.hbase.io.crypto.Decryptor; 037import org.apache.hadoop.hbase.io.crypto.Encryption; 038import org.apache.hadoop.hbase.io.crypto.Encryptor; 039import org.apache.hadoop.hbase.io.util.StreamUtils; 040import org.apache.hadoop.hbase.util.Bytes; 041 042/** 043 * A WALCellCodec that encrypts the WALedits. 044 */ 045@InterfaceAudience.Private 046public class SecureWALCellCodec extends WALCellCodec { 047 048 private Encryptor encryptor; 049 private Decryptor decryptor; 050 051 public SecureWALCellCodec(Configuration conf, CompressionContext compression) { 052 super(conf, compression); 053 } 054 055 public SecureWALCellCodec(Configuration conf, Encryptor encryptor) { 056 super(conf, null); 057 this.encryptor = encryptor; 058 } 059 060 public SecureWALCellCodec(Configuration conf, Decryptor decryptor) { 061 super(conf, null); 062 this.decryptor = decryptor; 063 } 064 065 static class EncryptedKvDecoder extends KeyValueCodecWithTags.KeyValueDecoder { 066 067 private Decryptor decryptor; 068 private byte[] iv; 069 070 public EncryptedKvDecoder(InputStream in) { 071 super(in); 072 } 073 074 public EncryptedKvDecoder(InputStream in, Decryptor decryptor) { 075 super(in); 076 this.decryptor = decryptor; 077 if (decryptor != null) { 078 this.iv = new byte[decryptor.getIvLength()]; 079 } 080 } 081 082 @Override 083 protected Cell parseCell() throws IOException { 084 if (this.decryptor == null) { 085 return super.parseCell(); 086 } 087 int ivLength = 0; 088 089 ivLength = StreamUtils.readRawVarint32(in); 090 091 // TODO: An IV length of 0 could signify an unwrapped cell, when the 092 // encoder supports that just read the remainder in directly 093 094 if (ivLength != this.iv.length) { 095 throw new IOException("Incorrect IV length: expected=" + iv.length + " have=" + 096 ivLength); 097 } 098 IOUtils.readFully(in, this.iv); 099 100 int codedLength = StreamUtils.readRawVarint32(in); 101 byte[] codedBytes = new byte[codedLength]; 102 IOUtils.readFully(in, codedBytes); 103 104 decryptor.setIv(iv); 105 decryptor.reset(); 106 107 InputStream cin = decryptor.createDecryptionStream(new ByteArrayInputStream(codedBytes)); 108 109 // TODO: Add support for WAL compression 110 111 int keylength = StreamUtils.readRawVarint32(cin); 112 int vlength = StreamUtils.readRawVarint32(cin); 113 int tagsLength = StreamUtils.readRawVarint32(cin); 114 int length = 0; 115 if (tagsLength == 0) { 116 length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength; 117 } else { 118 length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength; 119 } 120 121 byte[] backingArray = new byte[length]; 122 int pos = 0; 123 pos = Bytes.putInt(backingArray, pos, keylength); 124 pos = Bytes.putInt(backingArray, pos, vlength); 125 126 // Row 127 int elemLen = StreamUtils.readRawVarint32(cin); 128 pos = Bytes.putShort(backingArray, pos, (short)elemLen); 129 IOUtils.readFully(cin, backingArray, pos, elemLen); 130 pos += elemLen; 131 // Family 132 elemLen = StreamUtils.readRawVarint32(cin); 133 pos = Bytes.putByte(backingArray, pos, (byte)elemLen); 134 IOUtils.readFully(cin, backingArray, pos, elemLen); 135 pos += elemLen; 136 // Qualifier 137 elemLen = StreamUtils.readRawVarint32(cin); 138 IOUtils.readFully(cin, backingArray, pos, elemLen); 139 pos += elemLen; 140 // Remainder 141 IOUtils.readFully(cin, backingArray, pos, length - pos); 142 return new KeyValue(backingArray, 0, length); 143 } 144 145 } 146 147 static class EncryptedKvEncoder extends KeyValueCodecWithTags.KeyValueEncoder { 148 149 private Encryptor encryptor; 150 private final ThreadLocal<byte[]> iv = new ThreadLocal<byte[]>() { 151 @Override 152 protected byte[] initialValue() { 153 byte[] iv = new byte[encryptor.getIvLength()]; 154 new SecureRandom().nextBytes(iv); 155 return iv; 156 } 157 }; 158 159 protected byte[] nextIv() { 160 byte[] b = iv.get(), ret = new byte[b.length]; 161 System.arraycopy(b, 0, ret, 0, b.length); 162 return ret; 163 } 164 165 protected void incrementIv(int v) { 166 Encryption.incrementIv(iv.get(), 1 + (v / encryptor.getBlockSize())); 167 } 168 169 public EncryptedKvEncoder(OutputStream os) { 170 super(os); 171 } 172 173 public EncryptedKvEncoder(OutputStream os, Encryptor encryptor) { 174 super(os); 175 this.encryptor = encryptor; 176 } 177 178 @Override 179 public void write(Cell cell) throws IOException { 180 if (encryptor == null) { 181 super.write(cell); 182 return; 183 } 184 185 byte[] iv = nextIv(); 186 encryptor.setIv(iv); 187 encryptor.reset(); 188 189 // TODO: Check if this is a cell for an encrypted CF. If not, we can 190 // write a 0 here to signal an unwrapped cell and just dump the KV bytes 191 // afterward 192 193 StreamUtils.writeRawVInt32(out, iv.length); 194 out.write(iv); 195 196 // TODO: Add support for WAL compression 197 198 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 199 OutputStream cout = encryptor.createEncryptionStream(baos); 200 ByteBufferWriterOutputStream bos = new ByteBufferWriterOutputStream(cout); 201 int tlen = cell.getTagsLength(); 202 // Write the KeyValue infrastructure as VInts. 203 StreamUtils.writeRawVInt32(bos, KeyValueUtil.keyLength(cell)); 204 StreamUtils.writeRawVInt32(bos, cell.getValueLength()); 205 // To support tags 206 StreamUtils.writeRawVInt32(bos, tlen); 207 208 // Write row, qualifier, and family 209 short rowLength = cell.getRowLength(); 210 StreamUtils.writeRawVInt32(bos, rowLength); 211 PrivateCellUtil.writeRow(bos, cell, rowLength); 212 byte familyLength = cell.getFamilyLength(); 213 StreamUtils.writeRawVInt32(bos, familyLength); 214 PrivateCellUtil.writeFamily(bos, cell, familyLength); 215 int qualifierLength = cell.getQualifierLength(); 216 StreamUtils.writeRawVInt32(bos, qualifierLength); 217 PrivateCellUtil.writeQualifier(bos, cell, qualifierLength); 218 // Write the rest ie. ts, type, value and tags parts 219 StreamUtils.writeLong(bos, cell.getTimestamp()); 220 bos.write(cell.getTypeByte()); 221 PrivateCellUtil.writeValue(bos, cell, cell.getValueLength()); 222 if (tlen > 0) { 223 PrivateCellUtil.writeTags(bos, cell, tlen); 224 } 225 bos.close(); 226 227 StreamUtils.writeRawVInt32(out, baos.size()); 228 baos.writeTo(out); 229 230 // Increment IV given the final payload length 231 incrementIv(baos.size()); 232 } 233 234 } 235 236 @Override 237 public Decoder getDecoder(InputStream is) { 238 return new EncryptedKvDecoder(is, decryptor); 239 } 240 241 @Override 242 public Encoder getEncoder(OutputStream os) { 243 return new EncryptedKvEncoder(os, encryptor); 244 } 245 246 public static WALCellCodec getCodec(Configuration conf, Encryptor encryptor) { 247 return new SecureWALCellCodec(conf, encryptor); 248 } 249 250 public static WALCellCodec getCodec(Configuration conf, Decryptor decryptor) { 251 return new SecureWALCellCodec(conf, decryptor); 252 } 253 254}