001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import com.codahale.metrics.Histogram;
021import com.codahale.metrics.UniformReservoir;
022import io.opentelemetry.api.trace.Span;
023import io.opentelemetry.context.Scope;
024import java.io.IOException;
025import java.io.PrintStream;
026import java.lang.reflect.Constructor;
027import java.math.BigDecimal;
028import java.math.MathContext;
029import java.text.DecimalFormat;
030import java.text.SimpleDateFormat;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Date;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Locale;
037import java.util.Map;
038import java.util.NoSuchElementException;
039import java.util.Queue;
040import java.util.Random;
041import java.util.TreeMap;
042import java.util.concurrent.Callable;
043import java.util.concurrent.ExecutionException;
044import java.util.concurrent.ExecutorService;
045import java.util.concurrent.Executors;
046import java.util.concurrent.Future;
047import java.util.concurrent.ThreadLocalRandom;
048import org.apache.commons.lang3.StringUtils;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.conf.Configured;
051import org.apache.hadoop.fs.FileSystem;
052import org.apache.hadoop.fs.Path;
053import org.apache.hadoop.hbase.client.Admin;
054import org.apache.hadoop.hbase.client.Append;
055import org.apache.hadoop.hbase.client.AsyncConnection;
056import org.apache.hadoop.hbase.client.AsyncTable;
057import org.apache.hadoop.hbase.client.BufferedMutator;
058import org.apache.hadoop.hbase.client.BufferedMutatorParams;
059import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
060import org.apache.hadoop.hbase.client.Connection;
061import org.apache.hadoop.hbase.client.ConnectionFactory;
062import org.apache.hadoop.hbase.client.Consistency;
063import org.apache.hadoop.hbase.client.Delete;
064import org.apache.hadoop.hbase.client.Durability;
065import org.apache.hadoop.hbase.client.Get;
066import org.apache.hadoop.hbase.client.Increment;
067import org.apache.hadoop.hbase.client.Put;
068import org.apache.hadoop.hbase.client.RegionInfo;
069import org.apache.hadoop.hbase.client.RegionInfoBuilder;
070import org.apache.hadoop.hbase.client.RegionLocator;
071import org.apache.hadoop.hbase.client.Result;
072import org.apache.hadoop.hbase.client.ResultScanner;
073import org.apache.hadoop.hbase.client.RowMutations;
074import org.apache.hadoop.hbase.client.Scan;
075import org.apache.hadoop.hbase.client.Table;
076import org.apache.hadoop.hbase.client.TableDescriptor;
077import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
078import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
079import org.apache.hadoop.hbase.filter.BinaryComparator;
080import org.apache.hadoop.hbase.filter.Filter;
081import org.apache.hadoop.hbase.filter.FilterAllFilter;
082import org.apache.hadoop.hbase.filter.FilterList;
083import org.apache.hadoop.hbase.filter.PageFilter;
084import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
085import org.apache.hadoop.hbase.filter.WhileMatchFilter;
086import org.apache.hadoop.hbase.io.compress.Compression;
087import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
088import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
089import org.apache.hadoop.hbase.regionserver.BloomType;
090import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
091import org.apache.hadoop.hbase.trace.TraceUtil;
092import org.apache.hadoop.hbase.util.ByteArrayHashKey;
093import org.apache.hadoop.hbase.util.Bytes;
094import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
095import org.apache.hadoop.hbase.util.GsonUtil;
096import org.apache.hadoop.hbase.util.Hash;
097import org.apache.hadoop.hbase.util.MurmurHash;
098import org.apache.hadoop.hbase.util.Pair;
099import org.apache.hadoop.hbase.util.RandomDistribution;
100import org.apache.hadoop.hbase.util.YammerHistogramUtils;
101import org.apache.hadoop.io.LongWritable;
102import org.apache.hadoop.io.Text;
103import org.apache.hadoop.mapreduce.Job;
104import org.apache.hadoop.mapreduce.Mapper;
105import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
106import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
107import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
108import org.apache.hadoop.util.Tool;
109import org.apache.hadoop.util.ToolRunner;
110import org.apache.yetus.audience.InterfaceAudience;
111import org.slf4j.Logger;
112import org.slf4j.LoggerFactory;
113
114import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
115import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
116import org.apache.hbase.thirdparty.com.google.gson.Gson;
117
118/**
 * Script used for evaluating HBase performance and scalability. Runs an HBase client that steps
 * through one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random
 * writes test, etc.). Pass on the command-line which test to run and how many clients are
 * participating in this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
123 * <p>
124 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance
125 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper,
126 * pages 8-10.
127 * <p>
128 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as
129 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client does
130 * about 1GB of data, unless specified otherwise.
131 */
132@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
133public class PerformanceEvaluation extends Configured implements Tool {
134  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
135  static final String RANDOM_READ = "randomRead";
136  static final String PE_COMMAND_SHORTNAME = "pe";
137  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
138  private static final Gson GSON = GsonUtil.createGson().create();
139
140  public static final String TABLE_NAME = "TestTable";
141  public static final String FAMILY_NAME_BASE = "info";
142  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
143  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
144  public static final int DEFAULT_VALUE_LENGTH = 1000;
145  public static final int ROW_LENGTH = 26;
146
147  private static final int ONE_GB = 1024 * 1024 * 1000;
148  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
149  // TODO : should we make this configurable
150  private static final int TAG_LENGTH = 256;
151  private static final DecimalFormat FMT = new DecimalFormat("0.##");
152  private static final MathContext CXT = MathContext.DECIMAL64;
153  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
154  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
155  private static final TestOptions DEFAULT_OPTS = new TestOptions();
156
157  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
158  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
159
160  static {
161    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
162      "Run async random read test");
163    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
164      "Run async random write test");
165    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
166      "Run async sequential read test");
167    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
168      "Run async sequential write test");
169    addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)");
170    addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test");
171    addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test");
172    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
173      "Run random seek and scan 100 test");
174    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
175      "Run random seek scan with both start and stop row (max 10 rows)");
176    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
177      "Run random seek scan with both start and stop row (max 100 rows)");
178    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
179      "Run random seek scan with both start and stop row (max 1000 rows)");
180    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
181      "Run random seek scan with both start and stop row (max 10000 rows)");
182    addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
183    addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
184    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
185    addCommandDescriptor(MetaWriteTest.class, "metaWrite",
186      "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta");
187    addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
188    addCommandDescriptor(FilteredScanTest.class, "filterScan",
189      "Run scan test using a filter to find a specific row based on it's value "
190        + "(make sure to use --rows=20)");
191    addCommandDescriptor(IncrementTest.class, "increment",
192      "Increment on each row; clients overlap on keyspace so some concurrent operations");
193    addCommandDescriptor(AppendTest.class, "append",
194      "Append on each row; clients overlap on keyspace so some concurrent operations");
195    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
196      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
197    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
198      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
199    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
200      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
201    addCommandDescriptor(CleanMetaTest.class, "cleanMeta",
202      "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread");
203  }
204
205  /**
206   * Enum for map metrics. Keep it out here rather than inside in the Map inner-class so we can find
207   * associated properties.
208   */
209  protected static enum Counter {
210    /** elapsed time */
211    ELAPSED_TIME,
212    /** number of rows */
213    ROWS
214  }
215
216  protected static class RunResult implements Comparable<RunResult> {
217    public RunResult(long duration, Histogram hist) {
218      this.duration = duration;
219      this.hist = hist;
220      numbOfReplyOverThreshold = 0;
221      numOfReplyFromReplica = 0;
222    }
223
224    public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica,
225      Histogram hist) {
226      this.duration = duration;
227      this.hist = hist;
228      this.numbOfReplyOverThreshold = numbOfReplyOverThreshold;
229      this.numOfReplyFromReplica = numOfReplyFromReplica;
230    }
231
232    public final long duration;
233    public final Histogram hist;
234    public final long numbOfReplyOverThreshold;
235    public final long numOfReplyFromReplica;
236
237    @Override
238    public String toString() {
239      return Long.toString(duration);
240    }
241
242    @Override
243    public int compareTo(RunResult o) {
244      return Long.compare(this.duration, o.duration);
245    }
246  }
247
248  /**
249   * Constructor
250   * @param conf Configuration object
251   */
252  public PerformanceEvaluation(final Configuration conf) {
253    super(conf);
254  }
255
256  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name,
257    String description) {
258    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
259    COMMANDS.put(name, cmdDescriptor);
260  }
261
262  /**
263   * Implementations can have their status set.
264   */
265  interface Status {
266    /**
267     * Sets status
268     * @param msg status message
269     */
270    void setStatus(final String msg) throws IOException;
271  }
272
273  /**
274   * MapReduce job that runs a performance evaluation client in each map task.
275   */
276  public static class EvaluationMapTask
277    extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
278
279    /** configuration parameter name that contains the command */
280    public final static String CMD_KEY = "EvaluationMapTask.command";
281    /** configuration parameter name that contains the PE impl */
282    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
283
284    private Class<? extends Test> cmd;
285
286    @Override
287    protected void setup(Context context) throws IOException, InterruptedException {
288      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
289
290      // this is required so that extensions of PE are instantiated within the
291      // map reduce task...
292      Class<? extends PerformanceEvaluation> peClass =
293        forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
294      try {
295        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
296      } catch (Exception e) {
297        throw new IllegalStateException("Could not instantiate PE instance", e);
298      }
299    }
300
301    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
302      try {
303        return Class.forName(className).asSubclass(type);
304      } catch (ClassNotFoundException e) {
305        throw new IllegalStateException("Could not find class for name: " + className, e);
306      }
307    }
308
309    @Override
310    protected void map(LongWritable key, Text value, final Context context)
311      throws IOException, InterruptedException {
312
313      Status status = new Status() {
314        @Override
315        public void setStatus(String msg) {
316          context.setStatus(msg);
317        }
318      };
319
320      TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class);
321      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
322      final Connection con = ConnectionFactory.createConnection(conf);
323      AsyncConnection asyncCon = null;
324      try {
325        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
326      } catch (ExecutionException e) {
327        throw new IOException(e);
328      }
329
330      // Evaluation task
331      RunResult result =
332        PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
333      // Collect how much time the thing took. Report as map output and
334      // to the ELAPSED_TIME counter.
335      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
336      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
337      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
338      context.progress();
339    }
340  }
341
342  /*
343   * If table does not already exist, create. Also create a table when {@code opts.presplitRegions}
344   * is specified or when the existing table's region replica count doesn't match {@code
345   * opts.replicas}.
346   */
347  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
348    TableName tableName = TableName.valueOf(opts.tableName);
349    boolean needsDelete = false, exists = admin.tableExists(tableName);
350    boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
351      || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
352    if (!exists && isReadCmd) {
353      throw new IllegalStateException(
354        "Must specify an existing table for read commands. Run a write command first.");
355    }
356    TableDescriptor desc = exists ? admin.getDescriptor(TableName.valueOf(opts.tableName)) : null;
357    byte[][] splits = getSplits(opts);
358
359    // recreate the table when user has requested presplit or when existing
360    // {RegionSplitPolicy,replica count} does not match requested, or when the
361    // number of column families does not match requested.
362    if (
363      (exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
364        || (!isReadCmd && desc != null
365          && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
366        || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)
367        || (desc != null && desc.getColumnFamilyCount() != opts.families)
368    ) {
369      needsDelete = true;
370      // wait, why did it delete my table?!?
371      LOG.debug(MoreObjects.toStringHelper("needsDelete").add("needsDelete", needsDelete)
372        .add("isReadCmd", isReadCmd).add("exists", exists).add("desc", desc)
373        .add("presplit", opts.presplitRegions).add("splitPolicy", opts.splitPolicy)
374        .add("replicas", opts.replicas).add("families", opts.families).toString());
375    }
376
377    // remove an existing table
378    if (needsDelete) {
379      if (admin.isTableEnabled(tableName)) {
380        admin.disableTable(tableName);
381      }
382      admin.deleteTable(tableName);
383    }
384
385    // table creation is necessary
386    if (!exists || needsDelete) {
387      desc = getTableDescriptor(opts);
388      if (splits != null) {
389        if (LOG.isDebugEnabled()) {
390          for (int i = 0; i < splits.length; i++) {
391            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
392          }
393        }
394      }
395      if (splits != null) {
396        admin.createTable(desc, splits);
397      } else {
398        admin.createTable(desc);
399      }
400      LOG.info("Table " + desc + " created");
401    }
402    return admin.tableExists(tableName);
403  }
404
405  /**
406   * Create an HTableDescriptor from provided TestOptions.
407   */
408  protected static TableDescriptor getTableDescriptor(TestOptions opts) {
409    TableDescriptorBuilder builder =
410      TableDescriptorBuilder.newBuilder(TableName.valueOf(opts.tableName));
411
412    for (int family = 0; family < opts.families; family++) {
413      byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
414      ColumnFamilyDescriptorBuilder cfBuilder =
415        ColumnFamilyDescriptorBuilder.newBuilder(familyName);
416      cfBuilder.setDataBlockEncoding(opts.blockEncoding);
417      cfBuilder.setCompressionType(opts.compression);
418      cfBuilder.setEncryptionType(opts.encryption);
419      cfBuilder.setBloomFilterType(opts.bloomType);
420      cfBuilder.setBlocksize(opts.blockSize);
421      if (opts.inMemoryCF) {
422        cfBuilder.setInMemory(true);
423      }
424      cfBuilder.setInMemoryCompaction(opts.inMemoryCompaction);
425      builder.setColumnFamily(cfBuilder.build());
426    }
427    if (opts.replicas != DEFAULT_OPTS.replicas) {
428      builder.setRegionReplication(opts.replicas);
429    }
430    if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
431      builder.setRegionSplitPolicyClassName(opts.splitPolicy);
432    }
433    return builder.build();
434  }
435
436  /**
437   * generates splits based on total number of rows and specified split regions
438   */
439  protected static byte[][] getSplits(TestOptions opts) {
440    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null;
441
442    int numSplitPoints = opts.presplitRegions - 1;
443    byte[][] splits = new byte[numSplitPoints][];
444    int jump = opts.totalRows / opts.presplitRegions;
445    for (int i = 0; i < numSplitPoints; i++) {
446      int rowkey = jump * (1 + i);
447      splits[i] = format(rowkey);
448    }
449    return splits;
450  }
451
452  static void setupConnectionCount(final TestOptions opts) {
453    if (opts.oneCon) {
454      opts.connCount = 1;
455    } else {
456      if (opts.connCount == -1) {
457        // set to thread number if connCount is not set
458        opts.connCount = opts.numClientThreads;
459      }
460    }
461  }
462
463  /*
464   * Run all clients in this vm each to its own thread.
465   */
466  static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
467    throws IOException, InterruptedException, ExecutionException {
468    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
469    assert cmd != null;
470    @SuppressWarnings("unchecked")
471    Future<RunResult>[] threads = new Future[opts.numClientThreads];
472    RunResult[] results = new RunResult[opts.numClientThreads];
473    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
474      new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
475    setupConnectionCount(opts);
476    final Connection[] cons = new Connection[opts.connCount];
477    final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
478    for (int i = 0; i < opts.connCount; i++) {
479      cons[i] = ConnectionFactory.createConnection(conf);
480      asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
481    }
482    LOG
483      .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads");
484    for (int i = 0; i < threads.length; i++) {
485      final int index = i;
486      threads[i] = pool.submit(new Callable<RunResult>() {
487        @Override
488        public RunResult call() throws Exception {
489          TestOptions threadOpts = new TestOptions(opts);
490          final Connection con = cons[index % cons.length];
491          final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
492          if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
493          RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
494            @Override
495            public void setStatus(final String msg) throws IOException {
496              LOG.info(msg);
497            }
498          });
499          LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration
500            + "ms over " + threadOpts.perClientRunRows + " rows");
501          if (opts.latencyThreshold > 0) {
502            LOG.info("Number of replies over latency threshold " + opts.latencyThreshold
503              + "(ms) is " + run.numbOfReplyOverThreshold);
504          }
505          return run;
506        }
507      });
508    }
509    pool.shutdown();
510
511    for (int i = 0; i < threads.length; i++) {
512      try {
513        results[i] = threads[i].get();
514      } catch (ExecutionException e) {
515        throw new IOException(e.getCause());
516      }
517    }
518    final String test = cmd.getSimpleName();
519    LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results));
520    Arrays.sort(results);
521    long total = 0;
522    float avgLatency = 0;
523    float avgTPS = 0;
524    long replicaWins = 0;
525    for (RunResult result : results) {
526      total += result.duration;
527      avgLatency += result.hist.getSnapshot().getMean();
528      avgTPS += opts.perClientRunRows * 1.0f / result.duration;
529      replicaWins += result.numOfReplyFromReplica;
530    }
531    avgTPS *= 1000; // ms to second
532    avgLatency = avgLatency / results.length;
533    LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: "
534      + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms");
535    LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
536    LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
537    if (opts.replicas > 1) {
538      LOG.info("[results from replica regions] " + replicaWins);
539    }
540
541    for (int i = 0; i < opts.connCount; i++) {
542      cons[i].close();
543      asyncCons[i].close();
544    }
545
546    return results;
547  }
548
549  /*
550   * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write
551   * out an input file with instruction per client regards which row they are to start on.
552   * @param cmd Command to run.
553   */
554  static Job doMapReduce(TestOptions opts, final Configuration conf)
555    throws IOException, InterruptedException, ClassNotFoundException {
556    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
557    assert cmd != null;
558    Path inputDir = writeInputFile(conf, opts);
559    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
560    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
561    Job job = Job.getInstance(conf);
562    job.setJarByClass(PerformanceEvaluation.class);
563    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
564
565    job.setInputFormatClass(NLineInputFormat.class);
566    NLineInputFormat.setInputPaths(job, inputDir);
567    // this is default, but be explicit about it just in case.
568    NLineInputFormat.setNumLinesPerSplit(job, 1);
569
570    job.setOutputKeyClass(LongWritable.class);
571    job.setOutputValueClass(LongWritable.class);
572
573    job.setMapperClass(EvaluationMapTask.class);
574    job.setReducerClass(LongSumReducer.class);
575
576    job.setNumReduceTasks(1);
577
578    job.setOutputFormatClass(TextOutputFormat.class);
579    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
580
581    TableMapReduceUtil.addDependencyJars(job);
582    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Histogram.class, // yammer
583                                                                                            // metrics
584      Gson.class, // gson
585      FilterAllFilter.class // hbase-server tests jar
586    );
587
588    TableMapReduceUtil.initCredentials(job);
589
590    job.waitForCompletion(true);
591    return job;
592  }
593
  /**
   * Each client has one mapper to do the work, and the client does the resulting count in a map
   * task.
   */
597
598  static String JOB_INPUT_FILENAME = "input.txt";
599
600  /*
601   * Write input file of offsets-per-client for the mapreduce job.
602   * @param c Configuration
603   * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
604   */
605  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
606    return writeInputFile(c, opts, new Path("."));
607  }
608
609  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
610    throws IOException {
611    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
612    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
613    Path inputDir = new Path(jobdir, "inputs");
614
615    FileSystem fs = FileSystem.get(c);
616    fs.mkdirs(inputDir);
617
618    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
619    PrintStream out = new PrintStream(fs.create(inputFile));
620    // Make input random.
621    Map<Integer, String> m = new TreeMap<>();
622    Hash h = MurmurHash.getInstance();
623    int perClientRows = (opts.totalRows / opts.numClientThreads);
624    try {
625      for (int j = 0; j < opts.numClientThreads; j++) {
626        TestOptions next = new TestOptions(opts);
627        next.startRow = j * perClientRows;
628        next.perClientRunRows = perClientRows;
629        String s = GSON.toJson(next);
630        LOG.info("Client=" + j + ", input=" + s);
631        byte[] b = Bytes.toBytes(s);
632        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
633        m.put(hash, s);
634      }
635      for (Map.Entry<Integer, String> e : m.entrySet()) {
636        out.println(e.getValue());
637      }
638    } finally {
639      out.close();
640    }
641    return inputDir;
642  }
643
644  /**
645   * Describes a command.
646   */
647  static class CmdDescriptor {
648    private Class<? extends TestBase> cmdClass;
649    private String name;
650    private String description;
651
652    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
653      this.cmdClass = cmdClass;
654      this.name = name;
655      this.description = description;
656    }
657
658    public Class<? extends TestBase> getCmdClass() {
659      return cmdClass;
660    }
661
662    public String getName() {
663      return name;
664    }
665
666    public String getDescription() {
667      return description;
668    }
669  }
670
671  /**
672   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes
673   * tracking all these arguments a little easier. NOTE: ADDING AN OPTION, you need to add a data
674   * member, a getter/setter (to make JSON serialization of this TestOptions class behave), and you
675   * need to add to the clone constructor below copying your new option from the 'that' to the
676   * 'this'. Look for 'clone' below.
677   */
678  static class TestOptions {
679    String cmdName = null;
680    boolean nomapred = false;
681    boolean filterAll = false;
682    int startRow = 0;
683    float size = 1.0f;
684    int perClientRunRows = DEFAULT_ROWS_PER_GB;
685    int numClientThreads = 1;
686    int totalRows = DEFAULT_ROWS_PER_GB;
687    int measureAfter = 0;
688    float sampleRate = 1.0f;
689    /**
690     * @deprecated Useless after switching to OpenTelemetry
691     */
692    @Deprecated
693    double traceRate = 0.0;
694    String tableName = TABLE_NAME;
695    boolean flushCommits = true;
696    boolean writeToWAL = true;
697    boolean autoFlush = false;
698    boolean oneCon = false;
699    int connCount = -1; // wil decide the actual num later
700    boolean useTags = false;
701    int noOfTags = 1;
702    boolean reportLatency = false;
703    int multiGet = 0;
704    int multiPut = 0;
705    int randomSleep = 0;
706    boolean inMemoryCF = false;
707    int presplitRegions = 0;
708    int replicas = TableDescriptorBuilder.DEFAULT_REGION_REPLICATION;
709    String splitPolicy = null;
710    Compression.Algorithm compression = Compression.Algorithm.NONE;
711    String encryption = null;
712    BloomType bloomType = BloomType.ROW;
713    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
714    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
715    boolean valueRandom = false;
716    boolean valueZipf = false;
717    int valueSize = DEFAULT_VALUE_LENGTH;
718    int period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10;
719    int cycles = 1;
720    int columns = 1;
721    int families = 1;
722    int caching = 30;
723    int latencyThreshold = 0; // in millsecond
724    boolean addColumns = true;
725    MemoryCompactionPolicy inMemoryCompaction =
726      MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
727    boolean asyncPrefetch = false;
728    boolean cacheBlocks = true;
729    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
730    long bufferSize = 2l * 1024l * 1024l;
731
732    public TestOptions() {
733    }
734
735    /**
736     * Clone constructor.
737     * @param that Object to copy from.
738     */
739    public TestOptions(TestOptions that) {
740      this.cmdName = that.cmdName;
741      this.cycles = that.cycles;
742      this.nomapred = that.nomapred;
743      this.startRow = that.startRow;
744      this.size = that.size;
745      this.perClientRunRows = that.perClientRunRows;
746      this.numClientThreads = that.numClientThreads;
747      this.totalRows = that.totalRows;
748      this.sampleRate = that.sampleRate;
749      this.traceRate = that.traceRate;
750      this.tableName = that.tableName;
751      this.flushCommits = that.flushCommits;
752      this.writeToWAL = that.writeToWAL;
753      this.autoFlush = that.autoFlush;
754      this.oneCon = that.oneCon;
755      this.connCount = that.connCount;
756      this.useTags = that.useTags;
757      this.noOfTags = that.noOfTags;
758      this.reportLatency = that.reportLatency;
759      this.latencyThreshold = that.latencyThreshold;
760      this.multiGet = that.multiGet;
761      this.multiPut = that.multiPut;
762      this.inMemoryCF = that.inMemoryCF;
763      this.presplitRegions = that.presplitRegions;
764      this.replicas = that.replicas;
765      this.splitPolicy = that.splitPolicy;
766      this.compression = that.compression;
767      this.encryption = that.encryption;
768      this.blockEncoding = that.blockEncoding;
769      this.filterAll = that.filterAll;
770      this.bloomType = that.bloomType;
771      this.blockSize = that.blockSize;
772      this.valueRandom = that.valueRandom;
773      this.valueZipf = that.valueZipf;
774      this.valueSize = that.valueSize;
775      this.period = that.period;
776      this.randomSleep = that.randomSleep;
777      this.measureAfter = that.measureAfter;
778      this.addColumns = that.addColumns;
779      this.columns = that.columns;
780      this.families = that.families;
781      this.caching = that.caching;
782      this.inMemoryCompaction = that.inMemoryCompaction;
783      this.asyncPrefetch = that.asyncPrefetch;
784      this.cacheBlocks = that.cacheBlocks;
785      this.scanReadType = that.scanReadType;
786      this.bufferSize = that.bufferSize;
787    }
788
789    public int getCaching() {
790      return this.caching;
791    }
792
793    public void setCaching(final int caching) {
794      this.caching = caching;
795    }
796
797    public int getColumns() {
798      return this.columns;
799    }
800
801    public void setColumns(final int columns) {
802      this.columns = columns;
803    }
804
805    public int getFamilies() {
806      return this.families;
807    }
808
809    public void setFamilies(final int families) {
810      this.families = families;
811    }
812
813    public int getCycles() {
814      return this.cycles;
815    }
816
817    public void setCycles(final int cycles) {
818      this.cycles = cycles;
819    }
820
821    public boolean isValueZipf() {
822      return valueZipf;
823    }
824
825    public void setValueZipf(boolean valueZipf) {
826      this.valueZipf = valueZipf;
827    }
828
829    public String getCmdName() {
830      return cmdName;
831    }
832
833    public void setCmdName(String cmdName) {
834      this.cmdName = cmdName;
835    }
836
837    public int getRandomSleep() {
838      return randomSleep;
839    }
840
841    public void setRandomSleep(int randomSleep) {
842      this.randomSleep = randomSleep;
843    }
844
845    public int getReplicas() {
846      return replicas;
847    }
848
849    public void setReplicas(int replicas) {
850      this.replicas = replicas;
851    }
852
853    public String getSplitPolicy() {
854      return splitPolicy;
855    }
856
857    public void setSplitPolicy(String splitPolicy) {
858      this.splitPolicy = splitPolicy;
859    }
860
861    public void setNomapred(boolean nomapred) {
862      this.nomapred = nomapred;
863    }
864
865    public void setFilterAll(boolean filterAll) {
866      this.filterAll = filterAll;
867    }
868
869    public void setStartRow(int startRow) {
870      this.startRow = startRow;
871    }
872
873    public void setSize(float size) {
874      this.size = size;
875    }
876
877    public void setPerClientRunRows(int perClientRunRows) {
878      this.perClientRunRows = perClientRunRows;
879    }
880
881    public void setNumClientThreads(int numClientThreads) {
882      this.numClientThreads = numClientThreads;
883    }
884
885    public void setTotalRows(int totalRows) {
886      this.totalRows = totalRows;
887    }
888
889    public void setSampleRate(float sampleRate) {
890      this.sampleRate = sampleRate;
891    }
892
893    public void setTraceRate(double traceRate) {
894      this.traceRate = traceRate;
895    }
896
897    public void setTableName(String tableName) {
898      this.tableName = tableName;
899    }
900
901    public void setFlushCommits(boolean flushCommits) {
902      this.flushCommits = flushCommits;
903    }
904
905    public void setWriteToWAL(boolean writeToWAL) {
906      this.writeToWAL = writeToWAL;
907    }
908
909    public void setAutoFlush(boolean autoFlush) {
910      this.autoFlush = autoFlush;
911    }
912
913    public void setOneCon(boolean oneCon) {
914      this.oneCon = oneCon;
915    }
916
917    public int getConnCount() {
918      return connCount;
919    }
920
921    public void setConnCount(int connCount) {
922      this.connCount = connCount;
923    }
924
925    public void setUseTags(boolean useTags) {
926      this.useTags = useTags;
927    }
928
929    public void setNoOfTags(int noOfTags) {
930      this.noOfTags = noOfTags;
931    }
932
933    public void setReportLatency(boolean reportLatency) {
934      this.reportLatency = reportLatency;
935    }
936
937    public void setMultiGet(int multiGet) {
938      this.multiGet = multiGet;
939    }
940
941    public void setMultiPut(int multiPut) {
942      this.multiPut = multiPut;
943    }
944
945    public void setInMemoryCF(boolean inMemoryCF) {
946      this.inMemoryCF = inMemoryCF;
947    }
948
949    public void setPresplitRegions(int presplitRegions) {
950      this.presplitRegions = presplitRegions;
951    }
952
953    public void setCompression(Compression.Algorithm compression) {
954      this.compression = compression;
955    }
956
957    public void setEncryption(String encryption) {
958      this.encryption = encryption;
959    }
960
961    public void setBloomType(BloomType bloomType) {
962      this.bloomType = bloomType;
963    }
964
965    public void setBlockSize(int blockSize) {
966      this.blockSize = blockSize;
967    }
968
969    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
970      this.blockEncoding = blockEncoding;
971    }
972
973    public void setValueRandom(boolean valueRandom) {
974      this.valueRandom = valueRandom;
975    }
976
977    public void setValueSize(int valueSize) {
978      this.valueSize = valueSize;
979    }
980
981    public void setBufferSize(long bufferSize) {
982      this.bufferSize = bufferSize;
983    }
984
985    public void setPeriod(int period) {
986      this.period = period;
987    }
988
989    public boolean isNomapred() {
990      return nomapred;
991    }
992
993    public boolean isFilterAll() {
994      return filterAll;
995    }
996
997    public int getStartRow() {
998      return startRow;
999    }
1000
1001    public float getSize() {
1002      return size;
1003    }
1004
1005    public int getPerClientRunRows() {
1006      return perClientRunRows;
1007    }
1008
1009    public int getNumClientThreads() {
1010      return numClientThreads;
1011    }
1012
1013    public int getTotalRows() {
1014      return totalRows;
1015    }
1016
1017    public float getSampleRate() {
1018      return sampleRate;
1019    }
1020
1021    public double getTraceRate() {
1022      return traceRate;
1023    }
1024
1025    public String getTableName() {
1026      return tableName;
1027    }
1028
1029    public boolean isFlushCommits() {
1030      return flushCommits;
1031    }
1032
1033    public boolean isWriteToWAL() {
1034      return writeToWAL;
1035    }
1036
1037    public boolean isAutoFlush() {
1038      return autoFlush;
1039    }
1040
1041    public boolean isUseTags() {
1042      return useTags;
1043    }
1044
1045    public int getNoOfTags() {
1046      return noOfTags;
1047    }
1048
1049    public boolean isReportLatency() {
1050      return reportLatency;
1051    }
1052
1053    public int getMultiGet() {
1054      return multiGet;
1055    }
1056
1057    public int getMultiPut() {
1058      return multiPut;
1059    }
1060
1061    public boolean isInMemoryCF() {
1062      return inMemoryCF;
1063    }
1064
1065    public int getPresplitRegions() {
1066      return presplitRegions;
1067    }
1068
1069    public Compression.Algorithm getCompression() {
1070      return compression;
1071    }
1072
1073    public String getEncryption() {
1074      return encryption;
1075    }
1076
1077    public DataBlockEncoding getBlockEncoding() {
1078      return blockEncoding;
1079    }
1080
1081    public boolean isValueRandom() {
1082      return valueRandom;
1083    }
1084
1085    public int getValueSize() {
1086      return valueSize;
1087    }
1088
1089    public int getPeriod() {
1090      return period;
1091    }
1092
1093    public BloomType getBloomType() {
1094      return bloomType;
1095    }
1096
1097    public int getBlockSize() {
1098      return blockSize;
1099    }
1100
1101    public boolean isOneCon() {
1102      return oneCon;
1103    }
1104
1105    public int getMeasureAfter() {
1106      return measureAfter;
1107    }
1108
1109    public void setMeasureAfter(int measureAfter) {
1110      this.measureAfter = measureAfter;
1111    }
1112
1113    public boolean getAddColumns() {
1114      return addColumns;
1115    }
1116
1117    public void setAddColumns(boolean addColumns) {
1118      this.addColumns = addColumns;
1119    }
1120
1121    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
1122      this.inMemoryCompaction = inMemoryCompaction;
1123    }
1124
1125    public MemoryCompactionPolicy getInMemoryCompaction() {
1126      return this.inMemoryCompaction;
1127    }
1128
1129    public long getBufferSize() {
1130      return this.bufferSize;
1131    }
1132  }
1133
1134  /*
1135   * A test. Subclass to particularize what happens per row.
1136   */
1137  static abstract class TestBase {
1138    // Below is make it so when Tests are all running in the one
1139    // jvm, that they each have a differently seeded Random.
1140    private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime());
1141
1142    private static long nextRandomSeed() {
1143      return randomSeed.nextLong();
1144    }
1145
1146    private final int everyN;
1147
1148    protected final Random rand = new Random(nextRandomSeed());
1149    protected final Configuration conf;
1150    protected final TestOptions opts;
1151
1152    private final Status status;
1153
1154    private String testName;
1155    private Histogram latencyHistogram;
1156    private Histogram replicaLatencyHistogram;
1157    private Histogram valueSizeHistogram;
1158    private Histogram rpcCallsHistogram;
1159    private Histogram remoteRpcCallsHistogram;
1160    private Histogram millisBetweenNextHistogram;
1161    private Histogram regionsScannedHistogram;
1162    private Histogram bytesInResultsHistogram;
1163    private Histogram bytesInRemoteResultsHistogram;
1164    private RandomDistribution.Zipf zipf;
1165    private long numOfReplyOverLatencyThreshold = 0;
1166    private long numOfReplyFromReplica = 0;
1167
1168    /**
1169     * Note that all subclasses of this class must provide a public constructor that has the exact
1170     * same list of arguments.
1171     */
1172    TestBase(final Configuration conf, final TestOptions options, final Status status) {
1173      this.conf = conf;
1174      this.opts = options;
1175      this.status = status;
1176      this.testName = this.getClass().getSimpleName();
1177      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
1178      if (options.isValueZipf()) {
1179        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
1180      }
1181      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
1182    }
1183
1184    int getValueLength(final Random r) {
1185      if (this.opts.isValueRandom()) {
1186        return r.nextInt(opts.valueSize);
1187      } else if (this.opts.isValueZipf()) {
1188        return Math.abs(this.zipf.nextInt());
1189      } else {
1190        return opts.valueSize;
1191      }
1192    }
1193
1194    void updateValueSize(final Result[] rs) throws IOException {
1195      updateValueSize(rs, 0);
1196    }
1197
1198    void updateValueSize(final Result[] rs, final long latency) throws IOException {
1199      if (rs == null || (latency == 0)) return;
1200      for (Result r : rs)
1201        updateValueSize(r, latency);
1202    }
1203
1204    void updateValueSize(final Result r) throws IOException {
1205      updateValueSize(r, 0);
1206    }
1207
1208    void updateValueSize(final Result r, final long latency) throws IOException {
1209      if (r == null || (latency == 0)) return;
1210      int size = 0;
1211      // update replicaHistogram
1212      if (r.isStale()) {
1213        replicaLatencyHistogram.update(latency / 1000);
1214        numOfReplyFromReplica++;
1215      }
1216      if (!isRandomValueSize()) return;
1217
1218      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1219        size += scanner.current().getValueLength();
1220      }
1221      updateValueSize(size);
1222    }
1223
1224    void updateValueSize(final int valueSize) {
1225      if (!isRandomValueSize()) return;
1226      this.valueSizeHistogram.update(valueSize);
1227    }
1228
1229    void updateScanMetrics(final ScanMetrics metrics) {
1230      if (metrics == null) return;
1231      Map<String, Long> metricsMap = metrics.getMetricsMap();
1232      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
1233      if (rpcCalls != null) {
1234        this.rpcCallsHistogram.update(rpcCalls.longValue());
1235      }
1236      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
1237      if (remoteRpcCalls != null) {
1238        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
1239      }
1240      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
1241      if (millisBetweenNext != null) {
1242        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
1243      }
1244      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
1245      if (regionsScanned != null) {
1246        this.regionsScannedHistogram.update(regionsScanned.longValue());
1247      }
1248      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
1249      if (bytesInResults != null && bytesInResults.longValue() > 0) {
1250        this.bytesInResultsHistogram.update(bytesInResults.longValue());
1251      }
1252      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
1253      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
1254        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
1255      }
1256    }
1257
1258    String generateStatus(final int sr, final int i, final int lr) {
1259      return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency ["
1260        + getShortLatencyReport() + "]"
1261        + (!isRandomValueSize() ? "" : ", value size [" + getShortValueSizeReport() + "]");
1262    }
1263
1264    boolean isRandomValueSize() {
1265      return opts.valueRandom;
1266    }
1267
1268    protected int getReportingPeriod() {
1269      return opts.period;
1270    }
1271
1272    /**
1273     * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1274     */
1275    public Histogram getLatencyHistogram() {
1276      return latencyHistogram;
1277    }
1278
1279    void testSetup() throws IOException {
1280      // test metrics
1281      latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1282      // If it is a replica test, set up histogram for replica.
1283      if (opts.replicas > 1) {
1284        replicaLatencyHistogram =
1285          YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1286      }
1287      valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1288      // scan metrics
1289      rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1290      remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1291      millisBetweenNextHistogram =
1292        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1293      regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1294      bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1295      bytesInRemoteResultsHistogram =
1296        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1297
1298      onStartup();
1299    }
1300
1301    abstract void onStartup() throws IOException;
1302
1303    void testTakedown() throws IOException {
1304      onTakedown();
1305      // Print all stats for this thread continuously.
1306      // Synchronize on Test.class so different threads don't intermingle the
1307      // output. We can't use 'this' here because each thread has its own instance of Test class.
1308      synchronized (Test.class) {
1309        status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
1310        status
1311          .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram));
1312        if (opts.replicas > 1) {
1313          status.setStatus("Latency (us) from Replica Regions: "
1314            + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram));
1315        }
1316        status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
1317        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
1318        if (valueSizeHistogram.getCount() > 0) {
1319          status.setStatus(
1320            "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
1321          status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
1322          status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
1323        } else {
1324          status.setStatus("No valueSize statistics available");
1325        }
1326        if (rpcCallsHistogram.getCount() > 0) {
1327          status.setStatus(
1328            "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
1329        }
1330        if (remoteRpcCallsHistogram.getCount() > 0) {
1331          status.setStatus("remoteRpcCalls (count): "
1332            + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
1333        }
1334        if (millisBetweenNextHistogram.getCount() > 0) {
1335          status.setStatus("millisBetweenNext (latency): "
1336            + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
1337        }
1338        if (regionsScannedHistogram.getCount() > 0) {
1339          status.setStatus("regionsScanned (count): "
1340            + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
1341        }
1342        if (bytesInResultsHistogram.getCount() > 0) {
1343          status.setStatus("bytesInResults (size): "
1344            + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
1345        }
1346        if (bytesInRemoteResultsHistogram.getCount() > 0) {
1347          status.setStatus("bytesInRemoteResults (size): "
1348            + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
1349        }
1350      }
1351    }
1352
1353    abstract void onTakedown() throws IOException;
1354
1355    /*
1356     * Run test
1357     * @return Elapsed time.
1358     */
1359    long test() throws IOException, InterruptedException {
1360      testSetup();
1361      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1362      final long startTime = System.nanoTime();
1363      try {
1364        testTimed();
1365      } finally {
1366        testTakedown();
1367      }
1368      return (System.nanoTime() - startTime) / 1000000;
1369    }
1370
1371    int getStartRow() {
1372      return opts.startRow;
1373    }
1374
1375    int getLastRow() {
1376      return getStartRow() + opts.perClientRunRows;
1377    }
1378
1379    /**
1380     * Provides an extension point for tests that don't want a per row invocation.
1381     */
1382    void testTimed() throws IOException, InterruptedException {
1383      int startRow = getStartRow();
1384      int lastRow = getLastRow();
1385      // Report on completion of 1/10th of total.
1386      for (int ii = 0; ii < opts.cycles; ii++) {
1387        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
1388        for (int i = startRow; i < lastRow; i++) {
1389          if (i % everyN != 0) continue;
1390          long startTime = System.nanoTime();
1391          boolean requestSent = false;
1392          Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan();
1393          try (Scope scope = span.makeCurrent()) {
1394            requestSent = testRow(i, startTime);
1395          } finally {
1396            span.end();
1397          }
1398          if ((i - startRow) > opts.measureAfter) {
1399            // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
1400            // first 9 times and sends the actual get request in the 10th iteration.
1401            // We should only set latency when actual request is sent because otherwise
1402            // it turns out to be 0.
1403            if (requestSent) {
1404              long latency = (System.nanoTime() - startTime) / 1000;
1405              latencyHistogram.update(latency);
1406              if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) {
1407                numOfReplyOverLatencyThreshold++;
1408              }
1409            }
1410            if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1411              status.setStatus(generateStatus(startRow, i, lastRow));
1412            }
1413          }
1414        }
1415      }
1416    }
1417
1418    /** Returns Subset of the histograms' calculation. */
1419    public String getShortLatencyReport() {
1420      return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
1421    }
1422
1423    /** Returns Subset of the histograms' calculation. */
1424    public String getShortValueSizeReport() {
1425      return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
1426    }
1427
1428    /**
1429     * Test for individual row.
1430     * @param i Row index.
1431     * @return true if the row was sent to server and need to record metrics. False if not, multiGet
1432     *         and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered.
1433     */
1434    abstract boolean testRow(final int i, final long startTime)
1435      throws IOException, InterruptedException;
1436  }
1437
1438  static abstract class Test extends TestBase {
1439    protected Connection connection;
1440
1441    Test(final Connection con, final TestOptions options, final Status status) {
1442      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1443      this.connection = con;
1444    }
1445  }
1446
1447  static abstract class AsyncTest extends TestBase {
1448    protected AsyncConnection connection;
1449
1450    AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
1451      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1452      this.connection = con;
1453    }
1454  }
1455
1456  static abstract class TableTest extends Test {
1457    protected Table table;
1458
1459    TableTest(Connection con, TestOptions options, Status status) {
1460      super(con, options, status);
1461    }
1462
1463    @Override
1464    void onStartup() throws IOException {
1465      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1466    }
1467
1468    @Override
1469    void onTakedown() throws IOException {
1470      table.close();
1471    }
1472  }
1473
1474  /*
1475   * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest
1476   */
1477  static abstract class MetaTest extends TableTest {
1478    protected int keyLength;
1479
1480    MetaTest(Connection con, TestOptions options, Status status) {
1481      super(con, options, status);
1482      keyLength = Integer.toString(opts.perClientRunRows).length();
1483    }
1484
1485    @Override
1486    void onTakedown() throws IOException {
1487      // No clean up
1488    }
1489
1490    /*
1491     * Generates Lexicographically ascending strings
1492     */
1493    protected byte[] getSplitKey(final int i) {
1494      return Bytes.toBytes(String.format("%0" + keyLength + "d", i));
1495    }
1496
1497  }
1498
1499  static abstract class AsyncTableTest extends AsyncTest {
1500    protected AsyncTable<?> table;
1501
1502    AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
1503      super(con, options, status);
1504    }
1505
1506    @Override
1507    void onStartup() throws IOException {
1508      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1509    }
1510
1511    @Override
1512    void onTakedown() throws IOException {
1513    }
1514  }
1515
1516  static class AsyncRandomReadTest extends AsyncTableTest {
1517    private final Consistency consistency;
1518    private ArrayList<Get> gets;
1519
1520    AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
1521      super(con, options, status);
1522      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1523      if (opts.multiGet > 0) {
1524        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1525        this.gets = new ArrayList<>(opts.multiGet);
1526      }
1527    }
1528
1529    @Override
1530    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1531      if (opts.randomSleep > 0) {
1532        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
1533      }
1534      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1535      for (int family = 0; family < opts.families; family++) {
1536        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1537        if (opts.addColumns) {
1538          for (int column = 0; column < opts.columns; column++) {
1539            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1540            get.addColumn(familyName, qualifier);
1541          }
1542        } else {
1543          get.addFamily(familyName);
1544        }
1545      }
1546      if (opts.filterAll) {
1547        get.setFilter(new FilterAllFilter());
1548      }
1549      get.setConsistency(consistency);
1550      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1551      try {
1552        if (opts.multiGet > 0) {
1553          this.gets.add(get);
1554          if (this.gets.size() == opts.multiGet) {
1555            Result[] rs =
1556              this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
1557            updateValueSize(rs);
1558            this.gets.clear();
1559          } else {
1560            return false;
1561          }
1562        } else {
1563          updateValueSize(this.table.get(get).get());
1564        }
1565      } catch (ExecutionException e) {
1566        throw new IOException(e);
1567      }
1568      return true;
1569    }
1570
1571    public static RuntimeException runtime(Throwable e) {
1572      if (e instanceof RuntimeException) {
1573        return (RuntimeException) e;
1574      }
1575      return new RuntimeException(e);
1576    }
1577
1578    public static <V> V propagate(Callable<V> callable) {
1579      try {
1580        return callable.call();
1581      } catch (Exception e) {
1582        throw runtime(e);
1583      }
1584    }
1585
1586    @Override
1587    protected int getReportingPeriod() {
1588      int period = opts.perClientRunRows / 10;
1589      return period == 0 ? opts.perClientRunRows : period;
1590    }
1591
1592    @Override
1593    protected void testTakedown() throws IOException {
1594      if (this.gets != null && this.gets.size() > 0) {
1595        this.table.get(gets);
1596        this.gets.clear();
1597      }
1598      super.testTakedown();
1599    }
1600  }
1601
1602  static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
1603
1604    AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
1605      super(con, options, status);
1606    }
1607
1608    @Override
1609    protected byte[] generateRow(final int i) {
1610      return getRandomRow(this.rand, opts.totalRows);
1611    }
1612  }
1613
1614  static class AsyncScanTest extends AsyncTableTest {
1615    private ResultScanner testScanner;
1616    private AsyncTable<?> asyncTable;
1617
1618    AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
1619      super(con, options, status);
1620    }
1621
1622    @Override
1623    void onStartup() throws IOException {
1624      this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName),
1625        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
1626    }
1627
1628    @Override
1629    void testTakedown() throws IOException {
1630      if (this.testScanner != null) {
1631        updateScanMetrics(this.testScanner.getScanMetrics());
1632        this.testScanner.close();
1633      }
1634      super.testTakedown();
1635    }
1636
1637    @Override
1638    boolean testRow(final int i, final long startTime) throws IOException {
1639      if (this.testScanner == null) {
1640        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1641          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1642          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1643        for (int family = 0; family < opts.families; family++) {
1644          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1645          if (opts.addColumns) {
1646            for (int column = 0; column < opts.columns; column++) {
1647              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1648              scan.addColumn(familyName, qualifier);
1649            }
1650          } else {
1651            scan.addFamily(familyName);
1652          }
1653        }
1654        if (opts.filterAll) {
1655          scan.setFilter(new FilterAllFilter());
1656        }
1657        this.testScanner = asyncTable.getScanner(scan);
1658      }
1659      Result r = testScanner.next();
1660      updateValueSize(r);
1661      return true;
1662    }
1663  }
1664
1665  static class AsyncSequentialReadTest extends AsyncTableTest {
1666    AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
1667      super(con, options, status);
1668    }
1669
1670    @Override
1671    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1672      Get get = new Get(format(i));
1673      for (int family = 0; family < opts.families; family++) {
1674        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1675        if (opts.addColumns) {
1676          for (int column = 0; column < opts.columns; column++) {
1677            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1678            get.addColumn(familyName, qualifier);
1679          }
1680        } else {
1681          get.addFamily(familyName);
1682        }
1683      }
1684      if (opts.filterAll) {
1685        get.setFilter(new FilterAllFilter());
1686      }
1687      try {
1688        updateValueSize(table.get(get).get());
1689      } catch (ExecutionException e) {
1690        throw new IOException(e);
1691      }
1692      return true;
1693    }
1694  }
1695
1696  static class AsyncSequentialWriteTest extends AsyncTableTest {
1697    private ArrayList<Put> puts;
1698
1699    AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
1700      super(con, options, status);
1701      if (opts.multiPut > 0) {
1702        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
1703        this.puts = new ArrayList<>(opts.multiPut);
1704      }
1705    }
1706
1707    protected byte[] generateRow(final int i) {
1708      return format(i);
1709    }
1710
1711    @Override
1712    @SuppressWarnings("ReturnValueIgnored")
1713    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1714      byte[] row = generateRow(i);
1715      Put put = new Put(row);
1716      for (int family = 0; family < opts.families; family++) {
1717        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1718        for (int column = 0; column < opts.columns; column++) {
1719          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1720          byte[] value = generateData(this.rand, getValueLength(this.rand));
1721          if (opts.useTags) {
1722            byte[] tag = generateData(this.rand, TAG_LENGTH);
1723            Tag[] tags = new Tag[opts.noOfTags];
1724            for (int n = 0; n < opts.noOfTags; n++) {
1725              Tag t = new ArrayBackedTag((byte) n, tag);
1726              tags[n] = t;
1727            }
1728            KeyValue kv =
1729              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
1730            put.add(kv);
1731            updateValueSize(kv.getValueLength());
1732          } else {
1733            put.addColumn(familyName, qualifier, value);
1734            updateValueSize(value.length);
1735          }
1736        }
1737      }
1738      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1739      try {
1740        table.put(put).get();
1741        if (opts.multiPut > 0) {
1742          this.puts.add(put);
1743          if (this.puts.size() == opts.multiPut) {
1744            this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get));
1745            this.puts.clear();
1746          } else {
1747            return false;
1748          }
1749        } else {
1750          table.put(put).get();
1751        }
1752      } catch (ExecutionException e) {
1753        throw new IOException(e);
1754      }
1755      return true;
1756    }
1757  }
1758
1759  static abstract class BufferedMutatorTest extends Test {
1760    protected BufferedMutator mutator;
1761    protected Table table;
1762
1763    BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1764      super(con, options, status);
1765    }
1766
1767    @Override
1768    void onStartup() throws IOException {
1769      BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1770      p.writeBufferSize(opts.bufferSize);
1771      this.mutator = connection.getBufferedMutator(p);
1772      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1773    }
1774
1775    @Override
1776    void onTakedown() throws IOException {
1777      mutator.close();
1778      table.close();
1779    }
1780  }
1781
1782  static class RandomSeekScanTest extends TableTest {
1783    RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1784      super(con, options, status);
1785    }
1786
1787    @Override
1788    boolean testRow(final int i, final long startTime) throws IOException {
1789      Scan scan =
1790        new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)).setCaching(opts.caching)
1791          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1792          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1793      FilterList list = new FilterList();
1794      for (int family = 0; family < opts.families; family++) {
1795        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1796        if (opts.addColumns) {
1797          for (int column = 0; column < opts.columns; column++) {
1798            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1799            scan.addColumn(familyName, qualifier);
1800          }
1801        } else {
1802          scan.addFamily(familyName);
1803        }
1804      }
1805      if (opts.filterAll) {
1806        list.addFilter(new FilterAllFilter());
1807      }
1808      list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1809      scan.setFilter(list);
1810      ResultScanner s = this.table.getScanner(scan);
1811      try {
1812        for (Result rr; (rr = s.next()) != null;) {
1813          updateValueSize(rr);
1814        }
1815      } finally {
1816        updateScanMetrics(s.getScanMetrics());
1817        s.close();
1818      }
1819      return true;
1820    }
1821
1822    @Override
1823    protected int getReportingPeriod() {
1824      int period = opts.perClientRunRows / 100;
1825      return period == 0 ? opts.perClientRunRows : period;
1826    }
1827
1828  }
1829
1830  static abstract class RandomScanWithRangeTest extends TableTest {
1831    RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1832      super(con, options, status);
1833    }
1834
1835    @Override
1836    boolean testRow(final int i, final long startTime) throws IOException {
1837      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1838      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
1839        .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
1840        .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1841        .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1842      for (int family = 0; family < opts.families; family++) {
1843        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1844        if (opts.addColumns) {
1845          for (int column = 0; column < opts.columns; column++) {
1846            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1847            scan.addColumn(familyName, qualifier);
1848          }
1849        } else {
1850          scan.addFamily(familyName);
1851        }
1852      }
1853      if (opts.filterAll) {
1854        scan.setFilter(new FilterAllFilter());
1855      }
1856      Result r = null;
1857      int count = 0;
1858      ResultScanner s = this.table.getScanner(scan);
1859      try {
1860        for (; (r = s.next()) != null;) {
1861          updateValueSize(r);
1862          count++;
1863        }
1864        if (i % 100 == 0) {
1865          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1866            Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()),
1867            count));
1868        }
1869      } finally {
1870        updateScanMetrics(s.getScanMetrics());
1871        s.close();
1872      }
1873      return true;
1874    }
1875
1876    protected abstract Pair<byte[], byte[]> getStartAndStopRow();
1877
1878    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
1879      int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
1880      int stop = start + maxRange;
1881      return new Pair<>(format(start), format(stop));
1882    }
1883
1884    @Override
1885    protected int getReportingPeriod() {
1886      int period = opts.perClientRunRows / 100;
1887      return period == 0 ? opts.perClientRunRows : period;
1888    }
1889  }
1890
1891  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1892    RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1893      super(con, options, status);
1894    }
1895
1896    @Override
1897    protected Pair<byte[], byte[]> getStartAndStopRow() {
1898      return generateStartAndStopRows(10);
1899    }
1900  }
1901
1902  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1903    RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1904      super(con, options, status);
1905    }
1906
1907    @Override
1908    protected Pair<byte[], byte[]> getStartAndStopRow() {
1909      return generateStartAndStopRows(100);
1910    }
1911  }
1912
1913  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1914    RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1915      super(con, options, status);
1916    }
1917
1918    @Override
1919    protected Pair<byte[], byte[]> getStartAndStopRow() {
1920      return generateStartAndStopRows(1000);
1921    }
1922  }
1923
1924  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1925    RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1926      super(con, options, status);
1927    }
1928
1929    @Override
1930    protected Pair<byte[], byte[]> getStartAndStopRow() {
1931      return generateStartAndStopRows(10000);
1932    }
1933  }
1934
1935  static class RandomReadTest extends TableTest {
1936    private final Consistency consistency;
1937    private ArrayList<Get> gets;
1938
1939    RandomReadTest(Connection con, TestOptions options, Status status) {
1940      super(con, options, status);
1941      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1942      if (opts.multiGet > 0) {
1943        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1944        this.gets = new ArrayList<>(opts.multiGet);
1945      }
1946    }
1947
1948    @Override
1949    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1950      if (opts.randomSleep > 0) {
1951        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
1952      }
1953      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1954      for (int family = 0; family < opts.families; family++) {
1955        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1956        if (opts.addColumns) {
1957          for (int column = 0; column < opts.columns; column++) {
1958            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1959            get.addColumn(familyName, qualifier);
1960          }
1961        } else {
1962          get.addFamily(familyName);
1963        }
1964      }
1965      if (opts.filterAll) {
1966        get.setFilter(new FilterAllFilter());
1967      }
1968      get.setConsistency(consistency);
1969      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1970      if (opts.multiGet > 0) {
1971        this.gets.add(get);
1972        if (this.gets.size() == opts.multiGet) {
1973          Result[] rs = this.table.get(this.gets);
1974          if (opts.replicas > 1) {
1975            long latency = System.nanoTime() - startTime;
1976            updateValueSize(rs, latency);
1977          } else {
1978            updateValueSize(rs);
1979          }
1980          this.gets.clear();
1981        } else {
1982          return false;
1983        }
1984      } else {
1985        if (opts.replicas > 1) {
1986          Result r = this.table.get(get);
1987          long latency = System.nanoTime() - startTime;
1988          updateValueSize(r, latency);
1989        } else {
1990          updateValueSize(this.table.get(get));
1991        }
1992      }
1993      return true;
1994    }
1995
1996    @Override
1997    protected int getReportingPeriod() {
1998      int period = opts.perClientRunRows / 10;
1999      return period == 0 ? opts.perClientRunRows : period;
2000    }
2001
2002    @Override
2003    protected void testTakedown() throws IOException {
2004      if (this.gets != null && this.gets.size() > 0) {
2005        this.table.get(gets);
2006        this.gets.clear();
2007      }
2008      super.testTakedown();
2009    }
2010  }
2011
2012  /*
2013   * Send random reads against fake regions inserted by MetaWriteTest
2014   */
2015  static class MetaRandomReadTest extends MetaTest {
2016    private RegionLocator regionLocator;
2017
2018    MetaRandomReadTest(Connection con, TestOptions options, Status status) {
2019      super(con, options, status);
2020      LOG.info("call getRegionLocation");
2021    }
2022
2023    @Override
2024    void onStartup() throws IOException {
2025      super.onStartup();
2026      this.regionLocator = connection.getRegionLocator(table.getName());
2027    }
2028
2029    @Override
2030    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
2031      if (opts.randomSleep > 0) {
2032        Thread.sleep(rand.nextInt(opts.randomSleep));
2033      }
2034      HRegionLocation hRegionLocation =
2035        regionLocator.getRegionLocation(getSplitKey(rand.nextInt(opts.perClientRunRows)), true);
2036      LOG.debug("get location for region: " + hRegionLocation);
2037      return true;
2038    }
2039
2040    @Override
2041    protected int getReportingPeriod() {
2042      int period = opts.perClientRunRows / 10;
2043      return period == 0 ? opts.perClientRunRows : period;
2044    }
2045
2046    @Override
2047    protected void testTakedown() throws IOException {
2048      super.testTakedown();
2049    }
2050  }
2051
2052  static class RandomWriteTest extends SequentialWriteTest {
2053    RandomWriteTest(Connection con, TestOptions options, Status status) {
2054      super(con, options, status);
2055    }
2056
2057    @Override
2058    protected byte[] generateRow(final int i) {
2059      return getRandomRow(this.rand, opts.totalRows);
2060    }
2061
2062  }
2063
2064  static class ScanTest extends TableTest {
2065    private ResultScanner testScanner;
2066
2067    ScanTest(Connection con, TestOptions options, Status status) {
2068      super(con, options, status);
2069    }
2070
2071    @Override
2072    void testTakedown() throws IOException {
2073      if (this.testScanner != null) {
2074        this.testScanner.close();
2075      }
2076      super.testTakedown();
2077    }
2078
2079    @Override
2080    boolean testRow(final int i, final long startTime) throws IOException {
2081      if (this.testScanner == null) {
2082        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
2083          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
2084          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
2085        for (int family = 0; family < opts.families; family++) {
2086          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2087          if (opts.addColumns) {
2088            for (int column = 0; column < opts.columns; column++) {
2089              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2090              scan.addColumn(familyName, qualifier);
2091            }
2092          } else {
2093            scan.addFamily(familyName);
2094          }
2095        }
2096        if (opts.filterAll) {
2097          scan.setFilter(new FilterAllFilter());
2098        }
2099        this.testScanner = table.getScanner(scan);
2100      }
2101      Result r = testScanner.next();
2102      updateValueSize(r);
2103      return true;
2104    }
2105  }
2106
2107  /**
2108   * Base class for operations that are CAS-like; that read a value and then set it based off what
2109   * they read. In this category is increment, append, checkAndPut, etc.
2110   * <p>
2111   * These operations also want some concurrency going on. Usually when these tests run, they
2112   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
2113   * same key space. We do this with our getStartRow and getLastRow overrides.
2114   */
2115  static abstract class CASTableTest extends TableTest {
2116    private final byte[] qualifier;
2117
2118    CASTableTest(Connection con, TestOptions options, Status status) {
2119      super(con, options, status);
2120      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
2121    }
2122
2123    byte[] getQualifier() {
2124      return this.qualifier;
2125    }
2126
2127    @Override
2128    int getStartRow() {
2129      return 0;
2130    }
2131
2132    @Override
2133    int getLastRow() {
2134      return opts.perClientRunRows;
2135    }
2136  }
2137
2138  static class IncrementTest extends CASTableTest {
2139    IncrementTest(Connection con, TestOptions options, Status status) {
2140      super(con, options, status);
2141    }
2142
2143    @Override
2144    boolean testRow(final int i, final long startTime) throws IOException {
2145      Increment increment = new Increment(format(i));
2146      // unlike checkAndXXX tests, which make most sense to do on a single value,
2147      // if multiple families are specified for an increment test we assume it is
2148      // meant to raise the work factor
2149      for (int family = 0; family < opts.families; family++) {
2150        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2151        increment.addColumn(familyName, getQualifier(), 1l);
2152      }
2153      updateValueSize(this.table.increment(increment));
2154      return true;
2155    }
2156  }
2157
2158  static class AppendTest extends CASTableTest {
2159    AppendTest(Connection con, TestOptions options, Status status) {
2160      super(con, options, status);
2161    }
2162
2163    @Override
2164    boolean testRow(final int i, final long startTime) throws IOException {
2165      byte[] bytes = format(i);
2166      Append append = new Append(bytes);
2167      // unlike checkAndXXX tests, which make most sense to do on a single value,
2168      // if multiple families are specified for an append test we assume it is
2169      // meant to raise the work factor
2170      for (int family = 0; family < opts.families; family++) {
2171        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2172        append.addColumn(familyName, getQualifier(), bytes);
2173      }
2174      updateValueSize(this.table.append(append));
2175      return true;
2176    }
2177  }
2178
2179  static class CheckAndMutateTest extends CASTableTest {
2180    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
2181      super(con, options, status);
2182    }
2183
2184    @Override
2185    boolean testRow(final int i, final long startTime) throws IOException {
2186      final byte[] bytes = format(i);
2187      // checkAndXXX tests operate on only a single value
2188      // Put a known value so when we go to check it, it is there.
2189      Put put = new Put(bytes);
2190      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2191      this.table.put(put);
2192      RowMutations mutations = new RowMutations(bytes);
2193      mutations.add(put);
2194      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2195        .thenMutate(mutations);
2196      return true;
2197    }
2198  }
2199
2200  static class CheckAndPutTest extends CASTableTest {
2201    CheckAndPutTest(Connection con, TestOptions options, Status status) {
2202      super(con, options, status);
2203    }
2204
2205    @Override
2206    boolean testRow(final int i, final long startTime) throws IOException {
2207      final byte[] bytes = format(i);
2208      // checkAndXXX tests operate on only a single value
2209      // Put a known value so when we go to check it, it is there.
2210      Put put = new Put(bytes);
2211      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2212      this.table.put(put);
2213      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2214        .thenPut(put);
2215      return true;
2216    }
2217  }
2218
2219  static class CheckAndDeleteTest extends CASTableTest {
2220    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
2221      super(con, options, status);
2222    }
2223
2224    @Override
2225    boolean testRow(final int i, final long startTime) throws IOException {
2226      final byte[] bytes = format(i);
2227      // checkAndXXX tests operate on only a single value
2228      // Put a known value so when we go to check it, it is there.
2229      Put put = new Put(bytes);
2230      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2231      this.table.put(put);
2232      Delete delete = new Delete(put.getRow());
2233      delete.addColumn(FAMILY_ZERO, getQualifier());
2234      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2235        .thenDelete(delete);
2236      return true;
2237    }
2238  }
2239
2240  /*
2241   * Delete all fake regions inserted to meta table by MetaWriteTest.
2242   */
2243  static class CleanMetaTest extends MetaTest {
2244    CleanMetaTest(Connection con, TestOptions options, Status status) {
2245      super(con, options, status);
2246    }
2247
2248    @Override
2249    boolean testRow(final int i, final long startTime) throws IOException {
2250      try {
2251        RegionInfo regionInfo = connection.getRegionLocator(table.getName())
2252          .getRegionLocation(getSplitKey(i), false).getRegion();
2253        LOG.debug("deleting region from meta: " + regionInfo);
2254
2255        Delete delete =
2256          MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP);
2257        try (Table t = MetaTableAccessor.getMetaHTable(connection)) {
2258          t.delete(delete);
2259        }
2260      } catch (IOException ie) {
2261        // Log and continue
2262        LOG.error("cannot find region with start key: " + i);
2263      }
2264      return true;
2265    }
2266  }
2267
2268  static class SequentialReadTest extends TableTest {
2269    SequentialReadTest(Connection con, TestOptions options, Status status) {
2270      super(con, options, status);
2271    }
2272
2273    @Override
2274    boolean testRow(final int i, final long startTime) throws IOException {
2275      Get get = new Get(format(i));
2276      for (int family = 0; family < opts.families; family++) {
2277        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2278        if (opts.addColumns) {
2279          for (int column = 0; column < opts.columns; column++) {
2280            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2281            get.addColumn(familyName, qualifier);
2282          }
2283        } else {
2284          get.addFamily(familyName);
2285        }
2286      }
2287      if (opts.filterAll) {
2288        get.setFilter(new FilterAllFilter());
2289      }
2290      updateValueSize(table.get(get));
2291      return true;
2292    }
2293  }
2294
2295  static class SequentialWriteTest extends BufferedMutatorTest {
2296    private ArrayList<Put> puts;
2297
2298    SequentialWriteTest(Connection con, TestOptions options, Status status) {
2299      super(con, options, status);
2300      if (opts.multiPut > 0) {
2301        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
2302        this.puts = new ArrayList<>(opts.multiPut);
2303      }
2304    }
2305
2306    protected byte[] generateRow(final int i) {
2307      return format(i);
2308    }
2309
2310    @Override
2311    boolean testRow(final int i, final long startTime) throws IOException {
2312      byte[] row = generateRow(i);
2313      Put put = new Put(row);
2314      for (int family = 0; family < opts.families; family++) {
2315        byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
2316        for (int column = 0; column < opts.columns; column++) {
2317          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2318          byte[] value = generateData(this.rand, getValueLength(this.rand));
2319          if (opts.useTags) {
2320            byte[] tag = generateData(this.rand, TAG_LENGTH);
2321            Tag[] tags = new Tag[opts.noOfTags];
2322            for (int n = 0; n < opts.noOfTags; n++) {
2323              Tag t = new ArrayBackedTag((byte) n, tag);
2324              tags[n] = t;
2325            }
2326            KeyValue kv =
2327              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
2328            put.add(kv);
2329            updateValueSize(kv.getValueLength());
2330          } else {
2331            put.addColumn(familyName, qualifier, value);
2332            updateValueSize(value.length);
2333          }
2334        }
2335      }
2336      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2337      if (opts.autoFlush) {
2338        if (opts.multiPut > 0) {
2339          this.puts.add(put);
2340          if (this.puts.size() == opts.multiPut) {
2341            table.put(this.puts);
2342            this.puts.clear();
2343          } else {
2344            return false;
2345          }
2346        } else {
2347          table.put(put);
2348        }
2349      } else {
2350        mutator.mutate(put);
2351      }
2352      return true;
2353    }
2354  }
2355
2356  /*
2357   * Insert fake regions into meta table with contiguous split keys.
2358   */
2359  static class MetaWriteTest extends MetaTest {
2360
2361    MetaWriteTest(Connection con, TestOptions options, Status status) {
2362      super(con, options, status);
2363    }
2364
2365    @Override
2366    boolean testRow(final int i, final long startTime) throws IOException {
2367      List<RegionInfo> regionInfos = new ArrayList<RegionInfo>();
2368      RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME))
2369        .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build());
2370      regionInfos.add(regionInfo);
2371      MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1);
2372
2373      // write the serverName columns
2374      MetaTableAccessor.updateRegionLocation(connection, regionInfo,
2375        ServerName.valueOf("localhost", 60010, rand.nextLong()), i,
2376        EnvironmentEdgeManager.currentTime());
2377      return true;
2378    }
2379  }
2380
2381  static class FilteredScanTest extends TableTest {
2382    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
2383
2384    FilteredScanTest(Connection con, TestOptions options, Status status) {
2385      super(con, options, status);
2386      if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) {
2387        LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB
2388          + ". This could take a very long time.");
2389      }
2390    }
2391
2392    @Override
2393    boolean testRow(int i, final long startTime) throws IOException {
2394      byte[] value = generateData(this.rand, getValueLength(this.rand));
2395      Scan scan = constructScan(value);
2396      ResultScanner scanner = null;
2397      try {
2398        scanner = this.table.getScanner(scan);
2399        for (Result r = null; (r = scanner.next()) != null;) {
2400          updateValueSize(r);
2401        }
2402      } finally {
2403        if (scanner != null) {
2404          updateScanMetrics(scanner.getScanMetrics());
2405          scanner.close();
2406        }
2407      }
2408      return true;
2409    }
2410
2411    protected Scan constructScan(byte[] valuePrefix) throws IOException {
2412      FilterList list = new FilterList();
2413      Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL,
2414        new BinaryComparator(valuePrefix));
2415      list.addFilter(filter);
2416      if (opts.filterAll) {
2417        list.addFilter(new FilterAllFilter());
2418      }
2419      Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2420        .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2421        .setScanMetricsEnabled(true);
2422      if (opts.addColumns) {
2423        for (int column = 0; column < opts.columns; column++) {
2424          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2425          scan.addColumn(FAMILY_ZERO, qualifier);
2426        }
2427      } else {
2428        scan.addFamily(FAMILY_ZERO);
2429      }
2430      scan.setFilter(list);
2431      return scan;
2432    }
2433  }
2434
2435  /**
2436   * Compute a throughput rate in MB/s.
2437   * @param rows   Number of records consumed.
2438   * @param timeMs Time taken in milliseconds.
2439   * @return String value with label, ie '123.76 MB/s'
2440   */
2441  private static String calculateMbps(int rows, long timeMs, final int valueSize, int families,
2442    int columns) {
2443    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH
2444      + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families);
2445    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
2446      .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT);
2447    return FMT.format(mbps) + " MB/s";
2448  }
2449
2450  /*
2451   * Format passed integer.
2452   * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed number (Does
2453   * absolute in case number is negative).
2454   */
2455  public static byte[] format(final int number) {
2456    byte[] b = new byte[ROW_LENGTH];
2457    int d = Math.abs(number);
2458    for (int i = b.length - 1; i >= 0; i--) {
2459      b[i] = (byte) ((d % 10) + '0');
2460      d /= 10;
2461    }
2462    return b;
2463  }
2464
2465  /*
2466   * This method takes some time and is done inline uploading data. For example, doing the mapfile
2467   * test, generation of the key and value consumes about 30% of CPU time.
2468   * @return Generated random value to insert into a table cell.
2469   */
2470  public static byte[] generateData(final Random r, int length) {
2471    byte[] b = new byte[length];
2472    int i;
2473
2474    for (i = 0; i < (length - 8); i += 8) {
2475      b[i] = (byte) (65 + r.nextInt(26));
2476      b[i + 1] = b[i];
2477      b[i + 2] = b[i];
2478      b[i + 3] = b[i];
2479      b[i + 4] = b[i];
2480      b[i + 5] = b[i];
2481      b[i + 6] = b[i];
2482      b[i + 7] = b[i];
2483    }
2484
2485    byte a = (byte) (65 + r.nextInt(26));
2486    for (; i < length; i++) {
2487      b[i] = a;
2488    }
2489    return b;
2490  }
2491
2492  static byte[] getRandomRow(final Random random, final int totalRows) {
2493    return format(generateRandomRow(random, totalRows));
2494  }
2495
2496  static int generateRandomRow(final Random random, final int totalRows) {
2497    return random.nextInt(Integer.MAX_VALUE) % totalRows;
2498  }
2499
2500  static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
2501    Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
2502    throws IOException, InterruptedException {
2503    status.setStatus(
2504      "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows");
2505    long totalElapsedTime;
2506
2507    final TestBase t;
2508    try {
2509      if (AsyncTest.class.isAssignableFrom(cmd)) {
2510        Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
2511        Constructor<? extends AsyncTest> constructor =
2512          newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
2513        t = constructor.newInstance(asyncCon, opts, status);
2514      } else {
2515        Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
2516        Constructor<? extends Test> constructor =
2517          newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
2518        t = constructor.newInstance(con, opts, status);
2519      }
2520    } catch (NoSuchMethodException e) {
2521      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
2522        + ".  It does not provide a constructor as described by "
2523        + "the javadoc comment.  Available constructors are: "
2524        + Arrays.toString(cmd.getConstructors()));
2525    } catch (Exception e) {
2526      throw new IllegalStateException("Failed to construct command class", e);
2527    }
2528    totalElapsedTime = t.test();
2529
2530    status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow
2531      + " for " + opts.perClientRunRows + " rows" + " ("
2532      + calculateMbps((int) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
2533        getAverageValueLength(opts), opts.families, opts.columns)
2534      + ")");
2535
2536    return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold,
2537      t.numOfReplyFromReplica, t.getLatencyHistogram());
2538  }
2539
2540  private static int getAverageValueLength(final TestOptions opts) {
2541    return opts.valueRandom ? opts.valueSize / 2 : opts.valueSize;
2542  }
2543
2544  private void runTest(final Class<? extends TestBase> cmd, TestOptions opts)
2545    throws IOException, InterruptedException, ClassNotFoundException, ExecutionException {
2546    // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2547    // the TestOptions introspection for us and dump the output in a readable format.
2548    LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
2549    Admin admin = null;
2550    Connection connection = null;
2551    try {
2552      connection = ConnectionFactory.createConnection(getConf());
2553      admin = connection.getAdmin();
2554      checkTable(admin, opts);
2555    } finally {
2556      if (admin != null) admin.close();
2557      if (connection != null) connection.close();
2558    }
2559    if (opts.nomapred) {
2560      doLocalClients(opts, getConf());
2561    } else {
2562      doMapReduce(opts, getConf());
2563    }
2564  }
2565
2566  protected void printUsage() {
2567    printUsage(PE_COMMAND_SHORTNAME, null);
2568  }
2569
2570  protected static void printUsage(final String message) {
2571    printUsage(PE_COMMAND_SHORTNAME, message);
2572  }
2573
2574  protected static void printUsageAndExit(final String message, final int exitCode) {
2575    printUsage(message);
2576    System.exit(exitCode);
2577  }
2578
2579  protected static void printUsage(final String shortName, final String message) {
2580    if (message != null && message.length() > 0) {
2581      System.err.println(message);
2582    }
2583    System.err.print("Usage: hbase " + shortName);
2584    System.err.println("  <OPTIONS> [-D<property=value>]* <command> <nclients>");
2585    System.err.println();
2586    System.err.println("General Options:");
2587    System.err.println(
2588      " nomapred        Run multiple clients using threads " + "(rather than use mapreduce)");
2589    System.err
2590      .println(" oneCon          all the threads share the same connection. Default: False");
2591    System.err.println(" connCount          connections all threads share. "
2592      + "For example, if set to 2, then all thread share 2 connection. "
2593      + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, "
2594      + "if not, connCount=thread number");
2595
2596    System.err.println(" sampleRate      Execute test on a sample of total "
2597      + "rows. Only supported by randomRead. Default: 1.0");
2598    System.err.println(" period          Report every 'period' rows: "
2599      + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10);
2600    System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
2601    System.err.println(
2602      " traceRate       Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0");
2603    System.err.println(" latency         Set to report operation latencies. Default: False");
2604    System.err.println(" latencyThreshold  Set to report number of operations with latency "
2605      + "over lantencyThreshold, unit in millisecond, default 0");
2606    System.err.println(" measureAfter    Start to measure the latency once 'measureAfter'"
2607      + " rows have been treated. Default: 0");
2608    System.err
2609      .println(" valueSize       Pass value size to use: Default: " + DEFAULT_OPTS.getValueSize());
2610    System.err.println(" valueRandom     Set if we should vary value size between 0 and "
2611      + "'valueSize'; set on read for stats on size: Default: Not set.");
2612    System.err.println(" blockEncoding   Block encoding to use. Value should be one of "
2613      + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2614    System.err.println();
2615    System.err.println("Table Creation / Write Tests:");
2616    System.err.println(" table           Alternate table name. Default: 'TestTable'");
2617    System.err.println(
2618      " rows            Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows()
2619        + ".  In case of randomReads and randomSeekScans this could"
2620        + " be specified along with --size to specify the number of rows to be scanned within"
2621        + " the total range specified by the size.");
2622    System.err.println(
2623      " size            Total size in GiB. Mutually exclusive with --rows for writes and scans"
2624        + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2625        + " use size to specify the end range and --rows"
2626        + " specifies the number of rows within that range. " + "Default: 1.0.");
2627    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2628    System.err.println(" encryption      Encryption type to use (AES, ...). Default: 'NONE'");
2629    System.err.println(
2630      " flushCommits    Used to determine if the test should flush the table. " + "Default: false");
2631    System.err.println(" valueZipf       Set if we should vary value size between 0 and "
2632      + "'valueSize' in zipf form: Default: Not set.");
2633    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
2634    System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
2635    System.err.println(" multiPut        Batch puts together into groups of N. Only supported "
2636      + "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0");
2637    System.err.println(" presplit        Create presplit table. If a table with same name exists,"
2638      + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2639      + "Recommended for accurate perf analysis (see guide). Default: disabled");
2640    System.err.println(
2641      " usetags         Writes tags along with KVs. Use with HFile V3. " + "Default: false");
2642    System.err.println(" numoftags       Specify the no of tags that would be needed. "
2643      + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2644    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
2645    System.err.println(" columns         Columns to write per row. Default: 1");
2646    System.err
2647      .println(" families        Specify number of column families for the table. Default: 1");
2648    System.err.println();
2649    System.err.println("Read Tests:");
2650    System.err.println(" filterAll       Helps to filter out all the rows on the server side"
2651      + " there by not returning any thing back to the client.  Helps to check the server side"
2652      + " performance.  Uses FilterAllFilter internally. ");
2653    System.err.println(" multiGet        Batch gets together into groups of N. Only supported "
2654      + "by randomRead. Default: disabled");
2655    System.err.println(" inmemory        Tries to keep the HFiles of the CF "
2656      + "inmemory as far as possible. Not guaranteed that reads are always served "
2657      + "from memory.  Default: false");
2658    System.err
2659      .println(" bloomFilter     Bloom filter type, one of " + Arrays.toString(BloomType.values()));
2660    System.err.println(" blockSize       Blocksize to use when writing out hfiles. ");
2661    System.err
2662      .println(" inmemoryCompaction  Makes the column family to do inmemory flushes/compactions. "
2663        + "Uses the CompactingMemstore");
2664    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
2665    System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
2666    System.err.println(
2667      " randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
2668    System.err.println(" caching         Scan caching to use. Default: 30");
2669    System.err.println(" asyncPrefetch   Enable asyncPrefetch for scan");
2670    System.err.println(" cacheBlocks     Set the cacheBlocks option for scan. Default: true");
2671    System.err.println(
2672      " scanReadType    Set the readType option for scan, stream/pread/default. Default: default");
2673    System.err.println(" bufferSize      Set the value of client side buffering. Default: 2MB");
2674    System.err.println();
2675    System.err.println(" Note: -D properties will be applied to the conf used. ");
2676    System.err.println("  For example: ");
2677    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
2678    System.err.println("   -Dmapreduce.task.timeout=60000");
2679    System.err.println();
2680    System.err.println("Command:");
2681    for (CmdDescriptor command : COMMANDS.values()) {
2682      System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
2683    }
2684    System.err.println();
2685    System.err.println("Args:");
2686    System.err.println(" nclients        Integer. Required. Total number of clients "
2687      + "(and HRegionServers) running. 1 <= value <= 500");
2688    System.err.println("Examples:");
2689    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
2690    System.err.println(" $ hbase " + shortName + " sequentialWrite 1");
2691    System.err.println(" To run 10 clients doing increments over ten rows:");
2692    System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10");
2693  }
2694
  /**
   * Parse options passed in via an arguments array. Assumes that array has been split on
   * white-space and placed into a {@code Queue}. Any unknown arguments will remain in the queue at
   * the conclusion of this method call. It's up to the caller to deal with these unrecognized
   * arguments.
   * <p>
   * Tokens are polled from the head of the queue and matched, in the order written below, against
   * the known option prefixes. The first token that matches no option is treated as the command
   * name and must be followed by the number of client threads; parsing stops there.
   * @param args command-line tokens; recognized tokens are removed from the queue
   * @return the options populated from {@code args}
   * @throws IllegalArgumentException if the command is not followed by an integer thread count
   * @throws IllegalStateException    if {@code --size} is given a value that is not above 1
   */
  static TestOptions parseOpts(Queue<String> args) {
    TestOptions opts = new TestOptions();

    String cmd = null;
    while ((cmd = args.poll()) != null) {
      if (cmd.equals("-h") || cmd.startsWith("--h")) {
        // place item back onto queue so that caller knows parsing was incomplete
        args.add(cmd);
        break;
      }

      // Flag options are matched by bare prefix; any "=value" suffix on them is ignored.
      final String nmr = "--nomapred";
      if (cmd.startsWith(nmr)) {
        opts.nomapred = true;
        continue;
      }

      final String rows = "--rows=";
      if (cmd.startsWith(rows)) {
        opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
        continue;
      }

      final String cycles = "--cycles=";
      if (cmd.startsWith(cycles)) {
        opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
        continue;
      }

      final String sampleRate = "--sampleRate=";
      if (cmd.startsWith(sampleRate)) {
        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
        continue;
      }

      final String table = "--table=";
      if (cmd.startsWith(table)) {
        opts.tableName = cmd.substring(table.length());
        continue;
      }

      final String startRow = "--startRow=";
      if (cmd.startsWith(startRow)) {
        opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
        continue;
      }

      final String compress = "--compress=";
      if (cmd.startsWith(compress)) {
        opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
        continue;
      }

      final String encryption = "--encryption=";
      if (cmd.startsWith(encryption)) {
        opts.encryption = cmd.substring(encryption.length());
        continue;
      }

      final String traceRate = "--traceRate=";
      if (cmd.startsWith(traceRate)) {
        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
        continue;
      }

      final String blockEncoding = "--blockEncoding=";
      if (cmd.startsWith(blockEncoding)) {
        opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
        continue;
      }

      final String flushCommits = "--flushCommits=";
      if (cmd.startsWith(flushCommits)) {
        opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
        continue;
      }

      final String writeToWAL = "--writeToWAL=";
      if (cmd.startsWith(writeToWAL)) {
        opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
        continue;
      }

      final String presplit = "--presplit=";
      if (cmd.startsWith(presplit)) {
        opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
        continue;
      }

      final String inMemory = "--inmemory=";
      if (cmd.startsWith(inMemory)) {
        opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
        continue;
      }

      final String autoFlush = "--autoFlush=";
      if (cmd.startsWith(autoFlush)) {
        opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
        continue;
      }

      final String onceCon = "--oneCon=";
      if (cmd.startsWith(onceCon)) {
        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
        continue;
      }

      final String connCount = "--connCount=";
      if (cmd.startsWith(connCount)) {
        opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
        continue;
      }

      // Must be tested before "--latency": that check is a bare prefix match and would otherwise
      // also consume "--latencyThreshold=...".
      final String latencyThreshold = "--latencyThreshold=";
      if (cmd.startsWith(latencyThreshold)) {
        opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length()));
        continue;
      }

      final String latency = "--latency";
      if (cmd.startsWith(latency)) {
        opts.reportLatency = true;
        continue;
      }

      final String multiGet = "--multiGet=";
      if (cmd.startsWith(multiGet)) {
        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
        continue;
      }

      final String multiPut = "--multiPut=";
      if (cmd.startsWith(multiPut)) {
        opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
        continue;
      }

      final String useTags = "--usetags=";
      if (cmd.startsWith(useTags)) {
        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
        continue;
      }

      final String noOfTags = "--numoftags=";
      if (cmd.startsWith(noOfTags)) {
        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
        continue;
      }

      final String replicas = "--replicas=";
      if (cmd.startsWith(replicas)) {
        opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
        continue;
      }

      final String filterOutAll = "--filterAll";
      if (cmd.startsWith(filterOutAll)) {
        opts.filterAll = true;
        continue;
      }

      final String size = "--size=";
      if (cmd.startsWith(size)) {
        opts.size = Float.parseFloat(cmd.substring(size.length()));
        // NOTE(review): a value of exactly 1.0 is rejected here as well - confirm this is intended.
        if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB");
        continue;
      }

      final String splitPolicy = "--splitPolicy=";
      if (cmd.startsWith(splitPolicy)) {
        opts.splitPolicy = cmd.substring(splitPolicy.length());
        continue;
      }

      final String randomSleep = "--randomSleep=";
      if (cmd.startsWith(randomSleep)) {
        opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
        continue;
      }

      final String measureAfter = "--measureAfter=";
      if (cmd.startsWith(measureAfter)) {
        opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length()));
        continue;
      }

      final String bloomFilter = "--bloomFilter=";
      if (cmd.startsWith(bloomFilter)) {
        opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
        continue;
      }

      final String blockSize = "--blockSize=";
      if (cmd.startsWith(blockSize)) {
        opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
        continue;
      }

      final String valueSize = "--valueSize=";
      if (cmd.startsWith(valueSize)) {
        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
        continue;
      }

      final String valueRandom = "--valueRandom";
      if (cmd.startsWith(valueRandom)) {
        opts.valueRandom = true;
        continue;
      }

      final String valueZipf = "--valueZipf";
      if (cmd.startsWith(valueZipf)) {
        opts.valueZipf = true;
        continue;
      }

      final String period = "--period=";
      if (cmd.startsWith(period)) {
        opts.period = Integer.parseInt(cmd.substring(period.length()));
        continue;
      }

      final String addColumns = "--addColumns=";
      if (cmd.startsWith(addColumns)) {
        opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
        continue;
      }

      final String inMemoryCompaction = "--inmemoryCompaction=";
      if (cmd.startsWith(inMemoryCompaction)) {
        opts.inMemoryCompaction =
          MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length()));
        continue;
      }

      final String columns = "--columns=";
      if (cmd.startsWith(columns)) {
        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
        continue;
      }

      final String families = "--families=";
      if (cmd.startsWith(families)) {
        opts.families = Integer.parseInt(cmd.substring(families.length()));
        continue;
      }

      final String caching = "--caching=";
      if (cmd.startsWith(caching)) {
        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
        continue;
      }

      final String asyncPrefetch = "--asyncPrefetch";
      if (cmd.startsWith(asyncPrefetch)) {
        opts.asyncPrefetch = true;
        continue;
      }

      final String cacheBlocks = "--cacheBlocks=";
      if (cmd.startsWith(cacheBlocks)) {
        opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length()));
        continue;
      }

      final String scanReadType = "--scanReadType=";
      if (cmd.startsWith(scanReadType)) {
        // The value is upper-cased before the enum lookup, so it may be given in any case.
        opts.scanReadType =
          Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
        continue;
      }

      final String bufferSize = "--bufferSize=";
      if (cmd.startsWith(bufferSize)) {
        opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
        continue;
      }

      // First token that matched no option: cross-option validation runs here, before the token
      // is interpreted as the command name.
      validateParsedOpts(opts);

      if (isCommandClass(cmd)) {
        opts.cmdName = cmd;
        try {
          // The token right after the command must be the number of client threads.
          opts.numClientThreads = Integer.parseInt(args.remove());
        } catch (NoSuchElementException | NumberFormatException e) {
          throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
        }
        opts = calculateRowsAndSize(opts);
        break;
      } else {
        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
      }

      // Not matching any option or command.
      // Only reached if printUsageAndExit returns instead of terminating the JVM.
      System.err.println("Error: Wrong option or command: " + cmd);
      args.add(cmd);
      break;
    }
    return opts;
  }
3001
3002  /**
3003   * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts
3004   */
3005  private static void validateParsedOpts(TestOptions opts) {
3006
3007    if (!opts.autoFlush && opts.multiPut > 0) {
3008      throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
3009    }
3010
3011    if (opts.oneCon && opts.connCount > 1) {
3012      throw new IllegalArgumentException(
3013        "oneCon is set to true, " + "connCount should not bigger than 1");
3014    }
3015
3016    if (opts.valueZipf && opts.valueRandom) {
3017      throw new IllegalStateException("Either valueZipf or valueRandom but not both");
3018    }
3019  }
3020
3021  static TestOptions calculateRowsAndSize(final TestOptions opts) {
3022    int rowsPerGB = getRowsPerGB(opts);
3023    if (
3024      (opts.getCmdName() != null
3025        && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
3026        && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows
3027    ) {
3028      opts.totalRows = (int) opts.size * rowsPerGB;
3029    } else if (opts.size != DEFAULT_OPTS.size) {
3030      // total size in GB specified
3031      opts.totalRows = (int) opts.size * rowsPerGB;
3032      opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
3033    } else {
3034      opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
3035      opts.size = opts.totalRows / rowsPerGB;
3036    }
3037    return opts;
3038  }
3039
3040  static int getRowsPerGB(final TestOptions opts) {
3041    return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies()
3042      * opts.getColumns());
3043  }
3044
3045  @Override
3046  public int run(String[] args) throws Exception {
3047    // Process command-line args. TODO: Better cmd-line processing
3048    // (but hopefully something not as painful as cli options).
3049    int errCode = -1;
3050    if (args.length < 1) {
3051      printUsage();
3052      return errCode;
3053    }
3054
3055    try {
3056      LinkedList<String> argv = new LinkedList<>();
3057      argv.addAll(Arrays.asList(args));
3058      TestOptions opts = parseOpts(argv);
3059
3060      // args remaining, print help and exit
3061      if (!argv.isEmpty()) {
3062        errCode = 0;
3063        printUsage();
3064        return errCode;
3065      }
3066
3067      // must run at least 1 client
3068      if (opts.numClientThreads <= 0) {
3069        throw new IllegalArgumentException("Number of clients must be > 0");
3070      }
3071
3072      // cmdName should not be null, print help and exit
3073      if (opts.cmdName == null) {
3074        printUsage();
3075        return errCode;
3076      }
3077
3078      Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
3079      if (cmdClass != null) {
3080        runTest(cmdClass, opts);
3081        errCode = 0;
3082      }
3083
3084    } catch (Exception e) {
3085      e.printStackTrace();
3086    }
3087
3088    return errCode;
3089  }
3090
3091  private static boolean isCommandClass(String cmd) {
3092    return COMMANDS.containsKey(cmd);
3093  }
3094
3095  private static Class<? extends TestBase> determineCommandClass(String cmd) {
3096    CmdDescriptor descriptor = COMMANDS.get(cmd);
3097    return descriptor != null ? descriptor.getCmdClass() : null;
3098  }
3099
3100  public static void main(final String[] args) throws Exception {
3101    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
3102    System.exit(res);
3103  }
3104}