001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.Map; 028import java.util.Random; 029import java.util.stream.Stream; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.HBaseConfiguration; 034import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.KeyValue; 038import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 039import org.apache.hadoop.hbase.io.compress.Compression; 040import org.apache.hadoop.hbase.testclassification.IOTests; 041import org.apache.hadoop.hbase.testclassification.SmallTests; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.junit.jupiter.api.AfterEach; 044import org.junit.jupiter.api.BeforeEach; 045import org.junit.jupiter.api.Tag; 046import org.junit.jupiter.api.TestTemplate; 047import org.junit.jupiter.params.provider.Arguments; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 052 053/** 054 * A kind of integration test at the intersection of {@link HFileBlock}, {@link CacheConfig}, and 055 * {@link LruBlockCache}. 056 */ 057@Tag(IOTests.TAG) 058@Tag(SmallTests.TAG) 059@HBaseParameterizedTestTemplate(name = "{index}: cacheOnWrite={0}") 060public class TestLazyDataBlockDecompression { 061 062 private static final Logger LOG = LoggerFactory.getLogger(TestLazyDataBlockDecompression.class); 063 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 064 private static final Random RNG = new Random(9713312); // Just a fixed seed. 065 066 private FileSystem fs; 067 068 private final boolean cacheOnWrite; 069 070 public static Stream<Arguments> parameters() { 071 return Stream.of(Arguments.of(false), Arguments.of(true)); 072 } 073 074 public TestLazyDataBlockDecompression(boolean cacheOnWrite) { 075 this.cacheOnWrite = cacheOnWrite; 076 } 077 078 @BeforeEach 079 public void setUp() throws IOException { 080 fs = FileSystem.get(TEST_UTIL.getConfiguration()); 081 } 082 083 @AfterEach 084 public void tearDown() { 085 fs = null; 086 } 087 088 /** 089 * Write {@code entryCount} random keyvalues to a new HFile at {@code path}. Returns the row bytes 090 * of the KeyValues written, in the order they were written. 091 */ 092 private static void writeHFile(Configuration conf, CacheConfig cc, FileSystem fs, Path path, 093 HFileContext cxt, int entryCount) throws IOException { 094 HFile.Writer writer = 095 new HFile.WriterFactory(conf, cc).withPath(fs, path).withFileContext(cxt).create(); 096 097 // write a bunch of random kvs 098 final byte[] family = Bytes.toBytes("f"); 099 final byte[] qualifier = Bytes.toBytes("q"); 100 for (int i = 0; i < entryCount; i++) { 101 byte[] keyBytes = RandomKeyValueUtil.randomOrderedKey(RNG, i); 102 byte[] valueBytes = RandomKeyValueUtil.randomValue(RNG); 103 // make a real keyvalue so that hfile tool can examine it 104 writer.append(new KeyValue(keyBytes, family, qualifier, valueBytes)); 105 } 106 writer.close(); 107 } 108 109 /** 110 * Read all blocks from {@code path} to populate {@code blockCache}. 111 */ 112 private static void cacheBlocks(Configuration conf, CacheConfig cacheConfig, FileSystem fs, 113 Path path, HFileContext cxt) throws IOException { 114 FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path); 115 long fileSize = fs.getFileStatus(path).getLen(); 116 FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize); 117 ReaderContext context = new ReaderContextBuilder().withFilePath(path).withFileSize(fileSize) 118 .withFileSystem(fsdis.getHfs()).withInputStreamWrapper(fsdis).build(); 119 HFileInfo fileInfo = new HFileInfo(context, conf); 120 HFile.Reader reader = new HFilePreadReader(context, fileInfo, cacheConfig, conf); 121 fileInfo.initMetaAndIndex(reader); 122 long offset = trailer.getFirstDataBlockOffset(), max = trailer.getLastDataBlockOffset(); 123 List<HFileBlock> blocks = new ArrayList<>(4); 124 HFileBlock block; 125 while (offset <= max) { 126 block = reader.readBlock(offset, -1, /* cacheBlock */ true, /* pread */ false, 127 /* isCompaction */ false, /* updateCacheMetrics */ true, null, null); 128 offset += block.getOnDiskSizeWithHeader(); 129 blocks.add(block); 130 } 131 LOG.info("read " + Iterables.toString(blocks)); 132 reader.close(); 133 } 134 135 @TestTemplate 136 public void testCompressionIncreasesEffectiveBlockCacheSize() throws Exception { 137 // enough room for 2 uncompressed block 138 int maxSize = (int) (HConstants.DEFAULT_BLOCKSIZE * 2.1); 139 Path hfilePath = 140 new Path(TEST_UTIL.getDataTestDir(), "testCompressionIncreasesEffectiveBlockcacheSize"); 141 HFileContext context = 142 new HFileContextBuilder().withCompression(Compression.Algorithm.GZ).build(); 143 LOG.info("context=" + context); 144 145 // setup cache with lazy-decompression disabled. 146 Configuration lazyCompressDisabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 147 lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 148 lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 149 lazyCompressDisabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 150 lazyCompressDisabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, false); 151 CacheConfig cc = new CacheConfig(lazyCompressDisabled, 152 new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled)); 153 assertFalse(cc.shouldCacheDataCompressed()); 154 assertFalse(cc.isCombinedBlockCache()); 155 LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache().get(); 156 LOG.info("disabledBlockCache=" + disabledBlockCache); 157 assertEquals(maxSize, disabledBlockCache.getMaxSize(), "test inconsistency detected."); 158 assertTrue(disabledBlockCache.getEvictionThread() == null, 159 "eviction thread spawned unintentionally."); 160 assertEquals(0, disabledBlockCache.getBlockCount(), 161 "freshly created blockcache contains blocks."); 162 163 // 2000 kv's is ~3.6 full unencoded data blocks. 164 // Requires a conf and CacheConfig but should not be specific to this instance's cache settings 165 writeHFile(lazyCompressDisabled, cc, fs, hfilePath, context, 2000); 166 167 // populate the cache 168 cacheBlocks(lazyCompressDisabled, cc, fs, hfilePath, context); 169 long disabledBlockCount = disabledBlockCache.getBlockCount(); 170 assertTrue(disabledBlockCount > 0, 171 "blockcache should contain blocks. disabledBlockCount=" + disabledBlockCount); 172 long disabledEvictedCount = disabledBlockCache.getStats().getEvictedCount(); 173 for (Map.Entry<BlockCacheKey, LruCachedBlock> e : disabledBlockCache.getMapForTests() 174 .entrySet()) { 175 HFileBlock block = (HFileBlock) e.getValue().getBuffer(); 176 assertTrue(block.isUnpacked(), "found a packed block, block=" + block); 177 } 178 179 // count blocks with lazy decompression 180 Configuration lazyCompressEnabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 181 lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 182 lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 183 lazyCompressEnabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 184 lazyCompressEnabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true); 185 cc = new CacheConfig(lazyCompressEnabled, 186 new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressEnabled)); 187 assertTrue(cc.shouldCacheDataCompressed(), "test improperly configured."); 188 assertTrue(cc.getBlockCache().get() instanceof LruBlockCache); 189 LruBlockCache enabledBlockCache = (LruBlockCache) cc.getBlockCache().get(); 190 LOG.info("enabledBlockCache=" + enabledBlockCache); 191 assertEquals(maxSize, enabledBlockCache.getMaxSize(), "test inconsistency detected"); 192 assertTrue(enabledBlockCache.getEvictionThread() == null, 193 "eviction thread spawned unintentionally."); 194 assertEquals(0, enabledBlockCache.getBlockCount(), 195 "freshly created blockcache contains blocks."); 196 197 cacheBlocks(lazyCompressEnabled, cc, fs, hfilePath, context); 198 long enabledBlockCount = enabledBlockCache.getBlockCount(); 199 assertTrue(enabledBlockCount > 0, 200 "blockcache should contain blocks. enabledBlockCount=" + enabledBlockCount); 201 long enabledEvictedCount = enabledBlockCache.getStats().getEvictedCount(); 202 int candidatesFound = 0; 203 for (Map.Entry<BlockCacheKey, LruCachedBlock> e : enabledBlockCache.getMapForTests() 204 .entrySet()) { 205 candidatesFound++; 206 HFileBlock block = (HFileBlock) e.getValue().getBuffer(); 207 if (cc.shouldCacheCompressed(block.getBlockType().getCategory())) { 208 assertFalse(block.isUnpacked(), "found an unpacked block, block=" + block 209 + ", block buffer capacity=" + block.getBufferWithoutHeader().capacity()); 210 } 211 } 212 assertTrue(candidatesFound > 0, 213 "did not find any candidates for compressed caching. Invalid test."); 214 215 LOG.info( 216 "disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" + enabledBlockCount); 217 assertTrue(disabledBlockCount < enabledBlockCount, 218 "enabling compressed data blocks should increase the effective cache size. " 219 + "disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" + enabledBlockCount); 220 221 LOG.info("disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" 222 + enabledEvictedCount); 223 assertTrue(enabledEvictedCount < disabledEvictedCount, 224 "enabling compressed data blocks should reduce the number of evictions. " 225 + "disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" 226 + enabledEvictedCount); 227 } 228}