/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ChecksumType;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/** Utility methods for exercising BlockCache implementations in tests. */
public class CacheTestUtils {

  private static final boolean includesMemstoreTS = true;

  /**
   * Checks that heap size grows when a block is cached and returns to its
   * original value once that same block is evicted.
   */
  public static void testHeapSizeChanges(final BlockCache toBeTested, final int blockSize) {
    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1);
    long heapSize = ((HeapSize) toBeTested).heapSize();
    toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block);

    /* When we cache something, heap size should always increase. */
    assertTrue(heapSize < ((HeapSize) toBeTested).heapSize());

    toBeTested.evictBlock(blocks[0].blockName);

    /* Post eviction, heap size should be back where it started. */
    assertEquals(heapSize, ((HeapSize) toBeTested).heapSize());
  }

  public static void testCacheMultiThreaded(final BlockCache toBeTested, final int blockSize,
      final int numThreads, final int numQueries, final double passingScore) throws Exception {

    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);

    final AtomicInteger totalQueries = new AtomicInteger();
    final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<>();
    final AtomicInteger hits = new AtomicInteger();
    final AtomicInteger miss = new AtomicInteger();

    // Generate one block per query; note that generateHFileBlocks takes
    // (blockSize, numBlocks) in that order.
    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numQueries);
    blocksToTest.addAll(Arrays.asList(blocks));

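    // Each worker thread repeatedly pulls a fresh block off the shared queue,
    // caches it, reads it back, verifies the contents, and evicts it again.
    // A null read counts as a miss; once the queue drains, the hit/miss ratio
    // is checked against passingScore.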
    for (int i = 0; i < numThreads; i++) {
      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        @Override
        public void doAnAction() throws Exception {
          if (!blocksToTest.isEmpty()) {
            HFileBlockPair ourBlock = blocksToTest.poll();
            // If we run out of blocks to test, then we should stop the tests.
            if (ourBlock == null) {
              ctx.setStopFlag(true);
              return;
            }
            toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block);
            Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName, false, false, true);
            if (retrievedBlock != null) {
              assertEquals(ourBlock.block, retrievedBlock);
              toBeTested.evictBlock(ourBlock.blockName);
              hits.incrementAndGet();
              assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true));
            } else {
              miss.incrementAndGet();
            }
            totalQueries.incrementAndGet();
          }
        }
      };
      t.setDaemon(true);
      ctx.addThread(t);
    }
    ctx.startThreads();
    while (!blocksToTest.isEmpty() && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();
    if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) {
      fail("Too many nulls returned. Hits: " + hits.get() + " Misses: " + miss.get());
    }
  }

  public static void testCacheSimple(BlockCache toBeTested, int blockSize, int numBlocks)
      throws Exception {

    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numBlocks);
    // Confirm the cache starts out empty.
    for (HFileBlockPair block : blocks) {
      assertNull(toBeTested.getBlock(block.blockName, true, false, true));
    }

    // Add blocks.
    for (HFileBlockPair block : blocks) {
      toBeTested.cacheBlock(block.blockName, block.block);
    }

    // Check that any block still cached contains the right information. The
    // cache makes no guarantees about when it evicts, so neither can we:
    // a null result here is acceptable.
    for (HFileBlockPair block : blocks) {
      HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false, true);
      if (buf != null) {
        assertEquals(block.block, buf);
      }
    }

    // Re-add some duplicate blocks. Hope nothing breaks.
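    // Every cache except BucketCache is expected to throw a RuntimeException
    // when asked to cache a block that is already cached; BucketCache
    // tolerates the duplicate silently.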
    for (HFileBlockPair block : blocks) {
      try {
        if (toBeTested.getBlock(block.blockName, true, false, true) != null) {
          toBeTested.cacheBlock(block.blockName, block.block);
          if (!(toBeTested instanceof BucketCache)) {
            // BucketCache won't throw an exception when caching an already
            // cached block.
            fail("Cache should not allow re-caching a block");
          }
        }
      } catch (RuntimeException re) {
        // expected
      }
    }
  }

  public static void hammerSingleKey(final BlockCache toBeTested, int blockSize, int numThreads,
      int numQueries) throws Exception {
    final BlockCacheKey key = new BlockCacheKey("key", 0);
    final byte[] buf = new byte[5 * 1024];
    Arrays.fill(buf, (byte) 5);

    final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);

    final AtomicInteger totalQueries = new AtomicInteger();
    toBeTested.cacheBlock(key, bac);

    // Reader threads hammer the single key and verify whatever comes back.
    for (int i = 0; i < numThreads; i++) {
      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        @Override
        public void doAnAction() throws Exception {
          ByteArrayCacheable returned =
              (ByteArrayCacheable) toBeTested.getBlock(key, false, false, true);
          if (returned != null) {
            assertArrayEquals(buf, returned.buf);
          } else {
            Thread.sleep(10);
          }
          totalQueries.incrementAndGet();
        }
      };

      t.setDaemon(true);
      ctx.addThread(t);
    }

    // Add a thread to periodically evict and re-cache the block.
    final long blockEvictPeriod = 50;
    TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
      @Override
      public void doAnAction() throws Exception {
        toBeTested.evictBlock(key);
        toBeTested.cacheBlock(key, bac);
        Thread.sleep(blockEvictPeriod);
      }
    };
    t.setDaemon(true);
    ctx.addThread(t);

    ctx.startThreads();
    while (totalQueries.get() < numQueries && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();
  }

  public static void hammerEviction(final BlockCache toBeTested, int blockSize, int numThreads,
      int numQueries) throws Exception {

    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);

    final AtomicInteger totalQueries = new AtomicInteger();

    for (int i = 0; i < numThreads; i++) {
      final int finalI = i;

      final byte[] buf = new byte[5 * 1024];
      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        @Override
        public void doAnAction() throws Exception {
          for (int j = 0; j < 100; j++) {
            BlockCacheKey key = new BlockCacheKey("key_" + finalI + "_" + j, 0);
            Arrays.fill(buf, (byte) (finalI * j));
            final ByteArrayCacheable bac = new ByteArrayCacheable(buf);

            ByteArrayCacheable gotBack =
                (ByteArrayCacheable) toBeTested.getBlock(key, true, false, true);
            if (gotBack != null) {
              assertArrayEquals(gotBack.buf, bac.buf);
            } else {
              toBeTested.cacheBlock(key, bac);
            }
          }
          totalQueries.incrementAndGet();
        }
      };

      t.setDaemon(true);
      ctx.addThread(t);
    }

    ctx.startThreads();
    while (totalQueries.get() < numQueries && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();

    assertTrue(toBeTested.getStats().getEvictedCount() > 0);
  }

  public static class ByteArrayCacheable implements Cacheable {

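    // Shared deserializer for all ByteArrayCacheable instances. The
    // Thread.yield() calls in serialize/deserialize are presumably there to
    // widen the window for races in the multithreaded tests.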
    static final CacheableDeserializer<Cacheable> blockDeserializer =
        new CacheableDeserializer<Cacheable>() {

          @Override
          public Cacheable deserialize(ByteBuff b) throws IOException {
            int len = b.getInt();
            Thread.yield();
            byte[] buf = new byte[len];
            b.get(buf);
            return new ByteArrayCacheable(buf);
          }

          @Override
          public int getDeserialiserIdentifier() {
            return deserializerIdentifier;
          }

          @Override
          public Cacheable deserialize(ByteBuff b, boolean reuse, MemoryType memType)
              throws IOException {
            return deserialize(b);
          }
        };

    final byte[] buf;

    public ByteArrayCacheable(byte[] buf) {
      this.buf = buf;
    }

    @Override
    public long heapSize() {
      return 4L + buf.length;
    }

    @Override
    public int getSerializedLength() {
      return 4 + buf.length;
    }

    @Override
    public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
      destination.putInt(buf.length);
      Thread.yield();
      destination.put(buf);
      destination.rewind();
    }

    @Override
    public CacheableDeserializer<Cacheable> getDeserializer() {
      return blockDeserializer;
    }

    private static final int deserializerIdentifier;
    static {
      deserializerIdentifier =
          CacheableDeserializerIdManager.registerDeserializer(blockDeserializer);
    }

    @Override
    public BlockType getBlockType() {
      return BlockType.DATA;
    }

    @Override
    public MemoryType getMemoryType() {
      return MemoryType.EXCLUSIVE;
    }
  }

  public static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
    HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
    Random rand = new Random();
    HashSet<String> usedStrings = new HashSet<>();
    for (int i = 0; i < numBlocks; i++) {
      // Fill the buffer with random bytes, then overwrite the start with a
      // minimal HFile block header so it parses as a DATA block.
      ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize);
      rand.nextBytes(cachedBuffer.array());
      cachedBuffer.rewind();
      int onDiskSizeWithoutHeader = blockSize;
      int uncompressedSizeWithoutHeader = blockSize;
      long prevBlockOffset = rand.nextLong();
      BlockType.DATA.write(cachedBuffer);
      cachedBuffer.putInt(onDiskSizeWithoutHeader);
      cachedBuffer.putInt(uncompressedSizeWithoutHeader);
      cachedBuffer.putLong(prevBlockOffset);
      cachedBuffer.rewind();
      HFileContext meta = new HFileContextBuilder()
          .withHBaseCheckSum(false)
          .withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(false)
          .withCompression(Compression.Algorithm.NONE)
          .withBytesPerCheckSum(0)
          .withChecksumType(ChecksumType.NULL)
          .build();
      HFileBlock generated = new HFileBlock(BlockType.DATA,
          onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
          prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER,
          blockSize,
          onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta);

      /* No conflicting keys */
      String strKey = Long.toString(rand.nextLong());
      while (!usedStrings.add(strKey)) {
        strKey = Long.toString(rand.nextLong());
      }

      returnedBlocks[i] = new HFileBlockPair();
      returnedBlocks[i].blockName = new BlockCacheKey(strKey, 0);
      returnedBlocks[i].block = generated;
    }
    return returnedBlocks;
  }

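  /** Pairs a generated HFileBlock with the cache key it should be stored under. */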
  @VisibleForTesting
  public static class HFileBlockPair {
    BlockCacheKey blockName;
    HFileBlock block;

    public BlockCacheKey getBlockName() {
      return this.blockName;
    }

    public HFileBlock getBlock() {
      return this.block;
    }
  }

  public static void getBlockAndAssertEquals(BlockCache cache, BlockCacheKey key,
      Cacheable blockToCache, ByteBuffer destBuffer, ByteBuffer expectedBuffer) {
    destBuffer.clear();
    cache.cacheBlock(key, blockToCache);
    Cacheable actualBlock = cache.getBlock(key, false, false, false);
    actualBlock.serialize(destBuffer, true);
    assertEquals(expectedBuffer, destBuffer);
    cache.returnBlock(key, actualBlock);
  }
}