001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.nio.ByteBuffer;
028import java.util.Arrays;
029import java.util.List;
030import java.util.concurrent.CyclicBarrier;
031import java.util.concurrent.atomic.AtomicInteger;
032import java.util.concurrent.atomic.AtomicReference;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.io.ByteBuffAllocator;
036import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
037import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
038import org.apache.hadoop.hbase.io.hfile.BlockType;
039import org.apache.hadoop.hbase.io.hfile.Cacheable;
040import org.apache.hadoop.hbase.io.hfile.HFileBlock;
041import org.apache.hadoop.hbase.io.hfile.HFileContext;
042import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
043import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread;
044import org.apache.hadoop.hbase.nio.ByteBuff;
045import org.apache.hadoop.hbase.nio.RefCnt;
046import org.apache.hadoop.hbase.testclassification.IOTests;
047import org.apache.hadoop.hbase.testclassification.SmallTests;
048import org.junit.ClassRule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051
052@Category({ IOTests.class, SmallTests.class })
053public class TestBucketCacheRefCnt {
054
055  @ClassRule
056  public static final HBaseClassTestRule CLASS_RULE =
057    HBaseClassTestRule.forClass(TestBucketCacheRefCnt.class);
058
059  private static final String IO_ENGINE = "offheap";
060  private static final long CAPACITY_SIZE = 32 * 1024 * 1024;
061  private static final int BLOCK_SIZE = 1024;
062  private static final int[] BLOCK_SIZE_ARRAY =
063    new int[] { 64, 128, 256, 512, 1024, 2048, 4096, 8192 };
064  private static final String PERSISTENCE_PATH = null;
065  private static final HFileContext CONTEXT = new HFileContextBuilder().build();
066
067  private BucketCache cache;
068
069  private static BucketCache create(int writerSize, int queueSize) throws IOException {
070    return new BucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
071      queueSize, PERSISTENCE_PATH);
072  }
073
074  private static MyBucketCache createMyBucketCache(int writerSize, int queueSize)
075    throws IOException {
076    return new MyBucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
077      queueSize, PERSISTENCE_PATH);
078  }
079
080  private static MyBucketCache2 createMyBucketCache2(int writerSize, int queueSize)
081    throws IOException {
082    return new MyBucketCache2(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
083      queueSize, PERSISTENCE_PATH);
084  }
085
086  private static HFileBlock createBlock(int offset, int size) {
087    return createBlock(offset, size, ByteBuffAllocator.HEAP);
088  }
089
090  private static HFileBlock createBlock(int offset, int size, ByteBuffAllocator alloc) {
091    return new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(ByteBuffer.allocate(size)),
092      HFileBlock.FILL_HEADER, offset, 52, size, CONTEXT, alloc);
093  }
094
095  private static BlockCacheKey createKey(String hfileName, long offset) {
096    return new BlockCacheKey(hfileName, offset);
097  }
098
099  private void disableWriter() {
100    if (cache != null) {
101      for (WriterThread wt : cache.writerThreads) {
102        wt.disableWriter();
103        wt.interrupt();
104      }
105    }
106  }
107
108  @org.junit.Ignore
109  @Test // Disabled by HBASE-24079. Reenable issue HBASE-24082
110  // Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2>
111  public void testBlockInRAMCache() throws IOException {
112    cache = create(1, 1000);
113    disableWriter();
114    final String prefix = "testBlockInRamCache";
115    try {
116      for (int i = 0; i < 10; i++) {
117        HFileBlock blk = createBlock(i, 1020);
118        BlockCacheKey key = createKey(prefix, i);
119        assertEquals(1, blk.refCnt());
120        cache.cacheBlock(key, blk);
121        assertEquals(i + 1, cache.getBlockCount());
122        assertEquals(2, blk.refCnt());
123
124        Cacheable block = cache.getBlock(key, false, false, false);
125        try {
126          assertEquals(3, blk.refCnt());
127          assertEquals(3, block.refCnt());
128          assertEquals(blk, block);
129        } finally {
130          block.release();
131        }
132        assertEquals(2, blk.refCnt());
133        assertEquals(2, block.refCnt());
134      }
135
136      for (int i = 0; i < 10; i++) {
137        BlockCacheKey key = createKey(prefix, i);
138        Cacheable blk = cache.getBlock(key, false, false, false);
139        assertEquals(3, blk.refCnt());
140        assertFalse(blk.release());
141        assertEquals(2, blk.refCnt());
142
143        assertTrue(cache.evictBlock(key));
144        assertEquals(1, blk.refCnt());
145        assertTrue(blk.release());
146        assertEquals(0, blk.refCnt());
147      }
148    } finally {
149      cache.shutdown();
150    }
151  }
152
153  private static void waitUntilFlushedToCache(BucketCache bucketCache, BlockCacheKey blockCacheKey)
154    throws InterruptedException {
155    while (
156      !bucketCache.backingMap.containsKey(blockCacheKey)
157        || bucketCache.ramCache.containsKey(blockCacheKey)
158    ) {
159      Thread.sleep(100);
160    }
161    Thread.sleep(1000);
162  }
163
164  @Test
165  public void testBlockInBackingMap() throws Exception {
166    ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true);
167    cache = create(1, 1000);
168    try {
169      HFileBlock blk = createBlock(200, 1020, alloc);
170      BlockCacheKey key = createKey("testHFile-00", 200);
171      cache.cacheBlock(key, blk);
172      waitUntilFlushedToCache(cache, key);
173      assertEquals(1, blk.refCnt());
174
175      Cacheable block = cache.getBlock(key, false, false, false);
176      assertTrue(block instanceof HFileBlock);
177      assertTrue(((HFileBlock) block).getByteBuffAllocator() == alloc);
178      assertEquals(2, block.refCnt());
179
180      block.retain();
181      assertEquals(3, block.refCnt());
182
183      Cacheable newBlock = cache.getBlock(key, false, false, false);
184      assertTrue(newBlock instanceof HFileBlock);
185      assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);
186      assertEquals(4, newBlock.refCnt());
187
188      // release the newBlock
189      assertFalse(newBlock.release());
190      assertEquals(3, newBlock.refCnt());
191      assertEquals(3, block.refCnt());
192
193      // Evict the key
194      cache.evictBlock(key);
195      assertEquals(2, block.refCnt());
196
197      // Evict again, shouldn't change the refCnt.
198      cache.evictBlock(key);
199      assertEquals(2, block.refCnt());
200
201      assertFalse(block.release());
202      assertEquals(1, block.refCnt());
203
204      /**
205       * The key was evicted from {@link BucketCache#backingMap} and {@link BucketCache#ramCache},
206       * so {@link BucketCache#getBlock} return null.
207       */
208      Cacheable newestBlock = cache.getBlock(key, false, false, false);
209      assertNull(newestBlock);
210      assertEquals(1, block.refCnt());
211      assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);
212
213      // Release the block
214      assertTrue(block.release());
215      assertEquals(0, block.refCnt());
216      assertEquals(0, newBlock.refCnt());
217    } finally {
218      cache.shutdown();
219    }
220  }
221
222  @Test
223  public void testInBucketCache() throws IOException {
224    ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true);
225    cache = create(1, 1000);
226    try {
227      HFileBlock blk = createBlock(200, 1020, alloc);
228      BlockCacheKey key = createKey("testHFile-00", 200);
229      cache.cacheBlock(key, blk);
230      assertTrue(blk.refCnt() == 1 || blk.refCnt() == 2);
231
232      Cacheable block1 = cache.getBlock(key, false, false, false);
233      assertTrue(block1.refCnt() >= 2);
234      assertTrue(((HFileBlock) block1).getByteBuffAllocator() == alloc);
235
236      Cacheable block2 = cache.getBlock(key, false, false, false);
237      assertTrue(((HFileBlock) block2).getByteBuffAllocator() == alloc);
238      assertTrue(block2.refCnt() >= 3);
239
240      cache.evictBlock(key);
241      assertTrue(blk.refCnt() >= 1);
242      assertTrue(block1.refCnt() >= 2);
243      assertTrue(block2.refCnt() >= 2);
244
245      // Get key again
246      Cacheable block3 = cache.getBlock(key, false, false, false);
247      if (block3 != null) {
248        assertTrue(((HFileBlock) block3).getByteBuffAllocator() == alloc);
249        assertTrue(block3.refCnt() >= 3);
250        assertFalse(block3.release());
251      }
252
253      blk.release();
254      boolean ret1 = block1.release();
255      boolean ret2 = block2.release();
256      assertTrue(ret1 || ret2);
257      assertEquals(0, blk.refCnt());
258      assertEquals(0, block1.refCnt());
259      assertEquals(0, block2.refCnt());
260    } finally {
261      cache.shutdown();
262    }
263  }
264
265  @Test
266  public void testMarkStaleAsEvicted() throws Exception {
267    cache = create(1, 1000);
268    try {
269      HFileBlock blk = createBlock(200, 1020);
270      BlockCacheKey key = createKey("testMarkStaleAsEvicted", 200);
271      cache.cacheBlock(key, blk);
272      waitUntilFlushedToCache(cache, key);
273      assertEquals(1, blk.refCnt());
274      assertNotNull(cache.backingMap.get(key));
275      assertEquals(1, cache.backingMap.get(key).refCnt());
276
277      // RPC reference this cache.
278      Cacheable block1 = cache.getBlock(key, false, false, false);
279      assertEquals(2, block1.refCnt());
280      BucketEntry be1 = cache.backingMap.get(key);
281      assertNotNull(be1);
282      assertEquals(2, be1.refCnt());
283
284      // We've some RPC reference, so it won't have any effect.
285      assertFalse(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
286      assertEquals(2, block1.refCnt());
287      assertEquals(2, cache.backingMap.get(key).refCnt());
288
289      // Release the RPC reference.
290      block1.release();
291      assertEquals(1, block1.refCnt());
292      assertEquals(1, cache.backingMap.get(key).refCnt());
293
294      // Mark the stale as evicted again, it'll do the de-allocation.
295      assertTrue(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
296      assertEquals(0, block1.refCnt());
297      assertNull(cache.backingMap.get(key));
298      assertEquals(0, cache.size());
299    } finally {
300      cache.shutdown();
301    }
302  }
303
304  /**
305   * <pre>
306   * This test is for HBASE-26281,
307   * test two threads for replacing Block and getting Block execute concurrently.
308   * The threads sequence is:
309   * 1. Block1 was cached successfully,the {@link RefCnt} of Block1 is 1.
310   * 2. Thread1 caching the same {@link BlockCacheKey} with Block2 satisfied
311   *    {@link BlockCacheUtil#shouldReplaceExistingCacheBlock}, so Block2 would
312   *    replace Block1, but thread1 stopping before {@link BucketCache#cacheBlockWithWaitInternal}
313   * 3. Thread2 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
314   *    which returned Block1, the {@link RefCnt} of Block1 is 2.
315   * 4. Thread1 continues caching Block2, in {@link BucketCache.WriterThread#putIntoBackingMap},
316   *    the old Block1 is freed directly which {@link RefCnt} is 2, but the Block1 is still used
317   *    by Thread2 and the content of Block1 would be overwritten after it is freed, which may
318   *    cause a serious error.
319   * </pre>
320   */
321  @Test
322  public void testReplacingBlockAndGettingBlockConcurrently() throws Exception {
323    ByteBuffAllocator byteBuffAllocator =
324      ByteBuffAllocator.create(HBaseConfiguration.create(), true);
325    final MyBucketCache myBucketCache = createMyBucketCache(1, 1000);
326    try {
327      HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
328      final BlockCacheKey blockCacheKey = createKey("testTwoThreadConcurrent", 200);
329      myBucketCache.cacheBlock(blockCacheKey, hfileBlock);
330      waitUntilFlushedToCache(myBucketCache, blockCacheKey);
331      assertEquals(1, hfileBlock.refCnt());
332
333      assertTrue(!myBucketCache.ramCache.containsKey(blockCacheKey));
334      final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
335      Thread cacheBlockThread = new Thread(() -> {
336        try {
337          HFileBlock newHFileBlock = createBlock(200, 1020, byteBuffAllocator);
338          myBucketCache.cacheBlock(blockCacheKey, newHFileBlock);
339          waitUntilFlushedToCache(myBucketCache, blockCacheKey);
340
341        } catch (Throwable exception) {
342          exceptionRef.set(exception);
343        }
344      });
345      cacheBlockThread.setName(MyBucketCache.CACHE_BLOCK_THREAD_NAME);
346      cacheBlockThread.start();
347
348      String oldThreadName = Thread.currentThread().getName();
349      HFileBlock gotHFileBlock = null;
350      try {
351
352        Thread.currentThread().setName(MyBucketCache.GET_BLOCK_THREAD_NAME);
353
354        gotHFileBlock = (HFileBlock) (myBucketCache.getBlock(blockCacheKey, false, false, false));
355        assertTrue(gotHFileBlock.equals(hfileBlock));
356        assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
357        assertEquals(2, gotHFileBlock.refCnt());
358        /**
359         * Release the second cyclicBarrier.await in
360         * {@link MyBucketCache#cacheBlockWithWaitInternal}
361         */
362        myBucketCache.cyclicBarrier.await();
363
364      } finally {
365        Thread.currentThread().setName(oldThreadName);
366      }
367
368      cacheBlockThread.join();
369      assertTrue(exceptionRef.get() == null);
370      assertEquals(1, gotHFileBlock.refCnt());
371      assertTrue(gotHFileBlock.equals(hfileBlock));
372      assertTrue(myBucketCache.overwiteByteBuff == null);
373      assertTrue(myBucketCache.freeBucketEntryCounter.get() == 0);
374
375      gotHFileBlock.release();
376      assertEquals(0, gotHFileBlock.refCnt());
377      assertTrue(myBucketCache.overwiteByteBuff != null);
378      assertTrue(myBucketCache.freeBucketEntryCounter.get() == 1);
379      assertTrue(myBucketCache.replaceCounter.get() == 1);
380      assertTrue(myBucketCache.blockEvictCounter.get() == 1);
381    } finally {
382      myBucketCache.shutdown();
383    }
384
385  }
386
387  /**
388   * <pre>
389   * This test also is for HBASE-26281,
390   * test three threads for evicting Block,caching Block and getting Block
391   * execute concurrently.
392   * 1. Thread1 caching Block1, stopping after {@link BucketCache.WriterThread#putIntoBackingMap},
393   *    the {@link RefCnt} of Block1 is 1.
394   * 2. Thread2 invoking {@link BucketCache#evictBlock} with the same {@link BlockCacheKey},
395   *    but stopping after {@link BucketCache#removeFromRamCache}.
396   * 3. Thread3 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
397   *    which returned Block1, the {@link RefCnt} of Block1 is 2.
398   * 4. Thread1 continues caching block1,but finding that {@link BucketCache.RAMCache#remove}
399   *    returning false, so invoking {@link BucketCache#blockEvicted} to free the the Block1
400   *    directly which {@link RefCnt} is 2 and the Block1 is still used by Thread3.
401   * </pre>
402   */
403  @Test
404  public void testEvictingBlockCachingBlockGettingBlockConcurrently() throws Exception {
405    ByteBuffAllocator byteBuffAllocator =
406      ByteBuffAllocator.create(HBaseConfiguration.create(), true);
407    final MyBucketCache2 myBucketCache2 = createMyBucketCache2(1, 1000);
408    try {
409      final HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
410      final BlockCacheKey blockCacheKey = createKey("testThreeThreadConcurrent", 200);
411      final AtomicReference<Throwable> cacheBlockThreadExceptionRef =
412        new AtomicReference<Throwable>();
413      Thread cacheBlockThread = new Thread(() -> {
414        try {
415          myBucketCache2.cacheBlock(blockCacheKey, hfileBlock);
416          /**
417           * Wait for Caching Block completed.
418           */
419          myBucketCache2.writeThreadDoneCyclicBarrier.await();
420        } catch (Throwable exception) {
421          cacheBlockThreadExceptionRef.set(exception);
422        }
423      });
424      cacheBlockThread.setName(MyBucketCache2.CACHE_BLOCK_THREAD_NAME);
425      cacheBlockThread.start();
426
427      final AtomicReference<Throwable> evictBlockThreadExceptionRef =
428        new AtomicReference<Throwable>();
429      Thread evictBlockThread = new Thread(() -> {
430        try {
431          myBucketCache2.evictBlock(blockCacheKey);
432        } catch (Throwable exception) {
433          evictBlockThreadExceptionRef.set(exception);
434        }
435      });
436      evictBlockThread.setName(MyBucketCache2.EVICT_BLOCK_THREAD_NAME);
437      evictBlockThread.start();
438
439      String oldThreadName = Thread.currentThread().getName();
440      HFileBlock gotHFileBlock = null;
441      try {
442        Thread.currentThread().setName(MyBucketCache2.GET_BLOCK_THREAD_NAME);
443        gotHFileBlock = (HFileBlock) (myBucketCache2.getBlock(blockCacheKey, false, false, false));
444        assertTrue(gotHFileBlock.equals(hfileBlock));
445        assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
446        assertEquals(2, gotHFileBlock.refCnt());
447        try {
448          /**
449           * Release the second cyclicBarrier.await in {@link MyBucketCache2#putIntoBackingMap} for
450           * {@link BucketCache.WriterThread},getBlock completed,{@link BucketCache.WriterThread}
451           * could continue.
452           */
453          myBucketCache2.putCyclicBarrier.await();
454        } catch (Throwable e) {
455          throw new RuntimeException(e);
456        }
457
458      } finally {
459        Thread.currentThread().setName(oldThreadName);
460      }
461
462      cacheBlockThread.join();
463      evictBlockThread.join();
464      assertTrue(cacheBlockThreadExceptionRef.get() == null);
465      assertTrue(evictBlockThreadExceptionRef.get() == null);
466
467      assertTrue(gotHFileBlock.equals(hfileBlock));
468      assertEquals(1, gotHFileBlock.refCnt());
469      assertTrue(myBucketCache2.overwiteByteBuff == null);
470      assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 0);
471
472      gotHFileBlock.release();
473      assertEquals(0, gotHFileBlock.refCnt());
474      assertTrue(myBucketCache2.overwiteByteBuff != null);
475      assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 1);
476      assertTrue(myBucketCache2.blockEvictCounter.get() == 1);
477    } finally {
478      myBucketCache2.shutdown();
479    }
480
481  }
482
483  static class MyBucketCache extends BucketCache {
484    private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
485    private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
486
487    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
488    private final AtomicInteger replaceCounter = new AtomicInteger(0);
489    private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
490    private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
491    private ByteBuff overwiteByteBuff = null;
492
493    public MyBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
494      int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
495      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
496        persistencePath);
497    }
498
499    /**
500     * Simulate the Block could be replaced.
501     */
502    @Override
503    protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
504      replaceCounter.incrementAndGet();
505      return true;
506    }
507
508    @Override
509    public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
510      boolean updateCacheMetrics) {
511      if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
512        /**
513         * Wait the first cyclicBarrier.await() in {@link MyBucketCache#cacheBlockWithWaitInternal},
514         * so the {@link BucketCache#getBlock} is executed after the {@link BucketEntry#isRpcRef}
515         * checking.
516         */
517        try {
518          cyclicBarrier.await();
519        } catch (Throwable e) {
520          throw new RuntimeException(e);
521        }
522      }
523      Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics);
524      return result;
525    }
526
527    @Override
528    protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
529      boolean inMemory, boolean wait) {
530      if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
531        /**
532         * Wait the cyclicBarrier.await() in {@link MyBucketCache#getBlock}
533         */
534        try {
535          cyclicBarrier.await();
536        } catch (Throwable e) {
537          throw new RuntimeException(e);
538        }
539      }
540      if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
541        /**
542         * Wait the cyclicBarrier.await() in
543         * {@link TestBucketCacheRefCnt#testReplacingBlockAndGettingBlockConcurrently} for
544         * {@link MyBucketCache#getBlock} and Assert completed.
545         */
546        try {
547          cyclicBarrier.await();
548        } catch (Throwable e) {
549          throw new RuntimeException(e);
550        }
551      }
552      super.cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
553    }
554
555    @Override
556    void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
557      boolean evictedByEvictionProcess) {
558      blockEvictCounter.incrementAndGet();
559      super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess);
560    }
561
562    /**
563     * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the
564     * {@link BucketEntry} is freed.
565     */
566    @Override
567    void freeBucketEntry(BucketEntry bucketEntry) {
568      freeBucketEntryCounter.incrementAndGet();
569      super.freeBucketEntry(bucketEntry);
570      this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
571      try {
572        this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
573      } catch (IOException e) {
574        throw new RuntimeException(e);
575      }
576    }
577  }
578
579  static class MyBucketCache2 extends BucketCache {
580    private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
581    private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
582    private static final String EVICT_BLOCK_THREAD_NAME = "_evictBlockThread";
583
584    private final CyclicBarrier getCyclicBarrier = new CyclicBarrier(2);
585    private final CyclicBarrier evictCyclicBarrier = new CyclicBarrier(2);
586    private final CyclicBarrier putCyclicBarrier = new CyclicBarrier(2);
587    /**
588     * This is used for {@link BucketCache.WriterThread},{@link #CACHE_BLOCK_THREAD_NAME} and
589     * {@link #EVICT_BLOCK_THREAD_NAME},waiting for caching block completed.
590     */
591    private final CyclicBarrier writeThreadDoneCyclicBarrier = new CyclicBarrier(3);
592    private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
593    private final AtomicInteger removeRamCounter = new AtomicInteger(0);
594    private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
595    private ByteBuff overwiteByteBuff = null;
596
597    public MyBucketCache2(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
598      int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
599      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
600        persistencePath);
601    }
602
603    @Override
604    protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
605      super.putIntoBackingMap(key, bucketEntry);
606      /**
607       * The {@link BucketCache.WriterThread} wait for evictCyclicBarrier.await before
608       * {@link MyBucketCache2#removeFromRamCache} for {@link #EVICT_BLOCK_THREAD_NAME}
609       */
610      try {
611        evictCyclicBarrier.await();
612      } catch (Throwable e) {
613        throw new RuntimeException(e);
614      }
615
616      /**
617       * Wait the cyclicBarrier.await() in
618       * {@link TestBucketCacheRefCnt#testEvictingBlockCachingBlockGettingBlockConcurrently} for
619       * {@link MyBucketCache#getBlock} and Assert completed.
620       */
621      try {
622        putCyclicBarrier.await();
623      } catch (Throwable e) {
624        throw new RuntimeException(e);
625      }
626    }
627
628    @Override
629    void doDrain(List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
630      super.doDrain(entries, metaBuff);
631      if (entries.size() > 0) {
632        /**
633         * Caching Block completed,release {@link #GET_BLOCK_THREAD_NAME} and
634         * {@link #EVICT_BLOCK_THREAD_NAME}.
635         */
636        try {
637          writeThreadDoneCyclicBarrier.await();
638        } catch (Throwable e) {
639          throw new RuntimeException(e);
640        }
641      }
642
643    }
644
645    @Override
646    public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
647      boolean updateCacheMetrics) {
648      if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
649        /**
650         * Wait for second getCyclicBarrier.await in {@link MyBucketCache2#removeFromRamCache} after
651         * {@link BucketCache#removeFromRamCache}.
652         */
653        try {
654          getCyclicBarrier.await();
655        } catch (Throwable e) {
656          throw new RuntimeException(e);
657        }
658      }
659      Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics);
660      return result;
661    }
662
663    @Override
664    protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
665      boolean firstTime = false;
666      if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
667        int count = this.removeRamCounter.incrementAndGet();
668        firstTime = (count == 1);
669        if (firstTime) {
670          /**
671           * The {@link #EVICT_BLOCK_THREAD_NAME} wait for evictCyclicBarrier.await after
672           * {@link BucketCache#putIntoBackingMap}.
673           */
674          try {
675            evictCyclicBarrier.await();
676          } catch (Throwable e) {
677            throw new RuntimeException(e);
678          }
679        }
680      }
681      boolean result = super.removeFromRamCache(cacheKey);
682      if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
683        if (firstTime) {
684          /**
685           * Wait for getCyclicBarrier.await before {@link BucketCache#getBlock}.
686           */
687          try {
688            getCyclicBarrier.await();
689          } catch (Throwable e) {
690            throw new RuntimeException(e);
691          }
692          /**
693           * Wait for Caching Block completed, after Caching Block completed, evictBlock could
694           * continue.
695           */
696          try {
697            writeThreadDoneCyclicBarrier.await();
698          } catch (Throwable e) {
699            throw new RuntimeException(e);
700          }
701        }
702      }
703
704      return result;
705    }
706
707    @Override
708    void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
709      boolean evictedByEvictionProcess) {
710      /**
711       * This is only invoked by {@link BucketCache.WriterThread}. {@link MyMyBucketCache2} create
712       * only one {@link BucketCache.WriterThread}.
713       */
714      assertTrue(Thread.currentThread() == this.writerThreads[0]);
715
716      blockEvictCounter.incrementAndGet();
717      super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess);
718    }
719
720    /**
721     * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the
722     * {@link BucketEntry} is freed.
723     */
724    @Override
725    void freeBucketEntry(BucketEntry bucketEntry) {
726      freeBucketEntryCounter.incrementAndGet();
727      super.freeBucketEntry(bucketEntry);
728      this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
729      try {
730        this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
731      } catch (IOException e) {
732        throw new RuntimeException(e);
733      }
734    }
735  }
736
737  private static ByteBuff getOverwriteByteBuff(BucketEntry bucketEntry) {
738    int byteSize = bucketEntry.getLength();
739    byte[] data = new byte[byteSize];
740    Arrays.fill(data, (byte) 0xff);
741    return ByteBuff.wrap(ByteBuffer.wrap(data));
742  }
743}