001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Collection; 025import java.util.EnumSet; 026import java.util.List; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.ClusterMetrics.Option; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseTestingUtility; 031import org.apache.hadoop.hbase.HColumnDescriptor; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.HTableDescriptor; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.TableNotFoundException; 036import org.apache.hadoop.hbase.client.Admin; 037import org.apache.hadoop.hbase.io.compress.Compression; 038import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 039import org.apache.hadoop.hbase.testclassification.MediumTests; 040import org.apache.hadoop.hbase.testclassification.MiscTests; 041import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 042import org.junit.After; 043import org.junit.Before; 044import org.junit.ClassRule; 045import org.junit.Test; 046import org.junit.experimental.categories.Category; 047import org.junit.runner.RunWith; 048import org.junit.runners.Parameterized; 049import org.junit.runners.Parameterized.Parameters; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053/** 054 * A write/read/verify load test on a mini HBase cluster. Tests reading and then writing. 055 */ 056@Category({ MiscTests.class, MediumTests.class }) 057@RunWith(Parameterized.class) 058public class TestMiniClusterLoadSequential { 059 060 @ClassRule 061 public static final HBaseClassTestRule CLASS_RULE = 062 HBaseClassTestRule.forClass(TestMiniClusterLoadSequential.class); 063 064 private static final Logger LOG = LoggerFactory.getLogger(TestMiniClusterLoadSequential.class); 065 066 protected static final TableName TABLE = TableName.valueOf("load_test_tbl"); 067 protected static final byte[] CF = Bytes.toBytes("load_test_cf"); 068 protected static final int NUM_THREADS = 8; 069 protected static final int NUM_RS = 2; 070 protected static final int TIMEOUT_MS = 180000; 071 protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 072 073 protected final Configuration conf = TEST_UTIL.getConfiguration(); 074 protected final boolean isMultiPut; 075 protected final DataBlockEncoding dataBlockEncoding; 076 077 protected MultiThreadedWriter writerThreads; 078 protected MultiThreadedReader readerThreads; 079 protected int numKeys; 080 081 protected Compression.Algorithm compression = Compression.Algorithm.NONE; 082 083 public TestMiniClusterLoadSequential(boolean isMultiPut, DataBlockEncoding dataBlockEncoding) { 084 this.isMultiPut = isMultiPut; 085 this.dataBlockEncoding = dataBlockEncoding; 086 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024); 087 088 // We don't want any region reassignments by the load balancer during the test. 089 conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, 10.0f); 090 } 091 092 @Parameters 093 public static Collection<Object[]> parameters() { 094 List<Object[]> parameters = new ArrayList<>(); 095 for (boolean multiPut : new boolean[] { false, true }) { 096 for (DataBlockEncoding dataBlockEncoding : new DataBlockEncoding[] { DataBlockEncoding.NONE, 097 DataBlockEncoding.PREFIX }) { 098 parameters.add(new Object[] { multiPut, dataBlockEncoding }); 099 } 100 } 101 return parameters; 102 } 103 104 @Before 105 public void setUp() throws Exception { 106 LOG.debug("Test setup: isMultiPut=" + isMultiPut); 107 TEST_UTIL.startMiniCluster(NUM_RS); 108 } 109 110 @After 111 public void tearDown() throws Exception { 112 LOG.debug("Test teardown: isMultiPut=" + isMultiPut); 113 TEST_UTIL.shutdownMiniCluster(); 114 } 115 116 protected MultiThreadedReader prepareReaderThreads(LoadTestDataGenerator dataGen, 117 Configuration conf, TableName tableName, double verifyPercent) throws IOException { 118 MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent); 119 return reader; 120 } 121 122 protected MultiThreadedWriter prepareWriterThreads(LoadTestDataGenerator dataGen, 123 Configuration conf, TableName tableName) throws IOException { 124 MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tableName); 125 writer.setMultiPut(isMultiPut); 126 return writer; 127 } 128 129 @Test 130 public void loadTest() throws Exception { 131 prepareForLoadTest(); 132 runLoadTestOnExistingTable(); 133 } 134 135 protected void runLoadTestOnExistingTable() throws IOException { 136 writerThreads.start(0, numKeys, NUM_THREADS); 137 writerThreads.waitForFinish(); 138 assertEquals(0, writerThreads.getNumWriteFailures()); 139 140 readerThreads.start(0, numKeys, NUM_THREADS); 141 readerThreads.waitForFinish(); 142 assertEquals(0, readerThreads.getNumReadFailures()); 143 assertEquals(0, readerThreads.getNumReadErrors()); 144 assertEquals(numKeys, readerThreads.getNumKeysVerified()); 145 } 146 147 protected void createPreSplitLoadTestTable(HTableDescriptor htd, HColumnDescriptor hcd) 148 throws IOException { 149 HBaseTestingUtility.createPreSplitLoadTestTable(conf, htd, hcd); 150 TEST_UTIL.waitUntilAllRegionsAssigned(htd.getTableName()); 151 } 152 153 protected void prepareForLoadTest() throws IOException { 154 LOG.info( 155 "Starting load test: dataBlockEncoding=" + dataBlockEncoding + ", isMultiPut=" + isMultiPut); 156 numKeys = numKeys(); 157 Admin admin = TEST_UTIL.getAdmin(); 158 while ( 159 admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().size() 160 < NUM_RS 161 ) { 162 LOG.info("Sleeping until " + NUM_RS + " RSs are online"); 163 Threads.sleepWithoutInterrupt(1000); 164 } 165 admin.close(); 166 167 HTableDescriptor htd = new HTableDescriptor(TABLE); 168 HColumnDescriptor hcd = new HColumnDescriptor(CF).setCompressionType(compression) 169 .setDataBlockEncoding(dataBlockEncoding); 170 createPreSplitLoadTestTable(htd, hcd); 171 172 LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(CF); 173 writerThreads = prepareWriterThreads(dataGen, conf, TABLE); 174 readerThreads = prepareReaderThreads(dataGen, conf, TABLE, 100); 175 } 176 177 protected int numKeys() { 178 return 1000; 179 } 180 181 protected HColumnDescriptor getColumnDesc(Admin admin) 182 throws TableNotFoundException, IOException { 183 return admin.getTableDescriptor(TABLE).getFamily(CF); 184 } 185 186}