/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Tests {@link HFile} cache-on-write functionality for the following block
 * types: data blocks, non-root index blocks, and Bloom filter blocks.
 */
@RunWith(Parameterized.class)
@Category({IOTests.class, LargeTests.class})
public class TestCacheOnWrite {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestCacheOnWrite.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestCacheOnWrite.class);

  private static final HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private Random rand = new Random(12983177L);
  private Path storeFilePath;
  private BlockCache blockCache;
  private String testDescription;

  private final CacheOnWriteType cowType;
  private final Compression.Algorithm compress;
  private final boolean cacheCompressedData;

  private static final int DATA_BLOCK_SIZE = 2048;
  private static final int NUM_KV = 25000;
  private static final int INDEX_BLOCK_SIZE = 512;
  private static final int BLOOM_BLOCK_SIZE = 4096;
  private static final BloomType BLOOM_TYPE = BloomType.ROWCOL;
  private static final int CKBYTES = 512;

  /** The number of valid key types possible in a store file */
  private static final int NUM_VALID_KEY_TYPES =
      KeyValue.Type.values().length - 2;

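  /** The cache-on-write settings under test, each mapped to the block types it should cache. */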
  private static enum CacheOnWriteType {
    DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY,
        BlockType.DATA, BlockType.ENCODED_DATA),
    BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
        BlockType.BLOOM_CHUNK),
    INDEX_BLOCKS(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
        BlockType.LEAF_INDEX, BlockType.INTERMEDIATE_INDEX);

    private final String confKey;
    private final BlockType blockType1;
    private final BlockType blockType2;

    private CacheOnWriteType(String confKey, BlockType blockType) {
      this(confKey, blockType, blockType);
    }

    private CacheOnWriteType(String confKey, BlockType blockType1,
        BlockType blockType2) {
      this.blockType1 = blockType1;
      this.blockType2 = blockType2;
      this.confKey = confKey;
    }

    public boolean shouldBeCached(BlockType blockType) {
      return blockType == blockType1 || blockType == blockType2;
    }

    public void modifyConf(Configuration conf) {
      for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
        conf.setBoolean(cowType.confKey, cowType == this);
      }
    }
  }

  public TestCacheOnWrite(CacheOnWriteType cowType, Compression.Algorithm compress,
      boolean cacheCompressedData, BlockCache blockCache) {
    this.cowType = cowType;
    this.compress = compress;
    this.cacheCompressedData = cacheCompressedData;
    this.blockCache = blockCache;
    testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress +
        ", cacheCompressedData=" + cacheCompressedData + "]";
    LOG.info(testDescription);
  }

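  /**
   * Builds the block cache variants exercised by the test: the default cache from
   * {@link BlockCacheFactory}, a standalone {@link LruBlockCache}, and an off-heap
   * {@link BucketCache}.
   */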
  private static List<BlockCache> getBlockCaches() throws IOException {
    Configuration conf = TEST_UTIL.getConfiguration();
    List<BlockCache> blockcaches = new ArrayList<>();
    // default
    blockcaches.add(BlockCacheFactory.createBlockCache(conf));

    // Set LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME to 2.0f due to HBASE-16287
    TEST_UTIL.getConfiguration()
        .setFloat(LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, 2.0f);
    // memory
    BlockCache lru = new LruBlockCache(128 * 1024 * 1024, 64 * 1024, TEST_UTIL.getConfiguration());
    blockcaches.add(lru);

    // bucket cache
    FileSystem.get(conf).mkdirs(TEST_UTIL.getDataTestDir());
    int[] bucketSizes =
        { INDEX_BLOCK_SIZE, DATA_BLOCK_SIZE, BLOOM_BLOCK_SIZE, 64 * 1024, 128 * 1024 };
    BlockCache bucketcache =
        new BucketCache("offheap", 128 * 1024 * 1024, 64 * 1024, bucketSizes, 5, 64 * 100, null);
    blockcaches.add(bucketcache);
    return blockcaches;
  }

  @Parameters
  public static Collection<Object[]> getParameters() throws IOException {
    List<Object[]> params = new ArrayList<>();
    for (BlockCache blockCache : getBlockCaches()) {
      for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
        for (Compression.Algorithm compress : HBaseCommonTestingUtility.COMPRESSION_ALGORITHMS) {
          for (boolean cacheCompressedData : new boolean[] { false, true }) {
            params.add(new Object[] { cowType, compress, cacheCompressedData, blockCache });
          }
        }
      }
    }
    return params;
  }

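  /**
   * Evicts every block from the given cache. For non-LRU caches, eviction is retried until
   * {@code getBlockCount()} reports zero, since blocks still sitting in a write queue may not
   * be evictable right away.
   */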
  private void clearBlockCache(BlockCache blockCache) throws InterruptedException {
    if (blockCache instanceof LruBlockCache) {
      ((LruBlockCache) blockCache).clearCache();
    } else {
      // BucketCache may not return all cached blocks (e.g. blocks still in the write queue),
      // so keep retrying until the cache reports it is empty.
      for (int clearCount = 0; blockCache.getBlockCount() > 0; clearCount++) {
        if (clearCount > 0) {
          LOG.warn("clear block cache " + blockCache + " " + clearCount + " times, "
              + blockCache.getBlockCount() + " blocks remaining");
          Thread.sleep(10);
        }
        for (CachedBlock block : Lists.newArrayList(blockCache)) {
          BlockCacheKey key = new BlockCacheKey(block.getFilename(), block.getOffset());
          // A CombinedBlockCache may need to evict the same block twice.
          for (int evictCount = 0; blockCache.evictBlock(key); evictCount++) {
            if (evictCount > 1) {
              LOG.warn("evict block " + block + " in " + blockCache + " " + evictCount
                  + " times, maybe a bug here");
            }
          }
        }
      }
    }
  }

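  /**
   * Applies the parameterized cache-on-write settings to the shared configuration and builds
   * the {@link CacheConfig} backed by the block cache under test.
   */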
  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    this.conf.set("dfs.datanode.data.dir.perm", "700");
    conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE);
    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, BLOOM_BLOCK_SIZE);
    conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData);
    cowType.modifyConf(conf);
    conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cowType.shouldBeCached(BlockType.DATA));
    conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
        cowType.shouldBeCached(BlockType.LEAF_INDEX));
    conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
        cowType.shouldBeCached(BlockType.BLOOM_CHUNK));
    cacheConf = new CacheConfig(conf, blockCache);
    fs = HFileSystem.get(conf);
  }

  @After
  public void tearDown() throws IOException, InterruptedException {
    clearBlockCache(blockCache);
  }

  @AfterClass
  public static void afterClass() throws IOException {
    TEST_UTIL.cleanupTestDir();
  }

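  /** Writes a store file, then reads it back and checks which blocks were cached on write. */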
  private void testStoreFileCacheOnWriteInternals(boolean useTags) throws IOException {
    writeStoreFile(useTags);
    readStoreFile(useTags);
  }

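  /**
   * Reads back the store file produced by {@link #writeStoreFile(boolean)} and asserts, block by
   * block, that exactly the block types selected by the current {@link CacheOnWriteType} ended up
   * in the block cache.
   */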
  private void readStoreFile(boolean useTags) throws IOException {
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
    LOG.info("HFile information: " + reader);
    HFileContext meta = new HFileContextBuilder().withCompression(compress)
      .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
      .withBlockSize(DATA_BLOCK_SIZE)
      .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
      .withIncludesTags(useTags).build();
    final boolean cacheBlocks = false;
    final boolean pread = false;
    HFileScanner scanner = reader.getScanner(cacheBlocks, pread);
    assertTrue(testDescription, scanner.seekTo());

    long offset = 0;
    EnumMap<BlockType, Integer> blockCountByType = new EnumMap<>(BlockType.class);

    DataBlockEncoding encodingInCache = NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding();
    List<Long> cachedBlocksOffset = new ArrayList<>();
    Map<Long, HFileBlock> cachedBlocks = new HashMap<>();
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      // Flags: don't cache the block, use pread, this is not a compaction.
      // Also, pass null for expected block type to avoid checking it.
      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null,
          encodingInCache);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
          offset);
      HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true);
      boolean isCached = fromCache != null;
      cachedBlocksOffset.add(offset);
      cachedBlocks.put(offset, fromCache);
      boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
      assertTrue("shouldBeCached: " + shouldBeCached + "\n" +
          "isCached: " + isCached + "\n" +
          "Test description: " + testDescription + "\n" +
          "block: " + block + "\n" +
          "encodingInCache: " + encodingInCache + "\n" +
          "blockCacheKey: " + blockCacheKey,
        shouldBeCached == isCached);
      if (isCached) {
        if (cacheConf.shouldCacheCompressed(fromCache.getBlockType().getCategory())) {
          if (compress != Compression.Algorithm.NONE) {
            assertFalse(fromCache.isUnpacked());
          }
          fromCache = fromCache.unpack(meta, reader.getUncachedBlockReader());
        } else {
          assertTrue(fromCache.isUnpacked());
        }
        // block we cached at write-time and block read from file should be identical
        assertEquals(block.getChecksumType(), fromCache.getChecksumType());
        assertEquals(block.getBlockType(), fromCache.getBlockType());
        assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType());
        assertEquals(block.getOnDiskSizeWithHeader(), fromCache.getOnDiskSizeWithHeader());
        assertEquals(block.getOnDiskSizeWithoutHeader(), fromCache.getOnDiskSizeWithoutHeader());
        assertEquals(
          block.getUncompressedSizeWithoutHeader(), fromCache.getUncompressedSizeWithoutHeader());
      }
      offset += block.getOnDiskSizeWithHeader();
      BlockType bt = block.getBlockType();
      Integer count = blockCountByType.get(bt);
      blockCountByType.put(bt, (count == null ? 0 : count) + 1);
    }

    LOG.info("Block count by type: " + blockCountByType);
    String countByType = blockCountByType.toString();
    if (useTags) {
      assertEquals("{" + BlockType.DATA
          + "=2663, LEAF_INDEX=297, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=32}", countByType);
    } else {
      assertEquals("{" + BlockType.DATA
          + "=2498, LEAF_INDEX=278, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=31}", countByType);
    }

    // Iterate over all the KeyValues in the HFile.
    while (scanner.next()) {
      scanner.getCell();
    }
    Iterator<Long> iterator = cachedBlocksOffset.iterator();
    while (iterator.hasNext()) {
      Long entry = iterator.next();
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
          entry);
      HFileBlock hFileBlock = cachedBlocks.get(entry);
      if (hFileBlock != null) {
        // Call returnBlock twice because, in the cached case, the reference counter would have
        // been incremented twice.
        blockCache.returnBlock(blockCacheKey, hFileBlock);
        if (cacheCompressedData) {
          if (this.compress == Compression.Algorithm.NONE
              || cowType == CacheOnWriteType.INDEX_BLOCKS
              || cowType == CacheOnWriteType.BLOOM_BLOCKS) {
            blockCache.returnBlock(blockCacheKey, hFileBlock);
          }
        } else {
          blockCache.returnBlock(blockCacheKey, hFileBlock);
        }
      }
    }
    scanner.shipped();
    reader.close();
  }

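  /**
   * Picks a random {@link KeyValue.Type} for a generated cell: a Put half of the time, otherwise
   * any valid type except {@code Minimum} and {@code Maximum}.
   */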
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
            + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

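  /**
   * Writes {@code NUM_KV} random KeyValues (optionally tagged) into a new store file and records
   * its path in {@link #storeFilePath} for the read-back phase.
   */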
  private void writeStoreFile(boolean useTags) throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(),
        "test_cache_on_write");
    HFileContext meta = new HFileContextBuilder().withCompression(compress)
        .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
        .withBlockSize(DATA_BLOCK_SIZE)
        .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
        .withIncludesTags(useTags).build();
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
        .withOutputDir(storeFileParentDir).withComparator(CellComparatorImpl.COMPARATOR)
        .withFileContext(meta)
        .withBloomType(BLOOM_TYPE).withMaxKeyCount(NUM_KV).build();
    byte[] cf = Bytes.toBytes("fam");
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] row = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] qualifier = RandomKeyValueUtil.randomRowOrQualifier(rand);
      byte[] value = RandomKeyValueUtil.randomValue(rand);
      KeyValue kv;
      if (useTags) {
        Tag t = new ArrayBackedTag((byte) 1, "visibility");
        List<Tag> tagList = new ArrayList<>();
        tagList.add(t);
        kv =
            new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
                Math.abs(rand.nextLong()), generateKeyType(rand), value, 0, value.length, tagList);
      } else {
        kv =
            new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
                Math.abs(rand.nextLong()), generateKeyType(rand), value, 0, value.length);
      }
      sfw.append(kv);
    }

    sfw.close();
    storeFilePath = sfw.getPath();
  }

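  /**
   * Loads a test region with several flushed store files, compacts it, and verifies that no DATA
   * or ENCODED_DATA blocks written by the compaction end up in the block cache.
   */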
  private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags)
      throws IOException, InterruptedException {
    // TODO: need to change this test if we add a cache size threshold for
    // compactions, or if we implement some other kind of intelligent logic for
    // deciding what blocks to cache-on-write on compaction.
    final String table = "CompactionCacheOnWrite";
    final String cf = "myCF";
    final byte[] cfBytes = Bytes.toBytes(cf);
    final int maxVersions = 3;
    ColumnFamilyDescriptor cfd =
        ColumnFamilyDescriptorBuilder.newBuilder(cfBytes).setCompressionType(compress)
            .setBloomFilterType(BLOOM_TYPE).setMaxVersions(maxVersions)
            .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()).build();
    HRegion region = TEST_UTIL.createTestRegion(table, cfd, blockCache);
    int rowIdx = 0;
    long ts = EnvironmentEdgeManager.currentTime();
    for (int iFile = 0; iFile < 5; ++iFile) {
      for (int iRow = 0; iRow < 500; ++iRow) {
        String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" +
            iRow;
        Put p = new Put(Bytes.toBytes(rowStr));
        ++rowIdx;
        for (int iCol = 0; iCol < 10; ++iCol) {
          String qualStr = "col" + iCol;
          String valueStr = "value_" + rowStr + "_" + qualStr;
          for (int iTS = 0; iTS < 5; ++iTS) {
            if (useTags) {
              Tag t = new ArrayBackedTag((byte) 1, "visibility");
              Tag[] tags = new Tag[1];
              tags[0] = t;
              KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
                  HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
              p.add(kv);
            } else {
              p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
            }
          }
        }
        p.setDurability(Durability.ASYNC_WAL);
        region.put(p);
      }
      region.flush(true);
    }
    clearBlockCache(blockCache);
    assertEquals(0, blockCache.getBlockCount());
    region.compact(false);
    LOG.debug("compactStores() returned");

    for (CachedBlock block: blockCache) {
      assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType());
      assertNotEquals(BlockType.DATA, block.getBlockType());
    }
    region.close();
  }

  @Test
  public void testStoreFileCacheOnWrite() throws IOException {
    testStoreFileCacheOnWriteInternals(false);
    testStoreFileCacheOnWriteInternals(true);
  }

  @Test
  public void testNotCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
    testNotCachingDataBlocksDuringCompactionInternals(false);
    testNotCachingDataBlocksDuringCompactionInternals(true);
  }
}