/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A write/read/verify load test on a mini HBase cluster. Writes all keys first,
 * then reads them back and verifies the data.
 */
@Category({MiscTests.class, LargeTests.class})
@RunWith(Parameterized.class)
public class TestMiniClusterLoadSequential {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMiniClusterLoadSequential.class);

  private static final Logger LOG = LoggerFactory.getLogger(
      TestMiniClusterLoadSequential.class);

  protected static final TableName TABLE =
      TableName.valueOf("load_test_tbl");
  protected static final byte[] CF = Bytes.toBytes("load_test_cf");
  protected static final int NUM_THREADS = 8;
  protected static final int NUM_RS = 2;
  protected static final int TIMEOUT_MS = 180000;
  protected static final HBaseTestingUtility TEST_UTIL =
      new HBaseTestingUtility();

  protected final Configuration conf = TEST_UTIL.getConfiguration();
  protected final boolean isMultiPut;
  protected final DataBlockEncoding dataBlockEncoding;

  protected MultiThreadedWriter writerThreads;
  protected MultiThreadedReader readerThreads;
  protected int numKeys;

  protected Compression.Algorithm compression = Compression.Algorithm.NONE;

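  /**
   * @param isMultiPut whether the writers use multi-put (batched) operations
   * @param dataBlockEncoding the data block encoding to set on the test column family
   */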
  public TestMiniClusterLoadSequential(boolean isMultiPut,
      DataBlockEncoding dataBlockEncoding) {
    this.isMultiPut = isMultiPut;
    this.dataBlockEncoding = dataBlockEncoding;
    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);

    // We don't want any region reassignments by the load balancer during the test.
    conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, 10.0f);
  }

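  /**
   * Runs each combination of single-put/multi-put writes with the NONE and PREFIX
   * data block encodings.
   */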
  @Parameters
  public static Collection<Object[]> parameters() {
    List<Object[]> parameters = new ArrayList<>();
    for (boolean multiPut : new boolean[]{false, true}) {
      for (DataBlockEncoding dataBlockEncoding : new DataBlockEncoding[] {
          DataBlockEncoding.NONE, DataBlockEncoding.PREFIX }) {
        parameters.add(new Object[]{multiPut, dataBlockEncoding});
      }
    }
    return parameters;
  }

  @Before
  public void setUp() throws Exception {
    LOG.debug("Test setup: isMultiPut={}", isMultiPut);
    TEST_UTIL.startMiniCluster(NUM_RS);
  }

  @After
  public void tearDown() throws Exception {
    LOG.debug("Test teardown: isMultiPut={}", isMultiPut);
    TEST_UTIL.shutdownMiniCluster();
  }

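  /**
   * Creates the reader threads used to read back and verify the loaded keys
   * from {@code tableName}.
   */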
  protected MultiThreadedReader prepareReaderThreads(LoadTestDataGenerator dataGen,
      Configuration conf, TableName tableName, double verifyPercent) throws IOException {
    MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
    return reader;
  }

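  /**
   * Creates the threads that write generated data to {@code tableName}, configured for
   * single or multi-put according to this parameterized run.
   */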
  protected MultiThreadedWriter prepareWriterThreads(LoadTestDataGenerator dataGen,
      Configuration conf, TableName tableName) throws IOException {
    MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tableName);
    writer.setMultiPut(isMultiPut);
    return writer;
  }

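  /**
   * Creates the pre-split test table, then runs the write/read/verify load against it.
   */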
  @Test
  public void loadTest() throws Exception {
    prepareForLoadTest();
    runLoadTestOnExistingTable();
  }

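  /**
   * Writes {@link #numKeys} keys with {@link #NUM_THREADS} writer threads, waits for the
   * writers to finish, then reads everything back and asserts that there were no write or
   * read failures and that every key was verified.
   */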
  protected void runLoadTestOnExistingTable() throws IOException {
    writerThreads.start(0, numKeys, NUM_THREADS);
    writerThreads.waitForFinish();
    assertEquals(0, writerThreads.getNumWriteFailures());

    readerThreads.start(0, numKeys, NUM_THREADS);
    readerThreads.waitForFinish();
    assertEquals(0, readerThreads.getNumReadFailures());
    assertEquals(0, readerThreads.getNumReadErrors());
    assertEquals(numKeys, readerThreads.getNumKeysVerified());
  }

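  /**
   * Creates the pre-split load test table and waits until all of its regions are assigned.
   */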
  protected void createPreSplitLoadTestTable(HTableDescriptor htd, HColumnDescriptor hcd)
      throws IOException {
    HBaseTestingUtility.createPreSplitLoadTestTable(conf, htd, hcd);
    TEST_UTIL.waitUntilAllRegionsAssigned(htd.getTableName());
  }

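  /**
   * Waits for all {@link #NUM_RS} region servers to come online, creates the test table with
   * the configured compression and data block encoding, and sets up the writer and reader
   * threads.
   */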
  protected void prepareForLoadTest() throws IOException {
    LOG.info("Starting load test: dataBlockEncoding={}, isMultiPut={}",
        dataBlockEncoding, isMultiPut);
    numKeys = numKeys();
    Admin admin = TEST_UTIL.getAdmin();
    while (admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
                .getLiveServerMetrics().size() < NUM_RS) {
      LOG.info("Sleeping until {} RSs are online", NUM_RS);
      Threads.sleepWithoutInterrupt(1000);
    }
    admin.close();

    HTableDescriptor htd = new HTableDescriptor(TABLE);
    HColumnDescriptor hcd = new HColumnDescriptor(CF)
      .setCompressionType(compression)
      .setDataBlockEncoding(dataBlockEncoding);
    createPreSplitLoadTestTable(htd, hcd);

    LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(CF);
    writerThreads = prepareWriterThreads(dataGen, conf, TABLE);
    readerThreads = prepareReaderThreads(dataGen, conf, TABLE, 100);
  }

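  /**
   * @return the number of keys to load; protected so subclasses can change the test size
   */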
  protected int numKeys() {
    return 1000;
  }

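  /**
   * @return the descriptor of the test column family, looked up through the given admin
   */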
  protected HColumnDescriptor getColumnDesc(Admin admin)
      throws TableNotFoundException, IOException {
    return admin.getTableDescriptor(TABLE).getFamily(CF);
  }

}