001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress.zstd;
019
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.DictionaryCache;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
048
049@Tag(RegionServerTests.TAG)
050@Tag(LargeTests.TAG)
051public class TestZstdDictionarySplitMerge {
052
053  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
054  private static Configuration conf;
055
056  @BeforeAll
057  public static void setUp() throws Exception {
058    // NOTE: Don't put configuration settings in global site schema. We are testing if per
059    // CF or per table schema settings are applied correctly.
060    conf = TEST_UTIL.getConfiguration();
061    conf.set(Compression.ZSTD_CODEC_CLASS_KEY, ZstdCodec.class.getCanonicalName());
062    Compression.Algorithm.ZSTD.reload(conf);
063    conf.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 1000);
064    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
065    TEST_UTIL.startMiniCluster(1);
066  }
067
068  @AfterAll
069  public static void tearDown() throws Exception {
070    TEST_UTIL.shutdownMiniCluster();
071  }
072
073  @Test
074  public void test() throws Exception {
075    // Create the table
076    final TableName tableName = TableName.valueOf("TestZstdDictionarySplitMerge");
077    final byte[] cfName = Bytes.toBytes("info");
078    final String dictionaryPath = DictionaryCache.RESOURCE_SCHEME + "zstd.test.dict";
079    final TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
080      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cfName)
081        .setCompressionType(Compression.Algorithm.ZSTD)
082        .setConfiguration(ZstdCodec.ZSTD_DICTIONARY_KEY, dictionaryPath).build())
083      .build();
084    final Admin admin = TEST_UTIL.getAdmin();
085    admin.createTable(td, new byte[][] { Bytes.toBytes(1) });
086    TEST_UTIL.waitTableAvailable(tableName);
087    // Load some data
088    Table t = ConnectionFactory.createConnection(conf).getTable(tableName);
089    TEST_UTIL.loadNumericRows(t, cfName, 0, 100_000);
090    admin.flush(tableName);
091    assertTrue(DictionaryCache.contains(dictionaryPath), "Dictionary was not loaded");
092    TEST_UTIL.verifyNumericRows(t, cfName, 0, 100_000, 0);
093    // Test split procedure
094    admin.split(tableName, Bytes.toBytes(50_000));
095    TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
096      @Override
097      public boolean evaluate() throws Exception {
098        return TEST_UTIL.getMiniHBaseCluster().getRegions(tableName).size() == 3;
099      }
100
101      @Override
102      public String explainFailure() throws Exception {
103        return "Split has not finished yet";
104      }
105    });
106    TEST_UTIL.waitUntilNoRegionsInTransition();
107    TEST_UTIL.verifyNumericRows(t, cfName, 0, 100_000, 0);
108    // Test merge procedure
109    RegionInfo regionA = null;
110    RegionInfo regionB = null;
111    for (RegionInfo region : admin.getRegions(tableName)) {
112      if (region.getStartKey().length == 0) {
113        regionA = region;
114      } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(1))) {
115        regionB = region;
116      }
117    }
118    assertNotNull(regionA);
119    assertNotNull(regionB);
120    admin
121      .mergeRegionsAsync(new byte[][] { regionA.getRegionName(), regionB.getRegionName() }, false)
122      .get(30, TimeUnit.SECONDS);
123    assertEquals(2, admin.getRegions(tableName).size());
124    ServerName expected = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName();
125    assertEquals(expected, TEST_UTIL.getConnection().getRegionLocator(tableName)
126      .getRegionLocation(Bytes.toBytes(1), true).getServerName());
127    try (AsyncConnection asyncConn = ConnectionFactory.createAsyncConnection(conf).get()) {
128      assertEquals(expected, asyncConn.getRegionLocator(tableName)
129        .getRegionLocation(Bytes.toBytes(1), true).get().getServerName());
130    }
131    TEST_UTIL.verifyNumericRows(t, cfName, 0, 100_000, 0);
132  }
133}