/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import javax.crypto.spec.SecretKeySpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.logging.Log4jUtils;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.AlreadySelectedException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;

/**
 * A command-line utility that reads, writes, and verifies data. Unlike
 * {@link org.apache.hadoop.hbase.PerformanceEvaluation}, this tool validates the data written,
 * and supports simultaneously writing and reading the same set of keys.
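 * <p>
 * An illustrative invocation (the option formats are given by the usage strings below; the
 * values here are examples only):
 * </p>
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.util.LoadTestTool -tn cluster_test \
 *     -write 2:64:20 -read 100:20 -num_keys 1000000
 * </pre>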
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class LoadTestTool extends AbstractHBaseTool {

  private static final Logger LOG = LoggerFactory.getLogger(LoadTestTool.class);
  private static final String COLON = ":";

  /** Table name for the test */
  private TableName tableName;

  /** Column families for the test */
  private byte[][] families;

  /** Table name to use if not overridden on the command line */
  protected static final String DEFAULT_TABLE_NAME = "cluster_test";

  /** The default data size if not specified */
  protected static final int DEFAULT_DATA_SIZE = 64;

  /** The number of reader/writer threads if not specified */
  protected static final int DEFAULT_NUM_THREADS = 20;

  /** Usage string for the load option */
  protected static final String OPT_USAGE_LOAD =
      "<avg_cols_per_key>:<avg_data_size>" +
      "[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

  /** Usage string for the read option */
  protected static final String OPT_USAGE_READ =
      "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

  /** Usage string for the update option */
  protected static final String OPT_USAGE_UPDATE =
      "<update_percent>[:<#threads=" + DEFAULT_NUM_THREADS
      + ">][:<#whether to ignore nonce collisions=0>]";

  protected static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " +
      Arrays.toString(BloomType.values());

  protected static final String OPT_USAGE_COMPRESSION = "Compression type, " +
      "one of " + Arrays.toString(Compression.Algorithm.values());

  protected static final String OPT_VERBOSE = "verbose";

  public static final String OPT_BLOOM = "bloom";
  public static final String OPT_BLOOM_PARAM = "bloom_param";
  public static final String OPT_COMPRESSION = "compression";
  public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
  public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";

  public static final String OPT_INMEMORY = "in_memory";
  public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF " +
      "in memory as much as possible. It is not guaranteed that reads are always served " +
      "from memory";

  public static final String OPT_GENERATOR = "generator";
  public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
      + " Any args for this class can be passed, colon-separated, after the class name";

  public static final String OPT_WRITER = "writer";
  public static final String OPT_WRITER_USAGE = "The class for executing the write requests";

  public static final String OPT_UPDATER = "updater";
  public static final String OPT_UPDATER_USAGE = "The class for executing the update requests";

  public static final String OPT_READER = "reader";
  public static final String OPT_READER_USAGE = "The class for executing the read requests";

  protected static final String OPT_KEY_WINDOW = "key_window";
  protected static final String OPT_WRITE = "write";
  protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
  public static final String OPT_MULTIPUT = "multiput";
  public static final String OPT_MULTIGET = "multiget_batchsize";
  protected static final String OPT_NUM_KEYS = "num_keys";
  protected static final String OPT_READ = "read";
  protected static final String OPT_START_KEY = "start_key";
  public static final String OPT_TABLE_NAME = "tn";
  public static final String OPT_COLUMN_FAMILIES = "families";
  protected static final String OPT_ZK_QUORUM = "zk";
  protected static final String OPT_ZK_PARENT_NODE = "zk_root";
  protected static final String OPT_SKIP_INIT = "skip_init";
  protected static final String OPT_INIT_ONLY = "init_only";
  protected static final String NUM_TABLES = "num_tables";
  protected static final String OPT_BATCHUPDATE = "batchupdate";
  protected static final String OPT_UPDATE = "update";

  public static final String OPT_ENCRYPTION = "encryption";
  protected static final String OPT_ENCRYPTION_USAGE =
    "Enables transparent encryption on the test table, one of " +
    Arrays.toString(Encryption.getSupportedCiphers());

  public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server";
  protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE
    = "Desired number of regions per region server. Defaults to 5.";
  public static int DEFAULT_NUM_REGIONS_PER_SERVER = 5;

  public static final String OPT_REGION_REPLICATION = "region_replication";
  protected static final String OPT_REGION_REPLICATION_USAGE =
      "Desired number of replicas per region";

  public static final String OPT_REGION_REPLICA_ID = "region_replica_id";
  protected static final String OPT_REGION_REPLICA_ID_USAGE =
      "Region replica id to do the reads from";

  public static final String OPT_MOB_THRESHOLD = "mob_threshold";
  protected static final String OPT_MOB_THRESHOLD_USAGE =
      "Cell size threshold in bytes; cells larger than this use the MOB write path";

  protected static final long DEFAULT_START_KEY = 0;

  /** This will be removed as we factor out the dependency on the command line */
  protected CommandLine cmd;

  protected MultiThreadedWriter writerThreads = null;
  protected MultiThreadedReader readerThreads = null;
  protected MultiThreadedUpdater updaterThreads = null;

  protected long startKey, endKey;

  protected boolean isVerbose, isWrite, isRead, isUpdate;
  protected boolean deferredLogFlush;

  // Column family options
  protected DataBlockEncoding dataBlockEncodingAlgo;
  protected Compression.Algorithm compressAlgo;
  protected BloomType bloomType;
  private boolean inMemoryCF;

  private User userOwner;
  // Writer options
  protected int numWriterThreads = DEFAULT_NUM_THREADS;
  protected int minColsPerKey, maxColsPerKey;
  protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE;
  protected boolean isMultiPut;

  // Updater options
  protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
  protected int updatePercent;
  protected boolean ignoreConflicts = false;
  protected boolean isBatchUpdate;

  // Reader options
  private int numReaderThreads = DEFAULT_NUM_THREADS;
  private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
  private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE;
  private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
  private int verifyPercent;

  private int numTables = 1;

  private String superUser;

  private String userNames;
  // This file is used to read authentication information in secure clusters.
  private String authnFileName;

  private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
  private int regionReplication = -1; // not set
  private int regionReplicaId = -1; // not set

  private int mobThreshold = -1; // not set

  // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
  //       console tool itself should only be used from console.
  protected boolean isSkipInit = false;
  protected boolean isInitOnly = false;

  protected Cipher cipher = null;

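  /**
   * Splits the given option's value on ':' and checks that the field count is within the
   * expected range. For example, an illustrative "-read 50:20" yields {"50", "20"} here.
   */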
  protected String[] splitColonSeparated(String option,
      int minNumCols, int maxNumCols) {
    String optVal = cmd.getOptionValue(option);
    String[] cols = optVal.split(COLON);
    if (cols.length < minNumCols || cols.length > maxNumCols) {
      throw new IllegalArgumentException("Expected at least "
          + minNumCols + " columns but no more than " + maxNumCols +
          " in the colon-separated value '" + optVal + "' of the " +
          "-" + option + " option");
    }
    return cols;
  }

  protected int getNumThreads(String numThreadsStr) {
    return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
  }

  public byte[][] getColumnFamilies() {
    return families;
  }

  /**
   * Apply column family options such as Bloom filters, compression, and data
   * block encoding.
   */
  protected void applyColumnFamilyOptions(TableName tableName,
      byte[][] columnFamilies) throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
      TableDescriptor tableDesc = admin.getDescriptor(tableName);
      LOG.info("Disabling table " + tableName);
      admin.disableTable(tableName);
      for (byte[] cf : columnFamilies) {
        ColumnFamilyDescriptor columnDesc = tableDesc.getColumnFamily(cf);
        boolean isNewCf = columnDesc == null;
        ColumnFamilyDescriptorBuilder columnDescBuilder = isNewCf ?
            ColumnFamilyDescriptorBuilder.newBuilder(cf) :
            ColumnFamilyDescriptorBuilder.newBuilder(columnDesc);
        if (bloomType != null) {
          columnDescBuilder.setBloomFilterType(bloomType);
        }
        if (compressAlgo != null) {
          columnDescBuilder.setCompressionType(compressAlgo);
        }
        if (dataBlockEncodingAlgo != null) {
          columnDescBuilder.setDataBlockEncoding(dataBlockEncodingAlgo);
        }
        if (inMemoryCF) {
          columnDescBuilder.setInMemory(inMemoryCF);
        }
        if (cipher != null) {
          byte[] keyBytes = new byte[cipher.getKeyLength()];
          new SecureRandom().nextBytes(keyBytes);
          columnDescBuilder.setEncryptionType(cipher.getName());
          columnDescBuilder.setEncryptionKey(
              EncryptionUtil.wrapKey(conf,
                  User.getCurrent().getShortName(),
                  new SecretKeySpec(keyBytes,
                      cipher.getName())));
        }
        if (mobThreshold >= 0) {
          columnDescBuilder.setMobEnabled(true);
          columnDescBuilder.setMobThreshold(mobThreshold);
        }

        if (isNewCf) {
          admin.addColumnFamily(tableName, columnDescBuilder.build());
        } else {
          admin.modifyColumnFamily(tableName, columnDescBuilder.build());
        }
      }
      LOG.info("Enabling table " + tableName);
      admin.enableTable(tableName);
    }
  }

  @Override
  protected void addOptions() {
    addOptNoArg("v", OPT_VERBOSE, "Will display a full readout of logs, including ZooKeeper");
    addOptWithArg(OPT_ZK_QUORUM, "ZK quorum as comma-separated host names " +
        "without port numbers");
    addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
    addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
    addOptWithArg(OPT_COLUMN_FAMILIES,
        "The names of the column families to use, separated by commas");
    addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
    addOptWithArg(OPT_READ, OPT_USAGE_READ);
    addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
    addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
    addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
    addOptWithArg(OPT_BLOOM_PARAM, "The parameter of the Bloom filter type");
    addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
    addOptWithArg(HFileTestUtil.OPT_DATA_BLOCK_ENCODING,
        HFileTestUtil.OPT_DATA_BLOCK_ENCODING_USAGE);
    addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " +
        "to tolerate before terminating all reader threads. The default is " +
        MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
    addOptWithArg(OPT_MULTIGET, "Whether to use multi-gets as opposed to " +
        "separate gets for every column in a row");
    addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " +
        "reads and writes for concurrent write/read workload. The default " +
        "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");

    addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " +
        "separate puts for every column in a row");
    addOptNoArg(OPT_BATCHUPDATE, "Whether to use batch as opposed to " +
        "separate updates for every column in a row");
    addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
    addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
    addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE);
    addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE);
    addOptWithArg(OPT_READER, OPT_READER_USAGE);

    addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
    addOptWithArg(OPT_START_KEY, "The first key to read/write " +
        "(a 0-based index). The default value is " +
        DEFAULT_START_KEY + ".");
    addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table "
        + "already exists");

    addOptWithArg(NUM_TABLES,
      "A positive integer. When a number n is specified, the load test "
          + "tool will load n tables in parallel. The -tn parameter value becomes "
          + "the table name prefix. Each table name is of the form <tn>_1...<tn>_n");

    addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE);
    addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE);
    addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE);
    addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE);
    addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE);
    addOptWithArg(OPT_MOB_THRESHOLD, OPT_MOB_THRESHOLD_USAGE);
  }

  @Override
  protected CommandLineParser newParser() {
    // Commons-CLI lacks the capability to handle combinations of options, so we do it ourselves
    // Validate in parse() to get helpful error messages instead of exploding in processOptions()
    return new DefaultParser() {
      @Override
      public CommandLine parse(Options opts, String[] args, Properties props, boolean stop)
          throws ParseException {
        CommandLine cl = super.parse(opts, args, props, stop);

        // Note: we must consult the freshly parsed CommandLine (cl), not the instance
        // field cmd, which is only assigned later in processOptions().
        boolean isReadWriteUpdate = cl.hasOption(OPT_READ)
            || cl.hasOption(OPT_WRITE)
            || cl.hasOption(OPT_UPDATE);
        boolean isInitOnly = cl.hasOption(OPT_INIT_ONLY);

        if (!isInitOnly && !isReadWriteUpdate) {
          throw new MissingOptionException("Must specify either -" + OPT_INIT_ONLY
              + " or at least one of -" + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE);
        }

        if (isInitOnly && isReadWriteUpdate) {
          throw new AlreadySelectedException(OPT_INIT_ONLY + " cannot be specified with any of -"
              + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE);
        }

        if (isReadWriteUpdate && !cl.hasOption(OPT_NUM_KEYS)) {
          throw new MissingOptionException(OPT_NUM_KEYS + " must be specified in read/write mode.");
        }

        return cl;
      }
    };
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    this.cmd = cmd;

    tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME,
        DEFAULT_TABLE_NAME));

    if (cmd.hasOption(OPT_COLUMN_FAMILIES)) {
      String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(",");
      families = new byte[list.length][];
      for (int i = 0; i < list.length; i++) {
        families[i] = Bytes.toBytes(list[i]);
      }
    } else {
      families = HFileTestUtil.DEFAULT_COLUMN_FAMILIES;
    }

    isVerbose = cmd.hasOption(OPT_VERBOSE);
    isWrite = cmd.hasOption(OPT_WRITE);
    isRead = cmd.hasOption(OPT_READ);
    isUpdate = cmd.hasOption(OPT_UPDATE);
    isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
    deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH);

    if (!isInitOnly) {
      startKey = parseLong(cmd.getOptionValue(OPT_START_KEY,
          String.valueOf(DEFAULT_START_KEY)), 0, Long.MAX_VALUE);
      long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1,
          Long.MAX_VALUE - startKey);
      endKey = startKey + numKeys;
      isSkipInit = cmd.hasOption(OPT_SKIP_INIT);
      System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
    }

    parseColumnFamilyOptions(cmd);

    if (isWrite) {
      String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3);

      int colIndex = 0;
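      // The -write option supplies averages; spread them into the ranges used by the data
      // generator: columns per key in [1, 2 * avg], value size in [avg / 2, 3 * avg / 2].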
      minColsPerKey = 1;
      maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
      int avgColDataSize =
          parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
      minColDataSize = avgColDataSize / 2;
      maxColDataSize = avgColDataSize * 3 / 2;

      if (colIndex < writeOpts.length) {
        numWriterThreads = getNumThreads(writeOpts[colIndex++]);
      }

      isMultiPut = cmd.hasOption(OPT_MULTIPUT);

      mobThreshold = -1;
      if (cmd.hasOption(OPT_MOB_THRESHOLD)) {
        mobThreshold = Integer.parseInt(cmd.getOptionValue(OPT_MOB_THRESHOLD));
      }

      System.out.println("Multi-puts: " + isMultiPut);
      System.out.println("Columns per key: " + minColsPerKey + ".."
          + maxColsPerKey);
      System.out.println("Data size per column: " + minColDataSize + ".."
          + maxColDataSize);
    }

    if (isUpdate) {
      String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3);
      int colIndex = 0;
      updatePercent = parseInt(mutateOpts[colIndex++], 0, 100);
      if (colIndex < mutateOpts.length) {
        numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]);
      }
      if (colIndex < mutateOpts.length) {
        ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1;
      }

      isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE);

      System.out.println("Batch updates: " + isBatchUpdate);
      System.out.println("Percent of keys to update: " + updatePercent);
      System.out.println("Updater threads: " + numUpdaterThreads);
      System.out.println("Ignore nonce conflicts: " + ignoreConflicts);
    }

    if (isRead) {
      String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
      int colIndex = 0;
      verifyPercent = parseInt(readOpts[colIndex++], 0, 100);
      if (colIndex < readOpts.length) {
        numReaderThreads = getNumThreads(readOpts[colIndex++]);
      }

      if (cmd.hasOption(OPT_MAX_READ_ERRORS)) {
        maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS),
            0, Integer.MAX_VALUE);
      }

      if (cmd.hasOption(OPT_KEY_WINDOW)) {
        keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW),
            0, Integer.MAX_VALUE);
      }

      if (cmd.hasOption(OPT_MULTIGET)) {
        multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET),
            0, Integer.MAX_VALUE);
      }

      System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize);
      System.out.println("Percent of keys to verify: " + verifyPercent);
      System.out.println("Reader threads: " + numReaderThreads);
    }

    numTables = 1;
    if (cmd.hasOption(NUM_TABLES)) {
      numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
    }

    numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
    if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) {
      numRegionsPerServer = Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER));
    }

    regionReplication = 1;
    if (cmd.hasOption(OPT_REGION_REPLICATION)) {
      regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION));
    }

    regionReplicaId = -1;
    if (cmd.hasOption(OPT_REGION_REPLICA_ID)) {
      regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID));
    }
  }

  private void parseColumnFamilyOptions(CommandLine cmd) {
    String dataBlockEncodingStr = cmd.getOptionValue(HFileTestUtil.OPT_DATA_BLOCK_ENCODING);
    dataBlockEncodingAlgo = dataBlockEncodingStr == null ? null :
        DataBlockEncoding.valueOf(dataBlockEncodingStr);

    String compressStr = cmd.getOptionValue(OPT_COMPRESSION);
    compressAlgo = compressStr == null ? Compression.Algorithm.NONE :
        Compression.Algorithm.valueOf(compressStr);

    String bloomStr = cmd.getOptionValue(OPT_BLOOM);
    bloomType = bloomStr == null ? BloomType.ROW :
        BloomType.valueOf(bloomStr);

    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      if (!cmd.hasOption(OPT_BLOOM_PARAM)) {
        LOG.error("the parameter of bloom filter {} is not specified", bloomType.name());
      } else {
        conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM));
      }
    }

    inMemoryCF = cmd.hasOption(OPT_INMEMORY);
    if (cmd.hasOption(OPT_ENCRYPTION)) {
      cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
    }
  }

  public void initTestTable() throws IOException {
    Durability durability = Durability.USE_DEFAULT;
    if (deferredLogFlush) {
      durability = Durability.ASYNC_WAL;
    }

    HBaseTestingUtil.createPreSplitLoadTestTable(conf, tableName,
        getColumnFamilies(), compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer,
        regionReplication, durability);
    applyColumnFamilyOptions(tableName, getColumnFamilies());
  }

  @Override
  protected int doWork() throws IOException {
    if (!isVerbose) {
      Log4jUtils.setLogLevel(ZooKeeper.class.getName(), "WARN");
    }
    if (numTables > 1) {
      return parallelLoadTables();
    } else {
      return loadTable();
    }
  }

  protected int loadTable() throws IOException {
    if (cmd.hasOption(OPT_ZK_QUORUM)) {
      conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
    }
    if (cmd.hasOption(OPT_ZK_PARENT_NODE)) {
      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE));
    }

    if (isInitOnly) {
      LOG.info("Initializing only; no reads or writes");
      initTestTable();
      return 0;
    }

    if (!isSkipInit) {
      initTestTable();
    }
    LoadTestDataGenerator dataGen = null;
    if (cmd.hasOption(OPT_GENERATOR)) {
      String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
      dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
      String[] args;
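      // For LoadTestDataGeneratorWithACL, the remaining colon-separated fields are read as
      // follows (layout inferred from the indices used below):
      //   secure cluster:   <class>:<authn properties file>:<superuser>:<user1,user2,...>
      //   insecure cluster: <class>:<superuser>:<user1,user2,...>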
      if (dataGen instanceof LoadTestDataGeneratorWithACL) {
        LOG.info("Using LoadTestDataGeneratorWithACL");
        if (User.isHBaseSecurityEnabled(conf)) {
          LOG.info("Security is enabled");
          authnFileName = clazzAndArgs[1];
          superUser = clazzAndArgs[2];
          userNames = clazzAndArgs[3];
          args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length);
          Properties authConfig = new Properties();
          authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName));
          try {
            addAuthInfoToConf(authConfig, conf, superUser, userNames);
          } catch (IOException exp) {
            LOG.error(exp.toString(), exp);
            return EXIT_FAILURE;
          }
          userOwner = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, superUser));
        } else {
          superUser = clazzAndArgs[1];
          userNames = clazzAndArgs[2];
          args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
          userOwner = User.createUserForTesting(conf, superUser, new String[0]);
        }
      } else {
        args = clazzAndArgs.length == 1 ? new String[0] : Arrays.copyOfRange(clazzAndArgs, 1,
            clazzAndArgs.length);
      }
      dataGen.initialize(args);
    } else {
      // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator
      dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
          minColsPerKey, maxColsPerKey, families);
    }

    if (userOwner != null) {
      LOG.info("Granting permissions for user " + userOwner.getShortName());
      Permission.Action[] actions = {
        Permission.Action.ADMIN, Permission.Action.CREATE,
        Permission.Action.READ, Permission.Action.WRITE };
      try {
        AccessControlClient.grant(ConnectionFactory.createConnection(conf),
            tableName, userOwner.getShortName(), null, null, actions);
      } catch (Throwable e) {
        LOG.error(HBaseMarkers.FATAL, "Error in granting permission for the user " +
            userOwner.getShortName(), e);
        return EXIT_FAILURE;
      }
    }

    if (userNames != null) {
      // userNames is a comma-separated list of user names.
      String[] users = userNames.split(",");
      User user = null;
      for (String userStr : users) {
        if (User.isHBaseSecurityEnabled(conf)) {
          user = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, userStr));
        } else {
          user = User.createUserForTesting(conf, userStr, new String[0]);
        }
      }
    }

    if (isWrite) {
      if (userOwner != null) {
        writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
      } else {
        String writerClass = null;
        if (cmd.hasOption(OPT_WRITER)) {
          writerClass = cmd.getOptionValue(OPT_WRITER);
        } else {
          writerClass = MultiThreadedWriter.class.getCanonicalName();
        }

        writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen);
      }
      writerThreads.setMultiPut(isMultiPut);
    }

    if (isUpdate) {
      if (userOwner != null) {
        updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
            userOwner, userNames);
      } else {
        String updaterClass = null;
        if (cmd.hasOption(OPT_UPDATER)) {
          updaterClass = cmd.getOptionValue(OPT_UPDATER);
        } else {
          updaterClass = MultiThreadedUpdater.class.getCanonicalName();
        }
        updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen);
      }
      updaterThreads.setBatchUpdate(isBatchUpdate);
      updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
    }

    if (isRead) {
      if (userOwner != null) {
        readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent,
            userNames);
      } else {
        String readerClass = null;
        if (cmd.hasOption(OPT_READER)) {
          readerClass = cmd.getOptionValue(OPT_READER);
        } else {
          readerClass = MultiThreadedReader.class.getCanonicalName();
        }
        readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen);
      }
      readerThreads.setMaxErrors(maxReadErrors);
      readerThreads.setKeyWindow(keyWindow);
      readerThreads.setMultiGetBatchSize(multiGetBatchSize);
      readerThreads.setRegionReplicaId(regionReplicaId);
    }

    if (isUpdate && isWrite) {
      LOG.info("Concurrent write/update workload: making updaters aware of the " +
        "write point");
      updaterThreads.linkToWriter(writerThreads);
    }

    if (isRead && (isUpdate || isWrite)) {
      LOG.info("Concurrent write/read workload: making readers aware of the " +
        "write point");
      readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
    }

    if (isWrite) {
      System.out.println("Starting to write data...");
      writerThreads.start(startKey, endKey, numWriterThreads);
    }

    if (isUpdate) {
      LOG.info("Starting to mutate data...");
      System.out.println("Starting to mutate data...");
      // TODO: append and increment operations are not yet tested with tags; update this once
      // that work is done.
      updaterThreads.start(startKey, endKey, numUpdaterThreads);
    }

    if (isRead) {
      System.out.println("Starting to read data...");
      readerThreads.start(startKey, endKey, numReaderThreads);
    }

    if (isWrite) {
      writerThreads.waitForFinish();
    }

    if (isUpdate) {
      updaterThreads.waitForFinish();
    }

    if (isRead) {
      readerThreads.waitForFinish();
    }

    boolean success = true;
    if (isWrite) {
      success = success && writerThreads.getNumWriteFailures() == 0;
    }
    if (isUpdate) {
      success = success && updaterThreads.getNumWriteFailures() == 0;
    }
    if (isRead) {
      success = success && readerThreads.getNumReadErrors() == 0
          && readerThreads.getNumReadFailures() == 0;
    }
    return success ? EXIT_SUCCESS : EXIT_FAILURE;
  }

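  // A custom generator passed via -generator is loaded reflectively, so it must expose a
  // constructor matching the lookup below, e.g. for a hypothetical subclass:
  //   public MyDataGenerator(int minValueSize, int maxValueSize, int minColumnsPerKey,
  //       int maxColumnsPerKey, byte[][] columnFamilies) { ... }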
  private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor = clazz.getConstructor(int.class, int.class, int.class, int.class,
          byte[][].class);
      return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
          minColsPerKey, maxColsPerKey, families);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

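  // Pluggable -writer/-updater/-reader classes are instantiated the same way: writers need a
  // (LoadTestDataGenerator, Configuration, TableName) constructor, while updaters and readers
  // additionally take a double (the update/verify percent), as looked up below.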
  private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName,
      LoadTestDataGenerator dataGen) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor = clazz.getConstructor(
        LoadTestDataGenerator.class, Configuration.class, TableName.class);
      return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName,
      LoadTestDataGenerator dataGen) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor = clazz.getConstructor(
        LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
      return (MultiThreadedUpdater) constructor.newInstance(
        dataGen, conf, tableName, updatePercent);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName,
      LoadTestDataGenerator dataGen) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor = clazz.getConstructor(
        LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
      return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  public static void main(String[] args) {
    new LoadTestTool().doStaticMain(args);
  }

  /**
   * When NUM_TABLES is specified, this method starts multiple worker threads, each of which runs
   * its own LoadTestTool instance to load one table. Each table name is of the form
   * &lt;tn&gt;_&lt;index&gt;. For example, with "-tn test -num_tables 2" the table names will be
   * "test_1" and "test_2".
   *
   * @throws IOException if one of the load tasks is unable to complete
   */
  private int parallelLoadTables() throws IOException {
    // create new command args
    String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
    String[] newArgs = null;
    if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
      newArgs = new String[cmdLineArgs.length + 2];
      newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
      newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME;
      System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length);
    } else {
      newArgs = cmdLineArgs;
    }

    int tableNameValueIndex = -1;
    for (int j = 0; j < newArgs.length; j++) {
      if (newArgs[j].endsWith(OPT_TABLE_NAME)) {
        tableNameValueIndex = j + 1;
      } else if (newArgs[j].endsWith(NUM_TABLES)) {
        // change NUM_TABLES to 1 so that each worker loads one table
        newArgs[j + 1] = "1";
      }
    }

    // starting to load multiple tables
    List<WorkerThread> workers = new ArrayList<>();
    for (int i = 0; i < numTables; i++) {
      String[] workerArgs = newArgs.clone();
      workerArgs[tableNameValueIndex] = tableName + "_" + (i + 1);
      WorkerThread worker = new WorkerThread(i, workerArgs);
      workers.add(worker);
      LOG.info(worker + " starting");
      worker.start();
    }

    // wait for all workers to finish
    LOG.info("Waiting for worker threads to finish");
    for (WorkerThread t : workers) {
      try {
        t.join();
      } catch (InterruptedException ie) {
        IOException iie = new InterruptedIOException();
        iie.initCause(ie);
        throw iie;
      }
      checkForErrors();
    }

    return EXIT_SUCCESS;
  }

  // If an exception is thrown by one of the worker threads, it is stored here.
  protected AtomicReference<Throwable> thrown = new AtomicReference<>();

  private void workerThreadError(Throwable t) {
    thrown.compareAndSet(null, t);
  }

  /**
   * Check for errors in the worker threads. If any is found, rethrow it.
   */
  private void checkForErrors() throws IOException {
    Throwable thrown = this.thrown.get();
    if (thrown == null) return;
    if (thrown instanceof IOException) {
      throw (IOException) thrown;
    } else {
      throw new RuntimeException(thrown);
    }
  }

  class WorkerThread extends Thread {
    private String[] workerArgs;

    WorkerThread(int i, String[] args) {
      super("WorkerThread-" + i);
      workerArgs = args;
    }

    @Override
    public void run() {
      try {
        int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs);
        if (ret != 0) {
          throw new RuntimeException("LoadTestTool exited with a non-zero return code.");
        }
      } catch (Exception ex) {
        LOG.error("Error in worker thread", ex);
        workerThreadError(ex);
      }
    }
  }

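  /**
   * Copies per-user Kerberos settings from the supplied properties into {@code conf}. For each
   * listed user (and the owner), the properties file must define both keys checked below; for a
   * hypothetical user "alice" that would be, e.g.:
   *
   * <pre>
   * hbase.alice.keytab.file=/path/to/alice.keytab
   * hbase.alice.kerberos.principal=alice/_HOST@EXAMPLE.COM
   * </pre>
   */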
  private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner,
      String userList) throws IOException {
    List<String> users = new ArrayList<>(Arrays.asList(userList.split(",")));
    users.add(owner);
    for (String user : users) {
      String keyTabFileConfKey = "hbase." + user + ".keytab.file";
      String principalConfKey = "hbase." + user + ".kerberos.principal";
      if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) {
        throw new IOException("Authentication configs missing for user: " + user);
      }
    }
    for (String key : authConfig.stringPropertyNames()) {
      conf.set(key, authConfig.getProperty(key));
    }
    LOG.debug("Added authentication properties to config successfully.");
  }

}