001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.encoding;
019
020import static org.junit.jupiter.api.Assertions.assertTrue;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collections;
026import java.util.List;
027import java.util.Random;
028import java.util.concurrent.ThreadLocalRandom;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.Admin;
036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
037import org.apache.hadoop.hbase.client.Connection;
038import org.apache.hadoop.hbase.client.ConnectionFactory;
039import org.apache.hadoop.hbase.client.Durability;
040import org.apache.hadoop.hbase.client.Get;
041import org.apache.hadoop.hbase.client.Put;
042import org.apache.hadoop.hbase.client.Result;
043import org.apache.hadoop.hbase.client.Table;
044import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
045import org.apache.hadoop.hbase.regionserver.HRegionServer;
046import org.apache.hadoop.hbase.testclassification.IOTests;
047import org.apache.hadoop.hbase.testclassification.LargeTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
050import org.apache.hadoop.hbase.util.Threads;
051import org.junit.jupiter.api.AfterAll;
052import org.junit.jupiter.api.BeforeAll;
053import org.junit.jupiter.api.Tag;
054import org.junit.jupiter.api.Test;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058/**
059 * Tests changing data block encoding settings of a column family.
060 */
061@Tag(IOTests.TAG)
062@Tag(LargeTests.TAG)
063public class TestChangingEncoding {
064
065  private static final Logger LOG = LoggerFactory.getLogger(TestChangingEncoding.class);
066  static final String CF = "EncodingTestCF";
067  static final byte[] CF_BYTES = Bytes.toBytes(CF);
068
069  private static final int NUM_ROWS_PER_BATCH = 100;
070  private static final int NUM_COLS_PER_ROW = 20;
071
072  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
073  private static final Configuration conf = TEST_UTIL.getConfiguration();
074
075  private static final int TIMEOUT_MS = 600000;
076
077  private ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder;
078
079  private TableName tableName;
080  private static final List<DataBlockEncoding> ENCODINGS_TO_ITERATE = createEncodingsToIterate();
081
082  private static final List<DataBlockEncoding> createEncodingsToIterate() {
083    List<DataBlockEncoding> encodings = new ArrayList<>(Arrays.asList(DataBlockEncoding.values()));
084    encodings.add(DataBlockEncoding.NONE);
085    return Collections.unmodifiableList(encodings);
086  }
087
088  /** A zero-based index of the current batch of test data being written */
089  private int numBatchesWritten;
090
091  private void prepareTest(String testId) throws IOException {
092    tableName = TableName.valueOf("test_table_" + testId);
093    TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
094    columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(CF));
095    tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());
096    try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
097      admin.createTable(tableDescriptorBuilder.build());
098    }
099    numBatchesWritten = 0;
100  }
101
102  @BeforeAll
103  public static void setUpBeforeClass() throws Exception {
104    // Use a small flush size to create more HFiles.
105    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
106    // Disabling split to make sure split does not cause modify column to wait which timesout test
107    // sometime
108    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
109      "org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy");
110    // ((Log4JLogger)RpcServerImplementation.LOG).getLogger().setLevel(Level.TRACE);
111    // ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.TRACE);
112    TEST_UTIL.startMiniCluster();
113  }
114
115  @AfterAll
116  public static void tearDownAfterClass() throws Exception {
117    TEST_UTIL.shutdownMiniCluster();
118  }
119
120  private static byte[] getRowKey(int batchId, int i) {
121    return Bytes.toBytes("batch" + batchId + "_row" + i);
122  }
123
124  private static byte[] getQualifier(int j) {
125    return Bytes.toBytes("col" + j);
126  }
127
128  private static byte[] getValue(int batchId, int i, int j) {
129    return Bytes.toBytes("value_for_" + Bytes.toString(getRowKey(batchId, i)) + "_col" + j);
130  }
131
132  static void writeTestDataBatch(TableName tableName, int batchId) throws Exception {
133    LOG.debug("Writing test data batch " + batchId);
134    List<Put> puts = new ArrayList<>();
135    for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
136      Put put = new Put(getRowKey(batchId, i));
137      for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
138        put.addColumn(CF_BYTES, getQualifier(j), getValue(batchId, i, j));
139      }
140      put.setDurability(Durability.SKIP_WAL);
141      puts.add(put);
142    }
143    try (Connection conn = ConnectionFactory.createConnection(conf);
144      Table table = conn.getTable(tableName)) {
145      table.put(puts);
146    }
147  }
148
149  static void verifyTestDataBatch(TableName tableName, int batchId) throws Exception {
150    LOG.debug("Verifying test data batch " + batchId);
151    Table table = TEST_UTIL.getConnection().getTable(tableName);
152    for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
153      Get get = new Get(getRowKey(batchId, i));
154      Result result = table.get(get);
155      for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
156        Cell kv = result.getColumnLatestCell(CF_BYTES, getQualifier(j));
157        if (kv == null) {
158          continue;
159        }
160        assertTrue(CellUtil.matchingValue(kv, getValue(batchId, i, j)));
161      }
162    }
163    table.close();
164  }
165
166  private void writeSomeNewData() throws Exception {
167    writeTestDataBatch(tableName, numBatchesWritten);
168    ++numBatchesWritten;
169  }
170
171  private void verifyAllData() throws Exception {
172    for (int i = 0; i < numBatchesWritten; ++i) {
173      verifyTestDataBatch(tableName, i);
174    }
175  }
176
177  private void setEncodingConf(DataBlockEncoding encoding, boolean onlineChange) throws Exception {
178    LOG.debug("Setting CF encoding to " + encoding + " (ordinal=" + encoding.ordinal()
179      + "), onlineChange=" + onlineChange);
180    columnFamilyDescriptorBuilder.setDataBlockEncoding(encoding);
181    try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
182      if (!onlineChange) {
183        admin.disableTable(tableName);
184      }
185      admin.modifyColumnFamily(tableName, columnFamilyDescriptorBuilder.build());
186      if (!onlineChange) {
187        admin.enableTable(tableName);
188      }
189    }
190    // This is a unit test, not integration test. So let's
191    // wait for regions out of transition. Otherwise, for online
192    // encoding change, verification phase may be flaky because
193    // regions could be still in transition.
194    TEST_UTIL.waitUntilNoRegionsInTransition(TIMEOUT_MS);
195  }
196
197  @Test
198  public void testChangingEncoding() throws Exception {
199    prepareTest("ChangingEncoding");
200    for (boolean onlineChange : new boolean[] { false, true }) {
201      for (DataBlockEncoding encoding : ENCODINGS_TO_ITERATE) {
202        setEncodingConf(encoding, onlineChange);
203        writeSomeNewData();
204        verifyAllData();
205      }
206    }
207  }
208
209  @Test
210  public void testChangingEncodingWithCompaction() throws Exception {
211    prepareTest("ChangingEncodingWithCompaction");
212    for (boolean onlineChange : new boolean[] { false, true }) {
213      for (DataBlockEncoding encoding : ENCODINGS_TO_ITERATE) {
214        setEncodingConf(encoding, onlineChange);
215        writeSomeNewData();
216        verifyAllData();
217        compactAndWait();
218        verifyAllData();
219      }
220    }
221  }
222
223  private void compactAndWait() throws IOException, InterruptedException {
224    LOG.debug("Compacting table " + tableName);
225    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
226    Admin admin = TEST_UTIL.getAdmin();
227    admin.majorCompact(tableName);
228
229    // Waiting for the compaction to start, at least .5s.
230    final long maxWaitime = EnvironmentEdgeManager.currentTime() + 500;
231    boolean cont;
232    do {
233      cont = rs.getCompactSplitThread().getCompactionQueueSize() == 0;
234      Threads.sleep(1);
235    } while (cont && EnvironmentEdgeManager.currentTime() < maxWaitime);
236
237    while (rs.getCompactSplitThread().getCompactionQueueSize() > 0) {
238      Threads.sleep(1);
239    }
240    LOG.debug("Compaction queue size reached 0, continuing");
241  }
242
243  @Test
244  public void testCrazyRandomChanges() throws Exception {
245    prepareTest("RandomChanges");
246    Random rand = ThreadLocalRandom.current();
247    for (int i = 0; i < 10; ++i) {
248      int encodingOrdinal = rand.nextInt(DataBlockEncoding.values().length);
249      DataBlockEncoding encoding = DataBlockEncoding.values()[encodingOrdinal];
250      setEncodingConf(encoding, rand.nextBoolean());
251      writeSomeNewData();
252      verifyAllData();
253    }
254  }
255}