001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025 026import java.io.IOException; 027import java.nio.ByteBuffer; 028import java.util.Arrays; 029import java.util.List; 030import java.util.concurrent.CyclicBarrier; 031import java.util.concurrent.atomic.AtomicInteger; 032import java.util.concurrent.atomic.AtomicReference; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseConfiguration; 035import org.apache.hadoop.hbase.io.ByteBuffAllocator; 036import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 037import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; 038import org.apache.hadoop.hbase.io.hfile.BlockType; 039import org.apache.hadoop.hbase.io.hfile.Cacheable; 040import org.apache.hadoop.hbase.io.hfile.HFileBlock; 041import org.apache.hadoop.hbase.io.hfile.HFileContext; 042import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 043import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread; 044import org.apache.hadoop.hbase.nio.ByteBuff; 045import org.apache.hadoop.hbase.nio.RefCnt; 046import org.apache.hadoop.hbase.testclassification.IOTests; 047import org.apache.hadoop.hbase.testclassification.SmallTests; 048import org.junit.ClassRule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051 052@Category({ IOTests.class, SmallTests.class }) 053public class TestBucketCacheRefCnt { 054 055 @ClassRule 056 public static final HBaseClassTestRule CLASS_RULE = 057 HBaseClassTestRule.forClass(TestBucketCacheRefCnt.class); 058 059 private static final String IO_ENGINE = "offheap"; 060 private static final long CAPACITY_SIZE = 32 * 1024 * 1024; 061 private static final int BLOCK_SIZE = 1024; 062 private static final int[] BLOCK_SIZE_ARRAY = 063 new int[] { 64, 128, 256, 512, 1024, 2048, 4096, 8192 }; 064 private static final String PERSISTENCE_PATH = null; 065 private static final HFileContext CONTEXT = new HFileContextBuilder().build(); 066 067 private BucketCache cache; 068 069 private static BucketCache create(int writerSize, int queueSize) throws IOException { 070 return new BucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize, 071 queueSize, PERSISTENCE_PATH); 072 } 073 074 private static MyBucketCache createMyBucketCache(int writerSize, int queueSize) 075 throws IOException { 076 return new MyBucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize, 077 queueSize, PERSISTENCE_PATH); 078 } 079 080 private static MyBucketCache2 createMyBucketCache2(int writerSize, int queueSize) 081 throws IOException { 082 return new MyBucketCache2(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize, 083 queueSize, PERSISTENCE_PATH); 084 } 085 086 private static HFileBlock createBlock(int offset, int size) { 087 return createBlock(offset, size, ByteBuffAllocator.HEAP); 088 } 089 090 private static HFileBlock createBlock(int offset, int size, ByteBuffAllocator alloc) { 091 return new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(ByteBuffer.allocate(size)), 092 HFileBlock.FILL_HEADER, offset, 52, size, CONTEXT, alloc); 093 } 094 095 private static BlockCacheKey createKey(String hfileName, long offset) { 096 return new BlockCacheKey(hfileName, offset); 097 } 098 099 private void disableWriter() { 100 if (cache != null) { 101 for (WriterThread wt : cache.writerThreads) { 102 wt.disableWriter(); 103 wt.interrupt(); 104 } 105 } 106 } 107 108 @org.junit.Ignore 109 @Test // Disabled by HBASE-24079. Reenable issue HBASE-24082 110 // Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2> 111 public void testBlockInRAMCache() throws IOException { 112 cache = create(1, 1000); 113 // Set this to true; 114 cache.wait_when_cache = true; 115 disableWriter(); 116 final String prefix = "testBlockInRamCache"; 117 try { 118 for (int i = 0; i < 10; i++) { 119 HFileBlock blk = createBlock(i, 1020); 120 BlockCacheKey key = createKey(prefix, i); 121 assertEquals(1, blk.refCnt()); 122 cache.cacheBlock(key, blk); 123 assertEquals(i + 1, cache.getBlockCount()); 124 assertEquals(2, blk.refCnt()); 125 126 Cacheable block = cache.getBlock(key, false, false, false); 127 try { 128 assertEquals(3, blk.refCnt()); 129 assertEquals(3, block.refCnt()); 130 assertEquals(blk, block); 131 } finally { 132 block.release(); 133 } 134 assertEquals(2, blk.refCnt()); 135 assertEquals(2, block.refCnt()); 136 } 137 138 for (int i = 0; i < 10; i++) { 139 BlockCacheKey key = createKey(prefix, i); 140 Cacheable blk = cache.getBlock(key, false, false, false); 141 assertEquals(3, blk.refCnt()); 142 assertFalse(blk.release()); 143 assertEquals(2, blk.refCnt()); 144 145 assertTrue(cache.evictBlock(key)); 146 assertEquals(1, blk.refCnt()); 147 assertTrue(blk.release()); 148 assertEquals(0, blk.refCnt()); 149 } 150 } finally { 151 cache.shutdown(); 152 } 153 } 154 155 private static void waitUntilFlushedToCache(BucketCache bucketCache, BlockCacheKey blockCacheKey) 156 throws InterruptedException { 157 while ( 158 !bucketCache.backingMap.containsKey(blockCacheKey) 159 || bucketCache.ramCache.containsKey(blockCacheKey) 160 ) { 161 Thread.sleep(100); 162 } 163 Thread.sleep(1000); 164 } 165 166 @Test 167 public void testBlockInBackingMap() throws Exception { 168 ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true); 169 cache = create(1, 1000); 170 try { 171 HFileBlock blk = createBlock(200, 1020, alloc); 172 BlockCacheKey key = createKey("testHFile-00", 200); 173 cache.cacheBlock(key, blk); 174 waitUntilFlushedToCache(cache, key); 175 assertEquals(1, blk.refCnt()); 176 177 Cacheable block = cache.getBlock(key, false, false, false); 178 assertTrue(block instanceof HFileBlock); 179 assertTrue(((HFileBlock) block).getByteBuffAllocator() == alloc); 180 assertEquals(2, block.refCnt()); 181 182 block.retain(); 183 assertEquals(3, block.refCnt()); 184 185 Cacheable newBlock = cache.getBlock(key, false, false, false); 186 assertTrue(newBlock instanceof HFileBlock); 187 assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc); 188 assertEquals(4, newBlock.refCnt()); 189 190 // release the newBlock 191 assertFalse(newBlock.release()); 192 assertEquals(3, newBlock.refCnt()); 193 assertEquals(3, block.refCnt()); 194 195 // Evict the key 196 cache.evictBlock(key); 197 assertEquals(2, block.refCnt()); 198 199 // Evict again, shouldn't change the refCnt. 200 cache.evictBlock(key); 201 assertEquals(2, block.refCnt()); 202 203 assertFalse(block.release()); 204 assertEquals(1, block.refCnt()); 205 206 /** 207 * The key was evicted from {@link BucketCache#backingMap} and {@link BucketCache#ramCache}, 208 * so {@link BucketCache#getBlock} return null. 209 */ 210 Cacheable newestBlock = cache.getBlock(key, false, false, false); 211 assertNull(newestBlock); 212 assertEquals(1, block.refCnt()); 213 assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc); 214 215 // Release the block 216 assertTrue(block.release()); 217 assertEquals(0, block.refCnt()); 218 assertEquals(0, newBlock.refCnt()); 219 } finally { 220 cache.shutdown(); 221 } 222 } 223 224 @Test 225 public void testInBucketCache() throws IOException { 226 ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true); 227 cache = create(1, 1000); 228 try { 229 HFileBlock blk = createBlock(200, 1020, alloc); 230 BlockCacheKey key = createKey("testHFile-00", 200); 231 cache.cacheBlock(key, blk); 232 assertTrue(blk.refCnt() == 1 || blk.refCnt() == 2); 233 234 Cacheable block1 = cache.getBlock(key, false, false, false); 235 assertTrue(block1.refCnt() >= 2); 236 assertTrue(((HFileBlock) block1).getByteBuffAllocator() == alloc); 237 238 Cacheable block2 = cache.getBlock(key, false, false, false); 239 assertTrue(((HFileBlock) block2).getByteBuffAllocator() == alloc); 240 assertTrue(block2.refCnt() >= 3); 241 242 cache.evictBlock(key); 243 assertTrue(blk.refCnt() >= 1); 244 assertTrue(block1.refCnt() >= 2); 245 assertTrue(block2.refCnt() >= 2); 246 247 // Get key again 248 Cacheable block3 = cache.getBlock(key, false, false, false); 249 if (block3 != null) { 250 assertTrue(((HFileBlock) block3).getByteBuffAllocator() == alloc); 251 assertTrue(block3.refCnt() >= 3); 252 assertFalse(block3.release()); 253 } 254 255 blk.release(); 256 boolean ret1 = block1.release(); 257 boolean ret2 = block2.release(); 258 assertTrue(ret1 || ret2); 259 assertEquals(0, blk.refCnt()); 260 assertEquals(0, block1.refCnt()); 261 assertEquals(0, block2.refCnt()); 262 } finally { 263 cache.shutdown(); 264 } 265 } 266 267 @Test 268 public void testMarkStaleAsEvicted() throws Exception { 269 cache = create(1, 1000); 270 try { 271 HFileBlock blk = createBlock(200, 1020); 272 BlockCacheKey key = createKey("testMarkStaleAsEvicted", 200); 273 cache.cacheBlock(key, blk); 274 waitUntilFlushedToCache(cache, key); 275 assertEquals(1, blk.refCnt()); 276 assertNotNull(cache.backingMap.get(key)); 277 assertEquals(1, cache.backingMap.get(key).refCnt()); 278 279 // RPC reference this cache. 280 Cacheable block1 = cache.getBlock(key, false, false, false); 281 assertEquals(2, block1.refCnt()); 282 BucketEntry be1 = cache.backingMap.get(key); 283 assertNotNull(be1); 284 assertEquals(2, be1.refCnt()); 285 286 // We've some RPC reference, so it won't have any effect. 287 assertFalse(cache.evictBucketEntryIfNoRpcReferenced(key, be1)); 288 assertEquals(2, block1.refCnt()); 289 assertEquals(2, cache.backingMap.get(key).refCnt()); 290 291 // Release the RPC reference. 292 block1.release(); 293 assertEquals(1, block1.refCnt()); 294 assertEquals(1, cache.backingMap.get(key).refCnt()); 295 296 // Mark the stale as evicted again, it'll do the de-allocation. 297 assertTrue(cache.evictBucketEntryIfNoRpcReferenced(key, be1)); 298 assertEquals(0, block1.refCnt()); 299 assertNull(cache.backingMap.get(key)); 300 assertEquals(0, cache.size()); 301 } finally { 302 cache.shutdown(); 303 } 304 } 305 306 /** 307 * <pre> 308 * This test is for HBASE-26281, 309 * test two threads for replacing Block and getting Block execute concurrently. 310 * The threads sequence is: 311 * 1. Block1 was cached successfully,the {@link RefCnt} of Block1 is 1. 312 * 2. Thread1 caching the same {@link BlockCacheKey} with Block2 satisfied 313 * {@link BlockCacheUtil#shouldReplaceExistingCacheBlock}, so Block2 would 314 * replace Block1, but thread1 stopping before {@link BucketCache#cacheBlockWithWaitInternal} 315 * 3. Thread2 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey}, 316 * which returned Block1, the {@link RefCnt} of Block1 is 2. 317 * 4. Thread1 continues caching Block2, in {@link BucketCache.WriterThread#putIntoBackingMap}, 318 * the old Block1 is freed directly which {@link RefCnt} is 2, but the Block1 is still used 319 * by Thread2 and the content of Block1 would be overwritten after it is freed, which may 320 * cause a serious error. 321 * </pre> 322 * 323 * n 324 */ 325 @Test 326 public void testReplacingBlockAndGettingBlockConcurrently() throws Exception { 327 ByteBuffAllocator byteBuffAllocator = 328 ByteBuffAllocator.create(HBaseConfiguration.create(), true); 329 final MyBucketCache myBucketCache = createMyBucketCache(1, 1000); 330 try { 331 HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator); 332 final BlockCacheKey blockCacheKey = createKey("testTwoThreadConcurrent", 200); 333 myBucketCache.cacheBlock(blockCacheKey, hfileBlock); 334 waitUntilFlushedToCache(myBucketCache, blockCacheKey); 335 assertEquals(1, hfileBlock.refCnt()); 336 337 assertTrue(!myBucketCache.ramCache.containsKey(blockCacheKey)); 338 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 339 Thread cacheBlockThread = new Thread(() -> { 340 try { 341 HFileBlock newHFileBlock = createBlock(200, 1020, byteBuffAllocator); 342 myBucketCache.cacheBlock(blockCacheKey, newHFileBlock); 343 waitUntilFlushedToCache(myBucketCache, blockCacheKey); 344 345 } catch (Throwable exception) { 346 exceptionRef.set(exception); 347 } 348 }); 349 cacheBlockThread.setName(MyBucketCache.CACHE_BLOCK_THREAD_NAME); 350 cacheBlockThread.start(); 351 352 String oldThreadName = Thread.currentThread().getName(); 353 HFileBlock gotHFileBlock = null; 354 try { 355 356 Thread.currentThread().setName(MyBucketCache.GET_BLOCK_THREAD_NAME); 357 358 gotHFileBlock = (HFileBlock) (myBucketCache.getBlock(blockCacheKey, false, false, false)); 359 assertTrue(gotHFileBlock.equals(hfileBlock)); 360 assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator); 361 assertEquals(2, gotHFileBlock.refCnt()); 362 /** 363 * Release the second cyclicBarrier.await in 364 * {@link MyBucketCache#cacheBlockWithWaitInternal} 365 */ 366 myBucketCache.cyclicBarrier.await(); 367 368 } finally { 369 Thread.currentThread().setName(oldThreadName); 370 } 371 372 cacheBlockThread.join(); 373 assertTrue(exceptionRef.get() == null); 374 assertEquals(1, gotHFileBlock.refCnt()); 375 assertTrue(gotHFileBlock.equals(hfileBlock)); 376 assertTrue(myBucketCache.overwiteByteBuff == null); 377 assertTrue(myBucketCache.freeBucketEntryCounter.get() == 0); 378 379 gotHFileBlock.release(); 380 assertEquals(0, gotHFileBlock.refCnt()); 381 assertTrue(myBucketCache.overwiteByteBuff != null); 382 assertTrue(myBucketCache.freeBucketEntryCounter.get() == 1); 383 assertTrue(myBucketCache.replaceCounter.get() == 1); 384 assertTrue(myBucketCache.blockEvictCounter.get() == 1); 385 } finally { 386 myBucketCache.shutdown(); 387 } 388 389 } 390 391 /** 392 * <pre> 393 * This test also is for HBASE-26281, 394 * test three threads for evicting Block,caching Block and getting Block 395 * execute concurrently. 396 * 1. Thread1 caching Block1, stopping after {@link BucketCache.WriterThread#putIntoBackingMap}, 397 * the {@link RefCnt} of Block1 is 1. 398 * 2. Thread2 invoking {@link BucketCache#evictBlock} with the same {@link BlockCacheKey}, 399 * but stopping after {@link BucketCache#removeFromRamCache}. 400 * 3. Thread3 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey}, 401 * which returned Block1, the {@link RefCnt} of Block1 is 2. 402 * 4. Thread1 continues caching block1,but finding that {@link BucketCache.RAMCache#remove} 403 * returning false, so invoking {@link BucketCache#blockEvicted} to free the the Block1 404 * directly which {@link RefCnt} is 2 and the Block1 is still used by Thread3. 405 * </pre> 406 */ 407 @Test 408 public void testEvictingBlockCachingBlockGettingBlockConcurrently() throws Exception { 409 ByteBuffAllocator byteBuffAllocator = 410 ByteBuffAllocator.create(HBaseConfiguration.create(), true); 411 final MyBucketCache2 myBucketCache2 = createMyBucketCache2(1, 1000); 412 try { 413 final HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator); 414 final BlockCacheKey blockCacheKey = createKey("testThreeThreadConcurrent", 200); 415 final AtomicReference<Throwable> cacheBlockThreadExceptionRef = 416 new AtomicReference<Throwable>(); 417 Thread cacheBlockThread = new Thread(() -> { 418 try { 419 myBucketCache2.cacheBlock(blockCacheKey, hfileBlock); 420 /** 421 * Wait for Caching Block completed. 422 */ 423 myBucketCache2.writeThreadDoneCyclicBarrier.await(); 424 } catch (Throwable exception) { 425 cacheBlockThreadExceptionRef.set(exception); 426 } 427 }); 428 cacheBlockThread.setName(MyBucketCache2.CACHE_BLOCK_THREAD_NAME); 429 cacheBlockThread.start(); 430 431 final AtomicReference<Throwable> evictBlockThreadExceptionRef = 432 new AtomicReference<Throwable>(); 433 Thread evictBlockThread = new Thread(() -> { 434 try { 435 myBucketCache2.evictBlock(blockCacheKey); 436 } catch (Throwable exception) { 437 evictBlockThreadExceptionRef.set(exception); 438 } 439 }); 440 evictBlockThread.setName(MyBucketCache2.EVICT_BLOCK_THREAD_NAME); 441 evictBlockThread.start(); 442 443 String oldThreadName = Thread.currentThread().getName(); 444 HFileBlock gotHFileBlock = null; 445 try { 446 Thread.currentThread().setName(MyBucketCache2.GET_BLOCK_THREAD_NAME); 447 gotHFileBlock = (HFileBlock) (myBucketCache2.getBlock(blockCacheKey, false, false, false)); 448 assertTrue(gotHFileBlock.equals(hfileBlock)); 449 assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator); 450 assertEquals(2, gotHFileBlock.refCnt()); 451 try { 452 /** 453 * Release the second cyclicBarrier.await in {@link MyBucketCache2#putIntoBackingMap} for 454 * {@link BucketCache.WriterThread},getBlock completed,{@link BucketCache.WriterThread} 455 * could continue. 456 */ 457 myBucketCache2.putCyclicBarrier.await(); 458 } catch (Throwable e) { 459 throw new RuntimeException(e); 460 } 461 462 } finally { 463 Thread.currentThread().setName(oldThreadName); 464 } 465 466 cacheBlockThread.join(); 467 evictBlockThread.join(); 468 assertTrue(cacheBlockThreadExceptionRef.get() == null); 469 assertTrue(evictBlockThreadExceptionRef.get() == null); 470 471 assertTrue(gotHFileBlock.equals(hfileBlock)); 472 assertEquals(1, gotHFileBlock.refCnt()); 473 assertTrue(myBucketCache2.overwiteByteBuff == null); 474 assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 0); 475 476 gotHFileBlock.release(); 477 assertEquals(0, gotHFileBlock.refCnt()); 478 assertTrue(myBucketCache2.overwiteByteBuff != null); 479 assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 1); 480 assertTrue(myBucketCache2.blockEvictCounter.get() == 1); 481 } finally { 482 myBucketCache2.shutdown(); 483 } 484 485 } 486 487 static class MyBucketCache extends BucketCache { 488 private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread"; 489 private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread"; 490 491 private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); 492 private final AtomicInteger replaceCounter = new AtomicInteger(0); 493 private final AtomicInteger blockEvictCounter = new AtomicInteger(0); 494 private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0); 495 private ByteBuff overwiteByteBuff = null; 496 497 public MyBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 498 int writerThreadNum, int writerQLen, String persistencePath) throws IOException { 499 super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 500 persistencePath); 501 } 502 503 /** 504 * Simulate the Block could be replaced. 505 */ 506 @Override 507 protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) { 508 replaceCounter.incrementAndGet(); 509 return true; 510 } 511 512 @Override 513 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, 514 boolean updateCacheMetrics) { 515 if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) { 516 /** 517 * Wait the first cyclicBarrier.await() in {@link MyBucketCache#cacheBlockWithWaitInternal}, 518 * so the {@link BucketCache#getBlock} is executed after the {@link BucketEntry#isRpcRef} 519 * checking. 520 */ 521 try { 522 cyclicBarrier.await(); 523 } catch (Throwable e) { 524 throw new RuntimeException(e); 525 } 526 } 527 Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics); 528 return result; 529 } 530 531 @Override 532 protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, 533 boolean inMemory, boolean wait) { 534 if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) { 535 /** 536 * Wait the cyclicBarrier.await() in {@link MyBucketCache#getBlock} 537 */ 538 try { 539 cyclicBarrier.await(); 540 } catch (Throwable e) { 541 throw new RuntimeException(e); 542 } 543 } 544 if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) { 545 /** 546 * Wait the cyclicBarrier.await() in 547 * {@link TestBucketCacheRefCnt#testReplacingBlockAndGettingBlockConcurrently} for 548 * {@link MyBucketCache#getBlock} and Assert completed. 549 */ 550 try { 551 cyclicBarrier.await(); 552 } catch (Throwable e) { 553 throw new RuntimeException(e); 554 } 555 } 556 super.cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 557 } 558 559 @Override 560 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber, 561 boolean evictedByEvictionProcess) { 562 blockEvictCounter.incrementAndGet(); 563 super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess); 564 } 565 566 /** 567 * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the 568 * {@link BucketEntry} is freed. 569 */ 570 @Override 571 void freeBucketEntry(BucketEntry bucketEntry) { 572 freeBucketEntryCounter.incrementAndGet(); 573 super.freeBucketEntry(bucketEntry); 574 this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry); 575 try { 576 this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset()); 577 } catch (IOException e) { 578 throw new RuntimeException(e); 579 } 580 } 581 } 582 583 static class MyBucketCache2 extends BucketCache { 584 private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread"; 585 private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread"; 586 private static final String EVICT_BLOCK_THREAD_NAME = "_evictBlockThread"; 587 588 private final CyclicBarrier getCyclicBarrier = new CyclicBarrier(2); 589 private final CyclicBarrier evictCyclicBarrier = new CyclicBarrier(2); 590 private final CyclicBarrier putCyclicBarrier = new CyclicBarrier(2); 591 /** 592 * This is used for {@link BucketCache.WriterThread},{@link #CACHE_BLOCK_THREAD_NAME} and 593 * {@link #EVICT_BLOCK_THREAD_NAME},waiting for caching block completed. 594 */ 595 private final CyclicBarrier writeThreadDoneCyclicBarrier = new CyclicBarrier(3); 596 private final AtomicInteger blockEvictCounter = new AtomicInteger(0); 597 private final AtomicInteger removeRamCounter = new AtomicInteger(0); 598 private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0); 599 private ByteBuff overwiteByteBuff = null; 600 601 public MyBucketCache2(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 602 int writerThreadNum, int writerQLen, String persistencePath) throws IOException { 603 super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 604 persistencePath); 605 } 606 607 @Override 608 protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { 609 super.putIntoBackingMap(key, bucketEntry); 610 /** 611 * The {@link BucketCache.WriterThread} wait for evictCyclicBarrier.await before 612 * {@link MyBucketCache2#removeFromRamCache} for {@link #EVICT_BLOCK_THREAD_NAME} 613 */ 614 try { 615 evictCyclicBarrier.await(); 616 } catch (Throwable e) { 617 throw new RuntimeException(e); 618 } 619 620 /** 621 * Wait the cyclicBarrier.await() in 622 * {@link TestBucketCacheRefCnt#testEvictingBlockCachingBlockGettingBlockConcurrently} for 623 * {@link MyBucketCache#getBlock} and Assert completed. 624 */ 625 try { 626 putCyclicBarrier.await(); 627 } catch (Throwable e) { 628 throw new RuntimeException(e); 629 } 630 } 631 632 @Override 633 void doDrain(List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException { 634 super.doDrain(entries, metaBuff); 635 if (entries.size() > 0) { 636 /** 637 * Caching Block completed,release {@link #GET_BLOCK_THREAD_NAME} and 638 * {@link #EVICT_BLOCK_THREAD_NAME}. 639 */ 640 try { 641 writeThreadDoneCyclicBarrier.await(); 642 } catch (Throwable e) { 643 throw new RuntimeException(e); 644 } 645 } 646 647 } 648 649 @Override 650 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, 651 boolean updateCacheMetrics) { 652 if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) { 653 /** 654 * Wait for second getCyclicBarrier.await in {@link MyBucketCache2#removeFromRamCache} after 655 * {@link BucketCache#removeFromRamCache}. 656 */ 657 try { 658 getCyclicBarrier.await(); 659 } catch (Throwable e) { 660 throw new RuntimeException(e); 661 } 662 } 663 Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics); 664 return result; 665 } 666 667 @Override 668 protected boolean removeFromRamCache(BlockCacheKey cacheKey) { 669 boolean firstTime = false; 670 if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) { 671 int count = this.removeRamCounter.incrementAndGet(); 672 firstTime = (count == 1); 673 if (firstTime) { 674 /** 675 * The {@link #EVICT_BLOCK_THREAD_NAME} wait for evictCyclicBarrier.await after 676 * {@link BucketCache#putIntoBackingMap}. 677 */ 678 try { 679 evictCyclicBarrier.await(); 680 } catch (Throwable e) { 681 throw new RuntimeException(e); 682 } 683 } 684 } 685 boolean result = super.removeFromRamCache(cacheKey); 686 if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) { 687 if (firstTime) { 688 /** 689 * Wait for getCyclicBarrier.await before {@link BucketCache#getBlock}. 690 */ 691 try { 692 getCyclicBarrier.await(); 693 } catch (Throwable e) { 694 throw new RuntimeException(e); 695 } 696 /** 697 * Wait for Caching Block completed, after Caching Block completed, evictBlock could 698 * continue. 699 */ 700 try { 701 writeThreadDoneCyclicBarrier.await(); 702 } catch (Throwable e) { 703 throw new RuntimeException(e); 704 } 705 } 706 } 707 708 return result; 709 } 710 711 @Override 712 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber, 713 boolean evictedByEvictionProcess) { 714 /** 715 * This is only invoked by {@link BucketCache.WriterThread}. {@link MyMyBucketCache2} create 716 * only one {@link BucketCache.WriterThread}. 717 */ 718 assertTrue(Thread.currentThread() == this.writerThreads[0]); 719 720 blockEvictCounter.incrementAndGet(); 721 super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess); 722 } 723 724 /** 725 * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the 726 * {@link BucketEntry} is freed. 727 */ 728 @Override 729 void freeBucketEntry(BucketEntry bucketEntry) { 730 freeBucketEntryCounter.incrementAndGet(); 731 super.freeBucketEntry(bucketEntry); 732 this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry); 733 try { 734 this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset()); 735 } catch (IOException e) { 736 throw new RuntimeException(e); 737 } 738 } 739 } 740 741 private static ByteBuff getOverwriteByteBuff(BucketEntry bucketEntry) { 742 int byteSize = bucketEntry.getLength(); 743 byte[] data = new byte[byteSize]; 744 Arrays.fill(data, (byte) 0xff); 745 return ByteBuff.wrap(ByteBuffer.wrap(data)); 746 } 747}