001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.EnumSet;
025import java.util.List;
026import java.util.stream.Stream;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.ClusterMetrics.Option;
029import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.TableNotFoundException;
034import org.apache.hadoop.hbase.client.Admin;
035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
037import org.apache.hadoop.hbase.client.TableDescriptor;
038import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
039import org.apache.hadoop.hbase.io.compress.Compression;
040import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
041import org.apache.hadoop.hbase.testclassification.LargeTests;
042import org.apache.hadoop.hbase.testclassification.MiscTests;
043import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
044import org.junit.jupiter.api.AfterEach;
045import org.junit.jupiter.api.BeforeEach;
046import org.junit.jupiter.api.Tag;
047import org.junit.jupiter.api.TestTemplate;
048import org.junit.jupiter.params.provider.Arguments;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 * A write/read/verify load test on a mini HBase cluster. Tests reading and then writing.
054 */
055@Tag(MiscTests.TAG)
056@Tag(LargeTests.TAG)
057@HBaseParameterizedTestTemplate
058public class TestMiniClusterLoadSequential {
059
060  private static final Logger LOG = LoggerFactory.getLogger(TestMiniClusterLoadSequential.class);
061
062  protected static final TableName TABLE = TableName.valueOf("load_test_tbl");
063  protected static final byte[] CF = Bytes.toBytes("load_test_cf");
064  protected static final int NUM_THREADS = 8;
065  protected static final int NUM_RS = 2;
066  protected static final int TIMEOUT_MS = 180000;
067  protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
068
069  protected final Configuration conf = TEST_UTIL.getConfiguration();
070  protected final boolean isMultiPut;
071  protected final DataBlockEncoding dataBlockEncoding;
072
073  protected MultiThreadedWriter writerThreads;
074  protected MultiThreadedReader readerThreads;
075  protected int numKeys;
076
077  protected Compression.Algorithm compression = Compression.Algorithm.NONE;
078
079  public TestMiniClusterLoadSequential(boolean isMultiPut, DataBlockEncoding dataBlockEncoding) {
080    this.isMultiPut = isMultiPut;
081    this.dataBlockEncoding = dataBlockEncoding;
082    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
083
084    // We don't want any region reassignments by the load balancer during the test.
085    conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, 10.0f);
086  }
087
088  public static Stream<Arguments> parameters() {
089    List<Arguments> params = new ArrayList<>();
090    for (boolean multiPut : new boolean[] { false, true }) {
091      for (DataBlockEncoding dataBlockEncoding : new DataBlockEncoding[] { DataBlockEncoding.NONE,
092        DataBlockEncoding.PREFIX }) {
093        params.add(Arguments.of(multiPut, dataBlockEncoding));
094      }
095    }
096    return params.stream();
097  }
098
099  @BeforeEach
100  public void setUp() throws Exception {
101    LOG.debug("Test setup: isMultiPut=" + isMultiPut);
102    TEST_UTIL.startMiniCluster(NUM_RS);
103  }
104
105  @AfterEach
106  public void tearDown() throws Exception {
107    LOG.debug("Test teardown: isMultiPut=" + isMultiPut);
108    TEST_UTIL.shutdownMiniCluster();
109  }
110
111  protected MultiThreadedReader prepareReaderThreads(LoadTestDataGenerator dataGen,
112    Configuration conf, TableName tableName, double verifyPercent) throws IOException {
113    MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
114    return reader;
115  }
116
117  protected MultiThreadedWriter prepareWriterThreads(LoadTestDataGenerator dataGen,
118    Configuration conf, TableName tableName) throws IOException {
119    MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tableName);
120    writer.setMultiPut(isMultiPut);
121    return writer;
122  }
123
124  @TestTemplate
125  public void loadTest() throws Exception {
126    prepareForLoadTest();
127    runLoadTestOnExistingTable();
128  }
129
130  protected void runLoadTestOnExistingTable() throws IOException {
131    writerThreads.start(0, numKeys, NUM_THREADS);
132    writerThreads.waitForFinish();
133    assertEquals(0, writerThreads.getNumWriteFailures());
134
135    readerThreads.start(0, numKeys, NUM_THREADS);
136    readerThreads.waitForFinish();
137    assertEquals(0, readerThreads.getNumReadFailures());
138    assertEquals(0, readerThreads.getNumReadErrors());
139    assertEquals(numKeys, readerThreads.getNumKeysVerified());
140  }
141
142  protected void createPreSplitLoadTestTable(TableDescriptor tableDescriptor,
143    ColumnFamilyDescriptor familyDescriptor) throws IOException {
144    LoadTestUtil.createPreSplitLoadTestTable(conf, tableDescriptor, familyDescriptor);
145    TEST_UTIL.waitUntilAllRegionsAssigned(tableDescriptor.getTableName());
146  }
147
148  protected void prepareForLoadTest() throws IOException {
149    LOG.info(
150      "Starting load test: dataBlockEncoding=" + dataBlockEncoding + ", isMultiPut=" + isMultiPut);
151    numKeys = numKeys();
152    Admin admin = TEST_UTIL.getAdmin();
153    while (
154      admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().size()
155          < NUM_RS
156    ) {
157      LOG.info("Sleeping until " + NUM_RS + " RSs are online");
158      Threads.sleepWithoutInterrupt(1000);
159    }
160    admin.close();
161
162    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLE).build();
163    ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(CF)
164      .setCompressionType(compression).setDataBlockEncoding(dataBlockEncoding).build();
165    createPreSplitLoadTestTable(tableDescriptor, familyDescriptor);
166
167    LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(CF);
168    writerThreads = prepareWriterThreads(dataGen, conf, TABLE);
169    readerThreads = prepareReaderThreads(dataGen, conf, TABLE, 100);
170  }
171
172  protected int numKeys() {
173    return 1000;
174  }
175
176  protected ColumnFamilyDescriptor getColumnDesc(Admin admin)
177    throws TableNotFoundException, IOException {
178    return admin.getDescriptor(TABLE).getColumnFamily(CF);
179  }
180
181}