001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.EnumSet;
026import java.util.List;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.ClusterMetrics.Option;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.TableNotFoundException;
034import org.apache.hadoop.hbase.client.Admin;
035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
037import org.apache.hadoop.hbase.client.TableDescriptor;
038import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
039import org.apache.hadoop.hbase.io.compress.Compression;
040import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
041import org.apache.hadoop.hbase.testclassification.MediumTests;
042import org.apache.hadoop.hbase.testclassification.MiscTests;
043import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
044import org.junit.After;
045import org.junit.Before;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049import org.junit.runner.RunWith;
050import org.junit.runners.Parameterized;
051import org.junit.runners.Parameterized.Parameters;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055/**
056 * A write/read/verify load test on a mini HBase cluster. Tests reading and then writing.
057 */
058@Category({ MiscTests.class, MediumTests.class })
059@RunWith(Parameterized.class)
060public class TestMiniClusterLoadSequential {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064    HBaseClassTestRule.forClass(TestMiniClusterLoadSequential.class);
065
066  private static final Logger LOG = LoggerFactory.getLogger(TestMiniClusterLoadSequential.class);
067
068  protected static final TableName TABLE = TableName.valueOf("load_test_tbl");
069  protected static final byte[] CF = Bytes.toBytes("load_test_cf");
070  protected static final int NUM_THREADS = 8;
071  protected static final int NUM_RS = 2;
072  protected static final int TIMEOUT_MS = 180000;
073  protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
074
075  protected final Configuration conf = TEST_UTIL.getConfiguration();
076  protected final boolean isMultiPut;
077  protected final DataBlockEncoding dataBlockEncoding;
078
079  protected MultiThreadedWriter writerThreads;
080  protected MultiThreadedReader readerThreads;
081  protected int numKeys;
082
083  protected Compression.Algorithm compression = Compression.Algorithm.NONE;
084
085  public TestMiniClusterLoadSequential(boolean isMultiPut, DataBlockEncoding dataBlockEncoding) {
086    this.isMultiPut = isMultiPut;
087    this.dataBlockEncoding = dataBlockEncoding;
088    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
089
090    // We don't want any region reassignments by the load balancer during the test.
091    conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, 10.0f);
092  }
093
094  @Parameters
095  public static Collection<Object[]> parameters() {
096    List<Object[]> parameters = new ArrayList<>();
097    for (boolean multiPut : new boolean[] { false, true }) {
098      for (DataBlockEncoding dataBlockEncoding : new DataBlockEncoding[] { DataBlockEncoding.NONE,
099        DataBlockEncoding.PREFIX }) {
100        parameters.add(new Object[] { multiPut, dataBlockEncoding });
101      }
102    }
103    return parameters;
104  }
105
106  @Before
107  public void setUp() throws Exception {
108    LOG.debug("Test setup: isMultiPut=" + isMultiPut);
109    TEST_UTIL.startMiniCluster(NUM_RS);
110  }
111
112  @After
113  public void tearDown() throws Exception {
114    LOG.debug("Test teardown: isMultiPut=" + isMultiPut);
115    TEST_UTIL.shutdownMiniCluster();
116  }
117
118  protected MultiThreadedReader prepareReaderThreads(LoadTestDataGenerator dataGen,
119    Configuration conf, TableName tableName, double verifyPercent) throws IOException {
120    MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
121    return reader;
122  }
123
124  protected MultiThreadedWriter prepareWriterThreads(LoadTestDataGenerator dataGen,
125    Configuration conf, TableName tableName) throws IOException {
126    MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tableName);
127    writer.setMultiPut(isMultiPut);
128    return writer;
129  }
130
131  @Test
132  public void loadTest() throws Exception {
133    prepareForLoadTest();
134    runLoadTestOnExistingTable();
135  }
136
137  protected void runLoadTestOnExistingTable() throws IOException {
138    writerThreads.start(0, numKeys, NUM_THREADS);
139    writerThreads.waitForFinish();
140    assertEquals(0, writerThreads.getNumWriteFailures());
141
142    readerThreads.start(0, numKeys, NUM_THREADS);
143    readerThreads.waitForFinish();
144    assertEquals(0, readerThreads.getNumReadFailures());
145    assertEquals(0, readerThreads.getNumReadErrors());
146    assertEquals(numKeys, readerThreads.getNumKeysVerified());
147  }
148
149  protected void createPreSplitLoadTestTable(TableDescriptor tableDescriptor,
150    ColumnFamilyDescriptor familyDescriptor) throws IOException {
151    HBaseTestingUtil.createPreSplitLoadTestTable(conf, tableDescriptor, familyDescriptor);
152    TEST_UTIL.waitUntilAllRegionsAssigned(tableDescriptor.getTableName());
153  }
154
155  protected void prepareForLoadTest() throws IOException {
156    LOG.info(
157      "Starting load test: dataBlockEncoding=" + dataBlockEncoding + ", isMultiPut=" + isMultiPut);
158    numKeys = numKeys();
159    Admin admin = TEST_UTIL.getAdmin();
160    while (
161      admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().size()
162          < NUM_RS
163    ) {
164      LOG.info("Sleeping until " + NUM_RS + " RSs are online");
165      Threads.sleepWithoutInterrupt(1000);
166    }
167    admin.close();
168
169    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLE).build();
170    ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(CF)
171      .setCompressionType(compression).setDataBlockEncoding(dataBlockEncoding).build();
172    createPreSplitLoadTestTable(tableDescriptor, familyDescriptor);
173
174    LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(CF);
175    writerThreads = prepareWriterThreads(dataGen, conf, TABLE);
176    readerThreads = prepareReaderThreads(dataGen, conf, TABLE, 100);
177  }
178
179  protected int numKeys() {
180    return 1000;
181  }
182
183  protected ColumnFamilyDescriptor getColumnDesc(Admin admin)
184    throws TableNotFoundException, IOException {
185    return admin.getDescriptor(TABLE).getColumnFamily(CF);
186  }
187
188}