001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertNull;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024
025import java.nio.ByteBuffer;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.atomic.AtomicBoolean;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.io.ByteBuffAllocator;
030import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
031import org.apache.hadoop.hbase.io.hfile.BlockType;
032import org.apache.hadoop.hbase.io.hfile.HFileBlock;
033import org.apache.hadoop.hbase.io.hfile.HFileContext;
034import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
035import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
036import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
037import org.apache.hadoop.hbase.nio.ByteBuff;
038import org.apache.hadoop.hbase.testclassification.IOTests;
039import org.apache.hadoop.hbase.testclassification.SmallTests;
040import org.junit.jupiter.api.Tag;
041import org.junit.jupiter.api.Test;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045@Tag(IOTests.TAG)
046@Tag(SmallTests.TAG)
047public class TestRAMCache {
048  private static final Logger LOG = LoggerFactory.getLogger(TestRAMCache.class);
049
050  // Define a mock HFileBlock.
051  private static class MockHFileBlock extends HFileBlock {
052
053    private volatile CountDownLatch latch;
054
055    MockHFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
056      int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
057      long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
058      ByteBuffAllocator allocator) {
059      super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset,
060        ByteBuff.wrap(b), fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader,
061        fileContext, allocator);
062    }
063
064    public void setLatch(CountDownLatch latch) {
065      this.latch = latch;
066    }
067
068    public MockHFileBlock retain() {
069      try {
070        if (latch != null) {
071          latch.await();
072        }
073      } catch (InterruptedException e) {
074        LOG.info("Interrupted exception error: ", e);
075      }
076      super.retain();
077      return this;
078    }
079  }
080
081  @Test
082  public void testAtomicRAMCache() throws Exception {
083    int size = 100;
084    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
085    byte[] byteArr = new byte[length];
086
087    RAMCache cache = new RAMCache();
088    BlockCacheKey key = new BlockCacheKey("file-1", 1);
089    MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1,
090      ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1,
091      new HFileContextBuilder().build(), ByteBuffAllocator.HEAP);
092    RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, false, false);
093
094    assertNull(cache.putIfAbsent(key, re));
095    assertEquals(cache.putIfAbsent(key, re), re);
096
097    CountDownLatch latch = new CountDownLatch(1);
098    blk.setLatch(latch);
099
100    AtomicBoolean error = new AtomicBoolean(false);
101    Thread t1 = new Thread(() -> {
102      try {
103        cache.get(key);
104      } catch (Exception e) {
105        error.set(true);
106      }
107    });
108    t1.start();
109    Thread.sleep(200);
110
111    AtomicBoolean removed = new AtomicBoolean(false);
112    Thread t2 = new Thread(() -> {
113      cache.remove(key);
114      removed.set(true);
115    });
116    t2.start();
117    Thread.sleep(200);
118    assertFalse(removed.get());
119
120    latch.countDown();
121    Thread.sleep(200);
122    assertTrue(removed.get());
123    assertFalse(error.get());
124  }
125}