001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertNotNull; 023import static org.junit.jupiter.api.Assertions.assertTrue; 024import static org.junit.jupiter.api.Assertions.fail; 025 026import java.io.DataInputStream; 027import java.io.DataOutputStream; 028import java.io.IOException; 029import java.util.List; 030import java.util.Random; 031import java.util.concurrent.ThreadLocalRandom; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FSDataInputStream; 034import org.apache.hadoop.fs.FSDataOutputStream; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.hbase.ExtendedCell; 038import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 039import org.apache.hadoop.hbase.HBaseTestingUtil; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.KeyValue; 042import org.apache.hadoop.hbase.KeyValueUtil; 043import org.apache.hadoop.hbase.io.ByteBuffAllocator; 044import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 045import org.apache.hadoop.hbase.io.compress.Compression; 046import org.apache.hadoop.hbase.io.crypto.Cipher; 047import org.apache.hadoop.hbase.io.crypto.Encryption; 048import org.apache.hadoop.hbase.io.crypto.MockAesKeyProvider; 049import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 050import org.apache.hadoop.hbase.testclassification.IOTests; 051import org.apache.hadoop.hbase.testclassification.SmallTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.util.RedundantKVGenerator; 054import org.junit.jupiter.api.BeforeAll; 055import org.junit.jupiter.api.Tag; 056import org.junit.jupiter.api.Test; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060@Tag(IOTests.TAG) 061@Tag(SmallTests.TAG) 062public class TestHFileEncryption { 063 064 private static final Logger LOG = LoggerFactory.getLogger(TestHFileEncryption.class); 065 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 066 067 private static FileSystem fs; 068 private static Encryption.Context cryptoContext; 069 070 @BeforeAll 071 public static void setUp() throws Exception { 072 Configuration conf = TEST_UTIL.getConfiguration(); 073 // Disable block cache in this test. 074 conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); 075 conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, MockAesKeyProvider.class.getName()); 076 conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase"); 077 conf.setInt("hfile.format.version", 3); 078 079 fs = FileSystem.get(conf); 080 081 cryptoContext = Encryption.newContext(conf); 082 String algorithm = conf.get(HConstants.CRYPTO_KEY_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES); 083 Cipher aes = Encryption.getCipher(conf, algorithm); 084 assertNotNull(aes); 085 cryptoContext.setCipher(aes); 086 byte[] key = new byte[aes.getKeyLength()]; 087 Bytes.secureRandom(key); 088 cryptoContext.setKey(key); 089 } 090 091 private int writeBlock(Configuration conf, FSDataOutputStream os, HFileContext fileContext, 092 int size) throws IOException { 093 HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, fileContext); 094 DataOutputStream dos = hbw.startWriting(BlockType.DATA); 095 for (int j = 0; j < size; j++) { 096 dos.writeInt(j); 097 } 098 hbw.writeHeaderAndData(os); 099 LOG.info("Wrote a block at " + os.getPos() + " with" + " onDiskSizeWithHeader=" 100 + hbw.getOnDiskSizeWithHeader() + " uncompressedSizeWithoutHeader=" 101 + hbw.getOnDiskSizeWithoutHeader() + " uncompressedSizeWithoutHeader=" 102 + hbw.getUncompressedSizeWithoutHeader()); 103 return hbw.getOnDiskSizeWithHeader(); 104 } 105 106 private long readAndVerifyBlock(long pos, HFileContext ctx, HFileBlock.FSReaderImpl hbr, int size) 107 throws IOException { 108 HFileBlock b = hbr.readBlockData(pos, -1, false, false, true); 109 assertEquals(0, HFile.getAndResetChecksumFailuresCount()); 110 b.sanityCheck(); 111 assertFalse( 112 (b.getHFileContext().getCompression() != Compression.Algorithm.NONE) && b.isUnpacked()); 113 b = b.unpack(ctx, hbr); 114 LOG.info( 115 "Read a block at " + pos + " with" + " onDiskSizeWithHeader=" + b.getOnDiskSizeWithHeader() 116 + " uncompressedSizeWithoutHeader=" + b.getOnDiskSizeWithoutHeader() 117 + " uncompressedSizeWithoutHeader=" + b.getUncompressedSizeWithoutHeader()); 118 DataInputStream dis = b.getByteStream(); 119 for (int i = 0; i < size; i++) { 120 int read = dis.readInt(); 121 if (read != i) { 122 fail("Block data corrupt at element " + i); 123 } 124 } 125 return b.getOnDiskSizeWithHeader(); 126 } 127 128 @Test 129 public void testDataBlockEncryption() throws IOException { 130 final int blocks = 10; 131 final int[] blockSizes = new int[blocks]; 132 final Random rand = ThreadLocalRandom.current(); 133 for (int i = 0; i < blocks; i++) { 134 blockSizes[i] = (1024 + rand.nextInt(1024 * 63)) / Bytes.SIZEOF_INT; 135 } 136 for (Compression.Algorithm compression : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) { 137 Path path = new Path(TEST_UTIL.getDataTestDir(), "block_v3_" + compression + "_AES"); 138 LOG.info("testDataBlockEncryption: encryption=AES compression=" + compression); 139 long totalSize = 0; 140 HFileContext fileContext = new HFileContextBuilder().withCompression(compression) 141 .withEncryptionContext(cryptoContext).build(); 142 FSDataOutputStream os = fs.create(path); 143 try { 144 for (int i = 0; i < blocks; i++) { 145 totalSize += writeBlock(TEST_UTIL.getConfiguration(), os, fileContext, blockSizes[i]); 146 } 147 } finally { 148 os.close(); 149 } 150 FSDataInputStream is = fs.open(path); 151 ReaderContext context = 152 new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is)) 153 .withFilePath(path).withFileSystem(fs).withFileSize(totalSize).build(); 154 try { 155 HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(context, fileContext, 156 ByteBuffAllocator.HEAP, TEST_UTIL.getConfiguration()); 157 long pos = 0; 158 for (int i = 0; i < blocks; i++) { 159 pos += readAndVerifyBlock(pos, fileContext, hbr, blockSizes[i]); 160 } 161 } finally { 162 is.close(); 163 } 164 } 165 } 166 167 @Test 168 public void testHFileEncryptionMetadata() throws Exception { 169 Configuration conf = TEST_UTIL.getConfiguration(); 170 CacheConfig cacheConf = new CacheConfig(conf); 171 HFileContext fileContext = 172 new HFileContextBuilder().withEncryptionContext(cryptoContext).build(); 173 174 // write a simple encrypted hfile 175 Path path = new Path(TEST_UTIL.getDataTestDir(), "cryptometa.hfile"); 176 FSDataOutputStream out = fs.create(path); 177 HFile.Writer writer = HFile.getWriterFactory(conf, cacheConf).withOutputStream(out) 178 .withFileContext(fileContext).create(); 179 try { 180 KeyValue kv = 181 new KeyValue(Bytes.toBytes("foo"), Bytes.toBytes("f1"), null, Bytes.toBytes("value")); 182 writer.append(kv); 183 } finally { 184 writer.close(); 185 out.close(); 186 } 187 188 // read it back in and validate correct crypto metadata 189 HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf); 190 try { 191 FixedFileTrailer trailer = reader.getTrailer(); 192 assertNotNull(trailer.getEncryptionKey()); 193 Encryption.Context readerContext = reader.getFileContext().getEncryptionContext(); 194 assertEquals(readerContext.getCipher().getName(), cryptoContext.getCipher().getName()); 195 assertTrue(Bytes.equals(readerContext.getKeyBytes(), cryptoContext.getKeyBytes())); 196 } finally { 197 reader.close(); 198 } 199 } 200 201 @Test 202 public void testHFileEncryption() throws Exception { 203 // Create 1000 random test KVs 204 RedundantKVGenerator generator = new RedundantKVGenerator(); 205 List<KeyValue> testKvs = generator.generateTestKeyValues(1000); 206 207 // Iterate through data block encoding and compression combinations 208 Configuration conf = TEST_UTIL.getConfiguration(); 209 CacheConfig cacheConf = new CacheConfig(conf); 210 for (DataBlockEncoding encoding : DataBlockEncoding.values()) { 211 for (Compression.Algorithm compression : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) { 212 HFileContext fileContext = new HFileContextBuilder().withBlockSize(4096) // small blocks 213 .withEncryptionContext(cryptoContext).withCompression(compression) 214 .withDataBlockEncoding(encoding).build(); 215 // write a new test HFile 216 LOG.info("Writing with " + fileContext); 217 Path path = new Path(TEST_UTIL.getDataTestDir(), 218 HBaseCommonTestingUtil.getRandomUUID().toString() + ".hfile"); 219 FSDataOutputStream out = fs.create(path); 220 HFile.Writer writer = HFile.getWriterFactory(conf, cacheConf).withOutputStream(out) 221 .withFileContext(fileContext).create(); 222 try { 223 for (KeyValue kv : testKvs) { 224 writer.append(kv); 225 } 226 } finally { 227 writer.close(); 228 out.close(); 229 } 230 231 // read it back in 232 LOG.info("Reading with " + fileContext); 233 int i = 0; 234 HFileScanner scanner = null; 235 HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf); 236 try { 237 FixedFileTrailer trailer = reader.getTrailer(); 238 assertNotNull(trailer.getEncryptionKey()); 239 scanner = reader.getScanner(conf, false, false); 240 assertTrue(scanner.seekTo(), "Initial seekTo failed"); 241 do { 242 ExtendedCell kv = scanner.getCell(); 243 assertTrue(testKvs.contains(KeyValueUtil.ensureKeyValue(kv)), 244 "Read back an unexpected or invalid KV"); 245 i++; 246 } while (scanner.next()); 247 } finally { 248 reader.close(); 249 scanner.close(); 250 } 251 252 assertEquals(i, testKvs.size(), "Did not read back as many KVs as written"); 253 254 // Test random seeks with pread 255 LOG.info("Random seeking with " + fileContext); 256 Random rand = ThreadLocalRandom.current(); 257 reader = HFile.createReader(fs, path, cacheConf, true, conf); 258 try { 259 scanner = reader.getScanner(conf, false, true); 260 assertTrue(scanner.seekTo(), "Initial seekTo failed"); 261 for (i = 0; i < 100; i++) { 262 KeyValue kv = testKvs.get(rand.nextInt(testKvs.size())); 263 assertEquals(0, scanner.seekTo(kv), "Unable to find KV as expected: " + kv); 264 } 265 } finally { 266 scanner.close(); 267 reader.close(); 268 } 269 } 270 } 271 } 272 273}