/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.compress.zstd;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.DictionaryCache;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Exercises region split and merge on a table whose column family is compressed with ZSTD and
 * configured, at the column family level, with a compression dictionary. Row contents are
 * verified after the initial flush, after the split and after the merge.
 */
@Tag(RegionServerTests.TAG)
@Tag(LargeTests.TAG)
public class TestZstdDictionarySplitMerge {

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static Configuration conf;

  /**
   * Registers the ZSTD codec implementation, tightens client timeout/retry settings and starts a
   * single-node mini cluster.
   */
  @BeforeAll
  public static void setUp() throws Exception {
    // NOTE: Don't put configuration settings in global site schema. We are testing if per
    // CF or per table schema settings are applied correctly.
    conf = TEST_UTIL.getConfiguration();
    conf.set(Compression.ZSTD_CODEC_CLASS_KEY, ZstdCodec.class.getCanonicalName());
    Compression.Algorithm.ZSTD.reload(conf);
    conf.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 1000);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    TEST_UTIL.startMiniCluster(1);
  }

  /** Shuts the mini cluster down once all tests in this class have run. */
  @AfterAll
  public static void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Creates a pre-split table using ZSTD with a dictionary, loads and flushes data, then splits
   * one region and merges two regions, verifying the rows after every step.
   */
  @Test
  public void test() throws Exception {
    // Create the table
    final TableName tableName = TableName.valueOf("TestZstdDictionarySplitMerge");
    final byte[] cfName = Bytes.toBytes("info");
    final String dictionaryPath = DictionaryCache.RESOURCE_SCHEME + "zstd.test.dict";
    final TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cfName)
        .setCompressionType(Compression.Algorithm.ZSTD)
        .setConfiguration(ZstdCodec.ZSTD_DICTIONARY_KEY, dictionaryPath).build())
      .build();
    final Admin admin = TEST_UTIL.getAdmin();
    admin.createTable(td, new byte[][] { Bytes.toBytes(1) });
    TEST_UTIL.waitTableAvailable(tableName);

    // Use the testing utility's shared connection rather than creating a dedicated one that is
    // never closed, and release the Table handle when the test finishes, pass or fail.
    try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
      // Load some data
      TEST_UTIL.loadNumericRows(t, cfName, 0, 100_000);
      admin.flush(tableName);
      assertTrue(DictionaryCache.contains(dictionaryPath), "Dictionary was not loaded");
      TEST_UTIL.verifyNumericRows(t, cfName, 0, 100_000, 0);

      // Test split procedure
      admin.split(tableName, Bytes.toBytes(50_000));
      TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          // The table was created with one split point, so a finished split yields three regions.
          return TEST_UTIL.getMiniHBaseCluster().getRegions(tableName).size() == 3;
        }

        @Override
        public String explainFailure() throws Exception {
          return "Split has not finished yet";
        }
      });
      TEST_UTIL.waitUntilNoRegionsInTransition();
      TEST_UTIL.verifyNumericRows(t, cfName, 0, 100_000, 0);

      // Test merge procedure
      // regionA is the region starting at the empty key; regionB is the one starting at the
      // split key the table was created with.
      RegionInfo regionA = null;
      RegionInfo regionB = null;
      for (RegionInfo region : admin.getRegions(tableName)) {
        if (region.getStartKey().length == 0) {
          regionA = region;
        } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(1))) {
          regionB = region;
        }
      }
      assertNotNull(regionA, "No region with an empty start key was found");
      assertNotNull(regionB, "No region starting at the initial split key was found");
      admin
        .mergeRegionsAsync(new byte[][] { regionA.getRegionName(), regionB.getRegionName() }, false)
        .get(30, TimeUnit.SECONDS);
      assertEquals(2, admin.getRegions(tableName).size());

      // The cluster has a single region server, so both the sync and the async client must
      // locate the merged region there (reload flag set to true on both lookups).
      ServerName expected = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName();
      assertEquals(expected, TEST_UTIL.getConnection().getRegionLocator(tableName)
        .getRegionLocation(Bytes.toBytes(1), true).getServerName());
      try (AsyncConnection asyncConn = ConnectionFactory.createAsyncConnection(conf).get()) {
        assertEquals(expected, asyncConn.getRegionLocator(tableName)
          .getRegionLocation(Bytes.toBytes(1), true).get().getServerName());
      }
      TEST_UTIL.verifyNumericRows(t, cfName, 0, 100_000, 0);
    }
  }
}