001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import com.codahale.metrics.Histogram;
021import com.codahale.metrics.UniformReservoir;
022import io.opentelemetry.api.trace.Span;
023import io.opentelemetry.context.Scope;
024import java.io.IOException;
025import java.io.PrintStream;
026import java.lang.reflect.Constructor;
027import java.math.BigDecimal;
028import java.math.MathContext;
029import java.text.DecimalFormat;
030import java.text.SimpleDateFormat;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Date;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Locale;
037import java.util.Map;
038import java.util.NoSuchElementException;
039import java.util.Properties;
040import java.util.Queue;
041import java.util.Random;
042import java.util.TreeMap;
043import java.util.concurrent.Callable;
044import java.util.concurrent.ExecutionException;
045import java.util.concurrent.ExecutorService;
046import java.util.concurrent.Executors;
047import java.util.concurrent.Future;
048import java.util.concurrent.ThreadLocalRandom;
049import java.util.stream.Collectors;
050import org.apache.commons.lang3.StringUtils;
051import org.apache.hadoop.conf.Configuration;
052import org.apache.hadoop.conf.Configured;
053import org.apache.hadoop.fs.FileSystem;
054import org.apache.hadoop.fs.Path;
055import org.apache.hadoop.hbase.client.Admin;
056import org.apache.hadoop.hbase.client.Append;
057import org.apache.hadoop.hbase.client.AsyncConnection;
058import org.apache.hadoop.hbase.client.AsyncTable;
059import org.apache.hadoop.hbase.client.BufferedMutator;
060import org.apache.hadoop.hbase.client.BufferedMutatorParams;
061import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
062import org.apache.hadoop.hbase.client.Connection;
063import org.apache.hadoop.hbase.client.ConnectionFactory;
064import org.apache.hadoop.hbase.client.Consistency;
065import org.apache.hadoop.hbase.client.Delete;
066import org.apache.hadoop.hbase.client.Durability;
067import org.apache.hadoop.hbase.client.Get;
068import org.apache.hadoop.hbase.client.Increment;
069import org.apache.hadoop.hbase.client.Put;
070import org.apache.hadoop.hbase.client.RegionInfo;
071import org.apache.hadoop.hbase.client.RegionInfoBuilder;
072import org.apache.hadoop.hbase.client.RegionLocator;
073import org.apache.hadoop.hbase.client.Result;
074import org.apache.hadoop.hbase.client.ResultScanner;
075import org.apache.hadoop.hbase.client.RowMutations;
076import org.apache.hadoop.hbase.client.Scan;
077import org.apache.hadoop.hbase.client.Table;
078import org.apache.hadoop.hbase.client.TableDescriptor;
079import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
080import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
081import org.apache.hadoop.hbase.filter.BinaryComparator;
082import org.apache.hadoop.hbase.filter.Filter;
083import org.apache.hadoop.hbase.filter.FilterAllFilter;
084import org.apache.hadoop.hbase.filter.FilterList;
085import org.apache.hadoop.hbase.filter.PageFilter;
086import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
087import org.apache.hadoop.hbase.filter.WhileMatchFilter;
088import org.apache.hadoop.hbase.io.compress.Compression;
089import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
090import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
091import org.apache.hadoop.hbase.regionserver.BloomType;
092import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
093import org.apache.hadoop.hbase.trace.TraceUtil;
094import org.apache.hadoop.hbase.util.ByteArrayHashKey;
095import org.apache.hadoop.hbase.util.Bytes;
096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
097import org.apache.hadoop.hbase.util.GsonUtil;
098import org.apache.hadoop.hbase.util.Hash;
099import org.apache.hadoop.hbase.util.MurmurHash;
100import org.apache.hadoop.hbase.util.Pair;
101import org.apache.hadoop.hbase.util.RandomDistribution;
102import org.apache.hadoop.hbase.util.YammerHistogramUtils;
103import org.apache.hadoop.io.LongWritable;
104import org.apache.hadoop.io.Text;
105import org.apache.hadoop.mapreduce.Job;
106import org.apache.hadoop.mapreduce.Mapper;
107import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
108import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
109import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
110import org.apache.hadoop.util.Tool;
111import org.apache.hadoop.util.ToolRunner;
112import org.apache.yetus.audience.InterfaceAudience;
113import org.slf4j.Logger;
114import org.slf4j.LoggerFactory;
115
116import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
117import org.apache.hbase.thirdparty.com.google.gson.Gson;
118
119/**
120 * Script used evaluating HBase performance and scalability. Runs a HBase client that steps through
121 * one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random writes test,
122 * etc.). Pass on the command-line which test to run and how many clients are participating in this
123 * experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
124 * <p>
125 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance
126 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper,
127 * pages 8-10.
128 * <p>
129 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as
130 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client does
131 * about 1GB of data, unless specified otherwise.
132 */
133@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
134public class PerformanceEvaluation extends Configured implements Tool {
135  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
136  static final String RANDOM_READ = "randomRead";
137  static final String PE_COMMAND_SHORTNAME = "pe";
138  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
139  private static final Gson GSON = GsonUtil.createGson().create();
140
141  public static final String TABLE_NAME = "TestTable";
142  public static final String FAMILY_NAME_BASE = "info";
143  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
144  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
145  public static final int DEFAULT_VALUE_LENGTH = 1000;
146  public static final int ROW_LENGTH = 26;
147
148  private static final int ONE_GB = 1024 * 1024 * 1000;
149  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
150  // TODO : should we make this configurable
151  private static final int TAG_LENGTH = 256;
152  private static final DecimalFormat FMT = new DecimalFormat("0.##");
153  private static final MathContext CXT = MathContext.DECIMAL64;
154  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
155  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
156  private static final TestOptions DEFAULT_OPTS = new TestOptions();
157
158  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
159  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
160
161  static {
162    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
163      "Run async random read test");
164    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
165      "Run async random write test");
166    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
167      "Run async sequential read test");
168    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
169      "Run async sequential write test");
170    addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)");
171    addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test");
172    addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test");
173    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
174      "Run random seek and scan 100 test");
175    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
176      "Run random seek scan with both start and stop row (max 10 rows)");
177    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
178      "Run random seek scan with both start and stop row (max 100 rows)");
179    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
180      "Run random seek scan with both start and stop row (max 1000 rows)");
181    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
182      "Run random seek scan with both start and stop row (max 10000 rows)");
183    addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
184    addCommandDescriptor(RandomDeleteTest.class, "randomDelete", "Run random delete test");
185    addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
186    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
187    addCommandDescriptor(SequentialDeleteTest.class, "sequentialDelete",
188      "Run sequential delete test");
189    addCommandDescriptor(MetaWriteTest.class, "metaWrite",
190      "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta");
191    addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
192    addCommandDescriptor(ReverseScanTest.class, "reverseScan",
193      "Run reverse scan test (read every row)");
194    addCommandDescriptor(FilteredScanTest.class, "filterScan",
195      "Run scan test using a filter to find a specific row based on it's value "
196        + "(make sure to use --rows=20)");
197    addCommandDescriptor(IncrementTest.class, "increment",
198      "Increment on each row; clients overlap on keyspace so some concurrent operations");
199    addCommandDescriptor(AppendTest.class, "append",
200      "Append on each row; clients overlap on keyspace so some concurrent operations");
201    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
202      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
203    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
204      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
205    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
206      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
207    addCommandDescriptor(CleanMetaTest.class, "cleanMeta",
208      "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread");
209  }
210
211  /**
212   * Enum for map metrics. Keep it out here rather than inside in the Map inner-class so we can find
213   * associated properties.
214   */
215  protected static enum Counter {
216    /** elapsed time */
217    ELAPSED_TIME,
218    /** number of rows */
219    ROWS
220  }
221
222  protected static class RunResult implements Comparable<RunResult> {
223    public RunResult(long duration, Histogram hist) {
224      this.duration = duration;
225      this.hist = hist;
226      numbOfReplyOverThreshold = 0;
227      numOfReplyFromReplica = 0;
228    }
229
230    public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica,
231      Histogram hist) {
232      this.duration = duration;
233      this.hist = hist;
234      this.numbOfReplyOverThreshold = numbOfReplyOverThreshold;
235      this.numOfReplyFromReplica = numOfReplyFromReplica;
236    }
237
238    public final long duration;
239    public final Histogram hist;
240    public final long numbOfReplyOverThreshold;
241    public final long numOfReplyFromReplica;
242
243    @Override
244    public String toString() {
245      return Long.toString(duration);
246    }
247
248    @Override
249    public int compareTo(RunResult o) {
250      return Long.compare(this.duration, o.duration);
251    }
252
253    @Override
254    public boolean equals(Object obj) {
255      if (this == obj) {
256        return true;
257      }
258      if (obj == null || getClass() != obj.getClass()) {
259        return false;
260      }
261      return this.compareTo((RunResult) obj) == 0;
262    }
263
264    @Override
265    public int hashCode() {
266      return Long.hashCode(duration);
267    }
268  }
269
270  /**
271   * Constructor
272   * @param conf Configuration object
273   */
274  public PerformanceEvaluation(final Configuration conf) {
275    super(conf);
276  }
277
278  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name,
279    String description) {
280    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
281    COMMANDS.put(name, cmdDescriptor);
282  }
283
284  /**
285   * Implementations can have their status set.
286   */
287  interface Status {
288    /**
289     * Sets status
290     * @param msg status message
291     */
292    void setStatus(final String msg) throws IOException;
293  }
294
295  /**
296   * MapReduce job that runs a performance evaluation client in each map task.
297   */
298  public static class EvaluationMapTask
299    extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
300
301    /** configuration parameter name that contains the command */
302    public final static String CMD_KEY = "EvaluationMapTask.command";
303    /** configuration parameter name that contains the PE impl */
304    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
305
306    private Class<? extends Test> cmd;
307
308    @Override
309    protected void setup(Context context) throws IOException, InterruptedException {
310      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
311
312      // this is required so that extensions of PE are instantiated within the
313      // map reduce task...
314      Class<? extends PerformanceEvaluation> peClass =
315        forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
316      try {
317        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
318      } catch (Exception e) {
319        throw new IllegalStateException("Could not instantiate PE instance", e);
320      }
321    }
322
323    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
324      try {
325        return Class.forName(className).asSubclass(type);
326      } catch (ClassNotFoundException e) {
327        throw new IllegalStateException("Could not find class for name: " + className, e);
328      }
329    }
330
331    @Override
332    protected void map(LongWritable key, Text value, final Context context)
333      throws IOException, InterruptedException {
334
335      Status status = new Status() {
336        @Override
337        public void setStatus(String msg) {
338          context.setStatus(msg);
339        }
340      };
341
342      TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class);
343      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
344      final Connection con = ConnectionFactory.createConnection(conf);
345      AsyncConnection asyncCon = null;
346      try {
347        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
348      } catch (ExecutionException e) {
349        throw new IOException(e);
350      }
351
352      // Evaluation task
353      RunResult result =
354        PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
355      // Collect how much time the thing took. Report as map output and
356      // to the ELAPSED_TIME counter.
357      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
358      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
359      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
360      context.progress();
361    }
362  }
363
364  /*
365   * If table does not already exist, create. Also create a table when {@code opts.presplitRegions}
366   * is specified or when the existing table's region replica count doesn't match {@code
367   * opts.replicas}.
368   */
369  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
370    final TableName tableName = TableName.valueOf(opts.tableName);
371    final boolean exists = admin.tableExists(tableName);
372    final boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
373      || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
374    final boolean isDeleteCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("delete");
375    final boolean needsData = isReadCmd || isDeleteCmd;
376    if (!exists && needsData) {
377      throw new IllegalStateException(
378        "Must specify an existing table for read/delete commands. Run a write command first.");
379    }
380    TableDescriptor desc = exists ? admin.getDescriptor(TableName.valueOf(opts.tableName)) : null;
381    final byte[][] splits = getSplits(opts);
382
383    // recreate the table when user has requested presplit or when existing
384    // {RegionSplitPolicy,replica count} does not match requested, or when the
385    // number of column families does not match requested.
386    final boolean regionCountChanged =
387      exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions
388        && opts.presplitRegions != admin.getRegions(tableName).size();
389    final boolean splitPolicyChanged =
390      exists && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy);
391    final boolean regionReplicationChanged = exists && desc.getRegionReplication() != opts.replicas;
392    final boolean columnFamilyCountChanged = exists && desc.getColumnFamilyCount() != opts.families;
393
394    boolean needsDelete = regionCountChanged || splitPolicyChanged || regionReplicationChanged
395      || columnFamilyCountChanged;
396
397    if (needsDelete) {
398      final List<String> errors = new ArrayList<>();
399      if (columnFamilyCountChanged) {
400        final String error = String.format("--families=%d, but found %d column families",
401          opts.families, desc.getColumnFamilyCount());
402        if (needsData) {
403          // We can't proceed the test in this case
404          throw new IllegalStateException(
405            "Cannot proceed the test. Run a write command first: " + error);
406        }
407        errors.add(error);
408      }
409      if (regionCountChanged) {
410        errors.add(String.format("--presplit=%d, but found %d regions", opts.presplitRegions,
411          admin.getRegions(tableName).size()));
412      }
413      if (splitPolicyChanged) {
414        errors.add(String.format("--splitPolicy=%s, but current policy is %s", opts.splitPolicy,
415          desc.getRegionSplitPolicyClassName()));
416      }
417      if (regionReplicationChanged) {
418        errors.add(String.format("--replicas=%d, but found %d replicas", opts.replicas,
419          desc.getRegionReplication()));
420      }
421      final String reason =
422        errors.stream().map(s -> '[' + s + ']').collect(Collectors.joining(", "));
423
424      if (needsData) {
425        LOG.warn("Unexpected or incorrect options provided for {}. "
426          + "Please verify whether the detected inconsistencies are expected or ignorable: {}. "
427          + "The test will proceed, but the results may not be reliable.", opts.cmdName, reason);
428        needsDelete = false;
429      } else {
430        LOG.info("Table will be recreated: " + reason);
431      }
432    }
433
434    // remove an existing table
435    if (needsDelete) {
436      if (admin.isTableEnabled(tableName)) {
437        admin.disableTable(tableName);
438      }
439      admin.deleteTable(tableName);
440    }
441
442    // table creation is necessary
443    if (!exists || needsDelete) {
444      desc = getTableDescriptor(opts);
445      if (splits != null) {
446        if (LOG.isDebugEnabled()) {
447          for (int i = 0; i < splits.length; i++) {
448            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
449          }
450        }
451      }
452      if (splits != null) {
453        admin.createTable(desc, splits);
454      } else {
455        admin.createTable(desc);
456      }
457      LOG.info("Table " + desc + " created");
458    }
459    return admin.tableExists(tableName);
460  }
461
462  /**
463   * Create an HTableDescriptor from provided TestOptions.
464   */
465  protected static TableDescriptor getTableDescriptor(TestOptions opts) {
466    TableDescriptorBuilder builder =
467      TableDescriptorBuilder.newBuilder(TableName.valueOf(opts.tableName));
468
469    for (int family = 0; family < opts.families; family++) {
470      byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
471      ColumnFamilyDescriptorBuilder cfBuilder =
472        ColumnFamilyDescriptorBuilder.newBuilder(familyName);
473      cfBuilder.setDataBlockEncoding(opts.blockEncoding);
474      cfBuilder.setCompressionType(opts.compression);
475      cfBuilder.setEncryptionType(opts.encryption);
476      cfBuilder.setBloomFilterType(opts.bloomType);
477      cfBuilder.setBlocksize(opts.blockSize);
478      if (opts.inMemoryCF) {
479        cfBuilder.setInMemory(true);
480      }
481      cfBuilder.setInMemoryCompaction(opts.inMemoryCompaction);
482      builder.setColumnFamily(cfBuilder.build());
483    }
484    if (opts.replicas != DEFAULT_OPTS.replicas) {
485      builder.setRegionReplication(opts.replicas);
486    }
487    if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
488      builder.setRegionSplitPolicyClassName(opts.splitPolicy);
489    }
490    return builder.build();
491  }
492
493  /**
494   * generates splits based on total number of rows and specified split regions
495   */
496  protected static byte[][] getSplits(TestOptions opts) {
497    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null;
498
499    int numSplitPoints = opts.presplitRegions - 1;
500    byte[][] splits = new byte[numSplitPoints][];
501    long jump = opts.totalRows / opts.presplitRegions;
502    for (int i = 0; i < numSplitPoints; i++) {
503      long rowkey = jump * (1 + i);
504      splits[i] = format(rowkey);
505    }
506    return splits;
507  }
508
509  static void setupConnectionCount(final TestOptions opts) {
510    if (opts.oneCon) {
511      opts.connCount = 1;
512    } else {
513      if (opts.connCount == -1) {
514        // set to thread number if connCount is not set
515        opts.connCount = opts.numClientThreads;
516      }
517    }
518  }
519
520  /*
521   * Run all clients in this vm each to its own thread.
522   */
523  static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
524    throws IOException, InterruptedException, ExecutionException {
525    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
526    assert cmd != null;
527    @SuppressWarnings("unchecked")
528    Future<RunResult>[] threads = new Future[opts.numClientThreads];
529    RunResult[] results = new RunResult[opts.numClientThreads];
530    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
531      new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
532    setupConnectionCount(opts);
533    final Connection[] cons = new Connection[opts.connCount];
534    final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
535    for (int i = 0; i < opts.connCount; i++) {
536      cons[i] = ConnectionFactory.createConnection(conf);
537      asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
538    }
539    LOG
540      .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads");
541    for (int i = 0; i < threads.length; i++) {
542      final int index = i;
543      threads[i] = pool.submit(new Callable<RunResult>() {
544        @Override
545        public RunResult call() throws Exception {
546          TestOptions threadOpts = new TestOptions(opts);
547          final Connection con = cons[index % cons.length];
548          final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
549          if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
550          RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
551            @Override
552            public void setStatus(final String msg) throws IOException {
553              LOG.info(msg);
554            }
555          });
556          LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration
557            + "ms over " + threadOpts.perClientRunRows + " rows");
558          if (opts.latencyThreshold > 0) {
559            LOG.info("Number of replies over latency threshold " + opts.latencyThreshold
560              + "(ms) is " + run.numbOfReplyOverThreshold);
561          }
562          return run;
563        }
564      });
565    }
566    pool.shutdown();
567
568    for (int i = 0; i < threads.length; i++) {
569      try {
570        results[i] = threads[i].get();
571      } catch (ExecutionException e) {
572        throw new IOException(e.getCause());
573      }
574    }
575    final String test = cmd.getSimpleName();
576    LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results));
577    Arrays.sort(results);
578    long total = 0;
579    float avgLatency = 0;
580    float avgTPS = 0;
581    long replicaWins = 0;
582    for (RunResult result : results) {
583      total += result.duration;
584      avgLatency += result.hist.getSnapshot().getMean();
585      avgTPS += opts.perClientRunRows * 1.0f / result.duration;
586      replicaWins += result.numOfReplyFromReplica;
587    }
588    avgTPS *= 1000; // ms to second
589    avgLatency = avgLatency / results.length;
590    LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: "
591      + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms");
592    LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
593    LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
594    if (opts.replicas > 1) {
595      LOG.info("[results from replica regions] " + replicaWins);
596    }
597
598    for (int i = 0; i < opts.connCount; i++) {
599      cons[i].close();
600      asyncCons[i].close();
601    }
602
603    return results;
604  }
605
606  /*
607   * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write
608   * out an input file with instruction per client regards which row they are to start on.
609   * @param cmd Command to run.
610   */
611  static Job doMapReduce(TestOptions opts, final Configuration conf)
612    throws IOException, InterruptedException, ClassNotFoundException {
613    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
614    assert cmd != null;
615    Path inputDir = writeInputFile(conf, opts);
616    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
617    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
618    Job job = Job.getInstance(conf);
619    job.setJarByClass(PerformanceEvaluation.class);
620    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
621
622    job.setInputFormatClass(NLineInputFormat.class);
623    NLineInputFormat.setInputPaths(job, inputDir);
624    // this is default, but be explicit about it just in case.
625    NLineInputFormat.setNumLinesPerSplit(job, 1);
626
627    job.setOutputKeyClass(LongWritable.class);
628    job.setOutputValueClass(LongWritable.class);
629
630    job.setMapperClass(EvaluationMapTask.class);
631    job.setReducerClass(LongSumReducer.class);
632
633    job.setNumReduceTasks(1);
634
635    job.setOutputFormatClass(TextOutputFormat.class);
636    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
637
638    TableMapReduceUtil.addDependencyJars(job);
639    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Histogram.class, // yammer
640                                                                                            // metrics
641      Gson.class, // gson
642      FilterAllFilter.class // hbase-server tests jar
643    );
644
645    TableMapReduceUtil.initCredentials(job);
646
647    job.waitForCompletion(true);
648    return job;
649  }
650
651  /**
652   * Each client has one mapper to do the work, and client do the resulting count in a map task.
653   */
654
655  static String JOB_INPUT_FILENAME = "input.txt";
656
657  /*
658   * Write input file of offsets-per-client for the mapreduce job.
659   * @param c Configuration
660   * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
661   */
662  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
663    return writeInputFile(c, opts, new Path("."));
664  }
665
666  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
667    throws IOException {
668    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
669    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
670    Path inputDir = new Path(jobdir, "inputs");
671
672    FileSystem fs = FileSystem.get(c);
673    fs.mkdirs(inputDir);
674
675    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
676    PrintStream out = new PrintStream(fs.create(inputFile));
677    // Make input random.
678    Map<Integer, String> m = new TreeMap<>();
679    Hash h = MurmurHash.getInstance();
680    long perClientRows = (opts.totalRows / opts.numClientThreads);
681    try {
682      for (int j = 0; j < opts.numClientThreads; j++) {
683        TestOptions next = new TestOptions(opts);
684        next.startRow = j * perClientRows;
685        next.perClientRunRows = perClientRows;
686        String s = GSON.toJson(next);
687        LOG.info("Client=" + j + ", input=" + s);
688        byte[] b = Bytes.toBytes(s);
689        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
690        m.put(hash, s);
691      }
692      for (Map.Entry<Integer, String> e : m.entrySet()) {
693        out.println(e.getValue());
694      }
695    } finally {
696      out.close();
697    }
698    return inputDir;
699  }
700
701  /**
702   * Describes a command.
703   */
704  static class CmdDescriptor {
705    private Class<? extends TestBase> cmdClass;
706    private String name;
707    private String description;
708
709    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
710      this.cmdClass = cmdClass;
711      this.name = name;
712      this.description = description;
713    }
714
715    public Class<? extends TestBase> getCmdClass() {
716      return cmdClass;
717    }
718
719    public String getName() {
720      return name;
721    }
722
723    public String getDescription() {
724      return description;
725    }
726  }
727
728  /**
729   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes
730   * tracking all these arguments a little easier. NOTE: ADDING AN OPTION, you need to add a data
731   * member, a getter/setter (to make JSON serialization of this TestOptions class behave), and you
732   * need to add to the clone constructor below copying your new option from the 'that' to the
733   * 'this'. Look for 'clone' below.
734   */
735  static class TestOptions {
736    String cmdName = null;
737    boolean nomapred = false;
738    boolean filterAll = false;
739    long startRow = 0;
740    float size = 1.0f;
741    long perClientRunRows = DEFAULT_ROWS_PER_GB;
742    int numClientThreads = 1;
743    long totalRows = DEFAULT_ROWS_PER_GB;
744    int measureAfter = 0;
745    float sampleRate = 1.0f;
746    /**
747     * @deprecated Useless after switching to OpenTelemetry
748     */
749    @Deprecated
750    double traceRate = 0.0;
751    String tableName = TABLE_NAME;
752    boolean flushCommits = true;
753    boolean writeToWAL = true;
754    boolean autoFlush = false;
755    boolean oneCon = false;
756    int connCount = -1; // wil decide the actual num later
757    boolean useTags = false;
758    int noOfTags = 1;
759    boolean reportLatency = false;
760    int multiGet = 0;
761    int multiPut = 0;
762    int randomSleep = 0;
763    boolean inMemoryCF = false;
764    int presplitRegions = 0;
765    int replicas = TableDescriptorBuilder.DEFAULT_REGION_REPLICATION;
766    String splitPolicy = null;
767    Compression.Algorithm compression = Compression.Algorithm.NONE;
768    String encryption = null;
769    BloomType bloomType = BloomType.ROW;
770    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
771    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
772    boolean valueRandom = false;
773    boolean valueZipf = false;
774    int valueSize = DEFAULT_VALUE_LENGTH;
775    long period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10;
776    int cycles = 1;
777    int columns = 1;
778    int families = 1;
779    int caching = 30;
780    int latencyThreshold = 0; // in millsecond
781    boolean addColumns = true;
782    MemoryCompactionPolicy inMemoryCompaction =
783      MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
784    boolean asyncPrefetch = false;
785    boolean cacheBlocks = true;
786    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
787    long bufferSize = 2l * 1024l * 1024l;
788    Properties commandProperties;
789
790    public TestOptions() {
791    }
792
793    /**
794     * Clone constructor.
795     * @param that Object to copy from.
796     */
797    public TestOptions(TestOptions that) {
798      this.cmdName = that.cmdName;
799      this.cycles = that.cycles;
800      this.nomapred = that.nomapred;
801      this.startRow = that.startRow;
802      this.size = that.size;
803      this.perClientRunRows = that.perClientRunRows;
804      this.numClientThreads = that.numClientThreads;
805      this.totalRows = that.totalRows;
806      this.sampleRate = that.sampleRate;
807      this.traceRate = that.traceRate;
808      this.tableName = that.tableName;
809      this.flushCommits = that.flushCommits;
810      this.writeToWAL = that.writeToWAL;
811      this.autoFlush = that.autoFlush;
812      this.oneCon = that.oneCon;
813      this.connCount = that.connCount;
814      this.useTags = that.useTags;
815      this.noOfTags = that.noOfTags;
816      this.reportLatency = that.reportLatency;
817      this.latencyThreshold = that.latencyThreshold;
818      this.multiGet = that.multiGet;
819      this.multiPut = that.multiPut;
820      this.inMemoryCF = that.inMemoryCF;
821      this.presplitRegions = that.presplitRegions;
822      this.replicas = that.replicas;
823      this.splitPolicy = that.splitPolicy;
824      this.compression = that.compression;
825      this.encryption = that.encryption;
826      this.blockEncoding = that.blockEncoding;
827      this.filterAll = that.filterAll;
828      this.bloomType = that.bloomType;
829      this.blockSize = that.blockSize;
830      this.valueRandom = that.valueRandom;
831      this.valueZipf = that.valueZipf;
832      this.valueSize = that.valueSize;
833      this.period = that.period;
834      this.randomSleep = that.randomSleep;
835      this.measureAfter = that.measureAfter;
836      this.addColumns = that.addColumns;
837      this.columns = that.columns;
838      this.families = that.families;
839      this.caching = that.caching;
840      this.inMemoryCompaction = that.inMemoryCompaction;
841      this.asyncPrefetch = that.asyncPrefetch;
842      this.cacheBlocks = that.cacheBlocks;
843      this.scanReadType = that.scanReadType;
844      this.bufferSize = that.bufferSize;
845      this.commandProperties = that.commandProperties;
846    }
847
848    public Properties getCommandProperties() {
849      return commandProperties;
850    }
851
852    public int getCaching() {
853      return this.caching;
854    }
855
856    public void setCaching(final int caching) {
857      this.caching = caching;
858    }
859
860    public int getColumns() {
861      return this.columns;
862    }
863
864    public void setColumns(final int columns) {
865      this.columns = columns;
866    }
867
868    public int getFamilies() {
869      return this.families;
870    }
871
872    public void setFamilies(final int families) {
873      this.families = families;
874    }
875
876    public int getCycles() {
877      return this.cycles;
878    }
879
880    public void setCycles(final int cycles) {
881      this.cycles = cycles;
882    }
883
884    public boolean isValueZipf() {
885      return valueZipf;
886    }
887
888    public void setValueZipf(boolean valueZipf) {
889      this.valueZipf = valueZipf;
890    }
891
892    public String getCmdName() {
893      return cmdName;
894    }
895
896    public void setCmdName(String cmdName) {
897      this.cmdName = cmdName;
898    }
899
900    public int getRandomSleep() {
901      return randomSleep;
902    }
903
904    public void setRandomSleep(int randomSleep) {
905      this.randomSleep = randomSleep;
906    }
907
908    public int getReplicas() {
909      return replicas;
910    }
911
912    public void setReplicas(int replicas) {
913      this.replicas = replicas;
914    }
915
916    public String getSplitPolicy() {
917      return splitPolicy;
918    }
919
920    public void setSplitPolicy(String splitPolicy) {
921      this.splitPolicy = splitPolicy;
922    }
923
924    public void setNomapred(boolean nomapred) {
925      this.nomapred = nomapred;
926    }
927
928    public void setFilterAll(boolean filterAll) {
929      this.filterAll = filterAll;
930    }
931
932    public void setStartRow(long startRow) {
933      this.startRow = startRow;
934    }
935
936    public void setSize(float size) {
937      this.size = size;
938    }
939
940    public void setPerClientRunRows(int perClientRunRows) {
941      this.perClientRunRows = perClientRunRows;
942    }
943
944    public void setNumClientThreads(int numClientThreads) {
945      this.numClientThreads = numClientThreads;
946    }
947
948    public void setTotalRows(long totalRows) {
949      this.totalRows = totalRows;
950    }
951
952    public void setSampleRate(float sampleRate) {
953      this.sampleRate = sampleRate;
954    }
955
956    public void setTraceRate(double traceRate) {
957      this.traceRate = traceRate;
958    }
959
960    public void setTableName(String tableName) {
961      this.tableName = tableName;
962    }
963
964    public void setFlushCommits(boolean flushCommits) {
965      this.flushCommits = flushCommits;
966    }
967
968    public void setWriteToWAL(boolean writeToWAL) {
969      this.writeToWAL = writeToWAL;
970    }
971
972    public void setAutoFlush(boolean autoFlush) {
973      this.autoFlush = autoFlush;
974    }
975
976    public void setOneCon(boolean oneCon) {
977      this.oneCon = oneCon;
978    }
979
980    public int getConnCount() {
981      return connCount;
982    }
983
984    public void setConnCount(int connCount) {
985      this.connCount = connCount;
986    }
987
988    public void setUseTags(boolean useTags) {
989      this.useTags = useTags;
990    }
991
992    public void setNoOfTags(int noOfTags) {
993      this.noOfTags = noOfTags;
994    }
995
996    public void setReportLatency(boolean reportLatency) {
997      this.reportLatency = reportLatency;
998    }
999
1000    public void setMultiGet(int multiGet) {
1001      this.multiGet = multiGet;
1002    }
1003
1004    public void setMultiPut(int multiPut) {
1005      this.multiPut = multiPut;
1006    }
1007
1008    public void setInMemoryCF(boolean inMemoryCF) {
1009      this.inMemoryCF = inMemoryCF;
1010    }
1011
1012    public void setPresplitRegions(int presplitRegions) {
1013      this.presplitRegions = presplitRegions;
1014    }
1015
1016    public void setCompression(Compression.Algorithm compression) {
1017      this.compression = compression;
1018    }
1019
1020    public void setEncryption(String encryption) {
1021      this.encryption = encryption;
1022    }
1023
1024    public void setBloomType(BloomType bloomType) {
1025      this.bloomType = bloomType;
1026    }
1027
1028    public void setBlockSize(int blockSize) {
1029      this.blockSize = blockSize;
1030    }
1031
1032    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
1033      this.blockEncoding = blockEncoding;
1034    }
1035
1036    public void setValueRandom(boolean valueRandom) {
1037      this.valueRandom = valueRandom;
1038    }
1039
1040    public void setValueSize(int valueSize) {
1041      this.valueSize = valueSize;
1042    }
1043
1044    public void setBufferSize(long bufferSize) {
1045      this.bufferSize = bufferSize;
1046    }
1047
1048    public void setPeriod(int period) {
1049      this.period = period;
1050    }
1051
1052    public boolean isNomapred() {
1053      return nomapred;
1054    }
1055
1056    public boolean isFilterAll() {
1057      return filterAll;
1058    }
1059
1060    public long getStartRow() {
1061      return startRow;
1062    }
1063
1064    public float getSize() {
1065      return size;
1066    }
1067
1068    public long getPerClientRunRows() {
1069      return perClientRunRows;
1070    }
1071
1072    public int getNumClientThreads() {
1073      return numClientThreads;
1074    }
1075
1076    public long getTotalRows() {
1077      return totalRows;
1078    }
1079
1080    public float getSampleRate() {
1081      return sampleRate;
1082    }
1083
1084    public double getTraceRate() {
1085      return traceRate;
1086    }
1087
1088    public String getTableName() {
1089      return tableName;
1090    }
1091
1092    public boolean isFlushCommits() {
1093      return flushCommits;
1094    }
1095
1096    public boolean isWriteToWAL() {
1097      return writeToWAL;
1098    }
1099
1100    public boolean isAutoFlush() {
1101      return autoFlush;
1102    }
1103
1104    public boolean isUseTags() {
1105      return useTags;
1106    }
1107
1108    public int getNoOfTags() {
1109      return noOfTags;
1110    }
1111
1112    public boolean isReportLatency() {
1113      return reportLatency;
1114    }
1115
1116    public int getMultiGet() {
1117      return multiGet;
1118    }
1119
1120    public int getMultiPut() {
1121      return multiPut;
1122    }
1123
1124    public boolean isInMemoryCF() {
1125      return inMemoryCF;
1126    }
1127
1128    public int getPresplitRegions() {
1129      return presplitRegions;
1130    }
1131
1132    public Compression.Algorithm getCompression() {
1133      return compression;
1134    }
1135
1136    public String getEncryption() {
1137      return encryption;
1138    }
1139
1140    public DataBlockEncoding getBlockEncoding() {
1141      return blockEncoding;
1142    }
1143
1144    public boolean isValueRandom() {
1145      return valueRandom;
1146    }
1147
1148    public int getValueSize() {
1149      return valueSize;
1150    }
1151
1152    public long getPeriod() {
1153      return period;
1154    }
1155
1156    public BloomType getBloomType() {
1157      return bloomType;
1158    }
1159
1160    public int getBlockSize() {
1161      return blockSize;
1162    }
1163
1164    public boolean isOneCon() {
1165      return oneCon;
1166    }
1167
1168    public int getMeasureAfter() {
1169      return measureAfter;
1170    }
1171
1172    public void setMeasureAfter(int measureAfter) {
1173      this.measureAfter = measureAfter;
1174    }
1175
1176    public boolean getAddColumns() {
1177      return addColumns;
1178    }
1179
1180    public void setAddColumns(boolean addColumns) {
1181      this.addColumns = addColumns;
1182    }
1183
1184    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
1185      this.inMemoryCompaction = inMemoryCompaction;
1186    }
1187
1188    public MemoryCompactionPolicy getInMemoryCompaction() {
1189      return this.inMemoryCompaction;
1190    }
1191
1192    public long getBufferSize() {
1193      return this.bufferSize;
1194    }
1195  }
1196
1197  /*
1198   * A test. Subclass to particularize what happens per row.
1199   */
1200  static abstract class TestBase {
1201    private final long everyN;
1202
1203    protected final Configuration conf;
1204    protected final TestOptions opts;
1205
1206    protected final Status status;
1207
1208    private String testName;
1209    protected Histogram latencyHistogram;
1210    private Histogram replicaLatencyHistogram;
1211    private Histogram valueSizeHistogram;
1212    private Histogram rpcCallsHistogram;
1213    private Histogram remoteRpcCallsHistogram;
1214    private Histogram millisBetweenNextHistogram;
1215    private Histogram regionsScannedHistogram;
1216    private Histogram bytesInResultsHistogram;
1217    private Histogram bytesInRemoteResultsHistogram;
1218    private RandomDistribution.Zipf zipf;
1219    private long numOfReplyOverLatencyThreshold = 0;
1220    private long numOfReplyFromReplica = 0;
1221
1222    /**
1223     * Note that all subclasses of this class must provide a public constructor that has the exact
1224     * same list of arguments.
1225     */
1226    TestBase(final Configuration conf, final TestOptions options, final Status status) {
1227      this.conf = conf;
1228      this.opts = options;
1229      this.status = status;
1230      this.testName = this.getClass().getSimpleName();
1231      everyN = (long) (1 / opts.sampleRate);
1232      if (options.isValueZipf()) {
1233        this.zipf =
1234          new RandomDistribution.Zipf(ThreadLocalRandom.current(), 1, options.getValueSize(), 1.2);
1235      }
1236      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
1237    }
1238
1239    int getValueLength() {
1240      if (this.opts.isValueRandom()) {
1241        return ThreadLocalRandom.current().nextInt(opts.valueSize);
1242      } else if (this.opts.isValueZipf()) {
1243        return Math.abs(this.zipf.nextInt());
1244      } else {
1245        return opts.valueSize;
1246      }
1247    }
1248
1249    void updateValueSize(final Result[] rs) throws IOException {
1250      updateValueSize(rs, 0);
1251    }
1252
1253    void updateValueSize(final Result[] rs, final long latency) throws IOException {
1254      if (rs == null || (latency == 0)) return;
1255      for (Result r : rs)
1256        updateValueSize(r, latency);
1257    }
1258
1259    void updateValueSize(final Result r) throws IOException {
1260      updateValueSize(r, 0);
1261    }
1262
1263    void updateValueSize(final Result r, final long latency) throws IOException {
1264      if (r == null || (latency == 0)) return;
1265      int size = 0;
1266      // update replicaHistogram
1267      if (r.isStale()) {
1268        replicaLatencyHistogram.update(latency / 1000);
1269        numOfReplyFromReplica++;
1270      }
1271      if (!isRandomValueSize()) return;
1272
1273      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1274        size += scanner.current().getValueLength();
1275      }
1276      updateValueSize(size);
1277    }
1278
1279    void updateValueSize(final int valueSize) {
1280      if (!isRandomValueSize()) return;
1281      this.valueSizeHistogram.update(valueSize);
1282    }
1283
1284    void updateScanMetrics(final ScanMetrics metrics) {
1285      if (metrics == null) return;
1286      Map<String, Long> metricsMap = metrics.getMetricsMap();
1287      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
1288      if (rpcCalls != null) {
1289        this.rpcCallsHistogram.update(rpcCalls.longValue());
1290      }
1291      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
1292      if (remoteRpcCalls != null) {
1293        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
1294      }
1295      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
1296      if (millisBetweenNext != null) {
1297        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
1298      }
1299      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
1300      if (regionsScanned != null) {
1301        this.regionsScannedHistogram.update(regionsScanned.longValue());
1302      }
1303      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
1304      if (bytesInResults != null && bytesInResults.longValue() > 0) {
1305        this.bytesInResultsHistogram.update(bytesInResults.longValue());
1306      }
1307      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
1308      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
1309        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
1310      }
1311    }
1312
1313    String generateStatus(final long sr, final long i, final long lr) {
1314      return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency ["
1315        + getShortLatencyReport() + "]"
1316        + (!isRandomValueSize() ? "" : ", value size [" + getShortValueSizeReport() + "]");
1317    }
1318
1319    boolean isRandomValueSize() {
1320      return opts.valueRandom;
1321    }
1322
1323    protected long getReportingPeriod() {
1324      return opts.period;
1325    }
1326
1327    /**
1328     * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1329     */
1330    public Histogram getLatencyHistogram() {
1331      return latencyHistogram;
1332    }
1333
1334    void testSetup() throws IOException {
1335      // test metrics
1336      latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1337      // If it is a replica test, set up histogram for replica.
1338      if (opts.replicas > 1) {
1339        replicaLatencyHistogram =
1340          YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1341      }
1342      valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1343      // scan metrics
1344      rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1345      remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1346      millisBetweenNextHistogram =
1347        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1348      regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1349      bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1350      bytesInRemoteResultsHistogram =
1351        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1352
1353      onStartup();
1354    }
1355
1356    abstract void onStartup() throws IOException;
1357
1358    void testTakedown() throws IOException {
1359      onTakedown();
1360      // Print all stats for this thread continuously.
1361      // Synchronize on Test.class so different threads don't intermingle the
1362      // output. We can't use 'this' here because each thread has its own instance of Test class.
1363      synchronized (Test.class) {
1364        status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
1365        status
1366          .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram));
1367        if (opts.replicas > 1) {
1368          status.setStatus("Latency (us) from Replica Regions: "
1369            + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram));
1370        }
1371        status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
1372        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
1373        if (valueSizeHistogram.getCount() > 0) {
1374          status.setStatus(
1375            "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
1376          status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
1377          status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
1378        } else {
1379          status.setStatus("No valueSize statistics available");
1380        }
1381        if (rpcCallsHistogram.getCount() > 0) {
1382          status.setStatus(
1383            "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
1384        }
1385        if (remoteRpcCallsHistogram.getCount() > 0) {
1386          status.setStatus("remoteRpcCalls (count): "
1387            + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
1388        }
1389        if (millisBetweenNextHistogram.getCount() > 0) {
1390          status.setStatus("millisBetweenNext (latency): "
1391            + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
1392        }
1393        if (regionsScannedHistogram.getCount() > 0) {
1394          status.setStatus("regionsScanned (count): "
1395            + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
1396        }
1397        if (bytesInResultsHistogram.getCount() > 0) {
1398          status.setStatus("bytesInResults (size): "
1399            + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
1400        }
1401        if (bytesInRemoteResultsHistogram.getCount() > 0) {
1402          status.setStatus("bytesInRemoteResults (size): "
1403            + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
1404        }
1405      }
1406    }
1407
1408    abstract void onTakedown() throws IOException;
1409
1410    /*
1411     * Run test
1412     * @return Elapsed time.
1413     */
1414    long test() throws IOException, InterruptedException {
1415      testSetup();
1416      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1417      final long startTime = System.nanoTime();
1418      try {
1419        testTimed();
1420      } finally {
1421        testTakedown();
1422      }
1423      return (System.nanoTime() - startTime) / 1000000;
1424    }
1425
1426    long getStartRow() {
1427      return opts.startRow;
1428    }
1429
1430    long getLastRow() {
1431      return getStartRow() + opts.perClientRunRows;
1432    }
1433
1434    /**
1435     * Provides an extension point for tests that don't want a per row invocation.
1436     */
1437    void testTimed() throws IOException, InterruptedException {
1438      long startRow = getStartRow();
1439      long lastRow = getLastRow();
1440      // Report on completion of 1/10th of total.
1441      for (int ii = 0; ii < opts.cycles; ii++) {
1442        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
1443        for (long i = startRow; i < lastRow; i++) {
1444          if (i % everyN != 0) continue;
1445          long startTime = System.nanoTime();
1446          boolean requestSent = false;
1447          Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan();
1448          try (Scope scope = span.makeCurrent()) {
1449            requestSent = testRow(i, startTime);
1450          } finally {
1451            span.end();
1452          }
1453          if ((i - startRow) > opts.measureAfter) {
1454            // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
1455            // first 9 times and sends the actual get request in the 10th iteration.
1456            // We should only set latency when actual request is sent because otherwise
1457            // it turns out to be 0.
1458            if (requestSent) {
1459              long latency = (System.nanoTime() - startTime) / 1000;
1460              latencyHistogram.update(latency);
1461              if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) {
1462                numOfReplyOverLatencyThreshold++;
1463              }
1464            }
1465            if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1466              status.setStatus(generateStatus(startRow, i, lastRow));
1467            }
1468          }
1469        }
1470      }
1471    }
1472
1473    /** Returns Subset of the histograms' calculation. */
1474    public String getShortLatencyReport() {
1475      return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
1476    }
1477
1478    /** Returns Subset of the histograms' calculation. */
1479    public String getShortValueSizeReport() {
1480      return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
1481    }
1482
1483    /**
1484     * Test for individual row.
1485     * @param i Row index.
1486     * @return true if the row was sent to server and need to record metrics. False if not, multiGet
1487     *         and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered.
1488     */
1489    abstract boolean testRow(final long i, final long startTime)
1490      throws IOException, InterruptedException;
1491  }
1492
1493  static abstract class Test extends TestBase {
1494    protected Connection connection;
1495
1496    Test(final Connection con, final TestOptions options, final Status status) {
1497      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1498      this.connection = con;
1499    }
1500  }
1501
1502  static abstract class AsyncTest extends TestBase {
1503    protected AsyncConnection connection;
1504
1505    AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
1506      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1507      this.connection = con;
1508    }
1509  }
1510
1511  static abstract class TableTest extends Test {
1512    protected Table table;
1513
1514    TableTest(Connection con, TestOptions options, Status status) {
1515      super(con, options, status);
1516    }
1517
1518    @Override
1519    void onStartup() throws IOException {
1520      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1521    }
1522
1523    @Override
1524    void onTakedown() throws IOException {
1525      table.close();
1526    }
1527  }
1528
1529  /*
1530   * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest
1531   */
1532  static abstract class MetaTest extends TableTest {
1533    protected int keyLength;
1534
1535    MetaTest(Connection con, TestOptions options, Status status) {
1536      super(con, options, status);
1537      keyLength = Long.toString(opts.perClientRunRows).length();
1538    }
1539
1540    @Override
1541    void onTakedown() throws IOException {
1542      // No clean up
1543    }
1544
1545    /*
1546     * Generates Lexicographically ascending strings
1547     */
1548    protected byte[] getSplitKey(final long i) {
1549      return Bytes.toBytes(String.format("%0" + keyLength + "d", i));
1550    }
1551
1552  }
1553
1554  static abstract class AsyncTableTest extends AsyncTest {
1555    protected AsyncTable<?> table;
1556
1557    AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
1558      super(con, options, status);
1559    }
1560
1561    @Override
1562    void onStartup() throws IOException {
1563      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1564    }
1565
1566    @Override
1567    void onTakedown() throws IOException {
1568    }
1569  }
1570
1571  static class AsyncRandomReadTest extends AsyncTableTest {
1572    private final Consistency consistency;
1573    private ArrayList<Get> gets;
1574
1575    AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
1576      super(con, options, status);
1577      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1578      if (opts.multiGet > 0) {
1579        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1580        this.gets = new ArrayList<>(opts.multiGet);
1581      }
1582    }
1583
1584    @Override
1585    boolean testRow(final long i, final long startTime) throws IOException, InterruptedException {
1586      if (opts.randomSleep > 0) {
1587        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
1588      }
1589      Get get = new Get(getRandomRow(opts.totalRows));
1590      for (int family = 0; family < opts.families; family++) {
1591        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1592        if (opts.addColumns) {
1593          for (int column = 0; column < opts.columns; column++) {
1594            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1595            get.addColumn(familyName, qualifier);
1596          }
1597        } else {
1598          get.addFamily(familyName);
1599        }
1600      }
1601      if (opts.filterAll) {
1602        get.setFilter(new FilterAllFilter());
1603      }
1604      get.setConsistency(consistency);
1605      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1606      try {
1607        if (opts.multiGet > 0) {
1608          this.gets.add(get);
1609          if (this.gets.size() == opts.multiGet) {
1610            Result[] rs =
1611              this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
1612            updateValueSize(rs);
1613            this.gets.clear();
1614          } else {
1615            return false;
1616          }
1617        } else {
1618          updateValueSize(this.table.get(get).get());
1619        }
1620      } catch (ExecutionException e) {
1621        throw new IOException(e);
1622      }
1623      return true;
1624    }
1625
1626    public static RuntimeException runtime(Throwable e) {
1627      if (e instanceof RuntimeException) {
1628        return (RuntimeException) e;
1629      }
1630      return new RuntimeException(e);
1631    }
1632
1633    public static <V> V propagate(Callable<V> callable) {
1634      try {
1635        return callable.call();
1636      } catch (Exception e) {
1637        throw runtime(e);
1638      }
1639    }
1640
1641    @Override
1642    protected long getReportingPeriod() {
1643      long period = opts.perClientRunRows / 10;
1644      return period == 0 ? opts.perClientRunRows : period;
1645    }
1646
1647    @Override
1648    protected void testTakedown() throws IOException {
1649      if (this.gets != null && this.gets.size() > 0) {
1650        this.table.get(gets);
1651        this.gets.clear();
1652      }
1653      super.testTakedown();
1654    }
1655  }
1656
1657  static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
1658
1659    AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
1660      super(con, options, status);
1661    }
1662
1663    @Override
1664    protected byte[] generateRow(final long i) {
1665      return getRandomRow(opts.totalRows);
1666    }
1667  }
1668
1669  static class AsyncScanTest extends AsyncTableTest {
1670    private ResultScanner testScanner;
1671    private AsyncTable<?> asyncTable;
1672
1673    AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
1674      super(con, options, status);
1675    }
1676
1677    @Override
1678    void onStartup() throws IOException {
1679      this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName),
1680        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
1681    }
1682
1683    @Override
1684    void testTakedown() throws IOException {
1685      if (this.testScanner != null) {
1686        updateScanMetrics(this.testScanner.getScanMetrics());
1687        this.testScanner.close();
1688      }
1689      super.testTakedown();
1690    }
1691
1692    @Override
1693    boolean testRow(final long i, final long startTime) throws IOException {
1694      if (this.testScanner == null) {
1695        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1696          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1697          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1698        for (int family = 0; family < opts.families; family++) {
1699          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1700          if (opts.addColumns) {
1701            for (int column = 0; column < opts.columns; column++) {
1702              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1703              scan.addColumn(familyName, qualifier);
1704            }
1705          } else {
1706            scan.addFamily(familyName);
1707          }
1708        }
1709        if (opts.filterAll) {
1710          scan.setFilter(new FilterAllFilter());
1711        }
1712        this.testScanner = asyncTable.getScanner(scan);
1713      }
1714      Result r = testScanner.next();
1715      updateValueSize(r);
1716      return true;
1717    }
1718  }
1719
1720  static class AsyncSequentialReadTest extends AsyncTableTest {
1721    AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
1722      super(con, options, status);
1723    }
1724
1725    @Override
1726    boolean testRow(final long i, final long startTime) throws IOException, InterruptedException {
1727      Get get = new Get(format(i));
1728      for (int family = 0; family < opts.families; family++) {
1729        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1730        if (opts.addColumns) {
1731          for (int column = 0; column < opts.columns; column++) {
1732            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1733            get.addColumn(familyName, qualifier);
1734          }
1735        } else {
1736          get.addFamily(familyName);
1737        }
1738      }
1739      if (opts.filterAll) {
1740        get.setFilter(new FilterAllFilter());
1741      }
1742      try {
1743        updateValueSize(table.get(get).get());
1744      } catch (ExecutionException e) {
1745        throw new IOException(e);
1746      }
1747      return true;
1748    }
1749  }
1750
1751  static class AsyncSequentialWriteTest extends AsyncTableTest {
1752    private ArrayList<Put> puts;
1753
1754    AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
1755      super(con, options, status);
1756      if (opts.multiPut > 0) {
1757        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
1758        this.puts = new ArrayList<>(opts.multiPut);
1759      }
1760    }
1761
1762    protected byte[] generateRow(final long i) {
1763      return format(i);
1764    }
1765
1766    @Override
1767    @SuppressWarnings("ReturnValueIgnored")
1768    boolean testRow(final long i, final long startTime) throws IOException, InterruptedException {
1769      byte[] row = generateRow(i);
1770      Put put = new Put(row);
1771      for (int family = 0; family < opts.families; family++) {
1772        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1773        for (int column = 0; column < opts.columns; column++) {
1774          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1775          byte[] value = generateData(getValueLength());
1776          if (opts.useTags) {
1777            byte[] tag = generateData(TAG_LENGTH);
1778            Tag[] tags = new Tag[opts.noOfTags];
1779            for (int n = 0; n < opts.noOfTags; n++) {
1780              Tag t = new ArrayBackedTag((byte) n, tag);
1781              tags[n] = t;
1782            }
1783            KeyValue kv =
1784              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
1785            put.add(kv);
1786            updateValueSize(kv.getValueLength());
1787          } else {
1788            put.addColumn(familyName, qualifier, value);
1789            updateValueSize(value.length);
1790          }
1791        }
1792      }
1793      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1794      try {
1795        table.put(put).get();
1796        if (opts.multiPut > 0) {
1797          this.puts.add(put);
1798          if (this.puts.size() == opts.multiPut) {
1799            this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get));
1800            this.puts.clear();
1801          } else {
1802            return false;
1803          }
1804        } else {
1805          table.put(put).get();
1806        }
1807      } catch (ExecutionException e) {
1808        throw new IOException(e);
1809      }
1810      return true;
1811    }
1812  }
1813
1814  static abstract class BufferedMutatorTest extends Test {
1815    protected BufferedMutator mutator;
1816    protected Table table;
1817
1818    BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1819      super(con, options, status);
1820    }
1821
1822    @Override
1823    void onStartup() throws IOException {
1824      BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1825      p.writeBufferSize(opts.bufferSize);
1826      this.mutator = connection.getBufferedMutator(p);
1827      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1828    }
1829
1830    @Override
1831    void onTakedown() throws IOException {
1832      mutator.close();
1833      table.close();
1834    }
1835  }
1836
1837  static class RandomSeekScanTest extends TableTest {
1838    RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1839      super(con, options, status);
1840    }
1841
1842    @Override
1843    boolean testRow(final long i, final long startTime) throws IOException {
1844      Scan scan = new Scan().withStartRow(getRandomRow(opts.totalRows)).setCaching(opts.caching)
1845        .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1846        .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1847      FilterList list = new FilterList();
1848      for (int family = 0; family < opts.families; family++) {
1849        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1850        if (opts.addColumns) {
1851          for (int column = 0; column < opts.columns; column++) {
1852            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1853            scan.addColumn(familyName, qualifier);
1854          }
1855        } else {
1856          scan.addFamily(familyName);
1857        }
1858      }
1859      if (opts.filterAll) {
1860        list.addFilter(new FilterAllFilter());
1861      }
1862      list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1863      scan.setFilter(list);
1864      ResultScanner s = this.table.getScanner(scan);
1865      try {
1866        for (Result rr; (rr = s.next()) != null;) {
1867          updateValueSize(rr);
1868        }
1869      } finally {
1870        updateScanMetrics(s.getScanMetrics());
1871        s.close();
1872      }
1873      return true;
1874    }
1875
1876    @Override
1877    protected long getReportingPeriod() {
1878      long period = opts.perClientRunRows / 100;
1879      return period == 0 ? opts.perClientRunRows : period;
1880    }
1881
1882  }
1883
1884  static abstract class RandomScanWithRangeTest extends TableTest {
1885    RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1886      super(con, options, status);
1887    }
1888
1889    @Override
1890    boolean testRow(final long i, final long startTime) throws IOException {
1891      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1892      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
1893        .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
1894        .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1895        .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1896      for (int family = 0; family < opts.families; family++) {
1897        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1898        if (opts.addColumns) {
1899          for (int column = 0; column < opts.columns; column++) {
1900            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1901            scan.addColumn(familyName, qualifier);
1902          }
1903        } else {
1904          scan.addFamily(familyName);
1905        }
1906      }
1907      if (opts.filterAll) {
1908        scan.setFilter(new FilterAllFilter());
1909      }
1910      Result r = null;
1911      int count = 0;
1912      ResultScanner s = this.table.getScanner(scan);
1913      try {
1914        for (; (r = s.next()) != null;) {
1915          updateValueSize(r);
1916          count++;
1917        }
1918        if (i % 100 == 0) {
1919          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1920            Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()),
1921            count));
1922        }
1923      } finally {
1924        updateScanMetrics(s.getScanMetrics());
1925        s.close();
1926      }
1927      return true;
1928    }
1929
1930    protected abstract Pair<byte[], byte[]> getStartAndStopRow();
1931
1932    protected Pair<byte[], byte[]> generateStartAndStopRows(long maxRange) {
1933      long start = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE) % opts.totalRows;
1934      long stop = start + maxRange;
1935      return new Pair<>(format(start), format(stop));
1936    }
1937
1938    @Override
1939    protected long getReportingPeriod() {
1940      long period = opts.perClientRunRows / 100;
1941      return period == 0 ? opts.perClientRunRows : period;
1942    }
1943  }
1944
1945  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1946    RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1947      super(con, options, status);
1948    }
1949
1950    @Override
1951    protected Pair<byte[], byte[]> getStartAndStopRow() {
1952      return generateStartAndStopRows(10);
1953    }
1954  }
1955
1956  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1957    RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1958      super(con, options, status);
1959    }
1960
1961    @Override
1962    protected Pair<byte[], byte[]> getStartAndStopRow() {
1963      return generateStartAndStopRows(100);
1964    }
1965  }
1966
1967  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1968    RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1969      super(con, options, status);
1970    }
1971
1972    @Override
1973    protected Pair<byte[], byte[]> getStartAndStopRow() {
1974      return generateStartAndStopRows(1000);
1975    }
1976  }
1977
1978  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1979    RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1980      super(con, options, status);
1981    }
1982
1983    @Override
1984    protected Pair<byte[], byte[]> getStartAndStopRow() {
1985      return generateStartAndStopRows(10000);
1986    }
1987  }
1988
1989  static class RandomReadTest extends TableTest {
1990    private final Consistency consistency;
1991    private ArrayList<Get> gets;
1992
1993    RandomReadTest(Connection con, TestOptions options, Status status) {
1994      super(con, options, status);
1995      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1996      if (opts.multiGet > 0) {
1997        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1998        this.gets = new ArrayList<>(opts.multiGet);
1999      }
2000    }
2001
2002    @Override
2003    boolean testRow(final long i, final long startTime) throws IOException, InterruptedException {
2004      if (opts.randomSleep > 0) {
2005        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
2006      }
2007      Get get = new Get(getRandomRow(opts.totalRows));
2008      for (int family = 0; family < opts.families; family++) {
2009        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2010        if (opts.addColumns) {
2011          for (int column = 0; column < opts.columns; column++) {
2012            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2013            get.addColumn(familyName, qualifier);
2014          }
2015        } else {
2016          get.addFamily(familyName);
2017        }
2018      }
2019      if (opts.filterAll) {
2020        get.setFilter(new FilterAllFilter());
2021      }
2022      get.setConsistency(consistency);
2023      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
2024      if (opts.multiGet > 0) {
2025        this.gets.add(get);
2026        if (this.gets.size() == opts.multiGet) {
2027          Result[] rs = this.table.get(this.gets);
2028          if (opts.replicas > 1) {
2029            long latency = System.nanoTime() - startTime;
2030            updateValueSize(rs, latency);
2031          } else {
2032            updateValueSize(rs);
2033          }
2034          this.gets.clear();
2035        } else {
2036          return false;
2037        }
2038      } else {
2039        if (opts.replicas > 1) {
2040          Result r = this.table.get(get);
2041          long latency = System.nanoTime() - startTime;
2042          updateValueSize(r, latency);
2043        } else {
2044          updateValueSize(this.table.get(get));
2045        }
2046      }
2047      return true;
2048    }
2049
2050    @Override
2051    protected long getReportingPeriod() {
2052      long period = opts.perClientRunRows / 10;
2053      return period == 0 ? opts.perClientRunRows : period;
2054    }
2055
2056    @Override
2057    protected void testTakedown() throws IOException {
2058      if (this.gets != null && this.gets.size() > 0) {
2059        this.table.get(gets);
2060        this.gets.clear();
2061      }
2062      super.testTakedown();
2063    }
2064  }
2065
2066  /*
2067   * Send random reads against fake regions inserted by MetaWriteTest
2068   */
2069  static class MetaRandomReadTest extends MetaTest {
2070    private RegionLocator regionLocator;
2071
2072    MetaRandomReadTest(Connection con, TestOptions options, Status status) {
2073      super(con, options, status);
2074      LOG.info("call getRegionLocation");
2075    }
2076
2077    @Override
2078    void onStartup() throws IOException {
2079      super.onStartup();
2080      this.regionLocator = connection.getRegionLocator(table.getName());
2081    }
2082
2083    @Override
2084    boolean testRow(final long i, final long startTime) throws IOException, InterruptedException {
2085      if (opts.randomSleep > 0) {
2086        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
2087      }
2088      HRegionLocation hRegionLocation = regionLocator.getRegionLocation(
2089        getSplitKey(ThreadLocalRandom.current().nextLong(opts.perClientRunRows)), true);
2090      LOG.debug("get location for region: " + hRegionLocation);
2091      return true;
2092    }
2093
2094    @Override
2095    protected long getReportingPeriod() {
2096      long period = opts.perClientRunRows / 10;
2097      return period == 0 ? opts.perClientRunRows : period;
2098    }
2099
2100    @Override
2101    protected void testTakedown() throws IOException {
2102      super.testTakedown();
2103    }
2104  }
2105
2106  static class RandomWriteTest extends SequentialWriteTest {
2107    RandomWriteTest(Connection con, TestOptions options, Status status) {
2108      super(con, options, status);
2109    }
2110
2111    @Override
2112    protected byte[] generateRow(final long i) {
2113      return getRandomRow(opts.totalRows);
2114    }
2115
2116  }
2117
2118  static class RandomDeleteTest extends SequentialDeleteTest {
2119    RandomDeleteTest(Connection con, TestOptions options, Status status) {
2120      super(con, options, status);
2121    }
2122
2123    @Override
2124    protected byte[] generateRow(final long i) {
2125      return getRandomRow(opts.totalRows);
2126    }
2127
2128  }
2129
2130  static class ScanTest extends TableTest {
2131    private ResultScanner testScanner;
2132
2133    ScanTest(Connection con, TestOptions options, Status status) {
2134      super(con, options, status);
2135    }
2136
2137    @Override
2138    void testTakedown() throws IOException {
2139      if (this.testScanner != null) {
2140        this.testScanner.close();
2141      }
2142      super.testTakedown();
2143    }
2144
2145    @Override
2146    boolean testRow(final long i, final long startTime) throws IOException {
2147      if (this.testScanner == null) {
2148        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
2149          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
2150          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
2151        for (int family = 0; family < opts.families; family++) {
2152          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2153          if (opts.addColumns) {
2154            for (int column = 0; column < opts.columns; column++) {
2155              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2156              scan.addColumn(familyName, qualifier);
2157            }
2158          } else {
2159            scan.addFamily(familyName);
2160          }
2161        }
2162        if (opts.filterAll) {
2163          scan.setFilter(new FilterAllFilter());
2164        }
2165        this.testScanner = table.getScanner(scan);
2166      }
2167      Result r = testScanner.next();
2168      updateValueSize(r);
2169      return true;
2170    }
2171  }
2172
2173  static class ReverseScanTest extends TableTest {
2174    private ResultScanner testScanner;
2175
2176    ReverseScanTest(Connection con, TestOptions options, Status status) {
2177      super(con, options, status);
2178    }
2179
2180    @Override
2181    void testTakedown() throws IOException {
2182      if (this.testScanner != null) {
2183        this.testScanner.close();
2184      }
2185      super.testTakedown();
2186    }
2187
2188    @Override
2189    boolean testRow(final long i, final long startTime) throws IOException {
2190      if (this.testScanner == null) {
2191        Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2192          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2193          .setScanMetricsEnabled(true).setReversed(true);
2194        for (int family = 0; family < opts.families; family++) {
2195          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2196          if (opts.addColumns) {
2197            for (int column = 0; column < opts.columns; column++) {
2198              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2199              scan.addColumn(familyName, qualifier);
2200            }
2201          } else {
2202            scan.addFamily(familyName);
2203          }
2204        }
2205        if (opts.filterAll) {
2206          scan.setFilter(new FilterAllFilter());
2207        }
2208        this.testScanner = table.getScanner(scan);
2209      }
2210      Result r = testScanner.next();
2211      updateValueSize(r);
2212      return true;
2213    }
2214  }
2215
2216  /**
2217   * Base class for operations that are CAS-like; that read a value and then set it based off what
2218   * they read. In this category is increment, append, checkAndPut, etc.
2219   * <p>
2220   * These operations also want some concurrency going on. Usually when these tests run, they
2221   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
2222   * same key space. We do this with our getStartRow and getLastRow overrides.
2223   */
2224  static abstract class CASTableTest extends TableTest {
2225    private final byte[] qualifier;
2226
2227    CASTableTest(Connection con, TestOptions options, Status status) {
2228      super(con, options, status);
2229      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
2230    }
2231
2232    byte[] getQualifier() {
2233      return this.qualifier;
2234    }
2235
2236    @Override
2237    long getStartRow() {
2238      return 0;
2239    }
2240
2241    @Override
2242    long getLastRow() {
2243      return opts.perClientRunRows;
2244    }
2245  }
2246
2247  static class IncrementTest extends CASTableTest {
2248    IncrementTest(Connection con, TestOptions options, Status status) {
2249      super(con, options, status);
2250    }
2251
2252    @Override
2253    boolean testRow(final long i, final long startTime) throws IOException {
2254      Increment increment = new Increment(format(i));
2255      // unlike checkAndXXX tests, which make most sense to do on a single value,
2256      // if multiple families are specified for an increment test we assume it is
2257      // meant to raise the work factor
2258      for (int family = 0; family < opts.families; family++) {
2259        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2260        increment.addColumn(familyName, getQualifier(), 1l);
2261      }
2262      updateValueSize(this.table.increment(increment));
2263      return true;
2264    }
2265  }
2266
2267  static class AppendTest extends CASTableTest {
2268    AppendTest(Connection con, TestOptions options, Status status) {
2269      super(con, options, status);
2270    }
2271
2272    @Override
2273    boolean testRow(final long i, final long startTime) throws IOException {
2274      byte[] bytes = format(i);
2275      Append append = new Append(bytes);
2276      // unlike checkAndXXX tests, which make most sense to do on a single value,
2277      // if multiple families are specified for an append test we assume it is
2278      // meant to raise the work factor
2279      for (int family = 0; family < opts.families; family++) {
2280        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2281        append.addColumn(familyName, getQualifier(), bytes);
2282      }
2283      updateValueSize(this.table.append(append));
2284      return true;
2285    }
2286  }
2287
2288  static class CheckAndMutateTest extends CASTableTest {
2289    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
2290      super(con, options, status);
2291    }
2292
2293    @Override
2294    boolean testRow(final long i, final long startTime) throws IOException {
2295      final byte[] bytes = format(i);
2296      // checkAndXXX tests operate on only a single value
2297      // Put a known value so when we go to check it, it is there.
2298      Put put = new Put(bytes);
2299      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2300      this.table.put(put);
2301      RowMutations mutations = new RowMutations(bytes);
2302      mutations.add(put);
2303      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2304        .thenMutate(mutations);
2305      return true;
2306    }
2307  }
2308
2309  static class CheckAndPutTest extends CASTableTest {
2310    CheckAndPutTest(Connection con, TestOptions options, Status status) {
2311      super(con, options, status);
2312    }
2313
2314    @Override
2315    boolean testRow(final long i, final long startTime) throws IOException {
2316      final byte[] bytes = format(i);
2317      // checkAndXXX tests operate on only a single value
2318      // Put a known value so when we go to check it, it is there.
2319      Put put = new Put(bytes);
2320      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2321      this.table.put(put);
2322      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2323        .thenPut(put);
2324      return true;
2325    }
2326  }
2327
2328  static class CheckAndDeleteTest extends CASTableTest {
2329    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
2330      super(con, options, status);
2331    }
2332
2333    @Override
2334    boolean testRow(final long i, final long startTime) throws IOException {
2335      final byte[] bytes = format(i);
2336      // checkAndXXX tests operate on only a single value
2337      // Put a known value so when we go to check it, it is there.
2338      Put put = new Put(bytes);
2339      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2340      this.table.put(put);
2341      Delete delete = new Delete(put.getRow());
2342      delete.addColumn(FAMILY_ZERO, getQualifier());
2343      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2344        .thenDelete(delete);
2345      return true;
2346    }
2347  }
2348
2349  /*
2350   * Delete all fake regions inserted to meta table by MetaWriteTest.
2351   */
2352  static class CleanMetaTest extends MetaTest {
2353    CleanMetaTest(Connection con, TestOptions options, Status status) {
2354      super(con, options, status);
2355    }
2356
2357    @Override
2358    boolean testRow(final long i, final long startTime) throws IOException {
2359      try {
2360        RegionInfo regionInfo = connection.getRegionLocator(table.getName())
2361          .getRegionLocation(getSplitKey(i), false).getRegion();
2362        LOG.debug("deleting region from meta: " + regionInfo);
2363
2364        Delete delete =
2365          MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP);
2366        try (Table t = MetaTableAccessor.getMetaHTable(connection)) {
2367          t.delete(delete);
2368        }
2369      } catch (IOException ie) {
2370        // Log and continue
2371        LOG.error("cannot find region with start key: " + i);
2372      }
2373      return true;
2374    }
2375  }
2376
2377  static class SequentialReadTest extends TableTest {
2378    SequentialReadTest(Connection con, TestOptions options, Status status) {
2379      super(con, options, status);
2380    }
2381
2382    @Override
2383    boolean testRow(final long i, final long startTime) throws IOException {
2384      Get get = new Get(format(i));
2385      for (int family = 0; family < opts.families; family++) {
2386        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2387        if (opts.addColumns) {
2388          for (int column = 0; column < opts.columns; column++) {
2389            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2390            get.addColumn(familyName, qualifier);
2391          }
2392        } else {
2393          get.addFamily(familyName);
2394        }
2395      }
2396      if (opts.filterAll) {
2397        get.setFilter(new FilterAllFilter());
2398      }
2399      updateValueSize(table.get(get));
2400      return true;
2401    }
2402  }
2403
2404  static class SequentialWriteTest extends BufferedMutatorTest {
2405    private ArrayList<Put> puts;
2406
2407    SequentialWriteTest(Connection con, TestOptions options, Status status) {
2408      super(con, options, status);
2409      if (opts.multiPut > 0) {
2410        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
2411        this.puts = new ArrayList<>(opts.multiPut);
2412      }
2413    }
2414
2415    protected byte[] generateRow(final long i) {
2416      return format(i);
2417    }
2418
2419    @Override
2420    boolean testRow(final long i, final long startTime) throws IOException {
2421      byte[] row = generateRow(i);
2422      Put put = new Put(row);
2423      for (int family = 0; family < opts.families; family++) {
2424        byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
2425        for (int column = 0; column < opts.columns; column++) {
2426          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2427          byte[] value = generateData(getValueLength());
2428          if (opts.useTags) {
2429            byte[] tag = generateData(TAG_LENGTH);
2430            Tag[] tags = new Tag[opts.noOfTags];
2431            for (int n = 0; n < opts.noOfTags; n++) {
2432              Tag t = new ArrayBackedTag((byte) n, tag);
2433              tags[n] = t;
2434            }
2435            KeyValue kv =
2436              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
2437            put.add(kv);
2438            updateValueSize(kv.getValueLength());
2439          } else {
2440            put.addColumn(familyName, qualifier, value);
2441            updateValueSize(value.length);
2442          }
2443        }
2444      }
2445      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2446      if (opts.autoFlush) {
2447        if (opts.multiPut > 0) {
2448          this.puts.add(put);
2449          if (this.puts.size() == opts.multiPut) {
2450            table.put(this.puts);
2451            this.puts.clear();
2452          } else {
2453            return false;
2454          }
2455        } else {
2456          table.put(put);
2457        }
2458      } else {
2459        mutator.mutate(put);
2460      }
2461      return true;
2462    }
2463  }
2464
2465  static class SequentialDeleteTest extends BufferedMutatorTest {
2466
2467    SequentialDeleteTest(Connection con, TestOptions options, Status status) {
2468      super(con, options, status);
2469    }
2470
2471    protected byte[] generateRow(final long i) {
2472      return format(i);
2473    }
2474
2475    @Override
2476    boolean testRow(final long i, final long startTime) throws IOException {
2477      byte[] row = generateRow(i);
2478      Delete delete = new Delete(row);
2479      for (int family = 0; family < opts.families; family++) {
2480        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2481        delete.addFamily(familyName);
2482      }
2483      delete.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2484      if (opts.autoFlush) {
2485        table.delete(delete);
2486      } else {
2487        mutator.mutate(delete);
2488      }
2489      return true;
2490    }
2491  }
2492
2493  /*
2494   * Insert fake regions into meta table with contiguous split keys.
2495   */
2496  static class MetaWriteTest extends MetaTest {
2497
2498    MetaWriteTest(Connection con, TestOptions options, Status status) {
2499      super(con, options, status);
2500    }
2501
2502    @Override
2503    boolean testRow(final long i, final long startTime) throws IOException {
2504      List<RegionInfo> regionInfos = new ArrayList<RegionInfo>();
2505      RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME))
2506        .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build());
2507      regionInfos.add(regionInfo);
2508      MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1);
2509
2510      // write the serverName columns
2511      MetaTableAccessor.updateRegionLocation(connection, regionInfo,
2512        ServerName.valueOf("localhost", 60010, ThreadLocalRandom.current().nextLong()), i,
2513        EnvironmentEdgeManager.currentTime());
2514      return true;
2515    }
2516  }
2517
2518  static class FilteredScanTest extends TableTest {
2519    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
2520
2521    FilteredScanTest(Connection con, TestOptions options, Status status) {
2522      super(con, options, status);
2523      if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) {
2524        LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB
2525          + ". This could take a very long time.");
2526      }
2527    }
2528
2529    @Override
2530    boolean testRow(long i, final long startTime) throws IOException {
2531      byte[] value = generateData(getValueLength());
2532      Scan scan = constructScan(value);
2533      ResultScanner scanner = null;
2534      try {
2535        scanner = this.table.getScanner(scan);
2536        for (Result r = null; (r = scanner.next()) != null;) {
2537          updateValueSize(r);
2538        }
2539      } finally {
2540        if (scanner != null) {
2541          updateScanMetrics(scanner.getScanMetrics());
2542          scanner.close();
2543        }
2544      }
2545      return true;
2546    }
2547
2548    protected Scan constructScan(byte[] valuePrefix) throws IOException {
2549      FilterList list = new FilterList();
2550      Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL,
2551        new BinaryComparator(valuePrefix));
2552      list.addFilter(filter);
2553      if (opts.filterAll) {
2554        list.addFilter(new FilterAllFilter());
2555      }
2556      Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2557        .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2558        .setScanMetricsEnabled(true);
2559      if (opts.addColumns) {
2560        for (int column = 0; column < opts.columns; column++) {
2561          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2562          scan.addColumn(FAMILY_ZERO, qualifier);
2563        }
2564      } else {
2565        scan.addFamily(FAMILY_ZERO);
2566      }
2567      scan.setFilter(list);
2568      return scan;
2569    }
2570  }
2571
2572  /**
2573   * Compute a throughput rate in MB/s.
2574   * @param rows   Number of records consumed.
2575   * @param timeMs Time taken in milliseconds.
2576   * @return String value with label, ie '123.76 MB/s'
2577   */
2578  private static String calculateMbps(long rows, long timeMs, final int valueSize, int families,
2579    int columns) {
2580    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH
2581      + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families);
2582    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
2583      .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT);
2584    return FMT.format(mbps) + " MB/s";
2585  }
2586
2587  /*
2588   * Format passed integer.
2589   * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed number (Does
2590   * absolute in case number is negative).
2591   */
2592  public static byte[] format(final long number) {
2593    byte[] b = new byte[ROW_LENGTH];
2594    long d = Math.abs(number);
2595    for (int i = b.length - 1; i >= 0; i--) {
2596      b[i] = (byte) ((d % 10) + '0');
2597      d /= 10;
2598    }
2599    return b;
2600  }
2601
2602  /*
2603   * This method takes some time and is done inline uploading data. For example, doing the mapfile
2604   * test, generation of the key and value consumes about 30% of CPU time.
2605   * @return Generated random value to insert into a table cell.
2606   */
2607  public static byte[] generateData(int length) {
2608    byte[] b = new byte[length];
2609    int i;
2610
2611    Random r = ThreadLocalRandom.current();
2612    for (i = 0; i < (length - 8); i += 8) {
2613      b[i] = (byte) (65 + r.nextInt(26));
2614      b[i + 1] = b[i];
2615      b[i + 2] = b[i];
2616      b[i + 3] = b[i];
2617      b[i + 4] = b[i];
2618      b[i + 5] = b[i];
2619      b[i + 6] = b[i];
2620      b[i + 7] = b[i];
2621    }
2622
2623    byte a = (byte) (65 + r.nextInt(26));
2624    for (; i < length; i++) {
2625      b[i] = a;
2626    }
2627    return b;
2628  }
2629
2630  static byte[] getRandomRow(final long totalRows) {
2631    return format(generateRandomRow(totalRows));
2632  }
2633
2634  static long generateRandomRow(final long totalRows) {
2635    return ThreadLocalRandom.current().nextLong(Long.MAX_VALUE) % totalRows;
2636  }
2637
2638  static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
2639    Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
2640    throws IOException, InterruptedException {
2641    status.setStatus(
2642      "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows");
2643    long totalElapsedTime;
2644
2645    final TestBase t;
2646    try {
2647      if (AsyncTest.class.isAssignableFrom(cmd)) {
2648        Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
2649        Constructor<? extends AsyncTest> constructor =
2650          newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
2651        t = constructor.newInstance(asyncCon, opts, status);
2652      } else {
2653        Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
2654        Constructor<? extends Test> constructor =
2655          newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
2656        t = constructor.newInstance(con, opts, status);
2657      }
2658    } catch (NoSuchMethodException e) {
2659      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
2660        + ".  It does not provide a constructor as described by "
2661        + "the javadoc comment.  Available constructors are: "
2662        + Arrays.toString(cmd.getConstructors()));
2663    } catch (Exception e) {
2664      throw new IllegalStateException("Failed to construct command class", e);
2665    }
2666    totalElapsedTime = t.test();
2667
2668    status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow
2669      + " for " + opts.perClientRunRows + " rows" + " ("
2670      + calculateMbps((long) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
2671        getAverageValueLength(opts), opts.families, opts.columns)
2672      + ")");
2673
2674    return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold,
2675      t.numOfReplyFromReplica, t.getLatencyHistogram());
2676  }
2677
2678  private static int getAverageValueLength(final TestOptions opts) {
2679    return opts.valueRandom ? opts.valueSize / 2 : opts.valueSize;
2680  }
2681
2682  private void runTest(final Class<? extends TestBase> cmd, TestOptions opts)
2683    throws IOException, InterruptedException, ClassNotFoundException, ExecutionException {
2684    // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2685    // the TestOptions introspection for us and dump the output in a readable format.
2686    LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
2687    Admin admin = null;
2688    Connection connection = null;
2689    try {
2690      connection = ConnectionFactory.createConnection(getConf());
2691      admin = connection.getAdmin();
2692      checkTable(admin, opts);
2693    } finally {
2694      if (admin != null) admin.close();
2695      if (connection != null) connection.close();
2696    }
2697    if (opts.nomapred) {
2698      doLocalClients(opts, getConf());
2699    } else {
2700      doMapReduce(opts, getConf());
2701    }
2702  }
2703
2704  protected void printUsage() {
2705    printUsage(PE_COMMAND_SHORTNAME, null);
2706  }
2707
2708  protected static void printUsage(final String message) {
2709    printUsage(PE_COMMAND_SHORTNAME, message);
2710  }
2711
2712  protected static void printUsageAndExit(final String message, final int exitCode) {
2713    printUsage(message);
2714    System.exit(exitCode);
2715  }
2716
2717  protected static void printUsage(final String shortName, final String message) {
2718    if (message != null && message.length() > 0) {
2719      System.err.println(message);
2720    }
2721    System.err.print("Usage: hbase " + shortName);
2722    System.err.println("  <OPTIONS> [-D<property=value>]* <command|class> <nclients>");
2723    System.err.println();
2724    System.err.println("General Options:");
2725    System.err.println(
2726      " nomapred        Run multiple clients using threads " + "(rather than use mapreduce)");
2727    System.err
2728      .println(" oneCon          all the threads share the same connection. Default: False");
2729    System.err.println(" connCount          connections all threads share. "
2730      + "For example, if set to 2, then all thread share 2 connection. "
2731      + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, "
2732      + "if not, connCount=thread number");
2733
2734    System.err.println(" sampleRate      Execute test on a sample of total rows. Default: 1.0");
2735    System.err.println(" period          Report every 'period' rows: "
2736      + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10);
2737    System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
2738    System.err.println(
2739      " traceRate       Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0");
2740    System.err.println(" latency         Set to report operation latencies. Default: False");
2741    System.err.println(" latencyThreshold  Set to report number of operations with latency "
2742      + "over lantencyThreshold, unit in millisecond, default 0");
2743    System.err.println(" measureAfter    Start to measure the latency once 'measureAfter'"
2744      + " rows have been treated. Default: 0");
2745    System.err
2746      .println(" valueSize       Pass value size to use: Default: " + DEFAULT_OPTS.getValueSize());
2747    System.err.println(" valueRandom     Set if we should vary value size between 0 and "
2748      + "'valueSize'; set on read for stats on size: Default: Not set.");
2749    System.err.println(" blockEncoding   Block encoding to use. Value should be one of "
2750      + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2751    System.err.println();
2752    System.err.println("Table Creation / Write Tests:");
2753    System.err.println(" table           Alternate table name. Default: 'TestTable'");
2754    System.err.println(
2755      " rows            Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows()
2756        + ".  In case of randomReads and randomSeekScans this could"
2757        + " be specified along with --size to specify the number of rows to be scanned within"
2758        + " the total range specified by the size.");
2759    System.err.println(
2760      " size            Total size in GiB. Mutually exclusive with --rows for writes and scans"
2761        + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2762        + " use size to specify the end range and --rows"
2763        + " specifies the number of rows within that range. " + "Default: 1.0.");
2764    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2765    System.err.println(" encryption      Encryption type to use (AES, ...). Default: 'NONE'");
2766    System.err.println(
2767      " flushCommits    Used to determine if the test should flush the table. " + "Default: false");
2768    System.err.println(" valueZipf       Set if we should vary value size between 0 and "
2769      + "'valueSize' in zipf form: Default: Not set.");
2770    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
2771    System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
2772    System.err.println(" multiPut        Batch puts together into groups of N. Only supported "
2773      + "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0");
2774    System.err.println(" presplit        Create presplit table. If a table with same name exists,"
2775      + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2776      + "Recommended for accurate perf analysis (see guide). Default: disabled");
2777    System.err.println(
2778      " usetags         Writes tags along with KVs. Use with HFile V3. " + "Default: false");
2779    System.err.println(" numoftags       Specify the no of tags that would be needed. "
2780      + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2781    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
2782    System.err.println(" columns         Columns to write per row. Default: 1");
2783    System.err
2784      .println(" families        Specify number of column families for the table. Default: 1");
2785    System.err.println();
2786    System.err.println("Read Tests:");
2787    System.err.println(" filterAll       Helps to filter out all the rows on the server side"
2788      + " there by not returning any thing back to the client.  Helps to check the server side"
2789      + " performance.  Uses FilterAllFilter internally. ");
2790    System.err.println(" multiGet        Batch gets together into groups of N. Only supported "
2791      + "by randomRead. Default: disabled");
2792    System.err.println(" inmemory        Tries to keep the HFiles of the CF "
2793      + "inmemory as far as possible. Not guaranteed that reads are always served "
2794      + "from memory.  Default: false");
2795    System.err
2796      .println(" bloomFilter     Bloom filter type, one of " + Arrays.toString(BloomType.values()));
2797    System.err.println(" blockSize       Blocksize to use when writing out hfiles. ");
2798    System.err
2799      .println(" inmemoryCompaction  Makes the column family to do inmemory flushes/compactions. "
2800        + "Uses the CompactingMemstore");
2801    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
2802    System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
2803    System.err.println(
2804      " randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
2805    System.err.println(" caching         Scan caching to use. Default: 30");
2806    System.err.println(" asyncPrefetch   Enable asyncPrefetch for scan");
2807    System.err.println(" cacheBlocks     Set the cacheBlocks option for scan. Default: true");
2808    System.err.println(
2809      " scanReadType    Set the readType option for scan, stream/pread/default. Default: default");
2810    System.err.println(" bufferSize      Set the value of client side buffering. Default: 2MB");
2811    System.err.println();
2812    System.err.println(" Note: -D properties will be applied to the conf used. ");
2813    System.err.println("  For example: ");
2814    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
2815    System.err.println("   -Dmapreduce.task.timeout=60000");
2816    System.err.println();
2817    System.err.println("Command:");
2818    for (CmdDescriptor command : COMMANDS.values()) {
2819      System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
2820    }
2821    System.err.println();
2822    System.err.println("Class:");
2823    System.err.println("To run any custom implementation of PerformanceEvaluation.Test, "
2824      + "provide the classname of the implementaion class in place of "
2825      + "command name and it will be loaded at runtime from classpath.:");
2826    System.err.println("Please consider to contribute back "
2827      + "this custom test impl into a builtin PE command for the benefit of the community");
2828    System.err.println();
2829    System.err.println("Args:");
2830    System.err.println(" nclients        Integer. Required. Total number of clients "
2831      + "(and HRegionServers) running. 1 <= value <= 500");
2832    System.err.println("Examples:");
2833    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
2834    System.err.println(" $ hbase " + shortName + " sequentialWrite 1");
2835    System.err.println(" To run 10 clients doing increments over ten rows:");
2836    System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10");
2837  }
2838
2839  /**
2840   * Parse options passed in via an arguments array. Assumes that array has been split on
2841   * white-space and placed into a {@code Queue}. Any unknown arguments will remain in the queue at
2842   * the conclusion of this method call. It's up to the caller to deal with these unrecognized
2843   * arguments.
2844   */
2845  static TestOptions parseOpts(Queue<String> args) {
2846    TestOptions opts = new TestOptions();
2847
2848    String cmd = null;
2849    while ((cmd = args.poll()) != null) {
2850      if (cmd.equals("-h") || cmd.startsWith("--h")) {
2851        // place item back onto queue so that caller knows parsing was incomplete
2852        args.add(cmd);
2853        break;
2854      }
2855
2856      final String nmr = "--nomapred";
2857      if (cmd.startsWith(nmr)) {
2858        opts.nomapred = true;
2859        continue;
2860      }
2861
2862      final String rows = "--rows=";
2863      if (cmd.startsWith(rows)) {
2864        opts.perClientRunRows = Long.parseLong(cmd.substring(rows.length()));
2865        continue;
2866      }
2867
2868      final String cycles = "--cycles=";
2869      if (cmd.startsWith(cycles)) {
2870        opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
2871        continue;
2872      }
2873
2874      final String sampleRate = "--sampleRate=";
2875      if (cmd.startsWith(sampleRate)) {
2876        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
2877        continue;
2878      }
2879
2880      final String table = "--table=";
2881      if (cmd.startsWith(table)) {
2882        opts.tableName = cmd.substring(table.length());
2883        continue;
2884      }
2885
2886      final String startRow = "--startRow=";
2887      if (cmd.startsWith(startRow)) {
2888        opts.startRow = Long.parseLong(cmd.substring(startRow.length()));
2889        continue;
2890      }
2891
2892      final String compress = "--compress=";
2893      if (cmd.startsWith(compress)) {
2894        opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
2895        continue;
2896      }
2897
2898      final String encryption = "--encryption=";
2899      if (cmd.startsWith(encryption)) {
2900        opts.encryption = cmd.substring(encryption.length());
2901        continue;
2902      }
2903
2904      final String traceRate = "--traceRate=";
2905      if (cmd.startsWith(traceRate)) {
2906        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
2907        continue;
2908      }
2909
2910      final String blockEncoding = "--blockEncoding=";
2911      if (cmd.startsWith(blockEncoding)) {
2912        opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
2913        continue;
2914      }
2915
2916      final String flushCommits = "--flushCommits=";
2917      if (cmd.startsWith(flushCommits)) {
2918        opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
2919        continue;
2920      }
2921
2922      final String writeToWAL = "--writeToWAL=";
2923      if (cmd.startsWith(writeToWAL)) {
2924        opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
2925        continue;
2926      }
2927
2928      final String presplit = "--presplit=";
2929      if (cmd.startsWith(presplit)) {
2930        opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
2931        continue;
2932      }
2933
2934      final String inMemory = "--inmemory=";
2935      if (cmd.startsWith(inMemory)) {
2936        opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
2937        continue;
2938      }
2939
2940      final String autoFlush = "--autoFlush=";
2941      if (cmd.startsWith(autoFlush)) {
2942        opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
2943        continue;
2944      }
2945
2946      final String onceCon = "--oneCon=";
2947      if (cmd.startsWith(onceCon)) {
2948        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
2949        continue;
2950      }
2951
2952      final String connCount = "--connCount=";
2953      if (cmd.startsWith(connCount)) {
2954        opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
2955        continue;
2956      }
2957
2958      final String latencyThreshold = "--latencyThreshold=";
2959      if (cmd.startsWith(latencyThreshold)) {
2960        opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length()));
2961        continue;
2962      }
2963
2964      final String latency = "--latency";
2965      if (cmd.startsWith(latency)) {
2966        opts.reportLatency = true;
2967        continue;
2968      }
2969
2970      final String multiGet = "--multiGet=";
2971      if (cmd.startsWith(multiGet)) {
2972        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
2973        continue;
2974      }
2975
2976      final String multiPut = "--multiPut=";
2977      if (cmd.startsWith(multiPut)) {
2978        opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
2979        continue;
2980      }
2981
2982      final String useTags = "--usetags=";
2983      if (cmd.startsWith(useTags)) {
2984        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
2985        continue;
2986      }
2987
2988      final String noOfTags = "--numoftags=";
2989      if (cmd.startsWith(noOfTags)) {
2990        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
2991        continue;
2992      }
2993
2994      final String replicas = "--replicas=";
2995      if (cmd.startsWith(replicas)) {
2996        opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
2997        continue;
2998      }
2999
3000      final String filterOutAll = "--filterAll";
3001      if (cmd.startsWith(filterOutAll)) {
3002        opts.filterAll = true;
3003        continue;
3004      }
3005
3006      final String size = "--size=";
3007      if (cmd.startsWith(size)) {
3008        opts.size = Float.parseFloat(cmd.substring(size.length()));
3009        if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB");
3010        continue;
3011      }
3012
3013      final String splitPolicy = "--splitPolicy=";
3014      if (cmd.startsWith(splitPolicy)) {
3015        opts.splitPolicy = cmd.substring(splitPolicy.length());
3016        continue;
3017      }
3018
3019      final String randomSleep = "--randomSleep=";
3020      if (cmd.startsWith(randomSleep)) {
3021        opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
3022        continue;
3023      }
3024
3025      final String measureAfter = "--measureAfter=";
3026      if (cmd.startsWith(measureAfter)) {
3027        opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length()));
3028        continue;
3029      }
3030
3031      final String bloomFilter = "--bloomFilter=";
3032      if (cmd.startsWith(bloomFilter)) {
3033        opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
3034        continue;
3035      }
3036
3037      final String blockSize = "--blockSize=";
3038      if (cmd.startsWith(blockSize)) {
3039        opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
3040        continue;
3041      }
3042
3043      final String valueSize = "--valueSize=";
3044      if (cmd.startsWith(valueSize)) {
3045        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
3046        continue;
3047      }
3048
3049      final String valueRandom = "--valueRandom";
3050      if (cmd.startsWith(valueRandom)) {
3051        opts.valueRandom = true;
3052        continue;
3053      }
3054
3055      final String valueZipf = "--valueZipf";
3056      if (cmd.startsWith(valueZipf)) {
3057        opts.valueZipf = true;
3058        continue;
3059      }
3060
3061      final String period = "--period=";
3062      if (cmd.startsWith(period)) {
3063        opts.period = Integer.parseInt(cmd.substring(period.length()));
3064        continue;
3065      }
3066
3067      final String addColumns = "--addColumns=";
3068      if (cmd.startsWith(addColumns)) {
3069        opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
3070        continue;
3071      }
3072
3073      final String inMemoryCompaction = "--inmemoryCompaction=";
3074      if (cmd.startsWith(inMemoryCompaction)) {
3075        opts.inMemoryCompaction =
3076          MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length()));
3077        continue;
3078      }
3079
3080      final String columns = "--columns=";
3081      if (cmd.startsWith(columns)) {
3082        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
3083        continue;
3084      }
3085
3086      final String families = "--families=";
3087      if (cmd.startsWith(families)) {
3088        opts.families = Integer.parseInt(cmd.substring(families.length()));
3089        continue;
3090      }
3091
3092      final String caching = "--caching=";
3093      if (cmd.startsWith(caching)) {
3094        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
3095        continue;
3096      }
3097
3098      final String asyncPrefetch = "--asyncPrefetch";
3099      if (cmd.startsWith(asyncPrefetch)) {
3100        opts.asyncPrefetch = true;
3101        continue;
3102      }
3103
3104      final String cacheBlocks = "--cacheBlocks=";
3105      if (cmd.startsWith(cacheBlocks)) {
3106        opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length()));
3107        continue;
3108      }
3109
3110      final String scanReadType = "--scanReadType=";
3111      if (cmd.startsWith(scanReadType)) {
3112        opts.scanReadType =
3113          Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
3114        continue;
3115      }
3116
3117      final String bufferSize = "--bufferSize=";
3118      if (cmd.startsWith(bufferSize)) {
3119        opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
3120        continue;
3121      }
3122
3123      final String commandPropertiesFile = "--commandPropertiesFile=";
3124      if (cmd.startsWith(commandPropertiesFile)) {
3125        String fileName = String.valueOf(cmd.substring(commandPropertiesFile.length()));
3126        Properties properties = new Properties();
3127        try {
3128          properties
3129            .load(PerformanceEvaluation.class.getClassLoader().getResourceAsStream(fileName));
3130          opts.commandProperties = properties;
3131        } catch (IOException e) {
3132          LOG.error("Failed to load metricIds from properties file", e);
3133        }
3134        continue;
3135      }
3136
3137      validateParsedOpts(opts);
3138
3139      if (isCommandClass(cmd)) {
3140        opts.cmdName = cmd;
3141        try {
3142          opts.numClientThreads = Integer.parseInt(args.remove());
3143        } catch (NoSuchElementException | NumberFormatException e) {
3144          throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
3145        }
3146        opts = calculateRowsAndSize(opts);
3147        break;
3148      } else {
3149        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
3150      }
3151
3152      // Not matching any option or command.
3153      System.err.println("Error: Wrong option or command: " + cmd);
3154      args.add(cmd);
3155      break;
3156    }
3157    return opts;
3158  }
3159
3160  /**
3161   * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts
3162   */
3163  private static void validateParsedOpts(TestOptions opts) {
3164
3165    if (!opts.autoFlush && opts.multiPut > 0) {
3166      throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
3167    }
3168
3169    if (opts.oneCon && opts.connCount > 1) {
3170      throw new IllegalArgumentException(
3171        "oneCon is set to true, " + "connCount should not bigger than 1");
3172    }
3173
3174    if (opts.valueZipf && opts.valueRandom) {
3175      throw new IllegalStateException("Either valueZipf or valueRandom but not both");
3176    }
3177  }
3178
3179  static TestOptions calculateRowsAndSize(final TestOptions opts) {
3180    int rowsPerGB = getRowsPerGB(opts);
3181    if (
3182      (opts.getCmdName() != null
3183        && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
3184        && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows
3185    ) {
3186      opts.totalRows = (long) (opts.size * rowsPerGB);
3187    } else if (opts.size != DEFAULT_OPTS.size) {
3188      // total size in GB specified
3189      opts.totalRows = (long) (opts.size * rowsPerGB);
3190      opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
3191    } else {
3192      opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
3193      // Cast to float to ensure floating-point division
3194      opts.size = (float) opts.totalRows / rowsPerGB;
3195    }
3196    return opts;
3197  }
3198
3199  static int getRowsPerGB(final TestOptions opts) {
3200    return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies()
3201      * opts.getColumns());
3202  }
3203
3204  @Override
3205  public int run(String[] args) throws Exception {
3206    // Process command-line args. TODO: Better cmd-line processing
3207    // (but hopefully something not as painful as cli options).
3208    int errCode = -1;
3209    if (args.length < 1) {
3210      printUsage();
3211      return errCode;
3212    }
3213
3214    try {
3215      LinkedList<String> argv = new LinkedList<>();
3216      argv.addAll(Arrays.asList(args));
3217      TestOptions opts = parseOpts(argv);
3218
3219      // args remaining, print help and exit
3220      if (!argv.isEmpty()) {
3221        errCode = 0;
3222        printUsage();
3223        return errCode;
3224      }
3225
3226      // must run at least 1 client
3227      if (opts.numClientThreads <= 0) {
3228        throw new IllegalArgumentException("Number of clients must be > 0");
3229      }
3230
3231      // cmdName should not be null, print help and exit
3232      if (opts.cmdName == null) {
3233        printUsage();
3234        return errCode;
3235      }
3236
3237      Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
3238      if (cmdClass != null) {
3239        runTest(cmdClass, opts);
3240        errCode = 0;
3241      }
3242
3243    } catch (Exception e) {
3244      e.printStackTrace();
3245    }
3246
3247    return errCode;
3248  }
3249
3250  private static boolean isCommandClass(String cmd) {
3251    return COMMANDS.containsKey(cmd) || isCustomTestClass(cmd);
3252  }
3253
3254  private static boolean isCustomTestClass(String cmd) {
3255    Class<? extends Test> cmdClass;
3256    try {
3257      cmdClass =
3258        (Class<? extends Test>) PerformanceEvaluation.class.getClassLoader().loadClass(cmd);
3259      addCommandDescriptor(cmdClass, cmd, "custom command");
3260      return true;
3261    } catch (Throwable th) {
3262      LOG.info("No class found for command: " + cmd, th);
3263      return false;
3264    }
3265  }
3266
3267  private static Class<? extends TestBase> determineCommandClass(String cmd) {
3268    CmdDescriptor descriptor = COMMANDS.get(cmd);
3269    return descriptor != null ? descriptor.getCmdClass() : null;
3270  }
3271
3272  public static void main(final String[] args) throws Exception {
3273    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
3274    System.exit(res);
3275  }
3276}