001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import com.codahale.metrics.Histogram;
021import com.codahale.metrics.UniformReservoir;
022import io.opentelemetry.api.trace.Span;
023import io.opentelemetry.context.Scope;
024import java.io.IOException;
025import java.io.PrintStream;
026import java.lang.reflect.Constructor;
027import java.math.BigDecimal;
028import java.math.MathContext;
029import java.text.DecimalFormat;
030import java.text.SimpleDateFormat;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Date;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Locale;
037import java.util.Map;
038import java.util.NoSuchElementException;
039import java.util.Queue;
040import java.util.Random;
041import java.util.TreeMap;
042import java.util.concurrent.Callable;
043import java.util.concurrent.ExecutionException;
044import java.util.concurrent.ExecutorService;
045import java.util.concurrent.Executors;
046import java.util.concurrent.Future;
047import java.util.concurrent.ThreadLocalRandom;
048import org.apache.commons.lang3.StringUtils;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.conf.Configured;
051import org.apache.hadoop.fs.FileSystem;
052import org.apache.hadoop.fs.Path;
053import org.apache.hadoop.hbase.client.Admin;
054import org.apache.hadoop.hbase.client.Append;
055import org.apache.hadoop.hbase.client.AsyncConnection;
056import org.apache.hadoop.hbase.client.AsyncTable;
057import org.apache.hadoop.hbase.client.BufferedMutator;
058import org.apache.hadoop.hbase.client.BufferedMutatorParams;
059import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
060import org.apache.hadoop.hbase.client.Connection;
061import org.apache.hadoop.hbase.client.ConnectionFactory;
062import org.apache.hadoop.hbase.client.Consistency;
063import org.apache.hadoop.hbase.client.Delete;
064import org.apache.hadoop.hbase.client.Durability;
065import org.apache.hadoop.hbase.client.Get;
066import org.apache.hadoop.hbase.client.Increment;
067import org.apache.hadoop.hbase.client.Put;
068import org.apache.hadoop.hbase.client.RegionInfo;
069import org.apache.hadoop.hbase.client.RegionInfoBuilder;
070import org.apache.hadoop.hbase.client.RegionLocator;
071import org.apache.hadoop.hbase.client.Result;
072import org.apache.hadoop.hbase.client.ResultScanner;
073import org.apache.hadoop.hbase.client.RowMutations;
074import org.apache.hadoop.hbase.client.Scan;
075import org.apache.hadoop.hbase.client.Table;
076import org.apache.hadoop.hbase.client.TableDescriptor;
077import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
078import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
079import org.apache.hadoop.hbase.filter.BinaryComparator;
080import org.apache.hadoop.hbase.filter.Filter;
081import org.apache.hadoop.hbase.filter.FilterAllFilter;
082import org.apache.hadoop.hbase.filter.FilterList;
083import org.apache.hadoop.hbase.filter.PageFilter;
084import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
085import org.apache.hadoop.hbase.filter.WhileMatchFilter;
086import org.apache.hadoop.hbase.io.compress.Compression;
087import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
088import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
089import org.apache.hadoop.hbase.regionserver.BloomType;
090import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
091import org.apache.hadoop.hbase.trace.TraceUtil;
092import org.apache.hadoop.hbase.util.ByteArrayHashKey;
093import org.apache.hadoop.hbase.util.Bytes;
094import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
095import org.apache.hadoop.hbase.util.GsonUtil;
096import org.apache.hadoop.hbase.util.Hash;
097import org.apache.hadoop.hbase.util.MurmurHash;
098import org.apache.hadoop.hbase.util.Pair;
099import org.apache.hadoop.hbase.util.RandomDistribution;
100import org.apache.hadoop.hbase.util.YammerHistogramUtils;
101import org.apache.hadoop.io.LongWritable;
102import org.apache.hadoop.io.Text;
103import org.apache.hadoop.mapreduce.Job;
104import org.apache.hadoop.mapreduce.Mapper;
105import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
106import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
107import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
108import org.apache.hadoop.util.Tool;
109import org.apache.hadoop.util.ToolRunner;
110import org.apache.yetus.audience.InterfaceAudience;
111import org.slf4j.Logger;
112import org.slf4j.LoggerFactory;
113
114import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
115import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
116import org.apache.hbase.thirdparty.com.google.gson.Gson;
117
118/**
 * Script used for evaluating HBase performance and scalability. Runs an HBase client that steps through
120 * one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random writes test,
121 * etc.). Pass on the command-line which test to run and how many clients are participating in this
122 * experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
123 * <p>
124 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance
125 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper,
126 * pages 8-10.
127 * <p>
128 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as
129 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client does
130 * about 1GB of data, unless specified otherwise.
131 */
132@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
133public class PerformanceEvaluation extends Configured implements Tool {
  // Command names that are also referenced as constants; RANDOM_READ and RANDOM_SEEK_SCAN are the
  // keys under which the corresponding tests are registered in the static block below.
  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
  static final String RANDOM_READ = "randomRead";
  static final String PE_COMMAND_SHORTNAME = "pe";
  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
  // (De)serializes TestOptions to/from the per-client JSON lines of the mapreduce input file.
  private static final Gson GSON = GsonUtil.createGson().create();

  // Default table name (initial value of TestOptions.tableName).
  public static final String TABLE_NAME = "TestTable";
  // Column families are named FAMILY_NAME_BASE + index: info0, info1, ...
  public static final String FAMILY_NAME_BASE = "info";
  // Name of the first column family ("info0").
  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
  // Qualifier "0".
  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
  // Default value size in bytes (initial value of TestOptions.valueSize).
  public static final int DEFAULT_VALUE_LENGTH = 1000;
  // NOTE(review): presumably the length of a formatted row key; confirm against format().
  public static final int ROW_LENGTH = 26;

  // 1024 * 1024 * 1000 bytes, i.e. 1000 MiB -- slightly less than a true GiB.
  private static final int ONE_GB = 1024 * 1024 * 1000;
  // Rows of DEFAULT_VALUE_LENGTH bytes that fit in ONE_GB; default for the per-client and total
  // row counts of TestOptions.
  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
  // TODO : should we make this configurable
  // NOTE(review): presumably the length in bytes of each tag written when tags are enabled.
  private static final int TAG_LENGTH = 256;
  // Number-formatting and unit-conversion helpers, presumably for result reporting elsewhere.
  private static final DecimalFormat FMT = new DecimalFormat("0.##");
  private static final MathContext CXT = MathContext.DECIMAL64;
  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
  // Default-constructed options; compared against to detect user overrides (e.g. in checkTable,
  // getTableDescriptor and getSplits).
  private static final TestOptions DEFAULT_OPTS = new TestOptions();

  // Command name -> descriptor, sorted by name. Filled by addCommandDescriptor. It must be
  // initialized textually before the static block below, because static initializers run in
  // declaration order.
  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
  // Directory (below the base dir) under which writeInputFile creates per-run job directories.
  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");

  // Registers every built-in command: command-line name -> test class plus usage description.
  static {
    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
      "Run async random read test");
    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
      "Run async random write test");
    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
      "Run async sequential read test");
    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
      "Run async sequential write test");
    addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)");
    addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test");
    addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test");
    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
      "Run random seek and scan 100 test");
    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
      "Run random seek scan with both start and stop row (max 10 rows)");
    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
      "Run random seek scan with both start and stop row (max 100 rows)");
    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
      "Run random seek scan with both start and stop row (max 1000 rows)");
    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
      "Run random seek scan with both start and stop row (max 10000 rows)");
    addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
    addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
    addCommandDescriptor(MetaWriteTest.class, "metaWrite",
      "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta");
    addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
    addCommandDescriptor(FilteredScanTest.class, "filterScan",
      "Run scan test using a filter to find a specific row based on it's value "
        + "(make sure to use --rows=20)");
    addCommandDescriptor(IncrementTest.class, "increment",
      "Increment on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(AppendTest.class, "append",
      "Append on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CleanMetaTest.class, "cleanMeta",
      "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread");
  }
204
205  /**
206   * Enum for map metrics. Keep it out here rather than inside in the Map inner-class so we can find
207   * associated properties.
208   */
209  protected static enum Counter {
210    /** elapsed time */
211    ELAPSED_TIME,
212    /** number of rows */
213    ROWS
214  }
215
216  protected static class RunResult implements Comparable<RunResult> {
217    public RunResult(long duration, Histogram hist) {
218      this.duration = duration;
219      this.hist = hist;
220      numbOfReplyOverThreshold = 0;
221      numOfReplyFromReplica = 0;
222    }
223
224    public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica,
225      Histogram hist) {
226      this.duration = duration;
227      this.hist = hist;
228      this.numbOfReplyOverThreshold = numbOfReplyOverThreshold;
229      this.numOfReplyFromReplica = numOfReplyFromReplica;
230    }
231
232    public final long duration;
233    public final Histogram hist;
234    public final long numbOfReplyOverThreshold;
235    public final long numOfReplyFromReplica;
236
237    @Override
238    public String toString() {
239      return Long.toString(duration);
240    }
241
242    @Override
243    public int compareTo(RunResult o) {
244      return Long.compare(this.duration, o.duration);
245    }
246  }
247
248  /**
249   * Constructor
250   * @param conf Configuration object
251   */
252  public PerformanceEvaluation(final Configuration conf) {
253    super(conf);
254  }
255
256  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name,
257    String description) {
258    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
259    COMMANDS.put(name, cmdDescriptor);
260  }
261
262  /**
263   * Implementations can have their status set.
264   */
265  interface Status {
266    /**
267     * Sets status
268     * @param msg status message n
269     */
270    void setStatus(final String msg) throws IOException;
271  }
272
273  /**
274   * MapReduce job that runs a performance evaluation client in each map task.
275   */
276  public static class EvaluationMapTask
277    extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
278
279    /** configuration parameter name that contains the command */
280    public final static String CMD_KEY = "EvaluationMapTask.command";
281    /** configuration parameter name that contains the PE impl */
282    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
283
284    private Class<? extends Test> cmd;
285
286    @Override
287    protected void setup(Context context) throws IOException, InterruptedException {
288      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
289
290      // this is required so that extensions of PE are instantiated within the
291      // map reduce task...
292      Class<? extends PerformanceEvaluation> peClass =
293        forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
294      try {
295        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
296      } catch (Exception e) {
297        throw new IllegalStateException("Could not instantiate PE instance", e);
298      }
299    }
300
301    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
302      try {
303        return Class.forName(className).asSubclass(type);
304      } catch (ClassNotFoundException e) {
305        throw new IllegalStateException("Could not find class for name: " + className, e);
306      }
307    }
308
309    @Override
310    protected void map(LongWritable key, Text value, final Context context)
311      throws IOException, InterruptedException {
312
313      Status status = new Status() {
314        @Override
315        public void setStatus(String msg) {
316          context.setStatus(msg);
317        }
318      };
319
320      TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class);
321      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
322      final Connection con = ConnectionFactory.createConnection(conf);
323      AsyncConnection asyncCon = null;
324      try {
325        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
326      } catch (ExecutionException e) {
327        throw new IOException(e);
328      }
329
330      // Evaluation task
331      RunResult result =
332        PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
333      // Collect how much time the thing took. Report as map output and
334      // to the ELAPSED_TIME counter.
335      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
336      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
337      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
338      context.progress();
339    }
340  }
341
342  /*
343   * If table does not already exist, create. Also create a table when {@code opts.presplitRegions}
344   * is specified or when the existing table's region replica count doesn't match {@code
345   * opts.replicas}.
346   */
347  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
348    TableName tableName = TableName.valueOf(opts.tableName);
349    boolean needsDelete = false, exists = admin.tableExists(tableName);
350    boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
351      || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
352    if (!exists && isReadCmd) {
353      throw new IllegalStateException(
354        "Must specify an existing table for read commands. Run a write command first.");
355    }
356    TableDescriptor desc = exists ? admin.getDescriptor(TableName.valueOf(opts.tableName)) : null;
357    byte[][] splits = getSplits(opts);
358
359    // recreate the table when user has requested presplit or when existing
360    // {RegionSplitPolicy,replica count} does not match requested, or when the
361    // number of column families does not match requested.
362    if (
363      (exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
364        || (!isReadCmd && desc != null
365          && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
366        || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)
367        || (desc != null && desc.getColumnFamilyCount() != opts.families)
368    ) {
369      needsDelete = true;
370      // wait, why did it delete my table?!?
371      LOG.debug(MoreObjects.toStringHelper("needsDelete").add("needsDelete", needsDelete)
372        .add("isReadCmd", isReadCmd).add("exists", exists).add("desc", desc)
373        .add("presplit", opts.presplitRegions).add("splitPolicy", opts.splitPolicy)
374        .add("replicas", opts.replicas).add("families", opts.families).toString());
375    }
376
377    // remove an existing table
378    if (needsDelete) {
379      if (admin.isTableEnabled(tableName)) {
380        admin.disableTable(tableName);
381      }
382      admin.deleteTable(tableName);
383    }
384
385    // table creation is necessary
386    if (!exists || needsDelete) {
387      desc = getTableDescriptor(opts);
388      if (splits != null) {
389        if (LOG.isDebugEnabled()) {
390          for (int i = 0; i < splits.length; i++) {
391            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
392          }
393        }
394      }
395      if (splits != null) {
396        admin.createTable(desc, splits);
397      } else {
398        admin.createTable(desc);
399      }
400      LOG.info("Table " + desc + " created");
401    }
402    return admin.tableExists(tableName);
403  }
404
405  /**
406   * Create an HTableDescriptor from provided TestOptions.
407   */
408  protected static TableDescriptor getTableDescriptor(TestOptions opts) {
409    TableDescriptorBuilder builder =
410      TableDescriptorBuilder.newBuilder(TableName.valueOf(opts.tableName));
411
412    for (int family = 0; family < opts.families; family++) {
413      byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
414      ColumnFamilyDescriptorBuilder cfBuilder =
415        ColumnFamilyDescriptorBuilder.newBuilder(familyName);
416      cfBuilder.setDataBlockEncoding(opts.blockEncoding);
417      cfBuilder.setCompressionType(opts.compression);
418      cfBuilder.setBloomFilterType(opts.bloomType);
419      cfBuilder.setBlocksize(opts.blockSize);
420      if (opts.inMemoryCF) {
421        cfBuilder.setInMemory(true);
422      }
423      cfBuilder.setInMemoryCompaction(opts.inMemoryCompaction);
424      builder.setColumnFamily(cfBuilder.build());
425    }
426    if (opts.replicas != DEFAULT_OPTS.replicas) {
427      builder.setRegionReplication(opts.replicas);
428    }
429    if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
430      builder.setRegionSplitPolicyClassName(opts.splitPolicy);
431    }
432    return builder.build();
433  }
434
435  /**
436   * generates splits based on total number of rows and specified split regions
437   */
438  protected static byte[][] getSplits(TestOptions opts) {
439    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null;
440
441    int numSplitPoints = opts.presplitRegions - 1;
442    byte[][] splits = new byte[numSplitPoints][];
443    int jump = opts.totalRows / opts.presplitRegions;
444    for (int i = 0; i < numSplitPoints; i++) {
445      int rowkey = jump * (1 + i);
446      splits[i] = format(rowkey);
447    }
448    return splits;
449  }
450
451  static void setupConnectionCount(final TestOptions opts) {
452    if (opts.oneCon) {
453      opts.connCount = 1;
454    } else {
455      if (opts.connCount == -1) {
456        // set to thread number if connCount is not set
457        opts.connCount = opts.numClientThreads;
458      }
459    }
460  }
461
462  /*
463   * Run all clients in this vm each to its own thread.
464   */
465  static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
466    throws IOException, InterruptedException, ExecutionException {
467    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
468    assert cmd != null;
469    @SuppressWarnings("unchecked")
470    Future<RunResult>[] threads = new Future[opts.numClientThreads];
471    RunResult[] results = new RunResult[opts.numClientThreads];
472    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
473      new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
474    setupConnectionCount(opts);
475    final Connection[] cons = new Connection[opts.connCount];
476    final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
477    for (int i = 0; i < opts.connCount; i++) {
478      cons[i] = ConnectionFactory.createConnection(conf);
479      asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
480    }
481    LOG
482      .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads");
483    for (int i = 0; i < threads.length; i++) {
484      final int index = i;
485      threads[i] = pool.submit(new Callable<RunResult>() {
486        @Override
487        public RunResult call() throws Exception {
488          TestOptions threadOpts = new TestOptions(opts);
489          final Connection con = cons[index % cons.length];
490          final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
491          if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
492          RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
493            @Override
494            public void setStatus(final String msg) throws IOException {
495              LOG.info(msg);
496            }
497          });
498          LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration
499            + "ms over " + threadOpts.perClientRunRows + " rows");
500          if (opts.latencyThreshold > 0) {
501            LOG.info("Number of replies over latency threshold " + opts.latencyThreshold
502              + "(ms) is " + run.numbOfReplyOverThreshold);
503          }
504          return run;
505        }
506      });
507    }
508    pool.shutdown();
509
510    for (int i = 0; i < threads.length; i++) {
511      try {
512        results[i] = threads[i].get();
513      } catch (ExecutionException e) {
514        throw new IOException(e.getCause());
515      }
516    }
517    final String test = cmd.getSimpleName();
518    LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results));
519    Arrays.sort(results);
520    long total = 0;
521    float avgLatency = 0;
522    float avgTPS = 0;
523    long replicaWins = 0;
524    for (RunResult result : results) {
525      total += result.duration;
526      avgLatency += result.hist.getSnapshot().getMean();
527      avgTPS += opts.perClientRunRows * 1.0f / result.duration;
528      replicaWins += result.numOfReplyFromReplica;
529    }
530    avgTPS *= 1000; // ms to second
531    avgLatency = avgLatency / results.length;
532    LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: "
533      + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms");
534    LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
535    LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
536    if (opts.replicas > 1) {
537      LOG.info("[results from replica regions] " + replicaWins);
538    }
539
540    for (int i = 0; i < opts.connCount; i++) {
541      cons[i].close();
542      asyncCons[i].close();
543    }
544
545    return results;
546  }
547
548  /*
549   * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write
550   * out an input file with instruction per client regards which row they are to start on.
551   * @param cmd Command to run. n
552   */
553  static Job doMapReduce(TestOptions opts, final Configuration conf)
554    throws IOException, InterruptedException, ClassNotFoundException {
555    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
556    assert cmd != null;
557    Path inputDir = writeInputFile(conf, opts);
558    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
559    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
560    Job job = Job.getInstance(conf);
561    job.setJarByClass(PerformanceEvaluation.class);
562    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
563
564    job.setInputFormatClass(NLineInputFormat.class);
565    NLineInputFormat.setInputPaths(job, inputDir);
566    // this is default, but be explicit about it just in case.
567    NLineInputFormat.setNumLinesPerSplit(job, 1);
568
569    job.setOutputKeyClass(LongWritable.class);
570    job.setOutputValueClass(LongWritable.class);
571
572    job.setMapperClass(EvaluationMapTask.class);
573    job.setReducerClass(LongSumReducer.class);
574
575    job.setNumReduceTasks(1);
576
577    job.setOutputFormatClass(TextOutputFormat.class);
578    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
579
580    TableMapReduceUtil.addDependencyJars(job);
581    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Histogram.class, // yammer
582                                                                                            // metrics
583      Gson.class, // gson
584      FilterAllFilter.class // hbase-server tests jar
585    );
586
587    TableMapReduceUtil.initCredentials(job);
588
589    job.waitForCompletion(true);
590    return job;
591  }
592
  /**
   * Name of the mapreduce input file written by writeInputFile. It holds one JSON-serialized
   * TestOptions per line, one line per client. The job reads it one line per split, so each
   * client runs in its own mapper.
   */
  static String JOB_INPUT_FILENAME = "input.txt";
598
599  /*
600   * Write input file of offsets-per-client for the mapreduce job.
601   * @param c Configuration
602   * @return Directory that contains file written whose name is JOB_INPUT_FILENAME n
603   */
604  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
605    return writeInputFile(c, opts, new Path("."));
606  }
607
608  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
609    throws IOException {
610    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
611    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
612    Path inputDir = new Path(jobdir, "inputs");
613
614    FileSystem fs = FileSystem.get(c);
615    fs.mkdirs(inputDir);
616
617    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
618    PrintStream out = new PrintStream(fs.create(inputFile));
619    // Make input random.
620    Map<Integer, String> m = new TreeMap<>();
621    Hash h = MurmurHash.getInstance();
622    int perClientRows = (opts.totalRows / opts.numClientThreads);
623    try {
624      for (int j = 0; j < opts.numClientThreads; j++) {
625        TestOptions next = new TestOptions(opts);
626        next.startRow = j * perClientRows;
627        next.perClientRunRows = perClientRows;
628        String s = GSON.toJson(next);
629        LOG.info("Client=" + j + ", input=" + s);
630        byte[] b = Bytes.toBytes(s);
631        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
632        m.put(hash, s);
633      }
634      for (Map.Entry<Integer, String> e : m.entrySet()) {
635        out.println(e.getValue());
636      }
637    } finally {
638      out.close();
639    }
640    return inputDir;
641  }
642
643  /**
644   * Describes a command.
645   */
646  static class CmdDescriptor {
647    private Class<? extends TestBase> cmdClass;
648    private String name;
649    private String description;
650
651    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
652      this.cmdClass = cmdClass;
653      this.name = name;
654      this.description = description;
655    }
656
657    public Class<? extends TestBase> getCmdClass() {
658      return cmdClass;
659    }
660
661    public String getName() {
662      return name;
663    }
664
665    public String getDescription() {
666      return description;
667    }
668  }
669
670  /**
671   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes
672   * tracking all these arguments a little easier. NOTE: ADDING AN OPTION, you need to add a data
673   * member, a getter/setter (to make JSON serialization of this TestOptions class behave), and you
674   * need to add to the clone constructor below copying your new option from the 'that' to the
675   * 'this'. Look for 'clone' below.
676   */
677  static class TestOptions {
678    String cmdName = null;
679    boolean nomapred = false;
680    boolean filterAll = false;
681    int startRow = 0;
682    float size = 1.0f;
683    int perClientRunRows = DEFAULT_ROWS_PER_GB;
684    int numClientThreads = 1;
685    int totalRows = DEFAULT_ROWS_PER_GB;
686    int measureAfter = 0;
687    float sampleRate = 1.0f;
688    /**
689     * @deprecated Useless after switching to OpenTelemetry
690     */
691    @Deprecated
692    double traceRate = 0.0;
693    String tableName = TABLE_NAME;
694    boolean flushCommits = true;
695    boolean writeToWAL = true;
696    boolean autoFlush = false;
697    boolean oneCon = false;
698    int connCount = -1; // wil decide the actual num later
699    boolean useTags = false;
700    int noOfTags = 1;
701    boolean reportLatency = false;
702    int multiGet = 0;
703    int multiPut = 0;
704    int randomSleep = 0;
705    boolean inMemoryCF = false;
706    int presplitRegions = 0;
707    int replicas = TableDescriptorBuilder.DEFAULT_REGION_REPLICATION;
708    String splitPolicy = null;
709    Compression.Algorithm compression = Compression.Algorithm.NONE;
710    BloomType bloomType = BloomType.ROW;
711    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
712    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
713    boolean valueRandom = false;
714    boolean valueZipf = false;
715    int valueSize = DEFAULT_VALUE_LENGTH;
716    int period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10;
717    int cycles = 1;
718    int columns = 1;
719    int families = 1;
720    int caching = 30;
721    int latencyThreshold = 0; // in millsecond
722    boolean addColumns = true;
723    MemoryCompactionPolicy inMemoryCompaction =
724      MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
725    boolean asyncPrefetch = false;
726    boolean cacheBlocks = true;
727    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
728    long bufferSize = 2l * 1024l * 1024l;
729
730    public TestOptions() {
731    }
732
733    /**
734     * Clone constructor.
735     * @param that Object to copy from.
736     */
737    public TestOptions(TestOptions that) {
738      this.cmdName = that.cmdName;
739      this.cycles = that.cycles;
740      this.nomapred = that.nomapred;
741      this.startRow = that.startRow;
742      this.size = that.size;
743      this.perClientRunRows = that.perClientRunRows;
744      this.numClientThreads = that.numClientThreads;
745      this.totalRows = that.totalRows;
746      this.sampleRate = that.sampleRate;
747      this.traceRate = that.traceRate;
748      this.tableName = that.tableName;
749      this.flushCommits = that.flushCommits;
750      this.writeToWAL = that.writeToWAL;
751      this.autoFlush = that.autoFlush;
752      this.oneCon = that.oneCon;
753      this.connCount = that.connCount;
754      this.useTags = that.useTags;
755      this.noOfTags = that.noOfTags;
756      this.reportLatency = that.reportLatency;
757      this.latencyThreshold = that.latencyThreshold;
758      this.multiGet = that.multiGet;
759      this.multiPut = that.multiPut;
760      this.inMemoryCF = that.inMemoryCF;
761      this.presplitRegions = that.presplitRegions;
762      this.replicas = that.replicas;
763      this.splitPolicy = that.splitPolicy;
764      this.compression = that.compression;
765      this.blockEncoding = that.blockEncoding;
766      this.filterAll = that.filterAll;
767      this.bloomType = that.bloomType;
768      this.blockSize = that.blockSize;
769      this.valueRandom = that.valueRandom;
770      this.valueZipf = that.valueZipf;
771      this.valueSize = that.valueSize;
772      this.period = that.period;
773      this.randomSleep = that.randomSleep;
774      this.measureAfter = that.measureAfter;
775      this.addColumns = that.addColumns;
776      this.columns = that.columns;
777      this.families = that.families;
778      this.caching = that.caching;
779      this.inMemoryCompaction = that.inMemoryCompaction;
780      this.asyncPrefetch = that.asyncPrefetch;
781      this.cacheBlocks = that.cacheBlocks;
782      this.scanReadType = that.scanReadType;
783      this.bufferSize = that.bufferSize;
784    }
785
786    public int getCaching() {
787      return this.caching;
788    }
789
790    public void setCaching(final int caching) {
791      this.caching = caching;
792    }
793
794    public int getColumns() {
795      return this.columns;
796    }
797
798    public void setColumns(final int columns) {
799      this.columns = columns;
800    }
801
802    public int getFamilies() {
803      return this.families;
804    }
805
806    public void setFamilies(final int families) {
807      this.families = families;
808    }
809
810    public int getCycles() {
811      return this.cycles;
812    }
813
814    public void setCycles(final int cycles) {
815      this.cycles = cycles;
816    }
817
818    public boolean isValueZipf() {
819      return valueZipf;
820    }
821
822    public void setValueZipf(boolean valueZipf) {
823      this.valueZipf = valueZipf;
824    }
825
826    public String getCmdName() {
827      return cmdName;
828    }
829
830    public void setCmdName(String cmdName) {
831      this.cmdName = cmdName;
832    }
833
834    public int getRandomSleep() {
835      return randomSleep;
836    }
837
838    public void setRandomSleep(int randomSleep) {
839      this.randomSleep = randomSleep;
840    }
841
842    public int getReplicas() {
843      return replicas;
844    }
845
846    public void setReplicas(int replicas) {
847      this.replicas = replicas;
848    }
849
850    public String getSplitPolicy() {
851      return splitPolicy;
852    }
853
854    public void setSplitPolicy(String splitPolicy) {
855      this.splitPolicy = splitPolicy;
856    }
857
858    public void setNomapred(boolean nomapred) {
859      this.nomapred = nomapred;
860    }
861
862    public void setFilterAll(boolean filterAll) {
863      this.filterAll = filterAll;
864    }
865
866    public void setStartRow(int startRow) {
867      this.startRow = startRow;
868    }
869
870    public void setSize(float size) {
871      this.size = size;
872    }
873
874    public void setPerClientRunRows(int perClientRunRows) {
875      this.perClientRunRows = perClientRunRows;
876    }
877
878    public void setNumClientThreads(int numClientThreads) {
879      this.numClientThreads = numClientThreads;
880    }
881
882    public void setTotalRows(int totalRows) {
883      this.totalRows = totalRows;
884    }
885
886    public void setSampleRate(float sampleRate) {
887      this.sampleRate = sampleRate;
888    }
889
890    public void setTraceRate(double traceRate) {
891      this.traceRate = traceRate;
892    }
893
894    public void setTableName(String tableName) {
895      this.tableName = tableName;
896    }
897
898    public void setFlushCommits(boolean flushCommits) {
899      this.flushCommits = flushCommits;
900    }
901
902    public void setWriteToWAL(boolean writeToWAL) {
903      this.writeToWAL = writeToWAL;
904    }
905
906    public void setAutoFlush(boolean autoFlush) {
907      this.autoFlush = autoFlush;
908    }
909
910    public void setOneCon(boolean oneCon) {
911      this.oneCon = oneCon;
912    }
913
914    public int getConnCount() {
915      return connCount;
916    }
917
918    public void setConnCount(int connCount) {
919      this.connCount = connCount;
920    }
921
922    public void setUseTags(boolean useTags) {
923      this.useTags = useTags;
924    }
925
926    public void setNoOfTags(int noOfTags) {
927      this.noOfTags = noOfTags;
928    }
929
930    public void setReportLatency(boolean reportLatency) {
931      this.reportLatency = reportLatency;
932    }
933
934    public void setMultiGet(int multiGet) {
935      this.multiGet = multiGet;
936    }
937
938    public void setMultiPut(int multiPut) {
939      this.multiPut = multiPut;
940    }
941
942    public void setInMemoryCF(boolean inMemoryCF) {
943      this.inMemoryCF = inMemoryCF;
944    }
945
946    public void setPresplitRegions(int presplitRegions) {
947      this.presplitRegions = presplitRegions;
948    }
949
950    public void setCompression(Compression.Algorithm compression) {
951      this.compression = compression;
952    }
953
954    public void setBloomType(BloomType bloomType) {
955      this.bloomType = bloomType;
956    }
957
958    public void setBlockSize(int blockSize) {
959      this.blockSize = blockSize;
960    }
961
962    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
963      this.blockEncoding = blockEncoding;
964    }
965
966    public void setValueRandom(boolean valueRandom) {
967      this.valueRandom = valueRandom;
968    }
969
970    public void setValueSize(int valueSize) {
971      this.valueSize = valueSize;
972    }
973
974    public void setBufferSize(long bufferSize) {
975      this.bufferSize = bufferSize;
976    }
977
978    public void setPeriod(int period) {
979      this.period = period;
980    }
981
982    public boolean isNomapred() {
983      return nomapred;
984    }
985
986    public boolean isFilterAll() {
987      return filterAll;
988    }
989
990    public int getStartRow() {
991      return startRow;
992    }
993
994    public float getSize() {
995      return size;
996    }
997
998    public int getPerClientRunRows() {
999      return perClientRunRows;
1000    }
1001
1002    public int getNumClientThreads() {
1003      return numClientThreads;
1004    }
1005
1006    public int getTotalRows() {
1007      return totalRows;
1008    }
1009
1010    public float getSampleRate() {
1011      return sampleRate;
1012    }
1013
1014    public double getTraceRate() {
1015      return traceRate;
1016    }
1017
1018    public String getTableName() {
1019      return tableName;
1020    }
1021
1022    public boolean isFlushCommits() {
1023      return flushCommits;
1024    }
1025
1026    public boolean isWriteToWAL() {
1027      return writeToWAL;
1028    }
1029
1030    public boolean isAutoFlush() {
1031      return autoFlush;
1032    }
1033
1034    public boolean isUseTags() {
1035      return useTags;
1036    }
1037
1038    public int getNoOfTags() {
1039      return noOfTags;
1040    }
1041
1042    public boolean isReportLatency() {
1043      return reportLatency;
1044    }
1045
1046    public int getMultiGet() {
1047      return multiGet;
1048    }
1049
1050    public int getMultiPut() {
1051      return multiPut;
1052    }
1053
1054    public boolean isInMemoryCF() {
1055      return inMemoryCF;
1056    }
1057
1058    public int getPresplitRegions() {
1059      return presplitRegions;
1060    }
1061
1062    public Compression.Algorithm getCompression() {
1063      return compression;
1064    }
1065
1066    public DataBlockEncoding getBlockEncoding() {
1067      return blockEncoding;
1068    }
1069
1070    public boolean isValueRandom() {
1071      return valueRandom;
1072    }
1073
1074    public int getValueSize() {
1075      return valueSize;
1076    }
1077
1078    public int getPeriod() {
1079      return period;
1080    }
1081
1082    public BloomType getBloomType() {
1083      return bloomType;
1084    }
1085
1086    public int getBlockSize() {
1087      return blockSize;
1088    }
1089
1090    public boolean isOneCon() {
1091      return oneCon;
1092    }
1093
1094    public int getMeasureAfter() {
1095      return measureAfter;
1096    }
1097
1098    public void setMeasureAfter(int measureAfter) {
1099      this.measureAfter = measureAfter;
1100    }
1101
1102    public boolean getAddColumns() {
1103      return addColumns;
1104    }
1105
1106    public void setAddColumns(boolean addColumns) {
1107      this.addColumns = addColumns;
1108    }
1109
1110    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
1111      this.inMemoryCompaction = inMemoryCompaction;
1112    }
1113
1114    public MemoryCompactionPolicy getInMemoryCompaction() {
1115      return this.inMemoryCompaction;
1116    }
1117
1118    public long getBufferSize() {
1119      return this.bufferSize;
1120    }
1121  }
1122
1123  /*
1124   * A test. Subclass to particularize what happens per row.
1125   */
1126  static abstract class TestBase {
1127    // Below is make it so when Tests are all running in the one
1128    // jvm, that they each have a differently seeded Random.
1129    private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime());
1130
1131    private static long nextRandomSeed() {
1132      return randomSeed.nextLong();
1133    }
1134
1135    private final int everyN;
1136
1137    protected final Random rand = new Random(nextRandomSeed());
1138    protected final Configuration conf;
1139    protected final TestOptions opts;
1140
1141    private final Status status;
1142
1143    private String testName;
1144    private Histogram latencyHistogram;
1145    private Histogram replicaLatencyHistogram;
1146    private Histogram valueSizeHistogram;
1147    private Histogram rpcCallsHistogram;
1148    private Histogram remoteRpcCallsHistogram;
1149    private Histogram millisBetweenNextHistogram;
1150    private Histogram regionsScannedHistogram;
1151    private Histogram bytesInResultsHistogram;
1152    private Histogram bytesInRemoteResultsHistogram;
1153    private RandomDistribution.Zipf zipf;
1154    private long numOfReplyOverLatencyThreshold = 0;
1155    private long numOfReplyFromReplica = 0;
1156
1157    /**
1158     * Note that all subclasses of this class must provide a public constructor that has the exact
1159     * same list of arguments.
1160     */
1161    TestBase(final Configuration conf, final TestOptions options, final Status status) {
1162      this.conf = conf;
1163      this.opts = options;
1164      this.status = status;
1165      this.testName = this.getClass().getSimpleName();
1166      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
1167      if (options.isValueZipf()) {
1168        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
1169      }
1170      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
1171    }
1172
1173    int getValueLength(final Random r) {
1174      if (this.opts.isValueRandom()) {
1175        return r.nextInt(opts.valueSize);
1176      } else if (this.opts.isValueZipf()) {
1177        return Math.abs(this.zipf.nextInt());
1178      } else {
1179        return opts.valueSize;
1180      }
1181    }
1182
1183    void updateValueSize(final Result[] rs) throws IOException {
1184      updateValueSize(rs, 0);
1185    }
1186
1187    void updateValueSize(final Result[] rs, final long latency) throws IOException {
1188      if (rs == null || (latency == 0)) return;
1189      for (Result r : rs)
1190        updateValueSize(r, latency);
1191    }
1192
1193    void updateValueSize(final Result r) throws IOException {
1194      updateValueSize(r, 0);
1195    }
1196
1197    void updateValueSize(final Result r, final long latency) throws IOException {
1198      if (r == null || (latency == 0)) return;
1199      int size = 0;
1200      // update replicaHistogram
1201      if (r.isStale()) {
1202        replicaLatencyHistogram.update(latency / 1000);
1203        numOfReplyFromReplica++;
1204      }
1205      if (!isRandomValueSize()) return;
1206
1207      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1208        size += scanner.current().getValueLength();
1209      }
1210      updateValueSize(size);
1211    }
1212
1213    void updateValueSize(final int valueSize) {
1214      if (!isRandomValueSize()) return;
1215      this.valueSizeHistogram.update(valueSize);
1216    }
1217
1218    void updateScanMetrics(final ScanMetrics metrics) {
1219      if (metrics == null) return;
1220      Map<String, Long> metricsMap = metrics.getMetricsMap();
1221      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
1222      if (rpcCalls != null) {
1223        this.rpcCallsHistogram.update(rpcCalls.longValue());
1224      }
1225      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
1226      if (remoteRpcCalls != null) {
1227        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
1228      }
1229      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
1230      if (millisBetweenNext != null) {
1231        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
1232      }
1233      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
1234      if (regionsScanned != null) {
1235        this.regionsScannedHistogram.update(regionsScanned.longValue());
1236      }
1237      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
1238      if (bytesInResults != null && bytesInResults.longValue() > 0) {
1239        this.bytesInResultsHistogram.update(bytesInResults.longValue());
1240      }
1241      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
1242      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
1243        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
1244      }
1245    }
1246
1247    String generateStatus(final int sr, final int i, final int lr) {
1248      return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency ["
1249        + getShortLatencyReport() + "]"
1250        + (!isRandomValueSize() ? "" : ", value size [" + getShortValueSizeReport() + "]");
1251    }
1252
1253    boolean isRandomValueSize() {
1254      return opts.valueRandom;
1255    }
1256
1257    protected int getReportingPeriod() {
1258      return opts.period;
1259    }
1260
1261    /**
1262     * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1263     */
1264    public Histogram getLatencyHistogram() {
1265      return latencyHistogram;
1266    }
1267
1268    void testSetup() throws IOException {
1269      // test metrics
1270      latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1271      // If it is a replica test, set up histogram for replica.
1272      if (opts.replicas > 1) {
1273        replicaLatencyHistogram =
1274          YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1275      }
1276      valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1277      // scan metrics
1278      rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1279      remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1280      millisBetweenNextHistogram =
1281        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1282      regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1283      bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1284      bytesInRemoteResultsHistogram =
1285        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1286
1287      onStartup();
1288    }
1289
1290    abstract void onStartup() throws IOException;
1291
1292    void testTakedown() throws IOException {
1293      onTakedown();
1294      // Print all stats for this thread continuously.
1295      // Synchronize on Test.class so different threads don't intermingle the
1296      // output. We can't use 'this' here because each thread has its own instance of Test class.
1297      synchronized (Test.class) {
1298        status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
1299        status
1300          .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram));
1301        if (opts.replicas > 1) {
1302          status.setStatus("Latency (us) from Replica Regions: "
1303            + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram));
1304        }
1305        status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
1306        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
1307        if (valueSizeHistogram.getCount() > 0) {
1308          status.setStatus(
1309            "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
1310          status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
1311          status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
1312        } else {
1313          status.setStatus("No valueSize statistics available");
1314        }
1315        if (rpcCallsHistogram.getCount() > 0) {
1316          status.setStatus(
1317            "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
1318        }
1319        if (remoteRpcCallsHistogram.getCount() > 0) {
1320          status.setStatus("remoteRpcCalls (count): "
1321            + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
1322        }
1323        if (millisBetweenNextHistogram.getCount() > 0) {
1324          status.setStatus("millisBetweenNext (latency): "
1325            + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
1326        }
1327        if (regionsScannedHistogram.getCount() > 0) {
1328          status.setStatus("regionsScanned (count): "
1329            + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
1330        }
1331        if (bytesInResultsHistogram.getCount() > 0) {
1332          status.setStatus("bytesInResults (size): "
1333            + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
1334        }
1335        if (bytesInRemoteResultsHistogram.getCount() > 0) {
1336          status.setStatus("bytesInRemoteResults (size): "
1337            + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
1338        }
1339      }
1340    }
1341
1342    abstract void onTakedown() throws IOException;
1343
1344    /*
1345     * Run test
1346     * @return Elapsed time. n
1347     */
1348    long test() throws IOException, InterruptedException {
1349      testSetup();
1350      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1351      final long startTime = System.nanoTime();
1352      try {
1353        testTimed();
1354      } finally {
1355        testTakedown();
1356      }
1357      return (System.nanoTime() - startTime) / 1000000;
1358    }
1359
1360    int getStartRow() {
1361      return opts.startRow;
1362    }
1363
1364    int getLastRow() {
1365      return getStartRow() + opts.perClientRunRows;
1366    }
1367
1368    /**
1369     * Provides an extension point for tests that don't want a per row invocation.
1370     */
1371    void testTimed() throws IOException, InterruptedException {
1372      int startRow = getStartRow();
1373      int lastRow = getLastRow();
1374      // Report on completion of 1/10th of total.
1375      for (int ii = 0; ii < opts.cycles; ii++) {
1376        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
1377        for (int i = startRow; i < lastRow; i++) {
1378          if (i % everyN != 0) continue;
1379          long startTime = System.nanoTime();
1380          boolean requestSent = false;
1381          Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan();
1382          try (Scope scope = span.makeCurrent()) {
1383            requestSent = testRow(i, startTime);
1384          } finally {
1385            span.end();
1386          }
1387          if ((i - startRow) > opts.measureAfter) {
1388            // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
1389            // first 9 times and sends the actual get request in the 10th iteration.
1390            // We should only set latency when actual request is sent because otherwise
1391            // it turns out to be 0.
1392            if (requestSent) {
1393              long latency = (System.nanoTime() - startTime) / 1000;
1394              latencyHistogram.update(latency);
1395              if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) {
1396                numOfReplyOverLatencyThreshold++;
1397              }
1398            }
1399            if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1400              status.setStatus(generateStatus(startRow, i, lastRow));
1401            }
1402          }
1403        }
1404      }
1405    }
1406
1407    /**
1408     * @return Subset of the histograms' calculation.
1409     */
1410    public String getShortLatencyReport() {
1411      return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
1412    }
1413
1414    /**
1415     * @return Subset of the histograms' calculation.
1416     */
1417    public String getShortValueSizeReport() {
1418      return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
1419    }
1420
1421    /**
1422     * Test for individual row.
1423     * @param i Row index.
1424     * @return true if the row was sent to server and need to record metrics. False if not, multiGet
1425     *         and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered.
1426     */
1427    abstract boolean testRow(final int i, final long startTime)
1428      throws IOException, InterruptedException;
1429  }
1430
1431  static abstract class Test extends TestBase {
1432    protected Connection connection;
1433
1434    Test(final Connection con, final TestOptions options, final Status status) {
1435      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1436      this.connection = con;
1437    }
1438  }
1439
1440  static abstract class AsyncTest extends TestBase {
1441    protected AsyncConnection connection;
1442
1443    AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
1444      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1445      this.connection = con;
1446    }
1447  }
1448
1449  static abstract class TableTest extends Test {
1450    protected Table table;
1451
1452    TableTest(Connection con, TestOptions options, Status status) {
1453      super(con, options, status);
1454    }
1455
1456    @Override
1457    void onStartup() throws IOException {
1458      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1459    }
1460
1461    @Override
1462    void onTakedown() throws IOException {
1463      table.close();
1464    }
1465  }
1466
1467  /*
1468   * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest
1469   */
1470  static abstract class MetaTest extends TableTest {
1471    protected int keyLength;
1472
1473    MetaTest(Connection con, TestOptions options, Status status) {
1474      super(con, options, status);
1475      keyLength = Integer.toString(opts.perClientRunRows).length();
1476    }
1477
1478    @Override
1479    void onTakedown() throws IOException {
1480      // No clean up
1481    }
1482
1483    /*
1484     * Generates Lexicographically ascending strings
1485     */
1486    protected byte[] getSplitKey(final int i) {
1487      return Bytes.toBytes(String.format("%0" + keyLength + "d", i));
1488    }
1489
1490  }
1491
1492  static abstract class AsyncTableTest extends AsyncTest {
1493    protected AsyncTable<?> table;
1494
1495    AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
1496      super(con, options, status);
1497    }
1498
1499    @Override
1500    void onStartup() throws IOException {
1501      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1502    }
1503
1504    @Override
1505    void onTakedown() throws IOException {
1506    }
1507  }
1508
1509  static class AsyncRandomReadTest extends AsyncTableTest {
1510    private final Consistency consistency;
1511    private ArrayList<Get> gets;
1512
1513    AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
1514      super(con, options, status);
1515      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1516      if (opts.multiGet > 0) {
1517        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1518        this.gets = new ArrayList<>(opts.multiGet);
1519      }
1520    }
1521
1522    @Override
1523    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1524      if (opts.randomSleep > 0) {
1525        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
1526      }
1527      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1528      for (int family = 0; family < opts.families; family++) {
1529        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1530        if (opts.addColumns) {
1531          for (int column = 0; column < opts.columns; column++) {
1532            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1533            get.addColumn(familyName, qualifier);
1534          }
1535        } else {
1536          get.addFamily(familyName);
1537        }
1538      }
1539      if (opts.filterAll) {
1540        get.setFilter(new FilterAllFilter());
1541      }
1542      get.setConsistency(consistency);
1543      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1544      try {
1545        if (opts.multiGet > 0) {
1546          this.gets.add(get);
1547          if (this.gets.size() == opts.multiGet) {
1548            Result[] rs =
1549              this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
1550            updateValueSize(rs);
1551            this.gets.clear();
1552          } else {
1553            return false;
1554          }
1555        } else {
1556          updateValueSize(this.table.get(get).get());
1557        }
1558      } catch (ExecutionException e) {
1559        throw new IOException(e);
1560      }
1561      return true;
1562    }
1563
1564    public static RuntimeException runtime(Throwable e) {
1565      if (e instanceof RuntimeException) {
1566        return (RuntimeException) e;
1567      }
1568      return new RuntimeException(e);
1569    }
1570
1571    public static <V> V propagate(Callable<V> callable) {
1572      try {
1573        return callable.call();
1574      } catch (Exception e) {
1575        throw runtime(e);
1576      }
1577    }
1578
1579    @Override
1580    protected int getReportingPeriod() {
1581      int period = opts.perClientRunRows / 10;
1582      return period == 0 ? opts.perClientRunRows : period;
1583    }
1584
1585    @Override
1586    protected void testTakedown() throws IOException {
1587      if (this.gets != null && this.gets.size() > 0) {
1588        this.table.get(gets);
1589        this.gets.clear();
1590      }
1591      super.testTakedown();
1592    }
1593  }
1594
1595  static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
1596
1597    AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
1598      super(con, options, status);
1599    }
1600
1601    @Override
1602    protected byte[] generateRow(final int i) {
1603      return getRandomRow(this.rand, opts.totalRows);
1604    }
1605  }
1606
1607  static class AsyncScanTest extends AsyncTableTest {
1608    private ResultScanner testScanner;
1609    private AsyncTable<?> asyncTable;
1610
1611    AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
1612      super(con, options, status);
1613    }
1614
1615    @Override
1616    void onStartup() throws IOException {
1617      this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName),
1618        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
1619    }
1620
1621    @Override
1622    void testTakedown() throws IOException {
1623      if (this.testScanner != null) {
1624        updateScanMetrics(this.testScanner.getScanMetrics());
1625        this.testScanner.close();
1626      }
1627      super.testTakedown();
1628    }
1629
1630    @Override
1631    boolean testRow(final int i, final long startTime) throws IOException {
1632      if (this.testScanner == null) {
1633        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1634          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1635          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1636        for (int family = 0; family < opts.families; family++) {
1637          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1638          if (opts.addColumns) {
1639            for (int column = 0; column < opts.columns; column++) {
1640              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1641              scan.addColumn(familyName, qualifier);
1642            }
1643          } else {
1644            scan.addFamily(familyName);
1645          }
1646        }
1647        if (opts.filterAll) {
1648          scan.setFilter(new FilterAllFilter());
1649        }
1650        this.testScanner = asyncTable.getScanner(scan);
1651      }
1652      Result r = testScanner.next();
1653      updateValueSize(r);
1654      return true;
1655    }
1656  }
1657
1658  static class AsyncSequentialReadTest extends AsyncTableTest {
1659    AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
1660      super(con, options, status);
1661    }
1662
1663    @Override
1664    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1665      Get get = new Get(format(i));
1666      for (int family = 0; family < opts.families; family++) {
1667        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1668        if (opts.addColumns) {
1669          for (int column = 0; column < opts.columns; column++) {
1670            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1671            get.addColumn(familyName, qualifier);
1672          }
1673        } else {
1674          get.addFamily(familyName);
1675        }
1676      }
1677      if (opts.filterAll) {
1678        get.setFilter(new FilterAllFilter());
1679      }
1680      try {
1681        updateValueSize(table.get(get).get());
1682      } catch (ExecutionException e) {
1683        throw new IOException(e);
1684      }
1685      return true;
1686    }
1687  }
1688
1689  static class AsyncSequentialWriteTest extends AsyncTableTest {
1690    private ArrayList<Put> puts;
1691
1692    AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
1693      super(con, options, status);
1694      if (opts.multiPut > 0) {
1695        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
1696        this.puts = new ArrayList<>(opts.multiPut);
1697      }
1698    }
1699
1700    protected byte[] generateRow(final int i) {
1701      return format(i);
1702    }
1703
1704    @Override
1705    @SuppressWarnings("ReturnValueIgnored")
1706    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1707      byte[] row = generateRow(i);
1708      Put put = new Put(row);
1709      for (int family = 0; family < opts.families; family++) {
1710        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1711        for (int column = 0; column < opts.columns; column++) {
1712          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1713          byte[] value = generateData(this.rand, getValueLength(this.rand));
1714          if (opts.useTags) {
1715            byte[] tag = generateData(this.rand, TAG_LENGTH);
1716            Tag[] tags = new Tag[opts.noOfTags];
1717            for (int n = 0; n < opts.noOfTags; n++) {
1718              Tag t = new ArrayBackedTag((byte) n, tag);
1719              tags[n] = t;
1720            }
1721            KeyValue kv =
1722              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
1723            put.add(kv);
1724            updateValueSize(kv.getValueLength());
1725          } else {
1726            put.addColumn(familyName, qualifier, value);
1727            updateValueSize(value.length);
1728          }
1729        }
1730      }
1731      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1732      try {
1733        table.put(put).get();
1734        if (opts.multiPut > 0) {
1735          this.puts.add(put);
1736          if (this.puts.size() == opts.multiPut) {
1737            this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get));
1738            this.puts.clear();
1739          } else {
1740            return false;
1741          }
1742        } else {
1743          table.put(put).get();
1744        }
1745      } catch (ExecutionException e) {
1746        throw new IOException(e);
1747      }
1748      return true;
1749    }
1750  }
1751
1752  static abstract class BufferedMutatorTest extends Test {
1753    protected BufferedMutator mutator;
1754    protected Table table;
1755
1756    BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1757      super(con, options, status);
1758    }
1759
1760    @Override
1761    void onStartup() throws IOException {
1762      BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1763      p.writeBufferSize(opts.bufferSize);
1764      this.mutator = connection.getBufferedMutator(p);
1765      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1766    }
1767
1768    @Override
1769    void onTakedown() throws IOException {
1770      mutator.close();
1771      table.close();
1772    }
1773  }
1774
1775  static class RandomSeekScanTest extends TableTest {
1776    RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1777      super(con, options, status);
1778    }
1779
1780    @Override
1781    boolean testRow(final int i, final long startTime) throws IOException {
1782      Scan scan =
1783        new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)).setCaching(opts.caching)
1784          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1785          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1786      FilterList list = new FilterList();
1787      for (int family = 0; family < opts.families; family++) {
1788        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1789        if (opts.addColumns) {
1790          for (int column = 0; column < opts.columns; column++) {
1791            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1792            scan.addColumn(familyName, qualifier);
1793          }
1794        } else {
1795          scan.addFamily(familyName);
1796        }
1797      }
1798      if (opts.filterAll) {
1799        list.addFilter(new FilterAllFilter());
1800      }
1801      list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1802      scan.setFilter(list);
1803      ResultScanner s = this.table.getScanner(scan);
1804      try {
1805        for (Result rr; (rr = s.next()) != null;) {
1806          updateValueSize(rr);
1807        }
1808      } finally {
1809        updateScanMetrics(s.getScanMetrics());
1810        s.close();
1811      }
1812      return true;
1813    }
1814
1815    @Override
1816    protected int getReportingPeriod() {
1817      int period = opts.perClientRunRows / 100;
1818      return period == 0 ? opts.perClientRunRows : period;
1819    }
1820
1821  }
1822
1823  static abstract class RandomScanWithRangeTest extends TableTest {
1824    RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1825      super(con, options, status);
1826    }
1827
1828    @Override
1829    boolean testRow(final int i, final long startTime) throws IOException {
1830      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1831      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
1832        .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
1833        .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1834        .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1835      for (int family = 0; family < opts.families; family++) {
1836        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1837        if (opts.addColumns) {
1838          for (int column = 0; column < opts.columns; column++) {
1839            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1840            scan.addColumn(familyName, qualifier);
1841          }
1842        } else {
1843          scan.addFamily(familyName);
1844        }
1845      }
1846      if (opts.filterAll) {
1847        scan.setFilter(new FilterAllFilter());
1848      }
1849      Result r = null;
1850      int count = 0;
1851      ResultScanner s = this.table.getScanner(scan);
1852      try {
1853        for (; (r = s.next()) != null;) {
1854          updateValueSize(r);
1855          count++;
1856        }
1857        if (i % 100 == 0) {
1858          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1859            Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()),
1860            count));
1861        }
1862      } finally {
1863        updateScanMetrics(s.getScanMetrics());
1864        s.close();
1865      }
1866      return true;
1867    }
1868
1869    protected abstract Pair<byte[], byte[]> getStartAndStopRow();
1870
1871    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
1872      int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
1873      int stop = start + maxRange;
1874      return new Pair<>(format(start), format(stop));
1875    }
1876
1877    @Override
1878    protected int getReportingPeriod() {
1879      int period = opts.perClientRunRows / 100;
1880      return period == 0 ? opts.perClientRunRows : period;
1881    }
1882  }
1883
1884  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1885    RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1886      super(con, options, status);
1887    }
1888
1889    @Override
1890    protected Pair<byte[], byte[]> getStartAndStopRow() {
1891      return generateStartAndStopRows(10);
1892    }
1893  }
1894
1895  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1896    RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1897      super(con, options, status);
1898    }
1899
1900    @Override
1901    protected Pair<byte[], byte[]> getStartAndStopRow() {
1902      return generateStartAndStopRows(100);
1903    }
1904  }
1905
1906  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1907    RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1908      super(con, options, status);
1909    }
1910
1911    @Override
1912    protected Pair<byte[], byte[]> getStartAndStopRow() {
1913      return generateStartAndStopRows(1000);
1914    }
1915  }
1916
1917  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1918    RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1919      super(con, options, status);
1920    }
1921
1922    @Override
1923    protected Pair<byte[], byte[]> getStartAndStopRow() {
1924      return generateStartAndStopRows(10000);
1925    }
1926  }
1927
1928  static class RandomReadTest extends TableTest {
1929    private final Consistency consistency;
1930    private ArrayList<Get> gets;
1931
1932    RandomReadTest(Connection con, TestOptions options, Status status) {
1933      super(con, options, status);
1934      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1935      if (opts.multiGet > 0) {
1936        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1937        this.gets = new ArrayList<>(opts.multiGet);
1938      }
1939    }
1940
1941    @Override
1942    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1943      if (opts.randomSleep > 0) {
1944        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
1945      }
1946      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1947      for (int family = 0; family < opts.families; family++) {
1948        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1949        if (opts.addColumns) {
1950          for (int column = 0; column < opts.columns; column++) {
1951            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1952            get.addColumn(familyName, qualifier);
1953          }
1954        } else {
1955          get.addFamily(familyName);
1956        }
1957      }
1958      if (opts.filterAll) {
1959        get.setFilter(new FilterAllFilter());
1960      }
1961      get.setConsistency(consistency);
1962      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1963      if (opts.multiGet > 0) {
1964        this.gets.add(get);
1965        if (this.gets.size() == opts.multiGet) {
1966          Result[] rs = this.table.get(this.gets);
1967          if (opts.replicas > 1) {
1968            long latency = System.nanoTime() - startTime;
1969            updateValueSize(rs, latency);
1970          } else {
1971            updateValueSize(rs);
1972          }
1973          this.gets.clear();
1974        } else {
1975          return false;
1976        }
1977      } else {
1978        if (opts.replicas > 1) {
1979          Result r = this.table.get(get);
1980          long latency = System.nanoTime() - startTime;
1981          updateValueSize(r, latency);
1982        } else {
1983          updateValueSize(this.table.get(get));
1984        }
1985      }
1986      return true;
1987    }
1988
1989    @Override
1990    protected int getReportingPeriod() {
1991      int period = opts.perClientRunRows / 10;
1992      return period == 0 ? opts.perClientRunRows : period;
1993    }
1994
1995    @Override
1996    protected void testTakedown() throws IOException {
1997      if (this.gets != null && this.gets.size() > 0) {
1998        this.table.get(gets);
1999        this.gets.clear();
2000      }
2001      super.testTakedown();
2002    }
2003  }
2004
2005  /*
2006   * Send random reads against fake regions inserted by MetaWriteTest
2007   */
2008  static class MetaRandomReadTest extends MetaTest {
2009    private RegionLocator regionLocator;
2010
2011    MetaRandomReadTest(Connection con, TestOptions options, Status status) {
2012      super(con, options, status);
2013      LOG.info("call getRegionLocation");
2014    }
2015
2016    @Override
2017    void onStartup() throws IOException {
2018      super.onStartup();
2019      this.regionLocator = connection.getRegionLocator(table.getName());
2020    }
2021
2022    @Override
2023    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
2024      if (opts.randomSleep > 0) {
2025        Thread.sleep(rand.nextInt(opts.randomSleep));
2026      }
2027      HRegionLocation hRegionLocation =
2028        regionLocator.getRegionLocation(getSplitKey(rand.nextInt(opts.perClientRunRows)), true);
2029      LOG.debug("get location for region: " + hRegionLocation);
2030      return true;
2031    }
2032
2033    @Override
2034    protected int getReportingPeriod() {
2035      int period = opts.perClientRunRows / 10;
2036      return period == 0 ? opts.perClientRunRows : period;
2037    }
2038
2039    @Override
2040    protected void testTakedown() throws IOException {
2041      super.testTakedown();
2042    }
2043  }
2044
2045  static class RandomWriteTest extends SequentialWriteTest {
2046    RandomWriteTest(Connection con, TestOptions options, Status status) {
2047      super(con, options, status);
2048    }
2049
2050    @Override
2051    protected byte[] generateRow(final int i) {
2052      return getRandomRow(this.rand, opts.totalRows);
2053    }
2054
2055  }
2056
2057  static class ScanTest extends TableTest {
2058    private ResultScanner testScanner;
2059
2060    ScanTest(Connection con, TestOptions options, Status status) {
2061      super(con, options, status);
2062    }
2063
2064    @Override
2065    void testTakedown() throws IOException {
2066      if (this.testScanner != null) {
2067        this.testScanner.close();
2068      }
2069      super.testTakedown();
2070    }
2071
2072    @Override
2073    boolean testRow(final int i, final long startTime) throws IOException {
2074      if (this.testScanner == null) {
2075        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
2076          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
2077          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
2078        for (int family = 0; family < opts.families; family++) {
2079          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2080          if (opts.addColumns) {
2081            for (int column = 0; column < opts.columns; column++) {
2082              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2083              scan.addColumn(familyName, qualifier);
2084            }
2085          } else {
2086            scan.addFamily(familyName);
2087          }
2088        }
2089        if (opts.filterAll) {
2090          scan.setFilter(new FilterAllFilter());
2091        }
2092        this.testScanner = table.getScanner(scan);
2093      }
2094      Result r = testScanner.next();
2095      updateValueSize(r);
2096      return true;
2097    }
2098  }
2099
2100  /**
2101   * Base class for operations that are CAS-like; that read a value and then set it based off what
2102   * they read. In this category is increment, append, checkAndPut, etc.
2103   * <p>
2104   * These operations also want some concurrency going on. Usually when these tests run, they
2105   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
2106   * same key space. We do this with our getStartRow and getLastRow overrides.
2107   */
2108  static abstract class CASTableTest extends TableTest {
2109    private final byte[] qualifier;
2110
2111    CASTableTest(Connection con, TestOptions options, Status status) {
2112      super(con, options, status);
2113      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
2114    }
2115
2116    byte[] getQualifier() {
2117      return this.qualifier;
2118    }
2119
2120    @Override
2121    int getStartRow() {
2122      return 0;
2123    }
2124
2125    @Override
2126    int getLastRow() {
2127      return opts.perClientRunRows;
2128    }
2129  }
2130
2131  static class IncrementTest extends CASTableTest {
2132    IncrementTest(Connection con, TestOptions options, Status status) {
2133      super(con, options, status);
2134    }
2135
2136    @Override
2137    boolean testRow(final int i, final long startTime) throws IOException {
2138      Increment increment = new Increment(format(i));
2139      // unlike checkAndXXX tests, which make most sense to do on a single value,
2140      // if multiple families are specified for an increment test we assume it is
2141      // meant to raise the work factor
2142      for (int family = 0; family < opts.families; family++) {
2143        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2144        increment.addColumn(familyName, getQualifier(), 1l);
2145      }
2146      updateValueSize(this.table.increment(increment));
2147      return true;
2148    }
2149  }
2150
2151  static class AppendTest extends CASTableTest {
2152    AppendTest(Connection con, TestOptions options, Status status) {
2153      super(con, options, status);
2154    }
2155
2156    @Override
2157    boolean testRow(final int i, final long startTime) throws IOException {
2158      byte[] bytes = format(i);
2159      Append append = new Append(bytes);
2160      // unlike checkAndXXX tests, which make most sense to do on a single value,
2161      // if multiple families are specified for an append test we assume it is
2162      // meant to raise the work factor
2163      for (int family = 0; family < opts.families; family++) {
2164        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2165        append.addColumn(familyName, getQualifier(), bytes);
2166      }
2167      updateValueSize(this.table.append(append));
2168      return true;
2169    }
2170  }
2171
2172  static class CheckAndMutateTest extends CASTableTest {
2173    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
2174      super(con, options, status);
2175    }
2176
2177    @Override
2178    boolean testRow(final int i, final long startTime) throws IOException {
2179      final byte[] bytes = format(i);
2180      // checkAndXXX tests operate on only a single value
2181      // Put a known value so when we go to check it, it is there.
2182      Put put = new Put(bytes);
2183      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2184      this.table.put(put);
2185      RowMutations mutations = new RowMutations(bytes);
2186      mutations.add(put);
2187      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2188        .thenMutate(mutations);
2189      return true;
2190    }
2191  }
2192
2193  static class CheckAndPutTest extends CASTableTest {
2194    CheckAndPutTest(Connection con, TestOptions options, Status status) {
2195      super(con, options, status);
2196    }
2197
2198    @Override
2199    boolean testRow(final int i, final long startTime) throws IOException {
2200      final byte[] bytes = format(i);
2201      // checkAndXXX tests operate on only a single value
2202      // Put a known value so when we go to check it, it is there.
2203      Put put = new Put(bytes);
2204      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2205      this.table.put(put);
2206      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2207        .thenPut(put);
2208      return true;
2209    }
2210  }
2211
2212  static class CheckAndDeleteTest extends CASTableTest {
2213    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
2214      super(con, options, status);
2215    }
2216
2217    @Override
2218    boolean testRow(final int i, final long startTime) throws IOException {
2219      final byte[] bytes = format(i);
2220      // checkAndXXX tests operate on only a single value
2221      // Put a known value so when we go to check it, it is there.
2222      Put put = new Put(bytes);
2223      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2224      this.table.put(put);
2225      Delete delete = new Delete(put.getRow());
2226      delete.addColumn(FAMILY_ZERO, getQualifier());
2227      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2228        .thenDelete(delete);
2229      return true;
2230    }
2231  }
2232
2233  /*
2234   * Delete all fake regions inserted to meta table by MetaWriteTest.
2235   */
2236  static class CleanMetaTest extends MetaTest {
2237    CleanMetaTest(Connection con, TestOptions options, Status status) {
2238      super(con, options, status);
2239    }
2240
2241    @Override
2242    boolean testRow(final int i, final long startTime) throws IOException {
2243      try {
2244        RegionInfo regionInfo = connection.getRegionLocator(table.getName())
2245          .getRegionLocation(getSplitKey(i), false).getRegion();
2246        LOG.debug("deleting region from meta: " + regionInfo);
2247
2248        Delete delete =
2249          MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP);
2250        try (Table t = MetaTableAccessor.getMetaHTable(connection)) {
2251          t.delete(delete);
2252        }
2253      } catch (IOException ie) {
2254        // Log and continue
2255        LOG.error("cannot find region with start key: " + i);
2256      }
2257      return true;
2258    }
2259  }
2260
2261  static class SequentialReadTest extends TableTest {
2262    SequentialReadTest(Connection con, TestOptions options, Status status) {
2263      super(con, options, status);
2264    }
2265
2266    @Override
2267    boolean testRow(final int i, final long startTime) throws IOException {
2268      Get get = new Get(format(i));
2269      for (int family = 0; family < opts.families; family++) {
2270        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2271        if (opts.addColumns) {
2272          for (int column = 0; column < opts.columns; column++) {
2273            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2274            get.addColumn(familyName, qualifier);
2275          }
2276        } else {
2277          get.addFamily(familyName);
2278        }
2279      }
2280      if (opts.filterAll) {
2281        get.setFilter(new FilterAllFilter());
2282      }
2283      updateValueSize(table.get(get));
2284      return true;
2285    }
2286  }
2287
2288  static class SequentialWriteTest extends BufferedMutatorTest {
2289    private ArrayList<Put> puts;
2290
2291    SequentialWriteTest(Connection con, TestOptions options, Status status) {
2292      super(con, options, status);
2293      if (opts.multiPut > 0) {
2294        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
2295        this.puts = new ArrayList<>(opts.multiPut);
2296      }
2297    }
2298
2299    protected byte[] generateRow(final int i) {
2300      return format(i);
2301    }
2302
2303    @Override
2304    boolean testRow(final int i, final long startTime) throws IOException {
2305      byte[] row = generateRow(i);
2306      Put put = new Put(row);
2307      for (int family = 0; family < opts.families; family++) {
2308        byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
2309        for (int column = 0; column < opts.columns; column++) {
2310          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2311          byte[] value = generateData(this.rand, getValueLength(this.rand));
2312          if (opts.useTags) {
2313            byte[] tag = generateData(this.rand, TAG_LENGTH);
2314            Tag[] tags = new Tag[opts.noOfTags];
2315            for (int n = 0; n < opts.noOfTags; n++) {
2316              Tag t = new ArrayBackedTag((byte) n, tag);
2317              tags[n] = t;
2318            }
2319            KeyValue kv =
2320              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
2321            put.add(kv);
2322            updateValueSize(kv.getValueLength());
2323          } else {
2324            put.addColumn(familyName, qualifier, value);
2325            updateValueSize(value.length);
2326          }
2327        }
2328      }
2329      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2330      if (opts.autoFlush) {
2331        if (opts.multiPut > 0) {
2332          this.puts.add(put);
2333          if (this.puts.size() == opts.multiPut) {
2334            table.put(this.puts);
2335            this.puts.clear();
2336          } else {
2337            return false;
2338          }
2339        } else {
2340          table.put(put);
2341        }
2342      } else {
2343        mutator.mutate(put);
2344      }
2345      return true;
2346    }
2347  }
2348
2349  /*
2350   * Insert fake regions into meta table with contiguous split keys.
2351   */
2352  static class MetaWriteTest extends MetaTest {
2353
2354    MetaWriteTest(Connection con, TestOptions options, Status status) {
2355      super(con, options, status);
2356    }
2357
2358    @Override
2359    boolean testRow(final int i, final long startTime) throws IOException {
2360      List<RegionInfo> regionInfos = new ArrayList<RegionInfo>();
2361      RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME))
2362        .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build());
2363      regionInfos.add(regionInfo);
2364      MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1);
2365
2366      // write the serverName columns
2367      MetaTableAccessor.updateRegionLocation(connection, regionInfo,
2368        ServerName.valueOf("localhost", 60010, rand.nextLong()), i,
2369        EnvironmentEdgeManager.currentTime());
2370      return true;
2371    }
2372  }
2373
2374  static class FilteredScanTest extends TableTest {
2375    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
2376
2377    FilteredScanTest(Connection con, TestOptions options, Status status) {
2378      super(con, options, status);
2379      if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) {
2380        LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB
2381          + ". This could take a very long time.");
2382      }
2383    }
2384
2385    @Override
2386    boolean testRow(int i, final long startTime) throws IOException {
2387      byte[] value = generateData(this.rand, getValueLength(this.rand));
2388      Scan scan = constructScan(value);
2389      ResultScanner scanner = null;
2390      try {
2391        scanner = this.table.getScanner(scan);
2392        for (Result r = null; (r = scanner.next()) != null;) {
2393          updateValueSize(r);
2394        }
2395      } finally {
2396        if (scanner != null) {
2397          updateScanMetrics(scanner.getScanMetrics());
2398          scanner.close();
2399        }
2400      }
2401      return true;
2402    }
2403
2404    protected Scan constructScan(byte[] valuePrefix) throws IOException {
2405      FilterList list = new FilterList();
2406      Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL,
2407        new BinaryComparator(valuePrefix));
2408      list.addFilter(filter);
2409      if (opts.filterAll) {
2410        list.addFilter(new FilterAllFilter());
2411      }
2412      Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2413        .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2414        .setScanMetricsEnabled(true);
2415      if (opts.addColumns) {
2416        for (int column = 0; column < opts.columns; column++) {
2417          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2418          scan.addColumn(FAMILY_ZERO, qualifier);
2419        }
2420      } else {
2421        scan.addFamily(FAMILY_ZERO);
2422      }
2423      scan.setFilter(list);
2424      return scan;
2425    }
2426  }
2427
2428  /**
2429   * Compute a throughput rate in MB/s.
2430   * @param rows   Number of records consumed.
2431   * @param timeMs Time taken in milliseconds.
2432   * @return String value with label, ie '123.76 MB/s'
2433   */
2434  private static String calculateMbps(int rows, long timeMs, final int valueSize, int families,
2435    int columns) {
2436    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH
2437      + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families);
2438    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
2439      .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT);
2440    return FMT.format(mbps) + " MB/s";
2441  }
2442
2443  /*
2444   * Format passed integer. n * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version
2445   * of passed number (Does absolute in case number is negative).
2446   */
2447  public static byte[] format(final int number) {
2448    byte[] b = new byte[ROW_LENGTH];
2449    int d = Math.abs(number);
2450    for (int i = b.length - 1; i >= 0; i--) {
2451      b[i] = (byte) ((d % 10) + '0');
2452      d /= 10;
2453    }
2454    return b;
2455  }
2456
2457  /*
2458   * This method takes some time and is done inline uploading data. For example, doing the mapfile
2459   * test, generation of the key and value consumes about 30% of CPU time.
2460   * @return Generated random value to insert into a table cell.
2461   */
2462  public static byte[] generateData(final Random r, int length) {
2463    byte[] b = new byte[length];
2464    int i;
2465
2466    for (i = 0; i < (length - 8); i += 8) {
2467      b[i] = (byte) (65 + r.nextInt(26));
2468      b[i + 1] = b[i];
2469      b[i + 2] = b[i];
2470      b[i + 3] = b[i];
2471      b[i + 4] = b[i];
2472      b[i + 5] = b[i];
2473      b[i + 6] = b[i];
2474      b[i + 7] = b[i];
2475    }
2476
2477    byte a = (byte) (65 + r.nextInt(26));
2478    for (; i < length; i++) {
2479      b[i] = a;
2480    }
2481    return b;
2482  }
2483
2484  static byte[] getRandomRow(final Random random, final int totalRows) {
2485    return format(generateRandomRow(random, totalRows));
2486  }
2487
2488  static int generateRandomRow(final Random random, final int totalRows) {
2489    return random.nextInt(Integer.MAX_VALUE) % totalRows;
2490  }
2491
2492  static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
2493    Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
2494    throws IOException, InterruptedException {
2495    status.setStatus(
2496      "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows");
2497    long totalElapsedTime;
2498
2499    final TestBase t;
2500    try {
2501      if (AsyncTest.class.isAssignableFrom(cmd)) {
2502        Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
2503        Constructor<? extends AsyncTest> constructor =
2504          newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
2505        t = constructor.newInstance(asyncCon, opts, status);
2506      } else {
2507        Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
2508        Constructor<? extends Test> constructor =
2509          newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
2510        t = constructor.newInstance(con, opts, status);
2511      }
2512    } catch (NoSuchMethodException e) {
2513      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
2514        + ".  It does not provide a constructor as described by "
2515        + "the javadoc comment.  Available constructors are: "
2516        + Arrays.toString(cmd.getConstructors()));
2517    } catch (Exception e) {
2518      throw new IllegalStateException("Failed to construct command class", e);
2519    }
2520    totalElapsedTime = t.test();
2521
2522    status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow
2523      + " for " + opts.perClientRunRows + " rows" + " ("
2524      + calculateMbps((int) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
2525        getAverageValueLength(opts), opts.families, opts.columns)
2526      + ")");
2527
2528    return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold,
2529      t.numOfReplyFromReplica, t.getLatencyHistogram());
2530  }
2531
2532  private static int getAverageValueLength(final TestOptions opts) {
2533    return opts.valueRandom ? opts.valueSize / 2 : opts.valueSize;
2534  }
2535
2536  private void runTest(final Class<? extends TestBase> cmd, TestOptions opts)
2537    throws IOException, InterruptedException, ClassNotFoundException, ExecutionException {
2538    // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2539    // the TestOptions introspection for us and dump the output in a readable format.
2540    LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
2541    Admin admin = null;
2542    Connection connection = null;
2543    try {
2544      connection = ConnectionFactory.createConnection(getConf());
2545      admin = connection.getAdmin();
2546      checkTable(admin, opts);
2547    } finally {
2548      if (admin != null) admin.close();
2549      if (connection != null) connection.close();
2550    }
2551    if (opts.nomapred) {
2552      doLocalClients(opts, getConf());
2553    } else {
2554      doMapReduce(opts, getConf());
2555    }
2556  }
2557
2558  protected void printUsage() {
2559    printUsage(PE_COMMAND_SHORTNAME, null);
2560  }
2561
2562  protected static void printUsage(final String message) {
2563    printUsage(PE_COMMAND_SHORTNAME, message);
2564  }
2565
2566  protected static void printUsageAndExit(final String message, final int exitCode) {
2567    printUsage(message);
2568    System.exit(exitCode);
2569  }
2570
2571  protected static void printUsage(final String shortName, final String message) {
2572    if (message != null && message.length() > 0) {
2573      System.err.println(message);
2574    }
2575    System.err.print("Usage: hbase " + shortName);
2576    System.err.println("  <OPTIONS> [-D<property=value>]* <command> <nclients>");
2577    System.err.println();
2578    System.err.println("General Options:");
2579    System.err.println(
2580      " nomapred        Run multiple clients using threads " + "(rather than use mapreduce)");
2581    System.err
2582      .println(" oneCon          all the threads share the same connection. Default: False");
2583    System.err.println(" connCount          connections all threads share. "
2584      + "For example, if set to 2, then all thread share 2 connection. "
2585      + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, "
2586      + "if not, connCount=thread number");
2587
2588    System.err.println(" sampleRate      Execute test on a sample of total "
2589      + "rows. Only supported by randomRead. Default: 1.0");
2590    System.err.println(" period          Report every 'period' rows: "
2591      + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10);
2592    System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
2593    System.err.println(
2594      " traceRate       Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0");
2595    System.err.println(" latency         Set to report operation latencies. Default: False");
2596    System.err.println(" latencyThreshold  Set to report number of operations with latency "
2597      + "over lantencyThreshold, unit in millisecond, default 0");
2598    System.err.println(" measureAfter    Start to measure the latency once 'measureAfter'"
2599      + " rows have been treated. Default: 0");
2600    System.err
2601      .println(" valueSize       Pass value size to use: Default: " + DEFAULT_OPTS.getValueSize());
2602    System.err.println(" valueRandom     Set if we should vary value size between 0 and "
2603      + "'valueSize'; set on read for stats on size: Default: Not set.");
2604    System.err.println(" blockEncoding   Block encoding to use. Value should be one of "
2605      + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2606    System.err.println();
2607    System.err.println("Table Creation / Write Tests:");
2608    System.err.println(" table           Alternate table name. Default: 'TestTable'");
2609    System.err.println(
2610      " rows            Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows()
2611        + ".  In case of randomReads and randomSeekScans this could"
2612        + " be specified along with --size to specify the number of rows to be scanned within"
2613        + " the total range specified by the size.");
2614    System.err.println(
2615      " size            Total size in GiB. Mutually exclusive with --rows for writes and scans"
2616        + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2617        + " use size to specify the end range and --rows"
2618        + " specifies the number of rows within that range. " + "Default: 1.0.");
2619    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2620    System.err.println(
2621      " flushCommits    Used to determine if the test should flush the table. " + "Default: false");
2622    System.err.println(" valueZipf       Set if we should vary value size between 0 and "
2623      + "'valueSize' in zipf form: Default: Not set.");
2624    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
2625    System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
2626    System.err.println(" multiPut        Batch puts together into groups of N. Only supported "
2627      + "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0");
2628    System.err.println(" presplit        Create presplit table. If a table with same name exists,"
2629      + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2630      + "Recommended for accurate perf analysis (see guide). Default: disabled");
2631    System.err.println(
2632      " usetags         Writes tags along with KVs. Use with HFile V3. " + "Default: false");
2633    System.err.println(" numoftags       Specify the no of tags that would be needed. "
2634      + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2635    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
2636    System.err.println(" columns         Columns to write per row. Default: 1");
2637    System.err
2638      .println(" families        Specify number of column families for the table. Default: 1");
2639    System.err.println();
2640    System.err.println("Read Tests:");
2641    System.err.println(" filterAll       Helps to filter out all the rows on the server side"
2642      + " there by not returning any thing back to the client.  Helps to check the server side"
2643      + " performance.  Uses FilterAllFilter internally. ");
2644    System.err.println(" multiGet        Batch gets together into groups of N. Only supported "
2645      + "by randomRead. Default: disabled");
2646    System.err.println(" inmemory        Tries to keep the HFiles of the CF "
2647      + "inmemory as far as possible. Not guaranteed that reads are always served "
2648      + "from memory.  Default: false");
2649    System.err
2650      .println(" bloomFilter     Bloom filter type, one of " + Arrays.toString(BloomType.values()));
2651    System.err.println(" blockSize       Blocksize to use when writing out hfiles. ");
2652    System.err
2653      .println(" inmemoryCompaction  Makes the column family to do inmemory flushes/compactions. "
2654        + "Uses the CompactingMemstore");
2655    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
2656    System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
2657    System.err.println(
2658      " randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
2659    System.err.println(" caching         Scan caching to use. Default: 30");
2660    System.err.println(" asyncPrefetch   Enable asyncPrefetch for scan");
2661    System.err.println(" cacheBlocks     Set the cacheBlocks option for scan. Default: true");
2662    System.err.println(
2663      " scanReadType    Set the readType option for scan, stream/pread/default. Default: default");
2664    System.err.println(" bufferSize      Set the value of client side buffering. Default: 2MB");
2665    System.err.println();
2666    System.err.println(" Note: -D properties will be applied to the conf used. ");
2667    System.err.println("  For example: ");
2668    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
2669    System.err.println("   -Dmapreduce.task.timeout=60000");
2670    System.err.println();
2671    System.err.println("Command:");
2672    for (CmdDescriptor command : COMMANDS.values()) {
2673      System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
2674    }
2675    System.err.println();
2676    System.err.println("Args:");
2677    System.err.println(" nclients        Integer. Required. Total number of clients "
2678      + "(and HRegionServers) running. 1 <= value <= 500");
2679    System.err.println("Examples:");
2680    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
2681    System.err.println(" $ hbase " + shortName + " sequentialWrite 1");
2682    System.err.println(" To run 10 clients doing increments over ten rows:");
2683    System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10");
2684  }
2685
2686  /**
2687   * Parse options passed in via an arguments array. Assumes that array has been split on
2688   * white-space and placed into a {@code Queue}. Any unknown arguments will remain in the queue at
2689   * the conclusion of this method call. It's up to the caller to deal with these unrecognized
2690   * arguments.
2691   */
2692  static TestOptions parseOpts(Queue<String> args) {
2693    TestOptions opts = new TestOptions();
2694
2695    String cmd = null;
2696    while ((cmd = args.poll()) != null) {
2697      if (cmd.equals("-h") || cmd.startsWith("--h")) {
2698        // place item back onto queue so that caller knows parsing was incomplete
2699        args.add(cmd);
2700        break;
2701      }
2702
2703      final String nmr = "--nomapred";
2704      if (cmd.startsWith(nmr)) {
2705        opts.nomapred = true;
2706        continue;
2707      }
2708
2709      final String rows = "--rows=";
2710      if (cmd.startsWith(rows)) {
2711        opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
2712        continue;
2713      }
2714
2715      final String cycles = "--cycles=";
2716      if (cmd.startsWith(cycles)) {
2717        opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
2718        continue;
2719      }
2720
2721      final String sampleRate = "--sampleRate=";
2722      if (cmd.startsWith(sampleRate)) {
2723        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
2724        continue;
2725      }
2726
2727      final String table = "--table=";
2728      if (cmd.startsWith(table)) {
2729        opts.tableName = cmd.substring(table.length());
2730        continue;
2731      }
2732
2733      final String startRow = "--startRow=";
2734      if (cmd.startsWith(startRow)) {
2735        opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
2736        continue;
2737      }
2738
2739      final String compress = "--compress=";
2740      if (cmd.startsWith(compress)) {
2741        opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
2742        continue;
2743      }
2744
2745      final String traceRate = "--traceRate=";
2746      if (cmd.startsWith(traceRate)) {
2747        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
2748        continue;
2749      }
2750
2751      final String blockEncoding = "--blockEncoding=";
2752      if (cmd.startsWith(blockEncoding)) {
2753        opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
2754        continue;
2755      }
2756
2757      final String flushCommits = "--flushCommits=";
2758      if (cmd.startsWith(flushCommits)) {
2759        opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
2760        continue;
2761      }
2762
2763      final String writeToWAL = "--writeToWAL=";
2764      if (cmd.startsWith(writeToWAL)) {
2765        opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
2766        continue;
2767      }
2768
2769      final String presplit = "--presplit=";
2770      if (cmd.startsWith(presplit)) {
2771        opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
2772        continue;
2773      }
2774
2775      final String inMemory = "--inmemory=";
2776      if (cmd.startsWith(inMemory)) {
2777        opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
2778        continue;
2779      }
2780
2781      final String autoFlush = "--autoFlush=";
2782      if (cmd.startsWith(autoFlush)) {
2783        opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
2784        continue;
2785      }
2786
2787      final String onceCon = "--oneCon=";
2788      if (cmd.startsWith(onceCon)) {
2789        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
2790        continue;
2791      }
2792
2793      final String connCount = "--connCount=";
2794      if (cmd.startsWith(connCount)) {
2795        opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
2796        continue;
2797      }
2798
2799      final String latencyThreshold = "--latencyThreshold=";
2800      if (cmd.startsWith(latencyThreshold)) {
2801        opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length()));
2802        continue;
2803      }
2804
2805      final String latency = "--latency";
2806      if (cmd.startsWith(latency)) {
2807        opts.reportLatency = true;
2808        continue;
2809      }
2810
2811      final String multiGet = "--multiGet=";
2812      if (cmd.startsWith(multiGet)) {
2813        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
2814        continue;
2815      }
2816
2817      final String multiPut = "--multiPut=";
2818      if (cmd.startsWith(multiPut)) {
2819        opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
2820        continue;
2821      }
2822
2823      final String useTags = "--usetags=";
2824      if (cmd.startsWith(useTags)) {
2825        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
2826        continue;
2827      }
2828
2829      final String noOfTags = "--numoftags=";
2830      if (cmd.startsWith(noOfTags)) {
2831        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
2832        continue;
2833      }
2834
2835      final String replicas = "--replicas=";
2836      if (cmd.startsWith(replicas)) {
2837        opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
2838        continue;
2839      }
2840
2841      final String filterOutAll = "--filterAll";
2842      if (cmd.startsWith(filterOutAll)) {
2843        opts.filterAll = true;
2844        continue;
2845      }
2846
2847      final String size = "--size=";
2848      if (cmd.startsWith(size)) {
2849        opts.size = Float.parseFloat(cmd.substring(size.length()));
2850        if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB");
2851        continue;
2852      }
2853
2854      final String splitPolicy = "--splitPolicy=";
2855      if (cmd.startsWith(splitPolicy)) {
2856        opts.splitPolicy = cmd.substring(splitPolicy.length());
2857        continue;
2858      }
2859
2860      final String randomSleep = "--randomSleep=";
2861      if (cmd.startsWith(randomSleep)) {
2862        opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
2863        continue;
2864      }
2865
2866      final String measureAfter = "--measureAfter=";
2867      if (cmd.startsWith(measureAfter)) {
2868        opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length()));
2869        continue;
2870      }
2871
2872      final String bloomFilter = "--bloomFilter=";
2873      if (cmd.startsWith(bloomFilter)) {
2874        opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
2875        continue;
2876      }
2877
2878      final String blockSize = "--blockSize=";
2879      if (cmd.startsWith(blockSize)) {
2880        opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
2881        continue;
2882      }
2883
2884      final String valueSize = "--valueSize=";
2885      if (cmd.startsWith(valueSize)) {
2886        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
2887        continue;
2888      }
2889
2890      final String valueRandom = "--valueRandom";
2891      if (cmd.startsWith(valueRandom)) {
2892        opts.valueRandom = true;
2893        continue;
2894      }
2895
2896      final String valueZipf = "--valueZipf";
2897      if (cmd.startsWith(valueZipf)) {
2898        opts.valueZipf = true;
2899        continue;
2900      }
2901
2902      final String period = "--period=";
2903      if (cmd.startsWith(period)) {
2904        opts.period = Integer.parseInt(cmd.substring(period.length()));
2905        continue;
2906      }
2907
2908      final String addColumns = "--addColumns=";
2909      if (cmd.startsWith(addColumns)) {
2910        opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
2911        continue;
2912      }
2913
2914      final String inMemoryCompaction = "--inmemoryCompaction=";
2915      if (cmd.startsWith(inMemoryCompaction)) {
2916        opts.inMemoryCompaction =
2917          MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length()));
2918        continue;
2919      }
2920
2921      final String columns = "--columns=";
2922      if (cmd.startsWith(columns)) {
2923        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
2924        continue;
2925      }
2926
2927      final String families = "--families=";
2928      if (cmd.startsWith(families)) {
2929        opts.families = Integer.parseInt(cmd.substring(families.length()));
2930        continue;
2931      }
2932
2933      final String caching = "--caching=";
2934      if (cmd.startsWith(caching)) {
2935        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
2936        continue;
2937      }
2938
2939      final String asyncPrefetch = "--asyncPrefetch";
2940      if (cmd.startsWith(asyncPrefetch)) {
2941        opts.asyncPrefetch = true;
2942        continue;
2943      }
2944
2945      final String cacheBlocks = "--cacheBlocks=";
2946      if (cmd.startsWith(cacheBlocks)) {
2947        opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length()));
2948        continue;
2949      }
2950
2951      final String scanReadType = "--scanReadType=";
2952      if (cmd.startsWith(scanReadType)) {
2953        opts.scanReadType =
2954          Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
2955        continue;
2956      }
2957
2958      final String bufferSize = "--bufferSize=";
2959      if (cmd.startsWith(bufferSize)) {
2960        opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
2961        continue;
2962      }
2963
2964      validateParsedOpts(opts);
2965
2966      if (isCommandClass(cmd)) {
2967        opts.cmdName = cmd;
2968        try {
2969          opts.numClientThreads = Integer.parseInt(args.remove());
2970        } catch (NoSuchElementException | NumberFormatException e) {
2971          throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
2972        }
2973        opts = calculateRowsAndSize(opts);
2974        break;
2975      } else {
2976        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
2977      }
2978
2979      // Not matching any option or command.
2980      System.err.println("Error: Wrong option or command: " + cmd);
2981      args.add(cmd);
2982      break;
2983    }
2984    return opts;
2985  }
2986
2987  /**
2988   * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts
2989   */
2990  private static void validateParsedOpts(TestOptions opts) {
2991
2992    if (!opts.autoFlush && opts.multiPut > 0) {
2993      throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
2994    }
2995
2996    if (opts.oneCon && opts.connCount > 1) {
2997      throw new IllegalArgumentException(
2998        "oneCon is set to true, " + "connCount should not bigger than 1");
2999    }
3000
3001    if (opts.valueZipf && opts.valueRandom) {
3002      throw new IllegalStateException("Either valueZipf or valueRandom but not both");
3003    }
3004  }
3005
3006  static TestOptions calculateRowsAndSize(final TestOptions opts) {
3007    int rowsPerGB = getRowsPerGB(opts);
3008    if (
3009      (opts.getCmdName() != null
3010        && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
3011        && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows
3012    ) {
3013      opts.totalRows = (int) opts.size * rowsPerGB;
3014    } else if (opts.size != DEFAULT_OPTS.size) {
3015      // total size in GB specified
3016      opts.totalRows = (int) opts.size * rowsPerGB;
3017      opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
3018    } else {
3019      opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
3020      opts.size = opts.totalRows / rowsPerGB;
3021    }
3022    return opts;
3023  }
3024
3025  static int getRowsPerGB(final TestOptions opts) {
3026    return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies()
3027      * opts.getColumns());
3028  }
3029
3030  @Override
3031  public int run(String[] args) throws Exception {
3032    // Process command-line args. TODO: Better cmd-line processing
3033    // (but hopefully something not as painful as cli options).
3034    int errCode = -1;
3035    if (args.length < 1) {
3036      printUsage();
3037      return errCode;
3038    }
3039
3040    try {
3041      LinkedList<String> argv = new LinkedList<>();
3042      argv.addAll(Arrays.asList(args));
3043      TestOptions opts = parseOpts(argv);
3044
3045      // args remaining, print help and exit
3046      if (!argv.isEmpty()) {
3047        errCode = 0;
3048        printUsage();
3049        return errCode;
3050      }
3051
3052      // must run at least 1 client
3053      if (opts.numClientThreads <= 0) {
3054        throw new IllegalArgumentException("Number of clients must be > 0");
3055      }
3056
3057      // cmdName should not be null, print help and exit
3058      if (opts.cmdName == null) {
3059        printUsage();
3060        return errCode;
3061      }
3062
3063      Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
3064      if (cmdClass != null) {
3065        runTest(cmdClass, opts);
3066        errCode = 0;
3067      }
3068
3069    } catch (Exception e) {
3070      e.printStackTrace();
3071    }
3072
3073    return errCode;
3074  }
3075
3076  private static boolean isCommandClass(String cmd) {
3077    return COMMANDS.containsKey(cmd);
3078  }
3079
3080  private static Class<? extends TestBase> determineCommandClass(String cmd) {
3081    CmdDescriptor descriptor = COMMANDS.get(cmd);
3082    return descriptor != null ? descriptor.getCmdClass() : null;
3083  }
3084
3085  public static void main(final String[] args) throws Exception {
3086    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
3087    System.exit(res);
3088  }
3089}