001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.encoding; 019 020import static org.junit.jupiter.api.Assertions.assertTrue; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.Collections; 026import java.util.List; 027import java.util.Random; 028import java.util.concurrent.ThreadLocalRandom; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.HBaseTestingUtil; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.Admin; 036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 037import org.apache.hadoop.hbase.client.Connection; 038import org.apache.hadoop.hbase.client.ConnectionFactory; 039import org.apache.hadoop.hbase.client.Durability; 040import org.apache.hadoop.hbase.client.Get; 041import org.apache.hadoop.hbase.client.Put; 042import org.apache.hadoop.hbase.client.Result; 043import org.apache.hadoop.hbase.client.Table; 044import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 045import org.apache.hadoop.hbase.regionserver.HRegionServer; 046import org.apache.hadoop.hbase.testclassification.IOTests; 047import org.apache.hadoop.hbase.testclassification.LargeTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 050import org.apache.hadoop.hbase.util.Threads; 051import org.junit.jupiter.api.AfterAll; 052import org.junit.jupiter.api.BeforeAll; 053import org.junit.jupiter.api.Tag; 054import org.junit.jupiter.api.Test; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058/** 059 * Tests changing data block encoding settings of a column family. 060 */ 061@Tag(IOTests.TAG) 062@Tag(LargeTests.TAG) 063public class TestChangingEncoding { 064 065 private static final Logger LOG = LoggerFactory.getLogger(TestChangingEncoding.class); 066 static final String CF = "EncodingTestCF"; 067 static final byte[] CF_BYTES = Bytes.toBytes(CF); 068 069 private static final int NUM_ROWS_PER_BATCH = 100; 070 private static final int NUM_COLS_PER_ROW = 20; 071 072 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 073 private static final Configuration conf = TEST_UTIL.getConfiguration(); 074 075 private static final int TIMEOUT_MS = 600000; 076 077 private ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder; 078 079 private TableName tableName; 080 private static final List<DataBlockEncoding> ENCODINGS_TO_ITERATE = createEncodingsToIterate(); 081 082 private static final List<DataBlockEncoding> createEncodingsToIterate() { 083 List<DataBlockEncoding> encodings = new ArrayList<>(Arrays.asList(DataBlockEncoding.values())); 084 encodings.add(DataBlockEncoding.NONE); 085 return Collections.unmodifiableList(encodings); 086 } 087 088 /** A zero-based index of the current batch of test data being written */ 089 private int numBatchesWritten; 090 091 private void prepareTest(String testId) throws IOException { 092 tableName = TableName.valueOf("test_table_" + testId); 093 TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName); 094 columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(CF)); 095 tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build()); 096 try (Admin admin = TEST_UTIL.getConnection().getAdmin()) { 097 admin.createTable(tableDescriptorBuilder.build()); 098 } 099 numBatchesWritten = 0; 100 } 101 102 @BeforeAll 103 public static void setUpBeforeClass() throws Exception { 104 // Use a small flush size to create more HFiles. 105 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024); 106 // Disabling split to make sure split does not cause modify column to wait which timesout test 107 // sometime 108 conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, 109 "org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy"); 110 // ((Log4JLogger)RpcServerImplementation.LOG).getLogger().setLevel(Level.TRACE); 111 // ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.TRACE); 112 TEST_UTIL.startMiniCluster(); 113 } 114 115 @AfterAll 116 public static void tearDownAfterClass() throws Exception { 117 TEST_UTIL.shutdownMiniCluster(); 118 } 119 120 private static byte[] getRowKey(int batchId, int i) { 121 return Bytes.toBytes("batch" + batchId + "_row" + i); 122 } 123 124 private static byte[] getQualifier(int j) { 125 return Bytes.toBytes("col" + j); 126 } 127 128 private static byte[] getValue(int batchId, int i, int j) { 129 return Bytes.toBytes("value_for_" + Bytes.toString(getRowKey(batchId, i)) + "_col" + j); 130 } 131 132 static void writeTestDataBatch(TableName tableName, int batchId) throws Exception { 133 LOG.debug("Writing test data batch " + batchId); 134 List<Put> puts = new ArrayList<>(); 135 for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) { 136 Put put = new Put(getRowKey(batchId, i)); 137 for (int j = 0; j < NUM_COLS_PER_ROW; ++j) { 138 put.addColumn(CF_BYTES, getQualifier(j), getValue(batchId, i, j)); 139 } 140 put.setDurability(Durability.SKIP_WAL); 141 puts.add(put); 142 } 143 try (Connection conn = ConnectionFactory.createConnection(conf); 144 Table table = conn.getTable(tableName)) { 145 table.put(puts); 146 } 147 } 148 149 static void verifyTestDataBatch(TableName tableName, int batchId) throws Exception { 150 LOG.debug("Verifying test data batch " + batchId); 151 Table table = TEST_UTIL.getConnection().getTable(tableName); 152 for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) { 153 Get get = new Get(getRowKey(batchId, i)); 154 Result result = table.get(get); 155 for (int j = 0; j < NUM_COLS_PER_ROW; ++j) { 156 Cell kv = result.getColumnLatestCell(CF_BYTES, getQualifier(j)); 157 if (kv == null) { 158 continue; 159 } 160 assertTrue(CellUtil.matchingValue(kv, getValue(batchId, i, j))); 161 } 162 } 163 table.close(); 164 } 165 166 private void writeSomeNewData() throws Exception { 167 writeTestDataBatch(tableName, numBatchesWritten); 168 ++numBatchesWritten; 169 } 170 171 private void verifyAllData() throws Exception { 172 for (int i = 0; i < numBatchesWritten; ++i) { 173 verifyTestDataBatch(tableName, i); 174 } 175 } 176 177 private void setEncodingConf(DataBlockEncoding encoding, boolean onlineChange) throws Exception { 178 LOG.debug("Setting CF encoding to " + encoding + " (ordinal=" + encoding.ordinal() 179 + "), onlineChange=" + onlineChange); 180 columnFamilyDescriptorBuilder.setDataBlockEncoding(encoding); 181 try (Admin admin = TEST_UTIL.getConnection().getAdmin()) { 182 if (!onlineChange) { 183 admin.disableTable(tableName); 184 } 185 admin.modifyColumnFamily(tableName, columnFamilyDescriptorBuilder.build()); 186 if (!onlineChange) { 187 admin.enableTable(tableName); 188 } 189 } 190 // This is a unit test, not integration test. So let's 191 // wait for regions out of transition. Otherwise, for online 192 // encoding change, verification phase may be flaky because 193 // regions could be still in transition. 194 TEST_UTIL.waitUntilNoRegionsInTransition(TIMEOUT_MS); 195 } 196 197 @Test 198 public void testChangingEncoding() throws Exception { 199 prepareTest("ChangingEncoding"); 200 for (boolean onlineChange : new boolean[] { false, true }) { 201 for (DataBlockEncoding encoding : ENCODINGS_TO_ITERATE) { 202 setEncodingConf(encoding, onlineChange); 203 writeSomeNewData(); 204 verifyAllData(); 205 } 206 } 207 } 208 209 @Test 210 public void testChangingEncodingWithCompaction() throws Exception { 211 prepareTest("ChangingEncodingWithCompaction"); 212 for (boolean onlineChange : new boolean[] { false, true }) { 213 for (DataBlockEncoding encoding : ENCODINGS_TO_ITERATE) { 214 setEncodingConf(encoding, onlineChange); 215 writeSomeNewData(); 216 verifyAllData(); 217 compactAndWait(); 218 verifyAllData(); 219 } 220 } 221 } 222 223 private void compactAndWait() throws IOException, InterruptedException { 224 LOG.debug("Compacting table " + tableName); 225 HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); 226 Admin admin = TEST_UTIL.getAdmin(); 227 admin.majorCompact(tableName); 228 229 // Waiting for the compaction to start, at least .5s. 230 final long maxWaitime = EnvironmentEdgeManager.currentTime() + 500; 231 boolean cont; 232 do { 233 cont = rs.getCompactSplitThread().getCompactionQueueSize() == 0; 234 Threads.sleep(1); 235 } while (cont && EnvironmentEdgeManager.currentTime() < maxWaitime); 236 237 while (rs.getCompactSplitThread().getCompactionQueueSize() > 0) { 238 Threads.sleep(1); 239 } 240 LOG.debug("Compaction queue size reached 0, continuing"); 241 } 242 243 @Test 244 public void testCrazyRandomChanges() throws Exception { 245 prepareTest("RandomChanges"); 246 Random rand = ThreadLocalRandom.current(); 247 for (int i = 0; i < 10; ++i) { 248 int encodingOrdinal = rand.nextInt(DataBlockEncoding.values().length); 249 DataBlockEncoding encoding = DataBlockEncoding.values()[encodingOrdinal]; 250 setEncodingConf(encoding, rand.nextBoolean()); 251 writeSomeNewData(); 252 verifyAllData(); 253 } 254 } 255}