/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

import javax.crypto.spec.SecretKeySpec;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.AlreadySelectedException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;

/**
 * A command-line utility that reads, writes, and verifies data. Unlike
 * {@link org.apache.hadoop.hbase.PerformanceEvaluation}, this tool validates the data written,
 * and supports simultaneously writing and reading the same set of keys.
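 * <p>
 * A typical invocation (the values here are illustrative; tune the table name,
 * key count, and thread counts for your cluster) looks like:
 * <pre>
 * hbase org.apache.hadoop.hbase.util.LoadTestTool -tn cluster_test \
 *     -write 2:128:10 -read 100:20 -num_keys 1000000
 * </pre>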
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class LoadTestTool extends AbstractHBaseTool {

  private static final Logger LOG = LoggerFactory.getLogger(LoadTestTool.class);
  private static final String COLON = ":";

  /** Table name for the test */
  private TableName tableName;

  /** Column families for the test */
  private byte[][] families;

  /** Table name to use if not overridden on the command line */
  protected static final String DEFAULT_TABLE_NAME = "cluster_test";

  /** The default data size if not specified */
  protected static final int DEFAULT_DATA_SIZE = 64;

  /** The number of reader/writer threads if not specified */
  protected static final int DEFAULT_NUM_THREADS = 20;

  /** Usage string for the load option */
  protected static final String OPT_USAGE_LOAD =
      "<avg_cols_per_key>:<avg_data_size>" +
      "[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
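  // For example, "-write 2:128:10" (illustrative values) asks for an average of
  // 2 columns per key with ~128-byte values, using 10 writer threads; see
  // processOptions() for how the averages become min..max ranges.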

  /** Usage string for the read option */
  protected static final String OPT_USAGE_READ =
      "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
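  // For example, "-read 100:20" (illustrative values) verifies every key read,
  // using 20 reader threads.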

  /** Usage string for the update option */
  protected static final String OPT_USAGE_UPDATE =
      "<update_percent>[:<#threads=" + DEFAULT_NUM_THREADS
      + ">][:<#whether to ignore nonce collisions=0>]";
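  // For example, "-update 10:5:1" (illustrative values) updates 10% of the keys
  // with 5 updater threads, ignoring nonce conflicts.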

  protected static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " +
      Arrays.toString(BloomType.values());

  protected static final String OPT_USAGE_COMPRESSION = "Compression type, " +
      "one of " + Arrays.toString(Compression.Algorithm.values());

  protected static final String OPT_VERBOSE = "verbose";

  public static final String OPT_BLOOM = "bloom";
  public static final String OPT_BLOOM_PARAM = "bloom_param";
  public static final String OPT_COMPRESSION = "compression";
  public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
  public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";

  public static final String OPT_INMEMORY = "in_memory";
  public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF " +
      "in memory as much as possible. Reads are not guaranteed to always be served from memory";

  public static final String OPT_GENERATOR = "generator";
  public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
      + " Any arguments for this class can be passed, colon separated, after the class name";
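  // For example (illustrative values): "-generator <generator class>:arg1:arg2".
  // With LoadTestDataGeneratorWithACL on a secure cluster, the arguments are
  // <authentication properties file>:<superuser>:<comma-separated user names>,
  // as parsed in loadTable() below.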

  public static final String OPT_WRITER = "writer";
  public static final String OPT_WRITER_USAGE = "The class for executing the write requests";

  public static final String OPT_UPDATER = "updater";
  public static final String OPT_UPDATER_USAGE = "The class for executing the update requests";

  public static final String OPT_READER = "reader";
  public static final String OPT_READER_USAGE = "The class for executing the read requests";

  protected static final String OPT_KEY_WINDOW = "key_window";
  protected static final String OPT_WRITE = "write";
  protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
  public static final String OPT_MULTIPUT = "multiput";
  public static final String OPT_MULTIGET = "multiget_batchsize";
  protected static final String OPT_NUM_KEYS = "num_keys";
  protected static final String OPT_READ = "read";
  protected static final String OPT_START_KEY = "start_key";
  public static final String OPT_TABLE_NAME = "tn";
  public static final String OPT_COLUMN_FAMILIES = "families";
  protected static final String OPT_ZK_QUORUM = "zk";
  protected static final String OPT_ZK_PARENT_NODE = "zk_root";
  protected static final String OPT_SKIP_INIT = "skip_init";
  protected static final String OPT_INIT_ONLY = "init_only";
  protected static final String NUM_TABLES = "num_tables";
  protected static final String OPT_BATCHUPDATE = "batchupdate";
  protected static final String OPT_UPDATE = "update";

  public static final String OPT_ENCRYPTION = "encryption";
  protected static final String OPT_ENCRYPTION_USAGE =
    "Enables transparent encryption on the test table, one of " +
    Arrays.toString(Encryption.getSupportedCiphers());

  public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server";
  protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE
    = "Desired number of regions per region server. Defaults to 5.";
  public static int DEFAULT_NUM_REGIONS_PER_SERVER = 5;

  public static final String OPT_REGION_REPLICATION = "region_replication";
  protected static final String OPT_REGION_REPLICATION_USAGE =
      "Desired number of replicas per region";

  public static final String OPT_REGION_REPLICA_ID = "region_replica_id";
  protected static final String OPT_REGION_REPLICA_ID_USAGE =
      "Region replica id to do the reads from";

  public static final String OPT_MOB_THRESHOLD = "mob_threshold";
  protected static final String OPT_MOB_THRESHOLD_USAGE =
      "Cell size threshold in bytes; cells larger than this use the MOB write path";

  protected static final long DEFAULT_START_KEY = 0;

  /** This will be removed as we factor out the dependency on command line */
  protected CommandLine cmd;

  protected MultiThreadedWriter writerThreads = null;
  protected MultiThreadedReader readerThreads = null;
  protected MultiThreadedUpdater updaterThreads = null;

  protected long startKey, endKey;

  protected boolean isVerbose, isWrite, isRead, isUpdate;
  protected boolean deferredLogFlush;

  // Column family options
  protected DataBlockEncoding dataBlockEncodingAlgo;
  protected Compression.Algorithm compressAlgo;
  protected BloomType bloomType;
  private boolean inMemoryCF;

  private User userOwner;
  // Writer options
  protected int numWriterThreads = DEFAULT_NUM_THREADS;
  protected int minColsPerKey, maxColsPerKey;
  protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE;
  protected boolean isMultiPut;

  // Updater options
  protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
  protected int updatePercent;
  protected boolean ignoreConflicts = false;
  protected boolean isBatchUpdate;

  // Reader options
  private int numReaderThreads = DEFAULT_NUM_THREADS;
  private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
  private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE;
  private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
  private int verifyPercent;

  private int numTables = 1;

  private String superUser;

  private String userNames;
  // This file is used to read authentication information in secure clusters.
  private String authnFileName;

  private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
  private int regionReplication = -1; // not set
  private int regionReplicaId = -1; // not set

  private int mobThreshold = -1; // not set

  // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
  //       console tool itself should only be used from console.
  protected boolean isSkipInit = false;
  protected boolean isInitOnly = false;

  protected Cipher cipher = null;

  protected String[] splitColonSeparated(String option,
      int minNumCols, int maxNumCols) {
    String optVal = cmd.getOptionValue(option);
    String[] cols = optVal.split(COLON);
    if (cols.length < minNumCols || cols.length > maxNumCols) {
      throw new IllegalArgumentException("Expected at least "
          + minNumCols + " columns but no more than " + maxNumCols +
          " in the colon-separated value '" + optVal + "' of the " +
          "-" + option + " option");
    }
    return cols;
  }

  protected int getNumThreads(String numThreadsStr) {
    return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
  }

  public byte[][] getColumnFamilies() {
    return families;
  }

  /**
   * Apply column family options such as Bloom filters, compression, and data
   * block encoding.
   */
  protected void applyColumnFamilyOptions(TableName tableName,
      byte[][] columnFamilies) throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
      TableDescriptor tableDesc = admin.getDescriptor(tableName);
      LOG.info("Disabling table " + tableName);
      admin.disableTable(tableName);
      for (byte[] cf : columnFamilies) {
        ColumnFamilyDescriptor columnDesc = tableDesc.getColumnFamily(cf);
        boolean isNewCf = columnDesc == null;
        ColumnFamilyDescriptorBuilder columnDescBuilder = isNewCf ?
            ColumnFamilyDescriptorBuilder.newBuilder(cf) :
            ColumnFamilyDescriptorBuilder.newBuilder(columnDesc);
        if (bloomType != null) {
          columnDescBuilder.setBloomFilterType(bloomType);
        }
        if (compressAlgo != null) {
          columnDescBuilder.setCompressionType(compressAlgo);
        }
        if (dataBlockEncodingAlgo != null) {
          columnDescBuilder.setDataBlockEncoding(dataBlockEncodingAlgo);
        }
        if (inMemoryCF) {
          columnDescBuilder.setInMemory(inMemoryCF);
        }
        if (cipher != null) {
          byte[] keyBytes = new byte[cipher.getKeyLength()];
          new SecureRandom().nextBytes(keyBytes);
          columnDescBuilder.setEncryptionType(cipher.getName());
          columnDescBuilder.setEncryptionKey(
              EncryptionUtil.wrapKey(conf,
                  User.getCurrent().getShortName(),
                  new SecretKeySpec(keyBytes,
                      cipher.getName())));
        }
        if (mobThreshold >= 0) {
          columnDescBuilder.setMobEnabled(true);
          columnDescBuilder.setMobThreshold(mobThreshold);
        }

        if (isNewCf) {
          admin.addColumnFamily(tableName, columnDescBuilder.build());
        } else {
          admin.modifyColumnFamily(tableName, columnDescBuilder.build());
        }
      }
      LOG.info("Enabling table " + tableName);
      admin.enableTable(tableName);
    }
  }

  @Override
  protected void addOptions() {
    addOptNoArg("v", OPT_VERBOSE, "Will display a full readout of logs, including ZooKeeper");
    addOptWithArg(OPT_ZK_QUORUM, "ZK quorum as comma-separated host names " +
        "without port numbers");
    addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
    addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
    addOptWithArg(OPT_COLUMN_FAMILIES,
        "The names of the column families to use, separated by commas");
    addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
    addOptWithArg(OPT_READ, OPT_USAGE_READ);
    addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
    addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
    addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
    addOptWithArg(OPT_BLOOM_PARAM, "the parameter of bloom filter type");
    addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
    addOptWithArg(HFileTestUtil.OPT_DATA_BLOCK_ENCODING, HFileTestUtil.OPT_DATA_BLOCK_ENCODING_USAGE);
    addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " +
        "to tolerate before terminating all reader threads. The default is " +
        MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
    addOptWithArg(OPT_MULTIGET, "Whether to use multi-gets as opposed to " +
        "separate gets for every column in a row");
    addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " +
        "reads and writes for concurrent write/read workload. The default " +
        "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");

    addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " +
        "separate puts for every column in a row");
    addOptNoArg(OPT_BATCHUPDATE, "Whether to use batch as opposed to " +
        "separate updates for every column in a row");
    addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
    addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
    addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE);
    addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE);
    addOptWithArg(OPT_READER, OPT_READER_USAGE);

    addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
    addOptWithArg(OPT_START_KEY, "The first key to read/write " +
        "(a 0-based index). The default value is " +
        DEFAULT_START_KEY + ".");
    addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table "
        + "already exists");

    addOptWithArg(NUM_TABLES,
      "A positive integer number. When a number n is specified, the load test "
          + "tool will load n tables in parallel. The -tn parameter value becomes "
          + "the table name prefix. Each table name is in the format <tn>_1...<tn>_n");

    addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE);
    addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE);
    addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE);
    addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE);
    addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE);
    addOptWithArg(OPT_MOB_THRESHOLD, OPT_MOB_THRESHOLD_USAGE);
  }

  @Override
  protected CommandLineParser newParser() {
    // Commons-CLI lacks the capability to handle combinations of options, so we do it ourselves
    // Validate in parse() to get helpful error messages instead of exploding in processOptions()
    return new DefaultParser() {
      @Override
      public CommandLine parse(Options opts, String[] args, Properties props, boolean stop)
          throws ParseException {
        CommandLine cl = super.parse(opts, args, props, stop);

        // Use the freshly parsed CommandLine here; the cmd field is not set
        // until processOptions() runs.
        boolean isReadWriteUpdate = cl.hasOption(OPT_READ)
            || cl.hasOption(OPT_WRITE)
            || cl.hasOption(OPT_UPDATE);
        boolean isInitOnly = cl.hasOption(OPT_INIT_ONLY);

        if (!isInitOnly && !isReadWriteUpdate) {
          throw new MissingOptionException("Must specify either -" + OPT_INIT_ONLY
              + " or at least one of -" + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE);
        }

        if (isInitOnly && isReadWriteUpdate) {
          throw new AlreadySelectedException(OPT_INIT_ONLY + " cannot be specified with any of -"
              + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE);
        }

        if (isReadWriteUpdate && !cl.hasOption(OPT_NUM_KEYS)) {
          throw new MissingOptionException(OPT_NUM_KEYS + " must be specified in read/write mode.");
        }

        return cl;
      }
    };
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    this.cmd = cmd;

    tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME,
        DEFAULT_TABLE_NAME));

    if (cmd.hasOption(OPT_COLUMN_FAMILIES)) {
      String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(",");
      families = new byte[list.length][];
      for (int i = 0; i < list.length; i++) {
        families[i] = Bytes.toBytes(list[i]);
      }
    } else {
      families = HFileTestUtil.DEFAULT_COLUMN_FAMILIES;
    }

    isVerbose = cmd.hasOption(OPT_VERBOSE);
    isWrite = cmd.hasOption(OPT_WRITE);
    isRead = cmd.hasOption(OPT_READ);
    isUpdate = cmd.hasOption(OPT_UPDATE);
    isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
    deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH);

    if (!isInitOnly) {
      startKey = parseLong(cmd.getOptionValue(OPT_START_KEY,
          String.valueOf(DEFAULT_START_KEY)), 0, Long.MAX_VALUE);
      long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1,
          Long.MAX_VALUE - startKey);
      endKey = startKey + numKeys;
      isSkipInit = cmd.hasOption(OPT_SKIP_INIT);
      System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
    }

    parseColumnFamilyOptions(cmd);

    if (isWrite) {
      String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3);

      int colIndex = 0;
      // Derive ranges around the requested averages: [1, 2*avg] columns per
      // key and [avg/2, 3*avg/2] bytes per column.
      minColsPerKey = 1;
      maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
      int avgColDataSize =
          parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
      minColDataSize = avgColDataSize / 2;
      maxColDataSize = avgColDataSize * 3 / 2;

      if (colIndex < writeOpts.length) {
        numWriterThreads = getNumThreads(writeOpts[colIndex++]);
      }

      isMultiPut = cmd.hasOption(OPT_MULTIPUT);

      mobThreshold = -1;
      if (cmd.hasOption(OPT_MOB_THRESHOLD)) {
        mobThreshold = Integer.parseInt(cmd.getOptionValue(OPT_MOB_THRESHOLD));
      }

      System.out.println("Multi-puts: " + isMultiPut);
      System.out.println("Columns per key: " + minColsPerKey + ".."
          + maxColsPerKey);
      System.out.println("Data size per column: " + minColDataSize + ".."
          + maxColDataSize);
    }

    if (isUpdate) {
      String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3);
      int colIndex = 0;
      updatePercent = parseInt(mutateOpts[colIndex++], 0, 100);
      if (colIndex < mutateOpts.length) {
        numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]);
      }
      if (colIndex < mutateOpts.length) {
        ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1;
      }

      isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE);

      System.out.println("Batch updates: " + isBatchUpdate);
      System.out.println("Percent of keys to update: " + updatePercent);
      System.out.println("Updater threads: " + numUpdaterThreads);
      System.out.println("Ignore nonce conflicts: " + ignoreConflicts);
    }

    if (isRead) {
      String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
      int colIndex = 0;
      verifyPercent = parseInt(readOpts[colIndex++], 0, 100);
      if (colIndex < readOpts.length) {
        numReaderThreads = getNumThreads(readOpts[colIndex++]);
      }

      if (cmd.hasOption(OPT_MAX_READ_ERRORS)) {
        maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS),
            0, Integer.MAX_VALUE);
      }

      if (cmd.hasOption(OPT_KEY_WINDOW)) {
        keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW),
            0, Integer.MAX_VALUE);
      }

      if (cmd.hasOption(OPT_MULTIGET)) {
        multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET),
            0, Integer.MAX_VALUE);
      }

      System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize);
      System.out.println("Percent of keys to verify: " + verifyPercent);
      System.out.println("Reader threads: " + numReaderThreads);
    }

    numTables = 1;
    if (cmd.hasOption(NUM_TABLES)) {
      numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
    }

    numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
    if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) {
      numRegionsPerServer = Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER));
    }

    regionReplication = 1;
    if (cmd.hasOption(OPT_REGION_REPLICATION)) {
      regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION));
    }

    regionReplicaId = -1;
    if (cmd.hasOption(OPT_REGION_REPLICA_ID)) {
      regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID));
    }
  }

  private void parseColumnFamilyOptions(CommandLine cmd) {
    String dataBlockEncodingStr = cmd.getOptionValue(HFileTestUtil.OPT_DATA_BLOCK_ENCODING);
    dataBlockEncodingAlgo = dataBlockEncodingStr == null ? null :
        DataBlockEncoding.valueOf(dataBlockEncodingStr);

    String compressStr = cmd.getOptionValue(OPT_COMPRESSION);
    compressAlgo = compressStr == null ? Compression.Algorithm.NONE :
        Compression.Algorithm.valueOf(compressStr);

    String bloomStr = cmd.getOptionValue(OPT_BLOOM);
    bloomType = bloomStr == null ? BloomType.ROW :
        BloomType.valueOf(bloomStr);

    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      if (!cmd.hasOption(OPT_BLOOM_PARAM)) {
        LOG.error("The parameter of bloom filter type {} is not specified", bloomType.name());
      } else {
        conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM));
      }
    }

    inMemoryCF = cmd.hasOption(OPT_INMEMORY);
    if (cmd.hasOption(OPT_ENCRYPTION)) {
      cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
    }
  }

  public void initTestTable() throws IOException {
    Durability durability = Durability.USE_DEFAULT;
    if (deferredLogFlush) {
      durability = Durability.ASYNC_WAL;
    }

    HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName,
      getColumnFamilies(), compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer,
      regionReplication, durability);
    applyColumnFamilyOptions(tableName, getColumnFamilies());
  }

  @Override
  protected int doWork() throws IOException {
    if (!isVerbose) {
      LogManager.getLogger(ZooKeeper.class.getName()).setLevel(Level.WARN);
    }
    if (numTables > 1) {
      return parallelLoadTables();
    } else {
      return loadTable();
    }
  }

  protected int loadTable() throws IOException {
    if (cmd.hasOption(OPT_ZK_QUORUM)) {
      conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
    }
    if (cmd.hasOption(OPT_ZK_PARENT_NODE)) {
      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE));
    }

    if (isInitOnly) {
      LOG.info("Initializing only; no reads or writes");
      initTestTable();
      return 0;
    }

    if (!isSkipInit) {
      initTestTable();
    }
    LoadTestDataGenerator dataGen = null;
    if (cmd.hasOption(OPT_GENERATOR)) {
      String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
      dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
      String[] args;
      if (dataGen instanceof LoadTestDataGeneratorWithACL) {
        LOG.info("Using LoadTestDataGeneratorWithACL");
        if (User.isHBaseSecurityEnabled(conf)) {
          LOG.info("Security is enabled");
          authnFileName = clazzAndArgs[1];
          superUser = clazzAndArgs[2];
          userNames = clazzAndArgs[3];
          args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length);
          Properties authConfig = new Properties();
          authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName));
          try {
            addAuthInfoToConf(authConfig, conf, superUser, userNames);
          } catch (IOException exp) {
            LOG.error(exp.toString(), exp);
            return EXIT_FAILURE;
          }
          userOwner = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, superUser));
        } else {
          superUser = clazzAndArgs[1];
          userNames = clazzAndArgs[2];
          args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
          userOwner = User.createUserForTesting(conf, superUser, new String[0]);
        }
      } else {
        args = clazzAndArgs.length == 1 ? new String[0] : Arrays.copyOfRange(clazzAndArgs, 1,
            clazzAndArgs.length);
      }
      dataGen.initialize(args);
    } else {
      // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator
      dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
          minColsPerKey, maxColsPerKey, families);
    }

    if (userOwner != null) {
      LOG.info("Granting permissions for user " + userOwner.getShortName());
      Permission.Action[] actions = {
        Permission.Action.ADMIN, Permission.Action.CREATE,
        Permission.Action.READ, Permission.Action.WRITE };
      try {
        AccessControlClient.grant(ConnectionFactory.createConnection(conf),
            tableName, userOwner.getShortName(), null, null, actions);
      } catch (Throwable e) {
        LOG.error(HBaseMarkers.FATAL, "Error in granting permission for the user " +
            userOwner.getShortName(), e);
        return EXIT_FAILURE;
      }
    }

    if (userNames != null) {
      // userNames is a comma-separated list of user names. Log in (or, in
      // non-secure mode, create) each user up front; the User instances
      // themselves are not used afterwards.
      String[] users = userNames.split(",");
      User user = null;
      for (String userStr : users) {
        if (User.isHBaseSecurityEnabled(conf)) {
          user = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, userStr));
        } else {
          user = User.createUserForTesting(conf, userStr, new String[0]);
        }
      }
    }

    if (isWrite) {
      if (userOwner != null) {
        writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
      } else {
        String writerClass = null;
        if (cmd.hasOption(OPT_WRITER)) {
          writerClass = cmd.getOptionValue(OPT_WRITER);
        } else {
          writerClass = MultiThreadedWriter.class.getCanonicalName();
        }

        writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen);
      }
      writerThreads.setMultiPut(isMultiPut);
    }

    if (isUpdate) {
      if (userOwner != null) {
        updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
            userOwner, userNames);
      } else {
        String updaterClass = null;
        if (cmd.hasOption(OPT_UPDATER)) {
          updaterClass = cmd.getOptionValue(OPT_UPDATER);
        } else {
          updaterClass = MultiThreadedUpdater.class.getCanonicalName();
        }
        updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen);
      }
      updaterThreads.setBatchUpdate(isBatchUpdate);
      updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
    }

    if (isRead) {
      if (userOwner != null) {
        readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent,
            userNames);
      } else {
        String readerClass = null;
        if (cmd.hasOption(OPT_READER)) {
          readerClass = cmd.getOptionValue(OPT_READER);
        } else {
          readerClass = MultiThreadedReader.class.getCanonicalName();
        }
        readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen);
      }
      readerThreads.setMaxErrors(maxReadErrors);
      readerThreads.setKeyWindow(keyWindow);
      readerThreads.setMultiGetBatchSize(multiGetBatchSize);
      readerThreads.setRegionReplicaId(regionReplicaId);
    }

    if (isUpdate && isWrite) {
      LOG.info("Concurrent write/update workload: making updaters aware of the " +
        "write point");
      updaterThreads.linkToWriter(writerThreads);
    }

    if (isRead && (isUpdate || isWrite)) {
      LOG.info("Concurrent write/read workload: making readers aware of the " +
        "write point");
      readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
    }

    if (isWrite) {
      System.out.println("Starting to write data...");
      writerThreads.start(startKey, endKey, numWriterThreads);
    }

    if (isUpdate) {
      LOG.info("Starting to mutate data...");
      System.out.println("Starting to mutate data...");
      // TODO : currently append and increment operations not tested with tags
      // Will update this after it is done
      updaterThreads.start(startKey, endKey, numUpdaterThreads);
    }

    if (isRead) {
      System.out.println("Starting to read data...");
      readerThreads.start(startKey, endKey, numReaderThreads);
    }

    if (isWrite) {
      writerThreads.waitForFinish();
    }

    if (isUpdate) {
      updaterThreads.waitForFinish();
    }

    if (isRead) {
      readerThreads.waitForFinish();
    }

    boolean success = true;
    if (isWrite) {
      success = success && writerThreads.getNumWriteFailures() == 0;
    }
    if (isUpdate) {
      success = success && updaterThreads.getNumWriteFailures() == 0;
    }
    if (isRead) {
      success = success && readerThreads.getNumReadErrors() == 0
          && readerThreads.getNumReadFailures() == 0;
    }
    return success ? EXIT_SUCCESS : EXIT_FAILURE;
  }

  private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor = clazz.getConstructor(int.class, int.class, int.class, int.class,
          byte[][].class);
      return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
          minColsPerKey, maxColsPerKey, families);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName,
      LoadTestDataGenerator dataGen) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor = clazz.getConstructor(
        LoadTestDataGenerator.class, Configuration.class, TableName.class);
      return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName,
      LoadTestDataGenerator dataGen) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor = clazz.getConstructor(
        LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
      return (MultiThreadedUpdater) constructor.newInstance(
        dataGen, conf, tableName, updatePercent);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName,
      LoadTestDataGenerator dataGen) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor = clazz.getConstructor(
        LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
      return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  public static void main(String[] args) {
    new LoadTestTool().doStaticMain(args);
  }

  /**
   * When NUM_TABLES is specified, this method starts multiple worker threads,
   * each of which runs its own LoadTestTool instance to load one table. Each
   * table name is in the format &lt;tn>_&lt;index>. For example, with
   * "-tn test -num_tables 2", the table names will be "test_1" and "test_2".
   *
   * @throws IOException if one of the load tasks is unable to complete
   */
  private int parallelLoadTables() throws IOException {
    // create new command args
    String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
    String[] newArgs = null;
    if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
      newArgs = new String[cmdLineArgs.length + 2];
      newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
      newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME;
      System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length);
    } else {
      newArgs = cmdLineArgs;
    }

    int tableNameValueIndex = -1;
    for (int j = 0; j < newArgs.length; j++) {
      if (newArgs[j].endsWith(OPT_TABLE_NAME)) {
        tableNameValueIndex = j + 1;
      } else if (newArgs[j].endsWith(NUM_TABLES)) {
        // change NUM_TABLES to 1 so that each worker loads one table
        newArgs[j + 1] = "1";
      }
    }

    // starting to load multiple tables
    List<WorkerThread> workers = new ArrayList<>();
    for (int i = 0; i < numTables; i++) {
      String[] workerArgs = newArgs.clone();
      workerArgs[tableNameValueIndex] = tableName + "_" + (i + 1);
      WorkerThread worker = new WorkerThread(i, workerArgs);
      workers.add(worker);
      LOG.info(worker + " starting");
      worker.start();
    }

    // wait for all workers to finish
    LOG.info("Waiting for worker threads to finish");
    for (WorkerThread t : workers) {
      try {
        t.join();
      } catch (InterruptedException ie) {
        IOException iie = new InterruptedIOException();
        iie.initCause(ie);
        throw iie;
      }
      checkForErrors();
    }

    return EXIT_SUCCESS;
  }

  // If an exception is thrown by one of the worker threads, it is stored here.
  protected AtomicReference<Throwable> thrown = new AtomicReference<>();

  private void workerThreadError(Throwable t) {
    thrown.compareAndSet(null, t);
  }

  /**
   * Check for errors in the worker threads. If any is found, rethrow it.
   */
  private void checkForErrors() throws IOException {
    Throwable thrown = this.thrown.get();
    if (thrown == null) return;
    if (thrown instanceof IOException) {
      throw (IOException) thrown;
    } else {
      throw new RuntimeException(thrown);
    }
  }

  class WorkerThread extends Thread {
    private String[] workerArgs;

    WorkerThread(int i, String[] args) {
      super("WorkerThread-" + i);
      workerArgs = args;
    }

    @Override
    public void run() {
      try {
        int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs);
        if (ret != 0) {
          throw new RuntimeException("LoadTestTool exited with a non-zero return code.");
        }
      } catch (Exception ex) {
        LOG.error("Error in worker thread", ex);
        workerThreadError(ex);
      }
    }
  }

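  /**
   * Copies authentication properties into the configuration, first verifying
   * that every user (including the owner) has both a
   * "hbase.&lt;user>.keytab.file" and a "hbase.&lt;user>.kerberos.principal"
   * entry in the given properties.
   */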
  private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner,
      String userList) throws IOException {
    List<String> users = new ArrayList<>(Arrays.asList(userList.split(",")));
    users.add(owner);
    for (String user : users) {
      String keyTabFileConfKey = "hbase." + user + ".keytab.file";
      String principalConfKey = "hbase." + user + ".kerberos.principal";
      if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) {
        throw new IOException("Authentication configs missing for user: " + user);
      }
    }
    for (String key : authConfig.stringPropertyNames()) {
      conf.set(key, authConfig.getProperty(key));
    }
    LOG.debug("Added authentication properties to config successfully.");
  }

}