001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertNotNull;
023import static org.junit.jupiter.api.Assertions.assertNull;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025
026import java.io.IOException;
027import java.nio.ByteBuffer;
028import java.util.Arrays;
029import java.util.List;
030import java.util.concurrent.CyclicBarrier;
031import java.util.concurrent.atomic.AtomicInteger;
032import java.util.concurrent.atomic.AtomicReference;
033import org.apache.hadoop.hbase.HBaseConfiguration;
034import org.apache.hadoop.hbase.Waiter;
035import org.apache.hadoop.hbase.io.ByteBuffAllocator;
036import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
037import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
038import org.apache.hadoop.hbase.io.hfile.BlockType;
039import org.apache.hadoop.hbase.io.hfile.Cacheable;
040import org.apache.hadoop.hbase.io.hfile.HFileBlock;
041import org.apache.hadoop.hbase.io.hfile.HFileContext;
042import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
043import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread;
044import org.apache.hadoop.hbase.nio.ByteBuff;
045import org.apache.hadoop.hbase.nio.RefCnt;
046import org.apache.hadoop.hbase.testclassification.IOTests;
047import org.apache.hadoop.hbase.testclassification.SmallTests;
048import org.junit.jupiter.api.Tag;
049import org.junit.jupiter.api.Test;
050
051@Tag(IOTests.TAG)
052@Tag(SmallTests.TAG)
053public class TestBucketCacheRefCnt {
054
055  private static final String IO_ENGINE = "offheap";
056  private static final long CAPACITY_SIZE = 32 * 1024 * 1024;
057  private static final int BLOCK_SIZE = 1024;
058  private static final int[] BLOCK_SIZE_ARRAY =
059    new int[] { 64, 128, 256, 512, 1024, 2048, 4096, 8192 };
060  private static final String PERSISTENCE_PATH = null;
061  private static final HFileContext CONTEXT = new HFileContextBuilder().build();
062
063  private BucketCache cache;
064
065  private static BucketCache create(int writerSize, int queueSize) throws IOException {
066    return new BucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
067      queueSize, PERSISTENCE_PATH);
068  }
069
070  private static MyBucketCache createMyBucketCache(int writerSize, int queueSize)
071    throws IOException {
072    return new MyBucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
073      queueSize, PERSISTENCE_PATH);
074  }
075
076  private static MyBucketCache2 createMyBucketCache2(int writerSize, int queueSize)
077    throws IOException {
078    return new MyBucketCache2(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
079      queueSize, PERSISTENCE_PATH);
080  }
081
082  private static HFileBlock createBlock(int offset, int size) {
083    return createBlock(offset, size, ByteBuffAllocator.HEAP);
084  }
085
086  private static HFileBlock createBlock(int offset, int size, ByteBuffAllocator alloc) {
087    return new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(ByteBuffer.allocate(size)),
088      HFileBlock.FILL_HEADER, offset, 52, size, CONTEXT, alloc);
089  }
090
091  private static BlockCacheKey createKey(String hfileName, long offset) {
092    return new BlockCacheKey(hfileName, offset);
093  }
094
095  private void disableWriter() {
096    if (cache != null) {
097      for (WriterThread wt : cache.writerThreads) {
098        wt.disableWriter();
099        wt.interrupt();
100      }
101    }
102  }
103
104  @org.junit.Ignore
105  @Test // Disabled by HBASE-24079. Reenable issue HBASE-24082
106  // Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2>
107  public void testBlockInRAMCache() throws IOException {
108    cache = create(1, 1000);
109    disableWriter();
110    final String prefix = "testBlockInRamCache";
111    try {
112      for (int i = 0; i < 10; i++) {
113        HFileBlock blk = createBlock(i, 1020);
114        BlockCacheKey key = createKey(prefix, i);
115        assertEquals(1, blk.refCnt());
116        cache.cacheBlock(key, blk);
117        assertEquals(i + 1, cache.getBlockCount());
118        assertEquals(2, blk.refCnt());
119
120        Cacheable block = cache.getBlock(key, false, false, false);
121        try {
122          assertEquals(3, blk.refCnt());
123          assertEquals(3, block.refCnt());
124          assertEquals(blk, block);
125        } finally {
126          block.release();
127        }
128        assertEquals(2, blk.refCnt());
129        assertEquals(2, block.refCnt());
130      }
131
132      for (int i = 0; i < 10; i++) {
133        BlockCacheKey key = createKey(prefix, i);
134        Cacheable blk = cache.getBlock(key, false, false, false);
135        assertEquals(3, blk.refCnt());
136        assertFalse(blk.release());
137        assertEquals(2, blk.refCnt());
138
139        assertTrue(cache.evictBlock(key));
140        assertEquals(1, blk.refCnt());
141        assertTrue(blk.release());
142        assertEquals(0, blk.refCnt());
143      }
144    } finally {
145      cache.shutdown();
146    }
147  }
148
149  private static void waitUntilFlushedToCache(BucketCache bucketCache, BlockCacheKey blockCacheKey)
150    throws InterruptedException {
151    while (
152      !bucketCache.backingMap.containsKey(blockCacheKey)
153        || bucketCache.ramCache.containsKey(blockCacheKey)
154    ) {
155      Thread.sleep(100);
156    }
157    Thread.sleep(1000);
158  }
159
160  @Test
161  public void testBlockInBackingMap() throws Exception {
162    ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true);
163    cache = create(1, 1000);
164    try {
165      HFileBlock blk = createBlock(200, 1020, alloc);
166      BlockCacheKey key = createKey("testHFile-00", 200);
167      cache.cacheBlock(key, blk);
168      waitUntilFlushedToCache(cache, key);
169      assertEquals(1, blk.refCnt());
170
171      Cacheable block = cache.getBlock(key, false, false, false);
172      assertTrue(block instanceof HFileBlock);
173      assertTrue(((HFileBlock) block).getByteBuffAllocator() == alloc);
174      assertEquals(2, block.refCnt());
175
176      block.retain();
177      assertEquals(3, block.refCnt());
178
179      Cacheable newBlock = cache.getBlock(key, false, false, false);
180      assertTrue(newBlock instanceof HFileBlock);
181      assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);
182      assertEquals(4, newBlock.refCnt());
183
184      // release the newBlock
185      assertFalse(newBlock.release());
186      assertEquals(3, newBlock.refCnt());
187      assertEquals(3, block.refCnt());
188
189      // Evict the key
190      cache.evictBlock(key);
191      assertEquals(2, block.refCnt());
192
193      // Evict again, shouldn't change the refCnt.
194      cache.evictBlock(key);
195      assertEquals(2, block.refCnt());
196
197      assertFalse(block.release());
198      assertEquals(1, block.refCnt());
199
200      /**
201       * The key was evicted from {@link BucketCache#backingMap} and {@link BucketCache#ramCache},
202       * so {@link BucketCache#getBlock} return null.
203       */
204      Cacheable newestBlock = cache.getBlock(key, false, false, false);
205      assertNull(newestBlock);
206      assertEquals(1, block.refCnt());
207      assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);
208
209      // Release the block
210      assertTrue(block.release());
211      assertEquals(0, block.refCnt());
212      assertEquals(0, newBlock.refCnt());
213    } finally {
214      cache.shutdown();
215    }
216  }
217
218  @Test
219  public void testInBucketCache() throws IOException {
220    ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true);
221    cache = create(1, 1000);
222    try {
223      HFileBlock blk = createBlock(200, 1020, alloc);
224      BlockCacheKey key = createKey("testHFile-00", 200);
225      cache.cacheBlock(key, blk);
226      assertTrue(blk.refCnt() == 1 || blk.refCnt() == 2);
227
228      // wait for block to move to backing map because refCnt get refreshed once block moves to
229      // backing map
230      Waiter.waitFor(HBaseConfiguration.create(), 12000, () -> isRamCacheDrained(key, cache));
231
232      Cacheable block1 = cache.getBlock(key, false, false, false);
233      assertTrue(block1.refCnt() >= 2);
234      assertTrue(((HFileBlock) block1).getByteBuffAllocator() == alloc);
235
236      Cacheable block2 = cache.getBlock(key, false, false, false);
237      assertTrue(((HFileBlock) block2).getByteBuffAllocator() == alloc);
238      assertTrue(block2.refCnt() >= 3);
239
240      cache.evictBlock(key);
241      assertTrue(blk.refCnt() >= 1);
242      assertTrue(block1.refCnt() >= 2);
243      assertTrue(block2.refCnt() >= 2);
244
245      // Get key again
246      Cacheable block3 = cache.getBlock(key, false, false, false);
247      if (block3 != null) {
248        assertTrue(((HFileBlock) block3).getByteBuffAllocator() == alloc);
249        assertTrue(block3.refCnt() >= 3);
250        assertFalse(block3.release());
251      }
252
253      blk.release();
254      boolean ret1 = block1.release();
255      boolean ret2 = block2.release();
256      assertTrue(ret1 || ret2);
257      assertEquals(0, blk.refCnt());
258      assertEquals(0, block1.refCnt());
259      assertEquals(0, block2.refCnt());
260    } finally {
261      cache.shutdown();
262    }
263  }
264
265  private boolean isRamCacheDrained(BlockCacheKey key, BucketCache cache) {
266    return cache.backingMap.containsKey(key) && !cache.ramCache.containsKey(key);
267  }
268
269  @Test
270  public void testMarkStaleAsEvicted() throws Exception {
271    cache = create(1, 1000);
272    try {
273      HFileBlock blk = createBlock(200, 1020);
274      BlockCacheKey key = createKey("testMarkStaleAsEvicted", 200);
275      cache.cacheBlock(key, blk);
276      waitUntilFlushedToCache(cache, key);
277      assertEquals(1, blk.refCnt());
278      assertNotNull(cache.backingMap.get(key));
279      assertEquals(1, cache.backingMap.get(key).refCnt());
280
281      // RPC reference this cache.
282      Cacheable block1 = cache.getBlock(key, false, false, false);
283      assertEquals(2, block1.refCnt());
284      BucketEntry be1 = cache.backingMap.get(key);
285      assertNotNull(be1);
286      assertEquals(2, be1.refCnt());
287
288      // We've some RPC reference, so it won't have any effect.
289      assertFalse(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
290      assertEquals(2, block1.refCnt());
291      assertEquals(2, cache.backingMap.get(key).refCnt());
292
293      // Release the RPC reference.
294      block1.release();
295      assertEquals(1, block1.refCnt());
296      assertEquals(1, cache.backingMap.get(key).refCnt());
297
298      // Mark the stale as evicted again, it'll do the de-allocation.
299      assertTrue(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
300      assertEquals(0, block1.refCnt());
301      assertNull(cache.backingMap.get(key));
302      assertEquals(0, cache.size());
303    } finally {
304      cache.shutdown();
305    }
306  }
307
308  /**
309   * <pre>
310   * This test is for HBASE-26281,
311   * test two threads for replacing Block and getting Block execute concurrently.
312   * The threads sequence is:
313   * 1. Block1 was cached successfully,the {@link RefCnt} of Block1 is 1.
314   * 2. Thread1 caching the same {@link BlockCacheKey} with Block2 satisfied
315   *    {@link BlockCacheUtil#shouldReplaceExistingCacheBlock}, so Block2 would
316   *    replace Block1, but thread1 stopping before {@link BucketCache#cacheBlockWithWaitInternal}
317   * 3. Thread2 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
318   *    which returned Block1, the {@link RefCnt} of Block1 is 2.
319   * 4. Thread1 continues caching Block2, in {@link BucketCache.WriterThread#putIntoBackingMap},
320   *    the old Block1 is freed directly which {@link RefCnt} is 2, but the Block1 is still used
321   *    by Thread2 and the content of Block1 would be overwritten after it is freed, which may
322   *    cause a serious error.
323   * </pre>
324   */
325  @Test
326  public void testReplacingBlockAndGettingBlockConcurrently() throws Exception {
327    ByteBuffAllocator byteBuffAllocator =
328      ByteBuffAllocator.create(HBaseConfiguration.create(), true);
329    final MyBucketCache myBucketCache = createMyBucketCache(1, 1000);
330    try {
331      HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
332      final BlockCacheKey blockCacheKey = createKey("testTwoThreadConcurrent", 200);
333      myBucketCache.cacheBlock(blockCacheKey, hfileBlock);
334      waitUntilFlushedToCache(myBucketCache, blockCacheKey);
335      assertEquals(1, hfileBlock.refCnt());
336
337      assertTrue(!myBucketCache.ramCache.containsKey(blockCacheKey));
338      final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
339      Thread cacheBlockThread = new Thread(() -> {
340        try {
341          HFileBlock newHFileBlock = createBlock(200, 1020, byteBuffAllocator);
342          myBucketCache.cacheBlock(blockCacheKey, newHFileBlock);
343          waitUntilFlushedToCache(myBucketCache, blockCacheKey);
344
345        } catch (Throwable exception) {
346          exceptionRef.set(exception);
347        }
348      });
349      cacheBlockThread.setName(MyBucketCache.CACHE_BLOCK_THREAD_NAME);
350      cacheBlockThread.start();
351
352      String oldThreadName = Thread.currentThread().getName();
353      HFileBlock gotHFileBlock = null;
354      try {
355
356        Thread.currentThread().setName(MyBucketCache.GET_BLOCK_THREAD_NAME);
357
358        gotHFileBlock = (HFileBlock) (myBucketCache.getBlock(blockCacheKey, false, false, false));
359        assertTrue(gotHFileBlock.equals(hfileBlock));
360        assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
361        assertEquals(2, gotHFileBlock.refCnt());
362        /**
363         * Release the second cyclicBarrier.await in
364         * {@link MyBucketCache#cacheBlockWithWaitInternal}
365         */
366        myBucketCache.cyclicBarrier.await();
367
368      } finally {
369        Thread.currentThread().setName(oldThreadName);
370      }
371
372      cacheBlockThread.join();
373      assertTrue(exceptionRef.get() == null);
374      assertEquals(1, gotHFileBlock.refCnt());
375      assertTrue(gotHFileBlock.equals(hfileBlock));
376      assertTrue(myBucketCache.overwiteByteBuff == null);
377      assertTrue(myBucketCache.freeBucketEntryCounter.get() == 0);
378
379      gotHFileBlock.release();
380      assertEquals(0, gotHFileBlock.refCnt());
381      assertTrue(myBucketCache.overwiteByteBuff != null);
382      assertTrue(myBucketCache.freeBucketEntryCounter.get() == 1);
383      assertTrue(myBucketCache.replaceCounter.get() == 1);
384      assertTrue(myBucketCache.blockEvictCounter.get() == 1);
385    } finally {
386      myBucketCache.shutdown();
387    }
388
389  }
390
391  /**
392   * <pre>
393   * This test also is for HBASE-26281,
394   * test three threads for evicting Block,caching Block and getting Block
395   * execute concurrently.
396   * 1. Thread1 caching Block1, stopping after {@link BucketCache.WriterThread#putIntoBackingMap},
397   *    the {@link RefCnt} of Block1 is 1.
398   * 2. Thread2 invoking {@link BucketCache#evictBlock} with the same {@link BlockCacheKey},
399   *    but stopping after {@link BucketCache#removeFromRamCache}.
400   * 3. Thread3 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
401   *    which returned Block1, the {@link RefCnt} of Block1 is 2.
402   * 4. Thread1 continues caching block1,but finding that {@link BucketCache.RAMCache#remove}
403   *    returning false, so invoking {@link BucketCache#blockEvicted} to free the the Block1
404   *    directly which {@link RefCnt} is 2 and the Block1 is still used by Thread3.
405   * </pre>
406   */
407  @Test
408  public void testEvictingBlockCachingBlockGettingBlockConcurrently() throws Exception {
409    ByteBuffAllocator byteBuffAllocator =
410      ByteBuffAllocator.create(HBaseConfiguration.create(), true);
411    final MyBucketCache2 myBucketCache2 = createMyBucketCache2(1, 1000);
412    try {
413      final HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
414      final BlockCacheKey blockCacheKey = createKey("testThreeThreadConcurrent", 200);
415      final AtomicReference<Throwable> cacheBlockThreadExceptionRef =
416        new AtomicReference<Throwable>();
417      Thread cacheBlockThread = new Thread(() -> {
418        try {
419          myBucketCache2.cacheBlock(blockCacheKey, hfileBlock);
420          /**
421           * Wait for Caching Block completed.
422           */
423          myBucketCache2.writeThreadDoneCyclicBarrier.await();
424        } catch (Throwable exception) {
425          cacheBlockThreadExceptionRef.set(exception);
426        }
427      });
428      cacheBlockThread.setName(MyBucketCache2.CACHE_BLOCK_THREAD_NAME);
429      cacheBlockThread.start();
430
431      final AtomicReference<Throwable> evictBlockThreadExceptionRef =
432        new AtomicReference<Throwable>();
433      Thread evictBlockThread = new Thread(() -> {
434        try {
435          myBucketCache2.evictBlock(blockCacheKey);
436        } catch (Throwable exception) {
437          evictBlockThreadExceptionRef.set(exception);
438        }
439      });
440      evictBlockThread.setName(MyBucketCache2.EVICT_BLOCK_THREAD_NAME);
441      evictBlockThread.start();
442
443      String oldThreadName = Thread.currentThread().getName();
444      HFileBlock gotHFileBlock = null;
445      try {
446        Thread.currentThread().setName(MyBucketCache2.GET_BLOCK_THREAD_NAME);
447        gotHFileBlock = (HFileBlock) (myBucketCache2.getBlock(blockCacheKey, false, false, false));
448        assertTrue(gotHFileBlock.equals(hfileBlock));
449        assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
450        assertEquals(2, gotHFileBlock.refCnt());
451        try {
452          /**
453           * Release the second cyclicBarrier.await in {@link MyBucketCache2#putIntoBackingMap} for
454           * {@link BucketCache.WriterThread},getBlock completed,{@link BucketCache.WriterThread}
455           * could continue.
456           */
457          myBucketCache2.putCyclicBarrier.await();
458        } catch (Throwable e) {
459          throw new RuntimeException(e);
460        }
461
462      } finally {
463        Thread.currentThread().setName(oldThreadName);
464      }
465
466      cacheBlockThread.join();
467      evictBlockThread.join();
468      assertTrue(cacheBlockThreadExceptionRef.get() == null);
469      assertTrue(evictBlockThreadExceptionRef.get() == null);
470
471      assertTrue(gotHFileBlock.equals(hfileBlock));
472      assertEquals(1, gotHFileBlock.refCnt());
473      assertTrue(myBucketCache2.overwiteByteBuff == null);
474      assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 0);
475
476      gotHFileBlock.release();
477      assertEquals(0, gotHFileBlock.refCnt());
478      assertTrue(myBucketCache2.overwiteByteBuff != null);
479      assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 1);
480      assertTrue(myBucketCache2.blockEvictCounter.get() == 1);
481    } finally {
482      myBucketCache2.shutdown();
483    }
484
485  }
486
487  static class MyBucketCache extends BucketCache {
488    private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
489    private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
490
491    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
492    private final AtomicInteger replaceCounter = new AtomicInteger(0);
493    private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
494    private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
495    private ByteBuff overwiteByteBuff = null;
496
497    public MyBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
498      int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
499      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
500        persistencePath);
501    }
502
503    /**
504     * Simulate the Block could be replaced.
505     */
506    @Override
507    protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
508      replaceCounter.incrementAndGet();
509      return true;
510    }
511
512    @Override
513    public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
514      boolean updateCacheMetrics) {
515      if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
516        /**
517         * Wait the first cyclicBarrier.await() in {@link MyBucketCache#cacheBlockWithWaitInternal},
518         * so the {@link BucketCache#getBlock} is executed after the {@link BucketEntry#isRpcRef}
519         * checking.
520         */
521        try {
522          cyclicBarrier.await();
523        } catch (Throwable e) {
524          throw new RuntimeException(e);
525        }
526      }
527      Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics);
528      return result;
529    }
530
531    @Override
532    protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
533      boolean inMemory, boolean wait) {
534      if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
535        /**
536         * Wait the cyclicBarrier.await() in {@link MyBucketCache#getBlock}
537         */
538        try {
539          cyclicBarrier.await();
540        } catch (Throwable e) {
541          throw new RuntimeException(e);
542        }
543      }
544      if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
545        /**
546         * Wait the cyclicBarrier.await() in
547         * {@link TestBucketCacheRefCnt#testReplacingBlockAndGettingBlockConcurrently} for
548         * {@link MyBucketCache#getBlock} and Assert completed.
549         */
550        try {
551          cyclicBarrier.await();
552        } catch (Throwable e) {
553          throw new RuntimeException(e);
554        }
555      }
556      super.cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
557    }
558
559    @Override
560    void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
561      boolean evictedByEvictionProcess) {
562      blockEvictCounter.incrementAndGet();
563      super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess);
564    }
565
566    /**
567     * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the
568     * {@link BucketEntry} is freed.
569     */
570    @Override
571    void freeBucketEntry(BucketEntry bucketEntry) {
572      freeBucketEntryCounter.incrementAndGet();
573      super.freeBucketEntry(bucketEntry);
574      this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
575      try {
576        this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
577      } catch (IOException e) {
578        throw new RuntimeException(e);
579      }
580    }
581  }
582
583  static class MyBucketCache2 extends BucketCache {
584    private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
585    private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
586    private static final String EVICT_BLOCK_THREAD_NAME = "_evictBlockThread";
587
588    private final CyclicBarrier getCyclicBarrier = new CyclicBarrier(2);
589    private final CyclicBarrier evictCyclicBarrier = new CyclicBarrier(2);
590    private final CyclicBarrier putCyclicBarrier = new CyclicBarrier(2);
591    /**
592     * This is used for {@link BucketCache.WriterThread},{@link #CACHE_BLOCK_THREAD_NAME} and
593     * {@link #EVICT_BLOCK_THREAD_NAME},waiting for caching block completed.
594     */
595    private final CyclicBarrier writeThreadDoneCyclicBarrier = new CyclicBarrier(3);
596    private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
597    private final AtomicInteger removeRamCounter = new AtomicInteger(0);
598    private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
599    private ByteBuff overwiteByteBuff = null;
600
601    public MyBucketCache2(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
602      int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
603      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
604        persistencePath);
605    }
606
607    @Override
608    protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
609      super.putIntoBackingMap(key, bucketEntry);
610      /**
611       * The {@link BucketCache.WriterThread} wait for evictCyclicBarrier.await before
612       * {@link MyBucketCache2#removeFromRamCache} for {@link #EVICT_BLOCK_THREAD_NAME}
613       */
614      try {
615        evictCyclicBarrier.await();
616      } catch (Throwable e) {
617        throw new RuntimeException(e);
618      }
619
620      /**
621       * Wait the cyclicBarrier.await() in
622       * {@link TestBucketCacheRefCnt#testEvictingBlockCachingBlockGettingBlockConcurrently} for
623       * {@link MyBucketCache#getBlock} and Assert completed.
624       */
625      try {
626        putCyclicBarrier.await();
627      } catch (Throwable e) {
628        throw new RuntimeException(e);
629      }
630    }
631
632    @Override
633    void doDrain(List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
634      super.doDrain(entries, metaBuff);
635      if (entries.size() > 0) {
636        /**
637         * Caching Block completed,release {@link #GET_BLOCK_THREAD_NAME} and
638         * {@link #EVICT_BLOCK_THREAD_NAME}.
639         */
640        try {
641          writeThreadDoneCyclicBarrier.await();
642        } catch (Throwable e) {
643          throw new RuntimeException(e);
644        }
645      }
646
647    }
648
649    @Override
650    public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
651      boolean updateCacheMetrics) {
652      if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
653        /**
654         * Wait for second getCyclicBarrier.await in {@link MyBucketCache2#removeFromRamCache} after
655         * {@link BucketCache#removeFromRamCache}.
656         */
657        try {
658          getCyclicBarrier.await();
659        } catch (Throwable e) {
660          throw new RuntimeException(e);
661        }
662      }
663      Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics);
664      return result;
665    }
666
667    @Override
668    protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
669      boolean firstTime = false;
670      if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
671        int count = this.removeRamCounter.incrementAndGet();
672        firstTime = (count == 1);
673        if (firstTime) {
674          /**
675           * The {@link #EVICT_BLOCK_THREAD_NAME} wait for evictCyclicBarrier.await after
676           * {@link BucketCache#putIntoBackingMap}.
677           */
678          try {
679            evictCyclicBarrier.await();
680          } catch (Throwable e) {
681            throw new RuntimeException(e);
682          }
683        }
684      }
685      boolean result = super.removeFromRamCache(cacheKey);
686      if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
687        if (firstTime) {
688          /**
689           * Wait for getCyclicBarrier.await before {@link BucketCache#getBlock}.
690           */
691          try {
692            getCyclicBarrier.await();
693          } catch (Throwable e) {
694            throw new RuntimeException(e);
695          }
696          /**
697           * Wait for Caching Block completed, after Caching Block completed, evictBlock could
698           * continue.
699           */
700          try {
701            writeThreadDoneCyclicBarrier.await();
702          } catch (Throwable e) {
703            throw new RuntimeException(e);
704          }
705        }
706      }
707
708      return result;
709    }
710
711    @Override
712    void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
713      boolean evictedByEvictionProcess) {
714      /**
715       * This is only invoked by {@link BucketCache.WriterThread}. {@link MyMyBucketCache2} create
716       * only one {@link BucketCache.WriterThread}.
717       */
718      assertTrue(Thread.currentThread() == this.writerThreads[0]);
719
720      blockEvictCounter.incrementAndGet();
721      super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess);
722    }
723
724    /**
725     * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the
726     * {@link BucketEntry} is freed.
727     */
728    @Override
729    void freeBucketEntry(BucketEntry bucketEntry) {
730      freeBucketEntryCounter.incrementAndGet();
731      super.freeBucketEntry(bucketEntry);
732      this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
733      try {
734        this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
735      } catch (IOException e) {
736        throw new RuntimeException(e);
737      }
738    }
739  }
740
741  private static ByteBuff getOverwriteByteBuff(BucketEntry bucketEntry) {
742    int byteSize = bucketEntry.getLength();
743    byte[] data = new byte[byteSize];
744    Arrays.fill(data, (byte) 0xff);
745    return ByteBuff.wrap(ByteBuffer.wrap(data));
746  }
747}