001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.nio.ByteBuffer;
028import java.util.Arrays;
029import java.util.List;
030import java.util.concurrent.CyclicBarrier;
031import java.util.concurrent.atomic.AtomicInteger;
032import java.util.concurrent.atomic.AtomicReference;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.Waiter;
036import org.apache.hadoop.hbase.io.ByteBuffAllocator;
037import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
038import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
039import org.apache.hadoop.hbase.io.hfile.BlockType;
040import org.apache.hadoop.hbase.io.hfile.Cacheable;
041import org.apache.hadoop.hbase.io.hfile.HFileBlock;
042import org.apache.hadoop.hbase.io.hfile.HFileContext;
043import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
044import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread;
045import org.apache.hadoop.hbase.nio.ByteBuff;
046import org.apache.hadoop.hbase.nio.RefCnt;
047import org.apache.hadoop.hbase.testclassification.IOTests;
048import org.apache.hadoop.hbase.testclassification.SmallTests;
049import org.junit.ClassRule;
050import org.junit.Test;
051import org.junit.experimental.categories.Category;
052
053@Category({ IOTests.class, SmallTests.class })
054public class TestBucketCacheRefCnt {
055
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058    HBaseClassTestRule.forClass(TestBucketCacheRefCnt.class);
059
060  private static final String IO_ENGINE = "offheap";
061  private static final long CAPACITY_SIZE = 32 * 1024 * 1024;
062  private static final int BLOCK_SIZE = 1024;
063  private static final int[] BLOCK_SIZE_ARRAY =
064    new int[] { 64, 128, 256, 512, 1024, 2048, 4096, 8192 };
065  private static final String PERSISTENCE_PATH = null;
066  private static final HFileContext CONTEXT = new HFileContextBuilder().build();
067
068  private BucketCache cache;
069
070  private static BucketCache create(int writerSize, int queueSize) throws IOException {
071    return new BucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
072      queueSize, PERSISTENCE_PATH);
073  }
074
075  private static MyBucketCache createMyBucketCache(int writerSize, int queueSize)
076    throws IOException {
077    return new MyBucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
078      queueSize, PERSISTENCE_PATH);
079  }
080
081  private static MyBucketCache2 createMyBucketCache2(int writerSize, int queueSize)
082    throws IOException {
083    return new MyBucketCache2(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
084      queueSize, PERSISTENCE_PATH);
085  }
086
087  private static HFileBlock createBlock(int offset, int size) {
088    return createBlock(offset, size, ByteBuffAllocator.HEAP);
089  }
090
091  private static HFileBlock createBlock(int offset, int size, ByteBuffAllocator alloc) {
092    return new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(ByteBuffer.allocate(size)),
093      HFileBlock.FILL_HEADER, offset, 52, size, CONTEXT, alloc);
094  }
095
096  private static BlockCacheKey createKey(String hfileName, long offset) {
097    return new BlockCacheKey(hfileName, offset);
098  }
099
100  private void disableWriter() {
101    if (cache != null) {
102      for (WriterThread wt : cache.writerThreads) {
103        wt.disableWriter();
104        wt.interrupt();
105      }
106    }
107  }
108
109  @org.junit.Ignore
110  @Test // Disabled by HBASE-24079. Reenable issue HBASE-24082
111  // Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2>
112  public void testBlockInRAMCache() throws IOException {
113    cache = create(1, 1000);
114    disableWriter();
115    final String prefix = "testBlockInRamCache";
116    try {
117      for (int i = 0; i < 10; i++) {
118        HFileBlock blk = createBlock(i, 1020);
119        BlockCacheKey key = createKey(prefix, i);
120        assertEquals(1, blk.refCnt());
121        cache.cacheBlock(key, blk);
122        assertEquals(i + 1, cache.getBlockCount());
123        assertEquals(2, blk.refCnt());
124
125        Cacheable block = cache.getBlock(key, false, false, false);
126        try {
127          assertEquals(3, blk.refCnt());
128          assertEquals(3, block.refCnt());
129          assertEquals(blk, block);
130        } finally {
131          block.release();
132        }
133        assertEquals(2, blk.refCnt());
134        assertEquals(2, block.refCnt());
135      }
136
137      for (int i = 0; i < 10; i++) {
138        BlockCacheKey key = createKey(prefix, i);
139        Cacheable blk = cache.getBlock(key, false, false, false);
140        assertEquals(3, blk.refCnt());
141        assertFalse(blk.release());
142        assertEquals(2, blk.refCnt());
143
144        assertTrue(cache.evictBlock(key));
145        assertEquals(1, blk.refCnt());
146        assertTrue(blk.release());
147        assertEquals(0, blk.refCnt());
148      }
149    } finally {
150      cache.shutdown();
151    }
152  }
153
154  private static void waitUntilFlushedToCache(BucketCache bucketCache, BlockCacheKey blockCacheKey)
155    throws InterruptedException {
156    while (
157      !bucketCache.backingMap.containsKey(blockCacheKey)
158        || bucketCache.ramCache.containsKey(blockCacheKey)
159    ) {
160      Thread.sleep(100);
161    }
162    Thread.sleep(1000);
163  }
164
165  @Test
166  public void testBlockInBackingMap() throws Exception {
167    ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true);
168    cache = create(1, 1000);
169    try {
170      HFileBlock blk = createBlock(200, 1020, alloc);
171      BlockCacheKey key = createKey("testHFile-00", 200);
172      cache.cacheBlock(key, blk);
173      waitUntilFlushedToCache(cache, key);
174      assertEquals(1, blk.refCnt());
175
176      Cacheable block = cache.getBlock(key, false, false, false);
177      assertTrue(block instanceof HFileBlock);
178      assertTrue(((HFileBlock) block).getByteBuffAllocator() == alloc);
179      assertEquals(2, block.refCnt());
180
181      block.retain();
182      assertEquals(3, block.refCnt());
183
184      Cacheable newBlock = cache.getBlock(key, false, false, false);
185      assertTrue(newBlock instanceof HFileBlock);
186      assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);
187      assertEquals(4, newBlock.refCnt());
188
189      // release the newBlock
190      assertFalse(newBlock.release());
191      assertEquals(3, newBlock.refCnt());
192      assertEquals(3, block.refCnt());
193
194      // Evict the key
195      cache.evictBlock(key);
196      assertEquals(2, block.refCnt());
197
198      // Evict again, shouldn't change the refCnt.
199      cache.evictBlock(key);
200      assertEquals(2, block.refCnt());
201
202      assertFalse(block.release());
203      assertEquals(1, block.refCnt());
204
205      /**
206       * The key was evicted from {@link BucketCache#backingMap} and {@link BucketCache#ramCache},
207       * so {@link BucketCache#getBlock} return null.
208       */
209      Cacheable newestBlock = cache.getBlock(key, false, false, false);
210      assertNull(newestBlock);
211      assertEquals(1, block.refCnt());
212      assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);
213
214      // Release the block
215      assertTrue(block.release());
216      assertEquals(0, block.refCnt());
217      assertEquals(0, newBlock.refCnt());
218    } finally {
219      cache.shutdown();
220    }
221  }
222
223  @Test
224  public void testInBucketCache() throws IOException {
225    ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true);
226    cache = create(1, 1000);
227    try {
228      HFileBlock blk = createBlock(200, 1020, alloc);
229      BlockCacheKey key = createKey("testHFile-00", 200);
230      cache.cacheBlock(key, blk);
231      assertTrue(blk.refCnt() == 1 || blk.refCnt() == 2);
232
233      // wait for block to move to backing map because refCnt get refreshed once block moves to
234      // backing map
235      Waiter.waitFor(HBaseConfiguration.create(), 12000, () -> isRamCacheDrained(key, cache));
236
237      Cacheable block1 = cache.getBlock(key, false, false, false);
238      assertTrue(block1.refCnt() >= 2);
239      assertTrue(((HFileBlock) block1).getByteBuffAllocator() == alloc);
240
241      Cacheable block2 = cache.getBlock(key, false, false, false);
242      assertTrue(((HFileBlock) block2).getByteBuffAllocator() == alloc);
243      assertTrue(block2.refCnt() >= 3);
244
245      cache.evictBlock(key);
246      assertTrue(blk.refCnt() >= 1);
247      assertTrue(block1.refCnt() >= 2);
248      assertTrue(block2.refCnt() >= 2);
249
250      // Get key again
251      Cacheable block3 = cache.getBlock(key, false, false, false);
252      if (block3 != null) {
253        assertTrue(((HFileBlock) block3).getByteBuffAllocator() == alloc);
254        assertTrue(block3.refCnt() >= 3);
255        assertFalse(block3.release());
256      }
257
258      blk.release();
259      boolean ret1 = block1.release();
260      boolean ret2 = block2.release();
261      assertTrue(ret1 || ret2);
262      assertEquals(0, blk.refCnt());
263      assertEquals(0, block1.refCnt());
264      assertEquals(0, block2.refCnt());
265    } finally {
266      cache.shutdown();
267    }
268  }
269
270  private boolean isRamCacheDrained(BlockCacheKey key, BucketCache cache) {
271    return cache.backingMap.containsKey(key) && !cache.ramCache.containsKey(key);
272  }
273
274  @Test
275  public void testMarkStaleAsEvicted() throws Exception {
276    cache = create(1, 1000);
277    try {
278      HFileBlock blk = createBlock(200, 1020);
279      BlockCacheKey key = createKey("testMarkStaleAsEvicted", 200);
280      cache.cacheBlock(key, blk);
281      waitUntilFlushedToCache(cache, key);
282      assertEquals(1, blk.refCnt());
283      assertNotNull(cache.backingMap.get(key));
284      assertEquals(1, cache.backingMap.get(key).refCnt());
285
286      // RPC reference this cache.
287      Cacheable block1 = cache.getBlock(key, false, false, false);
288      assertEquals(2, block1.refCnt());
289      BucketEntry be1 = cache.backingMap.get(key);
290      assertNotNull(be1);
291      assertEquals(2, be1.refCnt());
292
293      // We've some RPC reference, so it won't have any effect.
294      assertFalse(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
295      assertEquals(2, block1.refCnt());
296      assertEquals(2, cache.backingMap.get(key).refCnt());
297
298      // Release the RPC reference.
299      block1.release();
300      assertEquals(1, block1.refCnt());
301      assertEquals(1, cache.backingMap.get(key).refCnt());
302
303      // Mark the stale as evicted again, it'll do the de-allocation.
304      assertTrue(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
305      assertEquals(0, block1.refCnt());
306      assertNull(cache.backingMap.get(key));
307      assertEquals(0, cache.size());
308    } finally {
309      cache.shutdown();
310    }
311  }
312
313  /**
314   * <pre>
315   * This test is for HBASE-26281,
316   * test two threads for replacing Block and getting Block execute concurrently.
317   * The threads sequence is:
318   * 1. Block1 was cached successfully,the {@link RefCnt} of Block1 is 1.
319   * 2. Thread1 caching the same {@link BlockCacheKey} with Block2 satisfied
320   *    {@link BlockCacheUtil#shouldReplaceExistingCacheBlock}, so Block2 would
321   *    replace Block1, but thread1 stopping before {@link BucketCache#cacheBlockWithWaitInternal}
322   * 3. Thread2 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
323   *    which returned Block1, the {@link RefCnt} of Block1 is 2.
324   * 4. Thread1 continues caching Block2, in {@link BucketCache.WriterThread#putIntoBackingMap},
325   *    the old Block1 is freed directly which {@link RefCnt} is 2, but the Block1 is still used
326   *    by Thread2 and the content of Block1 would be overwritten after it is freed, which may
327   *    cause a serious error.
328   * </pre>
329   */
330  @Test
331  public void testReplacingBlockAndGettingBlockConcurrently() throws Exception {
332    ByteBuffAllocator byteBuffAllocator =
333      ByteBuffAllocator.create(HBaseConfiguration.create(), true);
334    final MyBucketCache myBucketCache = createMyBucketCache(1, 1000);
335    try {
336      HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
337      final BlockCacheKey blockCacheKey = createKey("testTwoThreadConcurrent", 200);
338      myBucketCache.cacheBlock(blockCacheKey, hfileBlock);
339      waitUntilFlushedToCache(myBucketCache, blockCacheKey);
340      assertEquals(1, hfileBlock.refCnt());
341
342      assertTrue(!myBucketCache.ramCache.containsKey(blockCacheKey));
343      final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
344      Thread cacheBlockThread = new Thread(() -> {
345        try {
346          HFileBlock newHFileBlock = createBlock(200, 1020, byteBuffAllocator);
347          myBucketCache.cacheBlock(blockCacheKey, newHFileBlock);
348          waitUntilFlushedToCache(myBucketCache, blockCacheKey);
349
350        } catch (Throwable exception) {
351          exceptionRef.set(exception);
352        }
353      });
354      cacheBlockThread.setName(MyBucketCache.CACHE_BLOCK_THREAD_NAME);
355      cacheBlockThread.start();
356
357      String oldThreadName = Thread.currentThread().getName();
358      HFileBlock gotHFileBlock = null;
359      try {
360
361        Thread.currentThread().setName(MyBucketCache.GET_BLOCK_THREAD_NAME);
362
363        gotHFileBlock = (HFileBlock) (myBucketCache.getBlock(blockCacheKey, false, false, false));
364        assertTrue(gotHFileBlock.equals(hfileBlock));
365        assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
366        assertEquals(2, gotHFileBlock.refCnt());
367        /**
368         * Release the second cyclicBarrier.await in
369         * {@link MyBucketCache#cacheBlockWithWaitInternal}
370         */
371        myBucketCache.cyclicBarrier.await();
372
373      } finally {
374        Thread.currentThread().setName(oldThreadName);
375      }
376
377      cacheBlockThread.join();
378      assertTrue(exceptionRef.get() == null);
379      assertEquals(1, gotHFileBlock.refCnt());
380      assertTrue(gotHFileBlock.equals(hfileBlock));
381      assertTrue(myBucketCache.overwiteByteBuff == null);
382      assertTrue(myBucketCache.freeBucketEntryCounter.get() == 0);
383
384      gotHFileBlock.release();
385      assertEquals(0, gotHFileBlock.refCnt());
386      assertTrue(myBucketCache.overwiteByteBuff != null);
387      assertTrue(myBucketCache.freeBucketEntryCounter.get() == 1);
388      assertTrue(myBucketCache.replaceCounter.get() == 1);
389      assertTrue(myBucketCache.blockEvictCounter.get() == 1);
390    } finally {
391      myBucketCache.shutdown();
392    }
393
394  }
395
396  /**
397   * <pre>
398   * This test also is for HBASE-26281,
399   * test three threads for evicting Block,caching Block and getting Block
400   * execute concurrently.
401   * 1. Thread1 caching Block1, stopping after {@link BucketCache.WriterThread#putIntoBackingMap},
402   *    the {@link RefCnt} of Block1 is 1.
403   * 2. Thread2 invoking {@link BucketCache#evictBlock} with the same {@link BlockCacheKey},
404   *    but stopping after {@link BucketCache#removeFromRamCache}.
405   * 3. Thread3 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
406   *    which returned Block1, the {@link RefCnt} of Block1 is 2.
407   * 4. Thread1 continues caching block1,but finding that {@link BucketCache.RAMCache#remove}
408   *    returning false, so invoking {@link BucketCache#blockEvicted} to free the the Block1
409   *    directly which {@link RefCnt} is 2 and the Block1 is still used by Thread3.
410   * </pre>
411   */
412  @Test
413  public void testEvictingBlockCachingBlockGettingBlockConcurrently() throws Exception {
414    ByteBuffAllocator byteBuffAllocator =
415      ByteBuffAllocator.create(HBaseConfiguration.create(), true);
416    final MyBucketCache2 myBucketCache2 = createMyBucketCache2(1, 1000);
417    try {
418      final HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
419      final BlockCacheKey blockCacheKey = createKey("testThreeThreadConcurrent", 200);
420      final AtomicReference<Throwable> cacheBlockThreadExceptionRef =
421        new AtomicReference<Throwable>();
422      Thread cacheBlockThread = new Thread(() -> {
423        try {
424          myBucketCache2.cacheBlock(blockCacheKey, hfileBlock);
425          /**
426           * Wait for Caching Block completed.
427           */
428          myBucketCache2.writeThreadDoneCyclicBarrier.await();
429        } catch (Throwable exception) {
430          cacheBlockThreadExceptionRef.set(exception);
431        }
432      });
433      cacheBlockThread.setName(MyBucketCache2.CACHE_BLOCK_THREAD_NAME);
434      cacheBlockThread.start();
435
436      final AtomicReference<Throwable> evictBlockThreadExceptionRef =
437        new AtomicReference<Throwable>();
438      Thread evictBlockThread = new Thread(() -> {
439        try {
440          myBucketCache2.evictBlock(blockCacheKey);
441        } catch (Throwable exception) {
442          evictBlockThreadExceptionRef.set(exception);
443        }
444      });
445      evictBlockThread.setName(MyBucketCache2.EVICT_BLOCK_THREAD_NAME);
446      evictBlockThread.start();
447
448      String oldThreadName = Thread.currentThread().getName();
449      HFileBlock gotHFileBlock = null;
450      try {
451        Thread.currentThread().setName(MyBucketCache2.GET_BLOCK_THREAD_NAME);
452        gotHFileBlock = (HFileBlock) (myBucketCache2.getBlock(blockCacheKey, false, false, false));
453        assertTrue(gotHFileBlock.equals(hfileBlock));
454        assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
455        assertEquals(2, gotHFileBlock.refCnt());
456        try {
457          /**
458           * Release the second cyclicBarrier.await in {@link MyBucketCache2#putIntoBackingMap} for
459           * {@link BucketCache.WriterThread},getBlock completed,{@link BucketCache.WriterThread}
460           * could continue.
461           */
462          myBucketCache2.putCyclicBarrier.await();
463        } catch (Throwable e) {
464          throw new RuntimeException(e);
465        }
466
467      } finally {
468        Thread.currentThread().setName(oldThreadName);
469      }
470
471      cacheBlockThread.join();
472      evictBlockThread.join();
473      assertTrue(cacheBlockThreadExceptionRef.get() == null);
474      assertTrue(evictBlockThreadExceptionRef.get() == null);
475
476      assertTrue(gotHFileBlock.equals(hfileBlock));
477      assertEquals(1, gotHFileBlock.refCnt());
478      assertTrue(myBucketCache2.overwiteByteBuff == null);
479      assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 0);
480
481      gotHFileBlock.release();
482      assertEquals(0, gotHFileBlock.refCnt());
483      assertTrue(myBucketCache2.overwiteByteBuff != null);
484      assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 1);
485      assertTrue(myBucketCache2.blockEvictCounter.get() == 1);
486    } finally {
487      myBucketCache2.shutdown();
488    }
489
490  }
491
492  static class MyBucketCache extends BucketCache {
493    private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
494    private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
495
496    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
497    private final AtomicInteger replaceCounter = new AtomicInteger(0);
498    private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
499    private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
500    private ByteBuff overwiteByteBuff = null;
501
502    public MyBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
503      int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
504      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
505        persistencePath);
506    }
507
508    /**
509     * Simulate the Block could be replaced.
510     */
511    @Override
512    protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
513      replaceCounter.incrementAndGet();
514      return true;
515    }
516
517    @Override
518    public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
519      boolean updateCacheMetrics) {
520      if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
521        /**
522         * Wait the first cyclicBarrier.await() in {@link MyBucketCache#cacheBlockWithWaitInternal},
523         * so the {@link BucketCache#getBlock} is executed after the {@link BucketEntry#isRpcRef}
524         * checking.
525         */
526        try {
527          cyclicBarrier.await();
528        } catch (Throwable e) {
529          throw new RuntimeException(e);
530        }
531      }
532      Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics);
533      return result;
534    }
535
536    @Override
537    protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
538      boolean inMemory, boolean wait) {
539      if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
540        /**
541         * Wait the cyclicBarrier.await() in {@link MyBucketCache#getBlock}
542         */
543        try {
544          cyclicBarrier.await();
545        } catch (Throwable e) {
546          throw new RuntimeException(e);
547        }
548      }
549      if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
550        /**
551         * Wait the cyclicBarrier.await() in
552         * {@link TestBucketCacheRefCnt#testReplacingBlockAndGettingBlockConcurrently} for
553         * {@link MyBucketCache#getBlock} and Assert completed.
554         */
555        try {
556          cyclicBarrier.await();
557        } catch (Throwable e) {
558          throw new RuntimeException(e);
559        }
560      }
561      super.cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
562    }
563
564    @Override
565    void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
566      boolean evictedByEvictionProcess) {
567      blockEvictCounter.incrementAndGet();
568      super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess);
569    }
570
571    /**
572     * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the
573     * {@link BucketEntry} is freed.
574     */
575    @Override
576    void freeBucketEntry(BucketEntry bucketEntry) {
577      freeBucketEntryCounter.incrementAndGet();
578      super.freeBucketEntry(bucketEntry);
579      this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
580      try {
581        this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
582      } catch (IOException e) {
583        throw new RuntimeException(e);
584      }
585    }
586  }
587
588  static class MyBucketCache2 extends BucketCache {
589    private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
590    private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
591    private static final String EVICT_BLOCK_THREAD_NAME = "_evictBlockThread";
592
593    private final CyclicBarrier getCyclicBarrier = new CyclicBarrier(2);
594    private final CyclicBarrier evictCyclicBarrier = new CyclicBarrier(2);
595    private final CyclicBarrier putCyclicBarrier = new CyclicBarrier(2);
596    /**
597     * This is used for {@link BucketCache.WriterThread},{@link #CACHE_BLOCK_THREAD_NAME} and
598     * {@link #EVICT_BLOCK_THREAD_NAME},waiting for caching block completed.
599     */
600    private final CyclicBarrier writeThreadDoneCyclicBarrier = new CyclicBarrier(3);
601    private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
602    private final AtomicInteger removeRamCounter = new AtomicInteger(0);
603    private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
604    private ByteBuff overwiteByteBuff = null;
605
606    public MyBucketCache2(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
607      int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
608      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
609        persistencePath);
610    }
611
612    @Override
613    protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
614      super.putIntoBackingMap(key, bucketEntry);
615      /**
616       * The {@link BucketCache.WriterThread} wait for evictCyclicBarrier.await before
617       * {@link MyBucketCache2#removeFromRamCache} for {@link #EVICT_BLOCK_THREAD_NAME}
618       */
619      try {
620        evictCyclicBarrier.await();
621      } catch (Throwable e) {
622        throw new RuntimeException(e);
623      }
624
625      /**
626       * Wait the cyclicBarrier.await() in
627       * {@link TestBucketCacheRefCnt#testEvictingBlockCachingBlockGettingBlockConcurrently} for
628       * {@link MyBucketCache#getBlock} and Assert completed.
629       */
630      try {
631        putCyclicBarrier.await();
632      } catch (Throwable e) {
633        throw new RuntimeException(e);
634      }
635    }
636
637    @Override
638    void doDrain(List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
639      super.doDrain(entries, metaBuff);
640      if (entries.size() > 0) {
641        /**
642         * Caching Block completed,release {@link #GET_BLOCK_THREAD_NAME} and
643         * {@link #EVICT_BLOCK_THREAD_NAME}.
644         */
645        try {
646          writeThreadDoneCyclicBarrier.await();
647        } catch (Throwable e) {
648          throw new RuntimeException(e);
649        }
650      }
651
652    }
653
654    @Override
655    public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
656      boolean updateCacheMetrics) {
657      if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
658        /**
659         * Wait for second getCyclicBarrier.await in {@link MyBucketCache2#removeFromRamCache} after
660         * {@link BucketCache#removeFromRamCache}.
661         */
662        try {
663          getCyclicBarrier.await();
664        } catch (Throwable e) {
665          throw new RuntimeException(e);
666        }
667      }
668      Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics);
669      return result;
670    }
671
672    @Override
673    protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
674      boolean firstTime = false;
675      if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
676        int count = this.removeRamCounter.incrementAndGet();
677        firstTime = (count == 1);
678        if (firstTime) {
679          /**
680           * The {@link #EVICT_BLOCK_THREAD_NAME} wait for evictCyclicBarrier.await after
681           * {@link BucketCache#putIntoBackingMap}.
682           */
683          try {
684            evictCyclicBarrier.await();
685          } catch (Throwable e) {
686            throw new RuntimeException(e);
687          }
688        }
689      }
690      boolean result = super.removeFromRamCache(cacheKey);
691      if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
692        if (firstTime) {
693          /**
694           * Wait for getCyclicBarrier.await before {@link BucketCache#getBlock}.
695           */
696          try {
697            getCyclicBarrier.await();
698          } catch (Throwable e) {
699            throw new RuntimeException(e);
700          }
701          /**
702           * Wait for Caching Block completed, after Caching Block completed, evictBlock could
703           * continue.
704           */
705          try {
706            writeThreadDoneCyclicBarrier.await();
707          } catch (Throwable e) {
708            throw new RuntimeException(e);
709          }
710        }
711      }
712
713      return result;
714    }
715
716    @Override
717    void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
718      boolean evictedByEvictionProcess) {
719      /**
720       * This is only invoked by {@link BucketCache.WriterThread}. {@link MyMyBucketCache2} create
721       * only one {@link BucketCache.WriterThread}.
722       */
723      assertTrue(Thread.currentThread() == this.writerThreads[0]);
724
725      blockEvictCounter.incrementAndGet();
726      super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess);
727    }
728
729    /**
730     * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the
731     * {@link BucketEntry} is freed.
732     */
733    @Override
734    void freeBucketEntry(BucketEntry bucketEntry) {
735      freeBucketEntryCounter.incrementAndGet();
736      super.freeBucketEntry(bucketEntry);
737      this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
738      try {
739        this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
740      } catch (IOException e) {
741        throw new RuntimeException(e);
742      }
743    }
744  }
745
746  private static ByteBuff getOverwriteByteBuff(BucketEntry bucketEntry) {
747    int byteSize = bucketEntry.getLength();
748    byte[] data = new byte[byteSize];
749    Arrays.fill(data, (byte) 0xff);
750    return ByteBuff.wrap(ByteBuffer.wrap(data));
751  }
752}