001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.EnumSet; 025import java.util.List; 026import java.util.stream.Stream; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.ClusterMetrics.Option; 029import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 030import org.apache.hadoop.hbase.HBaseTestingUtil; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.TableNotFoundException; 034import org.apache.hadoop.hbase.client.Admin; 035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 037import org.apache.hadoop.hbase.client.TableDescriptor; 038import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 039import org.apache.hadoop.hbase.io.compress.Compression; 040import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 041import org.apache.hadoop.hbase.testclassification.LargeTests; 042import org.apache.hadoop.hbase.testclassification.MiscTests; 043import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 044import org.junit.jupiter.api.AfterEach; 045import org.junit.jupiter.api.BeforeEach; 046import org.junit.jupiter.api.Tag; 047import org.junit.jupiter.api.TestTemplate; 048import org.junit.jupiter.params.provider.Arguments; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052/** 053 * A write/read/verify load test on a mini HBase cluster. Tests reading and then writing. 054 */ 055@Tag(MiscTests.TAG) 056@Tag(LargeTests.TAG) 057@HBaseParameterizedTestTemplate 058public class TestMiniClusterLoadSequential { 059 060 private static final Logger LOG = LoggerFactory.getLogger(TestMiniClusterLoadSequential.class); 061 062 protected static final TableName TABLE = TableName.valueOf("load_test_tbl"); 063 protected static final byte[] CF = Bytes.toBytes("load_test_cf"); 064 protected static final int NUM_THREADS = 8; 065 protected static final int NUM_RS = 2; 066 protected static final int TIMEOUT_MS = 180000; 067 protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 068 069 protected final Configuration conf = TEST_UTIL.getConfiguration(); 070 protected final boolean isMultiPut; 071 protected final DataBlockEncoding dataBlockEncoding; 072 073 protected MultiThreadedWriter writerThreads; 074 protected MultiThreadedReader readerThreads; 075 protected int numKeys; 076 077 protected Compression.Algorithm compression = Compression.Algorithm.NONE; 078 079 public TestMiniClusterLoadSequential(boolean isMultiPut, DataBlockEncoding dataBlockEncoding) { 080 this.isMultiPut = isMultiPut; 081 this.dataBlockEncoding = dataBlockEncoding; 082 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024); 083 084 // We don't want any region reassignments by the load balancer during the test. 085 conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, 10.0f); 086 } 087 088 public static Stream<Arguments> parameters() { 089 List<Arguments> params = new ArrayList<>(); 090 for (boolean multiPut : new boolean[] { false, true }) { 091 for (DataBlockEncoding dataBlockEncoding : new DataBlockEncoding[] { DataBlockEncoding.NONE, 092 DataBlockEncoding.PREFIX }) { 093 params.add(Arguments.of(multiPut, dataBlockEncoding)); 094 } 095 } 096 return params.stream(); 097 } 098 099 @BeforeEach 100 public void setUp() throws Exception { 101 LOG.debug("Test setup: isMultiPut=" + isMultiPut); 102 TEST_UTIL.startMiniCluster(NUM_RS); 103 } 104 105 @AfterEach 106 public void tearDown() throws Exception { 107 LOG.debug("Test teardown: isMultiPut=" + isMultiPut); 108 TEST_UTIL.shutdownMiniCluster(); 109 } 110 111 protected MultiThreadedReader prepareReaderThreads(LoadTestDataGenerator dataGen, 112 Configuration conf, TableName tableName, double verifyPercent) throws IOException { 113 MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent); 114 return reader; 115 } 116 117 protected MultiThreadedWriter prepareWriterThreads(LoadTestDataGenerator dataGen, 118 Configuration conf, TableName tableName) throws IOException { 119 MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tableName); 120 writer.setMultiPut(isMultiPut); 121 return writer; 122 } 123 124 @TestTemplate 125 public void loadTest() throws Exception { 126 prepareForLoadTest(); 127 runLoadTestOnExistingTable(); 128 } 129 130 protected void runLoadTestOnExistingTable() throws IOException { 131 writerThreads.start(0, numKeys, NUM_THREADS); 132 writerThreads.waitForFinish(); 133 assertEquals(0, writerThreads.getNumWriteFailures()); 134 135 readerThreads.start(0, numKeys, NUM_THREADS); 136 readerThreads.waitForFinish(); 137 assertEquals(0, readerThreads.getNumReadFailures()); 138 assertEquals(0, readerThreads.getNumReadErrors()); 139 assertEquals(numKeys, readerThreads.getNumKeysVerified()); 140 } 141 142 protected void createPreSplitLoadTestTable(TableDescriptor tableDescriptor, 143 ColumnFamilyDescriptor familyDescriptor) throws IOException { 144 LoadTestUtil.createPreSplitLoadTestTable(conf, tableDescriptor, familyDescriptor); 145 TEST_UTIL.waitUntilAllRegionsAssigned(tableDescriptor.getTableName()); 146 } 147 148 protected void prepareForLoadTest() throws IOException { 149 LOG.info( 150 "Starting load test: dataBlockEncoding=" + dataBlockEncoding + ", isMultiPut=" + isMultiPut); 151 numKeys = numKeys(); 152 Admin admin = TEST_UTIL.getAdmin(); 153 while ( 154 admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().size() 155 < NUM_RS 156 ) { 157 LOG.info("Sleeping until " + NUM_RS + " RSs are online"); 158 Threads.sleepWithoutInterrupt(1000); 159 } 160 admin.close(); 161 162 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLE).build(); 163 ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(CF) 164 .setCompressionType(compression).setDataBlockEncoding(dataBlockEncoding).build(); 165 createPreSplitLoadTestTable(tableDescriptor, familyDescriptor); 166 167 LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(CF); 168 writerThreads = prepareWriterThreads(dataGen, conf, TABLE); 169 readerThreads = prepareReaderThreads(dataGen, conf, TABLE, 100); 170 } 171 172 protected int numKeys() { 173 return 1000; 174 } 175 176 protected ColumnFamilyDescriptor getColumnDesc(Admin admin) 177 throws TableNotFoundException, IOException { 178 return admin.getDescriptor(TABLE).getColumnFamily(CF); 179 } 180 181}