001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.lang.reflect.Constructor; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.List; 026import java.util.Properties; 027import java.util.concurrent.atomic.AtomicReference; 028import javax.crypto.spec.SecretKeySpec; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HBaseConfiguration; 031import org.apache.hadoop.hbase.HBaseInterfaceAudience; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.Admin; 035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 037import org.apache.hadoop.hbase.client.Connection; 038import org.apache.hadoop.hbase.client.ConnectionFactory; 039import org.apache.hadoop.hbase.client.Durability; 040import org.apache.hadoop.hbase.client.TableDescriptor; 041import org.apache.hadoop.hbase.io.compress.Compression; 042import org.apache.hadoop.hbase.io.crypto.Cipher; 
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.logging.Log4jUtils;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.AlreadySelectedException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;

/**
 * A command-line utility that reads, writes, and verifies data. Unlike
 * {@link org.apache.hadoop.hbase.PerformanceEvaluation}, this tool validates the data written, and
 * supports simultaneously writing and reading the same set of keys.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class LoadTestTool extends AbstractHBaseTool {

  private static final Logger LOG = LoggerFactory.getLogger(LoadTestTool.class);
  /** Separator used both for option sub-values and generator class arguments */
  private static final String COLON = ":";

  /** Table name for the test */
  private TableName tableName;

  /** Column families for the test */
  private byte[][] families;
  /** Column family used by the test */
  private static byte[] DEFAULT_COLUMN_FAMILY = Bytes.toBytes("test_cf");
  /** Column families used by the test */
  private static final byte[][] DEFAULT_COLUMN_FAMILIES = { DEFAULT_COLUMN_FAMILY };

  /** Table name to use if not overridden on the command line */
  protected static final String DEFAULT_TABLE_NAME = "cluster_test";

  /** The default data size if not specified */
  protected static final int DEFAULT_DATA_SIZE = 64;

  /** The number of reader/writer threads if not specified */
  protected static final int DEFAULT_NUM_THREADS = 20;

  /** Usage string for the load option */
  protected static final String OPT_USAGE_LOAD =
    "<avg_cols_per_key>:<avg_data_size>" + "[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

  /** Usage string for the read option */
  protected static final String OPT_USAGE_READ =
    "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

  /** Usage string for the update option */
  protected static final String OPT_USAGE_UPDATE = "<update_percent>[:<#threads="
    + DEFAULT_NUM_THREADS + ">][:<#whether to ignore nonce collisions=0>]";

  protected static final String OPT_USAGE_BLOOM =
    "Bloom filter type, one of " + Arrays.toString(BloomType.values());

  protected static final String OPT_USAGE_COMPRESSION =
    "Compression type, " + "one of " + Arrays.toString(Compression.Algorithm.values());

  protected static final String OPT_VERBOSE = "verbose";

  public static final String OPT_BLOOM = "bloom";
  public static final String OPT_BLOOM_PARAM = "bloom_param";
  public static final String OPT_COMPRESSION = "compression";
  public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
  public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";

  public static final String OPT_INMEMORY = "in_memory";
  public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF "
    + "inmemory as far as possible. Not guaranteed that reads are always served from inmemory";

  public static final String OPT_GENERATOR = "generator";
  public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
    + " Any args for this class can be passed as colon separated after class name";

  public static final String OPT_WRITER = "writer";
  public static final String OPT_WRITER_USAGE = "The class for executing the write requests";

  public static final String OPT_UPDATER = "updater";
  public static final String OPT_UPDATER_USAGE = "The class for executing the update requests";

  public static final String OPT_READER = "reader";
  public static final String OPT_READER_USAGE = "The class for executing the read requests";

  protected static final String OPT_KEY_WINDOW = "key_window";
  protected static final String OPT_WRITE = "write";
  protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
  public static final String OPT_MULTIPUT = "multiput";
  public static final String OPT_MULTIGET = "multiget_batchsize";
  protected static final String OPT_NUM_KEYS = "num_keys";
  protected static final String OPT_READ = "read";
  protected static final String OPT_START_KEY = "start_key";
  public static final String OPT_TABLE_NAME = "tn";
  public static final String OPT_COLUMN_FAMILIES = "families";
  protected static final String OPT_ZK_QUORUM = "zk";
  protected static final String OPT_ZK_PARENT_NODE = "zk_root";
  protected static final String OPT_SKIP_INIT = "skip_init";
  protected static final String OPT_INIT_ONLY = "init_only";
  protected static final String NUM_TABLES = "num_tables";
  protected static final String OPT_BATCHUPDATE = "batchupdate";
  protected static final String OPT_UPDATE = "update";

  public static final String OPT_ENCRYPTION = "encryption";
  protected static final String OPT_ENCRYPTION_USAGE =
    "Enables transparent encryption on the test table, one of "
      + Arrays.toString(Encryption.getSupportedCiphers());

  public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server";
  protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE =
    "Desired number of regions per region server. Defaults to 5.";
  public static final int DEFAULT_NUM_REGIONS_PER_SERVER = 5;

  public static final String OPT_REGION_REPLICATION = "region_replication";
  protected static final String OPT_REGION_REPLICATION_USAGE =
    "Desired number of replicas per region";

  public static final String OPT_REGION_REPLICA_ID = "region_replica_id";
  protected static final String OPT_REGION_REPLICA_ID_USAGE =
    "Region replica id to do the reads from";

  public static final String OPT_TIMELINE_CONSISTENCY = "timeline";
  protected static final String OPT_TIMELINE_CONSISTENCY_USAGE =
    "Use TIMELINE consistency in read operations. Leave region_replica_id unset, otherwise it will override this setting.";

  public static final String OPT_MOB_THRESHOLD = "mob_threshold";
  protected static final String OPT_MOB_THRESHOLD_USAGE =
    "Desired cell size to exceed in bytes that will use the MOB write path";

  protected static final long DEFAULT_START_KEY = 0;

  /** This will be removed as we factor out the dependency on command line */
  protected CommandLine cmd;

  // Worker thread pools for each of the three workload types; created lazily in loadTable().
  protected MultiThreadedWriter writerThreads = null;
  protected MultiThreadedReader readerThreads = null;
  protected MultiThreadedUpdater updaterThreads = null;

  // Half-open key range [startKey, endKey) that the workload operates over
  protected long startKey, endKey;

  // Which workloads were requested on the command line
  protected boolean isVerbose, isWrite, isRead, isUpdate;
  protected boolean deferredLogFlush;

  // Column family options
  protected DataBlockEncoding dataBlockEncodingAlgo;
  protected Compression.Algorithm compressAlgo;
  protected BloomType bloomType;
  private boolean inMemoryCF;

  // Owner user when running with ACL-aware generators (secure clusters)
  private User userOwner;
  // Writer options
  protected int numWriterThreads = DEFAULT_NUM_THREADS;
  protected int minColsPerKey, maxColsPerKey;
  protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE;
  protected boolean isMultiPut;

  // Updater options
  protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
  protected int updatePercent;
  protected boolean ignoreConflicts = false;
  protected boolean isBatchUpdate;

  // Reader options
  private int numReaderThreads = DEFAULT_NUM_THREADS;
  private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
  private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE;
  private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
  private int verifyPercent;

  private int numTables = 1;

  private String superUser;

  // Comma-separated user list used by the ACL-aware readers/updaters
  private String userNames;
  // This file is used to read authentication information in secure clusters.
  private String authnFileName;

  private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
  private int regionReplication = -1; // not set
  private int regionReplicaId = -1; // not set
  private boolean timelineConsistency = false;

  private int mobThreshold = -1; // not set

  // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
  // console tool itself should only be used from console.
  protected boolean isSkipInit = false;
  protected boolean isInitOnly = false;

  // Non-null only when the -encryption option selected a cipher
  protected Cipher cipher = null;

  /**
   * Splits the value of the given option on ':' and validates the field count.
   * @param option     option name to read from the parsed command line
   * @param minNumCols minimum number of colon-separated fields allowed
   * @param maxNumCols maximum number of colon-separated fields allowed
   * @return the split fields
   * @throws IllegalArgumentException if the field count is out of range
   */
  protected String[] splitColonSeparated(String option, int minNumCols, int maxNumCols) {
    String optVal = cmd.getOptionValue(option);
    String[] cols = optVal.split(COLON);
    if (cols.length < minNumCols || cols.length > maxNumCols) {
      throw new IllegalArgumentException(
        "Expected at least " + minNumCols + " columns but no more than " + maxNumCols
          + " in the colon-separated value '" + optVal + "' of the " + "-" + option + " option");
    }
    return cols;
  }

  /** Parses a thread-count string, constrained to [1, Short.MAX_VALUE]. */
  protected int getNumThreads(String numThreadsStr) {
    return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
  }

  /** Returns the column families the test operates on (command-line or default). */
  public byte[][] getColumnFamilies() {
    return families;
  }

  /**
   * Apply column family options such as Bloom filters, compression, and data block encoding.
   * Disables the table, adds/modifies each family via the Admin API, then re-enables it.
   */
  protected void applyColumnFamilyOptions(TableName tableName, byte[][] columnFamilies)
    throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
      TableDescriptor tableDesc = admin.getDescriptor(tableName);
      LOG.info("Disabling table " + tableName);
      admin.disableTable(tableName);
      for (byte[] cf : columnFamilies) {
        ColumnFamilyDescriptor columnDesc = tableDesc.getColumnFamily(cf);
        // A family missing from the current descriptor must be added rather than modified.
        boolean isNewCf = columnDesc == null;
        ColumnFamilyDescriptorBuilder columnDescBuilder = isNewCf
          ? ColumnFamilyDescriptorBuilder.newBuilder(cf)
          : ColumnFamilyDescriptorBuilder.newBuilder(columnDesc);
        if (bloomType != null) {
          columnDescBuilder.setBloomFilterType(bloomType);
        }
        if (compressAlgo != null) {
          columnDescBuilder.setCompressionType(compressAlgo);
        }
        if (dataBlockEncodingAlgo != null) {
          columnDescBuilder.setDataBlockEncoding(dataBlockEncodingAlgo);
        }
        if (inMemoryCF) {
          columnDescBuilder.setInMemory(inMemoryCF);
        }
        if (cipher != null) {
          // Generate a random data key and wrap it for the current user.
          byte[] keyBytes = new byte[cipher.getKeyLength()];
          Bytes.secureRandom(keyBytes);
          columnDescBuilder.setEncryptionType(cipher.getName());
          columnDescBuilder.setEncryptionKey(EncryptionUtil.wrapKey(conf,
            User.getCurrent().getShortName(), new SecretKeySpec(keyBytes, cipher.getName())));
        }
        if (mobThreshold >= 0) {
          columnDescBuilder.setMobEnabled(true);
          columnDescBuilder.setMobThreshold(mobThreshold);
        }

        if (isNewCf) {
          admin.addColumnFamily(tableName, columnDescBuilder.build());
        } else {
          admin.modifyColumnFamily(tableName, columnDescBuilder.build());
        }
      }
      LOG.info("Enabling table " + tableName);
      admin.enableTable(tableName);
    }
  }

  @Override
  protected void addOptions() {
    addOptNoArg("v", OPT_VERBOSE, "Will display a full readout of logs, including ZooKeeper");
    addOptWithArg(OPT_ZK_QUORUM,
      "ZK quorum as comma-separated host names " + "without port numbers");
    addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
    addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
    addOptWithArg(OPT_COLUMN_FAMILIES, "The name of the column families to use separated by comma");
    addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
    addOptWithArg(OPT_READ, OPT_USAGE_READ);
    addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
    addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
    addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
330 addOptWithArg(OPT_BLOOM_PARAM, "the parameter of bloom filter type"); 331 addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION); 332 addOptWithArg(LoadTestUtil.OPT_DATA_BLOCK_ENCODING, LoadTestUtil.OPT_DATA_BLOCK_ENCODING_USAGE); 333 addOptWithArg(OPT_MAX_READ_ERRORS, 334 "The maximum number of read errors " 335 + "to tolerate before terminating all reader threads. The default is " 336 + MultiThreadedReader.DEFAULT_MAX_ERRORS + "."); 337 addOptWithArg(OPT_MULTIGET, 338 "Whether to use multi-gets as opposed to " + "separate gets for every column in a row"); 339 addOptWithArg(OPT_KEY_WINDOW, 340 "The 'key window' to maintain between " 341 + "reads and writes for concurrent write/read workload. The default " + "is " 342 + MultiThreadedReader.DEFAULT_KEY_WINDOW + "."); 343 344 addOptNoArg(OPT_MULTIPUT, 345 "Whether to use multi-puts as opposed to " + "separate puts for every column in a row"); 346 addOptNoArg(OPT_BATCHUPDATE, 347 "Whether to use batch as opposed to " + "separate updates for every column in a row"); 348 addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY); 349 addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE); 350 addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE); 351 addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE); 352 addOptWithArg(OPT_READER, OPT_READER_USAGE); 353 354 addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write"); 355 addOptWithArg(OPT_START_KEY, "The first key to read/write " 356 + "(a 0-based index). The default value is " + DEFAULT_START_KEY + "."); 357 addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table " + "already exists"); 358 359 addOptWithArg(NUM_TABLES, 360 "A positive integer number. When a number n is specified, load test " 361 + "tool will load n table parallely. -tn parameter value becomes " 362 + "table name prefix. 
Each table name is in format <tn>_1...<tn>_n"); 363 364 addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE); 365 addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE); 366 addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE); 367 addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE); 368 addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE); 369 addOptNoArg(OPT_TIMELINE_CONSISTENCY, OPT_TIMELINE_CONSISTENCY_USAGE); 370 addOptWithArg(OPT_MOB_THRESHOLD, OPT_MOB_THRESHOLD_USAGE); 371 } 372 373 @Override 374 protected CommandLineParser newParser() { 375 // Commons-CLI lacks the capability to handle combinations of options, so we do it ourselves 376 // Validate in parse() to get helpful error messages instead of exploding in processOptions() 377 return new DefaultParser() { 378 @Override 379 public CommandLine parse(Options opts, String[] args, Properties props, boolean stop) 380 throws ParseException { 381 CommandLine cl = super.parse(opts, args, props, stop); 382 383 boolean isReadWriteUpdate = 384 cmd.hasOption(OPT_READ) || cmd.hasOption(OPT_WRITE) || cmd.hasOption(OPT_UPDATE); 385 boolean isInitOnly = cmd.hasOption(OPT_INIT_ONLY); 386 387 if (!isInitOnly && !isReadWriteUpdate) { 388 throw new MissingOptionException("Must specify either -" + OPT_INIT_ONLY 389 + " or at least one of -" + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE); 390 } 391 392 if (isInitOnly && isReadWriteUpdate) { 393 throw new AlreadySelectedException(OPT_INIT_ONLY + " cannot be specified with any of -" 394 + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE); 395 } 396 397 if (isReadWriteUpdate && !cmd.hasOption(OPT_NUM_KEYS)) { 398 throw new MissingOptionException(OPT_NUM_KEYS + " must be specified in read/write mode."); 399 } 400 401 return cl; 402 } 403 }; 404 } 405 406 @Override 407 protected void processOptions(CommandLine cmd) { 408 this.cmd = cmd; 409 410 tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME, 
DEFAULT_TABLE_NAME)); 411 412 if (cmd.hasOption(OPT_COLUMN_FAMILIES)) { 413 String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(","); 414 families = new byte[list.length][]; 415 for (int i = 0; i < list.length; i++) { 416 families[i] = Bytes.toBytes(list[i]); 417 } 418 } else { 419 families = DEFAULT_COLUMN_FAMILIES; 420 } 421 422 isVerbose = cmd.hasOption(OPT_VERBOSE); 423 isWrite = cmd.hasOption(OPT_WRITE); 424 isRead = cmd.hasOption(OPT_READ); 425 isUpdate = cmd.hasOption(OPT_UPDATE); 426 isInitOnly = cmd.hasOption(OPT_INIT_ONLY); 427 deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH); 428 429 if (!isInitOnly) { 430 startKey = parseLong(cmd.getOptionValue(OPT_START_KEY, String.valueOf(DEFAULT_START_KEY)), 0, 431 Long.MAX_VALUE); 432 long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1, Long.MAX_VALUE - startKey); 433 endKey = startKey + numKeys; 434 isSkipInit = cmd.hasOption(OPT_SKIP_INIT); 435 System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]"); 436 } 437 438 parseColumnFamilyOptions(cmd); 439 440 if (isWrite) { 441 String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3); 442 443 int colIndex = 0; 444 minColsPerKey = 1; 445 maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]); 446 int avgColDataSize = parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE); 447 minColDataSize = avgColDataSize / 2; 448 maxColDataSize = avgColDataSize * 3 / 2; 449 450 if (colIndex < writeOpts.length) { 451 numWriterThreads = getNumThreads(writeOpts[colIndex++]); 452 } 453 454 isMultiPut = cmd.hasOption(OPT_MULTIPUT); 455 456 mobThreshold = -1; 457 if (cmd.hasOption(OPT_MOB_THRESHOLD)) { 458 mobThreshold = Integer.parseInt(cmd.getOptionValue(OPT_MOB_THRESHOLD)); 459 } 460 461 System.out.println("Multi-puts: " + isMultiPut); 462 System.out.println("Columns per key: " + minColsPerKey + ".." + maxColsPerKey); 463 System.out.println("Data size per column: " + minColDataSize + ".." 
+ maxColDataSize); 464 } 465 466 if (isUpdate) { 467 String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3); 468 int colIndex = 0; 469 updatePercent = parseInt(mutateOpts[colIndex++], 0, 100); 470 if (colIndex < mutateOpts.length) { 471 numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]); 472 } 473 if (colIndex < mutateOpts.length) { 474 ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1; 475 } 476 477 isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE); 478 479 System.out.println("Batch updates: " + isBatchUpdate); 480 System.out.println("Percent of keys to update: " + updatePercent); 481 System.out.println("Updater threads: " + numUpdaterThreads); 482 System.out.println("Ignore nonce conflicts: " + ignoreConflicts); 483 } 484 485 if (isRead) { 486 String[] readOpts = splitColonSeparated(OPT_READ, 1, 2); 487 int colIndex = 0; 488 verifyPercent = parseInt(readOpts[colIndex++], 0, 100); 489 if (colIndex < readOpts.length) { 490 numReaderThreads = getNumThreads(readOpts[colIndex++]); 491 } 492 493 if (cmd.hasOption(OPT_MAX_READ_ERRORS)) { 494 maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS), 0, Integer.MAX_VALUE); 495 } 496 497 if (cmd.hasOption(OPT_KEY_WINDOW)) { 498 keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW), 0, Integer.MAX_VALUE); 499 } 500 501 if (cmd.hasOption(OPT_MULTIGET)) { 502 multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET), 0, Integer.MAX_VALUE); 503 } 504 505 System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize); 506 System.out.println("Percent of keys to verify: " + verifyPercent); 507 System.out.println("Reader threads: " + numReaderThreads); 508 } 509 510 numTables = 1; 511 if (cmd.hasOption(NUM_TABLES)) { 512 numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE); 513 } 514 515 numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER; 516 if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) { 517 numRegionsPerServer = 
Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER)); 518 } 519 520 regionReplication = 1; 521 if (cmd.hasOption(OPT_REGION_REPLICATION)) { 522 regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION)); 523 } 524 525 regionReplicaId = -1; 526 if (cmd.hasOption(OPT_REGION_REPLICA_ID)) { 527 regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID)); 528 } 529 530 timelineConsistency = false; 531 if (cmd.hasOption(OPT_TIMELINE_CONSISTENCY)) { 532 timelineConsistency = true; 533 } 534 } 535 536 private void parseColumnFamilyOptions(CommandLine cmd) { 537 String dataBlockEncodingStr = cmd.getOptionValue(LoadTestUtil.OPT_DATA_BLOCK_ENCODING); 538 dataBlockEncodingAlgo = 539 dataBlockEncodingStr == null ? null : DataBlockEncoding.valueOf(dataBlockEncodingStr); 540 541 String compressStr = cmd.getOptionValue(OPT_COMPRESSION); 542 compressAlgo = 543 compressStr == null ? Compression.Algorithm.NONE : Compression.Algorithm.valueOf(compressStr); 544 545 String bloomStr = cmd.getOptionValue(OPT_BLOOM); 546 bloomType = bloomStr == null ? 
BloomType.ROW : BloomType.valueOf(bloomStr);

    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      // ROWPREFIX_FIXED_LENGTH requires the prefix length parameter (-bloom_param).
      if (!cmd.hasOption(OPT_BLOOM_PARAM)) {
        LOG.error("the parameter of bloom filter {} is not specified", bloomType.name());
      } else {
        conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM));
      }
    }

    inMemoryCF = cmd.hasOption(OPT_INMEMORY);
    if (cmd.hasOption(OPT_ENCRYPTION)) {
      cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
    }

  }

  /**
   * Creates the pre-split test table and applies the column family options selected on the
   * command line.
   */
  public void initTestTable() throws IOException {
    Durability durability = Durability.USE_DEFAULT;
    if (deferredLogFlush) {
      durability = Durability.ASYNC_WAL;
    }

    LoadTestUtil.createPreSplitLoadTestTable(conf, tableName, getColumnFamilies(), compressAlgo,
      dataBlockEncodingAlgo, numRegionsPerServer, regionReplication, durability);
    applyColumnFamilyOptions(tableName, getColumnFamilies());
  }

  @Override
  protected int doWork() throws IOException {
    if (!isVerbose) {
      // Quiet down ZooKeeper client logging unless -verbose was given.
      Log4jUtils.setLogLevel(ZooKeeper.class.getName(), "WARN");
    }
    if (numTables > 1) {
      return parallelLoadTables();
    } else {
      return loadTable();
    }
  }

  /**
   * Runs the configured workload (write/update/read) against a single table.
   * @return EXIT_SUCCESS if all requested workloads completed without failures
   */
  protected int loadTable() throws IOException {
    if (cmd.hasOption(OPT_ZK_QUORUM)) {
      conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
    }
    if (cmd.hasOption(OPT_ZK_PARENT_NODE)) {
      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE));
    }

    if (isInitOnly) {
      LOG.info("Initializing only; no reads or writes");
      initTestTable();
      return 0;
    }

    if (!isSkipInit) {
      initTestTable();
    }
    LoadTestDataGenerator dataGen = null;
    if (cmd.hasOption(OPT_GENERATOR)) {
      // -generator value: <class>[:<arg>:<arg>...]; ACL generators consume leading args here.
      String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
      dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
      String[] args;
      if (dataGen instanceof LoadTestDataGeneratorWithACL) {
        LOG.info("Using LoadTestDataGeneratorWithACL");
        if (User.isHBaseSecurityEnabled(conf)) {
          LOG.info("Security is enabled");
          // Secure layout: <class>:<authn file>:<superuser>:<userlist>...
          authnFileName = clazzAndArgs[1];
          superUser = clazzAndArgs[2];
          userNames = clazzAndArgs[3];
          args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length);
          Properties authConfig = new Properties();
          authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName));
          try {
            addAuthInfoToConf(authConfig, conf, superUser, userNames);
          } catch (IOException exp) {
            LOG.error(exp.toString(), exp);
            return EXIT_FAILURE;
          }
          userOwner = User.create(KerberosUtils.loginAndReturnUGI(conf, superUser));
        } else {
          // Insecure layout: <class>:<superuser>:<userlist>...
          superUser = clazzAndArgs[1];
          userNames = clazzAndArgs[2];
          args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
          userOwner = User.createUserForTesting(conf, superUser, new String[0]);
        }
      } else {
        args = clazzAndArgs.length == 1
          ? new String[0]
          : Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
      }
      dataGen.initialize(args);
    } else {
      // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator
      dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
        minColsPerKey, maxColsPerKey, families);
    }

    if (userOwner != null) {
      // Give the owner full rights on the test table before starting any workload.
      LOG.info("Granting permissions for user " + userOwner.getShortName());
      Permission.Action[] actions = { Permission.Action.ADMIN, Permission.Action.CREATE,
        Permission.Action.READ, Permission.Action.WRITE };
      try {
        AccessControlClient.grant(ConnectionFactory.createConnection(conf), tableName,
          userOwner.getShortName(), null, null, actions);
      } catch (Throwable e) {
        LOG.error(HBaseMarkers.FATAL,
          "Error in granting permission for the user " + userOwner.getShortName(), e);
        return EXIT_FAILURE;
      }
    }

    if (userNames != null) {
      // This will be comma separated list of expressions.
      String users[] = userNames.split(",");
      User user = null;
      // NOTE(review): the created User objects are not retained — this loop appears to exist
      // only for its login side effects in the secure case; confirm against the ACL test flow.
      for (String userStr : users) {
        if (User.isHBaseSecurityEnabled(conf)) {
          user = User.create(KerberosUtils.loginAndReturnUGI(conf, userStr));
        } else {
          user = User.createUserForTesting(conf, userStr, new String[0]);
        }
      }
    }

    if (isWrite) {
      if (userOwner != null) {
        writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
      } else {
        String writerClass = null;
        if (cmd.hasOption(OPT_WRITER)) {
          writerClass = cmd.getOptionValue(OPT_WRITER);
        } else {
          writerClass = MultiThreadedWriter.class.getCanonicalName();
        }

        writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen);
      }
      writerThreads.setMultiPut(isMultiPut);
    }

    if (isUpdate) {
      if (userOwner != null) {
        updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
          userOwner, userNames);
      } else {
        String updaterClass = null;
        if (cmd.hasOption(OPT_UPDATER)) {
          updaterClass = cmd.getOptionValue(OPT_UPDATER);
        } else {
          updaterClass = MultiThreadedUpdater.class.getCanonicalName();
        }
        updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen);
      }
      updaterThreads.setBatchUpdate(isBatchUpdate);
      updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
    }

    if (isRead) {
      if (userOwner != null) {
        readerThreads =
          new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent, userNames);
      } else {
        String readerClass = null;
        if (cmd.hasOption(OPT_READER)) {
          readerClass = cmd.getOptionValue(OPT_READER);
        } else {
          readerClass = MultiThreadedReader.class.getCanonicalName();
        }
        readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen);
      }
      readerThreads.setMaxErrors(maxReadErrors);
      readerThreads.setKeyWindow(keyWindow);
      readerThreads.setMultiGetBatchSize(multiGetBatchSize);
      readerThreads.setRegionReplicaId(regionReplicaId);
      readerThreads.setTimelineConsistency(timelineConsistency);
    }

    if (isUpdate && isWrite) {
      LOG.info("Concurrent write/update workload: making updaters aware of the " + "write point");
      updaterThreads.linkToWriter(writerThreads);
    }

    if (isRead && (isUpdate || isWrite)) {
      LOG.info("Concurrent write/read workload: making readers aware of the " + "write point");
      readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
    }

    if (isWrite) {
      System.out.println("Starting to write data...");
      writerThreads.start(startKey, endKey, numWriterThreads);
    }

    if (isUpdate) {
      LOG.info("Starting to mutate data...");
      System.out.println("Starting to mutate data...");
      // TODO : currently append and increment operations not tested with tags
      // Will update this after it is done
      updaterThreads.start(startKey, endKey, numUpdaterThreads);
    }

    if (isRead) {
      System.out.println("Starting to read data...");
      readerThreads.start(startKey, endKey, numReaderThreads);
    }

    if (isWrite) {
      writerThreads.waitForFinish();
    }

    if (isUpdate) {
      updaterThreads.waitForFinish();
    }

    if (isRead) {
      readerThreads.waitForFinish();
    }

    // Success requires zero failures from every workload that was requested.
    boolean success = true;
    if (isWrite) {
      success = success && writerThreads.getNumWriteFailures() == 0;
    }
    if (isUpdate) {
      success = success && updaterThreads.getNumWriteFailures() == 0;
    }
    if (isRead) {
      success =
        success && readerThreads.getNumReadErrors() == 0 && readerThreads.getNumReadFailures() == 0;
    }
    return success ?
EXIT_SUCCESS : EXIT_FAILURE;
  }

  /**
   * Instantiates the -generator class reflectively. The class must expose a
   * (int, int, int, int, byte[][]) constructor matching the column size/count bounds.
   */
  private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor =
        clazz.getConstructor(int.class, int.class, int.class, int.class, byte[][].class);
      return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
        minColsPerKey, maxColsPerKey, families);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  /** Instantiates the -writer class reflectively via (generator, conf, table) constructor. */
  private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName,
    LoadTestDataGenerator dataGen) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor =
        clazz.getConstructor(LoadTestDataGenerator.class, Configuration.class, TableName.class);
      return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  /** Instantiates the -updater class reflectively; the double is the update percent. */
  private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName,
    LoadTestDataGenerator dataGen) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor = clazz.getConstructor(LoadTestDataGenerator.class,
        Configuration.class, TableName.class, double.class);
      return (MultiThreadedUpdater) constructor.newInstance(dataGen, conf, tableName,
        updatePercent);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  /** Instantiates the -reader class reflectively; the double is the verify percent. */
  private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName,
    LoadTestDataGenerator dataGen) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor = clazz.getConstructor(LoadTestDataGenerator.class,
        Configuration.class, TableName.class, double.class);
      return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  public static void main(String[] args) {
    new LoadTestTool().doStaticMain(args);
  }

  /**
   * When NUM_TABLES is specified, the function starts multiple worker threads which individually
   * start a LoadTestTool instance to load a table. Each table name is in format <tn>_<index>.
   * For example, "-tn test -num_tables 2" , table names will be "test_1", "test_2"
   * @throws IOException if one of the load tasks is unable to complete
   */
  private int parallelLoadTables() throws IOException {
    // create new command args
    String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
    String[] newArgs = null;
    if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
      // No -tn given: prepend the default so every worker has a table-name arg to rewrite.
      newArgs = new String[cmdLineArgs.length + 2];
      newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
      newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME;
      System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length);
    } else {
      newArgs = cmdLineArgs;
    }

    int tableNameValueIndex = -1;
    for (int j = 0; j < newArgs.length; j++) {
      if (newArgs[j].endsWith(OPT_TABLE_NAME)) {
        tableNameValueIndex = j + 1;
      } else if (newArgs[j].endsWith(NUM_TABLES)) {
        // change NUM_TABLES to 1 so that each worker loads one table
        newArgs[j + 1] = "1";
      }
    }

    // starting to load multiple tables
    List<WorkerThread> workers = new ArrayList<>();
    for (int i = 0; i < numTables; i++) {
      String[] workerArgs = newArgs.clone();
      workerArgs[tableNameValueIndex] = tableName + "_" + (i + 1);
      WorkerThread worker = new WorkerThread(i, workerArgs);
      workers.add(worker);
      LOG.info(worker + " starting");
      worker.start();
    }

    // wait for all workers finish
    LOG.info("Waiting for worker threads to finish");
    for (WorkerThread t : workers) {
      try {
        t.join();
      } catch (InterruptedException ie) {
        IOException iie = new InterruptedIOException();
        iie.initCause(ie);
878 throw iie; 879 } 880 checkForErrors(); 881 } 882 883 return EXIT_SUCCESS; 884 } 885 886 // If an exception is thrown by one of worker threads, it will be 887 // stored here. 888 protected AtomicReference<Throwable> thrown = new AtomicReference<>(); 889 890 private void workerThreadError(Throwable t) { 891 thrown.compareAndSet(null, t); 892 } 893 894 /** 895 * Check for errors in the writer threads. If any is found, rethrow it. 896 */ 897 private void checkForErrors() throws IOException { 898 Throwable thrown = this.thrown.get(); 899 if (thrown == null) return; 900 if (thrown instanceof IOException) { 901 throw (IOException) thrown; 902 } else { 903 throw new RuntimeException(thrown); 904 } 905 } 906 907 class WorkerThread extends Thread { 908 private String[] workerArgs; 909 910 WorkerThread(int i, String[] args) { 911 super("WorkerThread-" + i); 912 workerArgs = args; 913 } 914 915 @Override 916 public void run() { 917 try { 918 int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs); 919 if (ret != 0) { 920 throw new RuntimeException("LoadTestTool exit with non-zero return code."); 921 } 922 } catch (Exception ex) { 923 LOG.error("Error in worker thread", ex); 924 workerThreadError(ex); 925 } 926 } 927 } 928 929 private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner, 930 String userList) throws IOException { 931 List<String> users = new ArrayList<>(Arrays.asList(userList.split(","))); 932 users.add(owner); 933 for (String user : users) { 934 String keyTabFileConfKey = "hbase." + user + ".keytab.file"; 935 String principalConfKey = "hbase." 
+ user + ".kerberos.principal"; 936 if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) { 937 throw new IOException("Authentication configs missing for user : " + user); 938 } 939 } 940 for (String key : authConfig.stringPropertyNames()) { 941 conf.set(key, authConfig.getProperty(key)); 942 } 943 LOG.debug("Added authentication properties to config successfully."); 944 } 945 946}