001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.io.hfile; 020 021import static org.junit.Assert.assertArrayEquals; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025import static org.junit.Assert.fail; 026 027import java.io.IOException; 028import java.nio.ByteBuffer; 029import java.util.Arrays; 030import java.util.HashSet; 031import java.util.Random; 032import java.util.concurrent.ConcurrentLinkedQueue; 033import java.util.concurrent.atomic.AtomicInteger; 034 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.MultithreadedTestUtil; 038import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; 039import org.apache.hadoop.hbase.io.ByteBuffAllocator; 040import org.apache.hadoop.hbase.io.HeapSize; 041import org.apache.hadoop.hbase.io.compress.Compression; 042import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 043import org.apache.hadoop.hbase.nio.ByteBuff; 044import org.apache.hadoop.hbase.util.ChecksumType; 045 046public class CacheTestUtils { 047 048 private static final boolean includesMemstoreTS = true; 049 050 /** 051 * Just checks if heapsize grows when something is cached, and gets smaller 052 * when the same object is evicted 053 */ 054 055 public static void testHeapSizeChanges(final BlockCache toBeTested, 056 final int blockSize) { 057 HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1); 058 long heapSize = ((HeapSize) toBeTested).heapSize(); 059 toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block); 060 061 /*When we cache something HeapSize should always increase */ 062 assertTrue(heapSize < ((HeapSize) toBeTested).heapSize()); 063 064 toBeTested.evictBlock(blocks[0].blockName); 065 066 /*Post eviction, heapsize should be the same */ 067 assertEquals(heapSize, ((HeapSize) toBeTested).heapSize()); 068 } 069 070 public static void testCacheMultiThreaded(final BlockCache toBeTested, 071 final int blockSize, final int numThreads, final int numQueries, 072 final double passingScore) throws Exception { 073 074 Configuration conf = new Configuration(); 075 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext( 076 conf); 077 078 final AtomicInteger totalQueries = new AtomicInteger(); 079 final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<>(); 080 final AtomicInteger hits = new AtomicInteger(); 081 final AtomicInteger miss = new AtomicInteger(); 082 083 HFileBlockPair[] blocks = generateHFileBlocks(numQueries, blockSize); 084 blocksToTest.addAll(Arrays.asList(blocks)); 085 086 for (int i = 0; i < numThreads; i++) { 087 TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { 088 @Override 089 public void doAnAction() throws Exception { 090 if (!blocksToTest.isEmpty()) { 091 HFileBlockPair ourBlock = blocksToTest.poll(); 092 // if we run out of blocks to test, then we should stop the tests. 093 if (ourBlock == null) { 094 ctx.setStopFlag(true); 095 return; 096 } 097 toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block); 098 Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName, 099 false, false, true); 100 if (retrievedBlock != null) { 101 assertEquals(ourBlock.block, retrievedBlock); 102 toBeTested.evictBlock(ourBlock.blockName); 103 hits.incrementAndGet(); 104 assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true)); 105 } else { 106 miss.incrementAndGet(); 107 } 108 totalQueries.incrementAndGet(); 109 } 110 } 111 }; 112 t.setDaemon(true); 113 ctx.addThread(t); 114 } 115 ctx.startThreads(); 116 while (!blocksToTest.isEmpty() && ctx.shouldRun()) { 117 Thread.sleep(10); 118 } 119 ctx.stop(); 120 if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) { 121 fail("Too many nulls returned. Hits: " + hits.get() + " Misses: " 122 + miss.get()); 123 } 124 } 125 126 public static void testCacheSimple(BlockCache toBeTested, int blockSize, 127 int numBlocks) throws Exception { 128 129 HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numBlocks); 130 // Confirm empty 131 for (HFileBlockPair block : blocks) { 132 assertNull(toBeTested.getBlock(block.blockName, true, false, true)); 133 } 134 135 // Add blocks 136 for (HFileBlockPair block : blocks) { 137 toBeTested.cacheBlock(block.blockName, block.block); 138 } 139 140 // Check if all blocks are properly cached and contain the right 141 // information, or the blocks are null. 142 // MapMaker makes no guarantees when it will evict, so neither can we. 143 144 for (HFileBlockPair block : blocks) { 145 HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false, true); 146 if (buf != null) { 147 assertEquals(block.block, buf); 148 } 149 } 150 151 // Re-add some duplicate blocks. Hope nothing breaks. 152 153 for (HFileBlockPair block : blocks) { 154 try { 155 if (toBeTested.getBlock(block.blockName, true, false, true) != null) { 156 toBeTested.cacheBlock(block.blockName, block.block); 157 if (!(toBeTested instanceof BucketCache)) { 158 // BucketCache won't throw exception when caching already cached 159 // block 160 fail("Cache should not allow re-caching a block"); 161 } 162 } 163 } catch (RuntimeException re) { 164 // expected 165 } 166 } 167 168 } 169 170 public static void hammerSingleKey(final BlockCache toBeTested, int numThreads, int numQueries) 171 throws Exception { 172 final BlockCacheKey key = new BlockCacheKey("key", 0); 173 final byte[] buf = new byte[5 * 1024]; 174 Arrays.fill(buf, (byte) 5); 175 176 final ByteArrayCacheable bac = new ByteArrayCacheable(buf); 177 Configuration conf = new Configuration(); 178 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf); 179 180 final AtomicInteger totalQueries = new AtomicInteger(); 181 toBeTested.cacheBlock(key, bac); 182 183 for (int i = 0; i < numThreads; i++) { 184 TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { 185 @Override 186 public void doAnAction() throws Exception { 187 ByteArrayCacheable returned = 188 (ByteArrayCacheable) toBeTested.getBlock(key, false, false, true); 189 if (returned != null) { 190 assertArrayEquals(buf, returned.buf); 191 } else { 192 Thread.sleep(10); 193 } 194 totalQueries.incrementAndGet(); 195 } 196 }; 197 198 t.setDaemon(true); 199 ctx.addThread(t); 200 } 201 202 // add a thread to periodically evict and re-cache the block 203 final long blockEvictPeriod = 50; 204 TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { 205 @Override 206 public void doAnAction() throws Exception { 207 toBeTested.evictBlock(key); 208 toBeTested.cacheBlock(key, bac); 209 Thread.sleep(blockEvictPeriod); 210 } 211 }; 212 t.setDaemon(true); 213 ctx.addThread(t); 214 215 ctx.startThreads(); 216 while (totalQueries.get() < numQueries && ctx.shouldRun()) { 217 Thread.sleep(10); 218 } 219 ctx.stop(); 220 } 221 222 public static class ByteArrayCacheable implements Cacheable { 223 224 private static final CacheableDeserializer<Cacheable> blockDeserializer = 225 new CacheableDeserializer<Cacheable>() { 226 @Override 227 public int getDeserializerIdentifier() { 228 return deserializerIdentifier; 229 } 230 231 @Override 232 public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc) throws IOException { 233 int len = b.getInt(); 234 Thread.yield(); 235 byte[] buf = new byte[len]; 236 b.get(buf); 237 return new ByteArrayCacheable(buf); 238 } 239 }; 240 241 final byte[] buf; 242 243 public ByteArrayCacheable(byte[] buf) { 244 this.buf = buf; 245 } 246 247 @Override 248 public long heapSize() { 249 return 4L + buf.length; 250 } 251 252 @Override 253 public int getSerializedLength() { 254 return 4 + buf.length; 255 } 256 257 @Override 258 public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) { 259 destination.putInt(buf.length); 260 Thread.yield(); 261 destination.put(buf); 262 destination.rewind(); 263 } 264 265 @Override 266 public CacheableDeserializer<Cacheable> getDeserializer() { 267 return blockDeserializer; 268 } 269 270 private static final int deserializerIdentifier; 271 static { 272 deserializerIdentifier = CacheableDeserializerIdManager 273 .registerDeserializer(blockDeserializer); 274 } 275 276 @Override 277 public BlockType getBlockType() { 278 return BlockType.DATA; 279 } 280 } 281 282 283 public static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) { 284 HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks]; 285 Random rand = new Random(); 286 HashSet<String> usedStrings = new HashSet<>(); 287 for (int i = 0; i < numBlocks; i++) { 288 ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize); 289 rand.nextBytes(cachedBuffer.array()); 290 cachedBuffer.rewind(); 291 int onDiskSizeWithoutHeader = blockSize; 292 int uncompressedSizeWithoutHeader = blockSize; 293 long prevBlockOffset = rand.nextLong(); 294 BlockType.DATA.write(cachedBuffer); 295 cachedBuffer.putInt(onDiskSizeWithoutHeader); 296 cachedBuffer.putInt(uncompressedSizeWithoutHeader); 297 cachedBuffer.putLong(prevBlockOffset); 298 cachedBuffer.rewind(); 299 HFileContext meta = new HFileContextBuilder() 300 .withHBaseCheckSum(false) 301 .withIncludesMvcc(includesMemstoreTS) 302 .withIncludesTags(false) 303 .withCompression(Compression.Algorithm.NONE) 304 .withBytesPerCheckSum(0) 305 .withChecksumType(ChecksumType.NULL) 306 .build(); 307 HFileBlock generated = 308 new HFileBlock(BlockType.DATA, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, 309 prevBlockOffset, ByteBuff.wrap(cachedBuffer), HFileBlock.DONT_FILL_HEADER, blockSize, 310 onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta, 311 ByteBuffAllocator.HEAP); 312 313 String strKey; 314 /* No conflicting keys */ 315 strKey = Long.toString(rand.nextLong()); 316 while (!usedStrings.add(strKey)) { 317 strKey = Long.toString(rand.nextLong()); 318 } 319 320 returnedBlocks[i] = new HFileBlockPair(); 321 returnedBlocks[i].blockName = new BlockCacheKey(strKey, 0); 322 returnedBlocks[i].block = generated; 323 } 324 return returnedBlocks; 325 } 326 327 public static class HFileBlockPair { 328 BlockCacheKey blockName; 329 HFileBlock block; 330 331 public BlockCacheKey getBlockName() { 332 return this.blockName; 333 } 334 335 public HFileBlock getBlock() { 336 return this.block; 337 } 338 } 339 340 public static void getBlockAndAssertEquals(BlockCache cache, BlockCacheKey key, 341 Cacheable blockToCache, ByteBuffer destBuffer, ByteBuffer expectedBuffer) { 342 destBuffer.clear(); 343 cache.cacheBlock(key, blockToCache); 344 Cacheable actualBlock = cache.getBlock(key, false, false, false); 345 try { 346 actualBlock.serialize(destBuffer, true); 347 assertEquals(expectedBuffer, destBuffer); 348 } finally { 349 // Release the reference count increased by getBlock. 350 if (actualBlock != null) { 351 actualBlock.release(); 352 } 353 } 354 } 355}