001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNotEquals;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.File;
025import java.io.IOException;
026import java.util.Random;
027import java.util.concurrent.ThreadLocalRandom;
028import java.util.stream.Stream;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.KeyValue;
035import org.apache.hadoop.hbase.fs.HFileSystem;
036import org.apache.hadoop.hbase.io.hfile.CacheConfig;
037import org.apache.hadoop.hbase.io.hfile.HFile;
038import org.apache.hadoop.hbase.io.hfile.HFileContext;
039import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
040import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
041import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
042import org.apache.hadoop.hbase.testclassification.IOTests;
043import org.apache.hadoop.hbase.testclassification.LargeTests;
044import org.junit.jupiter.api.AfterEach;
045import org.junit.jupiter.api.BeforeEach;
046import org.junit.jupiter.api.Tag;
047import org.junit.jupiter.api.TestTemplate;
048import org.junit.jupiter.params.provider.Arguments;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052@Tag(IOTests.TAG)
053@Tag(LargeTests.TAG)
054@HBaseParameterizedTestTemplate(name = "{index}: blockSize={0}, bucketSizes={1}")
055public class TestPrefetchPersistence {
056
057  @SuppressWarnings("checkstyle:Indentation")
058  public static Stream<Arguments> parameters() {
059    return Stream.of(Arguments.of(16 * 1024,
060      new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
061        28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
062        128 * 1024 + 1024 }));
063  }
064
065  final int constructedBlockSize;
066  final int[] constructedBlockSizes;
067
068  public TestPrefetchPersistence(int constructedBlockSize, int[] constructedBlockSizes) {
069    this.constructedBlockSize = constructedBlockSize;
070    this.constructedBlockSizes = constructedBlockSizes;
071  }
072
073  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchPersistence.class);
074
075  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
076
077  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
078  private static final int DATA_BLOCK_SIZE = 2048;
079  private static final int NUM_KV = 1000;
080
081  private Configuration conf;
082  private CacheConfig cacheConf;
083  private FileSystem fs;
084  String prefetchPersistencePath;
085  Path testDir;
086
087  BucketCache bucketCache;
088
089  final long capacitySize = 32 * 1024 * 1024;
090  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
091  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
092
093  @BeforeEach
094  public void setup() throws IOException {
095    conf = TEST_UTIL.getConfiguration();
096    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
097    testDir = TEST_UTIL.getDataTestDir();
098    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
099    fs = HFileSystem.get(conf);
100  }
101
102  @TestTemplate
103  public void testPrefetchPersistence() throws Exception {
104    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
105      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
106      testDir + "/bucket.persistence", 60 * 1000, conf);
107    bucketCache.waitForCacheInitialization(10000);
108    cacheConf = new CacheConfig(conf, bucketCache);
109
110    long usedSize = bucketCache.getAllocator().getUsedSize();
111    assertEquals(0, usedSize);
112    assertTrue(new File(testDir + "/bucket.cache").exists());
113    // Load Cache
114    Path storeFile = writeStoreFile("TestPrefetch0");
115    Path storeFile2 = writeStoreFile("TestPrefetch1");
116    readStoreFile(storeFile);
117    readStoreFile(storeFile2);
118    usedSize = bucketCache.getAllocator().getUsedSize();
119    assertNotEquals(0, usedSize);
120
121    bucketCache.shutdown();
122    assertTrue(new File(testDir + "/bucket.persistence").exists());
123    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
124      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
125      testDir + "/bucket.persistence", 60 * 1000, conf);
126    bucketCache.waitForCacheInitialization(10000);
127    cacheConf = new CacheConfig(conf, bucketCache);
128    assertTrue(usedSize != 0);
129    assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile.getName()));
130    assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile2.getName()));
131  }
132
133  @AfterEach
134  public void cleanup() {
135    TEST_UTIL.cleanupTestDir();
136  }
137
138  public void readStoreFile(Path storeFilePath) throws Exception {
139    // Open the file
140    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
141    int retries = 0;
142    while (
143      !reader.prefetchComplete()
144        && !bucketCache.fullyCachedFiles.containsKey(storeFilePath.getName()) && retries < 5
145    ) {
146      Thread.sleep(500);
147      retries++;
148    }
149  }
150
151  public Path writeStoreFile(String fname) throws IOException {
152    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
153    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
154    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
155      .withOutputDir(storeFileParentDir).withFileContext(meta).build();
156    Random rand = ThreadLocalRandom.current();
157    final int rowLen = 32;
158    for (int i = 0; i < NUM_KV; ++i) {
159      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
160      byte[] v = RandomKeyValueUtil.randomValue(rand);
161      int cfLen = rand.nextInt(k.length - rowLen + 1);
162      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
163        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
164      sfw.append(kv);
165    }
166
167    sfw.close();
168    return sfw.getPath();
169  }
170
171  public static KeyValue.Type generateKeyType(Random rand) {
172    if (rand.nextBoolean()) {
173      // Let's make half of KVs puts.
174      return KeyValue.Type.Put;
175    } else {
176      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
177      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
178        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
179          + "Probably the layout of KeyValue.Type has changed.");
180      }
181      return keyType;
182    }
183  }
184
185}