001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.EnumMap; 029import java.util.HashMap; 030import java.util.Iterator; 031import java.util.List; 032import java.util.Map; 033import java.util.Random; 034 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.FileSystem; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.hbase.ArrayBackedTag; 039import org.apache.hadoop.hbase.CellComparatorImpl; 040import org.apache.hadoop.hbase.HBaseClassTestRule; 041import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 042import org.apache.hadoop.hbase.HBaseTestingUtility; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.KeyValue; 045import org.apache.hadoop.hbase.Tag; 046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 048import org.apache.hadoop.hbase.client.Durability; 049import org.apache.hadoop.hbase.client.Put; 050import org.apache.hadoop.hbase.fs.HFileSystem; 051import org.apache.hadoop.hbase.io.compress.Compression; 052import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 053import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 054import org.apache.hadoop.hbase.regionserver.BloomType; 055import org.apache.hadoop.hbase.regionserver.HRegion; 056import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 057import org.apache.hadoop.hbase.testclassification.IOTests; 058import org.apache.hadoop.hbase.testclassification.LargeTests; 059import org.apache.hadoop.hbase.util.BloomFilterFactory; 060import org.apache.hadoop.hbase.util.Bytes; 061import org.apache.hadoop.hbase.util.ChecksumType; 062import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 063import org.junit.After; 064import org.junit.AfterClass; 065import org.junit.Before; 066import org.junit.ClassRule; 067import org.junit.Test; 068import org.junit.experimental.categories.Category; 069import org.junit.runner.RunWith; 070import org.junit.runners.Parameterized; 071import org.junit.runners.Parameterized.Parameters; 072import org.slf4j.Logger; 073import org.slf4j.LoggerFactory; 074 075import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 076 077/** 078 * Tests {@link HFile} cache-on-write functionality for the following block 079 * types: data blocks, non-root index blocks, and Bloom filter blocks. 080 */ 081@RunWith(Parameterized.class) 082@Category({IOTests.class, LargeTests.class}) 083public class TestCacheOnWrite { 084 085 @ClassRule 086 public static final HBaseClassTestRule CLASS_RULE = 087 HBaseClassTestRule.forClass(TestCacheOnWrite.class); 088 089 private static final Logger LOG = LoggerFactory.getLogger(TestCacheOnWrite.class); 090 091 private static final HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU(); 092 private Configuration conf; 093 private CacheConfig cacheConf; 094 private FileSystem fs; 095 private Random rand = new Random(12983177L); 096 private Path storeFilePath; 097 private BlockCache blockCache; 098 private String testDescription; 099 100 private final CacheOnWriteType cowType; 101 private final Compression.Algorithm compress; 102 private final boolean cacheCompressedData; 103 104 private static final int DATA_BLOCK_SIZE = 2048; 105 private static final int NUM_KV = 25000; 106 private static final int INDEX_BLOCK_SIZE = 512; 107 private static final int BLOOM_BLOCK_SIZE = 4096; 108 private static final BloomType BLOOM_TYPE = BloomType.ROWCOL; 109 private static final int CKBYTES = 512; 110 111 /** The number of valid key types possible in a store file */ 112 private static final int NUM_VALID_KEY_TYPES = 113 KeyValue.Type.values().length - 2; 114 115 private static enum CacheOnWriteType { 116 DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, 117 BlockType.DATA, BlockType.ENCODED_DATA), 118 BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, 119 BlockType.BLOOM_CHUNK), 120 INDEX_BLOCKS(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, 121 BlockType.LEAF_INDEX, BlockType.INTERMEDIATE_INDEX); 122 123 private final String confKey; 124 private final BlockType blockType1; 125 private final BlockType blockType2; 126 127 private CacheOnWriteType(String confKey, BlockType blockType) { 128 this(confKey, blockType, blockType); 129 } 130 131 private CacheOnWriteType(String confKey, BlockType blockType1, 132 BlockType blockType2) { 133 this.blockType1 = blockType1; 134 this.blockType2 = blockType2; 135 this.confKey = confKey; 136 } 137 138 public boolean shouldBeCached(BlockType blockType) { 139 return blockType == blockType1 || blockType == blockType2; 140 } 141 142 public void modifyConf(Configuration conf) { 143 for (CacheOnWriteType cowType : CacheOnWriteType.values()) { 144 conf.setBoolean(cowType.confKey, cowType == this); 145 } 146 } 147 } 148 149 public TestCacheOnWrite(CacheOnWriteType cowType, Compression.Algorithm compress, 150 boolean cacheCompressedData, BlockCache blockCache) { 151 this.cowType = cowType; 152 this.compress = compress; 153 this.cacheCompressedData = cacheCompressedData; 154 this.blockCache = blockCache; 155 testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress + 156 ", cacheCompressedData=" + cacheCompressedData + "]"; 157 LOG.info(testDescription); 158 } 159 160 private static List<BlockCache> getBlockCaches() throws IOException { 161 Configuration conf = TEST_UTIL.getConfiguration(); 162 List<BlockCache> blockcaches = new ArrayList<>(); 163 // default 164 blockcaches.add(BlockCacheFactory.createBlockCache(conf)); 165 166 //set LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME to 2.0f due to HBASE-16287 167 TEST_UTIL.getConfiguration().setFloat(LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, 2.0f); 168 // memory 169 BlockCache lru = new LruBlockCache(128 * 1024 * 1024, 64 * 1024, TEST_UTIL.getConfiguration()); 170 blockcaches.add(lru); 171 172 // bucket cache 173 FileSystem.get(conf).mkdirs(TEST_UTIL.getDataTestDir()); 174 int[] bucketSizes = 175 { INDEX_BLOCK_SIZE, DATA_BLOCK_SIZE, BLOOM_BLOCK_SIZE, 64 * 1024, 128 * 1024 }; 176 BlockCache bucketcache = 177 new BucketCache("offheap", 128 * 1024 * 1024, 64 * 1024, bucketSizes, 5, 64 * 100, null); 178 blockcaches.add(bucketcache); 179 return blockcaches; 180 } 181 182 @Parameters 183 public static Collection<Object[]> getParameters() throws IOException { 184 List<Object[]> params = new ArrayList<>(); 185 for (BlockCache blockCache : getBlockCaches()) { 186 for (CacheOnWriteType cowType : CacheOnWriteType.values()) { 187 for (Compression.Algorithm compress : HBaseCommonTestingUtility.COMPRESSION_ALGORITHMS) { 188 for (boolean cacheCompressedData : new boolean[] { false, true }) { 189 params.add(new Object[] { cowType, compress, cacheCompressedData, blockCache }); 190 } 191 } 192 } 193 } 194 return params; 195 } 196 197 private void clearBlockCache(BlockCache blockCache) throws InterruptedException { 198 if (blockCache instanceof LruBlockCache) { 199 ((LruBlockCache) blockCache).clearCache(); 200 } else { 201 // BucketCache may not return all cached blocks(blocks in write queue), so check it here. 202 for (int clearCount = 0; blockCache.getBlockCount() > 0; clearCount++) { 203 if (clearCount > 0) { 204 LOG.warn("clear block cache " + blockCache + " " + clearCount + " times, " 205 + blockCache.getBlockCount() + " blocks remaining"); 206 Thread.sleep(10); 207 } 208 for (CachedBlock block : Lists.newArrayList(blockCache)) { 209 BlockCacheKey key = new BlockCacheKey(block.getFilename(), block.getOffset()); 210 // CombinedBucketCache may need evict two times. 211 for (int evictCount = 0; blockCache.evictBlock(key); evictCount++) { 212 if (evictCount > 1) { 213 LOG.warn("evict block " + block + " in " + blockCache + " " + evictCount 214 + " times, maybe a bug here"); 215 } 216 } 217 } 218 } 219 } 220 } 221 222 @Before 223 public void setUp() throws IOException { 224 conf = TEST_UTIL.getConfiguration(); 225 this.conf.set("dfs.datanode.data.dir.perm", "700"); 226 conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE); 227 conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, BLOOM_BLOCK_SIZE); 228 conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData); 229 cowType.modifyConf(conf); 230 conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cowType.shouldBeCached(BlockType.DATA)); 231 conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, 232 cowType.shouldBeCached(BlockType.LEAF_INDEX)); 233 conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, 234 cowType.shouldBeCached(BlockType.BLOOM_CHUNK)); 235 cacheConf = new CacheConfig(conf, blockCache); 236 fs = HFileSystem.get(conf); 237 } 238 239 @After 240 public void tearDown() throws IOException, InterruptedException { 241 clearBlockCache(blockCache); 242 } 243 244 @AfterClass 245 public static void afterClass() throws IOException { 246 TEST_UTIL.cleanupTestDir(); 247 } 248 249 private void testStoreFileCacheOnWriteInternals(boolean useTags) throws IOException { 250 writeStoreFile(useTags); 251 readStoreFile(useTags); 252 } 253 254 private void readStoreFile(boolean useTags) throws IOException { 255 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); 256 LOG.info("HFile information: " + reader); 257 HFileContext meta = new HFileContextBuilder().withCompression(compress) 258 .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL) 259 .withBlockSize(DATA_BLOCK_SIZE) 260 .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()) 261 .withIncludesTags(useTags).build(); 262 final boolean cacheBlocks = false; 263 final boolean pread = false; 264 HFileScanner scanner = reader.getScanner(cacheBlocks, pread); 265 assertTrue(testDescription, scanner.seekTo()); 266 267 long offset = 0; 268 EnumMap<BlockType, Integer> blockCountByType = new EnumMap<>(BlockType.class); 269 270 DataBlockEncoding encodingInCache = NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding(); 271 List<Long> cachedBlocksOffset = new ArrayList<>(); 272 Map<Long, HFileBlock> cachedBlocks = new HashMap<>(); 273 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 274 // Flags: don't cache the block, use pread, this is not a compaction. 275 // Also, pass null for expected block type to avoid checking it. 276 HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, 277 encodingInCache); 278 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), 279 offset); 280 HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true); 281 boolean isCached = fromCache != null; 282 cachedBlocksOffset.add(offset); 283 cachedBlocks.put(offset, fromCache); 284 boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType()); 285 assertTrue("shouldBeCached: " + shouldBeCached+ "\n" + 286 "isCached: " + isCached + "\n" + 287 "Test description: " + testDescription + "\n" + 288 "block: " + block + "\n" + 289 "encodingInCache: " + encodingInCache + "\n" + 290 "blockCacheKey: " + blockCacheKey, 291 shouldBeCached == isCached); 292 if (isCached) { 293 if (cacheConf.shouldCacheCompressed(fromCache.getBlockType().getCategory())) { 294 if (compress != Compression.Algorithm.NONE) { 295 assertFalse(fromCache.isUnpacked()); 296 } 297 fromCache = fromCache.unpack(meta, reader.getUncachedBlockReader()); 298 } else { 299 assertTrue(fromCache.isUnpacked()); 300 } 301 // block we cached at write-time and block read from file should be identical 302 assertEquals(block.getChecksumType(), fromCache.getChecksumType()); 303 assertEquals(block.getBlockType(), fromCache.getBlockType()); 304 assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType()); 305 assertEquals(block.getOnDiskSizeWithHeader(), fromCache.getOnDiskSizeWithHeader()); 306 assertEquals(block.getOnDiskSizeWithoutHeader(), fromCache.getOnDiskSizeWithoutHeader()); 307 assertEquals( 308 block.getUncompressedSizeWithoutHeader(), fromCache.getUncompressedSizeWithoutHeader()); 309 } 310 offset += block.getOnDiskSizeWithHeader(); 311 BlockType bt = block.getBlockType(); 312 Integer count = blockCountByType.get(bt); 313 blockCountByType.put(bt, (count == null ? 0 : count) + 1); 314 } 315 316 LOG.info("Block count by type: " + blockCountByType); 317 String countByType = blockCountByType.toString(); 318 if (useTags) { 319 assertEquals("{" + BlockType.DATA 320 + "=2663, LEAF_INDEX=297, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=32}", countByType); 321 } else { 322 assertEquals("{" + BlockType.DATA 323 + "=2498, LEAF_INDEX=278, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=31}", countByType); 324 } 325 326 // iterate all the keyvalue from hfile 327 while (scanner.next()) { 328 scanner.getCell(); 329 } 330 Iterator<Long> iterator = cachedBlocksOffset.iterator(); 331 while(iterator.hasNext()) { 332 Long entry = iterator.next(); 333 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), 334 entry); 335 HFileBlock hFileBlock = cachedBlocks.get(entry); 336 if (hFileBlock != null) { 337 // call return twice because for the isCache cased the counter would have got incremented 338 // twice 339 blockCache.returnBlock(blockCacheKey, hFileBlock); 340 if(cacheCompressedData) { 341 if (this.compress == Compression.Algorithm.NONE 342 || cowType == CacheOnWriteType.INDEX_BLOCKS 343 || cowType == CacheOnWriteType.BLOOM_BLOCKS) { 344 blockCache.returnBlock(blockCacheKey, hFileBlock); 345 } 346 } else { 347 blockCache.returnBlock(blockCacheKey, hFileBlock); 348 } 349 } 350 } 351 scanner.shipped(); 352 reader.close(); 353 } 354 355 public static KeyValue.Type generateKeyType(Random rand) { 356 if (rand.nextBoolean()) { 357 // Let's make half of KVs puts. 358 return KeyValue.Type.Put; 359 } else { 360 KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; 361 if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { 362 throw new RuntimeException("Generated an invalid key type: " + keyType + ". " 363 + "Probably the layout of KeyValue.Type has changed."); 364 } 365 return keyType; 366 } 367 } 368 369 private void writeStoreFile(boolean useTags) throws IOException { 370 Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), 371 "test_cache_on_write"); 372 HFileContext meta = new HFileContextBuilder().withCompression(compress) 373 .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL) 374 .withBlockSize(DATA_BLOCK_SIZE) 375 .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()) 376 .withIncludesTags(useTags).build(); 377 StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs) 378 .withOutputDir(storeFileParentDir).withComparator(CellComparatorImpl.COMPARATOR) 379 .withFileContext(meta) 380 .withBloomType(BLOOM_TYPE).withMaxKeyCount(NUM_KV).build(); 381 byte[] cf = Bytes.toBytes("fam"); 382 for (int i = 0; i < NUM_KV; ++i) { 383 byte[] row = RandomKeyValueUtil.randomOrderedKey(rand, i); 384 byte[] qualifier = RandomKeyValueUtil.randomRowOrQualifier(rand); 385 byte[] value = RandomKeyValueUtil.randomValue(rand); 386 KeyValue kv; 387 if(useTags) { 388 Tag t = new ArrayBackedTag((byte) 1, "visibility"); 389 List<Tag> tagList = new ArrayList<>(); 390 tagList.add(t); 391 Tag[] tags = new Tag[1]; 392 tags[0] = t; 393 kv = 394 new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length, 395 Math.abs(rand.nextLong()), generateKeyType(rand), value, 0, value.length, tagList); 396 } else { 397 kv = 398 new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length, 399 Math.abs(rand.nextLong()), generateKeyType(rand), value, 0, value.length); 400 } 401 sfw.append(kv); 402 } 403 404 sfw.close(); 405 storeFilePath = sfw.getPath(); 406 } 407 408 private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags) 409 throws IOException, InterruptedException { 410 // TODO: need to change this test if we add a cache size threshold for 411 // compactions, or if we implement some other kind of intelligent logic for 412 // deciding what blocks to cache-on-write on compaction. 413 final String table = "CompactionCacheOnWrite"; 414 final String cf = "myCF"; 415 final byte[] cfBytes = Bytes.toBytes(cf); 416 final int maxVersions = 3; 417 ColumnFamilyDescriptor cfd = 418 ColumnFamilyDescriptorBuilder.newBuilder(cfBytes).setCompressionType(compress) 419 .setBloomFilterType(BLOOM_TYPE).setMaxVersions(maxVersions) 420 .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()).build(); 421 HRegion region = TEST_UTIL.createTestRegion(table, cfd, blockCache); 422 int rowIdx = 0; 423 long ts = EnvironmentEdgeManager.currentTime(); 424 for (int iFile = 0; iFile < 5; ++iFile) { 425 for (int iRow = 0; iRow < 500; ++iRow) { 426 String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" + 427 iRow; 428 Put p = new Put(Bytes.toBytes(rowStr)); 429 ++rowIdx; 430 for (int iCol = 0; iCol < 10; ++iCol) { 431 String qualStr = "col" + iCol; 432 String valueStr = "value_" + rowStr + "_" + qualStr; 433 for (int iTS = 0; iTS < 5; ++iTS) { 434 if (useTags) { 435 Tag t = new ArrayBackedTag((byte) 1, "visibility"); 436 Tag[] tags = new Tag[1]; 437 tags[0] = t; 438 KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr), 439 HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags); 440 p.add(kv); 441 } else { 442 p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr)); 443 } 444 } 445 } 446 p.setDurability(Durability.ASYNC_WAL); 447 region.put(p); 448 } 449 region.flush(true); 450 } 451 clearBlockCache(blockCache); 452 assertEquals(0, blockCache.getBlockCount()); 453 region.compact(false); 454 LOG.debug("compactStores() returned"); 455 456 for (CachedBlock block: blockCache) { 457 assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType()); 458 assertNotEquals(BlockType.DATA, block.getBlockType()); 459 } 460 ((HRegion)region).close(); 461 } 462 463 @Test 464 public void testStoreFileCacheOnWrite() throws IOException { 465 testStoreFileCacheOnWriteInternals(false); 466 testStoreFileCacheOnWriteInternals(true); 467 } 468 469 @Test 470 public void testNotCachingDataBlocksDuringCompaction() throws IOException, InterruptedException { 471 testNotCachingDataBlocksDuringCompactionInternals(false); 472 testNotCachingDataBlocksDuringCompactionInternals(true); 473 } 474}