001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import com.codahale.metrics.Histogram;
021import com.codahale.metrics.UniformReservoir;
022import io.opentelemetry.api.trace.Span;
023import io.opentelemetry.context.Scope;
024import java.io.IOException;
025import java.io.PrintStream;
026import java.lang.reflect.Constructor;
027import java.math.BigDecimal;
028import java.math.MathContext;
029import java.text.DecimalFormat;
030import java.text.SimpleDateFormat;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Date;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Locale;
037import java.util.Map;
038import java.util.NoSuchElementException;
039import java.util.Properties;
040import java.util.Queue;
041import java.util.Random;
042import java.util.TreeMap;
043import java.util.concurrent.Callable;
044import java.util.concurrent.ExecutionException;
045import java.util.concurrent.ExecutorService;
046import java.util.concurrent.Executors;
047import java.util.concurrent.Future;
048import java.util.concurrent.ThreadLocalRandom;
049import org.apache.commons.lang3.StringUtils;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.conf.Configured;
052import org.apache.hadoop.fs.FileSystem;
053import org.apache.hadoop.fs.Path;
054import org.apache.hadoop.hbase.client.Admin;
055import org.apache.hadoop.hbase.client.Append;
056import org.apache.hadoop.hbase.client.AsyncConnection;
057import org.apache.hadoop.hbase.client.AsyncTable;
058import org.apache.hadoop.hbase.client.BufferedMutator;
059import org.apache.hadoop.hbase.client.BufferedMutatorParams;
060import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
061import org.apache.hadoop.hbase.client.Connection;
062import org.apache.hadoop.hbase.client.ConnectionFactory;
063import org.apache.hadoop.hbase.client.Consistency;
064import org.apache.hadoop.hbase.client.Delete;
065import org.apache.hadoop.hbase.client.Durability;
066import org.apache.hadoop.hbase.client.Get;
067import org.apache.hadoop.hbase.client.Increment;
068import org.apache.hadoop.hbase.client.Put;
069import org.apache.hadoop.hbase.client.RegionInfo;
070import org.apache.hadoop.hbase.client.RegionInfoBuilder;
071import org.apache.hadoop.hbase.client.RegionLocator;
072import org.apache.hadoop.hbase.client.Result;
073import org.apache.hadoop.hbase.client.ResultScanner;
074import org.apache.hadoop.hbase.client.RowMutations;
075import org.apache.hadoop.hbase.client.Scan;
076import org.apache.hadoop.hbase.client.Table;
077import org.apache.hadoop.hbase.client.TableDescriptor;
078import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
079import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
080import org.apache.hadoop.hbase.filter.BinaryComparator;
081import org.apache.hadoop.hbase.filter.Filter;
082import org.apache.hadoop.hbase.filter.FilterAllFilter;
083import org.apache.hadoop.hbase.filter.FilterList;
084import org.apache.hadoop.hbase.filter.PageFilter;
085import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
086import org.apache.hadoop.hbase.filter.WhileMatchFilter;
087import org.apache.hadoop.hbase.io.compress.Compression;
088import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
089import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
090import org.apache.hadoop.hbase.regionserver.BloomType;
091import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
092import org.apache.hadoop.hbase.trace.TraceUtil;
093import org.apache.hadoop.hbase.util.ByteArrayHashKey;
094import org.apache.hadoop.hbase.util.Bytes;
095import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
096import org.apache.hadoop.hbase.util.GsonUtil;
097import org.apache.hadoop.hbase.util.Hash;
098import org.apache.hadoop.hbase.util.MurmurHash;
099import org.apache.hadoop.hbase.util.Pair;
100import org.apache.hadoop.hbase.util.RandomDistribution;
101import org.apache.hadoop.hbase.util.YammerHistogramUtils;
102import org.apache.hadoop.io.LongWritable;
103import org.apache.hadoop.io.Text;
104import org.apache.hadoop.mapreduce.Job;
105import org.apache.hadoop.mapreduce.Mapper;
106import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
107import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
108import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
109import org.apache.hadoop.util.Tool;
110import org.apache.hadoop.util.ToolRunner;
111import org.apache.yetus.audience.InterfaceAudience;
112import org.slf4j.Logger;
113import org.slf4j.LoggerFactory;
114
115import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
116import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
117import org.apache.hbase.thirdparty.com.google.gson.Gson;
118
119/**
 * Script used for evaluating HBase performance and scalability. Runs an HBase client that steps
 * through one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random
 * writes test, etc.). Pass on the command-line which test to run and how many clients are
 * participating in this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
124 * <p>
125 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance
126 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper,
127 * pages 8-10.
128 * <p>
129 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as
130 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client does
131 * about 1GB of data, unless specified otherwise.
132 */
133@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
134public class PerformanceEvaluation extends Configured implements Tool {
135  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
136  static final String RANDOM_READ = "randomRead";
137  static final String PE_COMMAND_SHORTNAME = "pe";
138  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
139  private static final Gson GSON = GsonUtil.createGson().create();
140
141  public static final String TABLE_NAME = "TestTable";
142  public static final String FAMILY_NAME_BASE = "info";
143  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
144  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
145  public static final int DEFAULT_VALUE_LENGTH = 1000;
146  public static final int ROW_LENGTH = 26;
147
148  private static final int ONE_GB = 1024 * 1024 * 1000;
149  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
150  // TODO : should we make this configurable
151  private static final int TAG_LENGTH = 256;
152  private static final DecimalFormat FMT = new DecimalFormat("0.##");
153  private static final MathContext CXT = MathContext.DECIMAL64;
154  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
155  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
156  private static final TestOptions DEFAULT_OPTS = new TestOptions();
157
158  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
159  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
160
  // Registers every built-in command. Each call maps a command name to the test class that
  // implements it plus a one-line description; the entries end up in COMMANDS keyed by name.
  static {
    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
      "Run async random read test");
    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
      "Run async random write test");
    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
      "Run async sequential read test");
    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
      "Run async sequential write test");
    addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)");
    addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test");
    addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test");
    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
      "Run random seek and scan 100 test");
    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
      "Run random seek scan with both start and stop row (max 10 rows)");
    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
      "Run random seek scan with both start and stop row (max 100 rows)");
    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
      "Run random seek scan with both start and stop row (max 1000 rows)");
    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
      "Run random seek scan with both start and stop row (max 10000 rows)");
    addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
    addCommandDescriptor(RandomDeleteTest.class, "randomDelete", "Run random delete test");
    addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
    addCommandDescriptor(SequentialDeleteTest.class, "sequentialDelete",
      "Run sequential delete test");
    addCommandDescriptor(MetaWriteTest.class, "metaWrite",
      "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta");
    addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
    addCommandDescriptor(ReverseScanTest.class, "reverseScan",
      "Run reverse scan test (read every row)");
    addCommandDescriptor(FilteredScanTest.class, "filterScan",
      "Run scan test using a filter to find a specific row based on it's value "
        + "(make sure to use --rows=20)");
    addCommandDescriptor(IncrementTest.class, "increment",
      "Increment on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(AppendTest.class, "append",
      "Append on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CleanMetaTest.class, "cleanMeta",
      "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread");
  }
210
211  /**
212   * Enum for map metrics. Keep it out here rather than inside in the Map inner-class so we can find
213   * associated properties.
214   */
215  protected static enum Counter {
216    /** elapsed time */
217    ELAPSED_TIME,
218    /** number of rows */
219    ROWS
220  }
221
222  protected static class RunResult implements Comparable<RunResult> {
223    public RunResult(long duration, Histogram hist) {
224      this.duration = duration;
225      this.hist = hist;
226      numbOfReplyOverThreshold = 0;
227      numOfReplyFromReplica = 0;
228    }
229
230    public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica,
231      Histogram hist) {
232      this.duration = duration;
233      this.hist = hist;
234      this.numbOfReplyOverThreshold = numbOfReplyOverThreshold;
235      this.numOfReplyFromReplica = numOfReplyFromReplica;
236    }
237
238    public final long duration;
239    public final Histogram hist;
240    public final long numbOfReplyOverThreshold;
241    public final long numOfReplyFromReplica;
242
243    @Override
244    public String toString() {
245      return Long.toString(duration);
246    }
247
248    @Override
249    public int compareTo(RunResult o) {
250      return Long.compare(this.duration, o.duration);
251    }
252
253    @Override
254    public boolean equals(Object obj) {
255      if (this == obj) {
256        return true;
257      }
258      if (obj == null || getClass() != obj.getClass()) {
259        return false;
260      }
261      return this.compareTo((RunResult) obj) == 0;
262    }
263
264    @Override
265    public int hashCode() {
266      return Long.hashCode(duration);
267    }
268  }
269
  /**
   * Creates the tool with the given configuration, which is passed straight to the
   * {@link Configured} superclass.
   * @param conf Configuration object
   */
  public PerformanceEvaluation(final Configuration conf) {
    super(conf);
  }
277
278  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name,
279    String description) {
280    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
281    COMMANDS.put(name, cmdDescriptor);
282  }
283
284  /**
285   * Implementations can have their status set.
286   */
287  interface Status {
288    /**
289     * Sets status
290     * @param msg status message
291     */
292    void setStatus(final String msg) throws IOException;
293  }
294
295  /**
296   * MapReduce job that runs a performance evaluation client in each map task.
297   */
298  public static class EvaluationMapTask
299    extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
300
301    /** configuration parameter name that contains the command */
302    public final static String CMD_KEY = "EvaluationMapTask.command";
303    /** configuration parameter name that contains the PE impl */
304    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
305
306    private Class<? extends Test> cmd;
307
308    @Override
309    protected void setup(Context context) throws IOException, InterruptedException {
310      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
311
312      // this is required so that extensions of PE are instantiated within the
313      // map reduce task...
314      Class<? extends PerformanceEvaluation> peClass =
315        forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
316      try {
317        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
318      } catch (Exception e) {
319        throw new IllegalStateException("Could not instantiate PE instance", e);
320      }
321    }
322
323    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
324      try {
325        return Class.forName(className).asSubclass(type);
326      } catch (ClassNotFoundException e) {
327        throw new IllegalStateException("Could not find class for name: " + className, e);
328      }
329    }
330
331    @Override
332    protected void map(LongWritable key, Text value, final Context context)
333      throws IOException, InterruptedException {
334
335      Status status = new Status() {
336        @Override
337        public void setStatus(String msg) {
338          context.setStatus(msg);
339        }
340      };
341
342      TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class);
343      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
344      final Connection con = ConnectionFactory.createConnection(conf);
345      AsyncConnection asyncCon = null;
346      try {
347        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
348      } catch (ExecutionException e) {
349        throw new IOException(e);
350      }
351
352      // Evaluation task
353      RunResult result =
354        PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
355      // Collect how much time the thing took. Report as map output and
356      // to the ELAPSED_TIME counter.
357      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
358      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
359      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
360      context.progress();
361    }
362  }
363
364  /*
365   * If table does not already exist, create. Also create a table when {@code opts.presplitRegions}
366   * is specified or when the existing table's region replica count doesn't match {@code
367   * opts.replicas}.
368   */
369  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
370    TableName tableName = TableName.valueOf(opts.tableName);
371    boolean needsDelete = false, exists = admin.tableExists(tableName);
372    boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
373      || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
374    boolean isDeleteCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("delete");
375    if (!exists && (isReadCmd || isDeleteCmd)) {
376      throw new IllegalStateException(
377        "Must specify an existing table for read commands. Run a write command first.");
378    }
379    TableDescriptor desc = exists ? admin.getDescriptor(TableName.valueOf(opts.tableName)) : null;
380    byte[][] splits = getSplits(opts);
381
382    // recreate the table when user has requested presplit or when existing
383    // {RegionSplitPolicy,replica count} does not match requested, or when the
384    // number of column families does not match requested.
385    if (
386      (exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions
387        && opts.presplitRegions != admin.getRegions(tableName).size())
388        || (!isReadCmd && desc != null
389          && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
390        || (!(isReadCmd || isDeleteCmd) && desc != null
391          && desc.getRegionReplication() != opts.replicas)
392        || (desc != null && desc.getColumnFamilyCount() != opts.families)
393    ) {
394      needsDelete = true;
395      // wait, why did it delete my table?!?
396      LOG.debug(MoreObjects.toStringHelper("needsDelete").add("needsDelete", needsDelete)
397        .add("isReadCmd", isReadCmd).add("exists", exists).add("desc", desc)
398        .add("presplit", opts.presplitRegions).add("splitPolicy", opts.splitPolicy)
399        .add("replicas", opts.replicas).add("families", opts.families).toString());
400    }
401
402    // remove an existing table
403    if (needsDelete) {
404      if (admin.isTableEnabled(tableName)) {
405        admin.disableTable(tableName);
406      }
407      admin.deleteTable(tableName);
408    }
409
410    // table creation is necessary
411    if (!exists || needsDelete) {
412      desc = getTableDescriptor(opts);
413      if (splits != null) {
414        if (LOG.isDebugEnabled()) {
415          for (int i = 0; i < splits.length; i++) {
416            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
417          }
418        }
419      }
420      if (splits != null) {
421        admin.createTable(desc, splits);
422      } else {
423        admin.createTable(desc);
424      }
425      LOG.info("Table " + desc + " created");
426    }
427    return admin.tableExists(tableName);
428  }
429
430  /**
431   * Create an HTableDescriptor from provided TestOptions.
432   */
433  protected static TableDescriptor getTableDescriptor(TestOptions opts) {
434    TableDescriptorBuilder builder =
435      TableDescriptorBuilder.newBuilder(TableName.valueOf(opts.tableName));
436
437    for (int family = 0; family < opts.families; family++) {
438      byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
439      ColumnFamilyDescriptorBuilder cfBuilder =
440        ColumnFamilyDescriptorBuilder.newBuilder(familyName);
441      cfBuilder.setDataBlockEncoding(opts.blockEncoding);
442      cfBuilder.setCompressionType(opts.compression);
443      cfBuilder.setEncryptionType(opts.encryption);
444      cfBuilder.setBloomFilterType(opts.bloomType);
445      cfBuilder.setBlocksize(opts.blockSize);
446      if (opts.inMemoryCF) {
447        cfBuilder.setInMemory(true);
448      }
449      cfBuilder.setInMemoryCompaction(opts.inMemoryCompaction);
450      builder.setColumnFamily(cfBuilder.build());
451    }
452    if (opts.replicas != DEFAULT_OPTS.replicas) {
453      builder.setRegionReplication(opts.replicas);
454    }
455    if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
456      builder.setRegionSplitPolicyClassName(opts.splitPolicy);
457    }
458    return builder.build();
459  }
460
461  /**
462   * generates splits based on total number of rows and specified split regions
463   */
464  protected static byte[][] getSplits(TestOptions opts) {
465    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null;
466
467    int numSplitPoints = opts.presplitRegions - 1;
468    byte[][] splits = new byte[numSplitPoints][];
469    int jump = opts.totalRows / opts.presplitRegions;
470    for (int i = 0; i < numSplitPoints; i++) {
471      int rowkey = jump * (1 + i);
472      splits[i] = format(rowkey);
473    }
474    return splits;
475  }
476
477  static void setupConnectionCount(final TestOptions opts) {
478    if (opts.oneCon) {
479      opts.connCount = 1;
480    } else {
481      if (opts.connCount == -1) {
482        // set to thread number if connCount is not set
483        opts.connCount = opts.numClientThreads;
484      }
485    }
486  }
487
488  /*
489   * Run all clients in this vm each to its own thread.
490   */
491  static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
492    throws IOException, InterruptedException, ExecutionException {
493    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
494    assert cmd != null;
495    @SuppressWarnings("unchecked")
496    Future<RunResult>[] threads = new Future[opts.numClientThreads];
497    RunResult[] results = new RunResult[opts.numClientThreads];
498    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
499      new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
500    setupConnectionCount(opts);
501    final Connection[] cons = new Connection[opts.connCount];
502    final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
503    for (int i = 0; i < opts.connCount; i++) {
504      cons[i] = ConnectionFactory.createConnection(conf);
505      asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
506    }
507    LOG
508      .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads");
509    for (int i = 0; i < threads.length; i++) {
510      final int index = i;
511      threads[i] = pool.submit(new Callable<RunResult>() {
512        @Override
513        public RunResult call() throws Exception {
514          TestOptions threadOpts = new TestOptions(opts);
515          final Connection con = cons[index % cons.length];
516          final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
517          if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
518          RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
519            @Override
520            public void setStatus(final String msg) throws IOException {
521              LOG.info(msg);
522            }
523          });
524          LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration
525            + "ms over " + threadOpts.perClientRunRows + " rows");
526          if (opts.latencyThreshold > 0) {
527            LOG.info("Number of replies over latency threshold " + opts.latencyThreshold
528              + "(ms) is " + run.numbOfReplyOverThreshold);
529          }
530          return run;
531        }
532      });
533    }
534    pool.shutdown();
535
536    for (int i = 0; i < threads.length; i++) {
537      try {
538        results[i] = threads[i].get();
539      } catch (ExecutionException e) {
540        throw new IOException(e.getCause());
541      }
542    }
543    final String test = cmd.getSimpleName();
544    LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results));
545    Arrays.sort(results);
546    long total = 0;
547    float avgLatency = 0;
548    float avgTPS = 0;
549    long replicaWins = 0;
550    for (RunResult result : results) {
551      total += result.duration;
552      avgLatency += result.hist.getSnapshot().getMean();
553      avgTPS += opts.perClientRunRows * 1.0f / result.duration;
554      replicaWins += result.numOfReplyFromReplica;
555    }
556    avgTPS *= 1000; // ms to second
557    avgLatency = avgLatency / results.length;
558    LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: "
559      + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms");
560    LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
561    LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
562    if (opts.replicas > 1) {
563      LOG.info("[results from replica regions] " + replicaWins);
564    }
565
566    for (int i = 0; i < opts.connCount; i++) {
567      cons[i].close();
568      asyncCons[i].close();
569    }
570
571    return results;
572  }
573
574  /*
575   * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write
576   * out an input file with instruction per client regards which row they are to start on.
577   * @param cmd Command to run.
578   */
579  static Job doMapReduce(TestOptions opts, final Configuration conf)
580    throws IOException, InterruptedException, ClassNotFoundException {
581    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
582    assert cmd != null;
583    Path inputDir = writeInputFile(conf, opts);
584    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
585    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
586    Job job = Job.getInstance(conf);
587    job.setJarByClass(PerformanceEvaluation.class);
588    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
589
590    job.setInputFormatClass(NLineInputFormat.class);
591    NLineInputFormat.setInputPaths(job, inputDir);
592    // this is default, but be explicit about it just in case.
593    NLineInputFormat.setNumLinesPerSplit(job, 1);
594
595    job.setOutputKeyClass(LongWritable.class);
596    job.setOutputValueClass(LongWritable.class);
597
598    job.setMapperClass(EvaluationMapTask.class);
599    job.setReducerClass(LongSumReducer.class);
600
601    job.setNumReduceTasks(1);
602
603    job.setOutputFormatClass(TextOutputFormat.class);
604    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
605
606    TableMapReduceUtil.addDependencyJars(job);
607    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Histogram.class, // yammer
608                                                                                            // metrics
609      Gson.class, // gson
610      FilterAllFilter.class // hbase-server tests jar
611    );
612
613    TableMapReduceUtil.initCredentials(job);
614
615    job.waitForCompletion(true);
616    return job;
617  }
618
  /**
   * Name of the mapreduce job's input file. The file holds one line per client; the job is set up
   * so that each map task processes a single line, i.e. each client has one mapper to do the work.
   */

  static String JOB_INPUT_FILENAME = "input.txt";
624
625  /*
626   * Write input file of offsets-per-client for the mapreduce job.
627   * @param c Configuration
628   * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
629   */
630  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
631    return writeInputFile(c, opts, new Path("."));
632  }
633
634  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
635    throws IOException {
636    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
637    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
638    Path inputDir = new Path(jobdir, "inputs");
639
640    FileSystem fs = FileSystem.get(c);
641    fs.mkdirs(inputDir);
642
643    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
644    PrintStream out = new PrintStream(fs.create(inputFile));
645    // Make input random.
646    Map<Integer, String> m = new TreeMap<>();
647    Hash h = MurmurHash.getInstance();
648    int perClientRows = (opts.totalRows / opts.numClientThreads);
649    try {
650      for (int j = 0; j < opts.numClientThreads; j++) {
651        TestOptions next = new TestOptions(opts);
652        next.startRow = j * perClientRows;
653        next.perClientRunRows = perClientRows;
654        String s = GSON.toJson(next);
655        LOG.info("Client=" + j + ", input=" + s);
656        byte[] b = Bytes.toBytes(s);
657        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
658        m.put(hash, s);
659      }
660      for (Map.Entry<Integer, String> e : m.entrySet()) {
661        out.println(e.getValue());
662      }
663    } finally {
664      out.close();
665    }
666    return inputDir;
667  }
668
  /**
   * Describes a command: the test class that implements it, the name it is registered under and a
   * human-readable description.
   */
  static class CmdDescriptor {
    // Test class that implements the command.
    private Class<? extends TestBase> cmdClass;
    // Name of the command.
    private String name;
    // Human-readable description of the command.
    private String description;

    /**
     * @param cmdClass    test class that implements the command
     * @param name        name of the command
     * @param description human-readable description of the command
     */
    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
      this.cmdClass = cmdClass;
      this.name = name;
      this.description = description;
    }

    /** Returns the test class that implements the command. */
    public Class<? extends TestBase> getCmdClass() {
      return cmdClass;
    }

    /** Returns the name of the command. */
    public String getName() {
      return name;
    }

    /** Returns the description of the command. */
    public String getDescription() {
      return description;
    }
  }
695
696  /**
697   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes
698   * tracking all these arguments a little easier. NOTE: ADDING AN OPTION, you need to add a data
699   * member, a getter/setter (to make JSON serialization of this TestOptions class behave), and you
700   * need to add to the clone constructor below copying your new option from the 'that' to the
701   * 'this'. Look for 'clone' below.
702   */
703  static class TestOptions {
704    String cmdName = null;
705    boolean nomapred = false;
706    boolean filterAll = false;
707    int startRow = 0;
708    float size = 1.0f;
709    int perClientRunRows = DEFAULT_ROWS_PER_GB;
710    int numClientThreads = 1;
711    int totalRows = DEFAULT_ROWS_PER_GB;
712    int measureAfter = 0;
713    float sampleRate = 1.0f;
714    /**
715     * @deprecated Useless after switching to OpenTelemetry
716     */
717    @Deprecated
718    double traceRate = 0.0;
719    String tableName = TABLE_NAME;
720    boolean flushCommits = true;
721    boolean writeToWAL = true;
722    boolean autoFlush = false;
723    boolean oneCon = false;
724    int connCount = -1; // wil decide the actual num later
725    boolean useTags = false;
726    int noOfTags = 1;
727    boolean reportLatency = false;
728    int multiGet = 0;
729    int multiPut = 0;
730    int randomSleep = 0;
731    boolean inMemoryCF = false;
732    int presplitRegions = 0;
733    int replicas = TableDescriptorBuilder.DEFAULT_REGION_REPLICATION;
734    String splitPolicy = null;
735    Compression.Algorithm compression = Compression.Algorithm.NONE;
736    String encryption = null;
737    BloomType bloomType = BloomType.ROW;
738    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
739    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
740    boolean valueRandom = false;
741    boolean valueZipf = false;
742    int valueSize = DEFAULT_VALUE_LENGTH;
743    int period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10;
744    int cycles = 1;
745    int columns = 1;
746    int families = 1;
747    int caching = 30;
748    int latencyThreshold = 0; // in millsecond
749    boolean addColumns = true;
750    MemoryCompactionPolicy inMemoryCompaction =
751      MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
752    boolean asyncPrefetch = false;
753    boolean cacheBlocks = true;
754    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
755    long bufferSize = 2l * 1024l * 1024l;
756    Properties commandProperties;
757
758    public TestOptions() {
759    }
760
761    /**
762     * Clone constructor.
763     * @param that Object to copy from.
764     */
765    public TestOptions(TestOptions that) {
766      this.cmdName = that.cmdName;
767      this.cycles = that.cycles;
768      this.nomapred = that.nomapred;
769      this.startRow = that.startRow;
770      this.size = that.size;
771      this.perClientRunRows = that.perClientRunRows;
772      this.numClientThreads = that.numClientThreads;
773      this.totalRows = that.totalRows;
774      this.sampleRate = that.sampleRate;
775      this.traceRate = that.traceRate;
776      this.tableName = that.tableName;
777      this.flushCommits = that.flushCommits;
778      this.writeToWAL = that.writeToWAL;
779      this.autoFlush = that.autoFlush;
780      this.oneCon = that.oneCon;
781      this.connCount = that.connCount;
782      this.useTags = that.useTags;
783      this.noOfTags = that.noOfTags;
784      this.reportLatency = that.reportLatency;
785      this.latencyThreshold = that.latencyThreshold;
786      this.multiGet = that.multiGet;
787      this.multiPut = that.multiPut;
788      this.inMemoryCF = that.inMemoryCF;
789      this.presplitRegions = that.presplitRegions;
790      this.replicas = that.replicas;
791      this.splitPolicy = that.splitPolicy;
792      this.compression = that.compression;
793      this.encryption = that.encryption;
794      this.blockEncoding = that.blockEncoding;
795      this.filterAll = that.filterAll;
796      this.bloomType = that.bloomType;
797      this.blockSize = that.blockSize;
798      this.valueRandom = that.valueRandom;
799      this.valueZipf = that.valueZipf;
800      this.valueSize = that.valueSize;
801      this.period = that.period;
802      this.randomSleep = that.randomSleep;
803      this.measureAfter = that.measureAfter;
804      this.addColumns = that.addColumns;
805      this.columns = that.columns;
806      this.families = that.families;
807      this.caching = that.caching;
808      this.inMemoryCompaction = that.inMemoryCompaction;
809      this.asyncPrefetch = that.asyncPrefetch;
810      this.cacheBlocks = that.cacheBlocks;
811      this.scanReadType = that.scanReadType;
812      this.bufferSize = that.bufferSize;
813      this.commandProperties = that.commandProperties;
814    }
815
816    public Properties getCommandProperties() {
817      return commandProperties;
818    }
819
820    public int getCaching() {
821      return this.caching;
822    }
823
824    public void setCaching(final int caching) {
825      this.caching = caching;
826    }
827
828    public int getColumns() {
829      return this.columns;
830    }
831
832    public void setColumns(final int columns) {
833      this.columns = columns;
834    }
835
836    public int getFamilies() {
837      return this.families;
838    }
839
840    public void setFamilies(final int families) {
841      this.families = families;
842    }
843
844    public int getCycles() {
845      return this.cycles;
846    }
847
848    public void setCycles(final int cycles) {
849      this.cycles = cycles;
850    }
851
852    public boolean isValueZipf() {
853      return valueZipf;
854    }
855
856    public void setValueZipf(boolean valueZipf) {
857      this.valueZipf = valueZipf;
858    }
859
860    public String getCmdName() {
861      return cmdName;
862    }
863
864    public void setCmdName(String cmdName) {
865      this.cmdName = cmdName;
866    }
867
868    public int getRandomSleep() {
869      return randomSleep;
870    }
871
872    public void setRandomSleep(int randomSleep) {
873      this.randomSleep = randomSleep;
874    }
875
876    public int getReplicas() {
877      return replicas;
878    }
879
880    public void setReplicas(int replicas) {
881      this.replicas = replicas;
882    }
883
884    public String getSplitPolicy() {
885      return splitPolicy;
886    }
887
888    public void setSplitPolicy(String splitPolicy) {
889      this.splitPolicy = splitPolicy;
890    }
891
892    public void setNomapred(boolean nomapred) {
893      this.nomapred = nomapred;
894    }
895
896    public void setFilterAll(boolean filterAll) {
897      this.filterAll = filterAll;
898    }
899
900    public void setStartRow(int startRow) {
901      this.startRow = startRow;
902    }
903
904    public void setSize(float size) {
905      this.size = size;
906    }
907
908    public void setPerClientRunRows(int perClientRunRows) {
909      this.perClientRunRows = perClientRunRows;
910    }
911
912    public void setNumClientThreads(int numClientThreads) {
913      this.numClientThreads = numClientThreads;
914    }
915
916    public void setTotalRows(int totalRows) {
917      this.totalRows = totalRows;
918    }
919
920    public void setSampleRate(float sampleRate) {
921      this.sampleRate = sampleRate;
922    }
923
924    public void setTraceRate(double traceRate) {
925      this.traceRate = traceRate;
926    }
927
928    public void setTableName(String tableName) {
929      this.tableName = tableName;
930    }
931
932    public void setFlushCommits(boolean flushCommits) {
933      this.flushCommits = flushCommits;
934    }
935
936    public void setWriteToWAL(boolean writeToWAL) {
937      this.writeToWAL = writeToWAL;
938    }
939
940    public void setAutoFlush(boolean autoFlush) {
941      this.autoFlush = autoFlush;
942    }
943
944    public void setOneCon(boolean oneCon) {
945      this.oneCon = oneCon;
946    }
947
948    public int getConnCount() {
949      return connCount;
950    }
951
952    public void setConnCount(int connCount) {
953      this.connCount = connCount;
954    }
955
956    public void setUseTags(boolean useTags) {
957      this.useTags = useTags;
958    }
959
960    public void setNoOfTags(int noOfTags) {
961      this.noOfTags = noOfTags;
962    }
963
964    public void setReportLatency(boolean reportLatency) {
965      this.reportLatency = reportLatency;
966    }
967
968    public void setMultiGet(int multiGet) {
969      this.multiGet = multiGet;
970    }
971
972    public void setMultiPut(int multiPut) {
973      this.multiPut = multiPut;
974    }
975
976    public void setInMemoryCF(boolean inMemoryCF) {
977      this.inMemoryCF = inMemoryCF;
978    }
979
980    public void setPresplitRegions(int presplitRegions) {
981      this.presplitRegions = presplitRegions;
982    }
983
984    public void setCompression(Compression.Algorithm compression) {
985      this.compression = compression;
986    }
987
988    public void setEncryption(String encryption) {
989      this.encryption = encryption;
990    }
991
992    public void setBloomType(BloomType bloomType) {
993      this.bloomType = bloomType;
994    }
995
996    public void setBlockSize(int blockSize) {
997      this.blockSize = blockSize;
998    }
999
1000    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
1001      this.blockEncoding = blockEncoding;
1002    }
1003
1004    public void setValueRandom(boolean valueRandom) {
1005      this.valueRandom = valueRandom;
1006    }
1007
1008    public void setValueSize(int valueSize) {
1009      this.valueSize = valueSize;
1010    }
1011
1012    public void setBufferSize(long bufferSize) {
1013      this.bufferSize = bufferSize;
1014    }
1015
1016    public void setPeriod(int period) {
1017      this.period = period;
1018    }
1019
1020    public boolean isNomapred() {
1021      return nomapred;
1022    }
1023
1024    public boolean isFilterAll() {
1025      return filterAll;
1026    }
1027
1028    public int getStartRow() {
1029      return startRow;
1030    }
1031
1032    public float getSize() {
1033      return size;
1034    }
1035
1036    public int getPerClientRunRows() {
1037      return perClientRunRows;
1038    }
1039
1040    public int getNumClientThreads() {
1041      return numClientThreads;
1042    }
1043
1044    public int getTotalRows() {
1045      return totalRows;
1046    }
1047
1048    public float getSampleRate() {
1049      return sampleRate;
1050    }
1051
1052    public double getTraceRate() {
1053      return traceRate;
1054    }
1055
1056    public String getTableName() {
1057      return tableName;
1058    }
1059
1060    public boolean isFlushCommits() {
1061      return flushCommits;
1062    }
1063
1064    public boolean isWriteToWAL() {
1065      return writeToWAL;
1066    }
1067
1068    public boolean isAutoFlush() {
1069      return autoFlush;
1070    }
1071
1072    public boolean isUseTags() {
1073      return useTags;
1074    }
1075
1076    public int getNoOfTags() {
1077      return noOfTags;
1078    }
1079
1080    public boolean isReportLatency() {
1081      return reportLatency;
1082    }
1083
1084    public int getMultiGet() {
1085      return multiGet;
1086    }
1087
1088    public int getMultiPut() {
1089      return multiPut;
1090    }
1091
1092    public boolean isInMemoryCF() {
1093      return inMemoryCF;
1094    }
1095
1096    public int getPresplitRegions() {
1097      return presplitRegions;
1098    }
1099
1100    public Compression.Algorithm getCompression() {
1101      return compression;
1102    }
1103
1104    public String getEncryption() {
1105      return encryption;
1106    }
1107
1108    public DataBlockEncoding getBlockEncoding() {
1109      return blockEncoding;
1110    }
1111
1112    public boolean isValueRandom() {
1113      return valueRandom;
1114    }
1115
1116    public int getValueSize() {
1117      return valueSize;
1118    }
1119
1120    public int getPeriod() {
1121      return period;
1122    }
1123
1124    public BloomType getBloomType() {
1125      return bloomType;
1126    }
1127
1128    public int getBlockSize() {
1129      return blockSize;
1130    }
1131
1132    public boolean isOneCon() {
1133      return oneCon;
1134    }
1135
1136    public int getMeasureAfter() {
1137      return measureAfter;
1138    }
1139
1140    public void setMeasureAfter(int measureAfter) {
1141      this.measureAfter = measureAfter;
1142    }
1143
1144    public boolean getAddColumns() {
1145      return addColumns;
1146    }
1147
1148    public void setAddColumns(boolean addColumns) {
1149      this.addColumns = addColumns;
1150    }
1151
1152    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
1153      this.inMemoryCompaction = inMemoryCompaction;
1154    }
1155
1156    public MemoryCompactionPolicy getInMemoryCompaction() {
1157      return this.inMemoryCompaction;
1158    }
1159
1160    public long getBufferSize() {
1161      return this.bufferSize;
1162    }
1163  }
1164
1165  /*
1166   * A test. Subclass to particularize what happens per row.
1167   */
1168  static abstract class TestBase {
1169    // Below is make it so when Tests are all running in the one
1170    // jvm, that they each have a differently seeded Random.
1171    private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime());
1172
1173    private static long nextRandomSeed() {
1174      return randomSeed.nextLong();
1175    }
1176
1177    private final int everyN;
1178
1179    protected final Random rand = new Random(nextRandomSeed());
1180    protected final Configuration conf;
1181    protected final TestOptions opts;
1182
1183    protected final Status status;
1184
1185    private String testName;
1186    protected Histogram latencyHistogram;
1187    private Histogram replicaLatencyHistogram;
1188    private Histogram valueSizeHistogram;
1189    private Histogram rpcCallsHistogram;
1190    private Histogram remoteRpcCallsHistogram;
1191    private Histogram millisBetweenNextHistogram;
1192    private Histogram regionsScannedHistogram;
1193    private Histogram bytesInResultsHistogram;
1194    private Histogram bytesInRemoteResultsHistogram;
1195    private RandomDistribution.Zipf zipf;
1196    private long numOfReplyOverLatencyThreshold = 0;
1197    private long numOfReplyFromReplica = 0;
1198
1199    /**
1200     * Note that all subclasses of this class must provide a public constructor that has the exact
1201     * same list of arguments.
1202     */
1203    TestBase(final Configuration conf, final TestOptions options, final Status status) {
1204      this.conf = conf;
1205      this.opts = options;
1206      this.status = status;
1207      this.testName = this.getClass().getSimpleName();
1208      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
1209      if (options.isValueZipf()) {
1210        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
1211      }
1212      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
1213    }
1214
1215    int getValueLength(final Random r) {
1216      if (this.opts.isValueRandom()) {
1217        return r.nextInt(opts.valueSize);
1218      } else if (this.opts.isValueZipf()) {
1219        return Math.abs(this.zipf.nextInt());
1220      } else {
1221        return opts.valueSize;
1222      }
1223    }
1224
1225    void updateValueSize(final Result[] rs) throws IOException {
1226      updateValueSize(rs, 0);
1227    }
1228
1229    void updateValueSize(final Result[] rs, final long latency) throws IOException {
1230      if (rs == null || (latency == 0)) return;
1231      for (Result r : rs)
1232        updateValueSize(r, latency);
1233    }
1234
1235    void updateValueSize(final Result r) throws IOException {
1236      updateValueSize(r, 0);
1237    }
1238
1239    void updateValueSize(final Result r, final long latency) throws IOException {
1240      if (r == null || (latency == 0)) return;
1241      int size = 0;
1242      // update replicaHistogram
1243      if (r.isStale()) {
1244        replicaLatencyHistogram.update(latency / 1000);
1245        numOfReplyFromReplica++;
1246      }
1247      if (!isRandomValueSize()) return;
1248
1249      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1250        size += scanner.current().getValueLength();
1251      }
1252      updateValueSize(size);
1253    }
1254
1255    void updateValueSize(final int valueSize) {
1256      if (!isRandomValueSize()) return;
1257      this.valueSizeHistogram.update(valueSize);
1258    }
1259
1260    void updateScanMetrics(final ScanMetrics metrics) {
1261      if (metrics == null) return;
1262      Map<String, Long> metricsMap = metrics.getMetricsMap();
1263      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
1264      if (rpcCalls != null) {
1265        this.rpcCallsHistogram.update(rpcCalls.longValue());
1266      }
1267      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
1268      if (remoteRpcCalls != null) {
1269        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
1270      }
1271      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
1272      if (millisBetweenNext != null) {
1273        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
1274      }
1275      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
1276      if (regionsScanned != null) {
1277        this.regionsScannedHistogram.update(regionsScanned.longValue());
1278      }
1279      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
1280      if (bytesInResults != null && bytesInResults.longValue() > 0) {
1281        this.bytesInResultsHistogram.update(bytesInResults.longValue());
1282      }
1283      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
1284      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
1285        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
1286      }
1287    }
1288
1289    String generateStatus(final int sr, final int i, final int lr) {
1290      return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency ["
1291        + getShortLatencyReport() + "]"
1292        + (!isRandomValueSize() ? "" : ", value size [" + getShortValueSizeReport() + "]");
1293    }
1294
1295    boolean isRandomValueSize() {
1296      return opts.valueRandom;
1297    }
1298
1299    protected int getReportingPeriod() {
1300      return opts.period;
1301    }
1302
1303    /**
1304     * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1305     */
1306    public Histogram getLatencyHistogram() {
1307      return latencyHistogram;
1308    }
1309
1310    void testSetup() throws IOException {
1311      // test metrics
1312      latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1313      // If it is a replica test, set up histogram for replica.
1314      if (opts.replicas > 1) {
1315        replicaLatencyHistogram =
1316          YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1317      }
1318      valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1319      // scan metrics
1320      rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1321      remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1322      millisBetweenNextHistogram =
1323        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1324      regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1325      bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1326      bytesInRemoteResultsHistogram =
1327        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1328
1329      onStartup();
1330    }
1331
1332    abstract void onStartup() throws IOException;
1333
1334    void testTakedown() throws IOException {
1335      onTakedown();
1336      // Print all stats for this thread continuously.
1337      // Synchronize on Test.class so different threads don't intermingle the
1338      // output. We can't use 'this' here because each thread has its own instance of Test class.
1339      synchronized (Test.class) {
1340        status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
1341        status
1342          .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram));
1343        if (opts.replicas > 1) {
1344          status.setStatus("Latency (us) from Replica Regions: "
1345            + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram));
1346        }
1347        status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
1348        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
1349        if (valueSizeHistogram.getCount() > 0) {
1350          status.setStatus(
1351            "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
1352          status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
1353          status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
1354        } else {
1355          status.setStatus("No valueSize statistics available");
1356        }
1357        if (rpcCallsHistogram.getCount() > 0) {
1358          status.setStatus(
1359            "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
1360        }
1361        if (remoteRpcCallsHistogram.getCount() > 0) {
1362          status.setStatus("remoteRpcCalls (count): "
1363            + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
1364        }
1365        if (millisBetweenNextHistogram.getCount() > 0) {
1366          status.setStatus("millisBetweenNext (latency): "
1367            + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
1368        }
1369        if (regionsScannedHistogram.getCount() > 0) {
1370          status.setStatus("regionsScanned (count): "
1371            + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
1372        }
1373        if (bytesInResultsHistogram.getCount() > 0) {
1374          status.setStatus("bytesInResults (size): "
1375            + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
1376        }
1377        if (bytesInRemoteResultsHistogram.getCount() > 0) {
1378          status.setStatus("bytesInRemoteResults (size): "
1379            + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
1380        }
1381      }
1382    }
1383
1384    abstract void onTakedown() throws IOException;
1385
1386    /*
1387     * Run test
1388     * @return Elapsed time.
1389     */
1390    long test() throws IOException, InterruptedException {
1391      testSetup();
1392      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1393      final long startTime = System.nanoTime();
1394      try {
1395        testTimed();
1396      } finally {
1397        testTakedown();
1398      }
1399      return (System.nanoTime() - startTime) / 1000000;
1400    }
1401
1402    int getStartRow() {
1403      return opts.startRow;
1404    }
1405
1406    int getLastRow() {
1407      return getStartRow() + opts.perClientRunRows;
1408    }
1409
1410    /**
1411     * Provides an extension point for tests that don't want a per row invocation.
1412     */
1413    void testTimed() throws IOException, InterruptedException {
1414      int startRow = getStartRow();
1415      int lastRow = getLastRow();
1416      // Report on completion of 1/10th of total.
1417      for (int ii = 0; ii < opts.cycles; ii++) {
1418        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
1419        for (int i = startRow; i < lastRow; i++) {
1420          if (i % everyN != 0) continue;
1421          long startTime = System.nanoTime();
1422          boolean requestSent = false;
1423          Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan();
1424          try (Scope scope = span.makeCurrent()) {
1425            requestSent = testRow(i, startTime);
1426          } finally {
1427            span.end();
1428          }
1429          if ((i - startRow) > opts.measureAfter) {
1430            // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
1431            // first 9 times and sends the actual get request in the 10th iteration.
1432            // We should only set latency when actual request is sent because otherwise
1433            // it turns out to be 0.
1434            if (requestSent) {
1435              long latency = (System.nanoTime() - startTime) / 1000;
1436              latencyHistogram.update(latency);
1437              if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) {
1438                numOfReplyOverLatencyThreshold++;
1439              }
1440            }
1441            if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1442              status.setStatus(generateStatus(startRow, i, lastRow));
1443            }
1444          }
1445        }
1446      }
1447    }
1448
1449    /** Returns Subset of the histograms' calculation. */
1450    public String getShortLatencyReport() {
1451      return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
1452    }
1453
1454    /** Returns Subset of the histograms' calculation. */
1455    public String getShortValueSizeReport() {
1456      return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
1457    }
1458
1459    /**
1460     * Test for individual row.
1461     * @param i Row index.
1462     * @return true if the row was sent to server and need to record metrics. False if not, multiGet
1463     *         and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered.
1464     */
1465    abstract boolean testRow(final int i, final long startTime)
1466      throws IOException, InterruptedException;
1467  }
1468
1469  static abstract class Test extends TestBase {
1470    protected Connection connection;
1471
1472    Test(final Connection con, final TestOptions options, final Status status) {
1473      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1474      this.connection = con;
1475    }
1476  }
1477
1478  static abstract class AsyncTest extends TestBase {
1479    protected AsyncConnection connection;
1480
1481    AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
1482      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1483      this.connection = con;
1484    }
1485  }
1486
1487  static abstract class TableTest extends Test {
1488    protected Table table;
1489
1490    TableTest(Connection con, TestOptions options, Status status) {
1491      super(con, options, status);
1492    }
1493
1494    @Override
1495    void onStartup() throws IOException {
1496      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1497    }
1498
1499    @Override
1500    void onTakedown() throws IOException {
1501      table.close();
1502    }
1503  }
1504
1505  /*
1506   * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest
1507   */
1508  static abstract class MetaTest extends TableTest {
1509    protected int keyLength;
1510
1511    MetaTest(Connection con, TestOptions options, Status status) {
1512      super(con, options, status);
1513      keyLength = Integer.toString(opts.perClientRunRows).length();
1514    }
1515
1516    @Override
1517    void onTakedown() throws IOException {
1518      // No clean up
1519    }
1520
1521    /*
1522     * Generates Lexicographically ascending strings
1523     */
1524    protected byte[] getSplitKey(final int i) {
1525      return Bytes.toBytes(String.format("%0" + keyLength + "d", i));
1526    }
1527
1528  }
1529
1530  static abstract class AsyncTableTest extends AsyncTest {
1531    protected AsyncTable<?> table;
1532
1533    AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
1534      super(con, options, status);
1535    }
1536
1537    @Override
1538    void onStartup() throws IOException {
1539      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1540    }
1541
1542    @Override
1543    void onTakedown() throws IOException {
1544    }
1545  }
1546
1547  static class AsyncRandomReadTest extends AsyncTableTest {
1548    private final Consistency consistency;
1549    private ArrayList<Get> gets;
1550
1551    AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
1552      super(con, options, status);
1553      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1554      if (opts.multiGet > 0) {
1555        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1556        this.gets = new ArrayList<>(opts.multiGet);
1557      }
1558    }
1559
1560    @Override
1561    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1562      if (opts.randomSleep > 0) {
1563        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
1564      }
1565      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1566      for (int family = 0; family < opts.families; family++) {
1567        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1568        if (opts.addColumns) {
1569          for (int column = 0; column < opts.columns; column++) {
1570            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1571            get.addColumn(familyName, qualifier);
1572          }
1573        } else {
1574          get.addFamily(familyName);
1575        }
1576      }
1577      if (opts.filterAll) {
1578        get.setFilter(new FilterAllFilter());
1579      }
1580      get.setConsistency(consistency);
1581      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1582      try {
1583        if (opts.multiGet > 0) {
1584          this.gets.add(get);
1585          if (this.gets.size() == opts.multiGet) {
1586            Result[] rs =
1587              this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
1588            updateValueSize(rs);
1589            this.gets.clear();
1590          } else {
1591            return false;
1592          }
1593        } else {
1594          updateValueSize(this.table.get(get).get());
1595        }
1596      } catch (ExecutionException e) {
1597        throw new IOException(e);
1598      }
1599      return true;
1600    }
1601
1602    public static RuntimeException runtime(Throwable e) {
1603      if (e instanceof RuntimeException) {
1604        return (RuntimeException) e;
1605      }
1606      return new RuntimeException(e);
1607    }
1608
1609    public static <V> V propagate(Callable<V> callable) {
1610      try {
1611        return callable.call();
1612      } catch (Exception e) {
1613        throw runtime(e);
1614      }
1615    }
1616
1617    @Override
1618    protected int getReportingPeriod() {
1619      int period = opts.perClientRunRows / 10;
1620      return period == 0 ? opts.perClientRunRows : period;
1621    }
1622
1623    @Override
1624    protected void testTakedown() throws IOException {
1625      if (this.gets != null && this.gets.size() > 0) {
1626        this.table.get(gets);
1627        this.gets.clear();
1628      }
1629      super.testTakedown();
1630    }
1631  }
1632
1633  static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
1634
1635    AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
1636      super(con, options, status);
1637    }
1638
1639    @Override
1640    protected byte[] generateRow(final int i) {
1641      return getRandomRow(this.rand, opts.totalRows);
1642    }
1643  }
1644
1645  static class AsyncScanTest extends AsyncTableTest {
1646    private ResultScanner testScanner;
1647    private AsyncTable<?> asyncTable;
1648
1649    AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
1650      super(con, options, status);
1651    }
1652
1653    @Override
1654    void onStartup() throws IOException {
1655      this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName),
1656        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
1657    }
1658
1659    @Override
1660    void testTakedown() throws IOException {
1661      if (this.testScanner != null) {
1662        updateScanMetrics(this.testScanner.getScanMetrics());
1663        this.testScanner.close();
1664      }
1665      super.testTakedown();
1666    }
1667
1668    @Override
1669    boolean testRow(final int i, final long startTime) throws IOException {
1670      if (this.testScanner == null) {
1671        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1672          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1673          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1674        for (int family = 0; family < opts.families; family++) {
1675          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1676          if (opts.addColumns) {
1677            for (int column = 0; column < opts.columns; column++) {
1678              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1679              scan.addColumn(familyName, qualifier);
1680            }
1681          } else {
1682            scan.addFamily(familyName);
1683          }
1684        }
1685        if (opts.filterAll) {
1686          scan.setFilter(new FilterAllFilter());
1687        }
1688        this.testScanner = asyncTable.getScanner(scan);
1689      }
1690      Result r = testScanner.next();
1691      updateValueSize(r);
1692      return true;
1693    }
1694  }
1695
1696  static class AsyncSequentialReadTest extends AsyncTableTest {
1697    AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
1698      super(con, options, status);
1699    }
1700
1701    @Override
1702    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1703      Get get = new Get(format(i));
1704      for (int family = 0; family < opts.families; family++) {
1705        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1706        if (opts.addColumns) {
1707          for (int column = 0; column < opts.columns; column++) {
1708            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1709            get.addColumn(familyName, qualifier);
1710          }
1711        } else {
1712          get.addFamily(familyName);
1713        }
1714      }
1715      if (opts.filterAll) {
1716        get.setFilter(new FilterAllFilter());
1717      }
1718      try {
1719        updateValueSize(table.get(get).get());
1720      } catch (ExecutionException e) {
1721        throw new IOException(e);
1722      }
1723      return true;
1724    }
1725  }
1726
1727  static class AsyncSequentialWriteTest extends AsyncTableTest {
1728    private ArrayList<Put> puts;
1729
1730    AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
1731      super(con, options, status);
1732      if (opts.multiPut > 0) {
1733        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
1734        this.puts = new ArrayList<>(opts.multiPut);
1735      }
1736    }
1737
1738    protected byte[] generateRow(final int i) {
1739      return format(i);
1740    }
1741
1742    @Override
1743    @SuppressWarnings("ReturnValueIgnored")
1744    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1745      byte[] row = generateRow(i);
1746      Put put = new Put(row);
1747      for (int family = 0; family < opts.families; family++) {
1748        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1749        for (int column = 0; column < opts.columns; column++) {
1750          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1751          byte[] value = generateData(this.rand, getValueLength(this.rand));
1752          if (opts.useTags) {
1753            byte[] tag = generateData(this.rand, TAG_LENGTH);
1754            Tag[] tags = new Tag[opts.noOfTags];
1755            for (int n = 0; n < opts.noOfTags; n++) {
1756              Tag t = new ArrayBackedTag((byte) n, tag);
1757              tags[n] = t;
1758            }
1759            KeyValue kv =
1760              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
1761            put.add(kv);
1762            updateValueSize(kv.getValueLength());
1763          } else {
1764            put.addColumn(familyName, qualifier, value);
1765            updateValueSize(value.length);
1766          }
1767        }
1768      }
1769      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1770      try {
1771        table.put(put).get();
1772        if (opts.multiPut > 0) {
1773          this.puts.add(put);
1774          if (this.puts.size() == opts.multiPut) {
1775            this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get));
1776            this.puts.clear();
1777          } else {
1778            return false;
1779          }
1780        } else {
1781          table.put(put).get();
1782        }
1783      } catch (ExecutionException e) {
1784        throw new IOException(e);
1785      }
1786      return true;
1787    }
1788  }
1789
1790  static abstract class BufferedMutatorTest extends Test {
1791    protected BufferedMutator mutator;
1792    protected Table table;
1793
1794    BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1795      super(con, options, status);
1796    }
1797
1798    @Override
1799    void onStartup() throws IOException {
1800      BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1801      p.writeBufferSize(opts.bufferSize);
1802      this.mutator = connection.getBufferedMutator(p);
1803      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1804    }
1805
1806    @Override
1807    void onTakedown() throws IOException {
1808      mutator.close();
1809      table.close();
1810    }
1811  }
1812
1813  static class RandomSeekScanTest extends TableTest {
1814    RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1815      super(con, options, status);
1816    }
1817
1818    @Override
1819    boolean testRow(final int i, final long startTime) throws IOException {
1820      Scan scan =
1821        new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)).setCaching(opts.caching)
1822          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1823          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1824      FilterList list = new FilterList();
1825      for (int family = 0; family < opts.families; family++) {
1826        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1827        if (opts.addColumns) {
1828          for (int column = 0; column < opts.columns; column++) {
1829            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1830            scan.addColumn(familyName, qualifier);
1831          }
1832        } else {
1833          scan.addFamily(familyName);
1834        }
1835      }
1836      if (opts.filterAll) {
1837        list.addFilter(new FilterAllFilter());
1838      }
1839      list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1840      scan.setFilter(list);
1841      ResultScanner s = this.table.getScanner(scan);
1842      try {
1843        for (Result rr; (rr = s.next()) != null;) {
1844          updateValueSize(rr);
1845        }
1846      } finally {
1847        updateScanMetrics(s.getScanMetrics());
1848        s.close();
1849      }
1850      return true;
1851    }
1852
1853    @Override
1854    protected int getReportingPeriod() {
1855      int period = opts.perClientRunRows / 100;
1856      return period == 0 ? opts.perClientRunRows : period;
1857    }
1858
1859  }
1860
1861  static abstract class RandomScanWithRangeTest extends TableTest {
1862    RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1863      super(con, options, status);
1864    }
1865
1866    @Override
1867    boolean testRow(final int i, final long startTime) throws IOException {
1868      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1869      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
1870        .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
1871        .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1872        .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1873      for (int family = 0; family < opts.families; family++) {
1874        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1875        if (opts.addColumns) {
1876          for (int column = 0; column < opts.columns; column++) {
1877            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1878            scan.addColumn(familyName, qualifier);
1879          }
1880        } else {
1881          scan.addFamily(familyName);
1882        }
1883      }
1884      if (opts.filterAll) {
1885        scan.setFilter(new FilterAllFilter());
1886      }
1887      Result r = null;
1888      int count = 0;
1889      ResultScanner s = this.table.getScanner(scan);
1890      try {
1891        for (; (r = s.next()) != null;) {
1892          updateValueSize(r);
1893          count++;
1894        }
1895        if (i % 100 == 0) {
1896          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1897            Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()),
1898            count));
1899        }
1900      } finally {
1901        updateScanMetrics(s.getScanMetrics());
1902        s.close();
1903      }
1904      return true;
1905    }
1906
1907    protected abstract Pair<byte[], byte[]> getStartAndStopRow();
1908
1909    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
1910      int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
1911      int stop = start + maxRange;
1912      return new Pair<>(format(start), format(stop));
1913    }
1914
1915    @Override
1916    protected int getReportingPeriod() {
1917      int period = opts.perClientRunRows / 100;
1918      return period == 0 ? opts.perClientRunRows : period;
1919    }
1920  }
1921
1922  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1923    RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1924      super(con, options, status);
1925    }
1926
1927    @Override
1928    protected Pair<byte[], byte[]> getStartAndStopRow() {
1929      return generateStartAndStopRows(10);
1930    }
1931  }
1932
1933  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1934    RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1935      super(con, options, status);
1936    }
1937
1938    @Override
1939    protected Pair<byte[], byte[]> getStartAndStopRow() {
1940      return generateStartAndStopRows(100);
1941    }
1942  }
1943
1944  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1945    RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1946      super(con, options, status);
1947    }
1948
1949    @Override
1950    protected Pair<byte[], byte[]> getStartAndStopRow() {
1951      return generateStartAndStopRows(1000);
1952    }
1953  }
1954
1955  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1956    RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1957      super(con, options, status);
1958    }
1959
1960    @Override
1961    protected Pair<byte[], byte[]> getStartAndStopRow() {
1962      return generateStartAndStopRows(10000);
1963    }
1964  }
1965
1966  static class RandomReadTest extends TableTest {
1967    private final Consistency consistency;
1968    private ArrayList<Get> gets;
1969
1970    RandomReadTest(Connection con, TestOptions options, Status status) {
1971      super(con, options, status);
1972      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1973      if (opts.multiGet > 0) {
1974        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1975        this.gets = new ArrayList<>(opts.multiGet);
1976      }
1977    }
1978
1979    @Override
1980    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1981      if (opts.randomSleep > 0) {
1982        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
1983      }
1984      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1985      for (int family = 0; family < opts.families; family++) {
1986        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1987        if (opts.addColumns) {
1988          for (int column = 0; column < opts.columns; column++) {
1989            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1990            get.addColumn(familyName, qualifier);
1991          }
1992        } else {
1993          get.addFamily(familyName);
1994        }
1995      }
1996      if (opts.filterAll) {
1997        get.setFilter(new FilterAllFilter());
1998      }
1999      get.setConsistency(consistency);
2000      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
2001      if (opts.multiGet > 0) {
2002        this.gets.add(get);
2003        if (this.gets.size() == opts.multiGet) {
2004          Result[] rs = this.table.get(this.gets);
2005          if (opts.replicas > 1) {
2006            long latency = System.nanoTime() - startTime;
2007            updateValueSize(rs, latency);
2008          } else {
2009            updateValueSize(rs);
2010          }
2011          this.gets.clear();
2012        } else {
2013          return false;
2014        }
2015      } else {
2016        if (opts.replicas > 1) {
2017          Result r = this.table.get(get);
2018          long latency = System.nanoTime() - startTime;
2019          updateValueSize(r, latency);
2020        } else {
2021          updateValueSize(this.table.get(get));
2022        }
2023      }
2024      return true;
2025    }
2026
2027    @Override
2028    protected int getReportingPeriod() {
2029      int period = opts.perClientRunRows / 10;
2030      return period == 0 ? opts.perClientRunRows : period;
2031    }
2032
2033    @Override
2034    protected void testTakedown() throws IOException {
2035      if (this.gets != null && this.gets.size() > 0) {
2036        this.table.get(gets);
2037        this.gets.clear();
2038      }
2039      super.testTakedown();
2040    }
2041  }
2042
2043  /*
2044   * Send random reads against fake regions inserted by MetaWriteTest
2045   */
2046  static class MetaRandomReadTest extends MetaTest {
2047    private RegionLocator regionLocator;
2048
2049    MetaRandomReadTest(Connection con, TestOptions options, Status status) {
2050      super(con, options, status);
2051      LOG.info("call getRegionLocation");
2052    }
2053
2054    @Override
2055    void onStartup() throws IOException {
2056      super.onStartup();
2057      this.regionLocator = connection.getRegionLocator(table.getName());
2058    }
2059
2060    @Override
2061    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
2062      if (opts.randomSleep > 0) {
2063        Thread.sleep(rand.nextInt(opts.randomSleep));
2064      }
2065      HRegionLocation hRegionLocation =
2066        regionLocator.getRegionLocation(getSplitKey(rand.nextInt(opts.perClientRunRows)), true);
2067      LOG.debug("get location for region: " + hRegionLocation);
2068      return true;
2069    }
2070
2071    @Override
2072    protected int getReportingPeriod() {
2073      int period = opts.perClientRunRows / 10;
2074      return period == 0 ? opts.perClientRunRows : period;
2075    }
2076
2077    @Override
2078    protected void testTakedown() throws IOException {
2079      super.testTakedown();
2080    }
2081  }
2082
2083  static class RandomWriteTest extends SequentialWriteTest {
2084    RandomWriteTest(Connection con, TestOptions options, Status status) {
2085      super(con, options, status);
2086    }
2087
2088    @Override
2089    protected byte[] generateRow(final int i) {
2090      return getRandomRow(this.rand, opts.totalRows);
2091    }
2092
2093  }
2094
2095  static class RandomDeleteTest extends SequentialDeleteTest {
2096    RandomDeleteTest(Connection con, TestOptions options, Status status) {
2097      super(con, options, status);
2098    }
2099
2100    @Override
2101    protected byte[] generateRow(final int i) {
2102      return getRandomRow(this.rand, opts.totalRows);
2103    }
2104
2105  }
2106
2107  static class ScanTest extends TableTest {
2108    private ResultScanner testScanner;
2109
2110    ScanTest(Connection con, TestOptions options, Status status) {
2111      super(con, options, status);
2112    }
2113
2114    @Override
2115    void testTakedown() throws IOException {
2116      if (this.testScanner != null) {
2117        this.testScanner.close();
2118      }
2119      super.testTakedown();
2120    }
2121
2122    @Override
2123    boolean testRow(final int i, final long startTime) throws IOException {
2124      if (this.testScanner == null) {
2125        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
2126          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
2127          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
2128        for (int family = 0; family < opts.families; family++) {
2129          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2130          if (opts.addColumns) {
2131            for (int column = 0; column < opts.columns; column++) {
2132              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2133              scan.addColumn(familyName, qualifier);
2134            }
2135          } else {
2136            scan.addFamily(familyName);
2137          }
2138        }
2139        if (opts.filterAll) {
2140          scan.setFilter(new FilterAllFilter());
2141        }
2142        this.testScanner = table.getScanner(scan);
2143      }
2144      Result r = testScanner.next();
2145      updateValueSize(r);
2146      return true;
2147    }
2148  }
2149
2150  static class ReverseScanTest extends TableTest {
2151    private ResultScanner testScanner;
2152
2153    ReverseScanTest(Connection con, TestOptions options, Status status) {
2154      super(con, options, status);
2155    }
2156
2157    @Override
2158    void testTakedown() throws IOException {
2159      if (this.testScanner != null) {
2160        this.testScanner.close();
2161      }
2162      super.testTakedown();
2163    }
2164
2165    @Override
2166    boolean testRow(final int i, final long startTime) throws IOException {
2167      if (this.testScanner == null) {
2168        Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2169          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2170          .setScanMetricsEnabled(true).setReversed(true);
2171        for (int family = 0; family < opts.families; family++) {
2172          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2173          if (opts.addColumns) {
2174            for (int column = 0; column < opts.columns; column++) {
2175              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2176              scan.addColumn(familyName, qualifier);
2177            }
2178          } else {
2179            scan.addFamily(familyName);
2180          }
2181        }
2182        if (opts.filterAll) {
2183          scan.setFilter(new FilterAllFilter());
2184        }
2185        this.testScanner = table.getScanner(scan);
2186      }
2187      Result r = testScanner.next();
2188      updateValueSize(r);
2189      return true;
2190    }
2191  }
2192
2193  /**
2194   * Base class for operations that are CAS-like; that read a value and then set it based off what
2195   * they read. In this category is increment, append, checkAndPut, etc.
2196   * <p>
2197   * These operations also want some concurrency going on. Usually when these tests run, they
2198   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
2199   * same key space. We do this with our getStartRow and getLastRow overrides.
2200   */
2201  static abstract class CASTableTest extends TableTest {
2202    private final byte[] qualifier;
2203
2204    CASTableTest(Connection con, TestOptions options, Status status) {
2205      super(con, options, status);
2206      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
2207    }
2208
2209    byte[] getQualifier() {
2210      return this.qualifier;
2211    }
2212
2213    @Override
2214    int getStartRow() {
2215      return 0;
2216    }
2217
2218    @Override
2219    int getLastRow() {
2220      return opts.perClientRunRows;
2221    }
2222  }
2223
2224  static class IncrementTest extends CASTableTest {
2225    IncrementTest(Connection con, TestOptions options, Status status) {
2226      super(con, options, status);
2227    }
2228
2229    @Override
2230    boolean testRow(final int i, final long startTime) throws IOException {
2231      Increment increment = new Increment(format(i));
2232      // unlike checkAndXXX tests, which make most sense to do on a single value,
2233      // if multiple families are specified for an increment test we assume it is
2234      // meant to raise the work factor
2235      for (int family = 0; family < opts.families; family++) {
2236        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2237        increment.addColumn(familyName, getQualifier(), 1l);
2238      }
2239      updateValueSize(this.table.increment(increment));
2240      return true;
2241    }
2242  }
2243
2244  static class AppendTest extends CASTableTest {
2245    AppendTest(Connection con, TestOptions options, Status status) {
2246      super(con, options, status);
2247    }
2248
2249    @Override
2250    boolean testRow(final int i, final long startTime) throws IOException {
2251      byte[] bytes = format(i);
2252      Append append = new Append(bytes);
2253      // unlike checkAndXXX tests, which make most sense to do on a single value,
2254      // if multiple families are specified for an append test we assume it is
2255      // meant to raise the work factor
2256      for (int family = 0; family < opts.families; family++) {
2257        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2258        append.addColumn(familyName, getQualifier(), bytes);
2259      }
2260      updateValueSize(this.table.append(append));
2261      return true;
2262    }
2263  }
2264
2265  static class CheckAndMutateTest extends CASTableTest {
2266    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
2267      super(con, options, status);
2268    }
2269
2270    @Override
2271    boolean testRow(final int i, final long startTime) throws IOException {
2272      final byte[] bytes = format(i);
2273      // checkAndXXX tests operate on only a single value
2274      // Put a known value so when we go to check it, it is there.
2275      Put put = new Put(bytes);
2276      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2277      this.table.put(put);
2278      RowMutations mutations = new RowMutations(bytes);
2279      mutations.add(put);
2280      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2281        .thenMutate(mutations);
2282      return true;
2283    }
2284  }
2285
2286  static class CheckAndPutTest extends CASTableTest {
2287    CheckAndPutTest(Connection con, TestOptions options, Status status) {
2288      super(con, options, status);
2289    }
2290
2291    @Override
2292    boolean testRow(final int i, final long startTime) throws IOException {
2293      final byte[] bytes = format(i);
2294      // checkAndXXX tests operate on only a single value
2295      // Put a known value so when we go to check it, it is there.
2296      Put put = new Put(bytes);
2297      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2298      this.table.put(put);
2299      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2300        .thenPut(put);
2301      return true;
2302    }
2303  }
2304
2305  static class CheckAndDeleteTest extends CASTableTest {
2306    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
2307      super(con, options, status);
2308    }
2309
2310    @Override
2311    boolean testRow(final int i, final long startTime) throws IOException {
2312      final byte[] bytes = format(i);
2313      // checkAndXXX tests operate on only a single value
2314      // Put a known value so when we go to check it, it is there.
2315      Put put = new Put(bytes);
2316      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2317      this.table.put(put);
2318      Delete delete = new Delete(put.getRow());
2319      delete.addColumn(FAMILY_ZERO, getQualifier());
2320      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2321        .thenDelete(delete);
2322      return true;
2323    }
2324  }
2325
2326  /*
2327   * Delete all fake regions inserted to meta table by MetaWriteTest.
2328   */
2329  static class CleanMetaTest extends MetaTest {
2330    CleanMetaTest(Connection con, TestOptions options, Status status) {
2331      super(con, options, status);
2332    }
2333
2334    @Override
2335    boolean testRow(final int i, final long startTime) throws IOException {
2336      try {
2337        RegionInfo regionInfo = connection.getRegionLocator(table.getName())
2338          .getRegionLocation(getSplitKey(i), false).getRegion();
2339        LOG.debug("deleting region from meta: " + regionInfo);
2340
2341        Delete delete =
2342          MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP);
2343        try (Table t = MetaTableAccessor.getMetaHTable(connection)) {
2344          t.delete(delete);
2345        }
2346      } catch (IOException ie) {
2347        // Log and continue
2348        LOG.error("cannot find region with start key: " + i);
2349      }
2350      return true;
2351    }
2352  }
2353
2354  static class SequentialReadTest extends TableTest {
2355    SequentialReadTest(Connection con, TestOptions options, Status status) {
2356      super(con, options, status);
2357    }
2358
2359    @Override
2360    boolean testRow(final int i, final long startTime) throws IOException {
2361      Get get = new Get(format(i));
2362      for (int family = 0; family < opts.families; family++) {
2363        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2364        if (opts.addColumns) {
2365          for (int column = 0; column < opts.columns; column++) {
2366            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2367            get.addColumn(familyName, qualifier);
2368          }
2369        } else {
2370          get.addFamily(familyName);
2371        }
2372      }
2373      if (opts.filterAll) {
2374        get.setFilter(new FilterAllFilter());
2375      }
2376      updateValueSize(table.get(get));
2377      return true;
2378    }
2379  }
2380
2381  static class SequentialWriteTest extends BufferedMutatorTest {
2382    private ArrayList<Put> puts;
2383
2384    SequentialWriteTest(Connection con, TestOptions options, Status status) {
2385      super(con, options, status);
2386      if (opts.multiPut > 0) {
2387        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
2388        this.puts = new ArrayList<>(opts.multiPut);
2389      }
2390    }
2391
2392    protected byte[] generateRow(final int i) {
2393      return format(i);
2394    }
2395
2396    @Override
2397    boolean testRow(final int i, final long startTime) throws IOException {
2398      byte[] row = generateRow(i);
2399      Put put = new Put(row);
2400      for (int family = 0; family < opts.families; family++) {
2401        byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
2402        for (int column = 0; column < opts.columns; column++) {
2403          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2404          byte[] value = generateData(this.rand, getValueLength(this.rand));
2405          if (opts.useTags) {
2406            byte[] tag = generateData(this.rand, TAG_LENGTH);
2407            Tag[] tags = new Tag[opts.noOfTags];
2408            for (int n = 0; n < opts.noOfTags; n++) {
2409              Tag t = new ArrayBackedTag((byte) n, tag);
2410              tags[n] = t;
2411            }
2412            KeyValue kv =
2413              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
2414            put.add(kv);
2415            updateValueSize(kv.getValueLength());
2416          } else {
2417            put.addColumn(familyName, qualifier, value);
2418            updateValueSize(value.length);
2419          }
2420        }
2421      }
2422      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2423      if (opts.autoFlush) {
2424        if (opts.multiPut > 0) {
2425          this.puts.add(put);
2426          if (this.puts.size() == opts.multiPut) {
2427            table.put(this.puts);
2428            this.puts.clear();
2429          } else {
2430            return false;
2431          }
2432        } else {
2433          table.put(put);
2434        }
2435      } else {
2436        mutator.mutate(put);
2437      }
2438      return true;
2439    }
2440  }
2441
2442  static class SequentialDeleteTest extends BufferedMutatorTest {
2443
2444    SequentialDeleteTest(Connection con, TestOptions options, Status status) {
2445      super(con, options, status);
2446    }
2447
2448    protected byte[] generateRow(final int i) {
2449      return format(i);
2450    }
2451
2452    @Override
2453    boolean testRow(final int i, final long startTime) throws IOException {
2454      byte[] row = generateRow(i);
2455      Delete delete = new Delete(row);
2456      for (int family = 0; family < opts.families; family++) {
2457        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2458        delete.addFamily(familyName);
2459      }
2460      delete.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2461      if (opts.autoFlush) {
2462        table.delete(delete);
2463      } else {
2464        mutator.mutate(delete);
2465      }
2466      return true;
2467    }
2468  }
2469
2470  /*
2471   * Insert fake regions into meta table with contiguous split keys.
2472   */
2473  static class MetaWriteTest extends MetaTest {
2474
2475    MetaWriteTest(Connection con, TestOptions options, Status status) {
2476      super(con, options, status);
2477    }
2478
2479    @Override
2480    boolean testRow(final int i, final long startTime) throws IOException {
2481      List<RegionInfo> regionInfos = new ArrayList<RegionInfo>();
2482      RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME))
2483        .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build());
2484      regionInfos.add(regionInfo);
2485      MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1);
2486
2487      // write the serverName columns
2488      MetaTableAccessor.updateRegionLocation(connection, regionInfo,
2489        ServerName.valueOf("localhost", 60010, rand.nextLong()), i,
2490        EnvironmentEdgeManager.currentTime());
2491      return true;
2492    }
2493  }
2494
2495  static class FilteredScanTest extends TableTest {
2496    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
2497
2498    FilteredScanTest(Connection con, TestOptions options, Status status) {
2499      super(con, options, status);
2500      if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) {
2501        LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB
2502          + ". This could take a very long time.");
2503      }
2504    }
2505
2506    @Override
2507    boolean testRow(int i, final long startTime) throws IOException {
2508      byte[] value = generateData(this.rand, getValueLength(this.rand));
2509      Scan scan = constructScan(value);
2510      ResultScanner scanner = null;
2511      try {
2512        scanner = this.table.getScanner(scan);
2513        for (Result r = null; (r = scanner.next()) != null;) {
2514          updateValueSize(r);
2515        }
2516      } finally {
2517        if (scanner != null) {
2518          updateScanMetrics(scanner.getScanMetrics());
2519          scanner.close();
2520        }
2521      }
2522      return true;
2523    }
2524
2525    protected Scan constructScan(byte[] valuePrefix) throws IOException {
2526      FilterList list = new FilterList();
2527      Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL,
2528        new BinaryComparator(valuePrefix));
2529      list.addFilter(filter);
2530      if (opts.filterAll) {
2531        list.addFilter(new FilterAllFilter());
2532      }
2533      Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2534        .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2535        .setScanMetricsEnabled(true);
2536      if (opts.addColumns) {
2537        for (int column = 0; column < opts.columns; column++) {
2538          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2539          scan.addColumn(FAMILY_ZERO, qualifier);
2540        }
2541      } else {
2542        scan.addFamily(FAMILY_ZERO);
2543      }
2544      scan.setFilter(list);
2545      return scan;
2546    }
2547  }
2548
2549  /**
2550   * Compute a throughput rate in MB/s.
2551   * @param rows   Number of records consumed.
2552   * @param timeMs Time taken in milliseconds.
2553   * @return String value with label, ie '123.76 MB/s'
2554   */
2555  private static String calculateMbps(int rows, long timeMs, final int valueSize, int families,
2556    int columns) {
2557    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH
2558      + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families);
2559    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
2560      .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT);
2561    return FMT.format(mbps) + " MB/s";
2562  }
2563
2564  /*
2565   * Format passed integer.
2566   * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed number (Does
2567   * absolute in case number is negative).
2568   */
2569  public static byte[] format(final int number) {
2570    byte[] b = new byte[ROW_LENGTH];
2571    int d = Math.abs(number);
2572    for (int i = b.length - 1; i >= 0; i--) {
2573      b[i] = (byte) ((d % 10) + '0');
2574      d /= 10;
2575    }
2576    return b;
2577  }
2578
2579  /*
2580   * This method takes some time and is done inline uploading data. For example, doing the mapfile
2581   * test, generation of the key and value consumes about 30% of CPU time.
2582   * @return Generated random value to insert into a table cell.
2583   */
2584  public static byte[] generateData(final Random r, int length) {
2585    byte[] b = new byte[length];
2586    int i;
2587
2588    for (i = 0; i < (length - 8); i += 8) {
2589      b[i] = (byte) (65 + r.nextInt(26));
2590      b[i + 1] = b[i];
2591      b[i + 2] = b[i];
2592      b[i + 3] = b[i];
2593      b[i + 4] = b[i];
2594      b[i + 5] = b[i];
2595      b[i + 6] = b[i];
2596      b[i + 7] = b[i];
2597    }
2598
2599    byte a = (byte) (65 + r.nextInt(26));
2600    for (; i < length; i++) {
2601      b[i] = a;
2602    }
2603    return b;
2604  }
2605
2606  static byte[] getRandomRow(final Random random, final int totalRows) {
2607    return format(generateRandomRow(random, totalRows));
2608  }
2609
2610  static int generateRandomRow(final Random random, final int totalRows) {
2611    return random.nextInt(Integer.MAX_VALUE) % totalRows;
2612  }
2613
2614  static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
2615    Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
2616    throws IOException, InterruptedException {
2617    status.setStatus(
2618      "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows");
2619    long totalElapsedTime;
2620
2621    final TestBase t;
2622    try {
2623      if (AsyncTest.class.isAssignableFrom(cmd)) {
2624        Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
2625        Constructor<? extends AsyncTest> constructor =
2626          newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
2627        t = constructor.newInstance(asyncCon, opts, status);
2628      } else {
2629        Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
2630        Constructor<? extends Test> constructor =
2631          newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
2632        t = constructor.newInstance(con, opts, status);
2633      }
2634    } catch (NoSuchMethodException e) {
2635      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
2636        + ".  It does not provide a constructor as described by "
2637        + "the javadoc comment.  Available constructors are: "
2638        + Arrays.toString(cmd.getConstructors()));
2639    } catch (Exception e) {
2640      throw new IllegalStateException("Failed to construct command class", e);
2641    }
2642    totalElapsedTime = t.test();
2643
2644    status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow
2645      + " for " + opts.perClientRunRows + " rows" + " ("
2646      + calculateMbps((int) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
2647        getAverageValueLength(opts), opts.families, opts.columns)
2648      + ")");
2649
2650    return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold,
2651      t.numOfReplyFromReplica, t.getLatencyHistogram());
2652  }
2653
2654  private static int getAverageValueLength(final TestOptions opts) {
2655    return opts.valueRandom ? opts.valueSize / 2 : opts.valueSize;
2656  }
2657
2658  private void runTest(final Class<? extends TestBase> cmd, TestOptions opts)
2659    throws IOException, InterruptedException, ClassNotFoundException, ExecutionException {
2660    // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2661    // the TestOptions introspection for us and dump the output in a readable format.
2662    LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
2663    Admin admin = null;
2664    Connection connection = null;
2665    try {
2666      connection = ConnectionFactory.createConnection(getConf());
2667      admin = connection.getAdmin();
2668      checkTable(admin, opts);
2669    } finally {
2670      if (admin != null) admin.close();
2671      if (connection != null) connection.close();
2672    }
2673    if (opts.nomapred) {
2674      doLocalClients(opts, getConf());
2675    } else {
2676      doMapReduce(opts, getConf());
2677    }
2678  }
2679
2680  protected void printUsage() {
2681    printUsage(PE_COMMAND_SHORTNAME, null);
2682  }
2683
2684  protected static void printUsage(final String message) {
2685    printUsage(PE_COMMAND_SHORTNAME, message);
2686  }
2687
2688  protected static void printUsageAndExit(final String message, final int exitCode) {
2689    printUsage(message);
2690    System.exit(exitCode);
2691  }
2692
2693  protected static void printUsage(final String shortName, final String message) {
2694    if (message != null && message.length() > 0) {
2695      System.err.println(message);
2696    }
2697    System.err.print("Usage: hbase " + shortName);
2698    System.err.println("  <OPTIONS> [-D<property=value>]* <command|class> <nclients>");
2699    System.err.println();
2700    System.err.println("General Options:");
2701    System.err.println(
2702      " nomapred        Run multiple clients using threads " + "(rather than use mapreduce)");
2703    System.err
2704      .println(" oneCon          all the threads share the same connection. Default: False");
2705    System.err.println(" connCount          connections all threads share. "
2706      + "For example, if set to 2, then all thread share 2 connection. "
2707      + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, "
2708      + "if not, connCount=thread number");
2709
2710    System.err.println(" sampleRate      Execute test on a sample of total "
2711      + "rows. Only supported by randomRead. Default: 1.0");
2712    System.err.println(" period          Report every 'period' rows: "
2713      + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10);
2714    System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
2715    System.err.println(
2716      " traceRate       Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0");
2717    System.err.println(" latency         Set to report operation latencies. Default: False");
2718    System.err.println(" latencyThreshold  Set to report number of operations with latency "
2719      + "over lantencyThreshold, unit in millisecond, default 0");
2720    System.err.println(" measureAfter    Start to measure the latency once 'measureAfter'"
2721      + " rows have been treated. Default: 0");
2722    System.err
2723      .println(" valueSize       Pass value size to use: Default: " + DEFAULT_OPTS.getValueSize());
2724    System.err.println(" valueRandom     Set if we should vary value size between 0 and "
2725      + "'valueSize'; set on read for stats on size: Default: Not set.");
2726    System.err.println(" blockEncoding   Block encoding to use. Value should be one of "
2727      + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2728    System.err.println();
2729    System.err.println("Table Creation / Write Tests:");
2730    System.err.println(" table           Alternate table name. Default: 'TestTable'");
2731    System.err.println(
2732      " rows            Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows()
2733        + ".  In case of randomReads and randomSeekScans this could"
2734        + " be specified along with --size to specify the number of rows to be scanned within"
2735        + " the total range specified by the size.");
2736    System.err.println(
2737      " size            Total size in GiB. Mutually exclusive with --rows for writes and scans"
2738        + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2739        + " use size to specify the end range and --rows"
2740        + " specifies the number of rows within that range. " + "Default: 1.0.");
2741    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2742    System.err.println(" encryption      Encryption type to use (AES, ...). Default: 'NONE'");
2743    System.err.println(
2744      " flushCommits    Used to determine if the test should flush the table. " + "Default: false");
2745    System.err.println(" valueZipf       Set if we should vary value size between 0 and "
2746      + "'valueSize' in zipf form: Default: Not set.");
2747    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
2748    System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
2749    System.err.println(" multiPut        Batch puts together into groups of N. Only supported "
2750      + "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0");
2751    System.err.println(" presplit        Create presplit table. If a table with same name exists,"
2752      + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2753      + "Recommended for accurate perf analysis (see guide). Default: disabled");
2754    System.err.println(
2755      " usetags         Writes tags along with KVs. Use with HFile V3. " + "Default: false");
2756    System.err.println(" numoftags       Specify the no of tags that would be needed. "
2757      + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2758    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
2759    System.err.println(" columns         Columns to write per row. Default: 1");
2760    System.err
2761      .println(" families        Specify number of column families for the table. Default: 1");
2762    System.err.println();
2763    System.err.println("Read Tests:");
2764    System.err.println(" filterAll       Helps to filter out all the rows on the server side"
2765      + " there by not returning any thing back to the client.  Helps to check the server side"
2766      + " performance.  Uses FilterAllFilter internally. ");
2767    System.err.println(" multiGet        Batch gets together into groups of N. Only supported "
2768      + "by randomRead. Default: disabled");
2769    System.err.println(" inmemory        Tries to keep the HFiles of the CF "
2770      + "inmemory as far as possible. Not guaranteed that reads are always served "
2771      + "from memory.  Default: false");
2772    System.err
2773      .println(" bloomFilter     Bloom filter type, one of " + Arrays.toString(BloomType.values()));
2774    System.err.println(" blockSize       Blocksize to use when writing out hfiles. ");
2775    System.err
2776      .println(" inmemoryCompaction  Makes the column family to do inmemory flushes/compactions. "
2777        + "Uses the CompactingMemstore");
2778    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
2779    System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
2780    System.err.println(
2781      " randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
2782    System.err.println(" caching         Scan caching to use. Default: 30");
2783    System.err.println(" asyncPrefetch   Enable asyncPrefetch for scan");
2784    System.err.println(" cacheBlocks     Set the cacheBlocks option for scan. Default: true");
2785    System.err.println(
2786      " scanReadType    Set the readType option for scan, stream/pread/default. Default: default");
2787    System.err.println(" bufferSize      Set the value of client side buffering. Default: 2MB");
2788    System.err.println();
2789    System.err.println(" Note: -D properties will be applied to the conf used. ");
2790    System.err.println("  For example: ");
2791    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
2792    System.err.println("   -Dmapreduce.task.timeout=60000");
2793    System.err.println();
2794    System.err.println("Command:");
2795    for (CmdDescriptor command : COMMANDS.values()) {
2796      System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
2797    }
2798    System.err.println();
2799    System.err.println("Class:");
2800    System.err.println("To run any custom implementation of PerformanceEvaluation.Test, "
2801      + "provide the classname of the implementaion class in place of "
2802      + "command name and it will be loaded at runtime from classpath.:");
2803    System.err.println("Please consider to contribute back "
2804      + "this custom test impl into a builtin PE command for the benefit of the community");
2805    System.err.println();
2806    System.err.println("Args:");
2807    System.err.println(" nclients        Integer. Required. Total number of clients "
2808      + "(and HRegionServers) running. 1 <= value <= 500");
2809    System.err.println("Examples:");
2810    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
2811    System.err.println(" $ hbase " + shortName + " sequentialWrite 1");
2812    System.err.println(" To run 10 clients doing increments over ten rows:");
2813    System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10");
2814  }
2815
2816  /**
2817   * Parse options passed in via an arguments array. Assumes that array has been split on
2818   * white-space and placed into a {@code Queue}. Any unknown arguments will remain in the queue at
2819   * the conclusion of this method call. It's up to the caller to deal with these unrecognized
2820   * arguments.
2821   */
2822  static TestOptions parseOpts(Queue<String> args) {
2823    TestOptions opts = new TestOptions();
2824
2825    String cmd = null;
2826    while ((cmd = args.poll()) != null) {
2827      if (cmd.equals("-h") || cmd.startsWith("--h")) {
2828        // place item back onto queue so that caller knows parsing was incomplete
2829        args.add(cmd);
2830        break;
2831      }
2832
2833      final String nmr = "--nomapred";
2834      if (cmd.startsWith(nmr)) {
2835        opts.nomapred = true;
2836        continue;
2837      }
2838
2839      final String rows = "--rows=";
2840      if (cmd.startsWith(rows)) {
2841        opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
2842        continue;
2843      }
2844
2845      final String cycles = "--cycles=";
2846      if (cmd.startsWith(cycles)) {
2847        opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
2848        continue;
2849      }
2850
2851      final String sampleRate = "--sampleRate=";
2852      if (cmd.startsWith(sampleRate)) {
2853        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
2854        continue;
2855      }
2856
2857      final String table = "--table=";
2858      if (cmd.startsWith(table)) {
2859        opts.tableName = cmd.substring(table.length());
2860        continue;
2861      }
2862
2863      final String startRow = "--startRow=";
2864      if (cmd.startsWith(startRow)) {
2865        opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
2866        continue;
2867      }
2868
2869      final String compress = "--compress=";
2870      if (cmd.startsWith(compress)) {
2871        opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
2872        continue;
2873      }
2874
2875      final String encryption = "--encryption=";
2876      if (cmd.startsWith(encryption)) {
2877        opts.encryption = cmd.substring(encryption.length());
2878        continue;
2879      }
2880
2881      final String traceRate = "--traceRate=";
2882      if (cmd.startsWith(traceRate)) {
2883        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
2884        continue;
2885      }
2886
2887      final String blockEncoding = "--blockEncoding=";
2888      if (cmd.startsWith(blockEncoding)) {
2889        opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
2890        continue;
2891      }
2892
2893      final String flushCommits = "--flushCommits=";
2894      if (cmd.startsWith(flushCommits)) {
2895        opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
2896        continue;
2897      }
2898
2899      final String writeToWAL = "--writeToWAL=";
2900      if (cmd.startsWith(writeToWAL)) {
2901        opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
2902        continue;
2903      }
2904
2905      final String presplit = "--presplit=";
2906      if (cmd.startsWith(presplit)) {
2907        opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
2908        continue;
2909      }
2910
2911      final String inMemory = "--inmemory=";
2912      if (cmd.startsWith(inMemory)) {
2913        opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
2914        continue;
2915      }
2916
2917      final String autoFlush = "--autoFlush=";
2918      if (cmd.startsWith(autoFlush)) {
2919        opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
2920        continue;
2921      }
2922
2923      final String onceCon = "--oneCon=";
2924      if (cmd.startsWith(onceCon)) {
2925        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
2926        continue;
2927      }
2928
2929      final String connCount = "--connCount=";
2930      if (cmd.startsWith(connCount)) {
2931        opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
2932        continue;
2933      }
2934
2935      final String latencyThreshold = "--latencyThreshold=";
2936      if (cmd.startsWith(latencyThreshold)) {
2937        opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length()));
2938        continue;
2939      }
2940
2941      final String latency = "--latency";
2942      if (cmd.startsWith(latency)) {
2943        opts.reportLatency = true;
2944        continue;
2945      }
2946
2947      final String multiGet = "--multiGet=";
2948      if (cmd.startsWith(multiGet)) {
2949        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
2950        continue;
2951      }
2952
2953      final String multiPut = "--multiPut=";
2954      if (cmd.startsWith(multiPut)) {
2955        opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
2956        continue;
2957      }
2958
2959      final String useTags = "--usetags=";
2960      if (cmd.startsWith(useTags)) {
2961        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
2962        continue;
2963      }
2964
2965      final String noOfTags = "--numoftags=";
2966      if (cmd.startsWith(noOfTags)) {
2967        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
2968        continue;
2969      }
2970
2971      final String replicas = "--replicas=";
2972      if (cmd.startsWith(replicas)) {
2973        opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
2974        continue;
2975      }
2976
2977      final String filterOutAll = "--filterAll";
2978      if (cmd.startsWith(filterOutAll)) {
2979        opts.filterAll = true;
2980        continue;
2981      }
2982
2983      final String size = "--size=";
2984      if (cmd.startsWith(size)) {
2985        opts.size = Float.parseFloat(cmd.substring(size.length()));
2986        if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB");
2987        continue;
2988      }
2989
2990      final String splitPolicy = "--splitPolicy=";
2991      if (cmd.startsWith(splitPolicy)) {
2992        opts.splitPolicy = cmd.substring(splitPolicy.length());
2993        continue;
2994      }
2995
2996      final String randomSleep = "--randomSleep=";
2997      if (cmd.startsWith(randomSleep)) {
2998        opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
2999        continue;
3000      }
3001
3002      final String measureAfter = "--measureAfter=";
3003      if (cmd.startsWith(measureAfter)) {
3004        opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length()));
3005        continue;
3006      }
3007
3008      final String bloomFilter = "--bloomFilter=";
3009      if (cmd.startsWith(bloomFilter)) {
3010        opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
3011        continue;
3012      }
3013
3014      final String blockSize = "--blockSize=";
3015      if (cmd.startsWith(blockSize)) {
3016        opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
3017        continue;
3018      }
3019
3020      final String valueSize = "--valueSize=";
3021      if (cmd.startsWith(valueSize)) {
3022        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
3023        continue;
3024      }
3025
3026      final String valueRandom = "--valueRandom";
3027      if (cmd.startsWith(valueRandom)) {
3028        opts.valueRandom = true;
3029        continue;
3030      }
3031
3032      final String valueZipf = "--valueZipf";
3033      if (cmd.startsWith(valueZipf)) {
3034        opts.valueZipf = true;
3035        continue;
3036      }
3037
3038      final String period = "--period=";
3039      if (cmd.startsWith(period)) {
3040        opts.period = Integer.parseInt(cmd.substring(period.length()));
3041        continue;
3042      }
3043
3044      final String addColumns = "--addColumns=";
3045      if (cmd.startsWith(addColumns)) {
3046        opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
3047        continue;
3048      }
3049
3050      final String inMemoryCompaction = "--inmemoryCompaction=";
3051      if (cmd.startsWith(inMemoryCompaction)) {
3052        opts.inMemoryCompaction =
3053          MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length()));
3054        continue;
3055      }
3056
3057      final String columns = "--columns=";
3058      if (cmd.startsWith(columns)) {
3059        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
3060        continue;
3061      }
3062
3063      final String families = "--families=";
3064      if (cmd.startsWith(families)) {
3065        opts.families = Integer.parseInt(cmd.substring(families.length()));
3066        continue;
3067      }
3068
3069      final String caching = "--caching=";
3070      if (cmd.startsWith(caching)) {
3071        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
3072        continue;
3073      }
3074
3075      final String asyncPrefetch = "--asyncPrefetch";
3076      if (cmd.startsWith(asyncPrefetch)) {
3077        opts.asyncPrefetch = true;
3078        continue;
3079      }
3080
3081      final String cacheBlocks = "--cacheBlocks=";
3082      if (cmd.startsWith(cacheBlocks)) {
3083        opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length()));
3084        continue;
3085      }
3086
3087      final String scanReadType = "--scanReadType=";
3088      if (cmd.startsWith(scanReadType)) {
3089        opts.scanReadType =
3090          Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
3091        continue;
3092      }
3093
3094      final String bufferSize = "--bufferSize=";
3095      if (cmd.startsWith(bufferSize)) {
3096        opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
3097        continue;
3098      }
3099
3100      final String commandPropertiesFile = "--commandPropertiesFile=";
3101      if (cmd.startsWith(commandPropertiesFile)) {
3102        String fileName = String.valueOf(cmd.substring(commandPropertiesFile.length()));
3103        Properties properties = new Properties();
3104        try {
3105          properties
3106            .load(PerformanceEvaluation.class.getClassLoader().getResourceAsStream(fileName));
3107          opts.commandProperties = properties;
3108        } catch (IOException e) {
3109          LOG.error("Failed to load metricIds from properties file", e);
3110        }
3111        continue;
3112      }
3113
3114      validateParsedOpts(opts);
3115
3116      if (isCommandClass(cmd)) {
3117        opts.cmdName = cmd;
3118        try {
3119          opts.numClientThreads = Integer.parseInt(args.remove());
3120        } catch (NoSuchElementException | NumberFormatException e) {
3121          throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
3122        }
3123        opts = calculateRowsAndSize(opts);
3124        break;
3125      } else {
3126        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
3127      }
3128
3129      // Not matching any option or command.
3130      System.err.println("Error: Wrong option or command: " + cmd);
3131      args.add(cmd);
3132      break;
3133    }
3134    return opts;
3135  }
3136
3137  /**
3138   * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts
3139   */
3140  private static void validateParsedOpts(TestOptions opts) {
3141
3142    if (!opts.autoFlush && opts.multiPut > 0) {
3143      throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
3144    }
3145
3146    if (opts.oneCon && opts.connCount > 1) {
3147      throw new IllegalArgumentException(
3148        "oneCon is set to true, " + "connCount should not bigger than 1");
3149    }
3150
3151    if (opts.valueZipf && opts.valueRandom) {
3152      throw new IllegalStateException("Either valueZipf or valueRandom but not both");
3153    }
3154  }
3155
3156  static TestOptions calculateRowsAndSize(final TestOptions opts) {
3157    int rowsPerGB = getRowsPerGB(opts);
3158    if (
3159      (opts.getCmdName() != null
3160        && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
3161        && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows
3162    ) {
3163      opts.totalRows = (int) (opts.size * rowsPerGB);
3164    } else if (opts.size != DEFAULT_OPTS.size) {
3165      // total size in GB specified
3166      opts.totalRows = (int) (opts.size * rowsPerGB);
3167      opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
3168    } else {
3169      opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
3170      // Cast to float to ensure floating-point division
3171      opts.size = (float) opts.totalRows / rowsPerGB;
3172    }
3173    return opts;
3174  }
3175
3176  static int getRowsPerGB(final TestOptions opts) {
3177    return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies()
3178      * opts.getColumns());
3179  }
3180
3181  @Override
3182  public int run(String[] args) throws Exception {
3183    // Process command-line args. TODO: Better cmd-line processing
3184    // (but hopefully something not as painful as cli options).
3185    int errCode = -1;
3186    if (args.length < 1) {
3187      printUsage();
3188      return errCode;
3189    }
3190
3191    try {
3192      LinkedList<String> argv = new LinkedList<>();
3193      argv.addAll(Arrays.asList(args));
3194      TestOptions opts = parseOpts(argv);
3195
3196      // args remaining, print help and exit
3197      if (!argv.isEmpty()) {
3198        errCode = 0;
3199        printUsage();
3200        return errCode;
3201      }
3202
3203      // must run at least 1 client
3204      if (opts.numClientThreads <= 0) {
3205        throw new IllegalArgumentException("Number of clients must be > 0");
3206      }
3207
3208      // cmdName should not be null, print help and exit
3209      if (opts.cmdName == null) {
3210        printUsage();
3211        return errCode;
3212      }
3213
3214      Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
3215      if (cmdClass != null) {
3216        runTest(cmdClass, opts);
3217        errCode = 0;
3218      }
3219
3220    } catch (Exception e) {
3221      e.printStackTrace();
3222    }
3223
3224    return errCode;
3225  }
3226
3227  private static boolean isCommandClass(String cmd) {
3228    return COMMANDS.containsKey(cmd) || isCustomTestClass(cmd);
3229  }
3230
3231  private static boolean isCustomTestClass(String cmd) {
3232    Class<? extends Test> cmdClass;
3233    try {
3234      cmdClass =
3235        (Class<? extends Test>) PerformanceEvaluation.class.getClassLoader().loadClass(cmd);
3236      addCommandDescriptor(cmdClass, cmd, "custom command");
3237      return true;
3238    } catch (Throwable th) {
3239      LOG.info("No class found for command: " + cmd, th);
3240      return false;
3241    }
3242  }
3243
3244  private static Class<? extends TestBase> determineCommandClass(String cmd) {
3245    CmdDescriptor descriptor = COMMANDS.get(cmd);
3246    return descriptor != null ? descriptor.getCmdClass() : null;
3247  }
3248
3249  public static void main(final String[] args) throws Exception {
3250    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
3251    System.exit(res);
3252  }
3253}