001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.io.hfile;
020
021import static org.junit.Assert.assertArrayEquals;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026
027import java.io.IOException;
028import java.nio.ByteBuffer;
029import java.util.Arrays;
030import java.util.HashSet;
031import java.util.Random;
032import java.util.concurrent.ConcurrentLinkedQueue;
033import java.util.concurrent.atomic.AtomicInteger;
034
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.MultithreadedTestUtil;
038import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
039import org.apache.hadoop.hbase.io.HeapSize;
040import org.apache.hadoop.hbase.io.compress.Compression;
041import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
042import org.apache.hadoop.hbase.nio.ByteBuff;
043import org.apache.hadoop.hbase.util.ChecksumType;
044
045import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
046
047public class CacheTestUtils {
048
049  private static final boolean includesMemstoreTS = true;
050
051  /**
052   * Just checks if heapsize grows when something is cached, and gets smaller
053   * when the same object is evicted
054   */
055
056  public static void testHeapSizeChanges(final BlockCache toBeTested,
057      final int blockSize) {
058    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1);
059    long heapSize = ((HeapSize) toBeTested).heapSize();
060    toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block);
061
062    /*When we cache something HeapSize should always increase */
063    assertTrue(heapSize < ((HeapSize) toBeTested).heapSize());
064
065    toBeTested.evictBlock(blocks[0].blockName);
066
067    /*Post eviction, heapsize should be the same */
068    assertEquals(heapSize, ((HeapSize) toBeTested).heapSize());
069  }
070
071  public static void testCacheMultiThreaded(final BlockCache toBeTested,
072      final int blockSize, final int numThreads, final int numQueries,
073      final double passingScore) throws Exception {
074
075    Configuration conf = new Configuration();
076    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
077        conf);
078
079    final AtomicInteger totalQueries = new AtomicInteger();
080    final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<>();
081    final AtomicInteger hits = new AtomicInteger();
082    final AtomicInteger miss = new AtomicInteger();
083
084    HFileBlockPair[] blocks = generateHFileBlocks(numQueries, blockSize);
085    blocksToTest.addAll(Arrays.asList(blocks));
086
087    for (int i = 0; i < numThreads; i++) {
088      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
089        @Override
090        public void doAnAction() throws Exception {
091          if (!blocksToTest.isEmpty()) {
092            HFileBlockPair ourBlock = blocksToTest.poll();
093            // if we run out of blocks to test, then we should stop the tests.
094            if (ourBlock == null) {
095              ctx.setStopFlag(true);
096              return;
097            }
098            toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block);
099            Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName,
100                false, false, true);
101            if (retrievedBlock != null) {
102              assertEquals(ourBlock.block, retrievedBlock);
103              toBeTested.evictBlock(ourBlock.blockName);
104              hits.incrementAndGet();
105              assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true));
106            } else {
107              miss.incrementAndGet();
108            }
109            totalQueries.incrementAndGet();
110          }
111        }
112      };
113      t.setDaemon(true);
114      ctx.addThread(t);
115    }
116    ctx.startThreads();
117    while (!blocksToTest.isEmpty() && ctx.shouldRun()) {
118      Thread.sleep(10);
119    }
120    ctx.stop();
121    if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) {
122      fail("Too many nulls returned. Hits: " + hits.get() + " Misses: "
123          + miss.get());
124    }
125  }
126
127  public static void testCacheSimple(BlockCache toBeTested, int blockSize,
128      int numBlocks) throws Exception {
129
130    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numBlocks);
131    // Confirm empty
132    for (HFileBlockPair block : blocks) {
133      assertNull(toBeTested.getBlock(block.blockName, true, false, true));
134    }
135
136    // Add blocks
137    for (HFileBlockPair block : blocks) {
138      toBeTested.cacheBlock(block.blockName, block.block);
139    }
140
141    // Check if all blocks are properly cached and contain the right
142    // information, or the blocks are null.
143    // MapMaker makes no guarantees when it will evict, so neither can we.
144
145    for (HFileBlockPair block : blocks) {
146      HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false, true);
147      if (buf != null) {
148        assertEquals(block.block, buf);
149      }
150
151    }
152
153    // Re-add some duplicate blocks. Hope nothing breaks.
154
155    for (HFileBlockPair block : blocks) {
156      try {
157        if (toBeTested.getBlock(block.blockName, true, false, true) != null) {
158          toBeTested.cacheBlock(block.blockName, block.block);
159          if (!(toBeTested instanceof BucketCache)) {
160            // BucketCache won't throw exception when caching already cached
161            // block
162            fail("Cache should not allow re-caching a block");
163          }
164        }
165      } catch (RuntimeException re) {
166        // expected
167      }
168    }
169
170  }
171
172  public static void hammerSingleKey(final BlockCache toBeTested,
173      int BlockSize, int numThreads, int numQueries) throws Exception {
174    final BlockCacheKey key = new BlockCacheKey("key", 0);
175    final byte[] buf = new byte[5 * 1024];
176    Arrays.fill(buf, (byte) 5);
177
178    final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
179    Configuration conf = new Configuration();
180    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
181        conf);
182
183    final AtomicInteger totalQueries = new AtomicInteger();
184    toBeTested.cacheBlock(key, bac);
185
186    for (int i = 0; i < numThreads; i++) {
187      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
188        @Override
189        public void doAnAction() throws Exception {
190          ByteArrayCacheable returned = (ByteArrayCacheable) toBeTested
191              .getBlock(key, false, false, true);
192          if (returned != null) {
193            assertArrayEquals(buf, returned.buf);
194          } else {
195            Thread.sleep(10);
196          }
197          totalQueries.incrementAndGet();
198        }
199      };
200
201      t.setDaemon(true);
202      ctx.addThread(t);
203    }
204
205    // add a thread to periodically evict and re-cache the block
206    final long blockEvictPeriod = 50;
207    TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
208      @Override
209      public void doAnAction() throws Exception {
210        toBeTested.evictBlock(key);
211        toBeTested.cacheBlock(key, bac);
212        Thread.sleep(blockEvictPeriod);
213      }
214    };
215    t.setDaemon(true);
216    ctx.addThread(t);
217
218    ctx.startThreads();
219    while (totalQueries.get() < numQueries && ctx.shouldRun()) {
220      Thread.sleep(10);
221    }
222    ctx.stop();
223  }
224
225  public static void hammerEviction(final BlockCache toBeTested, int BlockSize,
226      int numThreads, int numQueries) throws Exception {
227
228    Configuration conf = new Configuration();
229    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
230        conf);
231
232    final AtomicInteger totalQueries = new AtomicInteger();
233
234    for (int i = 0; i < numThreads; i++) {
235      final int finalI = i;
236
237      final byte[] buf = new byte[5 * 1024];
238      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
239        @Override
240        public void doAnAction() throws Exception {
241          for (int j = 0; j < 100; j++) {
242            BlockCacheKey key = new BlockCacheKey("key_" + finalI + "_" + j, 0);
243            Arrays.fill(buf, (byte) (finalI * j));
244            final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
245
246            ByteArrayCacheable gotBack = (ByteArrayCacheable) toBeTested
247                .getBlock(key, true, false, true);
248            if (gotBack != null) {
249              assertArrayEquals(gotBack.buf, bac.buf);
250            } else {
251              toBeTested.cacheBlock(key, bac);
252            }
253          }
254          totalQueries.incrementAndGet();
255        }
256      };
257
258      t.setDaemon(true);
259      ctx.addThread(t);
260    }
261
262    ctx.startThreads();
263    while (totalQueries.get() < numQueries && ctx.shouldRun()) {
264      Thread.sleep(10);
265    }
266    ctx.stop();
267
268    assertTrue(toBeTested.getStats().getEvictedCount() > 0);
269  }
270
271  public static class ByteArrayCacheable implements Cacheable {
272
273    static final CacheableDeserializer<Cacheable> blockDeserializer =
274      new CacheableDeserializer<Cacheable>() {
275
276      @Override
277      public Cacheable deserialize(ByteBuff b) throws IOException {
278        int len = b.getInt();
279        Thread.yield();
280        byte buf[] = new byte[len];
281        b.get(buf);
282        return new ByteArrayCacheable(buf);
283      }
284
285      @Override
286      public int getDeserialiserIdentifier() {
287        return deserializerIdentifier;
288      }
289
290      @Override
291      public Cacheable deserialize(ByteBuff b, boolean reuse, MemoryType memType)
292          throws IOException {
293        return deserialize(b);
294      }
295    };
296
297    final byte[] buf;
298
299    public ByteArrayCacheable(byte[] buf) {
300      this.buf = buf;
301    }
302
303    @Override
304    public long heapSize() {
305      return 4L + buf.length;
306    }
307
308    @Override
309    public int getSerializedLength() {
310      return 4 + buf.length;
311    }
312
313    @Override
314    public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
315      destination.putInt(buf.length);
316      Thread.yield();
317      destination.put(buf);
318      destination.rewind();
319    }
320
321    @Override
322    public CacheableDeserializer<Cacheable> getDeserializer() {
323      return blockDeserializer;
324    }
325
326    private static final int deserializerIdentifier;
327    static {
328      deserializerIdentifier = CacheableDeserializerIdManager
329          .registerDeserializer(blockDeserializer);
330    }
331
332    @Override
333    public BlockType getBlockType() {
334      return BlockType.DATA;
335    }
336
337    @Override
338    public MemoryType getMemoryType() {
339      return MemoryType.EXCLUSIVE;
340    }
341  }
342
343
344  public static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
345    HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
346    Random rand = new Random();
347    HashSet<String> usedStrings = new HashSet<>();
348    for (int i = 0; i < numBlocks; i++) {
349      ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize);
350      rand.nextBytes(cachedBuffer.array());
351      cachedBuffer.rewind();
352      int onDiskSizeWithoutHeader = blockSize;
353      int uncompressedSizeWithoutHeader = blockSize;
354      long prevBlockOffset = rand.nextLong();
355      BlockType.DATA.write(cachedBuffer);
356      cachedBuffer.putInt(onDiskSizeWithoutHeader);
357      cachedBuffer.putInt(uncompressedSizeWithoutHeader);
358      cachedBuffer.putLong(prevBlockOffset);
359      cachedBuffer.rewind();
360      HFileContext meta = new HFileContextBuilder()
361                          .withHBaseCheckSum(false)
362                          .withIncludesMvcc(includesMemstoreTS)
363                          .withIncludesTags(false)
364                          .withCompression(Compression.Algorithm.NONE)
365                          .withBytesPerCheckSum(0)
366                          .withChecksumType(ChecksumType.NULL)
367                          .build();
368      HFileBlock generated = new HFileBlock(BlockType.DATA,
369          onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
370          prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER,
371          blockSize,
372          onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta);
373
374      String strKey;
375      /* No conflicting keys */
376      strKey = Long.toString(rand.nextLong());
377      while (!usedStrings.add(strKey)) {
378        strKey = Long.toString(rand.nextLong());
379      }
380
381      returnedBlocks[i] = new HFileBlockPair();
382      returnedBlocks[i].blockName = new BlockCacheKey(strKey, 0);
383      returnedBlocks[i].block = generated;
384    }
385    return returnedBlocks;
386  }
387
388  @VisibleForTesting
389  public static class HFileBlockPair {
390    BlockCacheKey blockName;
391    HFileBlock block;
392
393    public BlockCacheKey getBlockName() {
394      return this.blockName;
395    }
396
397    public HFileBlock getBlock() {
398      return this.block;
399    }
400  }
401
402  public static void getBlockAndAssertEquals(BlockCache cache, BlockCacheKey key,
403                                             Cacheable blockToCache, ByteBuffer destBuffer,
404                                             ByteBuffer expectedBuffer) {
405    destBuffer.clear();
406    cache.cacheBlock(key, blockToCache);
407    Cacheable actualBlock = cache.getBlock(key, false, false, false);
408    actualBlock.serialize(destBuffer, true);
409    assertEquals(expectedBuffer, destBuffer);
410    cache.returnBlock(key, actualBlock);
411  }
412}