001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import com.codahale.metrics.Histogram;
021import com.codahale.metrics.UniformReservoir;
022import io.opentelemetry.api.trace.Span;
023import io.opentelemetry.context.Scope;
024import java.io.IOException;
025import java.io.PrintStream;
026import java.lang.reflect.Constructor;
027import java.math.BigDecimal;
028import java.math.MathContext;
029import java.text.DecimalFormat;
030import java.text.SimpleDateFormat;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Date;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Locale;
037import java.util.Map;
038import java.util.NoSuchElementException;
039import java.util.Queue;
040import java.util.Random;
041import java.util.TreeMap;
042import java.util.concurrent.Callable;
043import java.util.concurrent.ExecutionException;
044import java.util.concurrent.ExecutorService;
045import java.util.concurrent.Executors;
046import java.util.concurrent.Future;
047import java.util.concurrent.ThreadLocalRandom;
048import org.apache.commons.lang3.StringUtils;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.conf.Configured;
051import org.apache.hadoop.fs.FileSystem;
052import org.apache.hadoop.fs.Path;
053import org.apache.hadoop.hbase.client.Admin;
054import org.apache.hadoop.hbase.client.Append;
055import org.apache.hadoop.hbase.client.AsyncConnection;
056import org.apache.hadoop.hbase.client.AsyncTable;
057import org.apache.hadoop.hbase.client.BufferedMutator;
058import org.apache.hadoop.hbase.client.BufferedMutatorParams;
059import org.apache.hadoop.hbase.client.Connection;
060import org.apache.hadoop.hbase.client.ConnectionFactory;
061import org.apache.hadoop.hbase.client.Consistency;
062import org.apache.hadoop.hbase.client.Delete;
063import org.apache.hadoop.hbase.client.Durability;
064import org.apache.hadoop.hbase.client.Get;
065import org.apache.hadoop.hbase.client.Increment;
066import org.apache.hadoop.hbase.client.Put;
067import org.apache.hadoop.hbase.client.RegionInfo;
068import org.apache.hadoop.hbase.client.RegionInfoBuilder;
069import org.apache.hadoop.hbase.client.RegionLocator;
070import org.apache.hadoop.hbase.client.Result;
071import org.apache.hadoop.hbase.client.ResultScanner;
072import org.apache.hadoop.hbase.client.RowMutations;
073import org.apache.hadoop.hbase.client.Scan;
074import org.apache.hadoop.hbase.client.Table;
075import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
076import org.apache.hadoop.hbase.filter.BinaryComparator;
077import org.apache.hadoop.hbase.filter.Filter;
078import org.apache.hadoop.hbase.filter.FilterAllFilter;
079import org.apache.hadoop.hbase.filter.FilterList;
080import org.apache.hadoop.hbase.filter.PageFilter;
081import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
082import org.apache.hadoop.hbase.filter.WhileMatchFilter;
083import org.apache.hadoop.hbase.io.compress.Compression;
084import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
085import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
086import org.apache.hadoop.hbase.regionserver.BloomType;
087import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
088import org.apache.hadoop.hbase.trace.TraceUtil;
089import org.apache.hadoop.hbase.util.ByteArrayHashKey;
090import org.apache.hadoop.hbase.util.Bytes;
091import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
092import org.apache.hadoop.hbase.util.GsonUtil;
093import org.apache.hadoop.hbase.util.Hash;
094import org.apache.hadoop.hbase.util.MurmurHash;
095import org.apache.hadoop.hbase.util.Pair;
096import org.apache.hadoop.hbase.util.RandomDistribution;
097import org.apache.hadoop.hbase.util.YammerHistogramUtils;
098import org.apache.hadoop.io.LongWritable;
099import org.apache.hadoop.io.Text;
100import org.apache.hadoop.mapreduce.Job;
101import org.apache.hadoop.mapreduce.Mapper;
102import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
103import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
104import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
105import org.apache.hadoop.util.Tool;
106import org.apache.hadoop.util.ToolRunner;
107import org.apache.yetus.audience.InterfaceAudience;
108import org.slf4j.Logger;
109import org.slf4j.LoggerFactory;
110
111import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
112import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
113import org.apache.hbase.thirdparty.com.google.gson.Gson;
114
115/**
 * Script used for evaluating HBase performance and scalability. Runs an HBase client that steps
 * through one of a set of hardcoded tests or 'experiments' (e.g. a random read test, a random
 * write test, etc.). Specify on the command line which test to run and how many clients
 * participate in the experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
120 * <p>
121 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance
122 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper,
123 * pages 8-10.
124 * <p>
125 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as
126 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client does
127 * about 1GB of data, unless specified otherwise.
128 */
129@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
130public class PerformanceEvaluation extends Configured implements Tool {
131  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
132  static final String RANDOM_READ = "randomRead";
133  static final String PE_COMMAND_SHORTNAME = "pe";
134  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
135  private static final Gson GSON = GsonUtil.createGson().create();
136
137  public static final String TABLE_NAME = "TestTable";
138  public static final String FAMILY_NAME_BASE = "info";
139  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
140  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
141  public static final int DEFAULT_VALUE_LENGTH = 1000;
142  public static final int ROW_LENGTH = 26;
143
144  private static final int ONE_GB = 1024 * 1024 * 1000;
145  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
146  // TODO : should we make this configurable
147  private static final int TAG_LENGTH = 256;
148  private static final DecimalFormat FMT = new DecimalFormat("0.##");
149  private static final MathContext CXT = MathContext.DECIMAL64;
150  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
151  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
152  private static final TestOptions DEFAULT_OPTS = new TestOptions();
153
154  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
155  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
156
// Registers every built-in command. COMMANDS is a sorted map keyed by name, so the grouping
// below is purely for readability and has no effect on lookup or listing order.
static {
  // Async client variants.
  addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
    "Run async random read test");
  addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
    "Run async random write test");
  addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
    "Run async sequential read test");
  addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
    "Run async sequential write test");
  addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)");

  // Point reads.
  addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test");
  addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");

  // Writes.
  addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
  addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");

  // Scans.
  addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
    "Run random seek and scan 100 test");
  addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
    "Run random seek scan with both start and stop row (max 10 rows)");
  addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
    "Run random seek scan with both start and stop row (max 100 rows)");
  addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
    "Run random seek scan with both start and stop row (max 1000 rows)");
  addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
    "Run random seek scan with both start and stop row (max 10000 rows)");
  addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
  addCommandDescriptor(ReverseScanTest.class, "reverseScan",
    "Run reverse scan test (read every row)");
  addCommandDescriptor(FilteredScanTest.class, "filterScan",
    "Run scan test using a filter to find a specific row based on it's value "
      + "(make sure to use --rows=20)");

  // Read-modify-write style operations.
  addCommandDescriptor(IncrementTest.class, "increment",
    "Increment on each row; clients overlap on keyspace so some concurrent operations");
  addCommandDescriptor(AppendTest.class, "append",
    "Append on each row; clients overlap on keyspace so some concurrent operations");
  addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
    "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
  addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
    "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
  addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
    "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");

  // Meta table commands.
  addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test");
  addCommandDescriptor(MetaWriteTest.class, "metaWrite",
    "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta");
  addCommandDescriptor(CleanMetaTest.class, "cleanMeta",
    "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread");
}
203
204  /**
205   * Enum for map metrics. Keep it out here rather than inside in the Map inner-class so we can find
206   * associated properties.
207   */
208  protected static enum Counter {
209    /** elapsed time */
210    ELAPSED_TIME,
211    /** number of rows */
212    ROWS
213  }
214
215  protected static class RunResult implements Comparable<RunResult> {
216    public RunResult(long duration, Histogram hist) {
217      this.duration = duration;
218      this.hist = hist;
219      numbOfReplyOverThreshold = 0;
220      numOfReplyFromReplica = 0;
221    }
222
223    public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica,
224      Histogram hist) {
225      this.duration = duration;
226      this.hist = hist;
227      this.numbOfReplyOverThreshold = numbOfReplyOverThreshold;
228      this.numOfReplyFromReplica = numOfReplyFromReplica;
229    }
230
231    public final long duration;
232    public final Histogram hist;
233    public final long numbOfReplyOverThreshold;
234    public final long numOfReplyFromReplica;
235
236    @Override
237    public String toString() {
238      return Long.toString(duration);
239    }
240
241    @Override
242    public int compareTo(RunResult o) {
243      return Long.compare(this.duration, o.duration);
244    }
245  }
246
/**
 * Creates the tool with the given configuration, which is handed to the {@link Configured}
 * base class.
 * @param conf Configuration object
 */
public PerformanceEvaluation(Configuration conf) {
  super(conf);
}
254
255  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name,
256    String description) {
257    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
258    COMMANDS.put(name, cmdDescriptor);
259  }
260
/**
 * Sink for progress messages emitted while a test runs.
 */
interface Status {
  /**
   * Reports a new status message.
   * @param msg status message
   * @throws IOException if the implementation fails to publish the message
   */
  void setStatus(String msg) throws IOException;
}
271
272  /**
273   * MapReduce job that runs a performance evaluation client in each map task.
274   */
275  public static class EvaluationMapTask
276    extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
277
278    /** configuration parameter name that contains the command */
279    public final static String CMD_KEY = "EvaluationMapTask.command";
280    /** configuration parameter name that contains the PE impl */
281    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
282
283    private Class<? extends Test> cmd;
284
285    @Override
286    protected void setup(Context context) throws IOException, InterruptedException {
287      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
288
289      // this is required so that extensions of PE are instantiated within the
290      // map reduce task...
291      Class<? extends PerformanceEvaluation> peClass =
292        forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
293      try {
294        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
295      } catch (Exception e) {
296        throw new IllegalStateException("Could not instantiate PE instance", e);
297      }
298    }
299
300    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
301      try {
302        return Class.forName(className).asSubclass(type);
303      } catch (ClassNotFoundException e) {
304        throw new IllegalStateException("Could not find class for name: " + className, e);
305      }
306    }
307
308    @Override
309    protected void map(LongWritable key, Text value, final Context context)
310      throws IOException, InterruptedException {
311
312      Status status = new Status() {
313        @Override
314        public void setStatus(String msg) {
315          context.setStatus(msg);
316        }
317      };
318
319      TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class);
320      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
321      final Connection con = ConnectionFactory.createConnection(conf);
322      AsyncConnection asyncCon = null;
323      try {
324        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
325      } catch (ExecutionException e) {
326        throw new IOException(e);
327      }
328
329      // Evaluation task
330      RunResult result =
331        PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
332      // Collect how much time the thing took. Report as map output and
333      // to the ELAPSED_TIME counter.
334      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
335      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
336      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
337      context.progress();
338    }
339  }
340
341  /*
342   * If table does not already exist, create. Also create a table when {@code opts.presplitRegions}
343   * is specified or when the existing table's region replica count doesn't match {@code
344   * opts.replicas}.
345   */
346  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
347    TableName tableName = TableName.valueOf(opts.tableName);
348    boolean needsDelete = false, exists = admin.tableExists(tableName);
349    boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
350      || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
351    if (!exists && isReadCmd) {
352      throw new IllegalStateException(
353        "Must specify an existing table for read commands. Run a write command first.");
354    }
355    HTableDescriptor desc =
356      exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null;
357    byte[][] splits = getSplits(opts);
358
359    // recreate the table when user has requested presplit or when existing
360    // {RegionSplitPolicy,replica count} does not match requested, or when the
361    // number of column families does not match requested.
362    if (
363      (exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
364        || (!isReadCmd && desc != null
365          && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
366        || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)
367        || (desc != null && desc.getColumnFamilyCount() != opts.families)
368    ) {
369      needsDelete = true;
370      // wait, why did it delete my table?!?
371      LOG.debug(MoreObjects.toStringHelper("needsDelete").add("needsDelete", needsDelete)
372        .add("isReadCmd", isReadCmd).add("exists", exists).add("desc", desc)
373        .add("presplit", opts.presplitRegions).add("splitPolicy", opts.splitPolicy)
374        .add("replicas", opts.replicas).add("families", opts.families).toString());
375    }
376
377    // remove an existing table
378    if (needsDelete) {
379      if (admin.isTableEnabled(tableName)) {
380        admin.disableTable(tableName);
381      }
382      admin.deleteTable(tableName);
383    }
384
385    // table creation is necessary
386    if (!exists || needsDelete) {
387      desc = getTableDescriptor(opts);
388      if (splits != null) {
389        if (LOG.isDebugEnabled()) {
390          for (int i = 0; i < splits.length; i++) {
391            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
392          }
393        }
394      }
395      admin.createTable(desc, splits);
396      LOG.info("Table " + desc + " created");
397    }
398    return admin.tableExists(tableName);
399  }
400
401  /**
402   * Create an HTableDescriptor from provided TestOptions.
403   */
404  protected static HTableDescriptor getTableDescriptor(TestOptions opts) {
405    HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(opts.tableName));
406    for (int family = 0; family < opts.families; family++) {
407      byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
408      HColumnDescriptor familyDesc = new HColumnDescriptor(familyName);
409      familyDesc.setDataBlockEncoding(opts.blockEncoding);
410      familyDesc.setCompressionType(opts.compression);
411      familyDesc.setEncryptionType(opts.encryption);
412      familyDesc.setBloomFilterType(opts.bloomType);
413      familyDesc.setBlocksize(opts.blockSize);
414      if (opts.inMemoryCF) {
415        familyDesc.setInMemory(true);
416      }
417      familyDesc.setInMemoryCompaction(opts.inMemoryCompaction);
418      tableDesc.addFamily(familyDesc);
419    }
420    if (opts.replicas != DEFAULT_OPTS.replicas) {
421      tableDesc.setRegionReplication(opts.replicas);
422    }
423    if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
424      tableDesc.setRegionSplitPolicyClassName(opts.splitPolicy);
425    }
426    return tableDesc;
427  }
428
429  /**
430   * generates splits based on total number of rows and specified split regions
431   */
432  protected static byte[][] getSplits(TestOptions opts) {
433    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null;
434
435    int numSplitPoints = opts.presplitRegions - 1;
436    byte[][] splits = new byte[numSplitPoints][];
437    int jump = opts.totalRows / opts.presplitRegions;
438    for (int i = 0; i < numSplitPoints; i++) {
439      int rowkey = jump * (1 + i);
440      splits[i] = format(rowkey);
441    }
442    return splits;
443  }
444
445  static void setupConnectionCount(final TestOptions opts) {
446    if (opts.oneCon) {
447      opts.connCount = 1;
448    } else {
449      if (opts.connCount == -1) {
450        // set to thread number if connCount is not set
451        opts.connCount = opts.numClientThreads;
452      }
453    }
454  }
455
456  /*
457   * Run all clients in this vm each to its own thread.
458   */
459  static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
460    throws IOException, InterruptedException, ExecutionException {
461    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
462    assert cmd != null;
463    @SuppressWarnings("unchecked")
464    Future<RunResult>[] threads = new Future[opts.numClientThreads];
465    RunResult[] results = new RunResult[opts.numClientThreads];
466    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
467      new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
468    setupConnectionCount(opts);
469    final Connection[] cons = new Connection[opts.connCount];
470    final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
471    for (int i = 0; i < opts.connCount; i++) {
472      cons[i] = ConnectionFactory.createConnection(conf);
473      asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
474    }
475    LOG
476      .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads");
477    for (int i = 0; i < threads.length; i++) {
478      final int index = i;
479      threads[i] = pool.submit(new Callable<RunResult>() {
480        @Override
481        public RunResult call() throws Exception {
482          TestOptions threadOpts = new TestOptions(opts);
483          final Connection con = cons[index % cons.length];
484          final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
485          if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
486          RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
487            @Override
488            public void setStatus(final String msg) throws IOException {
489              LOG.info(msg);
490            }
491          });
492          LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration
493            + "ms over " + threadOpts.perClientRunRows + " rows");
494          if (opts.latencyThreshold > 0) {
495            LOG.info("Number of replies over latency threshold " + opts.latencyThreshold
496              + "(ms) is " + run.numbOfReplyOverThreshold);
497          }
498          return run;
499        }
500      });
501    }
502    pool.shutdown();
503
504    for (int i = 0; i < threads.length; i++) {
505      try {
506        results[i] = threads[i].get();
507      } catch (ExecutionException e) {
508        throw new IOException(e.getCause());
509      }
510    }
511    final String test = cmd.getSimpleName();
512    LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results));
513    Arrays.sort(results);
514    long total = 0;
515    float avgLatency = 0;
516    float avgTPS = 0;
517    long replicaWins = 0;
518    for (RunResult result : results) {
519      total += result.duration;
520      avgLatency += result.hist.getSnapshot().getMean();
521      avgTPS += opts.perClientRunRows * 1.0f / result.duration;
522      replicaWins += result.numOfReplyFromReplica;
523    }
524    avgTPS *= 1000; // ms to second
525    avgLatency = avgLatency / results.length;
526    LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: "
527      + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms");
528    LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
529    LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
530    if (opts.replicas > 1) {
531      LOG.info("[results from replica regions] " + replicaWins);
532    }
533
534    for (int i = 0; i < opts.connCount; i++) {
535      cons[i].close();
536      asyncCons[i].close();
537    }
538
539    return results;
540  }
541
542  /*
543   * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write
544   * out an input file with instruction per client regards which row they are to start on.
545   * @param cmd Command to run.
546   */
547  static Job doMapReduce(TestOptions opts, final Configuration conf)
548    throws IOException, InterruptedException, ClassNotFoundException {
549    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
550    assert cmd != null;
551    Path inputDir = writeInputFile(conf, opts);
552    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
553    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
554    Job job = Job.getInstance(conf);
555    job.setJarByClass(PerformanceEvaluation.class);
556    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
557
558    job.setInputFormatClass(NLineInputFormat.class);
559    NLineInputFormat.setInputPaths(job, inputDir);
560    // this is default, but be explicit about it just in case.
561    NLineInputFormat.setNumLinesPerSplit(job, 1);
562
563    job.setOutputKeyClass(LongWritable.class);
564    job.setOutputValueClass(LongWritable.class);
565
566    job.setMapperClass(EvaluationMapTask.class);
567    job.setReducerClass(LongSumReducer.class);
568
569    job.setNumReduceTasks(1);
570
571    job.setOutputFormatClass(TextOutputFormat.class);
572    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
573
574    TableMapReduceUtil.addDependencyJars(job);
575    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Histogram.class, // yammer
576                                                                                            // metrics
577      Gson.class, // gson
578      FilterAllFilter.class // hbase-server tests jar
579    );
580
581    TableMapReduceUtil.initCredentials(job);
582
583    job.waitForCompletion(true);
584    return job;
585  }
586
/**
 * Name of the MapReduce job input file written by {@code writeInputFile}. The file holds one
 * line per client; each client has one mapper to do the work, and the resulting counts are
 * produced in the map task.
 */

static String JOB_INPUT_FILENAME = "input.txt";
592
593  /*
594   * Write input file of offsets-per-client for the mapreduce job.
595   * @param c Configuration
596   * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
597   */
598  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
599    return writeInputFile(c, opts, new Path("."));
600  }
601
602  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
603    throws IOException {
604    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
605    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
606    Path inputDir = new Path(jobdir, "inputs");
607
608    FileSystem fs = FileSystem.get(c);
609    fs.mkdirs(inputDir);
610
611    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
612    PrintStream out = new PrintStream(fs.create(inputFile));
613    // Make input random.
614    Map<Integer, String> m = new TreeMap<>();
615    Hash h = MurmurHash.getInstance();
616    int perClientRows = (opts.totalRows / opts.numClientThreads);
617    try {
618      for (int j = 0; j < opts.numClientThreads; j++) {
619        TestOptions next = new TestOptions(opts);
620        next.startRow = j * perClientRows;
621        next.perClientRunRows = perClientRows;
622        String s = GSON.toJson(next);
623        LOG.info("Client=" + j + ", input=" + s);
624        byte[] b = Bytes.toBytes(s);
625        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
626        m.put(hash, s);
627      }
628      for (Map.Entry<Integer, String> e : m.entrySet()) {
629        out.println(e.getValue());
630      }
631    } finally {
632      out.close();
633    }
634    return inputDir;
635  }
636
/**
 * Describes a command: the test class that implements it, the name it is registered under and
 * a human-readable description.
 */
static class CmdDescriptor {
  private Class<? extends TestBase> cmdClass;
  private String name;
  private String description;

  /**
   * @param cmdClass    test class implementing the command
   * @param name        command name
   * @param description human-readable description of the command
   */
  CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
    this.cmdClass = cmdClass;
    this.name = name;
    this.description = description;
  }

  /** Returns the test class implementing this command. */
  public Class<? extends TestBase> getCmdClass() {
    return cmdClass;
  }

  /** Returns the command name. */
  public String getName() {
    return name;
  }

  /** Returns the human-readable description of this command. */
  public String getDescription() {
    return description;
  }
}
663
664  /**
665   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes
666   * tracking all these arguments a little easier. NOTE: ADDING AN OPTION, you need to add a data
667   * member, a getter/setter (to make JSON serialization of this TestOptions class behave), and you
668   * need to add to the clone constructor below copying your new option from the 'that' to the
669   * 'this'. Look for 'clone' below.
670   */
671  static class TestOptions {
672    String cmdName = null;
673    boolean nomapred = false;
674    boolean filterAll = false;
675    int startRow = 0;
676    float size = 1.0f;
677    int perClientRunRows = DEFAULT_ROWS_PER_GB;
678    int numClientThreads = 1;
679    int totalRows = DEFAULT_ROWS_PER_GB;
680    int measureAfter = 0;
681    float sampleRate = 1.0f;
682    /**
683     * @deprecated Useless after switching to OpenTelemetry
684     */
685    @Deprecated
686    double traceRate = 0.0;
687    String tableName = TABLE_NAME;
688    boolean flushCommits = true;
689    boolean writeToWAL = true;
690    boolean autoFlush = false;
691    boolean oneCon = false;
692    int connCount = -1; // wil decide the actual num later
693    boolean useTags = false;
694    int noOfTags = 1;
695    boolean reportLatency = false;
696    int multiGet = 0;
697    int multiPut = 0;
698    int randomSleep = 0;
699    boolean inMemoryCF = false;
700    int presplitRegions = 0;
701    int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION;
702    String splitPolicy = null;
703    Compression.Algorithm compression = Compression.Algorithm.NONE;
704    String encryption = null;
705    BloomType bloomType = BloomType.ROW;
706    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
707    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
708    boolean valueRandom = false;
709    boolean valueZipf = false;
710    int valueSize = DEFAULT_VALUE_LENGTH;
711    int period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10;
712    int cycles = 1;
713    int columns = 1;
714    int families = 1;
715    int caching = 30;
716    int latencyThreshold = 0; // in millsecond
717    boolean addColumns = true;
718    MemoryCompactionPolicy inMemoryCompaction =
719      MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
720    boolean asyncPrefetch = false;
721    boolean cacheBlocks = true;
722    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
723    long bufferSize = 2l * 1024l * 1024l;
724
725    public TestOptions() {
726    }
727
728    /**
729     * Clone constructor.
730     * @param that Object to copy from.
731     */
732    public TestOptions(TestOptions that) {
733      this.cmdName = that.cmdName;
734      this.cycles = that.cycles;
735      this.nomapred = that.nomapred;
736      this.startRow = that.startRow;
737      this.size = that.size;
738      this.perClientRunRows = that.perClientRunRows;
739      this.numClientThreads = that.numClientThreads;
740      this.totalRows = that.totalRows;
741      this.sampleRate = that.sampleRate;
742      this.traceRate = that.traceRate;
743      this.tableName = that.tableName;
744      this.flushCommits = that.flushCommits;
745      this.writeToWAL = that.writeToWAL;
746      this.autoFlush = that.autoFlush;
747      this.oneCon = that.oneCon;
748      this.connCount = that.connCount;
749      this.useTags = that.useTags;
750      this.noOfTags = that.noOfTags;
751      this.reportLatency = that.reportLatency;
752      this.latencyThreshold = that.latencyThreshold;
753      this.multiGet = that.multiGet;
754      this.multiPut = that.multiPut;
755      this.inMemoryCF = that.inMemoryCF;
756      this.presplitRegions = that.presplitRegions;
757      this.replicas = that.replicas;
758      this.splitPolicy = that.splitPolicy;
759      this.compression = that.compression;
760      this.encryption = that.encryption;
761      this.blockEncoding = that.blockEncoding;
762      this.filterAll = that.filterAll;
763      this.bloomType = that.bloomType;
764      this.blockSize = that.blockSize;
765      this.valueRandom = that.valueRandom;
766      this.valueZipf = that.valueZipf;
767      this.valueSize = that.valueSize;
768      this.period = that.period;
769      this.randomSleep = that.randomSleep;
770      this.measureAfter = that.measureAfter;
771      this.addColumns = that.addColumns;
772      this.columns = that.columns;
773      this.families = that.families;
774      this.caching = that.caching;
775      this.inMemoryCompaction = that.inMemoryCompaction;
776      this.asyncPrefetch = that.asyncPrefetch;
777      this.cacheBlocks = that.cacheBlocks;
778      this.scanReadType = that.scanReadType;
779      this.bufferSize = that.bufferSize;
780    }
781
782    public int getCaching() {
783      return this.caching;
784    }
785
786    public void setCaching(final int caching) {
787      this.caching = caching;
788    }
789
790    public int getColumns() {
791      return this.columns;
792    }
793
794    public void setColumns(final int columns) {
795      this.columns = columns;
796    }
797
798    public int getFamilies() {
799      return this.families;
800    }
801
802    public void setFamilies(final int families) {
803      this.families = families;
804    }
805
806    public int getCycles() {
807      return this.cycles;
808    }
809
810    public void setCycles(final int cycles) {
811      this.cycles = cycles;
812    }
813
814    public boolean isValueZipf() {
815      return valueZipf;
816    }
817
818    public void setValueZipf(boolean valueZipf) {
819      this.valueZipf = valueZipf;
820    }
821
822    public String getCmdName() {
823      return cmdName;
824    }
825
826    public void setCmdName(String cmdName) {
827      this.cmdName = cmdName;
828    }
829
830    public int getRandomSleep() {
831      return randomSleep;
832    }
833
834    public void setRandomSleep(int randomSleep) {
835      this.randomSleep = randomSleep;
836    }
837
838    public int getReplicas() {
839      return replicas;
840    }
841
842    public void setReplicas(int replicas) {
843      this.replicas = replicas;
844    }
845
846    public String getSplitPolicy() {
847      return splitPolicy;
848    }
849
850    public void setSplitPolicy(String splitPolicy) {
851      this.splitPolicy = splitPolicy;
852    }
853
854    public void setNomapred(boolean nomapred) {
855      this.nomapred = nomapred;
856    }
857
858    public void setFilterAll(boolean filterAll) {
859      this.filterAll = filterAll;
860    }
861
862    public void setStartRow(int startRow) {
863      this.startRow = startRow;
864    }
865
866    public void setSize(float size) {
867      this.size = size;
868    }
869
870    public void setPerClientRunRows(int perClientRunRows) {
871      this.perClientRunRows = perClientRunRows;
872    }
873
874    public void setNumClientThreads(int numClientThreads) {
875      this.numClientThreads = numClientThreads;
876    }
877
878    public void setTotalRows(int totalRows) {
879      this.totalRows = totalRows;
880    }
881
882    public void setSampleRate(float sampleRate) {
883      this.sampleRate = sampleRate;
884    }
885
886    public void setTraceRate(double traceRate) {
887      this.traceRate = traceRate;
888    }
889
890    public void setTableName(String tableName) {
891      this.tableName = tableName;
892    }
893
894    public void setFlushCommits(boolean flushCommits) {
895      this.flushCommits = flushCommits;
896    }
897
898    public void setWriteToWAL(boolean writeToWAL) {
899      this.writeToWAL = writeToWAL;
900    }
901
902    public void setAutoFlush(boolean autoFlush) {
903      this.autoFlush = autoFlush;
904    }
905
906    public void setOneCon(boolean oneCon) {
907      this.oneCon = oneCon;
908    }
909
910    public int getConnCount() {
911      return connCount;
912    }
913
914    public void setConnCount(int connCount) {
915      this.connCount = connCount;
916    }
917
918    public void setUseTags(boolean useTags) {
919      this.useTags = useTags;
920    }
921
922    public void setNoOfTags(int noOfTags) {
923      this.noOfTags = noOfTags;
924    }
925
926    public void setReportLatency(boolean reportLatency) {
927      this.reportLatency = reportLatency;
928    }
929
930    public void setMultiGet(int multiGet) {
931      this.multiGet = multiGet;
932    }
933
934    public void setMultiPut(int multiPut) {
935      this.multiPut = multiPut;
936    }
937
938    public void setInMemoryCF(boolean inMemoryCF) {
939      this.inMemoryCF = inMemoryCF;
940    }
941
942    public void setPresplitRegions(int presplitRegions) {
943      this.presplitRegions = presplitRegions;
944    }
945
946    public void setCompression(Compression.Algorithm compression) {
947      this.compression = compression;
948    }
949
950    public void setEncryption(String encryption) {
951      this.encryption = encryption;
952    }
953
954    public void setBloomType(BloomType bloomType) {
955      this.bloomType = bloomType;
956    }
957
958    public void setBlockSize(int blockSize) {
959      this.blockSize = blockSize;
960    }
961
962    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
963      this.blockEncoding = blockEncoding;
964    }
965
966    public void setValueRandom(boolean valueRandom) {
967      this.valueRandom = valueRandom;
968    }
969
970    public void setValueSize(int valueSize) {
971      this.valueSize = valueSize;
972    }
973
974    public void setBufferSize(long bufferSize) {
975      this.bufferSize = bufferSize;
976    }
977
978    public void setPeriod(int period) {
979      this.period = period;
980    }
981
982    public boolean isNomapred() {
983      return nomapred;
984    }
985
986    public boolean isFilterAll() {
987      return filterAll;
988    }
989
990    public int getStartRow() {
991      return startRow;
992    }
993
994    public float getSize() {
995      return size;
996    }
997
998    public int getPerClientRunRows() {
999      return perClientRunRows;
1000    }
1001
1002    public int getNumClientThreads() {
1003      return numClientThreads;
1004    }
1005
1006    public int getTotalRows() {
1007      return totalRows;
1008    }
1009
1010    public float getSampleRate() {
1011      return sampleRate;
1012    }
1013
1014    public double getTraceRate() {
1015      return traceRate;
1016    }
1017
1018    public String getTableName() {
1019      return tableName;
1020    }
1021
1022    public boolean isFlushCommits() {
1023      return flushCommits;
1024    }
1025
1026    public boolean isWriteToWAL() {
1027      return writeToWAL;
1028    }
1029
1030    public boolean isAutoFlush() {
1031      return autoFlush;
1032    }
1033
1034    public boolean isUseTags() {
1035      return useTags;
1036    }
1037
1038    public int getNoOfTags() {
1039      return noOfTags;
1040    }
1041
1042    public boolean isReportLatency() {
1043      return reportLatency;
1044    }
1045
1046    public int getMultiGet() {
1047      return multiGet;
1048    }
1049
1050    public int getMultiPut() {
1051      return multiPut;
1052    }
1053
1054    public boolean isInMemoryCF() {
1055      return inMemoryCF;
1056    }
1057
1058    public int getPresplitRegions() {
1059      return presplitRegions;
1060    }
1061
1062    public Compression.Algorithm getCompression() {
1063      return compression;
1064    }
1065
1066    public String getEncryption() {
1067      return encryption;
1068    }
1069
1070    public DataBlockEncoding getBlockEncoding() {
1071      return blockEncoding;
1072    }
1073
1074    public boolean isValueRandom() {
1075      return valueRandom;
1076    }
1077
1078    public int getValueSize() {
1079      return valueSize;
1080    }
1081
1082    public int getPeriod() {
1083      return period;
1084    }
1085
1086    public BloomType getBloomType() {
1087      return bloomType;
1088    }
1089
1090    public int getBlockSize() {
1091      return blockSize;
1092    }
1093
1094    public boolean isOneCon() {
1095      return oneCon;
1096    }
1097
1098    public int getMeasureAfter() {
1099      return measureAfter;
1100    }
1101
1102    public void setMeasureAfter(int measureAfter) {
1103      this.measureAfter = measureAfter;
1104    }
1105
1106    public boolean getAddColumns() {
1107      return addColumns;
1108    }
1109
1110    public void setAddColumns(boolean addColumns) {
1111      this.addColumns = addColumns;
1112    }
1113
1114    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
1115      this.inMemoryCompaction = inMemoryCompaction;
1116    }
1117
1118    public MemoryCompactionPolicy getInMemoryCompaction() {
1119      return this.inMemoryCompaction;
1120    }
1121
1122    public long getBufferSize() {
1123      return this.bufferSize;
1124    }
1125  }
1126
1127  /*
1128   * A test. Subclass to particularize what happens per row.
1129   */
1130  static abstract class TestBase {
1131    // Below is make it so when Tests are all running in the one
1132    // jvm, that they each have a differently seeded Random.
1133    private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime());
1134
1135    private static long nextRandomSeed() {
1136      return randomSeed.nextLong();
1137    }
1138
1139    private final int everyN;
1140
1141    protected final Random rand = new Random(nextRandomSeed());
1142    protected final Configuration conf;
1143    protected final TestOptions opts;
1144
1145    private final Status status;
1146
1147    private String testName;
1148    private Histogram latencyHistogram;
1149    private Histogram replicaLatencyHistogram;
1150    private Histogram valueSizeHistogram;
1151    private Histogram rpcCallsHistogram;
1152    private Histogram remoteRpcCallsHistogram;
1153    private Histogram millisBetweenNextHistogram;
1154    private Histogram regionsScannedHistogram;
1155    private Histogram bytesInResultsHistogram;
1156    private Histogram bytesInRemoteResultsHistogram;
1157    private RandomDistribution.Zipf zipf;
1158    private long numOfReplyOverLatencyThreshold = 0;
1159    private long numOfReplyFromReplica = 0;
1160
1161    /**
1162     * Note that all subclasses of this class must provide a public constructor that has the exact
1163     * same list of arguments.
1164     */
1165    TestBase(final Configuration conf, final TestOptions options, final Status status) {
1166      this.conf = conf;
1167      this.opts = options;
1168      this.status = status;
1169      this.testName = this.getClass().getSimpleName();
1170      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
1171      if (options.isValueZipf()) {
1172        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
1173      }
1174      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
1175    }
1176
1177    int getValueLength(final Random r) {
1178      if (this.opts.isValueRandom()) {
1179        return r.nextInt(opts.valueSize);
1180      } else if (this.opts.isValueZipf()) {
1181        return Math.abs(this.zipf.nextInt());
1182      } else {
1183        return opts.valueSize;
1184      }
1185    }
1186
1187    void updateValueSize(final Result[] rs) throws IOException {
1188      updateValueSize(rs, 0);
1189    }
1190
1191    void updateValueSize(final Result[] rs, final long latency) throws IOException {
1192      if (rs == null || (latency == 0)) return;
1193      for (Result r : rs)
1194        updateValueSize(r, latency);
1195    }
1196
1197    void updateValueSize(final Result r) throws IOException {
1198      updateValueSize(r, 0);
1199    }
1200
1201    void updateValueSize(final Result r, final long latency) throws IOException {
1202      if (r == null || (latency == 0)) return;
1203      int size = 0;
1204      // update replicaHistogram
1205      if (r.isStale()) {
1206        replicaLatencyHistogram.update(latency / 1000);
1207        numOfReplyFromReplica++;
1208      }
1209      if (!isRandomValueSize()) return;
1210
1211      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1212        size += scanner.current().getValueLength();
1213      }
1214      updateValueSize(size);
1215    }
1216
1217    void updateValueSize(final int valueSize) {
1218      if (!isRandomValueSize()) return;
1219      this.valueSizeHistogram.update(valueSize);
1220    }
1221
1222    void updateScanMetrics(final ScanMetrics metrics) {
1223      if (metrics == null) return;
1224      Map<String, Long> metricsMap = metrics.getMetricsMap();
1225      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
1226      if (rpcCalls != null) {
1227        this.rpcCallsHistogram.update(rpcCalls.longValue());
1228      }
1229      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
1230      if (remoteRpcCalls != null) {
1231        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
1232      }
1233      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
1234      if (millisBetweenNext != null) {
1235        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
1236      }
1237      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
1238      if (regionsScanned != null) {
1239        this.regionsScannedHistogram.update(regionsScanned.longValue());
1240      }
1241      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
1242      if (bytesInResults != null && bytesInResults.longValue() > 0) {
1243        this.bytesInResultsHistogram.update(bytesInResults.longValue());
1244      }
1245      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
1246      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
1247        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
1248      }
1249    }
1250
1251    String generateStatus(final int sr, final int i, final int lr) {
1252      return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency ["
1253        + getShortLatencyReport() + "]"
1254        + (!isRandomValueSize() ? "" : ", value size [" + getShortValueSizeReport() + "]");
1255    }
1256
1257    boolean isRandomValueSize() {
1258      return opts.valueRandom;
1259    }
1260
1261    protected int getReportingPeriod() {
1262      return opts.period;
1263    }
1264
1265    /**
1266     * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1267     */
1268    public Histogram getLatencyHistogram() {
1269      return latencyHistogram;
1270    }
1271
1272    void testSetup() throws IOException {
1273      // test metrics
1274      latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1275      // If it is a replica test, set up histogram for replica.
1276      if (opts.replicas > 1) {
1277        replicaLatencyHistogram =
1278          YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1279      }
1280      valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1281      // scan metrics
1282      rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1283      remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1284      millisBetweenNextHistogram =
1285        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1286      regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1287      bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1288      bytesInRemoteResultsHistogram =
1289        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1290
1291      onStartup();
1292    }
1293
1294    abstract void onStartup() throws IOException;
1295
1296    void testTakedown() throws IOException {
1297      onTakedown();
1298      // Print all stats for this thread continuously.
1299      // Synchronize on Test.class so different threads don't intermingle the
1300      // output. We can't use 'this' here because each thread has its own instance of Test class.
1301      synchronized (Test.class) {
1302        status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
1303        status
1304          .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram));
1305        if (opts.replicas > 1) {
1306          status.setStatus("Latency (us) from Replica Regions: "
1307            + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram));
1308        }
1309        status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
1310        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
1311        if (valueSizeHistogram.getCount() > 0) {
1312          status.setStatus(
1313            "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
1314          status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
1315          status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
1316        } else {
1317          status.setStatus("No valueSize statistics available");
1318        }
1319        if (rpcCallsHistogram.getCount() > 0) {
1320          status.setStatus(
1321            "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
1322        }
1323        if (remoteRpcCallsHistogram.getCount() > 0) {
1324          status.setStatus("remoteRpcCalls (count): "
1325            + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
1326        }
1327        if (millisBetweenNextHistogram.getCount() > 0) {
1328          status.setStatus("millisBetweenNext (latency): "
1329            + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
1330        }
1331        if (regionsScannedHistogram.getCount() > 0) {
1332          status.setStatus("regionsScanned (count): "
1333            + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
1334        }
1335        if (bytesInResultsHistogram.getCount() > 0) {
1336          status.setStatus("bytesInResults (size): "
1337            + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
1338        }
1339        if (bytesInRemoteResultsHistogram.getCount() > 0) {
1340          status.setStatus("bytesInRemoteResults (size): "
1341            + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
1342        }
1343      }
1344    }
1345
1346    abstract void onTakedown() throws IOException;
1347
1348    /*
1349     * Run test
1350     * @return Elapsed time.
1351     */
1352    long test() throws IOException, InterruptedException {
1353      testSetup();
1354      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1355      final long startTime = System.nanoTime();
1356      try {
1357        testTimed();
1358      } finally {
1359        testTakedown();
1360      }
1361      return (System.nanoTime() - startTime) / 1000000;
1362    }
1363
1364    int getStartRow() {
1365      return opts.startRow;
1366    }
1367
1368    int getLastRow() {
1369      return getStartRow() + opts.perClientRunRows;
1370    }
1371
1372    /**
1373     * Provides an extension point for tests that don't want a per row invocation.
1374     */
1375    void testTimed() throws IOException, InterruptedException {
1376      int startRow = getStartRow();
1377      int lastRow = getLastRow();
1378      // Report on completion of 1/10th of total.
1379      for (int ii = 0; ii < opts.cycles; ii++) {
1380        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
1381        for (int i = startRow; i < lastRow; i++) {
1382          if (i % everyN != 0) continue;
1383          long startTime = System.nanoTime();
1384          boolean requestSent = false;
1385          Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan();
1386          try (Scope scope = span.makeCurrent()) {
1387            requestSent = testRow(i, startTime);
1388          } finally {
1389            span.end();
1390          }
1391          if ((i - startRow) > opts.measureAfter) {
1392            // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
1393            // first 9 times and sends the actual get request in the 10th iteration.
1394            // We should only set latency when actual request is sent because otherwise
1395            // it turns out to be 0.
1396            if (requestSent) {
1397              long latency = (System.nanoTime() - startTime) / 1000;
1398              latencyHistogram.update(latency);
1399              if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) {
1400                numOfReplyOverLatencyThreshold++;
1401              }
1402            }
1403            if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1404              status.setStatus(generateStatus(startRow, i, lastRow));
1405            }
1406          }
1407        }
1408      }
1409    }
1410
1411    /** Returns Subset of the histograms' calculation. */
1412    public String getShortLatencyReport() {
1413      return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
1414    }
1415
1416    /** Returns Subset of the histograms' calculation. */
1417    public String getShortValueSizeReport() {
1418      return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
1419    }
1420
1421    /**
1422     * Test for individual row.
1423     * @param i Row index.
1424     * @return true if the row was sent to server and need to record metrics. False if not, multiGet
1425     *         and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered.
1426     */
1427    abstract boolean testRow(final int i, final long startTime)
1428      throws IOException, InterruptedException;
1429  }
1430
1431  static abstract class Test extends TestBase {
1432    protected Connection connection;
1433
1434    Test(final Connection con, final TestOptions options, final Status status) {
1435      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1436      this.connection = con;
1437    }
1438  }
1439
1440  static abstract class AsyncTest extends TestBase {
1441    protected AsyncConnection connection;
1442
1443    AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
1444      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1445      this.connection = con;
1446    }
1447  }
1448
1449  static abstract class TableTest extends Test {
1450    protected Table table;
1451
1452    TableTest(Connection con, TestOptions options, Status status) {
1453      super(con, options, status);
1454    }
1455
1456    @Override
1457    void onStartup() throws IOException {
1458      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1459    }
1460
1461    @Override
1462    void onTakedown() throws IOException {
1463      table.close();
1464    }
1465  }
1466
1467  /*
1468   * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest
1469   */
1470  static abstract class MetaTest extends TableTest {
1471    protected int keyLength;
1472
1473    MetaTest(Connection con, TestOptions options, Status status) {
1474      super(con, options, status);
1475      keyLength = Integer.toString(opts.perClientRunRows).length();
1476    }
1477
1478    @Override
1479    void onTakedown() throws IOException {
1480      // No clean up
1481    }
1482
1483    /*
1484     * Generates Lexicographically ascending strings
1485     */
1486    protected byte[] getSplitKey(final int i) {
1487      return Bytes.toBytes(String.format("%0" + keyLength + "d", i));
1488    }
1489
1490  }
1491
1492  static abstract class AsyncTableTest extends AsyncTest {
1493    protected AsyncTable<?> table;
1494
1495    AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
1496      super(con, options, status);
1497    }
1498
1499    @Override
1500    void onStartup() throws IOException {
1501      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1502    }
1503
1504    @Override
1505    void onTakedown() throws IOException {
1506    }
1507  }
1508
1509  static class AsyncRandomReadTest extends AsyncTableTest {
1510    private final Consistency consistency;
1511    private ArrayList<Get> gets;
1512
1513    AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
1514      super(con, options, status);
1515      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1516      if (opts.multiGet > 0) {
1517        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1518        this.gets = new ArrayList<>(opts.multiGet);
1519      }
1520    }
1521
1522    @Override
1523    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1524      if (opts.randomSleep > 0) {
1525        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
1526      }
1527      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1528      for (int family = 0; family < opts.families; family++) {
1529        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1530        if (opts.addColumns) {
1531          for (int column = 0; column < opts.columns; column++) {
1532            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1533            get.addColumn(familyName, qualifier);
1534          }
1535        } else {
1536          get.addFamily(familyName);
1537        }
1538      }
1539      if (opts.filterAll) {
1540        get.setFilter(new FilterAllFilter());
1541      }
1542      get.setConsistency(consistency);
1543      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1544      try {
1545        if (opts.multiGet > 0) {
1546          this.gets.add(get);
1547          if (this.gets.size() == opts.multiGet) {
1548            Result[] rs =
1549              this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
1550            updateValueSize(rs);
1551            this.gets.clear();
1552          } else {
1553            return false;
1554          }
1555        } else {
1556          updateValueSize(this.table.get(get).get());
1557        }
1558      } catch (ExecutionException e) {
1559        throw new IOException(e);
1560      }
1561      return true;
1562    }
1563
1564    public static RuntimeException runtime(Throwable e) {
1565      if (e instanceof RuntimeException) {
1566        return (RuntimeException) e;
1567      }
1568      return new RuntimeException(e);
1569    }
1570
1571    public static <V> V propagate(Callable<V> callable) {
1572      try {
1573        return callable.call();
1574      } catch (Exception e) {
1575        throw runtime(e);
1576      }
1577    }
1578
1579    @Override
1580    protected int getReportingPeriod() {
1581      int period = opts.perClientRunRows / 10;
1582      return period == 0 ? opts.perClientRunRows : period;
1583    }
1584
1585    @Override
1586    protected void testTakedown() throws IOException {
1587      if (this.gets != null && this.gets.size() > 0) {
1588        this.table.get(gets);
1589        this.gets.clear();
1590      }
1591      super.testTakedown();
1592    }
1593  }
1594
1595  static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
1596
1597    AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
1598      super(con, options, status);
1599    }
1600
1601    @Override
1602    protected byte[] generateRow(final int i) {
1603      return getRandomRow(this.rand, opts.totalRows);
1604    }
1605  }
1606
1607  static class AsyncScanTest extends AsyncTableTest {
1608    private ResultScanner testScanner;
1609    private AsyncTable<?> asyncTable;
1610
1611    AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
1612      super(con, options, status);
1613    }
1614
1615    @Override
1616    void onStartup() throws IOException {
1617      this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName),
1618        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
1619    }
1620
1621    @Override
1622    void testTakedown() throws IOException {
1623      if (this.testScanner != null) {
1624        updateScanMetrics(this.testScanner.getScanMetrics());
1625        this.testScanner.close();
1626      }
1627      super.testTakedown();
1628    }
1629
1630    @Override
1631    boolean testRow(final int i, final long startTime) throws IOException {
1632      if (this.testScanner == null) {
1633        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1634          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1635          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1636        for (int family = 0; family < opts.families; family++) {
1637          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1638          if (opts.addColumns) {
1639            for (int column = 0; column < opts.columns; column++) {
1640              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1641              scan.addColumn(familyName, qualifier);
1642            }
1643          } else {
1644            scan.addFamily(familyName);
1645          }
1646        }
1647        if (opts.filterAll) {
1648          scan.setFilter(new FilterAllFilter());
1649        }
1650        this.testScanner = asyncTable.getScanner(scan);
1651      }
1652      Result r = testScanner.next();
1653      updateValueSize(r);
1654      return true;
1655    }
1656  }
1657
1658  static class AsyncSequentialReadTest extends AsyncTableTest {
1659    AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
1660      super(con, options, status);
1661    }
1662
1663    @Override
1664    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1665      Get get = new Get(format(i));
1666      for (int family = 0; family < opts.families; family++) {
1667        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1668        if (opts.addColumns) {
1669          for (int column = 0; column < opts.columns; column++) {
1670            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1671            get.addColumn(familyName, qualifier);
1672          }
1673        } else {
1674          get.addFamily(familyName);
1675        }
1676      }
1677      if (opts.filterAll) {
1678        get.setFilter(new FilterAllFilter());
1679      }
1680      try {
1681        updateValueSize(table.get(get).get());
1682      } catch (ExecutionException e) {
1683        throw new IOException(e);
1684      }
1685      return true;
1686    }
1687  }
1688
1689  static class AsyncSequentialWriteTest extends AsyncTableTest {
1690    private ArrayList<Put> puts;
1691
1692    AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
1693      super(con, options, status);
1694      if (opts.multiPut > 0) {
1695        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
1696        this.puts = new ArrayList<>(opts.multiPut);
1697      }
1698    }
1699
1700    protected byte[] generateRow(final int i) {
1701      return format(i);
1702    }
1703
1704    @Override
1705    @SuppressWarnings("ReturnValueIgnored")
1706    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1707      byte[] row = generateRow(i);
1708      Put put = new Put(row);
1709      for (int family = 0; family < opts.families; family++) {
1710        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1711        for (int column = 0; column < opts.columns; column++) {
1712          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1713          byte[] value = generateData(this.rand, getValueLength(this.rand));
1714          if (opts.useTags) {
1715            byte[] tag = generateData(this.rand, TAG_LENGTH);
1716            Tag[] tags = new Tag[opts.noOfTags];
1717            for (int n = 0; n < opts.noOfTags; n++) {
1718              Tag t = new ArrayBackedTag((byte) n, tag);
1719              tags[n] = t;
1720            }
1721            KeyValue kv =
1722              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
1723            put.add(kv);
1724            updateValueSize(kv.getValueLength());
1725          } else {
1726            put.addColumn(familyName, qualifier, value);
1727            updateValueSize(value.length);
1728          }
1729        }
1730      }
1731      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1732      try {
1733        table.put(put).get();
1734        if (opts.multiPut > 0) {
1735          this.puts.add(put);
1736          if (this.puts.size() == opts.multiPut) {
1737            this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get));
1738            this.puts.clear();
1739          } else {
1740            return false;
1741          }
1742        } else {
1743          table.put(put).get();
1744        }
1745      } catch (ExecutionException e) {
1746        throw new IOException(e);
1747      }
1748      return true;
1749    }
1750  }
1751
1752  static abstract class BufferedMutatorTest extends Test {
1753    protected BufferedMutator mutator;
1754    protected Table table;
1755
1756    BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1757      super(con, options, status);
1758    }
1759
1760    @Override
1761    void onStartup() throws IOException {
1762      BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1763      p.writeBufferSize(opts.bufferSize);
1764      this.mutator = connection.getBufferedMutator(p);
1765      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1766    }
1767
1768    @Override
1769    void onTakedown() throws IOException {
1770      mutator.close();
1771      table.close();
1772    }
1773  }
1774
1775  static class RandomSeekScanTest extends TableTest {
1776    RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1777      super(con, options, status);
1778    }
1779
1780    @Override
1781    boolean testRow(final int i, final long startTime) throws IOException {
1782      Scan scan =
1783        new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)).setCaching(opts.caching)
1784          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1785          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1786      FilterList list = new FilterList();
1787      for (int family = 0; family < opts.families; family++) {
1788        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1789        if (opts.addColumns) {
1790          for (int column = 0; column < opts.columns; column++) {
1791            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1792            scan.addColumn(familyName, qualifier);
1793          }
1794        } else {
1795          scan.addFamily(familyName);
1796        }
1797      }
1798      if (opts.filterAll) {
1799        list.addFilter(new FilterAllFilter());
1800      }
1801      list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1802      scan.setFilter(list);
1803      ResultScanner s = this.table.getScanner(scan);
1804      try {
1805        for (Result rr; (rr = s.next()) != null;) {
1806          updateValueSize(rr);
1807        }
1808      } finally {
1809        updateScanMetrics(s.getScanMetrics());
1810        s.close();
1811      }
1812      return true;
1813    }
1814
1815    @Override
1816    protected int getReportingPeriod() {
1817      int period = opts.perClientRunRows / 100;
1818      return period == 0 ? opts.perClientRunRows : period;
1819    }
1820
1821  }
1822
1823  static abstract class RandomScanWithRangeTest extends TableTest {
1824    RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1825      super(con, options, status);
1826    }
1827
1828    @Override
1829    boolean testRow(final int i, final long startTime) throws IOException {
1830      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1831      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
1832        .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
1833        .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1834        .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1835      for (int family = 0; family < opts.families; family++) {
1836        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1837        if (opts.addColumns) {
1838          for (int column = 0; column < opts.columns; column++) {
1839            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1840            scan.addColumn(familyName, qualifier);
1841          }
1842        } else {
1843          scan.addFamily(familyName);
1844        }
1845      }
1846      if (opts.filterAll) {
1847        scan.setFilter(new FilterAllFilter());
1848      }
1849      Result r = null;
1850      int count = 0;
1851      ResultScanner s = this.table.getScanner(scan);
1852      try {
1853        for (; (r = s.next()) != null;) {
1854          updateValueSize(r);
1855          count++;
1856        }
1857        if (i % 100 == 0) {
1858          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1859            Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()),
1860            count));
1861        }
1862      } finally {
1863        updateScanMetrics(s.getScanMetrics());
1864        s.close();
1865      }
1866      return true;
1867    }
1868
1869    protected abstract Pair<byte[], byte[]> getStartAndStopRow();
1870
1871    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
1872      int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
1873      int stop = start + maxRange;
1874      return new Pair<>(format(start), format(stop));
1875    }
1876
1877    @Override
1878    protected int getReportingPeriod() {
1879      int period = opts.perClientRunRows / 100;
1880      return period == 0 ? opts.perClientRunRows : period;
1881    }
1882  }
1883
1884  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1885    RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1886      super(con, options, status);
1887    }
1888
1889    @Override
1890    protected Pair<byte[], byte[]> getStartAndStopRow() {
1891      return generateStartAndStopRows(10);
1892    }
1893  }
1894
1895  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1896    RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1897      super(con, options, status);
1898    }
1899
1900    @Override
1901    protected Pair<byte[], byte[]> getStartAndStopRow() {
1902      return generateStartAndStopRows(100);
1903    }
1904  }
1905
1906  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1907    RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1908      super(con, options, status);
1909    }
1910
1911    @Override
1912    protected Pair<byte[], byte[]> getStartAndStopRow() {
1913      return generateStartAndStopRows(1000);
1914    }
1915  }
1916
1917  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1918    RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1919      super(con, options, status);
1920    }
1921
1922    @Override
1923    protected Pair<byte[], byte[]> getStartAndStopRow() {
1924      return generateStartAndStopRows(10000);
1925    }
1926  }
1927
1928  static class RandomReadTest extends TableTest {
1929    private final Consistency consistency;
1930    private ArrayList<Get> gets;
1931
1932    RandomReadTest(Connection con, TestOptions options, Status status) {
1933      super(con, options, status);
1934      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1935      if (opts.multiGet > 0) {
1936        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1937        this.gets = new ArrayList<>(opts.multiGet);
1938      }
1939    }
1940
1941    @Override
1942    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1943      if (opts.randomSleep > 0) {
1944        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
1945      }
1946      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1947      for (int family = 0; family < opts.families; family++) {
1948        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1949        if (opts.addColumns) {
1950          for (int column = 0; column < opts.columns; column++) {
1951            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1952            get.addColumn(familyName, qualifier);
1953          }
1954        } else {
1955          get.addFamily(familyName);
1956        }
1957      }
1958      if (opts.filterAll) {
1959        get.setFilter(new FilterAllFilter());
1960      }
1961      get.setConsistency(consistency);
1962      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1963      if (opts.multiGet > 0) {
1964        this.gets.add(get);
1965        if (this.gets.size() == opts.multiGet) {
1966          Result[] rs = this.table.get(this.gets);
1967          if (opts.replicas > 1) {
1968            long latency = System.nanoTime() - startTime;
1969            updateValueSize(rs, latency);
1970          } else {
1971            updateValueSize(rs);
1972          }
1973          this.gets.clear();
1974        } else {
1975          return false;
1976        }
1977      } else {
1978        if (opts.replicas > 1) {
1979          Result r = this.table.get(get);
1980          long latency = System.nanoTime() - startTime;
1981          updateValueSize(r, latency);
1982        } else {
1983          updateValueSize(this.table.get(get));
1984        }
1985      }
1986      return true;
1987    }
1988
1989    @Override
1990    protected int getReportingPeriod() {
1991      int period = opts.perClientRunRows / 10;
1992      return period == 0 ? opts.perClientRunRows : period;
1993    }
1994
1995    @Override
1996    protected void testTakedown() throws IOException {
1997      if (this.gets != null && this.gets.size() > 0) {
1998        this.table.get(gets);
1999        this.gets.clear();
2000      }
2001      super.testTakedown();
2002    }
2003  }
2004
2005  /*
2006   * Send random reads against fake regions inserted by MetaWriteTest
2007   */
2008  static class MetaRandomReadTest extends MetaTest {
2009    private Random rd = new Random();
2010    private RegionLocator regionLocator;
2011
2012    MetaRandomReadTest(Connection con, TestOptions options, Status status) {
2013      super(con, options, status);
2014      LOG.info("call getRegionLocation");
2015    }
2016
2017    @Override
2018    void onStartup() throws IOException {
2019      super.onStartup();
2020      this.regionLocator = connection.getRegionLocator(table.getName());
2021    }
2022
2023    @Override
2024    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
2025      if (opts.randomSleep > 0) {
2026        Thread.sleep(rd.nextInt(opts.randomSleep));
2027      }
2028      HRegionLocation hRegionLocation =
2029        regionLocator.getRegionLocation(getSplitKey(rd.nextInt(opts.perClientRunRows)), true);
2030      LOG.debug("get location for region: " + hRegionLocation);
2031      return true;
2032    }
2033
2034    @Override
2035    protected int getReportingPeriod() {
2036      int period = opts.perClientRunRows / 10;
2037      return period == 0 ? opts.perClientRunRows : period;
2038    }
2039
2040    @Override
2041    protected void testTakedown() throws IOException {
2042      super.testTakedown();
2043    }
2044  }
2045
2046  static class RandomWriteTest extends SequentialWriteTest {
2047    RandomWriteTest(Connection con, TestOptions options, Status status) {
2048      super(con, options, status);
2049    }
2050
2051    @Override
2052    protected byte[] generateRow(final int i) {
2053      return getRandomRow(this.rand, opts.totalRows);
2054    }
2055
2056  }
2057
2058  static class ScanTest extends TableTest {
2059    private ResultScanner testScanner;
2060
2061    ScanTest(Connection con, TestOptions options, Status status) {
2062      super(con, options, status);
2063    }
2064
2065    @Override
2066    void testTakedown() throws IOException {
2067      if (this.testScanner != null) {
2068        this.testScanner.close();
2069      }
2070      super.testTakedown();
2071    }
2072
2073    @Override
2074    boolean testRow(final int i, final long startTime) throws IOException {
2075      if (this.testScanner == null) {
2076        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
2077          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
2078          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
2079        for (int family = 0; family < opts.families; family++) {
2080          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2081          if (opts.addColumns) {
2082            for (int column = 0; column < opts.columns; column++) {
2083              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2084              scan.addColumn(familyName, qualifier);
2085            }
2086          } else {
2087            scan.addFamily(familyName);
2088          }
2089        }
2090        if (opts.filterAll) {
2091          scan.setFilter(new FilterAllFilter());
2092        }
2093        this.testScanner = table.getScanner(scan);
2094      }
2095      Result r = testScanner.next();
2096      updateValueSize(r);
2097      return true;
2098    }
2099  }
2100
2101  static class ReverseScanTest extends TableTest {
2102    private ResultScanner testScanner;
2103
2104    ReverseScanTest(Connection con, TestOptions options, Status status) {
2105      super(con, options, status);
2106    }
2107
2108    @Override
2109    void testTakedown() throws IOException {
2110      if (this.testScanner != null) {
2111        this.testScanner.close();
2112      }
2113      super.testTakedown();
2114    }
2115
2116    @Override
2117    boolean testRow(final int i, final long startTime) throws IOException {
2118      if (this.testScanner == null) {
2119        Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2120          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2121          .setScanMetricsEnabled(true).setReversed(true);
2122        for (int family = 0; family < opts.families; family++) {
2123          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2124          if (opts.addColumns) {
2125            for (int column = 0; column < opts.columns; column++) {
2126              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2127              scan.addColumn(familyName, qualifier);
2128            }
2129          } else {
2130            scan.addFamily(familyName);
2131          }
2132        }
2133        if (opts.filterAll) {
2134          scan.setFilter(new FilterAllFilter());
2135        }
2136        this.testScanner = table.getScanner(scan);
2137      }
2138      Result r = testScanner.next();
2139      updateValueSize(r);
2140      return true;
2141    }
2142  }
2143
2144  /**
2145   * Base class for operations that are CAS-like; that read a value and then set it based off what
2146   * they read. In this category is increment, append, checkAndPut, etc.
2147   * <p>
2148   * These operations also want some concurrency going on. Usually when these tests run, they
2149   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
2150   * same key space. We do this with our getStartRow and getLastRow overrides.
2151   */
2152  static abstract class CASTableTest extends TableTest {
2153    private final byte[] qualifier;
2154
2155    CASTableTest(Connection con, TestOptions options, Status status) {
2156      super(con, options, status);
2157      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
2158    }
2159
2160    byte[] getQualifier() {
2161      return this.qualifier;
2162    }
2163
2164    @Override
2165    int getStartRow() {
2166      return 0;
2167    }
2168
2169    @Override
2170    int getLastRow() {
2171      return opts.perClientRunRows;
2172    }
2173  }
2174
2175  static class IncrementTest extends CASTableTest {
2176    IncrementTest(Connection con, TestOptions options, Status status) {
2177      super(con, options, status);
2178    }
2179
2180    @Override
2181    boolean testRow(final int i, final long startTime) throws IOException {
2182      Increment increment = new Increment(format(i));
2183      // unlike checkAndXXX tests, which make most sense to do on a single value,
2184      // if multiple families are specified for an increment test we assume it is
2185      // meant to raise the work factor
2186      for (int family = 0; family < opts.families; family++) {
2187        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2188        increment.addColumn(familyName, getQualifier(), 1l);
2189      }
2190      updateValueSize(this.table.increment(increment));
2191      return true;
2192    }
2193  }
2194
2195  static class AppendTest extends CASTableTest {
2196    AppendTest(Connection con, TestOptions options, Status status) {
2197      super(con, options, status);
2198    }
2199
2200    @Override
2201    boolean testRow(final int i, final long startTime) throws IOException {
2202      byte[] bytes = format(i);
2203      Append append = new Append(bytes);
2204      // unlike checkAndXXX tests, which make most sense to do on a single value,
2205      // if multiple families are specified for an append test we assume it is
2206      // meant to raise the work factor
2207      for (int family = 0; family < opts.families; family++) {
2208        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2209        append.addColumn(familyName, getQualifier(), bytes);
2210      }
2211      updateValueSize(this.table.append(append));
2212      return true;
2213    }
2214  }
2215
2216  static class CheckAndMutateTest extends CASTableTest {
2217    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
2218      super(con, options, status);
2219    }
2220
2221    @Override
2222    boolean testRow(final int i, final long startTime) throws IOException {
2223      final byte[] bytes = format(i);
2224      // checkAndXXX tests operate on only a single value
2225      // Put a known value so when we go to check it, it is there.
2226      Put put = new Put(bytes);
2227      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2228      this.table.put(put);
2229      RowMutations mutations = new RowMutations(bytes);
2230      mutations.add(put);
2231      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2232        .thenMutate(mutations);
2233      return true;
2234    }
2235  }
2236
2237  static class CheckAndPutTest extends CASTableTest {
2238    CheckAndPutTest(Connection con, TestOptions options, Status status) {
2239      super(con, options, status);
2240    }
2241
2242    @Override
2243    boolean testRow(final int i, final long startTime) throws IOException {
2244      final byte[] bytes = format(i);
2245      // checkAndXXX tests operate on only a single value
2246      // Put a known value so when we go to check it, it is there.
2247      Put put = new Put(bytes);
2248      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2249      this.table.put(put);
2250      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2251        .thenPut(put);
2252      return true;
2253    }
2254  }
2255
2256  static class CheckAndDeleteTest extends CASTableTest {
2257    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
2258      super(con, options, status);
2259    }
2260
2261    @Override
2262    boolean testRow(final int i, final long startTime) throws IOException {
2263      final byte[] bytes = format(i);
2264      // checkAndXXX tests operate on only a single value
2265      // Put a known value so when we go to check it, it is there.
2266      Put put = new Put(bytes);
2267      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2268      this.table.put(put);
2269      Delete delete = new Delete(put.getRow());
2270      delete.addColumn(FAMILY_ZERO, getQualifier());
2271      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2272        .thenDelete(delete);
2273      return true;
2274    }
2275  }
2276
2277  /*
2278   * Delete all fake regions inserted to meta table by MetaWriteTest.
2279   */
2280  static class CleanMetaTest extends MetaTest {
2281    CleanMetaTest(Connection con, TestOptions options, Status status) {
2282      super(con, options, status);
2283    }
2284
2285    @Override
2286    boolean testRow(final int i, final long startTime) throws IOException {
2287      try {
2288        RegionInfo regionInfo = connection.getRegionLocator(table.getName())
2289          .getRegionLocation(getSplitKey(i), false).getRegion();
2290        LOG.debug("deleting region from meta: " + regionInfo);
2291
2292        Delete delete =
2293          MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP);
2294        try (Table t = MetaTableAccessor.getMetaHTable(connection)) {
2295          t.delete(delete);
2296        }
2297      } catch (IOException ie) {
2298        // Log and continue
2299        LOG.error("cannot find region with start key: " + i);
2300      }
2301      return true;
2302    }
2303  }
2304
2305  static class SequentialReadTest extends TableTest {
2306    SequentialReadTest(Connection con, TestOptions options, Status status) {
2307      super(con, options, status);
2308    }
2309
2310    @Override
2311    boolean testRow(final int i, final long startTime) throws IOException {
2312      Get get = new Get(format(i));
2313      for (int family = 0; family < opts.families; family++) {
2314        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2315        if (opts.addColumns) {
2316          for (int column = 0; column < opts.columns; column++) {
2317            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2318            get.addColumn(familyName, qualifier);
2319          }
2320        } else {
2321          get.addFamily(familyName);
2322        }
2323      }
2324      if (opts.filterAll) {
2325        get.setFilter(new FilterAllFilter());
2326      }
2327      updateValueSize(table.get(get));
2328      return true;
2329    }
2330  }
2331
2332  static class SequentialWriteTest extends BufferedMutatorTest {
2333    private ArrayList<Put> puts;
2334
2335    SequentialWriteTest(Connection con, TestOptions options, Status status) {
2336      super(con, options, status);
2337      if (opts.multiPut > 0) {
2338        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
2339        this.puts = new ArrayList<>(opts.multiPut);
2340      }
2341    }
2342
2343    protected byte[] generateRow(final int i) {
2344      return format(i);
2345    }
2346
2347    @Override
2348    boolean testRow(final int i, final long startTime) throws IOException {
2349      byte[] row = generateRow(i);
2350      Put put = new Put(row);
2351      for (int family = 0; family < opts.families; family++) {
2352        byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
2353        for (int column = 0; column < opts.columns; column++) {
2354          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2355          byte[] value = generateData(this.rand, getValueLength(this.rand));
2356          if (opts.useTags) {
2357            byte[] tag = generateData(this.rand, TAG_LENGTH);
2358            Tag[] tags = new Tag[opts.noOfTags];
2359            for (int n = 0; n < opts.noOfTags; n++) {
2360              Tag t = new ArrayBackedTag((byte) n, tag);
2361              tags[n] = t;
2362            }
2363            KeyValue kv =
2364              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
2365            put.add(kv);
2366            updateValueSize(kv.getValueLength());
2367          } else {
2368            put.addColumn(familyName, qualifier, value);
2369            updateValueSize(value.length);
2370          }
2371        }
2372      }
2373      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2374      if (opts.autoFlush) {
2375        if (opts.multiPut > 0) {
2376          this.puts.add(put);
2377          if (this.puts.size() == opts.multiPut) {
2378            table.put(this.puts);
2379            this.puts.clear();
2380          } else {
2381            return false;
2382          }
2383        } else {
2384          table.put(put);
2385        }
2386      } else {
2387        mutator.mutate(put);
2388      }
2389      return true;
2390    }
2391  }
2392
2393  /*
2394   * Insert fake regions into meta table with contiguous split keys.
2395   */
2396  static class MetaWriteTest extends MetaTest {
2397
2398    MetaWriteTest(Connection con, TestOptions options, Status status) {
2399      super(con, options, status);
2400    }
2401
2402    @Override
2403    boolean testRow(final int i, final long startTime) throws IOException {
2404      List<RegionInfo> regionInfos = new ArrayList<RegionInfo>();
2405      RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME))
2406        .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build());
2407      regionInfos.add(regionInfo);
2408      MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1);
2409
2410      // write the serverName columns
2411      MetaTableAccessor.updateRegionLocation(connection, regionInfo,
2412        ServerName.valueOf("localhost", 60010, rand.nextLong()), i,
2413        EnvironmentEdgeManager.currentTime());
2414      return true;
2415    }
2416  }
2417
2418  static class FilteredScanTest extends TableTest {
2419    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
2420
2421    FilteredScanTest(Connection con, TestOptions options, Status status) {
2422      super(con, options, status);
2423      if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) {
2424        LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB
2425          + ". This could take a very long time.");
2426      }
2427    }
2428
2429    @Override
2430    boolean testRow(int i, final long startTime) throws IOException {
2431      byte[] value = generateData(this.rand, getValueLength(this.rand));
2432      Scan scan = constructScan(value);
2433      ResultScanner scanner = null;
2434      try {
2435        scanner = this.table.getScanner(scan);
2436        for (Result r = null; (r = scanner.next()) != null;) {
2437          updateValueSize(r);
2438        }
2439      } finally {
2440        if (scanner != null) {
2441          updateScanMetrics(scanner.getScanMetrics());
2442          scanner.close();
2443        }
2444      }
2445      return true;
2446    }
2447
2448    protected Scan constructScan(byte[] valuePrefix) throws IOException {
2449      FilterList list = new FilterList();
2450      Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL,
2451        new BinaryComparator(valuePrefix));
2452      list.addFilter(filter);
2453      if (opts.filterAll) {
2454        list.addFilter(new FilterAllFilter());
2455      }
2456      Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2457        .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2458        .setScanMetricsEnabled(true);
2459      if (opts.addColumns) {
2460        for (int column = 0; column < opts.columns; column++) {
2461          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2462          scan.addColumn(FAMILY_ZERO, qualifier);
2463        }
2464      } else {
2465        scan.addFamily(FAMILY_ZERO);
2466      }
2467      scan.setFilter(list);
2468      return scan;
2469    }
2470  }
2471
2472  /**
2473   * Compute a throughput rate in MB/s.
2474   * @param rows   Number of records consumed.
2475   * @param timeMs Time taken in milliseconds.
2476   * @return String value with label, ie '123.76 MB/s'
2477   */
2478  private static String calculateMbps(int rows, long timeMs, final int valueSize, int families,
2479    int columns) {
2480    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH
2481      + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families);
2482    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
2483      .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT);
2484    return FMT.format(mbps) + " MB/s";
2485  }
2486
2487  /*
2488   * Format passed integer.
2489   * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed number (Does
2490   * absolute in case number is negative).
2491   */
2492  public static byte[] format(final int number) {
2493    byte[] b = new byte[ROW_LENGTH];
2494    int d = Math.abs(number);
2495    for (int i = b.length - 1; i >= 0; i--) {
2496      b[i] = (byte) ((d % 10) + '0');
2497      d /= 10;
2498    }
2499    return b;
2500  }
2501
2502  /*
2503   * This method takes some time and is done inline uploading data. For example, doing the mapfile
2504   * test, generation of the key and value consumes about 30% of CPU time.
2505   * @return Generated random value to insert into a table cell.
2506   */
2507  public static byte[] generateData(final Random r, int length) {
2508    byte[] b = new byte[length];
2509    int i;
2510
2511    for (i = 0; i < (length - 8); i += 8) {
2512      b[i] = (byte) (65 + r.nextInt(26));
2513      b[i + 1] = b[i];
2514      b[i + 2] = b[i];
2515      b[i + 3] = b[i];
2516      b[i + 4] = b[i];
2517      b[i + 5] = b[i];
2518      b[i + 6] = b[i];
2519      b[i + 7] = b[i];
2520    }
2521
2522    byte a = (byte) (65 + r.nextInt(26));
2523    for (; i < length; i++) {
2524      b[i] = a;
2525    }
2526    return b;
2527  }
2528
2529  static byte[] getRandomRow(final Random random, final int totalRows) {
2530    return format(generateRandomRow(random, totalRows));
2531  }
2532
2533  static int generateRandomRow(final Random random, final int totalRows) {
2534    return random.nextInt(Integer.MAX_VALUE) % totalRows;
2535  }
2536
2537  static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
2538    Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
2539    throws IOException, InterruptedException {
2540    status.setStatus(
2541      "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows");
2542    long totalElapsedTime;
2543
2544    final TestBase t;
2545    try {
2546      if (AsyncTest.class.isAssignableFrom(cmd)) {
2547        Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
2548        Constructor<? extends AsyncTest> constructor =
2549          newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
2550        t = constructor.newInstance(asyncCon, opts, status);
2551      } else {
2552        Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
2553        Constructor<? extends Test> constructor =
2554          newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
2555        t = constructor.newInstance(con, opts, status);
2556      }
2557    } catch (NoSuchMethodException e) {
2558      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
2559        + ".  It does not provide a constructor as described by "
2560        + "the javadoc comment.  Available constructors are: "
2561        + Arrays.toString(cmd.getConstructors()));
2562    } catch (Exception e) {
2563      throw new IllegalStateException("Failed to construct command class", e);
2564    }
2565    totalElapsedTime = t.test();
2566
2567    status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow
2568      + " for " + opts.perClientRunRows + " rows" + " ("
2569      + calculateMbps((int) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
2570        getAverageValueLength(opts), opts.families, opts.columns)
2571      + ")");
2572
2573    return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold,
2574      t.numOfReplyFromReplica, t.getLatencyHistogram());
2575  }
2576
2577  private static int getAverageValueLength(final TestOptions opts) {
2578    return opts.valueRandom ? opts.valueSize / 2 : opts.valueSize;
2579  }
2580
2581  private void runTest(final Class<? extends TestBase> cmd, TestOptions opts)
2582    throws IOException, InterruptedException, ClassNotFoundException, ExecutionException {
2583    // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2584    // the TestOptions introspection for us and dump the output in a readable format.
2585    LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
2586    Admin admin = null;
2587    Connection connection = null;
2588    try {
2589      connection = ConnectionFactory.createConnection(getConf());
2590      admin = connection.getAdmin();
2591      checkTable(admin, opts);
2592    } finally {
2593      if (admin != null) admin.close();
2594      if (connection != null) connection.close();
2595    }
2596    if (opts.nomapred) {
2597      doLocalClients(opts, getConf());
2598    } else {
2599      doMapReduce(opts, getConf());
2600    }
2601  }
2602
2603  protected void printUsage() {
2604    printUsage(PE_COMMAND_SHORTNAME, null);
2605  }
2606
2607  protected static void printUsage(final String message) {
2608    printUsage(PE_COMMAND_SHORTNAME, message);
2609  }
2610
2611  protected static void printUsageAndExit(final String message, final int exitCode) {
2612    printUsage(message);
2613    System.exit(exitCode);
2614  }
2615
2616  protected static void printUsage(final String shortName, final String message) {
2617    if (message != null && message.length() > 0) {
2618      System.err.println(message);
2619    }
2620    System.err.print("Usage: hbase " + shortName);
2621    System.err.println("  <OPTIONS> [-D<property=value>]* <command> <nclients>");
2622    System.err.println();
2623    System.err.println("General Options:");
2624    System.err.println(
2625      " nomapred        Run multiple clients using threads " + "(rather than use mapreduce)");
2626    System.err
2627      .println(" oneCon          all the threads share the same connection. Default: False");
2628    System.err.println(" connCount          connections all threads share. "
2629      + "For example, if set to 2, then all thread share 2 connection. "
2630      + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, "
2631      + "if not, connCount=thread number");
2632
2633    System.err.println(" sampleRate      Execute test on a sample of total "
2634      + "rows. Only supported by randomRead. Default: 1.0");
2635    System.err.println(" period          Report every 'period' rows: "
2636      + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10);
2637    System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
2638    System.err.println(
2639      " traceRate       Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0");
2640    System.err.println(" latency         Set to report operation latencies. Default: False");
2641    System.err.println(" latencyThreshold  Set to report number of operations with latency "
2642      + "over lantencyThreshold, unit in millisecond, default 0");
2643    System.err.println(" measureAfter    Start to measure the latency once 'measureAfter'"
2644      + " rows have been treated. Default: 0");
2645    System.err
2646      .println(" valueSize       Pass value size to use: Default: " + DEFAULT_OPTS.getValueSize());
2647    System.err.println(" valueRandom     Set if we should vary value size between 0 and "
2648      + "'valueSize'; set on read for stats on size: Default: Not set.");
2649    System.err.println(" blockEncoding   Block encoding to use. Value should be one of "
2650      + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2651    System.err.println();
2652    System.err.println("Table Creation / Write Tests:");
2653    System.err.println(" table           Alternate table name. Default: 'TestTable'");
2654    System.err.println(
2655      " rows            Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows()
2656        + ".  In case of randomReads and randomSeekScans this could"
2657        + " be specified along with --size to specify the number of rows to be scanned within"
2658        + " the total range specified by the size.");
2659    System.err.println(
2660      " size            Total size in GiB. Mutually exclusive with --rows for writes and scans"
2661        + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2662        + " use size to specify the end range and --rows"
2663        + " specifies the number of rows within that range. " + "Default: 1.0.");
2664    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2665    System.err.println(" encryption      Encryption type to use (AES, ...). Default: 'NONE'");
2666    System.err.println(
2667      " flushCommits    Used to determine if the test should flush the table. " + "Default: false");
2668    System.err.println(" valueZipf       Set if we should vary value size between 0 and "
2669      + "'valueSize' in zipf form: Default: Not set.");
2670    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
2671    System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
2672    System.err.println(" multiPut        Batch puts together into groups of N. Only supported "
2673      + "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0");
2674    System.err.println(" presplit        Create presplit table. If a table with same name exists,"
2675      + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2676      + "Recommended for accurate perf analysis (see guide). Default: disabled");
2677    System.err.println(
2678      " usetags         Writes tags along with KVs. Use with HFile V3. " + "Default: false");
2679    System.err.println(" numoftags       Specify the no of tags that would be needed. "
2680      + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2681    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
2682    System.err.println(" columns         Columns to write per row. Default: 1");
2683    System.err
2684      .println(" families        Specify number of column families for the table. Default: 1");
2685    System.err.println();
2686    System.err.println("Read Tests:");
2687    System.err.println(" filterAll       Helps to filter out all the rows on the server side"
2688      + " there by not returning any thing back to the client.  Helps to check the server side"
2689      + " performance.  Uses FilterAllFilter internally. ");
2690    System.err.println(" multiGet        Batch gets together into groups of N. Only supported "
2691      + "by randomRead. Default: disabled");
2692    System.err.println(" inmemory        Tries to keep the HFiles of the CF "
2693      + "inmemory as far as possible. Not guaranteed that reads are always served "
2694      + "from memory.  Default: false");
2695    System.err
2696      .println(" bloomFilter     Bloom filter type, one of " + Arrays.toString(BloomType.values()));
2697    System.err.println(" blockSize       Blocksize to use when writing out hfiles. ");
2698    System.err
2699      .println(" inmemoryCompaction  Makes the column family to do inmemory flushes/compactions. "
2700        + "Uses the CompactingMemstore");
2701    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
2702    System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
2703    System.err.println(
2704      " randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
2705    System.err.println(" caching         Scan caching to use. Default: 30");
2706    System.err.println(" asyncPrefetch   Enable asyncPrefetch for scan");
2707    System.err.println(" cacheBlocks     Set the cacheBlocks option for scan. Default: true");
2708    System.err.println(
2709      " scanReadType    Set the readType option for scan, stream/pread/default. Default: default");
2710    System.err.println(" bufferSize      Set the value of client side buffering. Default: 2MB");
2711    System.err.println();
2712    System.err.println(" Note: -D properties will be applied to the conf used. ");
2713    System.err.println("  For example: ");
2714    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
2715    System.err.println("   -Dmapreduce.task.timeout=60000");
2716    System.err.println();
2717    System.err.println("Command:");
2718    for (CmdDescriptor command : COMMANDS.values()) {
2719      System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
2720    }
2721    System.err.println();
2722    System.err.println("Args:");
2723    System.err.println(" nclients        Integer. Required. Total number of clients "
2724      + "(and HRegionServers) running. 1 <= value <= 500");
2725    System.err.println("Examples:");
2726    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
2727    System.err.println(" $ hbase " + shortName + " sequentialWrite 1");
2728    System.err.println(" To run 10 clients doing increments over ten rows:");
2729    System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10");
2730  }
2731
  /**
   * Parse options passed in via an arguments array. Assumes that array has been split on
   * white-space and placed into a {@code Queue}.
   * <p>
   * Tokens are polled one at a time and matched by prefix against the known {@code --option}
   * names. Parsing stops at the first token that is not an option:
   * <ul>
   * <li>a help flag ({@code -h} or anything starting with {@code --h}) is put back onto the queue
   * so the caller can tell that parsing was incomplete;</li>
   * <li>a registered command name is stored as {@code cmdName}, the following token is consumed as
   * the number of client threads, and rows/size are derived via
   * {@link #calculateRowsAndSize(TestOptions)};</li>
   * <li>anything else is reported through {@code printUsageAndExit} with exit code -1.</li>
   * </ul>
   * Tokens following the command and its thread count are left in the queue.
   * @param args remaining command-line tokens; consumed from the head
   * @return the options collected up to the point where parsing stopped
   * @throws IllegalArgumentException if a command is not followed by a parsable thread count, or
   *                                  if the collected options fail validation
   */
  static TestOptions parseOpts(Queue<String> args) {
    TestOptions opts = new TestOptions();

    String cmd = null;
    while ((cmd = args.poll()) != null) {
      if (cmd.equals("-h") || cmd.startsWith("--h")) {
        // place item back onto queue so that caller knows parsing was incomplete
        args.add(cmd);
        break;
      }

      // Flag options (no '=') are matched by prefix only; any suffix is ignored.
      final String nmr = "--nomapred";
      if (cmd.startsWith(nmr)) {
        opts.nomapred = true;
        continue;
      }

      // Valued options: everything after the '=' is parsed into the matching field.
      final String rows = "--rows=";
      if (cmd.startsWith(rows)) {
        opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
        continue;
      }

      final String cycles = "--cycles=";
      if (cmd.startsWith(cycles)) {
        opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
        continue;
      }

      final String sampleRate = "--sampleRate=";
      if (cmd.startsWith(sampleRate)) {
        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
        continue;
      }

      final String table = "--table=";
      if (cmd.startsWith(table)) {
        opts.tableName = cmd.substring(table.length());
        continue;
      }

      final String startRow = "--startRow=";
      if (cmd.startsWith(startRow)) {
        opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
        continue;
      }

      final String compress = "--compress=";
      if (cmd.startsWith(compress)) {
        opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
        continue;
      }

      final String encryption = "--encryption=";
      if (cmd.startsWith(encryption)) {
        opts.encryption = cmd.substring(encryption.length());
        continue;
      }

      final String traceRate = "--traceRate=";
      if (cmd.startsWith(traceRate)) {
        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
        continue;
      }

      final String blockEncoding = "--blockEncoding=";
      if (cmd.startsWith(blockEncoding)) {
        opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
        continue;
      }

      final String flushCommits = "--flushCommits=";
      if (cmd.startsWith(flushCommits)) {
        opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
        continue;
      }

      final String writeToWAL = "--writeToWAL=";
      if (cmd.startsWith(writeToWAL)) {
        opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
        continue;
      }

      final String presplit = "--presplit=";
      if (cmd.startsWith(presplit)) {
        opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
        continue;
      }

      final String inMemory = "--inmemory=";
      if (cmd.startsWith(inMemory)) {
        opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
        continue;
      }

      final String autoFlush = "--autoFlush=";
      if (cmd.startsWith(autoFlush)) {
        opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
        continue;
      }

      final String onceCon = "--oneCon=";
      if (cmd.startsWith(onceCon)) {
        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
        continue;
      }

      final String connCount = "--connCount=";
      if (cmd.startsWith(connCount)) {
        opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
        continue;
      }

      // Must be tested before "--latency": that flag is matched by prefix and would otherwise
      // swallow "--latencyThreshold=..." as well.
      final String latencyThreshold = "--latencyThreshold=";
      if (cmd.startsWith(latencyThreshold)) {
        opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length()));
        continue;
      }

      final String latency = "--latency";
      if (cmd.startsWith(latency)) {
        opts.reportLatency = true;
        continue;
      }

      final String multiGet = "--multiGet=";
      if (cmd.startsWith(multiGet)) {
        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
        continue;
      }

      final String multiPut = "--multiPut=";
      if (cmd.startsWith(multiPut)) {
        opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
        continue;
      }

      final String useTags = "--usetags=";
      if (cmd.startsWith(useTags)) {
        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
        continue;
      }

      final String noOfTags = "--numoftags=";
      if (cmd.startsWith(noOfTags)) {
        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
        continue;
      }

      final String replicas = "--replicas=";
      if (cmd.startsWith(replicas)) {
        opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
        continue;
      }

      final String filterOutAll = "--filterAll";
      if (cmd.startsWith(filterOutAll)) {
        opts.filterAll = true;
        continue;
      }

      final String size = "--size=";
      if (cmd.startsWith(size)) {
        opts.size = Float.parseFloat(cmd.substring(size.length()));
        // Sizes of 1.0 or less are rejected outright.
        if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB");
        continue;
      }

      final String splitPolicy = "--splitPolicy=";
      if (cmd.startsWith(splitPolicy)) {
        opts.splitPolicy = cmd.substring(splitPolicy.length());
        continue;
      }

      final String randomSleep = "--randomSleep=";
      if (cmd.startsWith(randomSleep)) {
        opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
        continue;
      }

      final String measureAfter = "--measureAfter=";
      if (cmd.startsWith(measureAfter)) {
        opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length()));
        continue;
      }

      final String bloomFilter = "--bloomFilter=";
      if (cmd.startsWith(bloomFilter)) {
        opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
        continue;
      }

      final String blockSize = "--blockSize=";
      if (cmd.startsWith(blockSize)) {
        opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
        continue;
      }

      final String valueSize = "--valueSize=";
      if (cmd.startsWith(valueSize)) {
        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
        continue;
      }

      final String valueRandom = "--valueRandom";
      if (cmd.startsWith(valueRandom)) {
        opts.valueRandom = true;
        continue;
      }

      final String valueZipf = "--valueZipf";
      if (cmd.startsWith(valueZipf)) {
        opts.valueZipf = true;
        continue;
      }

      final String period = "--period=";
      if (cmd.startsWith(period)) {
        opts.period = Integer.parseInt(cmd.substring(period.length()));
        continue;
      }

      final String addColumns = "--addColumns=";
      if (cmd.startsWith(addColumns)) {
        opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
        continue;
      }

      final String inMemoryCompaction = "--inmemoryCompaction=";
      if (cmd.startsWith(inMemoryCompaction)) {
        opts.inMemoryCompaction =
          MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length()));
        continue;
      }

      final String columns = "--columns=";
      if (cmd.startsWith(columns)) {
        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
        continue;
      }

      final String families = "--families=";
      if (cmd.startsWith(families)) {
        opts.families = Integer.parseInt(cmd.substring(families.length()));
        continue;
      }

      final String caching = "--caching=";
      if (cmd.startsWith(caching)) {
        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
        continue;
      }

      final String asyncPrefetch = "--asyncPrefetch";
      if (cmd.startsWith(asyncPrefetch)) {
        opts.asyncPrefetch = true;
        continue;
      }

      final String cacheBlocks = "--cacheBlocks=";
      if (cmd.startsWith(cacheBlocks)) {
        opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length()));
        continue;
      }

      final String scanReadType = "--scanReadType=";
      if (cmd.startsWith(scanReadType)) {
        opts.scanReadType =
          Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
        continue;
      }

      final String bufferSize = "--bufferSize=";
      if (cmd.startsWith(bufferSize)) {
        opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
        continue;
      }

      // Reached only when the token matched none of the options above. The options collected so
      // far are validated together here, so their order on the command line does not matter.
      validateParsedOpts(opts);

      if (isCommandClass(cmd)) {
        opts.cmdName = cmd;
        try {
          // The token right after the command is the number of client threads.
          opts.numClientThreads = Integer.parseInt(args.remove());
        } catch (NoSuchElementException | NumberFormatException e) {
          throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
        }
        opts = calculateRowsAndSize(opts);
        break;
      } else {
        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
      }

      // Only reached if printUsageAndExit returns; it calls System.exit, so normally this tail
      // does not run.
      // Not matching any option or command.
      System.err.println("Error: Wrong option or command: " + cmd);
      args.add(cmd);
      break;
    }
    return opts;
  }
3038
3039  /**
3040   * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts
3041   */
3042  private static void validateParsedOpts(TestOptions opts) {
3043
3044    if (!opts.autoFlush && opts.multiPut > 0) {
3045      throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
3046    }
3047
3048    if (opts.oneCon && opts.connCount > 1) {
3049      throw new IllegalArgumentException(
3050        "oneCon is set to true, " + "connCount should not bigger than 1");
3051    }
3052
3053    if (opts.valueZipf && opts.valueRandom) {
3054      throw new IllegalStateException("Either valueZipf or valueRandom but not both");
3055    }
3056  }
3057
3058  static TestOptions calculateRowsAndSize(final TestOptions opts) {
3059    int rowsPerGB = getRowsPerGB(opts);
3060    if (
3061      (opts.getCmdName() != null
3062        && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
3063        && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows
3064    ) {
3065      opts.totalRows = (int) opts.size * rowsPerGB;
3066    } else if (opts.size != DEFAULT_OPTS.size) {
3067      // total size in GB specified
3068      opts.totalRows = (int) opts.size * rowsPerGB;
3069      opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
3070    } else {
3071      opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
3072      opts.size = opts.totalRows / rowsPerGB;
3073    }
3074    return opts;
3075  }
3076
3077  static int getRowsPerGB(final TestOptions opts) {
3078    return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies()
3079      * opts.getColumns());
3080  }
3081
3082  @Override
3083  public int run(String[] args) throws Exception {
3084    // Process command-line args. TODO: Better cmd-line processing
3085    // (but hopefully something not as painful as cli options).
3086    int errCode = -1;
3087    if (args.length < 1) {
3088      printUsage();
3089      return errCode;
3090    }
3091
3092    try {
3093      LinkedList<String> argv = new LinkedList<>();
3094      argv.addAll(Arrays.asList(args));
3095      TestOptions opts = parseOpts(argv);
3096
3097      // args remaining, print help and exit
3098      if (!argv.isEmpty()) {
3099        errCode = 0;
3100        printUsage();
3101        return errCode;
3102      }
3103
3104      // must run at least 1 client
3105      if (opts.numClientThreads <= 0) {
3106        throw new IllegalArgumentException("Number of clients must be > 0");
3107      }
3108
3109      // cmdName should not be null, print help and exit
3110      if (opts.cmdName == null) {
3111        printUsage();
3112        return errCode;
3113      }
3114
3115      Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
3116      if (cmdClass != null) {
3117        runTest(cmdClass, opts);
3118        errCode = 0;
3119      }
3120
3121    } catch (Exception e) {
3122      e.printStackTrace();
3123    }
3124
3125    return errCode;
3126  }
3127
3128  private static boolean isCommandClass(String cmd) {
3129    return COMMANDS.containsKey(cmd);
3130  }
3131
3132  private static Class<? extends TestBase> determineCommandClass(String cmd) {
3133    CmdDescriptor descriptor = COMMANDS.get(cmd);
3134    return descriptor != null ? descriptor.getCmdClass() : null;
3135  }
3136
3137  public static void main(final String[] args) throws Exception {
3138    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
3139    System.exit(res);
3140  }
3141}