001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.nio.ByteBuffer;
028
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.io.ByteBuffAllocator;
032import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
033import org.apache.hadoop.hbase.io.hfile.BlockType;
034import org.apache.hadoop.hbase.io.hfile.Cacheable;
035import org.apache.hadoop.hbase.io.hfile.HFileBlock;
036import org.apache.hadoop.hbase.io.hfile.HFileContext;
037import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
038import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread;
039import org.apache.hadoop.hbase.nio.ByteBuff;
040import org.apache.hadoop.hbase.testclassification.IOTests;
041import org.apache.hadoop.hbase.testclassification.SmallTests;
042import org.junit.ClassRule;
043import org.junit.Test;
044import org.junit.experimental.categories.Category;
045
046@Category({ IOTests.class, SmallTests.class })
047public class TestBucketCacheRefCnt {
048
049  @ClassRule
050  public static final HBaseClassTestRule CLASS_RULE =
051      HBaseClassTestRule.forClass(TestBucketCacheRefCnt.class);
052
053  private static final String IO_ENGINE = "offheap";
054  private static final long CAPACITY_SIZE = 32 * 1024 * 1024;
055  private static final int BLOCK_SIZE = 1024;
056  private static final int[] BLOCK_SIZE_ARRAY =
057      new int[] { 64, 128, 256, 512, 1024, 2048, 4096, 8192 };
058  private static final String PERSISTENCE_PATH = null;
059  private static final HFileContext CONTEXT = new HFileContextBuilder().build();
060
061  private BucketCache cache;
062
063  private static BucketCache create(int writerSize, int queueSize) throws IOException {
064    return new BucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
065        queueSize, PERSISTENCE_PATH);
066  }
067
068  private static HFileBlock createBlock(int offset, int size) {
069    return createBlock(offset, size, ByteBuffAllocator.HEAP);
070  }
071
072  private static HFileBlock createBlock(int offset, int size, ByteBuffAllocator alloc) {
073    return new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(ByteBuffer.allocate(size)),
074        HFileBlock.FILL_HEADER, offset, 52, size, CONTEXT, alloc);
075  }
076
077  private static BlockCacheKey createKey(String hfileName, long offset) {
078    return new BlockCacheKey(hfileName, offset);
079  }
080
081  private void disableWriter() {
082    if (cache != null) {
083      for (WriterThread wt : cache.writerThreads) {
084        wt.disableWriter();
085        wt.interrupt();
086      }
087    }
088  }
089
090  @org.junit.Ignore @Test // Disabled by HBASE-24079. Reenable issue HBASE-24082
091  // Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2>
092  public void testBlockInRAMCache() throws IOException {
093    cache = create(1, 1000);
094    // Set this to true;
095    cache.wait_when_cache = true;
096    disableWriter();
097    final String prefix = "testBlockInRamCache";
098    try {
099      for (int i = 0; i < 10; i++) {
100        HFileBlock blk = createBlock(i, 1020);
101        BlockCacheKey key = createKey(prefix, i);
102        assertEquals(1, blk.refCnt());
103        cache.cacheBlock(key, blk);
104        assertEquals(i + 1, cache.getBlockCount());
105        assertEquals(2, blk.refCnt());
106
107        Cacheable block = cache.getBlock(key, false, false, false);
108        try {
109          assertEquals(3, blk.refCnt());
110          assertEquals(3, block.refCnt());
111          assertEquals(blk, block);
112        } finally {
113          block.release();
114        }
115        assertEquals(2, blk.refCnt());
116        assertEquals(2, block.refCnt());
117      }
118
119      for (int i = 0; i < 10; i++) {
120        BlockCacheKey key = createKey(prefix, i);
121        Cacheable blk = cache.getBlock(key, false, false, false);
122        assertEquals(3, blk.refCnt());
123        assertFalse(blk.release());
124        assertEquals(2, blk.refCnt());
125
126        assertTrue(cache.evictBlock(key));
127        assertEquals(1, blk.refCnt());
128        assertTrue(blk.release());
129        assertEquals(0, blk.refCnt());
130      }
131    } finally {
132      cache.shutdown();
133    }
134  }
135
136  private void waitUntilFlushedToCache(BlockCacheKey key) throws InterruptedException {
137    while (!cache.backingMap.containsKey(key) || cache.ramCache.containsKey(key)) {
138      Thread.sleep(100);
139    }
140    Thread.sleep(1000);
141  }
142
143  @Test
144  public void testBlockInBackingMap() throws Exception {
145    ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true);
146    cache = create(1, 1000);
147    try {
148      HFileBlock blk = createBlock(200, 1020, alloc);
149      BlockCacheKey key = createKey("testHFile-00", 200);
150      cache.cacheBlock(key, blk);
151      waitUntilFlushedToCache(key);
152      assertEquals(1, blk.refCnt());
153
154      Cacheable block = cache.getBlock(key, false, false, false);
155      assertTrue(block instanceof HFileBlock);
156      assertTrue(((HFileBlock) block).getByteBuffAllocator() == alloc);
157      assertEquals(2, block.refCnt());
158
159      block.retain();
160      assertEquals(3, block.refCnt());
161
162      Cacheable newBlock = cache.getBlock(key, false, false, false);
163      assertTrue(newBlock instanceof HFileBlock);
164      assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);
165      assertEquals(4, newBlock.refCnt());
166
167      // release the newBlock
168      assertFalse(newBlock.release());
169      assertEquals(3, newBlock.refCnt());
170      assertEquals(3, block.refCnt());
171
172      // Evict the key
173      cache.evictBlock(key);
174      assertEquals(2, block.refCnt());
175
176      // Evict again, shouldn't change the refCnt.
177      cache.evictBlock(key);
178      assertEquals(2, block.refCnt());
179
180      assertFalse(block.release());
181      assertEquals(1, block.refCnt());
182
183      newBlock = cache.getBlock(key, false, false, false);
184      assertEquals(2, block.refCnt());
185      assertEquals(2, newBlock.refCnt());
186      assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);
187
188      // Release the block
189      assertFalse(block.release());
190      assertEquals(1, block.refCnt());
191
192      // Release the newBlock;
193      assertTrue(newBlock.release());
194      assertEquals(0, newBlock.refCnt());
195    } finally {
196      cache.shutdown();
197    }
198  }
199
200  @Test
201  public void testInBucketCache() throws IOException {
202    ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true);
203    cache = create(1, 1000);
204    try {
205      HFileBlock blk = createBlock(200, 1020, alloc);
206      BlockCacheKey key = createKey("testHFile-00", 200);
207      cache.cacheBlock(key, blk);
208      assertTrue(blk.refCnt() == 1 || blk.refCnt() == 2);
209
210      Cacheable block1 = cache.getBlock(key, false, false, false);
211      assertTrue(block1.refCnt() >= 2);
212      assertTrue(((HFileBlock) block1).getByteBuffAllocator() == alloc);
213
214      Cacheable block2 = cache.getBlock(key, false, false, false);
215      assertTrue(((HFileBlock) block2).getByteBuffAllocator() == alloc);
216      assertTrue(block2.refCnt() >= 3);
217
218      cache.evictBlock(key);
219      assertTrue(blk.refCnt() >= 1);
220      assertTrue(block1.refCnt() >= 2);
221      assertTrue(block2.refCnt() >= 2);
222
223      // Get key again
224      Cacheable block3 = cache.getBlock(key, false, false, false);
225      if (block3 != null) {
226        assertTrue(((HFileBlock) block3).getByteBuffAllocator() == alloc);
227        assertTrue(block3.refCnt() >= 3);
228        assertFalse(block3.release());
229      }
230
231      blk.release();
232      boolean ret1 = block1.release();
233      boolean ret2 = block2.release();
234      assertTrue(ret1 || ret2);
235      assertEquals(0, blk.refCnt());
236      assertEquals(0, block1.refCnt());
237      assertEquals(0, block2.refCnt());
238    } finally {
239      cache.shutdown();
240    }
241  }
242
243  @Test
244  public void testMarkStaleAsEvicted() throws Exception {
245    cache = create(1, 1000);
246    try {
247      HFileBlock blk = createBlock(200, 1020);
248      BlockCacheKey key = createKey("testMarkStaleAsEvicted", 200);
249      cache.cacheBlock(key, blk);
250      waitUntilFlushedToCache(key);
251      assertEquals(1, blk.refCnt());
252      assertNotNull(cache.backingMap.get(key));
253      assertEquals(1, cache.backingMap.get(key).refCnt());
254
255      // RPC reference this cache.
256      Cacheable block1 = cache.getBlock(key, false, false, false);
257      assertEquals(2, block1.refCnt());
258      BucketEntry be1 = cache.backingMap.get(key);
259      assertNotNull(be1);
260      assertEquals(2, be1.refCnt());
261
262      // We've some RPC reference, so it won't have any effect.
263      assertFalse(be1.markStaleAsEvicted());
264      assertEquals(2, block1.refCnt());
265      assertEquals(2, cache.backingMap.get(key).refCnt());
266
267      // Release the RPC reference.
268      block1.release();
269      assertEquals(1, block1.refCnt());
270      assertEquals(1, cache.backingMap.get(key).refCnt());
271
272      // Mark the stale as evicted again, it'll do the de-allocation.
273      assertTrue(be1.markStaleAsEvicted());
274      assertEquals(0, block1.refCnt());
275      assertNull(cache.backingMap.get(key));
276      assertEquals(0, cache.size());
277    } finally {
278      cache.shutdown();
279    }
280  }
281}