001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import javax.crypto.spec.SecretKeySpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.logging.Log4jUtils;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.AlreadySelectedException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
067
068/**
069 * A command-line utility that reads, writes, and verifies data. Unlike
070 * {@link org.apache.hadoop.hbase.PerformanceEvaluation}, this tool validates the data written, and
071 * supports simultaneously writing and reading the same set of keys.
072 */
073@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
074public class LoadTestTool extends AbstractHBaseTool {
075
076  private static final Logger LOG = LoggerFactory.getLogger(LoadTestTool.class);
077  private static final String COLON = ":";
078
079  /** Table name for the test */
080  private TableName tableName;
081
082  /** Column families for the test */
083  private byte[][] families;
084  /** Column family used by the test */
085  private static byte[] DEFAULT_COLUMN_FAMILY = Bytes.toBytes("test_cf");
086  /** Column families used by the test */
087  private static final byte[][] DEFAULT_COLUMN_FAMILIES = { DEFAULT_COLUMN_FAMILY };
088
  /** Table name to use if not overridden on the command line */
090  protected static final String DEFAULT_TABLE_NAME = "cluster_test";
091
092  /** The default data size if not specified */
093  protected static final int DEFAULT_DATA_SIZE = 64;
094
095  /** The number of reader/writer threads if not specified */
096  protected static final int DEFAULT_NUM_THREADS = 20;
097
098  /** Usage string for the load option */
099  protected static final String OPT_USAGE_LOAD =
100    "<avg_cols_per_key>:<avg_data_size>" + "[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
101
102  /** Usage string for the read option */
103  protected static final String OPT_USAGE_READ =
104    "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
105
106  /** Usage string for the update option */
107  protected static final String OPT_USAGE_UPDATE = "<update_percent>[:<#threads="
108    + DEFAULT_NUM_THREADS + ">][:<#whether to ignore nonce collisions=0>]";
109
110  protected static final String OPT_USAGE_BLOOM =
111    "Bloom filter type, one of " + Arrays.toString(BloomType.values());
112
113  protected static final String OPT_USAGE_COMPRESSION =
114    "Compression type, " + "one of " + Arrays.toString(Compression.Algorithm.values());
115
116  protected static final String OPT_VERBOSE = "verbose";
117
118  public static final String OPT_BLOOM = "bloom";
119  public static final String OPT_BLOOM_PARAM = "bloom_param";
120  public static final String OPT_COMPRESSION = "compression";
121  public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
122  public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";
123
124  public static final String OPT_INMEMORY = "in_memory";
125  public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF "
126    + "inmemory as far as possible.  Not guaranteed that reads are always served from inmemory";
127
128  public static final String OPT_GENERATOR = "generator";
129  public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
130    + " Any args for this class can be passed as colon separated after class name";
131
132  public static final String OPT_WRITER = "writer";
133  public static final String OPT_WRITER_USAGE = "The class for executing the write requests";
134
135  public static final String OPT_UPDATER = "updater";
136  public static final String OPT_UPDATER_USAGE = "The class for executing the update requests";
137
138  public static final String OPT_READER = "reader";
139  public static final String OPT_READER_USAGE = "The class for executing the read requests";
140
141  protected static final String OPT_KEY_WINDOW = "key_window";
142  protected static final String OPT_WRITE = "write";
143  protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
144  public static final String OPT_MULTIPUT = "multiput";
145  public static final String OPT_MULTIGET = "multiget_batchsize";
146  protected static final String OPT_NUM_KEYS = "num_keys";
147  protected static final String OPT_READ = "read";
148  protected static final String OPT_START_KEY = "start_key";
149  public static final String OPT_TABLE_NAME = "tn";
150  public static final String OPT_COLUMN_FAMILIES = "families";
151  protected static final String OPT_ZK_QUORUM = "zk";
152  protected static final String OPT_ZK_PARENT_NODE = "zk_root";
153  protected static final String OPT_SKIP_INIT = "skip_init";
154  protected static final String OPT_INIT_ONLY = "init_only";
155  protected static final String NUM_TABLES = "num_tables";
156  protected static final String OPT_BATCHUPDATE = "batchupdate";
157  protected static final String OPT_UPDATE = "update";
158
159  public static final String OPT_ENCRYPTION = "encryption";
160  protected static final String OPT_ENCRYPTION_USAGE =
161    "Enables transparent encryption on the test table, one of "
162      + Arrays.toString(Encryption.getSupportedCiphers());
163
164  public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server";
165  protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE =
166    "Desired number of regions per region server. Defaults to 5.";
167  public static final int DEFAULT_NUM_REGIONS_PER_SERVER = 5;
168
169  public static final String OPT_REGION_REPLICATION = "region_replication";
170  protected static final String OPT_REGION_REPLICATION_USAGE =
171    "Desired number of replicas per region";
172
173  public static final String OPT_REGION_REPLICA_ID = "region_replica_id";
174  protected static final String OPT_REGION_REPLICA_ID_USAGE =
175    "Region replica id to do the reads from";
176
177  public static final String OPT_TIMELINE_CONSISTENCY = "timeline";
178  protected static final String OPT_TIMELINE_CONSISTENCY_USAGE =
179    "Use TIMELINE consistency in read operations. Leave region_replica_id unset, otherwise it will override this setting.";
180
181  public static final String OPT_MOB_THRESHOLD = "mob_threshold";
182  protected static final String OPT_MOB_THRESHOLD_USAGE =
183    "Desired cell size to exceed in bytes that will use the MOB write path";
184
  /** Default first key to use when -start_key is not given on the command line */
  protected static final long DEFAULT_START_KEY = 0;

  /** This will be removed as we factor out the dependency on command line */
  protected CommandLine cmd;

  // Worker thread pools; created in loadTable() for whichever workloads were requested
  protected MultiThreadedWriter writerThreads = null;
  protected MultiThreadedReader readerThreads = null;
  protected MultiThreadedUpdater updaterThreads = null;

  // Key range [startKey, endKey) the workloads operate on (endKey is exclusive)
  protected long startKey, endKey;

  // Which workloads/flags were requested on the command line
  protected boolean isVerbose, isWrite, isRead, isUpdate;
  // Whether to use ASYNC_WAL durability instead of the table default
  protected boolean deferredLogFlush;

  // Column family options
  protected DataBlockEncoding dataBlockEncodingAlgo;
  protected Compression.Algorithm compressAlgo;
  protected BloomType bloomType;
  private boolean inMemoryCF;

  // Owner user when an ACL-aware data generator is in use; null otherwise
  private User userOwner;
  // Writer options
  protected int numWriterThreads = DEFAULT_NUM_THREADS;
  protected int minColsPerKey, maxColsPerKey;
  protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE;
  protected boolean isMultiPut;

  // Updater options
  protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
  protected int updatePercent;
  protected boolean ignoreConflicts = false;
  protected boolean isBatchUpdate;

  // Reader options
  private int numReaderThreads = DEFAULT_NUM_THREADS;
  private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
  private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE;
  private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
  private int verifyPercent;

  // Number of tables to load in parallel; when > 1, table names get a _1.._n suffix
  private int numTables = 1;

  // Superuser name parsed from the ACL-aware generator arguments
  private String superUser;

  // Comma-separated list of user names parsed from the ACL-aware generator arguments
  private String userNames;
  // This file is used to read authentication information in secure clusters.
  private String authnFileName;

  private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
  private int regionReplication = -1; // not set
  private int regionReplicaId = -1; // not set
  private boolean timelineConsistency = false;

  private int mobThreshold = -1; // not set

  // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
  // console tool itself should only be used from console.
  protected boolean isSkipInit = false;
  protected boolean isInitOnly = false;

  // Encryption cipher for the test table; null when -encryption is not given
  protected Cipher cipher = null;
246
247  protected String[] splitColonSeparated(String option, int minNumCols, int maxNumCols) {
248    String optVal = cmd.getOptionValue(option);
249    String[] cols = optVal.split(COLON);
250    if (cols.length < minNumCols || cols.length > maxNumCols) {
251      throw new IllegalArgumentException(
252        "Expected at least " + minNumCols + " columns but no more than " + maxNumCols
253          + " in the colon-separated value '" + optVal + "' of the " + "-" + option + " option");
254    }
255    return cols;
256  }
257
  /**
   * Parses a thread-count argument, constraining it to the range [1, Short.MAX_VALUE].
   * @param numThreadsStr string form of the desired thread count
   * @return the parsed thread count
   */
  protected int getNumThreads(String numThreadsStr) {
    return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
  }
261
262  public byte[][] getColumnFamilies() {
263    return families;
264  }
265
266  /**
267   * Apply column family options such as Bloom filters, compression, and data block encoding.
268   */
269  protected void applyColumnFamilyOptions(TableName tableName, byte[][] columnFamilies)
270    throws IOException {
271    try (Connection conn = ConnectionFactory.createConnection(conf);
272      Admin admin = conn.getAdmin()) {
273      TableDescriptor tableDesc = admin.getDescriptor(tableName);
274      LOG.info("Disabling table " + tableName);
275      admin.disableTable(tableName);
276      for (byte[] cf : columnFamilies) {
277        ColumnFamilyDescriptor columnDesc = tableDesc.getColumnFamily(cf);
278        boolean isNewCf = columnDesc == null;
279        ColumnFamilyDescriptorBuilder columnDescBuilder = isNewCf
280          ? ColumnFamilyDescriptorBuilder.newBuilder(cf)
281          : ColumnFamilyDescriptorBuilder.newBuilder(columnDesc);
282        if (bloomType != null) {
283          columnDescBuilder.setBloomFilterType(bloomType);
284        }
285        if (compressAlgo != null) {
286          columnDescBuilder.setCompressionType(compressAlgo);
287        }
288        if (dataBlockEncodingAlgo != null) {
289          columnDescBuilder.setDataBlockEncoding(dataBlockEncodingAlgo);
290        }
291        if (inMemoryCF) {
292          columnDescBuilder.setInMemory(inMemoryCF);
293        }
294        if (cipher != null) {
295          byte[] keyBytes = new byte[cipher.getKeyLength()];
296          Bytes.secureRandom(keyBytes);
297          columnDescBuilder.setEncryptionType(cipher.getName());
298          columnDescBuilder.setEncryptionKey(EncryptionUtil.wrapKey(conf,
299            User.getCurrent().getShortName(), new SecretKeySpec(keyBytes, cipher.getName())));
300        }
301        if (mobThreshold >= 0) {
302          columnDescBuilder.setMobEnabled(true);
303          columnDescBuilder.setMobThreshold(mobThreshold);
304        }
305
306        if (isNewCf) {
307          admin.addColumnFamily(tableName, columnDescBuilder.build());
308        } else {
309          admin.modifyColumnFamily(tableName, columnDescBuilder.build());
310        }
311      }
312      LOG.info("Enabling table " + tableName);
313      admin.enableTable(tableName);
314    }
315  }
316
317  @Override
318  protected void addOptions() {
319    addOptNoArg("v", OPT_VERBOSE, "Will display a full readout of logs, including ZooKeeper");
320    addOptWithArg(OPT_ZK_QUORUM,
321      "ZK quorum as comma-separated host names " + "without port numbers");
322    addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
323    addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
324    addOptWithArg(OPT_COLUMN_FAMILIES, "The name of the column families to use separated by comma");
325    addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
326    addOptWithArg(OPT_READ, OPT_USAGE_READ);
327    addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
328    addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
329    addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
330    addOptWithArg(OPT_BLOOM_PARAM, "the parameter of bloom filter type");
331    addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
332    addOptWithArg(LoadTestUtil.OPT_DATA_BLOCK_ENCODING, LoadTestUtil.OPT_DATA_BLOCK_ENCODING_USAGE);
333    addOptWithArg(OPT_MAX_READ_ERRORS,
334      "The maximum number of read errors "
335        + "to tolerate before terminating all reader threads. The default is "
336        + MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
337    addOptWithArg(OPT_MULTIGET,
338      "Whether to use multi-gets as opposed to " + "separate gets for every column in a row");
339    addOptWithArg(OPT_KEY_WINDOW,
340      "The 'key window' to maintain between "
341        + "reads and writes for concurrent write/read workload. The default " + "is "
342        + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");
343
344    addOptNoArg(OPT_MULTIPUT,
345      "Whether to use multi-puts as opposed to " + "separate puts for every column in a row");
346    addOptNoArg(OPT_BATCHUPDATE,
347      "Whether to use batch as opposed to " + "separate updates for every column in a row");
348    addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
349    addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
350    addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE);
351    addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE);
352    addOptWithArg(OPT_READER, OPT_READER_USAGE);
353
354    addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
355    addOptWithArg(OPT_START_KEY, "The first key to read/write "
356      + "(a 0-based index). The default value is " + DEFAULT_START_KEY + ".");
357    addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table " + "already exists");
358
359    addOptWithArg(NUM_TABLES,
360      "A positive integer number. When a number n is specified, load test "
361        + "tool  will load n table parallely. -tn parameter value becomes "
362        + "table name prefix. Each table name is in format <tn>_1...<tn>_n");
363
364    addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE);
365    addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE);
366    addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE);
367    addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE);
368    addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE);
369    addOptNoArg(OPT_TIMELINE_CONSISTENCY, OPT_TIMELINE_CONSISTENCY_USAGE);
370    addOptWithArg(OPT_MOB_THRESHOLD, OPT_MOB_THRESHOLD_USAGE);
371  }
372
373  @Override
374  protected CommandLineParser newParser() {
375    // Commons-CLI lacks the capability to handle combinations of options, so we do it ourselves
376    // Validate in parse() to get helpful error messages instead of exploding in processOptions()
377    return new DefaultParser() {
378      @Override
379      public CommandLine parse(Options opts, String[] args, Properties props, boolean stop)
380        throws ParseException {
381        CommandLine cl = super.parse(opts, args, props, stop);
382
383        boolean isReadWriteUpdate =
384          cmd.hasOption(OPT_READ) || cmd.hasOption(OPT_WRITE) || cmd.hasOption(OPT_UPDATE);
385        boolean isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
386
387        if (!isInitOnly && !isReadWriteUpdate) {
388          throw new MissingOptionException("Must specify either -" + OPT_INIT_ONLY
389            + " or at least one of -" + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE);
390        }
391
392        if (isInitOnly && isReadWriteUpdate) {
393          throw new AlreadySelectedException(OPT_INIT_ONLY + " cannot be specified with any of -"
394            + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE);
395        }
396
397        if (isReadWriteUpdate && !cmd.hasOption(OPT_NUM_KEYS)) {
398          throw new MissingOptionException(OPT_NUM_KEYS + " must be specified in read/write mode.");
399        }
400
401        return cl;
402      }
403    };
404  }
405
  /**
   * Parses all command-line options into fields. Assumes the CommandLine has already been
   * validated for legal option combinations by the parser returned from {@link #newParser()}.
   */
  @Override
  protected void processOptions(CommandLine cmd) {
    this.cmd = cmd;

    tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME));

    // Column families: comma-separated list, or the default single family.
    if (cmd.hasOption(OPT_COLUMN_FAMILIES)) {
      String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(",");
      families = new byte[list.length][];
      for (int i = 0; i < list.length; i++) {
        families[i] = Bytes.toBytes(list[i]);
      }
    } else {
      families = DEFAULT_COLUMN_FAMILIES;
    }

    isVerbose = cmd.hasOption(OPT_VERBOSE);
    isWrite = cmd.hasOption(OPT_WRITE);
    isRead = cmd.hasOption(OPT_READ);
    isUpdate = cmd.hasOption(OPT_UPDATE);
    isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
    deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH);

    if (!isInitOnly) {
      // Key range is [startKey, startKey + numKeys); endKey is exclusive.
      startKey = parseLong(cmd.getOptionValue(OPT_START_KEY, String.valueOf(DEFAULT_START_KEY)), 0,
        Long.MAX_VALUE);
      long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1, Long.MAX_VALUE - startKey);
      endKey = startKey + numKeys;
      isSkipInit = cmd.hasOption(OPT_SKIP_INIT);
      System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
    }

    parseColumnFamilyOptions(cmd);

    if (isWrite) {
      // -write format: <avg_cols_per_key>:<avg_data_size>[:<#threads>]
      String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3);

      int colIndex = 0;
      minColsPerKey = 1;
      // Columns per key vary in [1, 2 * avg]; data size varies +/- 50% around the average.
      maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
      int avgColDataSize = parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
      minColDataSize = avgColDataSize / 2;
      maxColDataSize = avgColDataSize * 3 / 2;

      if (colIndex < writeOpts.length) {
        numWriterThreads = getNumThreads(writeOpts[colIndex++]);
      }

      isMultiPut = cmd.hasOption(OPT_MULTIPUT);

      mobThreshold = -1;
      if (cmd.hasOption(OPT_MOB_THRESHOLD)) {
        mobThreshold = Integer.parseInt(cmd.getOptionValue(OPT_MOB_THRESHOLD));
      }

      System.out.println("Multi-puts: " + isMultiPut);
      System.out.println("Columns per key: " + minColsPerKey + ".." + maxColsPerKey);
      System.out.println("Data size per column: " + minColDataSize + ".." + maxColDataSize);
    }

    if (isUpdate) {
      // -update format: <update_percent>[:<#threads>][:<ignore nonce conflicts, 0|1>]
      String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3);
      int colIndex = 0;
      updatePercent = parseInt(mutateOpts[colIndex++], 0, 100);
      if (colIndex < mutateOpts.length) {
        numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]);
      }
      if (colIndex < mutateOpts.length) {
        ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1;
      }

      isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE);

      System.out.println("Batch updates: " + isBatchUpdate);
      System.out.println("Percent of keys to update: " + updatePercent);
      System.out.println("Updater threads: " + numUpdaterThreads);
      System.out.println("Ignore nonce conflicts: " + ignoreConflicts);
    }

    if (isRead) {
      // -read format: <verify_percent>[:<#threads>]
      String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
      int colIndex = 0;
      verifyPercent = parseInt(readOpts[colIndex++], 0, 100);
      if (colIndex < readOpts.length) {
        numReaderThreads = getNumThreads(readOpts[colIndex++]);
      }

      if (cmd.hasOption(OPT_MAX_READ_ERRORS)) {
        maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS), 0, Integer.MAX_VALUE);
      }

      if (cmd.hasOption(OPT_KEY_WINDOW)) {
        keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW), 0, Integer.MAX_VALUE);
      }

      if (cmd.hasOption(OPT_MULTIGET)) {
        multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET), 0, Integer.MAX_VALUE);
      }

      System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize);
      System.out.println("Percent of keys to verify: " + verifyPercent);
      System.out.println("Reader threads: " + numReaderThreads);
    }

    numTables = 1;
    if (cmd.hasOption(NUM_TABLES)) {
      numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
    }

    numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
    if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) {
      numRegionsPerServer = Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER));
    }

    regionReplication = 1;
    if (cmd.hasOption(OPT_REGION_REPLICATION)) {
      regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION));
    }

    regionReplicaId = -1;
    if (cmd.hasOption(OPT_REGION_REPLICA_ID)) {
      regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID));
    }

    timelineConsistency = false;
    if (cmd.hasOption(OPT_TIMELINE_CONSISTENCY)) {
      timelineConsistency = true;
    }
  }
535
536  private void parseColumnFamilyOptions(CommandLine cmd) {
537    String dataBlockEncodingStr = cmd.getOptionValue(LoadTestUtil.OPT_DATA_BLOCK_ENCODING);
538    dataBlockEncodingAlgo =
539      dataBlockEncodingStr == null ? null : DataBlockEncoding.valueOf(dataBlockEncodingStr);
540
541    String compressStr = cmd.getOptionValue(OPT_COMPRESSION);
542    compressAlgo =
543      compressStr == null ? Compression.Algorithm.NONE : Compression.Algorithm.valueOf(compressStr);
544
545    String bloomStr = cmd.getOptionValue(OPT_BLOOM);
546    bloomType = bloomStr == null ? BloomType.ROW : BloomType.valueOf(bloomStr);
547
548    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
549      if (!cmd.hasOption(OPT_BLOOM_PARAM)) {
550        LOG.error("the parameter of bloom filter {} is not specified", bloomType.name());
551      } else {
552        conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM));
553      }
554    }
555
556    inMemoryCF = cmd.hasOption(OPT_INMEMORY);
557    if (cmd.hasOption(OPT_ENCRYPTION)) {
558      cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
559    }
560
561  }
562
563  public void initTestTable() throws IOException {
564    Durability durability = Durability.USE_DEFAULT;
565    if (deferredLogFlush) {
566      durability = Durability.ASYNC_WAL;
567    }
568
569    LoadTestUtil.createPreSplitLoadTestTable(conf, tableName, getColumnFamilies(), compressAlgo,
570      dataBlockEncodingAlgo, numRegionsPerServer, regionReplication, durability);
571    applyColumnFamilyOptions(tableName, getColumnFamilies());
572  }
573
574  @Override
575  protected int doWork() throws IOException {
576    if (!isVerbose) {
577      Log4jUtils.setLogLevel(ZooKeeper.class.getName(), "WARN");
578    }
579    if (numTables > 1) {
580      return parallelLoadTables();
581    } else {
582      return loadTable();
583    }
584  }
585
586  protected int loadTable() throws IOException {
587    if (cmd.hasOption(OPT_ZK_QUORUM)) {
588      conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
589    }
590    if (cmd.hasOption(OPT_ZK_PARENT_NODE)) {
591      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE));
592    }
593
594    if (isInitOnly) {
595      LOG.info("Initializing only; no reads or writes");
596      initTestTable();
597      return 0;
598    }
599
600    if (!isSkipInit) {
601      initTestTable();
602    }
603    LoadTestDataGenerator dataGen = null;
604    if (cmd.hasOption(OPT_GENERATOR)) {
605      String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
606      dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
607      String[] args;
608      if (dataGen instanceof LoadTestDataGeneratorWithACL) {
609        LOG.info("Using LoadTestDataGeneratorWithACL");
610        if (User.isHBaseSecurityEnabled(conf)) {
611          LOG.info("Security is enabled");
612          authnFileName = clazzAndArgs[1];
613          superUser = clazzAndArgs[2];
614          userNames = clazzAndArgs[3];
615          args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length);
616          Properties authConfig = new Properties();
617          authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName));
618          try {
619            addAuthInfoToConf(authConfig, conf, superUser, userNames);
620          } catch (IOException exp) {
621            LOG.error(exp.toString(), exp);
622            return EXIT_FAILURE;
623          }
624          userOwner = User.create(KerberosUtils.loginAndReturnUGI(conf, superUser));
625        } else {
626          superUser = clazzAndArgs[1];
627          userNames = clazzAndArgs[2];
628          args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
629          userOwner = User.createUserForTesting(conf, superUser, new String[0]);
630        }
631      } else {
632        args = clazzAndArgs.length == 1
633          ? new String[0]
634          : Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
635      }
636      dataGen.initialize(args);
637    } else {
638      // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator
639      dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
640        minColsPerKey, maxColsPerKey, families);
641    }
642
643    if (userOwner != null) {
644      LOG.info("Granting permissions for user " + userOwner.getShortName());
645      Permission.Action[] actions = { Permission.Action.ADMIN, Permission.Action.CREATE,
646        Permission.Action.READ, Permission.Action.WRITE };
647      try {
648        AccessControlClient.grant(ConnectionFactory.createConnection(conf), tableName,
649          userOwner.getShortName(), null, null, actions);
650      } catch (Throwable e) {
651        LOG.error(HBaseMarkers.FATAL,
652          "Error in granting permission for the user " + userOwner.getShortName(), e);
653        return EXIT_FAILURE;
654      }
655    }
656
657    if (userNames != null) {
658      // This will be comma separated list of expressions.
659      String users[] = userNames.split(",");
660      User user = null;
661      for (String userStr : users) {
662        if (User.isHBaseSecurityEnabled(conf)) {
663          user = User.create(KerberosUtils.loginAndReturnUGI(conf, userStr));
664        } else {
665          user = User.createUserForTesting(conf, userStr, new String[0]);
666        }
667      }
668    }
669
670    if (isWrite) {
671      if (userOwner != null) {
672        writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
673      } else {
674        String writerClass = null;
675        if (cmd.hasOption(OPT_WRITER)) {
676          writerClass = cmd.getOptionValue(OPT_WRITER);
677        } else {
678          writerClass = MultiThreadedWriter.class.getCanonicalName();
679        }
680
681        writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen);
682      }
683      writerThreads.setMultiPut(isMultiPut);
684    }
685
686    if (isUpdate) {
687      if (userOwner != null) {
688        updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
689          userOwner, userNames);
690      } else {
691        String updaterClass = null;
692        if (cmd.hasOption(OPT_UPDATER)) {
693          updaterClass = cmd.getOptionValue(OPT_UPDATER);
694        } else {
695          updaterClass = MultiThreadedUpdater.class.getCanonicalName();
696        }
697        updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen);
698      }
699      updaterThreads.setBatchUpdate(isBatchUpdate);
700      updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
701    }
702
703    if (isRead) {
704      if (userOwner != null) {
705        readerThreads =
706          new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent, userNames);
707      } else {
708        String readerClass = null;
709        if (cmd.hasOption(OPT_READER)) {
710          readerClass = cmd.getOptionValue(OPT_READER);
711        } else {
712          readerClass = MultiThreadedReader.class.getCanonicalName();
713        }
714        readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen);
715      }
716      readerThreads.setMaxErrors(maxReadErrors);
717      readerThreads.setKeyWindow(keyWindow);
718      readerThreads.setMultiGetBatchSize(multiGetBatchSize);
719      readerThreads.setRegionReplicaId(regionReplicaId);
720      readerThreads.setTimelineConsistency(timelineConsistency);
721    }
722
723    if (isUpdate && isWrite) {
724      LOG.info("Concurrent write/update workload: making updaters aware of the " + "write point");
725      updaterThreads.linkToWriter(writerThreads);
726    }
727
728    if (isRead && (isUpdate || isWrite)) {
729      LOG.info("Concurrent write/read workload: making readers aware of the " + "write point");
730      readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
731    }
732
733    if (isWrite) {
734      System.out.println("Starting to write data...");
735      writerThreads.start(startKey, endKey, numWriterThreads);
736    }
737
738    if (isUpdate) {
739      LOG.info("Starting to mutate data...");
740      System.out.println("Starting to mutate data...");
741      // TODO : currently append and increment operations not tested with tags
742      // Will update this after it is done
743      updaterThreads.start(startKey, endKey, numUpdaterThreads);
744    }
745
746    if (isRead) {
747      System.out.println("Starting to read data...");
748      readerThreads.start(startKey, endKey, numReaderThreads);
749    }
750
751    if (isWrite) {
752      writerThreads.waitForFinish();
753    }
754
755    if (isUpdate) {
756      updaterThreads.waitForFinish();
757    }
758
759    if (isRead) {
760      readerThreads.waitForFinish();
761    }
762
763    boolean success = true;
764    if (isWrite) {
765      success = success && writerThreads.getNumWriteFailures() == 0;
766    }
767    if (isUpdate) {
768      success = success && updaterThreads.getNumWriteFailures() == 0;
769    }
770    if (isRead) {
771      success =
772        success && readerThreads.getNumReadErrors() == 0 && readerThreads.getNumReadFailures() == 0;
773    }
774    return success ? EXIT_SUCCESS : EXIT_FAILURE;
775  }
776
777  private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException {
778    try {
779      Class<?> clazz = Class.forName(clazzName);
780      Constructor<?> constructor =
781        clazz.getConstructor(int.class, int.class, int.class, int.class, byte[][].class);
782      return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
783        minColsPerKey, maxColsPerKey, families);
784    } catch (Exception e) {
785      throw new IOException(e);
786    }
787  }
788
789  private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName,
790    LoadTestDataGenerator dataGen) throws IOException {
791    try {
792      Class<?> clazz = Class.forName(clazzName);
793      Constructor<?> constructor =
794        clazz.getConstructor(LoadTestDataGenerator.class, Configuration.class, TableName.class);
795      return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName);
796    } catch (Exception e) {
797      throw new IOException(e);
798    }
799  }
800
801  private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName,
802    LoadTestDataGenerator dataGen) throws IOException {
803    try {
804      Class<?> clazz = Class.forName(clazzName);
805      Constructor<?> constructor = clazz.getConstructor(LoadTestDataGenerator.class,
806        Configuration.class, TableName.class, double.class);
807      return (MultiThreadedUpdater) constructor.newInstance(dataGen, conf, tableName,
808        updatePercent);
809    } catch (Exception e) {
810      throw new IOException(e);
811    }
812  }
813
814  private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName,
815    LoadTestDataGenerator dataGen) throws IOException {
816    try {
817      Class<?> clazz = Class.forName(clazzName);
818      Constructor<?> constructor = clazz.getConstructor(LoadTestDataGenerator.class,
819        Configuration.class, TableName.class, double.class);
820      return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent);
821    } catch (Exception e) {
822      throw new IOException(e);
823    }
824  }
825
  /** Command-line entry point; delegates argument parsing and execution to doStaticMain. */
  public static void main(String[] args) {
    new LoadTestTool().doStaticMain(args);
  }
829
830  /**
831   * When NUM_TABLES is specified, the function starts multiple worker threads which individually
832   * start a LoadTestTool instance to load a table. Each table name is in format &lt;tn>_&lt;index>.
833   * For example, "-tn test -num_tables 2" , table names will be "test_1", "test_2"
834   * @throws IOException if one of the load tasks is unable to complete
835   */
836  private int parallelLoadTables() throws IOException {
837    // create new command args
838    String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
839    String[] newArgs = null;
840    if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
841      newArgs = new String[cmdLineArgs.length + 2];
842      newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
843      newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME;
844      System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length);
845    } else {
846      newArgs = cmdLineArgs;
847    }
848
849    int tableNameValueIndex = -1;
850    for (int j = 0; j < newArgs.length; j++) {
851      if (newArgs[j].endsWith(OPT_TABLE_NAME)) {
852        tableNameValueIndex = j + 1;
853      } else if (newArgs[j].endsWith(NUM_TABLES)) {
854        // change NUM_TABLES to 1 so that each worker loads one table
855        newArgs[j + 1] = "1";
856      }
857    }
858
859    // starting to load multiple tables
860    List<WorkerThread> workers = new ArrayList<>();
861    for (int i = 0; i < numTables; i++) {
862      String[] workerArgs = newArgs.clone();
863      workerArgs[tableNameValueIndex] = tableName + "_" + (i + 1);
864      WorkerThread worker = new WorkerThread(i, workerArgs);
865      workers.add(worker);
866      LOG.info(worker + " starting");
867      worker.start();
868    }
869
870    // wait for all workers finish
871    LOG.info("Waiting for worker threads to finish");
872    for (WorkerThread t : workers) {
873      try {
874        t.join();
875      } catch (InterruptedException ie) {
876        IOException iie = new InterruptedIOException();
877        iie.initCause(ie);
878        throw iie;
879      }
880      checkForErrors();
881    }
882
883    return EXIT_SUCCESS;
884  }
885
  // Holds the FIRST exception thrown by any worker thread (compareAndSet in
  // workerThreadError keeps only the earliest); rethrown later by checkForErrors.
  protected AtomicReference<Throwable> thrown = new AtomicReference<>();
889
  /**
   * Records an error raised by a worker thread. Only the first error is kept
   * (compareAndSet from null), so the original root cause is what checkForErrors reports.
   */
  private void workerThreadError(Throwable t) {
    thrown.compareAndSet(null, t);
  }
893
894  /**
895   * Check for errors in the writer threads. If any is found, rethrow it.
896   */
897  private void checkForErrors() throws IOException {
898    Throwable thrown = this.thrown.get();
899    if (thrown == null) return;
900    if (thrown instanceof IOException) {
901      throw (IOException) thrown;
902    } else {
903      throw new RuntimeException(thrown);
904    }
905  }
906
907  class WorkerThread extends Thread {
908    private String[] workerArgs;
909
910    WorkerThread(int i, String[] args) {
911      super("WorkerThread-" + i);
912      workerArgs = args;
913    }
914
915    @Override
916    public void run() {
917      try {
918        int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs);
919        if (ret != 0) {
920          throw new RuntimeException("LoadTestTool exit with non-zero return code.");
921        }
922      } catch (Exception ex) {
923        LOG.error("Error in worker thread", ex);
924        workerThreadError(ex);
925      }
926    }
927  }
928
929  private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner,
930    String userList) throws IOException {
931    List<String> users = new ArrayList<>(Arrays.asList(userList.split(",")));
932    users.add(owner);
933    for (String user : users) {
934      String keyTabFileConfKey = "hbase." + user + ".keytab.file";
935      String principalConfKey = "hbase." + user + ".kerberos.principal";
936      if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) {
937        throw new IOException("Authentication configs missing for user : " + user);
938      }
939    }
940    for (String key : authConfig.stringPropertyNames()) {
941      conf.set(key, authConfig.getProperty(key));
942    }
943    LOG.debug("Added authentication properties to config successfully.");
944  }
945
946}