/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.compress.zstd;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.DictionaryCache;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

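/**
 * Verifies that ZSTD dictionary compression configured at the column family level keeps working
 * across region split and merge: data written with the dictionary must remain readable after both
 * procedures complete.
 */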
@Category({ RegionServerTests.class, LargeTests.class })
public class TestZstdDictionarySplitMerge {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestZstdDictionarySplitMerge.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static Configuration conf;

  @BeforeClass
  public static void setUp() throws Exception {
    // NOTE: Don't put these configuration settings in the global site configuration. We are
    // testing that per-CF and per-table schema settings are applied correctly.
    conf = TEST_UTIL.getConfiguration();
    conf.set(Compression.ZSTD_CODEC_CLASS_KEY, ZstdCodec.class.getCanonicalName());
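    // Reload the ZSTD algorithm so it picks up the codec class configured above.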
    Compression.Algorithm.ZSTD.reload(conf);
    conf.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 1000);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    TEST_UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Test
  public void test() throws Exception {
    // Create the table

    final TableName tableName = TableName.valueOf("TestZstdDictionarySplitMerge");
    final byte[] cfName = Bytes.toBytes("info");
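    // The dictionary is resolved through DictionaryCache; RESOURCE_SCHEME points it at a test
    // resource bundled with the module rather than a filesystem path.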
    final String dictionaryPath = DictionaryCache.RESOURCE_SCHEME + "zstd.test.dict";
    final TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cfName)
        .setCompressionType(Compression.Algorithm.ZSTD)
        .setConfiguration(ZstdCodec.ZSTD_DICTIONARY_KEY, dictionaryPath).build())
      .build();
    final Admin admin = TEST_UTIL.getAdmin();
    admin.createTable(td, new byte[][] { Bytes.toBytes(1) });
    TEST_UTIL.waitTableAvailable(tableName);

    // Load some data

    Table t = ConnectionFactory.createConnection(conf).getTable(tableName);
    TEST_UTIL.loadNumericRows(t, cfName, 0, 100_000);
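    // Flush so the rows are written out as ZSTD-compressed store files; verifying them below
    // reads the files back through the same dictionary.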
    admin.flush(tableName);
    assertTrue("Dictionary was not loaded", DictionaryCache.contains(dictionaryPath));
    TEST_UTIL.verifyNumericRows(t, cfName, 0, 100_000, 0);

    // Test split procedure

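    // The table was created pre-split at key 1, so a second split should leave three regions
    // online.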
    admin.split(tableName, Bytes.toBytes(50_000));
    TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return TEST_UTIL.getMiniHBaseCluster().getRegions(tableName).size() == 3;
      }

      @Override
      public String explainFailure() throws Exception {
        return "Split has not finished yet";
      }
    });
    TEST_UTIL.waitUntilNoRegionsInTransition();
    TEST_UTIL.verifyNumericRows(t, cfName, 0, 100_000, 0);

    // Test merge procedure

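    // Merge the first region (empty start key) back with the daughter region that starts at the
    // original split point (key 1).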
    RegionInfo regionA = null;
    RegionInfo regionB = null;
    for (RegionInfo region : admin.getRegions(tableName)) {
      if (region.getStartKey().length == 0) {
        regionA = region;
      } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(1))) {
        regionB = region;
      }
    }
    assertNotNull(regionA);
    assertNotNull(regionB);
    admin
      .mergeRegionsAsync(new byte[][] { regionA.getRegionName(), regionB.getRegionName() }, false)
      .get(30, TimeUnit.SECONDS);
    assertEquals(2, admin.getRegions(tableName).size());
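    // Both the synchronous and asynchronous region locators should place key 1 on the server
    // hosting the merged region.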
    ServerName expected = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName();
    assertEquals(expected, TEST_UTIL.getConnection().getRegionLocator(tableName)
      .getRegionLocation(Bytes.toBytes(1), true).getServerName());
    try (AsyncConnection asyncConn = ConnectionFactory.createAsyncConnection(conf).get()) {
      assertEquals(expected, asyncConn.getRegionLocator(tableName)
        .getRegionLocation(Bytes.toBytes(1), true).get().getServerName());
    }
    TEST_UTIL.verifyNumericRows(t, cfName, 0, 100_000, 0);
  }

}