001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNotEquals;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.File;
025import java.io.IOException;
026import java.util.Random;
027import java.util.concurrent.ThreadLocalRandom;
028import java.util.stream.Stream;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.KeyValue;
035import org.apache.hadoop.hbase.fs.HFileSystem;
036import org.apache.hadoop.hbase.io.hfile.CacheConfig;
037import org.apache.hadoop.hbase.io.hfile.HFile;
038import org.apache.hadoop.hbase.io.hfile.HFileContext;
039import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
040import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
041import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
042import org.apache.hadoop.hbase.testclassification.IOTests;
043import org.apache.hadoop.hbase.testclassification.LargeTests;
044import org.junit.jupiter.api.BeforeEach;
045import org.junit.jupiter.api.Tag;
046import org.junit.jupiter.api.TestTemplate;
047import org.junit.jupiter.params.provider.Arguments;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051@Tag(IOTests.TAG)
052@Tag(LargeTests.TAG)
053@HBaseParameterizedTestTemplate(name = "{index}: blockSize={0}, bucketSizes={1}")
054public class TestPrefetchPersistence {
055
056  @SuppressWarnings("checkstyle:Indentation")
057  public static Stream<Arguments> parameters() {
058    return Stream.of(Arguments.of(16 * 1024,
059      new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
060        28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
061        128 * 1024 + 1024 }));
062  }
063
064  final int constructedBlockSize;
065  final int[] constructedBlockSizes;
066
067  public TestPrefetchPersistence(int constructedBlockSize, int[] constructedBlockSizes) {
068    this.constructedBlockSize = constructedBlockSize;
069    this.constructedBlockSizes = constructedBlockSizes;
070  }
071
072  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchPersistence.class);
073
074  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
075
076  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
077  private static final int DATA_BLOCK_SIZE = 2048;
078  private static final int NUM_KV = 1000;
079
080  private Configuration conf;
081  private CacheConfig cacheConf;
082  private FileSystem fs;
083  String prefetchPersistencePath;
084  Path testDir;
085
086  BucketCache bucketCache;
087
088  final long capacitySize = 32 * 1024 * 1024;
089  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
090  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
091
092  @BeforeEach
093  public void setup() throws IOException {
094    conf = TEST_UTIL.getConfiguration();
095    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
096    testDir = TEST_UTIL.getDataTestDir();
097    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
098    fs = HFileSystem.get(conf);
099  }
100
101  @TestTemplate
102  public void testPrefetchPersistence() throws Exception {
103    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
104      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
105      testDir + "/bucket.persistence", 60 * 1000, conf);
106    bucketCache.waitForCacheInitialization(10000);
107    cacheConf = new CacheConfig(conf, bucketCache);
108
109    long usedSize = bucketCache.getAllocator().getUsedSize();
110    assertEquals(0, usedSize);
111    assertTrue(new File(testDir + "/bucket.cache").exists());
112    // Load Cache
113    Path storeFile = writeStoreFile("TestPrefetch0");
114    Path storeFile2 = writeStoreFile("TestPrefetch1");
115    readStoreFile(storeFile);
116    readStoreFile(storeFile2);
117    usedSize = bucketCache.getAllocator().getUsedSize();
118    assertNotEquals(0, usedSize);
119
120    bucketCache.shutdown();
121    assertTrue(new File(testDir + "/bucket.persistence").exists());
122    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
123      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
124      testDir + "/bucket.persistence", 60 * 1000, conf);
125    bucketCache.waitForCacheInitialization(10000);
126    cacheConf = new CacheConfig(conf, bucketCache);
127    assertTrue(usedSize != 0);
128    assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile.getName()));
129    assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile2.getName()));
130    TEST_UTIL.cleanupTestDir();
131  }
132
133  public void readStoreFile(Path storeFilePath) throws Exception {
134    // Open the file
135    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
136    while (!reader.prefetchComplete()) {
137      // Sleep for a bit
138      Thread.sleep(1000);
139    }
140  }
141
142  public Path writeStoreFile(String fname) throws IOException {
143    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
144    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
145    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
146      .withOutputDir(storeFileParentDir).withFileContext(meta).build();
147    Random rand = ThreadLocalRandom.current();
148    final int rowLen = 32;
149    for (int i = 0; i < NUM_KV; ++i) {
150      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
151      byte[] v = RandomKeyValueUtil.randomValue(rand);
152      int cfLen = rand.nextInt(k.length - rowLen + 1);
153      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
154        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
155      sfw.append(kv);
156    }
157
158    sfw.close();
159    return sfw.getPath();
160  }
161
162  public static KeyValue.Type generateKeyType(Random rand) {
163    if (rand.nextBoolean()) {
164      // Let's make half of KVs puts.
165      return KeyValue.Type.Put;
166    } else {
167      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
168      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
169        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
170          + "Probably the layout of KeyValue.Type has changed.");
171      }
172      return keyType;
173    }
174  }
175
176}