001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.List; 028import java.util.Map; 029import java.util.Random; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseConfiguration; 035import org.apache.hadoop.hbase.HBaseTestingUtility; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.KeyValue; 038import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 039import org.apache.hadoop.hbase.io.compress.Compression; 040import org.apache.hadoop.hbase.testclassification.IOTests; 041import org.apache.hadoop.hbase.testclassification.SmallTests; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.junit.After; 044import org.junit.Before; 045import org.junit.ClassRule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048import org.junit.runner.RunWith; 049import org.junit.runners.Parameterized; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 054 055/** 056 * A kind of integration test at the intersection of {@link HFileBlock}, {@link CacheConfig}, and 057 * {@link LruBlockCache}. 058 */ 059@Category({ IOTests.class, SmallTests.class }) 060@RunWith(Parameterized.class) 061public class TestLazyDataBlockDecompression { 062 063 @ClassRule 064 public static final HBaseClassTestRule CLASS_RULE = 065 HBaseClassTestRule.forClass(TestLazyDataBlockDecompression.class); 066 private static final Logger LOG = LoggerFactory.getLogger(TestLazyDataBlockDecompression.class); 067 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 068 private static final Random RNG = new Random(9713312); // Just a fixed seed. 069 070 private FileSystem fs; 071 072 @Parameterized.Parameter(0) 073 public boolean cacheOnWrite; 074 075 @Parameterized.Parameters 076 public static Iterable<Object[]> data() { 077 return Arrays.asList(new Object[][] { { false }, { true } }); 078 } 079 080 @Before 081 public void setUp() throws IOException { 082 fs = FileSystem.get(TEST_UTIL.getConfiguration()); 083 } 084 085 @After 086 public void tearDown() { 087 fs = null; 088 } 089 090 /** 091 * Write {@code entryCount} random keyvalues to a new HFile at {@code path}. Returns the row bytes 092 * of the KeyValues written, in the order they were written. 093 */ 094 private static void writeHFile(Configuration conf, CacheConfig cc, FileSystem fs, Path path, 095 HFileContext cxt, int entryCount) throws IOException { 096 HFile.Writer writer = 097 new HFile.WriterFactory(conf, cc).withPath(fs, path).withFileContext(cxt).create(); 098 099 // write a bunch of random kvs 100 final byte[] family = Bytes.toBytes("f"); 101 final byte[] qualifier = Bytes.toBytes("q"); 102 for (int i = 0; i < entryCount; i++) { 103 byte[] keyBytes = RandomKeyValueUtil.randomOrderedKey(RNG, i); 104 byte[] valueBytes = RandomKeyValueUtil.randomValue(RNG); 105 // make a real keyvalue so that hfile tool can examine it 106 writer.append(new KeyValue(keyBytes, family, qualifier, valueBytes)); 107 } 108 writer.close(); 109 } 110 111 /** 112 * Read all blocks from {@code path} to populate {@code blockCache}. 113 */ 114 private static void cacheBlocks(Configuration conf, CacheConfig cacheConfig, FileSystem fs, 115 Path path, HFileContext cxt) throws IOException { 116 FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path); 117 long fileSize = fs.getFileStatus(path).getLen(); 118 FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize); 119 ReaderContext context = new ReaderContextBuilder().withFilePath(path).withFileSize(fileSize) 120 .withFileSystem(fsdis.getHfs()).withInputStreamWrapper(fsdis).build(); 121 HFileInfo fileInfo = new HFileInfo(context, conf); 122 HFile.Reader reader = new HFilePreadReader(context, fileInfo, cacheConfig, conf); 123 fileInfo.initMetaAndIndex(reader); 124 long offset = trailer.getFirstDataBlockOffset(), max = trailer.getLastDataBlockOffset(); 125 List<HFileBlock> blocks = new ArrayList<>(4); 126 HFileBlock block; 127 while (offset <= max) { 128 block = reader.readBlock(offset, -1, /* cacheBlock */ true, /* pread */ false, 129 /* isCompaction */ false, /* updateCacheMetrics */ true, null, null); 130 offset += block.getOnDiskSizeWithHeader(); 131 blocks.add(block); 132 } 133 LOG.info("read " + Iterables.toString(blocks)); 134 reader.close(); 135 } 136 137 @Test 138 public void testCompressionIncreasesEffectiveBlockCacheSize() throws Exception { 139 // enough room for 2 uncompressed block 140 int maxSize = (int) (HConstants.DEFAULT_BLOCKSIZE * 2.1); 141 Path hfilePath = 142 new Path(TEST_UTIL.getDataTestDir(), "testCompressionIncreasesEffectiveBlockcacheSize"); 143 HFileContext context = 144 new HFileContextBuilder().withCompression(Compression.Algorithm.GZ).build(); 145 LOG.info("context=" + context); 146 147 // setup cache with lazy-decompression disabled. 148 Configuration lazyCompressDisabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 149 lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 150 lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 151 lazyCompressDisabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 152 lazyCompressDisabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, false); 153 CacheConfig cc = new CacheConfig(lazyCompressDisabled, 154 new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled)); 155 assertFalse(cc.shouldCacheDataCompressed()); 156 assertFalse(cc.isCombinedBlockCache()); 157 LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache().get(); 158 LOG.info("disabledBlockCache=" + disabledBlockCache); 159 assertEquals("test inconsistency detected.", maxSize, disabledBlockCache.getMaxSize()); 160 assertTrue("eviction thread spawned unintentionally.", 161 disabledBlockCache.getEvictionThread() == null); 162 assertEquals("freshly created blockcache contains blocks.", 0, 163 disabledBlockCache.getBlockCount()); 164 165 // 2000 kv's is ~3.6 full unencoded data blocks. 166 // Requires a conf and CacheConfig but should not be specific to this instance's cache settings 167 writeHFile(lazyCompressDisabled, cc, fs, hfilePath, context, 2000); 168 169 // populate the cache 170 cacheBlocks(lazyCompressDisabled, cc, fs, hfilePath, context); 171 long disabledBlockCount = disabledBlockCache.getBlockCount(); 172 assertTrue("blockcache should contain blocks. disabledBlockCount=" + disabledBlockCount, 173 disabledBlockCount > 0); 174 long disabledEvictedCount = disabledBlockCache.getStats().getEvictedCount(); 175 for (Map.Entry<BlockCacheKey, LruCachedBlock> e : disabledBlockCache.getMapForTests() 176 .entrySet()) { 177 HFileBlock block = (HFileBlock) e.getValue().getBuffer(); 178 assertTrue("found a packed block, block=" + block, block.isUnpacked()); 179 } 180 181 // count blocks with lazy decompression 182 Configuration lazyCompressEnabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 183 lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 184 lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 185 lazyCompressEnabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 186 lazyCompressEnabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true); 187 cc = new CacheConfig(lazyCompressEnabled, 188 new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressEnabled)); 189 assertTrue("test improperly configured.", cc.shouldCacheDataCompressed()); 190 assertTrue(cc.getBlockCache().get() instanceof LruBlockCache); 191 LruBlockCache enabledBlockCache = (LruBlockCache) cc.getBlockCache().get(); 192 LOG.info("enabledBlockCache=" + enabledBlockCache); 193 assertEquals("test inconsistency detected", maxSize, enabledBlockCache.getMaxSize()); 194 assertTrue("eviction thread spawned unintentionally.", 195 enabledBlockCache.getEvictionThread() == null); 196 assertEquals("freshly created blockcache contains blocks.", 0, 197 enabledBlockCache.getBlockCount()); 198 199 cacheBlocks(lazyCompressEnabled, cc, fs, hfilePath, context); 200 long enabledBlockCount = enabledBlockCache.getBlockCount(); 201 assertTrue("blockcache should contain blocks. enabledBlockCount=" + enabledBlockCount, 202 enabledBlockCount > 0); 203 long enabledEvictedCount = enabledBlockCache.getStats().getEvictedCount(); 204 int candidatesFound = 0; 205 for (Map.Entry<BlockCacheKey, LruCachedBlock> e : enabledBlockCache.getMapForTests() 206 .entrySet()) { 207 candidatesFound++; 208 HFileBlock block = (HFileBlock) e.getValue().getBuffer(); 209 if (cc.shouldCacheCompressed(block.getBlockType().getCategory())) { 210 assertFalse("found an unpacked block, block=" + block + ", block buffer capacity=" 211 + block.getBufferWithoutHeader().capacity(), block.isUnpacked()); 212 } 213 } 214 assertTrue("did not find any candidates for compressed caching. Invalid test.", 215 candidatesFound > 0); 216 217 LOG.info( 218 "disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" + enabledBlockCount); 219 assertTrue( 220 "enabling compressed data blocks should increase the effective cache size. " 221 + "disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" + enabledBlockCount, 222 disabledBlockCount < enabledBlockCount); 223 224 LOG.info("disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" 225 + enabledEvictedCount); 226 assertTrue("enabling compressed data blocks should reduce the number of evictions. " 227 + "disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" 228 + enabledEvictedCount, enabledEvictedCount < disabledEvictedCount); 229 } 230}