001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.EnumMap; 029import java.util.HashMap; 030import java.util.Iterator; 031import java.util.List; 032import java.util.Map; 033import java.util.Random; 034import java.util.Set; 035 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.hbase.ArrayBackedTag; 040import org.apache.hadoop.hbase.HBaseClassTestRule; 041import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 042import org.apache.hadoop.hbase.HBaseTestingUtility; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.KeyValue; 045import org.apache.hadoop.hbase.Tag; 046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 048import org.apache.hadoop.hbase.client.Durability; 049import org.apache.hadoop.hbase.client.Put; 050import org.apache.hadoop.hbase.fs.HFileSystem; 051import org.apache.hadoop.hbase.io.compress.Compression; 052import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 053import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 054import org.apache.hadoop.hbase.regionserver.BloomType; 055import org.apache.hadoop.hbase.regionserver.HRegion; 056import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 057import org.apache.hadoop.hbase.testclassification.IOTests; 058import org.apache.hadoop.hbase.testclassification.LargeTests; 059import org.apache.hadoop.hbase.util.BloomFilterFactory; 060import org.apache.hadoop.hbase.util.Bytes; 061import org.apache.hadoop.hbase.util.ChecksumType; 062import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 063import org.apache.hadoop.hbase.util.Pair; 064import org.junit.After; 065import org.junit.AfterClass; 066import org.junit.Before; 067import org.junit.ClassRule; 068import org.junit.Test; 069import org.junit.experimental.categories.Category; 070import org.junit.runner.RunWith; 071import org.junit.runners.Parameterized; 072import org.junit.runners.Parameterized.Parameters; 073import org.slf4j.Logger; 074import org.slf4j.LoggerFactory; 075 076import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 077import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 078 079/** 080 * Tests {@link HFile} cache-on-write functionality for the following block 081 * types: data blocks, non-root index blocks, and Bloom filter blocks. 082 */ 083@RunWith(Parameterized.class) 084@Category({IOTests.class, LargeTests.class}) 085public class TestCacheOnWrite { 086 087 @ClassRule 088 public static final HBaseClassTestRule CLASS_RULE = 089 HBaseClassTestRule.forClass(TestCacheOnWrite.class); 090 091 private static final Logger LOG = LoggerFactory.getLogger(TestCacheOnWrite.class); 092 093 private static final HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU(); 094 private Configuration conf; 095 private CacheConfig cacheConf; 096 private FileSystem fs; 097 private Random rand = new Random(12983177L); 098 private Path storeFilePath; 099 private BlockCache blockCache; 100 private String testDescription; 101 102 private final CacheOnWriteType cowType; 103 private final Compression.Algorithm compress; 104 private final boolean cacheCompressedData; 105 106 private static final int DATA_BLOCK_SIZE = 2048; 107 private static final int NUM_KV = 25000; 108 private static final int INDEX_BLOCK_SIZE = 512; 109 private static final int BLOOM_BLOCK_SIZE = 4096; 110 private static final BloomType BLOOM_TYPE = BloomType.ROWCOL; 111 private static final int CKBYTES = 512; 112 113 114 private static final Set<BlockType> INDEX_BLOCK_TYPES = ImmutableSet.of( 115 BlockType.INDEX_V1, 116 BlockType.INTERMEDIATE_INDEX, 117 BlockType.ROOT_INDEX, 118 BlockType.LEAF_INDEX 119 ); 120 private static final Set<BlockType> BLOOM_BLOCK_TYPES = ImmutableSet.of( 121 BlockType.BLOOM_CHUNK, 122 BlockType.GENERAL_BLOOM_META, 123 BlockType.DELETE_FAMILY_BLOOM_META 124 ); 125 private static final Set<BlockType> DATA_BLOCK_TYPES = ImmutableSet.of( 126 BlockType.ENCODED_DATA, 127 BlockType.DATA 128 ); 129 130 // All test cases are supposed to generate files for compaction within this range 131 private static final long CACHE_COMPACTION_LOW_THRESHOLD = 10L; 132 private static final long CACHE_COMPACTION_HIGH_THRESHOLD = 1 * 1024 * 1024 * 1024L; 133 134 /** The number of valid key types possible in a store file */ 135 private static final int NUM_VALID_KEY_TYPES = 136 KeyValue.Type.values().length - 2; 137 138 private enum CacheOnWriteType { 139 DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, 140 BlockType.DATA, BlockType.ENCODED_DATA), 141 BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, 142 BlockType.BLOOM_CHUNK), 143 INDEX_BLOCKS(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, 144 BlockType.LEAF_INDEX, BlockType.INTERMEDIATE_INDEX); 145 146 private final String confKey; 147 private final BlockType blockType1; 148 private final BlockType blockType2; 149 150 CacheOnWriteType(String confKey, BlockType blockType) { 151 this(confKey, blockType, blockType); 152 } 153 154 CacheOnWriteType(String confKey, BlockType blockType1, BlockType blockType2) { 155 this.blockType1 = blockType1; 156 this.blockType2 = blockType2; 157 this.confKey = confKey; 158 } 159 160 public boolean shouldBeCached(BlockType blockType) { 161 return blockType == blockType1 || blockType == blockType2; 162 } 163 164 public void modifyConf(Configuration conf) { 165 for (CacheOnWriteType cowType : CacheOnWriteType.values()) { 166 conf.setBoolean(cowType.confKey, cowType == this); 167 } 168 } 169 } 170 171 public TestCacheOnWrite(CacheOnWriteType cowType, Compression.Algorithm compress, 172 boolean cacheCompressedData, BlockCache blockCache) { 173 this.cowType = cowType; 174 this.compress = compress; 175 this.cacheCompressedData = cacheCompressedData; 176 this.blockCache = blockCache; 177 testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress + 178 ", cacheCompressedData=" + cacheCompressedData + "]"; 179 LOG.info(testDescription); 180 } 181 182 private static List<BlockCache> getBlockCaches() throws IOException { 183 Configuration conf = TEST_UTIL.getConfiguration(); 184 List<BlockCache> blockcaches = new ArrayList<>(); 185 // default 186 blockcaches.add(BlockCacheFactory.createBlockCache(conf)); 187 188 //set LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME to 2.0f due to HBASE-16287 189 TEST_UTIL.getConfiguration().setFloat(LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, 2.0f); 190 // memory 191 BlockCache lru = new LruBlockCache(128 * 1024 * 1024, 64 * 1024, TEST_UTIL.getConfiguration()); 192 blockcaches.add(lru); 193 194 // bucket cache 195 FileSystem.get(conf).mkdirs(TEST_UTIL.getDataTestDir()); 196 int[] bucketSizes = 197 { INDEX_BLOCK_SIZE, DATA_BLOCK_SIZE, BLOOM_BLOCK_SIZE, 64 * 1024, 128 * 1024 }; 198 BlockCache bucketcache = 199 new BucketCache("offheap", 128 * 1024 * 1024, 64 * 1024, bucketSizes, 5, 64 * 100, null); 200 blockcaches.add(bucketcache); 201 return blockcaches; 202 } 203 204 @Parameters 205 public static Collection<Object[]> getParameters() throws IOException { 206 List<Object[]> params = new ArrayList<>(); 207 for (BlockCache blockCache : getBlockCaches()) { 208 for (CacheOnWriteType cowType : CacheOnWriteType.values()) { 209 for (Compression.Algorithm compress : HBaseCommonTestingUtility.COMPRESSION_ALGORITHMS) { 210 for (boolean cacheCompressedData : new boolean[] { false, true }) { 211 params.add(new Object[] { cowType, compress, cacheCompressedData, blockCache }); 212 } 213 } 214 } 215 } 216 return params; 217 } 218 219 private void clearBlockCache(BlockCache blockCache) throws InterruptedException { 220 if (blockCache instanceof LruBlockCache) { 221 ((LruBlockCache) blockCache).clearCache(); 222 } else { 223 // BucketCache may not return all cached blocks(blocks in write queue), so check it here. 224 for (int clearCount = 0; blockCache.getBlockCount() > 0; clearCount++) { 225 if (clearCount > 0) { 226 LOG.warn("clear block cache " + blockCache + " " + clearCount + " times, " 227 + blockCache.getBlockCount() + " blocks remaining"); 228 Thread.sleep(10); 229 } 230 for (CachedBlock block : Lists.newArrayList(blockCache)) { 231 BlockCacheKey key = new BlockCacheKey(block.getFilename(), block.getOffset()); 232 // CombinedBucketCache may need evict two times. 233 for (int evictCount = 0; blockCache.evictBlock(key); evictCount++) { 234 if (evictCount > 1) { 235 LOG.warn("evict block " + block + " in " + blockCache + " " + evictCount 236 + " times, maybe a bug here"); 237 } 238 } 239 } 240 } 241 } 242 } 243 244 @Before 245 public void setUp() throws IOException { 246 conf = TEST_UTIL.getConfiguration(); 247 this.conf.set("dfs.datanode.data.dir.perm", "700"); 248 conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE); 249 conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, BLOOM_BLOCK_SIZE); 250 conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData); 251 cowType.modifyConf(conf); 252 conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cowType.shouldBeCached(BlockType.DATA)); 253 conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, 254 cowType.shouldBeCached(BlockType.LEAF_INDEX)); 255 conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, 256 cowType.shouldBeCached(BlockType.BLOOM_CHUNK)); 257 cacheConf = new CacheConfig(conf, blockCache); 258 fs = HFileSystem.get(conf); 259 } 260 261 @After 262 public void tearDown() throws IOException, InterruptedException { 263 clearBlockCache(blockCache); 264 } 265 266 @AfterClass 267 public static void afterClass() throws IOException { 268 TEST_UTIL.cleanupTestDir(); 269 } 270 271 private void testStoreFileCacheOnWriteInternals(boolean useTags) throws IOException { 272 writeStoreFile(useTags); 273 readStoreFile(useTags); 274 } 275 276 private void readStoreFile(boolean useTags) throws IOException { 277 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); 278 LOG.info("HFile information: " + reader); 279 HFileContext meta = new HFileContextBuilder().withCompression(compress) 280 .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL) 281 .withBlockSize(DATA_BLOCK_SIZE) 282 .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()) 283 .withIncludesTags(useTags).build(); 284 final boolean cacheBlocks = false; 285 final boolean pread = false; 286 HFileScanner scanner = reader.getScanner(cacheBlocks, pread); 287 assertTrue(testDescription, scanner.seekTo()); 288 289 long offset = 0; 290 EnumMap<BlockType, Integer> blockCountByType = new EnumMap<>(BlockType.class); 291 292 DataBlockEncoding encodingInCache = NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding(); 293 List<Long> cachedBlocksOffset = new ArrayList<>(); 294 Map<Long, Pair<HFileBlock, HFileBlock>> cachedBlocks = new HashMap<>(); 295 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 296 // Flags: don't cache the block, use pread, this is not a compaction. 297 // Also, pass null for expected block type to avoid checking it. 298 HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, 299 encodingInCache); 300 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); 301 HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true); 302 boolean isCached = fromCache != null; 303 cachedBlocksOffset.add(offset); 304 cachedBlocks.put(offset, fromCache == null ? null : Pair.newPair(block, fromCache)); 305 boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType()); 306 assertTrue("shouldBeCached: " + shouldBeCached+ "\n" + 307 "isCached: " + isCached + "\n" + 308 "Test description: " + testDescription + "\n" + 309 "block: " + block + "\n" + 310 "encodingInCache: " + encodingInCache + "\n" + 311 "blockCacheKey: " + blockCacheKey, 312 shouldBeCached == isCached); 313 if (isCached) { 314 if (cacheConf.shouldCacheCompressed(fromCache.getBlockType().getCategory())) { 315 if (compress != Compression.Algorithm.NONE) { 316 assertFalse(fromCache.isUnpacked()); 317 } 318 fromCache = fromCache.unpack(meta, reader.getUncachedBlockReader()); 319 } else { 320 assertTrue(fromCache.isUnpacked()); 321 } 322 // block we cached at write-time and block read from file should be identical 323 assertEquals(block.getChecksumType(), fromCache.getChecksumType()); 324 assertEquals(block.getBlockType(), fromCache.getBlockType()); 325 assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType()); 326 assertEquals(block.getOnDiskSizeWithHeader(), fromCache.getOnDiskSizeWithHeader()); 327 assertEquals(block.getOnDiskSizeWithoutHeader(), fromCache.getOnDiskSizeWithoutHeader()); 328 assertEquals( 329 block.getUncompressedSizeWithoutHeader(), fromCache.getUncompressedSizeWithoutHeader()); 330 } 331 offset += block.getOnDiskSizeWithHeader(); 332 BlockType bt = block.getBlockType(); 333 Integer count = blockCountByType.get(bt); 334 blockCountByType.put(bt, (count == null ? 0 : count) + 1); 335 } 336 337 LOG.info("Block count by type: " + blockCountByType); 338 String countByType = blockCountByType.toString(); 339 if (useTags) { 340 assertEquals("{" + BlockType.DATA 341 + "=2663, LEAF_INDEX=297, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=32}", countByType); 342 } else { 343 assertEquals("{" + BlockType.DATA 344 + "=2498, LEAF_INDEX=278, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=31}", countByType); 345 } 346 347 // iterate all the keyvalue from hfile 348 while (scanner.next()) { 349 scanner.getCell(); 350 } 351 Iterator<Long> iterator = cachedBlocksOffset.iterator(); 352 while(iterator.hasNext()) { 353 Long entry = iterator.next(); 354 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), 355 entry); 356 Pair<HFileBlock, HFileBlock> blockPair = cachedBlocks.get(entry); 357 if (blockPair != null) { 358 // Call return twice because for the isCache cased the counter would have got incremented 359 // twice. Notice that here we need to returnBlock with different blocks. see comments in 360 // BucketCache#returnBlock. 361 blockPair.getSecond().release(); 362 if (cacheCompressedData) { 363 if (this.compress == Compression.Algorithm.NONE 364 || cowType == CacheOnWriteType.INDEX_BLOCKS 365 || cowType == CacheOnWriteType.BLOOM_BLOCKS) { 366 blockPair.getFirst().release(); 367 } 368 } else { 369 blockPair.getFirst().release(); 370 } 371 } 372 } 373 scanner.shipped(); 374 reader.close(); 375 } 376 377 public static KeyValue.Type generateKeyType(Random rand) { 378 if (rand.nextBoolean()) { 379 // Let's make half of KVs puts. 380 return KeyValue.Type.Put; 381 } else { 382 KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; 383 if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { 384 throw new RuntimeException("Generated an invalid key type: " + keyType + ". " 385 + "Probably the layout of KeyValue.Type has changed."); 386 } 387 return keyType; 388 } 389 } 390 391 private void writeStoreFile(boolean useTags) throws IOException { 392 Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), 393 "test_cache_on_write"); 394 HFileContext meta = new HFileContextBuilder().withCompression(compress) 395 .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL) 396 .withBlockSize(DATA_BLOCK_SIZE) 397 .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()) 398 .withIncludesTags(useTags).build(); 399 StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs) 400 .withOutputDir(storeFileParentDir) 401 .withFileContext(meta) 402 .withBloomType(BLOOM_TYPE).withMaxKeyCount(NUM_KV).build(); 403 byte[] cf = Bytes.toBytes("fam"); 404 for (int i = 0; i < NUM_KV; ++i) { 405 byte[] row = RandomKeyValueUtil.randomOrderedKey(rand, i); 406 byte[] qualifier = RandomKeyValueUtil.randomRowOrQualifier(rand); 407 byte[] value = RandomKeyValueUtil.randomValue(rand); 408 KeyValue kv; 409 if(useTags) { 410 Tag t = new ArrayBackedTag((byte) 1, "visibility"); 411 List<Tag> tagList = new ArrayList<>(); 412 tagList.add(t); 413 Tag[] tags = new Tag[1]; 414 tags[0] = t; 415 kv = 416 new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length, 417 Math.abs(rand.nextLong()), generateKeyType(rand), value, 0, value.length, tagList); 418 } else { 419 kv = 420 new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length, 421 Math.abs(rand.nextLong()), generateKeyType(rand), value, 0, value.length); 422 } 423 sfw.append(kv); 424 } 425 426 sfw.close(); 427 storeFilePath = sfw.getPath(); 428 } 429 430 private void testCachingDataBlocksDuringCompactionInternals(boolean useTags, 431 boolean cacheBlocksOnCompaction, long cacheBlocksOnCompactionThreshold) 432 throws IOException, InterruptedException { 433 // create a localConf 434 boolean localValue = conf.getBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, false); 435 long localCacheCompactedBlocksThreshold = conf 436 .getLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY, 437 CacheConfig.DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD); 438 boolean localCacheBloomBlocksValue = conf 439 .getBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, 440 CacheConfig.DEFAULT_CACHE_BLOOMS_ON_WRITE); 441 boolean localCacheIndexBlocksValue = conf 442 .getBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, 443 CacheConfig.DEFAULT_CACHE_INDEXES_ON_WRITE); 444 445 try { 446 // Set the conf if testing caching compacted blocks on write 447 conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, 448 cacheBlocksOnCompaction); 449 450 // set size threshold if testing compaction size threshold 451 if (cacheBlocksOnCompactionThreshold > 0) { 452 conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY, 453 cacheBlocksOnCompactionThreshold); 454 } 455 456 // TODO: need to change this test if we add a cache size threshold for 457 // compactions, or if we implement some other kind of intelligent logic for 458 // deciding what blocks to cache-on-write on compaction. 459 final String table = "CompactionCacheOnWrite"; 460 final String cf = "myCF"; 461 final byte[] cfBytes = Bytes.toBytes(cf); 462 final int maxVersions = 3; 463 ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder 464 .newBuilder(cfBytes) 465 .setCompressionType(compress) 466 .setBloomFilterType(BLOOM_TYPE) 467 .setMaxVersions(maxVersions) 468 .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()) 469 .build(); 470 HRegion region = TEST_UTIL.createTestRegion(table, cfd, blockCache); 471 int rowIdx = 0; 472 long ts = EnvironmentEdgeManager.currentTime(); 473 for (int iFile = 0; iFile < 5; ++iFile) { 474 for (int iRow = 0; iRow < 500; ++iRow) { 475 String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" + iRow; 476 Put p = new Put(Bytes.toBytes(rowStr)); 477 ++rowIdx; 478 for (int iCol = 0; iCol < 10; ++iCol) { 479 String qualStr = "col" + iCol; 480 String valueStr = "value_" + rowStr + "_" + qualStr; 481 for (int iTS = 0; iTS < 5; ++iTS) { 482 if (useTags) { 483 Tag t = new ArrayBackedTag((byte) 1, "visibility"); 484 Tag[] tags = new Tag[1]; 485 tags[0] = t; 486 KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr), 487 HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags); 488 p.add(kv); 489 } else { 490 KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr), 491 ts++, Bytes.toBytes(valueStr)); 492 p.add(kv); 493 } 494 } 495 } 496 p.setDurability(Durability.ASYNC_WAL); 497 region.put(p); 498 } 499 region.flush(true); 500 } 501 502 clearBlockCache(blockCache); 503 assertEquals(0, blockCache.getBlockCount()); 504 505 region.compact(false); 506 LOG.debug("compactStores() returned"); 507 508 boolean dataBlockCached = false; 509 boolean bloomBlockCached = false; 510 boolean indexBlockCached = false; 511 512 for (CachedBlock block : blockCache) { 513 if (DATA_BLOCK_TYPES.contains(block.getBlockType())) { 514 dataBlockCached = true; 515 } else if (BLOOM_BLOCK_TYPES.contains(block.getBlockType())) { 516 bloomBlockCached = true; 517 } else if (INDEX_BLOCK_TYPES.contains(block.getBlockType())) { 518 indexBlockCached = true; 519 } 520 } 521 522 // Data blocks should be cached in instances where we are caching blocks on write. In the case 523 // of testing 524 // BucketCache, we cannot verify block type as it is not stored in the cache. 525 boolean cacheOnCompactAndNonBucketCache = cacheBlocksOnCompaction 526 && !(blockCache instanceof BucketCache); 527 528 String assertErrorMessage = "\nTest description: " + testDescription + 529 "\ncacheBlocksOnCompaction: " 530 + cacheBlocksOnCompaction + "\n"; 531 532 if (cacheOnCompactAndNonBucketCache && cacheBlocksOnCompactionThreshold > 0) { 533 if (cacheBlocksOnCompactionThreshold == CACHE_COMPACTION_HIGH_THRESHOLD) { 534 assertTrue(assertErrorMessage, dataBlockCached); 535 assertTrue(assertErrorMessage, bloomBlockCached); 536 assertTrue(assertErrorMessage, indexBlockCached); 537 } else { 538 assertFalse(assertErrorMessage, dataBlockCached); 539 540 if (localCacheBloomBlocksValue) { 541 assertTrue(assertErrorMessage, bloomBlockCached); 542 } else { 543 assertFalse(assertErrorMessage, bloomBlockCached); 544 } 545 546 if (localCacheIndexBlocksValue) { 547 assertTrue(assertErrorMessage, indexBlockCached); 548 } else { 549 assertFalse(assertErrorMessage, indexBlockCached); 550 } 551 } 552 } else { 553 assertEquals(assertErrorMessage, cacheOnCompactAndNonBucketCache, dataBlockCached); 554 555 if (cacheOnCompactAndNonBucketCache) { 556 assertTrue(assertErrorMessage, bloomBlockCached); 557 assertTrue(assertErrorMessage, indexBlockCached); 558 } 559 } 560 561 562 region.close(); 563 } finally { 564 // reset back 565 conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, localValue); 566 conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY, 567 localCacheCompactedBlocksThreshold); 568 conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, localCacheBloomBlocksValue); 569 conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, localCacheIndexBlocksValue); 570 } 571 } 572 573 @Test 574 public void testStoreFileCacheOnWrite() throws IOException { 575 testStoreFileCacheOnWriteInternals(false); 576 testStoreFileCacheOnWriteInternals(true); 577 } 578 579 @Test 580 public void testCachingDataBlocksDuringCompaction() throws IOException, InterruptedException { 581 testCachingDataBlocksDuringCompactionInternals(false, false, -1); 582 testCachingDataBlocksDuringCompactionInternals(true, true, -1); 583 } 584 585 @Test 586 public void testCachingDataBlocksThresholdDuringCompaction() 587 throws IOException, InterruptedException { 588 testCachingDataBlocksDuringCompactionInternals(false, true, CACHE_COMPACTION_HIGH_THRESHOLD); 589 testCachingDataBlocksDuringCompactionInternals(false, true, CACHE_COMPACTION_LOW_THRESHOLD); 590 } 591 592}