/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

/**
 * A base class for tests that do something with the cluster while running
 * {@link LoadTestTool} to write and verify some data.
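 * <p>
 * A sketch of how such a test is typically launched against a distributed cluster (the exact
 * command depends on how the integration-test classes end up on the classpath of your
 * deployment):
 * </p>
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.IntegrationTestIngest
 * </pre>
 *
 * <p>
 * Run time, key counts and thread counts can be tuned through configuration keys derived from the
 * concrete class name, for example {@code IntegrationTestIngest.num_keys_per_server} or
 * {@code hbase.IntegrationTestIngest.runtime} (see {@link #RUN_TIME_KEY}).
 * </p>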
043 */ 044@Category(IntegrationTests.class) 045public class IntegrationTestIngest extends IntegrationTestBase { 046 public static final char HIPHEN = '-'; 047 private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster 048 protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000; 049 protected static final long JUNIT_RUN_TIME = 10 * 60 * 1000; 050 051 /** A soft limit on how long we should run */ 052 protected static final String RUN_TIME_KEY = "hbase.%s.runtime"; 053 054 protected static final String NUM_KEYS_PER_SERVER_KEY = "num_keys_per_server"; 055 protected static final long DEFAULT_NUM_KEYS_PER_SERVER = 2500; 056 057 protected static final String NUM_WRITE_THREADS_KEY = "num_write_threads"; 058 protected static final int DEFAULT_NUM_WRITE_THREADS = 20; 059 060 protected static final String NUM_READ_THREADS_KEY = "num_read_threads"; 061 protected static final int DEFAULT_NUM_READ_THREADS = 20; 062 063 // Log is being used in IntegrationTestIngestWithEncryption, hence it is protected 064 protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestIngest.class); 065 protected IntegrationTestingUtility util; 066 protected HBaseCluster cluster; 067 protected LoadTestTool loadTool; 068 069 protected String[] LOAD_TEST_TOOL_INIT_ARGS = { 070 LoadTestTool.OPT_COLUMN_FAMILIES, 071 LoadTestTool.OPT_COMPRESSION, 072 HFileTestUtil.OPT_DATA_BLOCK_ENCODING, 073 LoadTestTool.OPT_INMEMORY, 074 LoadTestTool.OPT_ENCRYPTION, 075 LoadTestTool.OPT_NUM_REGIONS_PER_SERVER, 076 LoadTestTool.OPT_REGION_REPLICATION, 077 }; 078 079 @Override 080 public void setUpCluster() throws Exception { 081 util = getTestingUtil(getConf()); 082 LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers"); 083 util.initializeCluster(getMinServerCount()); 084 LOG.debug("Done initializing/checking cluster"); 085 cluster = util.getHBaseClusterInterface(); 086 deleteTableIfNecessary(); 087 loadTool = new LoadTestTool(); 088 loadTool.setConf(util.getConfiguration()); 089 // Initialize load test tool before we start breaking things; 090 // LoadTestTool init, even when it is a no-op, is very fragile. 
    initTable();
  }

  protected int getMinServerCount() {
    return SERVER_COUNT;
  }

  protected void initTable() throws IOException {
    int ret = loadTool.run(getArgsForLoadTestToolInitTable());
    Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    internalRunIngestTest(DEFAULT_RUN_TIME);
    return 0;
  }

  @Test
  public void testIngest() throws Exception {
    runIngestTest(JUNIT_RUN_TIME, 2500, 10, 1024, 10, 20);
  }

  protected void internalRunIngestTest(long runTime) throws Exception {
    String clazz = this.getClass().getSimpleName();
    long numKeysPerServer = conf.getLong(String.format("%s.%s", clazz, NUM_KEYS_PER_SERVER_KEY),
      DEFAULT_NUM_KEYS_PER_SERVER);
    int numWriteThreads =
      conf.getInt(String.format("%s.%s", clazz, NUM_WRITE_THREADS_KEY), DEFAULT_NUM_WRITE_THREADS);
    int numReadThreads =
      conf.getInt(String.format("%s.%s", clazz, NUM_READ_THREADS_KEY), DEFAULT_NUM_READ_THREADS);
    runIngestTest(runTime, numKeysPerServer, 10, 1024, numWriteThreads, numReadThreads);
  }

  @Override
  public TableName getTablename() {
    String clazz = this.getClass().getSimpleName();
    return TableName
      .valueOf(conf.get(String.format("%s.%s", clazz, LoadTestTool.OPT_TABLE_NAME), clazz));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    Set<String> families = Sets.newHashSet();
    String clazz = this.getClass().getSimpleName();
    // Parse the conf for the column family names because LTT is not initialized yet.
    String familiesString =
      getConf().get(String.format("%s.%s", clazz, LoadTestTool.OPT_COLUMN_FAMILIES));
    if (familiesString == null) {
      for (byte[] family : HFileTestUtil.DEFAULT_COLUMN_FAMILIES) {
        families.add(Bytes.toString(family));
      }
    } else {
      for (String family : familiesString.split(",")) {
        families.add(family);
      }
    }

    return families;
  }

  private void deleteTableIfNecessary() throws IOException {
    if (util.getAdmin().tableExists(getTablename())) {
      util.deleteTable(getTablename());
    }
  }

  /**
   * Repeatedly runs {@link LoadTestTool} write, update and read phases over successive key ranges
   * until roughly 90% of the configured run time has elapsed, failing the test on any non-zero
   * exit code.
   */
  protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
    int recordSize, int writeThreads, int readThreads) throws Exception {

    LOG.info("Running ingest");
    LOG.info("Cluster size:"
      + util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());

    long start = System.currentTimeMillis();
    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
    long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
    long startKey = 0;

    long numKeys = getNumKeys(keysPerServerPerIter);
    while (System.currentTimeMillis() - start < 0.9 * runtime) {
      LOG.info("Intended run time: " + (runtime / 60000) + " min, left:"
        + ((runtime - (System.currentTimeMillis() - start)) / 60000) + " min");

      int ret = -1;
      ret = loadTool.run(getArgsForLoadTestTool("-write",
        String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Load failed with error code " + ret;
        LOG.error(errorMsg);
        Assert.fail(errorMsg);
      }

      ret = loadTool.run(getArgsForLoadTestTool("-update", String.format("60:%d:1", writeThreads),
        startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Update failed with error code "
          + ret;
        LOG.error(errorMsg);
        Assert.fail(errorMsg);
      }

      ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads),
        startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Verification failed with error code " + ret;
        LOG.error(errorMsg + " Rerunning verification after 1 minute for debugging");
        Threads.sleep(1000 * 60);
        ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads),
          startKey, numKeys));
        if (0 != ret) {
          LOG.error("Rerun of verification failed with error code " + ret);
        }
        Assert.fail(errorMsg);
      }
      startKey += numKeys;
    }
  }

  /** Builds the LoadTestTool command line used to create the table ({@code -init_only}). */
  protected String[] getArgsForLoadTestToolInitTable() {
    List<String> args = new ArrayList<>();
    args.add("-tn");
    args.add(getTablename().getNameAsString());
    // Pass all remaining args from conf with keys <test class name>.<load test tool arg>
    String clazz = this.getClass().getSimpleName();
    for (String arg : LOAD_TEST_TOOL_INIT_ARGS) {
      String val = conf.get(String.format("%s.%s", clazz, arg));
      if (val != null) {
        args.add("-" + arg);
        args.add(val);
      }
    }
    args.add("-init_only");
    return args.toArray(new String[args.size()]);
  }

  /**
   * Builds the LoadTestTool command line for the given mode ({@code -write}, {@code -update} or
   * {@code -read}) over the key range that starts at {@code startKey}.
   */
  protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
    long numKeys) {
    List<String> args = new ArrayList<>(11);
    args.add("-tn");
    args.add(getTablename().getNameAsString());
    args.add("-families");
    args.add(getColumnFamiliesAsString());
    args.add(mode);
    args.add(modeSpecificArg);
    args.add("-start_key");
    args.add(String.valueOf(startKey));
    args.add("-num_keys");
    args.add(String.valueOf(numKeys));
    args.add("-skip_init");

    return args.toArray(new String[args.size()]);
  }

  private String getColumnFamiliesAsString() {
    return StringUtils.join(",", getColumnFamilies());
  }

  /** Estimates a data size based on the cluster size */
  protected long getNumKeys(long keysPerServer) throws IOException {
    int numRegionServers = cluster.getClusterMetrics().getLiveServerMetrics().size();
    return keysPerServer * numRegionServers;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestIngest(), args);
    System.exit(ret);
  }
}