001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertNotNull;
023import static org.junit.jupiter.api.Assertions.assertNull;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025
026import java.io.IOException;
027import java.nio.ByteBuffer;
028import java.util.Arrays;
029import java.util.List;
030import java.util.concurrent.CyclicBarrier;
031import java.util.concurrent.atomic.AtomicInteger;
032import java.util.concurrent.atomic.AtomicReference;
033import org.apache.hadoop.hbase.HBaseConfiguration;
034import org.apache.hadoop.hbase.Waiter;
035import org.apache.hadoop.hbase.io.ByteBuffAllocator;
036import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
037import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
038import org.apache.hadoop.hbase.io.hfile.BlockType;
039import org.apache.hadoop.hbase.io.hfile.Cacheable;
040import org.apache.hadoop.hbase.io.hfile.HFileBlock;
041import org.apache.hadoop.hbase.io.hfile.HFileContext;
042import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
043import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread;
044import org.apache.hadoop.hbase.nio.ByteBuff;
045import org.apache.hadoop.hbase.nio.RefCnt;
046import org.apache.hadoop.hbase.testclassification.IOTests;
047import org.apache.hadoop.hbase.testclassification.SmallTests;
048import org.junit.jupiter.api.Disabled;
049import org.junit.jupiter.api.Tag;
050import org.junit.jupiter.api.Test;
051
052@Tag(IOTests.TAG)
053@Tag(SmallTests.TAG)
054public class TestBucketCacheRefCnt {
055
056  private static final String IO_ENGINE = "offheap";
057  private static final long CAPACITY_SIZE = 32 * 1024 * 1024;
058  private static final int BLOCK_SIZE = 1024;
059  private static final int[] BLOCK_SIZE_ARRAY =
060    new int[] { 64, 128, 256, 512, 1024, 2048, 4096, 8192 };
061  private static final String PERSISTENCE_PATH = null;
062  private static final HFileContext CONTEXT = new HFileContextBuilder().build();
063
064  private BucketCache cache;
065
066  private static BucketCache create(int writerSize, int queueSize) throws IOException {
067    return new BucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
068      queueSize, PERSISTENCE_PATH);
069  }
070
071  private static MyBucketCache createMyBucketCache(int writerSize, int queueSize)
072    throws IOException {
073    return new MyBucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
074      queueSize, PERSISTENCE_PATH);
075  }
076
077  private static MyBucketCache2 createMyBucketCache2(int writerSize, int queueSize)
078    throws IOException {
079    return new MyBucketCache2(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
080      queueSize, PERSISTENCE_PATH);
081  }
082
083  private static HFileBlock createBlock(int offset, int size) {
084    return createBlock(offset, size, ByteBuffAllocator.HEAP);
085  }
086
087  private static HFileBlock createBlock(int offset, int size, ByteBuffAllocator alloc) {
088    return new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(ByteBuffer.allocate(size)),
089      HFileBlock.FILL_HEADER, offset, 52, size, CONTEXT, alloc);
090  }
091
092  private static BlockCacheKey createKey(String hfileName, long offset) {
093    return new BlockCacheKey(hfileName, offset);
094  }
095
096  private void disableWriter() {
097    if (cache != null) {
098      for (WriterThread wt : cache.writerThreads) {
099        wt.disableWriter();
100        wt.interrupt();
101      }
102    }
103  }
104
105  @Disabled
106  @Test // Disabled by HBASE-24079. Reenable issue HBASE-24082
107  // Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2>
108  public void testBlockInRAMCache() throws IOException {
109    cache = create(1, 1000);
110    disableWriter();
111    final String prefix = "testBlockInRamCache";
112    try {
113      for (int i = 0; i < 10; i++) {
114        HFileBlock blk = createBlock(i, 1020);
115        BlockCacheKey key = createKey(prefix, i);
116        assertEquals(1, blk.refCnt());
117        cache.cacheBlock(key, blk);
118        assertEquals(i + 1, cache.getBlockCount());
119        assertEquals(2, blk.refCnt());
120
121        Cacheable block = cache.getBlock(key, false, false, false);
122        try {
123          assertEquals(3, blk.refCnt());
124          assertEquals(3, block.refCnt());
125          assertEquals(blk, block);
126        } finally {
127          block.release();
128        }
129        assertEquals(2, blk.refCnt());
130        assertEquals(2, block.refCnt());
131      }
132
133      for (int i = 0; i < 10; i++) {
134        BlockCacheKey key = createKey(prefix, i);
135        Cacheable blk = cache.getBlock(key, false, false, false);
136        assertEquals(3, blk.refCnt());
137        assertFalse(blk.release());
138        assertEquals(2, blk.refCnt());
139
140        assertTrue(cache.evictBlock(key));
141        assertEquals(1, blk.refCnt());
142        assertTrue(blk.release());
143        assertEquals(0, blk.refCnt());
144      }
145    } finally {
146      cache.shutdown();
147    }
148  }
149
150  private static void waitUntilFlushedToCache(BucketCache bucketCache, BlockCacheKey blockCacheKey)
151    throws InterruptedException {
152    while (
153      !bucketCache.backingMap.containsKey(blockCacheKey)
154        || bucketCache.ramCache.containsKey(blockCacheKey)
155    ) {
156      Thread.sleep(100);
157    }
158    Thread.sleep(1000);
159  }
160
161  @Test
162  public void testBlockInBackingMap() throws Exception {
163    ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true);
164    cache = create(1, 1000);
165    try {
166      HFileBlock blk = createBlock(200, 1020, alloc);
167      BlockCacheKey key = createKey("testHFile-00", 200);
168      cache.cacheBlock(key, blk);
169      waitUntilFlushedToCache(cache, key);
170      assertEquals(1, blk.refCnt());
171
172      Cacheable block = cache.getBlock(key, false, false, false);
173      assertTrue(block instanceof HFileBlock);
174      assertTrue(((HFileBlock) block).getByteBuffAllocator() == alloc);
175      assertEquals(2, block.refCnt());
176
177      block.retain();
178      assertEquals(3, block.refCnt());
179
180      Cacheable newBlock = cache.getBlock(key, false, false, false);
181      assertTrue(newBlock instanceof HFileBlock);
182      assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);
183      assertEquals(4, newBlock.refCnt());
184
185      // release the newBlock
186      assertFalse(newBlock.release());
187      assertEquals(3, newBlock.refCnt());
188      assertEquals(3, block.refCnt());
189
190      // Evict the key
191      cache.evictBlock(key);
192      assertEquals(2, block.refCnt());
193
194      // Evict again, shouldn't change the refCnt.
195      cache.evictBlock(key);
196      assertEquals(2, block.refCnt());
197
198      assertFalse(block.release());
199      assertEquals(1, block.refCnt());
200
201      /**
202       * The key was evicted from {@link BucketCache#backingMap} and {@link BucketCache#ramCache},
203       * so {@link BucketCache#getBlock} return null.
204       */
205      Cacheable newestBlock = cache.getBlock(key, false, false, false);
206      assertNull(newestBlock);
207      assertEquals(1, block.refCnt());
208      assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);
209
210      // Release the block
211      assertTrue(block.release());
212      assertEquals(0, block.refCnt());
213      assertEquals(0, newBlock.refCnt());
214    } finally {
215      cache.shutdown();
216    }
217  }
218
219  @Test
220  public void testInBucketCache() throws IOException {
221    ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true);
222    cache = create(1, 1000);
223    try {
224      HFileBlock blk = createBlock(200, 1020, alloc);
225      BlockCacheKey key = createKey("testHFile-00", 200);
226      cache.cacheBlock(key, blk);
227      assertTrue(blk.refCnt() == 1 || blk.refCnt() == 2);
228
229      // wait for block to move to backing map because refCnt get refreshed once block moves to
230      // backing map
231      Waiter.waitFor(HBaseConfiguration.create(), 12000, () -> isRamCacheDrained(key, cache));
232
233      Cacheable block1 = cache.getBlock(key, false, false, false);
234      assertTrue(block1.refCnt() >= 2);
235      assertTrue(((HFileBlock) block1).getByteBuffAllocator() == alloc);
236
237      Cacheable block2 = cache.getBlock(key, false, false, false);
238      assertTrue(((HFileBlock) block2).getByteBuffAllocator() == alloc);
239      assertTrue(block2.refCnt() >= 3);
240
241      cache.evictBlock(key);
242      assertTrue(blk.refCnt() >= 1);
243      assertTrue(block1.refCnt() >= 2);
244      assertTrue(block2.refCnt() >= 2);
245
246      // Get key again
247      Cacheable block3 = cache.getBlock(key, false, false, false);
248      if (block3 != null) {
249        assertTrue(((HFileBlock) block3).getByteBuffAllocator() == alloc);
250        assertTrue(block3.refCnt() >= 3);
251        assertFalse(block3.release());
252      }
253
254      blk.release();
255      boolean ret1 = block1.release();
256      boolean ret2 = block2.release();
257      assertTrue(ret1 || ret2);
258      assertEquals(0, blk.refCnt());
259      assertEquals(0, block1.refCnt());
260      assertEquals(0, block2.refCnt());
261    } finally {
262      cache.shutdown();
263    }
264  }
265
266  private boolean isRamCacheDrained(BlockCacheKey key, BucketCache cache) {
267    return cache.backingMap.containsKey(key) && !cache.ramCache.containsKey(key);
268  }
269
270  @Test
271  public void testMarkStaleAsEvicted() throws Exception {
272    cache = create(1, 1000);
273    try {
274      HFileBlock blk = createBlock(200, 1020);
275      BlockCacheKey key = createKey("testMarkStaleAsEvicted", 200);
276      cache.cacheBlock(key, blk);
277      waitUntilFlushedToCache(cache, key);
278      assertEquals(1, blk.refCnt());
279      assertNotNull(cache.backingMap.get(key));
280      assertEquals(1, cache.backingMap.get(key).refCnt());
281
282      // RPC reference this cache.
283      Cacheable block1 = cache.getBlock(key, false, false, false);
284      assertEquals(2, block1.refCnt());
285      BucketEntry be1 = cache.backingMap.get(key);
286      assertNotNull(be1);
287      assertEquals(2, be1.refCnt());
288
289      // We've some RPC reference, so it won't have any effect.
290      assertFalse(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
291      assertEquals(2, block1.refCnt());
292      assertEquals(2, cache.backingMap.get(key).refCnt());
293
294      // Release the RPC reference.
295      block1.release();
296      assertEquals(1, block1.refCnt());
297      assertEquals(1, cache.backingMap.get(key).refCnt());
298
299      // Mark the stale as evicted again, it'll do the de-allocation.
300      assertTrue(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
301      assertEquals(0, block1.refCnt());
302      assertNull(cache.backingMap.get(key));
303      assertEquals(0, cache.size());
304    } finally {
305      cache.shutdown();
306    }
307  }
308
309  /**
310   * <pre>
311   * This test is for HBASE-26281,
312   * test two threads for replacing Block and getting Block execute concurrently.
313   * The threads sequence is:
314   * 1. Block1 was cached successfully,the {@link RefCnt} of Block1 is 1.
315   * 2. Thread1 caching the same {@link BlockCacheKey} with Block2 satisfied
316   *    {@link BlockCacheUtil#shouldReplaceExistingCacheBlock}, so Block2 would
317   *    replace Block1, but thread1 stopping before {@link BucketCache#cacheBlockWithWaitInternal}
318   * 3. Thread2 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
319   *    which returned Block1, the {@link RefCnt} of Block1 is 2.
320   * 4. Thread1 continues caching Block2, in {@link BucketCache.WriterThread#putIntoBackingMap},
321   *    the old Block1 is freed directly which {@link RefCnt} is 2, but the Block1 is still used
322   *    by Thread2 and the content of Block1 would be overwritten after it is freed, which may
323   *    cause a serious error.
324   * </pre>
325   */
326  @Test
327  public void testReplacingBlockAndGettingBlockConcurrently() throws Exception {
328    ByteBuffAllocator byteBuffAllocator =
329      ByteBuffAllocator.create(HBaseConfiguration.create(), true);
330    final MyBucketCache myBucketCache = createMyBucketCache(1, 1000);
331    try {
332      HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
333      final BlockCacheKey blockCacheKey = createKey("testTwoThreadConcurrent", 200);
334      myBucketCache.cacheBlock(blockCacheKey, hfileBlock);
335      waitUntilFlushedToCache(myBucketCache, blockCacheKey);
336      assertEquals(1, hfileBlock.refCnt());
337
338      assertTrue(!myBucketCache.ramCache.containsKey(blockCacheKey));
339      final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
340      Thread cacheBlockThread = new Thread(() -> {
341        try {
342          HFileBlock newHFileBlock = createBlock(200, 1020, byteBuffAllocator);
343          myBucketCache.cacheBlock(blockCacheKey, newHFileBlock);
344          waitUntilFlushedToCache(myBucketCache, blockCacheKey);
345
346        } catch (Throwable exception) {
347          exceptionRef.set(exception);
348        }
349      });
350      cacheBlockThread.setName(MyBucketCache.CACHE_BLOCK_THREAD_NAME);
351      cacheBlockThread.start();
352
353      String oldThreadName = Thread.currentThread().getName();
354      HFileBlock gotHFileBlock = null;
355      try {
356
357        Thread.currentThread().setName(MyBucketCache.GET_BLOCK_THREAD_NAME);
358
359        gotHFileBlock = (HFileBlock) (myBucketCache.getBlock(blockCacheKey, false, false, false));
360        assertTrue(gotHFileBlock.equals(hfileBlock));
361        assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
362        assertEquals(2, gotHFileBlock.refCnt());
363        /**
364         * Release the second cyclicBarrier.await in
365         * {@link MyBucketCache#cacheBlockWithWaitInternal}
366         */
367        myBucketCache.cyclicBarrier.await();
368
369      } finally {
370        Thread.currentThread().setName(oldThreadName);
371      }
372
373      cacheBlockThread.join();
374      assertTrue(exceptionRef.get() == null);
375      assertEquals(1, gotHFileBlock.refCnt());
376      assertTrue(gotHFileBlock.equals(hfileBlock));
377      assertTrue(myBucketCache.overwiteByteBuff == null);
378      assertTrue(myBucketCache.freeBucketEntryCounter.get() == 0);
379
380      gotHFileBlock.release();
381      assertEquals(0, gotHFileBlock.refCnt());
382      assertTrue(myBucketCache.overwiteByteBuff != null);
383      assertTrue(myBucketCache.freeBucketEntryCounter.get() == 1);
384      assertTrue(myBucketCache.replaceCounter.get() == 1);
385      assertTrue(myBucketCache.blockEvictCounter.get() == 1);
386    } finally {
387      myBucketCache.shutdown();
388    }
389
390  }
391
392  /**
393   * <pre>
394   * This test also is for HBASE-26281,
395   * test three threads for evicting Block,caching Block and getting Block
396   * execute concurrently.
397   * 1. Thread1 caching Block1, stopping after {@link BucketCache.WriterThread#putIntoBackingMap},
398   *    the {@link RefCnt} of Block1 is 1.
399   * 2. Thread2 invoking {@link BucketCache#evictBlock} with the same {@link BlockCacheKey},
400   *    but stopping after {@link BucketCache#removeFromRamCache}.
401   * 3. Thread3 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
402   *    which returned Block1, the {@link RefCnt} of Block1 is 2.
403   * 4. Thread1 continues caching block1,but finding that {@link BucketCache.RAMCache#remove}
404   *    returning false, so invoking {@link BucketCache#blockEvicted} to free the the Block1
405   *    directly which {@link RefCnt} is 2 and the Block1 is still used by Thread3.
406   * </pre>
407   */
408  @Test
409  public void testEvictingBlockCachingBlockGettingBlockConcurrently() throws Exception {
410    ByteBuffAllocator byteBuffAllocator =
411      ByteBuffAllocator.create(HBaseConfiguration.create(), true);
412    final MyBucketCache2 myBucketCache2 = createMyBucketCache2(1, 1000);
413    try {
414      final HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
415      final BlockCacheKey blockCacheKey = createKey("testThreeThreadConcurrent", 200);
416      final AtomicReference<Throwable> cacheBlockThreadExceptionRef =
417        new AtomicReference<Throwable>();
418      Thread cacheBlockThread = new Thread(() -> {
419        try {
420          myBucketCache2.cacheBlock(blockCacheKey, hfileBlock);
421          /**
422           * Wait for Caching Block completed.
423           */
424          myBucketCache2.writeThreadDoneCyclicBarrier.await();
425        } catch (Throwable exception) {
426          cacheBlockThreadExceptionRef.set(exception);
427        }
428      });
429      cacheBlockThread.setName(MyBucketCache2.CACHE_BLOCK_THREAD_NAME);
430      cacheBlockThread.start();
431
432      final AtomicReference<Throwable> evictBlockThreadExceptionRef =
433        new AtomicReference<Throwable>();
434      Thread evictBlockThread = new Thread(() -> {
435        try {
436          myBucketCache2.evictBlock(blockCacheKey);
437        } catch (Throwable exception) {
438          evictBlockThreadExceptionRef.set(exception);
439        }
440      });
441      evictBlockThread.setName(MyBucketCache2.EVICT_BLOCK_THREAD_NAME);
442      evictBlockThread.start();
443
444      String oldThreadName = Thread.currentThread().getName();
445      HFileBlock gotHFileBlock = null;
446      try {
447        Thread.currentThread().setName(MyBucketCache2.GET_BLOCK_THREAD_NAME);
448        gotHFileBlock = (HFileBlock) (myBucketCache2.getBlock(blockCacheKey, false, false, false));
449        assertTrue(gotHFileBlock.equals(hfileBlock));
450        assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
451        assertEquals(2, gotHFileBlock.refCnt());
452        try {
453          /**
454           * Release the second cyclicBarrier.await in {@link MyBucketCache2#putIntoBackingMap} for
455           * {@link BucketCache.WriterThread},getBlock completed,{@link BucketCache.WriterThread}
456           * could continue.
457           */
458          myBucketCache2.putCyclicBarrier.await();
459        } catch (Throwable e) {
460          throw new RuntimeException(e);
461        }
462
463      } finally {
464        Thread.currentThread().setName(oldThreadName);
465      }
466
467      cacheBlockThread.join();
468      evictBlockThread.join();
469      assertTrue(cacheBlockThreadExceptionRef.get() == null);
470      assertTrue(evictBlockThreadExceptionRef.get() == null);
471
472      assertTrue(gotHFileBlock.equals(hfileBlock));
473      assertEquals(1, gotHFileBlock.refCnt());
474      assertTrue(myBucketCache2.overwiteByteBuff == null);
475      assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 0);
476
477      gotHFileBlock.release();
478      assertEquals(0, gotHFileBlock.refCnt());
479      assertTrue(myBucketCache2.overwiteByteBuff != null);
480      assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 1);
481      assertTrue(myBucketCache2.blockEvictCounter.get() == 1);
482    } finally {
483      myBucketCache2.shutdown();
484    }
485
486  }
487
488  static class MyBucketCache extends BucketCache {
489    private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
490    private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
491
492    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
493    private final AtomicInteger replaceCounter = new AtomicInteger(0);
494    private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
495    private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
496    private ByteBuff overwiteByteBuff = null;
497
498    public MyBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
499      int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
500      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
501        persistencePath);
502    }
503
504    /**
505     * Simulate the Block could be replaced.
506     */
507    @Override
508    protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
509      replaceCounter.incrementAndGet();
510      return true;
511    }
512
513    @Override
514    public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
515      boolean updateCacheMetrics) {
516      if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
517        /**
518         * Wait the first cyclicBarrier.await() in {@link MyBucketCache#cacheBlockWithWaitInternal},
519         * so the {@link BucketCache#getBlock} is executed after the {@link BucketEntry#isRpcRef}
520         * checking.
521         */
522        try {
523          cyclicBarrier.await();
524        } catch (Throwable e) {
525          throw new RuntimeException(e);
526        }
527      }
528      Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics);
529      return result;
530    }
531
532    @Override
533    protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
534      boolean inMemory, boolean wait) {
535      if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
536        /**
537         * Wait the cyclicBarrier.await() in {@link MyBucketCache#getBlock}
538         */
539        try {
540          cyclicBarrier.await();
541        } catch (Throwable e) {
542          throw new RuntimeException(e);
543        }
544      }
545      if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
546        /**
547         * Wait the cyclicBarrier.await() in
548         * {@link TestBucketCacheRefCnt#testReplacingBlockAndGettingBlockConcurrently} for
549         * {@link MyBucketCache#getBlock} and Assert completed.
550         */
551        try {
552          cyclicBarrier.await();
553        } catch (Throwable e) {
554          throw new RuntimeException(e);
555        }
556      }
557      super.cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
558    }
559
560    @Override
561    void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
562      boolean evictedByEvictionProcess) {
563      blockEvictCounter.incrementAndGet();
564      super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess);
565    }
566
567    /**
568     * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the
569     * {@link BucketEntry} is freed.
570     */
571    @Override
572    void freeBucketEntry(BucketEntry bucketEntry) {
573      freeBucketEntryCounter.incrementAndGet();
574      super.freeBucketEntry(bucketEntry);
575      this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
576      try {
577        this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
578      } catch (IOException e) {
579        throw new RuntimeException(e);
580      }
581    }
582  }
583
584  static class MyBucketCache2 extends BucketCache {
585    private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
586    private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
587    private static final String EVICT_BLOCK_THREAD_NAME = "_evictBlockThread";
588
589    private final CyclicBarrier getCyclicBarrier = new CyclicBarrier(2);
590    private final CyclicBarrier evictCyclicBarrier = new CyclicBarrier(2);
591    private final CyclicBarrier putCyclicBarrier = new CyclicBarrier(2);
592    /**
593     * This is used for {@link BucketCache.WriterThread},{@link #CACHE_BLOCK_THREAD_NAME} and
594     * {@link #EVICT_BLOCK_THREAD_NAME},waiting for caching block completed.
595     */
596    private final CyclicBarrier writeThreadDoneCyclicBarrier = new CyclicBarrier(3);
597    private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
598    private final AtomicInteger removeRamCounter = new AtomicInteger(0);
599    private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
600    private ByteBuff overwiteByteBuff = null;
601
602    public MyBucketCache2(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
603      int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
604      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
605        persistencePath);
606    }
607
608    @Override
609    protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
610      super.putIntoBackingMap(key, bucketEntry);
611      /**
612       * The {@link BucketCache.WriterThread} wait for evictCyclicBarrier.await before
613       * {@link MyBucketCache2#removeFromRamCache} for {@link #EVICT_BLOCK_THREAD_NAME}
614       */
615      try {
616        evictCyclicBarrier.await();
617      } catch (Throwable e) {
618        throw new RuntimeException(e);
619      }
620
621      /**
622       * Wait the cyclicBarrier.await() in
623       * {@link TestBucketCacheRefCnt#testEvictingBlockCachingBlockGettingBlockConcurrently} for
624       * {@link MyBucketCache#getBlock} and Assert completed.
625       */
626      try {
627        putCyclicBarrier.await();
628      } catch (Throwable e) {
629        throw new RuntimeException(e);
630      }
631    }
632
633    @Override
634    void doDrain(List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
635      super.doDrain(entries, metaBuff);
636      if (entries.size() > 0) {
637        /**
638         * Caching Block completed,release {@link #GET_BLOCK_THREAD_NAME} and
639         * {@link #EVICT_BLOCK_THREAD_NAME}.
640         */
641        try {
642          writeThreadDoneCyclicBarrier.await();
643        } catch (Throwable e) {
644          throw new RuntimeException(e);
645        }
646      }
647
648    }
649
650    @Override
651    public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
652      boolean updateCacheMetrics) {
653      if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
654        /**
655         * Wait for second getCyclicBarrier.await in {@link MyBucketCache2#removeFromRamCache} after
656         * {@link BucketCache#removeFromRamCache}.
657         */
658        try {
659          getCyclicBarrier.await();
660        } catch (Throwable e) {
661          throw new RuntimeException(e);
662        }
663      }
664      Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics);
665      return result;
666    }
667
668    @Override
669    protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
670      boolean firstTime = false;
671      if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
672        int count = this.removeRamCounter.incrementAndGet();
673        firstTime = (count == 1);
674        if (firstTime) {
675          /**
676           * The {@link #EVICT_BLOCK_THREAD_NAME} wait for evictCyclicBarrier.await after
677           * {@link BucketCache#putIntoBackingMap}.
678           */
679          try {
680            evictCyclicBarrier.await();
681          } catch (Throwable e) {
682            throw new RuntimeException(e);
683          }
684        }
685      }
686      boolean result = super.removeFromRamCache(cacheKey);
687      if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
688        if (firstTime) {
689          /**
690           * Wait for getCyclicBarrier.await before {@link BucketCache#getBlock}.
691           */
692          try {
693            getCyclicBarrier.await();
694          } catch (Throwable e) {
695            throw new RuntimeException(e);
696          }
697          /**
698           * Wait for Caching Block completed, after Caching Block completed, evictBlock could
699           * continue.
700           */
701          try {
702            writeThreadDoneCyclicBarrier.await();
703          } catch (Throwable e) {
704            throw new RuntimeException(e);
705          }
706        }
707      }
708
709      return result;
710    }
711
712    @Override
713    void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
714      boolean evictedByEvictionProcess) {
715      /**
716       * This is only invoked by {@link BucketCache.WriterThread}. {@link MyMyBucketCache2} create
717       * only one {@link BucketCache.WriterThread}.
718       */
719      assertTrue(Thread.currentThread() == this.writerThreads[0]);
720
721      blockEvictCounter.incrementAndGet();
722      super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess);
723    }
724
725    /**
726     * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the
727     * {@link BucketEntry} is freed.
728     */
729    @Override
730    void freeBucketEntry(BucketEntry bucketEntry) {
731      freeBucketEntryCounter.incrementAndGet();
732      super.freeBucketEntry(bucketEntry);
733      this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
734      try {
735        this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
736      } catch (IOException e) {
737        throw new RuntimeException(e);
738      }
739    }
740  }
741
742  private static ByteBuff getOverwriteByteBuff(BucketEntry bucketEntry) {
743    int byteSize = bucketEntry.getLength();
744    byte[] data = new byte[byteSize];
745    Arrays.fill(data, (byte) 0xff);
746    return ByteBuff.wrap(ByteBuffer.wrap(data));
747  }
748}