/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ChecksumType;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

public class CacheTestUtils {

  private static final boolean includesMemstoreTS = true;

  /**
   * Just checks that heap size grows when something is cached and returns to
   * its original value when the same object is evicted.
   */
  public static void testHeapSizeChanges(final BlockCache toBeTested, final int blockSize) {
    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1);
    long heapSize = ((HeapSize) toBeTested).heapSize();
    toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block);

    /* When we cache something, heap size should always increase */
    assertTrue(heapSize < ((HeapSize) toBeTested).heapSize());

    toBeTested.evictBlock(blocks[0].blockName);

    /* Post eviction, heap size should be back to where it started */
    assertEquals(heapSize, ((HeapSize) toBeTested).heapSize());
  }
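  /*
   * A minimal usage sketch, not referenced by any test: shows how a caller
   * would typically drive testHeapSizeChanges against a concrete cache. The
   * LruBlockCache and the sizes below are example choices, not prescribed
   * values; any BlockCache that also implements HeapSize works.
   */
  @SuppressWarnings("unused")
  private static void exampleHeapSizeCheck() {
    // 1 MB cache with 8 KB blocks; both values are arbitrary example sizes.
    BlockCache cache = new LruBlockCache(1024 * 1024, 8 * 1024);
    testHeapSizeChanges(cache, 8 * 1024);
  }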
  public static void testCacheMultiThreaded(final BlockCache toBeTested, final int blockSize,
      final int numThreads, final int numQueries, final double passingScore) throws Exception {

    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);

    final AtomicInteger totalQueries = new AtomicInteger();
    final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<>();
    final AtomicInteger hits = new AtomicInteger();
    final AtomicInteger miss = new AtomicInteger();

    // One block of blockSize bytes per query; note that generateHFileBlocks
    // takes (blockSize, numBlocks), matching the other call sites below.
    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numQueries);
    blocksToTest.addAll(Arrays.asList(blocks));

    for (int i = 0; i < numThreads; i++) {
      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        @Override
        public void doAnAction() throws Exception {
          if (!blocksToTest.isEmpty()) {
            HFileBlockPair ourBlock = blocksToTest.poll();
            // if we run out of blocks to test, then we should stop the tests.
            if (ourBlock == null) {
              ctx.setStopFlag(true);
              return;
            }
            toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block);
            Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName, false, false, true);
            if (retrievedBlock != null) {
              assertEquals(ourBlock.block, retrievedBlock);
              toBeTested.evictBlock(ourBlock.blockName);
              hits.incrementAndGet();
              assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true));
            } else {
              miss.incrementAndGet();
            }
            totalQueries.incrementAndGet();
          }
        }
      };
      t.setDaemon(true);
      ctx.addThread(t);
    }
    ctx.startThreads();
    while (!blocksToTest.isEmpty() && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();
    if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) {
      fail("Too many nulls returned. Hits: " + hits.get() + " Misses: " + miss.get());
    }
  }

  public static void testCacheSimple(BlockCache toBeTested, int blockSize, int numBlocks)
      throws Exception {

    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numBlocks);
    // Confirm empty
    for (HFileBlockPair block : blocks) {
      assertNull(toBeTested.getBlock(block.blockName, true, false, true));
    }

    // Add blocks
    for (HFileBlockPair block : blocks) {
      toBeTested.cacheBlock(block.blockName, block.block);
    }

    // Check that all blocks are properly cached and contain the right
    // information, or that the blocks are null.
    // MapMaker makes no guarantees when it will evict, so neither can we.
    for (HFileBlockPair block : blocks) {
      HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false, true);
      if (buf != null) {
        assertEquals(block.block, buf);
      }
    }

    // Re-add some duplicate blocks. Hope nothing breaks.
    for (HFileBlockPair block : blocks) {
      try {
        if (toBeTested.getBlock(block.blockName, true, false, true) != null) {
          toBeTested.cacheBlock(block.blockName, block.block);
          if (!(toBeTested instanceof BucketCache)) {
            // BucketCache won't throw an exception when caching an already
            // cached block
            fail("Cache should not allow re-caching a block");
          }
        }
      } catch (RuntimeException re) {
        // expected
      }
    }
  }
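  /*
   * Usage sketch (illustrative only, not called by any suite): exercises the
   * two helpers above against a caller-supplied cache. The thread count,
   * query count, and 0.75 passing score are example assumptions; real tests
   * pick values suited to the cache implementation under test.
   */
  @SuppressWarnings("unused")
  private static void exampleMultiThreadedRun(BlockCache cache) throws Exception {
    // 8 KB blocks, 16 reader threads, 10k cached blocks, and require that at
    // least 75% of lookups hit before the eviction racing them completes.
    testCacheMultiThreaded(cache, 8 * 1024, 16, 10000, 0.75);
    testCacheSimple(cache, 8 * 1024, 100);
  }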
  public static void hammerSingleKey(final BlockCache toBeTested, int numThreads, int numQueries)
      throws Exception {
    final BlockCacheKey key = new BlockCacheKey("key", 0);
    final byte[] buf = new byte[5 * 1024];
    Arrays.fill(buf, (byte) 5);

    final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);

    final AtomicInteger totalQueries = new AtomicInteger();
    toBeTested.cacheBlock(key, bac);

    for (int i = 0; i < numThreads; i++) {
      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        @Override
        public void doAnAction() throws Exception {
          ByteArrayCacheable returned =
              (ByteArrayCacheable) toBeTested.getBlock(key, false, false, true);
          if (returned != null) {
            assertArrayEquals(buf, returned.buf);
          } else {
            Thread.sleep(10);
          }
          totalQueries.incrementAndGet();
        }
      };

      t.setDaemon(true);
      ctx.addThread(t);
    }

    // add a thread to periodically evict and re-cache the block
    final long blockEvictPeriod = 50;
    TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
      @Override
      public void doAnAction() throws Exception {
        toBeTested.evictBlock(key);
        toBeTested.cacheBlock(key, bac);
        Thread.sleep(blockEvictPeriod);
      }
    };
    t.setDaemon(true);
    ctx.addThread(t);

    ctx.startThreads();
    while (totalQueries.get() < numQueries && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();
  }

  public static class ByteArrayCacheable implements Cacheable {

    private static final CacheableDeserializer<Cacheable> blockDeserializer =
        new CacheableDeserializer<Cacheable>() {
          @Override
          public int getDeserializerIdentifier() {
            return deserializerIdentifier;
          }

          @Override
          public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc) throws IOException {
            int len = b.getInt();
            Thread.yield();
            byte[] buf = new byte[len];
            b.get(buf);
            return new ByteArrayCacheable(buf);
          }
        };

    final byte[] buf;

    public ByteArrayCacheable(byte[] buf) {
      this.buf = buf;
    }

    @Override
    public long heapSize() {
      return 4L + buf.length;
    }

    @Override
    public int getSerializedLength() {
      return 4 + buf.length;
    }

    @Override
    public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
      destination.putInt(buf.length);
      Thread.yield();
      destination.put(buf);
      destination.rewind();
    }

    @Override
    public CacheableDeserializer<Cacheable> getDeserializer() {
      return blockDeserializer;
    }

    private static final int deserializerIdentifier;
    static {
      deserializerIdentifier =
          CacheableDeserializerIdManager.registerDeserializer(blockDeserializer);
    }

    @Override
    public BlockType getBlockType() {
      return BlockType.DATA;
    }
  }
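  /*
   * Round-trip sketch for ByteArrayCacheable (illustrative only, not used by
   * the tests): serializes the payload into a ByteBuffer and reads it back
   * through the registered deserializer, mirroring what a cache that stores
   * serialized blocks does internally.
   */
  @SuppressWarnings("unused")
  private static void exampleSerializeRoundTrip() throws IOException {
    ByteArrayCacheable original = new ByteArrayCacheable(new byte[] { 1, 2, 3 });
    ByteBuffer serialized = ByteBuffer.allocate(original.getSerializedLength());
    // serialize() writes the length prefix plus payload, then rewinds the
    // buffer, so it is immediately readable.
    original.serialize(serialized, true);
    ByteArrayCacheable copy = (ByteArrayCacheable) original.getDeserializer()
        .deserialize(ByteBuff.wrap(serialized), ByteBuffAllocator.HEAP);
    assertArrayEquals(original.buf, copy.buf);
  }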
  public static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
    HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
    Random rand = new Random();
    HashSet<String> usedStrings = new HashSet<>();
    for (int i = 0; i < numBlocks; i++) {
      ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize);
      rand.nextBytes(cachedBuffer.array());
      cachedBuffer.rewind();
      int onDiskSizeWithoutHeader = blockSize;
      int uncompressedSizeWithoutHeader = blockSize;
      long prevBlockOffset = rand.nextLong();
      BlockType.DATA.write(cachedBuffer);
      cachedBuffer.putInt(onDiskSizeWithoutHeader);
      cachedBuffer.putInt(uncompressedSizeWithoutHeader);
      cachedBuffer.putLong(prevBlockOffset);
      cachedBuffer.rewind();
      HFileContext meta = new HFileContextBuilder()
          .withHBaseCheckSum(false)
          .withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(false)
          .withCompression(Compression.Algorithm.NONE)
          .withBytesPerCheckSum(0)
          .withChecksumType(ChecksumType.NULL)
          .build();
      HFileBlock generated =
          new HFileBlock(BlockType.DATA, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
              prevBlockOffset, ByteBuff.wrap(cachedBuffer), HFileBlock.DONT_FILL_HEADER, blockSize,
              onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta,
              ByteBuffAllocator.HEAP);

      /* No conflicting keys */
      String strKey = Long.toString(rand.nextLong());
      while (!usedStrings.add(strKey)) {
        strKey = Long.toString(rand.nextLong());
      }

      returnedBlocks[i] = new HFileBlockPair();
      returnedBlocks[i].blockName = new BlockCacheKey(strKey, 0);
      returnedBlocks[i].block = generated;
    }
    return returnedBlocks;
  }

  @VisibleForTesting
  public static class HFileBlockPair {
    BlockCacheKey blockName;
    HFileBlock block;

    public BlockCacheKey getBlockName() {
      return this.blockName;
    }

    public HFileBlock getBlock() {
      return this.block;
    }
  }

  public static void getBlockAndAssertEquals(BlockCache cache, BlockCacheKey key,
      Cacheable blockToCache, ByteBuffer destBuffer, ByteBuffer expectedBuffer) {
    destBuffer.clear();
    cache.cacheBlock(key, blockToCache);
    Cacheable actualBlock = cache.getBlock(key, false, false, false);
    try {
      actualBlock.serialize(destBuffer, true);
      assertEquals(expectedBuffer, destBuffer);
    } finally {
      // Release the reference count increased by getBlock.
      if (actualBlock != null) {
        actualBlock.release();
      }
    }
  }
}
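// Usage sketch for getBlockAndAssertEquals (illustrative only; "cache" stands
// in for whatever BlockCache is under test). The caller pre-serializes the
// block into an expected buffer; the helper then caches it, reads it back,
// serializes the cached copy into destBuffer, and asserts the buffers match:
//
//   BlockCacheKey key = new BlockCacheKey("example", 0);
//   CacheTestUtils.ByteArrayCacheable block =
//       new CacheTestUtils.ByteArrayCacheable(new byte[] { 1, 2, 3 });
//   ByteBuffer expected = ByteBuffer.allocate(block.getSerializedLength());
//   block.serialize(expected, true); // rewinds, so the buffer is ready to compare
//   ByteBuffer dest = ByteBuffer.allocate(block.getSerializedLength());
//   CacheTestUtils.getBlockAndAssertEquals(cache, key, block, dest, expected);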