001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.encoding;
019
020import static org.junit.Assert.assertTrue;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collections;
026import java.util.List;
027import java.util.Random;
028import java.util.concurrent.ThreadLocalRandom;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.Admin;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.Connection;
039import org.apache.hadoop.hbase.client.ConnectionFactory;
040import org.apache.hadoop.hbase.client.Durability;
041import org.apache.hadoop.hbase.client.Get;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.Result;
044import org.apache.hadoop.hbase.client.Table;
045import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
046import org.apache.hadoop.hbase.regionserver.HRegionServer;
047import org.apache.hadoop.hbase.testclassification.IOTests;
048import org.apache.hadoop.hbase.testclassification.LargeTests;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.apache.hadoop.hbase.util.Threads;
052import org.junit.AfterClass;
053import org.junit.BeforeClass;
054import org.junit.ClassRule;
055import org.junit.Test;
056import org.junit.experimental.categories.Category;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060/**
061 * Tests changing data block encoding settings of a column family.
062 */
063@Category({ IOTests.class, LargeTests.class })
064public class TestChangingEncoding {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068    HBaseClassTestRule.forClass(TestChangingEncoding.class);
069
070  private static final Logger LOG = LoggerFactory.getLogger(TestChangingEncoding.class);
071  static final String CF = "EncodingTestCF";
072  static final byte[] CF_BYTES = Bytes.toBytes(CF);
073
074  private static final int NUM_ROWS_PER_BATCH = 100;
075  private static final int NUM_COLS_PER_ROW = 20;
076
077  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
078  private static final Configuration conf = TEST_UTIL.getConfiguration();
079
080  private static final int TIMEOUT_MS = 600000;
081
082  private ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder;
083
084  private TableName tableName;
085  private static final List<DataBlockEncoding> ENCODINGS_TO_ITERATE = createEncodingsToIterate();
086
087  private static final List<DataBlockEncoding> createEncodingsToIterate() {
088    List<DataBlockEncoding> encodings = new ArrayList<>(Arrays.asList(DataBlockEncoding.values()));
089    encodings.add(DataBlockEncoding.NONE);
090    return Collections.unmodifiableList(encodings);
091  }
092
093  /** A zero-based index of the current batch of test data being written */
094  private int numBatchesWritten;
095
096  private void prepareTest(String testId) throws IOException {
097    tableName = TableName.valueOf("test_table_" + testId);
098    TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
099    columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(CF));
100    tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());
101    try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
102      admin.createTable(tableDescriptorBuilder.build());
103    }
104    numBatchesWritten = 0;
105  }
106
107  @BeforeClass
108  public static void setUpBeforeClass() throws Exception {
109    // Use a small flush size to create more HFiles.
110    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
111    // Disabling split to make sure split does not cause modify column to wait which timesout test
112    // sometime
113    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
114      "org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy");
115    // ((Log4JLogger)RpcServerImplementation.LOG).getLogger().setLevel(Level.TRACE);
116    // ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.TRACE);
117    TEST_UTIL.startMiniCluster();
118  }
119
120  @AfterClass
121  public static void tearDownAfterClass() throws Exception {
122    TEST_UTIL.shutdownMiniCluster();
123  }
124
125  private static byte[] getRowKey(int batchId, int i) {
126    return Bytes.toBytes("batch" + batchId + "_row" + i);
127  }
128
129  private static byte[] getQualifier(int j) {
130    return Bytes.toBytes("col" + j);
131  }
132
133  private static byte[] getValue(int batchId, int i, int j) {
134    return Bytes.toBytes("value_for_" + Bytes.toString(getRowKey(batchId, i)) + "_col" + j);
135  }
136
137  static void writeTestDataBatch(TableName tableName, int batchId) throws Exception {
138    LOG.debug("Writing test data batch " + batchId);
139    List<Put> puts = new ArrayList<>();
140    for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
141      Put put = new Put(getRowKey(batchId, i));
142      for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
143        put.addColumn(CF_BYTES, getQualifier(j), getValue(batchId, i, j));
144      }
145      put.setDurability(Durability.SKIP_WAL);
146      puts.add(put);
147    }
148    try (Connection conn = ConnectionFactory.createConnection(conf);
149      Table table = conn.getTable(tableName)) {
150      table.put(puts);
151    }
152  }
153
154  static void verifyTestDataBatch(TableName tableName, int batchId) throws Exception {
155    LOG.debug("Verifying test data batch " + batchId);
156    Table table = TEST_UTIL.getConnection().getTable(tableName);
157    for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
158      Get get = new Get(getRowKey(batchId, i));
159      Result result = table.get(get);
160      for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
161        Cell kv = result.getColumnLatestCell(CF_BYTES, getQualifier(j));
162        if (kv == null) {
163          continue;
164        }
165        assertTrue(CellUtil.matchingValue(kv, getValue(batchId, i, j)));
166      }
167    }
168    table.close();
169  }
170
171  private void writeSomeNewData() throws Exception {
172    writeTestDataBatch(tableName, numBatchesWritten);
173    ++numBatchesWritten;
174  }
175
176  private void verifyAllData() throws Exception {
177    for (int i = 0; i < numBatchesWritten; ++i) {
178      verifyTestDataBatch(tableName, i);
179    }
180  }
181
182  private void setEncodingConf(DataBlockEncoding encoding, boolean onlineChange) throws Exception {
183    LOG.debug("Setting CF encoding to " + encoding + " (ordinal=" + encoding.ordinal()
184      + "), onlineChange=" + onlineChange);
185    columnFamilyDescriptorBuilder.setDataBlockEncoding(encoding);
186    try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
187      if (!onlineChange) {
188        admin.disableTable(tableName);
189      }
190      admin.modifyColumnFamily(tableName, columnFamilyDescriptorBuilder.build());
191      if (!onlineChange) {
192        admin.enableTable(tableName);
193      }
194    }
195    // This is a unit test, not integration test. So let's
196    // wait for regions out of transition. Otherwise, for online
197    // encoding change, verification phase may be flaky because
198    // regions could be still in transition.
199    TEST_UTIL.waitUntilNoRegionsInTransition(TIMEOUT_MS);
200  }
201
202  @Test
203  public void testChangingEncoding() throws Exception {
204    prepareTest("ChangingEncoding");
205    for (boolean onlineChange : new boolean[] { false, true }) {
206      for (DataBlockEncoding encoding : ENCODINGS_TO_ITERATE) {
207        setEncodingConf(encoding, onlineChange);
208        writeSomeNewData();
209        verifyAllData();
210      }
211    }
212  }
213
214  @Test
215  public void testChangingEncodingWithCompaction() throws Exception {
216    prepareTest("ChangingEncodingWithCompaction");
217    for (boolean onlineChange : new boolean[] { false, true }) {
218      for (DataBlockEncoding encoding : ENCODINGS_TO_ITERATE) {
219        setEncodingConf(encoding, onlineChange);
220        writeSomeNewData();
221        verifyAllData();
222        compactAndWait();
223        verifyAllData();
224      }
225    }
226  }
227
228  private void compactAndWait() throws IOException, InterruptedException {
229    LOG.debug("Compacting table " + tableName);
230    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
231    Admin admin = TEST_UTIL.getAdmin();
232    admin.majorCompact(tableName);
233
234    // Waiting for the compaction to start, at least .5s.
235    final long maxWaitime = EnvironmentEdgeManager.currentTime() + 500;
236    boolean cont;
237    do {
238      cont = rs.getCompactSplitThread().getCompactionQueueSize() == 0;
239      Threads.sleep(1);
240    } while (cont && EnvironmentEdgeManager.currentTime() < maxWaitime);
241
242    while (rs.getCompactSplitThread().getCompactionQueueSize() > 0) {
243      Threads.sleep(1);
244    }
245    LOG.debug("Compaction queue size reached 0, continuing");
246  }
247
248  @Test
249  public void testCrazyRandomChanges() throws Exception {
250    prepareTest("RandomChanges");
251    Random rand = ThreadLocalRandom.current();
252    for (int i = 0; i < 10; ++i) {
253      int encodingOrdinal = rand.nextInt(DataBlockEncoding.values().length);
254      DataBlockEncoding encoding = DataBlockEncoding.values()[encodingOrdinal];
255      setEncodingConf(encoding, rand.nextBoolean());
256      writeSomeNewData();
257      verifyAllData();
258    }
259  }
260}