001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNotEquals;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.File;
025import java.io.IOException;
026import java.util.Random;
027import java.util.concurrent.ThreadLocalRandom;
028import java.util.stream.Stream;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.KeyValue;
035import org.apache.hadoop.hbase.Waiter;
036import org.apache.hadoop.hbase.fs.HFileSystem;
037import org.apache.hadoop.hbase.io.hfile.CacheConfig;
038import org.apache.hadoop.hbase.io.hfile.HFile;
039import org.apache.hadoop.hbase.io.hfile.HFileContext;
040import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
041import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
042import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
043import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
044import org.apache.hadoop.hbase.testclassification.IOTests;
045import org.apache.hadoop.hbase.testclassification.LargeTests;
046import org.junit.jupiter.api.AfterEach;
047import org.junit.jupiter.api.BeforeEach;
048import org.junit.jupiter.api.Tag;
049import org.junit.jupiter.api.TestTemplate;
050import org.junit.jupiter.params.provider.Arguments;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054@Tag(IOTests.TAG)
055@Tag(LargeTests.TAG)
056@HBaseParameterizedTestTemplate(name = "{index}: blockSize={0}, bucketSizes={1}")
057public class TestPrefetchPersistence {
058
059  @SuppressWarnings("checkstyle:Indentation")
060  public static Stream<Arguments> parameters() {
061    return Stream.of(Arguments.of(16 * 1024,
062      new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
063        28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
064        128 * 1024 + 1024 }));
065  }
066
067  final int constructedBlockSize;
068  final int[] constructedBlockSizes;
069
070  public TestPrefetchPersistence(int constructedBlockSize, int[] constructedBlockSizes) {
071    this.constructedBlockSize = constructedBlockSize;
072    this.constructedBlockSizes = constructedBlockSizes;
073  }
074
075  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchPersistence.class);
076
077  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
078
079  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
080  private static final int DATA_BLOCK_SIZE = 2048;
081  private static final int NUM_KV = 1000;
082
083  private Configuration conf;
084  private CacheConfig cacheConf;
085  private FileSystem fs;
086  String prefetchPersistencePath;
087  Path testDir;
088
089  BucketCache bucketCache;
090
091  final long capacitySize = 32 * 1024 * 1024;
092  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
093  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
094
095  @BeforeEach
096  public void setup() throws IOException {
097    conf = TEST_UTIL.getConfiguration();
098    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
099    conf.setInt(PrefetchExecutor.PREFETCH_DELAY, 0);
100    PrefetchExecutor.loadConfiguration(conf);
101    testDir = TEST_UTIL.getDataTestDir();
102    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
103    fs = HFileSystem.get(conf);
104  }
105
106  @TestTemplate
107  public void testPrefetchPersistence() throws Exception {
108    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
109      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
110      testDir + "/bucket.persistence", 60 * 1000, conf);
111    bucketCache.waitForCacheInitialization(10000);
112    cacheConf = new CacheConfig(conf, bucketCache);
113
114    long usedSize = bucketCache.getAllocator().getUsedSize();
115    assertEquals(0, usedSize);
116    assertTrue(new File(testDir + "/bucket.cache").exists());
117    // Load Cache
118    Path storeFile = writeStoreFile("TestPrefetch0");
119    Path storeFile2 = writeStoreFile("TestPrefetch1");
120    readStoreFile(storeFile);
121    readStoreFile(storeFile2);
122    usedSize = bucketCache.getAllocator().getUsedSize();
123    assertNotEquals(0, usedSize);
124
125    bucketCache.shutdown();
126    assertTrue(new File(testDir + "/bucket.persistence").exists());
127    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
128      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
129      testDir + "/bucket.persistence", 60 * 1000, conf);
130    bucketCache.waitForCacheInitialization(10000);
131    cacheConf = new CacheConfig(conf, bucketCache);
132    assertTrue(usedSize != 0);
133    assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile.getName()));
134    assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile2.getName()));
135  }
136
137  @AfterEach
138  public void cleanup() {
139    TEST_UTIL.cleanupTestDir();
140  }
141
142  public void readStoreFile(Path storeFilePath) throws Exception {
143    // Open the file
144    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
145    Waiter.waitFor(conf, 30000, () -> reader.prefetchComplete()
146      || bucketCache.fullyCachedFiles.containsKey(storeFilePath.getName()));
147  }
148
149  public Path writeStoreFile(String fname) throws IOException {
150    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
151    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
152    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
153      .withOutputDir(storeFileParentDir).withFileContext(meta).build();
154    Random rand = ThreadLocalRandom.current();
155    final int rowLen = 32;
156    for (int i = 0; i < NUM_KV; ++i) {
157      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
158      byte[] v = RandomKeyValueUtil.randomValue(rand);
159      int cfLen = rand.nextInt(k.length - rowLen + 1);
160      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
161        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
162      sfw.append(kv);
163    }
164
165    sfw.close();
166    return sfw.getPath();
167  }
168
169  public static KeyValue.Type generateKeyType(Random rand) {
170    if (rand.nextBoolean()) {
171      // Let's make half of KVs puts.
172      return KeyValue.Type.Put;
173    } else {
174      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
175      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
176        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
177          + "Probably the layout of KeyValue.Type has changed.");
178      }
179      return keyType;
180    }
181  }
182
183}