/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests changing data block encoding settings of a column family.
 */
@Category({IOTests.class, LargeTests.class})
public class TestChangingEncoding {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestChangingEncoding.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestChangingEncoding.class);
  static final String CF = "EncodingTestCF";
  static final byte[] CF_BYTES = Bytes.toBytes(CF);

  private static final int NUM_ROWS_PER_BATCH = 100;
  private static final int NUM_COLS_PER_ROW = 20;

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final Configuration conf = TEST_UTIL.getConfiguration();

  private static final int TIMEOUT_MS = 600000;

  private HColumnDescriptor hcd;

  private TableName tableName;
  private static final List<DataBlockEncoding> ENCODINGS_TO_ITERATE =
      createEncodingsToIterate();

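  /**
   * Builds the list of encodings to cycle through: every {@link DataBlockEncoding} value,
   * with {@code NONE} appended once more at the end so each pass finishes by switching the
   * column family back to no encoding.
   */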
  private static List<DataBlockEncoding> createEncodingsToIterate() {
    List<DataBlockEncoding> encodings = new ArrayList<>(Arrays.asList(DataBlockEncoding.values()));
    encodings.add(DataBlockEncoding.NONE);
    return Collections.unmodifiableList(encodings);
  }

  /** Number of batches of test data written so far; equals the zero-based index of the next batch. */
  private int numBatchesWritten;

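  /**
   * Creates a fresh table with a single column family for the given test and resets the
   * batch counter.
   */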
  private void prepareTest(String testId) throws IOException {
    tableName = TableName.valueOf("test_table_" + testId);
    HTableDescriptor htd = new HTableDescriptor(tableName);
    hcd = new HColumnDescriptor(CF);
    htd.addFamily(hcd);
    try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
      admin.createTable(htd);
    }
    numBatchesWritten = 0;
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Use a small flush size to create more HFiles.
    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
    // Disable region splits: a split can block the modify-column operation long enough to
    // time out the test.
    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
        "org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy");
    TEST_UTIL.startMiniCluster();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  private static byte[] getRowKey(int batchId, int i) {
    return Bytes.toBytes("batch" + batchId + "_row" + i);
  }

  private static byte[] getQualifier(int j) {
    return Bytes.toBytes("col" + j);
  }

  private static byte[] getValue(int batchId, int i, int j) {
    return Bytes.toBytes("value_for_" + Bytes.toString(getRowKey(batchId, i))
        + "_col" + j);
  }

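  /**
   * Writes one batch of {@code NUM_ROWS_PER_BATCH} rows with {@code NUM_COLS_PER_ROW}
   * columns each. Puts use {@code Durability.SKIP_WAL} to speed up loading.
   */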
  static void writeTestDataBatch(TableName tableName, int batchId) throws Exception {
    LOG.debug("Writing test data batch {}", batchId);
    List<Put> puts = new ArrayList<>();
    for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
      Put put = new Put(getRowKey(batchId, i));
      for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
        put.addColumn(CF_BYTES, getQualifier(j), getValue(batchId, i, j));
      }
      put.setDurability(Durability.SKIP_WAL);
      puts.add(put);
    }
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName)) {
      table.put(puts);
    }
  }

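  /**
   * Reads back one batch and asserts that every cell that is present matches the value
   * originally written. Missing cells are skipped rather than treated as failures.
   */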
  static void verifyTestDataBatch(TableName tableName, int batchId) throws Exception {
    LOG.debug("Verifying test data batch {}", batchId);
    // try-with-resources so the table is closed even if an assertion fails.
    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
      for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
        Get get = new Get(getRowKey(batchId, i));
        Result result = table.get(get);
        for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
          Cell kv = result.getColumnLatestCell(CF_BYTES, getQualifier(j));
          if (kv == null) {
            continue;
          }
          assertTrue(CellUtil.matchingValue(kv, getValue(batchId, i, j)));
        }
      }
    }
  }

  private void writeSomeNewData() throws Exception {
    writeTestDataBatch(tableName, numBatchesWritten);
    ++numBatchesWritten;
  }

  private void verifyAllData() throws Exception {
    for (int i = 0; i < numBatchesWritten; ++i) {
      verifyTestDataBatch(tableName, i);
    }
  }

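  /**
   * Applies the given encoding to the column family, either online (modify while the table
   * stays enabled) or offline (disable, modify, re-enable), then waits for all regions to
   * come out of transition.
   */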
  private void setEncodingConf(DataBlockEncoding encoding, boolean onlineChange)
      throws Exception {
    LOG.debug("Setting CF encoding to {} (ordinal={}), onlineChange={}", encoding,
        encoding.ordinal(), onlineChange);
    hcd.setDataBlockEncoding(encoding);
    try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
      if (!onlineChange) {
        admin.disableTable(tableName);
      }
      admin.modifyColumnFamily(tableName, hcd);
      if (!onlineChange) {
        admin.enableTable(tableName);
      }
    }
    // This is a unit test, not an integration test, so wait for regions to come out of
    // transition. Otherwise, after an online encoding change, the verification phase may be
    // flaky because regions could still be in transition.
    TEST_UTIL.waitUntilNoRegionsInTransition(TIMEOUT_MS);
  }

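  /**
   * Cycles through every encoding, first with offline changes and then online, writing a
   * new batch and re-verifying all previous batches after each change.
   */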
  @Test
  public void testChangingEncoding() throws Exception {
    prepareTest("ChangingEncoding");
    for (boolean onlineChange : new boolean[]{false, true}) {
      for (DataBlockEncoding encoding : ENCODINGS_TO_ITERATE) {
        setEncodingConf(encoding, onlineChange);
        writeSomeNewData();
        verifyAllData();
      }
    }
  }

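  /**
   * Same as {@link #testChangingEncoding()}, but additionally major-compacts the table
   * after each change and verifies the data again once the compaction queue drains.
   */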
  @Test
  public void testChangingEncodingWithCompaction() throws Exception {
    prepareTest("ChangingEncodingWithCompaction");
    for (boolean onlineChange : new boolean[]{false, true}) {
      for (DataBlockEncoding encoding : ENCODINGS_TO_ITERATE) {
        setEncodingConf(encoding, onlineChange);
        writeSomeNewData();
        verifyAllData();
        compactAndWait();
        verifyAllData();
      }
    }
  }

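  /**
   * Requests a major compaction on the first region server, waits briefly for the request
   * to reach the compaction queue, and then blocks until the queue is empty.
   */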
  private void compactAndWait() throws IOException, InterruptedException {
    LOG.debug("Compacting table {}", tableName);
    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
    Admin admin = TEST_UTIL.getAdmin();
    admin.majorCompact(tableName);

    // Wait up to 500 ms for the compaction request to show up in the queue.
    final long maxWaitTime = System.currentTimeMillis() + 500;
    boolean queueStillEmpty;
    do {
      queueStillEmpty = rs.compactSplitThread.getCompactionQueueSize() == 0;
      Threads.sleep(1);
    } while (queueStillEmpty && System.currentTimeMillis() < maxWaitTime);

    // Then wait for the queue to drain completely.
    while (rs.compactSplitThread.getCompactionQueueSize() > 0) {
      Threads.sleep(1);
    }
    LOG.debug("Compaction queue size reached 0, continuing");
  }

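  /**
   * Switches through ten randomly chosen encodings, randomly online or offline; a fixed
   * random seed keeps the sequence deterministic across runs.
   */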
  @Test
  public void testCrazyRandomChanges() throws Exception {
    prepareTest("RandomChanges");
    Random rand = new Random(2934298742974297L);
    for (int i = 0; i < 10; ++i) {
      int encodingOrdinal = rand.nextInt(DataBlockEncoding.values().length);
      DataBlockEncoding encoding = DataBlockEncoding.values()[encodingOrdinal];
      setEncodingConf(encoding, rand.nextBoolean());
      writeSomeNewData();
      verifyAllData();
    }
  }
}