001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.List; 028import java.util.Map; 029import java.util.Random; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseConfiguration; 035import org.apache.hadoop.hbase.HBaseTestingUtility; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.KeyValue; 038import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 039import org.apache.hadoop.hbase.io.compress.Compression; 040import org.apache.hadoop.hbase.testclassification.IOTests; 041import org.apache.hadoop.hbase.testclassification.SmallTests; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.junit.After; 044import org.junit.Before; 045import org.junit.ClassRule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048import org.junit.runner.RunWith; 049import org.junit.runners.Parameterized; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 054 055/** 056 * A kind of integration test at the intersection of {@link HFileBlock}, {@link CacheConfig}, 057 * and {@link LruBlockCache}. 058 */ 059@Category({IOTests.class, SmallTests.class}) 060@RunWith(Parameterized.class) 061public class TestLazyDataBlockDecompression { 062 063 @ClassRule 064 public static final HBaseClassTestRule CLASS_RULE = 065 HBaseClassTestRule.forClass(TestLazyDataBlockDecompression.class); 066 067 private static final Logger LOG = LoggerFactory.getLogger(TestLazyDataBlockDecompression.class); 068 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 069 070 private FileSystem fs; 071 072 @Parameterized.Parameter(0) 073 public boolean cacheOnWrite; 074 075 @Parameterized.Parameters 076 public static Iterable<Object[]> data() { 077 return Arrays.asList(new Object[][] { 078 { false }, 079 { true } 080 }); 081 } 082 083 @Before 084 public void setUp() throws IOException { 085 fs = FileSystem.get(TEST_UTIL.getConfiguration()); 086 } 087 088 @After 089 public void tearDown() { 090 fs = null; 091 } 092 093 /** 094 * Write {@code entryCount} random keyvalues to a new HFile at {@code path}. Returns the row 095 * bytes of the KeyValues written, in the order they were written. 096 */ 097 private static void writeHFile(Configuration conf, CacheConfig cc, FileSystem fs, Path path, 098 HFileContext cxt, int entryCount) throws IOException { 099 HFile.Writer writer = new HFile.WriterFactory(conf, cc) 100 .withPath(fs, path) 101 .withFileContext(cxt) 102 .create(); 103 104 // write a bunch of random kv's 105 Random rand = new Random(9713312); // some seed. 106 final byte[] family = Bytes.toBytes("f"); 107 final byte[] qualifier = Bytes.toBytes("q"); 108 109 for (int i = 0; i < entryCount; i++) { 110 byte[] keyBytes = RandomKeyValueUtil.randomOrderedKey(rand, i); 111 byte[] valueBytes = RandomKeyValueUtil.randomValue(rand); 112 // make a real keyvalue so that hfile tool can examine it 113 writer.append(new KeyValue(keyBytes, family, qualifier, valueBytes)); 114 } 115 writer.close(); 116 } 117 118 /** 119 * Read all blocks from {@code path} to populate {@code blockCache}. 120 */ 121 private static void cacheBlocks(Configuration conf, CacheConfig cacheConfig, FileSystem fs, 122 Path path, HFileContext cxt) throws IOException { 123 FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path); 124 long fileSize = fs.getFileStatus(path).getLen(); 125 FixedFileTrailer trailer = 126 FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize); 127 HFile.Reader reader = new HFileReaderImpl(path, trailer, fsdis, fileSize, cacheConfig, 128 fsdis.getHfs(), conf); 129 reader.loadFileInfo(); 130 long offset = trailer.getFirstDataBlockOffset(), 131 max = trailer.getLastDataBlockOffset(); 132 List<HFileBlock> blocks = new ArrayList<>(4); 133 HFileBlock block; 134 while (offset <= max) { 135 block = reader.readBlock(offset, -1, /* cacheBlock */ true, /* pread */ false, 136 /* isCompaction */ false, /* updateCacheMetrics */ true, null, null); 137 offset += block.getOnDiskSizeWithHeader(); 138 blocks.add(block); 139 } 140 LOG.info("read " + Iterables.toString(blocks)); 141 } 142 143 @Test 144 public void testCompressionIncreasesEffectiveBlockCacheSize() throws Exception { 145 // enough room for 2 uncompressed block 146 int maxSize = (int) (HConstants.DEFAULT_BLOCKSIZE * 2.1); 147 Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), 148 "testCompressionIncreasesEffectiveBlockcacheSize"); 149 HFileContext context = new HFileContextBuilder() 150 .withCompression(Compression.Algorithm.GZ) 151 .build(); 152 LOG.info("context=" + context); 153 154 // setup cache with lazy-decompression disabled. 155 Configuration lazyCompressDisabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 156 lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 157 lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 158 lazyCompressDisabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 159 lazyCompressDisabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, false); 160 CacheConfig cc = new CacheConfig(lazyCompressDisabled, 161 new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled)); 162 assertFalse(cc.shouldCacheDataCompressed()); 163 assertTrue(cc.getBlockCache().get() instanceof LruBlockCache); 164 LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache().get(); 165 LOG.info("disabledBlockCache=" + disabledBlockCache); 166 assertEquals("test inconsistency detected.", maxSize, disabledBlockCache.getMaxSize()); 167 assertTrue("eviction thread spawned unintentionally.", 168 disabledBlockCache.getEvictionThread() == null); 169 assertEquals("freshly created blockcache contains blocks.", 170 0, disabledBlockCache.getBlockCount()); 171 172 // 2000 kv's is ~3.6 full unencoded data blocks. 173 // Requires a conf and CacheConfig but should not be specific to this instance's cache settings 174 writeHFile(lazyCompressDisabled, cc, fs, hfilePath, context, 2000); 175 176 // populate the cache 177 cacheBlocks(lazyCompressDisabled, cc, fs, hfilePath, context); 178 long disabledBlockCount = disabledBlockCache.getBlockCount(); 179 assertTrue("blockcache should contain blocks. disabledBlockCount=" + disabledBlockCount, 180 disabledBlockCount > 0); 181 long disabledEvictedCount = disabledBlockCache.getStats().getEvictedCount(); 182 for (Map.Entry<BlockCacheKey, LruCachedBlock> e : 183 disabledBlockCache.getMapForTests().entrySet()) { 184 HFileBlock block = (HFileBlock) e.getValue().getBuffer(); 185 assertTrue("found a packed block, block=" + block, block.isUnpacked()); 186 } 187 188 // count blocks with lazy decompression 189 Configuration lazyCompressEnabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 190 lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 191 lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 192 lazyCompressEnabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite); 193 lazyCompressEnabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true); 194 cc = new CacheConfig(lazyCompressEnabled, 195 new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressEnabled)); 196 assertTrue("test improperly configured.", cc.shouldCacheDataCompressed()); 197 assertTrue(cc.getBlockCache().get() instanceof LruBlockCache); 198 LruBlockCache enabledBlockCache = (LruBlockCache) cc.getBlockCache().get(); 199 LOG.info("enabledBlockCache=" + enabledBlockCache); 200 assertEquals("test inconsistency detected", maxSize, enabledBlockCache.getMaxSize()); 201 assertTrue("eviction thread spawned unintentionally.", 202 enabledBlockCache.getEvictionThread() == null); 203 assertEquals("freshly created blockcache contains blocks.", 204 0, enabledBlockCache.getBlockCount()); 205 206 cacheBlocks(lazyCompressEnabled, cc, fs, hfilePath, context); 207 long enabledBlockCount = enabledBlockCache.getBlockCount(); 208 assertTrue("blockcache should contain blocks. enabledBlockCount=" + enabledBlockCount, 209 enabledBlockCount > 0); 210 long enabledEvictedCount = enabledBlockCache.getStats().getEvictedCount(); 211 int candidatesFound = 0; 212 for (Map.Entry<BlockCacheKey, LruCachedBlock> e : 213 enabledBlockCache.getMapForTests().entrySet()) { 214 candidatesFound++; 215 HFileBlock block = (HFileBlock) e.getValue().getBuffer(); 216 if (cc.shouldCacheCompressed(block.getBlockType().getCategory())) { 217 assertFalse("found an unpacked block, block=" + block + ", block buffer capacity=" + 218 block.getBufferWithoutHeader().capacity(), block.isUnpacked()); 219 } 220 } 221 assertTrue("did not find any candidates for compressed caching. Invalid test.", 222 candidatesFound > 0); 223 224 LOG.info("disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" + 225 enabledBlockCount); 226 assertTrue("enabling compressed data blocks should increase the effective cache size. " + 227 "disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" + 228 enabledBlockCount, disabledBlockCount < enabledBlockCount); 229 230 LOG.info("disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" + 231 enabledEvictedCount); 232 assertTrue("enabling compressed data blocks should reduce the number of evictions. " + 233 "disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" + 234 enabledEvictedCount, enabledEvictedCount < disabledEvictedCount); 235 } 236}