001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import com.codahale.metrics.Histogram;
021import com.codahale.metrics.UniformReservoir;
022import io.opentelemetry.api.trace.Span;
023import io.opentelemetry.context.Scope;
024import java.io.IOException;
025import java.io.PrintStream;
026import java.lang.reflect.Constructor;
027import java.math.BigDecimal;
028import java.math.MathContext;
029import java.text.DecimalFormat;
030import java.text.SimpleDateFormat;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Date;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Locale;
037import java.util.Map;
038import java.util.NoSuchElementException;
039import java.util.Queue;
040import java.util.Random;
041import java.util.TreeMap;
042import java.util.concurrent.Callable;
043import java.util.concurrent.ExecutionException;
044import java.util.concurrent.ExecutorService;
045import java.util.concurrent.Executors;
046import java.util.concurrent.Future;
047import java.util.concurrent.ThreadLocalRandom;
048import org.apache.commons.lang3.StringUtils;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.conf.Configured;
051import org.apache.hadoop.fs.FileSystem;
052import org.apache.hadoop.fs.Path;
053import org.apache.hadoop.hbase.client.Admin;
054import org.apache.hadoop.hbase.client.Append;
055import org.apache.hadoop.hbase.client.AsyncConnection;
056import org.apache.hadoop.hbase.client.AsyncTable;
057import org.apache.hadoop.hbase.client.BufferedMutator;
058import org.apache.hadoop.hbase.client.BufferedMutatorParams;
059import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
060import org.apache.hadoop.hbase.client.Connection;
061import org.apache.hadoop.hbase.client.ConnectionFactory;
062import org.apache.hadoop.hbase.client.Consistency;
063import org.apache.hadoop.hbase.client.Delete;
064import org.apache.hadoop.hbase.client.Durability;
065import org.apache.hadoop.hbase.client.Get;
066import org.apache.hadoop.hbase.client.Increment;
067import org.apache.hadoop.hbase.client.Put;
068import org.apache.hadoop.hbase.client.RegionInfo;
069import org.apache.hadoop.hbase.client.RegionInfoBuilder;
070import org.apache.hadoop.hbase.client.RegionLocator;
071import org.apache.hadoop.hbase.client.Result;
072import org.apache.hadoop.hbase.client.ResultScanner;
073import org.apache.hadoop.hbase.client.RowMutations;
074import org.apache.hadoop.hbase.client.Scan;
075import org.apache.hadoop.hbase.client.Table;
076import org.apache.hadoop.hbase.client.TableDescriptor;
077import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
078import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
079import org.apache.hadoop.hbase.filter.BinaryComparator;
080import org.apache.hadoop.hbase.filter.Filter;
081import org.apache.hadoop.hbase.filter.FilterAllFilter;
082import org.apache.hadoop.hbase.filter.FilterList;
083import org.apache.hadoop.hbase.filter.PageFilter;
084import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
085import org.apache.hadoop.hbase.filter.WhileMatchFilter;
086import org.apache.hadoop.hbase.io.compress.Compression;
087import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
088import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
089import org.apache.hadoop.hbase.regionserver.BloomType;
090import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
091import org.apache.hadoop.hbase.trace.TraceUtil;
092import org.apache.hadoop.hbase.util.ByteArrayHashKey;
093import org.apache.hadoop.hbase.util.Bytes;
094import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
095import org.apache.hadoop.hbase.util.GsonUtil;
096import org.apache.hadoop.hbase.util.Hash;
097import org.apache.hadoop.hbase.util.MurmurHash;
098import org.apache.hadoop.hbase.util.Pair;
099import org.apache.hadoop.hbase.util.RandomDistribution;
100import org.apache.hadoop.hbase.util.YammerHistogramUtils;
101import org.apache.hadoop.io.LongWritable;
102import org.apache.hadoop.io.Text;
103import org.apache.hadoop.mapreduce.Job;
104import org.apache.hadoop.mapreduce.Mapper;
105import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
106import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
107import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
108import org.apache.hadoop.util.Tool;
109import org.apache.hadoop.util.ToolRunner;
110import org.apache.yetus.audience.InterfaceAudience;
111import org.slf4j.Logger;
112import org.slf4j.LoggerFactory;
113
114import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
115import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
116import org.apache.hbase.thirdparty.com.google.gson.Gson;
117
118/**
 * Script used for evaluating HBase performance and scalability. Runs an HBase client that steps
 * through one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random
 * writes test, etc.). Pass on the command-line which test to run and how many clients are
 * participating in this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
123 * <p>
124 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance
125 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper,
126 * pages 8-10.
127 * <p>
128 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as
129 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client does
130 * about 1GB of data, unless specified otherwise.
131 */
132@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
133public class PerformanceEvaluation extends Configured implements Tool {
  // Command names that are also used as constants when the commands are registered.
  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
  static final String RANDOM_READ = "randomRead";
  // NOTE(review): presumably the short alias this tool is invoked with -- confirm against callers.
  static final String PE_COMMAND_SHORTNAME = "pe";
  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
  // Shared Gson instance; TestOptions are written to and parsed from JSON with it.
  private static final Gson GSON = GsonUtil.createGson().create();

  // Default value of TestOptions.tableName.
  public static final String TABLE_NAME = "TestTable";
  // Column family names are this base followed by the family index ("info0", "info1", ...).
  public static final String FAMILY_NAME_BASE = "info";
  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
  // Default value of TestOptions.valueSize.
  public static final int DEFAULT_VALUE_LENGTH = 1000;
  public static final int ROW_LENGTH = 26;

  // Note: this is 1024 * 1024 * 1000 rather than 2^30; it divides evenly by DEFAULT_VALUE_LENGTH.
  private static final int ONE_GB = 1024 * 1024 * 1000;
  // Default value of both TestOptions.perClientRunRows and TestOptions.totalRows.
  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
  // TODO : should we make this configurable
  private static final int TAG_LENGTH = 256;
  private static final DecimalFormat FMT = new DecimalFormat("0.##");
  private static final MathContext CXT = MathContext.DECIMAL64;
  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
  // Options left at their defaults; compared against to detect values the user has overridden.
  private static final TestOptions DEFAULT_OPTS = new TestOptions();

  // Registered commands keyed by command name; the TreeMap keeps them sorted by name.
  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
  // Directory (relative to the job base directory) under which per-job directories are created.
  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
159
  // Registers the built-in commands. Each call maps a command name to the test class that
  // implements it, together with a one-line description of the command.
  static {
    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
      "Run async random read test");
    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
      "Run async random write test");
    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
      "Run async sequential read test");
    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
      "Run async sequential write test");
    addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)");
    addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test");
    addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test");
    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
      "Run random seek and scan 100 test");
    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
      "Run random seek scan with both start and stop row (max 10 rows)");
    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
      "Run random seek scan with both start and stop row (max 100 rows)");
    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
      "Run random seek scan with both start and stop row (max 1000 rows)");
    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
      "Run random seek scan with both start and stop row (max 10000 rows)");
    addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
    addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
    addCommandDescriptor(MetaWriteTest.class, "metaWrite",
      "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta");
    addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
    addCommandDescriptor(FilteredScanTest.class, "filterScan",
      "Run scan test using a filter to find a specific row based on it's value "
        + "(make sure to use --rows=20)");
    addCommandDescriptor(IncrementTest.class, "increment",
      "Increment on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(AppendTest.class, "append",
      "Append on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CleanMetaTest.class, "cleanMeta",
      "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread");
  }
204
  /**
   * Counters reported by the map task. Kept out here rather than inside the mapper inner class so
   * that associated properties can be found.
   */
  protected static enum Counter {
    /** Elapsed time of a client run; the map task adds the run's duration to it. */
    ELAPSED_TIME,
    /** Number of rows; the map task adds the rows each client was asked to run. */
    ROWS
  }
215
216  protected static class RunResult implements Comparable<RunResult> {
217    public RunResult(long duration, Histogram hist) {
218      this.duration = duration;
219      this.hist = hist;
220      numbOfReplyOverThreshold = 0;
221      numOfReplyFromReplica = 0;
222    }
223
224    public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica,
225      Histogram hist) {
226      this.duration = duration;
227      this.hist = hist;
228      this.numbOfReplyOverThreshold = numbOfReplyOverThreshold;
229      this.numOfReplyFromReplica = numOfReplyFromReplica;
230    }
231
232    public final long duration;
233    public final Histogram hist;
234    public final long numbOfReplyOverThreshold;
235    public final long numOfReplyFromReplica;
236
237    @Override
238    public String toString() {
239      return Long.toString(duration);
240    }
241
242    @Override
243    public int compareTo(RunResult o) {
244      return Long.compare(this.duration, o.duration);
245    }
246  }
247
  /**
   * Creates the tool with the given configuration, which is handed to the {@link Configured}
   * superclass.
   * @param conf Configuration object
   */
  public PerformanceEvaluation(final Configuration conf) {
    super(conf);
  }
255
256  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name,
257    String description) {
258    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
259    COMMANDS.put(name, cmdDescriptor);
260  }
261
  /**
   * Implementations can have their status set.
   */
  interface Status {
    /**
     * Sets status
     * @param msg status message
     * @throws IOException if the implementation cannot record the status
     */
    void setStatus(final String msg) throws IOException;
  }
272
273  /**
274   * MapReduce job that runs a performance evaluation client in each map task.
275   */
276  public static class EvaluationMapTask
277    extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
278
279    /** configuration parameter name that contains the command */
280    public final static String CMD_KEY = "EvaluationMapTask.command";
281    /** configuration parameter name that contains the PE impl */
282    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
283
284    private Class<? extends Test> cmd;
285
286    @Override
287    protected void setup(Context context) throws IOException, InterruptedException {
288      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
289
290      // this is required so that extensions of PE are instantiated within the
291      // map reduce task...
292      Class<? extends PerformanceEvaluation> peClass =
293        forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
294      try {
295        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
296      } catch (Exception e) {
297        throw new IllegalStateException("Could not instantiate PE instance", e);
298      }
299    }
300
301    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
302      try {
303        return Class.forName(className).asSubclass(type);
304      } catch (ClassNotFoundException e) {
305        throw new IllegalStateException("Could not find class for name: " + className, e);
306      }
307    }
308
309    @Override
310    protected void map(LongWritable key, Text value, final Context context)
311      throws IOException, InterruptedException {
312
313      Status status = new Status() {
314        @Override
315        public void setStatus(String msg) {
316          context.setStatus(msg);
317        }
318      };
319
320      TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class);
321      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
322      final Connection con = ConnectionFactory.createConnection(conf);
323      AsyncConnection asyncCon = null;
324      try {
325        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
326      } catch (ExecutionException e) {
327        throw new IOException(e);
328      }
329
330      // Evaluation task
331      RunResult result =
332        PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
333      // Collect how much time the thing took. Report as map output and
334      // to the ELAPSED_TIME counter.
335      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
336      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
337      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
338      context.progress();
339    }
340  }
341
342  /*
343   * If table does not already exist, create. Also create a table when {@code opts.presplitRegions}
344   * is specified or when the existing table's region replica count doesn't match {@code
345   * opts.replicas}.
346   */
347  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
348    TableName tableName = TableName.valueOf(opts.tableName);
349    boolean needsDelete = false, exists = admin.tableExists(tableName);
350    boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
351      || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
352    if (!exists && isReadCmd) {
353      throw new IllegalStateException(
354        "Must specify an existing table for read commands. Run a write command first.");
355    }
356    TableDescriptor desc = exists ? admin.getDescriptor(TableName.valueOf(opts.tableName)) : null;
357    byte[][] splits = getSplits(opts);
358
359    // recreate the table when user has requested presplit or when existing
360    // {RegionSplitPolicy,replica count} does not match requested, or when the
361    // number of column families does not match requested.
362    if (
363      (exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
364        || (!isReadCmd && desc != null
365          && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
366        || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)
367        || (desc != null && desc.getColumnFamilyCount() != opts.families)
368    ) {
369      needsDelete = true;
370      // wait, why did it delete my table?!?
371      LOG.debug(MoreObjects.toStringHelper("needsDelete").add("needsDelete", needsDelete)
372        .add("isReadCmd", isReadCmd).add("exists", exists).add("desc", desc)
373        .add("presplit", opts.presplitRegions).add("splitPolicy", opts.splitPolicy)
374        .add("replicas", opts.replicas).add("families", opts.families).toString());
375    }
376
377    // remove an existing table
378    if (needsDelete) {
379      if (admin.isTableEnabled(tableName)) {
380        admin.disableTable(tableName);
381      }
382      admin.deleteTable(tableName);
383    }
384
385    // table creation is necessary
386    if (!exists || needsDelete) {
387      desc = getTableDescriptor(opts);
388      if (splits != null) {
389        if (LOG.isDebugEnabled()) {
390          for (int i = 0; i < splits.length; i++) {
391            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
392          }
393        }
394      }
395      if (splits != null) {
396        admin.createTable(desc, splits);
397      } else {
398        admin.createTable(desc);
399      }
400      LOG.info("Table " + desc + " created");
401    }
402    return admin.tableExists(tableName);
403  }
404
405  /**
406   * Create an HTableDescriptor from provided TestOptions.
407   */
408  protected static TableDescriptor getTableDescriptor(TestOptions opts) {
409    TableDescriptorBuilder builder =
410      TableDescriptorBuilder.newBuilder(TableName.valueOf(opts.tableName));
411
412    for (int family = 0; family < opts.families; family++) {
413      byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
414      ColumnFamilyDescriptorBuilder cfBuilder =
415        ColumnFamilyDescriptorBuilder.newBuilder(familyName);
416      cfBuilder.setDataBlockEncoding(opts.blockEncoding);
417      cfBuilder.setCompressionType(opts.compression);
418      cfBuilder.setEncryptionType(opts.encryption);
419      cfBuilder.setBloomFilterType(opts.bloomType);
420      cfBuilder.setBlocksize(opts.blockSize);
421      if (opts.inMemoryCF) {
422        cfBuilder.setInMemory(true);
423      }
424      cfBuilder.setInMemoryCompaction(opts.inMemoryCompaction);
425      builder.setColumnFamily(cfBuilder.build());
426    }
427    if (opts.replicas != DEFAULT_OPTS.replicas) {
428      builder.setRegionReplication(opts.replicas);
429    }
430    if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
431      builder.setRegionSplitPolicyClassName(opts.splitPolicy);
432    }
433    return builder.build();
434  }
435
436  /**
437   * generates splits based on total number of rows and specified split regions
438   */
439  protected static byte[][] getSplits(TestOptions opts) {
440    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null;
441
442    int numSplitPoints = opts.presplitRegions - 1;
443    byte[][] splits = new byte[numSplitPoints][];
444    int jump = opts.totalRows / opts.presplitRegions;
445    for (int i = 0; i < numSplitPoints; i++) {
446      int rowkey = jump * (1 + i);
447      splits[i] = format(rowkey);
448    }
449    return splits;
450  }
451
452  static void setupConnectionCount(final TestOptions opts) {
453    if (opts.oneCon) {
454      opts.connCount = 1;
455    } else {
456      if (opts.connCount == -1) {
457        // set to thread number if connCount is not set
458        opts.connCount = opts.numClientThreads;
459      }
460    }
461  }
462
463  /*
464   * Run all clients in this vm each to its own thread.
465   */
466  static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
467    throws IOException, InterruptedException, ExecutionException {
468    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
469    assert cmd != null;
470    @SuppressWarnings("unchecked")
471    Future<RunResult>[] threads = new Future[opts.numClientThreads];
472    RunResult[] results = new RunResult[opts.numClientThreads];
473    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
474      new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
475    setupConnectionCount(opts);
476    final Connection[] cons = new Connection[opts.connCount];
477    final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
478    for (int i = 0; i < opts.connCount; i++) {
479      cons[i] = ConnectionFactory.createConnection(conf);
480      asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
481    }
482    LOG
483      .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads");
484    for (int i = 0; i < threads.length; i++) {
485      final int index = i;
486      threads[i] = pool.submit(new Callable<RunResult>() {
487        @Override
488        public RunResult call() throws Exception {
489          TestOptions threadOpts = new TestOptions(opts);
490          final Connection con = cons[index % cons.length];
491          final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
492          if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
493          RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
494            @Override
495            public void setStatus(final String msg) throws IOException {
496              LOG.info(msg);
497            }
498          });
499          LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration
500            + "ms over " + threadOpts.perClientRunRows + " rows");
501          if (opts.latencyThreshold > 0) {
502            LOG.info("Number of replies over latency threshold " + opts.latencyThreshold
503              + "(ms) is " + run.numbOfReplyOverThreshold);
504          }
505          return run;
506        }
507      });
508    }
509    pool.shutdown();
510
511    for (int i = 0; i < threads.length; i++) {
512      try {
513        results[i] = threads[i].get();
514      } catch (ExecutionException e) {
515        throw new IOException(e.getCause());
516      }
517    }
518    final String test = cmd.getSimpleName();
519    LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results));
520    Arrays.sort(results);
521    long total = 0;
522    float avgLatency = 0;
523    float avgTPS = 0;
524    long replicaWins = 0;
525    for (RunResult result : results) {
526      total += result.duration;
527      avgLatency += result.hist.getSnapshot().getMean();
528      avgTPS += opts.perClientRunRows * 1.0f / result.duration;
529      replicaWins += result.numOfReplyFromReplica;
530    }
531    avgTPS *= 1000; // ms to second
532    avgLatency = avgLatency / results.length;
533    LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: "
534      + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms");
535    LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
536    LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
537    if (opts.replicas > 1) {
538      LOG.info("[results from replica regions] " + replicaWins);
539    }
540
541    for (int i = 0; i < opts.connCount; i++) {
542      cons[i].close();
543      asyncCons[i].close();
544    }
545
546    return results;
547  }
548
549  /*
550   * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write
551   * out an input file with instruction per client regards which row they are to start on.
552   * @param cmd Command to run. n
553   */
554  static Job doMapReduce(TestOptions opts, final Configuration conf)
555    throws IOException, InterruptedException, ClassNotFoundException {
556    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
557    assert cmd != null;
558    Path inputDir = writeInputFile(conf, opts);
559    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
560    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
561    Job job = Job.getInstance(conf);
562    job.setJarByClass(PerformanceEvaluation.class);
563    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
564
565    job.setInputFormatClass(NLineInputFormat.class);
566    NLineInputFormat.setInputPaths(job, inputDir);
567    // this is default, but be explicit about it just in case.
568    NLineInputFormat.setNumLinesPerSplit(job, 1);
569
570    job.setOutputKeyClass(LongWritable.class);
571    job.setOutputValueClass(LongWritable.class);
572
573    job.setMapperClass(EvaluationMapTask.class);
574    job.setReducerClass(LongSumReducer.class);
575
576    job.setNumReduceTasks(1);
577
578    job.setOutputFormatClass(TextOutputFormat.class);
579    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
580
581    TableMapReduceUtil.addDependencyJars(job);
582    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Histogram.class, // yammer
583                                                                                            // metrics
584      Gson.class, // gson
585      FilterAllFilter.class // hbase-server tests jar
586    );
587
588    TableMapReduceUtil.initCredentials(job);
589
590    job.waitForCompletion(true);
591    return job;
592  }
593
  /**
   * Name of the input file written for the mapreduce job; it holds one line of options per client.
   * Each client has one mapper to do the work, and client do the resulting count in a map task.
   */

  static String JOB_INPUT_FILENAME = "input.txt";
599
  /*
   * Write input file of offsets-per-client for the mapreduce job, using "." as the base directory.
   * @param c Configuration
   * @param opts Options to write out, one copy per client
   * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
   */
  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
    return writeInputFile(c, opts, new Path("."));
  }
608
609  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
610    throws IOException {
611    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
612    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
613    Path inputDir = new Path(jobdir, "inputs");
614
615    FileSystem fs = FileSystem.get(c);
616    fs.mkdirs(inputDir);
617
618    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
619    PrintStream out = new PrintStream(fs.create(inputFile));
620    // Make input random.
621    Map<Integer, String> m = new TreeMap<>();
622    Hash h = MurmurHash.getInstance();
623    int perClientRows = (opts.totalRows / opts.numClientThreads);
624    try {
625      for (int j = 0; j < opts.numClientThreads; j++) {
626        TestOptions next = new TestOptions(opts);
627        next.startRow = j * perClientRows;
628        next.perClientRunRows = perClientRows;
629        String s = GSON.toJson(next);
630        LOG.info("Client=" + j + ", input=" + s);
631        byte[] b = Bytes.toBytes(s);
632        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
633        m.put(hash, s);
634      }
635      for (Map.Entry<Integer, String> e : m.entrySet()) {
636        out.println(e.getValue());
637      }
638    } finally {
639      out.close();
640    }
641    return inputDir;
642  }
643
644  /**
645   * Describes a command.
646   */
647  static class CmdDescriptor {
648    private Class<? extends TestBase> cmdClass;
649    private String name;
650    private String description;
651
652    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
653      this.cmdClass = cmdClass;
654      this.name = name;
655      this.description = description;
656    }
657
658    public Class<? extends TestBase> getCmdClass() {
659      return cmdClass;
660    }
661
662    public String getName() {
663      return name;
664    }
665
666    public String getDescription() {
667      return description;
668    }
669  }
670
671  /**
672   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes
673   * tracking all these arguments a little easier. NOTE: ADDING AN OPTION, you need to add a data
674   * member, a getter/setter (to make JSON serialization of this TestOptions class behave), and you
675   * need to add to the clone constructor below copying your new option from the 'that' to the
676   * 'this'. Look for 'clone' below.
677   */
678  static class TestOptions {
679    String cmdName = null;
680    boolean nomapred = false;
681    boolean filterAll = false;
682    int startRow = 0;
683    float size = 1.0f;
684    int perClientRunRows = DEFAULT_ROWS_PER_GB;
685    int numClientThreads = 1;
686    int totalRows = DEFAULT_ROWS_PER_GB;
687    int measureAfter = 0;
688    float sampleRate = 1.0f;
689    /**
690     * @deprecated Useless after switching to OpenTelemetry
691     */
692    @Deprecated
693    double traceRate = 0.0;
694    String tableName = TABLE_NAME;
695    boolean flushCommits = true;
696    boolean writeToWAL = true;
697    boolean autoFlush = false;
698    boolean oneCon = false;
699    int connCount = -1; // wil decide the actual num later
700    boolean useTags = false;
701    int noOfTags = 1;
702    boolean reportLatency = false;
703    int multiGet = 0;
704    int multiPut = 0;
705    int randomSleep = 0;
706    boolean inMemoryCF = false;
707    int presplitRegions = 0;
708    int replicas = TableDescriptorBuilder.DEFAULT_REGION_REPLICATION;
709    String splitPolicy = null;
710    Compression.Algorithm compression = Compression.Algorithm.NONE;
711    String encryption = null;
712    BloomType bloomType = BloomType.ROW;
713    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
714    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
715    boolean valueRandom = false;
716    boolean valueZipf = false;
717    int valueSize = DEFAULT_VALUE_LENGTH;
718    int period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10;
719    int cycles = 1;
720    int columns = 1;
721    int families = 1;
722    int caching = 30;
723    int latencyThreshold = 0; // in millsecond
724    boolean addColumns = true;
725    MemoryCompactionPolicy inMemoryCompaction =
726      MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
727    boolean asyncPrefetch = false;
728    boolean cacheBlocks = true;
729    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
730    long bufferSize = 2l * 1024l * 1024l;
731
732    public TestOptions() {
733    }
734
735    /**
736     * Clone constructor.
737     * @param that Object to copy from.
738     */
739    public TestOptions(TestOptions that) {
740      this.cmdName = that.cmdName;
741      this.cycles = that.cycles;
742      this.nomapred = that.nomapred;
743      this.startRow = that.startRow;
744      this.size = that.size;
745      this.perClientRunRows = that.perClientRunRows;
746      this.numClientThreads = that.numClientThreads;
747      this.totalRows = that.totalRows;
748      this.sampleRate = that.sampleRate;
749      this.traceRate = that.traceRate;
750      this.tableName = that.tableName;
751      this.flushCommits = that.flushCommits;
752      this.writeToWAL = that.writeToWAL;
753      this.autoFlush = that.autoFlush;
754      this.oneCon = that.oneCon;
755      this.connCount = that.connCount;
756      this.useTags = that.useTags;
757      this.noOfTags = that.noOfTags;
758      this.reportLatency = that.reportLatency;
759      this.latencyThreshold = that.latencyThreshold;
760      this.multiGet = that.multiGet;
761      this.multiPut = that.multiPut;
762      this.inMemoryCF = that.inMemoryCF;
763      this.presplitRegions = that.presplitRegions;
764      this.replicas = that.replicas;
765      this.splitPolicy = that.splitPolicy;
766      this.compression = that.compression;
767      this.encryption = that.encryption;
768      this.blockEncoding = that.blockEncoding;
769      this.filterAll = that.filterAll;
770      this.bloomType = that.bloomType;
771      this.blockSize = that.blockSize;
772      this.valueRandom = that.valueRandom;
773      this.valueZipf = that.valueZipf;
774      this.valueSize = that.valueSize;
775      this.period = that.period;
776      this.randomSleep = that.randomSleep;
777      this.measureAfter = that.measureAfter;
778      this.addColumns = that.addColumns;
779      this.columns = that.columns;
780      this.families = that.families;
781      this.caching = that.caching;
782      this.inMemoryCompaction = that.inMemoryCompaction;
783      this.asyncPrefetch = that.asyncPrefetch;
784      this.cacheBlocks = that.cacheBlocks;
785      this.scanReadType = that.scanReadType;
786      this.bufferSize = that.bufferSize;
787    }
788
789    public int getCaching() {
790      return this.caching;
791    }
792
793    public void setCaching(final int caching) {
794      this.caching = caching;
795    }
796
797    public int getColumns() {
798      return this.columns;
799    }
800
801    public void setColumns(final int columns) {
802      this.columns = columns;
803    }
804
805    public int getFamilies() {
806      return this.families;
807    }
808
809    public void setFamilies(final int families) {
810      this.families = families;
811    }
812
813    public int getCycles() {
814      return this.cycles;
815    }
816
817    public void setCycles(final int cycles) {
818      this.cycles = cycles;
819    }
820
821    public boolean isValueZipf() {
822      return valueZipf;
823    }
824
825    public void setValueZipf(boolean valueZipf) {
826      this.valueZipf = valueZipf;
827    }
828
829    public String getCmdName() {
830      return cmdName;
831    }
832
833    public void setCmdName(String cmdName) {
834      this.cmdName = cmdName;
835    }
836
837    public int getRandomSleep() {
838      return randomSleep;
839    }
840
841    public void setRandomSleep(int randomSleep) {
842      this.randomSleep = randomSleep;
843    }
844
845    public int getReplicas() {
846      return replicas;
847    }
848
849    public void setReplicas(int replicas) {
850      this.replicas = replicas;
851    }
852
853    public String getSplitPolicy() {
854      return splitPolicy;
855    }
856
857    public void setSplitPolicy(String splitPolicy) {
858      this.splitPolicy = splitPolicy;
859    }
860
861    public void setNomapred(boolean nomapred) {
862      this.nomapred = nomapred;
863    }
864
865    public void setFilterAll(boolean filterAll) {
866      this.filterAll = filterAll;
867    }
868
869    public void setStartRow(int startRow) {
870      this.startRow = startRow;
871    }
872
873    public void setSize(float size) {
874      this.size = size;
875    }
876
877    public void setPerClientRunRows(int perClientRunRows) {
878      this.perClientRunRows = perClientRunRows;
879    }
880
881    public void setNumClientThreads(int numClientThreads) {
882      this.numClientThreads = numClientThreads;
883    }
884
885    public void setTotalRows(int totalRows) {
886      this.totalRows = totalRows;
887    }
888
889    public void setSampleRate(float sampleRate) {
890      this.sampleRate = sampleRate;
891    }
892
893    public void setTraceRate(double traceRate) {
894      this.traceRate = traceRate;
895    }
896
897    public void setTableName(String tableName) {
898      this.tableName = tableName;
899    }
900
901    public void setFlushCommits(boolean flushCommits) {
902      this.flushCommits = flushCommits;
903    }
904
905    public void setWriteToWAL(boolean writeToWAL) {
906      this.writeToWAL = writeToWAL;
907    }
908
909    public void setAutoFlush(boolean autoFlush) {
910      this.autoFlush = autoFlush;
911    }
912
913    public void setOneCon(boolean oneCon) {
914      this.oneCon = oneCon;
915    }
916
917    public int getConnCount() {
918      return connCount;
919    }
920
921    public void setConnCount(int connCount) {
922      this.connCount = connCount;
923    }
924
925    public void setUseTags(boolean useTags) {
926      this.useTags = useTags;
927    }
928
929    public void setNoOfTags(int noOfTags) {
930      this.noOfTags = noOfTags;
931    }
932
933    public void setReportLatency(boolean reportLatency) {
934      this.reportLatency = reportLatency;
935    }
936
937    public void setMultiGet(int multiGet) {
938      this.multiGet = multiGet;
939    }
940
941    public void setMultiPut(int multiPut) {
942      this.multiPut = multiPut;
943    }
944
945    public void setInMemoryCF(boolean inMemoryCF) {
946      this.inMemoryCF = inMemoryCF;
947    }
948
949    public void setPresplitRegions(int presplitRegions) {
950      this.presplitRegions = presplitRegions;
951    }
952
953    public void setCompression(Compression.Algorithm compression) {
954      this.compression = compression;
955    }
956
957    public void setEncryption(String encryption) {
958      this.encryption = encryption;
959    }
960
961    public void setBloomType(BloomType bloomType) {
962      this.bloomType = bloomType;
963    }
964
965    public void setBlockSize(int blockSize) {
966      this.blockSize = blockSize;
967    }
968
969    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
970      this.blockEncoding = blockEncoding;
971    }
972
973    public void setValueRandom(boolean valueRandom) {
974      this.valueRandom = valueRandom;
975    }
976
977    public void setValueSize(int valueSize) {
978      this.valueSize = valueSize;
979    }
980
981    public void setBufferSize(long bufferSize) {
982      this.bufferSize = bufferSize;
983    }
984
985    public void setPeriod(int period) {
986      this.period = period;
987    }
988
989    public boolean isNomapred() {
990      return nomapred;
991    }
992
993    public boolean isFilterAll() {
994      return filterAll;
995    }
996
997    public int getStartRow() {
998      return startRow;
999    }
1000
1001    public float getSize() {
1002      return size;
1003    }
1004
1005    public int getPerClientRunRows() {
1006      return perClientRunRows;
1007    }
1008
1009    public int getNumClientThreads() {
1010      return numClientThreads;
1011    }
1012
1013    public int getTotalRows() {
1014      return totalRows;
1015    }
1016
1017    public float getSampleRate() {
1018      return sampleRate;
1019    }
1020
1021    public double getTraceRate() {
1022      return traceRate;
1023    }
1024
1025    public String getTableName() {
1026      return tableName;
1027    }
1028
1029    public boolean isFlushCommits() {
1030      return flushCommits;
1031    }
1032
1033    public boolean isWriteToWAL() {
1034      return writeToWAL;
1035    }
1036
1037    public boolean isAutoFlush() {
1038      return autoFlush;
1039    }
1040
1041    public boolean isUseTags() {
1042      return useTags;
1043    }
1044
1045    public int getNoOfTags() {
1046      return noOfTags;
1047    }
1048
1049    public boolean isReportLatency() {
1050      return reportLatency;
1051    }
1052
1053    public int getMultiGet() {
1054      return multiGet;
1055    }
1056
1057    public int getMultiPut() {
1058      return multiPut;
1059    }
1060
1061    public boolean isInMemoryCF() {
1062      return inMemoryCF;
1063    }
1064
1065    public int getPresplitRegions() {
1066      return presplitRegions;
1067    }
1068
1069    public Compression.Algorithm getCompression() {
1070      return compression;
1071    }
1072
1073    public String getEncryption() {
1074      return encryption;
1075    }
1076
1077    public DataBlockEncoding getBlockEncoding() {
1078      return blockEncoding;
1079    }
1080
1081    public boolean isValueRandom() {
1082      return valueRandom;
1083    }
1084
1085    public int getValueSize() {
1086      return valueSize;
1087    }
1088
1089    public int getPeriod() {
1090      return period;
1091    }
1092
1093    public BloomType getBloomType() {
1094      return bloomType;
1095    }
1096
1097    public int getBlockSize() {
1098      return blockSize;
1099    }
1100
1101    public boolean isOneCon() {
1102      return oneCon;
1103    }
1104
1105    public int getMeasureAfter() {
1106      return measureAfter;
1107    }
1108
1109    public void setMeasureAfter(int measureAfter) {
1110      this.measureAfter = measureAfter;
1111    }
1112
1113    public boolean getAddColumns() {
1114      return addColumns;
1115    }
1116
1117    public void setAddColumns(boolean addColumns) {
1118      this.addColumns = addColumns;
1119    }
1120
1121    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
1122      this.inMemoryCompaction = inMemoryCompaction;
1123    }
1124
1125    public MemoryCompactionPolicy getInMemoryCompaction() {
1126      return this.inMemoryCompaction;
1127    }
1128
1129    public long getBufferSize() {
1130      return this.bufferSize;
1131    }
1132  }
1133
1134  /*
1135   * A test. Subclass to particularize what happens per row.
1136   */
1137  static abstract class TestBase {
1138    // Below is make it so when Tests are all running in the one
1139    // jvm, that they each have a differently seeded Random.
1140    private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime());
1141
1142    private static long nextRandomSeed() {
1143      return randomSeed.nextLong();
1144    }
1145
1146    private final int everyN;
1147
1148    protected final Random rand = new Random(nextRandomSeed());
1149    protected final Configuration conf;
1150    protected final TestOptions opts;
1151
1152    private final Status status;
1153
1154    private String testName;
1155    private Histogram latencyHistogram;
1156    private Histogram replicaLatencyHistogram;
1157    private Histogram valueSizeHistogram;
1158    private Histogram rpcCallsHistogram;
1159    private Histogram remoteRpcCallsHistogram;
1160    private Histogram millisBetweenNextHistogram;
1161    private Histogram regionsScannedHistogram;
1162    private Histogram bytesInResultsHistogram;
1163    private Histogram bytesInRemoteResultsHistogram;
1164    private RandomDistribution.Zipf zipf;
1165    private long numOfReplyOverLatencyThreshold = 0;
1166    private long numOfReplyFromReplica = 0;
1167
1168    /**
1169     * Note that all subclasses of this class must provide a public constructor that has the exact
1170     * same list of arguments.
1171     */
1172    TestBase(final Configuration conf, final TestOptions options, final Status status) {
1173      this.conf = conf;
1174      this.opts = options;
1175      this.status = status;
1176      this.testName = this.getClass().getSimpleName();
1177      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
1178      if (options.isValueZipf()) {
1179        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
1180      }
1181      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
1182    }
1183
1184    int getValueLength(final Random r) {
1185      if (this.opts.isValueRandom()) {
1186        return r.nextInt(opts.valueSize);
1187      } else if (this.opts.isValueZipf()) {
1188        return Math.abs(this.zipf.nextInt());
1189      } else {
1190        return opts.valueSize;
1191      }
1192    }
1193
1194    void updateValueSize(final Result[] rs) throws IOException {
1195      updateValueSize(rs, 0);
1196    }
1197
1198    void updateValueSize(final Result[] rs, final long latency) throws IOException {
1199      if (rs == null || (latency == 0)) return;
1200      for (Result r : rs)
1201        updateValueSize(r, latency);
1202    }
1203
1204    void updateValueSize(final Result r) throws IOException {
1205      updateValueSize(r, 0);
1206    }
1207
1208    void updateValueSize(final Result r, final long latency) throws IOException {
1209      if (r == null || (latency == 0)) return;
1210      int size = 0;
1211      // update replicaHistogram
1212      if (r.isStale()) {
1213        replicaLatencyHistogram.update(latency / 1000);
1214        numOfReplyFromReplica++;
1215      }
1216      if (!isRandomValueSize()) return;
1217
1218      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1219        size += scanner.current().getValueLength();
1220      }
1221      updateValueSize(size);
1222    }
1223
1224    void updateValueSize(final int valueSize) {
1225      if (!isRandomValueSize()) return;
1226      this.valueSizeHistogram.update(valueSize);
1227    }
1228
1229    void updateScanMetrics(final ScanMetrics metrics) {
1230      if (metrics == null) return;
1231      Map<String, Long> metricsMap = metrics.getMetricsMap();
1232      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
1233      if (rpcCalls != null) {
1234        this.rpcCallsHistogram.update(rpcCalls.longValue());
1235      }
1236      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
1237      if (remoteRpcCalls != null) {
1238        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
1239      }
1240      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
1241      if (millisBetweenNext != null) {
1242        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
1243      }
1244      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
1245      if (regionsScanned != null) {
1246        this.regionsScannedHistogram.update(regionsScanned.longValue());
1247      }
1248      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
1249      if (bytesInResults != null && bytesInResults.longValue() > 0) {
1250        this.bytesInResultsHistogram.update(bytesInResults.longValue());
1251      }
1252      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
1253      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
1254        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
1255      }
1256    }
1257
1258    String generateStatus(final int sr, final int i, final int lr) {
1259      return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency ["
1260        + getShortLatencyReport() + "]"
1261        + (!isRandomValueSize() ? "" : ", value size [" + getShortValueSizeReport() + "]");
1262    }
1263
1264    boolean isRandomValueSize() {
1265      return opts.valueRandom;
1266    }
1267
1268    protected int getReportingPeriod() {
1269      return opts.period;
1270    }
1271
1272    /**
1273     * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1274     */
1275    public Histogram getLatencyHistogram() {
1276      return latencyHistogram;
1277    }
1278
1279    void testSetup() throws IOException {
1280      // test metrics
1281      latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1282      // If it is a replica test, set up histogram for replica.
1283      if (opts.replicas > 1) {
1284        replicaLatencyHistogram =
1285          YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1286      }
1287      valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1288      // scan metrics
1289      rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1290      remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1291      millisBetweenNextHistogram =
1292        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1293      regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1294      bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1295      bytesInRemoteResultsHistogram =
1296        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1297
1298      onStartup();
1299    }
1300
1301    abstract void onStartup() throws IOException;
1302
1303    void testTakedown() throws IOException {
1304      onTakedown();
1305      // Print all stats for this thread continuously.
1306      // Synchronize on Test.class so different threads don't intermingle the
1307      // output. We can't use 'this' here because each thread has its own instance of Test class.
1308      synchronized (Test.class) {
1309        status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
1310        status
1311          .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram));
1312        if (opts.replicas > 1) {
1313          status.setStatus("Latency (us) from Replica Regions: "
1314            + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram));
1315        }
1316        status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
1317        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
1318        if (valueSizeHistogram.getCount() > 0) {
1319          status.setStatus(
1320            "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
1321          status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
1322          status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
1323        } else {
1324          status.setStatus("No valueSize statistics available");
1325        }
1326        if (rpcCallsHistogram.getCount() > 0) {
1327          status.setStatus(
1328            "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
1329        }
1330        if (remoteRpcCallsHistogram.getCount() > 0) {
1331          status.setStatus("remoteRpcCalls (count): "
1332            + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
1333        }
1334        if (millisBetweenNextHistogram.getCount() > 0) {
1335          status.setStatus("millisBetweenNext (latency): "
1336            + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
1337        }
1338        if (regionsScannedHistogram.getCount() > 0) {
1339          status.setStatus("regionsScanned (count): "
1340            + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
1341        }
1342        if (bytesInResultsHistogram.getCount() > 0) {
1343          status.setStatus("bytesInResults (size): "
1344            + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
1345        }
1346        if (bytesInRemoteResultsHistogram.getCount() > 0) {
1347          status.setStatus("bytesInRemoteResults (size): "
1348            + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
1349        }
1350      }
1351    }
1352
1353    abstract void onTakedown() throws IOException;
1354
1355    /*
1356     * Run test
1357     * @return Elapsed time. n
1358     */
1359    long test() throws IOException, InterruptedException {
1360      testSetup();
1361      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1362      final long startTime = System.nanoTime();
1363      try {
1364        testTimed();
1365      } finally {
1366        testTakedown();
1367      }
1368      return (System.nanoTime() - startTime) / 1000000;
1369    }
1370
1371    int getStartRow() {
1372      return opts.startRow;
1373    }
1374
1375    int getLastRow() {
1376      return getStartRow() + opts.perClientRunRows;
1377    }
1378
1379    /**
1380     * Provides an extension point for tests that don't want a per row invocation.
1381     */
1382    void testTimed() throws IOException, InterruptedException {
1383      int startRow = getStartRow();
1384      int lastRow = getLastRow();
1385      // Report on completion of 1/10th of total.
1386      for (int ii = 0; ii < opts.cycles; ii++) {
1387        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
1388        for (int i = startRow; i < lastRow; i++) {
1389          if (i % everyN != 0) continue;
1390          long startTime = System.nanoTime();
1391          boolean requestSent = false;
1392          Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan();
1393          try (Scope scope = span.makeCurrent()) {
1394            requestSent = testRow(i, startTime);
1395          } finally {
1396            span.end();
1397          }
1398          if ((i - startRow) > opts.measureAfter) {
1399            // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
1400            // first 9 times and sends the actual get request in the 10th iteration.
1401            // We should only set latency when actual request is sent because otherwise
1402            // it turns out to be 0.
1403            if (requestSent) {
1404              long latency = (System.nanoTime() - startTime) / 1000;
1405              latencyHistogram.update(latency);
1406              if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) {
1407                numOfReplyOverLatencyThreshold++;
1408              }
1409            }
1410            if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1411              status.setStatus(generateStatus(startRow, i, lastRow));
1412            }
1413          }
1414        }
1415      }
1416    }
1417
1418    /**
1419     * @return Subset of the histograms' calculation.
1420     */
1421    public String getShortLatencyReport() {
1422      return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
1423    }
1424
1425    /**
1426     * @return Subset of the histograms' calculation.
1427     */
1428    public String getShortValueSizeReport() {
1429      return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
1430    }
1431
1432    /**
1433     * Test for individual row.
1434     * @param i Row index.
1435     * @return true if the row was sent to server and need to record metrics. False if not, multiGet
1436     *         and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered.
1437     */
1438    abstract boolean testRow(final int i, final long startTime)
1439      throws IOException, InterruptedException;
1440  }
1441
1442  static abstract class Test extends TestBase {
1443    protected Connection connection;
1444
1445    Test(final Connection con, final TestOptions options, final Status status) {
1446      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1447      this.connection = con;
1448    }
1449  }
1450
1451  static abstract class AsyncTest extends TestBase {
1452    protected AsyncConnection connection;
1453
1454    AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
1455      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1456      this.connection = con;
1457    }
1458  }
1459
1460  static abstract class TableTest extends Test {
1461    protected Table table;
1462
1463    TableTest(Connection con, TestOptions options, Status status) {
1464      super(con, options, status);
1465    }
1466
1467    @Override
1468    void onStartup() throws IOException {
1469      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1470    }
1471
1472    @Override
1473    void onTakedown() throws IOException {
1474      table.close();
1475    }
1476  }
1477
1478  /*
1479   * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest
1480   */
1481  static abstract class MetaTest extends TableTest {
1482    protected int keyLength;
1483
1484    MetaTest(Connection con, TestOptions options, Status status) {
1485      super(con, options, status);
1486      keyLength = Integer.toString(opts.perClientRunRows).length();
1487    }
1488
1489    @Override
1490    void onTakedown() throws IOException {
1491      // No clean up
1492    }
1493
1494    /*
1495     * Generates Lexicographically ascending strings
1496     */
1497    protected byte[] getSplitKey(final int i) {
1498      return Bytes.toBytes(String.format("%0" + keyLength + "d", i));
1499    }
1500
1501  }
1502
1503  static abstract class AsyncTableTest extends AsyncTest {
1504    protected AsyncTable<?> table;
1505
1506    AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
1507      super(con, options, status);
1508    }
1509
1510    @Override
1511    void onStartup() throws IOException {
1512      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1513    }
1514
1515    @Override
1516    void onTakedown() throws IOException {
1517    }
1518  }
1519
1520  static class AsyncRandomReadTest extends AsyncTableTest {
1521    private final Consistency consistency;
1522    private ArrayList<Get> gets;
1523
1524    AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
1525      super(con, options, status);
1526      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1527      if (opts.multiGet > 0) {
1528        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1529        this.gets = new ArrayList<>(opts.multiGet);
1530      }
1531    }
1532
1533    @Override
1534    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1535      if (opts.randomSleep > 0) {
1536        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
1537      }
1538      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1539      for (int family = 0; family < opts.families; family++) {
1540        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1541        if (opts.addColumns) {
1542          for (int column = 0; column < opts.columns; column++) {
1543            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1544            get.addColumn(familyName, qualifier);
1545          }
1546        } else {
1547          get.addFamily(familyName);
1548        }
1549      }
1550      if (opts.filterAll) {
1551        get.setFilter(new FilterAllFilter());
1552      }
1553      get.setConsistency(consistency);
1554      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1555      try {
1556        if (opts.multiGet > 0) {
1557          this.gets.add(get);
1558          if (this.gets.size() == opts.multiGet) {
1559            Result[] rs =
1560              this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
1561            updateValueSize(rs);
1562            this.gets.clear();
1563          } else {
1564            return false;
1565          }
1566        } else {
1567          updateValueSize(this.table.get(get).get());
1568        }
1569      } catch (ExecutionException e) {
1570        throw new IOException(e);
1571      }
1572      return true;
1573    }
1574
1575    public static RuntimeException runtime(Throwable e) {
1576      if (e instanceof RuntimeException) {
1577        return (RuntimeException) e;
1578      }
1579      return new RuntimeException(e);
1580    }
1581
1582    public static <V> V propagate(Callable<V> callable) {
1583      try {
1584        return callable.call();
1585      } catch (Exception e) {
1586        throw runtime(e);
1587      }
1588    }
1589
1590    @Override
1591    protected int getReportingPeriod() {
1592      int period = opts.perClientRunRows / 10;
1593      return period == 0 ? opts.perClientRunRows : period;
1594    }
1595
1596    @Override
1597    protected void testTakedown() throws IOException {
1598      if (this.gets != null && this.gets.size() > 0) {
1599        this.table.get(gets);
1600        this.gets.clear();
1601      }
1602      super.testTakedown();
1603    }
1604  }
1605
1606  static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
1607
1608    AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
1609      super(con, options, status);
1610    }
1611
1612    @Override
1613    protected byte[] generateRow(final int i) {
1614      return getRandomRow(this.rand, opts.totalRows);
1615    }
1616  }
1617
1618  static class AsyncScanTest extends AsyncTableTest {
1619    private ResultScanner testScanner;
1620    private AsyncTable<?> asyncTable;
1621
1622    AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
1623      super(con, options, status);
1624    }
1625
1626    @Override
1627    void onStartup() throws IOException {
1628      this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName),
1629        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
1630    }
1631
1632    @Override
1633    void testTakedown() throws IOException {
1634      if (this.testScanner != null) {
1635        updateScanMetrics(this.testScanner.getScanMetrics());
1636        this.testScanner.close();
1637      }
1638      super.testTakedown();
1639    }
1640
1641    @Override
1642    boolean testRow(final int i, final long startTime) throws IOException {
1643      if (this.testScanner == null) {
1644        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1645          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1646          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1647        for (int family = 0; family < opts.families; family++) {
1648          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1649          if (opts.addColumns) {
1650            for (int column = 0; column < opts.columns; column++) {
1651              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1652              scan.addColumn(familyName, qualifier);
1653            }
1654          } else {
1655            scan.addFamily(familyName);
1656          }
1657        }
1658        if (opts.filterAll) {
1659          scan.setFilter(new FilterAllFilter());
1660        }
1661        this.testScanner = asyncTable.getScanner(scan);
1662      }
1663      Result r = testScanner.next();
1664      updateValueSize(r);
1665      return true;
1666    }
1667  }
1668
1669  static class AsyncSequentialReadTest extends AsyncTableTest {
1670    AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
1671      super(con, options, status);
1672    }
1673
1674    @Override
1675    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1676      Get get = new Get(format(i));
1677      for (int family = 0; family < opts.families; family++) {
1678        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1679        if (opts.addColumns) {
1680          for (int column = 0; column < opts.columns; column++) {
1681            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1682            get.addColumn(familyName, qualifier);
1683          }
1684        } else {
1685          get.addFamily(familyName);
1686        }
1687      }
1688      if (opts.filterAll) {
1689        get.setFilter(new FilterAllFilter());
1690      }
1691      try {
1692        updateValueSize(table.get(get).get());
1693      } catch (ExecutionException e) {
1694        throw new IOException(e);
1695      }
1696      return true;
1697    }
1698  }
1699
1700  static class AsyncSequentialWriteTest extends AsyncTableTest {
1701    private ArrayList<Put> puts;
1702
1703    AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
1704      super(con, options, status);
1705      if (opts.multiPut > 0) {
1706        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
1707        this.puts = new ArrayList<>(opts.multiPut);
1708      }
1709    }
1710
1711    protected byte[] generateRow(final int i) {
1712      return format(i);
1713    }
1714
1715    @Override
1716    @SuppressWarnings("ReturnValueIgnored")
1717    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1718      byte[] row = generateRow(i);
1719      Put put = new Put(row);
1720      for (int family = 0; family < opts.families; family++) {
1721        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1722        for (int column = 0; column < opts.columns; column++) {
1723          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1724          byte[] value = generateData(this.rand, getValueLength(this.rand));
1725          if (opts.useTags) {
1726            byte[] tag = generateData(this.rand, TAG_LENGTH);
1727            Tag[] tags = new Tag[opts.noOfTags];
1728            for (int n = 0; n < opts.noOfTags; n++) {
1729              Tag t = new ArrayBackedTag((byte) n, tag);
1730              tags[n] = t;
1731            }
1732            KeyValue kv =
1733              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
1734            put.add(kv);
1735            updateValueSize(kv.getValueLength());
1736          } else {
1737            put.addColumn(familyName, qualifier, value);
1738            updateValueSize(value.length);
1739          }
1740        }
1741      }
1742      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1743      try {
1744        table.put(put).get();
1745        if (opts.multiPut > 0) {
1746          this.puts.add(put);
1747          if (this.puts.size() == opts.multiPut) {
1748            this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get));
1749            this.puts.clear();
1750          } else {
1751            return false;
1752          }
1753        } else {
1754          table.put(put).get();
1755        }
1756      } catch (ExecutionException e) {
1757        throw new IOException(e);
1758      }
1759      return true;
1760    }
1761  }
1762
1763  static abstract class BufferedMutatorTest extends Test {
1764    protected BufferedMutator mutator;
1765    protected Table table;
1766
1767    BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1768      super(con, options, status);
1769    }
1770
1771    @Override
1772    void onStartup() throws IOException {
1773      BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1774      p.writeBufferSize(opts.bufferSize);
1775      this.mutator = connection.getBufferedMutator(p);
1776      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1777    }
1778
1779    @Override
1780    void onTakedown() throws IOException {
1781      mutator.close();
1782      table.close();
1783    }
1784  }
1785
1786  static class RandomSeekScanTest extends TableTest {
1787    RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1788      super(con, options, status);
1789    }
1790
1791    @Override
1792    boolean testRow(final int i, final long startTime) throws IOException {
1793      Scan scan =
1794        new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)).setCaching(opts.caching)
1795          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1796          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1797      FilterList list = new FilterList();
1798      for (int family = 0; family < opts.families; family++) {
1799        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1800        if (opts.addColumns) {
1801          for (int column = 0; column < opts.columns; column++) {
1802            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1803            scan.addColumn(familyName, qualifier);
1804          }
1805        } else {
1806          scan.addFamily(familyName);
1807        }
1808      }
1809      if (opts.filterAll) {
1810        list.addFilter(new FilterAllFilter());
1811      }
1812      list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1813      scan.setFilter(list);
1814      ResultScanner s = this.table.getScanner(scan);
1815      try {
1816        for (Result rr; (rr = s.next()) != null;) {
1817          updateValueSize(rr);
1818        }
1819      } finally {
1820        updateScanMetrics(s.getScanMetrics());
1821        s.close();
1822      }
1823      return true;
1824    }
1825
1826    @Override
1827    protected int getReportingPeriod() {
1828      int period = opts.perClientRunRows / 100;
1829      return period == 0 ? opts.perClientRunRows : period;
1830    }
1831
1832  }
1833
1834  static abstract class RandomScanWithRangeTest extends TableTest {
1835    RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1836      super(con, options, status);
1837    }
1838
1839    @Override
1840    boolean testRow(final int i, final long startTime) throws IOException {
1841      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1842      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
1843        .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
1844        .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1845        .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1846      for (int family = 0; family < opts.families; family++) {
1847        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1848        if (opts.addColumns) {
1849          for (int column = 0; column < opts.columns; column++) {
1850            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1851            scan.addColumn(familyName, qualifier);
1852          }
1853        } else {
1854          scan.addFamily(familyName);
1855        }
1856      }
1857      if (opts.filterAll) {
1858        scan.setFilter(new FilterAllFilter());
1859      }
1860      Result r = null;
1861      int count = 0;
1862      ResultScanner s = this.table.getScanner(scan);
1863      try {
1864        for (; (r = s.next()) != null;) {
1865          updateValueSize(r);
1866          count++;
1867        }
1868        if (i % 100 == 0) {
1869          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1870            Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()),
1871            count));
1872        }
1873      } finally {
1874        updateScanMetrics(s.getScanMetrics());
1875        s.close();
1876      }
1877      return true;
1878    }
1879
1880    protected abstract Pair<byte[], byte[]> getStartAndStopRow();
1881
1882    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
1883      int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
1884      int stop = start + maxRange;
1885      return new Pair<>(format(start), format(stop));
1886    }
1887
1888    @Override
1889    protected int getReportingPeriod() {
1890      int period = opts.perClientRunRows / 100;
1891      return period == 0 ? opts.perClientRunRows : period;
1892    }
1893  }
1894
1895  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1896    RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1897      super(con, options, status);
1898    }
1899
1900    @Override
1901    protected Pair<byte[], byte[]> getStartAndStopRow() {
1902      return generateStartAndStopRows(10);
1903    }
1904  }
1905
1906  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1907    RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1908      super(con, options, status);
1909    }
1910
1911    @Override
1912    protected Pair<byte[], byte[]> getStartAndStopRow() {
1913      return generateStartAndStopRows(100);
1914    }
1915  }
1916
1917  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1918    RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1919      super(con, options, status);
1920    }
1921
1922    @Override
1923    protected Pair<byte[], byte[]> getStartAndStopRow() {
1924      return generateStartAndStopRows(1000);
1925    }
1926  }
1927
1928  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1929    RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1930      super(con, options, status);
1931    }
1932
1933    @Override
1934    protected Pair<byte[], byte[]> getStartAndStopRow() {
1935      return generateStartAndStopRows(10000);
1936    }
1937  }
1938
1939  static class RandomReadTest extends TableTest {
1940    private final Consistency consistency;
1941    private ArrayList<Get> gets;
1942
1943    RandomReadTest(Connection con, TestOptions options, Status status) {
1944      super(con, options, status);
1945      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1946      if (opts.multiGet > 0) {
1947        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1948        this.gets = new ArrayList<>(opts.multiGet);
1949      }
1950    }
1951
1952    @Override
1953    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1954      if (opts.randomSleep > 0) {
1955        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
1956      }
1957      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1958      for (int family = 0; family < opts.families; family++) {
1959        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1960        if (opts.addColumns) {
1961          for (int column = 0; column < opts.columns; column++) {
1962            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1963            get.addColumn(familyName, qualifier);
1964          }
1965        } else {
1966          get.addFamily(familyName);
1967        }
1968      }
1969      if (opts.filterAll) {
1970        get.setFilter(new FilterAllFilter());
1971      }
1972      get.setConsistency(consistency);
1973      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1974      if (opts.multiGet > 0) {
1975        this.gets.add(get);
1976        if (this.gets.size() == opts.multiGet) {
1977          Result[] rs = this.table.get(this.gets);
1978          if (opts.replicas > 1) {
1979            long latency = System.nanoTime() - startTime;
1980            updateValueSize(rs, latency);
1981          } else {
1982            updateValueSize(rs);
1983          }
1984          this.gets.clear();
1985        } else {
1986          return false;
1987        }
1988      } else {
1989        if (opts.replicas > 1) {
1990          Result r = this.table.get(get);
1991          long latency = System.nanoTime() - startTime;
1992          updateValueSize(r, latency);
1993        } else {
1994          updateValueSize(this.table.get(get));
1995        }
1996      }
1997      return true;
1998    }
1999
2000    @Override
2001    protected int getReportingPeriod() {
2002      int period = opts.perClientRunRows / 10;
2003      return period == 0 ? opts.perClientRunRows : period;
2004    }
2005
2006    @Override
2007    protected void testTakedown() throws IOException {
2008      if (this.gets != null && this.gets.size() > 0) {
2009        this.table.get(gets);
2010        this.gets.clear();
2011      }
2012      super.testTakedown();
2013    }
2014  }
2015
2016  /*
2017   * Send random reads against fake regions inserted by MetaWriteTest
2018   */
2019  static class MetaRandomReadTest extends MetaTest {
2020    private RegionLocator regionLocator;
2021
2022    MetaRandomReadTest(Connection con, TestOptions options, Status status) {
2023      super(con, options, status);
2024      LOG.info("call getRegionLocation");
2025    }
2026
2027    @Override
2028    void onStartup() throws IOException {
2029      super.onStartup();
2030      this.regionLocator = connection.getRegionLocator(table.getName());
2031    }
2032
2033    @Override
2034    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
2035      if (opts.randomSleep > 0) {
2036        Thread.sleep(rand.nextInt(opts.randomSleep));
2037      }
2038      HRegionLocation hRegionLocation =
2039        regionLocator.getRegionLocation(getSplitKey(rand.nextInt(opts.perClientRunRows)), true);
2040      LOG.debug("get location for region: " + hRegionLocation);
2041      return true;
2042    }
2043
2044    @Override
2045    protected int getReportingPeriod() {
2046      int period = opts.perClientRunRows / 10;
2047      return period == 0 ? opts.perClientRunRows : period;
2048    }
2049
2050    @Override
2051    protected void testTakedown() throws IOException {
2052      super.testTakedown();
2053    }
2054  }
2055
2056  static class RandomWriteTest extends SequentialWriteTest {
2057    RandomWriteTest(Connection con, TestOptions options, Status status) {
2058      super(con, options, status);
2059    }
2060
2061    @Override
2062    protected byte[] generateRow(final int i) {
2063      return getRandomRow(this.rand, opts.totalRows);
2064    }
2065
2066  }
2067
2068  static class ScanTest extends TableTest {
2069    private ResultScanner testScanner;
2070
2071    ScanTest(Connection con, TestOptions options, Status status) {
2072      super(con, options, status);
2073    }
2074
2075    @Override
2076    void testTakedown() throws IOException {
2077      if (this.testScanner != null) {
2078        this.testScanner.close();
2079      }
2080      super.testTakedown();
2081    }
2082
2083    @Override
2084    boolean testRow(final int i, final long startTime) throws IOException {
2085      if (this.testScanner == null) {
2086        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
2087          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
2088          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
2089        for (int family = 0; family < opts.families; family++) {
2090          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2091          if (opts.addColumns) {
2092            for (int column = 0; column < opts.columns; column++) {
2093              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2094              scan.addColumn(familyName, qualifier);
2095            }
2096          } else {
2097            scan.addFamily(familyName);
2098          }
2099        }
2100        if (opts.filterAll) {
2101          scan.setFilter(new FilterAllFilter());
2102        }
2103        this.testScanner = table.getScanner(scan);
2104      }
2105      Result r = testScanner.next();
2106      updateValueSize(r);
2107      return true;
2108    }
2109  }
2110
2111  /**
2112   * Base class for operations that are CAS-like; that read a value and then set it based off what
2113   * they read. In this category is increment, append, checkAndPut, etc.
2114   * <p>
2115   * These operations also want some concurrency going on. Usually when these tests run, they
2116   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
2117   * same key space. We do this with our getStartRow and getLastRow overrides.
2118   */
2119  static abstract class CASTableTest extends TableTest {
2120    private final byte[] qualifier;
2121
2122    CASTableTest(Connection con, TestOptions options, Status status) {
2123      super(con, options, status);
2124      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
2125    }
2126
2127    byte[] getQualifier() {
2128      return this.qualifier;
2129    }
2130
2131    @Override
2132    int getStartRow() {
2133      return 0;
2134    }
2135
2136    @Override
2137    int getLastRow() {
2138      return opts.perClientRunRows;
2139    }
2140  }
2141
2142  static class IncrementTest extends CASTableTest {
2143    IncrementTest(Connection con, TestOptions options, Status status) {
2144      super(con, options, status);
2145    }
2146
2147    @Override
2148    boolean testRow(final int i, final long startTime) throws IOException {
2149      Increment increment = new Increment(format(i));
2150      // unlike checkAndXXX tests, which make most sense to do on a single value,
2151      // if multiple families are specified for an increment test we assume it is
2152      // meant to raise the work factor
2153      for (int family = 0; family < opts.families; family++) {
2154        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2155        increment.addColumn(familyName, getQualifier(), 1l);
2156      }
2157      updateValueSize(this.table.increment(increment));
2158      return true;
2159    }
2160  }
2161
2162  static class AppendTest extends CASTableTest {
2163    AppendTest(Connection con, TestOptions options, Status status) {
2164      super(con, options, status);
2165    }
2166
2167    @Override
2168    boolean testRow(final int i, final long startTime) throws IOException {
2169      byte[] bytes = format(i);
2170      Append append = new Append(bytes);
2171      // unlike checkAndXXX tests, which make most sense to do on a single value,
2172      // if multiple families are specified for an append test we assume it is
2173      // meant to raise the work factor
2174      for (int family = 0; family < opts.families; family++) {
2175        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2176        append.addColumn(familyName, getQualifier(), bytes);
2177      }
2178      updateValueSize(this.table.append(append));
2179      return true;
2180    }
2181  }
2182
2183  static class CheckAndMutateTest extends CASTableTest {
2184    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
2185      super(con, options, status);
2186    }
2187
2188    @Override
2189    boolean testRow(final int i, final long startTime) throws IOException {
2190      final byte[] bytes = format(i);
2191      // checkAndXXX tests operate on only a single value
2192      // Put a known value so when we go to check it, it is there.
2193      Put put = new Put(bytes);
2194      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2195      this.table.put(put);
2196      RowMutations mutations = new RowMutations(bytes);
2197      mutations.add(put);
2198      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2199        .thenMutate(mutations);
2200      return true;
2201    }
2202  }
2203
2204  static class CheckAndPutTest extends CASTableTest {
2205    CheckAndPutTest(Connection con, TestOptions options, Status status) {
2206      super(con, options, status);
2207    }
2208
2209    @Override
2210    boolean testRow(final int i, final long startTime) throws IOException {
2211      final byte[] bytes = format(i);
2212      // checkAndXXX tests operate on only a single value
2213      // Put a known value so when we go to check it, it is there.
2214      Put put = new Put(bytes);
2215      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2216      this.table.put(put);
2217      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2218        .thenPut(put);
2219      return true;
2220    }
2221  }
2222
2223  static class CheckAndDeleteTest extends CASTableTest {
2224    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
2225      super(con, options, status);
2226    }
2227
2228    @Override
2229    boolean testRow(final int i, final long startTime) throws IOException {
2230      final byte[] bytes = format(i);
2231      // checkAndXXX tests operate on only a single value
2232      // Put a known value so when we go to check it, it is there.
2233      Put put = new Put(bytes);
2234      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2235      this.table.put(put);
2236      Delete delete = new Delete(put.getRow());
2237      delete.addColumn(FAMILY_ZERO, getQualifier());
2238      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2239        .thenDelete(delete);
2240      return true;
2241    }
2242  }
2243
2244  /*
2245   * Delete all fake regions inserted to meta table by MetaWriteTest.
2246   */
2247  static class CleanMetaTest extends MetaTest {
2248    CleanMetaTest(Connection con, TestOptions options, Status status) {
2249      super(con, options, status);
2250    }
2251
2252    @Override
2253    boolean testRow(final int i, final long startTime) throws IOException {
2254      try {
2255        RegionInfo regionInfo = connection.getRegionLocator(table.getName())
2256          .getRegionLocation(getSplitKey(i), false).getRegion();
2257        LOG.debug("deleting region from meta: " + regionInfo);
2258
2259        Delete delete =
2260          MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP);
2261        try (Table t = MetaTableAccessor.getMetaHTable(connection)) {
2262          t.delete(delete);
2263        }
2264      } catch (IOException ie) {
2265        // Log and continue
2266        LOG.error("cannot find region with start key: " + i);
2267      }
2268      return true;
2269    }
2270  }
2271
2272  static class SequentialReadTest extends TableTest {
2273    SequentialReadTest(Connection con, TestOptions options, Status status) {
2274      super(con, options, status);
2275    }
2276
2277    @Override
2278    boolean testRow(final int i, final long startTime) throws IOException {
2279      Get get = new Get(format(i));
2280      for (int family = 0; family < opts.families; family++) {
2281        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2282        if (opts.addColumns) {
2283          for (int column = 0; column < opts.columns; column++) {
2284            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2285            get.addColumn(familyName, qualifier);
2286          }
2287        } else {
2288          get.addFamily(familyName);
2289        }
2290      }
2291      if (opts.filterAll) {
2292        get.setFilter(new FilterAllFilter());
2293      }
2294      updateValueSize(table.get(get));
2295      return true;
2296    }
2297  }
2298
2299  static class SequentialWriteTest extends BufferedMutatorTest {
2300    private ArrayList<Put> puts;
2301
2302    SequentialWriteTest(Connection con, TestOptions options, Status status) {
2303      super(con, options, status);
2304      if (opts.multiPut > 0) {
2305        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
2306        this.puts = new ArrayList<>(opts.multiPut);
2307      }
2308    }
2309
2310    protected byte[] generateRow(final int i) {
2311      return format(i);
2312    }
2313
2314    @Override
2315    boolean testRow(final int i, final long startTime) throws IOException {
2316      byte[] row = generateRow(i);
2317      Put put = new Put(row);
2318      for (int family = 0; family < opts.families; family++) {
2319        byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
2320        for (int column = 0; column < opts.columns; column++) {
2321          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2322          byte[] value = generateData(this.rand, getValueLength(this.rand));
2323          if (opts.useTags) {
2324            byte[] tag = generateData(this.rand, TAG_LENGTH);
2325            Tag[] tags = new Tag[opts.noOfTags];
2326            for (int n = 0; n < opts.noOfTags; n++) {
2327              Tag t = new ArrayBackedTag((byte) n, tag);
2328              tags[n] = t;
2329            }
2330            KeyValue kv =
2331              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
2332            put.add(kv);
2333            updateValueSize(kv.getValueLength());
2334          } else {
2335            put.addColumn(familyName, qualifier, value);
2336            updateValueSize(value.length);
2337          }
2338        }
2339      }
2340      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2341      if (opts.autoFlush) {
2342        if (opts.multiPut > 0) {
2343          this.puts.add(put);
2344          if (this.puts.size() == opts.multiPut) {
2345            table.put(this.puts);
2346            this.puts.clear();
2347          } else {
2348            return false;
2349          }
2350        } else {
2351          table.put(put);
2352        }
2353      } else {
2354        mutator.mutate(put);
2355      }
2356      return true;
2357    }
2358  }
2359
2360  /*
2361   * Insert fake regions into meta table with contiguous split keys.
2362   */
2363  static class MetaWriteTest extends MetaTest {
2364
2365    MetaWriteTest(Connection con, TestOptions options, Status status) {
2366      super(con, options, status);
2367    }
2368
2369    @Override
2370    boolean testRow(final int i, final long startTime) throws IOException {
2371      List<RegionInfo> regionInfos = new ArrayList<RegionInfo>();
2372      RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME))
2373        .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build());
2374      regionInfos.add(regionInfo);
2375      MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1);
2376
2377      // write the serverName columns
2378      MetaTableAccessor.updateRegionLocation(connection, regionInfo,
2379        ServerName.valueOf("localhost", 60010, rand.nextLong()), i,
2380        EnvironmentEdgeManager.currentTime());
2381      return true;
2382    }
2383  }
2384
2385  static class FilteredScanTest extends TableTest {
2386    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
2387
2388    FilteredScanTest(Connection con, TestOptions options, Status status) {
2389      super(con, options, status);
2390      if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) {
2391        LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB
2392          + ". This could take a very long time.");
2393      }
2394    }
2395
2396    @Override
2397    boolean testRow(int i, final long startTime) throws IOException {
2398      byte[] value = generateData(this.rand, getValueLength(this.rand));
2399      Scan scan = constructScan(value);
2400      ResultScanner scanner = null;
2401      try {
2402        scanner = this.table.getScanner(scan);
2403        for (Result r = null; (r = scanner.next()) != null;) {
2404          updateValueSize(r);
2405        }
2406      } finally {
2407        if (scanner != null) {
2408          updateScanMetrics(scanner.getScanMetrics());
2409          scanner.close();
2410        }
2411      }
2412      return true;
2413    }
2414
2415    protected Scan constructScan(byte[] valuePrefix) throws IOException {
2416      FilterList list = new FilterList();
2417      Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL,
2418        new BinaryComparator(valuePrefix));
2419      list.addFilter(filter);
2420      if (opts.filterAll) {
2421        list.addFilter(new FilterAllFilter());
2422      }
2423      Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2424        .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2425        .setScanMetricsEnabled(true);
2426      if (opts.addColumns) {
2427        for (int column = 0; column < opts.columns; column++) {
2428          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2429          scan.addColumn(FAMILY_ZERO, qualifier);
2430        }
2431      } else {
2432        scan.addFamily(FAMILY_ZERO);
2433      }
2434      scan.setFilter(list);
2435      return scan;
2436    }
2437  }
2438
2439  /**
2440   * Compute a throughput rate in MB/s.
2441   * @param rows   Number of records consumed.
2442   * @param timeMs Time taken in milliseconds.
2443   * @return String value with label, ie '123.76 MB/s'
2444   */
2445  private static String calculateMbps(int rows, long timeMs, final int valueSize, int families,
2446    int columns) {
2447    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH
2448      + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families);
2449    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
2450      .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT);
2451    return FMT.format(mbps) + " MB/s";
2452  }
2453
2454  /*
2455   * Format passed integer. n * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version
2456   * of passed number (Does absolute in case number is negative).
2457   */
2458  public static byte[] format(final int number) {
2459    byte[] b = new byte[ROW_LENGTH];
2460    int d = Math.abs(number);
2461    for (int i = b.length - 1; i >= 0; i--) {
2462      b[i] = (byte) ((d % 10) + '0');
2463      d /= 10;
2464    }
2465    return b;
2466  }
2467
2468  /*
2469   * This method takes some time and is done inline uploading data. For example, doing the mapfile
2470   * test, generation of the key and value consumes about 30% of CPU time.
2471   * @return Generated random value to insert into a table cell.
2472   */
2473  public static byte[] generateData(final Random r, int length) {
2474    byte[] b = new byte[length];
2475    int i;
2476
2477    for (i = 0; i < (length - 8); i += 8) {
2478      b[i] = (byte) (65 + r.nextInt(26));
2479      b[i + 1] = b[i];
2480      b[i + 2] = b[i];
2481      b[i + 3] = b[i];
2482      b[i + 4] = b[i];
2483      b[i + 5] = b[i];
2484      b[i + 6] = b[i];
2485      b[i + 7] = b[i];
2486    }
2487
2488    byte a = (byte) (65 + r.nextInt(26));
2489    for (; i < length; i++) {
2490      b[i] = a;
2491    }
2492    return b;
2493  }
2494
2495  static byte[] getRandomRow(final Random random, final int totalRows) {
2496    return format(generateRandomRow(random, totalRows));
2497  }
2498
2499  static int generateRandomRow(final Random random, final int totalRows) {
2500    return random.nextInt(Integer.MAX_VALUE) % totalRows;
2501  }
2502
2503  static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
2504    Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
2505    throws IOException, InterruptedException {
2506    status.setStatus(
2507      "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows");
2508    long totalElapsedTime;
2509
2510    final TestBase t;
2511    try {
2512      if (AsyncTest.class.isAssignableFrom(cmd)) {
2513        Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
2514        Constructor<? extends AsyncTest> constructor =
2515          newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
2516        t = constructor.newInstance(asyncCon, opts, status);
2517      } else {
2518        Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
2519        Constructor<? extends Test> constructor =
2520          newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
2521        t = constructor.newInstance(con, opts, status);
2522      }
2523    } catch (NoSuchMethodException e) {
2524      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
2525        + ".  It does not provide a constructor as described by "
2526        + "the javadoc comment.  Available constructors are: "
2527        + Arrays.toString(cmd.getConstructors()));
2528    } catch (Exception e) {
2529      throw new IllegalStateException("Failed to construct command class", e);
2530    }
2531    totalElapsedTime = t.test();
2532
2533    status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow
2534      + " for " + opts.perClientRunRows + " rows" + " ("
2535      + calculateMbps((int) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
2536        getAverageValueLength(opts), opts.families, opts.columns)
2537      + ")");
2538
2539    return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold,
2540      t.numOfReplyFromReplica, t.getLatencyHistogram());
2541  }
2542
2543  private static int getAverageValueLength(final TestOptions opts) {
2544    return opts.valueRandom ? opts.valueSize / 2 : opts.valueSize;
2545  }
2546
2547  private void runTest(final Class<? extends TestBase> cmd, TestOptions opts)
2548    throws IOException, InterruptedException, ClassNotFoundException, ExecutionException {
2549    // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2550    // the TestOptions introspection for us and dump the output in a readable format.
2551    LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
2552    Admin admin = null;
2553    Connection connection = null;
2554    try {
2555      connection = ConnectionFactory.createConnection(getConf());
2556      admin = connection.getAdmin();
2557      checkTable(admin, opts);
2558    } finally {
2559      if (admin != null) admin.close();
2560      if (connection != null) connection.close();
2561    }
2562    if (opts.nomapred) {
2563      doLocalClients(opts, getConf());
2564    } else {
2565      doMapReduce(opts, getConf());
2566    }
2567  }
2568
2569  protected void printUsage() {
2570    printUsage(PE_COMMAND_SHORTNAME, null);
2571  }
2572
2573  protected static void printUsage(final String message) {
2574    printUsage(PE_COMMAND_SHORTNAME, message);
2575  }
2576
2577  protected static void printUsageAndExit(final String message, final int exitCode) {
2578    printUsage(message);
2579    System.exit(exitCode);
2580  }
2581
2582  protected static void printUsage(final String shortName, final String message) {
2583    if (message != null && message.length() > 0) {
2584      System.err.println(message);
2585    }
2586    System.err.print("Usage: hbase " + shortName);
2587    System.err.println("  <OPTIONS> [-D<property=value>]* <command> <nclients>");
2588    System.err.println();
2589    System.err.println("General Options:");
2590    System.err.println(
2591      " nomapred        Run multiple clients using threads " + "(rather than use mapreduce)");
2592    System.err
2593      .println(" oneCon          all the threads share the same connection. Default: False");
2594    System.err.println(" connCount          connections all threads share. "
2595      + "For example, if set to 2, then all thread share 2 connection. "
2596      + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, "
2597      + "if not, connCount=thread number");
2598
2599    System.err.println(" sampleRate      Execute test on a sample of total "
2600      + "rows. Only supported by randomRead. Default: 1.0");
2601    System.err.println(" period          Report every 'period' rows: "
2602      + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10);
2603    System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
2604    System.err.println(
2605      " traceRate       Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0");
2606    System.err.println(" latency         Set to report operation latencies. Default: False");
2607    System.err.println(" latencyThreshold  Set to report number of operations with latency "
2608      + "over lantencyThreshold, unit in millisecond, default 0");
2609    System.err.println(" measureAfter    Start to measure the latency once 'measureAfter'"
2610      + " rows have been treated. Default: 0");
2611    System.err
2612      .println(" valueSize       Pass value size to use: Default: " + DEFAULT_OPTS.getValueSize());
2613    System.err.println(" valueRandom     Set if we should vary value size between 0 and "
2614      + "'valueSize'; set on read for stats on size: Default: Not set.");
2615    System.err.println(" blockEncoding   Block encoding to use. Value should be one of "
2616      + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2617    System.err.println();
2618    System.err.println("Table Creation / Write Tests:");
2619    System.err.println(" table           Alternate table name. Default: 'TestTable'");
2620    System.err.println(
2621      " rows            Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows()
2622        + ".  In case of randomReads and randomSeekScans this could"
2623        + " be specified along with --size to specify the number of rows to be scanned within"
2624        + " the total range specified by the size.");
2625    System.err.println(
2626      " size            Total size in GiB. Mutually exclusive with --rows for writes and scans"
2627        + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2628        + " use size to specify the end range and --rows"
2629        + " specifies the number of rows within that range. " + "Default: 1.0.");
2630    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2631    System.err.println(" encryption      Encryption type to use (AES, ...). Default: 'NONE'");
2632    System.err.println(
2633      " flushCommits    Used to determine if the test should flush the table. " + "Default: false");
2634    System.err.println(" valueZipf       Set if we should vary value size between 0 and "
2635      + "'valueSize' in zipf form: Default: Not set.");
2636    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
2637    System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
2638    System.err.println(" multiPut        Batch puts together into groups of N. Only supported "
2639      + "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0");
2640    System.err.println(" presplit        Create presplit table. If a table with same name exists,"
2641      + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2642      + "Recommended for accurate perf analysis (see guide). Default: disabled");
2643    System.err.println(
2644      " usetags         Writes tags along with KVs. Use with HFile V3. " + "Default: false");
2645    System.err.println(" numoftags       Specify the no of tags that would be needed. "
2646      + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2647    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
2648    System.err.println(" columns         Columns to write per row. Default: 1");
2649    System.err
2650      .println(" families        Specify number of column families for the table. Default: 1");
2651    System.err.println();
2652    System.err.println("Read Tests:");
2653    System.err.println(" filterAll       Helps to filter out all the rows on the server side"
2654      + " there by not returning any thing back to the client.  Helps to check the server side"
2655      + " performance.  Uses FilterAllFilter internally. ");
2656    System.err.println(" multiGet        Batch gets together into groups of N. Only supported "
2657      + "by randomRead. Default: disabled");
2658    System.err.println(" inmemory        Tries to keep the HFiles of the CF "
2659      + "inmemory as far as possible. Not guaranteed that reads are always served "
2660      + "from memory.  Default: false");
2661    System.err
2662      .println(" bloomFilter     Bloom filter type, one of " + Arrays.toString(BloomType.values()));
2663    System.err.println(" blockSize       Blocksize to use when writing out hfiles. ");
2664    System.err
2665      .println(" inmemoryCompaction  Makes the column family to do inmemory flushes/compactions. "
2666        + "Uses the CompactingMemstore");
2667    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
2668    System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
2669    System.err.println(
2670      " randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
2671    System.err.println(" caching         Scan caching to use. Default: 30");
2672    System.err.println(" asyncPrefetch   Enable asyncPrefetch for scan");
2673    System.err.println(" cacheBlocks     Set the cacheBlocks option for scan. Default: true");
2674    System.err.println(
2675      " scanReadType    Set the readType option for scan, stream/pread/default. Default: default");
2676    System.err.println(" bufferSize      Set the value of client side buffering. Default: 2MB");
2677    System.err.println();
2678    System.err.println(" Note: -D properties will be applied to the conf used. ");
2679    System.err.println("  For example: ");
2680    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
2681    System.err.println("   -Dmapreduce.task.timeout=60000");
2682    System.err.println();
2683    System.err.println("Command:");
2684    for (CmdDescriptor command : COMMANDS.values()) {
2685      System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
2686    }
2687    System.err.println();
2688    System.err.println("Args:");
2689    System.err.println(" nclients        Integer. Required. Total number of clients "
2690      + "(and HRegionServers) running. 1 <= value <= 500");
2691    System.err.println("Examples:");
2692    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
2693    System.err.println(" $ hbase " + shortName + " sequentialWrite 1");
2694    System.err.println(" To run 10 clients doing increments over ten rows:");
2695    System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10");
2696  }
2697
2698  /**
2699   * Parse options passed in via an arguments array. Assumes that array has been split on
2700   * white-space and placed into a {@code Queue}. Any unknown arguments will remain in the queue at
2701   * the conclusion of this method call. It's up to the caller to deal with these unrecognized
2702   * arguments.
2703   */
2704  static TestOptions parseOpts(Queue<String> args) {
2705    TestOptions opts = new TestOptions();
2706
2707    String cmd = null;
2708    while ((cmd = args.poll()) != null) {
2709      if (cmd.equals("-h") || cmd.startsWith("--h")) {
2710        // place item back onto queue so that caller knows parsing was incomplete
2711        args.add(cmd);
2712        break;
2713      }
2714
2715      final String nmr = "--nomapred";
2716      if (cmd.startsWith(nmr)) {
2717        opts.nomapred = true;
2718        continue;
2719      }
2720
2721      final String rows = "--rows=";
2722      if (cmd.startsWith(rows)) {
2723        opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
2724        continue;
2725      }
2726
2727      final String cycles = "--cycles=";
2728      if (cmd.startsWith(cycles)) {
2729        opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
2730        continue;
2731      }
2732
2733      final String sampleRate = "--sampleRate=";
2734      if (cmd.startsWith(sampleRate)) {
2735        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
2736        continue;
2737      }
2738
2739      final String table = "--table=";
2740      if (cmd.startsWith(table)) {
2741        opts.tableName = cmd.substring(table.length());
2742        continue;
2743      }
2744
2745      final String startRow = "--startRow=";
2746      if (cmd.startsWith(startRow)) {
2747        opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
2748        continue;
2749      }
2750
2751      final String compress = "--compress=";
2752      if (cmd.startsWith(compress)) {
2753        opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
2754        continue;
2755      }
2756
2757      final String encryption = "--encryption=";
2758      if (cmd.startsWith(encryption)) {
2759        opts.encryption = cmd.substring(encryption.length());
2760        continue;
2761      }
2762
2763      final String traceRate = "--traceRate=";
2764      if (cmd.startsWith(traceRate)) {
2765        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
2766        continue;
2767      }
2768
2769      final String blockEncoding = "--blockEncoding=";
2770      if (cmd.startsWith(blockEncoding)) {
2771        opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
2772        continue;
2773      }
2774
2775      final String flushCommits = "--flushCommits=";
2776      if (cmd.startsWith(flushCommits)) {
2777        opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
2778        continue;
2779      }
2780
2781      final String writeToWAL = "--writeToWAL=";
2782      if (cmd.startsWith(writeToWAL)) {
2783        opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
2784        continue;
2785      }
2786
2787      final String presplit = "--presplit=";
2788      if (cmd.startsWith(presplit)) {
2789        opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
2790        continue;
2791      }
2792
2793      final String inMemory = "--inmemory=";
2794      if (cmd.startsWith(inMemory)) {
2795        opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
2796        continue;
2797      }
2798
2799      final String autoFlush = "--autoFlush=";
2800      if (cmd.startsWith(autoFlush)) {
2801        opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
2802        continue;
2803      }
2804
2805      final String onceCon = "--oneCon=";
2806      if (cmd.startsWith(onceCon)) {
2807        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
2808        continue;
2809      }
2810
2811      final String connCount = "--connCount=";
2812      if (cmd.startsWith(connCount)) {
2813        opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
2814        continue;
2815      }
2816
2817      final String latencyThreshold = "--latencyThreshold=";
2818      if (cmd.startsWith(latencyThreshold)) {
2819        opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length()));
2820        continue;
2821      }
2822
2823      final String latency = "--latency";
2824      if (cmd.startsWith(latency)) {
2825        opts.reportLatency = true;
2826        continue;
2827      }
2828
2829      final String multiGet = "--multiGet=";
2830      if (cmd.startsWith(multiGet)) {
2831        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
2832        continue;
2833      }
2834
2835      final String multiPut = "--multiPut=";
2836      if (cmd.startsWith(multiPut)) {
2837        opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
2838        continue;
2839      }
2840
2841      final String useTags = "--usetags=";
2842      if (cmd.startsWith(useTags)) {
2843        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
2844        continue;
2845      }
2846
2847      final String noOfTags = "--numoftags=";
2848      if (cmd.startsWith(noOfTags)) {
2849        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
2850        continue;
2851      }
2852
2853      final String replicas = "--replicas=";
2854      if (cmd.startsWith(replicas)) {
2855        opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
2856        continue;
2857      }
2858
2859      final String filterOutAll = "--filterAll";
2860      if (cmd.startsWith(filterOutAll)) {
2861        opts.filterAll = true;
2862        continue;
2863      }
2864
2865      final String size = "--size=";
2866      if (cmd.startsWith(size)) {
2867        opts.size = Float.parseFloat(cmd.substring(size.length()));
2868        if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB");
2869        continue;
2870      }
2871
2872      final String splitPolicy = "--splitPolicy=";
2873      if (cmd.startsWith(splitPolicy)) {
2874        opts.splitPolicy = cmd.substring(splitPolicy.length());
2875        continue;
2876      }
2877
2878      final String randomSleep = "--randomSleep=";
2879      if (cmd.startsWith(randomSleep)) {
2880        opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
2881        continue;
2882      }
2883
2884      final String measureAfter = "--measureAfter=";
2885      if (cmd.startsWith(measureAfter)) {
2886        opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length()));
2887        continue;
2888      }
2889
2890      final String bloomFilter = "--bloomFilter=";
2891      if (cmd.startsWith(bloomFilter)) {
2892        opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
2893        continue;
2894      }
2895
2896      final String blockSize = "--blockSize=";
2897      if (cmd.startsWith(blockSize)) {
2898        opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
2899        continue;
2900      }
2901
2902      final String valueSize = "--valueSize=";
2903      if (cmd.startsWith(valueSize)) {
2904        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
2905        continue;
2906      }
2907
2908      final String valueRandom = "--valueRandom";
2909      if (cmd.startsWith(valueRandom)) {
2910        opts.valueRandom = true;
2911        continue;
2912      }
2913
2914      final String valueZipf = "--valueZipf";
2915      if (cmd.startsWith(valueZipf)) {
2916        opts.valueZipf = true;
2917        continue;
2918      }
2919
2920      final String period = "--period=";
2921      if (cmd.startsWith(period)) {
2922        opts.period = Integer.parseInt(cmd.substring(period.length()));
2923        continue;
2924      }
2925
2926      final String addColumns = "--addColumns=";
2927      if (cmd.startsWith(addColumns)) {
2928        opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
2929        continue;
2930      }
2931
2932      final String inMemoryCompaction = "--inmemoryCompaction=";
2933      if (cmd.startsWith(inMemoryCompaction)) {
2934        opts.inMemoryCompaction =
2935          MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length()));
2936        continue;
2937      }
2938
2939      final String columns = "--columns=";
2940      if (cmd.startsWith(columns)) {
2941        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
2942        continue;
2943      }
2944
2945      final String families = "--families=";
2946      if (cmd.startsWith(families)) {
2947        opts.families = Integer.parseInt(cmd.substring(families.length()));
2948        continue;
2949      }
2950
2951      final String caching = "--caching=";
2952      if (cmd.startsWith(caching)) {
2953        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
2954        continue;
2955      }
2956
2957      final String asyncPrefetch = "--asyncPrefetch";
2958      if (cmd.startsWith(asyncPrefetch)) {
2959        opts.asyncPrefetch = true;
2960        continue;
2961      }
2962
2963      final String cacheBlocks = "--cacheBlocks=";
2964      if (cmd.startsWith(cacheBlocks)) {
2965        opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length()));
2966        continue;
2967      }
2968
2969      final String scanReadType = "--scanReadType=";
2970      if (cmd.startsWith(scanReadType)) {
2971        opts.scanReadType =
2972          Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
2973        continue;
2974      }
2975
2976      final String bufferSize = "--bufferSize=";
2977      if (cmd.startsWith(bufferSize)) {
2978        opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
2979        continue;
2980      }
2981
2982      validateParsedOpts(opts);
2983
2984      if (isCommandClass(cmd)) {
2985        opts.cmdName = cmd;
2986        try {
2987          opts.numClientThreads = Integer.parseInt(args.remove());
2988        } catch (NoSuchElementException | NumberFormatException e) {
2989          throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
2990        }
2991        opts = calculateRowsAndSize(opts);
2992        break;
2993      } else {
2994        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
2995      }
2996
2997      // Not matching any option or command.
2998      System.err.println("Error: Wrong option or command: " + cmd);
2999      args.add(cmd);
3000      break;
3001    }
3002    return opts;
3003  }
3004
3005  /**
3006   * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts
3007   */
3008  private static void validateParsedOpts(TestOptions opts) {
3009
3010    if (!opts.autoFlush && opts.multiPut > 0) {
3011      throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
3012    }
3013
3014    if (opts.oneCon && opts.connCount > 1) {
3015      throw new IllegalArgumentException(
3016        "oneCon is set to true, " + "connCount should not bigger than 1");
3017    }
3018
3019    if (opts.valueZipf && opts.valueRandom) {
3020      throw new IllegalStateException("Either valueZipf or valueRandom but not both");
3021    }
3022  }
3023
3024  static TestOptions calculateRowsAndSize(final TestOptions opts) {
3025    int rowsPerGB = getRowsPerGB(opts);
3026    if (
3027      (opts.getCmdName() != null
3028        && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
3029        && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows
3030    ) {
3031      opts.totalRows = (int) opts.size * rowsPerGB;
3032    } else if (opts.size != DEFAULT_OPTS.size) {
3033      // total size in GB specified
3034      opts.totalRows = (int) opts.size * rowsPerGB;
3035      opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
3036    } else {
3037      opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
3038      opts.size = opts.totalRows / rowsPerGB;
3039    }
3040    return opts;
3041  }
3042
3043  static int getRowsPerGB(final TestOptions opts) {
3044    return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies()
3045      * opts.getColumns());
3046  }
3047
3048  @Override
3049  public int run(String[] args) throws Exception {
3050    // Process command-line args. TODO: Better cmd-line processing
3051    // (but hopefully something not as painful as cli options).
3052    int errCode = -1;
3053    if (args.length < 1) {
3054      printUsage();
3055      return errCode;
3056    }
3057
3058    try {
3059      LinkedList<String> argv = new LinkedList<>();
3060      argv.addAll(Arrays.asList(args));
3061      TestOptions opts = parseOpts(argv);
3062
3063      // args remaining, print help and exit
3064      if (!argv.isEmpty()) {
3065        errCode = 0;
3066        printUsage();
3067        return errCode;
3068      }
3069
3070      // must run at least 1 client
3071      if (opts.numClientThreads <= 0) {
3072        throw new IllegalArgumentException("Number of clients must be > 0");
3073      }
3074
3075      // cmdName should not be null, print help and exit
3076      if (opts.cmdName == null) {
3077        printUsage();
3078        return errCode;
3079      }
3080
3081      Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
3082      if (cmdClass != null) {
3083        runTest(cmdClass, opts);
3084        errCode = 0;
3085      }
3086
3087    } catch (Exception e) {
3088      e.printStackTrace();
3089    }
3090
3091    return errCode;
3092  }
3093
3094  private static boolean isCommandClass(String cmd) {
3095    return COMMANDS.containsKey(cmd);
3096  }
3097
3098  private static Class<? extends TestBase> determineCommandClass(String cmd) {
3099    CmdDescriptor descriptor = COMMANDS.get(cmd);
3100    return descriptor != null ? descriptor.getCmdClass() : null;
3101  }
3102
3103  public static void main(final String[] args) throws Exception {
3104    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
3105    System.exit(res);
3106  }
3107}