001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import com.codahale.metrics.Histogram;
021import com.codahale.metrics.UniformReservoir;
022import io.opentelemetry.api.trace.Span;
023import io.opentelemetry.context.Scope;
024import java.io.IOException;
025import java.io.PrintStream;
026import java.lang.reflect.Constructor;
027import java.math.BigDecimal;
028import java.math.MathContext;
029import java.text.DecimalFormat;
030import java.text.SimpleDateFormat;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Date;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Locale;
037import java.util.Map;
038import java.util.NoSuchElementException;
039import java.util.Properties;
040import java.util.Queue;
041import java.util.Random;
042import java.util.TreeMap;
043import java.util.concurrent.Callable;
044import java.util.concurrent.ExecutionException;
045import java.util.concurrent.ExecutorService;
046import java.util.concurrent.Executors;
047import java.util.concurrent.Future;
048import java.util.concurrent.ThreadLocalRandom;
049import org.apache.commons.lang3.StringUtils;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.conf.Configured;
052import org.apache.hadoop.fs.FileSystem;
053import org.apache.hadoop.fs.Path;
054import org.apache.hadoop.hbase.client.Admin;
055import org.apache.hadoop.hbase.client.Append;
056import org.apache.hadoop.hbase.client.AsyncConnection;
057import org.apache.hadoop.hbase.client.AsyncTable;
058import org.apache.hadoop.hbase.client.BufferedMutator;
059import org.apache.hadoop.hbase.client.BufferedMutatorParams;
060import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
061import org.apache.hadoop.hbase.client.Connection;
062import org.apache.hadoop.hbase.client.ConnectionFactory;
063import org.apache.hadoop.hbase.client.Consistency;
064import org.apache.hadoop.hbase.client.Delete;
065import org.apache.hadoop.hbase.client.Durability;
066import org.apache.hadoop.hbase.client.Get;
067import org.apache.hadoop.hbase.client.Increment;
068import org.apache.hadoop.hbase.client.Put;
069import org.apache.hadoop.hbase.client.RegionInfo;
070import org.apache.hadoop.hbase.client.RegionInfoBuilder;
071import org.apache.hadoop.hbase.client.RegionLocator;
072import org.apache.hadoop.hbase.client.Result;
073import org.apache.hadoop.hbase.client.ResultScanner;
074import org.apache.hadoop.hbase.client.RowMutations;
075import org.apache.hadoop.hbase.client.Scan;
076import org.apache.hadoop.hbase.client.Table;
077import org.apache.hadoop.hbase.client.TableDescriptor;
078import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
079import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
080import org.apache.hadoop.hbase.filter.BinaryComparator;
081import org.apache.hadoop.hbase.filter.Filter;
082import org.apache.hadoop.hbase.filter.FilterAllFilter;
083import org.apache.hadoop.hbase.filter.FilterList;
084import org.apache.hadoop.hbase.filter.PageFilter;
085import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
086import org.apache.hadoop.hbase.filter.WhileMatchFilter;
087import org.apache.hadoop.hbase.io.compress.Compression;
088import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
089import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
090import org.apache.hadoop.hbase.regionserver.BloomType;
091import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
092import org.apache.hadoop.hbase.trace.TraceUtil;
093import org.apache.hadoop.hbase.util.ByteArrayHashKey;
094import org.apache.hadoop.hbase.util.Bytes;
095import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
096import org.apache.hadoop.hbase.util.GsonUtil;
097import org.apache.hadoop.hbase.util.Hash;
098import org.apache.hadoop.hbase.util.MurmurHash;
099import org.apache.hadoop.hbase.util.Pair;
100import org.apache.hadoop.hbase.util.RandomDistribution;
101import org.apache.hadoop.hbase.util.YammerHistogramUtils;
102import org.apache.hadoop.io.LongWritable;
103import org.apache.hadoop.io.Text;
104import org.apache.hadoop.mapreduce.Job;
105import org.apache.hadoop.mapreduce.Mapper;
106import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
107import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
108import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
109import org.apache.hadoop.util.Tool;
110import org.apache.hadoop.util.ToolRunner;
111import org.apache.yetus.audience.InterfaceAudience;
112import org.slf4j.Logger;
113import org.slf4j.LoggerFactory;
114
115import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
116import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
117import org.apache.hbase.thirdparty.com.google.gson.Gson;
118
119/**
120 * Script used evaluating HBase performance and scalability. Runs a HBase client that steps through
121 * one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random writes test,
122 * etc.). Pass on the command-line which test to run and how many clients are participating in this
123 * experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
124 * <p>
125 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance
126 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper,
127 * pages 8-10.
128 * <p>
129 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as
130 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client does
131 * about 1GB of data, unless specified otherwise.
132 */
133@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
134public class PerformanceEvaluation extends Configured implements Tool {
135  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
136  static final String RANDOM_READ = "randomRead";
137  static final String PE_COMMAND_SHORTNAME = "pe";
138  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
139  private static final Gson GSON = GsonUtil.createGson().create();
140
141  public static final String TABLE_NAME = "TestTable";
142  public static final String FAMILY_NAME_BASE = "info";
143  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
144  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
145  public static final int DEFAULT_VALUE_LENGTH = 1000;
146  public static final int ROW_LENGTH = 26;
147
148  private static final int ONE_GB = 1024 * 1024 * 1000;
149  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
150  // TODO : should we make this configurable
151  private static final int TAG_LENGTH = 256;
152  private static final DecimalFormat FMT = new DecimalFormat("0.##");
153  private static final MathContext CXT = MathContext.DECIMAL64;
154  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
155  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
156  private static final TestOptions DEFAULT_OPTS = new TestOptions();
157
158  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
159  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
160
161  static {
162    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
163      "Run async random read test");
164    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
165      "Run async random write test");
166    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
167      "Run async sequential read test");
168    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
169      "Run async sequential write test");
170    addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)");
171    addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test");
172    addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test");
173    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
174      "Run random seek and scan 100 test");
175    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
176      "Run random seek scan with both start and stop row (max 10 rows)");
177    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
178      "Run random seek scan with both start and stop row (max 100 rows)");
179    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
180      "Run random seek scan with both start and stop row (max 1000 rows)");
181    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
182      "Run random seek scan with both start and stop row (max 10000 rows)");
183    addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
184    addCommandDescriptor(RandomDeleteTest.class, "randomDelete", "Run random delete test");
185    addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
186    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
187    addCommandDescriptor(SequentialDeleteTest.class, "sequentialDelete",
188      "Run sequential delete test");
189    addCommandDescriptor(MetaWriteTest.class, "metaWrite",
190      "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta");
191    addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
192    addCommandDescriptor(ReverseScanTest.class, "reverseScan",
193      "Run reverse scan test (read every row)");
194    addCommandDescriptor(FilteredScanTest.class, "filterScan",
195      "Run scan test using a filter to find a specific row based on it's value "
196        + "(make sure to use --rows=20)");
197    addCommandDescriptor(IncrementTest.class, "increment",
198      "Increment on each row; clients overlap on keyspace so some concurrent operations");
199    addCommandDescriptor(AppendTest.class, "append",
200      "Append on each row; clients overlap on keyspace so some concurrent operations");
201    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
202      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
203    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
204      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
205    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
206      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
207    addCommandDescriptor(CleanMetaTest.class, "cleanMeta",
208      "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread");
209  }
210
211  /**
212   * Enum for map metrics. Keep it out here rather than inside in the Map inner-class so we can find
213   * associated properties.
214   */
215  protected static enum Counter {
216    /** elapsed time */
217    ELAPSED_TIME,
218    /** number of rows */
219    ROWS
220  }
221
222  protected static class RunResult implements Comparable<RunResult> {
223    public RunResult(long duration, Histogram hist) {
224      this.duration = duration;
225      this.hist = hist;
226      numbOfReplyOverThreshold = 0;
227      numOfReplyFromReplica = 0;
228    }
229
230    public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica,
231      Histogram hist) {
232      this.duration = duration;
233      this.hist = hist;
234      this.numbOfReplyOverThreshold = numbOfReplyOverThreshold;
235      this.numOfReplyFromReplica = numOfReplyFromReplica;
236    }
237
238    public final long duration;
239    public final Histogram hist;
240    public final long numbOfReplyOverThreshold;
241    public final long numOfReplyFromReplica;
242
243    @Override
244    public String toString() {
245      return Long.toString(duration);
246    }
247
248    @Override
249    public int compareTo(RunResult o) {
250      return Long.compare(this.duration, o.duration);
251    }
252  }
253
254  /**
255   * Constructor
256   * @param conf Configuration object
257   */
258  public PerformanceEvaluation(final Configuration conf) {
259    super(conf);
260  }
261
262  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name,
263    String description) {
264    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
265    COMMANDS.put(name, cmdDescriptor);
266  }
267
268  /**
269   * Implementations can have their status set.
270   */
271  interface Status {
272    /**
273     * Sets status
274     * @param msg status message
275     */
276    void setStatus(final String msg) throws IOException;
277  }
278
279  /**
280   * MapReduce job that runs a performance evaluation client in each map task.
281   */
282  public static class EvaluationMapTask
283    extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
284
285    /** configuration parameter name that contains the command */
286    public final static String CMD_KEY = "EvaluationMapTask.command";
287    /** configuration parameter name that contains the PE impl */
288    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
289
290    private Class<? extends Test> cmd;
291
292    @Override
293    protected void setup(Context context) throws IOException, InterruptedException {
294      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
295
296      // this is required so that extensions of PE are instantiated within the
297      // map reduce task...
298      Class<? extends PerformanceEvaluation> peClass =
299        forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
300      try {
301        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
302      } catch (Exception e) {
303        throw new IllegalStateException("Could not instantiate PE instance", e);
304      }
305    }
306
307    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
308      try {
309        return Class.forName(className).asSubclass(type);
310      } catch (ClassNotFoundException e) {
311        throw new IllegalStateException("Could not find class for name: " + className, e);
312      }
313    }
314
315    @Override
316    protected void map(LongWritable key, Text value, final Context context)
317      throws IOException, InterruptedException {
318
319      Status status = new Status() {
320        @Override
321        public void setStatus(String msg) {
322          context.setStatus(msg);
323        }
324      };
325
326      TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class);
327      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
328      final Connection con = ConnectionFactory.createConnection(conf);
329      AsyncConnection asyncCon = null;
330      try {
331        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
332      } catch (ExecutionException e) {
333        throw new IOException(e);
334      }
335
336      // Evaluation task
337      RunResult result =
338        PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
339      // Collect how much time the thing took. Report as map output and
340      // to the ELAPSED_TIME counter.
341      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
342      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
343      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
344      context.progress();
345    }
346  }
347
348  /*
349   * If table does not already exist, create. Also create a table when {@code opts.presplitRegions}
350   * is specified or when the existing table's region replica count doesn't match {@code
351   * opts.replicas}.
352   */
353  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
354    TableName tableName = TableName.valueOf(opts.tableName);
355    boolean needsDelete = false, exists = admin.tableExists(tableName);
356    boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
357      || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
358    boolean isDeleteCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("delete");
359    if (!exists && (isReadCmd || isDeleteCmd)) {
360      throw new IllegalStateException(
361        "Must specify an existing table for read commands. Run a write command first.");
362    }
363    TableDescriptor desc = exists ? admin.getDescriptor(TableName.valueOf(opts.tableName)) : null;
364    byte[][] splits = getSplits(opts);
365
366    // recreate the table when user has requested presplit or when existing
367    // {RegionSplitPolicy,replica count} does not match requested, or when the
368    // number of column families does not match requested.
369    if (
370      (exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions
371        && opts.presplitRegions != admin.getRegions(tableName).size())
372        || (!isReadCmd && desc != null
373          && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
374        || (!(isReadCmd || isDeleteCmd) && desc != null
375          && desc.getRegionReplication() != opts.replicas)
376        || (desc != null && desc.getColumnFamilyCount() != opts.families)
377    ) {
378      needsDelete = true;
379      // wait, why did it delete my table?!?
380      LOG.debug(MoreObjects.toStringHelper("needsDelete").add("needsDelete", needsDelete)
381        .add("isReadCmd", isReadCmd).add("exists", exists).add("desc", desc)
382        .add("presplit", opts.presplitRegions).add("splitPolicy", opts.splitPolicy)
383        .add("replicas", opts.replicas).add("families", opts.families).toString());
384    }
385
386    // remove an existing table
387    if (needsDelete) {
388      if (admin.isTableEnabled(tableName)) {
389        admin.disableTable(tableName);
390      }
391      admin.deleteTable(tableName);
392    }
393
394    // table creation is necessary
395    if (!exists || needsDelete) {
396      desc = getTableDescriptor(opts);
397      if (splits != null) {
398        if (LOG.isDebugEnabled()) {
399          for (int i = 0; i < splits.length; i++) {
400            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
401          }
402        }
403      }
404      if (splits != null) {
405        admin.createTable(desc, splits);
406      } else {
407        admin.createTable(desc);
408      }
409      LOG.info("Table " + desc + " created");
410    }
411    return admin.tableExists(tableName);
412  }
413
414  /**
415   * Create an HTableDescriptor from provided TestOptions.
416   */
417  protected static TableDescriptor getTableDescriptor(TestOptions opts) {
418    TableDescriptorBuilder builder =
419      TableDescriptorBuilder.newBuilder(TableName.valueOf(opts.tableName));
420
421    for (int family = 0; family < opts.families; family++) {
422      byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
423      ColumnFamilyDescriptorBuilder cfBuilder =
424        ColumnFamilyDescriptorBuilder.newBuilder(familyName);
425      cfBuilder.setDataBlockEncoding(opts.blockEncoding);
426      cfBuilder.setCompressionType(opts.compression);
427      cfBuilder.setEncryptionType(opts.encryption);
428      cfBuilder.setBloomFilterType(opts.bloomType);
429      cfBuilder.setBlocksize(opts.blockSize);
430      if (opts.inMemoryCF) {
431        cfBuilder.setInMemory(true);
432      }
433      cfBuilder.setInMemoryCompaction(opts.inMemoryCompaction);
434      builder.setColumnFamily(cfBuilder.build());
435    }
436    if (opts.replicas != DEFAULT_OPTS.replicas) {
437      builder.setRegionReplication(opts.replicas);
438    }
439    if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
440      builder.setRegionSplitPolicyClassName(opts.splitPolicy);
441    }
442    return builder.build();
443  }
444
445  /**
446   * generates splits based on total number of rows and specified split regions
447   */
448  protected static byte[][] getSplits(TestOptions opts) {
449    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null;
450
451    int numSplitPoints = opts.presplitRegions - 1;
452    byte[][] splits = new byte[numSplitPoints][];
453    int jump = opts.totalRows / opts.presplitRegions;
454    for (int i = 0; i < numSplitPoints; i++) {
455      int rowkey = jump * (1 + i);
456      splits[i] = format(rowkey);
457    }
458    return splits;
459  }
460
461  static void setupConnectionCount(final TestOptions opts) {
462    if (opts.oneCon) {
463      opts.connCount = 1;
464    } else {
465      if (opts.connCount == -1) {
466        // set to thread number if connCount is not set
467        opts.connCount = opts.numClientThreads;
468      }
469    }
470  }
471
472  /*
473   * Run all clients in this vm each to its own thread.
474   */
475  static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
476    throws IOException, InterruptedException, ExecutionException {
477    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
478    assert cmd != null;
479    @SuppressWarnings("unchecked")
480    Future<RunResult>[] threads = new Future[opts.numClientThreads];
481    RunResult[] results = new RunResult[opts.numClientThreads];
482    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
483      new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
484    setupConnectionCount(opts);
485    final Connection[] cons = new Connection[opts.connCount];
486    final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
487    for (int i = 0; i < opts.connCount; i++) {
488      cons[i] = ConnectionFactory.createConnection(conf);
489      asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
490    }
491    LOG
492      .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads");
493    for (int i = 0; i < threads.length; i++) {
494      final int index = i;
495      threads[i] = pool.submit(new Callable<RunResult>() {
496        @Override
497        public RunResult call() throws Exception {
498          TestOptions threadOpts = new TestOptions(opts);
499          final Connection con = cons[index % cons.length];
500          final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
501          if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
502          RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
503            @Override
504            public void setStatus(final String msg) throws IOException {
505              LOG.info(msg);
506            }
507          });
508          LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration
509            + "ms over " + threadOpts.perClientRunRows + " rows");
510          if (opts.latencyThreshold > 0) {
511            LOG.info("Number of replies over latency threshold " + opts.latencyThreshold
512              + "(ms) is " + run.numbOfReplyOverThreshold);
513          }
514          return run;
515        }
516      });
517    }
518    pool.shutdown();
519
520    for (int i = 0; i < threads.length; i++) {
521      try {
522        results[i] = threads[i].get();
523      } catch (ExecutionException e) {
524        throw new IOException(e.getCause());
525      }
526    }
527    final String test = cmd.getSimpleName();
528    LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results));
529    Arrays.sort(results);
530    long total = 0;
531    float avgLatency = 0;
532    float avgTPS = 0;
533    long replicaWins = 0;
534    for (RunResult result : results) {
535      total += result.duration;
536      avgLatency += result.hist.getSnapshot().getMean();
537      avgTPS += opts.perClientRunRows * 1.0f / result.duration;
538      replicaWins += result.numOfReplyFromReplica;
539    }
540    avgTPS *= 1000; // ms to second
541    avgLatency = avgLatency / results.length;
542    LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: "
543      + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms");
544    LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
545    LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
546    if (opts.replicas > 1) {
547      LOG.info("[results from replica regions] " + replicaWins);
548    }
549
550    for (int i = 0; i < opts.connCount; i++) {
551      cons[i].close();
552      asyncCons[i].close();
553    }
554
555    return results;
556  }
557
558  /*
559   * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write
560   * out an input file with instruction per client regards which row they are to start on.
561   * @param cmd Command to run.
562   */
563  static Job doMapReduce(TestOptions opts, final Configuration conf)
564    throws IOException, InterruptedException, ClassNotFoundException {
565    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
566    assert cmd != null;
567    Path inputDir = writeInputFile(conf, opts);
568    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
569    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
570    Job job = Job.getInstance(conf);
571    job.setJarByClass(PerformanceEvaluation.class);
572    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
573
574    job.setInputFormatClass(NLineInputFormat.class);
575    NLineInputFormat.setInputPaths(job, inputDir);
576    // this is default, but be explicit about it just in case.
577    NLineInputFormat.setNumLinesPerSplit(job, 1);
578
579    job.setOutputKeyClass(LongWritable.class);
580    job.setOutputValueClass(LongWritable.class);
581
582    job.setMapperClass(EvaluationMapTask.class);
583    job.setReducerClass(LongSumReducer.class);
584
585    job.setNumReduceTasks(1);
586
587    job.setOutputFormatClass(TextOutputFormat.class);
588    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
589
590    TableMapReduceUtil.addDependencyJars(job);
591    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Histogram.class, // yammer
592                                                                                            // metrics
593      Gson.class, // gson
594      FilterAllFilter.class // hbase-server tests jar
595    );
596
597    TableMapReduceUtil.initCredentials(job);
598
599    job.waitForCompletion(true);
600    return job;
601  }
602
603  /**
604   * Each client has one mapper to do the work, and client do the resulting count in a map task.
605   */
606
607  static String JOB_INPUT_FILENAME = "input.txt";
608
609  /*
610   * Write input file of offsets-per-client for the mapreduce job.
611   * @param c Configuration
612   * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
613   */
614  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
615    return writeInputFile(c, opts, new Path("."));
616  }
617
618  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
619    throws IOException {
620    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
621    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
622    Path inputDir = new Path(jobdir, "inputs");
623
624    FileSystem fs = FileSystem.get(c);
625    fs.mkdirs(inputDir);
626
627    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
628    PrintStream out = new PrintStream(fs.create(inputFile));
629    // Make input random.
630    Map<Integer, String> m = new TreeMap<>();
631    Hash h = MurmurHash.getInstance();
632    int perClientRows = (opts.totalRows / opts.numClientThreads);
633    try {
634      for (int j = 0; j < opts.numClientThreads; j++) {
635        TestOptions next = new TestOptions(opts);
636        next.startRow = j * perClientRows;
637        next.perClientRunRows = perClientRows;
638        String s = GSON.toJson(next);
639        LOG.info("Client=" + j + ", input=" + s);
640        byte[] b = Bytes.toBytes(s);
641        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
642        m.put(hash, s);
643      }
644      for (Map.Entry<Integer, String> e : m.entrySet()) {
645        out.println(e.getValue());
646      }
647    } finally {
648      out.close();
649    }
650    return inputDir;
651  }
652
653  /**
654   * Describes a command.
655   */
656  static class CmdDescriptor {
657    private Class<? extends TestBase> cmdClass;
658    private String name;
659    private String description;
660
661    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
662      this.cmdClass = cmdClass;
663      this.name = name;
664      this.description = description;
665    }
666
667    public Class<? extends TestBase> getCmdClass() {
668      return cmdClass;
669    }
670
671    public String getName() {
672      return name;
673    }
674
675    public String getDescription() {
676      return description;
677    }
678  }
679
680  /**
681   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes
682   * tracking all these arguments a little easier. NOTE: ADDING AN OPTION, you need to add a data
683   * member, a getter/setter (to make JSON serialization of this TestOptions class behave), and you
684   * need to add to the clone constructor below copying your new option from the 'that' to the
685   * 'this'. Look for 'clone' below.
686   */
687  static class TestOptions {
688    String cmdName = null;
689    boolean nomapred = false;
690    boolean filterAll = false;
691    int startRow = 0;
692    float size = 1.0f;
693    int perClientRunRows = DEFAULT_ROWS_PER_GB;
694    int numClientThreads = 1;
695    int totalRows = DEFAULT_ROWS_PER_GB;
696    int measureAfter = 0;
697    float sampleRate = 1.0f;
698    /**
699     * @deprecated Useless after switching to OpenTelemetry
700     */
701    @Deprecated
702    double traceRate = 0.0;
703    String tableName = TABLE_NAME;
704    boolean flushCommits = true;
705    boolean writeToWAL = true;
706    boolean autoFlush = false;
707    boolean oneCon = false;
708    int connCount = -1; // wil decide the actual num later
709    boolean useTags = false;
710    int noOfTags = 1;
711    boolean reportLatency = false;
712    int multiGet = 0;
713    int multiPut = 0;
714    int randomSleep = 0;
715    boolean inMemoryCF = false;
716    int presplitRegions = 0;
717    int replicas = TableDescriptorBuilder.DEFAULT_REGION_REPLICATION;
718    String splitPolicy = null;
719    Compression.Algorithm compression = Compression.Algorithm.NONE;
720    String encryption = null;
721    BloomType bloomType = BloomType.ROW;
722    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
723    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
724    boolean valueRandom = false;
725    boolean valueZipf = false;
726    int valueSize = DEFAULT_VALUE_LENGTH;
727    int period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10;
728    int cycles = 1;
729    int columns = 1;
730    int families = 1;
731    int caching = 30;
732    int latencyThreshold = 0; // in millsecond
733    boolean addColumns = true;
734    MemoryCompactionPolicy inMemoryCompaction =
735      MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
736    boolean asyncPrefetch = false;
737    boolean cacheBlocks = true;
738    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
739    long bufferSize = 2l * 1024l * 1024l;
740    Properties commandProperties;
741
742    public TestOptions() {
743    }
744
745    /**
746     * Clone constructor.
747     * @param that Object to copy from.
748     */
749    public TestOptions(TestOptions that) {
750      this.cmdName = that.cmdName;
751      this.cycles = that.cycles;
752      this.nomapred = that.nomapred;
753      this.startRow = that.startRow;
754      this.size = that.size;
755      this.perClientRunRows = that.perClientRunRows;
756      this.numClientThreads = that.numClientThreads;
757      this.totalRows = that.totalRows;
758      this.sampleRate = that.sampleRate;
759      this.traceRate = that.traceRate;
760      this.tableName = that.tableName;
761      this.flushCommits = that.flushCommits;
762      this.writeToWAL = that.writeToWAL;
763      this.autoFlush = that.autoFlush;
764      this.oneCon = that.oneCon;
765      this.connCount = that.connCount;
766      this.useTags = that.useTags;
767      this.noOfTags = that.noOfTags;
768      this.reportLatency = that.reportLatency;
769      this.latencyThreshold = that.latencyThreshold;
770      this.multiGet = that.multiGet;
771      this.multiPut = that.multiPut;
772      this.inMemoryCF = that.inMemoryCF;
773      this.presplitRegions = that.presplitRegions;
774      this.replicas = that.replicas;
775      this.splitPolicy = that.splitPolicy;
776      this.compression = that.compression;
777      this.encryption = that.encryption;
778      this.blockEncoding = that.blockEncoding;
779      this.filterAll = that.filterAll;
780      this.bloomType = that.bloomType;
781      this.blockSize = that.blockSize;
782      this.valueRandom = that.valueRandom;
783      this.valueZipf = that.valueZipf;
784      this.valueSize = that.valueSize;
785      this.period = that.period;
786      this.randomSleep = that.randomSleep;
787      this.measureAfter = that.measureAfter;
788      this.addColumns = that.addColumns;
789      this.columns = that.columns;
790      this.families = that.families;
791      this.caching = that.caching;
792      this.inMemoryCompaction = that.inMemoryCompaction;
793      this.asyncPrefetch = that.asyncPrefetch;
794      this.cacheBlocks = that.cacheBlocks;
795      this.scanReadType = that.scanReadType;
796      this.bufferSize = that.bufferSize;
797      this.commandProperties = that.commandProperties;
798    }
799
800    public Properties getCommandProperties() {
801      return commandProperties;
802    }
803
804    public int getCaching() {
805      return this.caching;
806    }
807
808    public void setCaching(final int caching) {
809      this.caching = caching;
810    }
811
812    public int getColumns() {
813      return this.columns;
814    }
815
816    public void setColumns(final int columns) {
817      this.columns = columns;
818    }
819
820    public int getFamilies() {
821      return this.families;
822    }
823
824    public void setFamilies(final int families) {
825      this.families = families;
826    }
827
828    public int getCycles() {
829      return this.cycles;
830    }
831
832    public void setCycles(final int cycles) {
833      this.cycles = cycles;
834    }
835
836    public boolean isValueZipf() {
837      return valueZipf;
838    }
839
840    public void setValueZipf(boolean valueZipf) {
841      this.valueZipf = valueZipf;
842    }
843
844    public String getCmdName() {
845      return cmdName;
846    }
847
848    public void setCmdName(String cmdName) {
849      this.cmdName = cmdName;
850    }
851
852    public int getRandomSleep() {
853      return randomSleep;
854    }
855
856    public void setRandomSleep(int randomSleep) {
857      this.randomSleep = randomSleep;
858    }
859
860    public int getReplicas() {
861      return replicas;
862    }
863
864    public void setReplicas(int replicas) {
865      this.replicas = replicas;
866    }
867
868    public String getSplitPolicy() {
869      return splitPolicy;
870    }
871
872    public void setSplitPolicy(String splitPolicy) {
873      this.splitPolicy = splitPolicy;
874    }
875
876    public void setNomapred(boolean nomapred) {
877      this.nomapred = nomapred;
878    }
879
880    public void setFilterAll(boolean filterAll) {
881      this.filterAll = filterAll;
882    }
883
884    public void setStartRow(int startRow) {
885      this.startRow = startRow;
886    }
887
888    public void setSize(float size) {
889      this.size = size;
890    }
891
892    public void setPerClientRunRows(int perClientRunRows) {
893      this.perClientRunRows = perClientRunRows;
894    }
895
896    public void setNumClientThreads(int numClientThreads) {
897      this.numClientThreads = numClientThreads;
898    }
899
900    public void setTotalRows(int totalRows) {
901      this.totalRows = totalRows;
902    }
903
904    public void setSampleRate(float sampleRate) {
905      this.sampleRate = sampleRate;
906    }
907
908    public void setTraceRate(double traceRate) {
909      this.traceRate = traceRate;
910    }
911
912    public void setTableName(String tableName) {
913      this.tableName = tableName;
914    }
915
916    public void setFlushCommits(boolean flushCommits) {
917      this.flushCommits = flushCommits;
918    }
919
920    public void setWriteToWAL(boolean writeToWAL) {
921      this.writeToWAL = writeToWAL;
922    }
923
924    public void setAutoFlush(boolean autoFlush) {
925      this.autoFlush = autoFlush;
926    }
927
928    public void setOneCon(boolean oneCon) {
929      this.oneCon = oneCon;
930    }
931
932    public int getConnCount() {
933      return connCount;
934    }
935
936    public void setConnCount(int connCount) {
937      this.connCount = connCount;
938    }
939
940    public void setUseTags(boolean useTags) {
941      this.useTags = useTags;
942    }
943
944    public void setNoOfTags(int noOfTags) {
945      this.noOfTags = noOfTags;
946    }
947
948    public void setReportLatency(boolean reportLatency) {
949      this.reportLatency = reportLatency;
950    }
951
952    public void setMultiGet(int multiGet) {
953      this.multiGet = multiGet;
954    }
955
956    public void setMultiPut(int multiPut) {
957      this.multiPut = multiPut;
958    }
959
960    public void setInMemoryCF(boolean inMemoryCF) {
961      this.inMemoryCF = inMemoryCF;
962    }
963
964    public void setPresplitRegions(int presplitRegions) {
965      this.presplitRegions = presplitRegions;
966    }
967
968    public void setCompression(Compression.Algorithm compression) {
969      this.compression = compression;
970    }
971
972    public void setEncryption(String encryption) {
973      this.encryption = encryption;
974    }
975
976    public void setBloomType(BloomType bloomType) {
977      this.bloomType = bloomType;
978    }
979
980    public void setBlockSize(int blockSize) {
981      this.blockSize = blockSize;
982    }
983
984    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
985      this.blockEncoding = blockEncoding;
986    }
987
988    public void setValueRandom(boolean valueRandom) {
989      this.valueRandom = valueRandom;
990    }
991
992    public void setValueSize(int valueSize) {
993      this.valueSize = valueSize;
994    }
995
996    public void setBufferSize(long bufferSize) {
997      this.bufferSize = bufferSize;
998    }
999
1000    public void setPeriod(int period) {
1001      this.period = period;
1002    }
1003
1004    public boolean isNomapred() {
1005      return nomapred;
1006    }
1007
1008    public boolean isFilterAll() {
1009      return filterAll;
1010    }
1011
1012    public int getStartRow() {
1013      return startRow;
1014    }
1015
1016    public float getSize() {
1017      return size;
1018    }
1019
1020    public int getPerClientRunRows() {
1021      return perClientRunRows;
1022    }
1023
1024    public int getNumClientThreads() {
1025      return numClientThreads;
1026    }
1027
1028    public int getTotalRows() {
1029      return totalRows;
1030    }
1031
1032    public float getSampleRate() {
1033      return sampleRate;
1034    }
1035
1036    public double getTraceRate() {
1037      return traceRate;
1038    }
1039
1040    public String getTableName() {
1041      return tableName;
1042    }
1043
1044    public boolean isFlushCommits() {
1045      return flushCommits;
1046    }
1047
1048    public boolean isWriteToWAL() {
1049      return writeToWAL;
1050    }
1051
1052    public boolean isAutoFlush() {
1053      return autoFlush;
1054    }
1055
1056    public boolean isUseTags() {
1057      return useTags;
1058    }
1059
1060    public int getNoOfTags() {
1061      return noOfTags;
1062    }
1063
1064    public boolean isReportLatency() {
1065      return reportLatency;
1066    }
1067
1068    public int getMultiGet() {
1069      return multiGet;
1070    }
1071
1072    public int getMultiPut() {
1073      return multiPut;
1074    }
1075
1076    public boolean isInMemoryCF() {
1077      return inMemoryCF;
1078    }
1079
1080    public int getPresplitRegions() {
1081      return presplitRegions;
1082    }
1083
1084    public Compression.Algorithm getCompression() {
1085      return compression;
1086    }
1087
1088    public String getEncryption() {
1089      return encryption;
1090    }
1091
1092    public DataBlockEncoding getBlockEncoding() {
1093      return blockEncoding;
1094    }
1095
1096    public boolean isValueRandom() {
1097      return valueRandom;
1098    }
1099
1100    public int getValueSize() {
1101      return valueSize;
1102    }
1103
1104    public int getPeriod() {
1105      return period;
1106    }
1107
1108    public BloomType getBloomType() {
1109      return bloomType;
1110    }
1111
1112    public int getBlockSize() {
1113      return blockSize;
1114    }
1115
1116    public boolean isOneCon() {
1117      return oneCon;
1118    }
1119
1120    public int getMeasureAfter() {
1121      return measureAfter;
1122    }
1123
1124    public void setMeasureAfter(int measureAfter) {
1125      this.measureAfter = measureAfter;
1126    }
1127
1128    public boolean getAddColumns() {
1129      return addColumns;
1130    }
1131
1132    public void setAddColumns(boolean addColumns) {
1133      this.addColumns = addColumns;
1134    }
1135
1136    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
1137      this.inMemoryCompaction = inMemoryCompaction;
1138    }
1139
1140    public MemoryCompactionPolicy getInMemoryCompaction() {
1141      return this.inMemoryCompaction;
1142    }
1143
1144    public long getBufferSize() {
1145      return this.bufferSize;
1146    }
1147  }
1148
1149  /*
1150   * A test. Subclass to particularize what happens per row.
1151   */
1152  static abstract class TestBase {
1153    // Below is make it so when Tests are all running in the one
1154    // jvm, that they each have a differently seeded Random.
1155    private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime());
1156
1157    private static long nextRandomSeed() {
1158      return randomSeed.nextLong();
1159    }
1160
1161    private final int everyN;
1162
1163    protected final Random rand = new Random(nextRandomSeed());
1164    protected final Configuration conf;
1165    protected final TestOptions opts;
1166
1167    protected final Status status;
1168
1169    private String testName;
1170    protected Histogram latencyHistogram;
1171    private Histogram replicaLatencyHistogram;
1172    private Histogram valueSizeHistogram;
1173    private Histogram rpcCallsHistogram;
1174    private Histogram remoteRpcCallsHistogram;
1175    private Histogram millisBetweenNextHistogram;
1176    private Histogram regionsScannedHistogram;
1177    private Histogram bytesInResultsHistogram;
1178    private Histogram bytesInRemoteResultsHistogram;
1179    private RandomDistribution.Zipf zipf;
1180    private long numOfReplyOverLatencyThreshold = 0;
1181    private long numOfReplyFromReplica = 0;
1182
1183    /**
1184     * Note that all subclasses of this class must provide a public constructor that has the exact
1185     * same list of arguments.
1186     */
1187    TestBase(final Configuration conf, final TestOptions options, final Status status) {
1188      this.conf = conf;
1189      this.opts = options;
1190      this.status = status;
1191      this.testName = this.getClass().getSimpleName();
1192      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
1193      if (options.isValueZipf()) {
1194        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
1195      }
1196      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
1197    }
1198
1199    int getValueLength(final Random r) {
1200      if (this.opts.isValueRandom()) {
1201        return r.nextInt(opts.valueSize);
1202      } else if (this.opts.isValueZipf()) {
1203        return Math.abs(this.zipf.nextInt());
1204      } else {
1205        return opts.valueSize;
1206      }
1207    }
1208
1209    void updateValueSize(final Result[] rs) throws IOException {
1210      updateValueSize(rs, 0);
1211    }
1212
1213    void updateValueSize(final Result[] rs, final long latency) throws IOException {
1214      if (rs == null || (latency == 0)) return;
1215      for (Result r : rs)
1216        updateValueSize(r, latency);
1217    }
1218
1219    void updateValueSize(final Result r) throws IOException {
1220      updateValueSize(r, 0);
1221    }
1222
1223    void updateValueSize(final Result r, final long latency) throws IOException {
1224      if (r == null || (latency == 0)) return;
1225      int size = 0;
1226      // update replicaHistogram
1227      if (r.isStale()) {
1228        replicaLatencyHistogram.update(latency / 1000);
1229        numOfReplyFromReplica++;
1230      }
1231      if (!isRandomValueSize()) return;
1232
1233      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1234        size += scanner.current().getValueLength();
1235      }
1236      updateValueSize(size);
1237    }
1238
1239    void updateValueSize(final int valueSize) {
1240      if (!isRandomValueSize()) return;
1241      this.valueSizeHistogram.update(valueSize);
1242    }
1243
1244    void updateScanMetrics(final ScanMetrics metrics) {
1245      if (metrics == null) return;
1246      Map<String, Long> metricsMap = metrics.getMetricsMap();
1247      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
1248      if (rpcCalls != null) {
1249        this.rpcCallsHistogram.update(rpcCalls.longValue());
1250      }
1251      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
1252      if (remoteRpcCalls != null) {
1253        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
1254      }
1255      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
1256      if (millisBetweenNext != null) {
1257        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
1258      }
1259      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
1260      if (regionsScanned != null) {
1261        this.regionsScannedHistogram.update(regionsScanned.longValue());
1262      }
1263      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
1264      if (bytesInResults != null && bytesInResults.longValue() > 0) {
1265        this.bytesInResultsHistogram.update(bytesInResults.longValue());
1266      }
1267      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
1268      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
1269        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
1270      }
1271    }
1272
1273    String generateStatus(final int sr, final int i, final int lr) {
1274      return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency ["
1275        + getShortLatencyReport() + "]"
1276        + (!isRandomValueSize() ? "" : ", value size [" + getShortValueSizeReport() + "]");
1277    }
1278
1279    boolean isRandomValueSize() {
1280      return opts.valueRandom;
1281    }
1282
1283    protected int getReportingPeriod() {
1284      return opts.period;
1285    }
1286
1287    /**
1288     * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1289     */
1290    public Histogram getLatencyHistogram() {
1291      return latencyHistogram;
1292    }
1293
1294    void testSetup() throws IOException {
1295      // test metrics
1296      latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1297      // If it is a replica test, set up histogram for replica.
1298      if (opts.replicas > 1) {
1299        replicaLatencyHistogram =
1300          YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1301      }
1302      valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1303      // scan metrics
1304      rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1305      remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1306      millisBetweenNextHistogram =
1307        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1308      regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1309      bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1310      bytesInRemoteResultsHistogram =
1311        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1312
1313      onStartup();
1314    }
1315
1316    abstract void onStartup() throws IOException;
1317
1318    void testTakedown() throws IOException {
1319      onTakedown();
1320      // Print all stats for this thread continuously.
1321      // Synchronize on Test.class so different threads don't intermingle the
1322      // output. We can't use 'this' here because each thread has its own instance of Test class.
1323      synchronized (Test.class) {
1324        status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
1325        status
1326          .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram));
1327        if (opts.replicas > 1) {
1328          status.setStatus("Latency (us) from Replica Regions: "
1329            + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram));
1330        }
1331        status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
1332        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
1333        if (valueSizeHistogram.getCount() > 0) {
1334          status.setStatus(
1335            "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
1336          status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
1337          status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
1338        } else {
1339          status.setStatus("No valueSize statistics available");
1340        }
1341        if (rpcCallsHistogram.getCount() > 0) {
1342          status.setStatus(
1343            "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
1344        }
1345        if (remoteRpcCallsHistogram.getCount() > 0) {
1346          status.setStatus("remoteRpcCalls (count): "
1347            + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
1348        }
1349        if (millisBetweenNextHistogram.getCount() > 0) {
1350          status.setStatus("millisBetweenNext (latency): "
1351            + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
1352        }
1353        if (regionsScannedHistogram.getCount() > 0) {
1354          status.setStatus("regionsScanned (count): "
1355            + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
1356        }
1357        if (bytesInResultsHistogram.getCount() > 0) {
1358          status.setStatus("bytesInResults (size): "
1359            + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
1360        }
1361        if (bytesInRemoteResultsHistogram.getCount() > 0) {
1362          status.setStatus("bytesInRemoteResults (size): "
1363            + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
1364        }
1365      }
1366    }
1367
1368    abstract void onTakedown() throws IOException;
1369
1370    /*
1371     * Run test
1372     * @return Elapsed time.
1373     */
1374    long test() throws IOException, InterruptedException {
1375      testSetup();
1376      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1377      final long startTime = System.nanoTime();
1378      try {
1379        testTimed();
1380      } finally {
1381        testTakedown();
1382      }
1383      return (System.nanoTime() - startTime) / 1000000;
1384    }
1385
1386    int getStartRow() {
1387      return opts.startRow;
1388    }
1389
1390    int getLastRow() {
1391      return getStartRow() + opts.perClientRunRows;
1392    }
1393
1394    /**
1395     * Provides an extension point for tests that don't want a per row invocation.
1396     */
1397    void testTimed() throws IOException, InterruptedException {
1398      int startRow = getStartRow();
1399      int lastRow = getLastRow();
1400      // Report on completion of 1/10th of total.
1401      for (int ii = 0; ii < opts.cycles; ii++) {
1402        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
1403        for (int i = startRow; i < lastRow; i++) {
1404          if (i % everyN != 0) continue;
1405          long startTime = System.nanoTime();
1406          boolean requestSent = false;
1407          Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan();
1408          try (Scope scope = span.makeCurrent()) {
1409            requestSent = testRow(i, startTime);
1410          } finally {
1411            span.end();
1412          }
1413          if ((i - startRow) > opts.measureAfter) {
1414            // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
1415            // first 9 times and sends the actual get request in the 10th iteration.
1416            // We should only set latency when actual request is sent because otherwise
1417            // it turns out to be 0.
1418            if (requestSent) {
1419              long latency = (System.nanoTime() - startTime) / 1000;
1420              latencyHistogram.update(latency);
1421              if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) {
1422                numOfReplyOverLatencyThreshold++;
1423              }
1424            }
1425            if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1426              status.setStatus(generateStatus(startRow, i, lastRow));
1427            }
1428          }
1429        }
1430      }
1431    }
1432
1433    /** Returns Subset of the histograms' calculation. */
1434    public String getShortLatencyReport() {
1435      return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
1436    }
1437
1438    /** Returns Subset of the histograms' calculation. */
1439    public String getShortValueSizeReport() {
1440      return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
1441    }
1442
1443    /**
1444     * Test for individual row.
1445     * @param i Row index.
1446     * @return true if the row was sent to server and need to record metrics. False if not, multiGet
1447     *         and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered.
1448     */
1449    abstract boolean testRow(final int i, final long startTime)
1450      throws IOException, InterruptedException;
1451  }
1452
1453  static abstract class Test extends TestBase {
1454    protected Connection connection;
1455
1456    Test(final Connection con, final TestOptions options, final Status status) {
1457      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1458      this.connection = con;
1459    }
1460  }
1461
1462  static abstract class AsyncTest extends TestBase {
1463    protected AsyncConnection connection;
1464
1465    AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
1466      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1467      this.connection = con;
1468    }
1469  }
1470
1471  static abstract class TableTest extends Test {
1472    protected Table table;
1473
1474    TableTest(Connection con, TestOptions options, Status status) {
1475      super(con, options, status);
1476    }
1477
1478    @Override
1479    void onStartup() throws IOException {
1480      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1481    }
1482
1483    @Override
1484    void onTakedown() throws IOException {
1485      table.close();
1486    }
1487  }
1488
1489  /*
1490   * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest
1491   */
1492  static abstract class MetaTest extends TableTest {
1493    protected int keyLength;
1494
1495    MetaTest(Connection con, TestOptions options, Status status) {
1496      super(con, options, status);
1497      keyLength = Integer.toString(opts.perClientRunRows).length();
1498    }
1499
1500    @Override
1501    void onTakedown() throws IOException {
1502      // No clean up
1503    }
1504
1505    /*
1506     * Generates Lexicographically ascending strings
1507     */
1508    protected byte[] getSplitKey(final int i) {
1509      return Bytes.toBytes(String.format("%0" + keyLength + "d", i));
1510    }
1511
1512  }
1513
1514  static abstract class AsyncTableTest extends AsyncTest {
1515    protected AsyncTable<?> table;
1516
1517    AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
1518      super(con, options, status);
1519    }
1520
1521    @Override
1522    void onStartup() throws IOException {
1523      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1524    }
1525
1526    @Override
1527    void onTakedown() throws IOException {
1528    }
1529  }
1530
1531  static class AsyncRandomReadTest extends AsyncTableTest {
1532    private final Consistency consistency;
1533    private ArrayList<Get> gets;
1534
1535    AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
1536      super(con, options, status);
1537      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1538      if (opts.multiGet > 0) {
1539        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1540        this.gets = new ArrayList<>(opts.multiGet);
1541      }
1542    }
1543
1544    @Override
1545    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1546      if (opts.randomSleep > 0) {
1547        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
1548      }
1549      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1550      for (int family = 0; family < opts.families; family++) {
1551        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1552        if (opts.addColumns) {
1553          for (int column = 0; column < opts.columns; column++) {
1554            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1555            get.addColumn(familyName, qualifier);
1556          }
1557        } else {
1558          get.addFamily(familyName);
1559        }
1560      }
1561      if (opts.filterAll) {
1562        get.setFilter(new FilterAllFilter());
1563      }
1564      get.setConsistency(consistency);
1565      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1566      try {
1567        if (opts.multiGet > 0) {
1568          this.gets.add(get);
1569          if (this.gets.size() == opts.multiGet) {
1570            Result[] rs =
1571              this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
1572            updateValueSize(rs);
1573            this.gets.clear();
1574          } else {
1575            return false;
1576          }
1577        } else {
1578          updateValueSize(this.table.get(get).get());
1579        }
1580      } catch (ExecutionException e) {
1581        throw new IOException(e);
1582      }
1583      return true;
1584    }
1585
1586    public static RuntimeException runtime(Throwable e) {
1587      if (e instanceof RuntimeException) {
1588        return (RuntimeException) e;
1589      }
1590      return new RuntimeException(e);
1591    }
1592
1593    public static <V> V propagate(Callable<V> callable) {
1594      try {
1595        return callable.call();
1596      } catch (Exception e) {
1597        throw runtime(e);
1598      }
1599    }
1600
1601    @Override
1602    protected int getReportingPeriod() {
1603      int period = opts.perClientRunRows / 10;
1604      return period == 0 ? opts.perClientRunRows : period;
1605    }
1606
1607    @Override
1608    protected void testTakedown() throws IOException {
1609      if (this.gets != null && this.gets.size() > 0) {
1610        this.table.get(gets);
1611        this.gets.clear();
1612      }
1613      super.testTakedown();
1614    }
1615  }
1616
1617  static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
1618
1619    AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
1620      super(con, options, status);
1621    }
1622
1623    @Override
1624    protected byte[] generateRow(final int i) {
1625      return getRandomRow(this.rand, opts.totalRows);
1626    }
1627  }
1628
1629  static class AsyncScanTest extends AsyncTableTest {
1630    private ResultScanner testScanner;
1631    private AsyncTable<?> asyncTable;
1632
1633    AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
1634      super(con, options, status);
1635    }
1636
1637    @Override
1638    void onStartup() throws IOException {
1639      this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName),
1640        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
1641    }
1642
1643    @Override
1644    void testTakedown() throws IOException {
1645      if (this.testScanner != null) {
1646        updateScanMetrics(this.testScanner.getScanMetrics());
1647        this.testScanner.close();
1648      }
1649      super.testTakedown();
1650    }
1651
1652    @Override
1653    boolean testRow(final int i, final long startTime) throws IOException {
1654      if (this.testScanner == null) {
1655        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1656          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1657          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1658        for (int family = 0; family < opts.families; family++) {
1659          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1660          if (opts.addColumns) {
1661            for (int column = 0; column < opts.columns; column++) {
1662              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1663              scan.addColumn(familyName, qualifier);
1664            }
1665          } else {
1666            scan.addFamily(familyName);
1667          }
1668        }
1669        if (opts.filterAll) {
1670          scan.setFilter(new FilterAllFilter());
1671        }
1672        this.testScanner = asyncTable.getScanner(scan);
1673      }
1674      Result r = testScanner.next();
1675      updateValueSize(r);
1676      return true;
1677    }
1678  }
1679
1680  static class AsyncSequentialReadTest extends AsyncTableTest {
1681    AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
1682      super(con, options, status);
1683    }
1684
1685    @Override
1686    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1687      Get get = new Get(format(i));
1688      for (int family = 0; family < opts.families; family++) {
1689        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1690        if (opts.addColumns) {
1691          for (int column = 0; column < opts.columns; column++) {
1692            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1693            get.addColumn(familyName, qualifier);
1694          }
1695        } else {
1696          get.addFamily(familyName);
1697        }
1698      }
1699      if (opts.filterAll) {
1700        get.setFilter(new FilterAllFilter());
1701      }
1702      try {
1703        updateValueSize(table.get(get).get());
1704      } catch (ExecutionException e) {
1705        throw new IOException(e);
1706      }
1707      return true;
1708    }
1709  }
1710
1711  static class AsyncSequentialWriteTest extends AsyncTableTest {
1712    private ArrayList<Put> puts;
1713
1714    AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
1715      super(con, options, status);
1716      if (opts.multiPut > 0) {
1717        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
1718        this.puts = new ArrayList<>(opts.multiPut);
1719      }
1720    }
1721
1722    protected byte[] generateRow(final int i) {
1723      return format(i);
1724    }
1725
1726    @Override
1727    @SuppressWarnings("ReturnValueIgnored")
1728    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1729      byte[] row = generateRow(i);
1730      Put put = new Put(row);
1731      for (int family = 0; family < opts.families; family++) {
1732        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1733        for (int column = 0; column < opts.columns; column++) {
1734          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1735          byte[] value = generateData(this.rand, getValueLength(this.rand));
1736          if (opts.useTags) {
1737            byte[] tag = generateData(this.rand, TAG_LENGTH);
1738            Tag[] tags = new Tag[opts.noOfTags];
1739            for (int n = 0; n < opts.noOfTags; n++) {
1740              Tag t = new ArrayBackedTag((byte) n, tag);
1741              tags[n] = t;
1742            }
1743            KeyValue kv =
1744              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
1745            put.add(kv);
1746            updateValueSize(kv.getValueLength());
1747          } else {
1748            put.addColumn(familyName, qualifier, value);
1749            updateValueSize(value.length);
1750          }
1751        }
1752      }
1753      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1754      try {
1755        table.put(put).get();
1756        if (opts.multiPut > 0) {
1757          this.puts.add(put);
1758          if (this.puts.size() == opts.multiPut) {
1759            this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get));
1760            this.puts.clear();
1761          } else {
1762            return false;
1763          }
1764        } else {
1765          table.put(put).get();
1766        }
1767      } catch (ExecutionException e) {
1768        throw new IOException(e);
1769      }
1770      return true;
1771    }
1772  }
1773
1774  static abstract class BufferedMutatorTest extends Test {
1775    protected BufferedMutator mutator;
1776    protected Table table;
1777
1778    BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1779      super(con, options, status);
1780    }
1781
1782    @Override
1783    void onStartup() throws IOException {
1784      BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1785      p.writeBufferSize(opts.bufferSize);
1786      this.mutator = connection.getBufferedMutator(p);
1787      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1788    }
1789
1790    @Override
1791    void onTakedown() throws IOException {
1792      mutator.close();
1793      table.close();
1794    }
1795  }
1796
1797  static class RandomSeekScanTest extends TableTest {
1798    RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1799      super(con, options, status);
1800    }
1801
1802    @Override
1803    boolean testRow(final int i, final long startTime) throws IOException {
1804      Scan scan =
1805        new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)).setCaching(opts.caching)
1806          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1807          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1808      FilterList list = new FilterList();
1809      for (int family = 0; family < opts.families; family++) {
1810        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1811        if (opts.addColumns) {
1812          for (int column = 0; column < opts.columns; column++) {
1813            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1814            scan.addColumn(familyName, qualifier);
1815          }
1816        } else {
1817          scan.addFamily(familyName);
1818        }
1819      }
1820      if (opts.filterAll) {
1821        list.addFilter(new FilterAllFilter());
1822      }
1823      list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1824      scan.setFilter(list);
1825      ResultScanner s = this.table.getScanner(scan);
1826      try {
1827        for (Result rr; (rr = s.next()) != null;) {
1828          updateValueSize(rr);
1829        }
1830      } finally {
1831        updateScanMetrics(s.getScanMetrics());
1832        s.close();
1833      }
1834      return true;
1835    }
1836
1837    @Override
1838    protected int getReportingPeriod() {
1839      int period = opts.perClientRunRows / 100;
1840      return period == 0 ? opts.perClientRunRows : period;
1841    }
1842
1843  }
1844
1845  static abstract class RandomScanWithRangeTest extends TableTest {
1846    RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1847      super(con, options, status);
1848    }
1849
1850    @Override
1851    boolean testRow(final int i, final long startTime) throws IOException {
1852      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1853      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
1854        .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
1855        .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1856        .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1857      for (int family = 0; family < opts.families; family++) {
1858        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1859        if (opts.addColumns) {
1860          for (int column = 0; column < opts.columns; column++) {
1861            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1862            scan.addColumn(familyName, qualifier);
1863          }
1864        } else {
1865          scan.addFamily(familyName);
1866        }
1867      }
1868      if (opts.filterAll) {
1869        scan.setFilter(new FilterAllFilter());
1870      }
1871      Result r = null;
1872      int count = 0;
1873      ResultScanner s = this.table.getScanner(scan);
1874      try {
1875        for (; (r = s.next()) != null;) {
1876          updateValueSize(r);
1877          count++;
1878        }
1879        if (i % 100 == 0) {
1880          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1881            Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()),
1882            count));
1883        }
1884      } finally {
1885        updateScanMetrics(s.getScanMetrics());
1886        s.close();
1887      }
1888      return true;
1889    }
1890
1891    protected abstract Pair<byte[], byte[]> getStartAndStopRow();
1892
1893    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
1894      int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
1895      int stop = start + maxRange;
1896      return new Pair<>(format(start), format(stop));
1897    }
1898
1899    @Override
1900    protected int getReportingPeriod() {
1901      int period = opts.perClientRunRows / 100;
1902      return period == 0 ? opts.perClientRunRows : period;
1903    }
1904  }
1905
1906  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1907    RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1908      super(con, options, status);
1909    }
1910
1911    @Override
1912    protected Pair<byte[], byte[]> getStartAndStopRow() {
1913      return generateStartAndStopRows(10);
1914    }
1915  }
1916
1917  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1918    RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1919      super(con, options, status);
1920    }
1921
1922    @Override
1923    protected Pair<byte[], byte[]> getStartAndStopRow() {
1924      return generateStartAndStopRows(100);
1925    }
1926  }
1927
1928  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1929    RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1930      super(con, options, status);
1931    }
1932
1933    @Override
1934    protected Pair<byte[], byte[]> getStartAndStopRow() {
1935      return generateStartAndStopRows(1000);
1936    }
1937  }
1938
1939  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1940    RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1941      super(con, options, status);
1942    }
1943
1944    @Override
1945    protected Pair<byte[], byte[]> getStartAndStopRow() {
1946      return generateStartAndStopRows(10000);
1947    }
1948  }
1949
1950  static class RandomReadTest extends TableTest {
1951    private final Consistency consistency;
1952    private ArrayList<Get> gets;
1953
1954    RandomReadTest(Connection con, TestOptions options, Status status) {
1955      super(con, options, status);
1956      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1957      if (opts.multiGet > 0) {
1958        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1959        this.gets = new ArrayList<>(opts.multiGet);
1960      }
1961    }
1962
1963    @Override
1964    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1965      if (opts.randomSleep > 0) {
1966        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
1967      }
1968      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1969      for (int family = 0; family < opts.families; family++) {
1970        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1971        if (opts.addColumns) {
1972          for (int column = 0; column < opts.columns; column++) {
1973            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1974            get.addColumn(familyName, qualifier);
1975          }
1976        } else {
1977          get.addFamily(familyName);
1978        }
1979      }
1980      if (opts.filterAll) {
1981        get.setFilter(new FilterAllFilter());
1982      }
1983      get.setConsistency(consistency);
1984      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1985      if (opts.multiGet > 0) {
1986        this.gets.add(get);
1987        if (this.gets.size() == opts.multiGet) {
1988          Result[] rs = this.table.get(this.gets);
1989          if (opts.replicas > 1) {
1990            long latency = System.nanoTime() - startTime;
1991            updateValueSize(rs, latency);
1992          } else {
1993            updateValueSize(rs);
1994          }
1995          this.gets.clear();
1996        } else {
1997          return false;
1998        }
1999      } else {
2000        if (opts.replicas > 1) {
2001          Result r = this.table.get(get);
2002          long latency = System.nanoTime() - startTime;
2003          updateValueSize(r, latency);
2004        } else {
2005          updateValueSize(this.table.get(get));
2006        }
2007      }
2008      return true;
2009    }
2010
2011    @Override
2012    protected int getReportingPeriod() {
2013      int period = opts.perClientRunRows / 10;
2014      return period == 0 ? opts.perClientRunRows : period;
2015    }
2016
2017    @Override
2018    protected void testTakedown() throws IOException {
2019      if (this.gets != null && this.gets.size() > 0) {
2020        this.table.get(gets);
2021        this.gets.clear();
2022      }
2023      super.testTakedown();
2024    }
2025  }
2026
2027  /*
2028   * Send random reads against fake regions inserted by MetaWriteTest
2029   */
2030  static class MetaRandomReadTest extends MetaTest {
2031    private RegionLocator regionLocator;
2032
2033    MetaRandomReadTest(Connection con, TestOptions options, Status status) {
2034      super(con, options, status);
2035      LOG.info("call getRegionLocation");
2036    }
2037
2038    @Override
2039    void onStartup() throws IOException {
2040      super.onStartup();
2041      this.regionLocator = connection.getRegionLocator(table.getName());
2042    }
2043
2044    @Override
2045    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
2046      if (opts.randomSleep > 0) {
2047        Thread.sleep(rand.nextInt(opts.randomSleep));
2048      }
2049      HRegionLocation hRegionLocation =
2050        regionLocator.getRegionLocation(getSplitKey(rand.nextInt(opts.perClientRunRows)), true);
2051      LOG.debug("get location for region: " + hRegionLocation);
2052      return true;
2053    }
2054
2055    @Override
2056    protected int getReportingPeriod() {
2057      int period = opts.perClientRunRows / 10;
2058      return period == 0 ? opts.perClientRunRows : period;
2059    }
2060
2061    @Override
2062    protected void testTakedown() throws IOException {
2063      super.testTakedown();
2064    }
2065  }
2066
2067  static class RandomWriteTest extends SequentialWriteTest {
2068    RandomWriteTest(Connection con, TestOptions options, Status status) {
2069      super(con, options, status);
2070    }
2071
2072    @Override
2073    protected byte[] generateRow(final int i) {
2074      return getRandomRow(this.rand, opts.totalRows);
2075    }
2076
2077  }
2078
2079  static class RandomDeleteTest extends SequentialDeleteTest {
2080    RandomDeleteTest(Connection con, TestOptions options, Status status) {
2081      super(con, options, status);
2082    }
2083
2084    @Override
2085    protected byte[] generateRow(final int i) {
2086      return getRandomRow(this.rand, opts.totalRows);
2087    }
2088
2089  }
2090
2091  static class ScanTest extends TableTest {
2092    private ResultScanner testScanner;
2093
2094    ScanTest(Connection con, TestOptions options, Status status) {
2095      super(con, options, status);
2096    }
2097
2098    @Override
2099    void testTakedown() throws IOException {
2100      if (this.testScanner != null) {
2101        this.testScanner.close();
2102      }
2103      super.testTakedown();
2104    }
2105
2106    @Override
2107    boolean testRow(final int i, final long startTime) throws IOException {
2108      if (this.testScanner == null) {
2109        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
2110          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
2111          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
2112        for (int family = 0; family < opts.families; family++) {
2113          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2114          if (opts.addColumns) {
2115            for (int column = 0; column < opts.columns; column++) {
2116              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2117              scan.addColumn(familyName, qualifier);
2118            }
2119          } else {
2120            scan.addFamily(familyName);
2121          }
2122        }
2123        if (opts.filterAll) {
2124          scan.setFilter(new FilterAllFilter());
2125        }
2126        this.testScanner = table.getScanner(scan);
2127      }
2128      Result r = testScanner.next();
2129      updateValueSize(r);
2130      return true;
2131    }
2132  }
2133
2134  static class ReverseScanTest extends TableTest {
2135    private ResultScanner testScanner;
2136
2137    ReverseScanTest(Connection con, TestOptions options, Status status) {
2138      super(con, options, status);
2139    }
2140
2141    @Override
2142    void testTakedown() throws IOException {
2143      if (this.testScanner != null) {
2144        this.testScanner.close();
2145      }
2146      super.testTakedown();
2147    }
2148
2149    @Override
2150    boolean testRow(final int i, final long startTime) throws IOException {
2151      if (this.testScanner == null) {
2152        Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2153          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2154          .setScanMetricsEnabled(true).setReversed(true);
2155        for (int family = 0; family < opts.families; family++) {
2156          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2157          if (opts.addColumns) {
2158            for (int column = 0; column < opts.columns; column++) {
2159              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2160              scan.addColumn(familyName, qualifier);
2161            }
2162          } else {
2163            scan.addFamily(familyName);
2164          }
2165        }
2166        if (opts.filterAll) {
2167          scan.setFilter(new FilterAllFilter());
2168        }
2169        this.testScanner = table.getScanner(scan);
2170      }
2171      Result r = testScanner.next();
2172      updateValueSize(r);
2173      return true;
2174    }
2175  }
2176
2177  /**
2178   * Base class for operations that are CAS-like; that read a value and then set it based off what
2179   * they read. In this category is increment, append, checkAndPut, etc.
2180   * <p>
2181   * These operations also want some concurrency going on. Usually when these tests run, they
2182   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
2183   * same key space. We do this with our getStartRow and getLastRow overrides.
2184   */
2185  static abstract class CASTableTest extends TableTest {
2186    private final byte[] qualifier;
2187
2188    CASTableTest(Connection con, TestOptions options, Status status) {
2189      super(con, options, status);
2190      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
2191    }
2192
2193    byte[] getQualifier() {
2194      return this.qualifier;
2195    }
2196
2197    @Override
2198    int getStartRow() {
2199      return 0;
2200    }
2201
2202    @Override
2203    int getLastRow() {
2204      return opts.perClientRunRows;
2205    }
2206  }
2207
2208  static class IncrementTest extends CASTableTest {
2209    IncrementTest(Connection con, TestOptions options, Status status) {
2210      super(con, options, status);
2211    }
2212
2213    @Override
2214    boolean testRow(final int i, final long startTime) throws IOException {
2215      Increment increment = new Increment(format(i));
2216      // unlike checkAndXXX tests, which make most sense to do on a single value,
2217      // if multiple families are specified for an increment test we assume it is
2218      // meant to raise the work factor
2219      for (int family = 0; family < opts.families; family++) {
2220        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2221        increment.addColumn(familyName, getQualifier(), 1l);
2222      }
2223      updateValueSize(this.table.increment(increment));
2224      return true;
2225    }
2226  }
2227
2228  static class AppendTest extends CASTableTest {
2229    AppendTest(Connection con, TestOptions options, Status status) {
2230      super(con, options, status);
2231    }
2232
2233    @Override
2234    boolean testRow(final int i, final long startTime) throws IOException {
2235      byte[] bytes = format(i);
2236      Append append = new Append(bytes);
2237      // unlike checkAndXXX tests, which make most sense to do on a single value,
2238      // if multiple families are specified for an append test we assume it is
2239      // meant to raise the work factor
2240      for (int family = 0; family < opts.families; family++) {
2241        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2242        append.addColumn(familyName, getQualifier(), bytes);
2243      }
2244      updateValueSize(this.table.append(append));
2245      return true;
2246    }
2247  }
2248
2249  static class CheckAndMutateTest extends CASTableTest {
2250    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
2251      super(con, options, status);
2252    }
2253
2254    @Override
2255    boolean testRow(final int i, final long startTime) throws IOException {
2256      final byte[] bytes = format(i);
2257      // checkAndXXX tests operate on only a single value
2258      // Put a known value so when we go to check it, it is there.
2259      Put put = new Put(bytes);
2260      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2261      this.table.put(put);
2262      RowMutations mutations = new RowMutations(bytes);
2263      mutations.add(put);
2264      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2265        .thenMutate(mutations);
2266      return true;
2267    }
2268  }
2269
2270  static class CheckAndPutTest extends CASTableTest {
2271    CheckAndPutTest(Connection con, TestOptions options, Status status) {
2272      super(con, options, status);
2273    }
2274
2275    @Override
2276    boolean testRow(final int i, final long startTime) throws IOException {
2277      final byte[] bytes = format(i);
2278      // checkAndXXX tests operate on only a single value
2279      // Put a known value so when we go to check it, it is there.
2280      Put put = new Put(bytes);
2281      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2282      this.table.put(put);
2283      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2284        .thenPut(put);
2285      return true;
2286    }
2287  }
2288
2289  static class CheckAndDeleteTest extends CASTableTest {
2290    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
2291      super(con, options, status);
2292    }
2293
2294    @Override
2295    boolean testRow(final int i, final long startTime) throws IOException {
2296      final byte[] bytes = format(i);
2297      // checkAndXXX tests operate on only a single value
2298      // Put a known value so when we go to check it, it is there.
2299      Put put = new Put(bytes);
2300      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2301      this.table.put(put);
2302      Delete delete = new Delete(put.getRow());
2303      delete.addColumn(FAMILY_ZERO, getQualifier());
2304      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2305        .thenDelete(delete);
2306      return true;
2307    }
2308  }
2309
2310  /*
2311   * Delete all fake regions inserted to meta table by MetaWriteTest.
2312   */
2313  static class CleanMetaTest extends MetaTest {
2314    CleanMetaTest(Connection con, TestOptions options, Status status) {
2315      super(con, options, status);
2316    }
2317
2318    @Override
2319    boolean testRow(final int i, final long startTime) throws IOException {
2320      try {
2321        RegionInfo regionInfo = connection.getRegionLocator(table.getName())
2322          .getRegionLocation(getSplitKey(i), false).getRegion();
2323        LOG.debug("deleting region from meta: " + regionInfo);
2324
2325        Delete delete =
2326          MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP);
2327        try (Table t = MetaTableAccessor.getMetaHTable(connection)) {
2328          t.delete(delete);
2329        }
2330      } catch (IOException ie) {
2331        // Log and continue
2332        LOG.error("cannot find region with start key: " + i);
2333      }
2334      return true;
2335    }
2336  }
2337
2338  static class SequentialReadTest extends TableTest {
2339    SequentialReadTest(Connection con, TestOptions options, Status status) {
2340      super(con, options, status);
2341    }
2342
2343    @Override
2344    boolean testRow(final int i, final long startTime) throws IOException {
2345      Get get = new Get(format(i));
2346      for (int family = 0; family < opts.families; family++) {
2347        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2348        if (opts.addColumns) {
2349          for (int column = 0; column < opts.columns; column++) {
2350            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2351            get.addColumn(familyName, qualifier);
2352          }
2353        } else {
2354          get.addFamily(familyName);
2355        }
2356      }
2357      if (opts.filterAll) {
2358        get.setFilter(new FilterAllFilter());
2359      }
2360      updateValueSize(table.get(get));
2361      return true;
2362    }
2363  }
2364
2365  static class SequentialWriteTest extends BufferedMutatorTest {
2366    private ArrayList<Put> puts;
2367
2368    SequentialWriteTest(Connection con, TestOptions options, Status status) {
2369      super(con, options, status);
2370      if (opts.multiPut > 0) {
2371        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
2372        this.puts = new ArrayList<>(opts.multiPut);
2373      }
2374    }
2375
2376    protected byte[] generateRow(final int i) {
2377      return format(i);
2378    }
2379
2380    @Override
2381    boolean testRow(final int i, final long startTime) throws IOException {
2382      byte[] row = generateRow(i);
2383      Put put = new Put(row);
2384      for (int family = 0; family < opts.families; family++) {
2385        byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
2386        for (int column = 0; column < opts.columns; column++) {
2387          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2388          byte[] value = generateData(this.rand, getValueLength(this.rand));
2389          if (opts.useTags) {
2390            byte[] tag = generateData(this.rand, TAG_LENGTH);
2391            Tag[] tags = new Tag[opts.noOfTags];
2392            for (int n = 0; n < opts.noOfTags; n++) {
2393              Tag t = new ArrayBackedTag((byte) n, tag);
2394              tags[n] = t;
2395            }
2396            KeyValue kv =
2397              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
2398            put.add(kv);
2399            updateValueSize(kv.getValueLength());
2400          } else {
2401            put.addColumn(familyName, qualifier, value);
2402            updateValueSize(value.length);
2403          }
2404        }
2405      }
2406      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2407      if (opts.autoFlush) {
2408        if (opts.multiPut > 0) {
2409          this.puts.add(put);
2410          if (this.puts.size() == opts.multiPut) {
2411            table.put(this.puts);
2412            this.puts.clear();
2413          } else {
2414            return false;
2415          }
2416        } else {
2417          table.put(put);
2418        }
2419      } else {
2420        mutator.mutate(put);
2421      }
2422      return true;
2423    }
2424  }
2425
2426  static class SequentialDeleteTest extends BufferedMutatorTest {
2427
2428    SequentialDeleteTest(Connection con, TestOptions options, Status status) {
2429      super(con, options, status);
2430    }
2431
2432    protected byte[] generateRow(final int i) {
2433      return format(i);
2434    }
2435
2436    @Override
2437    boolean testRow(final int i, final long startTime) throws IOException {
2438      byte[] row = generateRow(i);
2439      Delete delete = new Delete(row);
2440      for (int family = 0; family < opts.families; family++) {
2441        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2442        delete.addFamily(familyName);
2443      }
2444      delete.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2445      if (opts.autoFlush) {
2446        table.delete(delete);
2447      } else {
2448        mutator.mutate(delete);
2449      }
2450      return true;
2451    }
2452  }
2453
2454  /*
2455   * Insert fake regions into meta table with contiguous split keys.
2456   */
2457  static class MetaWriteTest extends MetaTest {
2458
2459    MetaWriteTest(Connection con, TestOptions options, Status status) {
2460      super(con, options, status);
2461    }
2462
2463    @Override
2464    boolean testRow(final int i, final long startTime) throws IOException {
2465      List<RegionInfo> regionInfos = new ArrayList<RegionInfo>();
2466      RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME))
2467        .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build());
2468      regionInfos.add(regionInfo);
2469      MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1);
2470
2471      // write the serverName columns
2472      MetaTableAccessor.updateRegionLocation(connection, regionInfo,
2473        ServerName.valueOf("localhost", 60010, rand.nextLong()), i,
2474        EnvironmentEdgeManager.currentTime());
2475      return true;
2476    }
2477  }
2478
2479  static class FilteredScanTest extends TableTest {
2480    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
2481
2482    FilteredScanTest(Connection con, TestOptions options, Status status) {
2483      super(con, options, status);
2484      if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) {
2485        LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB
2486          + ". This could take a very long time.");
2487      }
2488    }
2489
2490    @Override
2491    boolean testRow(int i, final long startTime) throws IOException {
2492      byte[] value = generateData(this.rand, getValueLength(this.rand));
2493      Scan scan = constructScan(value);
2494      ResultScanner scanner = null;
2495      try {
2496        scanner = this.table.getScanner(scan);
2497        for (Result r = null; (r = scanner.next()) != null;) {
2498          updateValueSize(r);
2499        }
2500      } finally {
2501        if (scanner != null) {
2502          updateScanMetrics(scanner.getScanMetrics());
2503          scanner.close();
2504        }
2505      }
2506      return true;
2507    }
2508
2509    protected Scan constructScan(byte[] valuePrefix) throws IOException {
2510      FilterList list = new FilterList();
2511      Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL,
2512        new BinaryComparator(valuePrefix));
2513      list.addFilter(filter);
2514      if (opts.filterAll) {
2515        list.addFilter(new FilterAllFilter());
2516      }
2517      Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2518        .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2519        .setScanMetricsEnabled(true);
2520      if (opts.addColumns) {
2521        for (int column = 0; column < opts.columns; column++) {
2522          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2523          scan.addColumn(FAMILY_ZERO, qualifier);
2524        }
2525      } else {
2526        scan.addFamily(FAMILY_ZERO);
2527      }
2528      scan.setFilter(list);
2529      return scan;
2530    }
2531  }
2532
2533  /**
2534   * Compute a throughput rate in MB/s.
2535   * @param rows   Number of records consumed.
2536   * @param timeMs Time taken in milliseconds.
2537   * @return String value with label, ie '123.76 MB/s'
2538   */
2539  private static String calculateMbps(int rows, long timeMs, final int valueSize, int families,
2540    int columns) {
2541    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH
2542      + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families);
2543    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
2544      .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT);
2545    return FMT.format(mbps) + " MB/s";
2546  }
2547
2548  /*
2549   * Format passed integer.
2550   * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed number (Does
2551   * absolute in case number is negative).
2552   */
2553  public static byte[] format(final int number) {
2554    byte[] b = new byte[ROW_LENGTH];
2555    int d = Math.abs(number);
2556    for (int i = b.length - 1; i >= 0; i--) {
2557      b[i] = (byte) ((d % 10) + '0');
2558      d /= 10;
2559    }
2560    return b;
2561  }
2562
2563  /*
2564   * This method takes some time and is done inline uploading data. For example, doing the mapfile
2565   * test, generation of the key and value consumes about 30% of CPU time.
2566   * @return Generated random value to insert into a table cell.
2567   */
2568  public static byte[] generateData(final Random r, int length) {
2569    byte[] b = new byte[length];
2570    int i;
2571
2572    for (i = 0; i < (length - 8); i += 8) {
2573      b[i] = (byte) (65 + r.nextInt(26));
2574      b[i + 1] = b[i];
2575      b[i + 2] = b[i];
2576      b[i + 3] = b[i];
2577      b[i + 4] = b[i];
2578      b[i + 5] = b[i];
2579      b[i + 6] = b[i];
2580      b[i + 7] = b[i];
2581    }
2582
2583    byte a = (byte) (65 + r.nextInt(26));
2584    for (; i < length; i++) {
2585      b[i] = a;
2586    }
2587    return b;
2588  }
2589
2590  static byte[] getRandomRow(final Random random, final int totalRows) {
2591    return format(generateRandomRow(random, totalRows));
2592  }
2593
2594  static int generateRandomRow(final Random random, final int totalRows) {
2595    return random.nextInt(Integer.MAX_VALUE) % totalRows;
2596  }
2597
2598  static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
2599    Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
2600    throws IOException, InterruptedException {
2601    status.setStatus(
2602      "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows");
2603    long totalElapsedTime;
2604
2605    final TestBase t;
2606    try {
2607      if (AsyncTest.class.isAssignableFrom(cmd)) {
2608        Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
2609        Constructor<? extends AsyncTest> constructor =
2610          newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
2611        t = constructor.newInstance(asyncCon, opts, status);
2612      } else {
2613        Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
2614        Constructor<? extends Test> constructor =
2615          newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
2616        t = constructor.newInstance(con, opts, status);
2617      }
2618    } catch (NoSuchMethodException e) {
2619      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
2620        + ".  It does not provide a constructor as described by "
2621        + "the javadoc comment.  Available constructors are: "
2622        + Arrays.toString(cmd.getConstructors()));
2623    } catch (Exception e) {
2624      throw new IllegalStateException("Failed to construct command class", e);
2625    }
2626    totalElapsedTime = t.test();
2627
2628    status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow
2629      + " for " + opts.perClientRunRows + " rows" + " ("
2630      + calculateMbps((int) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
2631        getAverageValueLength(opts), opts.families, opts.columns)
2632      + ")");
2633
2634    return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold,
2635      t.numOfReplyFromReplica, t.getLatencyHistogram());
2636  }
2637
2638  private static int getAverageValueLength(final TestOptions opts) {
2639    return opts.valueRandom ? opts.valueSize / 2 : opts.valueSize;
2640  }
2641
2642  private void runTest(final Class<? extends TestBase> cmd, TestOptions opts)
2643    throws IOException, InterruptedException, ClassNotFoundException, ExecutionException {
2644    // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2645    // the TestOptions introspection for us and dump the output in a readable format.
2646    LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
2647    Admin admin = null;
2648    Connection connection = null;
2649    try {
2650      connection = ConnectionFactory.createConnection(getConf());
2651      admin = connection.getAdmin();
2652      checkTable(admin, opts);
2653    } finally {
2654      if (admin != null) admin.close();
2655      if (connection != null) connection.close();
2656    }
2657    if (opts.nomapred) {
2658      doLocalClients(opts, getConf());
2659    } else {
2660      doMapReduce(opts, getConf());
2661    }
2662  }
2663
2664  protected void printUsage() {
2665    printUsage(PE_COMMAND_SHORTNAME, null);
2666  }
2667
2668  protected static void printUsage(final String message) {
2669    printUsage(PE_COMMAND_SHORTNAME, message);
2670  }
2671
2672  protected static void printUsageAndExit(final String message, final int exitCode) {
2673    printUsage(message);
2674    System.exit(exitCode);
2675  }
2676
2677  protected static void printUsage(final String shortName, final String message) {
2678    if (message != null && message.length() > 0) {
2679      System.err.println(message);
2680    }
2681    System.err.print("Usage: hbase " + shortName);
2682    System.err.println("  <OPTIONS> [-D<property=value>]* <command|class> <nclients>");
2683    System.err.println();
2684    System.err.println("General Options:");
2685    System.err.println(
2686      " nomapred        Run multiple clients using threads " + "(rather than use mapreduce)");
2687    System.err
2688      .println(" oneCon          all the threads share the same connection. Default: False");
2689    System.err.println(" connCount          connections all threads share. "
2690      + "For example, if set to 2, then all thread share 2 connection. "
2691      + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, "
2692      + "if not, connCount=thread number");
2693
2694    System.err.println(" sampleRate      Execute test on a sample of total "
2695      + "rows. Only supported by randomRead. Default: 1.0");
2696    System.err.println(" period          Report every 'period' rows: "
2697      + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10);
2698    System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
2699    System.err.println(
2700      " traceRate       Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0");
2701    System.err.println(" latency         Set to report operation latencies. Default: False");
2702    System.err.println(" latencyThreshold  Set to report number of operations with latency "
2703      + "over lantencyThreshold, unit in millisecond, default 0");
2704    System.err.println(" measureAfter    Start to measure the latency once 'measureAfter'"
2705      + " rows have been treated. Default: 0");
2706    System.err
2707      .println(" valueSize       Pass value size to use: Default: " + DEFAULT_OPTS.getValueSize());
2708    System.err.println(" valueRandom     Set if we should vary value size between 0 and "
2709      + "'valueSize'; set on read for stats on size: Default: Not set.");
2710    System.err.println(" blockEncoding   Block encoding to use. Value should be one of "
2711      + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2712    System.err.println();
2713    System.err.println("Table Creation / Write Tests:");
2714    System.err.println(" table           Alternate table name. Default: 'TestTable'");
2715    System.err.println(
2716      " rows            Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows()
2717        + ".  In case of randomReads and randomSeekScans this could"
2718        + " be specified along with --size to specify the number of rows to be scanned within"
2719        + " the total range specified by the size.");
2720    System.err.println(
2721      " size            Total size in GiB. Mutually exclusive with --rows for writes and scans"
2722        + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2723        + " use size to specify the end range and --rows"
2724        + " specifies the number of rows within that range. " + "Default: 1.0.");
2725    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2726    System.err.println(" encryption      Encryption type to use (AES, ...). Default: 'NONE'");
2727    System.err.println(
2728      " flushCommits    Used to determine if the test should flush the table. " + "Default: false");
2729    System.err.println(" valueZipf       Set if we should vary value size between 0 and "
2730      + "'valueSize' in zipf form: Default: Not set.");
2731    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
2732    System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
2733    System.err.println(" multiPut        Batch puts together into groups of N. Only supported "
2734      + "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0");
2735    System.err.println(" presplit        Create presplit table. If a table with same name exists,"
2736      + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2737      + "Recommended for accurate perf analysis (see guide). Default: disabled");
2738    System.err.println(
2739      " usetags         Writes tags along with KVs. Use with HFile V3. " + "Default: false");
2740    System.err.println(" numoftags       Specify the no of tags that would be needed. "
2741      + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2742    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
2743    System.err.println(" columns         Columns to write per row. Default: 1");
2744    System.err
2745      .println(" families        Specify number of column families for the table. Default: 1");
2746    System.err.println();
2747    System.err.println("Read Tests:");
2748    System.err.println(" filterAll       Helps to filter out all the rows on the server side"
2749      + " there by not returning any thing back to the client.  Helps to check the server side"
2750      + " performance.  Uses FilterAllFilter internally. ");
2751    System.err.println(" multiGet        Batch gets together into groups of N. Only supported "
2752      + "by randomRead. Default: disabled");
2753    System.err.println(" inmemory        Tries to keep the HFiles of the CF "
2754      + "inmemory as far as possible. Not guaranteed that reads are always served "
2755      + "from memory.  Default: false");
2756    System.err
2757      .println(" bloomFilter     Bloom filter type, one of " + Arrays.toString(BloomType.values()));
2758    System.err.println(" blockSize       Blocksize to use when writing out hfiles. ");
2759    System.err
2760      .println(" inmemoryCompaction  Makes the column family to do inmemory flushes/compactions. "
2761        + "Uses the CompactingMemstore");
2762    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
2763    System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
2764    System.err.println(
2765      " randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
2766    System.err.println(" caching         Scan caching to use. Default: 30");
2767    System.err.println(" asyncPrefetch   Enable asyncPrefetch for scan");
2768    System.err.println(" cacheBlocks     Set the cacheBlocks option for scan. Default: true");
2769    System.err.println(
2770      " scanReadType    Set the readType option for scan, stream/pread/default. Default: default");
2771    System.err.println(" bufferSize      Set the value of client side buffering. Default: 2MB");
2772    System.err.println();
2773    System.err.println(" Note: -D properties will be applied to the conf used. ");
2774    System.err.println("  For example: ");
2775    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
2776    System.err.println("   -Dmapreduce.task.timeout=60000");
2777    System.err.println();
2778    System.err.println("Command:");
2779    for (CmdDescriptor command : COMMANDS.values()) {
2780      System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
2781    }
2782    System.err.println();
2783    System.err.println("Class:");
2784    System.err.println("To run any custom implementation of PerformanceEvaluation.Test, "
2785      + "provide the classname of the implementaion class in place of "
2786      + "command name and it will be loaded at runtime from classpath.:");
2787    System.err.println("Please consider to contribute back "
2788      + "this custom test impl into a builtin PE command for the benefit of the community");
2789    System.err.println();
2790    System.err.println("Args:");
2791    System.err.println(" nclients        Integer. Required. Total number of clients "
2792      + "(and HRegionServers) running. 1 <= value <= 500");
2793    System.err.println("Examples:");
2794    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
2795    System.err.println(" $ hbase " + shortName + " sequentialWrite 1");
2796    System.err.println(" To run 10 clients doing increments over ten rows:");
2797    System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10");
2798  }
2799
2800  /**
2801   * Parse options passed in via an arguments array. Assumes that array has been split on
2802   * white-space and placed into a {@code Queue}. Any unknown arguments will remain in the queue at
2803   * the conclusion of this method call. It's up to the caller to deal with these unrecognized
2804   * arguments.
2805   */
2806  static TestOptions parseOpts(Queue<String> args) {
2807    TestOptions opts = new TestOptions();
2808
2809    String cmd = null;
2810    while ((cmd = args.poll()) != null) {
2811      if (cmd.equals("-h") || cmd.startsWith("--h")) {
2812        // place item back onto queue so that caller knows parsing was incomplete
2813        args.add(cmd);
2814        break;
2815      }
2816
2817      final String nmr = "--nomapred";
2818      if (cmd.startsWith(nmr)) {
2819        opts.nomapred = true;
2820        continue;
2821      }
2822
2823      final String rows = "--rows=";
2824      if (cmd.startsWith(rows)) {
2825        opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
2826        continue;
2827      }
2828
2829      final String cycles = "--cycles=";
2830      if (cmd.startsWith(cycles)) {
2831        opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
2832        continue;
2833      }
2834
2835      final String sampleRate = "--sampleRate=";
2836      if (cmd.startsWith(sampleRate)) {
2837        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
2838        continue;
2839      }
2840
2841      final String table = "--table=";
2842      if (cmd.startsWith(table)) {
2843        opts.tableName = cmd.substring(table.length());
2844        continue;
2845      }
2846
2847      final String startRow = "--startRow=";
2848      if (cmd.startsWith(startRow)) {
2849        opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
2850        continue;
2851      }
2852
2853      final String compress = "--compress=";
2854      if (cmd.startsWith(compress)) {
2855        opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
2856        continue;
2857      }
2858
2859      final String encryption = "--encryption=";
2860      if (cmd.startsWith(encryption)) {
2861        opts.encryption = cmd.substring(encryption.length());
2862        continue;
2863      }
2864
2865      final String traceRate = "--traceRate=";
2866      if (cmd.startsWith(traceRate)) {
2867        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
2868        continue;
2869      }
2870
2871      final String blockEncoding = "--blockEncoding=";
2872      if (cmd.startsWith(blockEncoding)) {
2873        opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
2874        continue;
2875      }
2876
2877      final String flushCommits = "--flushCommits=";
2878      if (cmd.startsWith(flushCommits)) {
2879        opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
2880        continue;
2881      }
2882
2883      final String writeToWAL = "--writeToWAL=";
2884      if (cmd.startsWith(writeToWAL)) {
2885        opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
2886        continue;
2887      }
2888
2889      final String presplit = "--presplit=";
2890      if (cmd.startsWith(presplit)) {
2891        opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
2892        continue;
2893      }
2894
2895      final String inMemory = "--inmemory=";
2896      if (cmd.startsWith(inMemory)) {
2897        opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
2898        continue;
2899      }
2900
2901      final String autoFlush = "--autoFlush=";
2902      if (cmd.startsWith(autoFlush)) {
2903        opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
2904        continue;
2905      }
2906
2907      final String onceCon = "--oneCon=";
2908      if (cmd.startsWith(onceCon)) {
2909        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
2910        continue;
2911      }
2912
2913      final String connCount = "--connCount=";
2914      if (cmd.startsWith(connCount)) {
2915        opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
2916        continue;
2917      }
2918
2919      final String latencyThreshold = "--latencyThreshold=";
2920      if (cmd.startsWith(latencyThreshold)) {
2921        opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length()));
2922        continue;
2923      }
2924
2925      final String latency = "--latency";
2926      if (cmd.startsWith(latency)) {
2927        opts.reportLatency = true;
2928        continue;
2929      }
2930
2931      final String multiGet = "--multiGet=";
2932      if (cmd.startsWith(multiGet)) {
2933        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
2934        continue;
2935      }
2936
2937      final String multiPut = "--multiPut=";
2938      if (cmd.startsWith(multiPut)) {
2939        opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
2940        continue;
2941      }
2942
2943      final String useTags = "--usetags=";
2944      if (cmd.startsWith(useTags)) {
2945        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
2946        continue;
2947      }
2948
2949      final String noOfTags = "--numoftags=";
2950      if (cmd.startsWith(noOfTags)) {
2951        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
2952        continue;
2953      }
2954
2955      final String replicas = "--replicas=";
2956      if (cmd.startsWith(replicas)) {
2957        opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
2958        continue;
2959      }
2960
2961      final String filterOutAll = "--filterAll";
2962      if (cmd.startsWith(filterOutAll)) {
2963        opts.filterAll = true;
2964        continue;
2965      }
2966
2967      final String size = "--size=";
2968      if (cmd.startsWith(size)) {
2969        opts.size = Float.parseFloat(cmd.substring(size.length()));
2970        if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB");
2971        continue;
2972      }
2973
2974      final String splitPolicy = "--splitPolicy=";
2975      if (cmd.startsWith(splitPolicy)) {
2976        opts.splitPolicy = cmd.substring(splitPolicy.length());
2977        continue;
2978      }
2979
2980      final String randomSleep = "--randomSleep=";
2981      if (cmd.startsWith(randomSleep)) {
2982        opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
2983        continue;
2984      }
2985
2986      final String measureAfter = "--measureAfter=";
2987      if (cmd.startsWith(measureAfter)) {
2988        opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length()));
2989        continue;
2990      }
2991
2992      final String bloomFilter = "--bloomFilter=";
2993      if (cmd.startsWith(bloomFilter)) {
2994        opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
2995        continue;
2996      }
2997
2998      final String blockSize = "--blockSize=";
2999      if (cmd.startsWith(blockSize)) {
3000        opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
3001        continue;
3002      }
3003
3004      final String valueSize = "--valueSize=";
3005      if (cmd.startsWith(valueSize)) {
3006        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
3007        continue;
3008      }
3009
3010      final String valueRandom = "--valueRandom";
3011      if (cmd.startsWith(valueRandom)) {
3012        opts.valueRandom = true;
3013        continue;
3014      }
3015
3016      final String valueZipf = "--valueZipf";
3017      if (cmd.startsWith(valueZipf)) {
3018        opts.valueZipf = true;
3019        continue;
3020      }
3021
3022      final String period = "--period=";
3023      if (cmd.startsWith(period)) {
3024        opts.period = Integer.parseInt(cmd.substring(period.length()));
3025        continue;
3026      }
3027
3028      final String addColumns = "--addColumns=";
3029      if (cmd.startsWith(addColumns)) {
3030        opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
3031        continue;
3032      }
3033
3034      final String inMemoryCompaction = "--inmemoryCompaction=";
3035      if (cmd.startsWith(inMemoryCompaction)) {
3036        opts.inMemoryCompaction =
3037          MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length()));
3038        continue;
3039      }
3040
3041      final String columns = "--columns=";
3042      if (cmd.startsWith(columns)) {
3043        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
3044        continue;
3045      }
3046
3047      final String families = "--families=";
3048      if (cmd.startsWith(families)) {
3049        opts.families = Integer.parseInt(cmd.substring(families.length()));
3050        continue;
3051      }
3052
3053      final String caching = "--caching=";
3054      if (cmd.startsWith(caching)) {
3055        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
3056        continue;
3057      }
3058
3059      final String asyncPrefetch = "--asyncPrefetch";
3060      if (cmd.startsWith(asyncPrefetch)) {
3061        opts.asyncPrefetch = true;
3062        continue;
3063      }
3064
3065      final String cacheBlocks = "--cacheBlocks=";
3066      if (cmd.startsWith(cacheBlocks)) {
3067        opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length()));
3068        continue;
3069      }
3070
3071      final String scanReadType = "--scanReadType=";
3072      if (cmd.startsWith(scanReadType)) {
3073        opts.scanReadType =
3074          Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
3075        continue;
3076      }
3077
3078      final String bufferSize = "--bufferSize=";
3079      if (cmd.startsWith(bufferSize)) {
3080        opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
3081        continue;
3082      }
3083
3084      final String commandPropertiesFile = "--commandPropertiesFile=";
3085      if (cmd.startsWith(commandPropertiesFile)) {
3086        String fileName = String.valueOf(cmd.substring(commandPropertiesFile.length()));
3087        Properties properties = new Properties();
3088        try {
3089          properties
3090            .load(PerformanceEvaluation.class.getClassLoader().getResourceAsStream(fileName));
3091          opts.commandProperties = properties;
3092        } catch (IOException e) {
3093          LOG.error("Failed to load metricIds from properties file", e);
3094        }
3095        continue;
3096      }
3097
3098      validateParsedOpts(opts);
3099
3100      if (isCommandClass(cmd)) {
3101        opts.cmdName = cmd;
3102        try {
3103          opts.numClientThreads = Integer.parseInt(args.remove());
3104        } catch (NoSuchElementException | NumberFormatException e) {
3105          throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
3106        }
3107        opts = calculateRowsAndSize(opts);
3108        break;
3109      } else {
3110        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
3111      }
3112
3113      // Not matching any option or command.
3114      System.err.println("Error: Wrong option or command: " + cmd);
3115      args.add(cmd);
3116      break;
3117    }
3118    return opts;
3119  }
3120
3121  /**
3122   * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts
3123   */
3124  private static void validateParsedOpts(TestOptions opts) {
3125
3126    if (!opts.autoFlush && opts.multiPut > 0) {
3127      throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
3128    }
3129
3130    if (opts.oneCon && opts.connCount > 1) {
3131      throw new IllegalArgumentException(
3132        "oneCon is set to true, " + "connCount should not bigger than 1");
3133    }
3134
3135    if (opts.valueZipf && opts.valueRandom) {
3136      throw new IllegalStateException("Either valueZipf or valueRandom but not both");
3137    }
3138  }
3139
3140  static TestOptions calculateRowsAndSize(final TestOptions opts) {
3141    int rowsPerGB = getRowsPerGB(opts);
3142    if (
3143      (opts.getCmdName() != null
3144        && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
3145        && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows
3146    ) {
3147      opts.totalRows = (int) opts.size * rowsPerGB;
3148    } else if (opts.size != DEFAULT_OPTS.size) {
3149      // total size in GB specified
3150      opts.totalRows = (int) opts.size * rowsPerGB;
3151      opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
3152    } else {
3153      opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
3154      opts.size = opts.totalRows / rowsPerGB;
3155    }
3156    return opts;
3157  }
3158
3159  static int getRowsPerGB(final TestOptions opts) {
3160    return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies()
3161      * opts.getColumns());
3162  }
3163
3164  @Override
3165  public int run(String[] args) throws Exception {
3166    // Process command-line args. TODO: Better cmd-line processing
3167    // (but hopefully something not as painful as cli options).
3168    int errCode = -1;
3169    if (args.length < 1) {
3170      printUsage();
3171      return errCode;
3172    }
3173
3174    try {
3175      LinkedList<String> argv = new LinkedList<>();
3176      argv.addAll(Arrays.asList(args));
3177      TestOptions opts = parseOpts(argv);
3178
3179      // args remaining, print help and exit
3180      if (!argv.isEmpty()) {
3181        errCode = 0;
3182        printUsage();
3183        return errCode;
3184      }
3185
3186      // must run at least 1 client
3187      if (opts.numClientThreads <= 0) {
3188        throw new IllegalArgumentException("Number of clients must be > 0");
3189      }
3190
3191      // cmdName should not be null, print help and exit
3192      if (opts.cmdName == null) {
3193        printUsage();
3194        return errCode;
3195      }
3196
3197      Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
3198      if (cmdClass != null) {
3199        runTest(cmdClass, opts);
3200        errCode = 0;
3201      }
3202
3203    } catch (Exception e) {
3204      e.printStackTrace();
3205    }
3206
3207    return errCode;
3208  }
3209
3210  private static boolean isCommandClass(String cmd) {
3211    return COMMANDS.containsKey(cmd) || isCustomTestClass(cmd);
3212  }
3213
3214  private static boolean isCustomTestClass(String cmd) {
3215    Class<? extends Test> cmdClass;
3216    try {
3217      cmdClass =
3218        (Class<? extends Test>) PerformanceEvaluation.class.getClassLoader().loadClass(cmd);
3219      addCommandDescriptor(cmdClass, cmd, "custom command");
3220      return true;
3221    } catch (Throwable th) {
3222      LOG.info("No class found for command: " + cmd, th);
3223      return false;
3224    }
3225  }
3226
3227  private static Class<? extends TestBase> determineCommandClass(String cmd) {
3228    CmdDescriptor descriptor = COMMANDS.get(cmd);
3229    return descriptor != null ? descriptor.getCmdClass() : null;
3230  }
3231
3232  public static void main(final String[] args) throws Exception {
3233    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
3234    System.exit(res);
3235  }
3236}