/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

import javax.crypto.spec.SecretKeySpec;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hbase.thirdparty.org.apache.commons.cli.AlreadySelectedException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;

/**
 * A command-line utility that reads, writes, and verifies data. Unlike
 * {@link org.apache.hadoop.hbase.PerformanceEvaluation}, this tool validates the data written,
 * and supports simultaneously writing and reading the same set of keys.
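 * <p>
 * Example invocation (illustrative): {@code hbase org.apache.hadoop.hbase.util.LoadTestTool
 * -tn cluster_test -write 2:1024:10 -read 100:20 -num_keys 1000000} writes one million keys
 * with 10 writer threads while 20 reader threads verify 100% of the keys they read.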
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class LoadTestTool extends AbstractHBaseTool {

  private static final Logger LOG = LoggerFactory.getLogger(LoadTestTool.class);
  private static final String COLON = ":";

  /** Table name for the test */
  private TableName tableName;

  /** Column families for the test */
  private byte[][] families;

  /** Table name to use if not overridden on the command line */
  protected static final String DEFAULT_TABLE_NAME = "cluster_test";

  /** The default data size if not specified */
  protected static final int DEFAULT_DATA_SIZE = 64;

  /** The number of reader/writer threads if not specified */
  protected static final int DEFAULT_NUM_THREADS = 20;

  /** Usage string for the load option */
  protected static final String OPT_USAGE_LOAD =
      "<avg_cols_per_key>:<avg_data_size>" +
      "[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

  /** Usage string for the read option */
  protected static final String OPT_USAGE_READ =
      "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

  /** Usage string for the update option */
  protected static final String OPT_USAGE_UPDATE =
      "<update_percent>[:<#threads=" + DEFAULT_NUM_THREADS
      + ">][:<#whether to ignore nonce collisions=0>]";

  protected static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " +
      Arrays.toString(BloomType.values());

  protected static final String OPT_USAGE_COMPRESSION = "Compression type, " +
      "one of " + Arrays.toString(Compression.Algorithm.values());

  protected static final String OPT_VERBOSE = "verbose";

  public static final String OPT_BLOOM = "bloom";
  public static final String OPT_COMPRESSION = "compression";
  public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
  public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";

  public static final String OPT_INMEMORY = "in_memory";
  public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF " +
      "in memory as much as possible. It is not guaranteed that reads are always served " +
      "from memory";

  public static final String OPT_GENERATOR = "generator";
  public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
      + " Any args for this class can be passed as colon separated after class name";
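
  // Note (see loadTable()): when the generator is a LoadTestDataGeneratorWithACL, the
  // colon-separated arguments after the class name are expected to be
  // <authn properties file>:<superuser>:<comma-separated users> on a secure cluster,
  // or <superuser>:<comma-separated users> otherwise.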

  public static final String OPT_WRITER = "writer";
  public static final String OPT_WRITER_USAGE = "The class for executing the write requests";

  public static final String OPT_UPDATER = "updater";
  public static final String OPT_UPDATER_USAGE = "The class for executing the update requests";

  public static final String OPT_READER = "reader";
  public static final String OPT_READER_USAGE = "The class for executing the read requests";

  protected static final String OPT_KEY_WINDOW = "key_window";
  protected static final String OPT_WRITE = "write";
  protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
  public static final String OPT_MULTIPUT = "multiput";
  public static final String OPT_MULTIGET = "multiget_batchsize";
  protected static final String OPT_NUM_KEYS = "num_keys";
  protected static final String OPT_READ = "read";
  protected static final String OPT_START_KEY = "start_key";
  public static final String OPT_TABLE_NAME = "tn";
  public static final String OPT_COLUMN_FAMILIES = "families";
  protected static final String OPT_ZK_QUORUM = "zk";
  protected static final String OPT_ZK_PARENT_NODE = "zk_root";
  protected static final String OPT_SKIP_INIT = "skip_init";
  protected static final String OPT_INIT_ONLY = "init_only";
  protected static final String NUM_TABLES = "num_tables";
  protected static final String OPT_BATCHUPDATE = "batchupdate";
  protected static final String OPT_UPDATE = "update";

  public static final String OPT_ENCRYPTION = "encryption";
  protected static final String OPT_ENCRYPTION_USAGE =
      "Enables transparent encryption on the test table, one of " +
      Arrays.toString(Encryption.getSupportedCiphers());

  public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server";
  protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE =
      "Desired number of regions per region server. Defaults to 5.";
  public static int DEFAULT_NUM_REGIONS_PER_SERVER = 5;

  public static final String OPT_REGION_REPLICATION = "region_replication";
  protected static final String OPT_REGION_REPLICATION_USAGE =
      "Desired number of replicas per region";

  public static final String OPT_REGION_REPLICA_ID = "region_replica_id";
  protected static final String OPT_REGION_REPLICA_ID_USAGE =
      "Region replica id to do the reads from";

  public static final String OPT_MOB_THRESHOLD = "mob_threshold";
  protected static final String OPT_MOB_THRESHOLD_USAGE =
      "Desired cell size to exceed in bytes that will use the MOB write path";

  protected static final long DEFAULT_START_KEY = 0;

  /** This will be removed as we factor out the dependency on command line */
  protected CommandLine cmd;

  protected MultiThreadedWriter writerThreads = null;
  protected MultiThreadedReader readerThreads = null;
  protected MultiThreadedUpdater updaterThreads = null;

  protected long startKey, endKey;

  protected boolean isVerbose, isWrite, isRead, isUpdate;
  protected boolean deferredLogFlush;

  // Column family options
  protected DataBlockEncoding dataBlockEncodingAlgo;
  protected Compression.Algorithm compressAlgo;
  protected BloomType bloomType;
  private boolean inMemoryCF;

  private User userOwner;

  // Writer options
  protected int numWriterThreads = DEFAULT_NUM_THREADS;
  protected int minColsPerKey, maxColsPerKey;
  protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE;
  protected boolean isMultiPut;

  // Updater options
  protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
  protected int updatePercent;
  protected boolean ignoreConflicts = false;
  protected boolean isBatchUpdate;

  // Reader options
  private int numReaderThreads = DEFAULT_NUM_THREADS;
  private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
  private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE;
  private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
  private int verifyPercent;

  private int numTables = 1;

  private String superUser;

  private String userNames;

  // This file is used to read authentication information in secure clusters.
  private String authnFileName;

  private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
  private int regionReplication = -1; // not set
  private int regionReplicaId = -1; // not set

  private int mobThreshold = -1; // not set

  // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
  //       console tool itself should only be used from console.
  protected boolean isSkipInit = false;
  protected boolean isInitOnly = false;

  protected Cipher cipher = null;
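
  /**
   * Splits the value of the given option on ':' and checks that the number of parts falls
   * within [minNumCols, maxNumCols].
   */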
  protected String[] splitColonSeparated(String option,
      int minNumCols, int maxNumCols) {
    String optVal = cmd.getOptionValue(option);
    String[] cols = optVal.split(COLON);
    if (cols.length < minNumCols || cols.length > maxNumCols) {
      throw new IllegalArgumentException("Expected at least "
          + minNumCols + " columns but no more than " + maxNumCols +
          " in the colon-separated value '" + optVal + "' of the " +
          "-" + option + " option");
    }
    return cols;
  }

  protected int getNumThreads(String numThreadsStr) {
    return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
  }

  public byte[][] getColumnFamilies() {
    return families;
  }

  /**
   * Apply column family options such as Bloom filters, compression, and data
   * block encoding.
   */
  protected void applyColumnFamilyOptions(TableName tableName,
      byte[][] columnFamilies) throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
      TableDescriptor tableDesc = admin.getDescriptor(tableName);
      LOG.info("Disabling table " + tableName);
      admin.disableTable(tableName);
      for (byte[] cf : columnFamilies) {
        ColumnFamilyDescriptor columnDesc = tableDesc.getColumnFamily(cf);
        boolean isNewCf = columnDesc == null;
        ColumnFamilyDescriptorBuilder columnDescBuilder = isNewCf ?
            ColumnFamilyDescriptorBuilder.newBuilder(cf) :
            ColumnFamilyDescriptorBuilder.newBuilder(columnDesc);
        if (bloomType != null) {
          columnDescBuilder.setBloomFilterType(bloomType);
        }
        if (compressAlgo != null) {
          columnDescBuilder.setCompressionType(compressAlgo);
        }
        if (dataBlockEncodingAlgo != null) {
          columnDescBuilder.setDataBlockEncoding(dataBlockEncodingAlgo);
        }
        if (inMemoryCF) {
          columnDescBuilder.setInMemory(inMemoryCF);
        }
        if (cipher != null) {
          // Generate a random data key and wrap it for the current user.
          byte[] keyBytes = new byte[cipher.getKeyLength()];
          new SecureRandom().nextBytes(keyBytes);
          columnDescBuilder.setEncryptionType(cipher.getName());
          columnDescBuilder.setEncryptionKey(
              EncryptionUtil.wrapKey(conf,
                  User.getCurrent().getShortName(),
                  new SecretKeySpec(keyBytes, cipher.getName())));
        }
        if (mobThreshold >= 0) {
          columnDescBuilder.setMobEnabled(true);
          columnDescBuilder.setMobThreshold(mobThreshold);
        }

        if (isNewCf) {
          admin.addColumnFamily(tableName, columnDescBuilder.build());
        } else {
          admin.modifyColumnFamily(tableName, columnDescBuilder.build());
        }
      }
      LOG.info("Enabling table " + tableName);
      admin.enableTable(tableName);
    }
  }

  @Override
  protected void addOptions() {
    addOptNoArg("v", OPT_VERBOSE, "Will display a full readout of logs, including ZooKeeper");
    addOptWithArg(OPT_ZK_QUORUM, "ZK quorum as comma-separated host names " +
        "without port numbers");
    addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
    addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
    addOptWithArg(OPT_COLUMN_FAMILIES, "The name of the column families to use separated by comma");
    addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
    addOptWithArg(OPT_READ, OPT_USAGE_READ);
    addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
    addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
    addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
    addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
    addOptWithArg(HFileTestUtil.OPT_DATA_BLOCK_ENCODING,
        HFileTestUtil.OPT_DATA_BLOCK_ENCODING_USAGE);
    addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " +
        "to tolerate before terminating all reader threads. The default is " +
        MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
    addOptWithArg(OPT_MULTIGET, "Whether to use multi-gets as opposed to " +
        "separate gets for every column in a row");
    addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " +
        "reads and writes for concurrent write/read workload. The default " +
        "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");

    addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " +
        "separate puts for every column in a row");
    addOptNoArg(OPT_BATCHUPDATE, "Whether to use batch as opposed to " +
        "separate updates for every column in a row");
    addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
    addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
    addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE);
    addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE);
    addOptWithArg(OPT_READER, OPT_READER_USAGE);

    addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
    addOptWithArg(OPT_START_KEY, "The first key to read/write " +
        "(a 0-based index). The default value is " +
        DEFAULT_START_KEY + ".");
    addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table "
        + "already exists");

    addOptWithArg(NUM_TABLES,
        "A positive integer number. When a number n is specified, the load test "
        + "tool will load n tables in parallel. The -tn parameter value becomes the "
        + "table name prefix. Each table name is in format <tn>_1...<tn>_n");

    addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE);
    addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE);
    addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE);
    addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE);
    addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE);
    addOptWithArg(OPT_MOB_THRESHOLD, OPT_MOB_THRESHOLD_USAGE);
  }

  @Override
  protected CommandLineParser newParser() {
    // Commons-CLI lacks the capability to handle combinations of options, so we do it ourselves
    // Validate in parse() to get helpful error messages instead of exploding in processOptions()
    return new DefaultParser() {
      @Override
      public CommandLine parse(Options opts, String[] args, Properties props, boolean stop)
          throws ParseException {
        CommandLine cl = super.parse(opts, args, props, stop);

        // Use the freshly parsed CommandLine here; the cmd field is not assigned until
        // processOptions() runs, so referencing it at this point would NPE.
        boolean isReadWriteUpdate = cl.hasOption(OPT_READ)
            || cl.hasOption(OPT_WRITE)
            || cl.hasOption(OPT_UPDATE);
        boolean isInitOnly = cl.hasOption(OPT_INIT_ONLY);

        if (!isInitOnly && !isReadWriteUpdate) {
          throw new MissingOptionException("Must specify either -" + OPT_INIT_ONLY
              + " or at least one of -" + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE);
        }

        if (isInitOnly && isReadWriteUpdate) {
          throw new AlreadySelectedException(OPT_INIT_ONLY + " cannot be specified with any of -"
              + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE);
        }

        if (isReadWriteUpdate && !cl.hasOption(OPT_NUM_KEYS)) {
          throw new MissingOptionException(OPT_NUM_KEYS
              + " must be specified in read/write mode.");
        }

        return cl;
      }
    };
  }
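
  /**
   * Wires up all test parameters (table, column families, and the per-mode write/update/read
   * sub-options) from the parsed command line.
   */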
  @Override
  protected void processOptions(CommandLine cmd) {
    this.cmd = cmd;

    tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME,
        DEFAULT_TABLE_NAME));

    if (cmd.hasOption(OPT_COLUMN_FAMILIES)) {
      String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(",");
      families = new byte[list.length][];
      for (int i = 0; i < list.length; i++) {
        families[i] = Bytes.toBytes(list[i]);
      }
    } else {
      families = HFileTestUtil.DEFAULT_COLUMN_FAMILIES;
    }

    isVerbose = cmd.hasOption(OPT_VERBOSE);
    isWrite = cmd.hasOption(OPT_WRITE);
    isRead = cmd.hasOption(OPT_READ);
    isUpdate = cmd.hasOption(OPT_UPDATE);
    isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
    deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH);

    if (!isInitOnly) {
      startKey = parseLong(cmd.getOptionValue(OPT_START_KEY,
          String.valueOf(DEFAULT_START_KEY)), 0, Long.MAX_VALUE);
      long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1,
          Long.MAX_VALUE - startKey);
      endKey = startKey + numKeys;
      isSkipInit = cmd.hasOption(OPT_SKIP_INIT);
      System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
    }

    parseColumnFamilyOptions(cmd);

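    // Illustrative: "-write 2:1024:10" yields 1..4 columns per key (the maximum is twice the
    // requested average), 512..1536 bytes per column (the average +/- 50%), and 10 writer
    // threads.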
    if (isWrite) {
      String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3);

      int colIndex = 0;
      minColsPerKey = 1;
      maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
      int avgColDataSize =
          parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
      minColDataSize = avgColDataSize / 2;
      maxColDataSize = avgColDataSize * 3 / 2;

      if (colIndex < writeOpts.length) {
        numWriterThreads = getNumThreads(writeOpts[colIndex++]);
      }

      isMultiPut = cmd.hasOption(OPT_MULTIPUT);

      mobThreshold = -1;
      if (cmd.hasOption(OPT_MOB_THRESHOLD)) {
        mobThreshold = Integer.parseInt(cmd.getOptionValue(OPT_MOB_THRESHOLD));
      }

      System.out.println("Multi-puts: " + isMultiPut);
      System.out.println("Columns per key: " + minColsPerKey + ".."
          + maxColsPerKey);
      System.out.println("Data size per column: " + minColDataSize + ".."
          + maxColDataSize);
    }

    if (isUpdate) {
      String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3);
      int colIndex = 0;
      updatePercent = parseInt(mutateOpts[colIndex++], 0, 100);
      if (colIndex < mutateOpts.length) {
        numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]);
      }
      if (colIndex < mutateOpts.length) {
        ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1;
      }

      isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE);

      System.out.println("Batch updates: " + isBatchUpdate);
      System.out.println("Percent of keys to update: " + updatePercent);
      System.out.println("Updater threads: " + numUpdaterThreads);
      System.out.println("Ignore nonce conflicts: " + ignoreConflicts);
    }

    if (isRead) {
      String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
      int colIndex = 0;
      verifyPercent = parseInt(readOpts[colIndex++], 0, 100);
      if (colIndex < readOpts.length) {
        numReaderThreads = getNumThreads(readOpts[colIndex++]);
      }

      if (cmd.hasOption(OPT_MAX_READ_ERRORS)) {
        maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS),
            0, Integer.MAX_VALUE);
      }

      if (cmd.hasOption(OPT_KEY_WINDOW)) {
        keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW),
            0, Integer.MAX_VALUE);
      }

      if (cmd.hasOption(OPT_MULTIGET)) {
        multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET),
            0, Integer.MAX_VALUE);
      }

      System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize);
      System.out.println("Percent of keys to verify: " + verifyPercent);
      System.out.println("Reader threads: " + numReaderThreads);
    }

    numTables = 1;
    if (cmd.hasOption(NUM_TABLES)) {
      numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
    }

    numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
    if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) {
      numRegionsPerServer = Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER));
    }

    regionReplication = 1;
    if (cmd.hasOption(OPT_REGION_REPLICATION)) {
      regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION));
    }

    regionReplicaId = -1;
    if (cmd.hasOption(OPT_REGION_REPLICA_ID)) {
      regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID));
    }
  }
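
  /**
   * Parses the column family options (data block encoding, compression, Bloom filter type,
   * in-memory placement, and encryption) shared by all modes.
   */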
  private void parseColumnFamilyOptions(CommandLine cmd) {
    String dataBlockEncodingStr = cmd.getOptionValue(HFileTestUtil.OPT_DATA_BLOCK_ENCODING);
    dataBlockEncodingAlgo = dataBlockEncodingStr == null ? null :
        DataBlockEncoding.valueOf(dataBlockEncodingStr);

    String compressStr = cmd.getOptionValue(OPT_COMPRESSION);
    compressAlgo = compressStr == null ? Compression.Algorithm.NONE :
        Compression.Algorithm.valueOf(compressStr);

    String bloomStr = cmd.getOptionValue(OPT_BLOOM);
    bloomType = bloomStr == null ? BloomType.ROW :
        BloomType.valueOf(bloomStr);

    inMemoryCF = cmd.hasOption(OPT_INMEMORY);
    if (cmd.hasOption(OPT_ENCRYPTION)) {
      cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
    }
  }

  public void initTestTable() throws IOException {
    Durability durability = Durability.USE_DEFAULT;
    if (deferredLogFlush) {
      durability = Durability.ASYNC_WAL;
    }

    HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName,
        getColumnFamilies(), compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer,
        regionReplication, durability);
    applyColumnFamilyOptions(tableName, getColumnFamilies());
  }

  @Override
  protected int doWork() throws IOException {
    if (!isVerbose) {
      LogManager.getLogger(ZooKeeper.class.getName()).setLevel(Level.WARN);
    }
    if (numTables > 1) {
      return parallelLoadTables();
    } else {
      return loadTable();
    }
  }
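
  /**
   * Runs the load against a single table: optionally initializes it, then starts the
   * configured writer/updater/reader threads and waits for them to finish.
   * @return {@link #EXIT_SUCCESS} if no failures were observed, {@link #EXIT_FAILURE} otherwise
   */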
  protected int loadTable() throws IOException {
    if (cmd.hasOption(OPT_ZK_QUORUM)) {
      conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
    }
    if (cmd.hasOption(OPT_ZK_PARENT_NODE)) {
      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE));
    }

    if (isInitOnly) {
      LOG.info("Initializing only; no reads or writes");
      initTestTable();
      return 0;
    }

    if (!isSkipInit) {
      initTestTable();
    }

    LoadTestDataGenerator dataGen = null;
    if (cmd.hasOption(OPT_GENERATOR)) {
      String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
      dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
      String[] args;
      if (dataGen instanceof LoadTestDataGeneratorWithACL) {
        LOG.info("Using LoadTestDataGeneratorWithACL");
        if (User.isHBaseSecurityEnabled(conf)) {
          LOG.info("Security is enabled");
          authnFileName = clazzAndArgs[1];
          superUser = clazzAndArgs[2];
          userNames = clazzAndArgs[3];
          args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length);
          Properties authConfig = new Properties();
          authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName));
          try {
            addAuthInfoToConf(authConfig, conf, superUser, userNames);
          } catch (IOException exp) {
            LOG.error(exp.toString(), exp);
            return EXIT_FAILURE;
          }
          userOwner = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, superUser));
        } else {
          superUser = clazzAndArgs[1];
          userNames = clazzAndArgs[2];
          args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
          userOwner = User.createUserForTesting(conf, superUser, new String[0]);
        }
      } else {
        args = clazzAndArgs.length == 1 ? new String[0]
            : Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
      }
      dataGen.initialize(args);
    } else {
      // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator
      dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
          minColsPerKey, maxColsPerKey, families);
    }

    if (userOwner != null) {
      LOG.info("Granting permissions for user " + userOwner.getShortName());
      Permission.Action[] actions = {
          Permission.Action.ADMIN, Permission.Action.CREATE,
          Permission.Action.READ, Permission.Action.WRITE };
      try (Connection connection = ConnectionFactory.createConnection(conf)) {
        AccessControlClient.grant(connection, tableName, userOwner.getShortName(), null, null,
            actions);
      } catch (Throwable e) {
        LOG.error(HBaseMarkers.FATAL, "Error in granting permission for the user " +
            userOwner.getShortName(), e);
        return EXIT_FAILURE;
      }
    }

    if (userNames != null) {
      // This will be a comma-separated list of user names. Log each user in; the User
      // instances themselves are not retained here.
      String[] users = userNames.split(",");
      for (String userStr : users) {
        if (User.isHBaseSecurityEnabled(conf)) {
          User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, userStr));
        } else {
          User.createUserForTesting(conf, userStr, new String[0]);
        }
      }
    }

    if (isWrite) {
      if (userOwner != null) {
        writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
      } else {
        String writerClass = null;
        if (cmd.hasOption(OPT_WRITER)) {
          writerClass = cmd.getOptionValue(OPT_WRITER);
        } else {
          writerClass = MultiThreadedWriter.class.getCanonicalName();
        }
        writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen);
      }
      writerThreads.setMultiPut(isMultiPut);
    }

    if (isUpdate) {
      if (userOwner != null) {
        updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
            userOwner, userNames);
      } else {
        String updaterClass = null;
        if (cmd.hasOption(OPT_UPDATER)) {
          updaterClass = cmd.getOptionValue(OPT_UPDATER);
        } else {
          updaterClass = MultiThreadedUpdater.class.getCanonicalName();
        }
        updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen);
      }
      updaterThreads.setBatchUpdate(isBatchUpdate);
      updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
    }

    if (isRead) {
      if (userOwner != null) {
        readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent,
            userNames);
      } else {
        String readerClass = null;
        if (cmd.hasOption(OPT_READER)) {
          readerClass = cmd.getOptionValue(OPT_READER);
        } else {
          readerClass = MultiThreadedReader.class.getCanonicalName();
        }
        readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen);
      }
      readerThreads.setMaxErrors(maxReadErrors);
      readerThreads.setKeyWindow(keyWindow);
      readerThreads.setMultiGetBatchSize(multiGetBatchSize);
      readerThreads.setRegionReplicaId(regionReplicaId);
    }

    if (isUpdate && isWrite) {
      LOG.info("Concurrent write/update workload: making updaters aware of the " +
          "write point");
      updaterThreads.linkToWriter(writerThreads);
    }

    if (isRead && (isUpdate || isWrite)) {
      LOG.info("Concurrent write/read workload: making readers aware of the " +
          "write point");
      readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
    }
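
    // Kick off the configured workloads; each runs in its own thread pool over the key
    // range [startKey, endKey).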
    if (isWrite) {
      System.out.println("Starting to write data...");
      writerThreads.start(startKey, endKey, numWriterThreads);
    }

    if (isUpdate) {
      LOG.info("Starting to mutate data...");
      System.out.println("Starting to mutate data...");
      // TODO: currently append and increment operations are not tested with tags.
      // Will update this after it is done.
      updaterThreads.start(startKey, endKey, numUpdaterThreads);
    }

    if (isRead) {
      System.out.println("Starting to read data...");
      readerThreads.start(startKey, endKey, numReaderThreads);
    }

    if (isWrite) {
      writerThreads.waitForFinish();
    }

    if (isUpdate) {
      updaterThreads.waitForFinish();
    }

    if (isRead) {
      readerThreads.waitForFinish();
    }

    boolean success = true;
    if (isWrite) {
      success = success && writerThreads.getNumWriteFailures() == 0;
    }
    if (isUpdate) {
      success = success && updaterThreads.getNumWriteFailures() == 0;
    }
    if (isRead) {
      success = success && readerThreads.getNumReadErrors() == 0
          && readerThreads.getNumReadFailures() == 0;
    }
    return success ? EXIT_SUCCESS : EXIT_FAILURE;
  }

  private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException {
    try {
      // The generator class is expected to expose a
      // (int minColDataSize, int maxColDataSize, int minColsPerKey, int maxColsPerKey, byte[][])
      // constructor.
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor = clazz.getConstructor(int.class, int.class, int.class, int.class,
          byte[][].class);
      return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
          minColsPerKey, maxColsPerKey, families);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName,
      LoadTestDataGenerator dataGen) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor = clazz.getConstructor(
          LoadTestDataGenerator.class, Configuration.class, TableName.class);
      return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName,
      LoadTestDataGenerator dataGen) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor = clazz.getConstructor(
          LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
      return (MultiThreadedUpdater) constructor.newInstance(
          dataGen, conf, tableName, updatePercent);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName,
      LoadTestDataGenerator dataGen) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor = clazz.getConstructor(
          LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
      return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  public static void main(String[] args) {
    new LoadTestTool().doStaticMain(args);
  }

  /**
   * When NUM_TABLES is specified, this method starts multiple worker threads, each of which
   * runs its own LoadTestTool instance to load one table. Each table name is in the format
   * <tn>_<index>. For example, with "-tn test -num_tables 2", the table names will be
   * "test_1" and "test_2".
   * @throws IOException if a worker thread fails with an I/O error or is interrupted
   */
  private int parallelLoadTables() throws IOException {
    // create new command args
    String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
    String[] newArgs = null;
    if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
      newArgs = new String[cmdLineArgs.length + 2];
      newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
      newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME;
      System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length);
    } else {
      newArgs = cmdLineArgs;
    }

    int tableNameValueIndex = -1;
    for (int j = 0; j < newArgs.length; j++) {
      if (newArgs[j].endsWith(OPT_TABLE_NAME)) {
        tableNameValueIndex = j + 1;
      } else if (newArgs[j].endsWith(NUM_TABLES)) {
        // change NUM_TABLES to 1 so that each worker loads one table
        newArgs[j + 1] = "1";
      }
    }

    // start loading the tables in parallel
    List<WorkerThread> workers = new ArrayList<>();
    for (int i = 0; i < numTables; i++) {
      String[] workerArgs = newArgs.clone();
      workerArgs[tableNameValueIndex] = tableName + "_" + (i + 1);
      WorkerThread worker = new WorkerThread(i, workerArgs);
      workers.add(worker);
      LOG.info(worker + " starting");
      worker.start();
    }

    // wait for all workers to finish
    LOG.info("Waiting for worker threads to finish");
    for (WorkerThread t : workers) {
      try {
        t.join();
      } catch (InterruptedException ie) {
        IOException iie = new InterruptedIOException();
        iie.initCause(ie);
        throw iie;
      }
      checkForErrors();
    }

    return EXIT_SUCCESS;
  }

  // If an exception is thrown by one of the worker threads, it will be
  // stored here.
  protected AtomicReference<Throwable> thrown = new AtomicReference<>();

  private void workerThreadError(Throwable t) {
    thrown.compareAndSet(null, t);
  }

  /**
   * Check for errors in the worker threads. If any is found, rethrow it.
   */
  private void checkForErrors() throws IOException {
    Throwable thrown = this.thrown.get();
    if (thrown == null) {
      return;
    }
    if (thrown instanceof IOException) {
      throw (IOException) thrown;
    } else {
      throw new RuntimeException(thrown);
    }
  }

  class WorkerThread extends Thread {
    private final String[] workerArgs;

    WorkerThread(int i, String[] args) {
      super("WorkerThread-" + i);
      workerArgs = args;
    }

    @Override
    public void run() {
      try {
        int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs);
        if (ret != 0) {
          throw new RuntimeException("LoadTestTool exited with a non-zero return code.");
        }
      } catch (Exception ex) {
        LOG.error("Error in worker thread", ex);
        workerThreadError(ex);
      }
    }
  }
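
  /**
   * Copies the given authentication properties into the configuration, first verifying that
   * "hbase.<user>.keytab.file" and "hbase.<user>.kerberos.principal" are present for the
   * owner and every user in the comma-separated user list.
   */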
  private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner,
      String userList) throws IOException {
    List<String> users = new ArrayList<>(Arrays.asList(userList.split(",")));
    users.add(owner);
    for (String user : users) {
      String keyTabFileConfKey = "hbase." + user + ".keytab.file";
      String principalConfKey = "hbase." + user + ".kerberos.principal";
      if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) {
        throw new IOException("Authentication configs missing for user: " + user);
      }
    }
    for (String key : authConfig.stringPropertyNames()) {
      conf.set(key, authConfig.getProperty(key));
    }
    LOG.debug("Added authentication properties to config successfully.");
  }
}