001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import java.io.ByteArrayInputStream; 021import java.io.ByteArrayOutputStream; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.OutputStream; 025import org.apache.commons.io.IOUtils; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.ExtendedCell; 028import org.apache.hadoop.hbase.KeyValue; 029import org.apache.hadoop.hbase.KeyValueUtil; 030import org.apache.hadoop.hbase.PrivateCellUtil; 031import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; 032import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream; 033import org.apache.hadoop.hbase.io.crypto.Decryptor; 034import org.apache.hadoop.hbase.io.crypto.Encryption; 035import org.apache.hadoop.hbase.io.crypto.Encryptor; 036import org.apache.hadoop.hbase.io.util.StreamUtils; 037import org.apache.hadoop.hbase.util.Bytes; 038import org.apache.yetus.audience.InterfaceAudience; 039 040/** 041 * A WALCellCodec that encrypts the WALedits. 042 */ 043@InterfaceAudience.Private 044public class SecureWALCellCodec extends WALCellCodec { 045 046 private Encryptor encryptor; 047 private Decryptor decryptor; 048 049 public SecureWALCellCodec(Configuration conf, CompressionContext compression) { 050 super(conf, compression); 051 } 052 053 public SecureWALCellCodec(Configuration conf, Encryptor encryptor) { 054 super(conf, null); 055 this.encryptor = encryptor; 056 } 057 058 public SecureWALCellCodec(Configuration conf, Decryptor decryptor) { 059 super(conf, null); 060 this.decryptor = decryptor; 061 } 062 063 static class EncryptedKvDecoder extends KeyValueCodecWithTags.KeyValueDecoder { 064 065 private Decryptor decryptor; 066 private byte[] iv; 067 068 public EncryptedKvDecoder(InputStream in) { 069 super(in); 070 } 071 072 public EncryptedKvDecoder(InputStream in, Decryptor decryptor) { 073 super(in); 074 this.decryptor = decryptor; 075 if (decryptor != null) { 076 this.iv = new byte[decryptor.getIvLength()]; 077 } 078 } 079 080 @Override 081 protected ExtendedCell parseCell() throws IOException { 082 if (this.decryptor == null) { 083 return super.parseCell(); 084 } 085 int ivLength = 0; 086 087 ivLength = StreamUtils.readRawVarint32(in); 088 089 // TODO: An IV length of 0 could signify an unwrapped cell, when the 090 // encoder supports that just read the remainder in directly 091 092 if (ivLength != this.iv.length) { 093 throw new IOException("Incorrect IV length: expected=" + iv.length + " have=" + ivLength); 094 } 095 IOUtils.readFully(in, this.iv); 096 097 int codedLength = StreamUtils.readRawVarint32(in); 098 byte[] codedBytes = new byte[codedLength]; 099 IOUtils.readFully(in, codedBytes); 100 101 decryptor.setIv(iv); 102 decryptor.reset(); 103 104 InputStream cin = decryptor.createDecryptionStream(new ByteArrayInputStream(codedBytes)); 105 106 // TODO: Add support for WAL compression 107 108 int keylength = StreamUtils.readRawVarint32(cin); 109 int vlength = StreamUtils.readRawVarint32(cin); 110 int tagsLength = StreamUtils.readRawVarint32(cin); 111 int length = 0; 112 if (tagsLength == 0) { 113 length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength; 114 } else { 115 length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength; 116 } 117 118 byte[] backingArray = new byte[length]; 119 int pos = 0; 120 pos = Bytes.putInt(backingArray, pos, keylength); 121 pos = Bytes.putInt(backingArray, pos, vlength); 122 123 // Row 124 int elemLen = StreamUtils.readRawVarint32(cin); 125 pos = Bytes.putShort(backingArray, pos, (short) elemLen); 126 IOUtils.readFully(cin, backingArray, pos, elemLen); 127 pos += elemLen; 128 // Family 129 elemLen = StreamUtils.readRawVarint32(cin); 130 pos = Bytes.putByte(backingArray, pos, (byte) elemLen); 131 IOUtils.readFully(cin, backingArray, pos, elemLen); 132 pos += elemLen; 133 // Qualifier 134 elemLen = StreamUtils.readRawVarint32(cin); 135 IOUtils.readFully(cin, backingArray, pos, elemLen); 136 pos += elemLen; 137 // Remainder 138 IOUtils.readFully(cin, backingArray, pos, length - pos); 139 return new KeyValue(backingArray, 0, length); 140 } 141 142 } 143 144 static class EncryptedKvEncoder extends KeyValueCodecWithTags.KeyValueEncoder { 145 146 private Encryptor encryptor; 147 private final ThreadLocal<byte[]> iv = new ThreadLocal<byte[]>() { 148 @Override 149 protected byte[] initialValue() { 150 byte[] iv = new byte[encryptor.getIvLength()]; 151 Bytes.secureRandom(iv); 152 return iv; 153 } 154 }; 155 156 protected byte[] nextIv() { 157 byte[] b = iv.get(), ret = new byte[b.length]; 158 System.arraycopy(b, 0, ret, 0, b.length); 159 return ret; 160 } 161 162 protected void incrementIv(int v) { 163 Encryption.incrementIv(iv.get(), 1 + (v / encryptor.getBlockSize())); 164 } 165 166 public EncryptedKvEncoder(OutputStream os) { 167 super(os); 168 } 169 170 public EncryptedKvEncoder(OutputStream os, Encryptor encryptor) { 171 super(os); 172 this.encryptor = encryptor; 173 } 174 175 @Override 176 public void write(ExtendedCell cell) throws IOException { 177 if (encryptor == null) { 178 super.write(cell); 179 return; 180 } 181 182 byte[] iv = nextIv(); 183 encryptor.setIv(iv); 184 encryptor.reset(); 185 186 // TODO: Check if this is a cell for an encrypted CF. If not, we can 187 // write a 0 here to signal an unwrapped cell and just dump the KV bytes 188 // afterward 189 190 StreamUtils.writeRawVInt32(out, iv.length); 191 out.write(iv); 192 193 // TODO: Add support for WAL compression 194 195 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 196 OutputStream cout = encryptor.createEncryptionStream(baos); 197 ByteBufferWriterOutputStream bos = new ByteBufferWriterOutputStream(cout); 198 int tlen = cell.getTagsLength(); 199 // Write the KeyValue infrastructure as VInts. 200 StreamUtils.writeRawVInt32(bos, KeyValueUtil.keyLength(cell)); 201 StreamUtils.writeRawVInt32(bos, cell.getValueLength()); 202 // To support tags 203 StreamUtils.writeRawVInt32(bos, tlen); 204 205 // Write row, qualifier, and family 206 short rowLength = cell.getRowLength(); 207 StreamUtils.writeRawVInt32(bos, rowLength); 208 PrivateCellUtil.writeRow(bos, cell, rowLength); 209 byte familyLength = cell.getFamilyLength(); 210 StreamUtils.writeRawVInt32(bos, familyLength); 211 PrivateCellUtil.writeFamily(bos, cell, familyLength); 212 int qualifierLength = cell.getQualifierLength(); 213 StreamUtils.writeRawVInt32(bos, qualifierLength); 214 PrivateCellUtil.writeQualifier(bos, cell, qualifierLength); 215 // Write the rest ie. ts, type, value and tags parts 216 StreamUtils.writeLong(bos, cell.getTimestamp()); 217 bos.write(cell.getTypeByte()); 218 PrivateCellUtil.writeValue(bos, cell, cell.getValueLength()); 219 if (tlen > 0) { 220 PrivateCellUtil.writeTags(bos, cell, tlen); 221 } 222 bos.close(); 223 224 StreamUtils.writeRawVInt32(out, baos.size()); 225 baos.writeTo(out); 226 227 // Increment IV given the final payload length 228 incrementIv(baos.size()); 229 } 230 231 } 232 233 @Override 234 public Decoder getDecoder(InputStream is) { 235 return new EncryptedKvDecoder(is, decryptor); 236 } 237 238 @Override 239 public Encoder getEncoder(OutputStream os) { 240 return new EncryptedKvEncoder(os, encryptor); 241 } 242 243 public static WALCellCodec getCodec(Configuration conf, Encryptor encryptor) { 244 return new SecureWALCellCodec(conf, encryptor); 245 } 246 247 public static WALCellCodec getCodec(Configuration conf, Decryptor decryptor) { 248 return new SecureWALCellCodec(conf, decryptor); 249 } 250 251}