/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks that the set of fully prefetched files tracked by a file-backed {@link BucketCache} is
 * still present after the cache is shut down and a new instance is created over the same cache
 * and persistence files.
 */
@Tag(IOTests.TAG)
@Tag(LargeTests.TAG)
@HBaseParameterizedTestTemplate(name = "{index}: blockSize={0}, bucketSizes={1}")
public class TestPrefetchPersistence {

  /**
   * Supplies the constructor arguments for each template invocation: a block size and the array
   * of bucket sizes handed to the {@link BucketCache} constructor.
   */
  @SuppressWarnings("checkstyle:Indentation")
  public static Stream<Arguments> parameters() {
    return Stream.of(Arguments.of(16 * 1024,
      new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
        28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
        128 * 1024 + 1024 }));
  }

  final int constructedBlockSize;
  final int[] constructedBlockSizes;

  public TestPrefetchPersistence(int constructedBlockSize, int[] constructedBlockSizes) {
    this.constructedBlockSize = constructedBlockSize;
    this.constructedBlockSizes = constructedBlockSizes;
  }

  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchPersistence.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  private static final int DATA_BLOCK_SIZE = 2048;
  private static final int NUM_KV = 1000;

  /** File name, relative to {@link #testDir}, that backs the bucket cache's file IO engine. */
  private static final String CACHE_FILE_NAME = "/bucket.cache";
  /** File name, relative to {@link #testDir}, passed to the cache as its persistence path. */
  private static final String PERSISTENCE_FILE_NAME = "/bucket.persistence";

  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  String prefetchPersistencePath;
  Path testDir;

  BucketCache bucketCache;

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;

  /**
   * Enables prefetch-on-open with zero prefetch delay on the shared test configuration and
   * creates the data test directory.
   */
  @BeforeEach
  public void setup() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
    conf.setInt(PrefetchExecutor.PREFETCH_DELAY, 0);
    PrefetchExecutor.loadConfiguration(conf);
    testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    fs = HFileSystem.get(conf);
  }

  /**
   * Fills a fresh cache by prefetching two store files, shuts it down, re-creates it over the same
   * files, and asserts that the new instance reports used space and still lists both files as
   * fully cached.
   */
  @TestTemplate
  public void testPrefetchPersistence() throws Exception {
    bucketCache = newBucketCache();
    cacheConf = new CacheConfig(conf, bucketCache);

    long usedSize = bucketCache.getAllocator().getUsedSize();
    assertEquals(0, usedSize);
    assertTrue(new File(testDir + CACHE_FILE_NAME).exists());
    // Load Cache
    Path storeFile = writeStoreFile("TestPrefetch0");
    Path storeFile2 = writeStoreFile("TestPrefetch1");
    readStoreFile(storeFile);
    readStoreFile(storeFile2);
    usedSize = bucketCache.getAllocator().getUsedSize();
    assertNotEquals(0, usedSize);

    bucketCache.shutdown();
    assertTrue(new File(testDir + PERSISTENCE_FILE_NAME).exists());
    bucketCache = newBucketCache();
    cacheConf = new CacheConfig(conf, bucketCache);
    // Query the re-created cache: re-checking the pre-shutdown local would always pass and
    // would say nothing about what the new instance holds.
    assertNotEquals(0, bucketCache.getAllocator().getUsedSize());
    assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile.getName()));
    assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile2.getName()));
  }

  /**
   * Shuts down the bucket cache left behind by the test, if any, and then removes the test
   * directory. The directory is removed even when the shutdown fails.
   */
  @AfterEach
  public void cleanup() {
    try {
      if (bucketCache != null) {
        bucketCache.shutdown();
        bucketCache = null;
      }
    } finally {
      TEST_UTIL.cleanupTestDir();
    }
  }

  /**
   * Opens {@code storeFilePath} through the current cache configuration and waits up to 30 seconds
   * until either the reader reports its prefetch as complete or the bucket cache lists the file
   * among its fully cached files. The reader is closed before returning.
   * @param storeFilePath path of the HFile to open
   */
  public void readStoreFile(Path storeFilePath) throws Exception {
    // Open the file
    try (HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf)) {
      Waiter.waitFor(conf, 30000, () -> reader.prefetchComplete()
        || bucketCache.fullyCachedFiles.containsKey(storeFilePath.getName()));
    }
  }

  /**
   * Writes a store file holding {@link #NUM_KV} randomly generated key-values under a directory
   * named {@code fname} inside the data test directory.
   * @param fname name of the parent directory to create the store file in
   * @return the path of the written store file
   */
  public Path writeStoreFile(String fname) throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
      .withOutputDir(storeFileParentDir).withFileContext(meta).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      // Carve the generated key into row | family | qualifier slices.
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

  /**
   * Picks a key type for a generated key-value: {@link KeyValue.Type#Put} half of the time,
   * otherwise a type drawn from {@code KeyValue.Type.values()} at an index in
   * {@code [1, NUM_VALID_KEY_TYPES]}.
   * @param rand source of randomness
   * @return the chosen key type
   * @throws RuntimeException if the drawn type is {@code Minimum} or {@code Maximum}
   */
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

  /**
   * Creates a file-backed bucket cache over the cache and persistence files in {@link #testDir}
   * using this template invocation's block size and bucket sizes, then waits up to 10 seconds for
   * its initialization.
   */
  private BucketCache newBucketCache() throws Exception {
    BucketCache cache = new BucketCache("file:" + testDir + CACHE_FILE_NAME, capacitySize,
      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
      testDir + PERSISTENCE_FILE_NAME, 60 * 1000, conf);
    cache.waitForCacheInitialization(10000);
    return cache;
  }

}