001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with this 004 * work for additional information regarding copyright ownership. The ASF 005 * licenses this file to you under the Apache License, Version 2.0 (the 006 * "License"); you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 014 * License for the specific language governing permissions and limitations 015 * under the License. 016 */ 017package org.apache.hadoop.hbase.util; 018 019import java.io.IOException; 020import java.io.InterruptedIOException; 021import java.lang.reflect.Constructor; 022import java.security.SecureRandom; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.List; 026import java.util.Properties; 027import java.util.concurrent.atomic.AtomicReference; 028 029import javax.crypto.spec.SecretKeySpec; 030 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.HBaseConfiguration; 033import org.apache.hadoop.hbase.HBaseInterfaceAudience; 034import org.apache.hadoop.hbase.HBaseTestingUtility; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 039import org.apache.log4j.Level; 040import org.apache.log4j.LogManager; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.apache.zookeeper.ZooKeeper; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045import org.apache.hadoop.hbase.client.Admin; 046import org.apache.hadoop.hbase.client.Connection; 047import org.apache.hadoop.hbase.client.ConnectionFactory; 048import org.apache.hadoop.hbase.client.Durability; 049import org.apache.hadoop.hbase.client.TableDescriptor; 050import org.apache.hadoop.hbase.io.compress.Compression; 051import org.apache.hadoop.hbase.io.crypto.Cipher; 052import org.apache.hadoop.hbase.io.crypto.Encryption; 053import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 054import org.apache.hadoop.hbase.log.HBaseMarkers; 055import org.apache.hadoop.hbase.regionserver.BloomType; 056import org.apache.hadoop.hbase.security.EncryptionUtil; 057import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 058import org.apache.hadoop.hbase.security.User; 059import org.apache.hadoop.hbase.security.access.AccessControlClient; 060import org.apache.hadoop.hbase.security.access.Permission; 061import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 062import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL; 063import org.apache.hadoop.util.ToolRunner; 064 065import org.apache.hbase.thirdparty.org.apache.commons.cli.AlreadySelectedException; 066import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 067import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser; 068import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser; 069import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException; 070import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; 071import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException; 072 073/** 074 * A command-line utility that reads, writes, and verifies data. Unlike 075 * {@link org.apache.hadoop.hbase.PerformanceEvaluation}, this tool validates the data written, 076 * and supports simultaneously writing and reading the same set of keys. 077 */ 078@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 079public class LoadTestTool extends AbstractHBaseTool { 080 081 private static final Logger LOG = LoggerFactory.getLogger(LoadTestTool.class); 082 private static final String COLON = ":"; 083 084 /** Table name for the test */ 085 private TableName tableName; 086 087 /** Column families for the test */ 088 private byte[][] families; 089 090 /** Table name to use of not overridden on the command line */ 091 protected static final String DEFAULT_TABLE_NAME = "cluster_test"; 092 093 /** The default data size if not specified */ 094 protected static final int DEFAULT_DATA_SIZE = 64; 095 096 /** The number of reader/writer threads if not specified */ 097 protected static final int DEFAULT_NUM_THREADS = 20; 098 099 /** Usage string for the load option */ 100 protected static final String OPT_USAGE_LOAD = 101 "<avg_cols_per_key>:<avg_data_size>" + 102 "[:<#threads=" + DEFAULT_NUM_THREADS + ">]"; 103 104 /** Usage string for the read option */ 105 protected static final String OPT_USAGE_READ = 106 "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]"; 107 108 /** Usage string for the update option */ 109 protected static final String OPT_USAGE_UPDATE = 110 "<update_percent>[:<#threads=" + DEFAULT_NUM_THREADS 111 + ">][:<#whether to ignore nonce collisions=0>]"; 112 113 protected static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " + 114 Arrays.toString(BloomType.values()); 115 116 protected static final String OPT_USAGE_COMPRESSION = "Compression type, " + 117 "one of " + Arrays.toString(Compression.Algorithm.values()); 118 119 protected static final String OPT_VERBOSE = "verbose"; 120 121 public static final String OPT_BLOOM = "bloom"; 122 public static final String OPT_BLOOM_PARAM = "bloom_param"; 123 public static final String OPT_COMPRESSION = "compression"; 124 public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush"; 125 public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush."; 126 127 public static final String OPT_INMEMORY = "in_memory"; 128 public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF " + 129 "inmemory as far as possible. Not guaranteed that reads are always served from inmemory"; 130 131 public static final String OPT_GENERATOR = "generator"; 132 public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool." 133 + " Any args for this class can be passed as colon separated after class name"; 134 135 public static final String OPT_WRITER = "writer"; 136 public static final String OPT_WRITER_USAGE = "The class for executing the write requests"; 137 138 public static final String OPT_UPDATER = "updater"; 139 public static final String OPT_UPDATER_USAGE = "The class for executing the update requests"; 140 141 public static final String OPT_READER = "reader"; 142 public static final String OPT_READER_USAGE = "The class for executing the read requests"; 143 144 protected static final String OPT_KEY_WINDOW = "key_window"; 145 protected static final String OPT_WRITE = "write"; 146 protected static final String OPT_MAX_READ_ERRORS = "max_read_errors"; 147 public static final String OPT_MULTIPUT = "multiput"; 148 public static final String OPT_MULTIGET = "multiget_batchsize"; 149 protected static final String OPT_NUM_KEYS = "num_keys"; 150 protected static final String OPT_READ = "read"; 151 protected static final String OPT_START_KEY = "start_key"; 152 public static final String OPT_TABLE_NAME = "tn"; 153 public static final String OPT_COLUMN_FAMILIES = "families"; 154 protected static final String OPT_ZK_QUORUM = "zk"; 155 protected static final String OPT_ZK_PARENT_NODE = "zk_root"; 156 protected static final String OPT_SKIP_INIT = "skip_init"; 157 protected static final String OPT_INIT_ONLY = "init_only"; 158 protected static final String NUM_TABLES = "num_tables"; 159 protected static final String OPT_BATCHUPDATE = "batchupdate"; 160 protected static final String OPT_UPDATE = "update"; 161 162 public static final String OPT_ENCRYPTION = "encryption"; 163 protected static final String OPT_ENCRYPTION_USAGE = 164 "Enables transparent encryption on the test table, one of " + 165 Arrays.toString(Encryption.getSupportedCiphers()); 166 167 public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server"; 168 protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE 169 = "Desired number of regions per region server. Defaults to 5."; 170 public static int DEFAULT_NUM_REGIONS_PER_SERVER = 5; 171 172 public static final String OPT_REGION_REPLICATION = "region_replication"; 173 protected static final String OPT_REGION_REPLICATION_USAGE = 174 "Desired number of replicas per region"; 175 176 public static final String OPT_REGION_REPLICA_ID = "region_replica_id"; 177 protected static final String OPT_REGION_REPLICA_ID_USAGE = 178 "Region replica id to do the reads from"; 179 180 public static final String OPT_MOB_THRESHOLD = "mob_threshold"; 181 protected static final String OPT_MOB_THRESHOLD_USAGE = 182 "Desired cell size to exceed in bytes that will use the MOB write path"; 183 184 protected static final long DEFAULT_START_KEY = 0; 185 186 /** This will be removed as we factor out the dependency on command line */ 187 protected CommandLine cmd; 188 189 protected MultiThreadedWriter writerThreads = null; 190 protected MultiThreadedReader readerThreads = null; 191 protected MultiThreadedUpdater updaterThreads = null; 192 193 protected long startKey, endKey; 194 195 protected boolean isVerbose, isWrite, isRead, isUpdate; 196 protected boolean deferredLogFlush; 197 198 // Column family options 199 protected DataBlockEncoding dataBlockEncodingAlgo; 200 protected Compression.Algorithm compressAlgo; 201 protected BloomType bloomType; 202 private boolean inMemoryCF; 203 204 private User userOwner; 205 // Writer options 206 protected int numWriterThreads = DEFAULT_NUM_THREADS; 207 protected int minColsPerKey, maxColsPerKey; 208 protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE; 209 protected boolean isMultiPut; 210 211 // Updater options 212 protected int numUpdaterThreads = DEFAULT_NUM_THREADS; 213 protected int updatePercent; 214 protected boolean ignoreConflicts = false; 215 protected boolean isBatchUpdate; 216 217 // Reader options 218 private int numReaderThreads = DEFAULT_NUM_THREADS; 219 private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW; 220 private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE; 221 private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS; 222 private int verifyPercent; 223 224 private int numTables = 1; 225 226 private String superUser; 227 228 private String userNames; 229 //This file is used to read authentication information in secure clusters. 230 private String authnFileName; 231 232 private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER; 233 private int regionReplication = -1; // not set 234 private int regionReplicaId = -1; // not set 235 236 private int mobThreshold = -1; // not set 237 238 // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad, 239 // console tool itself should only be used from console. 240 protected boolean isSkipInit = false; 241 protected boolean isInitOnly = false; 242 243 protected Cipher cipher = null; 244 245 protected String[] splitColonSeparated(String option, 246 int minNumCols, int maxNumCols) { 247 String optVal = cmd.getOptionValue(option); 248 String[] cols = optVal.split(COLON); 249 if (cols.length < minNumCols || cols.length > maxNumCols) { 250 throw new IllegalArgumentException("Expected at least " 251 + minNumCols + " columns but no more than " + maxNumCols + 252 " in the colon-separated value '" + optVal + "' of the " + 253 "-" + option + " option"); 254 } 255 return cols; 256 } 257 258 protected int getNumThreads(String numThreadsStr) { 259 return parseInt(numThreadsStr, 1, Short.MAX_VALUE); 260 } 261 262 public byte[][] getColumnFamilies() { 263 return families; 264 } 265 266 /** 267 * Apply column family options such as Bloom filters, compression, and data 268 * block encoding. 269 */ 270 protected void applyColumnFamilyOptions(TableName tableName, 271 byte[][] columnFamilies) throws IOException { 272 try (Connection conn = ConnectionFactory.createConnection(conf); 273 Admin admin = conn.getAdmin()) { 274 TableDescriptor tableDesc = admin.getDescriptor(tableName); 275 LOG.info("Disabling table " + tableName); 276 admin.disableTable(tableName); 277 for (byte[] cf : columnFamilies) { 278 ColumnFamilyDescriptor columnDesc = tableDesc.getColumnFamily(cf); 279 boolean isNewCf = columnDesc == null; 280 ColumnFamilyDescriptorBuilder columnDescBuilder = isNewCf ? 281 ColumnFamilyDescriptorBuilder.newBuilder(cf) : 282 ColumnFamilyDescriptorBuilder.newBuilder(columnDesc); 283 if (bloomType != null) { 284 columnDescBuilder.setBloomFilterType(bloomType); 285 } 286 if (compressAlgo != null) { 287 columnDescBuilder.setCompressionType(compressAlgo); 288 } 289 if (dataBlockEncodingAlgo != null) { 290 columnDescBuilder.setDataBlockEncoding(dataBlockEncodingAlgo); 291 } 292 if (inMemoryCF) { 293 columnDescBuilder.setInMemory(inMemoryCF); 294 } 295 if (cipher != null) { 296 byte[] keyBytes = new byte[cipher.getKeyLength()]; 297 new SecureRandom().nextBytes(keyBytes); 298 columnDescBuilder.setEncryptionType(cipher.getName()); 299 columnDescBuilder.setEncryptionKey( 300 EncryptionUtil.wrapKey(conf, 301 User.getCurrent().getShortName(), 302 new SecretKeySpec(keyBytes, 303 cipher.getName()))); 304 } 305 if (mobThreshold >= 0) { 306 columnDescBuilder.setMobEnabled(true); 307 columnDescBuilder.setMobThreshold(mobThreshold); 308 } 309 310 if (isNewCf) { 311 admin.addColumnFamily(tableName, columnDescBuilder.build()); 312 } else { 313 admin.modifyColumnFamily(tableName, columnDescBuilder.build()); 314 } 315 } 316 LOG.info("Enabling table " + tableName); 317 admin.enableTable(tableName); 318 } 319 } 320 321 @Override 322 protected void addOptions() { 323 addOptNoArg("v", OPT_VERBOSE, "Will display a full readout of logs, including ZooKeeper"); 324 addOptWithArg(OPT_ZK_QUORUM, "ZK quorum as comma-separated host names " + 325 "without port numbers"); 326 addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper"); 327 addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write"); 328 addOptWithArg(OPT_COLUMN_FAMILIES, "The name of the column families to use separated by comma"); 329 addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD); 330 addOptWithArg(OPT_READ, OPT_USAGE_READ); 331 addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE); 332 addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading"); 333 addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM); 334 addOptWithArg(OPT_BLOOM_PARAM, "the parameter of bloom filter type"); 335 addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION); 336 addOptWithArg(HFileTestUtil.OPT_DATA_BLOCK_ENCODING, HFileTestUtil.OPT_DATA_BLOCK_ENCODING_USAGE); 337 addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " + 338 "to tolerate before terminating all reader threads. The default is " + 339 MultiThreadedReader.DEFAULT_MAX_ERRORS + "."); 340 addOptWithArg(OPT_MULTIGET, "Whether to use multi-gets as opposed to " + 341 "separate gets for every column in a row"); 342 addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " + 343 "reads and writes for concurrent write/read workload. The default " + 344 "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + "."); 345 346 addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " + 347 "separate puts for every column in a row"); 348 addOptNoArg(OPT_BATCHUPDATE, "Whether to use batch as opposed to " + 349 "separate updates for every column in a row"); 350 addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY); 351 addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE); 352 addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE); 353 addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE); 354 addOptWithArg(OPT_READER, OPT_READER_USAGE); 355 356 addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write"); 357 addOptWithArg(OPT_START_KEY, "The first key to read/write " + 358 "(a 0-based index). The default value is " + 359 DEFAULT_START_KEY + "."); 360 addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table " 361 + "already exists"); 362 363 addOptWithArg(NUM_TABLES, 364 "A positive integer number. When a number n is specified, load test " 365 + "tool will load n table parallely. -tn parameter value becomes " 366 + "table name prefix. Each table name is in format <tn>_1...<tn>_n"); 367 368 addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE); 369 addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE); 370 addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE); 371 addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE); 372 addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE); 373 addOptWithArg(OPT_MOB_THRESHOLD, OPT_MOB_THRESHOLD_USAGE); 374 } 375 376 @Override 377 protected CommandLineParser newParser() { 378 // Commons-CLI lacks the capability to handle combinations of options, so we do it ourselves 379 // Validate in parse() to get helpful error messages instead of exploding in processOptions() 380 return new DefaultParser() { 381 @Override 382 public CommandLine parse(Options opts, String[] args, Properties props, boolean stop) 383 throws ParseException { 384 CommandLine cl = super.parse(opts, args, props, stop); 385 386 boolean isReadWriteUpdate = cmd.hasOption(OPT_READ) 387 || cmd.hasOption(OPT_WRITE) 388 || cmd.hasOption(OPT_UPDATE); 389 boolean isInitOnly = cmd.hasOption(OPT_INIT_ONLY); 390 391 if (!isInitOnly && !isReadWriteUpdate) { 392 throw new MissingOptionException("Must specify either -" + OPT_INIT_ONLY 393 + " or at least one of -" + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE); 394 } 395 396 if (isInitOnly && isReadWriteUpdate) { 397 throw new AlreadySelectedException(OPT_INIT_ONLY + " cannot be specified with any of -" 398 + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE); 399 } 400 401 if (isReadWriteUpdate && !cmd.hasOption(OPT_NUM_KEYS)) { 402 throw new MissingOptionException(OPT_NUM_KEYS + " must be specified in read/write mode."); 403 } 404 405 return cl; 406 } 407 }; 408 } 409 410 @Override 411 protected void processOptions(CommandLine cmd) { 412 this.cmd = cmd; 413 414 tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME, 415 DEFAULT_TABLE_NAME)); 416 417 if (cmd.hasOption(OPT_COLUMN_FAMILIES)) { 418 String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(","); 419 families = new byte[list.length][]; 420 for (int i = 0; i < list.length; i++) { 421 families[i] = Bytes.toBytes(list[i]); 422 } 423 } else { 424 families = HFileTestUtil.DEFAULT_COLUMN_FAMILIES; 425 } 426 427 isVerbose = cmd.hasOption(OPT_VERBOSE); 428 isWrite = cmd.hasOption(OPT_WRITE); 429 isRead = cmd.hasOption(OPT_READ); 430 isUpdate = cmd.hasOption(OPT_UPDATE); 431 isInitOnly = cmd.hasOption(OPT_INIT_ONLY); 432 deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH); 433 434 if (!isInitOnly) { 435 startKey = parseLong(cmd.getOptionValue(OPT_START_KEY, 436 String.valueOf(DEFAULT_START_KEY)), 0, Long.MAX_VALUE); 437 long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1, 438 Long.MAX_VALUE - startKey); 439 endKey = startKey + numKeys; 440 isSkipInit = cmd.hasOption(OPT_SKIP_INIT); 441 System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]"); 442 } 443 444 parseColumnFamilyOptions(cmd); 445 446 if (isWrite) { 447 String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3); 448 449 int colIndex = 0; 450 minColsPerKey = 1; 451 maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]); 452 int avgColDataSize = 453 parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE); 454 minColDataSize = avgColDataSize / 2; 455 maxColDataSize = avgColDataSize * 3 / 2; 456 457 if (colIndex < writeOpts.length) { 458 numWriterThreads = getNumThreads(writeOpts[colIndex++]); 459 } 460 461 isMultiPut = cmd.hasOption(OPT_MULTIPUT); 462 463 mobThreshold = -1; 464 if (cmd.hasOption(OPT_MOB_THRESHOLD)) { 465 mobThreshold = Integer.parseInt(cmd.getOptionValue(OPT_MOB_THRESHOLD)); 466 } 467 468 System.out.println("Multi-puts: " + isMultiPut); 469 System.out.println("Columns per key: " + minColsPerKey + ".." 470 + maxColsPerKey); 471 System.out.println("Data size per column: " + minColDataSize + ".." 472 + maxColDataSize); 473 } 474 475 if (isUpdate) { 476 String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3); 477 int colIndex = 0; 478 updatePercent = parseInt(mutateOpts[colIndex++], 0, 100); 479 if (colIndex < mutateOpts.length) { 480 numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]); 481 } 482 if (colIndex < mutateOpts.length) { 483 ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1; 484 } 485 486 isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE); 487 488 System.out.println("Batch updates: " + isBatchUpdate); 489 System.out.println("Percent of keys to update: " + updatePercent); 490 System.out.println("Updater threads: " + numUpdaterThreads); 491 System.out.println("Ignore nonce conflicts: " + ignoreConflicts); 492 } 493 494 if (isRead) { 495 String[] readOpts = splitColonSeparated(OPT_READ, 1, 2); 496 int colIndex = 0; 497 verifyPercent = parseInt(readOpts[colIndex++], 0, 100); 498 if (colIndex < readOpts.length) { 499 numReaderThreads = getNumThreads(readOpts[colIndex++]); 500 } 501 502 if (cmd.hasOption(OPT_MAX_READ_ERRORS)) { 503 maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS), 504 0, Integer.MAX_VALUE); 505 } 506 507 if (cmd.hasOption(OPT_KEY_WINDOW)) { 508 keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW), 509 0, Integer.MAX_VALUE); 510 } 511 512 if (cmd.hasOption(OPT_MULTIGET)) { 513 multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET), 514 0, Integer.MAX_VALUE); 515 } 516 517 System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize); 518 System.out.println("Percent of keys to verify: " + verifyPercent); 519 System.out.println("Reader threads: " + numReaderThreads); 520 } 521 522 numTables = 1; 523 if (cmd.hasOption(NUM_TABLES)) { 524 numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE); 525 } 526 527 numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER; 528 if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) { 529 numRegionsPerServer = Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER)); 530 } 531 532 regionReplication = 1; 533 if (cmd.hasOption(OPT_REGION_REPLICATION)) { 534 regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION)); 535 } 536 537 regionReplicaId = -1; 538 if (cmd.hasOption(OPT_REGION_REPLICA_ID)) { 539 regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID)); 540 } 541 } 542 543 private void parseColumnFamilyOptions(CommandLine cmd) { 544 String dataBlockEncodingStr = cmd.getOptionValue(HFileTestUtil.OPT_DATA_BLOCK_ENCODING); 545 dataBlockEncodingAlgo = dataBlockEncodingStr == null ? null : 546 DataBlockEncoding.valueOf(dataBlockEncodingStr); 547 548 String compressStr = cmd.getOptionValue(OPT_COMPRESSION); 549 compressAlgo = compressStr == null ? Compression.Algorithm.NONE : 550 Compression.Algorithm.valueOf(compressStr); 551 552 String bloomStr = cmd.getOptionValue(OPT_BLOOM); 553 bloomType = bloomStr == null ? BloomType.ROW : 554 BloomType.valueOf(bloomStr); 555 556 if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) { 557 if (!cmd.hasOption(OPT_BLOOM_PARAM)) { 558 LOG.error("the parameter of bloom filter {} is not specified", bloomType.name()); 559 } else { 560 conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM)); 561 } 562 } 563 564 inMemoryCF = cmd.hasOption(OPT_INMEMORY); 565 if (cmd.hasOption(OPT_ENCRYPTION)) { 566 cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION)); 567 } 568 569 } 570 571 public void initTestTable() throws IOException { 572 Durability durability = Durability.USE_DEFAULT; 573 if (deferredLogFlush) { 574 durability = Durability.ASYNC_WAL; 575 } 576 577 HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName, 578 getColumnFamilies(), compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer, 579 regionReplication, durability); 580 applyColumnFamilyOptions(tableName, getColumnFamilies()); 581 } 582 583 @Override 584 protected int doWork() throws IOException { 585 if (!isVerbose) { 586 LogManager.getLogger(ZooKeeper.class.getName()).setLevel(Level.WARN); 587 } 588 if (numTables > 1) { 589 return parallelLoadTables(); 590 } else { 591 return loadTable(); 592 } 593 } 594 595 protected int loadTable() throws IOException { 596 if (cmd.hasOption(OPT_ZK_QUORUM)) { 597 conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM)); 598 } 599 if (cmd.hasOption(OPT_ZK_PARENT_NODE)) { 600 conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE)); 601 } 602 603 if (isInitOnly) { 604 LOG.info("Initializing only; no reads or writes"); 605 initTestTable(); 606 return 0; 607 } 608 609 if (!isSkipInit) { 610 initTestTable(); 611 } 612 LoadTestDataGenerator dataGen = null; 613 if (cmd.hasOption(OPT_GENERATOR)) { 614 String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON); 615 dataGen = getLoadGeneratorInstance(clazzAndArgs[0]); 616 String[] args; 617 if (dataGen instanceof LoadTestDataGeneratorWithACL) { 618 LOG.info("Using LoadTestDataGeneratorWithACL"); 619 if (User.isHBaseSecurityEnabled(conf)) { 620 LOG.info("Security is enabled"); 621 authnFileName = clazzAndArgs[1]; 622 superUser = clazzAndArgs[2]; 623 userNames = clazzAndArgs[3]; 624 args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length); 625 Properties authConfig = new Properties(); 626 authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName)); 627 try { 628 addAuthInfoToConf(authConfig, conf, superUser, userNames); 629 } catch (IOException exp) { 630 LOG.error(exp.toString(), exp); 631 return EXIT_FAILURE; 632 } 633 userOwner = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, superUser)); 634 } else { 635 superUser = clazzAndArgs[1]; 636 userNames = clazzAndArgs[2]; 637 args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length); 638 userOwner = User.createUserForTesting(conf, superUser, new String[0]); 639 } 640 } else { 641 args = clazzAndArgs.length == 1 ? new String[0] : Arrays.copyOfRange(clazzAndArgs, 1, 642 clazzAndArgs.length); 643 } 644 dataGen.initialize(args); 645 } else { 646 // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator 647 dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize, 648 minColsPerKey, maxColsPerKey, families); 649 } 650 651 if (userOwner != null) { 652 LOG.info("Granting permissions for user " + userOwner.getShortName()); 653 Permission.Action[] actions = { 654 Permission.Action.ADMIN, Permission.Action.CREATE, 655 Permission.Action.READ, Permission.Action.WRITE }; 656 try { 657 AccessControlClient.grant(ConnectionFactory.createConnection(conf), 658 tableName, userOwner.getShortName(), null, null, actions); 659 } catch (Throwable e) { 660 LOG.error(HBaseMarkers.FATAL, "Error in granting permission for the user " + 661 userOwner.getShortName(), e); 662 return EXIT_FAILURE; 663 } 664 } 665 666 if (userNames != null) { 667 // This will be comma separated list of expressions. 668 String users[] = userNames.split(","); 669 User user = null; 670 for (String userStr : users) { 671 if (User.isHBaseSecurityEnabled(conf)) { 672 user = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, userStr)); 673 } else { 674 user = User.createUserForTesting(conf, userStr, new String[0]); 675 } 676 } 677 } 678 679 if (isWrite) { 680 if (userOwner != null) { 681 writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner); 682 } else { 683 String writerClass = null; 684 if (cmd.hasOption(OPT_WRITER)) { 685 writerClass = cmd.getOptionValue(OPT_WRITER); 686 } else { 687 writerClass = MultiThreadedWriter.class.getCanonicalName(); 688 } 689 690 writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen); 691 } 692 writerThreads.setMultiPut(isMultiPut); 693 } 694 695 if (isUpdate) { 696 if (userOwner != null) { 697 updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent, 698 userOwner, userNames); 699 } else { 700 String updaterClass = null; 701 if (cmd.hasOption(OPT_UPDATER)) { 702 updaterClass = cmd.getOptionValue(OPT_UPDATER); 703 } else { 704 updaterClass = MultiThreadedUpdater.class.getCanonicalName(); 705 } 706 updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen); 707 } 708 updaterThreads.setBatchUpdate(isBatchUpdate); 709 updaterThreads.setIgnoreNonceConflicts(ignoreConflicts); 710 } 711 712 if (isRead) { 713 if (userOwner != null) { 714 readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent, 715 userNames); 716 } else { 717 String readerClass = null; 718 if (cmd.hasOption(OPT_READER)) { 719 readerClass = cmd.getOptionValue(OPT_READER); 720 } else { 721 readerClass = MultiThreadedReader.class.getCanonicalName(); 722 } 723 readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen); 724 } 725 readerThreads.setMaxErrors(maxReadErrors); 726 readerThreads.setKeyWindow(keyWindow); 727 readerThreads.setMultiGetBatchSize(multiGetBatchSize); 728 readerThreads.setRegionReplicaId(regionReplicaId); 729 } 730 731 if (isUpdate && isWrite) { 732 LOG.info("Concurrent write/update workload: making updaters aware of the " + 733 "write point"); 734 updaterThreads.linkToWriter(writerThreads); 735 } 736 737 if (isRead && (isUpdate || isWrite)) { 738 LOG.info("Concurrent write/read workload: making readers aware of the " + 739 "write point"); 740 readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads); 741 } 742 743 if (isWrite) { 744 System.out.println("Starting to write data..."); 745 writerThreads.start(startKey, endKey, numWriterThreads); 746 } 747 748 if (isUpdate) { 749 LOG.info("Starting to mutate data..."); 750 System.out.println("Starting to mutate data..."); 751 // TODO : currently append and increment operations not tested with tags 752 // Will update this after it is done 753 updaterThreads.start(startKey, endKey, numUpdaterThreads); 754 } 755 756 if (isRead) { 757 System.out.println("Starting to read data..."); 758 readerThreads.start(startKey, endKey, numReaderThreads); 759 } 760 761 if (isWrite) { 762 writerThreads.waitForFinish(); 763 } 764 765 if (isUpdate) { 766 updaterThreads.waitForFinish(); 767 } 768 769 if (isRead) { 770 readerThreads.waitForFinish(); 771 } 772 773 boolean success = true; 774 if (isWrite) { 775 success = success && writerThreads.getNumWriteFailures() == 0; 776 } 777 if (isUpdate) { 778 success = success && updaterThreads.getNumWriteFailures() == 0; 779 } 780 if (isRead) { 781 success = success && readerThreads.getNumReadErrors() == 0 782 && readerThreads.getNumReadFailures() == 0; 783 } 784 return success ? EXIT_SUCCESS : EXIT_FAILURE; 785 } 786 787 private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException { 788 try { 789 Class<?> clazz = Class.forName(clazzName); 790 Constructor<?> constructor = clazz.getConstructor(int.class, int.class, int.class, int.class, 791 byte[][].class); 792 return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize, 793 minColsPerKey, maxColsPerKey, families); 794 } catch (Exception e) { 795 throw new IOException(e); 796 } 797 } 798 799 private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName 800 , LoadTestDataGenerator dataGen) throws IOException { 801 try { 802 Class<?> clazz = Class.forName(clazzName); 803 Constructor<?> constructor = clazz.getConstructor( 804 LoadTestDataGenerator.class, Configuration.class, TableName.class); 805 return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName); 806 } catch (Exception e) { 807 throw new IOException(e); 808 } 809 } 810 811 private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName 812 , LoadTestDataGenerator dataGen) throws IOException { 813 try { 814 Class<?> clazz = Class.forName(clazzName); 815 Constructor<?> constructor = clazz.getConstructor( 816 LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class); 817 return (MultiThreadedUpdater) constructor.newInstance( 818 dataGen, conf, tableName, updatePercent); 819 } catch (Exception e) { 820 throw new IOException(e); 821 } 822 } 823 824 private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName 825 , LoadTestDataGenerator dataGen) throws IOException { 826 try { 827 Class<?> clazz = Class.forName(clazzName); 828 Constructor<?> constructor = clazz.getConstructor( 829 LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class); 830 return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent); 831 } catch (Exception e) { 832 throw new IOException(e); 833 } 834 } 835 836 public static void main(String[] args) { 837 new LoadTestTool().doStaticMain(args); 838 } 839 840 /** 841 * When NUM_TABLES is specified, the function starts multiple worker threads 842 * which individually start a LoadTestTool instance to load a table. Each 843 * table name is in format <tn>_<index>. For example, "-tn test -num_tables 2" 844 * , table names will be "test_1", "test_2" 845 * 846 * @throws IOException if one of the load tasks is unable to complete 847 */ 848 private int parallelLoadTables() 849 throws IOException { 850 // create new command args 851 String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME); 852 String[] newArgs = null; 853 if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) { 854 newArgs = new String[cmdLineArgs.length + 2]; 855 newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME; 856 newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME; 857 System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length); 858 } else { 859 newArgs = cmdLineArgs; 860 } 861 862 int tableNameValueIndex = -1; 863 for (int j = 0; j < newArgs.length; j++) { 864 if (newArgs[j].endsWith(OPT_TABLE_NAME)) { 865 tableNameValueIndex = j + 1; 866 } else if (newArgs[j].endsWith(NUM_TABLES)) { 867 // change NUM_TABLES to 1 so that each worker loads one table 868 newArgs[j + 1] = "1"; 869 } 870 } 871 872 // starting to load multiple tables 873 List<WorkerThread> workers = new ArrayList<>(); 874 for (int i = 0; i < numTables; i++) { 875 String[] workerArgs = newArgs.clone(); 876 workerArgs[tableNameValueIndex] = tableName + "_" + (i+1); 877 WorkerThread worker = new WorkerThread(i, workerArgs); 878 workers.add(worker); 879 LOG.info(worker + " starting"); 880 worker.start(); 881 } 882 883 // wait for all workers finish 884 LOG.info("Waiting for worker threads to finish"); 885 for (WorkerThread t : workers) { 886 try { 887 t.join(); 888 } catch (InterruptedException ie) { 889 IOException iie = new InterruptedIOException(); 890 iie.initCause(ie); 891 throw iie; 892 } 893 checkForErrors(); 894 } 895 896 return EXIT_SUCCESS; 897 } 898 899 // If an exception is thrown by one of worker threads, it will be 900 // stored here. 901 protected AtomicReference<Throwable> thrown = new AtomicReference<>(); 902 903 private void workerThreadError(Throwable t) { 904 thrown.compareAndSet(null, t); 905 } 906 907 /** 908 * Check for errors in the writer threads. If any is found, rethrow it. 909 */ 910 private void checkForErrors() throws IOException { 911 Throwable thrown = this.thrown.get(); 912 if (thrown == null) return; 913 if (thrown instanceof IOException) { 914 throw (IOException) thrown; 915 } else { 916 throw new RuntimeException(thrown); 917 } 918 } 919 920 class WorkerThread extends Thread { 921 private String[] workerArgs; 922 923 WorkerThread(int i, String[] args) { 924 super("WorkerThread-" + i); 925 workerArgs = args; 926 } 927 928 @Override 929 public void run() { 930 try { 931 int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs); 932 if (ret != 0) { 933 throw new RuntimeException("LoadTestTool exit with non-zero return code."); 934 } 935 } catch (Exception ex) { 936 LOG.error("Error in worker thread", ex); 937 workerThreadError(ex); 938 } 939 } 940 } 941 942 private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner, 943 String userList) throws IOException { 944 List<String> users = new ArrayList<>(Arrays.asList(userList.split(","))); 945 users.add(owner); 946 for (String user : users) { 947 String keyTabFileConfKey = "hbase." + user + ".keytab.file"; 948 String principalConfKey = "hbase." + user + ".kerberos.principal"; 949 if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) { 950 throw new IOException("Authentication configs missing for user : " + user); 951 } 952 } 953 for (String key : authConfig.stringPropertyNames()) { 954 conf.set(key, authConfig.getProperty(key)); 955 } 956 LOG.debug("Added authentication properties to config successfully."); 957 } 958 959}