/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class is used to test the functionality of the DataTieringManager.
 * <p>
 * The mock online regions are stored in {@link TestDataTieringManager#testOnlineRegions}.
 * For all tests, the setup of {@link TestDataTieringManager#testOnlineRegions} occurs only once.
 * Please refer to {@link TestDataTieringManager#setupOnlineRegions()} for the structure.
 * Additionally, a list of all store files is maintained in
 * {@link TestDataTieringManager#hStoreFiles}. The characteristics of these store files are listed
 * below:
 * @formatter:off
 * ## HStoreFile Information
 *
 * | HStoreFile       | Region             | Store               | DataTiering           | isHot |
 * |------------------|--------------------|---------------------|-----------------------|-------|
 * | hStoreFile0      | region1            | hStore11            | TIME_RANGE            | true  |
 * | hStoreFile1      | region1            | hStore12            | NONE                  | true  |
 * | hStoreFile2      | region2            | hStore21            | TIME_RANGE            | true  |
 * | hStoreFile3      | region2            | hStore22            | TIME_RANGE            | false |
 * @formatter:on
 */
@Category({ RegionServerTests.class, SmallTests.class })
public class TestDataTieringManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestDataTieringManager.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestDataTieringManager.class);
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final long DAY = 24 * 60 * 60 * 1000;
  private static Configuration defaultConf;
  private static FileSystem fs;
  private static BlockCache blockCache;
  private static CacheConfig cacheConf;
  private static Path testDir;
  private static final Map<String, HRegion> testOnlineRegions = new HashMap<>();

  private static DataTieringManager dataTieringManager;
  private static final List<HStoreFile> hStoreFiles = new ArrayList<>();

  /**
   * Represents the current lexicographically increasing string used as a row key when writing
   * HFiles. It is incremented each time {@link #nextString()} is called to generate unique row
   * keys.
   */
  private static String rowKeyString;

  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    testDir = TEST_UTIL.getDataTestDir(TestDataTieringManager.class.getSimpleName());
    defaultConf = TEST_UTIL.getConfiguration();
    updateCommonConfigurations();
    assertTrue(DataTieringManager.instantiate(defaultConf, testOnlineRegions));
    dataTieringManager = DataTieringManager.getInstance();
    rowKeyString = "";
  }

  private static void updateCommonConfigurations() {
    defaultConf.setBoolean(DataTieringManager.GLOBAL_DATA_TIERING_ENABLED_KEY, true);
    defaultConf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    defaultConf.setLong(BUCKET_CACHE_SIZE_KEY, 32);
  }

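  /**
   * Functional hook so that each test can exercise a different DataTieringManager predicate (for
   * example {@code isDataTieringEnabled} or {@code isHotData}) through the same assertion helpers.
   */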
  @FunctionalInterface
  interface DataTieringMethodCallerWithKey {
    boolean call(DataTieringManager manager, BlockCacheKey key) throws DataTieringException;
  }

  @Test
  public void testDataTieringEnabledWithKey() throws IOException {
    initializeTestEnvironment();
    DataTieringMethodCallerWithKey methodCallerWithKey = DataTieringManager::isDataTieringEnabled;

    // hStoreFile0 belongs to hStore11, which uses TIME_RANGE data tiering.
    BlockCacheKey key = new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA);
    testDataTieringMethodWithKeyNoException(methodCallerWithKey, key, true);

    // hStoreFile1 belongs to hStore12, which has data tiering disabled (NONE).
    key = new BlockCacheKey(hStoreFiles.get(1).getPath(), 0, true, BlockType.DATA);
    testDataTieringMethodWithKeyNoException(methodCallerWithKey, key, false);
  }

  @Test
  public void testHotDataWithKey() throws IOException {
    initializeTestEnvironment();
    DataTieringMethodCallerWithKey methodCallerWithKey = DataTieringManager::isHotData;
    // hStoreFile0 is within the hot data age, so it is classified as hot.
    BlockCacheKey key = new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA);
    testDataTieringMethodWithKeyNoException(methodCallerWithKey, key, true);

    // hStoreFile3 is older than the hot data age, so it is classified as cold.
    key = new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA);
    testDataTieringMethodWithKeyNoException(methodCallerWithKey, key, false);
  }

  @Test
  public void testGracePeriodMakesColdFileHot() throws IOException, DataTieringException {
    initializeTestEnvironment();

    long hotAge = 1 * DAY;
    long gracePeriod = 3 * DAY;

    long currentTime = System.currentTimeMillis();
    long fileTimestamp = currentTime - (2 * DAY);

    Configuration conf = getConfWithGracePeriod(hotAge, gracePeriod);
    HRegion region = createHRegion("tableGracePeriod", conf);
    HStore hStore = createHStore(region, "cf1", conf);

    HStoreFile file = createHStoreFile(hStore.getStoreContext().getFamilyStoreDirectoryPath(),
      hStore.getReadOnlyConfiguration(), fileTimestamp, region.getRegionFileSystem());
    file.initReader();

    hStore.refreshStoreFiles();
    region.stores.put(Bytes.toBytes("cf1"), hStore);
    testOnlineRegions.put(region.getRegionInfo().getEncodedName(), region);
    Path hFilePath = file.getPath();
    BlockCacheKey key = new BlockCacheKey(hFilePath, 0, true, BlockType.DATA);
    assertTrue("File should be hot due to grace period", dataTieringManager.isHotData(key));
  }

  @Test
  public void testFileIsColdWithoutGracePeriod() throws IOException, DataTieringException {
    initializeTestEnvironment();

    long hotAge = 1 * DAY;
    long gracePeriod = 0;
    long currentTime = System.currentTimeMillis();
    long fileTimestamp = currentTime - (2 * DAY);

    Configuration conf = getConfWithGracePeriod(hotAge, gracePeriod);
    HRegion region = createHRegion("tableNoGracePeriod", conf);
    HStore hStore = createHStore(region, "cf1", conf);

    HStoreFile file = createHStoreFile(hStore.getStoreContext().getFamilyStoreDirectoryPath(),
      hStore.getReadOnlyConfiguration(), fileTimestamp, region.getRegionFileSystem());
    file.initReader();

    hStore.refreshStoreFiles();
    region.stores.put(Bytes.toBytes("cf1"), hStore);
    testOnlineRegions.put(region.getRegionInfo().getEncodedName(), region);

    Path hFilePath = file.getPath();
    BlockCacheKey key = new BlockCacheKey(hFilePath, 0, true, BlockType.DATA);
    assertFalse("File should be cold without grace period", dataTieringManager.isHotData(key));
  }

  @Test
  public void testPrefetchWhenDataTieringEnabled() throws IOException {
    setPrefetchBlocksOnOpen();
    initializeTestEnvironment();
    // Evict blocks from cache by closing the files and passing evict on close.
    // Then initialize the reader again. Since prefetch on open is set to true, it should prefetch
    // those blocks.
    for (HStoreFile file : hStoreFiles) {
      file.closeStoreFile(true);
      file.initReader();
    }

    // Since we have one cold file among four files, only three should get prefetched.
    Optional<Map<String, Pair<String, Long>>> fullyCachedFiles = blockCache.getFullyCachedFiles();
    assertTrue("We should get the fully cached files from the cache", fullyCachedFiles.isPresent());
    Waiter.waitFor(defaultConf, 10000, () -> fullyCachedFiles.get().size() == 3);
    assertEquals("Number of fully cached files is incorrect", 3, fullyCachedFiles.get().size());
  }

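  // Must be called before initializeTestEnvironment() so that the CacheConfig created there picks
  // up the prefetch-on-open setting.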
  private void setPrefetchBlocksOnOpen() {
    defaultConf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
  }

  @Test
  public void testColdDataFiles() throws IOException {
    initializeTestEnvironment();
    Set<BlockCacheKey> allCachedBlocks = new HashSet<>();
    for (HStoreFile file : hStoreFiles) {
      allCachedBlocks.add(new BlockCacheKey(file.getPath(), 0, true, BlockType.DATA));
    }

    // Verify hStoreFile3 is identified as cold data
    DataTieringMethodCallerWithKey methodCallerWithKey = DataTieringManager::isHotData;
    Path hFilePath = hStoreFiles.get(3).getPath();
    testDataTieringMethodWithKeyNoException(methodCallerWithKey,
      new BlockCacheKey(hFilePath, 0, true, BlockType.DATA), false);

    // Verify all the other files in hStoreFiles are hot data
    for (int i = 0; i < hStoreFiles.size() - 1; i++) {
      hFilePath = hStoreFiles.get(i).getPath();
      testDataTieringMethodWithKeyNoException(methodCallerWithKey,
        new BlockCacheKey(hFilePath, 0, true, BlockType.DATA), true);
    }

    try {
      Set<String> coldFilePaths = dataTieringManager.getColdDataFiles(allCachedBlocks);
      assertEquals(1, coldFilePaths.size());
    } catch (DataTieringException e) {
      fail("Unexpected DataTieringException: " + e.getMessage());
    }
  }

  @Test
  public void testCacheCompactedBlocksOnWriteDataTieringDisabled() throws IOException {
    setCacheCompactBlocksOnWrite();
    initializeTestEnvironment();

    HRegion region = createHRegion("table3");
    testCacheCompactedBlocksOnWrite(region, true);
  }

  @Test
  public void testCacheCompactedBlocksOnWriteWithHotData() throws IOException {
    setCacheCompactBlocksOnWrite();
    initializeTestEnvironment();

    HRegion region = createHRegion("table3", getConfWithTimeRangeDataTieringEnabled(5 * DAY));
    testCacheCompactedBlocksOnWrite(region, true);
  }

  @Test
  public void testCacheCompactedBlocksOnWriteWithColdData() throws IOException {
    setCacheCompactBlocksOnWrite();
    initializeTestEnvironment();

    HRegion region = createHRegion("table3", getConfWithTimeRangeDataTieringEnabled(DAY));
    testCacheCompactedBlocksOnWrite(region, false);
  }

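  // Must be called before initializeTestEnvironment() so that the CacheConfig created there picks
  // up the cache-compacted-blocks-on-write setting.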
  private void setCacheCompactBlocksOnWrite() {
    defaultConf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, true);
  }

  private void testCacheCompactedBlocksOnWrite(HRegion region, boolean expectDataBlocksCached)
    throws IOException {
    HStore hStore = createHStore(region, "cf1");
    createTestFilesForCompaction(hStore);
    hStore.refreshStoreFiles();

    region.stores.put(Bytes.toBytes("cf1"), hStore);
    testOnlineRegions.put(region.getRegionInfo().getEncodedName(), region);

    long initialStoreFilesCount = hStore.getStorefilesCount();
    long initialCacheDataBlockCount = blockCache.getDataBlockCount();
    assertEquals(3, initialStoreFilesCount);
    assertEquals(0, initialCacheDataBlockCount);

    region.compact(true);

    long compactedStoreFilesCount = hStore.getStorefilesCount();
    long compactedCacheDataBlockCount = blockCache.getDataBlockCount();
    assertEquals(1, compactedStoreFilesCount);
    assertEquals(expectDataBlocksCached, compactedCacheDataBlockCount > 0);
  }

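  /**
   * Creates three store files aged two, three and four days, so that a subsequent major
   * compaction produces a single file whose hot/cold classification depends on the configured
   * hot data age.
   */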
  private void createTestFilesForCompaction(HStore hStore) throws IOException {
    long currentTime = System.currentTimeMillis();
    Path storeDir = hStore.getStoreContext().getFamilyStoreDirectoryPath();
    Configuration configuration = hStore.getReadOnlyConfiguration();

    createHStoreFile(storeDir, configuration, currentTime - 2 * DAY,
      hStore.getHRegion().getRegionFileSystem());
    createHStoreFile(storeDir, configuration, currentTime - 3 * DAY,
      hStore.getHRegion().getRegionFileSystem());
    createHStoreFile(storeDir, configuration, currentTime - 4 * DAY,
      hStore.getHRegion().getRegionFileSystem());
  }

  @Test
  public void testPickColdDataFiles() throws IOException {
    initializeTestEnvironment();
    Map<String, String> coldDataFiles = dataTieringManager.getColdFilesList();
    assertEquals(1, coldDataFiles.size());
    // hStoreFiles[3] is the cold file.
    assertTrue(coldDataFiles.containsKey(hStoreFiles.get(3).getFileInfo().getActiveFileName()));
  }

  /*
   * Verify that both cold blocks are evicted when the bucket reaches its capacity, while the hot
   * block remains in the cache.
   */
  @Test
  public void testBlockEvictions() throws Exception {
    initializeTestEnvironment();
    long capacitySize = 40 * 1024;
    int writeThreads = 3;
    int writerQLen = 64;
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };

    // Setup: Create a bucket cache with lower capacity
    BucketCache bucketCache =
      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes,
        writeThreads, writerQLen, null, DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);

    // Create two cache keys for cold data blocks and one for a hot data block.
    // hStoreFiles.get(3) is a cold data file, while hStoreFiles.get(0) is a hot file.
    Set<BlockCacheKey> cacheKeys = new HashSet<>();
    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 8192, true, BlockType.DATA));
    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));

    // Create dummy data to be cached and fill the cache completely.
    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);

    int blocksIter = 0;
    for (BlockCacheKey key : cacheKeys) {
      bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
      // Ensure that the block is persisted to the file.
      Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
    }

    // Verify that the bucket cache contains 3 blocks.
    assertEquals(3, bucketCache.getBackingMap().keySet().size());

    // Add an additional block of hot data into the cache, which should trigger the eviction.
    BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
    CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);

    bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
    Waiter.waitFor(defaultConf, 10000, 100,
      () -> (bucketCache.getBackingMap().containsKey(newKey)));

    // Verify that the bucket cache now contains only the 2 hot blocks.
    // Both 8KB cold blocks are evicted to make room for the new 8KB block plus some additional
    // free space.
    validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0);
  }

  /*
   * Verify that two of the three cold blocks are evicted when the bucket reaches its capacity,
   * while one cold block remains in the cache because evicting two already frees the required
   * space.
   */
  @Test
  public void testBlockEvictionsAllColdBlocks() throws Exception {
    initializeTestEnvironment();
    long capacitySize = 40 * 1024;
    int writeThreads = 3;
    int writerQLen = 64;
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };

    // Setup: Create a bucket cache with lower capacity
    BucketCache bucketCache =
      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes,
        writeThreads, writerQLen, null, DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);

    // Create three cache keys, all referring to cold data blocks.
    // hStoreFiles.get(3) is a cold data file.
    Set<BlockCacheKey> cacheKeys = new HashSet<>();
    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 8192, true, BlockType.DATA));
    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 16384, true, BlockType.DATA));

    // Create dummy data to be cached and fill the cache completely.
    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);

    int blocksIter = 0;
    for (BlockCacheKey key : cacheKeys) {
      bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
      // Ensure that the block is persisted to the file.
      Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
    }

    // Verify that the bucket cache contains 3 blocks.
    assertEquals(3, bucketCache.getBackingMap().keySet().size());

    // Add an additional block of hot data into the cache, which should trigger the eviction.
    BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
    CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);

    bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
    Waiter.waitFor(defaultConf, 10000, 100,
      () -> (bucketCache.getBackingMap().containsKey(newKey)));

    // Verify that the bucket cache now contains 1 cold block and the newly added hot block.
    validateBlocks(bucketCache.getBackingMap().keySet(), 2, 1, 1);
  }

  /*
   * Verify that a hot block is evicted along with the cold block when the bucket reaches its
   * capacity.
   */
  @Test
  public void testBlockEvictionsHotBlocks() throws Exception {
    initializeTestEnvironment();
    long capacitySize = 40 * 1024;
    int writeThreads = 3;
    int writerQLen = 64;
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };

    // Setup: Create a bucket cache with lower capacity
    BucketCache bucketCache =
      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes,
        writeThreads, writerQLen, null, DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);

    // Create three cache keys: two for hot data blocks and one for a cold data block.
    // hStoreFiles.get(0) is a hot data file and hStoreFiles.get(3) is a cold data file.
    Set<BlockCacheKey> cacheKeys = new HashSet<>();
    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));
    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 8192, true, BlockType.DATA));
    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));

    // Create dummy data to be cached and fill the cache completely.
    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);

    int blocksIter = 0;
    for (BlockCacheKey key : cacheKeys) {
      bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
      // Ensure that the block is persisted to the file.
      Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
    }

    // Verify that the bucket cache contains 3 blocks.
    assertEquals(3, bucketCache.getBackingMap().keySet().size());

    // Add an additional hot block, which should evict the only cold block together with one of
    // the existing hot blocks.
    BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
    CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);

    bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
    Waiter.waitFor(defaultConf, 10000, 100,
      () -> (bucketCache.getBackingMap().containsKey(newKey)));

    // Verify that the bucket cache now contains 2 hot blocks.
    // Only one of the older hot blocks is retained; the other one is the newly added hot block.
    validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0);
  }

  @Test
  public void testFeatureKeyDisabled() throws Exception {
    DataTieringManager.resetForTestingOnly();
    defaultConf.setBoolean(DataTieringManager.GLOBAL_DATA_TIERING_ENABLED_KEY, false);
    initializeTestEnvironment();

    try {
      assertFalse(DataTieringManager.instantiate(defaultConf, testOnlineRegions));
      // Verify that the DataTieringManager instance is not instantiated by the
      // instantiate call above.
      assertNull(DataTieringManager.getInstance());

      // Also validate that data temperature is not honoured.
      long capacitySize = 40 * 1024;
      int writeThreads = 3;
      int writerQLen = 64;
      int[] bucketSizes = new int[] { 8 * 1024 + 1024 };

      // Setup: Create a bucket cache with lower capacity
      BucketCache bucketCache =
        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes,
          writeThreads, writerQLen, null, DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);

      // Create three cache keys: two for hot data blocks and one for a cold data block.
      // hStoreFiles.get(0) is a hot data file and hStoreFiles.get(3) is a cold data file.
      List<BlockCacheKey> cacheKeys = new ArrayList<>();
      cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));
      cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 8192, true, BlockType.DATA));
      cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));

      // Create dummy data to be cached and fill the cache completely.
      CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);

      int blocksIter = 0;
      for (BlockCacheKey key : cacheKeys) {
        LOG.info("Adding {}", key);
        bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
        // Ensure that the block is persisted to the file.
        Waiter.waitFor(defaultConf, 10000, 100,
          () -> (bucketCache.getBackingMap().containsKey(key)));
      }

      // Verify that the bucket cache contains 3 blocks.
      assertEquals(3, bucketCache.getBackingMap().keySet().size());

      // Add an additional hot block, which triggers eviction.
      BlockCacheKey newKey =
        new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
      CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);

      bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
      Waiter.waitFor(defaultConf, 10000, 100,
        () -> (bucketCache.getBackingMap().containsKey(newKey)));

      // Verify that the bucket still contains the only cold block and one newly added hot block.
      // The older hot blocks are evicted, and the data-tiering mechanism does not kick in to
      // evict the cold block.
      validateBlocks(bucketCache.getBackingMap().keySet(), 2, 1, 1);
    } finally {
      DataTieringManager.resetForTestingOnly();
      defaultConf.setBoolean(DataTieringManager.GLOBAL_DATA_TIERING_ENABLED_KEY, true);
      assertTrue(DataTieringManager.instantiate(defaultConf, testOnlineRegions));
    }
  }

  @Test
  public void testCacheConfigShouldCacheFile() throws Exception {
    initializeTestEnvironment();
    // Evict the files from cache.
    for (HStoreFile file : hStoreFiles) {
      file.closeStoreFile(true);
    }
    // Verify that the API shouldCacheBlockOnRead returns the result correctly.
    // hStoreFiles[0], hStoreFiles[1] and hStoreFiles[2] are hot files.
    // hStoreFiles[3] is a cold file.
    try {
      assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
        hStoreFiles.get(0).getFileInfo().getHFileInfo(),
        hStoreFiles.get(0).getFileInfo().getConf()));
      assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
        hStoreFiles.get(1).getFileInfo().getHFileInfo(),
        hStoreFiles.get(1).getFileInfo().getConf()));
      assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
        hStoreFiles.get(2).getFileInfo().getHFileInfo(),
        hStoreFiles.get(2).getFileInfo().getConf()));
      assertFalse(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
        hStoreFiles.get(3).getFileInfo().getHFileInfo(),
        hStoreFiles.get(3).getFileInfo().getConf()));
    } finally {
      for (HStoreFile file : hStoreFiles) {
        file.initReader();
      }
    }
  }

  @Test
  public void testCacheOnReadColdFile() throws Exception {
    initializeTestEnvironment();
    // hStoreFiles[3] is a cold file. The blocks should not get cached after a readBlock call.
    HStoreFile hStoreFile = hStoreFiles.get(3);
    BlockCacheKey cacheKey = new BlockCacheKey(hStoreFile.getPath(), 0, true, BlockType.DATA);
    testCacheOnRead(hStoreFile, cacheKey, -1, false);
  }

  @Test
  public void testCacheOnReadHotFile() throws Exception {
    initializeTestEnvironment();
    // hStoreFiles[0] is a hot file. The blocks should get cached after a readBlock call.
    HStoreFile hStoreFile = hStoreFiles.get(0);
    BlockCacheKey cacheKey = new BlockCacheKey(hStoreFile.getPath(), 0, true, BlockType.DATA);
    testCacheOnRead(hStoreFile, cacheKey, -1, true);
  }

  private void testCacheOnRead(HStoreFile hStoreFile, BlockCacheKey key, long onDiskBlockSize,
    boolean expectedCached) throws Exception {
    // Execute the readBlock API, which caches the block only if it is a hot block.
    hStoreFile.getReader().getHFileReader().readBlock(key.getOffset(), onDiskBlockSize, true, false,
      false, false, key.getBlockType(), DataBlockEncoding.NONE);
    // Validate that the hot block gets cached and the cold block does not.
    HFileBlock block = (HFileBlock) blockCache.getBlock(key, false, false, false, BlockType.DATA);
    if (expectedCached) {
      assertNotNull(block);
    } else {
      assertNull(block);
    }
  }

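  /**
   * Waits until the cache holds the expected total number of blocks, then classifies every cached
   * key via {@code DataTieringManager#isHotData} and asserts the expected hot and cold counts.
   */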
  private void validateBlocks(Set<BlockCacheKey> keys, int expectedTotalKeys, int expectedHotBlocks,
    int expectedColdBlocks) {
    int numHotBlocks = 0, numColdBlocks = 0;

    Waiter.waitFor(defaultConf, 10000, 100, () -> (expectedTotalKeys == keys.size()));
    for (BlockCacheKey key : keys) {
      try {
        if (dataTieringManager.isHotData(key)) {
          numHotBlocks++;
        } else {
          numColdBlocks++;
        }
      } catch (Exception e) {
        fail("Unexpected exception: " + e.getMessage());
      }
    }
    assertEquals(expectedHotBlocks, numHotBlocks);
    assertEquals(expectedColdBlocks, numColdBlocks);
  }

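  /**
   * Invokes the given DataTieringManager method with the supplied key and asserts either the
   * expected boolean result or the expected {@link DataTieringException}.
   */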
  private void testDataTieringMethodWithKey(DataTieringMethodCallerWithKey caller,
    BlockCacheKey key, boolean expectedResult, DataTieringException exception) {
    try {
      boolean value = caller.call(dataTieringManager, key);
      if (exception != null) {
        fail("Expected DataTieringException to be thrown");
      }
      assertEquals(expectedResult, value);
    } catch (DataTieringException e) {
      if (exception == null) {
        fail("Unexpected DataTieringException: " + e.getMessage());
      }
      assertEquals(exception.getMessage(), e.getMessage());
    }
  }

  private void testDataTieringMethodWithKeyExpectingException(DataTieringMethodCallerWithKey caller,
    BlockCacheKey key, DataTieringException exception) {
    testDataTieringMethodWithKey(caller, key, false, exception);
  }

  private void testDataTieringMethodWithKeyNoException(DataTieringMethodCallerWithKey caller,
    BlockCacheKey key, boolean expectedResult) {
    testDataTieringMethodWithKey(caller, key, expectedResult, null);
  }

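  /**
   * Rebuilds the file system, block cache and mock online regions from scratch. Called at the
   * start of each test so that configuration changes made beforehand take effect.
   */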
  private static void initializeTestEnvironment() throws IOException {
    setupFileSystemAndCache();
    setupOnlineRegions();
  }

  private static void setupFileSystemAndCache() throws IOException {
    fs = HFileSystem.get(defaultConf);
    blockCache = BlockCacheFactory.createBlockCache(defaultConf);
    cacheConf = new CacheConfig(defaultConf, blockCache);
  }

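  /**
   * Builds the two mock regions described in the class-level table: region1 with hStore11
   * (TIME_RANGE tiering) and hStore12 (no tiering), and region2 with hStore21 and hStore22
   * (TIME_RANGE tiering at the table level), then registers both in {@link #testOnlineRegions}.
   */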
  private static void setupOnlineRegions() throws IOException {
    testOnlineRegions.clear();
    hStoreFiles.clear();
    long currentTime = System.currentTimeMillis();

    HRegion region1 = createHRegion("table1");

    HStore hStore11 = createHStore(region1, "cf1", getConfWithTimeRangeDataTieringEnabled(DAY));
    hStoreFiles.add(createHStoreFile(hStore11.getStoreContext().getFamilyStoreDirectoryPath(),
      hStore11.getReadOnlyConfiguration(), currentTime, region1.getRegionFileSystem()));
    hStore11.refreshStoreFiles();
    HStore hStore12 = createHStore(region1, "cf2");
    hStoreFiles.add(createHStoreFile(hStore12.getStoreContext().getFamilyStoreDirectoryPath(),
      hStore12.getReadOnlyConfiguration(), currentTime - DAY, region1.getRegionFileSystem()));
    hStore12.refreshStoreFiles();

    region1.stores.put(Bytes.toBytes("cf1"), hStore11);
    region1.stores.put(Bytes.toBytes("cf2"), hStore12);

    HRegion region2 =
      createHRegion("table2", getConfWithTimeRangeDataTieringEnabled((long) (2.5 * DAY)));

    HStore hStore21 = createHStore(region2, "cf1");
    hStoreFiles.add(createHStoreFile(hStore21.getStoreContext().getFamilyStoreDirectoryPath(),
      hStore21.getReadOnlyConfiguration(), currentTime - 2 * DAY, region2.getRegionFileSystem()));
    hStore21.refreshStoreFiles();
    HStore hStore22 = createHStore(region2, "cf2");
    hStoreFiles.add(createHStoreFile(hStore22.getStoreContext().getFamilyStoreDirectoryPath(),
      hStore22.getReadOnlyConfiguration(), currentTime - 3 * DAY, region2.getRegionFileSystem()));
    hStore22.refreshStoreFiles();

    region2.stores.put(Bytes.toBytes("cf1"), hStore21);
    region2.stores.put(Bytes.toBytes("cf2"), hStore22);

    for (HStoreFile file : hStoreFiles) {
      file.initReader();
    }

    testOnlineRegions.put(region1.getRegionInfo().getEncodedName(), region1);
    testOnlineRegions.put(region2.getRegionInfo().getEncodedName(), region2);
  }

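  /**
   * Creates a region for the given table name, copying the data-tiering settings from the
   * supplied configuration into the table descriptor.
   */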
  private static HRegion createHRegion(String table) throws IOException {
    return createHRegion(table, defaultConf);
  }

  private static HRegion createHRegion(String table, Configuration conf) throws IOException {
    TableName tableName = TableName.valueOf(table);

    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
      .setValue(DataTieringManager.DATATIERING_KEY, conf.get(DataTieringManager.DATATIERING_KEY))
      .setValue(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY,
        conf.get(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY))
      .setValue(DataTieringManager.HSTORE_DATATIERING_GRACE_PERIOD_MILLIS_KEY,
        conf.get(DataTieringManager.HSTORE_DATATIERING_GRACE_PERIOD_MILLIS_KEY))
      .build();
    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();

    Configuration testConf = new Configuration(conf);
    CommonFSUtils.setRootDir(testConf, testDir);
    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
      CommonFSUtils.getTableDir(testDir, hri.getTable()), hri);

    HRegion region = new HRegion(regionFs, null, conf, htd, null);
    // Manually sets the BlockCache for the HRegion instance.
    // This is necessary because the region server is not started within this method,
    // and therefore the BlockCache needs to be explicitly configured.
    region.setBlockCache(blockCache);
    return region;
  }

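  /**
   * Creates a store for the given column family, copying the data-tiering settings from the
   * supplied configuration into the column family descriptor.
   */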
  private static HStore createHStore(HRegion region, String columnFamily) throws IOException {
    return createHStore(region, columnFamily, defaultConf);
  }

  private static HStore createHStore(HRegion region, String columnFamily, Configuration conf)
    throws IOException {
    ColumnFamilyDescriptor columnFamilyDescriptor =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily))
        .setValue(DataTieringManager.DATATIERING_KEY, conf.get(DataTieringManager.DATATIERING_KEY))
        .setValue(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY,
          conf.get(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY))
        .setValue(DataTieringManager.HSTORE_DATATIERING_GRACE_PERIOD_MILLIS_KEY,
          conf.get(DataTieringManager.HSTORE_DATATIERING_GRACE_PERIOD_MILLIS_KEY))
        .build();

    return new HStore(region, columnFamilyDescriptor, conf, false);
  }

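  /**
   * Returns a copy of the default configuration with TIME_RANGE data tiering enabled and the
   * given hot data age.
   */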
  private static Configuration getConfWithTimeRangeDataTieringEnabled(long hotDataAge) {
    Configuration conf = new Configuration(defaultConf);
    conf.set(DataTieringManager.DATATIERING_KEY, DataTieringType.TIME_RANGE.name());
    conf.set(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY, String.valueOf(hotDataAge));
    return conf;
  }

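  /**
   * Same as {@link #getConfWithTimeRangeDataTieringEnabled(long)}, but additionally sets the
   * data-tiering grace period.
   */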
  private static Configuration getConfWithGracePeriod(long hotDataAge, long gracePeriod) {
    Configuration conf = getConfWithTimeRangeDataTieringEnabled(hotDataAge);
    conf.set(DataTieringManager.HSTORE_DATATIERING_GRACE_PERIOD_MILLIS_KEY,
      String.valueOf(gracePeriod));
    return conf;
  }

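  /**
   * Writes a store file under the given store directory whose cells all carry the supplied
   * timestamp, so the caller controls the file's age and therefore its hot/cold classification.
   */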
  static HStoreFile createHStoreFile(Path storeDir, Configuration conf, long timestamp,
    HRegionFileSystem regionFs) throws IOException {
    String columnFamily = storeDir.getName();

    StoreFileWriter storeFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs)
      .withOutputDir(storeDir).withFileContext(new HFileContextBuilder().build()).build();

    writeStoreFileRandomData(storeFileWriter, Bytes.toBytes(columnFamily), timestamp);

    StoreContext storeContext = StoreContext.getBuilder().withRegionFileSystem(regionFs).build();

    StoreFileTracker sft = StoreFileTrackerFactory.create(conf, true, storeContext);
    return new HStoreFile(fs, storeFileWriter.getPath(), conf, cacheConf, BloomType.NONE, true,
      sft);
  }

  /**
   * Writes random data to a store file with rows arranged in lexicographically increasing order.
   * Each row is generated using the {@link #nextString()} method, ensuring that each subsequent row
   * is lexicographically larger than the previous one.
   */
  private static void writeStoreFileRandomData(final StoreFileWriter writer, byte[] columnFamily,
    long timestamp) throws IOException {
    int cellsPerFile = 10;
    byte[] qualifier = Bytes.toBytes("qualifier");
    byte[] value = generateRandomBytes(4 * 1024);
    try {
      for (int i = 0; i < cellsPerFile; i++) {
        byte[] row = Bytes.toBytes(nextString());
        writer.append(new KeyValue(row, columnFamily, qualifier, timestamp, value));
      }
    } finally {
      writer.appendTrackedTimestampsToMetadata();
      writer.close();
    }
  }

  private static byte[] generateRandomBytes(int sizeInBytes) {
    Random random = new Random();
    byte[] randomBytes = new byte[sizeInBytes];
    random.nextBytes(randomBytes);
    return randomBytes;
  }

  /**
   * Returns a lexicographically larger string each time it is called.
   */
  private static String nextString() {
    if (rowKeyString == null || rowKeyString.isEmpty()) {
      rowKeyString = "a";
    }
    char lastChar = rowKeyString.charAt(rowKeyString.length() - 1);
    if (lastChar < 'z') {
      rowKeyString = rowKeyString.substring(0, rowKeyString.length() - 1) + (char) (lastChar + 1);
    } else {
      rowKeyString = rowKeyString + "a";
    }
    return rowKeyString;
  }
}