001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertNotNull; 023import static org.junit.jupiter.api.Assertions.assertNull; 024import static org.junit.jupiter.api.Assertions.assertTrue; 025 026import java.io.IOException; 027import java.nio.ByteBuffer; 028import java.util.Arrays; 029import java.util.List; 030import java.util.concurrent.CyclicBarrier; 031import java.util.concurrent.atomic.AtomicInteger; 032import java.util.concurrent.atomic.AtomicReference; 033import org.apache.hadoop.hbase.HBaseConfiguration; 034import org.apache.hadoop.hbase.Waiter; 035import org.apache.hadoop.hbase.io.ByteBuffAllocator; 036import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 037import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; 038import org.apache.hadoop.hbase.io.hfile.BlockType; 039import org.apache.hadoop.hbase.io.hfile.Cacheable; 040import org.apache.hadoop.hbase.io.hfile.HFileBlock; 041import org.apache.hadoop.hbase.io.hfile.HFileContext; 042import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 043import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread; 044import org.apache.hadoop.hbase.nio.ByteBuff; 045import org.apache.hadoop.hbase.nio.RefCnt; 046import org.apache.hadoop.hbase.testclassification.IOTests; 047import org.apache.hadoop.hbase.testclassification.SmallTests; 048import org.junit.jupiter.api.Tag; 049import org.junit.jupiter.api.Test; 050 051@Tag(IOTests.TAG) 052@Tag(SmallTests.TAG) 053public class TestBucketCacheRefCnt { 054 055 private static final String IO_ENGINE = "offheap"; 056 private static final long CAPACITY_SIZE = 32 * 1024 * 1024; 057 private static final int BLOCK_SIZE = 1024; 058 private static final int[] BLOCK_SIZE_ARRAY = 059 new int[] { 64, 128, 256, 512, 1024, 2048, 4096, 8192 }; 060 private static final String PERSISTENCE_PATH = null; 061 private static final HFileContext CONTEXT = new HFileContextBuilder().build(); 062 063 private BucketCache cache; 064 065 private static BucketCache create(int writerSize, int queueSize) throws IOException { 066 return new BucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize, 067 queueSize, PERSISTENCE_PATH); 068 } 069 070 private static MyBucketCache createMyBucketCache(int writerSize, int queueSize) 071 throws IOException { 072 return new MyBucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize, 073 queueSize, PERSISTENCE_PATH); 074 } 075 076 private static MyBucketCache2 createMyBucketCache2(int writerSize, int queueSize) 077 throws IOException { 078 return new MyBucketCache2(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize, 079 queueSize, PERSISTENCE_PATH); 080 } 081 082 private static HFileBlock createBlock(int offset, int size) { 083 return createBlock(offset, size, ByteBuffAllocator.HEAP); 084 } 085 086 private static HFileBlock createBlock(int offset, int size, ByteBuffAllocator alloc) { 087 return new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(ByteBuffer.allocate(size)), 088 HFileBlock.FILL_HEADER, offset, 52, size, CONTEXT, alloc); 089 } 090 091 private static BlockCacheKey createKey(String hfileName, long offset) { 092 return new BlockCacheKey(hfileName, offset); 093 } 094 095 private void disableWriter() { 096 if (cache != null) { 097 for (WriterThread wt : cache.writerThreads) { 098 wt.disableWriter(); 099 wt.interrupt(); 100 } 101 } 102 } 103 104 @org.junit.Ignore 105 @Test // Disabled by HBASE-24079. Reenable issue HBASE-24082 106 // Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2> 107 public void testBlockInRAMCache() throws IOException { 108 cache = create(1, 1000); 109 disableWriter(); 110 final String prefix = "testBlockInRamCache"; 111 try { 112 for (int i = 0; i < 10; i++) { 113 HFileBlock blk = createBlock(i, 1020); 114 BlockCacheKey key = createKey(prefix, i); 115 assertEquals(1, blk.refCnt()); 116 cache.cacheBlock(key, blk); 117 assertEquals(i + 1, cache.getBlockCount()); 118 assertEquals(2, blk.refCnt()); 119 120 Cacheable block = cache.getBlock(key, false, false, false); 121 try { 122 assertEquals(3, blk.refCnt()); 123 assertEquals(3, block.refCnt()); 124 assertEquals(blk, block); 125 } finally { 126 block.release(); 127 } 128 assertEquals(2, blk.refCnt()); 129 assertEquals(2, block.refCnt()); 130 } 131 132 for (int i = 0; i < 10; i++) { 133 BlockCacheKey key = createKey(prefix, i); 134 Cacheable blk = cache.getBlock(key, false, false, false); 135 assertEquals(3, blk.refCnt()); 136 assertFalse(blk.release()); 137 assertEquals(2, blk.refCnt()); 138 139 assertTrue(cache.evictBlock(key)); 140 assertEquals(1, blk.refCnt()); 141 assertTrue(blk.release()); 142 assertEquals(0, blk.refCnt()); 143 } 144 } finally { 145 cache.shutdown(); 146 } 147 } 148 149 private static void waitUntilFlushedToCache(BucketCache bucketCache, BlockCacheKey blockCacheKey) 150 throws InterruptedException { 151 while ( 152 !bucketCache.backingMap.containsKey(blockCacheKey) 153 || bucketCache.ramCache.containsKey(blockCacheKey) 154 ) { 155 Thread.sleep(100); 156 } 157 Thread.sleep(1000); 158 } 159 160 @Test 161 public void testBlockInBackingMap() throws Exception { 162 ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true); 163 cache = create(1, 1000); 164 try { 165 HFileBlock blk = createBlock(200, 1020, alloc); 166 BlockCacheKey key = createKey("testHFile-00", 200); 167 cache.cacheBlock(key, blk); 168 waitUntilFlushedToCache(cache, key); 169 assertEquals(1, blk.refCnt()); 170 171 Cacheable block = cache.getBlock(key, false, false, false); 172 assertTrue(block instanceof HFileBlock); 173 assertTrue(((HFileBlock) block).getByteBuffAllocator() == alloc); 174 assertEquals(2, block.refCnt()); 175 176 block.retain(); 177 assertEquals(3, block.refCnt()); 178 179 Cacheable newBlock = cache.getBlock(key, false, false, false); 180 assertTrue(newBlock instanceof HFileBlock); 181 assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc); 182 assertEquals(4, newBlock.refCnt()); 183 184 // release the newBlock 185 assertFalse(newBlock.release()); 186 assertEquals(3, newBlock.refCnt()); 187 assertEquals(3, block.refCnt()); 188 189 // Evict the key 190 cache.evictBlock(key); 191 assertEquals(2, block.refCnt()); 192 193 // Evict again, shouldn't change the refCnt. 194 cache.evictBlock(key); 195 assertEquals(2, block.refCnt()); 196 197 assertFalse(block.release()); 198 assertEquals(1, block.refCnt()); 199 200 /** 201 * The key was evicted from {@link BucketCache#backingMap} and {@link BucketCache#ramCache}, 202 * so {@link BucketCache#getBlock} return null. 203 */ 204 Cacheable newestBlock = cache.getBlock(key, false, false, false); 205 assertNull(newestBlock); 206 assertEquals(1, block.refCnt()); 207 assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc); 208 209 // Release the block 210 assertTrue(block.release()); 211 assertEquals(0, block.refCnt()); 212 assertEquals(0, newBlock.refCnt()); 213 } finally { 214 cache.shutdown(); 215 } 216 } 217 218 @Test 219 public void testInBucketCache() throws IOException { 220 ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true); 221 cache = create(1, 1000); 222 try { 223 HFileBlock blk = createBlock(200, 1020, alloc); 224 BlockCacheKey key = createKey("testHFile-00", 200); 225 cache.cacheBlock(key, blk); 226 assertTrue(blk.refCnt() == 1 || blk.refCnt() == 2); 227 228 // wait for block to move to backing map because refCnt get refreshed once block moves to 229 // backing map 230 Waiter.waitFor(HBaseConfiguration.create(), 12000, () -> isRamCacheDrained(key, cache)); 231 232 Cacheable block1 = cache.getBlock(key, false, false, false); 233 assertTrue(block1.refCnt() >= 2); 234 assertTrue(((HFileBlock) block1).getByteBuffAllocator() == alloc); 235 236 Cacheable block2 = cache.getBlock(key, false, false, false); 237 assertTrue(((HFileBlock) block2).getByteBuffAllocator() == alloc); 238 assertTrue(block2.refCnt() >= 3); 239 240 cache.evictBlock(key); 241 assertTrue(blk.refCnt() >= 1); 242 assertTrue(block1.refCnt() >= 2); 243 assertTrue(block2.refCnt() >= 2); 244 245 // Get key again 246 Cacheable block3 = cache.getBlock(key, false, false, false); 247 if (block3 != null) { 248 assertTrue(((HFileBlock) block3).getByteBuffAllocator() == alloc); 249 assertTrue(block3.refCnt() >= 3); 250 assertFalse(block3.release()); 251 } 252 253 blk.release(); 254 boolean ret1 = block1.release(); 255 boolean ret2 = block2.release(); 256 assertTrue(ret1 || ret2); 257 assertEquals(0, blk.refCnt()); 258 assertEquals(0, block1.refCnt()); 259 assertEquals(0, block2.refCnt()); 260 } finally { 261 cache.shutdown(); 262 } 263 } 264 265 private boolean isRamCacheDrained(BlockCacheKey key, BucketCache cache) { 266 return cache.backingMap.containsKey(key) && !cache.ramCache.containsKey(key); 267 } 268 269 @Test 270 public void testMarkStaleAsEvicted() throws Exception { 271 cache = create(1, 1000); 272 try { 273 HFileBlock blk = createBlock(200, 1020); 274 BlockCacheKey key = createKey("testMarkStaleAsEvicted", 200); 275 cache.cacheBlock(key, blk); 276 waitUntilFlushedToCache(cache, key); 277 assertEquals(1, blk.refCnt()); 278 assertNotNull(cache.backingMap.get(key)); 279 assertEquals(1, cache.backingMap.get(key).refCnt()); 280 281 // RPC reference this cache. 282 Cacheable block1 = cache.getBlock(key, false, false, false); 283 assertEquals(2, block1.refCnt()); 284 BucketEntry be1 = cache.backingMap.get(key); 285 assertNotNull(be1); 286 assertEquals(2, be1.refCnt()); 287 288 // We've some RPC reference, so it won't have any effect. 289 assertFalse(cache.evictBucketEntryIfNoRpcReferenced(key, be1)); 290 assertEquals(2, block1.refCnt()); 291 assertEquals(2, cache.backingMap.get(key).refCnt()); 292 293 // Release the RPC reference. 294 block1.release(); 295 assertEquals(1, block1.refCnt()); 296 assertEquals(1, cache.backingMap.get(key).refCnt()); 297 298 // Mark the stale as evicted again, it'll do the de-allocation. 299 assertTrue(cache.evictBucketEntryIfNoRpcReferenced(key, be1)); 300 assertEquals(0, block1.refCnt()); 301 assertNull(cache.backingMap.get(key)); 302 assertEquals(0, cache.size()); 303 } finally { 304 cache.shutdown(); 305 } 306 } 307 308 /** 309 * <pre> 310 * This test is for HBASE-26281, 311 * test two threads for replacing Block and getting Block execute concurrently. 312 * The threads sequence is: 313 * 1. Block1 was cached successfully,the {@link RefCnt} of Block1 is 1. 314 * 2. Thread1 caching the same {@link BlockCacheKey} with Block2 satisfied 315 * {@link BlockCacheUtil#shouldReplaceExistingCacheBlock}, so Block2 would 316 * replace Block1, but thread1 stopping before {@link BucketCache#cacheBlockWithWaitInternal} 317 * 3. Thread2 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey}, 318 * which returned Block1, the {@link RefCnt} of Block1 is 2. 319 * 4. Thread1 continues caching Block2, in {@link BucketCache.WriterThread#putIntoBackingMap}, 320 * the old Block1 is freed directly which {@link RefCnt} is 2, but the Block1 is still used 321 * by Thread2 and the content of Block1 would be overwritten after it is freed, which may 322 * cause a serious error. 323 * </pre> 324 */ 325 @Test 326 public void testReplacingBlockAndGettingBlockConcurrently() throws Exception { 327 ByteBuffAllocator byteBuffAllocator = 328 ByteBuffAllocator.create(HBaseConfiguration.create(), true); 329 final MyBucketCache myBucketCache = createMyBucketCache(1, 1000); 330 try { 331 HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator); 332 final BlockCacheKey blockCacheKey = createKey("testTwoThreadConcurrent", 200); 333 myBucketCache.cacheBlock(blockCacheKey, hfileBlock); 334 waitUntilFlushedToCache(myBucketCache, blockCacheKey); 335 assertEquals(1, hfileBlock.refCnt()); 336 337 assertTrue(!myBucketCache.ramCache.containsKey(blockCacheKey)); 338 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 339 Thread cacheBlockThread = new Thread(() -> { 340 try { 341 HFileBlock newHFileBlock = createBlock(200, 1020, byteBuffAllocator); 342 myBucketCache.cacheBlock(blockCacheKey, newHFileBlock); 343 waitUntilFlushedToCache(myBucketCache, blockCacheKey); 344 345 } catch (Throwable exception) { 346 exceptionRef.set(exception); 347 } 348 }); 349 cacheBlockThread.setName(MyBucketCache.CACHE_BLOCK_THREAD_NAME); 350 cacheBlockThread.start(); 351 352 String oldThreadName = Thread.currentThread().getName(); 353 HFileBlock gotHFileBlock = null; 354 try { 355 356 Thread.currentThread().setName(MyBucketCache.GET_BLOCK_THREAD_NAME); 357 358 gotHFileBlock = (HFileBlock) (myBucketCache.getBlock(blockCacheKey, false, false, false)); 359 assertTrue(gotHFileBlock.equals(hfileBlock)); 360 assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator); 361 assertEquals(2, gotHFileBlock.refCnt()); 362 /** 363 * Release the second cyclicBarrier.await in 364 * {@link MyBucketCache#cacheBlockWithWaitInternal} 365 */ 366 myBucketCache.cyclicBarrier.await(); 367 368 } finally { 369 Thread.currentThread().setName(oldThreadName); 370 } 371 372 cacheBlockThread.join(); 373 assertTrue(exceptionRef.get() == null); 374 assertEquals(1, gotHFileBlock.refCnt()); 375 assertTrue(gotHFileBlock.equals(hfileBlock)); 376 assertTrue(myBucketCache.overwiteByteBuff == null); 377 assertTrue(myBucketCache.freeBucketEntryCounter.get() == 0); 378 379 gotHFileBlock.release(); 380 assertEquals(0, gotHFileBlock.refCnt()); 381 assertTrue(myBucketCache.overwiteByteBuff != null); 382 assertTrue(myBucketCache.freeBucketEntryCounter.get() == 1); 383 assertTrue(myBucketCache.replaceCounter.get() == 1); 384 assertTrue(myBucketCache.blockEvictCounter.get() == 1); 385 } finally { 386 myBucketCache.shutdown(); 387 } 388 389 } 390 391 /** 392 * <pre> 393 * This test also is for HBASE-26281, 394 * test three threads for evicting Block,caching Block and getting Block 395 * execute concurrently. 396 * 1. Thread1 caching Block1, stopping after {@link BucketCache.WriterThread#putIntoBackingMap}, 397 * the {@link RefCnt} of Block1 is 1. 398 * 2. Thread2 invoking {@link BucketCache#evictBlock} with the same {@link BlockCacheKey}, 399 * but stopping after {@link BucketCache#removeFromRamCache}. 400 * 3. Thread3 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey}, 401 * which returned Block1, the {@link RefCnt} of Block1 is 2. 402 * 4. Thread1 continues caching block1,but finding that {@link BucketCache.RAMCache#remove} 403 * returning false, so invoking {@link BucketCache#blockEvicted} to free the the Block1 404 * directly which {@link RefCnt} is 2 and the Block1 is still used by Thread3. 405 * </pre> 406 */ 407 @Test 408 public void testEvictingBlockCachingBlockGettingBlockConcurrently() throws Exception { 409 ByteBuffAllocator byteBuffAllocator = 410 ByteBuffAllocator.create(HBaseConfiguration.create(), true); 411 final MyBucketCache2 myBucketCache2 = createMyBucketCache2(1, 1000); 412 try { 413 final HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator); 414 final BlockCacheKey blockCacheKey = createKey("testThreeThreadConcurrent", 200); 415 final AtomicReference<Throwable> cacheBlockThreadExceptionRef = 416 new AtomicReference<Throwable>(); 417 Thread cacheBlockThread = new Thread(() -> { 418 try { 419 myBucketCache2.cacheBlock(blockCacheKey, hfileBlock); 420 /** 421 * Wait for Caching Block completed. 422 */ 423 myBucketCache2.writeThreadDoneCyclicBarrier.await(); 424 } catch (Throwable exception) { 425 cacheBlockThreadExceptionRef.set(exception); 426 } 427 }); 428 cacheBlockThread.setName(MyBucketCache2.CACHE_BLOCK_THREAD_NAME); 429 cacheBlockThread.start(); 430 431 final AtomicReference<Throwable> evictBlockThreadExceptionRef = 432 new AtomicReference<Throwable>(); 433 Thread evictBlockThread = new Thread(() -> { 434 try { 435 myBucketCache2.evictBlock(blockCacheKey); 436 } catch (Throwable exception) { 437 evictBlockThreadExceptionRef.set(exception); 438 } 439 }); 440 evictBlockThread.setName(MyBucketCache2.EVICT_BLOCK_THREAD_NAME); 441 evictBlockThread.start(); 442 443 String oldThreadName = Thread.currentThread().getName(); 444 HFileBlock gotHFileBlock = null; 445 try { 446 Thread.currentThread().setName(MyBucketCache2.GET_BLOCK_THREAD_NAME); 447 gotHFileBlock = (HFileBlock) (myBucketCache2.getBlock(blockCacheKey, false, false, false)); 448 assertTrue(gotHFileBlock.equals(hfileBlock)); 449 assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator); 450 assertEquals(2, gotHFileBlock.refCnt()); 451 try { 452 /** 453 * Release the second cyclicBarrier.await in {@link MyBucketCache2#putIntoBackingMap} for 454 * {@link BucketCache.WriterThread},getBlock completed,{@link BucketCache.WriterThread} 455 * could continue. 456 */ 457 myBucketCache2.putCyclicBarrier.await(); 458 } catch (Throwable e) { 459 throw new RuntimeException(e); 460 } 461 462 } finally { 463 Thread.currentThread().setName(oldThreadName); 464 } 465 466 cacheBlockThread.join(); 467 evictBlockThread.join(); 468 assertTrue(cacheBlockThreadExceptionRef.get() == null); 469 assertTrue(evictBlockThreadExceptionRef.get() == null); 470 471 assertTrue(gotHFileBlock.equals(hfileBlock)); 472 assertEquals(1, gotHFileBlock.refCnt()); 473 assertTrue(myBucketCache2.overwiteByteBuff == null); 474 assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 0); 475 476 gotHFileBlock.release(); 477 assertEquals(0, gotHFileBlock.refCnt()); 478 assertTrue(myBucketCache2.overwiteByteBuff != null); 479 assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 1); 480 assertTrue(myBucketCache2.blockEvictCounter.get() == 1); 481 } finally { 482 myBucketCache2.shutdown(); 483 } 484 485 } 486 487 static class MyBucketCache extends BucketCache { 488 private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread"; 489 private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread"; 490 491 private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); 492 private final AtomicInteger replaceCounter = new AtomicInteger(0); 493 private final AtomicInteger blockEvictCounter = new AtomicInteger(0); 494 private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0); 495 private ByteBuff overwiteByteBuff = null; 496 497 public MyBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 498 int writerThreadNum, int writerQLen, String persistencePath) throws IOException { 499 super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 500 persistencePath); 501 } 502 503 /** 504 * Simulate the Block could be replaced. 505 */ 506 @Override 507 protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) { 508 replaceCounter.incrementAndGet(); 509 return true; 510 } 511 512 @Override 513 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, 514 boolean updateCacheMetrics) { 515 if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) { 516 /** 517 * Wait the first cyclicBarrier.await() in {@link MyBucketCache#cacheBlockWithWaitInternal}, 518 * so the {@link BucketCache#getBlock} is executed after the {@link BucketEntry#isRpcRef} 519 * checking. 520 */ 521 try { 522 cyclicBarrier.await(); 523 } catch (Throwable e) { 524 throw new RuntimeException(e); 525 } 526 } 527 Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics); 528 return result; 529 } 530 531 @Override 532 protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, 533 boolean inMemory, boolean wait) { 534 if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) { 535 /** 536 * Wait the cyclicBarrier.await() in {@link MyBucketCache#getBlock} 537 */ 538 try { 539 cyclicBarrier.await(); 540 } catch (Throwable e) { 541 throw new RuntimeException(e); 542 } 543 } 544 if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) { 545 /** 546 * Wait the cyclicBarrier.await() in 547 * {@link TestBucketCacheRefCnt#testReplacingBlockAndGettingBlockConcurrently} for 548 * {@link MyBucketCache#getBlock} and Assert completed. 549 */ 550 try { 551 cyclicBarrier.await(); 552 } catch (Throwable e) { 553 throw new RuntimeException(e); 554 } 555 } 556 super.cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 557 } 558 559 @Override 560 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber, 561 boolean evictedByEvictionProcess) { 562 blockEvictCounter.incrementAndGet(); 563 super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess); 564 } 565 566 /** 567 * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the 568 * {@link BucketEntry} is freed. 569 */ 570 @Override 571 void freeBucketEntry(BucketEntry bucketEntry) { 572 freeBucketEntryCounter.incrementAndGet(); 573 super.freeBucketEntry(bucketEntry); 574 this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry); 575 try { 576 this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset()); 577 } catch (IOException e) { 578 throw new RuntimeException(e); 579 } 580 } 581 } 582 583 static class MyBucketCache2 extends BucketCache { 584 private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread"; 585 private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread"; 586 private static final String EVICT_BLOCK_THREAD_NAME = "_evictBlockThread"; 587 588 private final CyclicBarrier getCyclicBarrier = new CyclicBarrier(2); 589 private final CyclicBarrier evictCyclicBarrier = new CyclicBarrier(2); 590 private final CyclicBarrier putCyclicBarrier = new CyclicBarrier(2); 591 /** 592 * This is used for {@link BucketCache.WriterThread},{@link #CACHE_BLOCK_THREAD_NAME} and 593 * {@link #EVICT_BLOCK_THREAD_NAME},waiting for caching block completed. 594 */ 595 private final CyclicBarrier writeThreadDoneCyclicBarrier = new CyclicBarrier(3); 596 private final AtomicInteger blockEvictCounter = new AtomicInteger(0); 597 private final AtomicInteger removeRamCounter = new AtomicInteger(0); 598 private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0); 599 private ByteBuff overwiteByteBuff = null; 600 601 public MyBucketCache2(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 602 int writerThreadNum, int writerQLen, String persistencePath) throws IOException { 603 super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 604 persistencePath); 605 } 606 607 @Override 608 protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { 609 super.putIntoBackingMap(key, bucketEntry); 610 /** 611 * The {@link BucketCache.WriterThread} wait for evictCyclicBarrier.await before 612 * {@link MyBucketCache2#removeFromRamCache} for {@link #EVICT_BLOCK_THREAD_NAME} 613 */ 614 try { 615 evictCyclicBarrier.await(); 616 } catch (Throwable e) { 617 throw new RuntimeException(e); 618 } 619 620 /** 621 * Wait the cyclicBarrier.await() in 622 * {@link TestBucketCacheRefCnt#testEvictingBlockCachingBlockGettingBlockConcurrently} for 623 * {@link MyBucketCache#getBlock} and Assert completed. 624 */ 625 try { 626 putCyclicBarrier.await(); 627 } catch (Throwable e) { 628 throw new RuntimeException(e); 629 } 630 } 631 632 @Override 633 void doDrain(List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException { 634 super.doDrain(entries, metaBuff); 635 if (entries.size() > 0) { 636 /** 637 * Caching Block completed,release {@link #GET_BLOCK_THREAD_NAME} and 638 * {@link #EVICT_BLOCK_THREAD_NAME}. 639 */ 640 try { 641 writeThreadDoneCyclicBarrier.await(); 642 } catch (Throwable e) { 643 throw new RuntimeException(e); 644 } 645 } 646 647 } 648 649 @Override 650 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, 651 boolean updateCacheMetrics) { 652 if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) { 653 /** 654 * Wait for second getCyclicBarrier.await in {@link MyBucketCache2#removeFromRamCache} after 655 * {@link BucketCache#removeFromRamCache}. 656 */ 657 try { 658 getCyclicBarrier.await(); 659 } catch (Throwable e) { 660 throw new RuntimeException(e); 661 } 662 } 663 Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics); 664 return result; 665 } 666 667 @Override 668 protected boolean removeFromRamCache(BlockCacheKey cacheKey) { 669 boolean firstTime = false; 670 if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) { 671 int count = this.removeRamCounter.incrementAndGet(); 672 firstTime = (count == 1); 673 if (firstTime) { 674 /** 675 * The {@link #EVICT_BLOCK_THREAD_NAME} wait for evictCyclicBarrier.await after 676 * {@link BucketCache#putIntoBackingMap}. 677 */ 678 try { 679 evictCyclicBarrier.await(); 680 } catch (Throwable e) { 681 throw new RuntimeException(e); 682 } 683 } 684 } 685 boolean result = super.removeFromRamCache(cacheKey); 686 if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) { 687 if (firstTime) { 688 /** 689 * Wait for getCyclicBarrier.await before {@link BucketCache#getBlock}. 690 */ 691 try { 692 getCyclicBarrier.await(); 693 } catch (Throwable e) { 694 throw new RuntimeException(e); 695 } 696 /** 697 * Wait for Caching Block completed, after Caching Block completed, evictBlock could 698 * continue. 699 */ 700 try { 701 writeThreadDoneCyclicBarrier.await(); 702 } catch (Throwable e) { 703 throw new RuntimeException(e); 704 } 705 } 706 } 707 708 return result; 709 } 710 711 @Override 712 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber, 713 boolean evictedByEvictionProcess) { 714 /** 715 * This is only invoked by {@link BucketCache.WriterThread}. {@link MyMyBucketCache2} create 716 * only one {@link BucketCache.WriterThread}. 717 */ 718 assertTrue(Thread.currentThread() == this.writerThreads[0]); 719 720 blockEvictCounter.incrementAndGet(); 721 super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess); 722 } 723 724 /** 725 * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the 726 * {@link BucketEntry} is freed. 727 */ 728 @Override 729 void freeBucketEntry(BucketEntry bucketEntry) { 730 freeBucketEntryCounter.incrementAndGet(); 731 super.freeBucketEntry(bucketEntry); 732 this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry); 733 try { 734 this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset()); 735 } catch (IOException e) { 736 throw new RuntimeException(e); 737 } 738 } 739 } 740 741 private static ByteBuff getOverwriteByteBuff(BucketEntry bucketEntry) { 742 int byteSize = bucketEntry.getLength(); 743 byte[] data = new byte[byteSize]; 744 Arrays.fill(data, (byte) 0xff); 745 return ByteBuff.wrap(ByteBuffer.wrap(data)); 746 } 747}