001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import javax.crypto.spec.SecretKeySpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.logging.Log4jUtils;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.AlreadySelectedException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
067
068/**
069 * A command-line utility that reads, writes, and verifies data. Unlike
070 * {@link org.apache.hadoop.hbase.PerformanceEvaluation}, this tool validates the data written, and
071 * supports simultaneously writing and reading the same set of keys.
072 */
073@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
074public class LoadTestTool extends AbstractHBaseTool {
075
076  private static final Logger LOG = LoggerFactory.getLogger(LoadTestTool.class);
077  private static final String COLON = ":";
078
079  /** Table name for the test */
080  private TableName tableName;
081
082  /** Column families for the test */
083  private byte[][] families;
084  /** Column family used by the test */
085  private static byte[] DEFAULT_COLUMN_FAMILY = Bytes.toBytes("test_cf");
086  /** Column families used by the test */
087  private static final byte[][] DEFAULT_COLUMN_FAMILIES = { DEFAULT_COLUMN_FAMILY };
088
  /** Table name to use if not overridden on the command line (-tn). */
  protected static final String DEFAULT_TABLE_NAME = "cluster_test";

  /**
   * The default column data size if not specified; initial value of the min/max column data size
   * fields.
   */
  protected static final int DEFAULT_DATA_SIZE = 64;

  /** The number of reader/writer (and updater) threads if not specified. */
  protected static final int DEFAULT_NUM_THREADS = 20;

  /**
   * Usage string for the -write option: average columns per key, average data size per column and
   * an optional writer thread count, separated by colons.
   */
  protected static final String OPT_USAGE_LOAD =
    "<avg_cols_per_key>:<avg_data_size>" + "[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

  /**
   * Usage string for the -read option: percentage of keys to verify and an optional reader thread
   * count.
   */
  protected static final String OPT_USAGE_READ =
    "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

  /**
   * Usage string for the -update option: percentage of keys to update, an optional updater thread
   * count and an optional 0/1 flag that makes updaters ignore nonce collisions.
   */
  protected static final String OPT_USAGE_UPDATE = "<update_percent>[:<#threads="
    + DEFAULT_NUM_THREADS + ">][:<#whether to ignore nonce collisions=0>]";

  /** Help text for -bloom, listing every {@link BloomType}. */
  protected static final String OPT_USAGE_BLOOM =
    "Bloom filter type, one of " + Arrays.toString(BloomType.values());

  /** Help text for -compression, listing every compression algorithm. */
  protected static final String OPT_USAGE_COMPRESSION =
    "Compression type, " + "one of " + Arrays.toString(Compression.Algorithm.values());

  /** Long name of the -v option; without it doWork() lowers ZooKeeper logging to WARN. */
  protected static final String OPT_VERBOSE = "verbose";

  // Column family / table options.
  public static final String OPT_BLOOM = "bloom";
  /** Extra Bloom filter parameter; only consulted for ROWPREFIX_FIXED_LENGTH Bloom filters. */
  public static final String OPT_BLOOM_PARAM = "bloom_param";
  public static final String OPT_COMPRESSION = "compression";
  /** When set, the test table is created with ASYNC_WAL durability. */
  public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
  public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";

  public static final String OPT_INMEMORY = "in_memory";
  public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF "
    + "inmemory as far as possible.  Not guaranteed that reads are always served from inmemory";

  // Pluggable implementation classes, given on the command line by class name.
  public static final String OPT_GENERATOR = "generator";
  public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
    + " Any args for this class can be passed as colon separated after class name";

  public static final String OPT_WRITER = "writer";
  public static final String OPT_WRITER_USAGE = "The class for executing the write requests";

  public static final String OPT_UPDATER = "updater";
  public static final String OPT_UPDATER_USAGE = "The class for executing the update requests";

  public static final String OPT_READER = "reader";
  public static final String OPT_READER_USAGE = "The class for executing the read requests";

  // Names of the remaining command-line options (registered in addOptions()).
  protected static final String OPT_KEY_WINDOW = "key_window";
  protected static final String OPT_WRITE = "write";
  protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
  public static final String OPT_MULTIPUT = "multiput";
  public static final String OPT_MULTIGET = "multiget_batchsize";
  protected static final String OPT_NUM_KEYS = "num_keys";
  protected static final String OPT_READ = "read";
  protected static final String OPT_START_KEY = "start_key";
  public static final String OPT_TABLE_NAME = "tn";
  public static final String OPT_COLUMN_FAMILIES = "families";
  protected static final String OPT_ZK_QUORUM = "zk";
  protected static final String OPT_ZK_PARENT_NODE = "zk_root";
  protected static final String OPT_SKIP_INIT = "skip_init";
  protected static final String OPT_INIT_ONLY = "init_only";
  // Number of tables to load; when greater than 1 doWork() loads them in parallel.
  // NOTE(review): unlike its siblings this constant has no OPT_ prefix.
  protected static final String NUM_TABLES = "num_tables";
  protected static final String OPT_BATCHUPDATE = "batchupdate";
  protected static final String OPT_UPDATE = "update";

  /** Cipher name for transparent encryption of the test column families. */
  public static final String OPT_ENCRYPTION = "encryption";
  protected static final String OPT_ENCRYPTION_USAGE =
    "Enables transparent encryption on the test table, one of "
      + Arrays.toString(Encryption.getSupportedCiphers());

  public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server";
  // The text hard-codes the default; keep it in sync with DEFAULT_NUM_REGIONS_PER_SERVER.
  protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE =
    "Desired number of regions per region server. Defaults to 5.";
  public static final int DEFAULT_NUM_REGIONS_PER_SERVER = 5;

  /** Region replication of the test table; processOptions() uses 1 when not given. */
  public static final String OPT_REGION_REPLICATION = "region_replication";
  protected static final String OPT_REGION_REPLICATION_USAGE =
    "Desired number of replicas per region";

  /** Replica id readers read from; processOptions() uses -1 when not given. */
  public static final String OPT_REGION_REPLICA_ID = "region_replica_id";
  protected static final String OPT_REGION_REPLICA_ID_USAGE =
    "Region replica id to do the reads from";

  /** MOB threshold; only parsed together with -write, and a value >= 0 enables MOB. */
  public static final String OPT_MOB_THRESHOLD = "mob_threshold";
  protected static final String OPT_MOB_THRESHOLD_USAGE =
    "Desired cell size to exceed in bytes that will use the MOB write path";

  /** Default value of -start_key. */
  protected static final long DEFAULT_START_KEY = 0;
182
183  /** This will be removed as we factor out the dependency on command line */
184  protected CommandLine cmd;
185
186  protected MultiThreadedWriter writerThreads = null;
187  protected MultiThreadedReader readerThreads = null;
188  protected MultiThreadedUpdater updaterThreads = null;
189
190  protected long startKey, endKey;
191
192  protected boolean isVerbose, isWrite, isRead, isUpdate;
193  protected boolean deferredLogFlush;
194
195  // Column family options
196  protected DataBlockEncoding dataBlockEncodingAlgo;
197  protected Compression.Algorithm compressAlgo;
198  protected BloomType bloomType;
199  private boolean inMemoryCF;
200
201  private User userOwner;
202  // Writer options
203  protected int numWriterThreads = DEFAULT_NUM_THREADS;
204  protected int minColsPerKey, maxColsPerKey;
205  protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE;
206  protected boolean isMultiPut;
207
208  // Updater options
209  protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
210  protected int updatePercent;
211  protected boolean ignoreConflicts = false;
212  protected boolean isBatchUpdate;
213
214  // Reader options
215  private int numReaderThreads = DEFAULT_NUM_THREADS;
216  private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
217  private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE;
218  private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
219  private int verifyPercent;
220
221  private int numTables = 1;
222
223  private String superUser;
224
225  private String userNames;
226  // This file is used to read authentication information in secure clusters.
227  private String authnFileName;
228
229  private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
230  private int regionReplication = -1; // not set
231  private int regionReplicaId = -1; // not set
232
233  private int mobThreshold = -1; // not set
234
235  // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
236  // console tool itself should only be used from console.
237  protected boolean isSkipInit = false;
238  protected boolean isInitOnly = false;
239
240  protected Cipher cipher = null;
241
242  protected String[] splitColonSeparated(String option, int minNumCols, int maxNumCols) {
243    String optVal = cmd.getOptionValue(option);
244    String[] cols = optVal.split(COLON);
245    if (cols.length < minNumCols || cols.length > maxNumCols) {
246      throw new IllegalArgumentException(
247        "Expected at least " + minNumCols + " columns but no more than " + maxNumCols
248          + " in the colon-separated value '" + optVal + "' of the " + "-" + option + " option");
249    }
250    return cols;
251  }
252
253  protected int getNumThreads(String numThreadsStr) {
254    return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
255  }
256
257  public byte[][] getColumnFamilies() {
258    return families;
259  }
260
261  /**
262   * Apply column family options such as Bloom filters, compression, and data block encoding.
263   */
264  protected void applyColumnFamilyOptions(TableName tableName, byte[][] columnFamilies)
265    throws IOException {
266    try (Connection conn = ConnectionFactory.createConnection(conf);
267      Admin admin = conn.getAdmin()) {
268      TableDescriptor tableDesc = admin.getDescriptor(tableName);
269      LOG.info("Disabling table " + tableName);
270      admin.disableTable(tableName);
271      for (byte[] cf : columnFamilies) {
272        ColumnFamilyDescriptor columnDesc = tableDesc.getColumnFamily(cf);
273        boolean isNewCf = columnDesc == null;
274        ColumnFamilyDescriptorBuilder columnDescBuilder = isNewCf
275          ? ColumnFamilyDescriptorBuilder.newBuilder(cf)
276          : ColumnFamilyDescriptorBuilder.newBuilder(columnDesc);
277        if (bloomType != null) {
278          columnDescBuilder.setBloomFilterType(bloomType);
279        }
280        if (compressAlgo != null) {
281          columnDescBuilder.setCompressionType(compressAlgo);
282        }
283        if (dataBlockEncodingAlgo != null) {
284          columnDescBuilder.setDataBlockEncoding(dataBlockEncodingAlgo);
285        }
286        if (inMemoryCF) {
287          columnDescBuilder.setInMemory(inMemoryCF);
288        }
289        if (cipher != null) {
290          byte[] keyBytes = new byte[cipher.getKeyLength()];
291          Bytes.secureRandom(keyBytes);
292          columnDescBuilder.setEncryptionType(cipher.getName());
293          columnDescBuilder.setEncryptionKey(EncryptionUtil.wrapKey(conf,
294            User.getCurrent().getShortName(), new SecretKeySpec(keyBytes, cipher.getName())));
295        }
296        if (mobThreshold >= 0) {
297          columnDescBuilder.setMobEnabled(true);
298          columnDescBuilder.setMobThreshold(mobThreshold);
299        }
300
301        if (isNewCf) {
302          admin.addColumnFamily(tableName, columnDescBuilder.build());
303        } else {
304          admin.modifyColumnFamily(tableName, columnDescBuilder.build());
305        }
306      }
307      LOG.info("Enabling table " + tableName);
308      admin.enableTable(tableName);
309    }
310  }
311
312  @Override
313  protected void addOptions() {
314    addOptNoArg("v", OPT_VERBOSE, "Will display a full readout of logs, including ZooKeeper");
315    addOptWithArg(OPT_ZK_QUORUM,
316      "ZK quorum as comma-separated host names " + "without port numbers");
317    addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
318    addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
319    addOptWithArg(OPT_COLUMN_FAMILIES, "The name of the column families to use separated by comma");
320    addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
321    addOptWithArg(OPT_READ, OPT_USAGE_READ);
322    addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
323    addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
324    addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
325    addOptWithArg(OPT_BLOOM_PARAM, "the parameter of bloom filter type");
326    addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
327    addOptWithArg(LoadTestUtil.OPT_DATA_BLOCK_ENCODING, LoadTestUtil.OPT_DATA_BLOCK_ENCODING_USAGE);
328    addOptWithArg(OPT_MAX_READ_ERRORS,
329      "The maximum number of read errors "
330        + "to tolerate before terminating all reader threads. The default is "
331        + MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
332    addOptWithArg(OPT_MULTIGET,
333      "Whether to use multi-gets as opposed to " + "separate gets for every column in a row");
334    addOptWithArg(OPT_KEY_WINDOW,
335      "The 'key window' to maintain between "
336        + "reads and writes for concurrent write/read workload. The default " + "is "
337        + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");
338
339    addOptNoArg(OPT_MULTIPUT,
340      "Whether to use multi-puts as opposed to " + "separate puts for every column in a row");
341    addOptNoArg(OPT_BATCHUPDATE,
342      "Whether to use batch as opposed to " + "separate updates for every column in a row");
343    addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
344    addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
345    addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE);
346    addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE);
347    addOptWithArg(OPT_READER, OPT_READER_USAGE);
348
349    addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
350    addOptWithArg(OPT_START_KEY, "The first key to read/write "
351      + "(a 0-based index). The default value is " + DEFAULT_START_KEY + ".");
352    addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table " + "already exists");
353
354    addOptWithArg(NUM_TABLES,
355      "A positive integer number. When a number n is specified, load test "
356        + "tool  will load n table parallely. -tn parameter value becomes "
357        + "table name prefix. Each table name is in format <tn>_1...<tn>_n");
358
359    addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE);
360    addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE);
361    addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE);
362    addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE);
363    addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE);
364    addOptWithArg(OPT_MOB_THRESHOLD, OPT_MOB_THRESHOLD_USAGE);
365  }
366
367  @Override
368  protected CommandLineParser newParser() {
369    // Commons-CLI lacks the capability to handle combinations of options, so we do it ourselves
370    // Validate in parse() to get helpful error messages instead of exploding in processOptions()
371    return new DefaultParser() {
372      @Override
373      public CommandLine parse(Options opts, String[] args, Properties props, boolean stop)
374        throws ParseException {
375        CommandLine cl = super.parse(opts, args, props, stop);
376
377        boolean isReadWriteUpdate =
378          cmd.hasOption(OPT_READ) || cmd.hasOption(OPT_WRITE) || cmd.hasOption(OPT_UPDATE);
379        boolean isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
380
381        if (!isInitOnly && !isReadWriteUpdate) {
382          throw new MissingOptionException("Must specify either -" + OPT_INIT_ONLY
383            + " or at least one of -" + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE);
384        }
385
386        if (isInitOnly && isReadWriteUpdate) {
387          throw new AlreadySelectedException(OPT_INIT_ONLY + " cannot be specified with any of -"
388            + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE);
389        }
390
391        if (isReadWriteUpdate && !cmd.hasOption(OPT_NUM_KEYS)) {
392          throw new MissingOptionException(OPT_NUM_KEYS + " must be specified in read/write mode.");
393        }
394
395        return cl;
396      }
397    };
398  }
399
400  @Override
401  protected void processOptions(CommandLine cmd) {
402    this.cmd = cmd;
403
404    tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME));
405
406    if (cmd.hasOption(OPT_COLUMN_FAMILIES)) {
407      String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(",");
408      families = new byte[list.length][];
409      for (int i = 0; i < list.length; i++) {
410        families[i] = Bytes.toBytes(list[i]);
411      }
412    } else {
413      families = DEFAULT_COLUMN_FAMILIES;
414    }
415
416    isVerbose = cmd.hasOption(OPT_VERBOSE);
417    isWrite = cmd.hasOption(OPT_WRITE);
418    isRead = cmd.hasOption(OPT_READ);
419    isUpdate = cmd.hasOption(OPT_UPDATE);
420    isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
421    deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH);
422
423    if (!isInitOnly) {
424      startKey = parseLong(cmd.getOptionValue(OPT_START_KEY, String.valueOf(DEFAULT_START_KEY)), 0,
425        Long.MAX_VALUE);
426      long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1, Long.MAX_VALUE - startKey);
427      endKey = startKey + numKeys;
428      isSkipInit = cmd.hasOption(OPT_SKIP_INIT);
429      System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
430    }
431
432    parseColumnFamilyOptions(cmd);
433
434    if (isWrite) {
435      String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3);
436
437      int colIndex = 0;
438      minColsPerKey = 1;
439      maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
440      int avgColDataSize = parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
441      minColDataSize = avgColDataSize / 2;
442      maxColDataSize = avgColDataSize * 3 / 2;
443
444      if (colIndex < writeOpts.length) {
445        numWriterThreads = getNumThreads(writeOpts[colIndex++]);
446      }
447
448      isMultiPut = cmd.hasOption(OPT_MULTIPUT);
449
450      mobThreshold = -1;
451      if (cmd.hasOption(OPT_MOB_THRESHOLD)) {
452        mobThreshold = Integer.parseInt(cmd.getOptionValue(OPT_MOB_THRESHOLD));
453      }
454
455      System.out.println("Multi-puts: " + isMultiPut);
456      System.out.println("Columns per key: " + minColsPerKey + ".." + maxColsPerKey);
457      System.out.println("Data size per column: " + minColDataSize + ".." + maxColDataSize);
458    }
459
460    if (isUpdate) {
461      String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3);
462      int colIndex = 0;
463      updatePercent = parseInt(mutateOpts[colIndex++], 0, 100);
464      if (colIndex < mutateOpts.length) {
465        numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]);
466      }
467      if (colIndex < mutateOpts.length) {
468        ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1;
469      }
470
471      isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE);
472
473      System.out.println("Batch updates: " + isBatchUpdate);
474      System.out.println("Percent of keys to update: " + updatePercent);
475      System.out.println("Updater threads: " + numUpdaterThreads);
476      System.out.println("Ignore nonce conflicts: " + ignoreConflicts);
477    }
478
479    if (isRead) {
480      String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
481      int colIndex = 0;
482      verifyPercent = parseInt(readOpts[colIndex++], 0, 100);
483      if (colIndex < readOpts.length) {
484        numReaderThreads = getNumThreads(readOpts[colIndex++]);
485      }
486
487      if (cmd.hasOption(OPT_MAX_READ_ERRORS)) {
488        maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS), 0, Integer.MAX_VALUE);
489      }
490
491      if (cmd.hasOption(OPT_KEY_WINDOW)) {
492        keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW), 0, Integer.MAX_VALUE);
493      }
494
495      if (cmd.hasOption(OPT_MULTIGET)) {
496        multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET), 0, Integer.MAX_VALUE);
497      }
498
499      System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize);
500      System.out.println("Percent of keys to verify: " + verifyPercent);
501      System.out.println("Reader threads: " + numReaderThreads);
502    }
503
504    numTables = 1;
505    if (cmd.hasOption(NUM_TABLES)) {
506      numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
507    }
508
509    numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
510    if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) {
511      numRegionsPerServer = Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER));
512    }
513
514    regionReplication = 1;
515    if (cmd.hasOption(OPT_REGION_REPLICATION)) {
516      regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION));
517    }
518
519    regionReplicaId = -1;
520    if (cmd.hasOption(OPT_REGION_REPLICA_ID)) {
521      regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID));
522    }
523  }
524
525  private void parseColumnFamilyOptions(CommandLine cmd) {
526    String dataBlockEncodingStr = cmd.getOptionValue(LoadTestUtil.OPT_DATA_BLOCK_ENCODING);
527    dataBlockEncodingAlgo =
528      dataBlockEncodingStr == null ? null : DataBlockEncoding.valueOf(dataBlockEncodingStr);
529
530    String compressStr = cmd.getOptionValue(OPT_COMPRESSION);
531    compressAlgo =
532      compressStr == null ? Compression.Algorithm.NONE : Compression.Algorithm.valueOf(compressStr);
533
534    String bloomStr = cmd.getOptionValue(OPT_BLOOM);
535    bloomType = bloomStr == null ? BloomType.ROW : BloomType.valueOf(bloomStr);
536
537    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
538      if (!cmd.hasOption(OPT_BLOOM_PARAM)) {
539        LOG.error("the parameter of bloom filter {} is not specified", bloomType.name());
540      } else {
541        conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM));
542      }
543    }
544
545    inMemoryCF = cmd.hasOption(OPT_INMEMORY);
546    if (cmd.hasOption(OPT_ENCRYPTION)) {
547      cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
548    }
549
550  }
551
552  public void initTestTable() throws IOException {
553    Durability durability = Durability.USE_DEFAULT;
554    if (deferredLogFlush) {
555      durability = Durability.ASYNC_WAL;
556    }
557
558    LoadTestUtil.createPreSplitLoadTestTable(conf, tableName, getColumnFamilies(), compressAlgo,
559      dataBlockEncodingAlgo, numRegionsPerServer, regionReplication, durability);
560    applyColumnFamilyOptions(tableName, getColumnFamilies());
561  }
562
563  @Override
564  protected int doWork() throws IOException {
565    if (!isVerbose) {
566      Log4jUtils.setLogLevel(ZooKeeper.class.getName(), "WARN");
567    }
568    if (numTables > 1) {
569      return parallelLoadTables();
570    } else {
571      return loadTable();
572    }
573  }
574
575  protected int loadTable() throws IOException {
576    if (cmd.hasOption(OPT_ZK_QUORUM)) {
577      conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
578    }
579    if (cmd.hasOption(OPT_ZK_PARENT_NODE)) {
580      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE));
581    }
582
583    if (isInitOnly) {
584      LOG.info("Initializing only; no reads or writes");
585      initTestTable();
586      return 0;
587    }
588
589    if (!isSkipInit) {
590      initTestTable();
591    }
592    LoadTestDataGenerator dataGen = null;
593    if (cmd.hasOption(OPT_GENERATOR)) {
594      String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
595      dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
596      String[] args;
597      if (dataGen instanceof LoadTestDataGeneratorWithACL) {
598        LOG.info("Using LoadTestDataGeneratorWithACL");
599        if (User.isHBaseSecurityEnabled(conf)) {
600          LOG.info("Security is enabled");
601          authnFileName = clazzAndArgs[1];
602          superUser = clazzAndArgs[2];
603          userNames = clazzAndArgs[3];
604          args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length);
605          Properties authConfig = new Properties();
606          authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName));
607          try {
608            addAuthInfoToConf(authConfig, conf, superUser, userNames);
609          } catch (IOException exp) {
610            LOG.error(exp.toString(), exp);
611            return EXIT_FAILURE;
612          }
613          userOwner = User.create(KerberosUtils.loginAndReturnUGI(conf, superUser));
614        } else {
615          superUser = clazzAndArgs[1];
616          userNames = clazzAndArgs[2];
617          args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
618          userOwner = User.createUserForTesting(conf, superUser, new String[0]);
619        }
620      } else {
621        args = clazzAndArgs.length == 1
622          ? new String[0]
623          : Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
624      }
625      dataGen.initialize(args);
626    } else {
627      // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator
628      dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
629        minColsPerKey, maxColsPerKey, families);
630    }
631
632    if (userOwner != null) {
633      LOG.info("Granting permissions for user " + userOwner.getShortName());
634      Permission.Action[] actions = { Permission.Action.ADMIN, Permission.Action.CREATE,
635        Permission.Action.READ, Permission.Action.WRITE };
636      try {
637        AccessControlClient.grant(ConnectionFactory.createConnection(conf), tableName,
638          userOwner.getShortName(), null, null, actions);
639      } catch (Throwable e) {
640        LOG.error(HBaseMarkers.FATAL,
641          "Error in granting permission for the user " + userOwner.getShortName(), e);
642        return EXIT_FAILURE;
643      }
644    }
645
646    if (userNames != null) {
647      // This will be comma separated list of expressions.
648      String users[] = userNames.split(",");
649      User user = null;
650      for (String userStr : users) {
651        if (User.isHBaseSecurityEnabled(conf)) {
652          user = User.create(KerberosUtils.loginAndReturnUGI(conf, userStr));
653        } else {
654          user = User.createUserForTesting(conf, userStr, new String[0]);
655        }
656      }
657    }
658
659    if (isWrite) {
660      if (userOwner != null) {
661        writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
662      } else {
663        String writerClass = null;
664        if (cmd.hasOption(OPT_WRITER)) {
665          writerClass = cmd.getOptionValue(OPT_WRITER);
666        } else {
667          writerClass = MultiThreadedWriter.class.getCanonicalName();
668        }
669
670        writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen);
671      }
672      writerThreads.setMultiPut(isMultiPut);
673    }
674
675    if (isUpdate) {
676      if (userOwner != null) {
677        updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
678          userOwner, userNames);
679      } else {
680        String updaterClass = null;
681        if (cmd.hasOption(OPT_UPDATER)) {
682          updaterClass = cmd.getOptionValue(OPT_UPDATER);
683        } else {
684          updaterClass = MultiThreadedUpdater.class.getCanonicalName();
685        }
686        updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen);
687      }
688      updaterThreads.setBatchUpdate(isBatchUpdate);
689      updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
690    }
691
692    if (isRead) {
693      if (userOwner != null) {
694        readerThreads =
695          new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent, userNames);
696      } else {
697        String readerClass = null;
698        if (cmd.hasOption(OPT_READER)) {
699          readerClass = cmd.getOptionValue(OPT_READER);
700        } else {
701          readerClass = MultiThreadedReader.class.getCanonicalName();
702        }
703        readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen);
704      }
705      readerThreads.setMaxErrors(maxReadErrors);
706      readerThreads.setKeyWindow(keyWindow);
707      readerThreads.setMultiGetBatchSize(multiGetBatchSize);
708      readerThreads.setRegionReplicaId(regionReplicaId);
709    }
710
711    if (isUpdate && isWrite) {
712      LOG.info("Concurrent write/update workload: making updaters aware of the " + "write point");
713      updaterThreads.linkToWriter(writerThreads);
714    }
715
716    if (isRead && (isUpdate || isWrite)) {
717      LOG.info("Concurrent write/read workload: making readers aware of the " + "write point");
718      readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
719    }
720
721    if (isWrite) {
722      System.out.println("Starting to write data...");
723      writerThreads.start(startKey, endKey, numWriterThreads);
724    }
725
726    if (isUpdate) {
727      LOG.info("Starting to mutate data...");
728      System.out.println("Starting to mutate data...");
729      // TODO : currently append and increment operations not tested with tags
730      // Will update this after it is done
731      updaterThreads.start(startKey, endKey, numUpdaterThreads);
732    }
733
734    if (isRead) {
735      System.out.println("Starting to read data...");
736      readerThreads.start(startKey, endKey, numReaderThreads);
737    }
738
739    if (isWrite) {
740      writerThreads.waitForFinish();
741    }
742
743    if (isUpdate) {
744      updaterThreads.waitForFinish();
745    }
746
747    if (isRead) {
748      readerThreads.waitForFinish();
749    }
750
751    boolean success = true;
752    if (isWrite) {
753      success = success && writerThreads.getNumWriteFailures() == 0;
754    }
755    if (isUpdate) {
756      success = success && updaterThreads.getNumWriteFailures() == 0;
757    }
758    if (isRead) {
759      success =
760        success && readerThreads.getNumReadErrors() == 0 && readerThreads.getNumReadFailures() == 0;
761    }
762    return success ? EXIT_SUCCESS : EXIT_FAILURE;
763  }
764
765  private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException {
766    try {
767      Class<?> clazz = Class.forName(clazzName);
768      Constructor<?> constructor =
769        clazz.getConstructor(int.class, int.class, int.class, int.class, byte[][].class);
770      return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
771        minColsPerKey, maxColsPerKey, families);
772    } catch (Exception e) {
773      throw new IOException(e);
774    }
775  }
776
777  private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName,
778    LoadTestDataGenerator dataGen) throws IOException {
779    try {
780      Class<?> clazz = Class.forName(clazzName);
781      Constructor<?> constructor =
782        clazz.getConstructor(LoadTestDataGenerator.class, Configuration.class, TableName.class);
783      return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName);
784    } catch (Exception e) {
785      throw new IOException(e);
786    }
787  }
788
789  private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName,
790    LoadTestDataGenerator dataGen) throws IOException {
791    try {
792      Class<?> clazz = Class.forName(clazzName);
793      Constructor<?> constructor = clazz.getConstructor(LoadTestDataGenerator.class,
794        Configuration.class, TableName.class, double.class);
795      return (MultiThreadedUpdater) constructor.newInstance(dataGen, conf, tableName,
796        updatePercent);
797    } catch (Exception e) {
798      throw new IOException(e);
799    }
800  }
801
802  private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName,
803    LoadTestDataGenerator dataGen) throws IOException {
804    try {
805      Class<?> clazz = Class.forName(clazzName);
806      Constructor<?> constructor = clazz.getConstructor(LoadTestDataGenerator.class,
807        Configuration.class, TableName.class, double.class);
808      return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent);
809    } catch (Exception e) {
810      throw new IOException(e);
811    }
812  }
813
814  public static void main(String[] args) {
815    new LoadTestTool().doStaticMain(args);
816  }
817
818  /**
819   * When NUM_TABLES is specified, the function starts multiple worker threads which individually
820   * start a LoadTestTool instance to load a table. Each table name is in format &lt;tn>_&lt;index>.
821   * For example, "-tn test -num_tables 2" , table names will be "test_1", "test_2"
822   * @throws IOException if one of the load tasks is unable to complete
823   */
824  private int parallelLoadTables() throws IOException {
825    // create new command args
826    String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
827    String[] newArgs = null;
828    if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
829      newArgs = new String[cmdLineArgs.length + 2];
830      newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
831      newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME;
832      System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length);
833    } else {
834      newArgs = cmdLineArgs;
835    }
836
837    int tableNameValueIndex = -1;
838    for (int j = 0; j < newArgs.length; j++) {
839      if (newArgs[j].endsWith(OPT_TABLE_NAME)) {
840        tableNameValueIndex = j + 1;
841      } else if (newArgs[j].endsWith(NUM_TABLES)) {
842        // change NUM_TABLES to 1 so that each worker loads one table
843        newArgs[j + 1] = "1";
844      }
845    }
846
847    // starting to load multiple tables
848    List<WorkerThread> workers = new ArrayList<>();
849    for (int i = 0; i < numTables; i++) {
850      String[] workerArgs = newArgs.clone();
851      workerArgs[tableNameValueIndex] = tableName + "_" + (i + 1);
852      WorkerThread worker = new WorkerThread(i, workerArgs);
853      workers.add(worker);
854      LOG.info(worker + " starting");
855      worker.start();
856    }
857
858    // wait for all workers finish
859    LOG.info("Waiting for worker threads to finish");
860    for (WorkerThread t : workers) {
861      try {
862        t.join();
863      } catch (InterruptedException ie) {
864        IOException iie = new InterruptedIOException();
865        iie.initCause(ie);
866        throw iie;
867      }
868      checkForErrors();
869    }
870
871    return EXIT_SUCCESS;
872  }
873
874  // If an exception is thrown by one of worker threads, it will be
875  // stored here.
876  protected AtomicReference<Throwable> thrown = new AtomicReference<>();
877
878  private void workerThreadError(Throwable t) {
879    thrown.compareAndSet(null, t);
880  }
881
882  /**
883   * Check for errors in the writer threads. If any is found, rethrow it.
884   */
885  private void checkForErrors() throws IOException {
886    Throwable thrown = this.thrown.get();
887    if (thrown == null) return;
888    if (thrown instanceof IOException) {
889      throw (IOException) thrown;
890    } else {
891      throw new RuntimeException(thrown);
892    }
893  }
894
895  class WorkerThread extends Thread {
896    private String[] workerArgs;
897
898    WorkerThread(int i, String[] args) {
899      super("WorkerThread-" + i);
900      workerArgs = args;
901    }
902
903    @Override
904    public void run() {
905      try {
906        int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs);
907        if (ret != 0) {
908          throw new RuntimeException("LoadTestTool exit with non-zero return code.");
909        }
910      } catch (Exception ex) {
911        LOG.error("Error in worker thread", ex);
912        workerThreadError(ex);
913      }
914    }
915  }
916
917  private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner,
918    String userList) throws IOException {
919    List<String> users = new ArrayList<>(Arrays.asList(userList.split(",")));
920    users.add(owner);
921    for (String user : users) {
922      String keyTabFileConfKey = "hbase." + user + ".keytab.file";
923      String principalConfKey = "hbase." + user + ".kerberos.principal";
924      if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) {
925        throw new IOException("Authentication configs missing for user : " + user);
926      }
927    }
928    for (String key : authConfig.stringPropertyNames()) {
929      conf.set(key, authConfig.getProperty(key));
930    }
931    LOG.debug("Added authentication properties to config successfully.");
932  }
933
934}