/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.compress.zstd;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.DictionaryCache;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Verifies that a ZSTD dictionary configured at the column family level (via the table schema,
 * not the site configuration) is loaded and continues to work across region split and merge
 * procedures: data written before a split or merge must remain readable afterwards.
 */
@Category({ RegionServerTests.class, LargeTests.class })
public class TestZstdDictionarySplitMerge {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestZstdDictionarySplitMerge.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static Configuration conf;

  @BeforeClass
  public static void setUp() throws Exception {
    // NOTE: Don't put configuration settings in global site schema. We are testing that per-CF
    // or per-table schema settings are applied correctly.
    conf = TEST_UTIL.getConfiguration();
    conf.set(Compression.ZSTD_CODEC_CLASS_KEY, ZstdCodec.class.getCanonicalName());
    Compression.Algorithm.ZSTD.reload(conf);
    conf.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 1000);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    TEST_UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Test
  public void test() throws Exception {
    // Create the table, pre-split into two regions at key 1, with ZSTD compression and the
    // dictionary configured on the column family.

    final TableName tableName = TableName.valueOf("TestZstdDictionarySplitMerge");
    final byte[] cfName = Bytes.toBytes("info");
    final String dictionaryPath = DictionaryCache.RESOURCE_SCHEME + "zstd.test.dict";
    final TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cfName)
        .setCompressionType(Compression.Algorithm.ZSTD)
        .setConfiguration(ZstdCodec.ZSTD_DICTIONARY_KEY, dictionaryPath).build())
      .build();
    final Admin admin = TEST_UTIL.getAdmin();
    admin.createTable(td, new byte[][] { Bytes.toBytes(1) });
    TEST_UTIL.waitTableAvailable(tableName);

    // Load some data and flush, which compresses store files with the dictionary

    Table t = ConnectionFactory.createConnection(conf).getTable(tableName);
    TEST_UTIL.loadNumericRows(t, cfName, 0, 100_000);
    admin.flush(tableName);
    assertTrue("Dictionary was not loaded", DictionaryCache.contains(dictionaryPath));
    TEST_UTIL.verifyNumericRows(t, cfName, 0, 100_000, 0);

    // Test split procedure

    admin.split(tableName, Bytes.toBytes(50_000));
    TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return TEST_UTIL.getMiniHBaseCluster().getRegions(tableName).size() == 3;
      }

      @Override
      public String explainFailure() throws Exception {
        return "Split has not finished yet";
      }
    });
    TEST_UTIL.waitUntilNoRegionsInTransition();
    TEST_UTIL.verifyNumericRows(t, cfName, 0, 100_000, 0);

    // Test merge procedure: merge the region with the empty start key and the region
    // starting at key 1 back together

    RegionInfo regionA = null;
    RegionInfo regionB = null;
    for (RegionInfo region : admin.getRegions(tableName)) {
      if (region.getStartKey().length == 0) {
        regionA = region;
      } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(1))) {
        regionB = region;
      }
    }
    assertNotNull(regionA);
    assertNotNull(regionB);
    admin
      .mergeRegionsAsync(new byte[][] { regionA.getRegionName(), regionB.getRegionName() }, false)
      .get(30, TimeUnit.SECONDS);
    assertEquals(2, admin.getRegions(tableName).size());
    // Both the sync and async region locators should report the merged region on the
    // sole region server, and all rows should still be readable
    ServerName expected = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName();
    assertEquals(expected, TEST_UTIL.getConnection().getRegionLocator(tableName)
      .getRegionLocation(Bytes.toBytes(1), true).getServerName());
    try (AsyncConnection asyncConn = ConnectionFactory.createAsyncConnection(conf).get()) {
      assertEquals(expected, asyncConn.getRegionLocator(tableName)
        .getRegionLocation(Bytes.toBytes(1), true).get().getServerName());
    }
    TEST_UTIL.verifyNumericRows(t, cfName, 0, 100_000, 0);
  }

}