001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.nio.ByteBuffer; 028import java.util.Arrays; 029import java.util.HashSet; 030import java.util.Random; 031import java.util.concurrent.ConcurrentLinkedQueue; 032import java.util.concurrent.ThreadLocalRandom; 033import java.util.concurrent.atomic.AtomicInteger; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.MultithreadedTestUtil; 037import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; 038import org.apache.hadoop.hbase.io.ByteBuffAllocator; 039import org.apache.hadoop.hbase.io.HeapSize; 040import org.apache.hadoop.hbase.io.compress.Compression; 041import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 042import org.apache.hadoop.hbase.nio.ByteBuff; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.apache.hadoop.hbase.util.ChecksumType; 045 046public class CacheTestUtils { 047 048 private static final boolean includesMemstoreTS = true; 049 050 /** 051 * Just checks if heapsize grows when something is cached, and gets smaller when the same object 052 * is evicted 053 */ 054 055 public static void testHeapSizeChanges(final BlockCache toBeTested, final int blockSize) { 056 HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1); 057 long heapSize = ((HeapSize) toBeTested).heapSize(); 058 toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block); 059 060 /* When we cache something HeapSize should always increase */ 061 assertTrue(heapSize < ((HeapSize) toBeTested).heapSize()); 062 063 toBeTested.evictBlock(blocks[0].blockName); 064 065 /* Post eviction, heapsize should be the same */ 066 assertEquals(heapSize, ((HeapSize) toBeTested).heapSize()); 067 } 068 069 public static void testCacheMultiThreaded(final BlockCache toBeTested, final int blockSize, 070 final int numThreads, final int numQueries, final double passingScore) throws Exception { 071 072 Configuration conf = new Configuration(); 073 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf); 074 075 final AtomicInteger totalQueries = new AtomicInteger(); 076 final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<>(); 077 final AtomicInteger hits = new AtomicInteger(); 078 final AtomicInteger miss = new AtomicInteger(); 079 080 HFileBlockPair[] blocks = generateHFileBlocks(numQueries, blockSize); 081 blocksToTest.addAll(Arrays.asList(blocks)); 082 083 for (int i = 0; i < numThreads; i++) { 084 TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { 085 @Override 086 public void doAnAction() throws Exception { 087 if (!blocksToTest.isEmpty()) { 088 HFileBlockPair ourBlock = blocksToTest.poll(); 089 // if we run out of blocks to test, then we should stop the tests. 090 if (ourBlock == null) { 091 ctx.setStopFlag(true); 092 return; 093 } 094 toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block); 095 Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName, false, false, true); 096 if (retrievedBlock != null) { 097 assertEquals(ourBlock.block, retrievedBlock); 098 toBeTested.evictBlock(ourBlock.blockName); 099 hits.incrementAndGet(); 100 assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true)); 101 } else { 102 miss.incrementAndGet(); 103 } 104 totalQueries.incrementAndGet(); 105 } 106 } 107 }; 108 t.setDaemon(true); 109 ctx.addThread(t); 110 } 111 ctx.startThreads(); 112 while (!blocksToTest.isEmpty() && ctx.shouldRun()) { 113 Thread.sleep(10); 114 } 115 ctx.stop(); 116 if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) { 117 fail("Too many nulls returned. Hits: " + hits.get() + " Misses: " + miss.get()); 118 } 119 } 120 121 public static void testCacheSimple(BlockCache toBeTested, int blockSize, int numBlocks) 122 throws Exception { 123 124 HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numBlocks); 125 // Confirm empty 126 for (HFileBlockPair block : blocks) { 127 assertNull(toBeTested.getBlock(block.blockName, true, false, true)); 128 } 129 130 // Add blocks 131 for (HFileBlockPair block : blocks) { 132 toBeTested.cacheBlock(block.blockName, block.block); 133 } 134 135 // Check if all blocks are properly cached and contain the right 136 // information, or the blocks are null. 137 // MapMaker makes no guarantees when it will evict, so neither can we. 138 139 for (HFileBlockPair block : blocks) { 140 HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false, true); 141 if (buf != null) { 142 assertEquals(block.block, buf); 143 } 144 } 145 146 // Re-add some duplicate blocks. Hope nothing breaks. 147 148 for (HFileBlockPair block : blocks) { 149 try { 150 if (toBeTested.getBlock(block.blockName, true, false, true) != null) { 151 toBeTested.cacheBlock(block.blockName, block.block); 152 if (!(toBeTested instanceof BucketCache)) { 153 // BucketCache won't throw exception when caching already cached 154 // block 155 fail("Cache should not allow re-caching a block"); 156 } 157 } 158 } catch (RuntimeException re) { 159 // expected 160 } 161 } 162 163 } 164 165 public static void hammerSingleKey(final BlockCache toBeTested, int numThreads, int numQueries) 166 throws Exception { 167 final BlockCacheKey key = new BlockCacheKey("key", 0); 168 final byte[] buf = new byte[5 * 1024]; 169 Arrays.fill(buf, (byte) 5); 170 171 final ByteArrayCacheable bac = new ByteArrayCacheable(buf); 172 Configuration conf = new Configuration(); 173 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf); 174 175 final AtomicInteger totalQueries = new AtomicInteger(); 176 toBeTested.cacheBlock(key, bac); 177 178 for (int i = 0; i < numThreads; i++) { 179 TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { 180 @Override 181 public void doAnAction() throws Exception { 182 ByteArrayCacheable returned = 183 (ByteArrayCacheable) toBeTested.getBlock(key, false, false, true); 184 if (returned != null) { 185 assertArrayEquals(buf, returned.buf); 186 } else { 187 Thread.sleep(10); 188 } 189 totalQueries.incrementAndGet(); 190 } 191 }; 192 193 t.setDaemon(true); 194 ctx.addThread(t); 195 } 196 197 // add a thread to periodically evict and re-cache the block 198 final long blockEvictPeriod = 50; 199 TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { 200 @Override 201 public void doAnAction() throws Exception { 202 toBeTested.evictBlock(key); 203 toBeTested.cacheBlock(key, bac); 204 Thread.sleep(blockEvictPeriod); 205 } 206 }; 207 t.setDaemon(true); 208 ctx.addThread(t); 209 210 ctx.startThreads(); 211 while (totalQueries.get() < numQueries && ctx.shouldRun()) { 212 Thread.sleep(10); 213 } 214 ctx.stop(); 215 } 216 217 public static class ByteArrayCacheable implements Cacheable { 218 219 private static final CacheableDeserializer<Cacheable> blockDeserializer = 220 new CacheableDeserializer<Cacheable>() { 221 @Override 222 public int getDeserializerIdentifier() { 223 return deserializerIdentifier; 224 } 225 226 @Override 227 public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc) throws IOException { 228 int len = b.getInt(); 229 Thread.yield(); 230 byte[] buf = new byte[len]; 231 b.get(buf); 232 return new ByteArrayCacheable(buf); 233 } 234 }; 235 236 final byte[] buf; 237 238 public ByteArrayCacheable(byte[] buf) { 239 this.buf = buf; 240 } 241 242 @Override 243 public long heapSize() { 244 return 4L + buf.length; 245 } 246 247 @Override 248 public int getSerializedLength() { 249 return 4 + buf.length; 250 } 251 252 @Override 253 public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) { 254 destination.putInt(buf.length); 255 Thread.yield(); 256 destination.put(buf); 257 destination.rewind(); 258 } 259 260 @Override 261 public CacheableDeserializer<Cacheable> getDeserializer() { 262 return blockDeserializer; 263 } 264 265 private static final int deserializerIdentifier; 266 static { 267 deserializerIdentifier = 268 CacheableDeserializerIdManager.registerDeserializer(blockDeserializer); 269 } 270 271 @Override 272 public BlockType getBlockType() { 273 return BlockType.DATA; 274 } 275 } 276 277 public static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) { 278 HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks]; 279 Random rand = ThreadLocalRandom.current(); 280 HashSet<String> usedStrings = new HashSet<>(); 281 for (int i = 0; i < numBlocks; i++) { 282 ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize); 283 Bytes.random(cachedBuffer.array()); 284 cachedBuffer.rewind(); 285 int onDiskSizeWithoutHeader = blockSize; 286 int uncompressedSizeWithoutHeader = blockSize; 287 long prevBlockOffset = rand.nextLong(); 288 BlockType.DATA.write(cachedBuffer); 289 cachedBuffer.putInt(onDiskSizeWithoutHeader); 290 cachedBuffer.putInt(uncompressedSizeWithoutHeader); 291 cachedBuffer.putLong(prevBlockOffset); 292 cachedBuffer.rewind(); 293 HFileContext meta = 294 new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS) 295 .withIncludesTags(false).withCompression(Compression.Algorithm.NONE) 296 .withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL).build(); 297 HFileBlock generated = 298 new HFileBlock(BlockType.DATA, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, 299 prevBlockOffset, ByteBuff.wrap(cachedBuffer), HFileBlock.DONT_FILL_HEADER, blockSize, 300 onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta, 301 ByteBuffAllocator.HEAP); 302 303 String strKey; 304 /* No conflicting keys */ 305 strKey = Long.toString(rand.nextLong()); 306 while (!usedStrings.add(strKey)) { 307 strKey = Long.toString(rand.nextLong()); 308 } 309 310 returnedBlocks[i] = new HFileBlockPair(); 311 returnedBlocks[i].blockName = new BlockCacheKey(strKey, 0); 312 returnedBlocks[i].block = generated; 313 } 314 return returnedBlocks; 315 } 316 317 public static class HFileBlockPair { 318 BlockCacheKey blockName; 319 HFileBlock block; 320 321 public BlockCacheKey getBlockName() { 322 return this.blockName; 323 } 324 325 public HFileBlock getBlock() { 326 return this.block; 327 } 328 } 329 330 public static void getBlockAndAssertEquals(BlockCache cache, BlockCacheKey key, 331 Cacheable blockToCache, ByteBuffer destBuffer, ByteBuffer expectedBuffer) { 332 destBuffer.clear(); 333 cache.cacheBlock(key, blockToCache); 334 Cacheable actualBlock = cache.getBlock(key, false, false, false); 335 try { 336 actualBlock.serialize(destBuffer, true); 337 assertEquals(expectedBuffer, destBuffer); 338 } finally { 339 // Release the reference count increased by getBlock. 340 if (actualBlock != null) { 341 actualBlock.release(); 342 } 343 } 344 } 345}