001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.List; 028import java.util.Map; 029import java.util.Random; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseConfiguration; 035import org.apache.hadoop.hbase.HBaseTestingUtility; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.KeyValue; 038import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 039import org.apache.hadoop.hbase.io.compress.Compression; 040import org.apache.hadoop.hbase.testclassification.IOTests; 041import org.apache.hadoop.hbase.testclassification.SmallTests; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.junit.After; 044import org.junit.Before; 045import org.junit.ClassRule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048import org.junit.runner.RunWith; 049import org.junit.runners.Parameterized; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 054 055/** 056 * A kind of integration test at the intersection of {@link HFileBlock}, {@link CacheConfig}, 057 * and {@link LruBlockCache}. 058 */ 059@Category({IOTests.class, SmallTests.class}) 060@RunWith(Parameterized.class) 061public class TestLazyDataBlockDecompression { 062 063 @ClassRule 064 public static final HBaseClassTestRule CLASS_RULE = 065 HBaseClassTestRule.forClass(TestLazyDataBlockDecompression.class); 066 067 private static final Logger LOG = LoggerFactory.getLogger(TestLazyDataBlockDecompression.class); 068 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 069 070 private FileSystem fs; 071 072 @Parameterized.Parameter(0) 073 public boolean cacheOnWrite; 074 075 @Parameterized.Parameters 076 public static Iterable<Object[]> data() { 077 return Arrays.asList(new Object[][] { 078 { false }, 079 { true } 080 }); 081 } 082 083 @Before 084 public void setUp() throws IOException { 085 CacheConfig.clearGlobalInstances(); 086 fs = FileSystem.get(TEST_UTIL.getConfiguration()); 087 } 088 089 @After 090 public void tearDown() { 091 CacheConfig.clearGlobalInstances(); 092 fs = null; 093 } 094 095 /** 096 * Write {@code entryCount} random keyvalues to a new HFile at {@code path}. Returns the row 097 * bytes of the KeyValues written, in the order they were written. 098 */ 099 private static void writeHFile(Configuration conf, CacheConfig cc, FileSystem fs, Path path, 100 HFileContext cxt, int entryCount) throws IOException { 101 HFile.Writer writer = new HFile.WriterFactory(conf, cc) 102 .withPath(fs, path) 103 .withFileContext(cxt) 104 .create(); 105 106 // write a bunch of random kv's 107 Random rand = new Random(9713312); // some seed. 108 final byte[] family = Bytes.toBytes("f"); 109 final byte[] qualifier = Bytes.toBytes("q"); 110 111 for (int i = 0; i < entryCount; i++) { 112 byte[] keyBytes = RandomKeyValueUtil.randomOrderedKey(rand, i); 113 byte[] valueBytes = RandomKeyValueUtil.randomValue(rand); 114 // make a real keyvalue so that hfile tool can examine it 115 writer.append(new KeyValue(keyBytes, family, qualifier, valueBytes)); 116 } 117 writer.close(); 118 } 119 120 /** 121 * Read all blocks from {@code path} to populate {@code blockCache}. 122 */ 123 private static void cacheBlocks(Configuration conf, CacheConfig cacheConfig, FileSystem fs, 124 Path path, HFileContext cxt) throws IOException { 125 FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path); 126 long fileSize = fs.getFileStatus(path).getLen(); 127 FixedFileTrailer trailer = 128 FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize); 129 HFile.Reader reader = new HFileReaderImpl(path, trailer, fsdis, fileSize, cacheConfig, 130 fsdis.getHfs(), conf); 131 reader.loadFileInfo(); 132 long offset = trailer.getFirstDataBlockOffset(), 133 max = trailer.getLastDataBlockOffset(); 134 List<HFileBlock> blocks = new ArrayList<>(4); 135 HFileBlock block; 136 while (offset <= max) { 137 block = reader.readBlock(offset, -1, /* cacheBlock */ true, /* pread */ false, 138 /* isCompaction */ false, /* updateCacheMetrics */ true, null, null); 139 offset += block.getOnDiskSizeWithHeader(); 140 blocks.add(block); 141 } 142 LOG.info("read " + Iterables.toString(blocks)); 143 } 144 145 @Test 146 public void testCompressionIncreasesEffectiveBlockCacheSize() throws Exception { 147 // enough room for 2 uncompressed block 148 int maxSize = (int) (HConstants.DEFAULT_BLOCKSIZE * 2.1); 149 Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), 150 "testCompressionIncreasesEffectiveBlockcacheSize"); 151 HFileContext context = new HFileContextBuilder() 152 .withCompression(Compression.Algorithm.GZ) 153 .build(); 154 LOG.info("context=" + context); 155 156 // setup cache with lazy-decompression disabled. 157 Configuration lazyCompressDisabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 158 lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 159 lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 160 lazyCompressDisabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 161 lazyCompressDisabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, false); 162 CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = 163 new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled); 164 CacheConfig cc = new CacheConfig(lazyCompressDisabled); 165 assertFalse(cc.shouldCacheDataCompressed()); 166 assertTrue(cc.getBlockCache() instanceof LruBlockCache); 167 LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache(); 168 LOG.info("disabledBlockCache=" + disabledBlockCache); 169 assertEquals("test inconsistency detected.", maxSize, disabledBlockCache.getMaxSize()); 170 assertTrue("eviction thread spawned unintentionally.", 171 disabledBlockCache.getEvictionThread() == null); 172 assertEquals("freshly created blockcache contains blocks.", 173 0, disabledBlockCache.getBlockCount()); 174 175 // 2000 kv's is ~3.6 full unencoded data blocks. 176 // Requires a conf and CacheConfig but should not be specific to this instance's cache settings 177 writeHFile(lazyCompressDisabled, cc, fs, hfilePath, context, 2000); 178 179 // populate the cache 180 cacheBlocks(lazyCompressDisabled, cc, fs, hfilePath, context); 181 long disabledBlockCount = disabledBlockCache.getBlockCount(); 182 assertTrue("blockcache should contain blocks. disabledBlockCount=" + disabledBlockCount, 183 disabledBlockCount > 0); 184 long disabledEvictedCount = disabledBlockCache.getStats().getEvictedCount(); 185 for (Map.Entry<BlockCacheKey, LruCachedBlock> e : 186 disabledBlockCache.getMapForTests().entrySet()) { 187 HFileBlock block = (HFileBlock) e.getValue().getBuffer(); 188 assertTrue("found a packed block, block=" + block, block.isUnpacked()); 189 } 190 191 // count blocks with lazy decompression 192 Configuration lazyCompressEnabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 193 lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 194 lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 195 lazyCompressEnabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 196 lazyCompressEnabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true); 197 CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = 198 new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressEnabled); 199 cc = new CacheConfig(lazyCompressEnabled); 200 assertTrue("test improperly configured.", cc.shouldCacheDataCompressed()); 201 assertTrue(cc.getBlockCache() instanceof LruBlockCache); 202 LruBlockCache enabledBlockCache = (LruBlockCache) cc.getBlockCache(); 203 LOG.info("enabledBlockCache=" + enabledBlockCache); 204 assertEquals("test inconsistency detected", maxSize, enabledBlockCache.getMaxSize()); 205 assertTrue("eviction thread spawned unintentionally.", 206 enabledBlockCache.getEvictionThread() == null); 207 assertEquals("freshly created blockcache contains blocks.", 208 0, enabledBlockCache.getBlockCount()); 209 210 cacheBlocks(lazyCompressEnabled, cc, fs, hfilePath, context); 211 long enabledBlockCount = enabledBlockCache.getBlockCount(); 212 assertTrue("blockcache should contain blocks. enabledBlockCount=" + enabledBlockCount, 213 enabledBlockCount > 0); 214 long enabledEvictedCount = enabledBlockCache.getStats().getEvictedCount(); 215 int candidatesFound = 0; 216 for (Map.Entry<BlockCacheKey, LruCachedBlock> e : 217 enabledBlockCache.getMapForTests().entrySet()) { 218 candidatesFound++; 219 HFileBlock block = (HFileBlock) e.getValue().getBuffer(); 220 if (cc.shouldCacheCompressed(block.getBlockType().getCategory())) { 221 assertFalse("found an unpacked block, block=" + block + ", block buffer capacity=" + 222 block.getBufferWithoutHeader().capacity(), block.isUnpacked()); 223 } 224 } 225 assertTrue("did not find any candidates for compressed caching. Invalid test.", 226 candidatesFound > 0); 227 228 LOG.info("disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" + 229 enabledBlockCount); 230 assertTrue("enabling compressed data blocks should increase the effective cache size. " + 231 "disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" + 232 enabledBlockCount, disabledBlockCount < enabledBlockCount); 233 234 LOG.info("disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" + 235 enabledEvictedCount); 236 assertTrue("enabling compressed data blocks should reduce the number of evictions. " + 237 "disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" + 238 enabledEvictedCount, enabledEvictedCount < disabledEvictedCount); 239 } 240}