001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.io.hfile;
020
021import static org.junit.Assert.assertArrayEquals;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026
027import java.io.IOException;
028import java.nio.ByteBuffer;
029import java.util.Arrays;
030import java.util.HashSet;
031import java.util.Random;
032import java.util.concurrent.ConcurrentLinkedQueue;
033import java.util.concurrent.atomic.AtomicInteger;
034
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.MultithreadedTestUtil;
038import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
039import org.apache.hadoop.hbase.io.ByteBuffAllocator;
040import org.apache.hadoop.hbase.io.HeapSize;
041import org.apache.hadoop.hbase.io.compress.Compression;
042import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
043import org.apache.hadoop.hbase.nio.ByteBuff;
044import org.apache.hadoop.hbase.util.ChecksumType;
045
046public class CacheTestUtils {
047
048  private static final boolean includesMemstoreTS = true;
049
050  /**
051   * Just checks if heapsize grows when something is cached, and gets smaller
052   * when the same object is evicted
053   */
054
055  public static void testHeapSizeChanges(final BlockCache toBeTested,
056      final int blockSize) {
057    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1);
058    long heapSize = ((HeapSize) toBeTested).heapSize();
059    toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block);
060
061    /*When we cache something HeapSize should always increase */
062    assertTrue(heapSize < ((HeapSize) toBeTested).heapSize());
063
064    toBeTested.evictBlock(blocks[0].blockName);
065
066    /*Post eviction, heapsize should be the same */
067    assertEquals(heapSize, ((HeapSize) toBeTested).heapSize());
068  }
069
070  public static void testCacheMultiThreaded(final BlockCache toBeTested,
071      final int blockSize, final int numThreads, final int numQueries,
072      final double passingScore) throws Exception {
073
074    Configuration conf = new Configuration();
075    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
076        conf);
077
078    final AtomicInteger totalQueries = new AtomicInteger();
079    final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<>();
080    final AtomicInteger hits = new AtomicInteger();
081    final AtomicInteger miss = new AtomicInteger();
082
083    HFileBlockPair[] blocks = generateHFileBlocks(numQueries, blockSize);
084    blocksToTest.addAll(Arrays.asList(blocks));
085
086    for (int i = 0; i < numThreads; i++) {
087      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
088        @Override
089        public void doAnAction() throws Exception {
090          if (!blocksToTest.isEmpty()) {
091            HFileBlockPair ourBlock = blocksToTest.poll();
092            // if we run out of blocks to test, then we should stop the tests.
093            if (ourBlock == null) {
094              ctx.setStopFlag(true);
095              return;
096            }
097            toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block);
098            Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName,
099                false, false, true);
100            if (retrievedBlock != null) {
101              assertEquals(ourBlock.block, retrievedBlock);
102              toBeTested.evictBlock(ourBlock.blockName);
103              hits.incrementAndGet();
104              assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true));
105            } else {
106              miss.incrementAndGet();
107            }
108            totalQueries.incrementAndGet();
109          }
110        }
111      };
112      t.setDaemon(true);
113      ctx.addThread(t);
114    }
115    ctx.startThreads();
116    while (!blocksToTest.isEmpty() && ctx.shouldRun()) {
117      Thread.sleep(10);
118    }
119    ctx.stop();
120    if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) {
121      fail("Too many nulls returned. Hits: " + hits.get() + " Misses: "
122          + miss.get());
123    }
124  }
125
126  public static void testCacheSimple(BlockCache toBeTested, int blockSize,
127      int numBlocks) throws Exception {
128
129    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numBlocks);
130    // Confirm empty
131    for (HFileBlockPair block : blocks) {
132      assertNull(toBeTested.getBlock(block.blockName, true, false, true));
133    }
134
135    // Add blocks
136    for (HFileBlockPair block : blocks) {
137      toBeTested.cacheBlock(block.blockName, block.block);
138    }
139
140    // Check if all blocks are properly cached and contain the right
141    // information, or the blocks are null.
142    // MapMaker makes no guarantees when it will evict, so neither can we.
143
144    for (HFileBlockPair block : blocks) {
145      HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false, true);
146      if (buf != null) {
147        assertEquals(block.block, buf);
148      }
149    }
150
151    // Re-add some duplicate blocks. Hope nothing breaks.
152
153    for (HFileBlockPair block : blocks) {
154      try {
155        if (toBeTested.getBlock(block.blockName, true, false, true) != null) {
156          toBeTested.cacheBlock(block.blockName, block.block);
157          if (!(toBeTested instanceof BucketCache)) {
158            // BucketCache won't throw exception when caching already cached
159            // block
160            fail("Cache should not allow re-caching a block");
161          }
162        }
163      } catch (RuntimeException re) {
164        // expected
165      }
166    }
167
168  }
169
170  public static void hammerSingleKey(final BlockCache toBeTested, int numThreads, int numQueries)
171      throws Exception {
172    final BlockCacheKey key = new BlockCacheKey("key", 0);
173    final byte[] buf = new byte[5 * 1024];
174    Arrays.fill(buf, (byte) 5);
175
176    final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
177    Configuration conf = new Configuration();
178    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
179
180    final AtomicInteger totalQueries = new AtomicInteger();
181    toBeTested.cacheBlock(key, bac);
182
183    for (int i = 0; i < numThreads; i++) {
184      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
185        @Override
186        public void doAnAction() throws Exception {
187          ByteArrayCacheable returned =
188              (ByteArrayCacheable) toBeTested.getBlock(key, false, false, true);
189          if (returned != null) {
190            assertArrayEquals(buf, returned.buf);
191          } else {
192            Thread.sleep(10);
193          }
194          totalQueries.incrementAndGet();
195        }
196      };
197
198      t.setDaemon(true);
199      ctx.addThread(t);
200    }
201
202    // add a thread to periodically evict and re-cache the block
203    final long blockEvictPeriod = 50;
204    TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
205      @Override
206      public void doAnAction() throws Exception {
207        toBeTested.evictBlock(key);
208        toBeTested.cacheBlock(key, bac);
209        Thread.sleep(blockEvictPeriod);
210      }
211    };
212    t.setDaemon(true);
213    ctx.addThread(t);
214
215    ctx.startThreads();
216    while (totalQueries.get() < numQueries && ctx.shouldRun()) {
217      Thread.sleep(10);
218    }
219    ctx.stop();
220  }
221
222  public static class ByteArrayCacheable implements Cacheable {
223
224    private static final CacheableDeserializer<Cacheable> blockDeserializer =
225      new CacheableDeserializer<Cacheable>() {
226        @Override
227        public int getDeserializerIdentifier() {
228          return deserializerIdentifier;
229        }
230
231        @Override
232        public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc) throws IOException {
233          int len = b.getInt();
234          Thread.yield();
235          byte[] buf = new byte[len];
236          b.get(buf);
237          return new ByteArrayCacheable(buf);
238        }
239      };
240
241    final byte[] buf;
242
243    public ByteArrayCacheable(byte[] buf) {
244      this.buf = buf;
245    }
246
247    @Override
248    public long heapSize() {
249      return 4L + buf.length;
250    }
251
252    @Override
253    public int getSerializedLength() {
254      return 4 + buf.length;
255    }
256
257    @Override
258    public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
259      destination.putInt(buf.length);
260      Thread.yield();
261      destination.put(buf);
262      destination.rewind();
263    }
264
265    @Override
266    public CacheableDeserializer<Cacheable> getDeserializer() {
267      return blockDeserializer;
268    }
269
270    private static final int deserializerIdentifier;
271    static {
272      deserializerIdentifier = CacheableDeserializerIdManager
273          .registerDeserializer(blockDeserializer);
274    }
275
276    @Override
277    public BlockType getBlockType() {
278      return BlockType.DATA;
279    }
280  }
281
282
283  public static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
284    HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
285    Random rand = new Random();
286    HashSet<String> usedStrings = new HashSet<>();
287    for (int i = 0; i < numBlocks; i++) {
288      ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize);
289      rand.nextBytes(cachedBuffer.array());
290      cachedBuffer.rewind();
291      int onDiskSizeWithoutHeader = blockSize;
292      int uncompressedSizeWithoutHeader = blockSize;
293      long prevBlockOffset = rand.nextLong();
294      BlockType.DATA.write(cachedBuffer);
295      cachedBuffer.putInt(onDiskSizeWithoutHeader);
296      cachedBuffer.putInt(uncompressedSizeWithoutHeader);
297      cachedBuffer.putLong(prevBlockOffset);
298      cachedBuffer.rewind();
299      HFileContext meta = new HFileContextBuilder()
300                          .withHBaseCheckSum(false)
301                          .withIncludesMvcc(includesMemstoreTS)
302                          .withIncludesTags(false)
303                          .withCompression(Compression.Algorithm.NONE)
304                          .withBytesPerCheckSum(0)
305                          .withChecksumType(ChecksumType.NULL)
306                          .build();
307      HFileBlock generated =
308          new HFileBlock(BlockType.DATA, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
309              prevBlockOffset, ByteBuff.wrap(cachedBuffer), HFileBlock.DONT_FILL_HEADER, blockSize,
310              onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta,
311              ByteBuffAllocator.HEAP);
312
313      String strKey;
314      /* No conflicting keys */
315      strKey = Long.toString(rand.nextLong());
316      while (!usedStrings.add(strKey)) {
317        strKey = Long.toString(rand.nextLong());
318      }
319
320      returnedBlocks[i] = new HFileBlockPair();
321      returnedBlocks[i].blockName = new BlockCacheKey(strKey, 0);
322      returnedBlocks[i].block = generated;
323    }
324    return returnedBlocks;
325  }
326
327  public static class HFileBlockPair {
328    BlockCacheKey blockName;
329    HFileBlock block;
330
331    public BlockCacheKey getBlockName() {
332      return this.blockName;
333    }
334
335    public HFileBlock getBlock() {
336      return this.block;
337    }
338  }
339
340  public static void getBlockAndAssertEquals(BlockCache cache, BlockCacheKey key,
341      Cacheable blockToCache, ByteBuffer destBuffer, ByteBuffer expectedBuffer) {
342    destBuffer.clear();
343    cache.cacheBlock(key, blockToCache);
344    Cacheable actualBlock = cache.getBlock(key, false, false, false);
345    try {
346      actualBlock.serialize(destBuffer, true);
347      assertEquals(expectedBuffer, destBuffer);
348    } finally {
349      // Release the reference count increased by getBlock.
350      if (actualBlock != null) {
351        actualBlock.release();
352      }
353    }
354  }
355}