/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ IOTests.class, SmallTests.class })
public class TestRAMCache {
  private static final Logger LOG = LoggerFactory.getLogger(TestRAMCache.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestRAMCache.class);

  // A mock HFileBlock whose retain() can be made to block on a CountDownLatch, so the test can
  // keep a reader stalled inside RAMCache#get while another thread attempts a remove.
  private static class MockHFileBlock extends HFileBlock {

    private volatile CountDownLatch latch;

    MockHFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
        int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
        long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
        HFileContext fileContext, ByteBuffAllocator allocator) {
      super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset,
          ByteBuff.wrap(b), fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader,
          fileContext, allocator);
    }

    public void setLatch(CountDownLatch latch) {
      this.latch = latch;
    }

    @Override
    public MockHFileBlock retain() {
      // Wait on the latch (if one has been set) before delegating to the real retain().
      try {
        if (latch != null) {
          latch.await();
        }
      } catch (InterruptedException e) {
        LOG.info("Interrupted exception error: ", e);
      }
      super.retain();
      return this;
    }
  }

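  /**
   * Verify that RAMCache#get and RAMCache#remove are atomic with respect to each other: while a
   * reader is still inside get (retaining the cached block), a concurrent remove of the same key
   * must not complete until that read finishes.
   */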
  @Test
  public void testAtomicRAMCache() throws Exception {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    byte[] byteArr = new byte[length];

    RAMCache cache = new RAMCache();
    BlockCacheKey key = new BlockCacheKey("file-1", 1);
    MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1,
        ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1,
        new HFileContextBuilder().build(), ByteBuffAllocator.HEAP);
    RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, ByteBuffAllocator.NONE);

    // The first put wins; putting the same key again returns the already-cached entry.
    Assert.assertNull(cache.putIfAbsent(key, re));
    Assert.assertEquals(re, cache.putIfAbsent(key, re));

    CountDownLatch latch = new CountDownLatch(1);
    blk.setLatch(latch);

    // Reader thread: blocks inside cache.get(key) because the block's retain() waits on the latch.
    AtomicBoolean error = new AtomicBoolean(false);
    Thread t1 = new Thread(() -> {
      try {
        cache.get(key);
      } catch (Exception e) {
        error.set(true);
      }
    });
    t1.start();
    Thread.sleep(200);

    // Remover thread: must not finish while the reader is still blocked on the same key.
    AtomicBoolean removed = new AtomicBoolean(false);
    Thread t2 = new Thread(() -> {
      cache.remove(key);
      removed.set(true);
    });
    t2.start();
    Thread.sleep(200);
    Assert.assertFalse(removed.get());

    // Release the reader; the pending remove should now complete and the read should not fail.
    latch.countDown();
    Thread.sleep(200);
    Assert.assertTrue(removed.get());
    Assert.assertFalse(error.get());
  }
}