001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Collection; 025import java.util.EnumSet; 026import java.util.List; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.ClusterMetrics.Option; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseTestingUtility; 031import org.apache.hadoop.hbase.HColumnDescriptor; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.HTableDescriptor; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.TableNotFoundException; 036import org.apache.hadoop.hbase.client.Admin; 037import org.apache.hadoop.hbase.io.compress.Compression; 038import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 039import org.apache.hadoop.hbase.testclassification.MediumTests; 040import org.apache.hadoop.hbase.testclassification.MiscTests; 041import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 042import org.junit.After; 043import org.junit.Before; 044import org.junit.ClassRule; 045import org.junit.Test; 046import org.junit.experimental.categories.Category; 047import org.junit.runner.RunWith; 048import org.junit.runners.Parameterized; 049import org.junit.runners.Parameterized.Parameters; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053/** 054 * A write/read/verify load test on a mini HBase cluster. Tests reading 055 * and then writing. 056 */ 057@Category({MiscTests.class, MediumTests.class}) 058@RunWith(Parameterized.class) 059public class TestMiniClusterLoadSequential { 060 061 @ClassRule 062 public static final HBaseClassTestRule CLASS_RULE = 063 HBaseClassTestRule.forClass(TestMiniClusterLoadSequential.class); 064 065 private static final Logger LOG = LoggerFactory.getLogger( 066 TestMiniClusterLoadSequential.class); 067 068 protected static final TableName TABLE = 069 TableName.valueOf("load_test_tbl"); 070 protected static final byte[] CF = Bytes.toBytes("load_test_cf"); 071 protected static final int NUM_THREADS = 8; 072 protected static final int NUM_RS = 2; 073 protected static final int TIMEOUT_MS = 180000; 074 protected static final HBaseTestingUtility TEST_UTIL = 075 new HBaseTestingUtility(); 076 077 protected final Configuration conf = TEST_UTIL.getConfiguration(); 078 protected final boolean isMultiPut; 079 protected final DataBlockEncoding dataBlockEncoding; 080 081 protected MultiThreadedWriter writerThreads; 082 protected MultiThreadedReader readerThreads; 083 protected int numKeys; 084 085 protected Compression.Algorithm compression = Compression.Algorithm.NONE; 086 087 public TestMiniClusterLoadSequential(boolean isMultiPut, 088 DataBlockEncoding dataBlockEncoding) { 089 this.isMultiPut = isMultiPut; 090 this.dataBlockEncoding = dataBlockEncoding; 091 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024); 092 093 // We don't want any region reassignments by the load balancer during the test. 094 conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, 10.0f); 095 } 096 097 @Parameters 098 public static Collection<Object[]> parameters() { 099 List<Object[]> parameters = new ArrayList<>(); 100 for (boolean multiPut : new boolean[]{false, true}) { 101 for (DataBlockEncoding dataBlockEncoding : new DataBlockEncoding[] { 102 DataBlockEncoding.NONE, DataBlockEncoding.PREFIX }) { 103 parameters.add(new Object[]{multiPut, dataBlockEncoding}); 104 } 105 } 106 return parameters; 107 } 108 109 @Before 110 public void setUp() throws Exception { 111 LOG.debug("Test setup: isMultiPut=" + isMultiPut); 112 TEST_UTIL.startMiniCluster(NUM_RS); 113 } 114 115 @After 116 public void tearDown() throws Exception { 117 LOG.debug("Test teardown: isMultiPut=" + isMultiPut); 118 TEST_UTIL.shutdownMiniCluster(); 119 } 120 121 protected MultiThreadedReader prepareReaderThreads(LoadTestDataGenerator dataGen, 122 Configuration conf, TableName tableName, double verifyPercent) throws IOException { 123 MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent); 124 return reader; 125 } 126 127 protected MultiThreadedWriter prepareWriterThreads(LoadTestDataGenerator dataGen, 128 Configuration conf, TableName tableName) throws IOException { 129 MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tableName); 130 writer.setMultiPut(isMultiPut); 131 return writer; 132 } 133 134 @Test 135 public void loadTest() throws Exception { 136 prepareForLoadTest(); 137 runLoadTestOnExistingTable(); 138 } 139 140 protected void runLoadTestOnExistingTable() throws IOException { 141 writerThreads.start(0, numKeys, NUM_THREADS); 142 writerThreads.waitForFinish(); 143 assertEquals(0, writerThreads.getNumWriteFailures()); 144 145 readerThreads.start(0, numKeys, NUM_THREADS); 146 readerThreads.waitForFinish(); 147 assertEquals(0, readerThreads.getNumReadFailures()); 148 assertEquals(0, readerThreads.getNumReadErrors()); 149 assertEquals(numKeys, readerThreads.getNumKeysVerified()); 150 } 151 152 protected void createPreSplitLoadTestTable(HTableDescriptor htd, HColumnDescriptor hcd) 153 throws IOException { 154 HBaseTestingUtility.createPreSplitLoadTestTable(conf, htd, hcd); 155 TEST_UTIL.waitUntilAllRegionsAssigned(htd.getTableName()); 156 } 157 158 protected void prepareForLoadTest() throws IOException { 159 LOG.info("Starting load test: dataBlockEncoding=" + dataBlockEncoding + 160 ", isMultiPut=" + isMultiPut); 161 numKeys = numKeys(); 162 Admin admin = TEST_UTIL.getAdmin(); 163 while (admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)) 164 .getLiveServerMetrics().size() < NUM_RS) { 165 LOG.info("Sleeping until " + NUM_RS + " RSs are online"); 166 Threads.sleepWithoutInterrupt(1000); 167 } 168 admin.close(); 169 170 HTableDescriptor htd = new HTableDescriptor(TABLE); 171 HColumnDescriptor hcd = new HColumnDescriptor(CF) 172 .setCompressionType(compression) 173 .setDataBlockEncoding(dataBlockEncoding); 174 createPreSplitLoadTestTable(htd, hcd); 175 176 LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(CF); 177 writerThreads = prepareWriterThreads(dataGen, conf, TABLE); 178 readerThreads = prepareReaderThreads(dataGen, conf, TABLE, 100); 179 } 180 181 protected int numKeys() { 182 return 1000; 183 } 184 185 protected HColumnDescriptor getColumnDesc(Admin admin) 186 throws TableNotFoundException, IOException { 187 return admin.getTableDescriptor(TABLE).getFamily(CF); 188 } 189 190}