/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

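/**
 * Tests for block prefetching when a file-backed BucketCache is configured, covering prefetch
 * completion, prefetch interruption when the cache reaches capacity, the in-memory column family
 * exception to that interruption, and the per-region cached-size metric.
 */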
@Category({ IOTests.class, MediumTests.class })
public class TestPrefetchWithBucketCache {

  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchWithBucketCache.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestPrefetchWithBucketCache.class);

  @Rule
  public TestName name = new TestName();

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  private static final int DATA_BLOCK_SIZE = 2048;
  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private BlockCache blockCache;

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
    fs = HFileSystem.get(conf);
    File testDir = new File(name.getMethodName());
    testDir.mkdir();
    conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:" + testDir.getAbsolutePath() + "/bucket.cache");
  }

  @After
  public void tearDown() {
    File cacheFile = new File(name.getMethodName() + "/bucket.cache");
    File dir = new File(name.getMethodName());
    cacheFile.delete();
    dir.delete();
  }

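  /**
   * Verifies that prefetch caches all blocks on the first open, does not re-cache anything on a
   * second open of the same file, and prefetches again once a block is removed from the cache.
   */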
  @Test
  public void testPrefetchDoesntOverwork() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork", 100);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);
    Map<BlockCacheKey, BucketEntry> snapshot = ImmutableMap.copyOf(bc.getBackingMap());
    // Read the file again and check that we are not prefetching it again
    LOG.debug("Second read, no prefetch should happen here.");
    readStoreFile(storeFile);
    // Makes sure the cache hasn't changed
    snapshot.entrySet().forEach(e -> {
      BucketEntry entry = bc.getBackingMap().get(e.getKey());
      assertNotNull(entry);
      assertEquals(e.getValue().getCachedTime(), entry.getCachedTime());
    });
    // Forcibly remove the first block from the backing map, so it gets cached again
    BlockCacheKey key = snapshot.keySet().stream().findFirst().get();
    LOG.debug("removing block {}", key);
    bc.getBackingMap().remove(key);
    bc.getFullyCachedFiles().get().remove(storeFile.getName());
    assertTrue(snapshot.size() > bc.getBackingMap().size());
    LOG.debug("Third read should prefetch again, as we removed one block for the file.");
    readStoreFile(storeFile);
    Waiter.waitFor(conf, 300, () -> snapshot.size() == bc.getBackingMap().size());
    assertTrue(snapshot.get(key).getCachedTime() < bc.getBackingMap().get(key).getCachedTime());
  }

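  /**
   * Verifies that prefetch gets interrupted once the bucket cache fills up, so repeated prefetch
   * attempts cause only a few evictions, while a full scan of the same file keeps evicting blocks.
   */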
  @Test
  public void testPrefetchInterruptOnCapacity() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.95);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.01);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    createReaderAndWaitForPrefetchInterruption(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    long evictionsFirstPrefetch = bc.getStats().getEvictionCount();
    LOG.debug("evictions after first prefetch: {}", bc.getStats().getEvictionCount());
    HFile.Reader reader = createReaderAndWaitForPrefetchInterruption(storeFile);
    LOG.debug("evictions after second prefetch: {}", bc.getStats().getEvictionCount());
    assertTrue((bc.getStats().getEvictionCount() - evictionsFirstPrefetch) < 10);
    HFileScanner scanner = reader.getScanner(conf, true, true);
    scanner.seekTo();
    while (scanner.next()) {
      // do a full scan to force some evictions
      LOG.trace("Iterating the full scan to evict some blocks");
    }
    scanner.close();
    LOG.debug("evictions after scanner: {}", bc.getStats().getEvictionCount());
    // The scanner should have triggered at least 3x the evictions of the prefetch,
    // as we try to cache each block without interruption.
    assertTrue(bc.getStats().getEvictionCount() > evictionsFirstPrefetch);
  }

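  /**
   * Verifies that prefetch keeps caching blocks of an in-memory column family even when the
   * bucket cache is over capacity, evicting already cached blocks to make room.
   */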
  @Test
  public void testPrefetchDoesntInterruptInMemoryOnCapacity() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.95);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.01);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    ColumnFamilyDescriptor family =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setInMemory(true).build();
    cacheConf = new CacheConfig(conf, family, blockCache, ByteBuffAllocator.HEAP);
    Path storeFile = writeStoreFile("testPrefetchDoesntInterruptInMemoryOnCapacity", 10000);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    createReaderAndWaitForPrefetchInterruption(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    assertTrue(bc.getStats().getEvictedCount() > 200);
  }

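  /**
   * Verifies that the per-region cached-size metric never decreases while the file blocks are
   * being prefetched into the bucket cache.
   */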
  @Test
  public void testPrefetchMetricProgress() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchMetricsProgress", 100);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    String regionName = storeFile.getParent().getParent().getName();
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    MutableLong regionCachedSize = new MutableLong(0);
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(conf, 300, () -> {
      if (bc.getBackingMap().size() > 0) {
        long currentSize = bc.getRegionCachedInfo().get().get(regionName);
        assertTrue(regionCachedSize.getValue() <= currentSize);
        LOG.debug("Logging progress of region caching: {}", currentSize);
        regionCachedSize.setValue(currentSize);
      }
      return bc.getBackingMap().size() == 6;
    });
  }

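  /**
   * Reads every block of the given store file up to the load-on-open section and asserts that
   * DATA and index blocks are already present in the block cache after prefetch completes.
   */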
  private void readStoreFile(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (
        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertTrue(isCached);
      }
    });
  }

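  /**
   * Opens the store file, waits for prefetch to complete, then walks all blocks before the
   * load-on-open section, applying the supplied read and validation functions to each block.
   */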
  private void readStoreFile(Path storeFilePath,
    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
    BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = readFunction.apply(reader, offset);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      validationFunction.accept(blockCacheKey, block);
      offset += block.getOnDiskSizeWithHeader();
    }
  }

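  /**
   * Opens the store file and waits for the prefetch to finish. The cache is expected to be too
   * small to hold the whole file, so prefetch should get interrupted and the file should never be
   * marked as fully cached.
   */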
  private HFile.Reader createReaderAndWaitForPrefetchInterruption(Path storeFilePath)
    throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    assertEquals(0, BucketCache.getBucketCacheFromCacheConfig(cacheConf).get().getFullyCachedFiles()
      .get().size());

    return reader;
  }

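  /**
   * Writes a store file with the given number of random KeyValues under the test data directory,
   * using the configured data block size, and returns its path.
   */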
  private Path writeStoreFile(String fname, int numKVs) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(fname, meta, numKVs);
  }

  private Path writeStoreFile(String fname, HFileContext context, int numKVs) throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
      .withOutputDir(storeFileParentDir).withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    for (int i = 0; i < numKVs; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

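  /**
   * Picks a random KeyValue type: Put for roughly half of the generated cells, otherwise a random
   * valid type, never Minimum or Maximum.
   */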
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }
}