/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Tests for the reference counting ({@link RefCnt}) of blocks that are cached in, fetched from and
 * evicted from a {@link BucketCache}, including the concurrent scenarios of HBASE-26281.
 */
@Tag(IOTests.TAG)
@Tag(SmallTests.TAG)
public class TestBucketCacheRefCnt {

  private static final String IO_ENGINE = "offheap";
  private static final long CAPACITY_SIZE = 32 * 1024 * 1024;
  private static final int BLOCK_SIZE = 1024;
  private static final int[] BLOCK_SIZE_ARRAY =
    new int[] { 64, 128, 256, 512, 1024, 2048, 4096, 8192 };
  private static final String PERSISTENCE_PATH = null;
  private static final HFileContext CONTEXT = new HFileContextBuilder().build();

  private BucketCache cache;

  /** Creates a plain {@link BucketCache} with the shared test settings. */
  private static BucketCache create(int writerSize, int queueSize) throws IOException {
    return new BucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
      queueSize, PERSISTENCE_PATH);
  }

  /** Creates a {@link MyBucketCache} with the shared test settings. */
  private static MyBucketCache createMyBucketCache(int writerSize, int queueSize)
    throws IOException {
    return new MyBucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
      queueSize, PERSISTENCE_PATH);
  }

  /** Creates a {@link MyBucketCache2} with the shared test settings. */
  private static MyBucketCache2 createMyBucketCache2(int writerSize, int queueSize)
    throws IOException {
    return new MyBucketCache2(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
      queueSize, PERSISTENCE_PATH);
  }

  /** Creates a data block of {@code size} bytes that uses the heap allocator. */
  private static HFileBlock createBlock(int offset, int size) {
    return createBlock(offset, size, ByteBuffAllocator.HEAP);
  }

  /** Creates a data block of {@code size} bytes that is associated with the given allocator. */
  private static HFileBlock createBlock(int offset, int size, ByteBuffAllocator alloc) {
    return new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(ByteBuffer.allocate(size)),
      HFileBlock.FILL_HEADER, offset, 52, size, CONTEXT, alloc);
  }

  private static BlockCacheKey createKey(String hfileName, long offset) {
    return new BlockCacheKey(hfileName, offset);
  }

  /**
   * Waits on the given barrier, rethrowing any failure as a {@link RuntimeException} so that it can
   * be called from overridden methods which declare no checked exceptions.
   */
  private static void awaitBarrier(CyclicBarrier barrier) {
    try {
      barrier.await();
    } catch (Throwable e) {
      throw new RuntimeException(e);
    }
  }

  /** Disables and interrupts every writer thread of {@link #cache}, if a cache was created. */
  private void disableWriter() {
    if (cache != null) {
      for (WriterThread wt : cache.writerThreads) {
        wt.disableWriter();
        wt.interrupt();
      }
    }
  }

  @Disabled
  @Test // Disabled by HBASE-24079. Reenable issue HBASE-24082
  // Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2>
  public void testBlockInRAMCache() throws IOException {
    cache = create(1, 1000);
    disableWriter();
    final String prefix = "testBlockInRamCache";
    try {
      for (int i = 0; i < 10; i++) {
        HFileBlock blk = createBlock(i, 1020);
        BlockCacheKey key = createKey(prefix, i);
        assertEquals(1, blk.refCnt());
        cache.cacheBlock(key, blk);
        assertEquals(i + 1, cache.getBlockCount());
        assertEquals(2, blk.refCnt());

        Cacheable block = cache.getBlock(key, false, false, false);
        try {
          assertEquals(3, blk.refCnt());
          assertEquals(3, block.refCnt());
          assertEquals(blk, block);
        } finally {
          block.release();
        }
        assertEquals(2, blk.refCnt());
        assertEquals(2, block.refCnt());
      }

      for (int i = 0; i < 10; i++) {
        BlockCacheKey key = createKey(prefix, i);
        Cacheable blk = cache.getBlock(key, false, false, false);
        assertEquals(3, blk.refCnt());
        assertFalse(blk.release());
        assertEquals(2, blk.refCnt());

        assertTrue(cache.evictBlock(key));
        assertEquals(1, blk.refCnt());
        assertTrue(blk.release());
        assertEquals(0, blk.refCnt());
      }
    } finally {
      cache.shutdown();
    }
  }

  /**
   * Polls until the key is present in the backing map and absent from the RAM cache, then sleeps
   * one more second before returning.
   */
  private static void waitUntilFlushedToCache(BucketCache bucketCache, BlockCacheKey blockCacheKey)
    throws InterruptedException {
    while (
      !bucketCache.backingMap.containsKey(blockCacheKey)
        || bucketCache.ramCache.containsKey(blockCacheKey)
    ) {
      Thread.sleep(100);
    }
    Thread.sleep(1000);
  }

  @Test
  public void testBlockInBackingMap() throws Exception {
    ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true);
    cache = create(1, 1000);
    try {
      HFileBlock blk = createBlock(200, 1020, alloc);
      BlockCacheKey key = createKey("testHFile-00", 200);
      cache.cacheBlock(key, blk);
      waitUntilFlushedToCache(cache, key);
      assertEquals(1, blk.refCnt());

      Cacheable block = cache.getBlock(key, false, false, false);
      assertTrue(block instanceof HFileBlock);
      assertSame(alloc, ((HFileBlock) block).getByteBuffAllocator());
      assertEquals(2, block.refCnt());

      block.retain();
      assertEquals(3, block.refCnt());

      Cacheable newBlock = cache.getBlock(key, false, false, false);
      assertTrue(newBlock instanceof HFileBlock);
      assertSame(alloc, ((HFileBlock) newBlock).getByteBuffAllocator());
      assertEquals(4, newBlock.refCnt());

      // release the newBlock
      assertFalse(newBlock.release());
      assertEquals(3, newBlock.refCnt());
      assertEquals(3, block.refCnt());

      // Evict the key
      cache.evictBlock(key);
      assertEquals(2, block.refCnt());

      // Evict again, shouldn't change the refCnt.
      cache.evictBlock(key);
      assertEquals(2, block.refCnt());

      assertFalse(block.release());
      assertEquals(1, block.refCnt());

      // The key was evicted from BucketCache#backingMap and BucketCache#ramCache, so
      // BucketCache#getBlock returns null.
      Cacheable newestBlock = cache.getBlock(key, false, false, false);
      assertNull(newestBlock);
      assertEquals(1, block.refCnt());
      assertSame(alloc, ((HFileBlock) newBlock).getByteBuffAllocator());

      // Release the block
      assertTrue(block.release());
      assertEquals(0, block.refCnt());
      assertEquals(0, newBlock.refCnt());
    } finally {
      cache.shutdown();
    }
  }

  @Test
  public void testInBucketCache() throws IOException {
    ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true);
    cache = create(1, 1000);
    try {
      HFileBlock blk = createBlock(200, 1020, alloc);
      BlockCacheKey key = createKey("testHFile-00", 200);
      cache.cacheBlock(key, blk);
      assertTrue(blk.refCnt() == 1 || blk.refCnt() == 2);

      // wait for block to move to backing map because refCnt get refreshed once block moves to
      // backing map
      Waiter.waitFor(HBaseConfiguration.create(), 12000, () -> isRamCacheDrained(key, cache));

      Cacheable block1 = cache.getBlock(key, false, false, false);
      assertTrue(block1.refCnt() >= 2);
      assertSame(alloc, ((HFileBlock) block1).getByteBuffAllocator());

      Cacheable block2 = cache.getBlock(key, false, false, false);
      assertSame(alloc, ((HFileBlock) block2).getByteBuffAllocator());
      assertTrue(block2.refCnt() >= 3);

      cache.evictBlock(key);
      assertTrue(blk.refCnt() >= 1);
      assertTrue(block1.refCnt() >= 2);
      assertTrue(block2.refCnt() >= 2);

      // Get key again
      Cacheable block3 = cache.getBlock(key, false, false, false);
      if (block3 != null) {
        assertSame(alloc, ((HFileBlock) block3).getByteBuffAllocator());
        assertTrue(block3.refCnt() >= 3);
        assertFalse(block3.release());
      }

      blk.release();
      boolean ret1 = block1.release();
      boolean ret2 = block2.release();
      assertTrue(ret1 || ret2);
      assertEquals(0, blk.refCnt());
      assertEquals(0, block1.refCnt());
      assertEquals(0, block2.refCnt());
    } finally {
      cache.shutdown();
    }
  }

  /** Returns true once the key is in the backing map and no longer in the RAM cache. */
  private boolean isRamCacheDrained(BlockCacheKey key, BucketCache cache) {
    return cache.backingMap.containsKey(key) && !cache.ramCache.containsKey(key);
  }

  @Test
  public void testMarkStaleAsEvicted() throws Exception {
    cache = create(1, 1000);
    try {
      HFileBlock blk = createBlock(200, 1020);
      BlockCacheKey key = createKey("testMarkStaleAsEvicted", 200);
      cache.cacheBlock(key, blk);
      waitUntilFlushedToCache(cache, key);
      assertEquals(1, blk.refCnt());
      assertNotNull(cache.backingMap.get(key));
      assertEquals(1, cache.backingMap.get(key).refCnt());

      // RPC reference this cache.
      Cacheable block1 = cache.getBlock(key, false, false, false);
      assertEquals(2, block1.refCnt());
      BucketEntry be1 = cache.backingMap.get(key);
      assertNotNull(be1);
      assertEquals(2, be1.refCnt());

      // We've some RPC reference, so it won't have any effect.
      assertFalse(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
      assertEquals(2, block1.refCnt());
      assertEquals(2, cache.backingMap.get(key).refCnt());

      // Release the RPC reference.
      block1.release();
      assertEquals(1, block1.refCnt());
      assertEquals(1, cache.backingMap.get(key).refCnt());

      // Mark the stale as evicted again, it'll do the de-allocation.
      assertTrue(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
      assertEquals(0, block1.refCnt());
      assertNull(cache.backingMap.get(key));
      assertEquals(0, cache.size());
    } finally {
      cache.shutdown();
    }
  }

  /**
   * <pre>
   * This test is for HBASE-26281,
   * test two threads for replacing Block and getting Block execute concurrently.
   * The threads sequence is:
   * 1. Block1 was cached successfully,the {@link RefCnt} of Block1 is 1.
   * 2. Thread1 caching the same {@link BlockCacheKey} with Block2 satisfied
   *    {@link BlockCacheUtil#shouldReplaceExistingCacheBlock}, so Block2 would
   *    replace Block1, but thread1 stopping before {@link BucketCache#cacheBlockWithWaitInternal}
   * 3. Thread2 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
   *    which returned Block1, the {@link RefCnt} of Block1 is 2.
   * 4. Thread1 continues caching Block2, in {@link BucketCache.WriterThread#putIntoBackingMap},
   *    the old Block1 is freed directly which {@link RefCnt} is 2, but the Block1 is still used
   *    by Thread2 and the content of Block1 would be overwritten after it is freed, which may
   *    cause a serious error.
   * </pre>
   */
  @Test
  public void testReplacingBlockAndGettingBlockConcurrently() throws Exception {
    ByteBuffAllocator byteBuffAllocator =
      ByteBuffAllocator.create(HBaseConfiguration.create(), true);
    final MyBucketCache myBucketCache = createMyBucketCache(1, 1000);
    try {
      HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
      final BlockCacheKey blockCacheKey = createKey("testTwoThreadConcurrent", 200);
      myBucketCache.cacheBlock(blockCacheKey, hfileBlock);
      waitUntilFlushedToCache(myBucketCache, blockCacheKey);
      assertEquals(1, hfileBlock.refCnt());

      assertFalse(myBucketCache.ramCache.containsKey(blockCacheKey));
      final AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
      Thread cacheBlockThread = new Thread(() -> {
        try {
          HFileBlock newHFileBlock = createBlock(200, 1020, byteBuffAllocator);
          myBucketCache.cacheBlock(blockCacheKey, newHFileBlock);
          waitUntilFlushedToCache(myBucketCache, blockCacheKey);
        } catch (Throwable exception) {
          exceptionRef.set(exception);
        }
      });
      cacheBlockThread.setName(MyBucketCache.CACHE_BLOCK_THREAD_NAME);
      cacheBlockThread.start();

      String oldThreadName = Thread.currentThread().getName();
      HFileBlock gotHFileBlock = null;
      try {
        Thread.currentThread().setName(MyBucketCache.GET_BLOCK_THREAD_NAME);

        gotHFileBlock = (HFileBlock) (myBucketCache.getBlock(blockCacheKey, false, false, false));
        // Fail with a clear assertion instead of a NullPointerException on a cache miss.
        assertNotNull(gotHFileBlock);
        assertTrue(gotHFileBlock.equals(hfileBlock));
        assertSame(byteBuffAllocator, gotHFileBlock.getByteBuffAllocator());
        assertEquals(2, gotHFileBlock.refCnt());
        // Release the second cyclicBarrier.await in MyBucketCache#cacheBlockWithWaitInternal.
        myBucketCache.cyclicBarrier.await();
      } finally {
        Thread.currentThread().setName(oldThreadName);
      }

      cacheBlockThread.join();
      assertNull(exceptionRef.get());
      assertEquals(1, gotHFileBlock.refCnt());
      assertTrue(gotHFileBlock.equals(hfileBlock));
      assertNull(myBucketCache.overwriteByteBuff);
      assertEquals(0, myBucketCache.freeBucketEntryCounter.get());

      gotHFileBlock.release();
      assertEquals(0, gotHFileBlock.refCnt());
      assertNotNull(myBucketCache.overwriteByteBuff);
      assertEquals(1, myBucketCache.freeBucketEntryCounter.get());
      assertEquals(1, myBucketCache.replaceCounter.get());
      assertEquals(1, myBucketCache.blockEvictCounter.get());
    } finally {
      myBucketCache.shutdown();
    }
  }

  /**
   * <pre>
   * This test also is for HBASE-26281,
   * test three threads for evicting Block,caching Block and getting Block
   * execute concurrently.
   * 1. Thread1 caching Block1, stopping after {@link BucketCache.WriterThread#putIntoBackingMap},
   *    the {@link RefCnt} of Block1 is 1.
   * 2. Thread2 invoking {@link BucketCache#evictBlock} with the same {@link BlockCacheKey},
   *    but stopping after {@link BucketCache#removeFromRamCache}.
   * 3. Thread3 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
   *    which returned Block1, the {@link RefCnt} of Block1 is 2.
   * 4. Thread1 continues caching block1,but finding that {@link BucketCache.RAMCache#remove}
   *    returning false, so invoking {@link BucketCache#blockEvicted} to free the Block1
   *    directly which {@link RefCnt} is 2 and the Block1 is still used by Thread3.
   * </pre>
   */
  @Test
  public void testEvictingBlockCachingBlockGettingBlockConcurrently() throws Exception {
    ByteBuffAllocator byteBuffAllocator =
      ByteBuffAllocator.create(HBaseConfiguration.create(), true);
    final MyBucketCache2 myBucketCache2 = createMyBucketCache2(1, 1000);
    try {
      final HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
      final BlockCacheKey blockCacheKey = createKey("testThreeThreadConcurrent", 200);
      final AtomicReference<Throwable> cacheBlockThreadExceptionRef = new AtomicReference<>();
      Thread cacheBlockThread = new Thread(() -> {
        try {
          myBucketCache2.cacheBlock(blockCacheKey, hfileBlock);
          // Wait for Caching Block completed.
          myBucketCache2.writeThreadDoneCyclicBarrier.await();
        } catch (Throwable exception) {
          cacheBlockThreadExceptionRef.set(exception);
        }
      });
      cacheBlockThread.setName(MyBucketCache2.CACHE_BLOCK_THREAD_NAME);
      cacheBlockThread.start();

      final AtomicReference<Throwable> evictBlockThreadExceptionRef = new AtomicReference<>();
      Thread evictBlockThread = new Thread(() -> {
        try {
          myBucketCache2.evictBlock(blockCacheKey);
        } catch (Throwable exception) {
          evictBlockThreadExceptionRef.set(exception);
        }
      });
      evictBlockThread.setName(MyBucketCache2.EVICT_BLOCK_THREAD_NAME);
      evictBlockThread.start();

      String oldThreadName = Thread.currentThread().getName();
      HFileBlock gotHFileBlock = null;
      try {
        Thread.currentThread().setName(MyBucketCache2.GET_BLOCK_THREAD_NAME);
        gotHFileBlock = (HFileBlock) (myBucketCache2.getBlock(blockCacheKey, false, false, false));
        // Fail with a clear assertion instead of a NullPointerException on a cache miss.
        assertNotNull(gotHFileBlock);
        assertTrue(gotHFileBlock.equals(hfileBlock));
        assertSame(byteBuffAllocator, gotHFileBlock.getByteBuffAllocator());
        assertEquals(2, gotHFileBlock.refCnt());
        // Release the second await (putCyclicBarrier) in MyBucketCache2#putIntoBackingMap for the
        // BucketCache.WriterThread: getBlock completed, so the WriterThread could continue.
        awaitBarrier(myBucketCache2.putCyclicBarrier);
      } finally {
        Thread.currentThread().setName(oldThreadName);
      }

      cacheBlockThread.join();
      evictBlockThread.join();
      assertNull(cacheBlockThreadExceptionRef.get());
      assertNull(evictBlockThreadExceptionRef.get());

      assertTrue(gotHFileBlock.equals(hfileBlock));
      assertEquals(1, gotHFileBlock.refCnt());
      assertNull(myBucketCache2.overwriteByteBuff);
      assertEquals(0, myBucketCache2.freeBucketEntryCounter.get());

      gotHFileBlock.release();
      assertEquals(0, gotHFileBlock.refCnt());
      assertNotNull(myBucketCache2.overwriteByteBuff);
      assertEquals(1, myBucketCache2.freeBucketEntryCounter.get());
      assertEquals(1, myBucketCache2.blockEvictCounter.get());
    } finally {
      myBucketCache2.shutdown();
    }
  }

  /**
   * A {@link BucketCache} that always allows replacing an existing block and uses a
   * {@link CyclicBarrier} keyed on thread names to order a caching thread against a getting thread.
   */
  static class MyBucketCache extends BucketCache {
    private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
    private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";

    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
    private final AtomicInteger replaceCounter = new AtomicInteger(0);
    private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
    private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
    private ByteBuff overwriteByteBuff = null;

    public MyBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
        persistencePath);
    }

    /**
     * Simulate the Block could be replaced.
     */
    @Override
    protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
      replaceCounter.incrementAndGet();
      return true;
    }

    @Override
    public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
      boolean updateCacheMetrics) {
      if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
        // Wait the first cyclicBarrier.await() in MyBucketCache#cacheBlockWithWaitInternal, so
        // the BucketCache#getBlock is executed after the BucketEntry#isRpcRef checking.
        awaitBarrier(cyclicBarrier);
      }
      return super.getBlock(key, caching, repeat, updateCacheMetrics);
    }

    @Override
    protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
      boolean inMemory, boolean wait) {
      if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
        // Wait the cyclicBarrier.await() in MyBucketCache#getBlock.
        awaitBarrier(cyclicBarrier);
        // Wait the cyclicBarrier.await() in
        // TestBucketCacheRefCnt#testReplacingBlockAndGettingBlockConcurrently, i.e. until
        // MyBucketCache#getBlock and its assertions completed.
        awaitBarrier(cyclicBarrier);
      }
      super.cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
    }

    @Override
    void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
      boolean evictedByEvictionProcess) {
      blockEvictCounter.incrementAndGet();
      super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess);
    }

    /**
     * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwritten after
     * the {@link BucketEntry} is freed.
     */
    @Override
    void freeBucketEntry(BucketEntry bucketEntry) {
      freeBucketEntryCounter.incrementAndGet();
      super.freeBucketEntry(bucketEntry);
      this.overwriteByteBuff = getOverwriteByteBuff(bucketEntry);
      try {
        this.ioEngine.write(this.overwriteByteBuff, bucketEntry.offset());
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

  /**
   * A {@link BucketCache} that uses several {@link CyclicBarrier}s keyed on thread names to order
   * the writer thread, an evicting thread and a getting thread against each other.
   */
  static class MyBucketCache2 extends BucketCache {
    private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
    private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
    private static final String EVICT_BLOCK_THREAD_NAME = "_evictBlockThread";

    private final CyclicBarrier getCyclicBarrier = new CyclicBarrier(2);
    private final CyclicBarrier evictCyclicBarrier = new CyclicBarrier(2);
    private final CyclicBarrier putCyclicBarrier = new CyclicBarrier(2);
    /**
     * This is used for {@link BucketCache.WriterThread},{@link #CACHE_BLOCK_THREAD_NAME} and
     * {@link #EVICT_BLOCK_THREAD_NAME},waiting for caching block completed.
     */
    private final CyclicBarrier writeThreadDoneCyclicBarrier = new CyclicBarrier(3);
    private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
    private final AtomicInteger removeRamCounter = new AtomicInteger(0);
    private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
    private ByteBuff overwriteByteBuff = null;

    public MyBucketCache2(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
        persistencePath);
    }

    @Override
    protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
      super.putIntoBackingMap(key, bucketEntry);
      // The BucketCache.WriterThread waits for evictCyclicBarrier.await before
      // MyBucketCache2#removeFromRamCache for EVICT_BLOCK_THREAD_NAME.
      awaitBarrier(evictCyclicBarrier);

      // Wait the putCyclicBarrier.await() in
      // TestBucketCacheRefCnt#testEvictingBlockCachingBlockGettingBlockConcurrently, i.e. until
      // MyBucketCache2#getBlock and its assertions completed.
      awaitBarrier(putCyclicBarrier);
    }

    @Override
    void doDrain(List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
      super.doDrain(entries, metaBuff);
      if (!entries.isEmpty()) {
        // Caching Block completed, release CACHE_BLOCK_THREAD_NAME and EVICT_BLOCK_THREAD_NAME
        // which are waiting on writeThreadDoneCyclicBarrier.
        awaitBarrier(writeThreadDoneCyclicBarrier);
      }
    }

    @Override
    public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
      boolean updateCacheMetrics) {
      if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
        // Wait for second getCyclicBarrier.await in MyBucketCache2#removeFromRamCache after
        // BucketCache#removeFromRamCache.
        awaitBarrier(getCyclicBarrier);
      }
      return super.getBlock(key, caching, repeat, updateCacheMetrics);
    }

    @Override
    protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
      // Only the first invocation made by the evict thread takes part in the barrier choreography.
      boolean firstTime = false;
      if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
        firstTime = this.removeRamCounter.incrementAndGet() == 1;
        if (firstTime) {
          // The EVICT_BLOCK_THREAD_NAME waits for evictCyclicBarrier.await after
          // BucketCache#putIntoBackingMap.
          awaitBarrier(evictCyclicBarrier);
        }
      }
      boolean result = super.removeFromRamCache(cacheKey);
      if (firstTime) {
        // Wait for getCyclicBarrier.await before BucketCache#getBlock.
        awaitBarrier(getCyclicBarrier);
        // Wait for Caching Block completed, after Caching Block completed, evictBlock could
        // continue.
        awaitBarrier(writeThreadDoneCyclicBarrier);
      }
      return result;
    }

    @Override
    void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
      boolean evictedByEvictionProcess) {
      // This is only invoked by BucketCache.WriterThread. MyBucketCache2 creates only one
      // BucketCache.WriterThread.
      assertSame(this.writerThreads[0], Thread.currentThread());

      blockEvictCounter.incrementAndGet();
      super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess);
    }

    /**
     * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwritten after
     * the {@link BucketEntry} is freed.
     */
    @Override
    void freeBucketEntry(BucketEntry bucketEntry) {
      freeBucketEntryCounter.incrementAndGet();
      super.freeBucketEntry(bucketEntry);
      this.overwriteByteBuff = getOverwriteByteBuff(bucketEntry);
      try {
        this.ioEngine.write(this.overwriteByteBuff, bucketEntry.offset());
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

  /** Builds a buffer of {@code bucketEntry.getLength()} bytes, each set to 0xff. */
  private static ByteBuff getOverwriteByteBuff(BucketEntry bucketEntry) {
    int byteSize = bucketEntry.getLength();
    byte[] data = new byte[byteSize];
    Arrays.fill(data, (byte) 0xff);
    return ByteBuff.wrap(ByteBuffer.wrap(data));
  }
}