001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.io.hfile;
020
021import static org.junit.Assert.assertArrayEquals;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026
027import java.io.IOException;
028import java.nio.ByteBuffer;
029import java.util.Arrays;
030import java.util.HashSet;
031import java.util.Random;
032import java.util.concurrent.ConcurrentLinkedQueue;
033import java.util.concurrent.atomic.AtomicInteger;
034
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.MultithreadedTestUtil;
038import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
039import org.apache.hadoop.hbase.io.ByteBuffAllocator;
040import org.apache.hadoop.hbase.io.HeapSize;
041import org.apache.hadoop.hbase.io.compress.Compression;
042import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
043import org.apache.hadoop.hbase.nio.ByteBuff;
044import org.apache.hadoop.hbase.util.ChecksumType;
045
046import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
047
048public class CacheTestUtils {
049
050  private static final boolean includesMemstoreTS = true;
051
052  /**
053   * Just checks if heapsize grows when something is cached, and gets smaller
054   * when the same object is evicted
055   */
056
057  public static void testHeapSizeChanges(final BlockCache toBeTested,
058      final int blockSize) {
059    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1);
060    long heapSize = ((HeapSize) toBeTested).heapSize();
061    toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block);
062
063    /*When we cache something HeapSize should always increase */
064    assertTrue(heapSize < ((HeapSize) toBeTested).heapSize());
065
066    toBeTested.evictBlock(blocks[0].blockName);
067
068    /*Post eviction, heapsize should be the same */
069    assertEquals(heapSize, ((HeapSize) toBeTested).heapSize());
070  }
071
072  public static void testCacheMultiThreaded(final BlockCache toBeTested,
073      final int blockSize, final int numThreads, final int numQueries,
074      final double passingScore) throws Exception {
075
076    Configuration conf = new Configuration();
077    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
078        conf);
079
080    final AtomicInteger totalQueries = new AtomicInteger();
081    final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<>();
082    final AtomicInteger hits = new AtomicInteger();
083    final AtomicInteger miss = new AtomicInteger();
084
085    HFileBlockPair[] blocks = generateHFileBlocks(numQueries, blockSize);
086    blocksToTest.addAll(Arrays.asList(blocks));
087
088    for (int i = 0; i < numThreads; i++) {
089      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
090        @Override
091        public void doAnAction() throws Exception {
092          if (!blocksToTest.isEmpty()) {
093            HFileBlockPair ourBlock = blocksToTest.poll();
094            // if we run out of blocks to test, then we should stop the tests.
095            if (ourBlock == null) {
096              ctx.setStopFlag(true);
097              return;
098            }
099            toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block);
100            Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName,
101                false, false, true);
102            if (retrievedBlock != null) {
103              assertEquals(ourBlock.block, retrievedBlock);
104              toBeTested.evictBlock(ourBlock.blockName);
105              hits.incrementAndGet();
106              assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true));
107            } else {
108              miss.incrementAndGet();
109            }
110            totalQueries.incrementAndGet();
111          }
112        }
113      };
114      t.setDaemon(true);
115      ctx.addThread(t);
116    }
117    ctx.startThreads();
118    while (!blocksToTest.isEmpty() && ctx.shouldRun()) {
119      Thread.sleep(10);
120    }
121    ctx.stop();
122    if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) {
123      fail("Too many nulls returned. Hits: " + hits.get() + " Misses: "
124          + miss.get());
125    }
126  }
127
128  public static void testCacheSimple(BlockCache toBeTested, int blockSize,
129      int numBlocks) throws Exception {
130
131    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numBlocks);
132    // Confirm empty
133    for (HFileBlockPair block : blocks) {
134      assertNull(toBeTested.getBlock(block.blockName, true, false, true));
135    }
136
137    // Add blocks
138    for (HFileBlockPair block : blocks) {
139      toBeTested.cacheBlock(block.blockName, block.block);
140    }
141
142    // Check if all blocks are properly cached and contain the right
143    // information, or the blocks are null.
144    // MapMaker makes no guarantees when it will evict, so neither can we.
145
146    for (HFileBlockPair block : blocks) {
147      HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false, true);
148      if (buf != null) {
149        assertEquals(block.block, buf);
150      }
151    }
152
153    // Re-add some duplicate blocks. Hope nothing breaks.
154
155    for (HFileBlockPair block : blocks) {
156      try {
157        if (toBeTested.getBlock(block.blockName, true, false, true) != null) {
158          toBeTested.cacheBlock(block.blockName, block.block);
159          if (!(toBeTested instanceof BucketCache)) {
160            // BucketCache won't throw exception when caching already cached
161            // block
162            fail("Cache should not allow re-caching a block");
163          }
164        }
165      } catch (RuntimeException re) {
166        // expected
167      }
168    }
169
170  }
171
172  public static void hammerSingleKey(final BlockCache toBeTested, int numThreads, int numQueries)
173      throws Exception {
174    final BlockCacheKey key = new BlockCacheKey("key", 0);
175    final byte[] buf = new byte[5 * 1024];
176    Arrays.fill(buf, (byte) 5);
177
178    final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
179    Configuration conf = new Configuration();
180    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
181
182    final AtomicInteger totalQueries = new AtomicInteger();
183    toBeTested.cacheBlock(key, bac);
184
185    for (int i = 0; i < numThreads; i++) {
186      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
187        @Override
188        public void doAnAction() throws Exception {
189          ByteArrayCacheable returned =
190              (ByteArrayCacheable) toBeTested.getBlock(key, false, false, true);
191          if (returned != null) {
192            assertArrayEquals(buf, returned.buf);
193          } else {
194            Thread.sleep(10);
195          }
196          totalQueries.incrementAndGet();
197        }
198      };
199
200      t.setDaemon(true);
201      ctx.addThread(t);
202    }
203
204    // add a thread to periodically evict and re-cache the block
205    final long blockEvictPeriod = 50;
206    TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
207      @Override
208      public void doAnAction() throws Exception {
209        toBeTested.evictBlock(key);
210        toBeTested.cacheBlock(key, bac);
211        Thread.sleep(blockEvictPeriod);
212      }
213    };
214    t.setDaemon(true);
215    ctx.addThread(t);
216
217    ctx.startThreads();
218    while (totalQueries.get() < numQueries && ctx.shouldRun()) {
219      Thread.sleep(10);
220    }
221    ctx.stop();
222  }
223
224  public static class ByteArrayCacheable implements Cacheable {
225
226    private static final CacheableDeserializer<Cacheable> blockDeserializer =
227      new CacheableDeserializer<Cacheable>() {
228        @Override
229        public int getDeserializerIdentifier() {
230          return deserializerIdentifier;
231        }
232
233        @Override
234        public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc) throws IOException {
235          int len = b.getInt();
236          Thread.yield();
237          byte[] buf = new byte[len];
238          b.get(buf);
239          return new ByteArrayCacheable(buf);
240        }
241      };
242
243    final byte[] buf;
244
245    public ByteArrayCacheable(byte[] buf) {
246      this.buf = buf;
247    }
248
249    @Override
250    public long heapSize() {
251      return 4L + buf.length;
252    }
253
254    @Override
255    public int getSerializedLength() {
256      return 4 + buf.length;
257    }
258
259    @Override
260    public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
261      destination.putInt(buf.length);
262      Thread.yield();
263      destination.put(buf);
264      destination.rewind();
265    }
266
267    @Override
268    public CacheableDeserializer<Cacheable> getDeserializer() {
269      return blockDeserializer;
270    }
271
272    private static final int deserializerIdentifier;
273    static {
274      deserializerIdentifier = CacheableDeserializerIdManager
275          .registerDeserializer(blockDeserializer);
276    }
277
278    @Override
279    public BlockType getBlockType() {
280      return BlockType.DATA;
281    }
282  }
283
284
285  public static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
286    HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
287    Random rand = new Random();
288    HashSet<String> usedStrings = new HashSet<>();
289    for (int i = 0; i < numBlocks; i++) {
290      ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize);
291      rand.nextBytes(cachedBuffer.array());
292      cachedBuffer.rewind();
293      int onDiskSizeWithoutHeader = blockSize;
294      int uncompressedSizeWithoutHeader = blockSize;
295      long prevBlockOffset = rand.nextLong();
296      BlockType.DATA.write(cachedBuffer);
297      cachedBuffer.putInt(onDiskSizeWithoutHeader);
298      cachedBuffer.putInt(uncompressedSizeWithoutHeader);
299      cachedBuffer.putLong(prevBlockOffset);
300      cachedBuffer.rewind();
301      HFileContext meta = new HFileContextBuilder()
302                          .withHBaseCheckSum(false)
303                          .withIncludesMvcc(includesMemstoreTS)
304                          .withIncludesTags(false)
305                          .withCompression(Compression.Algorithm.NONE)
306                          .withBytesPerCheckSum(0)
307                          .withChecksumType(ChecksumType.NULL)
308                          .build();
309      HFileBlock generated =
310          new HFileBlock(BlockType.DATA, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
311              prevBlockOffset, ByteBuff.wrap(cachedBuffer), HFileBlock.DONT_FILL_HEADER, blockSize,
312              onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta,
313              ByteBuffAllocator.HEAP);
314
315      String strKey;
316      /* No conflicting keys */
317      strKey = Long.toString(rand.nextLong());
318      while (!usedStrings.add(strKey)) {
319        strKey = Long.toString(rand.nextLong());
320      }
321
322      returnedBlocks[i] = new HFileBlockPair();
323      returnedBlocks[i].blockName = new BlockCacheKey(strKey, 0);
324      returnedBlocks[i].block = generated;
325    }
326    return returnedBlocks;
327  }
328
329  @VisibleForTesting
330  public static class HFileBlockPair {
331    BlockCacheKey blockName;
332    HFileBlock block;
333
334    public BlockCacheKey getBlockName() {
335      return this.blockName;
336    }
337
338    public HFileBlock getBlock() {
339      return this.block;
340    }
341  }
342
343  public static void getBlockAndAssertEquals(BlockCache cache, BlockCacheKey key,
344      Cacheable blockToCache, ByteBuffer destBuffer, ByteBuffer expectedBuffer) {
345    destBuffer.clear();
346    cache.cacheBlock(key, blockToCache);
347    Cacheable actualBlock = cache.getBlock(key, false, false, false);
348    try {
349      actualBlock.serialize(destBuffer, true);
350      assertEquals(expectedBuffer, destBuffer);
351    } finally {
352      // Release the reference count increased by getBlock.
353      if (actualBlock != null) {
354        actualBlock.release();
355      }
356    }
357  }
358}