001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.lang.reflect.Constructor;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.List;
026import java.util.Properties;
027import java.util.concurrent.atomic.AtomicReference;
028import javax.crypto.spec.SecretKeySpec;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HBaseInterfaceAudience;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.Admin;
036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.Connection;
039import org.apache.hadoop.hbase.client.ConnectionFactory;
040import org.apache.hadoop.hbase.client.Durability;
041import org.apache.hadoop.hbase.client.TableDescriptor;
042import org.apache.hadoop.hbase.io.compress.Compression;
043import org.apache.hadoop.hbase.io.crypto.Cipher;
044import org.apache.hadoop.hbase.io.crypto.Encryption;
045import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
046import org.apache.hadoop.hbase.log.HBaseMarkers;
047import org.apache.hadoop.hbase.logging.Log4jUtils;
048import org.apache.hadoop.hbase.regionserver.BloomType;
049import org.apache.hadoop.hbase.security.EncryptionUtil;
050import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
051import org.apache.hadoop.hbase.security.User;
052import org.apache.hadoop.hbase.security.access.AccessControlClient;
053import org.apache.hadoop.hbase.security.access.Permission;
054import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
055import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL;
056import org.apache.hadoop.util.ToolRunner;
057import org.apache.yetus.audience.InterfaceAudience;
058import org.apache.zookeeper.ZooKeeper;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062import org.apache.hbase.thirdparty.org.apache.commons.cli.AlreadySelectedException;
063import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
064import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
065import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
066import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException;
067import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
068import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
069
070/**
071 * A command-line utility that reads, writes, and verifies data. Unlike
072 * {@link org.apache.hadoop.hbase.PerformanceEvaluation}, this tool validates the data written, and
073 * supports simultaneously writing and reading the same set of keys.
074 */
075@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
076public class LoadTestTool extends AbstractHBaseTool {
077
  private static final Logger LOG = LoggerFactory.getLogger(LoadTestTool.class);

  /** Separator between the parts of a multi-part option value, e.g. the value of -write */
  private static final String COLON = ":";

  /** Table name for the test */
  private TableName tableName;

  /** Column families for the test */
  private byte[][] families;

  /** Table name to use if not overridden on the command line */
  protected static final String DEFAULT_TABLE_NAME = "cluster_test";

  /** The default data size if not specified */
  protected static final int DEFAULT_DATA_SIZE = 64;

  /** The number of reader/writer threads if not specified */
  protected static final int DEFAULT_NUM_THREADS = 20;

  /** Usage string for the load option */
  protected static final String OPT_USAGE_LOAD =
    "<avg_cols_per_key>:<avg_data_size>" + "[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

  /** Usage string for the read option */
  protected static final String OPT_USAGE_READ =
    "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

  /** Usage string for the update option */
  protected static final String OPT_USAGE_UPDATE = "<update_percent>[:<#threads="
    + DEFAULT_NUM_THREADS + ">][:<#whether to ignore nonce collisions=0>]";

  /** Usage string for the bloom option; lists every {@link BloomType} value */
  protected static final String OPT_USAGE_BLOOM =
    "Bloom filter type, one of " + Arrays.toString(BloomType.values());

  /** Usage string for the compression option; lists every {@link Compression.Algorithm} value */
  protected static final String OPT_USAGE_COMPRESSION =
    "Compression type, " + "one of " + Arrays.toString(Compression.Algorithm.values());

  // Names of the command-line options (and, for some of them, their usage strings)

  protected static final String OPT_VERBOSE = "verbose";

  public static final String OPT_BLOOM = "bloom";
  public static final String OPT_BLOOM_PARAM = "bloom_param";
  public static final String OPT_COMPRESSION = "compression";
  public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
  public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";

  public static final String OPT_INMEMORY = "in_memory";
  public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF "
    + "inmemory as far as possible.  Not guaranteed that reads are always served from inmemory";

  public static final String OPT_GENERATOR = "generator";
  public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
    + " Any args for this class can be passed as colon separated after class name";

  public static final String OPT_WRITER = "writer";
  public static final String OPT_WRITER_USAGE = "The class for executing the write requests";

  public static final String OPT_UPDATER = "updater";
  public static final String OPT_UPDATER_USAGE = "The class for executing the update requests";

  public static final String OPT_READER = "reader";
  public static final String OPT_READER_USAGE = "The class for executing the read requests";

  protected static final String OPT_KEY_WINDOW = "key_window";
  protected static final String OPT_WRITE = "write";
  protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
  public static final String OPT_MULTIPUT = "multiput";
  public static final String OPT_MULTIGET = "multiget_batchsize";
  protected static final String OPT_NUM_KEYS = "num_keys";
  protected static final String OPT_READ = "read";
  protected static final String OPT_START_KEY = "start_key";
  public static final String OPT_TABLE_NAME = "tn";
  public static final String OPT_COLUMN_FAMILIES = "families";
  protected static final String OPT_ZK_QUORUM = "zk";
  protected static final String OPT_ZK_PARENT_NODE = "zk_root";
  protected static final String OPT_SKIP_INIT = "skip_init";
  protected static final String OPT_INIT_ONLY = "init_only";
  protected static final String NUM_TABLES = "num_tables";
  protected static final String OPT_BATCHUPDATE = "batchupdate";
  protected static final String OPT_UPDATE = "update";

  public static final String OPT_ENCRYPTION = "encryption";
  protected static final String OPT_ENCRYPTION_USAGE =
    "Enables transparent encryption on the test table, one of "
      + Arrays.toString(Encryption.getSupportedCiphers());

  public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server";
  protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE =
    "Desired number of regions per region server. Defaults to 5.";
  // NOTE(review): public and non-final, so this "default" can be reassigned at runtime, while the
  // usage string above hard-codes 5 -- confirm whether mutability is intended.
  public static int DEFAULT_NUM_REGIONS_PER_SERVER = 5;

  public static final String OPT_REGION_REPLICATION = "region_replication";
  protected static final String OPT_REGION_REPLICATION_USAGE =
    "Desired number of replicas per region";

  public static final String OPT_REGION_REPLICA_ID = "region_replica_id";
  protected static final String OPT_REGION_REPLICA_ID_USAGE =
    "Region replica id to do the reads from";

  public static final String OPT_MOB_THRESHOLD = "mob_threshold";
  protected static final String OPT_MOB_THRESHOLD_USAGE =
    "Desired cell size to exceed in bytes that will use the MOB write path";

  /** The first key to read/write (a 0-based index) if -start_key is not specified */
  protected static final long DEFAULT_START_KEY = 0;

  /** This will be removed as we factor out the dependency on command line */
  protected CommandLine cmd;

  // Worker groups; each one is only created when the corresponding workload is requested
  protected MultiThreadedWriter writerThreads = null;
  protected MultiThreadedReader readerThreads = null;
  protected MultiThreadedUpdater updaterThreads = null;

  /** Key range of the workload: startKey is inclusive, endKey (startKey + num_keys) is exclusive */
  protected long startKey, endKey;

  // Flags mirroring the presence of -verbose, -write, -read and -update on the command line
  protected boolean isVerbose, isWrite, isRead, isUpdate;
  protected boolean deferredLogFlush;

  // Column family options
  protected DataBlockEncoding dataBlockEncodingAlgo;
  protected Compression.Algorithm compressAlgo;
  protected BloomType bloomType;
  private boolean inMemoryCF;

  /** User granted permissions on the test table; only set when an ACL data generator is used */
  private User userOwner;
  // Writer options
  protected int numWriterThreads = DEFAULT_NUM_THREADS;
  protected int minColsPerKey, maxColsPerKey;
  protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE;
  protected boolean isMultiPut;

  // Updater options
  protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
  protected int updatePercent;
  protected boolean ignoreConflicts = false;
  protected boolean isBatchUpdate;

  // Reader options
  private int numReaderThreads = DEFAULT_NUM_THREADS;
  private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
  private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE;
  private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
  private int verifyPercent;

  /** Number of tables to load; values above 1 make doWork() load the tables in parallel */
  private int numTables = 1;

  /** Super user name taken from the arguments of an ACL data generator */
  private String superUser;

  /** Comma-separated user names taken from the arguments of an ACL data generator */
  private String userNames;
  // This file is used to read authentication information in secure clusters.
  private String authnFileName;

  private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
  private int regionReplication = -1; // not set
  private int regionReplicaId = -1; // not set

  private int mobThreshold = -1; // not set

  // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
  // console tool itself should only be used from console.
  protected boolean isSkipInit = false;
  protected boolean isInitOnly = false;

  /** Cipher selected by the -encryption option; when null no encryption settings are applied */
  protected Cipher cipher = null;
239
240  protected String[] splitColonSeparated(String option, int minNumCols, int maxNumCols) {
241    String optVal = cmd.getOptionValue(option);
242    String[] cols = optVal.split(COLON);
243    if (cols.length < minNumCols || cols.length > maxNumCols) {
244      throw new IllegalArgumentException(
245        "Expected at least " + minNumCols + " columns but no more than " + maxNumCols
246          + " in the colon-separated value '" + optVal + "' of the " + "-" + option + " option");
247    }
248    return cols;
249  }
250
251  protected int getNumThreads(String numThreadsStr) {
252    return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
253  }
254
255  public byte[][] getColumnFamilies() {
256    return families;
257  }
258
259  /**
260   * Apply column family options such as Bloom filters, compression, and data block encoding.
261   */
262  protected void applyColumnFamilyOptions(TableName tableName, byte[][] columnFamilies)
263    throws IOException {
264    try (Connection conn = ConnectionFactory.createConnection(conf);
265      Admin admin = conn.getAdmin()) {
266      TableDescriptor tableDesc = admin.getDescriptor(tableName);
267      LOG.info("Disabling table " + tableName);
268      admin.disableTable(tableName);
269      for (byte[] cf : columnFamilies) {
270        ColumnFamilyDescriptor columnDesc = tableDesc.getColumnFamily(cf);
271        boolean isNewCf = columnDesc == null;
272        ColumnFamilyDescriptorBuilder columnDescBuilder = isNewCf
273          ? ColumnFamilyDescriptorBuilder.newBuilder(cf)
274          : ColumnFamilyDescriptorBuilder.newBuilder(columnDesc);
275        if (bloomType != null) {
276          columnDescBuilder.setBloomFilterType(bloomType);
277        }
278        if (compressAlgo != null) {
279          columnDescBuilder.setCompressionType(compressAlgo);
280        }
281        if (dataBlockEncodingAlgo != null) {
282          columnDescBuilder.setDataBlockEncoding(dataBlockEncodingAlgo);
283        }
284        if (inMemoryCF) {
285          columnDescBuilder.setInMemory(inMemoryCF);
286        }
287        if (cipher != null) {
288          byte[] keyBytes = new byte[cipher.getKeyLength()];
289          Bytes.secureRandom(keyBytes);
290          columnDescBuilder.setEncryptionType(cipher.getName());
291          columnDescBuilder.setEncryptionKey(EncryptionUtil.wrapKey(conf,
292            User.getCurrent().getShortName(), new SecretKeySpec(keyBytes, cipher.getName())));
293        }
294        if (mobThreshold >= 0) {
295          columnDescBuilder.setMobEnabled(true);
296          columnDescBuilder.setMobThreshold(mobThreshold);
297        }
298
299        if (isNewCf) {
300          admin.addColumnFamily(tableName, columnDescBuilder.build());
301        } else {
302          admin.modifyColumnFamily(tableName, columnDescBuilder.build());
303        }
304      }
305      LOG.info("Enabling table " + tableName);
306      admin.enableTable(tableName);
307    }
308  }
309
310  @Override
311  protected void addOptions() {
312    addOptNoArg("v", OPT_VERBOSE, "Will display a full readout of logs, including ZooKeeper");
313    addOptWithArg(OPT_ZK_QUORUM,
314      "ZK quorum as comma-separated host names " + "without port numbers");
315    addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
316    addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
317    addOptWithArg(OPT_COLUMN_FAMILIES, "The name of the column families to use separated by comma");
318    addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
319    addOptWithArg(OPT_READ, OPT_USAGE_READ);
320    addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
321    addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
322    addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
323    addOptWithArg(OPT_BLOOM_PARAM, "the parameter of bloom filter type");
324    addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
325    addOptWithArg(HFileTestUtil.OPT_DATA_BLOCK_ENCODING,
326      HFileTestUtil.OPT_DATA_BLOCK_ENCODING_USAGE);
327    addOptWithArg(OPT_MAX_READ_ERRORS,
328      "The maximum number of read errors "
329        + "to tolerate before terminating all reader threads. The default is "
330        + MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
331    addOptWithArg(OPT_MULTIGET,
332      "Whether to use multi-gets as opposed to " + "separate gets for every column in a row");
333    addOptWithArg(OPT_KEY_WINDOW,
334      "The 'key window' to maintain between "
335        + "reads and writes for concurrent write/read workload. The default " + "is "
336        + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");
337
338    addOptNoArg(OPT_MULTIPUT,
339      "Whether to use multi-puts as opposed to " + "separate puts for every column in a row");
340    addOptNoArg(OPT_BATCHUPDATE,
341      "Whether to use batch as opposed to " + "separate updates for every column in a row");
342    addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
343    addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
344    addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE);
345    addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE);
346    addOptWithArg(OPT_READER, OPT_READER_USAGE);
347
348    addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
349    addOptWithArg(OPT_START_KEY, "The first key to read/write "
350      + "(a 0-based index). The default value is " + DEFAULT_START_KEY + ".");
351    addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table " + "already exists");
352
353    addOptWithArg(NUM_TABLES,
354      "A positive integer number. When a number n is specified, load test "
355        + "tool  will load n table parallely. -tn parameter value becomes "
356        + "table name prefix. Each table name is in format <tn>_1...<tn>_n");
357
358    addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE);
359    addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE);
360    addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE);
361    addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE);
362    addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE);
363    addOptWithArg(OPT_MOB_THRESHOLD, OPT_MOB_THRESHOLD_USAGE);
364  }
365
366  @Override
367  protected CommandLineParser newParser() {
368    // Commons-CLI lacks the capability to handle combinations of options, so we do it ourselves
369    // Validate in parse() to get helpful error messages instead of exploding in processOptions()
370    return new DefaultParser() {
371      @Override
372      public CommandLine parse(Options opts, String[] args, Properties props, boolean stop)
373        throws ParseException {
374        CommandLine cl = super.parse(opts, args, props, stop);
375
376        boolean isReadWriteUpdate =
377          cmd.hasOption(OPT_READ) || cmd.hasOption(OPT_WRITE) || cmd.hasOption(OPT_UPDATE);
378        boolean isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
379
380        if (!isInitOnly && !isReadWriteUpdate) {
381          throw new MissingOptionException("Must specify either -" + OPT_INIT_ONLY
382            + " or at least one of -" + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE);
383        }
384
385        if (isInitOnly && isReadWriteUpdate) {
386          throw new AlreadySelectedException(OPT_INIT_ONLY + " cannot be specified with any of -"
387            + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE);
388        }
389
390        if (isReadWriteUpdate && !cmd.hasOption(OPT_NUM_KEYS)) {
391          throw new MissingOptionException(OPT_NUM_KEYS + " must be specified in read/write mode.");
392        }
393
394        return cl;
395      }
396    };
397  }
398
399  @Override
400  protected void processOptions(CommandLine cmd) {
401    this.cmd = cmd;
402
403    tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME));
404
405    if (cmd.hasOption(OPT_COLUMN_FAMILIES)) {
406      String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(",");
407      families = new byte[list.length][];
408      for (int i = 0; i < list.length; i++) {
409        families[i] = Bytes.toBytes(list[i]);
410      }
411    } else {
412      families = HFileTestUtil.DEFAULT_COLUMN_FAMILIES;
413    }
414
415    isVerbose = cmd.hasOption(OPT_VERBOSE);
416    isWrite = cmd.hasOption(OPT_WRITE);
417    isRead = cmd.hasOption(OPT_READ);
418    isUpdate = cmd.hasOption(OPT_UPDATE);
419    isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
420    deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH);
421
422    if (!isInitOnly) {
423      startKey = parseLong(cmd.getOptionValue(OPT_START_KEY, String.valueOf(DEFAULT_START_KEY)), 0,
424        Long.MAX_VALUE);
425      long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1, Long.MAX_VALUE - startKey);
426      endKey = startKey + numKeys;
427      isSkipInit = cmd.hasOption(OPT_SKIP_INIT);
428      System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
429    }
430
431    parseColumnFamilyOptions(cmd);
432
433    if (isWrite) {
434      String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3);
435
436      int colIndex = 0;
437      minColsPerKey = 1;
438      maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
439      int avgColDataSize = parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
440      minColDataSize = avgColDataSize / 2;
441      maxColDataSize = avgColDataSize * 3 / 2;
442
443      if (colIndex < writeOpts.length) {
444        numWriterThreads = getNumThreads(writeOpts[colIndex++]);
445      }
446
447      isMultiPut = cmd.hasOption(OPT_MULTIPUT);
448
449      mobThreshold = -1;
450      if (cmd.hasOption(OPT_MOB_THRESHOLD)) {
451        mobThreshold = Integer.parseInt(cmd.getOptionValue(OPT_MOB_THRESHOLD));
452      }
453
454      System.out.println("Multi-puts: " + isMultiPut);
455      System.out.println("Columns per key: " + minColsPerKey + ".." + maxColsPerKey);
456      System.out.println("Data size per column: " + minColDataSize + ".." + maxColDataSize);
457    }
458
459    if (isUpdate) {
460      String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3);
461      int colIndex = 0;
462      updatePercent = parseInt(mutateOpts[colIndex++], 0, 100);
463      if (colIndex < mutateOpts.length) {
464        numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]);
465      }
466      if (colIndex < mutateOpts.length) {
467        ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1;
468      }
469
470      isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE);
471
472      System.out.println("Batch updates: " + isBatchUpdate);
473      System.out.println("Percent of keys to update: " + updatePercent);
474      System.out.println("Updater threads: " + numUpdaterThreads);
475      System.out.println("Ignore nonce conflicts: " + ignoreConflicts);
476    }
477
478    if (isRead) {
479      String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
480      int colIndex = 0;
481      verifyPercent = parseInt(readOpts[colIndex++], 0, 100);
482      if (colIndex < readOpts.length) {
483        numReaderThreads = getNumThreads(readOpts[colIndex++]);
484      }
485
486      if (cmd.hasOption(OPT_MAX_READ_ERRORS)) {
487        maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS), 0, Integer.MAX_VALUE);
488      }
489
490      if (cmd.hasOption(OPT_KEY_WINDOW)) {
491        keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW), 0, Integer.MAX_VALUE);
492      }
493
494      if (cmd.hasOption(OPT_MULTIGET)) {
495        multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET), 0, Integer.MAX_VALUE);
496      }
497
498      System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize);
499      System.out.println("Percent of keys to verify: " + verifyPercent);
500      System.out.println("Reader threads: " + numReaderThreads);
501    }
502
503    numTables = 1;
504    if (cmd.hasOption(NUM_TABLES)) {
505      numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
506    }
507
508    numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
509    if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) {
510      numRegionsPerServer = Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER));
511    }
512
513    regionReplication = 1;
514    if (cmd.hasOption(OPT_REGION_REPLICATION)) {
515      regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION));
516    }
517
518    regionReplicaId = -1;
519    if (cmd.hasOption(OPT_REGION_REPLICA_ID)) {
520      regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID));
521    }
522  }
523
524  private void parseColumnFamilyOptions(CommandLine cmd) {
525    String dataBlockEncodingStr = cmd.getOptionValue(HFileTestUtil.OPT_DATA_BLOCK_ENCODING);
526    dataBlockEncodingAlgo =
527      dataBlockEncodingStr == null ? null : DataBlockEncoding.valueOf(dataBlockEncodingStr);
528
529    String compressStr = cmd.getOptionValue(OPT_COMPRESSION);
530    compressAlgo =
531      compressStr == null ? Compression.Algorithm.NONE : Compression.Algorithm.valueOf(compressStr);
532
533    String bloomStr = cmd.getOptionValue(OPT_BLOOM);
534    bloomType = bloomStr == null ? BloomType.ROW : BloomType.valueOf(bloomStr);
535
536    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
537      if (!cmd.hasOption(OPT_BLOOM_PARAM)) {
538        LOG.error("the parameter of bloom filter {} is not specified", bloomType.name());
539      } else {
540        conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM));
541      }
542    }
543
544    inMemoryCF = cmd.hasOption(OPT_INMEMORY);
545    if (cmd.hasOption(OPT_ENCRYPTION)) {
546      cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
547    }
548
549  }
550
551  public void initTestTable() throws IOException {
552    Durability durability = Durability.USE_DEFAULT;
553    if (deferredLogFlush) {
554      durability = Durability.ASYNC_WAL;
555    }
556
557    HBaseTestingUtil.createPreSplitLoadTestTable(conf, tableName, getColumnFamilies(), compressAlgo,
558      dataBlockEncodingAlgo, numRegionsPerServer, regionReplication, durability);
559    applyColumnFamilyOptions(tableName, getColumnFamilies());
560  }
561
562  @Override
563  protected int doWork() throws IOException {
564    if (!isVerbose) {
565      Log4jUtils.setLogLevel(ZooKeeper.class.getName(), "WARN");
566    }
567    if (numTables > 1) {
568      return parallelLoadTables();
569    } else {
570      return loadTable();
571    }
572  }
573
574  protected int loadTable() throws IOException {
575    if (cmd.hasOption(OPT_ZK_QUORUM)) {
576      conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
577    }
578    if (cmd.hasOption(OPT_ZK_PARENT_NODE)) {
579      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE));
580    }
581
582    if (isInitOnly) {
583      LOG.info("Initializing only; no reads or writes");
584      initTestTable();
585      return 0;
586    }
587
588    if (!isSkipInit) {
589      initTestTable();
590    }
591    LoadTestDataGenerator dataGen = null;
592    if (cmd.hasOption(OPT_GENERATOR)) {
593      String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
594      dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
595      String[] args;
596      if (dataGen instanceof LoadTestDataGeneratorWithACL) {
597        LOG.info("Using LoadTestDataGeneratorWithACL");
598        if (User.isHBaseSecurityEnabled(conf)) {
599          LOG.info("Security is enabled");
600          authnFileName = clazzAndArgs[1];
601          superUser = clazzAndArgs[2];
602          userNames = clazzAndArgs[3];
603          args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length);
604          Properties authConfig = new Properties();
605          authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName));
606          try {
607            addAuthInfoToConf(authConfig, conf, superUser, userNames);
608          } catch (IOException exp) {
609            LOG.error(exp.toString(), exp);
610            return EXIT_FAILURE;
611          }
612          userOwner = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, superUser));
613        } else {
614          superUser = clazzAndArgs[1];
615          userNames = clazzAndArgs[2];
616          args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
617          userOwner = User.createUserForTesting(conf, superUser, new String[0]);
618        }
619      } else {
620        args = clazzAndArgs.length == 1
621          ? new String[0]
622          : Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
623      }
624      dataGen.initialize(args);
625    } else {
626      // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator
627      dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
628        minColsPerKey, maxColsPerKey, families);
629    }
630
631    if (userOwner != null) {
632      LOG.info("Granting permissions for user " + userOwner.getShortName());
633      Permission.Action[] actions = { Permission.Action.ADMIN, Permission.Action.CREATE,
634        Permission.Action.READ, Permission.Action.WRITE };
635      try {
636        AccessControlClient.grant(ConnectionFactory.createConnection(conf), tableName,
637          userOwner.getShortName(), null, null, actions);
638      } catch (Throwable e) {
639        LOG.error(HBaseMarkers.FATAL,
640          "Error in granting permission for the user " + userOwner.getShortName(), e);
641        return EXIT_FAILURE;
642      }
643    }
644
645    if (userNames != null) {
646      // This will be comma separated list of expressions.
647      String users[] = userNames.split(",");
648      User user = null;
649      for (String userStr : users) {
650        if (User.isHBaseSecurityEnabled(conf)) {
651          user = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, userStr));
652        } else {
653          user = User.createUserForTesting(conf, userStr, new String[0]);
654        }
655      }
656    }
657
658    if (isWrite) {
659      if (userOwner != null) {
660        writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
661      } else {
662        String writerClass = null;
663        if (cmd.hasOption(OPT_WRITER)) {
664          writerClass = cmd.getOptionValue(OPT_WRITER);
665        } else {
666          writerClass = MultiThreadedWriter.class.getCanonicalName();
667        }
668
669        writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen);
670      }
671      writerThreads.setMultiPut(isMultiPut);
672    }
673
674    if (isUpdate) {
675      if (userOwner != null) {
676        updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
677          userOwner, userNames);
678      } else {
679        String updaterClass = null;
680        if (cmd.hasOption(OPT_UPDATER)) {
681          updaterClass = cmd.getOptionValue(OPT_UPDATER);
682        } else {
683          updaterClass = MultiThreadedUpdater.class.getCanonicalName();
684        }
685        updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen);
686      }
687      updaterThreads.setBatchUpdate(isBatchUpdate);
688      updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
689    }
690
691    if (isRead) {
692      if (userOwner != null) {
693        readerThreads =
694          new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent, userNames);
695      } else {
696        String readerClass = null;
697        if (cmd.hasOption(OPT_READER)) {
698          readerClass = cmd.getOptionValue(OPT_READER);
699        } else {
700          readerClass = MultiThreadedReader.class.getCanonicalName();
701        }
702        readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen);
703      }
704      readerThreads.setMaxErrors(maxReadErrors);
705      readerThreads.setKeyWindow(keyWindow);
706      readerThreads.setMultiGetBatchSize(multiGetBatchSize);
707      readerThreads.setRegionReplicaId(regionReplicaId);
708    }
709
710    if (isUpdate && isWrite) {
711      LOG.info("Concurrent write/update workload: making updaters aware of the " + "write point");
712      updaterThreads.linkToWriter(writerThreads);
713    }
714
715    if (isRead && (isUpdate || isWrite)) {
716      LOG.info("Concurrent write/read workload: making readers aware of the " + "write point");
717      readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
718    }
719
720    if (isWrite) {
721      System.out.println("Starting to write data...");
722      writerThreads.start(startKey, endKey, numWriterThreads);
723    }
724
725    if (isUpdate) {
726      LOG.info("Starting to mutate data...");
727      System.out.println("Starting to mutate data...");
728      // TODO : currently append and increment operations not tested with tags
729      // Will update this after it is done
730      updaterThreads.start(startKey, endKey, numUpdaterThreads);
731    }
732
733    if (isRead) {
734      System.out.println("Starting to read data...");
735      readerThreads.start(startKey, endKey, numReaderThreads);
736    }
737
738    if (isWrite) {
739      writerThreads.waitForFinish();
740    }
741
742    if (isUpdate) {
743      updaterThreads.waitForFinish();
744    }
745
746    if (isRead) {
747      readerThreads.waitForFinish();
748    }
749
750    boolean success = true;
751    if (isWrite) {
752      success = success && writerThreads.getNumWriteFailures() == 0;
753    }
754    if (isUpdate) {
755      success = success && updaterThreads.getNumWriteFailures() == 0;
756    }
757    if (isRead) {
758      success =
759        success && readerThreads.getNumReadErrors() == 0 && readerThreads.getNumReadFailures() == 0;
760    }
761    return success ? EXIT_SUCCESS : EXIT_FAILURE;
762  }
763
764  private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException {
765    try {
766      Class<?> clazz = Class.forName(clazzName);
767      Constructor<?> constructor =
768        clazz.getConstructor(int.class, int.class, int.class, int.class, byte[][].class);
769      return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
770        minColsPerKey, maxColsPerKey, families);
771    } catch (Exception e) {
772      throw new IOException(e);
773    }
774  }
775
776  private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName,
777    LoadTestDataGenerator dataGen) throws IOException {
778    try {
779      Class<?> clazz = Class.forName(clazzName);
780      Constructor<?> constructor =
781        clazz.getConstructor(LoadTestDataGenerator.class, Configuration.class, TableName.class);
782      return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName);
783    } catch (Exception e) {
784      throw new IOException(e);
785    }
786  }
787
788  private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName,
789    LoadTestDataGenerator dataGen) throws IOException {
790    try {
791      Class<?> clazz = Class.forName(clazzName);
792      Constructor<?> constructor = clazz.getConstructor(LoadTestDataGenerator.class,
793        Configuration.class, TableName.class, double.class);
794      return (MultiThreadedUpdater) constructor.newInstance(dataGen, conf, tableName,
795        updatePercent);
796    } catch (Exception e) {
797      throw new IOException(e);
798    }
799  }
800
801  private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName,
802    LoadTestDataGenerator dataGen) throws IOException {
803    try {
804      Class<?> clazz = Class.forName(clazzName);
805      Constructor<?> constructor = clazz.getConstructor(LoadTestDataGenerator.class,
806        Configuration.class, TableName.class, double.class);
807      return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent);
808    } catch (Exception e) {
809      throw new IOException(e);
810    }
811  }
812
813  public static void main(String[] args) {
814    new LoadTestTool().doStaticMain(args);
815  }
816
817  /**
818   * When NUM_TABLES is specified, the function starts multiple worker threads which individually
819   * start a LoadTestTool instance to load a table. Each table name is in format &lt;tn>_&lt;index>.
820   * For example, "-tn test -num_tables 2" , table names will be "test_1", "test_2"
821   * @throws IOException if one of the load tasks is unable to complete
822   */
823  private int parallelLoadTables() throws IOException {
824    // create new command args
825    String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
826    String[] newArgs = null;
827    if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
828      newArgs = new String[cmdLineArgs.length + 2];
829      newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
830      newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME;
831      System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length);
832    } else {
833      newArgs = cmdLineArgs;
834    }
835
836    int tableNameValueIndex = -1;
837    for (int j = 0; j < newArgs.length; j++) {
838      if (newArgs[j].endsWith(OPT_TABLE_NAME)) {
839        tableNameValueIndex = j + 1;
840      } else if (newArgs[j].endsWith(NUM_TABLES)) {
841        // change NUM_TABLES to 1 so that each worker loads one table
842        newArgs[j + 1] = "1";
843      }
844    }
845
846    // starting to load multiple tables
847    List<WorkerThread> workers = new ArrayList<>();
848    for (int i = 0; i < numTables; i++) {
849      String[] workerArgs = newArgs.clone();
850      workerArgs[tableNameValueIndex] = tableName + "_" + (i + 1);
851      WorkerThread worker = new WorkerThread(i, workerArgs);
852      workers.add(worker);
853      LOG.info(worker + " starting");
854      worker.start();
855    }
856
857    // wait for all workers finish
858    LOG.info("Waiting for worker threads to finish");
859    for (WorkerThread t : workers) {
860      try {
861        t.join();
862      } catch (InterruptedException ie) {
863        IOException iie = new InterruptedIOException();
864        iie.initCause(ie);
865        throw iie;
866      }
867      checkForErrors();
868    }
869
870    return EXIT_SUCCESS;
871  }
872
873  // If an exception is thrown by one of worker threads, it will be
874  // stored here.
875  protected AtomicReference<Throwable> thrown = new AtomicReference<>();
876
877  private void workerThreadError(Throwable t) {
878    thrown.compareAndSet(null, t);
879  }
880
881  /**
882   * Check for errors in the writer threads. If any is found, rethrow it.
883   */
884  private void checkForErrors() throws IOException {
885    Throwable thrown = this.thrown.get();
886    if (thrown == null) return;
887    if (thrown instanceof IOException) {
888      throw (IOException) thrown;
889    } else {
890      throw new RuntimeException(thrown);
891    }
892  }
893
894  class WorkerThread extends Thread {
895    private String[] workerArgs;
896
897    WorkerThread(int i, String[] args) {
898      super("WorkerThread-" + i);
899      workerArgs = args;
900    }
901
902    @Override
903    public void run() {
904      try {
905        int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs);
906        if (ret != 0) {
907          throw new RuntimeException("LoadTestTool exit with non-zero return code.");
908        }
909      } catch (Exception ex) {
910        LOG.error("Error in worker thread", ex);
911        workerThreadError(ex);
912      }
913    }
914  }
915
916  private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner,
917    String userList) throws IOException {
918    List<String> users = new ArrayList<>(Arrays.asList(userList.split(",")));
919    users.add(owner);
920    for (String user : users) {
921      String keyTabFileConfKey = "hbase." + user + ".keytab.file";
922      String principalConfKey = "hbase." + user + ".kerberos.principal";
923      if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) {
924        throw new IOException("Authentication configs missing for user : " + user);
925      }
926    }
927    for (String key : authConfig.stringPropertyNames()) {
928      conf.set(key, authConfig.getProperty(key));
929    }
930    LOG.debug("Added authentication properties to config successfully.");
931  }
932
933}