/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Tests {@link HFile} cache-on-write functionality for the following block
 * types: data blocks, non-root index blocks, and Bloom filter blocks.
 */
@RunWith(Parameterized.class)
@Category({IOTests.class, LargeTests.class})
public class TestCacheOnWrite {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestCacheOnWrite.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestCacheOnWrite.class);

  private static final HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private Random rand = new Random(12983177L);
  private Path storeFilePath;
  private BlockCache blockCache;
  private String testDescription;

  private final CacheOnWriteType cowType;
  private final Compression.Algorithm compress;
  private final boolean cacheCompressedData;

  private static final int DATA_BLOCK_SIZE = 2048;
  private static final int NUM_KV = 25000;
  private static final int INDEX_BLOCK_SIZE = 512;
  private static final int BLOOM_BLOCK_SIZE = 4096;
  private static final BloomType BLOOM_TYPE = BloomType.ROWCOL;
  private static final int CKBYTES = 512;

  private static final Set<BlockType> INDEX_BLOCK_TYPES = ImmutableSet.of(
    BlockType.INDEX_V1,
    BlockType.INTERMEDIATE_INDEX,
    BlockType.ROOT_INDEX,
    BlockType.LEAF_INDEX
  );
  private static final Set<BlockType> BLOOM_BLOCK_TYPES = ImmutableSet.of(
    BlockType.BLOOM_CHUNK,
    BlockType.GENERAL_BLOOM_META,
    BlockType.DELETE_FAMILY_BLOOM_META
  );
  private static final Set<BlockType> DATA_BLOCK_TYPES = ImmutableSet.of(
    BlockType.ENCODED_DATA,
    BlockType.DATA
  );

  // All test cases are supposed to generate files for compaction within this range
  private static final long CACHE_COMPACTION_LOW_THRESHOLD = 10L;
  private static final long CACHE_COMPACTION_HIGH_THRESHOLD = 1 * 1024 * 1024 * 1024L;

  /** The number of valid key types possible in a store file */
  private static final int NUM_VALID_KEY_TYPES =
      KeyValue.Type.values().length - 2;

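  /**
   * The cache-on-write setting under test. Each value enables exactly one of the three
   * cache-on-write configuration keys and names the block types expected to be cached as a
   * result.
   */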
  private enum CacheOnWriteType {
    DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY,
        BlockType.DATA, BlockType.ENCODED_DATA),
    BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
        BlockType.BLOOM_CHUNK),
    INDEX_BLOCKS(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
        BlockType.LEAF_INDEX, BlockType.INTERMEDIATE_INDEX);

    private final String confKey;
    private final BlockType blockType1;
    private final BlockType blockType2;

    CacheOnWriteType(String confKey, BlockType blockType) {
      this(confKey, blockType, blockType);
    }

    CacheOnWriteType(String confKey, BlockType blockType1, BlockType blockType2) {
      this.blockType1 = blockType1;
      this.blockType2 = blockType2;
      this.confKey = confKey;
    }

    public boolean shouldBeCached(BlockType blockType) {
      return blockType == blockType1 || blockType == blockType2;
    }

    public void modifyConf(Configuration conf) {
      for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
        conf.setBoolean(cowType.confKey, cowType == this);
      }
    }
  }

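  /**
   * Each parameterized run is given a cache-on-write type, a compression algorithm, whether
   * compressed data should be cached, and the block cache implementation to test against.
   */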
  public TestCacheOnWrite(CacheOnWriteType cowType, Compression.Algorithm compress,
      boolean cacheCompressedData, BlockCache blockCache) {
    this.cowType = cowType;
    this.compress = compress;
    this.cacheCompressedData = cacheCompressedData;
    this.blockCache = blockCache;
    testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress +
        ", cacheCompressedData=" + cacheCompressedData + "]";
    LOG.info(testDescription);
  }

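  /**
   * Builds the block cache implementations to parameterize over: the default cache from
   * {@link BlockCacheFactory}, a standalone {@link LruBlockCache}, and an off-heap
   * {@link BucketCache}.
   */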
  private static List<BlockCache> getBlockCaches() throws IOException {
    Configuration conf = TEST_UTIL.getConfiguration();
    List<BlockCache> blockcaches = new ArrayList<>();
    // default
    blockcaches.add(BlockCacheFactory.createBlockCache(conf));

    // Set LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME to 2.0f due to HBASE-16287
    TEST_UTIL.getConfiguration()
        .setFloat(LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, 2.0f);
    // memory
    BlockCache lru = new LruBlockCache(128 * 1024 * 1024, 64 * 1024, TEST_UTIL.getConfiguration());
    blockcaches.add(lru);

    // bucket cache
    FileSystem.get(conf).mkdirs(TEST_UTIL.getDataTestDir());
    int[] bucketSizes =
        { INDEX_BLOCK_SIZE, DATA_BLOCK_SIZE, BLOOM_BLOCK_SIZE, 64 * 1024, 128 * 1024 };
    BlockCache bucketcache =
        new BucketCache("offheap", 128 * 1024 * 1024, 64 * 1024, bucketSizes, 5, 64 * 100, null);
    blockcaches.add(bucketcache);
    return blockcaches;
  }

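  /**
   * Cross product of block cache implementation, cache-on-write type, compression algorithm,
   * and the cache-compressed-data flag.
   */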
  @Parameters
  public static Collection<Object[]> getParameters() throws IOException {
    List<Object[]> params = new ArrayList<>();
    for (BlockCache blockCache : getBlockCaches()) {
      for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
        for (Compression.Algorithm compress : HBaseCommonTestingUtility.COMPRESSION_ALGORITHMS) {
          for (boolean cacheCompressedData : new boolean[] { false, true }) {
            params.add(new Object[] { cowType, compress, cacheCompressedData, blockCache });
          }
        }
      }
    }
    return params;
  }

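  /**
   * Empties the given block cache. LruBlockCache supports a direct clear; other caches are
   * drained by evicting every cached block, retrying while blocks are still in flight.
   */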
  private void clearBlockCache(BlockCache blockCache) throws InterruptedException {
    if (blockCache instanceof LruBlockCache) {
      ((LruBlockCache) blockCache).clearCache();
    } else {
      // BucketCache may not return all cached blocks (e.g. blocks still in the write queue), so
      // retry until the cache reports zero blocks.
      for (int clearCount = 0; blockCache.getBlockCount() > 0; clearCount++) {
        if (clearCount > 0) {
          LOG.warn("clear block cache " + blockCache + " " + clearCount + " times, "
              + blockCache.getBlockCount() + " blocks remaining");
          Thread.sleep(10);
        }
        for (CachedBlock block : Lists.newArrayList(blockCache)) {
          BlockCacheKey key = new BlockCacheKey(block.getFilename(), block.getOffset());
          // A combined cache may need to evict the same key twice.
          for (int evictCount = 0; blockCache.evictBlock(key); evictCount++) {
            if (evictCount > 1) {
              LOG.warn("evict block " + block + " in " + blockCache + " " + evictCount
                  + " times, maybe a bug here");
            }
          }
        }
      }
    }
  }

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    this.conf.set("dfs.datanode.data.dir.perm", "700");
    conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE);
    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, BLOOM_BLOCK_SIZE);
    conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData);
    cowType.modifyConf(conf);
    conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cowType.shouldBeCached(BlockType.DATA));
    conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
        cowType.shouldBeCached(BlockType.LEAF_INDEX));
    conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
        cowType.shouldBeCached(BlockType.BLOOM_CHUNK));
    cacheConf = new CacheConfig(conf, blockCache);
    fs = HFileSystem.get(conf);
  }

  @After
  public void tearDown() throws IOException, InterruptedException {
    clearBlockCache(blockCache);
  }

  @AfterClass
  public static void afterClass() throws IOException {
    TEST_UTIL.cleanupTestDir();
  }

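  /** Writes a store file and then verifies which of its blocks ended up in the cache. */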
  private void testStoreFileCacheOnWriteInternals(boolean useTags) throws IOException {
    writeStoreFile(useTags);
    readStoreFile(useTags);
  }

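  /**
   * Reads back every block of the store file written by {@link #writeStoreFile(boolean)} and
   * asserts that a block is present in the cache exactly when the current cache-on-write type
   * says it should be, that cached blocks match the blocks read from disk, and that the block
   * counts per type match the expected totals.
   */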
  private void readStoreFile(boolean useTags) throws IOException {
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
    LOG.info("HFile information: " + reader);
    HFileContext meta = new HFileContextBuilder().withCompression(compress)
      .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
      .withBlockSize(DATA_BLOCK_SIZE)
      .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
      .withIncludesTags(useTags).build();
    final boolean cacheBlocks = false;
    final boolean pread = false;
    HFileScanner scanner = reader.getScanner(cacheBlocks, pread);
    assertTrue(testDescription, scanner.seekTo());

    long offset = 0;
    EnumMap<BlockType, Integer> blockCountByType = new EnumMap<>(BlockType.class);

    DataBlockEncoding encodingInCache = NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding();
    List<Long> cachedBlocksOffset = new ArrayList<>();
    Map<Long, Pair<HFileBlock, HFileBlock>> cachedBlocks = new HashMap<>();
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      // Flags: don't cache the block, use pread, this is not a compaction.
      // Also, pass null for expected block type to avoid checking it.
      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null,
          encodingInCache);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true);
      boolean isCached = fromCache != null;
      cachedBlocksOffset.add(offset);
      cachedBlocks.put(offset, fromCache == null ? null : Pair.newPair(block, fromCache));
      boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
      assertTrue("shouldBeCached: " + shouldBeCached + "\n" +
          "isCached: " + isCached + "\n" +
          "Test description: " + testDescription + "\n" +
          "block: " + block + "\n" +
          "encodingInCache: " + encodingInCache + "\n" +
          "blockCacheKey: " + blockCacheKey,
        shouldBeCached == isCached);
      if (isCached) {
        if (cacheConf.shouldCacheCompressed(fromCache.getBlockType().getCategory())) {
          if (compress != Compression.Algorithm.NONE) {
            assertFalse(fromCache.isUnpacked());
          }
          fromCache = fromCache.unpack(meta, reader.getUncachedBlockReader());
        } else {
          assertTrue(fromCache.isUnpacked());
        }
        // block we cached at write-time and block read from file should be identical
        assertEquals(block.getChecksumType(), fromCache.getChecksumType());
        assertEquals(block.getBlockType(), fromCache.getBlockType());
        assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType());
        assertEquals(block.getOnDiskSizeWithHeader(), fromCache.getOnDiskSizeWithHeader());
        assertEquals(block.getOnDiskSizeWithoutHeader(), fromCache.getOnDiskSizeWithoutHeader());
        assertEquals(
          block.getUncompressedSizeWithoutHeader(), fromCache.getUncompressedSizeWithoutHeader());
      }
      offset += block.getOnDiskSizeWithHeader();
      BlockType bt = block.getBlockType();
      Integer count = blockCountByType.get(bt);
      blockCountByType.put(bt, (count == null ? 0 : count) + 1);
    }

    LOG.info("Block count by type: " + blockCountByType);
    String countByType = blockCountByType.toString();
    if (useTags) {
      assertEquals("{" + BlockType.DATA
          + "=2663, LEAF_INDEX=297, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=32}", countByType);
    } else {
      assertEquals("{" + BlockType.DATA
          + "=2498, LEAF_INDEX=278, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=31}", countByType);
    }

    // Iterate over all the KeyValues in the HFile.
    while (scanner.next()) {
      scanner.getCell();
    }
    Iterator<Long> iterator = cachedBlocksOffset.iterator();
    while (iterator.hasNext()) {
      Long entry = iterator.next();
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
          entry);
      Pair<HFileBlock, HFileBlock> blockPair = cachedBlocks.get(entry);
      if (blockPair != null) {
        // Release twice because for the isCached case the counter would have been incremented
        // twice. Notice that here we need to release the two different blocks; see the comments
        // in BucketCache#returnBlock.
        blockPair.getSecond().release();
        if (cacheCompressedData) {
          if (this.compress == Compression.Algorithm.NONE
              || cowType == CacheOnWriteType.INDEX_BLOCKS
              || cowType == CacheOnWriteType.BLOOM_BLOCKS) {
            blockPair.getFirst().release();
          }
        } else {
          blockPair.getFirst().release();
        }
      }
    }
    scanner.shipped();
    reader.close();
  }

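  /**
   * Picks a random {@link KeyValue.Type}: Put for about half the keys, otherwise one of the
   * other valid key types (never Minimum or Maximum).
   */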
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
            + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

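  /**
   * Writes {@link #NUM_KV} random KeyValues (optionally tagged) into a new store file using the
   * cache configuration under test, and records the resulting path in {@link #storeFilePath}.
   */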
  private void writeStoreFile(boolean useTags) throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(),
        "test_cache_on_write");
    HFileContext meta = new HFileContextBuilder().withCompression(compress)
        .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
        .withBlockSize(DATA_BLOCK_SIZE)
        .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
        .withIncludesTags(useTags).build();
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
        .withOutputDir(storeFileParentDir)
        .withFileContext(meta)
        .withBloomType(BLOOM_TYPE).withMaxKeyCount(NUM_KV).build();
    byte[] cf = Bytes.toBytes("fam");
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] row = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] qualifier = RandomKeyValueUtil.randomRowOrQualifier(rand);
      byte[] value = RandomKeyValueUtil.randomValue(rand);
      KeyValue kv;
      if (useTags) {
        Tag t = new ArrayBackedTag((byte) 1, "visibility");
        List<Tag> tagList = new ArrayList<>();
        tagList.add(t);
        kv =
            new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
                Math.abs(rand.nextLong()), generateKeyType(rand), value, 0, value.length, tagList);
      } else {
        kv =
            new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
                Math.abs(rand.nextLong()), generateKeyType(rand), value, 0, value.length);
      }
      sfw.append(kv);
    }

    sfw.close();
    storeFilePath = sfw.getPath();
  }

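  /**
   * Loads a test region with several store files, clears the cache, triggers a compaction, and
   * then checks which block types were cached during the compaction, honoring the
   * cache-compacted-blocks-on-write flag and its size threshold. The original configuration
   * values are restored afterwards.
   */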
  private void testCachingDataBlocksDuringCompactionInternals(boolean useTags,
    boolean cacheBlocksOnCompaction, long cacheBlocksOnCompactionThreshold)
    throws IOException, InterruptedException {
    // Save the current conf values so they can be restored in the finally block.
    boolean localValue = conf.getBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, false);
    long localCacheCompactedBlocksThreshold = conf
      .getLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
        CacheConfig.DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD);
    boolean localCacheBloomBlocksValue = conf
      .getBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
        CacheConfig.DEFAULT_CACHE_BLOOMS_ON_WRITE);
    boolean localCacheIndexBlocksValue = conf
      .getBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
        CacheConfig.DEFAULT_CACHE_INDEXES_ON_WRITE);

    try {
      // Set the conf if testing caching compacted blocks on write
      conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
        cacheBlocksOnCompaction);

      // set size threshold if testing compaction size threshold
      if (cacheBlocksOnCompactionThreshold > 0) {
        conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
          cacheBlocksOnCompactionThreshold);
      }

      // TODO: need to change this test if we add a cache size threshold for
      // compactions, or if we implement some other kind of intelligent logic for
      // deciding what blocks to cache-on-write on compaction.
      final String table = "CompactionCacheOnWrite";
      final String cf = "myCF";
      final byte[] cfBytes = Bytes.toBytes(cf);
      final int maxVersions = 3;
      ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder
        .newBuilder(cfBytes)
        .setCompressionType(compress)
        .setBloomFilterType(BLOOM_TYPE)
        .setMaxVersions(maxVersions)
        .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
        .build();
      HRegion region = TEST_UTIL.createTestRegion(table, cfd, blockCache);
      int rowIdx = 0;
      long ts = EnvironmentEdgeManager.currentTime();
      for (int iFile = 0; iFile < 5; ++iFile) {
        for (int iRow = 0; iRow < 500; ++iRow) {
          String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" + iRow;
          Put p = new Put(Bytes.toBytes(rowStr));
          ++rowIdx;
          for (int iCol = 0; iCol < 10; ++iCol) {
            String qualStr = "col" + iCol;
            String valueStr = "value_" + rowStr + "_" + qualStr;
            for (int iTS = 0; iTS < 5; ++iTS) {
              if (useTags) {
                Tag t = new ArrayBackedTag((byte) 1, "visibility");
                Tag[] tags = new Tag[1];
                tags[0] = t;
                KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
                    HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
                p.add(kv);
              } else {
                KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
                  ts++, Bytes.toBytes(valueStr));
                p.add(kv);
              }
            }
          }
          p.setDurability(Durability.ASYNC_WAL);
          region.put(p);
        }
        region.flush(true);
      }

      clearBlockCache(blockCache);
      assertEquals(0, blockCache.getBlockCount());

      region.compact(false);
      LOG.debug("compactStores() returned");

      boolean dataBlockCached = false;
      boolean bloomBlockCached = false;
      boolean indexBlockCached = false;

      for (CachedBlock block : blockCache) {
        if (DATA_BLOCK_TYPES.contains(block.getBlockType())) {
          dataBlockCached = true;
        } else if (BLOOM_BLOCK_TYPES.contains(block.getBlockType())) {
          bloomBlockCached = true;
        } else if (INDEX_BLOCK_TYPES.contains(block.getBlockType())) {
          indexBlockCached = true;
        }
      }

      // Data blocks should be cached in instances where we are caching blocks on write. In the
      // case of testing BucketCache, we cannot verify the block type as it is not stored in the
      // cache.
      boolean cacheOnCompactAndNonBucketCache = cacheBlocksOnCompaction
        && !(blockCache instanceof BucketCache);

      String assertErrorMessage = "\nTest description: " + testDescription
        + "\ncacheBlocksOnCompaction: " + cacheBlocksOnCompaction + "\n";

      if (cacheOnCompactAndNonBucketCache && cacheBlocksOnCompactionThreshold > 0) {
        if (cacheBlocksOnCompactionThreshold == CACHE_COMPACTION_HIGH_THRESHOLD) {
          assertTrue(assertErrorMessage, dataBlockCached);
          assertTrue(assertErrorMessage, bloomBlockCached);
          assertTrue(assertErrorMessage, indexBlockCached);
        } else {
          assertFalse(assertErrorMessage, dataBlockCached);

          if (localCacheBloomBlocksValue) {
            assertTrue(assertErrorMessage, bloomBlockCached);
          } else {
            assertFalse(assertErrorMessage, bloomBlockCached);
          }

          if (localCacheIndexBlocksValue) {
            assertTrue(assertErrorMessage, indexBlockCached);
          } else {
            assertFalse(assertErrorMessage, indexBlockCached);
          }
        }
      } else {
        assertEquals(assertErrorMessage, cacheOnCompactAndNonBucketCache, dataBlockCached);

        if (cacheOnCompactAndNonBucketCache) {
          assertTrue(assertErrorMessage, bloomBlockCached);
          assertTrue(assertErrorMessage, indexBlockCached);
        }
      }

      region.close();
    } finally {
      // reset back
      conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, localValue);
      conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
        localCacheCompactedBlocksThreshold);
      conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, localCacheBloomBlocksValue);
      conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, localCacheIndexBlocksValue);
    }
  }

  @Test
  public void testStoreFileCacheOnWrite() throws IOException {
    testStoreFileCacheOnWriteInternals(false);
    testStoreFileCacheOnWriteInternals(true);
  }

  @Test
  public void testCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
    testCachingDataBlocksDuringCompactionInternals(false, false, -1);
    testCachingDataBlocksDuringCompactionInternals(true, true, -1);
  }

  @Test
  public void testCachingDataBlocksThresholdDuringCompaction()
    throws IOException, InterruptedException {
    testCachingDataBlocksDuringCompactionInternals(false, true, CACHE_COMPACTION_HIGH_THRESHOLD);
    testCachingDataBlocksDuringCompactionInternals(false, true, CACHE_COMPACTION_LOW_THRESHOLD);
  }

}