/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
 * Tests {@link HFile} cache-on-write functionality for the following block types: data blocks,
 * non-root index blocks, and Bloom filter blocks.
 */
@HBaseParameterizedTestTemplate(name = "{0}-{1}-{2}-{3}")
@org.junit.jupiter.api.Tag(IOTests.TAG)
@org.junit.jupiter.api.Tag(LargeTests.TAG)
public class TestCacheOnWrite {

  private static final Logger LOG = LoggerFactory.getLogger(TestCacheOnWrite.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private Random rand = new Random(12983177L);
  private Path storeFilePath;
  private BlockCache blockCache;
  private String testDescription;

  private final CacheOnWriteType cowType;
  private final Compression.Algorithm compress;
  private final boolean cacheCompressedData;

  private static final int DATA_BLOCK_SIZE = 2048;
  private static final int NUM_KV = 25000;
  private static final int INDEX_BLOCK_SIZE = 512;
  private static final int BLOOM_BLOCK_SIZE = 4096;
  private static final BloomType BLOOM_TYPE = BloomType.ROWCOL;
  private static final int CKBYTES = 512;

  private static final Set<BlockType> INDEX_BLOCK_TYPES = ImmutableSet.of(BlockType.INDEX_V1,
    BlockType.INTERMEDIATE_INDEX, BlockType.ROOT_INDEX, BlockType.LEAF_INDEX);
  private static final Set<BlockType> BLOOM_BLOCK_TYPES = ImmutableSet.of(BlockType.BLOOM_CHUNK,
    BlockType.GENERAL_BLOOM_META, BlockType.DELETE_FAMILY_BLOOM_META);
  private static final Set<BlockType> DATA_BLOCK_TYPES =
    ImmutableSet.of(BlockType.ENCODED_DATA, BlockType.DATA);

  // All test cases are expected to generate compaction files whose sizes fall within this range.
  private static final long CACHE_COMPACTION_LOW_THRESHOLD = 10L;
  private static final long CACHE_COMPACTION_HIGH_THRESHOLD = 1 * 1024 * 1024 * 1024L;

  /**
   * The number of valid key types possible in a store file: all {@link KeyValue.Type} values
   * except the Minimum and Maximum sentinels.
   */
  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;

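  /**
   * The three cache-on-write flavors under test. Each value carries the configuration key that
   * enables caching for its block types; {@code modifyConf} switches on exactly one flavor and
   * turns the other two off, so each run exercises a single cache-on-write setting in isolation.
   */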
  private enum CacheOnWriteType {
    DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, BlockType.DATA, BlockType.ENCODED_DATA),
    BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, BlockType.BLOOM_CHUNK),
    INDEX_BLOCKS(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, BlockType.LEAF_INDEX,
      BlockType.INTERMEDIATE_INDEX);

    private final String confKey;
    private final BlockType blockType1;
    private final BlockType blockType2;

    CacheOnWriteType(String confKey, BlockType blockType) {
      this(confKey, blockType, blockType);
    }

    CacheOnWriteType(String confKey, BlockType blockType1, BlockType blockType2) {
      this.blockType1 = blockType1;
      this.blockType2 = blockType2;
      this.confKey = confKey;
    }

    public boolean shouldBeCached(BlockType blockType) {
      return blockType == blockType1 || blockType == blockType2;
    }

    public void modifyConf(Configuration conf) {
      for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
        conf.setBoolean(cowType.confKey, cowType == this);
      }
    }
  }

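  /**
   * Parameters are supplied by {@link #parameters()} through the
   * {@code HBaseParameterizedTestTemplate} runner: the cache-on-write flavor, the compression
   * algorithm, whether compressed data should be cached as-is, and the block cache implementation
   * to run against.
   */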
  public TestCacheOnWrite(CacheOnWriteType cowType, Compression.Algorithm compress,
    boolean cacheCompressedData, BlockCache blockCache) {
    this.cowType = cowType;
    this.compress = compress;
    this.cacheCompressedData = cacheCompressedData;
    this.blockCache = blockCache;
    testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress
      + ", cacheCompressedData=" + cacheCompressedData + "]";
    LOG.info(testDescription);
  }

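  /**
   * Builds the block cache implementations exercised by this test: the default cache produced by
   * {@link BlockCacheFactory}, an on-heap {@link LruBlockCache}, and an off-heap
   * {@link BucketCache}.
   */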
  private static List<BlockCache> getBlockCaches() throws IOException {
    Configuration conf = TEST_UTIL.getConfiguration();
    List<BlockCache> blockcaches = new ArrayList<>();
    // default
    blockcaches.add(BlockCacheFactory.createBlockCache(conf));

    // set LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME to 2.0f due to HBASE-16287
    TEST_UTIL.getConfiguration().setFloat(LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME,
      2.0f);
    // memory
    BlockCache lru = new LruBlockCache(128 * 1024 * 1024, 64 * 1024, TEST_UTIL.getConfiguration());
    blockcaches.add(lru);

    // bucket cache
    FileSystem.get(conf).mkdirs(TEST_UTIL.getDataTestDir());
    int[] bucketSizes =
      { INDEX_BLOCK_SIZE, DATA_BLOCK_SIZE, BLOOM_BLOCK_SIZE, 64 * 1024, 128 * 1024 };
    BlockCache bucketcache =
      new BucketCache("offheap", 128 * 1024 * 1024, 64 * 1024, bucketSizes, 5, 64 * 100, null);
    blockcaches.add(bucketcache);
    return blockcaches;
  }

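  /**
   * Generates the full cross product of cache-on-write flavor, compression algorithm,
   * cache-compressed-data flag, and block cache implementation. Each {@link Arguments} tuple
   * becomes one test instance, e.g. {@code DATA_BLOCKS} with {@code GZ} compression and
   * compressed caching enabled, run against an {@code LruBlockCache}.
   */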
  public static Stream<Arguments> parameters() throws IOException {
    List<Arguments> params = new ArrayList<>();
    for (BlockCache blockCache : getBlockCaches()) {
      for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
        for (Compression.Algorithm compress : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) {
          for (boolean cacheCompressedData : new boolean[] { false, true }) {
            params.add(Arguments.of(cowType, compress, cacheCompressedData, blockCache));
          }
        }
      }
    }
    return params.stream();
  }

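  /**
   * Empties the given cache between runs. An {@link LruBlockCache} can be cleared directly; other
   * implementations are drained by evicting every cached block, retrying while the asynchronous
   * write queue keeps the block count above zero.
   */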
  private void clearBlockCache(BlockCache blockCache) throws InterruptedException {
    if (blockCache instanceof LruBlockCache) {
      ((LruBlockCache) blockCache).clearCache();
    } else {
      // A BucketCache may not expose all cached blocks immediately (some may still be in the
      // write queue), so retry until the block count reaches zero.
      for (int clearCount = 0; blockCache.getBlockCount() > 0; clearCount++) {
        if (clearCount > 0) {
          LOG.warn("clear block cache " + blockCache + " " + clearCount + " times, "
            + blockCache.getBlockCount() + " blocks remaining");
          Thread.sleep(10);
        }
        for (CachedBlock block : Lists.newArrayList(blockCache)) {
          BlockCacheKey key = new BlockCacheKey(block.getFilename(), block.getOffset());
          // A combined block cache may need to evict the same key twice.
          for (int evictCount = 0; blockCache.evictBlock(key); evictCount++) {
            if (evictCount > 1) {
              LOG.warn("evict block " + block + " in " + blockCache + " " + evictCount
                + " times, maybe a bug here");
            }
          }
        }
      }
    }
  }

  @BeforeEach
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    this.conf.set("dfs.datanode.data.dir.perm", "700");
    conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE);
    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, BLOOM_BLOCK_SIZE);
    conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData);
    cowType.modifyConf(conf);
    conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cowType.shouldBeCached(BlockType.DATA));
    conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
      cowType.shouldBeCached(BlockType.LEAF_INDEX));
    conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
      cowType.shouldBeCached(BlockType.BLOOM_CHUNK));
    cacheConf = new CacheConfig(conf, blockCache);
    fs = HFileSystem.get(conf);
  }

  @AfterEach
  public void tearDown() throws IOException, InterruptedException {
    clearBlockCache(blockCache);
  }

  @AfterAll
  public static void afterClass() throws IOException {
    TEST_UTIL.cleanupTestDir();
  }

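  /** Writes a store file and then verifies, block by block, what made it into the cache. */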
  private void testStoreFileCacheOnWriteInternals(boolean useTags) throws IOException {
    writeStoreFile(useTags);
    readStoreFile(useTags);
  }

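  /**
   * Scans every block of the store file written by {@link #writeStoreFile(boolean)} and asserts
   * that a block is present in the cache if and only if its type matches the active
   * cache-on-write setting. Also checks that cached blocks are kept compressed or unpacked
   * according to the cache-compressed-data flag, and that their metadata matches the on-disk
   * blocks.
   */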
  private void readStoreFile(boolean useTags) throws IOException {
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
    LOG.info("HFile information: " + reader);
    HFileContext meta =
      new HFileContextBuilder().withCompression(compress).withBytesPerCheckSum(CKBYTES)
        .withChecksumType(ChecksumType.NULL).withBlockSize(DATA_BLOCK_SIZE)
        .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
        .withIncludesTags(useTags).build();
    final boolean cacheBlocks = false;
    final boolean pread = false;
    HFileScanner scanner = reader.getScanner(conf, cacheBlocks, pread);
    assertTrue(scanner.seekTo(), testDescription);

    long offset = 0;
    EnumMap<BlockType, Integer> blockCountByType = new EnumMap<>(BlockType.class);

    DataBlockEncoding encodingInCache = NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding();
    List<Long> cachedBlocksOffset = new ArrayList<>();
    Map<Long, Pair<HFileBlock, HFileBlock>> cachedBlocks = new HashMap<>();
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      // Flags: don't cache the block, use pread, this is not a compaction.
      // Also, pass null for expected block type to avoid checking it.
      HFileBlock block =
        reader.readBlock(offset, -1, false, true, false, true, null, encodingInCache);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true);
      boolean isCached = fromCache != null;
      cachedBlocksOffset.add(offset);
      cachedBlocks.put(offset, fromCache == null ? null : Pair.newPair(block, fromCache));
      boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
      assertTrue(shouldBeCached == isCached,
        "shouldBeCached: " + shouldBeCached + "\n" + "isCached: " + isCached + "\n"
          + "Test description: " + testDescription + "\n" + "block: " + block + "\n"
          + "encodingInCache: " + encodingInCache + "\n" + "blockCacheKey: " + blockCacheKey);
      if (isCached) {
        if (cacheConf.shouldCacheCompressed(fromCache.getBlockType().getCategory())) {
          if (compress != Compression.Algorithm.NONE) {
            assertFalse(fromCache.isUnpacked());
          }
          fromCache = fromCache.unpack(meta, reader.getUncachedBlockReader());
        } else {
          assertTrue(fromCache.isUnpacked());
        }
        // The block we cached at write time and the block read back from the file should be
        // identical.
        assertEquals(block.getChecksumType(), fromCache.getChecksumType());
        assertEquals(block.getBlockType(), fromCache.getBlockType());
        assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType());
        assertEquals(block.getOnDiskSizeWithHeader(), fromCache.getOnDiskSizeWithHeader());
        assertEquals(block.getOnDiskSizeWithoutHeader(), fromCache.getOnDiskSizeWithoutHeader());
        assertEquals(block.getUncompressedSizeWithoutHeader(),
          fromCache.getUncompressedSizeWithoutHeader());
      }
      offset += block.getOnDiskSizeWithHeader();
      BlockType bt = block.getBlockType();
      Integer count = blockCountByType.get(bt);
      blockCountByType.put(bt, (count == null ? 0 : count) + 1);
    }

    LOG.info("Block count by type: " + blockCountByType);
    String countByType = blockCountByType.toString();
    if (useTags) {
      assertEquals(
        "{" + BlockType.DATA + "=2663, LEAF_INDEX=297, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=32}",
        countByType);
    } else {
      assertEquals(
        "{" + BlockType.DATA + "=2498, LEAF_INDEX=278, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=31}",
        countByType);
    }

    // Iterate over all the KeyValues in the HFile.
    while (scanner.next()) {
      scanner.getCell();
    }
    Iterator<Long> iterator = cachedBlocksOffset.iterator();
    while (iterator.hasNext()) {
      Long entry = iterator.next();
      Pair<HFileBlock, HFileBlock> blockPair = cachedBlocks.get(entry);
      if (blockPair != null) {
        // Release twice because, in the isCached case, the reference count was incremented twice.
        // Note that we need to release two different blocks here; see the comments in
        // BucketCache#returnBlock.
        blockPair.getSecond().release();
        if (cacheCompressedData) {
          if (
            this.compress == Compression.Algorithm.NONE || cowType == CacheOnWriteType.INDEX_BLOCKS
              || cowType == CacheOnWriteType.BLOOM_BLOCKS
          ) {
            blockPair.getFirst().release();
          }
        } else {
          blockPair.getFirst().release();
        }
      }
    }
    scanner.shipped();
    reader.close();
  }

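  /**
   * Picks a random {@link KeyValue.Type} for a generated cell: half the time a plain Put, the
   * other half one of the remaining valid types. Throws if a Minimum or Maximum sentinel is ever
   * produced, which would indicate the enum layout has changed.
   */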
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Make half of the KVs Puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

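  /**
   * Writes {@code NUM_KV} randomly generated KeyValues into a new store file using the current
   * compression and cache-on-write configuration, optionally attaching a visibility tag to every
   * cell, and records the resulting path in {@link #storeFilePath}.
   */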
  private void writeStoreFile(boolean useTags) throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "test_cache_on_write");
    HFileContext meta =
      new HFileContextBuilder().withCompression(compress).withBytesPerCheckSum(CKBYTES)
        .withChecksumType(ChecksumType.NULL).withBlockSize(DATA_BLOCK_SIZE)
        .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
        .withIncludesTags(useTags).build();
    StoreFileWriter sfw =
      new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(storeFileParentDir)
        .withFileContext(meta).withBloomType(BLOOM_TYPE).withMaxKeyCount(NUM_KV).build();
    byte[] cf = Bytes.toBytes("fam");
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] row = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] qualifier = RandomKeyValueUtil.randomRowOrQualifier(rand);
      byte[] value = RandomKeyValueUtil.randomValue(rand);
      KeyValue kv;
      if (useTags) {
        Tag t = new ArrayBackedTag((byte) 1, "visibility");
        List<Tag> tagList = new ArrayList<>();
        tagList.add(t);
        kv = new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
          Math.abs(rand.nextLong()), generateKeyType(rand), value, 0, value.length, tagList);
      } else {
        kv = new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
          Math.abs(rand.nextLong()), generateKeyType(rand), value, 0, value.length);
      }
      sfw.append(kv);
    }

    sfw.close();
    storeFilePath = sfw.getPath();
  }

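  /**
   * Loads a test region with several flushed store files, clears the cache, triggers a
   * compaction, and then asserts which block types ended up cached given the
   * cache-compacted-blocks-on-write flag and the optional size threshold. The original
   * configuration values are saved up front and restored in the {@code finally} block so later
   * runs see an unmodified conf.
   */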
  private void testCachingDataBlocksDuringCompactionInternals(boolean useTags,
    boolean cacheBlocksOnCompaction, long cacheBlocksOnCompactionThreshold)
    throws IOException, InterruptedException {
    // Save the current conf values so they can be restored in the finally block.
    boolean localValue = conf.getBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, false);
    long localCacheCompactedBlocksThreshold =
      conf.getLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
        CacheConfig.DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD);
    boolean localCacheBloomBlocksValue = conf.getBoolean(
      CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, CacheConfig.DEFAULT_CACHE_BLOOMS_ON_WRITE);
    boolean localCacheIndexBlocksValue = conf.getBoolean(
      CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, CacheConfig.DEFAULT_CACHE_INDEXES_ON_WRITE);

    try {
      // Set the conf if testing caching compacted blocks on write
      conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, cacheBlocksOnCompaction);

      // set size threshold if testing compaction size threshold
      if (cacheBlocksOnCompactionThreshold > 0) {
        conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
          cacheBlocksOnCompactionThreshold);
      }

      // TODO: need to change this test if we add a cache size threshold for
      // compactions, or if we implement some other kind of intelligent logic for
      // deciding what blocks to cache-on-write on compaction.
      final String table = "CompactionCacheOnWrite";
      final String cf = "myCF";
      final byte[] cfBytes = Bytes.toBytes(cf);
      final int maxVersions = 3;
      ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(cfBytes)
        .setCompressionType(compress).setBloomFilterType(BLOOM_TYPE).setMaxVersions(maxVersions)
        .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()).build();
      HRegion region = TEST_UTIL.createTestRegion(table, cfd, blockCache);
      int rowIdx = 0;
      long ts = EnvironmentEdgeManager.currentTime();
      for (int iFile = 0; iFile < 5; ++iFile) {
        for (int iRow = 0; iRow < 500; ++iRow) {
          String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" + iRow;
          Put p = new Put(Bytes.toBytes(rowStr));
          ++rowIdx;
          for (int iCol = 0; iCol < 10; ++iCol) {
            String qualStr = "col" + iCol;
            String valueStr = "value_" + rowStr + "_" + qualStr;
            for (int iTS = 0; iTS < 5; ++iTS) {
              if (useTags) {
                Tag t = new ArrayBackedTag((byte) 1, "visibility");
                Tag[] tags = new Tag[1];
                tags[0] = t;
                KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
                  HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
                p.add(kv);
              } else {
                KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
                  ts++, Bytes.toBytes(valueStr));
                p.add(kv);
              }
            }
          }
          p.setDurability(Durability.ASYNC_WAL);
          region.put(p);
        }
        region.flush(true);
      }

      clearBlockCache(blockCache);
      assertEquals(0, blockCache.getBlockCount());

      region.compact(false);
      LOG.debug("compactStores() returned");

      boolean dataBlockCached = false;
      boolean bloomBlockCached = false;
      boolean indexBlockCached = false;

      for (CachedBlock block : blockCache) {
        if (DATA_BLOCK_TYPES.contains(block.getBlockType())) {
          dataBlockCached = true;
        } else if (BLOOM_BLOCK_TYPES.contains(block.getBlockType())) {
          bloomBlockCached = true;
        } else if (INDEX_BLOCK_TYPES.contains(block.getBlockType())) {
          indexBlockCached = true;
        }
      }

      // Data blocks should be cached in instances where we are caching blocks on write. When
      // testing a BucketCache we cannot verify the block type, as it is not stored in the cache.
      boolean cacheOnCompactAndNonBucketCache =
        cacheBlocksOnCompaction && !(blockCache instanceof BucketCache);

      String assertErrorMessage = "\nTest description: " + testDescription
        + "\ncacheBlocksOnCompaction: " + cacheBlocksOnCompaction + "\n";

      if (cacheOnCompactAndNonBucketCache && cacheBlocksOnCompactionThreshold > 0) {
        if (cacheBlocksOnCompactionThreshold == CACHE_COMPACTION_HIGH_THRESHOLD) {
          assertTrue(dataBlockCached, assertErrorMessage);
          assertTrue(bloomBlockCached, assertErrorMessage);
          assertTrue(indexBlockCached, assertErrorMessage);
        } else {
          assertFalse(dataBlockCached, assertErrorMessage);

          if (localCacheBloomBlocksValue) {
            assertTrue(bloomBlockCached, assertErrorMessage);
          } else {
            assertFalse(bloomBlockCached, assertErrorMessage);
          }

          if (localCacheIndexBlocksValue) {
            assertTrue(indexBlockCached, assertErrorMessage);
          } else {
            assertFalse(indexBlockCached, assertErrorMessage);
          }
        }
      } else {
        assertEquals(cacheOnCompactAndNonBucketCache, dataBlockCached, assertErrorMessage);

        if (cacheOnCompactAndNonBucketCache) {
          assertTrue(bloomBlockCached, assertErrorMessage);
          assertTrue(indexBlockCached, assertErrorMessage);
        }
      }

      region.close();
    } finally {
      // reset back
      conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, localValue);
      conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
        localCacheCompactedBlocksThreshold);
      conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, localCacheBloomBlocksValue);
      conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, localCacheIndexBlocksValue);
    }
  }

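  /** Runs the store-file cache-on-write check both without and with per-cell tags. */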
  @TestTemplate
  public void testStoreFileCacheOnWrite() throws IOException {
    testStoreFileCacheOnWriteInternals(false);
    testStoreFileCacheOnWriteInternals(true);
  }

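  /** Exercises compaction-time caching with the cache-compacted-blocks flag off, then on. */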
  @TestTemplate
  public void testCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
    testCachingDataBlocksDuringCompactionInternals(false, false, -1);
    testCachingDataBlocksDuringCompactionInternals(true, true, -1);
  }

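  /** Exercises the compaction caching size threshold at both the high and the low bound. */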
  @TestTemplate
  public void testCachingDataBlocksThresholdDuringCompaction()
    throws IOException, InterruptedException {
    testCachingDataBlocksDuringCompactionInternals(false, true, CACHE_COMPACTION_HIGH_THRESHOLD);
    testCachingDataBlocksDuringCompactionInternals(false, true, CACHE_COMPACTION_LOW_THRESHOLD);
  }
}