/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

/**
 * A kind of integration test at the intersection of {@link HFileBlock}, {@link CacheConfig},
 * and {@link LruBlockCache}.
 */
@Category({IOTests.class, SmallTests.class})
@RunWith(Parameterized.class)
public class TestLazyDataBlockDecompression {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLazyDataBlockDecompression.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestLazyDataBlockDecompression.class);
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private FileSystem fs;

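  // The test body runs twice, once with cache-on-write disabled and once with it
  // enabled, so lazy decompression is exercised on both the read and write paths.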
  @Parameterized.Parameter(0)
  public boolean cacheOnWrite;

  @Parameterized.Parameters
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] {
      { false },
      { true }
    });
  }

  @Before
  public void setUp() throws IOException {
    fs = FileSystem.get(TEST_UTIL.getConfiguration());
  }

  @After
  public void tearDown() {
    fs = null;
  }

  /**
   * Write {@code entryCount} random keyvalues, in increasing row order, to a new HFile
   * at {@code path}.
   */
  private static void writeHFile(Configuration conf, CacheConfig cc, FileSystem fs, Path path,
      HFileContext cxt, int entryCount) throws IOException {
    HFile.Writer writer = new HFile.WriterFactory(conf, cc)
      .withPath(fs, path)
      .withFileContext(cxt)
      .create();

    // write a bunch of random KVs
    Random rand = new Random(9713312); // fixed seed keeps runs reproducible
    final byte[] family = Bytes.toBytes("f");
    final byte[] qualifier = Bytes.toBytes("q");

    for (int i = 0; i < entryCount; i++) {
      byte[] keyBytes = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] valueBytes = RandomKeyValueUtil.randomValue(rand);
      // make a real keyvalue so that the hfile tool can examine it
      writer.append(new KeyValue(keyBytes, family, qualifier, valueBytes));
    }
    writer.close();
  }

  /**
   * Read all blocks from {@code path} to populate the block cache configured by
   * {@code cacheConfig}.
   */
  private static void cacheBlocks(Configuration conf, CacheConfig cacheConfig, FileSystem fs,
      Path path, HFileContext cxt) throws IOException {
    FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path);
    long fileSize = fs.getFileStatus(path).getLen();
    FixedFileTrailer trailer =
      FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize);
    ReaderContext context = new ReaderContextBuilder()
      .withFilePath(path)
      .withFileSize(fileSize)
      .withFileSystem(fsdis.getHfs())
      .withInputStreamWrapper(fsdis)
      .build();
    HFileInfo fileInfo = new HFileInfo(context, conf);
    HFile.Reader reader = new HFilePreadReader(context, fileInfo, cacheConfig, conf);
    fileInfo.initMetaAndIndex(reader);
    long offset = trailer.getFirstDataBlockOffset();
    long max = trailer.getLastDataBlockOffset();
    List<HFileBlock> blocks = new ArrayList<>(4);
    HFileBlock block;
    while (offset <= max) {
      block = reader.readBlock(offset, -1, /* cacheBlock */ true, /* pread */ false,
        /* isCompaction */ false, /* updateCacheMetrics */ true, null, null);
      offset += block.getOnDiskSizeWithHeader();
      blocks.add(block);
    }
    LOG.info("read " + Iterables.toString(blocks));
    reader.close();
  }

  @Test
  public void testCompressionIncreasesEffectiveBlockCacheSize() throws Exception {
    // enough room for 2 uncompressed blocks
    int maxSize = (int) (HConstants.DEFAULT_BLOCKSIZE * 2.1);
    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(),
      "testCompressionIncreasesEffectiveBlockcacheSize");
    HFileContext context = new HFileContextBuilder()
      .withCompression(Compression.Algorithm.GZ)
      .build();
    LOG.info("context=" + context);

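    // CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY ("hbase.block.data.cachecompressed")
    // controls whether data blocks are cached in their on-disk form and only unpacked
    // lazily, when read back out of the cache.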
    // Set up a cache with lazy decompression disabled.
    Configuration lazyCompressDisabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
    lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
    lazyCompressDisabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
    lazyCompressDisabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
    CacheConfig cc = new CacheConfig(lazyCompressDisabled,
      new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled));
    assertFalse(cc.shouldCacheDataCompressed());
    assertFalse(cc.isCombinedBlockCache());
    LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache().get();
    LOG.info("disabledBlockCache=" + disabledBlockCache);
    assertEquals("test inconsistency detected.", maxSize, disabledBlockCache.getMaxSize());
    assertTrue("eviction thread spawned unintentionally.",
      disabledBlockCache.getEvictionThread() == null);
    assertEquals("freshly created blockcache contains blocks.",
      0, disabledBlockCache.getBlockCount());

    // 2000 KVs fill ~3.6 unencoded data blocks. writeHFile requires a conf and
    // CacheConfig, but the result should not depend on this instance's cache settings.
    writeHFile(lazyCompressDisabled, cc, fs, hfilePath, context, 2000);

    // Populate the cache.
    cacheBlocks(lazyCompressDisabled, cc, fs, hfilePath, context);
    long disabledBlockCount = disabledBlockCache.getBlockCount();
    assertTrue("blockcache should contain blocks. disabledBlockCount=" + disabledBlockCount,
      disabledBlockCount > 0);
    long disabledEvictedCount = disabledBlockCache.getStats().getEvictedCount();
    for (Map.Entry<BlockCacheKey, LruCachedBlock> e :
        disabledBlockCache.getMapForTests().entrySet()) {
      HFileBlock block = (HFileBlock) e.getValue().getBuffer();
      assertTrue("found a packed block, block=" + block, block.isUnpacked());
    }

    // Repeat with lazy decompression enabled and count blocks again.
    Configuration lazyCompressEnabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
    lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
    lazyCompressEnabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
    lazyCompressEnabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
    cc = new CacheConfig(lazyCompressEnabled,
      new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressEnabled));
    assertTrue("test improperly configured.", cc.shouldCacheDataCompressed());
    assertTrue(cc.getBlockCache().get() instanceof LruBlockCache);
    LruBlockCache enabledBlockCache = (LruBlockCache) cc.getBlockCache().get();
    LOG.info("enabledBlockCache=" + enabledBlockCache);
    assertEquals("test inconsistency detected.", maxSize, enabledBlockCache.getMaxSize());
    assertTrue("eviction thread spawned unintentionally.",
      enabledBlockCache.getEvictionThread() == null);
    assertEquals("freshly created blockcache contains blocks.",
      0, enabledBlockCache.getBlockCount());

    cacheBlocks(lazyCompressEnabled, cc, fs, hfilePath, context);
    long enabledBlockCount = enabledBlockCache.getBlockCount();
    assertTrue("blockcache should contain blocks. enabledBlockCount=" + enabledBlockCount,
      enabledBlockCount > 0);
    long enabledEvictedCount = enabledBlockCache.getStats().getEvictedCount();
    int candidatesFound = 0;
    for (Map.Entry<BlockCacheKey, LruCachedBlock> e :
        enabledBlockCache.getMapForTests().entrySet()) {
      candidatesFound++;
      HFileBlock block = (HFileBlock) e.getValue().getBuffer();
      if (cc.shouldCacheCompressed(block.getBlockType().getCategory())) {
        assertFalse("found an unpacked block, block=" + block + ", block buffer capacity=" +
          block.getBufferWithoutHeader().capacity(), block.isUnpacked());
      }
    }
    assertTrue("did not find any candidates for compressed caching. Invalid test.",
      candidatesFound > 0);

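    // GZ-compressed blocks occupy less cache space than their unpacked equivalents,
    // so the same LRU capacity is expected to hold more blocks and to evict fewer
    // of them.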
    LOG.info("disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" +
      enabledBlockCount);
    assertTrue("enabling compressed data blocks should increase the effective cache size. " +
      "disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" +
      enabledBlockCount, disabledBlockCount < enabledBlockCount);

    LOG.info("disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" +
      enabledEvictedCount);
    assertTrue("enabling compressed data blocks should reduce the number of evictions. " +
      "disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" +
      enabledEvictedCount, enabledEvictedCount < disabledEvictedCount);
  }
}