001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.nio.ByteBuffer;
028import java.util.Arrays;
029import java.util.List;
030import java.util.concurrent.CyclicBarrier;
031import java.util.concurrent.atomic.AtomicInteger;
032import java.util.concurrent.atomic.AtomicReference;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.io.ByteBuffAllocator;
036import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
037import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
038import org.apache.hadoop.hbase.io.hfile.BlockType;
039import org.apache.hadoop.hbase.io.hfile.Cacheable;
040import org.apache.hadoop.hbase.io.hfile.HFileBlock;
041import org.apache.hadoop.hbase.io.hfile.HFileContext;
042import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
043import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread;
044import org.apache.hadoop.hbase.nio.ByteBuff;
045import org.apache.hadoop.hbase.nio.RefCnt;
046import org.apache.hadoop.hbase.testclassification.IOTests;
047import org.apache.hadoop.hbase.testclassification.SmallTests;
048import org.junit.ClassRule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051
052@Category({ IOTests.class, SmallTests.class })
053public class TestBucketCacheRefCnt {
054
055  @ClassRule
056  public static final HBaseClassTestRule CLASS_RULE =
057    HBaseClassTestRule.forClass(TestBucketCacheRefCnt.class);
058
059  private static final String IO_ENGINE = "offheap";
060  private static final long CAPACITY_SIZE = 32 * 1024 * 1024;
061  private static final int BLOCK_SIZE = 1024;
062  private static final int[] BLOCK_SIZE_ARRAY =
063    new int[] { 64, 128, 256, 512, 1024, 2048, 4096, 8192 };
064  private static final String PERSISTENCE_PATH = null;
065  private static final HFileContext CONTEXT = new HFileContextBuilder().build();
066
067  private BucketCache cache;
068
069  private static BucketCache create(int writerSize, int queueSize) throws IOException {
070    return new BucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
071      queueSize, PERSISTENCE_PATH);
072  }
073
074  private static MyBucketCache createMyBucketCache(int writerSize, int queueSize)
075    throws IOException {
076    return new MyBucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
077      queueSize, PERSISTENCE_PATH);
078  }
079
080  private static MyBucketCache2 createMyBucketCache2(int writerSize, int queueSize)
081    throws IOException {
082    return new MyBucketCache2(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
083      queueSize, PERSISTENCE_PATH);
084  }
085
086  private static HFileBlock createBlock(int offset, int size) {
087    return createBlock(offset, size, ByteBuffAllocator.HEAP);
088  }
089
090  private static HFileBlock createBlock(int offset, int size, ByteBuffAllocator alloc) {
091    return new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(ByteBuffer.allocate(size)),
092      HFileBlock.FILL_HEADER, offset, 52, size, CONTEXT, alloc);
093  }
094
095  private static BlockCacheKey createKey(String hfileName, long offset) {
096    return new BlockCacheKey(hfileName, offset);
097  }
098
099  private void disableWriter() {
100    if (cache != null) {
101      for (WriterThread wt : cache.writerThreads) {
102        wt.disableWriter();
103        wt.interrupt();
104      }
105    }
106  }
107
108  @org.junit.Ignore
109  @Test // Disabled by HBASE-24079. Reenable issue HBASE-24082
110  // Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2>
111  public void testBlockInRAMCache() throws IOException {
112    cache = create(1, 1000);
113    // Set this to true;
114    cache.wait_when_cache = true;
115    disableWriter();
116    final String prefix = "testBlockInRamCache";
117    try {
118      for (int i = 0; i < 10; i++) {
119        HFileBlock blk = createBlock(i, 1020);
120        BlockCacheKey key = createKey(prefix, i);
121        assertEquals(1, blk.refCnt());
122        cache.cacheBlock(key, blk);
123        assertEquals(i + 1, cache.getBlockCount());
124        assertEquals(2, blk.refCnt());
125
126        Cacheable block = cache.getBlock(key, false, false, false);
127        try {
128          assertEquals(3, blk.refCnt());
129          assertEquals(3, block.refCnt());
130          assertEquals(blk, block);
131        } finally {
132          block.release();
133        }
134        assertEquals(2, blk.refCnt());
135        assertEquals(2, block.refCnt());
136      }
137
138      for (int i = 0; i < 10; i++) {
139        BlockCacheKey key = createKey(prefix, i);
140        Cacheable blk = cache.getBlock(key, false, false, false);
141        assertEquals(3, blk.refCnt());
142        assertFalse(blk.release());
143        assertEquals(2, blk.refCnt());
144
145        assertTrue(cache.evictBlock(key));
146        assertEquals(1, blk.refCnt());
147        assertTrue(blk.release());
148        assertEquals(0, blk.refCnt());
149      }
150    } finally {
151      cache.shutdown();
152    }
153  }
154
155  private static void waitUntilFlushedToCache(BucketCache bucketCache, BlockCacheKey blockCacheKey)
156    throws InterruptedException {
157    while (
158      !bucketCache.backingMap.containsKey(blockCacheKey)
159        || bucketCache.ramCache.containsKey(blockCacheKey)
160    ) {
161      Thread.sleep(100);
162    }
163    Thread.sleep(1000);
164  }
165
166  @Test
167  public void testBlockInBackingMap() throws Exception {
168    ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true);
169    cache = create(1, 1000);
170    try {
171      HFileBlock blk = createBlock(200, 1020, alloc);
172      BlockCacheKey key = createKey("testHFile-00", 200);
173      cache.cacheBlock(key, blk);
174      waitUntilFlushedToCache(cache, key);
175      assertEquals(1, blk.refCnt());
176
177      Cacheable block = cache.getBlock(key, false, false, false);
178      assertTrue(block instanceof HFileBlock);
179      assertTrue(((HFileBlock) block).getByteBuffAllocator() == alloc);
180      assertEquals(2, block.refCnt());
181
182      block.retain();
183      assertEquals(3, block.refCnt());
184
185      Cacheable newBlock = cache.getBlock(key, false, false, false);
186      assertTrue(newBlock instanceof HFileBlock);
187      assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);
188      assertEquals(4, newBlock.refCnt());
189
190      // release the newBlock
191      assertFalse(newBlock.release());
192      assertEquals(3, newBlock.refCnt());
193      assertEquals(3, block.refCnt());
194
195      // Evict the key
196      cache.evictBlock(key);
197      assertEquals(2, block.refCnt());
198
199      // Evict again, shouldn't change the refCnt.
200      cache.evictBlock(key);
201      assertEquals(2, block.refCnt());
202
203      assertFalse(block.release());
204      assertEquals(1, block.refCnt());
205
206      /**
207       * The key was evicted from {@link BucketCache#backingMap} and {@link BucketCache#ramCache},
208       * so {@link BucketCache#getBlock} return null.
209       */
210      Cacheable newestBlock = cache.getBlock(key, false, false, false);
211      assertNull(newestBlock);
212      assertEquals(1, block.refCnt());
213      assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);
214
215      // Release the block
216      assertTrue(block.release());
217      assertEquals(0, block.refCnt());
218      assertEquals(0, newBlock.refCnt());
219    } finally {
220      cache.shutdown();
221    }
222  }
223
224  @Test
225  public void testInBucketCache() throws IOException {
226    ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true);
227    cache = create(1, 1000);
228    try {
229      HFileBlock blk = createBlock(200, 1020, alloc);
230      BlockCacheKey key = createKey("testHFile-00", 200);
231      cache.cacheBlock(key, blk);
232      assertTrue(blk.refCnt() == 1 || blk.refCnt() == 2);
233
234      Cacheable block1 = cache.getBlock(key, false, false, false);
235      assertTrue(block1.refCnt() >= 2);
236      assertTrue(((HFileBlock) block1).getByteBuffAllocator() == alloc);
237
238      Cacheable block2 = cache.getBlock(key, false, false, false);
239      assertTrue(((HFileBlock) block2).getByteBuffAllocator() == alloc);
240      assertTrue(block2.refCnt() >= 3);
241
242      cache.evictBlock(key);
243      assertTrue(blk.refCnt() >= 1);
244      assertTrue(block1.refCnt() >= 2);
245      assertTrue(block2.refCnt() >= 2);
246
247      // Get key again
248      Cacheable block3 = cache.getBlock(key, false, false, false);
249      if (block3 != null) {
250        assertTrue(((HFileBlock) block3).getByteBuffAllocator() == alloc);
251        assertTrue(block3.refCnt() >= 3);
252        assertFalse(block3.release());
253      }
254
255      blk.release();
256      boolean ret1 = block1.release();
257      boolean ret2 = block2.release();
258      assertTrue(ret1 || ret2);
259      assertEquals(0, blk.refCnt());
260      assertEquals(0, block1.refCnt());
261      assertEquals(0, block2.refCnt());
262    } finally {
263      cache.shutdown();
264    }
265  }
266
267  @Test
268  public void testMarkStaleAsEvicted() throws Exception {
269    cache = create(1, 1000);
270    try {
271      HFileBlock blk = createBlock(200, 1020);
272      BlockCacheKey key = createKey("testMarkStaleAsEvicted", 200);
273      cache.cacheBlock(key, blk);
274      waitUntilFlushedToCache(cache, key);
275      assertEquals(1, blk.refCnt());
276      assertNotNull(cache.backingMap.get(key));
277      assertEquals(1, cache.backingMap.get(key).refCnt());
278
279      // RPC reference this cache.
280      Cacheable block1 = cache.getBlock(key, false, false, false);
281      assertEquals(2, block1.refCnt());
282      BucketEntry be1 = cache.backingMap.get(key);
283      assertNotNull(be1);
284      assertEquals(2, be1.refCnt());
285
286      // We've some RPC reference, so it won't have any effect.
287      assertFalse(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
288      assertEquals(2, block1.refCnt());
289      assertEquals(2, cache.backingMap.get(key).refCnt());
290
291      // Release the RPC reference.
292      block1.release();
293      assertEquals(1, block1.refCnt());
294      assertEquals(1, cache.backingMap.get(key).refCnt());
295
296      // Mark the stale as evicted again, it'll do the de-allocation.
297      assertTrue(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
298      assertEquals(0, block1.refCnt());
299      assertNull(cache.backingMap.get(key));
300      assertEquals(0, cache.size());
301    } finally {
302      cache.shutdown();
303    }
304  }
305
306  /**
307   * <pre>
308   * This test is for HBASE-26281,
309   * test two threads for replacing Block and getting Block execute concurrently.
310   * The threads sequence is:
311   * 1. Block1 was cached successfully,the {@link RefCnt} of Block1 is 1.
312   * 2. Thread1 caching the same {@link BlockCacheKey} with Block2 satisfied
313   *    {@link BlockCacheUtil#shouldReplaceExistingCacheBlock}, so Block2 would
314   *    replace Block1, but thread1 stopping before {@link BucketCache#cacheBlockWithWaitInternal}
315   * 3. Thread2 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
316   *    which returned Block1, the {@link RefCnt} of Block1 is 2.
317   * 4. Thread1 continues caching Block2, in {@link BucketCache.WriterThread#putIntoBackingMap},
318   *    the old Block1 is freed directly which {@link RefCnt} is 2, but the Block1 is still used
319   *    by Thread2 and the content of Block1 would be overwritten after it is freed, which may
320   *    cause a serious error.
321   * </pre>
322   *
323   * n
324   */
325  @Test
326  public void testReplacingBlockAndGettingBlockConcurrently() throws Exception {
327    ByteBuffAllocator byteBuffAllocator =
328      ByteBuffAllocator.create(HBaseConfiguration.create(), true);
329    final MyBucketCache myBucketCache = createMyBucketCache(1, 1000);
330    try {
331      HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
332      final BlockCacheKey blockCacheKey = createKey("testTwoThreadConcurrent", 200);
333      myBucketCache.cacheBlock(blockCacheKey, hfileBlock);
334      waitUntilFlushedToCache(myBucketCache, blockCacheKey);
335      assertEquals(1, hfileBlock.refCnt());
336
337      assertTrue(!myBucketCache.ramCache.containsKey(blockCacheKey));
338      final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
339      Thread cacheBlockThread = new Thread(() -> {
340        try {
341          HFileBlock newHFileBlock = createBlock(200, 1020, byteBuffAllocator);
342          myBucketCache.cacheBlock(blockCacheKey, newHFileBlock);
343          waitUntilFlushedToCache(myBucketCache, blockCacheKey);
344
345        } catch (Throwable exception) {
346          exceptionRef.set(exception);
347        }
348      });
349      cacheBlockThread.setName(MyBucketCache.CACHE_BLOCK_THREAD_NAME);
350      cacheBlockThread.start();
351
352      String oldThreadName = Thread.currentThread().getName();
353      HFileBlock gotHFileBlock = null;
354      try {
355
356        Thread.currentThread().setName(MyBucketCache.GET_BLOCK_THREAD_NAME);
357
358        gotHFileBlock = (HFileBlock) (myBucketCache.getBlock(blockCacheKey, false, false, false));
359        assertTrue(gotHFileBlock.equals(hfileBlock));
360        assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
361        assertEquals(2, gotHFileBlock.refCnt());
362        /**
363         * Release the second cyclicBarrier.await in
364         * {@link MyBucketCache#cacheBlockWithWaitInternal}
365         */
366        myBucketCache.cyclicBarrier.await();
367
368      } finally {
369        Thread.currentThread().setName(oldThreadName);
370      }
371
372      cacheBlockThread.join();
373      assertTrue(exceptionRef.get() == null);
374      assertEquals(1, gotHFileBlock.refCnt());
375      assertTrue(gotHFileBlock.equals(hfileBlock));
376      assertTrue(myBucketCache.overwiteByteBuff == null);
377      assertTrue(myBucketCache.freeBucketEntryCounter.get() == 0);
378
379      gotHFileBlock.release();
380      assertEquals(0, gotHFileBlock.refCnt());
381      assertTrue(myBucketCache.overwiteByteBuff != null);
382      assertTrue(myBucketCache.freeBucketEntryCounter.get() == 1);
383      assertTrue(myBucketCache.replaceCounter.get() == 1);
384      assertTrue(myBucketCache.blockEvictCounter.get() == 1);
385    } finally {
386      myBucketCache.shutdown();
387    }
388
389  }
390
391  /**
392   * <pre>
393   * This test also is for HBASE-26281,
394   * test three threads for evicting Block,caching Block and getting Block
395   * execute concurrently.
396   * 1. Thread1 caching Block1, stopping after {@link BucketCache.WriterThread#putIntoBackingMap},
397   *    the {@link RefCnt} of Block1 is 1.
398   * 2. Thread2 invoking {@link BucketCache#evictBlock} with the same {@link BlockCacheKey},
399   *    but stopping after {@link BucketCache#removeFromRamCache}.
400   * 3. Thread3 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
401   *    which returned Block1, the {@link RefCnt} of Block1 is 2.
402   * 4. Thread1 continues caching block1,but finding that {@link BucketCache.RAMCache#remove}
403   *    returning false, so invoking {@link BucketCache#blockEvicted} to free the the Block1
404   *    directly which {@link RefCnt} is 2 and the Block1 is still used by Thread3.
405   * </pre>
406   */
407  @Test
408  public void testEvictingBlockCachingBlockGettingBlockConcurrently() throws Exception {
409    ByteBuffAllocator byteBuffAllocator =
410      ByteBuffAllocator.create(HBaseConfiguration.create(), true);
411    final MyBucketCache2 myBucketCache2 = createMyBucketCache2(1, 1000);
412    try {
413      final HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
414      final BlockCacheKey blockCacheKey = createKey("testThreeThreadConcurrent", 200);
415      final AtomicReference<Throwable> cacheBlockThreadExceptionRef =
416        new AtomicReference<Throwable>();
417      Thread cacheBlockThread = new Thread(() -> {
418        try {
419          myBucketCache2.cacheBlock(blockCacheKey, hfileBlock);
420          /**
421           * Wait for Caching Block completed.
422           */
423          myBucketCache2.writeThreadDoneCyclicBarrier.await();
424        } catch (Throwable exception) {
425          cacheBlockThreadExceptionRef.set(exception);
426        }
427      });
428      cacheBlockThread.setName(MyBucketCache2.CACHE_BLOCK_THREAD_NAME);
429      cacheBlockThread.start();
430
431      final AtomicReference<Throwable> evictBlockThreadExceptionRef =
432        new AtomicReference<Throwable>();
433      Thread evictBlockThread = new Thread(() -> {
434        try {
435          myBucketCache2.evictBlock(blockCacheKey);
436        } catch (Throwable exception) {
437          evictBlockThreadExceptionRef.set(exception);
438        }
439      });
440      evictBlockThread.setName(MyBucketCache2.EVICT_BLOCK_THREAD_NAME);
441      evictBlockThread.start();
442
443      String oldThreadName = Thread.currentThread().getName();
444      HFileBlock gotHFileBlock = null;
445      try {
446        Thread.currentThread().setName(MyBucketCache2.GET_BLOCK_THREAD_NAME);
447        gotHFileBlock = (HFileBlock) (myBucketCache2.getBlock(blockCacheKey, false, false, false));
448        assertTrue(gotHFileBlock.equals(hfileBlock));
449        assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
450        assertEquals(2, gotHFileBlock.refCnt());
451        try {
452          /**
453           * Release the second cyclicBarrier.await in {@link MyBucketCache2#putIntoBackingMap} for
454           * {@link BucketCache.WriterThread},getBlock completed,{@link BucketCache.WriterThread}
455           * could continue.
456           */
457          myBucketCache2.putCyclicBarrier.await();
458        } catch (Throwable e) {
459          throw new RuntimeException(e);
460        }
461
462      } finally {
463        Thread.currentThread().setName(oldThreadName);
464      }
465
466      cacheBlockThread.join();
467      evictBlockThread.join();
468      assertTrue(cacheBlockThreadExceptionRef.get() == null);
469      assertTrue(evictBlockThreadExceptionRef.get() == null);
470
471      assertTrue(gotHFileBlock.equals(hfileBlock));
472      assertEquals(1, gotHFileBlock.refCnt());
473      assertTrue(myBucketCache2.overwiteByteBuff == null);
474      assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 0);
475
476      gotHFileBlock.release();
477      assertEquals(0, gotHFileBlock.refCnt());
478      assertTrue(myBucketCache2.overwiteByteBuff != null);
479      assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 1);
480      assertTrue(myBucketCache2.blockEvictCounter.get() == 1);
481    } finally {
482      myBucketCache2.shutdown();
483    }
484
485  }
486
487  static class MyBucketCache extends BucketCache {
488    private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
489    private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
490
491    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
492    private final AtomicInteger replaceCounter = new AtomicInteger(0);
493    private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
494    private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
495    private ByteBuff overwiteByteBuff = null;
496
497    public MyBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
498      int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
499      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
500        persistencePath);
501    }
502
503    /**
504     * Simulate the Block could be replaced.
505     */
506    @Override
507    protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
508      replaceCounter.incrementAndGet();
509      return true;
510    }
511
512    @Override
513    public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
514      boolean updateCacheMetrics) {
515      if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
516        /**
517         * Wait the first cyclicBarrier.await() in {@link MyBucketCache#cacheBlockWithWaitInternal},
518         * so the {@link BucketCache#getBlock} is executed after the {@link BucketEntry#isRpcRef}
519         * checking.
520         */
521        try {
522          cyclicBarrier.await();
523        } catch (Throwable e) {
524          throw new RuntimeException(e);
525        }
526      }
527      Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics);
528      return result;
529    }
530
531    @Override
532    protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
533      boolean inMemory, boolean wait) {
534      if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
535        /**
536         * Wait the cyclicBarrier.await() in {@link MyBucketCache#getBlock}
537         */
538        try {
539          cyclicBarrier.await();
540        } catch (Throwable e) {
541          throw new RuntimeException(e);
542        }
543      }
544      if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
545        /**
546         * Wait the cyclicBarrier.await() in
547         * {@link TestBucketCacheRefCnt#testReplacingBlockAndGettingBlockConcurrently} for
548         * {@link MyBucketCache#getBlock} and Assert completed.
549         */
550        try {
551          cyclicBarrier.await();
552        } catch (Throwable e) {
553          throw new RuntimeException(e);
554        }
555      }
556      super.cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
557    }
558
559    @Override
560    void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
561      boolean evictedByEvictionProcess) {
562      blockEvictCounter.incrementAndGet();
563      super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess);
564    }
565
566    /**
567     * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the
568     * {@link BucketEntry} is freed.
569     */
570    @Override
571    void freeBucketEntry(BucketEntry bucketEntry) {
572      freeBucketEntryCounter.incrementAndGet();
573      super.freeBucketEntry(bucketEntry);
574      this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
575      try {
576        this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
577      } catch (IOException e) {
578        throw new RuntimeException(e);
579      }
580    }
581  }
582
583  static class MyBucketCache2 extends BucketCache {
584    private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
585    private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
586    private static final String EVICT_BLOCK_THREAD_NAME = "_evictBlockThread";
587
588    private final CyclicBarrier getCyclicBarrier = new CyclicBarrier(2);
589    private final CyclicBarrier evictCyclicBarrier = new CyclicBarrier(2);
590    private final CyclicBarrier putCyclicBarrier = new CyclicBarrier(2);
591    /**
592     * This is used for {@link BucketCache.WriterThread},{@link #CACHE_BLOCK_THREAD_NAME} and
593     * {@link #EVICT_BLOCK_THREAD_NAME},waiting for caching block completed.
594     */
595    private final CyclicBarrier writeThreadDoneCyclicBarrier = new CyclicBarrier(3);
596    private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
597    private final AtomicInteger removeRamCounter = new AtomicInteger(0);
598    private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
599    private ByteBuff overwiteByteBuff = null;
600
601    public MyBucketCache2(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
602      int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
603      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
604        persistencePath);
605    }
606
607    @Override
608    protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
609      super.putIntoBackingMap(key, bucketEntry);
610      /**
611       * The {@link BucketCache.WriterThread} wait for evictCyclicBarrier.await before
612       * {@link MyBucketCache2#removeFromRamCache} for {@link #EVICT_BLOCK_THREAD_NAME}
613       */
614      try {
615        evictCyclicBarrier.await();
616      } catch (Throwable e) {
617        throw new RuntimeException(e);
618      }
619
620      /**
621       * Wait the cyclicBarrier.await() in
622       * {@link TestBucketCacheRefCnt#testEvictingBlockCachingBlockGettingBlockConcurrently} for
623       * {@link MyBucketCache#getBlock} and Assert completed.
624       */
625      try {
626        putCyclicBarrier.await();
627      } catch (Throwable e) {
628        throw new RuntimeException(e);
629      }
630    }
631
632    @Override
633    void doDrain(List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
634      super.doDrain(entries, metaBuff);
635      if (entries.size() > 0) {
636        /**
637         * Caching Block completed,release {@link #GET_BLOCK_THREAD_NAME} and
638         * {@link #EVICT_BLOCK_THREAD_NAME}.
639         */
640        try {
641          writeThreadDoneCyclicBarrier.await();
642        } catch (Throwable e) {
643          throw new RuntimeException(e);
644        }
645      }
646
647    }
648
649    @Override
650    public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
651      boolean updateCacheMetrics) {
652      if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
653        /**
654         * Wait for second getCyclicBarrier.await in {@link MyBucketCache2#removeFromRamCache} after
655         * {@link BucketCache#removeFromRamCache}.
656         */
657        try {
658          getCyclicBarrier.await();
659        } catch (Throwable e) {
660          throw new RuntimeException(e);
661        }
662      }
663      Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics);
664      return result;
665    }
666
667    @Override
668    protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
669      boolean firstTime = false;
670      if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
671        int count = this.removeRamCounter.incrementAndGet();
672        firstTime = (count == 1);
673        if (firstTime) {
674          /**
675           * The {@link #EVICT_BLOCK_THREAD_NAME} wait for evictCyclicBarrier.await after
676           * {@link BucketCache#putIntoBackingMap}.
677           */
678          try {
679            evictCyclicBarrier.await();
680          } catch (Throwable e) {
681            throw new RuntimeException(e);
682          }
683        }
684      }
685      boolean result = super.removeFromRamCache(cacheKey);
686      if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
687        if (firstTime) {
688          /**
689           * Wait for getCyclicBarrier.await before {@link BucketCache#getBlock}.
690           */
691          try {
692            getCyclicBarrier.await();
693          } catch (Throwable e) {
694            throw new RuntimeException(e);
695          }
696          /**
697           * Wait for Caching Block completed, after Caching Block completed, evictBlock could
698           * continue.
699           */
700          try {
701            writeThreadDoneCyclicBarrier.await();
702          } catch (Throwable e) {
703            throw new RuntimeException(e);
704          }
705        }
706      }
707
708      return result;
709    }
710
711    @Override
712    void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
713      boolean evictedByEvictionProcess) {
714      /**
715       * This is only invoked by {@link BucketCache.WriterThread}. {@link MyMyBucketCache2} create
716       * only one {@link BucketCache.WriterThread}.
717       */
718      assertTrue(Thread.currentThread() == this.writerThreads[0]);
719
720      blockEvictCounter.incrementAndGet();
721      super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess);
722    }
723
724    /**
725     * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the
726     * {@link BucketEntry} is freed.
727     */
728    @Override
729    void freeBucketEntry(BucketEntry bucketEntry) {
730      freeBucketEntryCounter.incrementAndGet();
731      super.freeBucketEntry(bucketEntry);
732      this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
733      try {
734        this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
735      } catch (IOException e) {
736        throw new RuntimeException(e);
737      }
738    }
739  }
740
741  private static ByteBuff getOverwriteByteBuff(BucketEntry bucketEntry) {
742    int byteSize = bucketEntry.getLength();
743    byte[] data = new byte[byteSize];
744    Arrays.fill(data, (byte) 0xff);
745    return ByteBuff.wrap(ByteBuffer.wrap(data));
746  }
747}