/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

/**
 * A base class for tests that do something with the cluster while running {@link LoadTestTool} to
 * write and verify some data.
 */
@Category(IntegrationTests.class)
public class IntegrationTestIngest extends IntegrationTestBase {
  /**
   * The hyphen character ('-'). The name is misspelled but is kept as-is because this constant is
   * part of the class's public API.
   */
  public static final char HIPHEN = '-';
  private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
  /** Default run time, in milliseconds, when launched from the command line (20 minutes). */
  protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
  /** Default run time, in milliseconds, when launched as a JUnit test (10 minutes). */
  protected static final long JUNIT_RUN_TIME = 10 * 60 * 1000;

  /**
   * A soft limit on how long we should run. This is a format string; {@code %s} is replaced with the
   * simple name of the concrete test class, and the value is read in milliseconds.
   */
  protected static final String RUN_TIME_KEY = "hbase.%s.runtime";

  /** Config key suffix (prefixed with the simple class name) for the number of keys per server. */
  protected static final String NUM_KEYS_PER_SERVER_KEY = "num_keys_per_server";
  protected static final long DEFAULT_NUM_KEYS_PER_SERVER = 2500;

  /** Config key suffix (prefixed with the simple class name) for the number of writer threads. */
  protected static final String NUM_WRITE_THREADS_KEY = "num_write_threads";
  protected static final int DEFAULT_NUM_WRITE_THREADS = 20;

  /** Config key suffix (prefixed with the simple class name) for the number of reader threads. */
  protected static final String NUM_READ_THREADS_KEY = "num_read_threads";
  protected static final int DEFAULT_NUM_READ_THREADS = 20;

  // Log is being used in IntegrationTestIngestWithEncryption, hence it is protected
  protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestIngest.class);

  protected IntegrationTestingUtility util;
  protected HBaseCluster cluster;
  protected LoadTestTool loadTool;

  /**
   * LoadTestTool option names that are forwarded to the tool's table-initialization run when a
   * matching {@code <SimpleClassName>.<option>} key is present in the configuration.
   */
  protected String[] LOAD_TEST_TOOL_INIT_ARGS =
    { LoadTestTool.OPT_COLUMN_FAMILIES, LoadTestTool.OPT_COMPRESSION,
      HFileTestUtil.OPT_DATA_BLOCK_ENCODING, LoadTestTool.OPT_INMEMORY, LoadTestTool.OPT_ENCRYPTION,
      LoadTestTool.OPT_NUM_REGIONS_PER_SERVER, LoadTestTool.OPT_REGION_REPLICATION, };

  /**
   * Initializes (or checks) the cluster, sized by {@link #getMinServerCount()}. Then drops any
   * leftover test table and creates and initializes the {@link LoadTestTool} used by the test.
   */
  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    // Log the count actually requested; subclasses may override getMinServerCount().
    int minServerCount = getMinServerCount();
    LOG.debug("Initializing/checking cluster has {} servers", minServerCount);
    util.initializeCluster(minServerCount);
    LOG.debug("Done initializing/checking cluster");
    cluster = util.getHBaseClusterInterface();
    deleteTableIfNecessary();
    loadTool = new LoadTestTool();
    loadTool.setConf(util.getConfiguration());
    // Initialize load test tool before we start breaking things;
    // LoadTestTool init, even when it is a no-op, is very fragile.
    initTable();
  }

  /**
   * Returns the number of servers requested when initializing the cluster.
   * @return the minimum server count; {@value #SERVER_COUNT} unless overridden
   */
  protected int getMinServerCount() {
    return SERVER_COUNT;
  }

  /**
   * Runs LoadTestTool in init-only mode to create the test table.
   * @throws IOException if the tool run throws
   */
  protected void initTable() throws IOException {
    int ret = loadTool.run(getArgsForLoadTestToolInitTable());
    Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
  }

  /** Runs the ingest test for {@link #DEFAULT_RUN_TIME} and returns 0. */
  @Override
  public int runTestFromCommandLine() throws Exception {
    internalRunIngestTest(DEFAULT_RUN_TIME);
    return 0;
  }

  /** JUnit entry point: a shorter run with fixed sizing parameters. */
  @Test
  public void testIngest() throws Exception {
    runIngestTest(JUNIT_RUN_TIME, 2500, 10, 1024, 10, 20);
  }

  /**
   * Runs the ingest test with the key and thread counts read from the configuration. The counts are
   * read from class-scoped keys (see {@link #NUM_KEYS_PER_SERVER_KEY},
   * {@link #NUM_WRITE_THREADS_KEY} and {@link #NUM_READ_THREADS_KEY}). Each key uses 10 columns and
   * a record size of 1024.
   * @param runTime default run time in milliseconds; may be overridden via {@link #RUN_TIME_KEY}
   */
  protected void internalRunIngestTest(long runTime) throws Exception {
    long numKeysPerServer =
      conf.getLong(classScopedKey(NUM_KEYS_PER_SERVER_KEY), DEFAULT_NUM_KEYS_PER_SERVER);
    int numWriteThreads = conf.getInt(classScopedKey(NUM_WRITE_THREADS_KEY), DEFAULT_NUM_WRITE_THREADS);
    int numReadThreads = conf.getInt(classScopedKey(NUM_READ_THREADS_KEY), DEFAULT_NUM_READ_THREADS);
    runIngestTest(runTime, numKeysPerServer, 10, 1024, numWriteThreads, numReadThreads);
  }

  /**
   * Returns the test table name. It is taken from the configuration key
   * {@code <SimpleClassName>.<LoadTestTool.OPT_TABLE_NAME>}, defaulting to the simple class name.
   */
  @Override
  public TableName getTablename() {
    String clazz = this.getClass().getSimpleName();
    return TableName.valueOf(conf.get(classScopedKey(LoadTestTool.OPT_TABLE_NAME), clazz));
  }

  /**
   * Returns the column family names used by the test. They come from the comma-separated
   * configuration value {@code <SimpleClassName>.<LoadTestTool.OPT_COLUMN_FAMILIES>} when present,
   * and otherwise from {@link HFileTestUtil#DEFAULT_COLUMN_FAMILIES}.
   */
  @Override
  protected Set<String> getColumnFamilies() {
    Set<String> families = Sets.newHashSet();
    // parse conf for getting the column family names because LTT is not initialized yet.
    String familiesString = getConf().get(classScopedKey(LoadTestTool.OPT_COLUMN_FAMILIES));
    if (familiesString == null) {
      for (byte[] family : HFileTestUtil.DEFAULT_COLUMN_FAMILIES) {
        families.add(Bytes.toString(family));
      }
    } else {
      // Split exactly as before (no trimming) so the names stay identical to the raw value that
      // getArgsForLoadTestToolInitTable() forwards to LoadTestTool.
      for (String family : Splitter.on(',').split(familiesString)) {
        families.add(family);
      }
    }

    return families;
  }

  /** Deletes the test table if it already exists. */
  private void deleteTableIfNecessary() throws IOException {
    if (util.getAdmin().tableExists(getTablename())) {
      util.deleteTable(getTablename());
    }
  }

  /**
   * Repeatedly writes, updates and then reads/verifies batches of keys with LoadTestTool. Each
   * iteration covers a contiguous key range starting at the previous iteration's end. No new
   * iteration is started once 90% of the run time has elapsed. Any phase returning a non-zero code
   * fails the test. A failed read is re-run once after a one-minute pause, for debugging only, and
   * the test fails regardless of the re-run's outcome.
   * @param defaultRunTime run time in milliseconds, used unless {@link #RUN_TIME_KEY} is configured
   * @param keysPerServerPerIter keys per live region server written in each iteration
   * @param colsPerKey columns per key passed to the write phase
   * @param recordSize record size passed to the write phase
   * @param writeThreads number of writer (and updater) threads
   * @param readThreads number of reader threads
   */
  protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
    int recordSize, int writeThreads, int readThreads) throws Exception {

    LOG.info("Running ingest");
    LOG.info("Cluster size:{}",
      util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());

    long start = EnvironmentEdgeManager.currentTime();
    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
    long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
    long startKey = 0;

    long numKeys = getNumKeys(keysPerServerPerIter);
    while (EnvironmentEdgeManager.currentTime() - start < 0.9 * runtime) {
      LOG.info("Intended run time: {} min, left:{} min", runtime / 60000,
        (runtime - (EnvironmentEdgeManager.currentTime() - start)) / 60000);

      runLoadToolOrFail("Load failed with error code ", getArgsForLoadTestTool("-write",
        String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));

      runLoadToolOrFail("Update failed with error code ", getArgsForLoadTestTool("-update",
        String.format("60:%d:1", writeThreads), startKey, numKeys));

      int ret = loadTool.run(
        getArgsForLoadTestTool("-read", String.format("100:%d", readThreads), startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Verification failed with error code " + ret;
        LOG.error(errorMsg + " Rerunning verification after 1 minute for debugging");
        Threads.sleep(1000 * 60);
        ret = loadTool.run(
          getArgsForLoadTestTool("-read", String.format("100:%d", readThreads), startKey, numKeys));
        if (0 != ret) {
          LOG.error("Rerun of Verification failed with error code {}", ret);
        }
        Assert.fail(errorMsg);
      }
      startKey += numKeys;
    }
  }

  /**
   * Runs LoadTestTool with the given arguments. On a non-zero return code it logs and fails the
   * test with {@code failurePrefix + returnCode}.
   */
  private void runLoadToolOrFail(String failurePrefix, String[] toolArgs) throws IOException {
    int ret = loadTool.run(toolArgs);
    if (0 != ret) {
      String errorMsg = failurePrefix + ret;
      LOG.error(errorMsg);
      Assert.fail(errorMsg);
    }
  }

  /**
   * Builds the LoadTestTool arguments for the init-only (table creation) run. Every option in
   * {@link #LOAD_TEST_TOOL_INIT_ARGS} that has a value under {@code <SimpleClassName>.<option>} in
   * the configuration is forwarded as {@code -<option> <value>}.
   */
  protected String[] getArgsForLoadTestToolInitTable() {
    List<String> args = new ArrayList<>();
    args.add("-tn");
    args.add(getTablename().getNameAsString());
    // pass all remaining args from conf with keys <test class name>.<load test tool arg>
    for (String arg : LOAD_TEST_TOOL_INIT_ARGS) {
      String val = conf.get(classScopedKey(arg));
      if (val != null) {
        args.add("-" + arg);
        args.add(val);
      }
    }
    args.add("-init_only");
    return args.toArray(new String[0]);
  }

  /**
   * Builds LoadTestTool arguments for one phase of an iteration. Table initialization is skipped
   * ({@code -skip_init}) because it was done in {@link #initTable()}.
   * @param mode LoadTestTool mode flag, e.g. {@code -write}, {@code -update}, {@code -read}
   * @param modeSpecificArg the value that follows the mode flag
   * @param startKey first key of the range
   * @param numKeys number of keys in the range
   */
  protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
    long numKeys) {
    List<String> args = new ArrayList<>(11);
    args.add("-tn");
    args.add(getTablename().getNameAsString());
    args.add("-families");
    args.add(getColumnFamiliesAsString());
    args.add(mode);
    args.add(modeSpecificArg);
    args.add("-start_key");
    args.add(String.valueOf(startKey));
    args.add("-num_keys");
    args.add(String.valueOf(numKeys));
    args.add("-skip_init");

    return args.toArray(new String[0]);
  }

  /** Returns the column family names joined with commas. */
  private String getColumnFamiliesAsString() {
    return StringUtils.join(",", getColumnFamilies());
  }

  /**
   * Estimates a data size based on the cluster size.
   * @return {@code keysPerServer} multiplied by the current number of live region servers
   */
  protected long getNumKeys(long keysPerServer) throws IOException {
    int numRegionServers = cluster.getClusterMetrics().getLiveServerMetrics().size();
    return keysPerServer * numRegionServers;
  }

  /**
   * Builds a configuration key scoped to the concrete test class: {@code <SimpleClassName>.<suffix>}.
   */
  private String classScopedKey(String suffix) {
    return String.format("%s.%s", this.getClass().getSimpleName(), suffix);
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestIngest(), args);
    System.exit(ret);
  }
}