001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.List;
028import java.util.Map;
029import java.util.Random;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.KeyValue;
038import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
039import org.apache.hadoop.hbase.io.compress.Compression;
040import org.apache.hadoop.hbase.testclassification.IOTests;
041import org.apache.hadoop.hbase.testclassification.SmallTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.junit.After;
044import org.junit.Before;
045import org.junit.ClassRule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.junit.runner.RunWith;
049import org.junit.runners.Parameterized;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
054
055/**
056 * A kind of integration test at the intersection of {@link HFileBlock}, {@link CacheConfig},
057 * and {@link LruBlockCache}.
058 */
059@Category({IOTests.class, SmallTests.class})
060@RunWith(Parameterized.class)
061public class TestLazyDataBlockDecompression {
062
063  @ClassRule
064  public static final HBaseClassTestRule CLASS_RULE =
065      HBaseClassTestRule.forClass(TestLazyDataBlockDecompression.class);
066
067  private static final Logger LOG = LoggerFactory.getLogger(TestLazyDataBlockDecompression.class);
068  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
069
070  private FileSystem fs;
071
072  @Parameterized.Parameter(0)
073  public boolean cacheOnWrite;
074
075  @Parameterized.Parameters
076  public static Iterable<Object[]> data() {
077    return Arrays.asList(new Object[][] {
078      { false },
079      { true }
080    });
081  }
082
083  @Before
084  public void setUp() throws IOException {
085    fs = FileSystem.get(TEST_UTIL.getConfiguration());
086  }
087
088  @After
089  public void tearDown() {
090    fs = null;
091  }
092
093  /**
094   * Write {@code entryCount} random keyvalues to a new HFile at {@code path}. Returns the row
095   * bytes of the KeyValues written, in the order they were written.
096   */
097  private static void writeHFile(Configuration conf, CacheConfig cc, FileSystem fs, Path path,
098      HFileContext cxt, int entryCount) throws IOException {
099    HFile.Writer writer = new HFile.WriterFactory(conf, cc)
100        .withPath(fs, path)
101        .withFileContext(cxt)
102        .create();
103
104    // write a bunch of random kv's
105    Random rand = new Random(9713312); // some seed.
106    final byte[] family = Bytes.toBytes("f");
107    final byte[] qualifier = Bytes.toBytes("q");
108
109    for (int i = 0; i < entryCount; i++) {
110      byte[] keyBytes = RandomKeyValueUtil.randomOrderedKey(rand, i);
111      byte[] valueBytes = RandomKeyValueUtil.randomValue(rand);
112      // make a real keyvalue so that hfile tool can examine it
113      writer.append(new KeyValue(keyBytes, family, qualifier, valueBytes));
114    }
115    writer.close();
116  }
117
118  /**
119   * Read all blocks from {@code path} to populate {@code blockCache}.
120   */
121  private static void cacheBlocks(Configuration conf, CacheConfig cacheConfig, FileSystem fs,
122      Path path, HFileContext cxt) throws IOException {
123    FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path);
124    long fileSize = fs.getFileStatus(path).getLen();
125    FixedFileTrailer trailer =
126      FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize);
127    ReaderContext context = new ReaderContextBuilder()
128        .withFilePath(path)
129        .withFileSize(fileSize)
130        .withFileSystem(fsdis.getHfs())
131        .withInputStreamWrapper(fsdis)
132        .build();
133    HFileInfo fileInfo = new HFileInfo(context, conf);
134    HFile.Reader reader = new HFilePreadReader(context, fileInfo, cacheConfig, conf);
135    fileInfo.initMetaAndIndex(reader);
136    long offset = trailer.getFirstDataBlockOffset(),
137      max = trailer.getLastDataBlockOffset();
138    List<HFileBlock> blocks = new ArrayList<>(4);
139    HFileBlock block;
140    while (offset <= max) {
141      block = reader.readBlock(offset, -1, /* cacheBlock */ true, /* pread */ false,
142      /* isCompaction */ false, /* updateCacheMetrics */ true, null, null);
143      offset += block.getOnDiskSizeWithHeader();
144      blocks.add(block);
145    }
146    LOG.info("read " + Iterables.toString(blocks));
147    reader.close();
148  }
149
150  @Test
151  public void testCompressionIncreasesEffectiveBlockCacheSize() throws Exception {
152    // enough room for 2 uncompressed block
153    int maxSize = (int) (HConstants.DEFAULT_BLOCKSIZE * 2.1);
154    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(),
155      "testCompressionIncreasesEffectiveBlockcacheSize");
156    HFileContext context = new HFileContextBuilder()
157      .withCompression(Compression.Algorithm.GZ)
158      .build();
159    LOG.info("context=" + context);
160
161    // setup cache with lazy-decompression disabled.
162    Configuration lazyCompressDisabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
163    lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
164    lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
165    lazyCompressDisabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
166    lazyCompressDisabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
167    CacheConfig cc = new CacheConfig(lazyCompressDisabled,
168        new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled));
169    assertFalse(cc.shouldCacheDataCompressed());
170    assertFalse(cc.isCombinedBlockCache());
171    LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache().get();
172    LOG.info("disabledBlockCache=" + disabledBlockCache);
173    assertEquals("test inconsistency detected.", maxSize, disabledBlockCache.getMaxSize());
174    assertTrue("eviction thread spawned unintentionally.",
175      disabledBlockCache.getEvictionThread() == null);
176    assertEquals("freshly created blockcache contains blocks.",
177      0, disabledBlockCache.getBlockCount());
178
179    // 2000 kv's is ~3.6 full unencoded data blocks.
180    // Requires a conf and CacheConfig but should not be specific to this instance's cache settings
181    writeHFile(lazyCompressDisabled, cc, fs, hfilePath, context, 2000);
182
183    // populate the cache
184    cacheBlocks(lazyCompressDisabled, cc, fs, hfilePath, context);
185    long disabledBlockCount = disabledBlockCache.getBlockCount();
186    assertTrue("blockcache should contain blocks. disabledBlockCount=" + disabledBlockCount,
187      disabledBlockCount > 0);
188    long disabledEvictedCount = disabledBlockCache.getStats().getEvictedCount();
189    for (Map.Entry<BlockCacheKey, LruCachedBlock> e :
190      disabledBlockCache.getMapForTests().entrySet()) {
191      HFileBlock block = (HFileBlock) e.getValue().getBuffer();
192      assertTrue("found a packed block, block=" + block, block.isUnpacked());
193    }
194
195    // count blocks with lazy decompression
196    Configuration lazyCompressEnabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
197    lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
198    lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
199    lazyCompressEnabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
200    lazyCompressEnabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
201    cc = new CacheConfig(lazyCompressEnabled,
202        new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressEnabled));
203    assertTrue("test improperly configured.", cc.shouldCacheDataCompressed());
204    assertTrue(cc.getBlockCache().get() instanceof LruBlockCache);
205    LruBlockCache enabledBlockCache = (LruBlockCache) cc.getBlockCache().get();
206    LOG.info("enabledBlockCache=" + enabledBlockCache);
207    assertEquals("test inconsistency detected", maxSize, enabledBlockCache.getMaxSize());
208    assertTrue("eviction thread spawned unintentionally.",
209      enabledBlockCache.getEvictionThread() == null);
210    assertEquals("freshly created blockcache contains blocks.",
211      0, enabledBlockCache.getBlockCount());
212
213    cacheBlocks(lazyCompressEnabled, cc, fs, hfilePath, context);
214    long enabledBlockCount = enabledBlockCache.getBlockCount();
215    assertTrue("blockcache should contain blocks. enabledBlockCount=" + enabledBlockCount,
216      enabledBlockCount > 0);
217    long enabledEvictedCount = enabledBlockCache.getStats().getEvictedCount();
218    int candidatesFound = 0;
219    for (Map.Entry<BlockCacheKey, LruCachedBlock> e :
220        enabledBlockCache.getMapForTests().entrySet()) {
221      candidatesFound++;
222      HFileBlock block = (HFileBlock) e.getValue().getBuffer();
223      if (cc.shouldCacheCompressed(block.getBlockType().getCategory())) {
224        assertFalse("found an unpacked block, block=" + block + ", block buffer capacity=" +
225          block.getBufferWithoutHeader().capacity(), block.isUnpacked());
226      }
227    }
228    assertTrue("did not find any candidates for compressed caching. Invalid test.",
229      candidatesFound > 0);
230
231    LOG.info("disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" +
232      enabledBlockCount);
233    assertTrue("enabling compressed data blocks should increase the effective cache size. " +
234      "disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" +
235      enabledBlockCount, disabledBlockCount < enabledBlockCount);
236
237    LOG.info("disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" +
238      enabledEvictedCount);
239    assertTrue("enabling compressed data blocks should reduce the number of evictions. " +
240      "disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" +
241      enabledEvictedCount, enabledEvictedCount < disabledEvictedCount);
242  }
243}