/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests the reference counting ({@link RefCnt}) of blocks stored in a {@link BucketCache}: while a
 * block sits in the RAM cache, after it has been flushed to the backing map, across evictions, and
 * when caching, evicting and reading the same {@link BlockCacheKey} race with each other
 * (HBASE-26281).
 */
@Category({ IOTests.class, SmallTests.class })
public class TestBucketCacheRefCnt {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBucketCacheRefCnt.class);

  private static final String IO_ENGINE = "offheap";
  private static final long CAPACITY_SIZE = 32 * 1024 * 1024;
  private static final int BLOCK_SIZE = 1024;
  private static final int[] BLOCK_SIZE_ARRAY =
    new int[] { 64, 128, 256, 512, 1024, 2048, 4096, 8192 };
  private static final String PERSISTENCE_PATH = null;
  private static final HFileContext CONTEXT = new HFileContextBuilder().build();

  private BucketCache cache;

  /**
   * Creates a plain {@link BucketCache} using the shared test constants.
   * @param writerSize number of writer threads
   * @param queueSize  length of each writer queue
   */
  private static BucketCache create(int writerSize, int queueSize) throws IOException {
    return new BucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
      queueSize, PERSISTENCE_PATH);
  }

  /**
   * Creates the instrumented {@link MyBucketCache} using the shared test constants.
   * @param writerSize number of writer threads
   * @param queueSize  length of each writer queue
   */
  private static MyBucketCache createMyBucketCache(int writerSize, int queueSize)
    throws IOException {
    return new MyBucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
      queueSize, PERSISTENCE_PATH);
  }

  /**
   * Creates the instrumented {@link MyBucketCache2} using the shared test constants.
   * @param writerSize number of writer threads
   * @param queueSize  length of each writer queue
   */
  private static MyBucketCache2 createMyBucketCache2(int writerSize, int queueSize)
    throws IOException {
    return new MyBucketCache2(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
      queueSize, PERSISTENCE_PATH);
  }

  /** Creates a DATA block of {@code size} bytes that uses {@link ByteBuffAllocator#HEAP}. */
  private static HFileBlock createBlock(int offset, int size) {
    return createBlock(offset, size, ByteBuffAllocator.HEAP);
  }

  /**
   * Creates a DATA block whose content is a freshly allocated heap buffer of {@code size} bytes
   * and which is associated with the given allocator.
   */
  private static HFileBlock createBlock(int offset, int size, ByteBuffAllocator alloc) {
    return new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(ByteBuffer.allocate(size)),
      HFileBlock.FILL_HEADER, offset, 52, size, CONTEXT, alloc);
  }

  private static BlockCacheKey createKey(String hfileName, long offset) {
    return new BlockCacheKey(hfileName, offset);
  }

  /**
   * Awaits the given barrier, wrapping anything thrown (including interruption and a broken
   * barrier) in a {@link RuntimeException} so it can be used from overridden methods that do not
   * declare checked exceptions.
   */
  private static void awaitOrRethrow(CyclicBarrier barrier) {
    try {
      barrier.await();
    } catch (Throwable e) {
      throw new RuntimeException(e);
    }
  }

  /** Disables and interrupts every writer thread of {@link #cache} so blocks stay in RAM cache. */
  private void disableWriter() {
    if (cache != null) {
      for (WriterThread wt : cache.writerThreads) {
        wt.disableWriter();
        wt.interrupt();
      }
    }
  }

  @org.junit.Ignore
  @Test // Disabled by HBASE-24079. Reenable issue HBASE-24082
  // Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2>
  public void testBlockInRAMCache() throws IOException {
    cache = create(1, 1000);
    disableWriter();
    final String prefix = "testBlockInRamCache";
    try {
      for (int i = 0; i < 10; i++) {
        HFileBlock blk = createBlock(i, 1020);
        BlockCacheKey key = createKey(prefix, i);
        assertEquals(1, blk.refCnt());
        cache.cacheBlock(key, blk);
        assertEquals(i + 1, cache.getBlockCount());
        assertEquals(2, blk.refCnt());

        Cacheable block = cache.getBlock(key, false, false, false);
        try {
          assertEquals(3, blk.refCnt());
          assertEquals(3, block.refCnt());
          assertEquals(blk, block);
        } finally {
          block.release();
        }
        assertEquals(2, blk.refCnt());
        assertEquals(2, block.refCnt());
      }

      for (int i = 0; i < 10; i++) {
        BlockCacheKey key = createKey(prefix, i);
        Cacheable blk = cache.getBlock(key, false, false, false);
        assertEquals(3, blk.refCnt());
        assertFalse(blk.release());
        assertEquals(2, blk.refCnt());

        assertTrue(cache.evictBlock(key));
        assertEquals(1, blk.refCnt());
        assertTrue(blk.release());
        assertEquals(0, blk.refCnt());
      }
    } finally {
      cache.shutdown();
    }
  }

  /**
   * Polls every 100ms until the key is present in {@link BucketCache#backingMap} and absent from
   * {@link BucketCache#ramCache}, then sleeps one more second before returning.
   * NOTE(review): the trailing sleep presumably lets the writer thread finish its post-flush work
   * before the caller asserts on reference counts -- confirm.
   */
  private static void waitUntilFlushedToCache(BucketCache bucketCache, BlockCacheKey blockCacheKey)
    throws InterruptedException {
    while (
      !bucketCache.backingMap.containsKey(blockCacheKey)
        || bucketCache.ramCache.containsKey(blockCacheKey)
    ) {
      Thread.sleep(100);
    }
    Thread.sleep(1000);
  }

  @Test
  public void testBlockInBackingMap() throws Exception {
    ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true);
    cache = create(1, 1000);
    try {
      HFileBlock blk = createBlock(200, 1020, alloc);
      BlockCacheKey key = createKey("testHFile-00", 200);
      cache.cacheBlock(key, blk);
      waitUntilFlushedToCache(cache, key);
      assertEquals(1, blk.refCnt());

      Cacheable block = cache.getBlock(key, false, false, false);
      assertTrue(block instanceof HFileBlock);
      assertTrue(((HFileBlock) block).getByteBuffAllocator() == alloc);
      assertEquals(2, block.refCnt());

      block.retain();
      assertEquals(3, block.refCnt());

      Cacheable newBlock = cache.getBlock(key, false, false, false);
      assertTrue(newBlock instanceof HFileBlock);
      assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);
      assertEquals(4, newBlock.refCnt());

      // release the newBlock
      assertFalse(newBlock.release());
      assertEquals(3, newBlock.refCnt());
      assertEquals(3, block.refCnt());

      // Evict the key
      cache.evictBlock(key);
      assertEquals(2, block.refCnt());

      // Evict again, shouldn't change the refCnt.
      cache.evictBlock(key);
      assertEquals(2, block.refCnt());

      assertFalse(block.release());
      assertEquals(1, block.refCnt());

      /**
       * The key was evicted from {@link BucketCache#backingMap} and {@link BucketCache#ramCache},
       * so {@link BucketCache#getBlock} return null.
       */
      Cacheable newestBlock = cache.getBlock(key, false, false, false);
      assertNull(newestBlock);
      assertEquals(1, block.refCnt());
      assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);

      // Release the block
      assertTrue(block.release());
      assertEquals(0, block.refCnt());
      assertEquals(0, newBlock.refCnt());
    } finally {
      cache.shutdown();
    }
  }

  @Test
  public void testInBucketCache() throws IOException {
    ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true);
    cache = create(1, 1000);
    try {
      HFileBlock blk = createBlock(200, 1020, alloc);
      BlockCacheKey key = createKey("testHFile-00", 200);
      cache.cacheBlock(key, blk);
      assertTrue(blk.refCnt() == 1 || blk.refCnt() == 2);

      // wait for block to move to backing map because refCnt get refreshed once block moves to
      // backing map
      Waiter.waitFor(HBaseConfiguration.create(), 12000, () -> isRamCacheDrained(key, cache));

      Cacheable block1 = cache.getBlock(key, false, false, false);
      assertTrue(block1.refCnt() >= 2);
      assertTrue(((HFileBlock) block1).getByteBuffAllocator() == alloc);

      Cacheable block2 = cache.getBlock(key, false, false, false);
      assertTrue(((HFileBlock) block2).getByteBuffAllocator() == alloc);
      assertTrue(block2.refCnt() >= 3);

      cache.evictBlock(key);
      assertTrue(blk.refCnt() >= 1);
      assertTrue(block1.refCnt() >= 2);
      assertTrue(block2.refCnt() >= 2);

      // Get key again
      Cacheable block3 = cache.getBlock(key, false, false, false);
      if (block3 != null) {
        assertTrue(((HFileBlock) block3).getByteBuffAllocator() == alloc);
        assertTrue(block3.refCnt() >= 3);
        assertFalse(block3.release());
      }

      blk.release();
      boolean ret1 = block1.release();
      boolean ret2 = block2.release();
      assertTrue(ret1 || ret2);
      assertEquals(0, blk.refCnt());
      assertEquals(0, block1.refCnt());
      assertEquals(0, block2.refCnt());
    } finally {
      cache.shutdown();
    }
  }

  /** Returns true once the key is in the backing map and no longer in the RAM cache. */
  private boolean isRamCacheDrained(BlockCacheKey key, BucketCache cache) {
    return cache.backingMap.containsKey(key) && !cache.ramCache.containsKey(key);
  }

  @Test
  public void testMarkStaleAsEvicted() throws Exception {
    cache = create(1, 1000);
    try {
      HFileBlock blk = createBlock(200, 1020);
      BlockCacheKey key = createKey("testMarkStaleAsEvicted", 200);
      cache.cacheBlock(key, blk);
      waitUntilFlushedToCache(cache, key);
      assertEquals(1, blk.refCnt());
      assertNotNull(cache.backingMap.get(key));
      assertEquals(1, cache.backingMap.get(key).refCnt());

      // RPC reference this cache.
      Cacheable block1 = cache.getBlock(key, false, false, false);
      assertEquals(2, block1.refCnt());
      BucketEntry be1 = cache.backingMap.get(key);
      assertNotNull(be1);
      assertEquals(2, be1.refCnt());

      // We've some RPC reference, so it won't have any effect.
      assertFalse(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
      assertEquals(2, block1.refCnt());
      assertEquals(2, cache.backingMap.get(key).refCnt());

      // Release the RPC reference.
      block1.release();
      assertEquals(1, block1.refCnt());
      assertEquals(1, cache.backingMap.get(key).refCnt());

      // Mark the stale as evicted again, it'll do the de-allocation.
      assertTrue(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
      assertEquals(0, block1.refCnt());
      assertNull(cache.backingMap.get(key));
      assertEquals(0, cache.size());
    } finally {
      cache.shutdown();
    }
  }

  /**
   * <pre>
   * This test is for HBASE-26281,
   * test two threads for replacing Block and getting Block execute concurrently.
   * The threads sequence is:
   * 1. Block1 was cached successfully,the {@link RefCnt} of Block1 is 1.
   * 2. Thread1 caching the same {@link BlockCacheKey} with Block2 satisfied
   *    {@link BlockCacheUtil#shouldReplaceExistingCacheBlock}, so Block2 would
   *    replace Block1, but thread1 stopping before {@link BucketCache#cacheBlockWithWaitInternal}
   * 3. Thread2 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
   *    which returned Block1, the {@link RefCnt} of Block1 is 2.
   * 4. Thread1 continues caching Block2, in {@link BucketCache.WriterThread#putIntoBackingMap},
   *    the old Block1 is freed directly which {@link RefCnt} is 2, but the Block1 is still used
   *    by Thread2 and the content of Block1 would be overwritten after it is freed, which may
   *    cause a serious error.
   * </pre>
   */
  @Test
  public void testReplacingBlockAndGettingBlockConcurrently() throws Exception {
    ByteBuffAllocator byteBuffAllocator =
      ByteBuffAllocator.create(HBaseConfiguration.create(), true);
    final MyBucketCache myBucketCache = createMyBucketCache(1, 1000);
    try {
      HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
      final BlockCacheKey blockCacheKey = createKey("testTwoThreadConcurrent", 200);
      myBucketCache.cacheBlock(blockCacheKey, hfileBlock);
      waitUntilFlushedToCache(myBucketCache, blockCacheKey);
      assertEquals(1, hfileBlock.refCnt());

      assertFalse(myBucketCache.ramCache.containsKey(blockCacheKey));
      final AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
      Thread cacheBlockThread = new Thread(() -> {
        try {
          HFileBlock newHFileBlock = createBlock(200, 1020, byteBuffAllocator);
          myBucketCache.cacheBlock(blockCacheKey, newHFileBlock);
          waitUntilFlushedToCache(myBucketCache, blockCacheKey);
        } catch (Throwable exception) {
          exceptionRef.set(exception);
        }
      });
      cacheBlockThread.setName(MyBucketCache.CACHE_BLOCK_THREAD_NAME);
      cacheBlockThread.start();

      String oldThreadName = Thread.currentThread().getName();
      HFileBlock gotHFileBlock = null;
      try {
        // The overridden getBlock recognises this thread by name and synchronises with the
        // cache-block thread before reading.
        Thread.currentThread().setName(MyBucketCache.GET_BLOCK_THREAD_NAME);

        gotHFileBlock = (HFileBlock) (myBucketCache.getBlock(blockCacheKey, false, false, false));
        assertTrue(gotHFileBlock.equals(hfileBlock));
        assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
        assertEquals(2, gotHFileBlock.refCnt());
        /**
         * Release the second cyclicBarrier.await in
         * {@link MyBucketCache#cacheBlockWithWaitInternal}
         */
        myBucketCache.cyclicBarrier.await();
      } finally {
        Thread.currentThread().setName(oldThreadName);
      }

      cacheBlockThread.join();
      assertNull(exceptionRef.get());
      assertEquals(1, gotHFileBlock.refCnt());
      assertTrue(gotHFileBlock.equals(hfileBlock));
      assertNull(myBucketCache.overwiteByteBuff);
      assertEquals(0, myBucketCache.freeBucketEntryCounter.get());

      gotHFileBlock.release();
      assertEquals(0, gotHFileBlock.refCnt());
      assertNotNull(myBucketCache.overwiteByteBuff);
      assertEquals(1, myBucketCache.freeBucketEntryCounter.get());
      assertEquals(1, myBucketCache.replaceCounter.get());
      assertEquals(1, myBucketCache.blockEvictCounter.get());
    } finally {
      myBucketCache.shutdown();
    }
  }

  /**
   * <pre>
   * This test also is for HBASE-26281,
   * test three threads for evicting Block,caching Block and getting Block
   * execute concurrently.
   * 1. Thread1 caching Block1, stopping after {@link BucketCache.WriterThread#putIntoBackingMap},
   *    the {@link RefCnt} of Block1 is 1.
   * 2. Thread2 invoking {@link BucketCache#evictBlock} with the same {@link BlockCacheKey},
   *    but stopping after {@link BucketCache#removeFromRamCache}.
   * 3. Thread3 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
   *    which returned Block1, the {@link RefCnt} of Block1 is 2.
   * 4. Thread1 continues caching block1,but finding that {@link BucketCache.RAMCache#remove}
   *    returning false, so invoking {@link BucketCache#blockEvicted} to free the Block1
   *    directly which {@link RefCnt} is 2 and the Block1 is still used by Thread3.
   * </pre>
   */
  @Test
  public void testEvictingBlockCachingBlockGettingBlockConcurrently() throws Exception {
    ByteBuffAllocator byteBuffAllocator =
      ByteBuffAllocator.create(HBaseConfiguration.create(), true);
    final MyBucketCache2 myBucketCache2 = createMyBucketCache2(1, 1000);
    try {
      final HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
      final BlockCacheKey blockCacheKey = createKey("testThreeThreadConcurrent", 200);
      final AtomicReference<Throwable> cacheBlockThreadExceptionRef = new AtomicReference<>();
      Thread cacheBlockThread = new Thread(() -> {
        try {
          myBucketCache2.cacheBlock(blockCacheKey, hfileBlock);
          /**
           * Wait for Caching Block completed.
           */
          myBucketCache2.writeThreadDoneCyclicBarrier.await();
        } catch (Throwable exception) {
          cacheBlockThreadExceptionRef.set(exception);
        }
      });
      cacheBlockThread.setName(MyBucketCache2.CACHE_BLOCK_THREAD_NAME);
      cacheBlockThread.start();

      final AtomicReference<Throwable> evictBlockThreadExceptionRef = new AtomicReference<>();
      Thread evictBlockThread = new Thread(() -> {
        try {
          myBucketCache2.evictBlock(blockCacheKey);
        } catch (Throwable exception) {
          evictBlockThreadExceptionRef.set(exception);
        }
      });
      evictBlockThread.setName(MyBucketCache2.EVICT_BLOCK_THREAD_NAME);
      evictBlockThread.start();

      String oldThreadName = Thread.currentThread().getName();
      HFileBlock gotHFileBlock = null;
      try {
        // The overridden getBlock recognises this thread by name and synchronises with the
        // evict-block thread before reading.
        Thread.currentThread().setName(MyBucketCache2.GET_BLOCK_THREAD_NAME);
        gotHFileBlock = (HFileBlock) (myBucketCache2.getBlock(blockCacheKey, false, false, false));
        assertTrue(gotHFileBlock.equals(hfileBlock));
        assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
        assertEquals(2, gotHFileBlock.refCnt());
        /**
         * Release the second cyclicBarrier.await in {@link MyBucketCache2#putIntoBackingMap} for
         * {@link BucketCache.WriterThread},getBlock completed,{@link BucketCache.WriterThread}
         * could continue.
         */
        awaitOrRethrow(myBucketCache2.putCyclicBarrier);
      } finally {
        Thread.currentThread().setName(oldThreadName);
      }

      cacheBlockThread.join();
      evictBlockThread.join();
      assertNull(cacheBlockThreadExceptionRef.get());
      assertNull(evictBlockThreadExceptionRef.get());

      assertTrue(gotHFileBlock.equals(hfileBlock));
      assertEquals(1, gotHFileBlock.refCnt());
      assertNull(myBucketCache2.overwiteByteBuff);
      assertEquals(0, myBucketCache2.freeBucketEntryCounter.get());

      gotHFileBlock.release();
      assertEquals(0, gotHFileBlock.refCnt());
      assertNotNull(myBucketCache2.overwiteByteBuff);
      assertEquals(1, myBucketCache2.freeBucketEntryCounter.get());
      assertEquals(1, myBucketCache2.blockEvictCounter.get());
    } finally {
      myBucketCache2.shutdown();
    }
  }

  /**
   * {@link BucketCache} instrumented for
   * {@link TestBucketCacheRefCnt#testReplacingBlockAndGettingBlockConcurrently}: threads are
   * recognised by name and ordered through a two-party {@link CyclicBarrier}, and replace, evict
   * and free calls are counted.
   */
  static class MyBucketCache extends BucketCache {
    private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
    private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";

    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
    private final AtomicInteger replaceCounter = new AtomicInteger(0);
    private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
    private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
    private ByteBuff overwiteByteBuff = null;

    public MyBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
        persistencePath);
    }

    /**
     * Simulate the Block could be replaced.
     */
    @Override
    protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
      replaceCounter.incrementAndGet();
      return true;
    }

    @Override
    public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
      boolean updateCacheMetrics) {
      if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
        /**
         * Wait the first cyclicBarrier.await() in {@link MyBucketCache#cacheBlockWithWaitInternal},
         * so the {@link BucketCache#getBlock} is executed after the {@link BucketEntry#isRpcRef}
         * checking.
         */
        awaitOrRethrow(cyclicBarrier);
      }
      return super.getBlock(key, caching, repeat, updateCacheMetrics);
    }

    @Override
    protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
      boolean inMemory, boolean wait) {
      if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
        /**
         * Wait the cyclicBarrier.await() in {@link MyBucketCache#getBlock}
         */
        awaitOrRethrow(cyclicBarrier);
        /**
         * Wait the cyclicBarrier.await() in
         * {@link TestBucketCacheRefCnt#testReplacingBlockAndGettingBlockConcurrently} for
         * {@link MyBucketCache#getBlock} and Assert completed.
         */
        awaitOrRethrow(cyclicBarrier);
      }
      super.cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
    }

    @Override
    void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
      boolean evictedByEvictionProcess) {
      blockEvictCounter.incrementAndGet();
      super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess);
    }

    /**
     * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the
     * {@link BucketEntry} is freed.
     */
    @Override
    void freeBucketEntry(BucketEntry bucketEntry) {
      freeBucketEntryCounter.incrementAndGet();
      super.freeBucketEntry(bucketEntry);
      this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
      try {
        this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

  /**
   * {@link BucketCache} instrumented for
   * {@link TestBucketCacheRefCnt#testEvictingBlockCachingBlockGettingBlockConcurrently}: the
   * writer thread, the cache-block thread, the evict-block thread and the get-block thread are
   * ordered through several {@link CyclicBarrier}s, and evict and free calls are counted.
   */
  static class MyBucketCache2 extends BucketCache {
    private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
    private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
    private static final String EVICT_BLOCK_THREAD_NAME = "_evictBlockThread";

    private final CyclicBarrier getCyclicBarrier = new CyclicBarrier(2);
    private final CyclicBarrier evictCyclicBarrier = new CyclicBarrier(2);
    private final CyclicBarrier putCyclicBarrier = new CyclicBarrier(2);
    /**
     * This is used for {@link BucketCache.WriterThread},{@link #CACHE_BLOCK_THREAD_NAME} and
     * {@link #EVICT_BLOCK_THREAD_NAME},waiting for caching block completed.
     */
    private final CyclicBarrier writeThreadDoneCyclicBarrier = new CyclicBarrier(3);
    private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
    private final AtomicInteger removeRamCounter = new AtomicInteger(0);
    private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
    private ByteBuff overwiteByteBuff = null;

    public MyBucketCache2(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
        persistencePath);
    }

    @Override
    protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
      super.putIntoBackingMap(key, bucketEntry);
      /**
       * The {@link BucketCache.WriterThread} wait for evictCyclicBarrier.await before
       * {@link MyBucketCache2#removeFromRamCache} for {@link #EVICT_BLOCK_THREAD_NAME}
       */
      awaitOrRethrow(evictCyclicBarrier);

      /**
       * Wait the cyclicBarrier.await() in
       * {@link TestBucketCacheRefCnt#testEvictingBlockCachingBlockGettingBlockConcurrently} for
       * {@link MyBucketCache2#getBlock} and Assert completed.
       */
      awaitOrRethrow(putCyclicBarrier);
    }

    @Override
    void doDrain(List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
      super.doDrain(entries, metaBuff);
      if (entries.size() > 0) {
        /**
         * Caching Block completed,release {@link #CACHE_BLOCK_THREAD_NAME} and
         * {@link #EVICT_BLOCK_THREAD_NAME}.
         */
        awaitOrRethrow(writeThreadDoneCyclicBarrier);
      }
    }

    @Override
    public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
      boolean updateCacheMetrics) {
      if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
        /**
         * Wait for second getCyclicBarrier.await in {@link MyBucketCache2#removeFromRamCache} after
         * {@link BucketCache#removeFromRamCache}.
         */
        awaitOrRethrow(getCyclicBarrier);
      }
      return super.getBlock(key, caching, repeat, updateCacheMetrics);
    }

    @Override
    protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
      // Only the first call made by the evict-block thread takes part in the barrier choreography.
      boolean firstTime = false;
      if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
        int count = this.removeRamCounter.incrementAndGet();
        firstTime = (count == 1);
        if (firstTime) {
          /**
           * The {@link #EVICT_BLOCK_THREAD_NAME} wait for evictCyclicBarrier.await after
           * {@link BucketCache#putIntoBackingMap}.
           */
          awaitOrRethrow(evictCyclicBarrier);
        }
      }
      boolean result = super.removeFromRamCache(cacheKey);
      if (firstTime) {
        /**
         * Wait for getCyclicBarrier.await before {@link BucketCache#getBlock}.
         */
        awaitOrRethrow(getCyclicBarrier);
        /**
         * Wait for Caching Block completed, after Caching Block completed, evictBlock could
         * continue.
         */
        awaitOrRethrow(writeThreadDoneCyclicBarrier);
      }

      return result;
    }

    @Override
    void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
      boolean evictedByEvictionProcess) {
      /**
       * This is only invoked by {@link BucketCache.WriterThread}. {@link MyBucketCache2} create
       * only one {@link BucketCache.WriterThread}.
       */
      assertTrue(Thread.currentThread() == this.writerThreads[0]);

      blockEvictCounter.incrementAndGet();
      super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess);
    }

    /**
     * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the
     * {@link BucketEntry} is freed.
     */
    @Override
    void freeBucketEntry(BucketEntry bucketEntry) {
      freeBucketEntryCounter.incrementAndGet();
      super.freeBucketEntry(bucketEntry);
      this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
      try {
        this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

  /** Builds a buffer of {@code bucketEntry.getLength()} bytes, every byte set to 0xff. */
  private static ByteBuff getOverwriteByteBuff(BucketEntry bucketEntry) {
    int byteSize = bucketEntry.getLength();
    byte[] data = new byte[byteSize];
    Arrays.fill(data, (byte) 0xff);
    return ByteBuff.wrap(ByteBuffer.wrap(data));
  }
}