001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import com.codahale.metrics.Histogram;
021import com.codahale.metrics.UniformReservoir;
022import io.opentelemetry.api.trace.Span;
023import io.opentelemetry.context.Scope;
024import java.io.IOException;
025import java.io.PrintStream;
026import java.lang.reflect.Constructor;
027import java.math.BigDecimal;
028import java.math.MathContext;
029import java.text.DecimalFormat;
030import java.text.SimpleDateFormat;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Date;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Locale;
037import java.util.Map;
038import java.util.NoSuchElementException;
039import java.util.Queue;
040import java.util.Random;
041import java.util.TreeMap;
042import java.util.concurrent.Callable;
043import java.util.concurrent.ExecutionException;
044import java.util.concurrent.ExecutorService;
045import java.util.concurrent.Executors;
046import java.util.concurrent.Future;
047import java.util.concurrent.ThreadLocalRandom;
048import org.apache.commons.lang3.StringUtils;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.conf.Configured;
051import org.apache.hadoop.fs.FileSystem;
052import org.apache.hadoop.fs.Path;
053import org.apache.hadoop.hbase.client.Admin;
054import org.apache.hadoop.hbase.client.Append;
055import org.apache.hadoop.hbase.client.AsyncConnection;
056import org.apache.hadoop.hbase.client.AsyncTable;
057import org.apache.hadoop.hbase.client.BufferedMutator;
058import org.apache.hadoop.hbase.client.BufferedMutatorParams;
059import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
060import org.apache.hadoop.hbase.client.Connection;
061import org.apache.hadoop.hbase.client.ConnectionFactory;
062import org.apache.hadoop.hbase.client.Consistency;
063import org.apache.hadoop.hbase.client.Delete;
064import org.apache.hadoop.hbase.client.Durability;
065import org.apache.hadoop.hbase.client.Get;
066import org.apache.hadoop.hbase.client.Increment;
067import org.apache.hadoop.hbase.client.Put;
068import org.apache.hadoop.hbase.client.RegionInfo;
069import org.apache.hadoop.hbase.client.RegionInfoBuilder;
070import org.apache.hadoop.hbase.client.RegionLocator;
071import org.apache.hadoop.hbase.client.Result;
072import org.apache.hadoop.hbase.client.ResultScanner;
073import org.apache.hadoop.hbase.client.RowMutations;
074import org.apache.hadoop.hbase.client.Scan;
075import org.apache.hadoop.hbase.client.Table;
076import org.apache.hadoop.hbase.client.TableDescriptor;
077import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
078import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
079import org.apache.hadoop.hbase.filter.BinaryComparator;
080import org.apache.hadoop.hbase.filter.Filter;
081import org.apache.hadoop.hbase.filter.FilterAllFilter;
082import org.apache.hadoop.hbase.filter.FilterList;
083import org.apache.hadoop.hbase.filter.PageFilter;
084import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
085import org.apache.hadoop.hbase.filter.WhileMatchFilter;
086import org.apache.hadoop.hbase.io.compress.Compression;
087import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
088import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
089import org.apache.hadoop.hbase.regionserver.BloomType;
090import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
091import org.apache.hadoop.hbase.trace.TraceUtil;
092import org.apache.hadoop.hbase.util.ByteArrayHashKey;
093import org.apache.hadoop.hbase.util.Bytes;
094import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
095import org.apache.hadoop.hbase.util.GsonUtil;
096import org.apache.hadoop.hbase.util.Hash;
097import org.apache.hadoop.hbase.util.MurmurHash;
098import org.apache.hadoop.hbase.util.Pair;
099import org.apache.hadoop.hbase.util.RandomDistribution;
100import org.apache.hadoop.hbase.util.YammerHistogramUtils;
101import org.apache.hadoop.io.LongWritable;
102import org.apache.hadoop.io.Text;
103import org.apache.hadoop.mapreduce.Job;
104import org.apache.hadoop.mapreduce.Mapper;
105import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
106import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
107import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
108import org.apache.hadoop.util.Tool;
109import org.apache.hadoop.util.ToolRunner;
110import org.apache.yetus.audience.InterfaceAudience;
111import org.slf4j.Logger;
112import org.slf4j.LoggerFactory;
113
114import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
115import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
116import org.apache.hbase.thirdparty.com.google.gson.Gson;
117
118/**
119 * Script used evaluating HBase performance and scalability. Runs a HBase client that steps through
120 * one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random writes test,
121 * etc.). Pass on the command-line which test to run and how many clients are participating in this
122 * experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
123 * <p>
124 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance
125 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper,
126 * pages 8-10.
127 * <p>
128 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as
129 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client does
130 * about 1GB of data, unless specified otherwise.
131 */
132@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
133public class PerformanceEvaluation extends Configured implements Tool {
  /** Command name under which {@link RandomSeekScanTest} is registered. */
  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
  /** Command name under which {@link RandomReadTest} is registered. */
  static final String RANDOM_READ = "randomRead";
  // NOTE(review): not used in this part of the file; presumably the tool's short command name.
  static final String PE_COMMAND_SHORTNAME = "pe";
  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
  /** (De)serializes TestOptions as the JSON lines exchanged with the MapReduce map tasks. */
  private static final Gson GSON = GsonUtil.createGson().create();

  /** Default name of the test table (default of TestOptions.tableName). */
  public static final String TABLE_NAME = "TestTable";
  /** Prefix of the column family names; families are named info0, info1, ... */
  public static final String FAMILY_NAME_BASE = "info";
  /** Name of the first column family (FAMILY_NAME_BASE + 0). */
  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
  // NOTE(review): presumably the qualifier of the first column; usage is outside this chunk.
  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
  /** Default value size in bytes (default of TestOptions.valueSize). */
  public static final int DEFAULT_VALUE_LENGTH = 1000;
  // NOTE(review): presumably the length of the formatted row keys; confirm against format().
  public static final int ROW_LENGTH = 26;

  // Note: 1024 * 1024 * 1000 bytes, i.e. neither 10^9 nor 2^30.
  private static final int ONE_GB = 1024 * 1024 * 1000;
  // Rows of DEFAULT_VALUE_LENGTH bytes that fit in ONE_GB; default per-client and total row count.
  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
  // TODO : should we make this configurable
  private static final int TAG_LENGTH = 256;
  // Formatting / arithmetic helpers; presumably used for result reporting outside this chunk.
  private static final DecimalFormat FMT = new DecimalFormat("0.##");
  private static final MathContext CXT = MathContext.DECIMAL64;
  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
  /** Options with all default values; compared against to detect explicitly requested settings. */
  private static final TestOptions DEFAULT_OPTS = new TestOptions();

  /** Registered test commands keyed by command name (sorted, since this is a TreeMap). */
  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
  /** Directory (under the base dir) in which writeInputFile creates per-job directories. */
  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
159
160  static {
161    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
162      "Run async random read test");
163    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
164      "Run async random write test");
165    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
166      "Run async sequential read test");
167    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
168      "Run async sequential write test");
169    addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)");
170    addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test");
171    addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test");
172    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
173      "Run random seek and scan 100 test");
174    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
175      "Run random seek scan with both start and stop row (max 10 rows)");
176    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
177      "Run random seek scan with both start and stop row (max 100 rows)");
178    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
179      "Run random seek scan with both start and stop row (max 1000 rows)");
180    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
181      "Run random seek scan with both start and stop row (max 10000 rows)");
182    addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
183    addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
184    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
185    addCommandDescriptor(MetaWriteTest.class, "metaWrite",
186      "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta");
187    addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
188    addCommandDescriptor(ReverseScanTest.class, "reverseScan",
189      "Run reverse scan test (read every row)");
190    addCommandDescriptor(FilteredScanTest.class, "filterScan",
191      "Run scan test using a filter to find a specific row based on it's value "
192        + "(make sure to use --rows=20)");
193    addCommandDescriptor(IncrementTest.class, "increment",
194      "Increment on each row; clients overlap on keyspace so some concurrent operations");
195    addCommandDescriptor(AppendTest.class, "append",
196      "Append on each row; clients overlap on keyspace so some concurrent operations");
197    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
198      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
199    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
200      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
201    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
202      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
203    addCommandDescriptor(CleanMetaTest.class, "cleanMeta",
204      "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread");
205  }
206
207  /**
208   * Enum for map metrics. Keep it out here rather than inside in the Map inner-class so we can find
209   * associated properties.
210   */
211  protected static enum Counter {
212    /** elapsed time */
213    ELAPSED_TIME,
214    /** number of rows */
215    ROWS
216  }
217
218  protected static class RunResult implements Comparable<RunResult> {
219    public RunResult(long duration, Histogram hist) {
220      this.duration = duration;
221      this.hist = hist;
222      numbOfReplyOverThreshold = 0;
223      numOfReplyFromReplica = 0;
224    }
225
226    public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica,
227      Histogram hist) {
228      this.duration = duration;
229      this.hist = hist;
230      this.numbOfReplyOverThreshold = numbOfReplyOverThreshold;
231      this.numOfReplyFromReplica = numOfReplyFromReplica;
232    }
233
234    public final long duration;
235    public final Histogram hist;
236    public final long numbOfReplyOverThreshold;
237    public final long numOfReplyFromReplica;
238
239    @Override
240    public String toString() {
241      return Long.toString(duration);
242    }
243
244    @Override
245    public int compareTo(RunResult o) {
246      return Long.compare(this.duration, o.duration);
247    }
248  }
249
250  /**
251   * Constructor
252   * @param conf Configuration object
253   */
254  public PerformanceEvaluation(final Configuration conf) {
255    super(conf);
256  }
257
258  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name,
259    String description) {
260    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
261    COMMANDS.put(name, cmdDescriptor);
262  }
263
264  /**
265   * Implementations can have their status set.
266   */
267  interface Status {
268    /**
269     * Sets status
270     * @param msg status message
271     */
272    void setStatus(final String msg) throws IOException;
273  }
274
275  /**
276   * MapReduce job that runs a performance evaluation client in each map task.
277   */
278  public static class EvaluationMapTask
279    extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
280
281    /** configuration parameter name that contains the command */
282    public final static String CMD_KEY = "EvaluationMapTask.command";
283    /** configuration parameter name that contains the PE impl */
284    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
285
286    private Class<? extends Test> cmd;
287
288    @Override
289    protected void setup(Context context) throws IOException, InterruptedException {
290      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
291
292      // this is required so that extensions of PE are instantiated within the
293      // map reduce task...
294      Class<? extends PerformanceEvaluation> peClass =
295        forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
296      try {
297        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
298      } catch (Exception e) {
299        throw new IllegalStateException("Could not instantiate PE instance", e);
300      }
301    }
302
303    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
304      try {
305        return Class.forName(className).asSubclass(type);
306      } catch (ClassNotFoundException e) {
307        throw new IllegalStateException("Could not find class for name: " + className, e);
308      }
309    }
310
311    @Override
312    protected void map(LongWritable key, Text value, final Context context)
313      throws IOException, InterruptedException {
314
315      Status status = new Status() {
316        @Override
317        public void setStatus(String msg) {
318          context.setStatus(msg);
319        }
320      };
321
322      TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class);
323      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
324      final Connection con = ConnectionFactory.createConnection(conf);
325      AsyncConnection asyncCon = null;
326      try {
327        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
328      } catch (ExecutionException e) {
329        throw new IOException(e);
330      }
331
332      // Evaluation task
333      RunResult result =
334        PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
335      // Collect how much time the thing took. Report as map output and
336      // to the ELAPSED_TIME counter.
337      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
338      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
339      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
340      context.progress();
341    }
342  }
343
344  /*
345   * If table does not already exist, create. Also create a table when {@code opts.presplitRegions}
346   * is specified or when the existing table's region replica count doesn't match {@code
347   * opts.replicas}.
348   */
349  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
350    TableName tableName = TableName.valueOf(opts.tableName);
351    boolean needsDelete = false, exists = admin.tableExists(tableName);
352    boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
353      || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
354    if (!exists && isReadCmd) {
355      throw new IllegalStateException(
356        "Must specify an existing table for read commands. Run a write command first.");
357    }
358    TableDescriptor desc = exists ? admin.getDescriptor(TableName.valueOf(opts.tableName)) : null;
359    byte[][] splits = getSplits(opts);
360
361    // recreate the table when user has requested presplit or when existing
362    // {RegionSplitPolicy,replica count} does not match requested, or when the
363    // number of column families does not match requested.
364    if (
365      (exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
366        || (!isReadCmd && desc != null
367          && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
368        || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)
369        || (desc != null && desc.getColumnFamilyCount() != opts.families)
370    ) {
371      needsDelete = true;
372      // wait, why did it delete my table?!?
373      LOG.debug(MoreObjects.toStringHelper("needsDelete").add("needsDelete", needsDelete)
374        .add("isReadCmd", isReadCmd).add("exists", exists).add("desc", desc)
375        .add("presplit", opts.presplitRegions).add("splitPolicy", opts.splitPolicy)
376        .add("replicas", opts.replicas).add("families", opts.families).toString());
377    }
378
379    // remove an existing table
380    if (needsDelete) {
381      if (admin.isTableEnabled(tableName)) {
382        admin.disableTable(tableName);
383      }
384      admin.deleteTable(tableName);
385    }
386
387    // table creation is necessary
388    if (!exists || needsDelete) {
389      desc = getTableDescriptor(opts);
390      if (splits != null) {
391        if (LOG.isDebugEnabled()) {
392          for (int i = 0; i < splits.length; i++) {
393            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
394          }
395        }
396      }
397      if (splits != null) {
398        admin.createTable(desc, splits);
399      } else {
400        admin.createTable(desc);
401      }
402      LOG.info("Table " + desc + " created");
403    }
404    return admin.tableExists(tableName);
405  }
406
407  /**
408   * Create an HTableDescriptor from provided TestOptions.
409   */
410  protected static TableDescriptor getTableDescriptor(TestOptions opts) {
411    TableDescriptorBuilder builder =
412      TableDescriptorBuilder.newBuilder(TableName.valueOf(opts.tableName));
413
414    for (int family = 0; family < opts.families; family++) {
415      byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
416      ColumnFamilyDescriptorBuilder cfBuilder =
417        ColumnFamilyDescriptorBuilder.newBuilder(familyName);
418      cfBuilder.setDataBlockEncoding(opts.blockEncoding);
419      cfBuilder.setCompressionType(opts.compression);
420      cfBuilder.setEncryptionType(opts.encryption);
421      cfBuilder.setBloomFilterType(opts.bloomType);
422      cfBuilder.setBlocksize(opts.blockSize);
423      if (opts.inMemoryCF) {
424        cfBuilder.setInMemory(true);
425      }
426      cfBuilder.setInMemoryCompaction(opts.inMemoryCompaction);
427      builder.setColumnFamily(cfBuilder.build());
428    }
429    if (opts.replicas != DEFAULT_OPTS.replicas) {
430      builder.setRegionReplication(opts.replicas);
431    }
432    if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
433      builder.setRegionSplitPolicyClassName(opts.splitPolicy);
434    }
435    return builder.build();
436  }
437
438  /**
439   * generates splits based on total number of rows and specified split regions
440   */
441  protected static byte[][] getSplits(TestOptions opts) {
442    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null;
443
444    int numSplitPoints = opts.presplitRegions - 1;
445    byte[][] splits = new byte[numSplitPoints][];
446    int jump = opts.totalRows / opts.presplitRegions;
447    for (int i = 0; i < numSplitPoints; i++) {
448      int rowkey = jump * (1 + i);
449      splits[i] = format(rowkey);
450    }
451    return splits;
452  }
453
454  static void setupConnectionCount(final TestOptions opts) {
455    if (opts.oneCon) {
456      opts.connCount = 1;
457    } else {
458      if (opts.connCount == -1) {
459        // set to thread number if connCount is not set
460        opts.connCount = opts.numClientThreads;
461      }
462    }
463  }
464
465  /*
466   * Run all clients in this vm each to its own thread.
467   */
468  static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
469    throws IOException, InterruptedException, ExecutionException {
470    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
471    assert cmd != null;
472    @SuppressWarnings("unchecked")
473    Future<RunResult>[] threads = new Future[opts.numClientThreads];
474    RunResult[] results = new RunResult[opts.numClientThreads];
475    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
476      new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
477    setupConnectionCount(opts);
478    final Connection[] cons = new Connection[opts.connCount];
479    final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
480    for (int i = 0; i < opts.connCount; i++) {
481      cons[i] = ConnectionFactory.createConnection(conf);
482      asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
483    }
484    LOG
485      .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads");
486    for (int i = 0; i < threads.length; i++) {
487      final int index = i;
488      threads[i] = pool.submit(new Callable<RunResult>() {
489        @Override
490        public RunResult call() throws Exception {
491          TestOptions threadOpts = new TestOptions(opts);
492          final Connection con = cons[index % cons.length];
493          final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
494          if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
495          RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
496            @Override
497            public void setStatus(final String msg) throws IOException {
498              LOG.info(msg);
499            }
500          });
501          LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration
502            + "ms over " + threadOpts.perClientRunRows + " rows");
503          if (opts.latencyThreshold > 0) {
504            LOG.info("Number of replies over latency threshold " + opts.latencyThreshold
505              + "(ms) is " + run.numbOfReplyOverThreshold);
506          }
507          return run;
508        }
509      });
510    }
511    pool.shutdown();
512
513    for (int i = 0; i < threads.length; i++) {
514      try {
515        results[i] = threads[i].get();
516      } catch (ExecutionException e) {
517        throw new IOException(e.getCause());
518      }
519    }
520    final String test = cmd.getSimpleName();
521    LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results));
522    Arrays.sort(results);
523    long total = 0;
524    float avgLatency = 0;
525    float avgTPS = 0;
526    long replicaWins = 0;
527    for (RunResult result : results) {
528      total += result.duration;
529      avgLatency += result.hist.getSnapshot().getMean();
530      avgTPS += opts.perClientRunRows * 1.0f / result.duration;
531      replicaWins += result.numOfReplyFromReplica;
532    }
533    avgTPS *= 1000; // ms to second
534    avgLatency = avgLatency / results.length;
535    LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: "
536      + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms");
537    LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
538    LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
539    if (opts.replicas > 1) {
540      LOG.info("[results from replica regions] " + replicaWins);
541    }
542
543    for (int i = 0; i < opts.connCount; i++) {
544      cons[i].close();
545      asyncCons[i].close();
546    }
547
548    return results;
549  }
550
551  /*
552   * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write
553   * out an input file with instruction per client regards which row they are to start on.
554   * @param cmd Command to run.
555   */
556  static Job doMapReduce(TestOptions opts, final Configuration conf)
557    throws IOException, InterruptedException, ClassNotFoundException {
558    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
559    assert cmd != null;
560    Path inputDir = writeInputFile(conf, opts);
561    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
562    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
563    Job job = Job.getInstance(conf);
564    job.setJarByClass(PerformanceEvaluation.class);
565    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
566
567    job.setInputFormatClass(NLineInputFormat.class);
568    NLineInputFormat.setInputPaths(job, inputDir);
569    // this is default, but be explicit about it just in case.
570    NLineInputFormat.setNumLinesPerSplit(job, 1);
571
572    job.setOutputKeyClass(LongWritable.class);
573    job.setOutputValueClass(LongWritable.class);
574
575    job.setMapperClass(EvaluationMapTask.class);
576    job.setReducerClass(LongSumReducer.class);
577
578    job.setNumReduceTasks(1);
579
580    job.setOutputFormatClass(TextOutputFormat.class);
581    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
582
583    TableMapReduceUtil.addDependencyJars(job);
584    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Histogram.class, // yammer
585                                                                                            // metrics
586      Gson.class, // gson
587      FilterAllFilter.class // hbase-server tests jar
588    );
589
590    TableMapReduceUtil.initCredentials(job);
591
592    job.waitForCompletion(true);
593    return job;
594  }
595
  /**
   * Name of the file written into each job's {@code inputs} directory. It holds one JSON-serialized
   * {@link TestOptions} per line, one line per client. Since the job reads one line per input
   * split, each client runs in its own map task.
   */
  static String JOB_INPUT_FILENAME = "input.txt";
601
602  /*
603   * Write input file of offsets-per-client for the mapreduce job.
604   * @param c Configuration
605   * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
606   */
607  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
608    return writeInputFile(c, opts, new Path("."));
609  }
610
611  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
612    throws IOException {
613    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
614    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
615    Path inputDir = new Path(jobdir, "inputs");
616
617    FileSystem fs = FileSystem.get(c);
618    fs.mkdirs(inputDir);
619
620    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
621    PrintStream out = new PrintStream(fs.create(inputFile));
622    // Make input random.
623    Map<Integer, String> m = new TreeMap<>();
624    Hash h = MurmurHash.getInstance();
625    int perClientRows = (opts.totalRows / opts.numClientThreads);
626    try {
627      for (int j = 0; j < opts.numClientThreads; j++) {
628        TestOptions next = new TestOptions(opts);
629        next.startRow = j * perClientRows;
630        next.perClientRunRows = perClientRows;
631        String s = GSON.toJson(next);
632        LOG.info("Client=" + j + ", input=" + s);
633        byte[] b = Bytes.toBytes(s);
634        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
635        m.put(hash, s);
636      }
637      for (Map.Entry<Integer, String> e : m.entrySet()) {
638        out.println(e.getValue());
639      }
640    } finally {
641      out.close();
642    }
643    return inputDir;
644  }
645
646  /**
647   * Describes a command.
648   */
649  static class CmdDescriptor {
650    private Class<? extends TestBase> cmdClass;
651    private String name;
652    private String description;
653
654    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
655      this.cmdClass = cmdClass;
656      this.name = name;
657      this.description = description;
658    }
659
660    public Class<? extends TestBase> getCmdClass() {
661      return cmdClass;
662    }
663
664    public String getName() {
665      return name;
666    }
667
668    public String getDescription() {
669      return description;
670    }
671  }
672
673  /**
674   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes
675   * tracking all these arguments a little easier. NOTE: ADDING AN OPTION, you need to add a data
676   * member, a getter/setter (to make JSON serialization of this TestOptions class behave), and you
677   * need to add to the clone constructor below copying your new option from the 'that' to the
678   * 'this'. Look for 'clone' below.
679   */
680  static class TestOptions {
681    String cmdName = null;
682    boolean nomapred = false;
683    boolean filterAll = false;
684    int startRow = 0;
685    float size = 1.0f;
686    int perClientRunRows = DEFAULT_ROWS_PER_GB;
687    int numClientThreads = 1;
688    int totalRows = DEFAULT_ROWS_PER_GB;
689    int measureAfter = 0;
690    float sampleRate = 1.0f;
691    /**
692     * @deprecated Useless after switching to OpenTelemetry
693     */
694    @Deprecated
695    double traceRate = 0.0;
696    String tableName = TABLE_NAME;
697    boolean flushCommits = true;
698    boolean writeToWAL = true;
699    boolean autoFlush = false;
700    boolean oneCon = false;
701    int connCount = -1; // wil decide the actual num later
702    boolean useTags = false;
703    int noOfTags = 1;
704    boolean reportLatency = false;
705    int multiGet = 0;
706    int multiPut = 0;
707    int randomSleep = 0;
708    boolean inMemoryCF = false;
709    int presplitRegions = 0;
710    int replicas = TableDescriptorBuilder.DEFAULT_REGION_REPLICATION;
711    String splitPolicy = null;
712    Compression.Algorithm compression = Compression.Algorithm.NONE;
713    String encryption = null;
714    BloomType bloomType = BloomType.ROW;
715    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
716    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
717    boolean valueRandom = false;
718    boolean valueZipf = false;
719    int valueSize = DEFAULT_VALUE_LENGTH;
720    int period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10;
721    int cycles = 1;
722    int columns = 1;
723    int families = 1;
724    int caching = 30;
725    int latencyThreshold = 0; // in millsecond
726    boolean addColumns = true;
727    MemoryCompactionPolicy inMemoryCompaction =
728      MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
729    boolean asyncPrefetch = false;
730    boolean cacheBlocks = true;
731    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
732    long bufferSize = 2l * 1024l * 1024l;
733
734    public TestOptions() {
735    }
736
737    /**
738     * Clone constructor.
739     * @param that Object to copy from.
740     */
741    public TestOptions(TestOptions that) {
742      this.cmdName = that.cmdName;
743      this.cycles = that.cycles;
744      this.nomapred = that.nomapred;
745      this.startRow = that.startRow;
746      this.size = that.size;
747      this.perClientRunRows = that.perClientRunRows;
748      this.numClientThreads = that.numClientThreads;
749      this.totalRows = that.totalRows;
750      this.sampleRate = that.sampleRate;
751      this.traceRate = that.traceRate;
752      this.tableName = that.tableName;
753      this.flushCommits = that.flushCommits;
754      this.writeToWAL = that.writeToWAL;
755      this.autoFlush = that.autoFlush;
756      this.oneCon = that.oneCon;
757      this.connCount = that.connCount;
758      this.useTags = that.useTags;
759      this.noOfTags = that.noOfTags;
760      this.reportLatency = that.reportLatency;
761      this.latencyThreshold = that.latencyThreshold;
762      this.multiGet = that.multiGet;
763      this.multiPut = that.multiPut;
764      this.inMemoryCF = that.inMemoryCF;
765      this.presplitRegions = that.presplitRegions;
766      this.replicas = that.replicas;
767      this.splitPolicy = that.splitPolicy;
768      this.compression = that.compression;
769      this.encryption = that.encryption;
770      this.blockEncoding = that.blockEncoding;
771      this.filterAll = that.filterAll;
772      this.bloomType = that.bloomType;
773      this.blockSize = that.blockSize;
774      this.valueRandom = that.valueRandom;
775      this.valueZipf = that.valueZipf;
776      this.valueSize = that.valueSize;
777      this.period = that.period;
778      this.randomSleep = that.randomSleep;
779      this.measureAfter = that.measureAfter;
780      this.addColumns = that.addColumns;
781      this.columns = that.columns;
782      this.families = that.families;
783      this.caching = that.caching;
784      this.inMemoryCompaction = that.inMemoryCompaction;
785      this.asyncPrefetch = that.asyncPrefetch;
786      this.cacheBlocks = that.cacheBlocks;
787      this.scanReadType = that.scanReadType;
788      this.bufferSize = that.bufferSize;
789    }
790
791    public int getCaching() {
792      return this.caching;
793    }
794
795    public void setCaching(final int caching) {
796      this.caching = caching;
797    }
798
799    public int getColumns() {
800      return this.columns;
801    }
802
803    public void setColumns(final int columns) {
804      this.columns = columns;
805    }
806
807    public int getFamilies() {
808      return this.families;
809    }
810
811    public void setFamilies(final int families) {
812      this.families = families;
813    }
814
815    public int getCycles() {
816      return this.cycles;
817    }
818
819    public void setCycles(final int cycles) {
820      this.cycles = cycles;
821    }
822
823    public boolean isValueZipf() {
824      return valueZipf;
825    }
826
827    public void setValueZipf(boolean valueZipf) {
828      this.valueZipf = valueZipf;
829    }
830
831    public String getCmdName() {
832      return cmdName;
833    }
834
835    public void setCmdName(String cmdName) {
836      this.cmdName = cmdName;
837    }
838
839    public int getRandomSleep() {
840      return randomSleep;
841    }
842
843    public void setRandomSleep(int randomSleep) {
844      this.randomSleep = randomSleep;
845    }
846
847    public int getReplicas() {
848      return replicas;
849    }
850
851    public void setReplicas(int replicas) {
852      this.replicas = replicas;
853    }
854
855    public String getSplitPolicy() {
856      return splitPolicy;
857    }
858
859    public void setSplitPolicy(String splitPolicy) {
860      this.splitPolicy = splitPolicy;
861    }
862
863    public void setNomapred(boolean nomapred) {
864      this.nomapred = nomapred;
865    }
866
867    public void setFilterAll(boolean filterAll) {
868      this.filterAll = filterAll;
869    }
870
871    public void setStartRow(int startRow) {
872      this.startRow = startRow;
873    }
874
875    public void setSize(float size) {
876      this.size = size;
877    }
878
879    public void setPerClientRunRows(int perClientRunRows) {
880      this.perClientRunRows = perClientRunRows;
881    }
882
883    public void setNumClientThreads(int numClientThreads) {
884      this.numClientThreads = numClientThreads;
885    }
886
887    public void setTotalRows(int totalRows) {
888      this.totalRows = totalRows;
889    }
890
891    public void setSampleRate(float sampleRate) {
892      this.sampleRate = sampleRate;
893    }
894
895    public void setTraceRate(double traceRate) {
896      this.traceRate = traceRate;
897    }
898
899    public void setTableName(String tableName) {
900      this.tableName = tableName;
901    }
902
903    public void setFlushCommits(boolean flushCommits) {
904      this.flushCommits = flushCommits;
905    }
906
907    public void setWriteToWAL(boolean writeToWAL) {
908      this.writeToWAL = writeToWAL;
909    }
910
911    public void setAutoFlush(boolean autoFlush) {
912      this.autoFlush = autoFlush;
913    }
914
915    public void setOneCon(boolean oneCon) {
916      this.oneCon = oneCon;
917    }
918
919    public int getConnCount() {
920      return connCount;
921    }
922
923    public void setConnCount(int connCount) {
924      this.connCount = connCount;
925    }
926
927    public void setUseTags(boolean useTags) {
928      this.useTags = useTags;
929    }
930
931    public void setNoOfTags(int noOfTags) {
932      this.noOfTags = noOfTags;
933    }
934
935    public void setReportLatency(boolean reportLatency) {
936      this.reportLatency = reportLatency;
937    }
938
939    public void setMultiGet(int multiGet) {
940      this.multiGet = multiGet;
941    }
942
943    public void setMultiPut(int multiPut) {
944      this.multiPut = multiPut;
945    }
946
947    public void setInMemoryCF(boolean inMemoryCF) {
948      this.inMemoryCF = inMemoryCF;
949    }
950
951    public void setPresplitRegions(int presplitRegions) {
952      this.presplitRegions = presplitRegions;
953    }
954
955    public void setCompression(Compression.Algorithm compression) {
956      this.compression = compression;
957    }
958
959    public void setEncryption(String encryption) {
960      this.encryption = encryption;
961    }
962
963    public void setBloomType(BloomType bloomType) {
964      this.bloomType = bloomType;
965    }
966
967    public void setBlockSize(int blockSize) {
968      this.blockSize = blockSize;
969    }
970
971    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
972      this.blockEncoding = blockEncoding;
973    }
974
975    public void setValueRandom(boolean valueRandom) {
976      this.valueRandom = valueRandom;
977    }
978
979    public void setValueSize(int valueSize) {
980      this.valueSize = valueSize;
981    }
982
983    public void setBufferSize(long bufferSize) {
984      this.bufferSize = bufferSize;
985    }
986
987    public void setPeriod(int period) {
988      this.period = period;
989    }
990
991    public boolean isNomapred() {
992      return nomapred;
993    }
994
995    public boolean isFilterAll() {
996      return filterAll;
997    }
998
999    public int getStartRow() {
1000      return startRow;
1001    }
1002
1003    public float getSize() {
1004      return size;
1005    }
1006
1007    public int getPerClientRunRows() {
1008      return perClientRunRows;
1009    }
1010
1011    public int getNumClientThreads() {
1012      return numClientThreads;
1013    }
1014
1015    public int getTotalRows() {
1016      return totalRows;
1017    }
1018
1019    public float getSampleRate() {
1020      return sampleRate;
1021    }
1022
1023    public double getTraceRate() {
1024      return traceRate;
1025    }
1026
1027    public String getTableName() {
1028      return tableName;
1029    }
1030
1031    public boolean isFlushCommits() {
1032      return flushCommits;
1033    }
1034
1035    public boolean isWriteToWAL() {
1036      return writeToWAL;
1037    }
1038
1039    public boolean isAutoFlush() {
1040      return autoFlush;
1041    }
1042
1043    public boolean isUseTags() {
1044      return useTags;
1045    }
1046
1047    public int getNoOfTags() {
1048      return noOfTags;
1049    }
1050
1051    public boolean isReportLatency() {
1052      return reportLatency;
1053    }
1054
1055    public int getMultiGet() {
1056      return multiGet;
1057    }
1058
1059    public int getMultiPut() {
1060      return multiPut;
1061    }
1062
1063    public boolean isInMemoryCF() {
1064      return inMemoryCF;
1065    }
1066
1067    public int getPresplitRegions() {
1068      return presplitRegions;
1069    }
1070
1071    public Compression.Algorithm getCompression() {
1072      return compression;
1073    }
1074
1075    public String getEncryption() {
1076      return encryption;
1077    }
1078
1079    public DataBlockEncoding getBlockEncoding() {
1080      return blockEncoding;
1081    }
1082
1083    public boolean isValueRandom() {
1084      return valueRandom;
1085    }
1086
1087    public int getValueSize() {
1088      return valueSize;
1089    }
1090
1091    public int getPeriod() {
1092      return period;
1093    }
1094
1095    public BloomType getBloomType() {
1096      return bloomType;
1097    }
1098
1099    public int getBlockSize() {
1100      return blockSize;
1101    }
1102
1103    public boolean isOneCon() {
1104      return oneCon;
1105    }
1106
1107    public int getMeasureAfter() {
1108      return measureAfter;
1109    }
1110
1111    public void setMeasureAfter(int measureAfter) {
1112      this.measureAfter = measureAfter;
1113    }
1114
1115    public boolean getAddColumns() {
1116      return addColumns;
1117    }
1118
1119    public void setAddColumns(boolean addColumns) {
1120      this.addColumns = addColumns;
1121    }
1122
1123    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
1124      this.inMemoryCompaction = inMemoryCompaction;
1125    }
1126
1127    public MemoryCompactionPolicy getInMemoryCompaction() {
1128      return this.inMemoryCompaction;
1129    }
1130
1131    public long getBufferSize() {
1132      return this.bufferSize;
1133    }
1134  }
1135
1136  /*
1137   * A test. Subclass to particularize what happens per row.
1138   */
1139  static abstract class TestBase {
1140    // Below is make it so when Tests are all running in the one
1141    // jvm, that they each have a differently seeded Random.
1142    private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime());
1143
1144    private static long nextRandomSeed() {
1145      return randomSeed.nextLong();
1146    }
1147
1148    private final int everyN;
1149
1150    protected final Random rand = new Random(nextRandomSeed());
1151    protected final Configuration conf;
1152    protected final TestOptions opts;
1153
1154    private final Status status;
1155
1156    private String testName;
1157    private Histogram latencyHistogram;
1158    private Histogram replicaLatencyHistogram;
1159    private Histogram valueSizeHistogram;
1160    private Histogram rpcCallsHistogram;
1161    private Histogram remoteRpcCallsHistogram;
1162    private Histogram millisBetweenNextHistogram;
1163    private Histogram regionsScannedHistogram;
1164    private Histogram bytesInResultsHistogram;
1165    private Histogram bytesInRemoteResultsHistogram;
1166    private RandomDistribution.Zipf zipf;
1167    private long numOfReplyOverLatencyThreshold = 0;
1168    private long numOfReplyFromReplica = 0;
1169
1170    /**
1171     * Note that all subclasses of this class must provide a public constructor that has the exact
1172     * same list of arguments.
1173     */
1174    TestBase(final Configuration conf, final TestOptions options, final Status status) {
1175      this.conf = conf;
1176      this.opts = options;
1177      this.status = status;
1178      this.testName = this.getClass().getSimpleName();
1179      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
1180      if (options.isValueZipf()) {
1181        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
1182      }
1183      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
1184    }
1185
1186    int getValueLength(final Random r) {
1187      if (this.opts.isValueRandom()) {
1188        return r.nextInt(opts.valueSize);
1189      } else if (this.opts.isValueZipf()) {
1190        return Math.abs(this.zipf.nextInt());
1191      } else {
1192        return opts.valueSize;
1193      }
1194    }
1195
1196    void updateValueSize(final Result[] rs) throws IOException {
1197      updateValueSize(rs, 0);
1198    }
1199
1200    void updateValueSize(final Result[] rs, final long latency) throws IOException {
1201      if (rs == null || (latency == 0)) return;
1202      for (Result r : rs)
1203        updateValueSize(r, latency);
1204    }
1205
1206    void updateValueSize(final Result r) throws IOException {
1207      updateValueSize(r, 0);
1208    }
1209
1210    void updateValueSize(final Result r, final long latency) throws IOException {
1211      if (r == null || (latency == 0)) return;
1212      int size = 0;
1213      // update replicaHistogram
1214      if (r.isStale()) {
1215        replicaLatencyHistogram.update(latency / 1000);
1216        numOfReplyFromReplica++;
1217      }
1218      if (!isRandomValueSize()) return;
1219
1220      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1221        size += scanner.current().getValueLength();
1222      }
1223      updateValueSize(size);
1224    }
1225
1226    void updateValueSize(final int valueSize) {
1227      if (!isRandomValueSize()) return;
1228      this.valueSizeHistogram.update(valueSize);
1229    }
1230
1231    void updateScanMetrics(final ScanMetrics metrics) {
1232      if (metrics == null) return;
1233      Map<String, Long> metricsMap = metrics.getMetricsMap();
1234      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
1235      if (rpcCalls != null) {
1236        this.rpcCallsHistogram.update(rpcCalls.longValue());
1237      }
1238      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
1239      if (remoteRpcCalls != null) {
1240        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
1241      }
1242      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
1243      if (millisBetweenNext != null) {
1244        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
1245      }
1246      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
1247      if (regionsScanned != null) {
1248        this.regionsScannedHistogram.update(regionsScanned.longValue());
1249      }
1250      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
1251      if (bytesInResults != null && bytesInResults.longValue() > 0) {
1252        this.bytesInResultsHistogram.update(bytesInResults.longValue());
1253      }
1254      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
1255      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
1256        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
1257      }
1258    }
1259
1260    String generateStatus(final int sr, final int i, final int lr) {
1261      return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency ["
1262        + getShortLatencyReport() + "]"
1263        + (!isRandomValueSize() ? "" : ", value size [" + getShortValueSizeReport() + "]");
1264    }
1265
1266    boolean isRandomValueSize() {
1267      return opts.valueRandom;
1268    }
1269
1270    protected int getReportingPeriod() {
1271      return opts.period;
1272    }
1273
1274    /**
1275     * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1276     */
1277    public Histogram getLatencyHistogram() {
1278      return latencyHistogram;
1279    }
1280
1281    void testSetup() throws IOException {
1282      // test metrics
1283      latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1284      // If it is a replica test, set up histogram for replica.
1285      if (opts.replicas > 1) {
1286        replicaLatencyHistogram =
1287          YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1288      }
1289      valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1290      // scan metrics
1291      rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1292      remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1293      millisBetweenNextHistogram =
1294        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1295      regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1296      bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1297      bytesInRemoteResultsHistogram =
1298        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1299
1300      onStartup();
1301    }
1302
1303    abstract void onStartup() throws IOException;
1304
1305    void testTakedown() throws IOException {
1306      onTakedown();
1307      // Print all stats for this thread continuously.
1308      // Synchronize on Test.class so different threads don't intermingle the
1309      // output. We can't use 'this' here because each thread has its own instance of Test class.
1310      synchronized (Test.class) {
1311        status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
1312        status
1313          .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram));
1314        if (opts.replicas > 1) {
1315          status.setStatus("Latency (us) from Replica Regions: "
1316            + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram));
1317        }
1318        status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
1319        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
1320        if (valueSizeHistogram.getCount() > 0) {
1321          status.setStatus(
1322            "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
1323          status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
1324          status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
1325        } else {
1326          status.setStatus("No valueSize statistics available");
1327        }
1328        if (rpcCallsHistogram.getCount() > 0) {
1329          status.setStatus(
1330            "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
1331        }
1332        if (remoteRpcCallsHistogram.getCount() > 0) {
1333          status.setStatus("remoteRpcCalls (count): "
1334            + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
1335        }
1336        if (millisBetweenNextHistogram.getCount() > 0) {
1337          status.setStatus("millisBetweenNext (latency): "
1338            + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
1339        }
1340        if (regionsScannedHistogram.getCount() > 0) {
1341          status.setStatus("regionsScanned (count): "
1342            + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
1343        }
1344        if (bytesInResultsHistogram.getCount() > 0) {
1345          status.setStatus("bytesInResults (size): "
1346            + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
1347        }
1348        if (bytesInRemoteResultsHistogram.getCount() > 0) {
1349          status.setStatus("bytesInRemoteResults (size): "
1350            + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
1351        }
1352      }
1353    }
1354
1355    abstract void onTakedown() throws IOException;
1356
1357    /*
1358     * Run test
1359     * @return Elapsed time.
1360     */
1361    long test() throws IOException, InterruptedException {
1362      testSetup();
1363      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1364      final long startTime = System.nanoTime();
1365      try {
1366        testTimed();
1367      } finally {
1368        testTakedown();
1369      }
1370      return (System.nanoTime() - startTime) / 1000000;
1371    }
1372
1373    int getStartRow() {
1374      return opts.startRow;
1375    }
1376
1377    int getLastRow() {
1378      return getStartRow() + opts.perClientRunRows;
1379    }
1380
1381    /**
1382     * Provides an extension point for tests that don't want a per row invocation.
1383     */
1384    void testTimed() throws IOException, InterruptedException {
1385      int startRow = getStartRow();
1386      int lastRow = getLastRow();
1387      // Report on completion of 1/10th of total.
1388      for (int ii = 0; ii < opts.cycles; ii++) {
1389        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
1390        for (int i = startRow; i < lastRow; i++) {
1391          if (i % everyN != 0) continue;
1392          long startTime = System.nanoTime();
1393          boolean requestSent = false;
1394          Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan();
1395          try (Scope scope = span.makeCurrent()) {
1396            requestSent = testRow(i, startTime);
1397          } finally {
1398            span.end();
1399          }
1400          if ((i - startRow) > opts.measureAfter) {
1401            // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
1402            // first 9 times and sends the actual get request in the 10th iteration.
1403            // We should only set latency when actual request is sent because otherwise
1404            // it turns out to be 0.
1405            if (requestSent) {
1406              long latency = (System.nanoTime() - startTime) / 1000;
1407              latencyHistogram.update(latency);
1408              if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) {
1409                numOfReplyOverLatencyThreshold++;
1410              }
1411            }
1412            if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1413              status.setStatus(generateStatus(startRow, i, lastRow));
1414            }
1415          }
1416        }
1417      }
1418    }
1419
1420    /** Returns Subset of the histograms' calculation. */
1421    public String getShortLatencyReport() {
1422      return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
1423    }
1424
1425    /** Returns Subset of the histograms' calculation. */
1426    public String getShortValueSizeReport() {
1427      return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
1428    }
1429
1430    /**
1431     * Test for individual row.
1432     * @param i Row index.
1433     * @return true if the row was sent to server and need to record metrics. False if not, multiGet
1434     *         and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered.
1435     */
1436    abstract boolean testRow(final int i, final long startTime)
1437      throws IOException, InterruptedException;
1438  }
1439
1440  static abstract class Test extends TestBase {
1441    protected Connection connection;
1442
1443    Test(final Connection con, final TestOptions options, final Status status) {
1444      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1445      this.connection = con;
1446    }
1447  }
1448
1449  static abstract class AsyncTest extends TestBase {
1450    protected AsyncConnection connection;
1451
1452    AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
1453      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1454      this.connection = con;
1455    }
1456  }
1457
1458  static abstract class TableTest extends Test {
1459    protected Table table;
1460
1461    TableTest(Connection con, TestOptions options, Status status) {
1462      super(con, options, status);
1463    }
1464
1465    @Override
1466    void onStartup() throws IOException {
1467      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1468    }
1469
1470    @Override
1471    void onTakedown() throws IOException {
1472      table.close();
1473    }
1474  }
1475
1476  /*
1477   * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest
1478   */
1479  static abstract class MetaTest extends TableTest {
1480    protected int keyLength;
1481
1482    MetaTest(Connection con, TestOptions options, Status status) {
1483      super(con, options, status);
1484      keyLength = Integer.toString(opts.perClientRunRows).length();
1485    }
1486
1487    @Override
1488    void onTakedown() throws IOException {
1489      // No clean up
1490    }
1491
1492    /*
1493     * Generates Lexicographically ascending strings
1494     */
1495    protected byte[] getSplitKey(final int i) {
1496      return Bytes.toBytes(String.format("%0" + keyLength + "d", i));
1497    }
1498
1499  }
1500
1501  static abstract class AsyncTableTest extends AsyncTest {
1502    protected AsyncTable<?> table;
1503
1504    AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
1505      super(con, options, status);
1506    }
1507
1508    @Override
1509    void onStartup() throws IOException {
1510      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1511    }
1512
1513    @Override
1514    void onTakedown() throws IOException {
1515    }
1516  }
1517
1518  static class AsyncRandomReadTest extends AsyncTableTest {
1519    private final Consistency consistency;
1520    private ArrayList<Get> gets;
1521
1522    AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
1523      super(con, options, status);
1524      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1525      if (opts.multiGet > 0) {
1526        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1527        this.gets = new ArrayList<>(opts.multiGet);
1528      }
1529    }
1530
1531    @Override
1532    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1533      if (opts.randomSleep > 0) {
1534        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
1535      }
1536      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1537      for (int family = 0; family < opts.families; family++) {
1538        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1539        if (opts.addColumns) {
1540          for (int column = 0; column < opts.columns; column++) {
1541            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1542            get.addColumn(familyName, qualifier);
1543          }
1544        } else {
1545          get.addFamily(familyName);
1546        }
1547      }
1548      if (opts.filterAll) {
1549        get.setFilter(new FilterAllFilter());
1550      }
1551      get.setConsistency(consistency);
1552      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1553      try {
1554        if (opts.multiGet > 0) {
1555          this.gets.add(get);
1556          if (this.gets.size() == opts.multiGet) {
1557            Result[] rs =
1558              this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
1559            updateValueSize(rs);
1560            this.gets.clear();
1561          } else {
1562            return false;
1563          }
1564        } else {
1565          updateValueSize(this.table.get(get).get());
1566        }
1567      } catch (ExecutionException e) {
1568        throw new IOException(e);
1569      }
1570      return true;
1571    }
1572
1573    public static RuntimeException runtime(Throwable e) {
1574      if (e instanceof RuntimeException) {
1575        return (RuntimeException) e;
1576      }
1577      return new RuntimeException(e);
1578    }
1579
1580    public static <V> V propagate(Callable<V> callable) {
1581      try {
1582        return callable.call();
1583      } catch (Exception e) {
1584        throw runtime(e);
1585      }
1586    }
1587
1588    @Override
1589    protected int getReportingPeriod() {
1590      int period = opts.perClientRunRows / 10;
1591      return period == 0 ? opts.perClientRunRows : period;
1592    }
1593
1594    @Override
1595    protected void testTakedown() throws IOException {
1596      if (this.gets != null && this.gets.size() > 0) {
1597        this.table.get(gets);
1598        this.gets.clear();
1599      }
1600      super.testTakedown();
1601    }
1602  }
1603
1604  static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
1605
1606    AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
1607      super(con, options, status);
1608    }
1609
1610    @Override
1611    protected byte[] generateRow(final int i) {
1612      return getRandomRow(this.rand, opts.totalRows);
1613    }
1614  }
1615
1616  static class AsyncScanTest extends AsyncTableTest {
1617    private ResultScanner testScanner;
1618    private AsyncTable<?> asyncTable;
1619
1620    AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
1621      super(con, options, status);
1622    }
1623
1624    @Override
1625    void onStartup() throws IOException {
1626      this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName),
1627        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
1628    }
1629
1630    @Override
1631    void testTakedown() throws IOException {
1632      if (this.testScanner != null) {
1633        updateScanMetrics(this.testScanner.getScanMetrics());
1634        this.testScanner.close();
1635      }
1636      super.testTakedown();
1637    }
1638
1639    @Override
1640    boolean testRow(final int i, final long startTime) throws IOException {
1641      if (this.testScanner == null) {
1642        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1643          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1644          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1645        for (int family = 0; family < opts.families; family++) {
1646          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1647          if (opts.addColumns) {
1648            for (int column = 0; column < opts.columns; column++) {
1649              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1650              scan.addColumn(familyName, qualifier);
1651            }
1652          } else {
1653            scan.addFamily(familyName);
1654          }
1655        }
1656        if (opts.filterAll) {
1657          scan.setFilter(new FilterAllFilter());
1658        }
1659        this.testScanner = asyncTable.getScanner(scan);
1660      }
1661      Result r = testScanner.next();
1662      updateValueSize(r);
1663      return true;
1664    }
1665  }
1666
1667  static class AsyncSequentialReadTest extends AsyncTableTest {
1668    AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
1669      super(con, options, status);
1670    }
1671
1672    @Override
1673    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1674      Get get = new Get(format(i));
1675      for (int family = 0; family < opts.families; family++) {
1676        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1677        if (opts.addColumns) {
1678          for (int column = 0; column < opts.columns; column++) {
1679            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1680            get.addColumn(familyName, qualifier);
1681          }
1682        } else {
1683          get.addFamily(familyName);
1684        }
1685      }
1686      if (opts.filterAll) {
1687        get.setFilter(new FilterAllFilter());
1688      }
1689      try {
1690        updateValueSize(table.get(get).get());
1691      } catch (ExecutionException e) {
1692        throw new IOException(e);
1693      }
1694      return true;
1695    }
1696  }
1697
1698  static class AsyncSequentialWriteTest extends AsyncTableTest {
1699    private ArrayList<Put> puts;
1700
1701    AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
1702      super(con, options, status);
1703      if (opts.multiPut > 0) {
1704        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
1705        this.puts = new ArrayList<>(opts.multiPut);
1706      }
1707    }
1708
1709    protected byte[] generateRow(final int i) {
1710      return format(i);
1711    }
1712
1713    @Override
1714    @SuppressWarnings("ReturnValueIgnored")
1715    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1716      byte[] row = generateRow(i);
1717      Put put = new Put(row);
1718      for (int family = 0; family < opts.families; family++) {
1719        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1720        for (int column = 0; column < opts.columns; column++) {
1721          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1722          byte[] value = generateData(this.rand, getValueLength(this.rand));
1723          if (opts.useTags) {
1724            byte[] tag = generateData(this.rand, TAG_LENGTH);
1725            Tag[] tags = new Tag[opts.noOfTags];
1726            for (int n = 0; n < opts.noOfTags; n++) {
1727              Tag t = new ArrayBackedTag((byte) n, tag);
1728              tags[n] = t;
1729            }
1730            KeyValue kv =
1731              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
1732            put.add(kv);
1733            updateValueSize(kv.getValueLength());
1734          } else {
1735            put.addColumn(familyName, qualifier, value);
1736            updateValueSize(value.length);
1737          }
1738        }
1739      }
1740      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1741      try {
1742        table.put(put).get();
1743        if (opts.multiPut > 0) {
1744          this.puts.add(put);
1745          if (this.puts.size() == opts.multiPut) {
1746            this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get));
1747            this.puts.clear();
1748          } else {
1749            return false;
1750          }
1751        } else {
1752          table.put(put).get();
1753        }
1754      } catch (ExecutionException e) {
1755        throw new IOException(e);
1756      }
1757      return true;
1758    }
1759  }
1760
1761  static abstract class BufferedMutatorTest extends Test {
1762    protected BufferedMutator mutator;
1763    protected Table table;
1764
1765    BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1766      super(con, options, status);
1767    }
1768
1769    @Override
1770    void onStartup() throws IOException {
1771      BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1772      p.writeBufferSize(opts.bufferSize);
1773      this.mutator = connection.getBufferedMutator(p);
1774      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1775    }
1776
1777    @Override
1778    void onTakedown() throws IOException {
1779      mutator.close();
1780      table.close();
1781    }
1782  }
1783
1784  static class RandomSeekScanTest extends TableTest {
1785    RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1786      super(con, options, status);
1787    }
1788
1789    @Override
1790    boolean testRow(final int i, final long startTime) throws IOException {
1791      Scan scan =
1792        new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)).setCaching(opts.caching)
1793          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1794          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1795      FilterList list = new FilterList();
1796      for (int family = 0; family < opts.families; family++) {
1797        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1798        if (opts.addColumns) {
1799          for (int column = 0; column < opts.columns; column++) {
1800            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1801            scan.addColumn(familyName, qualifier);
1802          }
1803        } else {
1804          scan.addFamily(familyName);
1805        }
1806      }
1807      if (opts.filterAll) {
1808        list.addFilter(new FilterAllFilter());
1809      }
1810      list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1811      scan.setFilter(list);
1812      ResultScanner s = this.table.getScanner(scan);
1813      try {
1814        for (Result rr; (rr = s.next()) != null;) {
1815          updateValueSize(rr);
1816        }
1817      } finally {
1818        updateScanMetrics(s.getScanMetrics());
1819        s.close();
1820      }
1821      return true;
1822    }
1823
1824    @Override
1825    protected int getReportingPeriod() {
1826      int period = opts.perClientRunRows / 100;
1827      return period == 0 ? opts.perClientRunRows : period;
1828    }
1829
1830  }
1831
1832  static abstract class RandomScanWithRangeTest extends TableTest {
1833    RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1834      super(con, options, status);
1835    }
1836
1837    @Override
1838    boolean testRow(final int i, final long startTime) throws IOException {
1839      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1840      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
1841        .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
1842        .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1843        .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1844      for (int family = 0; family < opts.families; family++) {
1845        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1846        if (opts.addColumns) {
1847          for (int column = 0; column < opts.columns; column++) {
1848            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1849            scan.addColumn(familyName, qualifier);
1850          }
1851        } else {
1852          scan.addFamily(familyName);
1853        }
1854      }
1855      if (opts.filterAll) {
1856        scan.setFilter(new FilterAllFilter());
1857      }
1858      Result r = null;
1859      int count = 0;
1860      ResultScanner s = this.table.getScanner(scan);
1861      try {
1862        for (; (r = s.next()) != null;) {
1863          updateValueSize(r);
1864          count++;
1865        }
1866        if (i % 100 == 0) {
1867          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1868            Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()),
1869            count));
1870        }
1871      } finally {
1872        updateScanMetrics(s.getScanMetrics());
1873        s.close();
1874      }
1875      return true;
1876    }
1877
1878    protected abstract Pair<byte[], byte[]> getStartAndStopRow();
1879
1880    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
1881      int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
1882      int stop = start + maxRange;
1883      return new Pair<>(format(start), format(stop));
1884    }
1885
1886    @Override
1887    protected int getReportingPeriod() {
1888      int period = opts.perClientRunRows / 100;
1889      return period == 0 ? opts.perClientRunRows : period;
1890    }
1891  }
1892
1893  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1894    RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1895      super(con, options, status);
1896    }
1897
1898    @Override
1899    protected Pair<byte[], byte[]> getStartAndStopRow() {
1900      return generateStartAndStopRows(10);
1901    }
1902  }
1903
1904  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1905    RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1906      super(con, options, status);
1907    }
1908
1909    @Override
1910    protected Pair<byte[], byte[]> getStartAndStopRow() {
1911      return generateStartAndStopRows(100);
1912    }
1913  }
1914
1915  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1916    RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1917      super(con, options, status);
1918    }
1919
1920    @Override
1921    protected Pair<byte[], byte[]> getStartAndStopRow() {
1922      return generateStartAndStopRows(1000);
1923    }
1924  }
1925
1926  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1927    RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1928      super(con, options, status);
1929    }
1930
1931    @Override
1932    protected Pair<byte[], byte[]> getStartAndStopRow() {
1933      return generateStartAndStopRows(10000);
1934    }
1935  }
1936
1937  static class RandomReadTest extends TableTest {
1938    private final Consistency consistency;
1939    private ArrayList<Get> gets;
1940
1941    RandomReadTest(Connection con, TestOptions options, Status status) {
1942      super(con, options, status);
1943      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1944      if (opts.multiGet > 0) {
1945        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1946        this.gets = new ArrayList<>(opts.multiGet);
1947      }
1948    }
1949
1950    @Override
1951    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1952      if (opts.randomSleep > 0) {
1953        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
1954      }
1955      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1956      for (int family = 0; family < opts.families; family++) {
1957        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1958        if (opts.addColumns) {
1959          for (int column = 0; column < opts.columns; column++) {
1960            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1961            get.addColumn(familyName, qualifier);
1962          }
1963        } else {
1964          get.addFamily(familyName);
1965        }
1966      }
1967      if (opts.filterAll) {
1968        get.setFilter(new FilterAllFilter());
1969      }
1970      get.setConsistency(consistency);
1971      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1972      if (opts.multiGet > 0) {
1973        this.gets.add(get);
1974        if (this.gets.size() == opts.multiGet) {
1975          Result[] rs = this.table.get(this.gets);
1976          if (opts.replicas > 1) {
1977            long latency = System.nanoTime() - startTime;
1978            updateValueSize(rs, latency);
1979          } else {
1980            updateValueSize(rs);
1981          }
1982          this.gets.clear();
1983        } else {
1984          return false;
1985        }
1986      } else {
1987        if (opts.replicas > 1) {
1988          Result r = this.table.get(get);
1989          long latency = System.nanoTime() - startTime;
1990          updateValueSize(r, latency);
1991        } else {
1992          updateValueSize(this.table.get(get));
1993        }
1994      }
1995      return true;
1996    }
1997
1998    @Override
1999    protected int getReportingPeriod() {
2000      int period = opts.perClientRunRows / 10;
2001      return period == 0 ? opts.perClientRunRows : period;
2002    }
2003
2004    @Override
2005    protected void testTakedown() throws IOException {
2006      if (this.gets != null && this.gets.size() > 0) {
2007        this.table.get(gets);
2008        this.gets.clear();
2009      }
2010      super.testTakedown();
2011    }
2012  }
2013
2014  /*
2015   * Send random reads against fake regions inserted by MetaWriteTest
2016   */
2017  static class MetaRandomReadTest extends MetaTest {
2018    private RegionLocator regionLocator;
2019
2020    MetaRandomReadTest(Connection con, TestOptions options, Status status) {
2021      super(con, options, status);
2022      LOG.info("call getRegionLocation");
2023    }
2024
2025    @Override
2026    void onStartup() throws IOException {
2027      super.onStartup();
2028      this.regionLocator = connection.getRegionLocator(table.getName());
2029    }
2030
2031    @Override
2032    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
2033      if (opts.randomSleep > 0) {
2034        Thread.sleep(rand.nextInt(opts.randomSleep));
2035      }
2036      HRegionLocation hRegionLocation =
2037        regionLocator.getRegionLocation(getSplitKey(rand.nextInt(opts.perClientRunRows)), true);
2038      LOG.debug("get location for region: " + hRegionLocation);
2039      return true;
2040    }
2041
2042    @Override
2043    protected int getReportingPeriod() {
2044      int period = opts.perClientRunRows / 10;
2045      return period == 0 ? opts.perClientRunRows : period;
2046    }
2047
2048    @Override
2049    protected void testTakedown() throws IOException {
2050      super.testTakedown();
2051    }
2052  }
2053
2054  static class RandomWriteTest extends SequentialWriteTest {
2055    RandomWriteTest(Connection con, TestOptions options, Status status) {
2056      super(con, options, status);
2057    }
2058
2059    @Override
2060    protected byte[] generateRow(final int i) {
2061      return getRandomRow(this.rand, opts.totalRows);
2062    }
2063
2064  }
2065
2066  static class ScanTest extends TableTest {
2067    private ResultScanner testScanner;
2068
2069    ScanTest(Connection con, TestOptions options, Status status) {
2070      super(con, options, status);
2071    }
2072
2073    @Override
2074    void testTakedown() throws IOException {
2075      if (this.testScanner != null) {
2076        this.testScanner.close();
2077      }
2078      super.testTakedown();
2079    }
2080
2081    @Override
2082    boolean testRow(final int i, final long startTime) throws IOException {
2083      if (this.testScanner == null) {
2084        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
2085          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
2086          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
2087        for (int family = 0; family < opts.families; family++) {
2088          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2089          if (opts.addColumns) {
2090            for (int column = 0; column < opts.columns; column++) {
2091              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2092              scan.addColumn(familyName, qualifier);
2093            }
2094          } else {
2095            scan.addFamily(familyName);
2096          }
2097        }
2098        if (opts.filterAll) {
2099          scan.setFilter(new FilterAllFilter());
2100        }
2101        this.testScanner = table.getScanner(scan);
2102      }
2103      Result r = testScanner.next();
2104      updateValueSize(r);
2105      return true;
2106    }
2107  }
2108
2109  static class ReverseScanTest extends TableTest {
2110    private ResultScanner testScanner;
2111
2112    ReverseScanTest(Connection con, TestOptions options, Status status) {
2113      super(con, options, status);
2114    }
2115
2116    @Override
2117    void testTakedown() throws IOException {
2118      if (this.testScanner != null) {
2119        this.testScanner.close();
2120      }
2121      super.testTakedown();
2122    }
2123
2124    @Override
2125    boolean testRow(final int i, final long startTime) throws IOException {
2126      if (this.testScanner == null) {
2127        Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2128          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2129          .setScanMetricsEnabled(true).setReversed(true);
2130        for (int family = 0; family < opts.families; family++) {
2131          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2132          if (opts.addColumns) {
2133            for (int column = 0; column < opts.columns; column++) {
2134              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2135              scan.addColumn(familyName, qualifier);
2136            }
2137          } else {
2138            scan.addFamily(familyName);
2139          }
2140        }
2141        if (opts.filterAll) {
2142          scan.setFilter(new FilterAllFilter());
2143        }
2144        this.testScanner = table.getScanner(scan);
2145      }
2146      Result r = testScanner.next();
2147      updateValueSize(r);
2148      return true;
2149    }
2150  }
2151
2152  /**
2153   * Base class for operations that are CAS-like; that read a value and then set it based off what
2154   * they read. In this category is increment, append, checkAndPut, etc.
2155   * <p>
2156   * These operations also want some concurrency going on. Usually when these tests run, they
2157   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
2158   * same key space. We do this with our getStartRow and getLastRow overrides.
2159   */
2160  static abstract class CASTableTest extends TableTest {
2161    private final byte[] qualifier;
2162
2163    CASTableTest(Connection con, TestOptions options, Status status) {
2164      super(con, options, status);
2165      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
2166    }
2167
2168    byte[] getQualifier() {
2169      return this.qualifier;
2170    }
2171
2172    @Override
2173    int getStartRow() {
2174      return 0;
2175    }
2176
2177    @Override
2178    int getLastRow() {
2179      return opts.perClientRunRows;
2180    }
2181  }
2182
2183  static class IncrementTest extends CASTableTest {
2184    IncrementTest(Connection con, TestOptions options, Status status) {
2185      super(con, options, status);
2186    }
2187
2188    @Override
2189    boolean testRow(final int i, final long startTime) throws IOException {
2190      Increment increment = new Increment(format(i));
2191      // unlike checkAndXXX tests, which make most sense to do on a single value,
2192      // if multiple families are specified for an increment test we assume it is
2193      // meant to raise the work factor
2194      for (int family = 0; family < opts.families; family++) {
2195        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2196        increment.addColumn(familyName, getQualifier(), 1l);
2197      }
2198      updateValueSize(this.table.increment(increment));
2199      return true;
2200    }
2201  }
2202
2203  static class AppendTest extends CASTableTest {
2204    AppendTest(Connection con, TestOptions options, Status status) {
2205      super(con, options, status);
2206    }
2207
2208    @Override
2209    boolean testRow(final int i, final long startTime) throws IOException {
2210      byte[] bytes = format(i);
2211      Append append = new Append(bytes);
2212      // unlike checkAndXXX tests, which make most sense to do on a single value,
2213      // if multiple families are specified for an append test we assume it is
2214      // meant to raise the work factor
2215      for (int family = 0; family < opts.families; family++) {
2216        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2217        append.addColumn(familyName, getQualifier(), bytes);
2218      }
2219      updateValueSize(this.table.append(append));
2220      return true;
2221    }
2222  }
2223
2224  static class CheckAndMutateTest extends CASTableTest {
2225    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
2226      super(con, options, status);
2227    }
2228
2229    @Override
2230    boolean testRow(final int i, final long startTime) throws IOException {
2231      final byte[] bytes = format(i);
2232      // checkAndXXX tests operate on only a single value
2233      // Put a known value so when we go to check it, it is there.
2234      Put put = new Put(bytes);
2235      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2236      this.table.put(put);
2237      RowMutations mutations = new RowMutations(bytes);
2238      mutations.add(put);
2239      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2240        .thenMutate(mutations);
2241      return true;
2242    }
2243  }
2244
2245  static class CheckAndPutTest extends CASTableTest {
2246    CheckAndPutTest(Connection con, TestOptions options, Status status) {
2247      super(con, options, status);
2248    }
2249
2250    @Override
2251    boolean testRow(final int i, final long startTime) throws IOException {
2252      final byte[] bytes = format(i);
2253      // checkAndXXX tests operate on only a single value
2254      // Put a known value so when we go to check it, it is there.
2255      Put put = new Put(bytes);
2256      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2257      this.table.put(put);
2258      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2259        .thenPut(put);
2260      return true;
2261    }
2262  }
2263
2264  static class CheckAndDeleteTest extends CASTableTest {
2265    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
2266      super(con, options, status);
2267    }
2268
2269    @Override
2270    boolean testRow(final int i, final long startTime) throws IOException {
2271      final byte[] bytes = format(i);
2272      // checkAndXXX tests operate on only a single value
2273      // Put a known value so when we go to check it, it is there.
2274      Put put = new Put(bytes);
2275      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2276      this.table.put(put);
2277      Delete delete = new Delete(put.getRow());
2278      delete.addColumn(FAMILY_ZERO, getQualifier());
2279      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2280        .thenDelete(delete);
2281      return true;
2282    }
2283  }
2284
2285  /*
2286   * Delete all fake regions inserted to meta table by MetaWriteTest.
2287   */
2288  static class CleanMetaTest extends MetaTest {
2289    CleanMetaTest(Connection con, TestOptions options, Status status) {
2290      super(con, options, status);
2291    }
2292
2293    @Override
2294    boolean testRow(final int i, final long startTime) throws IOException {
2295      try {
2296        RegionInfo regionInfo = connection.getRegionLocator(table.getName())
2297          .getRegionLocation(getSplitKey(i), false).getRegion();
2298        LOG.debug("deleting region from meta: " + regionInfo);
2299
2300        Delete delete =
2301          MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP);
2302        try (Table t = MetaTableAccessor.getMetaHTable(connection)) {
2303          t.delete(delete);
2304        }
2305      } catch (IOException ie) {
2306        // Log and continue
2307        LOG.error("cannot find region with start key: " + i);
2308      }
2309      return true;
2310    }
2311  }
2312
2313  static class SequentialReadTest extends TableTest {
2314    SequentialReadTest(Connection con, TestOptions options, Status status) {
2315      super(con, options, status);
2316    }
2317
2318    @Override
2319    boolean testRow(final int i, final long startTime) throws IOException {
2320      Get get = new Get(format(i));
2321      for (int family = 0; family < opts.families; family++) {
2322        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2323        if (opts.addColumns) {
2324          for (int column = 0; column < opts.columns; column++) {
2325            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2326            get.addColumn(familyName, qualifier);
2327          }
2328        } else {
2329          get.addFamily(familyName);
2330        }
2331      }
2332      if (opts.filterAll) {
2333        get.setFilter(new FilterAllFilter());
2334      }
2335      updateValueSize(table.get(get));
2336      return true;
2337    }
2338  }
2339
2340  static class SequentialWriteTest extends BufferedMutatorTest {
2341    private ArrayList<Put> puts;
2342
2343    SequentialWriteTest(Connection con, TestOptions options, Status status) {
2344      super(con, options, status);
2345      if (opts.multiPut > 0) {
2346        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
2347        this.puts = new ArrayList<>(opts.multiPut);
2348      }
2349    }
2350
2351    protected byte[] generateRow(final int i) {
2352      return format(i);
2353    }
2354
2355    @Override
2356    boolean testRow(final int i, final long startTime) throws IOException {
2357      byte[] row = generateRow(i);
2358      Put put = new Put(row);
2359      for (int family = 0; family < opts.families; family++) {
2360        byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
2361        for (int column = 0; column < opts.columns; column++) {
2362          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2363          byte[] value = generateData(this.rand, getValueLength(this.rand));
2364          if (opts.useTags) {
2365            byte[] tag = generateData(this.rand, TAG_LENGTH);
2366            Tag[] tags = new Tag[opts.noOfTags];
2367            for (int n = 0; n < opts.noOfTags; n++) {
2368              Tag t = new ArrayBackedTag((byte) n, tag);
2369              tags[n] = t;
2370            }
2371            KeyValue kv =
2372              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
2373            put.add(kv);
2374            updateValueSize(kv.getValueLength());
2375          } else {
2376            put.addColumn(familyName, qualifier, value);
2377            updateValueSize(value.length);
2378          }
2379        }
2380      }
2381      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2382      if (opts.autoFlush) {
2383        if (opts.multiPut > 0) {
2384          this.puts.add(put);
2385          if (this.puts.size() == opts.multiPut) {
2386            table.put(this.puts);
2387            this.puts.clear();
2388          } else {
2389            return false;
2390          }
2391        } else {
2392          table.put(put);
2393        }
2394      } else {
2395        mutator.mutate(put);
2396      }
2397      return true;
2398    }
2399  }
2400
2401  /*
2402   * Insert fake regions into meta table with contiguous split keys.
2403   */
2404  static class MetaWriteTest extends MetaTest {
2405
2406    MetaWriteTest(Connection con, TestOptions options, Status status) {
2407      super(con, options, status);
2408    }
2409
2410    @Override
2411    boolean testRow(final int i, final long startTime) throws IOException {
2412      List<RegionInfo> regionInfos = new ArrayList<RegionInfo>();
2413      RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME))
2414        .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build());
2415      regionInfos.add(regionInfo);
2416      MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1);
2417
2418      // write the serverName columns
2419      MetaTableAccessor.updateRegionLocation(connection, regionInfo,
2420        ServerName.valueOf("localhost", 60010, rand.nextLong()), i,
2421        EnvironmentEdgeManager.currentTime());
2422      return true;
2423    }
2424  }
2425
2426  static class FilteredScanTest extends TableTest {
2427    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
2428
2429    FilteredScanTest(Connection con, TestOptions options, Status status) {
2430      super(con, options, status);
2431      if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) {
2432        LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB
2433          + ". This could take a very long time.");
2434      }
2435    }
2436
2437    @Override
2438    boolean testRow(int i, final long startTime) throws IOException {
2439      byte[] value = generateData(this.rand, getValueLength(this.rand));
2440      Scan scan = constructScan(value);
2441      ResultScanner scanner = null;
2442      try {
2443        scanner = this.table.getScanner(scan);
2444        for (Result r = null; (r = scanner.next()) != null;) {
2445          updateValueSize(r);
2446        }
2447      } finally {
2448        if (scanner != null) {
2449          updateScanMetrics(scanner.getScanMetrics());
2450          scanner.close();
2451        }
2452      }
2453      return true;
2454    }
2455
2456    protected Scan constructScan(byte[] valuePrefix) throws IOException {
2457      FilterList list = new FilterList();
2458      Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL,
2459        new BinaryComparator(valuePrefix));
2460      list.addFilter(filter);
2461      if (opts.filterAll) {
2462        list.addFilter(new FilterAllFilter());
2463      }
2464      Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2465        .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2466        .setScanMetricsEnabled(true);
2467      if (opts.addColumns) {
2468        for (int column = 0; column < opts.columns; column++) {
2469          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2470          scan.addColumn(FAMILY_ZERO, qualifier);
2471        }
2472      } else {
2473        scan.addFamily(FAMILY_ZERO);
2474      }
2475      scan.setFilter(list);
2476      return scan;
2477    }
2478  }
2479
2480  /**
2481   * Compute a throughput rate in MB/s.
2482   * @param rows   Number of records consumed.
2483   * @param timeMs Time taken in milliseconds.
2484   * @return String value with label, ie '123.76 MB/s'
2485   */
2486  private static String calculateMbps(int rows, long timeMs, final int valueSize, int families,
2487    int columns) {
2488    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH
2489      + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families);
2490    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
2491      .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT);
2492    return FMT.format(mbps) + " MB/s";
2493  }
2494
2495  /*
2496   * Format passed integer.
2497   * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed number (Does
2498   * absolute in case number is negative).
2499   */
2500  public static byte[] format(final int number) {
2501    byte[] b = new byte[ROW_LENGTH];
2502    int d = Math.abs(number);
2503    for (int i = b.length - 1; i >= 0; i--) {
2504      b[i] = (byte) ((d % 10) + '0');
2505      d /= 10;
2506    }
2507    return b;
2508  }
2509
2510  /*
2511   * This method takes some time and is done inline uploading data. For example, doing the mapfile
2512   * test, generation of the key and value consumes about 30% of CPU time.
2513   * @return Generated random value to insert into a table cell.
2514   */
2515  public static byte[] generateData(final Random r, int length) {
2516    byte[] b = new byte[length];
2517    int i;
2518
2519    for (i = 0; i < (length - 8); i += 8) {
2520      b[i] = (byte) (65 + r.nextInt(26));
2521      b[i + 1] = b[i];
2522      b[i + 2] = b[i];
2523      b[i + 3] = b[i];
2524      b[i + 4] = b[i];
2525      b[i + 5] = b[i];
2526      b[i + 6] = b[i];
2527      b[i + 7] = b[i];
2528    }
2529
2530    byte a = (byte) (65 + r.nextInt(26));
2531    for (; i < length; i++) {
2532      b[i] = a;
2533    }
2534    return b;
2535  }
2536
2537  static byte[] getRandomRow(final Random random, final int totalRows) {
2538    return format(generateRandomRow(random, totalRows));
2539  }
2540
2541  static int generateRandomRow(final Random random, final int totalRows) {
2542    return random.nextInt(Integer.MAX_VALUE) % totalRows;
2543  }
2544
2545  static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
2546    Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
2547    throws IOException, InterruptedException {
2548    status.setStatus(
2549      "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows");
2550    long totalElapsedTime;
2551
2552    final TestBase t;
2553    try {
2554      if (AsyncTest.class.isAssignableFrom(cmd)) {
2555        Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
2556        Constructor<? extends AsyncTest> constructor =
2557          newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
2558        t = constructor.newInstance(asyncCon, opts, status);
2559      } else {
2560        Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
2561        Constructor<? extends Test> constructor =
2562          newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
2563        t = constructor.newInstance(con, opts, status);
2564      }
2565    } catch (NoSuchMethodException e) {
2566      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
2567        + ".  It does not provide a constructor as described by "
2568        + "the javadoc comment.  Available constructors are: "
2569        + Arrays.toString(cmd.getConstructors()));
2570    } catch (Exception e) {
2571      throw new IllegalStateException("Failed to construct command class", e);
2572    }
2573    totalElapsedTime = t.test();
2574
2575    status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow
2576      + " for " + opts.perClientRunRows + " rows" + " ("
2577      + calculateMbps((int) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
2578        getAverageValueLength(opts), opts.families, opts.columns)
2579      + ")");
2580
2581    return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold,
2582      t.numOfReplyFromReplica, t.getLatencyHistogram());
2583  }
2584
2585  private static int getAverageValueLength(final TestOptions opts) {
2586    return opts.valueRandom ? opts.valueSize / 2 : opts.valueSize;
2587  }
2588
2589  private void runTest(final Class<? extends TestBase> cmd, TestOptions opts)
2590    throws IOException, InterruptedException, ClassNotFoundException, ExecutionException {
2591    // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2592    // the TestOptions introspection for us and dump the output in a readable format.
2593    LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
2594    Admin admin = null;
2595    Connection connection = null;
2596    try {
2597      connection = ConnectionFactory.createConnection(getConf());
2598      admin = connection.getAdmin();
2599      checkTable(admin, opts);
2600    } finally {
2601      if (admin != null) admin.close();
2602      if (connection != null) connection.close();
2603    }
2604    if (opts.nomapred) {
2605      doLocalClients(opts, getConf());
2606    } else {
2607      doMapReduce(opts, getConf());
2608    }
2609  }
2610
2611  protected void printUsage() {
2612    printUsage(PE_COMMAND_SHORTNAME, null);
2613  }
2614
2615  protected static void printUsage(final String message) {
2616    printUsage(PE_COMMAND_SHORTNAME, message);
2617  }
2618
2619  protected static void printUsageAndExit(final String message, final int exitCode) {
2620    printUsage(message);
2621    System.exit(exitCode);
2622  }
2623
2624  protected static void printUsage(final String shortName, final String message) {
2625    if (message != null && message.length() > 0) {
2626      System.err.println(message);
2627    }
2628    System.err.print("Usage: hbase " + shortName);
2629    System.err.println("  <OPTIONS> [-D<property=value>]* <command> <nclients>");
2630    System.err.println();
2631    System.err.println("General Options:");
2632    System.err.println(
2633      " nomapred        Run multiple clients using threads " + "(rather than use mapreduce)");
2634    System.err
2635      .println(" oneCon          all the threads share the same connection. Default: False");
2636    System.err.println(" connCount          connections all threads share. "
2637      + "For example, if set to 2, then all thread share 2 connection. "
2638      + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, "
2639      + "if not, connCount=thread number");
2640
2641    System.err.println(" sampleRate      Execute test on a sample of total "
2642      + "rows. Only supported by randomRead. Default: 1.0");
2643    System.err.println(" period          Report every 'period' rows: "
2644      + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10);
2645    System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
2646    System.err.println(
2647      " traceRate       Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0");
2648    System.err.println(" latency         Set to report operation latencies. Default: False");
2649    System.err.println(" latencyThreshold  Set to report number of operations with latency "
2650      + "over lantencyThreshold, unit in millisecond, default 0");
2651    System.err.println(" measureAfter    Start to measure the latency once 'measureAfter'"
2652      + " rows have been treated. Default: 0");
2653    System.err
2654      .println(" valueSize       Pass value size to use: Default: " + DEFAULT_OPTS.getValueSize());
2655    System.err.println(" valueRandom     Set if we should vary value size between 0 and "
2656      + "'valueSize'; set on read for stats on size: Default: Not set.");
2657    System.err.println(" blockEncoding   Block encoding to use. Value should be one of "
2658      + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2659    System.err.println();
2660    System.err.println("Table Creation / Write Tests:");
2661    System.err.println(" table           Alternate table name. Default: 'TestTable'");
2662    System.err.println(
2663      " rows            Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows()
2664        + ".  In case of randomReads and randomSeekScans this could"
2665        + " be specified along with --size to specify the number of rows to be scanned within"
2666        + " the total range specified by the size.");
2667    System.err.println(
2668      " size            Total size in GiB. Mutually exclusive with --rows for writes and scans"
2669        + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2670        + " use size to specify the end range and --rows"
2671        + " specifies the number of rows within that range. " + "Default: 1.0.");
2672    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2673    System.err.println(" encryption      Encryption type to use (AES, ...). Default: 'NONE'");
2674    System.err.println(
2675      " flushCommits    Used to determine if the test should flush the table. " + "Default: false");
2676    System.err.println(" valueZipf       Set if we should vary value size between 0 and "
2677      + "'valueSize' in zipf form: Default: Not set.");
2678    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
2679    System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
2680    System.err.println(" multiPut        Batch puts together into groups of N. Only supported "
2681      + "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0");
2682    System.err.println(" presplit        Create presplit table. If a table with same name exists,"
2683      + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2684      + "Recommended for accurate perf analysis (see guide). Default: disabled");
2685    System.err.println(
2686      " usetags         Writes tags along with KVs. Use with HFile V3. " + "Default: false");
2687    System.err.println(" numoftags       Specify the no of tags that would be needed. "
2688      + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2689    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
2690    System.err.println(" columns         Columns to write per row. Default: 1");
2691    System.err
2692      .println(" families        Specify number of column families for the table. Default: 1");
2693    System.err.println();
2694    System.err.println("Read Tests:");
2695    System.err.println(" filterAll       Helps to filter out all the rows on the server side"
2696      + " there by not returning any thing back to the client.  Helps to check the server side"
2697      + " performance.  Uses FilterAllFilter internally. ");
2698    System.err.println(" multiGet        Batch gets together into groups of N. Only supported "
2699      + "by randomRead. Default: disabled");
2700    System.err.println(" inmemory        Tries to keep the HFiles of the CF "
2701      + "inmemory as far as possible. Not guaranteed that reads are always served "
2702      + "from memory.  Default: false");
2703    System.err
2704      .println(" bloomFilter     Bloom filter type, one of " + Arrays.toString(BloomType.values()));
2705    System.err.println(" blockSize       Blocksize to use when writing out hfiles. ");
2706    System.err
2707      .println(" inmemoryCompaction  Makes the column family to do inmemory flushes/compactions. "
2708        + "Uses the CompactingMemstore");
2709    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
2710    System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
2711    System.err.println(
2712      " randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
2713    System.err.println(" caching         Scan caching to use. Default: 30");
2714    System.err.println(" asyncPrefetch   Enable asyncPrefetch for scan");
2715    System.err.println(" cacheBlocks     Set the cacheBlocks option for scan. Default: true");
2716    System.err.println(
2717      " scanReadType    Set the readType option for scan, stream/pread/default. Default: default");
2718    System.err.println(" bufferSize      Set the value of client side buffering. Default: 2MB");
2719    System.err.println();
2720    System.err.println(" Note: -D properties will be applied to the conf used. ");
2721    System.err.println("  For example: ");
2722    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
2723    System.err.println("   -Dmapreduce.task.timeout=60000");
2724    System.err.println();
2725    System.err.println("Command:");
2726    for (CmdDescriptor command : COMMANDS.values()) {
2727      System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
2728    }
2729    System.err.println();
2730    System.err.println("Args:");
2731    System.err.println(" nclients        Integer. Required. Total number of clients "
2732      + "(and HRegionServers) running. 1 <= value <= 500");
2733    System.err.println("Examples:");
2734    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
2735    System.err.println(" $ hbase " + shortName + " sequentialWrite 1");
2736    System.err.println(" To run 10 clients doing increments over ten rows:");
2737    System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10");
2738  }
2739
2740  /**
2741   * Parse options passed in via an arguments array. Assumes that array has been split on
2742   * white-space and placed into a {@code Queue}. Any unknown arguments will remain in the queue at
2743   * the conclusion of this method call. It's up to the caller to deal with these unrecognized
2744   * arguments.
2745   */
2746  static TestOptions parseOpts(Queue<String> args) {
2747    TestOptions opts = new TestOptions();
2748
2749    String cmd = null;
2750    while ((cmd = args.poll()) != null) {
2751      if (cmd.equals("-h") || cmd.startsWith("--h")) {
2752        // place item back onto queue so that caller knows parsing was incomplete
2753        args.add(cmd);
2754        break;
2755      }
2756
2757      final String nmr = "--nomapred";
2758      if (cmd.startsWith(nmr)) {
2759        opts.nomapred = true;
2760        continue;
2761      }
2762
2763      final String rows = "--rows=";
2764      if (cmd.startsWith(rows)) {
2765        opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
2766        continue;
2767      }
2768
2769      final String cycles = "--cycles=";
2770      if (cmd.startsWith(cycles)) {
2771        opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
2772        continue;
2773      }
2774
2775      final String sampleRate = "--sampleRate=";
2776      if (cmd.startsWith(sampleRate)) {
2777        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
2778        continue;
2779      }
2780
2781      final String table = "--table=";
2782      if (cmd.startsWith(table)) {
2783        opts.tableName = cmd.substring(table.length());
2784        continue;
2785      }
2786
2787      final String startRow = "--startRow=";
2788      if (cmd.startsWith(startRow)) {
2789        opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
2790        continue;
2791      }
2792
2793      final String compress = "--compress=";
2794      if (cmd.startsWith(compress)) {
2795        opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
2796        continue;
2797      }
2798
2799      final String encryption = "--encryption=";
2800      if (cmd.startsWith(encryption)) {
2801        opts.encryption = cmd.substring(encryption.length());
2802        continue;
2803      }
2804
2805      final String traceRate = "--traceRate=";
2806      if (cmd.startsWith(traceRate)) {
2807        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
2808        continue;
2809      }
2810
2811      final String blockEncoding = "--blockEncoding=";
2812      if (cmd.startsWith(blockEncoding)) {
2813        opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
2814        continue;
2815      }
2816
2817      final String flushCommits = "--flushCommits=";
2818      if (cmd.startsWith(flushCommits)) {
2819        opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
2820        continue;
2821      }
2822
2823      final String writeToWAL = "--writeToWAL=";
2824      if (cmd.startsWith(writeToWAL)) {
2825        opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
2826        continue;
2827      }
2828
2829      final String presplit = "--presplit=";
2830      if (cmd.startsWith(presplit)) {
2831        opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
2832        continue;
2833      }
2834
2835      final String inMemory = "--inmemory=";
2836      if (cmd.startsWith(inMemory)) {
2837        opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
2838        continue;
2839      }
2840
2841      final String autoFlush = "--autoFlush=";
2842      if (cmd.startsWith(autoFlush)) {
2843        opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
2844        continue;
2845      }
2846
2847      final String onceCon = "--oneCon=";
2848      if (cmd.startsWith(onceCon)) {
2849        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
2850        continue;
2851      }
2852
2853      final String connCount = "--connCount=";
2854      if (cmd.startsWith(connCount)) {
2855        opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
2856        continue;
2857      }
2858
2859      final String latencyThreshold = "--latencyThreshold=";
2860      if (cmd.startsWith(latencyThreshold)) {
2861        opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length()));
2862        continue;
2863      }
2864
2865      final String latency = "--latency";
2866      if (cmd.startsWith(latency)) {
2867        opts.reportLatency = true;
2868        continue;
2869      }
2870
2871      final String multiGet = "--multiGet=";
2872      if (cmd.startsWith(multiGet)) {
2873        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
2874        continue;
2875      }
2876
2877      final String multiPut = "--multiPut=";
2878      if (cmd.startsWith(multiPut)) {
2879        opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
2880        continue;
2881      }
2882
2883      final String useTags = "--usetags=";
2884      if (cmd.startsWith(useTags)) {
2885        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
2886        continue;
2887      }
2888
2889      final String noOfTags = "--numoftags=";
2890      if (cmd.startsWith(noOfTags)) {
2891        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
2892        continue;
2893      }
2894
2895      final String replicas = "--replicas=";
2896      if (cmd.startsWith(replicas)) {
2897        opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
2898        continue;
2899      }
2900
2901      final String filterOutAll = "--filterAll";
2902      if (cmd.startsWith(filterOutAll)) {
2903        opts.filterAll = true;
2904        continue;
2905      }
2906
2907      final String size = "--size=";
2908      if (cmd.startsWith(size)) {
2909        opts.size = Float.parseFloat(cmd.substring(size.length()));
2910        if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB");
2911        continue;
2912      }
2913
2914      final String splitPolicy = "--splitPolicy=";
2915      if (cmd.startsWith(splitPolicy)) {
2916        opts.splitPolicy = cmd.substring(splitPolicy.length());
2917        continue;
2918      }
2919
2920      final String randomSleep = "--randomSleep=";
2921      if (cmd.startsWith(randomSleep)) {
2922        opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
2923        continue;
2924      }
2925
2926      final String measureAfter = "--measureAfter=";
2927      if (cmd.startsWith(measureAfter)) {
2928        opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length()));
2929        continue;
2930      }
2931
2932      final String bloomFilter = "--bloomFilter=";
2933      if (cmd.startsWith(bloomFilter)) {
2934        opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
2935        continue;
2936      }
2937
2938      final String blockSize = "--blockSize=";
2939      if (cmd.startsWith(blockSize)) {
2940        opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
2941        continue;
2942      }
2943
2944      final String valueSize = "--valueSize=";
2945      if (cmd.startsWith(valueSize)) {
2946        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
2947        continue;
2948      }
2949
2950      final String valueRandom = "--valueRandom";
2951      if (cmd.startsWith(valueRandom)) {
2952        opts.valueRandom = true;
2953        continue;
2954      }
2955
2956      final String valueZipf = "--valueZipf";
2957      if (cmd.startsWith(valueZipf)) {
2958        opts.valueZipf = true;
2959        continue;
2960      }
2961
2962      final String period = "--period=";
2963      if (cmd.startsWith(period)) {
2964        opts.period = Integer.parseInt(cmd.substring(period.length()));
2965        continue;
2966      }
2967
2968      final String addColumns = "--addColumns=";
2969      if (cmd.startsWith(addColumns)) {
2970        opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
2971        continue;
2972      }
2973
2974      final String inMemoryCompaction = "--inmemoryCompaction=";
2975      if (cmd.startsWith(inMemoryCompaction)) {
2976        opts.inMemoryCompaction =
2977          MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length()));
2978        continue;
2979      }
2980
2981      final String columns = "--columns=";
2982      if (cmd.startsWith(columns)) {
2983        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
2984        continue;
2985      }
2986
2987      final String families = "--families=";
2988      if (cmd.startsWith(families)) {
2989        opts.families = Integer.parseInt(cmd.substring(families.length()));
2990        continue;
2991      }
2992
2993      final String caching = "--caching=";
2994      if (cmd.startsWith(caching)) {
2995        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
2996        continue;
2997      }
2998
2999      final String asyncPrefetch = "--asyncPrefetch";
3000      if (cmd.startsWith(asyncPrefetch)) {
3001        opts.asyncPrefetch = true;
3002        continue;
3003      }
3004
3005      final String cacheBlocks = "--cacheBlocks=";
3006      if (cmd.startsWith(cacheBlocks)) {
3007        opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length()));
3008        continue;
3009      }
3010
3011      final String scanReadType = "--scanReadType=";
3012      if (cmd.startsWith(scanReadType)) {
3013        opts.scanReadType =
3014          Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
3015        continue;
3016      }
3017
3018      final String bufferSize = "--bufferSize=";
3019      if (cmd.startsWith(bufferSize)) {
3020        opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
3021        continue;
3022      }
3023
3024      validateParsedOpts(opts);
3025
3026      if (isCommandClass(cmd)) {
3027        opts.cmdName = cmd;
3028        try {
3029          opts.numClientThreads = Integer.parseInt(args.remove());
3030        } catch (NoSuchElementException | NumberFormatException e) {
3031          throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
3032        }
3033        opts = calculateRowsAndSize(opts);
3034        break;
3035      } else {
3036        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
3037      }
3038
3039      // Not matching any option or command.
3040      System.err.println("Error: Wrong option or command: " + cmd);
3041      args.add(cmd);
3042      break;
3043    }
3044    return opts;
3045  }
3046
3047  /**
3048   * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts
3049   */
3050  private static void validateParsedOpts(TestOptions opts) {
3051
3052    if (!opts.autoFlush && opts.multiPut > 0) {
3053      throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
3054    }
3055
3056    if (opts.oneCon && opts.connCount > 1) {
3057      throw new IllegalArgumentException(
3058        "oneCon is set to true, " + "connCount should not bigger than 1");
3059    }
3060
3061    if (opts.valueZipf && opts.valueRandom) {
3062      throw new IllegalStateException("Either valueZipf or valueRandom but not both");
3063    }
3064  }
3065
3066  static TestOptions calculateRowsAndSize(final TestOptions opts) {
3067    int rowsPerGB = getRowsPerGB(opts);
3068    if (
3069      (opts.getCmdName() != null
3070        && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
3071        && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows
3072    ) {
3073      opts.totalRows = (int) opts.size * rowsPerGB;
3074    } else if (opts.size != DEFAULT_OPTS.size) {
3075      // total size in GB specified
3076      opts.totalRows = (int) opts.size * rowsPerGB;
3077      opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
3078    } else {
3079      opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
3080      opts.size = opts.totalRows / rowsPerGB;
3081    }
3082    return opts;
3083  }
3084
3085  static int getRowsPerGB(final TestOptions opts) {
3086    return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies()
3087      * opts.getColumns());
3088  }
3089
3090  @Override
3091  public int run(String[] args) throws Exception {
3092    // Process command-line args. TODO: Better cmd-line processing
3093    // (but hopefully something not as painful as cli options).
3094    int errCode = -1;
3095    if (args.length < 1) {
3096      printUsage();
3097      return errCode;
3098    }
3099
3100    try {
3101      LinkedList<String> argv = new LinkedList<>();
3102      argv.addAll(Arrays.asList(args));
3103      TestOptions opts = parseOpts(argv);
3104
3105      // args remaining, print help and exit
3106      if (!argv.isEmpty()) {
3107        errCode = 0;
3108        printUsage();
3109        return errCode;
3110      }
3111
3112      // must run at least 1 client
3113      if (opts.numClientThreads <= 0) {
3114        throw new IllegalArgumentException("Number of clients must be > 0");
3115      }
3116
3117      // cmdName should not be null, print help and exit
3118      if (opts.cmdName == null) {
3119        printUsage();
3120        return errCode;
3121      }
3122
3123      Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
3124      if (cmdClass != null) {
3125        runTest(cmdClass, opts);
3126        errCode = 0;
3127      }
3128
3129    } catch (Exception e) {
3130      e.printStackTrace();
3131    }
3132
3133    return errCode;
3134  }
3135
3136  private static boolean isCommandClass(String cmd) {
3137    return COMMANDS.containsKey(cmd);
3138  }
3139
3140  private static Class<? extends TestBase> determineCommandClass(String cmd) {
3141    CmdDescriptor descriptor = COMMANDS.get(cmd);
3142    return descriptor != null ? descriptor.getCmdClass() : null;
3143  }
3144
3145  public static void main(final String[] args) throws Exception {
3146    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
3147    System.exit(res);
3148  }
3149}