/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

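/**
 * Tests that the BucketCache persister thread periodically writes the backing map to the
 * configured persistence file, and that the set of fully prefetched files stays consistent
 * when blocks are evicted, both after and during prefetch.
 */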
@Category({ IOTests.class, MediumTests.class })
public class TestBucketCachePersister {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBucketCachePersister.class);

  @Rule
  public TestName name = new TestName();

  public int constructedBlockSize = 16 * 1024;

  private static final Logger LOG = LoggerFactory.getLogger(TestBucketCachePersister.class);

  public int[] constructedBlockSizes =
    new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
      28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024 };

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  private static final int DATA_BLOCK_SIZE = 2048;
  private static final int NUM_KV = 1000;

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
  Path testDir;

  public Configuration setupBucketCacheConfig(long bucketCachePersistInterval) throws IOException {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
    testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, bucketCachePersistInterval);
    return conf;
  }

  public BucketCache setupBucketCache(Configuration conf, String persistentCacheFile)
    throws IOException {
    return new BucketCache("file:" + testDir + "/" + persistentCacheFile, capacitySize,
      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
      testDir + "/bucket.persistence", 60 * 1000, conf);
  }

  public void cleanupBucketCache(BucketCache bucketCache) throws IOException {
    bucketCache.shutdown();
    TEST_UTIL.cleanupDataTestDirOnTestFS(String.valueOf(testDir));
    assertFalse(TEST_UTIL.getTestFileSystem().exists(testDir));
  }

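  /**
   * With a short (3s) persist interval, the background persister should have written the
   * bucket.persistence file to disk within one interval of the blocks being cached.
   */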
  @Test
  public void testPrefetchPersistenceCrash() throws Exception {
    long bucketCachePersistInterval = 3000;
    Configuration conf = setupBucketCacheConfig(bucketCachePersistInterval);
    BucketCache bucketCache = setupBucketCache(conf, "testPrefetchPersistenceCrash");
    CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
    FileSystem fs = HFileSystem.get(conf);
    // Load Cache
    Path storeFile = writeStoreFile("TestPrefetch0", conf, cacheConf, fs);
    Path storeFile2 = writeStoreFile("TestPrefetch1", conf, cacheConf, fs);
    readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
    readStoreFile(storeFile2, 0, fs, cacheConf, conf, bucketCache);
    Thread.sleep(bucketCachePersistInterval);
    assertTrue(new File(testDir + "/bucket.persistence").exists());
    assertTrue(new File(testDir + "/bucket.persistence").delete());
    cleanupBucketCache(bucketCache);
  }

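  /**
   * Negative case: with an effectively infinite persist interval, the persister never runs,
   * so no persistence file should appear.
   */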
  @Test
  public void testPrefetchPersistenceCrashNegative() throws Exception {
    long bucketCachePersistInterval = Long.MAX_VALUE;
    Configuration conf = setupBucketCacheConfig(bucketCachePersistInterval);
    BucketCache bucketCache = setupBucketCache(conf, "testPrefetchPersistenceCrashNegative");
    CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
    FileSystem fs = HFileSystem.get(conf);
    // Load Cache
    Path storeFile = writeStoreFile("TestPrefetch2", conf, cacheConf, fs);
    readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
    assertFalse(new File(testDir + "/bucket.persistence").exists());
    cleanupBucketCache(bucketCache);
  }

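  /**
   * Evicting any block of a fully prefetched file should remove that file from the cache's
   * fullyCachedFiles map.
   */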
  @Test
  public void testPrefetchListUponBlockEviction() throws Exception {
    Configuration conf = setupBucketCacheConfig(200);
    BucketCache bucketCache = setupBucketCache(conf, "testPrefetchListUponBlockEviction");
    CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
    FileSystem fs = HFileSystem.get(conf);
    // Load Blocks in cache
    Path storeFile = writeStoreFile("TestPrefetch3", conf, cacheConf, fs);
    readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
    // Wait (up to 2.5s) for the file to be marked as fully cached
    int retries = 0;
    while (!bucketCache.fullyCachedFiles.containsKey(storeFile.getName()) && retries < 5) {
      Thread.sleep(500);
      retries++;
    }
    assertTrue(retries < 5);
    BlockCacheKey bucketCacheKey = bucketCache.backingMap.entrySet().iterator().next().getKey();
    // Evict Blocks from cache
    bucketCache.evictBlock(bucketCacheKey);
    assertFalse(bucketCache.fullyCachedFiles.containsKey(storeFile.getName()));
    cleanupBucketCache(bucketCache);
  }

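  /**
   * Evicts a data block while the prefetch for the same file is still running, then verifies
   * the file is not listed as fully cached afterwards.
   */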
  @Test
  public void testPrefetchBlockEvictionWhilePrefetchRunning() throws Exception {
    Configuration conf = setupBucketCacheConfig(200);
    BucketCache bucketCache =
      setupBucketCache(conf, "testPrefetchBlockEvictionWhilePrefetchRunning");
    CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
    FileSystem fs = HFileSystem.get(conf);
    // Load Blocks in cache
    Path storeFile = writeStoreFile("TestPrefetch3", conf, cacheConf, fs);
    HFile.createReader(fs, storeFile, cacheConf, true, conf);
    boolean evicted = false;
    while (!PrefetchExecutor.isCompleted(storeFile)) {
      LOG.debug("Entered loop as prefetch for {} is still running.", storeFile);
      if (bucketCache.backingMap.size() > 0 && !evicted) {
        Iterator<Map.Entry<BlockCacheKey, BucketEntry>> it =
          bucketCache.backingMap.entrySet().iterator();
        // Evict the first data block found in the cache. The original loop never advanced
        // the entry it inspected and evicted a different key than it checked, which could
        // spin forever if the first cached block was not a data block.
        while (it.hasNext() && !evicted) {
          Map.Entry<BlockCacheKey, BucketEntry> entry = it.next();
          if (entry.getKey().getBlockType().equals(BlockType.DATA)) {
            evicted = bucketCache.evictBlock(entry.getKey());
            LOG.debug("Attempted eviction for {}. Succeeded? {}", storeFile, evicted);
          }
        }
      }
      Thread.sleep(10);
    }
    assertFalse(bucketCache.fullyCachedFiles.containsKey(storeFile.getName()));
    cleanupBucketCache(bucketCache);
  }

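  /**
   * Opens the given store file, waits for its prefetch to complete, then asserts that the
   * block at {@code offset} actually landed in the bucket cache.
   */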
  public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheConfig cacheConf,
    Configuration conf, BucketCache bucketCache) throws Exception {
    // Open the file; with PREFETCH_BLOCKS_ON_OPEN_KEY set, this schedules the prefetch
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
    BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
    boolean isCached = bucketCache.getBlock(blockCacheKey, true, false, true) != null;

    if (
      block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
        || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
    ) {
      assertTrue(isCached);
    }
  }

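  /**
   * Writes a store file with {@value #NUM_KV} random KeyValues under the test data directory
   * and returns its path.
   */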
  public Path writeStoreFile(String fname, Configuration conf, CacheConfig cacheConf, FileSystem fs)
    throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
      .withOutputDir(storeFileParentDir).withFileContext(meta).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

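  /**
   * Returns a random valid key type: Put half of the time, otherwise one of the remaining
   * types, excluding the Minimum and Maximum sentinels.
   */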
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

}