001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import java.nio.ByteBuffer;
021import java.util.concurrent.CountDownLatch;
022import java.util.concurrent.atomic.AtomicBoolean;
023import org.apache.hadoop.hbase.HBaseClassTestRule;
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.io.ByteBuffAllocator;
026import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
027import org.apache.hadoop.hbase.io.hfile.BlockType;
028import org.apache.hadoop.hbase.io.hfile.HFileBlock;
029import org.apache.hadoop.hbase.io.hfile.HFileContext;
030import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
031import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
032import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
033import org.apache.hadoop.hbase.nio.ByteBuff;
034import org.apache.hadoop.hbase.testclassification.IOTests;
035import org.apache.hadoop.hbase.testclassification.SmallTests;
036import org.junit.Assert;
037import org.junit.ClassRule;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043@Category({ IOTests.class, SmallTests.class })
044public class TestRAMCache {
045  private static final Logger LOG = LoggerFactory.getLogger(TestRAMCache.class);
046
047  @ClassRule
048  public static final HBaseClassTestRule CLASS_RULE =
049    HBaseClassTestRule.forClass(TestRAMCache.class);
050
051  // Define a mock HFileBlock.
052  private static class MockHFileBlock extends HFileBlock {
053
054    private volatile CountDownLatch latch;
055
056    MockHFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
057      int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
058      long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
059      ByteBuffAllocator allocator) {
060      super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset,
061        ByteBuff.wrap(b), fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader,
062        fileContext, allocator);
063    }
064
065    public void setLatch(CountDownLatch latch) {
066      this.latch = latch;
067    }
068
069    public MockHFileBlock retain() {
070      try {
071        if (latch != null) {
072          latch.await();
073        }
074      } catch (InterruptedException e) {
075        LOG.info("Interrupted exception error: ", e);
076      }
077      super.retain();
078      return this;
079    }
080  }
081
082  @Test
083  public void testAtomicRAMCache() throws Exception {
084    int size = 100;
085    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
086    byte[] byteArr = new byte[length];
087
088    RAMCache cache = new RAMCache();
089    BlockCacheKey key = new BlockCacheKey("file-1", 1);
090    MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1,
091      ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1,
092      new HFileContextBuilder().build(), ByteBuffAllocator.HEAP);
093    RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false);
094
095    Assert.assertNull(cache.putIfAbsent(key, re));
096    Assert.assertEquals(cache.putIfAbsent(key, re), re);
097
098    CountDownLatch latch = new CountDownLatch(1);
099    blk.setLatch(latch);
100
101    AtomicBoolean error = new AtomicBoolean(false);
102    Thread t1 = new Thread(() -> {
103      try {
104        cache.get(key);
105      } catch (Exception e) {
106        error.set(true);
107      }
108    });
109    t1.start();
110    Thread.sleep(200);
111
112    AtomicBoolean removed = new AtomicBoolean(false);
113    Thread t2 = new Thread(() -> {
114      cache.remove(key);
115      removed.set(true);
116    });
117    t2.start();
118    Thread.sleep(200);
119    Assert.assertFalse(removed.get());
120
121    latch.countDown();
122    Thread.sleep(200);
123    Assert.assertTrue(removed.get());
124    Assert.assertFalse(error.get());
125  }
126}