001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertNotEquals; 023import static org.junit.jupiter.api.Assertions.assertTrue; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.EnumMap; 028import java.util.HashMap; 029import java.util.Iterator; 030import java.util.List; 031import java.util.Map; 032import java.util.Random; 033import java.util.Set; 034import java.util.stream.Stream; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.FileSystem; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.hbase.ArrayBackedTag; 039import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 040import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 041import org.apache.hadoop.hbase.HBaseTestingUtil; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.KeyValue; 044import org.apache.hadoop.hbase.Tag; 045import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 047import org.apache.hadoop.hbase.client.Durability; 048import org.apache.hadoop.hbase.client.Put; 049import org.apache.hadoop.hbase.fs.HFileSystem; 050import org.apache.hadoop.hbase.io.compress.Compression; 051import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 052import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 053import org.apache.hadoop.hbase.regionserver.BloomType; 054import org.apache.hadoop.hbase.regionserver.HRegion; 055import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 056import org.apache.hadoop.hbase.testclassification.IOTests; 057import org.apache.hadoop.hbase.testclassification.LargeTests; 058import org.apache.hadoop.hbase.util.BloomFilterFactory; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.apache.hadoop.hbase.util.ChecksumType; 061import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 062import org.apache.hadoop.hbase.util.Pair; 063import org.junit.jupiter.api.AfterAll; 064import org.junit.jupiter.api.AfterEach; 065import org.junit.jupiter.api.BeforeEach; 066import org.junit.jupiter.api.TestTemplate; 067import org.junit.jupiter.params.provider.Arguments; 068import org.slf4j.Logger; 069import org.slf4j.LoggerFactory; 070 071import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 072import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 073 074/** 075 * Tests {@link HFile} cache-on-write functionality for the following block types: data blocks, 076 * non-root index blocks, and Bloom filter blocks. 077 */ 078@HBaseParameterizedTestTemplate(name = "{0}-{1}-{2}-{3}") 079@org.junit.jupiter.api.Tag(IOTests.TAG) 080@org.junit.jupiter.api.Tag(LargeTests.TAG) 081public class TestCacheOnWrite { 082 083 private static final Logger LOG = LoggerFactory.getLogger(TestCacheOnWrite.class); 084 085 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 086 private Configuration conf; 087 private CacheConfig cacheConf; 088 private FileSystem fs; 089 private Random rand = new Random(12983177L); 090 private Path storeFilePath; 091 private BlockCache blockCache; 092 private String testDescription; 093 094 private final CacheOnWriteType cowType; 095 private final Compression.Algorithm compress; 096 private final boolean cacheCompressedData; 097 098 private static final int DATA_BLOCK_SIZE = 2048; 099 private static final int NUM_KV = 25000; 100 private static final int INDEX_BLOCK_SIZE = 512; 101 private static final int BLOOM_BLOCK_SIZE = 4096; 102 private static final BloomType BLOOM_TYPE = BloomType.ROWCOL; 103 private static final int CKBYTES = 512; 104 105 private static final Set<BlockType> INDEX_BLOCK_TYPES = ImmutableSet.of(BlockType.INDEX_V1, 106 BlockType.INTERMEDIATE_INDEX, BlockType.ROOT_INDEX, BlockType.LEAF_INDEX); 107 private static final Set<BlockType> BLOOM_BLOCK_TYPES = ImmutableSet.of(BlockType.BLOOM_CHUNK, 108 BlockType.GENERAL_BLOOM_META, BlockType.DELETE_FAMILY_BLOOM_META); 109 private static final Set<BlockType> DATA_BLOCK_TYPES = 110 ImmutableSet.of(BlockType.ENCODED_DATA, BlockType.DATA); 111 112 // All test cases are supposed to generate files for compaction within this range 113 private static final long CACHE_COMPACTION_LOW_THRESHOLD = 10L; 114 private static final long CACHE_COMPACTION_HIGH_THRESHOLD = 1 * 1024 * 1024 * 1024L; 115 116 /** The number of valid key types possible in a store file */ 117 private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2; 118 119 private enum CacheOnWriteType { 120 DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, BlockType.DATA, BlockType.ENCODED_DATA), 121 BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, BlockType.BLOOM_CHUNK), 122 INDEX_BLOCKS(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, BlockType.LEAF_INDEX, 123 BlockType.INTERMEDIATE_INDEX); 124 125 private final String confKey; 126 private final BlockType blockType1; 127 private final BlockType blockType2; 128 129 CacheOnWriteType(String confKey, BlockType blockType) { 130 this(confKey, blockType, blockType); 131 } 132 133 CacheOnWriteType(String confKey, BlockType blockType1, BlockType blockType2) { 134 this.blockType1 = blockType1; 135 this.blockType2 = blockType2; 136 this.confKey = confKey; 137 } 138 139 public boolean shouldBeCached(BlockType blockType) { 140 return blockType == blockType1 || blockType == blockType2; 141 } 142 143 public void modifyConf(Configuration conf) { 144 for (CacheOnWriteType cowType : CacheOnWriteType.values()) { 145 conf.setBoolean(cowType.confKey, cowType == this); 146 } 147 } 148 } 149 150 public TestCacheOnWrite(CacheOnWriteType cowType, Compression.Algorithm compress, 151 boolean cacheCompressedData, BlockCache blockCache) { 152 this.cowType = cowType; 153 this.compress = compress; 154 this.cacheCompressedData = cacheCompressedData; 155 this.blockCache = blockCache; 156 testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress 157 + ", cacheCompressedData=" + cacheCompressedData + "]"; 158 LOG.info(testDescription); 159 } 160 161 private static List<BlockCache> getBlockCaches() throws IOException { 162 Configuration conf = TEST_UTIL.getConfiguration(); 163 List<BlockCache> blockcaches = new ArrayList<>(); 164 // default 165 blockcaches.add(BlockCacheFactory.createBlockCache(conf)); 166 167 // set LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME to 2.0f due to HBASE-16287 168 TEST_UTIL.getConfiguration().setFloat(LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, 169 2.0f); 170 // memory 171 BlockCache lru = new LruBlockCache(128 * 1024 * 1024, 64 * 1024, TEST_UTIL.getConfiguration()); 172 blockcaches.add(lru); 173 174 // bucket cache 175 FileSystem.get(conf).mkdirs(TEST_UTIL.getDataTestDir()); 176 int[] bucketSizes = 177 { INDEX_BLOCK_SIZE, DATA_BLOCK_SIZE, BLOOM_BLOCK_SIZE, 64 * 1024, 128 * 1024 }; 178 BlockCache bucketcache = 179 new BucketCache("offheap", 128 * 1024 * 1024, 64 * 1024, bucketSizes, 5, 64 * 100, null); 180 blockcaches.add(bucketcache); 181 return blockcaches; 182 } 183 184 public static Stream<Arguments> parameters() throws IOException { 185 List<Arguments> params = new ArrayList<>(); 186 for (BlockCache blockCache : getBlockCaches()) { 187 for (CacheOnWriteType cowType : CacheOnWriteType.values()) { 188 for (Compression.Algorithm compress : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) { 189 for (boolean cacheCompressedData : new boolean[] { false, true }) { 190 params.add(Arguments.of(cowType, compress, cacheCompressedData, blockCache)); 191 } 192 } 193 } 194 } 195 return params.stream(); 196 } 197 198 private void clearBlockCache(BlockCache blockCache) throws InterruptedException { 199 if (blockCache instanceof LruBlockCache) { 200 ((LruBlockCache) blockCache).clearCache(); 201 } else { 202 // BucketCache may not return all cached blocks(blocks in write queue), so check it here. 203 for (int clearCount = 0; blockCache.getBlockCount() > 0; clearCount++) { 204 if (clearCount > 0) { 205 LOG.warn("clear block cache " + blockCache + " " + clearCount + " times, " 206 + blockCache.getBlockCount() + " blocks remaining"); 207 Thread.sleep(10); 208 } 209 for (CachedBlock block : Lists.newArrayList(blockCache)) { 210 BlockCacheKey key = new BlockCacheKey(block.getFilename(), block.getOffset()); 211 // CombinedBucketCache may need evict two times. 212 for (int evictCount = 0; blockCache.evictBlock(key); evictCount++) { 213 if (evictCount > 1) { 214 LOG.warn("evict block " + block + " in " + blockCache + " " + evictCount 215 + " times, maybe a bug here"); 216 } 217 } 218 } 219 } 220 } 221 } 222 223 @BeforeEach 224 public void setUp() throws IOException { 225 conf = TEST_UTIL.getConfiguration(); 226 this.conf.set("dfs.datanode.data.dir.perm", "700"); 227 conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE); 228 conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, BLOOM_BLOCK_SIZE); 229 conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData); 230 cowType.modifyConf(conf); 231 conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cowType.shouldBeCached(BlockType.DATA)); 232 conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, 233 cowType.shouldBeCached(BlockType.LEAF_INDEX)); 234 conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, 235 cowType.shouldBeCached(BlockType.BLOOM_CHUNK)); 236 cacheConf = new CacheConfig(conf, blockCache); 237 fs = HFileSystem.get(conf); 238 } 239 240 @AfterEach 241 public void tearDown() throws IOException, InterruptedException { 242 clearBlockCache(blockCache); 243 } 244 245 @AfterAll 246 public static void afterClass() throws IOException { 247 TEST_UTIL.cleanupTestDir(); 248 } 249 250 private void testStoreFileCacheOnWriteInternals(boolean useTags) throws IOException { 251 writeStoreFile(useTags); 252 readStoreFile(useTags); 253 } 254 255 private void readStoreFile(boolean useTags) throws IOException { 256 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); 257 LOG.info("HFile information: " + reader); 258 HFileContext meta = 259 new HFileContextBuilder().withCompression(compress).withBytesPerCheckSum(CKBYTES) 260 .withChecksumType(ChecksumType.NULL).withBlockSize(DATA_BLOCK_SIZE) 261 .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()) 262 .withIncludesTags(useTags).build(); 263 final boolean cacheBlocks = false; 264 final boolean pread = false; 265 HFileScanner scanner = reader.getScanner(conf, cacheBlocks, pread); 266 assertTrue(scanner.seekTo(), testDescription); 267 268 long offset = 0; 269 EnumMap<BlockType, Integer> blockCountByType = new EnumMap<>(BlockType.class); 270 271 DataBlockEncoding encodingInCache = NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding(); 272 List<Long> cachedBlocksOffset = new ArrayList<>(); 273 Map<Long, Pair<HFileBlock, HFileBlock>> cachedBlocks = new HashMap<>(); 274 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 275 // Flags: don't cache the block, use pread, this is not a compaction. 276 // Also, pass null for expected block type to avoid checking it. 277 HFileBlock block = 278 reader.readBlock(offset, -1, false, true, false, true, null, encodingInCache); 279 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); 280 HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true); 281 boolean isCached = fromCache != null; 282 cachedBlocksOffset.add(offset); 283 cachedBlocks.put(offset, fromCache == null ? null : Pair.newPair(block, fromCache)); 284 boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType()); 285 assertTrue(shouldBeCached == isCached, 286 "shouldBeCached: " + shouldBeCached + "\n" + "isCached: " + isCached + "\n" 287 + "Test description: " + testDescription + "\n" + "block: " + block + "\n" 288 + "encodingInCache: " + encodingInCache + "\n" + "blockCacheKey: " + blockCacheKey); 289 if (isCached) { 290 if (cacheConf.shouldCacheCompressed(fromCache.getBlockType().getCategory())) { 291 if (compress != Compression.Algorithm.NONE) { 292 assertFalse(fromCache.isUnpacked()); 293 } 294 fromCache = fromCache.unpack(meta, reader.getUncachedBlockReader()); 295 } else { 296 assertTrue(fromCache.isUnpacked()); 297 } 298 // block we cached at write-time and block read from file should be identical 299 assertEquals(block.getChecksumType(), fromCache.getChecksumType()); 300 assertEquals(block.getBlockType(), fromCache.getBlockType()); 301 assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType()); 302 assertEquals(block.getOnDiskSizeWithHeader(), fromCache.getOnDiskSizeWithHeader()); 303 assertEquals(block.getOnDiskSizeWithoutHeader(), fromCache.getOnDiskSizeWithoutHeader()); 304 assertEquals(block.getUncompressedSizeWithoutHeader(), 305 fromCache.getUncompressedSizeWithoutHeader()); 306 } 307 offset += block.getOnDiskSizeWithHeader(); 308 BlockType bt = block.getBlockType(); 309 Integer count = blockCountByType.get(bt); 310 blockCountByType.put(bt, (count == null ? 0 : count) + 1); 311 } 312 313 LOG.info("Block count by type: " + blockCountByType); 314 String countByType = blockCountByType.toString(); 315 if (useTags) { 316 assertEquals( 317 "{" + BlockType.DATA + "=2663, LEAF_INDEX=297, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=32}", 318 countByType); 319 } else { 320 assertEquals( 321 "{" + BlockType.DATA + "=2498, LEAF_INDEX=278, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=31}", 322 countByType); 323 } 324 325 // iterate all the keyvalue from hfile 326 while (scanner.next()) { 327 scanner.getCell(); 328 } 329 Iterator<Long> iterator = cachedBlocksOffset.iterator(); 330 while (iterator.hasNext()) { 331 Long entry = iterator.next(); 332 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), entry); 333 Pair<HFileBlock, HFileBlock> blockPair = cachedBlocks.get(entry); 334 if (blockPair != null) { 335 // Call return twice because for the isCache cased the counter would have got incremented 336 // twice. Notice that here we need to returnBlock with different blocks. see comments in 337 // BucketCache#returnBlock. 338 blockPair.getSecond().release(); 339 if (cacheCompressedData) { 340 if ( 341 this.compress == Compression.Algorithm.NONE || cowType == CacheOnWriteType.INDEX_BLOCKS 342 || cowType == CacheOnWriteType.BLOOM_BLOCKS 343 ) { 344 blockPair.getFirst().release(); 345 } 346 } else { 347 blockPair.getFirst().release(); 348 } 349 } 350 } 351 scanner.shipped(); 352 reader.close(); 353 } 354 355 public static KeyValue.Type generateKeyType(Random rand) { 356 if (rand.nextBoolean()) { 357 // Let's make half of KVs puts. 358 return KeyValue.Type.Put; 359 } else { 360 KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; 361 if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { 362 throw new RuntimeException("Generated an invalid key type: " + keyType + ". " 363 + "Probably the layout of KeyValue.Type has changed."); 364 } 365 return keyType; 366 } 367 } 368 369 private void writeStoreFile(boolean useTags) throws IOException { 370 Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "test_cache_on_write"); 371 HFileContext meta = 372 new HFileContextBuilder().withCompression(compress).withBytesPerCheckSum(CKBYTES) 373 .withChecksumType(ChecksumType.NULL).withBlockSize(DATA_BLOCK_SIZE) 374 .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()) 375 .withIncludesTags(useTags).build(); 376 StoreFileWriter sfw = 377 new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(storeFileParentDir) 378 .withFileContext(meta).withBloomType(BLOOM_TYPE).withMaxKeyCount(NUM_KV).build(); 379 byte[] cf = Bytes.toBytes("fam"); 380 for (int i = 0; i < NUM_KV; ++i) { 381 byte[] row = RandomKeyValueUtil.randomOrderedKey(rand, i); 382 byte[] qualifier = RandomKeyValueUtil.randomRowOrQualifier(rand); 383 byte[] value = RandomKeyValueUtil.randomValue(rand); 384 KeyValue kv; 385 if (useTags) { 386 Tag t = new ArrayBackedTag((byte) 1, "visibility"); 387 List<Tag> tagList = new ArrayList<>(); 388 tagList.add(t); 389 Tag[] tags = new Tag[1]; 390 tags[0] = t; 391 kv = new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length, 392 Math.abs(rand.nextLong()), generateKeyType(rand), value, 0, value.length, tagList); 393 } else { 394 kv = new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length, 395 Math.abs(rand.nextLong()), generateKeyType(rand), value, 0, value.length); 396 } 397 sfw.append(kv); 398 } 399 400 sfw.close(); 401 storeFilePath = sfw.getPath(); 402 } 403 404 private void testCachingDataBlocksDuringCompactionInternals(boolean useTags, 405 boolean cacheBlocksOnCompaction, long cacheBlocksOnCompactionThreshold) 406 throws IOException, InterruptedException { 407 // create a localConf 408 boolean localValue = conf.getBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, false); 409 long localCacheCompactedBlocksThreshold = 410 conf.getLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY, 411 CacheConfig.DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD); 412 boolean localCacheBloomBlocksValue = conf.getBoolean( 413 CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, CacheConfig.DEFAULT_CACHE_BLOOMS_ON_WRITE); 414 boolean localCacheIndexBlocksValue = conf.getBoolean( 415 CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, CacheConfig.DEFAULT_CACHE_INDEXES_ON_WRITE); 416 417 try { 418 // Set the conf if testing caching compacted blocks on write 419 conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, cacheBlocksOnCompaction); 420 421 // set size threshold if testing compaction size threshold 422 if (cacheBlocksOnCompactionThreshold > 0) { 423 conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY, 424 cacheBlocksOnCompactionThreshold); 425 } 426 427 // TODO: need to change this test if we add a cache size threshold for 428 // compactions, or if we implement some other kind of intelligent logic for 429 // deciding what blocks to cache-on-write on compaction. 430 final String table = "CompactionCacheOnWrite"; 431 final String cf = "myCF"; 432 final byte[] cfBytes = Bytes.toBytes(cf); 433 final int maxVersions = 3; 434 ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(cfBytes) 435 .setCompressionType(compress).setBloomFilterType(BLOOM_TYPE).setMaxVersions(maxVersions) 436 .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()).build(); 437 HRegion region = TEST_UTIL.createTestRegion(table, cfd, blockCache); 438 int rowIdx = 0; 439 long ts = EnvironmentEdgeManager.currentTime(); 440 for (int iFile = 0; iFile < 5; ++iFile) { 441 for (int iRow = 0; iRow < 500; ++iRow) { 442 String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" + iRow; 443 Put p = new Put(Bytes.toBytes(rowStr)); 444 ++rowIdx; 445 for (int iCol = 0; iCol < 10; ++iCol) { 446 String qualStr = "col" + iCol; 447 String valueStr = "value_" + rowStr + "_" + qualStr; 448 for (int iTS = 0; iTS < 5; ++iTS) { 449 if (useTags) { 450 Tag t = new ArrayBackedTag((byte) 1, "visibility"); 451 Tag[] tags = new Tag[1]; 452 tags[0] = t; 453 KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr), 454 HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags); 455 p.add(kv); 456 } else { 457 KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr), 458 ts++, Bytes.toBytes(valueStr)); 459 p.add(kv); 460 } 461 } 462 } 463 p.setDurability(Durability.ASYNC_WAL); 464 region.put(p); 465 } 466 region.flush(true); 467 } 468 469 clearBlockCache(blockCache); 470 assertEquals(0, blockCache.getBlockCount()); 471 472 region.compact(false); 473 LOG.debug("compactStores() returned"); 474 475 boolean dataBlockCached = false; 476 boolean bloomBlockCached = false; 477 boolean indexBlockCached = false; 478 479 for (CachedBlock block : blockCache) { 480 if (DATA_BLOCK_TYPES.contains(block.getBlockType())) { 481 dataBlockCached = true; 482 } else if (BLOOM_BLOCK_TYPES.contains(block.getBlockType())) { 483 bloomBlockCached = true; 484 } else if (INDEX_BLOCK_TYPES.contains(block.getBlockType())) { 485 indexBlockCached = true; 486 } 487 } 488 489 // Data blocks should be cached in instances where we are caching blocks on write. In the case 490 // of testing 491 // BucketCache, we cannot verify block type as it is not stored in the cache. 492 boolean cacheOnCompactAndNonBucketCache = 493 cacheBlocksOnCompaction && !(blockCache instanceof BucketCache); 494 495 String assertErrorMessage = "\nTest description: " + testDescription 496 + "\ncacheBlocksOnCompaction: " + cacheBlocksOnCompaction + "\n"; 497 498 if (cacheOnCompactAndNonBucketCache && cacheBlocksOnCompactionThreshold > 0) { 499 if (cacheBlocksOnCompactionThreshold == CACHE_COMPACTION_HIGH_THRESHOLD) { 500 assertTrue(dataBlockCached, assertErrorMessage); 501 assertTrue(bloomBlockCached, assertErrorMessage); 502 assertTrue(indexBlockCached, assertErrorMessage); 503 } else { 504 assertFalse(dataBlockCached, assertErrorMessage); 505 506 if (localCacheBloomBlocksValue) { 507 assertTrue(bloomBlockCached, assertErrorMessage); 508 } else { 509 assertFalse(bloomBlockCached, assertErrorMessage); 510 } 511 512 if (localCacheIndexBlocksValue) { 513 assertTrue(indexBlockCached, assertErrorMessage); 514 } else { 515 assertFalse(indexBlockCached, assertErrorMessage); 516 } 517 } 518 } else { 519 assertEquals(cacheOnCompactAndNonBucketCache, dataBlockCached, assertErrorMessage); 520 521 if (cacheOnCompactAndNonBucketCache) { 522 assertTrue(bloomBlockCached, assertErrorMessage); 523 assertTrue(indexBlockCached, assertErrorMessage); 524 } 525 } 526 527 region.close(); 528 } finally { 529 // reset back 530 conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, localValue); 531 conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY, 532 localCacheCompactedBlocksThreshold); 533 conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, localCacheBloomBlocksValue); 534 conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, localCacheIndexBlocksValue); 535 } 536 } 537 538 @TestTemplate 539 public void testStoreFileCacheOnWrite() throws IOException { 540 testStoreFileCacheOnWriteInternals(false); 541 testStoreFileCacheOnWriteInternals(true); 542 } 543 544 @TestTemplate 545 public void testCachingDataBlocksDuringCompaction() throws IOException, InterruptedException { 546 testCachingDataBlocksDuringCompactionInternals(false, false, -1); 547 testCachingDataBlocksDuringCompactionInternals(true, true, -1); 548 } 549 550 @TestTemplate 551 public void testCachingDataBlocksThresholdDuringCompaction() 552 throws IOException, InterruptedException { 553 testCachingDataBlocksDuringCompactionInternals(false, true, CACHE_COMPACTION_HIGH_THRESHOLD); 554 testCachingDataBlocksDuringCompactionInternals(false, true, CACHE_COMPACTION_LOW_THRESHOLD); 555 } 556 557}