001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.nio.ByteBuffer;
028import java.util.Arrays;
029import java.util.HashSet;
030import java.util.Random;
031import java.util.concurrent.ConcurrentLinkedQueue;
032import java.util.concurrent.ThreadLocalRandom;
033import java.util.concurrent.atomic.AtomicInteger;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.MultithreadedTestUtil;
037import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
038import org.apache.hadoop.hbase.io.ByteBuffAllocator;
039import org.apache.hadoop.hbase.io.HeapSize;
040import org.apache.hadoop.hbase.io.compress.Compression;
041import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
042import org.apache.hadoop.hbase.nio.ByteBuff;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.util.ChecksumType;
045
046public class CacheTestUtils {
047
048  private static final boolean includesMemstoreTS = true;
049
050  /**
051   * Just checks if heapsize grows when something is cached, and gets smaller when the same object
052   * is evicted
053   */
054
055  public static void testHeapSizeChanges(final BlockCache toBeTested, final int blockSize) {
056    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1);
057    long heapSize = ((HeapSize) toBeTested).heapSize();
058    toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block);
059
060    /* When we cache something HeapSize should always increase */
061    assertTrue(heapSize < ((HeapSize) toBeTested).heapSize());
062
063    toBeTested.evictBlock(blocks[0].blockName);
064
065    /* Post eviction, heapsize should be the same */
066    assertEquals(heapSize, ((HeapSize) toBeTested).heapSize());
067  }
068
069  public static void testCacheMultiThreaded(final BlockCache toBeTested, final int blockSize,
070    final int numThreads, final int numQueries, final double passingScore) throws Exception {
071
072    Configuration conf = new Configuration();
073    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
074
075    final AtomicInteger totalQueries = new AtomicInteger();
076    final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<>();
077    final AtomicInteger hits = new AtomicInteger();
078    final AtomicInteger miss = new AtomicInteger();
079
080    HFileBlockPair[] blocks = generateHFileBlocks(numQueries, blockSize);
081    blocksToTest.addAll(Arrays.asList(blocks));
082
083    for (int i = 0; i < numThreads; i++) {
084      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
085        @Override
086        public void doAnAction() throws Exception {
087          if (!blocksToTest.isEmpty()) {
088            HFileBlockPair ourBlock = blocksToTest.poll();
089            // if we run out of blocks to test, then we should stop the tests.
090            if (ourBlock == null) {
091              ctx.setStopFlag(true);
092              return;
093            }
094            toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block);
095            Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName, false, false, true);
096            if (retrievedBlock != null) {
097              assertEquals(ourBlock.block, retrievedBlock);
098              toBeTested.evictBlock(ourBlock.blockName);
099              hits.incrementAndGet();
100              assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true));
101            } else {
102              miss.incrementAndGet();
103            }
104            totalQueries.incrementAndGet();
105          }
106        }
107      };
108      t.setDaemon(true);
109      ctx.addThread(t);
110    }
111    ctx.startThreads();
112    while (!blocksToTest.isEmpty() && ctx.shouldRun()) {
113      Thread.sleep(10);
114    }
115    ctx.stop();
116    if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) {
117      fail("Too many nulls returned. Hits: " + hits.get() + " Misses: " + miss.get());
118    }
119  }
120
121  public static void testCacheSimple(BlockCache toBeTested, int blockSize, int numBlocks)
122    throws Exception {
123
124    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numBlocks);
125    // Confirm empty
126    for (HFileBlockPair block : blocks) {
127      assertNull(toBeTested.getBlock(block.blockName, true, false, true));
128    }
129
130    // Add blocks
131    for (HFileBlockPair block : blocks) {
132      toBeTested.cacheBlock(block.blockName, block.block);
133    }
134
135    // Check if all blocks are properly cached and contain the right
136    // information, or the blocks are null.
137    // MapMaker makes no guarantees when it will evict, so neither can we.
138
139    for (HFileBlockPair block : blocks) {
140      HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false, true);
141      if (buf != null) {
142        assertEquals(block.block, buf);
143      }
144    }
145
146    // Re-add some duplicate blocks. Hope nothing breaks.
147
148    for (HFileBlockPair block : blocks) {
149      try {
150        if (toBeTested.getBlock(block.blockName, true, false, true) != null) {
151          toBeTested.cacheBlock(block.blockName, block.block);
152          if (!(toBeTested instanceof BucketCache)) {
153            // BucketCache won't throw exception when caching already cached
154            // block
155            fail("Cache should not allow re-caching a block");
156          }
157        }
158      } catch (RuntimeException re) {
159        // expected
160      }
161    }
162
163  }
164
165  public static void hammerSingleKey(final BlockCache toBeTested, int numThreads, int numQueries)
166    throws Exception {
167    final BlockCacheKey key = new BlockCacheKey("key", 0);
168    final byte[] buf = new byte[5 * 1024];
169    Arrays.fill(buf, (byte) 5);
170
171    final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
172    Configuration conf = new Configuration();
173    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
174
175    final AtomicInteger totalQueries = new AtomicInteger();
176    toBeTested.cacheBlock(key, bac);
177
178    for (int i = 0; i < numThreads; i++) {
179      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
180        @Override
181        public void doAnAction() throws Exception {
182          ByteArrayCacheable returned =
183            (ByteArrayCacheable) toBeTested.getBlock(key, false, false, true);
184          if (returned != null) {
185            assertArrayEquals(buf, returned.buf);
186          } else {
187            Thread.sleep(10);
188          }
189          totalQueries.incrementAndGet();
190        }
191      };
192
193      t.setDaemon(true);
194      ctx.addThread(t);
195    }
196
197    // add a thread to periodically evict and re-cache the block
198    final long blockEvictPeriod = 50;
199    TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
200      @Override
201      public void doAnAction() throws Exception {
202        toBeTested.evictBlock(key);
203        toBeTested.cacheBlock(key, bac);
204        Thread.sleep(blockEvictPeriod);
205      }
206    };
207    t.setDaemon(true);
208    ctx.addThread(t);
209
210    ctx.startThreads();
211    while (totalQueries.get() < numQueries && ctx.shouldRun()) {
212      Thread.sleep(10);
213    }
214    ctx.stop();
215  }
216
217  public static class ByteArrayCacheable implements Cacheable {
218
219    static final CacheableDeserializer<Cacheable> blockDeserializer =
220      new CacheableDeserializer<Cacheable>() {
221        @Override
222        public int getDeserializerIdentifier() {
223          return deserializerIdentifier;
224        }
225
226        @Override
227        public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc) throws IOException {
228          int len = b.getInt();
229          Thread.yield();
230          byte buf[] = new byte[len];
231          b.get(buf);
232          return new ByteArrayCacheable(buf);
233        }
234      };
235
236    final byte[] buf;
237
238    public ByteArrayCacheable(byte[] buf) {
239      this.buf = buf;
240    }
241
242    @Override
243    public long heapSize() {
244      return 4L + buf.length;
245    }
246
247    @Override
248    public int getSerializedLength() {
249      return 4 + buf.length;
250    }
251
252    @Override
253    public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
254      destination.putInt(buf.length);
255      Thread.yield();
256      destination.put(buf);
257      destination.rewind();
258    }
259
260    @Override
261    public CacheableDeserializer<Cacheable> getDeserializer() {
262      return blockDeserializer;
263    }
264
265    private static final int deserializerIdentifier;
266    static {
267      deserializerIdentifier =
268        CacheableDeserializerIdManager.registerDeserializer(blockDeserializer);
269    }
270
271    @Override
272    public BlockType getBlockType() {
273      return BlockType.DATA;
274    }
275  }
276
277  public static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
278    HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
279    Random rand = ThreadLocalRandom.current();
280    HashSet<String> usedStrings = new HashSet<>();
281    for (int i = 0; i < numBlocks; i++) {
282      ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize);
283      Bytes.random(cachedBuffer.array());
284      cachedBuffer.rewind();
285      int onDiskSizeWithoutHeader = blockSize;
286      int uncompressedSizeWithoutHeader = blockSize;
287      long prevBlockOffset = rand.nextLong();
288      BlockType.DATA.write(cachedBuffer);
289      cachedBuffer.putInt(onDiskSizeWithoutHeader);
290      cachedBuffer.putInt(uncompressedSizeWithoutHeader);
291      cachedBuffer.putLong(prevBlockOffset);
292      cachedBuffer.rewind();
293      HFileContext meta =
294        new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
295          .withIncludesTags(false).withCompression(Compression.Algorithm.NONE)
296          .withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL).build();
297      HFileBlock generated =
298        new HFileBlock(BlockType.DATA, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
299          prevBlockOffset, ByteBuff.wrap(cachedBuffer), HFileBlock.DONT_FILL_HEADER, blockSize,
300          onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta,
301          ByteBuffAllocator.HEAP);
302
303      String strKey;
304      /* No conflicting keys */
305      strKey = Long.toString(rand.nextLong());
306      while (!usedStrings.add(strKey)) {
307        strKey = Long.toString(rand.nextLong());
308      }
309
310      returnedBlocks[i] = new HFileBlockPair();
311      returnedBlocks[i].blockName = new BlockCacheKey(strKey, 0);
312      returnedBlocks[i].block = generated;
313    }
314    return returnedBlocks;
315  }
316
317  public static class HFileBlockPair {
318    BlockCacheKey blockName;
319    HFileBlock block;
320
321    public BlockCacheKey getBlockName() {
322      return this.blockName;
323    }
324
325    public HFileBlock getBlock() {
326      return this.block;
327    }
328  }
329
330  public static void getBlockAndAssertEquals(BlockCache cache, BlockCacheKey key,
331    Cacheable blockToCache, ByteBuffer destBuffer, ByteBuffer expectedBuffer) {
332    destBuffer.clear();
333    cache.cacheBlock(key, blockToCache);
334    Cacheable actualBlock = cache.getBlock(key, false, false, false);
335    try {
336      actualBlock.serialize(destBuffer, true);
337      assertEquals(expectedBuffer, destBuffer);
338    } finally {
339      // Release the reference count increased by getBlock.
340      if (actualBlock != null) {
341        actualBlock.release();
342      }
343    }
344  }
345}