/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

@Category({ IOTests.class, MediumTests.class })
public class TestPrefetchWithBucketCache {

  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchWithBucketCache.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestPrefetchWithBucketCache.class);

  @Rule
  public TestName name = new TestName();

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  private static final int DATA_BLOCK_SIZE =
    2048;
  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private BlockCache blockCache;

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
    fs = HFileSystem.get(conf);
    File testDir = new File(name.getMethodName());
    testDir.mkdir();
    conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:/" + testDir.getAbsolutePath() + "/bucket.cache");
  }

  @After
  public void tearDown() {
    File cacheFile = new File(name.getMethodName() + "/bucket.cache");
    File dir = new File(name.getMethodName());
    cacheFile.delete();
    dir.delete();
  }

  @Test
  public void testPrefetchDoesntOverwork() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork", 100);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);
    Map<BlockCacheKey, BucketEntry> snapshot = ImmutableMap.copyOf(bc.getBackingMap());
    // Reads the file again and checks we are not prefetching it again
    LOG.debug("Second read, no prefetch should happen here.");
    readStoreFile(storeFile);
    // Makes sure the cache hasn't changed
    snapshot.entrySet().forEach(e -> {
      BucketEntry entry = bc.getBackingMap().get(e.getKey());
      assertNotNull(entry);
      assertEquals(e.getValue().getCachedTime(), entry.getCachedTime());
    });
    // Forcibly removes the first block from the bc backing map, in order to cause it to be cached again
    BlockCacheKey key = snapshot.keySet().stream().findFirst().get();
    LOG.debug("removing block {}", key);
    bc.getBackingMap().remove(key);
    bc.getFullyCachedFiles().get().remove(storeFile.getName());
    assertTrue(snapshot.size() > bc.getBackingMap().size());
    LOG.debug("Third read should prefetch again, as we removed one block for the file.");
    readStoreFile(storeFile);
    Waiter.waitFor(conf, 300, () -> snapshot.size() == bc.getBackingMap().size());
    assertTrue(snapshot.get(key).getCachedTime() < bc.getBackingMap().get(key).getCachedTime());
  }

  @Test
  public void testPrefetchInterruptOnCapacity() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.95);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.01);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    createReaderAndWaitForPrefetchInterruption(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    long evictionsFirstPrefetch = bc.getStats().getEvictionCount();
    LOG.debug("evictions after first prefetch: {}", bc.getStats().getEvictionCount());
    HFile.Reader reader =
      createReaderAndWaitForPrefetchInterruption(storeFile);
    LOG.debug("evictions after second prefetch: {}", bc.getStats().getEvictionCount());
    assertTrue((bc.getStats().getEvictionCount() - evictionsFirstPrefetch) < 10);
    HFileScanner scanner = reader.getScanner(conf, true, true);
    scanner.seekTo();
    while (scanner.next()) {
      // do a full scan to force some evictions
      LOG.trace("Iterating the full scan to evict some blocks");
    }
    scanner.close();
    LOG.debug("evictions after scanner: {}", bc.getStats().getEvictionCount());
    // The scanner should have triggered at least 3x evictions from the prefetch,
    // as we try to cache each block without interruption.
    assertTrue(bc.getStats().getEvictionCount() > evictionsFirstPrefetch);
  }

  @Test
  public void testPrefetchDoesntInterruptInMemoryOnCapacity() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.95);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.01);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    ColumnFamilyDescriptor family =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setInMemory(true).build();
    cacheConf = new CacheConfig(conf, family, blockCache, ByteBuffAllocator.HEAP);
    Path storeFile = writeStoreFile("testPrefetchDoesntInterruptInMemoryOnCapacity", 10000);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    createReaderAndWaitForPrefetchInterruption(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    assertTrue(bc.getStats().getEvictedCount() > 200);
  }

  @Test
  public void testPrefetchMetricProgress() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchMetricsProgress", 100);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    String regionName = storeFile.getParent().getParent().getName();
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    MutableLong regionCachedSize = new MutableLong(0);
    // Our file should have 6 DATA blocks.
    // We should wait for all of them to be cached
    long waitedTime = Waiter.waitFor(conf, 300, () -> {
      if (bc.getBackingMap().size() > 0) {
        long currentSize = bc.getRegionCachedInfo().get().get(regionName);
        assertTrue(regionCachedSize.getValue() <= currentSize);
        LOG.debug("Logging progress of region caching: {}", currentSize);
        regionCachedSize.setValue(currentSize);
      }
      return bc.getBackingMap().size() == 6;
    });
  }

  private void readStoreFile(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (
        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertTrue(isCached);
      }
    });
  }

  private void readStoreFile(Path storeFilePath,
    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
    BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    long sizeForDataBlocks = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = readFunction.apply(reader, offset);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      validationFunction.accept(blockCacheKey, block);
      offset += block.getOnDiskSizeWithHeader();
    }
  }

  private HFile.Reader createReaderAndWaitForPrefetchInterruption(Path storeFilePath)
    throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    assertEquals(0, BucketCache.getBucketCacheFromCacheConfig(cacheConf).get().getFullyCachedFiles()
      .get().size());

    return reader;
  }

  private Path writeStoreFile(String fname, int numKVs) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(fname, meta, numKVs);
  }

  private Path writeStoreFile(String fname, HFileContext context, int numKVs) throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
      .withOutputDir(storeFileParentDir).withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    for (int i = 0; i < numKVs; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }
}