001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.lang.reflect.Constructor; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.List; 026import java.util.Properties; 027import java.util.concurrent.atomic.AtomicReference; 028import javax.crypto.spec.SecretKeySpec; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HBaseConfiguration; 031import org.apache.hadoop.hbase.HBaseInterfaceAudience; 032import org.apache.hadoop.hbase.HBaseTestingUtility; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.Admin; 036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 038import org.apache.hadoop.hbase.client.Connection; 039import org.apache.hadoop.hbase.client.ConnectionFactory; 040import org.apache.hadoop.hbase.client.Durability; 041import org.apache.hadoop.hbase.client.TableDescriptor; 042import org.apache.hadoop.hbase.io.compress.Compression; 
043import org.apache.hadoop.hbase.io.crypto.Cipher; 044import org.apache.hadoop.hbase.io.crypto.Encryption; 045import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 046import org.apache.hadoop.hbase.log.HBaseMarkers; 047import org.apache.hadoop.hbase.logging.Log4jUtils; 048import org.apache.hadoop.hbase.regionserver.BloomType; 049import org.apache.hadoop.hbase.security.EncryptionUtil; 050import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 051import org.apache.hadoop.hbase.security.User; 052import org.apache.hadoop.hbase.security.access.AccessControlClient; 053import org.apache.hadoop.hbase.security.access.Permission; 054import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 055import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL; 056import org.apache.hadoop.util.ToolRunner; 057import org.apache.yetus.audience.InterfaceAudience; 058import org.apache.zookeeper.ZooKeeper; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062import org.apache.hbase.thirdparty.org.apache.commons.cli.AlreadySelectedException; 063import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 064import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser; 065import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser; 066import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException; 067import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; 068import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException; 069 070/** 071 * A command-line utility that reads, writes, and verifies data. Unlike 072 * {@link org.apache.hadoop.hbase.PerformanceEvaluation}, this tool validates the data written, and 073 * supports simultaneously writing and reading the same set of keys. 
 * <p>
 * Options are validated as a group when they are parsed: either {@code -init_only} or at least
 * one of {@code -read}, {@code -write} or {@code -update} must be given (but not both kinds), and
 * the read/write/update modes additionally require {@code -num_keys}.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class LoadTestTool extends AbstractHBaseTool {

  private static final Logger LOG = LoggerFactory.getLogger(LoadTestTool.class);
  /** Separator used inside multi-part option values such as -write, -read and -generator. */
  private static final String COLON = ":";

  /** Table name for the test */
  private TableName tableName;

  /** Column families for the test */
  private byte[][] families;

  /** Table name to use if not overridden on the command line */
  protected static final String DEFAULT_TABLE_NAME = "cluster_test";

  /** The default data size if not specified */
  protected static final int DEFAULT_DATA_SIZE = 64;

  /** The number of reader/writer threads if not specified */
  protected static final int DEFAULT_NUM_THREADS = 20;

  /** Usage string for the load option */
  protected static final String OPT_USAGE_LOAD =
    "<avg_cols_per_key>:<avg_data_size>" + "[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

  /** Usage string for the read option */
  protected static final String OPT_USAGE_READ =
    "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

  /** Usage string for the update option */
  protected static final String OPT_USAGE_UPDATE = "<update_percent>[:<#threads="
    + DEFAULT_NUM_THREADS + ">][:<#whether to ignore nonce collisions=0>]";

  /** Usage string for the bloom option; lists every {@link BloomType} value. */
  protected static final String OPT_USAGE_BLOOM =
    "Bloom filter type, one of " + Arrays.toString(BloomType.values());

  /** Usage string for the compression option; lists every compression algorithm. */
  protected static final String OPT_USAGE_COMPRESSION =
    "Compression type, " + "one of " + Arrays.toString(Compression.Algorithm.values());

  protected static final String OPT_VERBOSE = "verbose";

  public static final String OPT_BLOOM = "bloom";
  public static final String OPT_BLOOM_PARAM = "bloom_param";
  public static final String OPT_COMPRESSION = "compression";
  public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
  public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";

  public static final String OPT_INMEMORY = "in_memory";
  public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF "
    + "inmemory as far as possible. Not guaranteed that reads are always served from inmemory";

  public static final String OPT_GENERATOR = "generator";
  public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
    + " Any args for this class can be passed as colon separated after class name";

  public static final String OPT_WRITER = "writer";
  public static final String OPT_WRITER_USAGE = "The class for executing the write requests";

  public static final String OPT_UPDATER = "updater";
  public static final String OPT_UPDATER_USAGE = "The class for executing the update requests";

  public static final String OPT_READER = "reader";
  public static final String OPT_READER_USAGE = "The class for executing the read requests";

  // Command-line option names (passed as "-<name>").
  protected static final String OPT_KEY_WINDOW = "key_window";
  protected static final String OPT_WRITE = "write";
  protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
  public static final String OPT_MULTIPUT = "multiput";
  public static final String OPT_MULTIGET = "multiget_batchsize";
  protected static final String OPT_NUM_KEYS = "num_keys";
  protected static final String OPT_READ = "read";
  protected static final String OPT_START_KEY = "start_key";
  public static final String OPT_TABLE_NAME = "tn";
  public static final String OPT_COLUMN_FAMILIES = "families";
  protected static final String OPT_ZK_QUORUM = "zk";
  protected static final String OPT_ZK_PARENT_NODE = "zk_root";
  protected static final String OPT_SKIP_INIT = "skip_init";
  protected static final String OPT_INIT_ONLY = "init_only";
  // When greater than 1, several tables named <tn>_1 ... <tn>_n are loaded in parallel.
  protected static final String NUM_TABLES = "num_tables";
  protected static final String OPT_BATCHUPDATE = "batchupdate";
  protected static final String OPT_UPDATE = "update";

  public static final String OPT_ENCRYPTION = "encryption";
  protected static final String OPT_ENCRYPTION_USAGE =
    "Enables transparent encryption on the test table, one of "
      + Arrays.toString(Encryption.getSupportedCiphers());

  public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server";
  protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE =
    "Desired number of regions per region server. Defaults to 5.";
  // NOTE(review): this is a mutable (non-final) public static; the usage text above hard-codes
  // the same value 5, so the two can drift if this field is ever reassigned.
  public static int DEFAULT_NUM_REGIONS_PER_SERVER = 5;

  public static final String OPT_REGION_REPLICATION = "region_replication";
  protected static final String OPT_REGION_REPLICATION_USAGE =
    "Desired number of replicas per region";

  public static final String OPT_REGION_REPLICA_ID = "region_replica_id";
  protected static final String OPT_REGION_REPLICA_ID_USAGE =
    "Region replica id to do the reads from";

  public static final String OPT_MOB_THRESHOLD = "mob_threshold";
  protected static final String OPT_MOB_THRESHOLD_USAGE =
    "Desired cell size to exceed in bytes that will use the MOB write path";

  protected static final long DEFAULT_START_KEY = 0;

  /**
   * This will be removed as we factor out the dependency on command line. Assigned at the start
   * of processOptions().
   */
  protected CommandLine cmd;

  protected MultiThreadedWriter writerThreads = null;
  protected MultiThreadedReader readerThreads = null;
  protected MultiThreadedUpdater updaterThreads = null;

  // Key range [startKey, endKey) computed from -start_key and -num_keys.
  protected long startKey, endKey;

  protected boolean isVerbose, isWrite, isRead, isUpdate;
  protected boolean deferredLogFlush;

  // Column family options
  protected DataBlockEncoding dataBlockEncodingAlgo;
  protected Compression.Algorithm compressAlgo;
  protected BloomType bloomType;
  private boolean inMemoryCF;

  // Table owner used for ACL-based loading; only set when an ACL-aware generator is used.
  private User userOwner;
  // Writer options
  protected int numWriterThreads = DEFAULT_NUM_THREADS;
  protected int minColsPerKey, maxColsPerKey;
  protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE;
  protected boolean isMultiPut;

  // Updater options
  protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
  protected int updatePercent;
  protected boolean ignoreConflicts = false;
  protected boolean isBatchUpdate;

  // Reader options
  private int numReaderThreads = DEFAULT_NUM_THREADS;
  private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
  private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE;
  private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
  private int verifyPercent;

  private int numTables = 1;

  private String superUser;

  // Comma-separated user names taken from the -generator arguments.
  private String userNames;
  // This file is used to read authentication information in secure clusters.
  private String authnFileName;

  private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
  // NOTE(review): processOptions() resets regionReplication to 1 and regionReplicaId to -1
  // unless the corresponding options are given.
  private int regionReplication = -1; // not set
  private int regionReplicaId = -1; // not set

  // NOTE(review): processOptions() only reads -mob_threshold when -write is given.
  private int mobThreshold = -1; // not set

  // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
  // console tool itself should only be used from console.
235 protected boolean isSkipInit = false; 236 protected boolean isInitOnly = false; 237 238 protected Cipher cipher = null; 239 240 protected String[] splitColonSeparated(String option, int minNumCols, int maxNumCols) { 241 String optVal = cmd.getOptionValue(option); 242 String[] cols = optVal.split(COLON); 243 if (cols.length < minNumCols || cols.length > maxNumCols) { 244 throw new IllegalArgumentException( 245 "Expected at least " + minNumCols + " columns but no more than " + maxNumCols 246 + " in the colon-separated value '" + optVal + "' of the " + "-" + option + " option"); 247 } 248 return cols; 249 } 250 251 protected int getNumThreads(String numThreadsStr) { 252 return parseInt(numThreadsStr, 1, Short.MAX_VALUE); 253 } 254 255 public byte[][] getColumnFamilies() { 256 return families; 257 } 258 259 /** 260 * Apply column family options such as Bloom filters, compression, and data block encoding. 261 */ 262 protected void applyColumnFamilyOptions(TableName tableName, byte[][] columnFamilies) 263 throws IOException { 264 try (Connection conn = ConnectionFactory.createConnection(conf); 265 Admin admin = conn.getAdmin()) { 266 TableDescriptor tableDesc = admin.getDescriptor(tableName); 267 LOG.info("Disabling table " + tableName); 268 admin.disableTable(tableName); 269 for (byte[] cf : columnFamilies) { 270 ColumnFamilyDescriptor columnDesc = tableDesc.getColumnFamily(cf); 271 boolean isNewCf = columnDesc == null; 272 ColumnFamilyDescriptorBuilder columnDescBuilder = isNewCf 273 ? 
ColumnFamilyDescriptorBuilder.newBuilder(cf) 274 : ColumnFamilyDescriptorBuilder.newBuilder(columnDesc); 275 if (bloomType != null) { 276 columnDescBuilder.setBloomFilterType(bloomType); 277 } 278 if (compressAlgo != null) { 279 columnDescBuilder.setCompressionType(compressAlgo); 280 } 281 if (dataBlockEncodingAlgo != null) { 282 columnDescBuilder.setDataBlockEncoding(dataBlockEncodingAlgo); 283 } 284 if (inMemoryCF) { 285 columnDescBuilder.setInMemory(inMemoryCF); 286 } 287 if (cipher != null) { 288 byte[] keyBytes = new byte[cipher.getKeyLength()]; 289 Bytes.secureRandom(keyBytes); 290 columnDescBuilder.setEncryptionType(cipher.getName()); 291 columnDescBuilder.setEncryptionKey(EncryptionUtil.wrapKey(conf, 292 User.getCurrent().getShortName(), new SecretKeySpec(keyBytes, cipher.getName()))); 293 } 294 if (mobThreshold >= 0) { 295 columnDescBuilder.setMobEnabled(true); 296 columnDescBuilder.setMobThreshold(mobThreshold); 297 } 298 299 if (isNewCf) { 300 admin.addColumnFamily(tableName, columnDescBuilder.build()); 301 } else { 302 admin.modifyColumnFamily(tableName, columnDescBuilder.build()); 303 } 304 } 305 LOG.info("Enabling table " + tableName); 306 admin.enableTable(tableName); 307 } 308 } 309 310 @Override 311 protected void addOptions() { 312 addOptNoArg("v", OPT_VERBOSE, "Will display a full readout of logs, including ZooKeeper"); 313 addOptWithArg(OPT_ZK_QUORUM, 314 "ZK quorum as comma-separated host names " + "without port numbers"); 315 addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper"); 316 addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write"); 317 addOptWithArg(OPT_COLUMN_FAMILIES, "The name of the column families to use separated by comma"); 318 addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD); 319 addOptWithArg(OPT_READ, OPT_USAGE_READ); 320 addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE); 321 addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading"); 322 addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM); 
323 addOptWithArg(OPT_BLOOM_PARAM, "the parameter of bloom filter type"); 324 addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION); 325 addOptWithArg(HFileTestUtil.OPT_DATA_BLOCK_ENCODING, 326 HFileTestUtil.OPT_DATA_BLOCK_ENCODING_USAGE); 327 addOptWithArg(OPT_MAX_READ_ERRORS, 328 "The maximum number of read errors " 329 + "to tolerate before terminating all reader threads. The default is " 330 + MultiThreadedReader.DEFAULT_MAX_ERRORS + "."); 331 addOptWithArg(OPT_MULTIGET, 332 "Whether to use multi-gets as opposed to " + "separate gets for every column in a row"); 333 addOptWithArg(OPT_KEY_WINDOW, 334 "The 'key window' to maintain between " 335 + "reads and writes for concurrent write/read workload. The default " + "is " 336 + MultiThreadedReader.DEFAULT_KEY_WINDOW + "."); 337 338 addOptNoArg(OPT_MULTIPUT, 339 "Whether to use multi-puts as opposed to " + "separate puts for every column in a row"); 340 addOptNoArg(OPT_BATCHUPDATE, 341 "Whether to use batch as opposed to " + "separate updates for every column in a row"); 342 addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY); 343 addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE); 344 addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE); 345 addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE); 346 addOptWithArg(OPT_READER, OPT_READER_USAGE); 347 348 addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write"); 349 addOptWithArg(OPT_START_KEY, "The first key to read/write " 350 + "(a 0-based index). The default value is " + DEFAULT_START_KEY + "."); 351 addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table " + "already exists"); 352 353 addOptWithArg(NUM_TABLES, 354 "A positive integer number. When a number n is specified, load test " 355 + "tool will load n table parallely. -tn parameter value becomes " 356 + "table name prefix. 
Each table name is in format <tn>_1...<tn>_n"); 357 358 addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE); 359 addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE); 360 addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE); 361 addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE); 362 addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE); 363 addOptWithArg(OPT_MOB_THRESHOLD, OPT_MOB_THRESHOLD_USAGE); 364 } 365 366 @Override 367 protected CommandLineParser newParser() { 368 // Commons-CLI lacks the capability to handle combinations of options, so we do it ourselves 369 // Validate in parse() to get helpful error messages instead of exploding in processOptions() 370 return new DefaultParser() { 371 @Override 372 public CommandLine parse(Options opts, String[] args, Properties props, boolean stop) 373 throws ParseException { 374 CommandLine cl = super.parse(opts, args, props, stop); 375 376 boolean isReadWriteUpdate = 377 cmd.hasOption(OPT_READ) || cmd.hasOption(OPT_WRITE) || cmd.hasOption(OPT_UPDATE); 378 boolean isInitOnly = cmd.hasOption(OPT_INIT_ONLY); 379 380 if (!isInitOnly && !isReadWriteUpdate) { 381 throw new MissingOptionException("Must specify either -" + OPT_INIT_ONLY 382 + " or at least one of -" + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE); 383 } 384 385 if (isInitOnly && isReadWriteUpdate) { 386 throw new AlreadySelectedException(OPT_INIT_ONLY + " cannot be specified with any of -" 387 + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE); 388 } 389 390 if (isReadWriteUpdate && !cmd.hasOption(OPT_NUM_KEYS)) { 391 throw new MissingOptionException(OPT_NUM_KEYS + " must be specified in read/write mode."); 392 } 393 394 return cl; 395 } 396 }; 397 } 398 399 @Override 400 protected void processOptions(CommandLine cmd) { 401 this.cmd = cmd; 402 403 tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME)); 404 405 if (cmd.hasOption(OPT_COLUMN_FAMILIES)) { 406 
String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(","); 407 families = new byte[list.length][]; 408 for (int i = 0; i < list.length; i++) { 409 families[i] = Bytes.toBytes(list[i]); 410 } 411 } else { 412 families = HFileTestUtil.DEFAULT_COLUMN_FAMILIES; 413 } 414 415 isVerbose = cmd.hasOption(OPT_VERBOSE); 416 isWrite = cmd.hasOption(OPT_WRITE); 417 isRead = cmd.hasOption(OPT_READ); 418 isUpdate = cmd.hasOption(OPT_UPDATE); 419 isInitOnly = cmd.hasOption(OPT_INIT_ONLY); 420 deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH); 421 422 if (!isInitOnly) { 423 startKey = parseLong(cmd.getOptionValue(OPT_START_KEY, String.valueOf(DEFAULT_START_KEY)), 0, 424 Long.MAX_VALUE); 425 long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1, Long.MAX_VALUE - startKey); 426 endKey = startKey + numKeys; 427 isSkipInit = cmd.hasOption(OPT_SKIP_INIT); 428 System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]"); 429 } 430 431 parseColumnFamilyOptions(cmd); 432 433 if (isWrite) { 434 String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3); 435 436 int colIndex = 0; 437 minColsPerKey = 1; 438 maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]); 439 int avgColDataSize = parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE); 440 minColDataSize = avgColDataSize / 2; 441 maxColDataSize = avgColDataSize * 3 / 2; 442 443 if (colIndex < writeOpts.length) { 444 numWriterThreads = getNumThreads(writeOpts[colIndex++]); 445 } 446 447 isMultiPut = cmd.hasOption(OPT_MULTIPUT); 448 449 mobThreshold = -1; 450 if (cmd.hasOption(OPT_MOB_THRESHOLD)) { 451 mobThreshold = Integer.parseInt(cmd.getOptionValue(OPT_MOB_THRESHOLD)); 452 } 453 454 System.out.println("Multi-puts: " + isMultiPut); 455 System.out.println("Columns per key: " + minColsPerKey + ".." + maxColsPerKey); 456 System.out.println("Data size per column: " + minColDataSize + ".." 
+ maxColDataSize); 457 } 458 459 if (isUpdate) { 460 String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3); 461 int colIndex = 0; 462 updatePercent = parseInt(mutateOpts[colIndex++], 0, 100); 463 if (colIndex < mutateOpts.length) { 464 numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]); 465 } 466 if (colIndex < mutateOpts.length) { 467 ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1; 468 } 469 470 isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE); 471 472 System.out.println("Batch updates: " + isBatchUpdate); 473 System.out.println("Percent of keys to update: " + updatePercent); 474 System.out.println("Updater threads: " + numUpdaterThreads); 475 System.out.println("Ignore nonce conflicts: " + ignoreConflicts); 476 } 477 478 if (isRead) { 479 String[] readOpts = splitColonSeparated(OPT_READ, 1, 2); 480 int colIndex = 0; 481 verifyPercent = parseInt(readOpts[colIndex++], 0, 100); 482 if (colIndex < readOpts.length) { 483 numReaderThreads = getNumThreads(readOpts[colIndex++]); 484 } 485 486 if (cmd.hasOption(OPT_MAX_READ_ERRORS)) { 487 maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS), 0, Integer.MAX_VALUE); 488 } 489 490 if (cmd.hasOption(OPT_KEY_WINDOW)) { 491 keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW), 0, Integer.MAX_VALUE); 492 } 493 494 if (cmd.hasOption(OPT_MULTIGET)) { 495 multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET), 0, Integer.MAX_VALUE); 496 } 497 498 System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize); 499 System.out.println("Percent of keys to verify: " + verifyPercent); 500 System.out.println("Reader threads: " + numReaderThreads); 501 } 502 503 numTables = 1; 504 if (cmd.hasOption(NUM_TABLES)) { 505 numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE); 506 } 507 508 numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER; 509 if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) { 510 numRegionsPerServer = 
Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER)); 511 } 512 513 regionReplication = 1; 514 if (cmd.hasOption(OPT_REGION_REPLICATION)) { 515 regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION)); 516 } 517 518 regionReplicaId = -1; 519 if (cmd.hasOption(OPT_REGION_REPLICA_ID)) { 520 regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID)); 521 } 522 } 523 524 private void parseColumnFamilyOptions(CommandLine cmd) { 525 String dataBlockEncodingStr = cmd.getOptionValue(HFileTestUtil.OPT_DATA_BLOCK_ENCODING); 526 dataBlockEncodingAlgo = 527 dataBlockEncodingStr == null ? null : DataBlockEncoding.valueOf(dataBlockEncodingStr); 528 529 String compressStr = cmd.getOptionValue(OPT_COMPRESSION); 530 compressAlgo = 531 compressStr == null ? Compression.Algorithm.NONE : Compression.Algorithm.valueOf(compressStr); 532 533 String bloomStr = cmd.getOptionValue(OPT_BLOOM); 534 bloomType = bloomStr == null ? BloomType.ROW : BloomType.valueOf(bloomStr); 535 536 if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) { 537 if (!cmd.hasOption(OPT_BLOOM_PARAM)) { 538 LOG.error("the parameter of bloom filter {} is not specified", bloomType.name()); 539 } else { 540 conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM)); 541 } 542 } 543 544 inMemoryCF = cmd.hasOption(OPT_INMEMORY); 545 if (cmd.hasOption(OPT_ENCRYPTION)) { 546 cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION)); 547 } 548 549 } 550 551 public void initTestTable() throws IOException { 552 Durability durability = Durability.USE_DEFAULT; 553 if (deferredLogFlush) { 554 durability = Durability.ASYNC_WAL; 555 } 556 557 HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName, getColumnFamilies(), 558 compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer, regionReplication, durability); 559 applyColumnFamilyOptions(tableName, getColumnFamilies()); 560 } 561 562 @Override 563 protected int doWork() throws 
IOException { 564 if (!isVerbose) { 565 Log4jUtils.setLogLevel(ZooKeeper.class.getName(), "WARN"); 566 } 567 if (numTables > 1) { 568 return parallelLoadTables(); 569 } else { 570 return loadTable(); 571 } 572 } 573 574 protected int loadTable() throws IOException { 575 if (cmd.hasOption(OPT_ZK_QUORUM)) { 576 conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM)); 577 } 578 if (cmd.hasOption(OPT_ZK_PARENT_NODE)) { 579 conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE)); 580 } 581 582 if (isInitOnly) { 583 LOG.info("Initializing only; no reads or writes"); 584 initTestTable(); 585 return 0; 586 } 587 588 if (!isSkipInit) { 589 initTestTable(); 590 } 591 LoadTestDataGenerator dataGen = null; 592 if (cmd.hasOption(OPT_GENERATOR)) { 593 String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON); 594 dataGen = getLoadGeneratorInstance(clazzAndArgs[0]); 595 String[] args; 596 if (dataGen instanceof LoadTestDataGeneratorWithACL) { 597 LOG.info("Using LoadTestDataGeneratorWithACL"); 598 if (User.isHBaseSecurityEnabled(conf)) { 599 LOG.info("Security is enabled"); 600 authnFileName = clazzAndArgs[1]; 601 superUser = clazzAndArgs[2]; 602 userNames = clazzAndArgs[3]; 603 args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length); 604 Properties authConfig = new Properties(); 605 authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName)); 606 try { 607 addAuthInfoToConf(authConfig, conf, superUser, userNames); 608 } catch (IOException exp) { 609 LOG.error(exp.toString(), exp); 610 return EXIT_FAILURE; 611 } 612 userOwner = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, superUser)); 613 } else { 614 superUser = clazzAndArgs[1]; 615 userNames = clazzAndArgs[2]; 616 args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length); 617 userOwner = User.createUserForTesting(conf, superUser, new String[0]); 618 } 619 } else { 620 args = clazzAndArgs.length == 1 621 ? 
new String[0] 622 : Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length); 623 } 624 dataGen.initialize(args); 625 } else { 626 // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator 627 dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize, 628 minColsPerKey, maxColsPerKey, families); 629 } 630 631 if (userOwner != null) { 632 LOG.info("Granting permissions for user " + userOwner.getShortName()); 633 Permission.Action[] actions = { Permission.Action.ADMIN, Permission.Action.CREATE, 634 Permission.Action.READ, Permission.Action.WRITE }; 635 try { 636 AccessControlClient.grant(ConnectionFactory.createConnection(conf), tableName, 637 userOwner.getShortName(), null, null, actions); 638 } catch (Throwable e) { 639 LOG.error(HBaseMarkers.FATAL, 640 "Error in granting permission for the user " + userOwner.getShortName(), e); 641 return EXIT_FAILURE; 642 } 643 } 644 645 if (userNames != null) { 646 // This will be comma separated list of expressions. 
647 String users[] = userNames.split(","); 648 User user = null; 649 for (String userStr : users) { 650 if (User.isHBaseSecurityEnabled(conf)) { 651 user = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, userStr)); 652 } else { 653 user = User.createUserForTesting(conf, userStr, new String[0]); 654 } 655 } 656 } 657 658 if (isWrite) { 659 if (userOwner != null) { 660 writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner); 661 } else { 662 String writerClass = null; 663 if (cmd.hasOption(OPT_WRITER)) { 664 writerClass = cmd.getOptionValue(OPT_WRITER); 665 } else { 666 writerClass = MultiThreadedWriter.class.getCanonicalName(); 667 } 668 669 writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen); 670 } 671 writerThreads.setMultiPut(isMultiPut); 672 } 673 674 if (isUpdate) { 675 if (userOwner != null) { 676 updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent, 677 userOwner, userNames); 678 } else { 679 String updaterClass = null; 680 if (cmd.hasOption(OPT_UPDATER)) { 681 updaterClass = cmd.getOptionValue(OPT_UPDATER); 682 } else { 683 updaterClass = MultiThreadedUpdater.class.getCanonicalName(); 684 } 685 updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen); 686 } 687 updaterThreads.setBatchUpdate(isBatchUpdate); 688 updaterThreads.setIgnoreNonceConflicts(ignoreConflicts); 689 } 690 691 if (isRead) { 692 if (userOwner != null) { 693 readerThreads = 694 new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent, userNames); 695 } else { 696 String readerClass = null; 697 if (cmd.hasOption(OPT_READER)) { 698 readerClass = cmd.getOptionValue(OPT_READER); 699 } else { 700 readerClass = MultiThreadedReader.class.getCanonicalName(); 701 } 702 readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen); 703 } 704 readerThreads.setMaxErrors(maxReadErrors); 705 readerThreads.setKeyWindow(keyWindow); 706 
readerThreads.setMultiGetBatchSize(multiGetBatchSize); 707 readerThreads.setRegionReplicaId(regionReplicaId); 708 } 709 710 if (isUpdate && isWrite) { 711 LOG.info("Concurrent write/update workload: making updaters aware of the " + "write point"); 712 updaterThreads.linkToWriter(writerThreads); 713 } 714 715 if (isRead && (isUpdate || isWrite)) { 716 LOG.info("Concurrent write/read workload: making readers aware of the " + "write point"); 717 readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads); 718 } 719 720 if (isWrite) { 721 System.out.println("Starting to write data..."); 722 writerThreads.start(startKey, endKey, numWriterThreads); 723 } 724 725 if (isUpdate) { 726 LOG.info("Starting to mutate data..."); 727 System.out.println("Starting to mutate data..."); 728 // TODO : currently append and increment operations not tested with tags 729 // Will update this after it is done 730 updaterThreads.start(startKey, endKey, numUpdaterThreads); 731 } 732 733 if (isRead) { 734 System.out.println("Starting to read data..."); 735 readerThreads.start(startKey, endKey, numReaderThreads); 736 } 737 738 if (isWrite) { 739 writerThreads.waitForFinish(); 740 } 741 742 if (isUpdate) { 743 updaterThreads.waitForFinish(); 744 } 745 746 if (isRead) { 747 readerThreads.waitForFinish(); 748 } 749 750 boolean success = true; 751 if (isWrite) { 752 success = success && writerThreads.getNumWriteFailures() == 0; 753 } 754 if (isUpdate) { 755 success = success && updaterThreads.getNumWriteFailures() == 0; 756 } 757 if (isRead) { 758 success = 759 success && readerThreads.getNumReadErrors() == 0 && readerThreads.getNumReadFailures() == 0; 760 } 761 return success ? 
EXIT_SUCCESS : EXIT_FAILURE; 762 } 763 764 private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException { 765 try { 766 Class<?> clazz = Class.forName(clazzName); 767 Constructor<?> constructor = 768 clazz.getConstructor(int.class, int.class, int.class, int.class, byte[][].class); 769 return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize, 770 minColsPerKey, maxColsPerKey, families); 771 } catch (Exception e) { 772 throw new IOException(e); 773 } 774 } 775 776 private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName, 777 LoadTestDataGenerator dataGen) throws IOException { 778 try { 779 Class<?> clazz = Class.forName(clazzName); 780 Constructor<?> constructor = 781 clazz.getConstructor(LoadTestDataGenerator.class, Configuration.class, TableName.class); 782 return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName); 783 } catch (Exception e) { 784 throw new IOException(e); 785 } 786 } 787 788 private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName, 789 LoadTestDataGenerator dataGen) throws IOException { 790 try { 791 Class<?> clazz = Class.forName(clazzName); 792 Constructor<?> constructor = clazz.getConstructor(LoadTestDataGenerator.class, 793 Configuration.class, TableName.class, double.class); 794 return (MultiThreadedUpdater) constructor.newInstance(dataGen, conf, tableName, 795 updatePercent); 796 } catch (Exception e) { 797 throw new IOException(e); 798 } 799 } 800 801 private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName, 802 LoadTestDataGenerator dataGen) throws IOException { 803 try { 804 Class<?> clazz = Class.forName(clazzName); 805 Constructor<?> constructor = clazz.getConstructor(LoadTestDataGenerator.class, 806 Configuration.class, TableName.class, double.class); 807 return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent); 808 } catch (Exception e) { 809 throw new 
IOException(e); 810 } 811 } 812 813 public static void main(String[] args) { 814 new LoadTestTool().doStaticMain(args); 815 } 816 817 /** 818 * When NUM_TABLES is specified, the function starts multiple worker threads which individually 819 * start a LoadTestTool instance to load a table. Each table name is in format <tn>_<index>. 820 * For example, "-tn test -num_tables 2" , table names will be "test_1", "test_2" 821 * @throws IOException if one of the load tasks is unable to complete 822 */ 823 private int parallelLoadTables() throws IOException { 824 // create new command args 825 String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME); 826 String[] newArgs = null; 827 if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) { 828 newArgs = new String[cmdLineArgs.length + 2]; 829 newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME; 830 newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME; 831 System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length); 832 } else { 833 newArgs = cmdLineArgs; 834 } 835 836 int tableNameValueIndex = -1; 837 for (int j = 0; j < newArgs.length; j++) { 838 if (newArgs[j].endsWith(OPT_TABLE_NAME)) { 839 tableNameValueIndex = j + 1; 840 } else if (newArgs[j].endsWith(NUM_TABLES)) { 841 // change NUM_TABLES to 1 so that each worker loads one table 842 newArgs[j + 1] = "1"; 843 } 844 } 845 846 // starting to load multiple tables 847 List<WorkerThread> workers = new ArrayList<>(); 848 for (int i = 0; i < numTables; i++) { 849 String[] workerArgs = newArgs.clone(); 850 workerArgs[tableNameValueIndex] = tableName + "_" + (i + 1); 851 WorkerThread worker = new WorkerThread(i, workerArgs); 852 workers.add(worker); 853 LOG.info(worker + " starting"); 854 worker.start(); 855 } 856 857 // wait for all workers finish 858 LOG.info("Waiting for worker threads to finish"); 859 for (WorkerThread t : workers) { 860 try { 861 t.join(); 862 } catch (InterruptedException ie) { 863 IOException iie = new InterruptedIOException(); 864 iie.initCause(ie); 
865 throw iie; 866 } 867 checkForErrors(); 868 } 869 870 return EXIT_SUCCESS; 871 } 872 873 // If an exception is thrown by one of worker threads, it will be 874 // stored here. 875 protected AtomicReference<Throwable> thrown = new AtomicReference<>(); 876 877 private void workerThreadError(Throwable t) { 878 thrown.compareAndSet(null, t); 879 } 880 881 /** 882 * Check for errors in the writer threads. If any is found, rethrow it. 883 */ 884 private void checkForErrors() throws IOException { 885 Throwable thrown = this.thrown.get(); 886 if (thrown == null) return; 887 if (thrown instanceof IOException) { 888 throw (IOException) thrown; 889 } else { 890 throw new RuntimeException(thrown); 891 } 892 } 893 894 class WorkerThread extends Thread { 895 private String[] workerArgs; 896 897 WorkerThread(int i, String[] args) { 898 super("WorkerThread-" + i); 899 workerArgs = args; 900 } 901 902 @Override 903 public void run() { 904 try { 905 int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs); 906 if (ret != 0) { 907 throw new RuntimeException("LoadTestTool exit with non-zero return code."); 908 } 909 } catch (Exception ex) { 910 LOG.error("Error in worker thread", ex); 911 workerThreadError(ex); 912 } 913 } 914 } 915 916 private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner, 917 String userList) throws IOException { 918 List<String> users = new ArrayList<>(Arrays.asList(userList.split(","))); 919 users.add(owner); 920 for (String user : users) { 921 String keyTabFileConfKey = "hbase." + user + ".keytab.file"; 922 String principalConfKey = "hbase." 
+ user + ".kerberos.principal"; 923 if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) { 924 throw new IOException("Authentication configs missing for user : " + user); 925 } 926 } 927 for (String key : authConfig.stringPropertyNames()) { 928 conf.set(key, authConfig.getProperty(key)); 929 } 930 LOG.debug("Added authentication properties to config successfully."); 931 } 932 933}