001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.encoding; 019 020import static org.junit.Assert.assertTrue; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.Collections; 026import java.util.List; 027import java.util.Random; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.CellUtil; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HBaseTestingUtility; 033import org.apache.hadoop.hbase.HColumnDescriptor; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.HTableDescriptor; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.Admin; 038import org.apache.hadoop.hbase.client.Connection; 039import org.apache.hadoop.hbase.client.ConnectionFactory; 040import org.apache.hadoop.hbase.client.Durability; 041import org.apache.hadoop.hbase.client.Get; 042import org.apache.hadoop.hbase.client.Put; 043import org.apache.hadoop.hbase.client.Result; 044import org.apache.hadoop.hbase.client.Table; 045import org.apache.hadoop.hbase.regionserver.HRegionServer; 046import org.apache.hadoop.hbase.testclassification.IOTests; 047import org.apache.hadoop.hbase.testclassification.LargeTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.Threads; 050import org.junit.AfterClass; 051import org.junit.BeforeClass; 052import org.junit.ClassRule; 053import org.junit.Test; 054import org.junit.experimental.categories.Category; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058/** 059 * Tests changing data block encoding settings of a column family. 060 */ 061@Category({IOTests.class, LargeTests.class}) 062public class TestChangingEncoding { 063 064 @ClassRule 065 public static final HBaseClassTestRule CLASS_RULE = 066 HBaseClassTestRule.forClass(TestChangingEncoding.class); 067 068 private static final Logger LOG = LoggerFactory.getLogger(TestChangingEncoding.class); 069 static final String CF = "EncodingTestCF"; 070 static final byte[] CF_BYTES = Bytes.toBytes(CF); 071 072 private static final int NUM_ROWS_PER_BATCH = 100; 073 private static final int NUM_COLS_PER_ROW = 20; 074 075 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 076 private static final Configuration conf = TEST_UTIL.getConfiguration(); 077 078 private static final int TIMEOUT_MS = 600000; 079 080 private HColumnDescriptor hcd; 081 082 private TableName tableName; 083 private static final List<DataBlockEncoding> ENCODINGS_TO_ITERATE = 084 createEncodingsToIterate(); 085 086 private static final List<DataBlockEncoding> createEncodingsToIterate() { 087 List<DataBlockEncoding> encodings = new ArrayList<>(Arrays.asList(DataBlockEncoding.values())); 088 encodings.add(DataBlockEncoding.NONE); 089 return Collections.unmodifiableList(encodings); 090 } 091 092 /** A zero-based index of the current batch of test data being written */ 093 private int numBatchesWritten; 094 095 private void prepareTest(String testId) throws IOException { 096 tableName = TableName.valueOf("test_table_" + testId); 097 HTableDescriptor htd = new HTableDescriptor(tableName); 098 hcd = new HColumnDescriptor(CF); 099 htd.addFamily(hcd); 100 try (Admin admin = TEST_UTIL.getConnection().getAdmin()) { 101 admin.createTable(htd); 102 } 103 numBatchesWritten = 0; 104 } 105 106 @BeforeClass 107 public static void setUpBeforeClass() throws Exception { 108 // Use a small flush size to create more HFiles. 109 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024); 110 // Disabling split to make sure split does not cause modify column to wait which timesout test 111 // sometime 112 conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, 113 "org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy"); 114 // ((Log4JLogger)RpcServerImplementation.LOG).getLogger().setLevel(Level.TRACE); 115 // ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.TRACE); 116 TEST_UTIL.startMiniCluster(); 117 } 118 119 @AfterClass 120 public static void tearDownAfterClass() throws Exception { 121 TEST_UTIL.shutdownMiniCluster(); 122 } 123 124 private static byte[] getRowKey(int batchId, int i) { 125 return Bytes.toBytes("batch" + batchId + "_row" + i); 126 } 127 128 private static byte[] getQualifier(int j) { 129 return Bytes.toBytes("col" + j); 130 } 131 132 private static byte[] getValue(int batchId, int i, int j) { 133 return Bytes.toBytes("value_for_" + Bytes.toString(getRowKey(batchId, i)) 134 + "_col" + j); 135 } 136 137 static void writeTestDataBatch(TableName tableName, 138 int batchId) throws Exception { 139 LOG.debug("Writing test data batch " + batchId); 140 List<Put> puts = new ArrayList<>(); 141 for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) { 142 Put put = new Put(getRowKey(batchId, i)); 143 for (int j = 0; j < NUM_COLS_PER_ROW; ++j) { 144 put.addColumn(CF_BYTES, getQualifier(j), getValue(batchId, i, j)); 145 } 146 put.setDurability(Durability.SKIP_WAL); 147 puts.add(put); 148 } 149 try (Connection conn = ConnectionFactory.createConnection(conf); 150 Table table = conn.getTable(tableName)) { 151 table.put(puts); 152 } 153 } 154 155 static void verifyTestDataBatch(TableName tableName, 156 int batchId) throws Exception { 157 LOG.debug("Verifying test data batch " + batchId); 158 Table table = TEST_UTIL.getConnection().getTable(tableName); 159 for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) { 160 Get get = new Get(getRowKey(batchId, i)); 161 Result result = table.get(get); 162 for (int j = 0; j < NUM_COLS_PER_ROW; ++j) { 163 Cell kv = result.getColumnLatestCell(CF_BYTES, getQualifier(j)); 164 if (kv == null) { 165 continue; 166 } 167 assertTrue(CellUtil.matchingValue(kv, getValue(batchId, i, j))); 168 } 169 } 170 table.close(); 171 } 172 173 private void writeSomeNewData() throws Exception { 174 writeTestDataBatch(tableName, numBatchesWritten); 175 ++numBatchesWritten; 176 } 177 178 private void verifyAllData() throws Exception { 179 for (int i = 0; i < numBatchesWritten; ++i) { 180 verifyTestDataBatch(tableName, i); 181 } 182 } 183 184 private void setEncodingConf(DataBlockEncoding encoding, 185 boolean onlineChange) throws Exception { 186 LOG.debug("Setting CF encoding to " + encoding + " (ordinal=" 187 + encoding.ordinal() + "), onlineChange=" + onlineChange); 188 hcd.setDataBlockEncoding(encoding); 189 try (Admin admin = TEST_UTIL.getConnection().getAdmin()) { 190 if (!onlineChange) { 191 admin.disableTable(tableName); 192 } 193 admin.modifyColumnFamily(tableName, hcd); 194 if (!onlineChange) { 195 admin.enableTable(tableName); 196 } 197 } 198 // This is a unit test, not integration test. So let's 199 // wait for regions out of transition. Otherwise, for online 200 // encoding change, verification phase may be flaky because 201 // regions could be still in transition. 202 TEST_UTIL.waitUntilNoRegionsInTransition(TIMEOUT_MS); 203 } 204 205 @Test 206 public void testChangingEncoding() throws Exception { 207 prepareTest("ChangingEncoding"); 208 for (boolean onlineChange : new boolean[]{false, true}) { 209 for (DataBlockEncoding encoding : ENCODINGS_TO_ITERATE) { 210 setEncodingConf(encoding, onlineChange); 211 writeSomeNewData(); 212 verifyAllData(); 213 } 214 } 215 } 216 217 @Test 218 public void testChangingEncodingWithCompaction() throws Exception { 219 prepareTest("ChangingEncodingWithCompaction"); 220 for (boolean onlineChange : new boolean[]{false, true}) { 221 for (DataBlockEncoding encoding : ENCODINGS_TO_ITERATE) { 222 setEncodingConf(encoding, onlineChange); 223 writeSomeNewData(); 224 verifyAllData(); 225 compactAndWait(); 226 verifyAllData(); 227 } 228 } 229 } 230 231 private void compactAndWait() throws IOException, InterruptedException { 232 LOG.debug("Compacting table " + tableName); 233 HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); 234 Admin admin = TEST_UTIL.getAdmin(); 235 admin.majorCompact(tableName); 236 237 // Waiting for the compaction to start, at least .5s. 238 final long maxWaitime = System.currentTimeMillis() + 500; 239 boolean cont; 240 do { 241 cont = rs.compactSplitThread.getCompactionQueueSize() == 0; 242 Threads.sleep(1); 243 } while (cont && System.currentTimeMillis() < maxWaitime); 244 245 while (rs.compactSplitThread.getCompactionQueueSize() > 0) { 246 Threads.sleep(1); 247 } 248 LOG.debug("Compaction queue size reached 0, continuing"); 249 } 250 251 @Test 252 public void testCrazyRandomChanges() throws Exception { 253 prepareTest("RandomChanges"); 254 Random rand = new Random(2934298742974297L); 255 for (int i = 0; i < 10; ++i) { 256 int encodingOrdinal = rand.nextInt(DataBlockEncoding.values().length); 257 DataBlockEncoding encoding = DataBlockEncoding.values()[encodingOrdinal]; 258 setEncodingConf(encoding, rand.nextBoolean()); 259 writeSomeNewData(); 260 verifyAllData(); 261 } 262 } 263}