001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.lang.reflect.Constructor; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.List; 026import java.util.Properties; 027import java.util.concurrent.atomic.AtomicReference; 028import javax.crypto.spec.SecretKeySpec; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HBaseConfiguration; 031import org.apache.hadoop.hbase.HBaseInterfaceAudience; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.Admin; 035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 037import org.apache.hadoop.hbase.client.Connection; 038import org.apache.hadoop.hbase.client.ConnectionFactory; 039import org.apache.hadoop.hbase.client.Durability; 040import org.apache.hadoop.hbase.client.TableDescriptor; 041import org.apache.hadoop.hbase.io.compress.Compression; 042import org.apache.hadoop.hbase.io.crypto.Cipher; 043import org.apache.hadoop.hbase.io.crypto.Encryption; 044import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 045import org.apache.hadoop.hbase.log.HBaseMarkers; 046import org.apache.hadoop.hbase.logging.Log4jUtils; 047import org.apache.hadoop.hbase.regionserver.BloomType; 048import org.apache.hadoop.hbase.security.EncryptionUtil; 049import org.apache.hadoop.hbase.security.User; 050import org.apache.hadoop.hbase.security.access.AccessControlClient; 051import org.apache.hadoop.hbase.security.access.Permission; 052import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 053import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL; 054import org.apache.hadoop.util.ToolRunner; 055import org.apache.yetus.audience.InterfaceAudience; 056import org.apache.zookeeper.ZooKeeper; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060import org.apache.hbase.thirdparty.org.apache.commons.cli.AlreadySelectedException; 061import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 062import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser; 063import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser; 064import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException; 065import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; 066import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException; 067 068/** 069 * A command-line utility that reads, writes, and verifies data. Unlike 070 * {@link org.apache.hadoop.hbase.PerformanceEvaluation}, this tool validates the data written, and 071 * supports simultaneously writing and reading the same set of keys. 072 */ 073@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 074public class LoadTestTool extends AbstractHBaseTool { 075 076 private static final Logger LOG = LoggerFactory.getLogger(LoadTestTool.class); 077 private static final String COLON = ":"; 078 079 /** Table name for the test */ 080 private TableName tableName; 081 082 /** Column families for the test */ 083 private byte[][] families; 084 /** Column family used by the test */ 085 private static byte[] DEFAULT_COLUMN_FAMILY = Bytes.toBytes("test_cf"); 086 /** Column families used by the test */ 087 private static final byte[][] DEFAULT_COLUMN_FAMILIES = { DEFAULT_COLUMN_FAMILY }; 088 089 /** Table name to use of not overridden on the command line */ 090 protected static final String DEFAULT_TABLE_NAME = "cluster_test"; 091 092 /** The default data size if not specified */ 093 protected static final int DEFAULT_DATA_SIZE = 64; 094 095 /** The number of reader/writer threads if not specified */ 096 protected static final int DEFAULT_NUM_THREADS = 20; 097 098 /** Usage string for the load option */ 099 protected static final String OPT_USAGE_LOAD = 100 "<avg_cols_per_key>:<avg_data_size>" + "[:<#threads=" + DEFAULT_NUM_THREADS + ">]"; 101 102 /** Usage string for the read option */ 103 protected static final String OPT_USAGE_READ = 104 "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]"; 105 106 /** Usage string for the update option */ 107 protected static final String OPT_USAGE_UPDATE = "<update_percent>[:<#threads=" 108 + DEFAULT_NUM_THREADS + ">][:<#whether to ignore nonce collisions=0>]"; 109 110 protected static final String OPT_USAGE_BLOOM = 111 "Bloom filter type, one of " + Arrays.toString(BloomType.values()); 112 113 protected static final String OPT_USAGE_COMPRESSION = 114 "Compression type, " + "one of " + Arrays.toString(Compression.Algorithm.values()); 115 116 protected static final String OPT_VERBOSE = "verbose"; 117 118 public static final String OPT_BLOOM = "bloom"; 119 public static final String OPT_BLOOM_PARAM = "bloom_param"; 120 public static final String OPT_COMPRESSION = "compression"; 121 public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush"; 122 public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush."; 123 124 public static final String OPT_INMEMORY = "in_memory"; 125 public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF " 126 + "inmemory as far as possible. Not guaranteed that reads are always served from inmemory"; 127 128 public static final String OPT_GENERATOR = "generator"; 129 public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool." 130 + " Any args for this class can be passed as colon separated after class name"; 131 132 public static final String OPT_WRITER = "writer"; 133 public static final String OPT_WRITER_USAGE = "The class for executing the write requests"; 134 135 public static final String OPT_UPDATER = "updater"; 136 public static final String OPT_UPDATER_USAGE = "The class for executing the update requests"; 137 138 public static final String OPT_READER = "reader"; 139 public static final String OPT_READER_USAGE = "The class for executing the read requests"; 140 141 protected static final String OPT_KEY_WINDOW = "key_window"; 142 protected static final String OPT_WRITE = "write"; 143 protected static final String OPT_MAX_READ_ERRORS = "max_read_errors"; 144 public static final String OPT_MULTIPUT = "multiput"; 145 public static final String OPT_MULTIGET = "multiget_batchsize"; 146 protected static final String OPT_NUM_KEYS = "num_keys"; 147 protected static final String OPT_READ = "read"; 148 protected static final String OPT_START_KEY = "start_key"; 149 public static final String OPT_TABLE_NAME = "tn"; 150 public static final String OPT_COLUMN_FAMILIES = "families"; 151 protected static final String OPT_ZK_QUORUM = "zk"; 152 protected static final String OPT_ZK_PARENT_NODE = "zk_root"; 153 protected static final String OPT_SKIP_INIT = "skip_init"; 154 protected static final String OPT_INIT_ONLY = "init_only"; 155 protected static final String NUM_TABLES = "num_tables"; 156 protected static final String OPT_BATCHUPDATE = "batchupdate"; 157 protected static final String OPT_UPDATE = "update"; 158 159 public static final String OPT_ENCRYPTION = "encryption"; 160 protected static final String OPT_ENCRYPTION_USAGE = 161 "Enables transparent encryption on the test table, one of " 162 + Arrays.toString(Encryption.getSupportedCiphers()); 163 164 public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server"; 165 protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE = 166 "Desired number of regions per region server. Defaults to 5."; 167 public static final int DEFAULT_NUM_REGIONS_PER_SERVER = 5; 168 169 public static final String OPT_REGION_REPLICATION = "region_replication"; 170 protected static final String OPT_REGION_REPLICATION_USAGE = 171 "Desired number of replicas per region"; 172 173 public static final String OPT_REGION_REPLICA_ID = "region_replica_id"; 174 protected static final String OPT_REGION_REPLICA_ID_USAGE = 175 "Region replica id to do the reads from"; 176 177 public static final String OPT_MOB_THRESHOLD = "mob_threshold"; 178 protected static final String OPT_MOB_THRESHOLD_USAGE = 179 "Desired cell size to exceed in bytes that will use the MOB write path"; 180 181 protected static final long DEFAULT_START_KEY = 0; 182 183 /** This will be removed as we factor out the dependency on command line */ 184 protected CommandLine cmd; 185 186 protected MultiThreadedWriter writerThreads = null; 187 protected MultiThreadedReader readerThreads = null; 188 protected MultiThreadedUpdater updaterThreads = null; 189 190 protected long startKey, endKey; 191 192 protected boolean isVerbose, isWrite, isRead, isUpdate; 193 protected boolean deferredLogFlush; 194 195 // Column family options 196 protected DataBlockEncoding dataBlockEncodingAlgo; 197 protected Compression.Algorithm compressAlgo; 198 protected BloomType bloomType; 199 private boolean inMemoryCF; 200 201 private User userOwner; 202 // Writer options 203 protected int numWriterThreads = DEFAULT_NUM_THREADS; 204 protected int minColsPerKey, maxColsPerKey; 205 protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE; 206 protected boolean isMultiPut; 207 208 // Updater options 209 protected int numUpdaterThreads = DEFAULT_NUM_THREADS; 210 protected int updatePercent; 211 protected boolean ignoreConflicts = false; 212 protected boolean isBatchUpdate; 213 214 // Reader options 215 private int numReaderThreads = DEFAULT_NUM_THREADS; 216 private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW; 217 private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE; 218 private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS; 219 private int verifyPercent; 220 221 private int numTables = 1; 222 223 private String superUser; 224 225 private String userNames; 226 // This file is used to read authentication information in secure clusters. 227 private String authnFileName; 228 229 private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER; 230 private int regionReplication = -1; // not set 231 private int regionReplicaId = -1; // not set 232 233 private int mobThreshold = -1; // not set 234 235 // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad, 236 // console tool itself should only be used from console. 237 protected boolean isSkipInit = false; 238 protected boolean isInitOnly = false; 239 240 protected Cipher cipher = null; 241 242 protected String[] splitColonSeparated(String option, int minNumCols, int maxNumCols) { 243 String optVal = cmd.getOptionValue(option); 244 String[] cols = optVal.split(COLON); 245 if (cols.length < minNumCols || cols.length > maxNumCols) { 246 throw new IllegalArgumentException( 247 "Expected at least " + minNumCols + " columns but no more than " + maxNumCols 248 + " in the colon-separated value '" + optVal + "' of the " + "-" + option + " option"); 249 } 250 return cols; 251 } 252 253 protected int getNumThreads(String numThreadsStr) { 254 return parseInt(numThreadsStr, 1, Short.MAX_VALUE); 255 } 256 257 public byte[][] getColumnFamilies() { 258 return families; 259 } 260 261 /** 262 * Apply column family options such as Bloom filters, compression, and data block encoding. 263 */ 264 protected void applyColumnFamilyOptions(TableName tableName, byte[][] columnFamilies) 265 throws IOException { 266 try (Connection conn = ConnectionFactory.createConnection(conf); 267 Admin admin = conn.getAdmin()) { 268 TableDescriptor tableDesc = admin.getDescriptor(tableName); 269 LOG.info("Disabling table " + tableName); 270 admin.disableTable(tableName); 271 for (byte[] cf : columnFamilies) { 272 ColumnFamilyDescriptor columnDesc = tableDesc.getColumnFamily(cf); 273 boolean isNewCf = columnDesc == null; 274 ColumnFamilyDescriptorBuilder columnDescBuilder = isNewCf 275 ? ColumnFamilyDescriptorBuilder.newBuilder(cf) 276 : ColumnFamilyDescriptorBuilder.newBuilder(columnDesc); 277 if (bloomType != null) { 278 columnDescBuilder.setBloomFilterType(bloomType); 279 } 280 if (compressAlgo != null) { 281 columnDescBuilder.setCompressionType(compressAlgo); 282 } 283 if (dataBlockEncodingAlgo != null) { 284 columnDescBuilder.setDataBlockEncoding(dataBlockEncodingAlgo); 285 } 286 if (inMemoryCF) { 287 columnDescBuilder.setInMemory(inMemoryCF); 288 } 289 if (cipher != null) { 290 byte[] keyBytes = new byte[cipher.getKeyLength()]; 291 Bytes.secureRandom(keyBytes); 292 columnDescBuilder.setEncryptionType(cipher.getName()); 293 columnDescBuilder.setEncryptionKey(EncryptionUtil.wrapKey(conf, 294 User.getCurrent().getShortName(), new SecretKeySpec(keyBytes, cipher.getName()))); 295 } 296 if (mobThreshold >= 0) { 297 columnDescBuilder.setMobEnabled(true); 298 columnDescBuilder.setMobThreshold(mobThreshold); 299 } 300 301 if (isNewCf) { 302 admin.addColumnFamily(tableName, columnDescBuilder.build()); 303 } else { 304 admin.modifyColumnFamily(tableName, columnDescBuilder.build()); 305 } 306 } 307 LOG.info("Enabling table " + tableName); 308 admin.enableTable(tableName); 309 } 310 } 311 312 @Override 313 protected void addOptions() { 314 addOptNoArg("v", OPT_VERBOSE, "Will display a full readout of logs, including ZooKeeper"); 315 addOptWithArg(OPT_ZK_QUORUM, 316 "ZK quorum as comma-separated host names " + "without port numbers"); 317 addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper"); 318 addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write"); 319 addOptWithArg(OPT_COLUMN_FAMILIES, "The name of the column families to use separated by comma"); 320 addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD); 321 addOptWithArg(OPT_READ, OPT_USAGE_READ); 322 addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE); 323 addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading"); 324 addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM); 325 addOptWithArg(OPT_BLOOM_PARAM, "the parameter of bloom filter type"); 326 addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION); 327 addOptWithArg(LoadTestUtil.OPT_DATA_BLOCK_ENCODING, LoadTestUtil.OPT_DATA_BLOCK_ENCODING_USAGE); 328 addOptWithArg(OPT_MAX_READ_ERRORS, 329 "The maximum number of read errors " 330 + "to tolerate before terminating all reader threads. The default is " 331 + MultiThreadedReader.DEFAULT_MAX_ERRORS + "."); 332 addOptWithArg(OPT_MULTIGET, 333 "Whether to use multi-gets as opposed to " + "separate gets for every column in a row"); 334 addOptWithArg(OPT_KEY_WINDOW, 335 "The 'key window' to maintain between " 336 + "reads and writes for concurrent write/read workload. The default " + "is " 337 + MultiThreadedReader.DEFAULT_KEY_WINDOW + "."); 338 339 addOptNoArg(OPT_MULTIPUT, 340 "Whether to use multi-puts as opposed to " + "separate puts for every column in a row"); 341 addOptNoArg(OPT_BATCHUPDATE, 342 "Whether to use batch as opposed to " + "separate updates for every column in a row"); 343 addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY); 344 addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE); 345 addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE); 346 addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE); 347 addOptWithArg(OPT_READER, OPT_READER_USAGE); 348 349 addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write"); 350 addOptWithArg(OPT_START_KEY, "The first key to read/write " 351 + "(a 0-based index). The default value is " + DEFAULT_START_KEY + "."); 352 addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table " + "already exists"); 353 354 addOptWithArg(NUM_TABLES, 355 "A positive integer number. When a number n is specified, load test " 356 + "tool will load n table parallely. -tn parameter value becomes " 357 + "table name prefix. Each table name is in format <tn>_1...<tn>_n"); 358 359 addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE); 360 addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE); 361 addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE); 362 addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE); 363 addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE); 364 addOptWithArg(OPT_MOB_THRESHOLD, OPT_MOB_THRESHOLD_USAGE); 365 } 366 367 @Override 368 protected CommandLineParser newParser() { 369 // Commons-CLI lacks the capability to handle combinations of options, so we do it ourselves 370 // Validate in parse() to get helpful error messages instead of exploding in processOptions() 371 return new DefaultParser() { 372 @Override 373 public CommandLine parse(Options opts, String[] args, Properties props, boolean stop) 374 throws ParseException { 375 CommandLine cl = super.parse(opts, args, props, stop); 376 377 boolean isReadWriteUpdate = 378 cmd.hasOption(OPT_READ) || cmd.hasOption(OPT_WRITE) || cmd.hasOption(OPT_UPDATE); 379 boolean isInitOnly = cmd.hasOption(OPT_INIT_ONLY); 380 381 if (!isInitOnly && !isReadWriteUpdate) { 382 throw new MissingOptionException("Must specify either -" + OPT_INIT_ONLY 383 + " or at least one of -" + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE); 384 } 385 386 if (isInitOnly && isReadWriteUpdate) { 387 throw new AlreadySelectedException(OPT_INIT_ONLY + " cannot be specified with any of -" 388 + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE); 389 } 390 391 if (isReadWriteUpdate && !cmd.hasOption(OPT_NUM_KEYS)) { 392 throw new MissingOptionException(OPT_NUM_KEYS + " must be specified in read/write mode."); 393 } 394 395 return cl; 396 } 397 }; 398 } 399 400 @Override 401 protected void processOptions(CommandLine cmd) { 402 this.cmd = cmd; 403 404 tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME)); 405 406 if (cmd.hasOption(OPT_COLUMN_FAMILIES)) { 407 String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(","); 408 families = new byte[list.length][]; 409 for (int i = 0; i < list.length; i++) { 410 families[i] = Bytes.toBytes(list[i]); 411 } 412 } else { 413 families = DEFAULT_COLUMN_FAMILIES; 414 } 415 416 isVerbose = cmd.hasOption(OPT_VERBOSE); 417 isWrite = cmd.hasOption(OPT_WRITE); 418 isRead = cmd.hasOption(OPT_READ); 419 isUpdate = cmd.hasOption(OPT_UPDATE); 420 isInitOnly = cmd.hasOption(OPT_INIT_ONLY); 421 deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH); 422 423 if (!isInitOnly) { 424 startKey = parseLong(cmd.getOptionValue(OPT_START_KEY, String.valueOf(DEFAULT_START_KEY)), 0, 425 Long.MAX_VALUE); 426 long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1, Long.MAX_VALUE - startKey); 427 endKey = startKey + numKeys; 428 isSkipInit = cmd.hasOption(OPT_SKIP_INIT); 429 System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]"); 430 } 431 432 parseColumnFamilyOptions(cmd); 433 434 if (isWrite) { 435 String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3); 436 437 int colIndex = 0; 438 minColsPerKey = 1; 439 maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]); 440 int avgColDataSize = parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE); 441 minColDataSize = avgColDataSize / 2; 442 maxColDataSize = avgColDataSize * 3 / 2; 443 444 if (colIndex < writeOpts.length) { 445 numWriterThreads = getNumThreads(writeOpts[colIndex++]); 446 } 447 448 isMultiPut = cmd.hasOption(OPT_MULTIPUT); 449 450 mobThreshold = -1; 451 if (cmd.hasOption(OPT_MOB_THRESHOLD)) { 452 mobThreshold = Integer.parseInt(cmd.getOptionValue(OPT_MOB_THRESHOLD)); 453 } 454 455 System.out.println("Multi-puts: " + isMultiPut); 456 System.out.println("Columns per key: " + minColsPerKey + ".." + maxColsPerKey); 457 System.out.println("Data size per column: " + minColDataSize + ".." + maxColDataSize); 458 } 459 460 if (isUpdate) { 461 String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3); 462 int colIndex = 0; 463 updatePercent = parseInt(mutateOpts[colIndex++], 0, 100); 464 if (colIndex < mutateOpts.length) { 465 numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]); 466 } 467 if (colIndex < mutateOpts.length) { 468 ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1; 469 } 470 471 isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE); 472 473 System.out.println("Batch updates: " + isBatchUpdate); 474 System.out.println("Percent of keys to update: " + updatePercent); 475 System.out.println("Updater threads: " + numUpdaterThreads); 476 System.out.println("Ignore nonce conflicts: " + ignoreConflicts); 477 } 478 479 if (isRead) { 480 String[] readOpts = splitColonSeparated(OPT_READ, 1, 2); 481 int colIndex = 0; 482 verifyPercent = parseInt(readOpts[colIndex++], 0, 100); 483 if (colIndex < readOpts.length) { 484 numReaderThreads = getNumThreads(readOpts[colIndex++]); 485 } 486 487 if (cmd.hasOption(OPT_MAX_READ_ERRORS)) { 488 maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS), 0, Integer.MAX_VALUE); 489 } 490 491 if (cmd.hasOption(OPT_KEY_WINDOW)) { 492 keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW), 0, Integer.MAX_VALUE); 493 } 494 495 if (cmd.hasOption(OPT_MULTIGET)) { 496 multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET), 0, Integer.MAX_VALUE); 497 } 498 499 System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize); 500 System.out.println("Percent of keys to verify: " + verifyPercent); 501 System.out.println("Reader threads: " + numReaderThreads); 502 } 503 504 numTables = 1; 505 if (cmd.hasOption(NUM_TABLES)) { 506 numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE); 507 } 508 509 numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER; 510 if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) { 511 numRegionsPerServer = Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER)); 512 } 513 514 regionReplication = 1; 515 if (cmd.hasOption(OPT_REGION_REPLICATION)) { 516 regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION)); 517 } 518 519 regionReplicaId = -1; 520 if (cmd.hasOption(OPT_REGION_REPLICA_ID)) { 521 regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID)); 522 } 523 } 524 525 private void parseColumnFamilyOptions(CommandLine cmd) { 526 String dataBlockEncodingStr = cmd.getOptionValue(LoadTestUtil.OPT_DATA_BLOCK_ENCODING); 527 dataBlockEncodingAlgo = 528 dataBlockEncodingStr == null ? null : DataBlockEncoding.valueOf(dataBlockEncodingStr); 529 530 String compressStr = cmd.getOptionValue(OPT_COMPRESSION); 531 compressAlgo = 532 compressStr == null ? Compression.Algorithm.NONE : Compression.Algorithm.valueOf(compressStr); 533 534 String bloomStr = cmd.getOptionValue(OPT_BLOOM); 535 bloomType = bloomStr == null ? BloomType.ROW : BloomType.valueOf(bloomStr); 536 537 if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) { 538 if (!cmd.hasOption(OPT_BLOOM_PARAM)) { 539 LOG.error("the parameter of bloom filter {} is not specified", bloomType.name()); 540 } else { 541 conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM)); 542 } 543 } 544 545 inMemoryCF = cmd.hasOption(OPT_INMEMORY); 546 if (cmd.hasOption(OPT_ENCRYPTION)) { 547 cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION)); 548 } 549 550 } 551 552 public void initTestTable() throws IOException { 553 Durability durability = Durability.USE_DEFAULT; 554 if (deferredLogFlush) { 555 durability = Durability.ASYNC_WAL; 556 } 557 558 LoadTestUtil.createPreSplitLoadTestTable(conf, tableName, getColumnFamilies(), compressAlgo, 559 dataBlockEncodingAlgo, numRegionsPerServer, regionReplication, durability); 560 applyColumnFamilyOptions(tableName, getColumnFamilies()); 561 } 562 563 @Override 564 protected int doWork() throws IOException { 565 if (!isVerbose) { 566 Log4jUtils.setLogLevel(ZooKeeper.class.getName(), "WARN"); 567 } 568 if (numTables > 1) { 569 return parallelLoadTables(); 570 } else { 571 return loadTable(); 572 } 573 } 574 575 protected int loadTable() throws IOException { 576 if (cmd.hasOption(OPT_ZK_QUORUM)) { 577 conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM)); 578 } 579 if (cmd.hasOption(OPT_ZK_PARENT_NODE)) { 580 conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE)); 581 } 582 583 if (isInitOnly) { 584 LOG.info("Initializing only; no reads or writes"); 585 initTestTable(); 586 return 0; 587 } 588 589 if (!isSkipInit) { 590 initTestTable(); 591 } 592 LoadTestDataGenerator dataGen = null; 593 if (cmd.hasOption(OPT_GENERATOR)) { 594 String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON); 595 dataGen = getLoadGeneratorInstance(clazzAndArgs[0]); 596 String[] args; 597 if (dataGen instanceof LoadTestDataGeneratorWithACL) { 598 LOG.info("Using LoadTestDataGeneratorWithACL"); 599 if (User.isHBaseSecurityEnabled(conf)) { 600 LOG.info("Security is enabled"); 601 authnFileName = clazzAndArgs[1]; 602 superUser = clazzAndArgs[2]; 603 userNames = clazzAndArgs[3]; 604 args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length); 605 Properties authConfig = new Properties(); 606 authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName)); 607 try { 608 addAuthInfoToConf(authConfig, conf, superUser, userNames); 609 } catch (IOException exp) { 610 LOG.error(exp.toString(), exp); 611 return EXIT_FAILURE; 612 } 613 userOwner = User.create(KerberosUtils.loginAndReturnUGI(conf, superUser)); 614 } else { 615 superUser = clazzAndArgs[1]; 616 userNames = clazzAndArgs[2]; 617 args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length); 618 userOwner = User.createUserForTesting(conf, superUser, new String[0]); 619 } 620 } else { 621 args = clazzAndArgs.length == 1 622 ? new String[0] 623 : Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length); 624 } 625 dataGen.initialize(args); 626 } else { 627 // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator 628 dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize, 629 minColsPerKey, maxColsPerKey, families); 630 } 631 632 if (userOwner != null) { 633 LOG.info("Granting permissions for user " + userOwner.getShortName()); 634 Permission.Action[] actions = { Permission.Action.ADMIN, Permission.Action.CREATE, 635 Permission.Action.READ, Permission.Action.WRITE }; 636 try { 637 AccessControlClient.grant(ConnectionFactory.createConnection(conf), tableName, 638 userOwner.getShortName(), null, null, actions); 639 } catch (Throwable e) { 640 LOG.error(HBaseMarkers.FATAL, 641 "Error in granting permission for the user " + userOwner.getShortName(), e); 642 return EXIT_FAILURE; 643 } 644 } 645 646 if (userNames != null) { 647 // This will be comma separated list of expressions. 648 String users[] = userNames.split(","); 649 User user = null; 650 for (String userStr : users) { 651 if (User.isHBaseSecurityEnabled(conf)) { 652 user = User.create(KerberosUtils.loginAndReturnUGI(conf, userStr)); 653 } else { 654 user = User.createUserForTesting(conf, userStr, new String[0]); 655 } 656 } 657 } 658 659 if (isWrite) { 660 if (userOwner != null) { 661 writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner); 662 } else { 663 String writerClass = null; 664 if (cmd.hasOption(OPT_WRITER)) { 665 writerClass = cmd.getOptionValue(OPT_WRITER); 666 } else { 667 writerClass = MultiThreadedWriter.class.getCanonicalName(); 668 } 669 670 writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen); 671 } 672 writerThreads.setMultiPut(isMultiPut); 673 } 674 675 if (isUpdate) { 676 if (userOwner != null) { 677 updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent, 678 userOwner, userNames); 679 } else { 680 String updaterClass = null; 681 if (cmd.hasOption(OPT_UPDATER)) { 682 updaterClass = cmd.getOptionValue(OPT_UPDATER); 683 } else { 684 updaterClass = MultiThreadedUpdater.class.getCanonicalName(); 685 } 686 updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen); 687 } 688 updaterThreads.setBatchUpdate(isBatchUpdate); 689 updaterThreads.setIgnoreNonceConflicts(ignoreConflicts); 690 } 691 692 if (isRead) { 693 if (userOwner != null) { 694 readerThreads = 695 new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent, userNames); 696 } else { 697 String readerClass = null; 698 if (cmd.hasOption(OPT_READER)) { 699 readerClass = cmd.getOptionValue(OPT_READER); 700 } else { 701 readerClass = MultiThreadedReader.class.getCanonicalName(); 702 } 703 readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen); 704 } 705 readerThreads.setMaxErrors(maxReadErrors); 706 readerThreads.setKeyWindow(keyWindow); 707 readerThreads.setMultiGetBatchSize(multiGetBatchSize); 708 readerThreads.setRegionReplicaId(regionReplicaId); 709 } 710 711 if (isUpdate && isWrite) { 712 LOG.info("Concurrent write/update workload: making updaters aware of the " + "write point"); 713 updaterThreads.linkToWriter(writerThreads); 714 } 715 716 if (isRead && (isUpdate || isWrite)) { 717 LOG.info("Concurrent write/read workload: making readers aware of the " + "write point"); 718 readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads); 719 } 720 721 if (isWrite) { 722 System.out.println("Starting to write data..."); 723 writerThreads.start(startKey, endKey, numWriterThreads); 724 } 725 726 if (isUpdate) { 727 LOG.info("Starting to mutate data..."); 728 System.out.println("Starting to mutate data..."); 729 // TODO : currently append and increment operations not tested with tags 730 // Will update this after it is done 731 updaterThreads.start(startKey, endKey, numUpdaterThreads); 732 } 733 734 if (isRead) { 735 System.out.println("Starting to read data..."); 736 readerThreads.start(startKey, endKey, numReaderThreads); 737 } 738 739 if (isWrite) { 740 writerThreads.waitForFinish(); 741 } 742 743 if (isUpdate) { 744 updaterThreads.waitForFinish(); 745 } 746 747 if (isRead) { 748 readerThreads.waitForFinish(); 749 } 750 751 boolean success = true; 752 if (isWrite) { 753 success = success && writerThreads.getNumWriteFailures() == 0; 754 } 755 if (isUpdate) { 756 success = success && updaterThreads.getNumWriteFailures() == 0; 757 } 758 if (isRead) { 759 success = 760 success && readerThreads.getNumReadErrors() == 0 && readerThreads.getNumReadFailures() == 0; 761 } 762 return success ? EXIT_SUCCESS : EXIT_FAILURE; 763 } 764 765 private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException { 766 try { 767 Class<?> clazz = Class.forName(clazzName); 768 Constructor<?> constructor = 769 clazz.getConstructor(int.class, int.class, int.class, int.class, byte[][].class); 770 return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize, 771 minColsPerKey, maxColsPerKey, families); 772 } catch (Exception e) { 773 throw new IOException(e); 774 } 775 } 776 777 private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName, 778 LoadTestDataGenerator dataGen) throws IOException { 779 try { 780 Class<?> clazz = Class.forName(clazzName); 781 Constructor<?> constructor = 782 clazz.getConstructor(LoadTestDataGenerator.class, Configuration.class, TableName.class); 783 return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName); 784 } catch (Exception e) { 785 throw new IOException(e); 786 } 787 } 788 789 private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName, 790 LoadTestDataGenerator dataGen) throws IOException { 791 try { 792 Class<?> clazz = Class.forName(clazzName); 793 Constructor<?> constructor = clazz.getConstructor(LoadTestDataGenerator.class, 794 Configuration.class, TableName.class, double.class); 795 return (MultiThreadedUpdater) constructor.newInstance(dataGen, conf, tableName, 796 updatePercent); 797 } catch (Exception e) { 798 throw new IOException(e); 799 } 800 } 801 802 private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName, 803 LoadTestDataGenerator dataGen) throws IOException { 804 try { 805 Class<?> clazz = Class.forName(clazzName); 806 Constructor<?> constructor = clazz.getConstructor(LoadTestDataGenerator.class, 807 Configuration.class, TableName.class, double.class); 808 return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent); 809 } catch (Exception e) { 810 throw new IOException(e); 811 } 812 } 813 814 public static void main(String[] args) { 815 new LoadTestTool().doStaticMain(args); 816 } 817 818 /** 819 * When NUM_TABLES is specified, the function starts multiple worker threads which individually 820 * start a LoadTestTool instance to load a table. Each table name is in format <tn>_<index>. 821 * For example, "-tn test -num_tables 2" , table names will be "test_1", "test_2" 822 * @throws IOException if one of the load tasks is unable to complete 823 */ 824 private int parallelLoadTables() throws IOException { 825 // create new command args 826 String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME); 827 String[] newArgs = null; 828 if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) { 829 newArgs = new String[cmdLineArgs.length + 2]; 830 newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME; 831 newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME; 832 System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length); 833 } else { 834 newArgs = cmdLineArgs; 835 } 836 837 int tableNameValueIndex = -1; 838 for (int j = 0; j < newArgs.length; j++) { 839 if (newArgs[j].endsWith(OPT_TABLE_NAME)) { 840 tableNameValueIndex = j + 1; 841 } else if (newArgs[j].endsWith(NUM_TABLES)) { 842 // change NUM_TABLES to 1 so that each worker loads one table 843 newArgs[j + 1] = "1"; 844 } 845 } 846 847 // starting to load multiple tables 848 List<WorkerThread> workers = new ArrayList<>(); 849 for (int i = 0; i < numTables; i++) { 850 String[] workerArgs = newArgs.clone(); 851 workerArgs[tableNameValueIndex] = tableName + "_" + (i + 1); 852 WorkerThread worker = new WorkerThread(i, workerArgs); 853 workers.add(worker); 854 LOG.info(worker + " starting"); 855 worker.start(); 856 } 857 858 // wait for all workers finish 859 LOG.info("Waiting for worker threads to finish"); 860 for (WorkerThread t : workers) { 861 try { 862 t.join(); 863 } catch (InterruptedException ie) { 864 IOException iie = new InterruptedIOException(); 865 iie.initCause(ie); 866 throw iie; 867 } 868 checkForErrors(); 869 } 870 871 return EXIT_SUCCESS; 872 } 873 874 // If an exception is thrown by one of worker threads, it will be 875 // stored here. 876 protected AtomicReference<Throwable> thrown = new AtomicReference<>(); 877 878 private void workerThreadError(Throwable t) { 879 thrown.compareAndSet(null, t); 880 } 881 882 /** 883 * Check for errors in the writer threads. If any is found, rethrow it. 884 */ 885 private void checkForErrors() throws IOException { 886 Throwable thrown = this.thrown.get(); 887 if (thrown == null) return; 888 if (thrown instanceof IOException) { 889 throw (IOException) thrown; 890 } else { 891 throw new RuntimeException(thrown); 892 } 893 } 894 895 class WorkerThread extends Thread { 896 private String[] workerArgs; 897 898 WorkerThread(int i, String[] args) { 899 super("WorkerThread-" + i); 900 workerArgs = args; 901 } 902 903 @Override 904 public void run() { 905 try { 906 int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs); 907 if (ret != 0) { 908 throw new RuntimeException("LoadTestTool exit with non-zero return code."); 909 } 910 } catch (Exception ex) { 911 LOG.error("Error in worker thread", ex); 912 workerThreadError(ex); 913 } 914 } 915 } 916 917 private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner, 918 String userList) throws IOException { 919 List<String> users = new ArrayList<>(Arrays.asList(userList.split(","))); 920 users.add(owner); 921 for (String user : users) { 922 String keyTabFileConfKey = "hbase." + user + ".keytab.file"; 923 String principalConfKey = "hbase." + user + ".kerberos.principal"; 924 if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) { 925 throw new IOException("Authentication configs missing for user : " + user); 926 } 927 } 928 for (String key : authConfig.stringPropertyNames()) { 929 conf.set(key, authConfig.getProperty(key)); 930 } 931 LOG.debug("Added authentication properties to config successfully."); 932 } 933 934}