001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.Map;
028import java.util.Random;
029import java.util.stream.Stream;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.HBaseConfiguration;
034import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.KeyValue;
038import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
039import org.apache.hadoop.hbase.io.compress.Compression;
040import org.apache.hadoop.hbase.testclassification.IOTests;
041import org.apache.hadoop.hbase.testclassification.SmallTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.junit.jupiter.api.AfterEach;
044import org.junit.jupiter.api.BeforeEach;
045import org.junit.jupiter.api.Tag;
046import org.junit.jupiter.api.TestTemplate;
047import org.junit.jupiter.params.provider.Arguments;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
052
053/**
054 * A kind of integration test at the intersection of {@link HFileBlock}, {@link CacheConfig}, and
055 * {@link LruBlockCache}.
056 */
057@Tag(IOTests.TAG)
058@Tag(SmallTests.TAG)
059@HBaseParameterizedTestTemplate(name = "{index}: cacheOnWrite={0}")
060public class TestLazyDataBlockDecompression {
061
062  private static final Logger LOG = LoggerFactory.getLogger(TestLazyDataBlockDecompression.class);
063  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
064  private static final Random RNG = new Random(9713312); // Just a fixed seed.
065
066  private FileSystem fs;
067
068  private final boolean cacheOnWrite;
069
070  public static Stream<Arguments> parameters() {
071    return Stream.of(Arguments.of(false), Arguments.of(true));
072  }
073
074  public TestLazyDataBlockDecompression(boolean cacheOnWrite) {
075    this.cacheOnWrite = cacheOnWrite;
076  }
077
078  @BeforeEach
079  public void setUp() throws IOException {
080    fs = FileSystem.get(TEST_UTIL.getConfiguration());
081  }
082
083  @AfterEach
084  public void tearDown() {
085    fs = null;
086  }
087
088  /**
089   * Write {@code entryCount} random keyvalues to a new HFile at {@code path}. Returns the row bytes
090   * of the KeyValues written, in the order they were written.
091   */
092  private static void writeHFile(Configuration conf, CacheConfig cc, FileSystem fs, Path path,
093    HFileContext cxt, int entryCount) throws IOException {
094    HFile.Writer writer =
095      new HFile.WriterFactory(conf, cc).withPath(fs, path).withFileContext(cxt).create();
096
097    // write a bunch of random kvs
098    final byte[] family = Bytes.toBytes("f");
099    final byte[] qualifier = Bytes.toBytes("q");
100    for (int i = 0; i < entryCount; i++) {
101      byte[] keyBytes = RandomKeyValueUtil.randomOrderedKey(RNG, i);
102      byte[] valueBytes = RandomKeyValueUtil.randomValue(RNG);
103      // make a real keyvalue so that hfile tool can examine it
104      writer.append(new KeyValue(keyBytes, family, qualifier, valueBytes));
105    }
106    writer.close();
107  }
108
109  /**
110   * Read all blocks from {@code path} to populate {@code blockCache}.
111   */
112  private static void cacheBlocks(Configuration conf, CacheConfig cacheConfig, FileSystem fs,
113    Path path, HFileContext cxt) throws IOException {
114    FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path);
115    long fileSize = fs.getFileStatus(path).getLen();
116    FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize);
117    ReaderContext context = new ReaderContextBuilder().withFilePath(path).withFileSize(fileSize)
118      .withFileSystem(fsdis.getHfs()).withInputStreamWrapper(fsdis).build();
119    HFileInfo fileInfo = new HFileInfo(context, conf);
120    HFile.Reader reader = new HFilePreadReader(context, fileInfo, cacheConfig, conf);
121    fileInfo.initMetaAndIndex(reader);
122    long offset = trailer.getFirstDataBlockOffset(), max = trailer.getLastDataBlockOffset();
123    List<HFileBlock> blocks = new ArrayList<>(4);
124    HFileBlock block;
125    while (offset <= max) {
126      block = reader.readBlock(offset, -1, /* cacheBlock */ true, /* pread */ false,
127        /* isCompaction */ false, /* updateCacheMetrics */ true, null, null);
128      offset += block.getOnDiskSizeWithHeader();
129      blocks.add(block);
130    }
131    LOG.info("read " + Iterables.toString(blocks));
132    reader.close();
133  }
134
135  @TestTemplate
136  public void testCompressionIncreasesEffectiveBlockCacheSize() throws Exception {
137    // enough room for 2 uncompressed block
138    int maxSize = (int) (HConstants.DEFAULT_BLOCKSIZE * 2.1);
139    Path hfilePath =
140      new Path(TEST_UTIL.getDataTestDir(), "testCompressionIncreasesEffectiveBlockcacheSize");
141    HFileContext context =
142      new HFileContextBuilder().withCompression(Compression.Algorithm.GZ).build();
143    LOG.info("context=" + context);
144
145    // setup cache with lazy-decompression disabled.
146    Configuration lazyCompressDisabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
147    lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
148    lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
149    lazyCompressDisabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
150    lazyCompressDisabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
151    CacheConfig cc = new CacheConfig(lazyCompressDisabled,
152      new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled));
153    assertFalse(cc.shouldCacheDataCompressed());
154    assertFalse(cc.isCombinedBlockCache());
155    LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache().get();
156    LOG.info("disabledBlockCache=" + disabledBlockCache);
157    assertEquals(maxSize, disabledBlockCache.getMaxSize(), "test inconsistency detected.");
158    assertTrue(disabledBlockCache.getEvictionThread() == null,
159      "eviction thread spawned unintentionally.");
160    assertEquals(0, disabledBlockCache.getBlockCount(),
161      "freshly created blockcache contains blocks.");
162
163    // 2000 kv's is ~3.6 full unencoded data blocks.
164    // Requires a conf and CacheConfig but should not be specific to this instance's cache settings
165    writeHFile(lazyCompressDisabled, cc, fs, hfilePath, context, 2000);
166
167    // populate the cache
168    cacheBlocks(lazyCompressDisabled, cc, fs, hfilePath, context);
169    long disabledBlockCount = disabledBlockCache.getBlockCount();
170    assertTrue(disabledBlockCount > 0,
171      "blockcache should contain blocks. disabledBlockCount=" + disabledBlockCount);
172    long disabledEvictedCount = disabledBlockCache.getStats().getEvictedCount();
173    for (Map.Entry<BlockCacheKey, LruCachedBlock> e : disabledBlockCache.getMapForTests()
174      .entrySet()) {
175      HFileBlock block = (HFileBlock) e.getValue().getBuffer();
176      assertTrue(block.isUnpacked(), "found a packed block, block=" + block);
177    }
178
179    // count blocks with lazy decompression
180    Configuration lazyCompressEnabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
181    lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
182    lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
183    lazyCompressEnabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
184    lazyCompressEnabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
185    cc = new CacheConfig(lazyCompressEnabled,
186      new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressEnabled));
187    assertTrue(cc.shouldCacheDataCompressed(), "test improperly configured.");
188    assertTrue(cc.getBlockCache().get() instanceof LruBlockCache);
189    LruBlockCache enabledBlockCache = (LruBlockCache) cc.getBlockCache().get();
190    LOG.info("enabledBlockCache=" + enabledBlockCache);
191    assertEquals(maxSize, enabledBlockCache.getMaxSize(), "test inconsistency detected");
192    assertTrue(enabledBlockCache.getEvictionThread() == null,
193      "eviction thread spawned unintentionally.");
194    assertEquals(0, enabledBlockCache.getBlockCount(),
195      "freshly created blockcache contains blocks.");
196
197    cacheBlocks(lazyCompressEnabled, cc, fs, hfilePath, context);
198    long enabledBlockCount = enabledBlockCache.getBlockCount();
199    assertTrue(enabledBlockCount > 0,
200      "blockcache should contain blocks. enabledBlockCount=" + enabledBlockCount);
201    long enabledEvictedCount = enabledBlockCache.getStats().getEvictedCount();
202    int candidatesFound = 0;
203    for (Map.Entry<BlockCacheKey, LruCachedBlock> e : enabledBlockCache.getMapForTests()
204      .entrySet()) {
205      candidatesFound++;
206      HFileBlock block = (HFileBlock) e.getValue().getBuffer();
207      if (cc.shouldCacheCompressed(block.getBlockType().getCategory())) {
208        assertFalse(block.isUnpacked(), "found an unpacked block, block=" + block
209          + ", block buffer capacity=" + block.getBufferWithoutHeader().capacity());
210      }
211    }
212    assertTrue(candidatesFound > 0,
213      "did not find any candidates for compressed caching. Invalid test.");
214
215    LOG.info(
216      "disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" + enabledBlockCount);
217    assertTrue(disabledBlockCount < enabledBlockCount,
218      "enabling compressed data blocks should increase the effective cache size. "
219        + "disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" + enabledBlockCount);
220
221    LOG.info("disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount="
222      + enabledEvictedCount);
223    assertTrue(enabledEvictedCount < disabledEvictedCount,
224      "enabling compressed data blocks should reduce the number of evictions. "
225        + "disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount="
226        + enabledEvictedCount);
227  }
228}