001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import com.codahale.metrics.Histogram;
021import com.codahale.metrics.UniformReservoir;
022import io.opentelemetry.api.trace.Span;
023import io.opentelemetry.context.Scope;
024import java.io.IOException;
025import java.io.PrintStream;
026import java.lang.reflect.Constructor;
027import java.math.BigDecimal;
028import java.math.MathContext;
029import java.text.DecimalFormat;
030import java.text.SimpleDateFormat;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Date;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Locale;
037import java.util.Map;
038import java.util.NoSuchElementException;
039import java.util.Queue;
040import java.util.Random;
041import java.util.TreeMap;
042import java.util.concurrent.Callable;
043import java.util.concurrent.ExecutionException;
044import java.util.concurrent.ExecutorService;
045import java.util.concurrent.Executors;
046import java.util.concurrent.Future;
047import java.util.concurrent.ThreadLocalRandom;
048import org.apache.commons.lang3.StringUtils;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.conf.Configured;
051import org.apache.hadoop.fs.FileSystem;
052import org.apache.hadoop.fs.Path;
053import org.apache.hadoop.hbase.client.Admin;
054import org.apache.hadoop.hbase.client.Append;
055import org.apache.hadoop.hbase.client.AsyncConnection;
056import org.apache.hadoop.hbase.client.AsyncTable;
057import org.apache.hadoop.hbase.client.BufferedMutator;
058import org.apache.hadoop.hbase.client.BufferedMutatorParams;
059import org.apache.hadoop.hbase.client.Connection;
060import org.apache.hadoop.hbase.client.ConnectionFactory;
061import org.apache.hadoop.hbase.client.Consistency;
062import org.apache.hadoop.hbase.client.Delete;
063import org.apache.hadoop.hbase.client.Durability;
064import org.apache.hadoop.hbase.client.Get;
065import org.apache.hadoop.hbase.client.Increment;
066import org.apache.hadoop.hbase.client.Put;
067import org.apache.hadoop.hbase.client.RegionInfo;
068import org.apache.hadoop.hbase.client.RegionInfoBuilder;
069import org.apache.hadoop.hbase.client.RegionLocator;
070import org.apache.hadoop.hbase.client.Result;
071import org.apache.hadoop.hbase.client.ResultScanner;
072import org.apache.hadoop.hbase.client.RowMutations;
073import org.apache.hadoop.hbase.client.Scan;
074import org.apache.hadoop.hbase.client.Table;
075import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
076import org.apache.hadoop.hbase.filter.BinaryComparator;
077import org.apache.hadoop.hbase.filter.Filter;
078import org.apache.hadoop.hbase.filter.FilterAllFilter;
079import org.apache.hadoop.hbase.filter.FilterList;
080import org.apache.hadoop.hbase.filter.PageFilter;
081import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
082import org.apache.hadoop.hbase.filter.WhileMatchFilter;
083import org.apache.hadoop.hbase.io.compress.Compression;
084import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
085import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
086import org.apache.hadoop.hbase.regionserver.BloomType;
087import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
088import org.apache.hadoop.hbase.trace.TraceUtil;
089import org.apache.hadoop.hbase.util.ByteArrayHashKey;
090import org.apache.hadoop.hbase.util.Bytes;
091import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
092import org.apache.hadoop.hbase.util.GsonUtil;
093import org.apache.hadoop.hbase.util.Hash;
094import org.apache.hadoop.hbase.util.MurmurHash;
095import org.apache.hadoop.hbase.util.Pair;
096import org.apache.hadoop.hbase.util.RandomDistribution;
097import org.apache.hadoop.hbase.util.YammerHistogramUtils;
098import org.apache.hadoop.io.LongWritable;
099import org.apache.hadoop.io.Text;
100import org.apache.hadoop.mapreduce.Job;
101import org.apache.hadoop.mapreduce.Mapper;
102import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
103import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
104import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
105import org.apache.hadoop.util.Tool;
106import org.apache.hadoop.util.ToolRunner;
107import org.apache.yetus.audience.InterfaceAudience;
108import org.slf4j.Logger;
109import org.slf4j.LoggerFactory;
110
111import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
112import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
113import org.apache.hbase.thirdparty.com.google.gson.Gson;
114
115/**
 * Script used for evaluating HBase performance and scalability. Runs an HBase client that steps
 * through one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random
 * writes test, etc.). Pass on the command-line which test to run and how many clients are
 * participating in this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
120 * <p>
121 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance
122 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper,
123 * pages 8-10.
124 * <p>
125 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as
126 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client does
127 * about 1GB of data, unless specified otherwise.
128 */
129@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
130public class PerformanceEvaluation extends Configured implements Tool {
131  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
132  static final String RANDOM_READ = "randomRead";
133  static final String PE_COMMAND_SHORTNAME = "pe";
134  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
135  private static final Gson GSON = GsonUtil.createGson().create();
136
137  public static final String TABLE_NAME = "TestTable";
138  public static final String FAMILY_NAME_BASE = "info";
139  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
140  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
141  public static final int DEFAULT_VALUE_LENGTH = 1000;
142  public static final int ROW_LENGTH = 26;
143
144  private static final int ONE_GB = 1024 * 1024 * 1000;
145  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
146  // TODO : should we make this configurable
147  private static final int TAG_LENGTH = 256;
148  private static final DecimalFormat FMT = new DecimalFormat("0.##");
149  private static final MathContext CXT = MathContext.DECIMAL64;
150  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
151  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
152  private static final TestOptions DEFAULT_OPTS = new TestOptions();
153
154  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
155  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
156
157  static {
158    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
159      "Run async random read test");
160    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
161      "Run async random write test");
162    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
163      "Run async sequential read test");
164    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
165      "Run async sequential write test");
166    addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)");
167    addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test");
168    addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test");
169    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
170      "Run random seek and scan 100 test");
171    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
172      "Run random seek scan with both start and stop row (max 10 rows)");
173    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
174      "Run random seek scan with both start and stop row (max 100 rows)");
175    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
176      "Run random seek scan with both start and stop row (max 1000 rows)");
177    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
178      "Run random seek scan with both start and stop row (max 10000 rows)");
179    addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
180    addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
181    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
182    addCommandDescriptor(MetaWriteTest.class, "metaWrite",
183      "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta");
184    addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
185    addCommandDescriptor(FilteredScanTest.class, "filterScan",
186      "Run scan test using a filter to find a specific row based on it's value "
187        + "(make sure to use --rows=20)");
188    addCommandDescriptor(IncrementTest.class, "increment",
189      "Increment on each row; clients overlap on keyspace so some concurrent operations");
190    addCommandDescriptor(AppendTest.class, "append",
191      "Append on each row; clients overlap on keyspace so some concurrent operations");
192    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
193      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
194    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
195      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
196    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
197      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
198    addCommandDescriptor(CleanMetaTest.class, "cleanMeta",
199      "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread");
200  }
201
202  /**
203   * Enum for map metrics. Keep it out here rather than inside in the Map inner-class so we can find
204   * associated properties.
205   */
206  protected static enum Counter {
207    /** elapsed time */
208    ELAPSED_TIME,
209    /** number of rows */
210    ROWS
211  }
212
213  protected static class RunResult implements Comparable<RunResult> {
214    public RunResult(long duration, Histogram hist) {
215      this.duration = duration;
216      this.hist = hist;
217      numbOfReplyOverThreshold = 0;
218      numOfReplyFromReplica = 0;
219    }
220
221    public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica,
222      Histogram hist) {
223      this.duration = duration;
224      this.hist = hist;
225      this.numbOfReplyOverThreshold = numbOfReplyOverThreshold;
226      this.numOfReplyFromReplica = numOfReplyFromReplica;
227    }
228
229    public final long duration;
230    public final Histogram hist;
231    public final long numbOfReplyOverThreshold;
232    public final long numOfReplyFromReplica;
233
234    @Override
235    public String toString() {
236      return Long.toString(duration);
237    }
238
239    @Override
240    public int compareTo(RunResult o) {
241      return Long.compare(this.duration, o.duration);
242    }
243  }
244
245  /**
246   * Constructor
247   * @param conf Configuration object
248   */
249  public PerformanceEvaluation(final Configuration conf) {
250    super(conf);
251  }
252
253  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name,
254    String description) {
255    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
256    COMMANDS.put(name, cmdDescriptor);
257  }
258
259  /**
260   * Implementations can have their status set.
261   */
262  interface Status {
263    /**
264     * Sets status
265     * @param msg status message n
266     */
267    void setStatus(final String msg) throws IOException;
268  }
269
270  /**
271   * MapReduce job that runs a performance evaluation client in each map task.
272   */
273  public static class EvaluationMapTask
274    extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
275
276    /** configuration parameter name that contains the command */
277    public final static String CMD_KEY = "EvaluationMapTask.command";
278    /** configuration parameter name that contains the PE impl */
279    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
280
281    private Class<? extends Test> cmd;
282
283    @Override
284    protected void setup(Context context) throws IOException, InterruptedException {
285      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
286
287      // this is required so that extensions of PE are instantiated within the
288      // map reduce task...
289      Class<? extends PerformanceEvaluation> peClass =
290        forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
291      try {
292        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
293      } catch (Exception e) {
294        throw new IllegalStateException("Could not instantiate PE instance", e);
295      }
296    }
297
298    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
299      try {
300        return Class.forName(className).asSubclass(type);
301      } catch (ClassNotFoundException e) {
302        throw new IllegalStateException("Could not find class for name: " + className, e);
303      }
304    }
305
306    @Override
307    protected void map(LongWritable key, Text value, final Context context)
308      throws IOException, InterruptedException {
309
310      Status status = new Status() {
311        @Override
312        public void setStatus(String msg) {
313          context.setStatus(msg);
314        }
315      };
316
317      TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class);
318      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
319      final Connection con = ConnectionFactory.createConnection(conf);
320      AsyncConnection asyncCon = null;
321      try {
322        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
323      } catch (ExecutionException e) {
324        throw new IOException(e);
325      }
326
327      // Evaluation task
328      RunResult result =
329        PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
330      // Collect how much time the thing took. Report as map output and
331      // to the ELAPSED_TIME counter.
332      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
333      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
334      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
335      context.progress();
336    }
337  }
338
339  /*
340   * If table does not already exist, create. Also create a table when {@code opts.presplitRegions}
341   * is specified or when the existing table's region replica count doesn't match {@code
342   * opts.replicas}.
343   */
344  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
345    TableName tableName = TableName.valueOf(opts.tableName);
346    boolean needsDelete = false, exists = admin.tableExists(tableName);
347    boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
348      || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
349    if (!exists && isReadCmd) {
350      throw new IllegalStateException(
351        "Must specify an existing table for read commands. Run a write command first.");
352    }
353    HTableDescriptor desc =
354      exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null;
355    byte[][] splits = getSplits(opts);
356
357    // recreate the table when user has requested presplit or when existing
358    // {RegionSplitPolicy,replica count} does not match requested, or when the
359    // number of column families does not match requested.
360    if (
361      (exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
362        || (!isReadCmd && desc != null
363          && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
364        || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)
365        || (desc != null && desc.getColumnFamilyCount() != opts.families)
366    ) {
367      needsDelete = true;
368      // wait, why did it delete my table?!?
369      LOG.debug(MoreObjects.toStringHelper("needsDelete").add("needsDelete", needsDelete)
370        .add("isReadCmd", isReadCmd).add("exists", exists).add("desc", desc)
371        .add("presplit", opts.presplitRegions).add("splitPolicy", opts.splitPolicy)
372        .add("replicas", opts.replicas).add("families", opts.families).toString());
373    }
374
375    // remove an existing table
376    if (needsDelete) {
377      if (admin.isTableEnabled(tableName)) {
378        admin.disableTable(tableName);
379      }
380      admin.deleteTable(tableName);
381    }
382
383    // table creation is necessary
384    if (!exists || needsDelete) {
385      desc = getTableDescriptor(opts);
386      if (splits != null) {
387        if (LOG.isDebugEnabled()) {
388          for (int i = 0; i < splits.length; i++) {
389            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
390          }
391        }
392      }
393      admin.createTable(desc, splits);
394      LOG.info("Table " + desc + " created");
395    }
396    return admin.tableExists(tableName);
397  }
398
399  /**
400   * Create an HTableDescriptor from provided TestOptions.
401   */
402  protected static HTableDescriptor getTableDescriptor(TestOptions opts) {
403    HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(opts.tableName));
404    for (int family = 0; family < opts.families; family++) {
405      byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
406      HColumnDescriptor familyDesc = new HColumnDescriptor(familyName);
407      familyDesc.setDataBlockEncoding(opts.blockEncoding);
408      familyDesc.setCompressionType(opts.compression);
409      familyDesc.setEncryptionType(opts.encryption);
410      familyDesc.setBloomFilterType(opts.bloomType);
411      familyDesc.setBlocksize(opts.blockSize);
412      if (opts.inMemoryCF) {
413        familyDesc.setInMemory(true);
414      }
415      familyDesc.setInMemoryCompaction(opts.inMemoryCompaction);
416      tableDesc.addFamily(familyDesc);
417    }
418    if (opts.replicas != DEFAULT_OPTS.replicas) {
419      tableDesc.setRegionReplication(opts.replicas);
420    }
421    if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
422      tableDesc.setRegionSplitPolicyClassName(opts.splitPolicy);
423    }
424    return tableDesc;
425  }
426
427  /**
428   * generates splits based on total number of rows and specified split regions
429   */
430  protected static byte[][] getSplits(TestOptions opts) {
431    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null;
432
433    int numSplitPoints = opts.presplitRegions - 1;
434    byte[][] splits = new byte[numSplitPoints][];
435    int jump = opts.totalRows / opts.presplitRegions;
436    for (int i = 0; i < numSplitPoints; i++) {
437      int rowkey = jump * (1 + i);
438      splits[i] = format(rowkey);
439    }
440    return splits;
441  }
442
443  static void setupConnectionCount(final TestOptions opts) {
444    if (opts.oneCon) {
445      opts.connCount = 1;
446    } else {
447      if (opts.connCount == -1) {
448        // set to thread number if connCount is not set
449        opts.connCount = opts.numClientThreads;
450      }
451    }
452  }
453
454  /*
455   * Run all clients in this vm each to its own thread.
456   */
457  static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
458    throws IOException, InterruptedException, ExecutionException {
459    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
460    assert cmd != null;
461    @SuppressWarnings("unchecked")
462    Future<RunResult>[] threads = new Future[opts.numClientThreads];
463    RunResult[] results = new RunResult[opts.numClientThreads];
464    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
465      new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
466    setupConnectionCount(opts);
467    final Connection[] cons = new Connection[opts.connCount];
468    final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
469    for (int i = 0; i < opts.connCount; i++) {
470      cons[i] = ConnectionFactory.createConnection(conf);
471      asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
472    }
473    LOG
474      .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads");
475    for (int i = 0; i < threads.length; i++) {
476      final int index = i;
477      threads[i] = pool.submit(new Callable<RunResult>() {
478        @Override
479        public RunResult call() throws Exception {
480          TestOptions threadOpts = new TestOptions(opts);
481          final Connection con = cons[index % cons.length];
482          final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
483          if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
484          RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
485            @Override
486            public void setStatus(final String msg) throws IOException {
487              LOG.info(msg);
488            }
489          });
490          LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration
491            + "ms over " + threadOpts.perClientRunRows + " rows");
492          if (opts.latencyThreshold > 0) {
493            LOG.info("Number of replies over latency threshold " + opts.latencyThreshold
494              + "(ms) is " + run.numbOfReplyOverThreshold);
495          }
496          return run;
497        }
498      });
499    }
500    pool.shutdown();
501
502    for (int i = 0; i < threads.length; i++) {
503      try {
504        results[i] = threads[i].get();
505      } catch (ExecutionException e) {
506        throw new IOException(e.getCause());
507      }
508    }
509    final String test = cmd.getSimpleName();
510    LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results));
511    Arrays.sort(results);
512    long total = 0;
513    float avgLatency = 0;
514    float avgTPS = 0;
515    long replicaWins = 0;
516    for (RunResult result : results) {
517      total += result.duration;
518      avgLatency += result.hist.getSnapshot().getMean();
519      avgTPS += opts.perClientRunRows * 1.0f / result.duration;
520      replicaWins += result.numOfReplyFromReplica;
521    }
522    avgTPS *= 1000; // ms to second
523    avgLatency = avgLatency / results.length;
524    LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: "
525      + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms");
526    LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
527    LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
528    if (opts.replicas > 1) {
529      LOG.info("[results from replica regions] " + replicaWins);
530    }
531
532    for (int i = 0; i < opts.connCount; i++) {
533      cons[i].close();
534      asyncCons[i].close();
535    }
536
537    return results;
538  }
539
540  /*
541   * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write
542   * out an input file with instruction per client regards which row they are to start on.
543   * @param cmd Command to run. n
544   */
545  static Job doMapReduce(TestOptions opts, final Configuration conf)
546    throws IOException, InterruptedException, ClassNotFoundException {
547    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
548    assert cmd != null;
549    Path inputDir = writeInputFile(conf, opts);
550    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
551    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
552    Job job = Job.getInstance(conf);
553    job.setJarByClass(PerformanceEvaluation.class);
554    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
555
556    job.setInputFormatClass(NLineInputFormat.class);
557    NLineInputFormat.setInputPaths(job, inputDir);
558    // this is default, but be explicit about it just in case.
559    NLineInputFormat.setNumLinesPerSplit(job, 1);
560
561    job.setOutputKeyClass(LongWritable.class);
562    job.setOutputValueClass(LongWritable.class);
563
564    job.setMapperClass(EvaluationMapTask.class);
565    job.setReducerClass(LongSumReducer.class);
566
567    job.setNumReduceTasks(1);
568
569    job.setOutputFormatClass(TextOutputFormat.class);
570    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
571
572    TableMapReduceUtil.addDependencyJars(job);
573    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Histogram.class, // yammer
574                                                                                            // metrics
575      Gson.class, // gson
576      FilterAllFilter.class // hbase-server tests jar
577    );
578
579    TableMapReduceUtil.initCredentials(job);
580
581    job.waitForCompletion(true);
582    return job;
583  }
584
  /**
   * Name of the file, created by {@code writeInputFile} inside the job's input directory, that
   * holds one line of serialized options per client. Each client has one mapper to do the work,
   * and each client does the resulting count in a map task.
   */
  static String JOB_INPUT_FILENAME = "input.txt";
590
591  /*
592   * Write input file of offsets-per-client for the mapreduce job.
593   * @param c Configuration
594   * @return Directory that contains file written whose name is JOB_INPUT_FILENAME n
595   */
596  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
597    return writeInputFile(c, opts, new Path("."));
598  }
599
600  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
601    throws IOException {
602    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
603    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
604    Path inputDir = new Path(jobdir, "inputs");
605
606    FileSystem fs = FileSystem.get(c);
607    fs.mkdirs(inputDir);
608
609    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
610    PrintStream out = new PrintStream(fs.create(inputFile));
611    // Make input random.
612    Map<Integer, String> m = new TreeMap<>();
613    Hash h = MurmurHash.getInstance();
614    int perClientRows = (opts.totalRows / opts.numClientThreads);
615    try {
616      for (int j = 0; j < opts.numClientThreads; j++) {
617        TestOptions next = new TestOptions(opts);
618        next.startRow = j * perClientRows;
619        next.perClientRunRows = perClientRows;
620        String s = GSON.toJson(next);
621        LOG.info("Client=" + j + ", input=" + s);
622        byte[] b = Bytes.toBytes(s);
623        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
624        m.put(hash, s);
625      }
626      for (Map.Entry<Integer, String> e : m.entrySet()) {
627        out.println(e.getValue());
628      }
629    } finally {
630      out.close();
631    }
632    return inputDir;
633  }
634
635  /**
636   * Describes a command.
637   */
638  static class CmdDescriptor {
639    private Class<? extends TestBase> cmdClass;
640    private String name;
641    private String description;
642
643    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
644      this.cmdClass = cmdClass;
645      this.name = name;
646      this.description = description;
647    }
648
649    public Class<? extends TestBase> getCmdClass() {
650      return cmdClass;
651    }
652
653    public String getName() {
654      return name;
655    }
656
657    public String getDescription() {
658      return description;
659    }
660  }
661
662  /**
663   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes
664   * tracking all these arguments a little easier. NOTE: ADDING AN OPTION, you need to add a data
665   * member, a getter/setter (to make JSON serialization of this TestOptions class behave), and you
666   * need to add to the clone constructor below copying your new option from the 'that' to the
667   * 'this'. Look for 'clone' below.
668   */
669  static class TestOptions {
670    String cmdName = null;
671    boolean nomapred = false;
672    boolean filterAll = false;
673    int startRow = 0;
674    float size = 1.0f;
675    int perClientRunRows = DEFAULT_ROWS_PER_GB;
676    int numClientThreads = 1;
677    int totalRows = DEFAULT_ROWS_PER_GB;
678    int measureAfter = 0;
679    float sampleRate = 1.0f;
680    /**
681     * @deprecated Useless after switching to OpenTelemetry
682     */
683    @Deprecated
684    double traceRate = 0.0;
685    String tableName = TABLE_NAME;
686    boolean flushCommits = true;
687    boolean writeToWAL = true;
688    boolean autoFlush = false;
689    boolean oneCon = false;
690    int connCount = -1; // wil decide the actual num later
691    boolean useTags = false;
692    int noOfTags = 1;
693    boolean reportLatency = false;
694    int multiGet = 0;
695    int multiPut = 0;
696    int randomSleep = 0;
697    boolean inMemoryCF = false;
698    int presplitRegions = 0;
699    int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION;
700    String splitPolicy = null;
701    Compression.Algorithm compression = Compression.Algorithm.NONE;
702    String encryption = null;
703    BloomType bloomType = BloomType.ROW;
704    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
705    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
706    boolean valueRandom = false;
707    boolean valueZipf = false;
708    int valueSize = DEFAULT_VALUE_LENGTH;
709    int period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10;
710    int cycles = 1;
711    int columns = 1;
712    int families = 1;
713    int caching = 30;
714    int latencyThreshold = 0; // in millsecond
715    boolean addColumns = true;
716    MemoryCompactionPolicy inMemoryCompaction =
717      MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
718    boolean asyncPrefetch = false;
719    boolean cacheBlocks = true;
720    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
721    long bufferSize = 2l * 1024l * 1024l;
722
723    public TestOptions() {
724    }
725
726    /**
727     * Clone constructor.
728     * @param that Object to copy from.
729     */
730    public TestOptions(TestOptions that) {
731      this.cmdName = that.cmdName;
732      this.cycles = that.cycles;
733      this.nomapred = that.nomapred;
734      this.startRow = that.startRow;
735      this.size = that.size;
736      this.perClientRunRows = that.perClientRunRows;
737      this.numClientThreads = that.numClientThreads;
738      this.totalRows = that.totalRows;
739      this.sampleRate = that.sampleRate;
740      this.traceRate = that.traceRate;
741      this.tableName = that.tableName;
742      this.flushCommits = that.flushCommits;
743      this.writeToWAL = that.writeToWAL;
744      this.autoFlush = that.autoFlush;
745      this.oneCon = that.oneCon;
746      this.connCount = that.connCount;
747      this.useTags = that.useTags;
748      this.noOfTags = that.noOfTags;
749      this.reportLatency = that.reportLatency;
750      this.latencyThreshold = that.latencyThreshold;
751      this.multiGet = that.multiGet;
752      this.multiPut = that.multiPut;
753      this.inMemoryCF = that.inMemoryCF;
754      this.presplitRegions = that.presplitRegions;
755      this.replicas = that.replicas;
756      this.splitPolicy = that.splitPolicy;
757      this.compression = that.compression;
758      this.encryption = that.encryption;
759      this.blockEncoding = that.blockEncoding;
760      this.filterAll = that.filterAll;
761      this.bloomType = that.bloomType;
762      this.blockSize = that.blockSize;
763      this.valueRandom = that.valueRandom;
764      this.valueZipf = that.valueZipf;
765      this.valueSize = that.valueSize;
766      this.period = that.period;
767      this.randomSleep = that.randomSleep;
768      this.measureAfter = that.measureAfter;
769      this.addColumns = that.addColumns;
770      this.columns = that.columns;
771      this.families = that.families;
772      this.caching = that.caching;
773      this.inMemoryCompaction = that.inMemoryCompaction;
774      this.asyncPrefetch = that.asyncPrefetch;
775      this.cacheBlocks = that.cacheBlocks;
776      this.scanReadType = that.scanReadType;
777      this.bufferSize = that.bufferSize;
778    }
779
780    public int getCaching() {
781      return this.caching;
782    }
783
784    public void setCaching(final int caching) {
785      this.caching = caching;
786    }
787
788    public int getColumns() {
789      return this.columns;
790    }
791
792    public void setColumns(final int columns) {
793      this.columns = columns;
794    }
795
796    public int getFamilies() {
797      return this.families;
798    }
799
800    public void setFamilies(final int families) {
801      this.families = families;
802    }
803
804    public int getCycles() {
805      return this.cycles;
806    }
807
808    public void setCycles(final int cycles) {
809      this.cycles = cycles;
810    }
811
812    public boolean isValueZipf() {
813      return valueZipf;
814    }
815
816    public void setValueZipf(boolean valueZipf) {
817      this.valueZipf = valueZipf;
818    }
819
820    public String getCmdName() {
821      return cmdName;
822    }
823
824    public void setCmdName(String cmdName) {
825      this.cmdName = cmdName;
826    }
827
828    public int getRandomSleep() {
829      return randomSleep;
830    }
831
832    public void setRandomSleep(int randomSleep) {
833      this.randomSleep = randomSleep;
834    }
835
836    public int getReplicas() {
837      return replicas;
838    }
839
840    public void setReplicas(int replicas) {
841      this.replicas = replicas;
842    }
843
844    public String getSplitPolicy() {
845      return splitPolicy;
846    }
847
848    public void setSplitPolicy(String splitPolicy) {
849      this.splitPolicy = splitPolicy;
850    }
851
852    public void setNomapred(boolean nomapred) {
853      this.nomapred = nomapred;
854    }
855
856    public void setFilterAll(boolean filterAll) {
857      this.filterAll = filterAll;
858    }
859
860    public void setStartRow(int startRow) {
861      this.startRow = startRow;
862    }
863
864    public void setSize(float size) {
865      this.size = size;
866    }
867
868    public void setPerClientRunRows(int perClientRunRows) {
869      this.perClientRunRows = perClientRunRows;
870    }
871
872    public void setNumClientThreads(int numClientThreads) {
873      this.numClientThreads = numClientThreads;
874    }
875
876    public void setTotalRows(int totalRows) {
877      this.totalRows = totalRows;
878    }
879
880    public void setSampleRate(float sampleRate) {
881      this.sampleRate = sampleRate;
882    }
883
884    public void setTraceRate(double traceRate) {
885      this.traceRate = traceRate;
886    }
887
888    public void setTableName(String tableName) {
889      this.tableName = tableName;
890    }
891
892    public void setFlushCommits(boolean flushCommits) {
893      this.flushCommits = flushCommits;
894    }
895
896    public void setWriteToWAL(boolean writeToWAL) {
897      this.writeToWAL = writeToWAL;
898    }
899
900    public void setAutoFlush(boolean autoFlush) {
901      this.autoFlush = autoFlush;
902    }
903
904    public void setOneCon(boolean oneCon) {
905      this.oneCon = oneCon;
906    }
907
908    public int getConnCount() {
909      return connCount;
910    }
911
912    public void setConnCount(int connCount) {
913      this.connCount = connCount;
914    }
915
916    public void setUseTags(boolean useTags) {
917      this.useTags = useTags;
918    }
919
920    public void setNoOfTags(int noOfTags) {
921      this.noOfTags = noOfTags;
922    }
923
924    public void setReportLatency(boolean reportLatency) {
925      this.reportLatency = reportLatency;
926    }
927
928    public void setMultiGet(int multiGet) {
929      this.multiGet = multiGet;
930    }
931
932    public void setMultiPut(int multiPut) {
933      this.multiPut = multiPut;
934    }
935
936    public void setInMemoryCF(boolean inMemoryCF) {
937      this.inMemoryCF = inMemoryCF;
938    }
939
940    public void setPresplitRegions(int presplitRegions) {
941      this.presplitRegions = presplitRegions;
942    }
943
944    public void setCompression(Compression.Algorithm compression) {
945      this.compression = compression;
946    }
947
948    public void setEncryption(String encryption) {
949      this.encryption = encryption;
950    }
951
952    public void setBloomType(BloomType bloomType) {
953      this.bloomType = bloomType;
954    }
955
956    public void setBlockSize(int blockSize) {
957      this.blockSize = blockSize;
958    }
959
960    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
961      this.blockEncoding = blockEncoding;
962    }
963
964    public void setValueRandom(boolean valueRandom) {
965      this.valueRandom = valueRandom;
966    }
967
968    public void setValueSize(int valueSize) {
969      this.valueSize = valueSize;
970    }
971
972    public void setBufferSize(long bufferSize) {
973      this.bufferSize = bufferSize;
974    }
975
976    public void setPeriod(int period) {
977      this.period = period;
978    }
979
980    public boolean isNomapred() {
981      return nomapred;
982    }
983
984    public boolean isFilterAll() {
985      return filterAll;
986    }
987
988    public int getStartRow() {
989      return startRow;
990    }
991
992    public float getSize() {
993      return size;
994    }
995
996    public int getPerClientRunRows() {
997      return perClientRunRows;
998    }
999
1000    public int getNumClientThreads() {
1001      return numClientThreads;
1002    }
1003
1004    public int getTotalRows() {
1005      return totalRows;
1006    }
1007
1008    public float getSampleRate() {
1009      return sampleRate;
1010    }
1011
1012    public double getTraceRate() {
1013      return traceRate;
1014    }
1015
1016    public String getTableName() {
1017      return tableName;
1018    }
1019
1020    public boolean isFlushCommits() {
1021      return flushCommits;
1022    }
1023
1024    public boolean isWriteToWAL() {
1025      return writeToWAL;
1026    }
1027
1028    public boolean isAutoFlush() {
1029      return autoFlush;
1030    }
1031
1032    public boolean isUseTags() {
1033      return useTags;
1034    }
1035
1036    public int getNoOfTags() {
1037      return noOfTags;
1038    }
1039
1040    public boolean isReportLatency() {
1041      return reportLatency;
1042    }
1043
1044    public int getMultiGet() {
1045      return multiGet;
1046    }
1047
1048    public int getMultiPut() {
1049      return multiPut;
1050    }
1051
1052    public boolean isInMemoryCF() {
1053      return inMemoryCF;
1054    }
1055
1056    public int getPresplitRegions() {
1057      return presplitRegions;
1058    }
1059
1060    public Compression.Algorithm getCompression() {
1061      return compression;
1062    }
1063
1064    public String getEncryption() {
1065      return encryption;
1066    }
1067
1068    public DataBlockEncoding getBlockEncoding() {
1069      return blockEncoding;
1070    }
1071
1072    public boolean isValueRandom() {
1073      return valueRandom;
1074    }
1075
1076    public int getValueSize() {
1077      return valueSize;
1078    }
1079
1080    public int getPeriod() {
1081      return period;
1082    }
1083
1084    public BloomType getBloomType() {
1085      return bloomType;
1086    }
1087
1088    public int getBlockSize() {
1089      return blockSize;
1090    }
1091
1092    public boolean isOneCon() {
1093      return oneCon;
1094    }
1095
1096    public int getMeasureAfter() {
1097      return measureAfter;
1098    }
1099
1100    public void setMeasureAfter(int measureAfter) {
1101      this.measureAfter = measureAfter;
1102    }
1103
1104    public boolean getAddColumns() {
1105      return addColumns;
1106    }
1107
1108    public void setAddColumns(boolean addColumns) {
1109      this.addColumns = addColumns;
1110    }
1111
1112    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
1113      this.inMemoryCompaction = inMemoryCompaction;
1114    }
1115
1116    public MemoryCompactionPolicy getInMemoryCompaction() {
1117      return this.inMemoryCompaction;
1118    }
1119
1120    public long getBufferSize() {
1121      return this.bufferSize;
1122    }
1123  }
1124
1125  /*
1126   * A test. Subclass to particularize what happens per row.
1127   */
1128  static abstract class TestBase {
1129    // Below is make it so when Tests are all running in the one
1130    // jvm, that they each have a differently seeded Random.
1131    private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime());
1132
1133    private static long nextRandomSeed() {
1134      return randomSeed.nextLong();
1135    }
1136
1137    private final int everyN;
1138
1139    protected final Random rand = new Random(nextRandomSeed());
1140    protected final Configuration conf;
1141    protected final TestOptions opts;
1142
1143    private final Status status;
1144
1145    private String testName;
1146    private Histogram latencyHistogram;
1147    private Histogram replicaLatencyHistogram;
1148    private Histogram valueSizeHistogram;
1149    private Histogram rpcCallsHistogram;
1150    private Histogram remoteRpcCallsHistogram;
1151    private Histogram millisBetweenNextHistogram;
1152    private Histogram regionsScannedHistogram;
1153    private Histogram bytesInResultsHistogram;
1154    private Histogram bytesInRemoteResultsHistogram;
1155    private RandomDistribution.Zipf zipf;
1156    private long numOfReplyOverLatencyThreshold = 0;
1157    private long numOfReplyFromReplica = 0;
1158
1159    /**
1160     * Note that all subclasses of this class must provide a public constructor that has the exact
1161     * same list of arguments.
1162     */
1163    TestBase(final Configuration conf, final TestOptions options, final Status status) {
1164      this.conf = conf;
1165      this.opts = options;
1166      this.status = status;
1167      this.testName = this.getClass().getSimpleName();
1168      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
1169      if (options.isValueZipf()) {
1170        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
1171      }
1172      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
1173    }
1174
1175    int getValueLength(final Random r) {
1176      if (this.opts.isValueRandom()) {
1177        return r.nextInt(opts.valueSize);
1178      } else if (this.opts.isValueZipf()) {
1179        return Math.abs(this.zipf.nextInt());
1180      } else {
1181        return opts.valueSize;
1182      }
1183    }
1184
1185    void updateValueSize(final Result[] rs) throws IOException {
1186      updateValueSize(rs, 0);
1187    }
1188
1189    void updateValueSize(final Result[] rs, final long latency) throws IOException {
1190      if (rs == null || (latency == 0)) return;
1191      for (Result r : rs)
1192        updateValueSize(r, latency);
1193    }
1194
1195    void updateValueSize(final Result r) throws IOException {
1196      updateValueSize(r, 0);
1197    }
1198
1199    void updateValueSize(final Result r, final long latency) throws IOException {
1200      if (r == null || (latency == 0)) return;
1201      int size = 0;
1202      // update replicaHistogram
1203      if (r.isStale()) {
1204        replicaLatencyHistogram.update(latency / 1000);
1205        numOfReplyFromReplica++;
1206      }
1207      if (!isRandomValueSize()) return;
1208
1209      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1210        size += scanner.current().getValueLength();
1211      }
1212      updateValueSize(size);
1213    }
1214
1215    void updateValueSize(final int valueSize) {
1216      if (!isRandomValueSize()) return;
1217      this.valueSizeHistogram.update(valueSize);
1218    }
1219
1220    void updateScanMetrics(final ScanMetrics metrics) {
1221      if (metrics == null) return;
1222      Map<String, Long> metricsMap = metrics.getMetricsMap();
1223      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
1224      if (rpcCalls != null) {
1225        this.rpcCallsHistogram.update(rpcCalls.longValue());
1226      }
1227      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
1228      if (remoteRpcCalls != null) {
1229        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
1230      }
1231      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
1232      if (millisBetweenNext != null) {
1233        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
1234      }
1235      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
1236      if (regionsScanned != null) {
1237        this.regionsScannedHistogram.update(regionsScanned.longValue());
1238      }
1239      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
1240      if (bytesInResults != null && bytesInResults.longValue() > 0) {
1241        this.bytesInResultsHistogram.update(bytesInResults.longValue());
1242      }
1243      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
1244      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
1245        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
1246      }
1247    }
1248
1249    String generateStatus(final int sr, final int i, final int lr) {
1250      return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency ["
1251        + getShortLatencyReport() + "]"
1252        + (!isRandomValueSize() ? "" : ", value size [" + getShortValueSizeReport() + "]");
1253    }
1254
1255    boolean isRandomValueSize() {
1256      return opts.valueRandom;
1257    }
1258
1259    protected int getReportingPeriod() {
1260      return opts.period;
1261    }
1262
1263    /**
1264     * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1265     */
1266    public Histogram getLatencyHistogram() {
1267      return latencyHistogram;
1268    }
1269
1270    void testSetup() throws IOException {
1271      // test metrics
1272      latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1273      // If it is a replica test, set up histogram for replica.
1274      if (opts.replicas > 1) {
1275        replicaLatencyHistogram =
1276          YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1277      }
1278      valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1279      // scan metrics
1280      rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1281      remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1282      millisBetweenNextHistogram =
1283        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1284      regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1285      bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1286      bytesInRemoteResultsHistogram =
1287        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1288
1289      onStartup();
1290    }
1291
1292    abstract void onStartup() throws IOException;
1293
1294    void testTakedown() throws IOException {
1295      onTakedown();
1296      // Print all stats for this thread continuously.
1297      // Synchronize on Test.class so different threads don't intermingle the
1298      // output. We can't use 'this' here because each thread has its own instance of Test class.
1299      synchronized (Test.class) {
1300        status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
1301        status
1302          .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram));
1303        if (opts.replicas > 1) {
1304          status.setStatus("Latency (us) from Replica Regions: "
1305            + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram));
1306        }
1307        status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
1308        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
1309        if (valueSizeHistogram.getCount() > 0) {
1310          status.setStatus(
1311            "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
1312          status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
1313          status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
1314        } else {
1315          status.setStatus("No valueSize statistics available");
1316        }
1317        if (rpcCallsHistogram.getCount() > 0) {
1318          status.setStatus(
1319            "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
1320        }
1321        if (remoteRpcCallsHistogram.getCount() > 0) {
1322          status.setStatus("remoteRpcCalls (count): "
1323            + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
1324        }
1325        if (millisBetweenNextHistogram.getCount() > 0) {
1326          status.setStatus("millisBetweenNext (latency): "
1327            + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
1328        }
1329        if (regionsScannedHistogram.getCount() > 0) {
1330          status.setStatus("regionsScanned (count): "
1331            + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
1332        }
1333        if (bytesInResultsHistogram.getCount() > 0) {
1334          status.setStatus("bytesInResults (size): "
1335            + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
1336        }
1337        if (bytesInRemoteResultsHistogram.getCount() > 0) {
1338          status.setStatus("bytesInRemoteResults (size): "
1339            + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
1340        }
1341      }
1342    }
1343
1344    abstract void onTakedown() throws IOException;
1345
1346    /*
1347     * Run test
1348     * @return Elapsed time. n
1349     */
1350    long test() throws IOException, InterruptedException {
1351      testSetup();
1352      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1353      final long startTime = System.nanoTime();
1354      try {
1355        testTimed();
1356      } finally {
1357        testTakedown();
1358      }
1359      return (System.nanoTime() - startTime) / 1000000;
1360    }
1361
1362    int getStartRow() {
1363      return opts.startRow;
1364    }
1365
1366    int getLastRow() {
1367      return getStartRow() + opts.perClientRunRows;
1368    }
1369
1370    /**
1371     * Provides an extension point for tests that don't want a per row invocation.
1372     */
1373    void testTimed() throws IOException, InterruptedException {
1374      int startRow = getStartRow();
1375      int lastRow = getLastRow();
1376      // Report on completion of 1/10th of total.
1377      for (int ii = 0; ii < opts.cycles; ii++) {
1378        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
1379        for (int i = startRow; i < lastRow; i++) {
1380          if (i % everyN != 0) continue;
1381          long startTime = System.nanoTime();
1382          boolean requestSent = false;
1383          Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan();
1384          try (Scope scope = span.makeCurrent()) {
1385            requestSent = testRow(i, startTime);
1386          } finally {
1387            span.end();
1388          }
1389          if ((i - startRow) > opts.measureAfter) {
1390            // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
1391            // first 9 times and sends the actual get request in the 10th iteration.
1392            // We should only set latency when actual request is sent because otherwise
1393            // it turns out to be 0.
1394            if (requestSent) {
1395              long latency = (System.nanoTime() - startTime) / 1000;
1396              latencyHistogram.update(latency);
1397              if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) {
1398                numOfReplyOverLatencyThreshold++;
1399              }
1400            }
1401            if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1402              status.setStatus(generateStatus(startRow, i, lastRow));
1403            }
1404          }
1405        }
1406      }
1407    }
1408
1409    /** Returns Subset of the histograms' calculation. */
1410    public String getShortLatencyReport() {
1411      return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
1412    }
1413
1414    /** Returns Subset of the histograms' calculation. */
1415    public String getShortValueSizeReport() {
1416      return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
1417    }
1418
1419    /**
1420     * Test for individual row.
1421     * @param i Row index.
1422     * @return true if the row was sent to server and need to record metrics. False if not, multiGet
1423     *         and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered.
1424     */
1425    abstract boolean testRow(final int i, final long startTime)
1426      throws IOException, InterruptedException;
1427  }
1428
1429  static abstract class Test extends TestBase {
1430    protected Connection connection;
1431
1432    Test(final Connection con, final TestOptions options, final Status status) {
1433      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1434      this.connection = con;
1435    }
1436  }
1437
1438  static abstract class AsyncTest extends TestBase {
1439    protected AsyncConnection connection;
1440
1441    AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
1442      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1443      this.connection = con;
1444    }
1445  }
1446
1447  static abstract class TableTest extends Test {
1448    protected Table table;
1449
1450    TableTest(Connection con, TestOptions options, Status status) {
1451      super(con, options, status);
1452    }
1453
1454    @Override
1455    void onStartup() throws IOException {
1456      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1457    }
1458
1459    @Override
1460    void onTakedown() throws IOException {
1461      table.close();
1462    }
1463  }
1464
1465  /*
1466   * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest
1467   */
1468  static abstract class MetaTest extends TableTest {
1469    protected int keyLength;
1470
1471    MetaTest(Connection con, TestOptions options, Status status) {
1472      super(con, options, status);
1473      keyLength = Integer.toString(opts.perClientRunRows).length();
1474    }
1475
1476    @Override
1477    void onTakedown() throws IOException {
1478      // No clean up
1479    }
1480
1481    /*
1482     * Generates Lexicographically ascending strings
1483     */
1484    protected byte[] getSplitKey(final int i) {
1485      return Bytes.toBytes(String.format("%0" + keyLength + "d", i));
1486    }
1487
1488  }
1489
1490  static abstract class AsyncTableTest extends AsyncTest {
1491    protected AsyncTable<?> table;
1492
1493    AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
1494      super(con, options, status);
1495    }
1496
1497    @Override
1498    void onStartup() throws IOException {
1499      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1500    }
1501
1502    @Override
1503    void onTakedown() throws IOException {
1504    }
1505  }
1506
1507  static class AsyncRandomReadTest extends AsyncTableTest {
1508    private final Consistency consistency;
1509    private ArrayList<Get> gets;
1510
1511    AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
1512      super(con, options, status);
1513      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1514      if (opts.multiGet > 0) {
1515        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1516        this.gets = new ArrayList<>(opts.multiGet);
1517      }
1518    }
1519
1520    @Override
1521    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1522      if (opts.randomSleep > 0) {
1523        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
1524      }
1525      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1526      for (int family = 0; family < opts.families; family++) {
1527        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1528        if (opts.addColumns) {
1529          for (int column = 0; column < opts.columns; column++) {
1530            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1531            get.addColumn(familyName, qualifier);
1532          }
1533        } else {
1534          get.addFamily(familyName);
1535        }
1536      }
1537      if (opts.filterAll) {
1538        get.setFilter(new FilterAllFilter());
1539      }
1540      get.setConsistency(consistency);
1541      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1542      try {
1543        if (opts.multiGet > 0) {
1544          this.gets.add(get);
1545          if (this.gets.size() == opts.multiGet) {
1546            Result[] rs =
1547              this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
1548            updateValueSize(rs);
1549            this.gets.clear();
1550          } else {
1551            return false;
1552          }
1553        } else {
1554          updateValueSize(this.table.get(get).get());
1555        }
1556      } catch (ExecutionException e) {
1557        throw new IOException(e);
1558      }
1559      return true;
1560    }
1561
1562    public static RuntimeException runtime(Throwable e) {
1563      if (e instanceof RuntimeException) {
1564        return (RuntimeException) e;
1565      }
1566      return new RuntimeException(e);
1567    }
1568
1569    public static <V> V propagate(Callable<V> callable) {
1570      try {
1571        return callable.call();
1572      } catch (Exception e) {
1573        throw runtime(e);
1574      }
1575    }
1576
1577    @Override
1578    protected int getReportingPeriod() {
1579      int period = opts.perClientRunRows / 10;
1580      return period == 0 ? opts.perClientRunRows : period;
1581    }
1582
1583    @Override
1584    protected void testTakedown() throws IOException {
1585      if (this.gets != null && this.gets.size() > 0) {
1586        this.table.get(gets);
1587        this.gets.clear();
1588      }
1589      super.testTakedown();
1590    }
1591  }
1592
1593  static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
1594
1595    AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
1596      super(con, options, status);
1597    }
1598
1599    @Override
1600    protected byte[] generateRow(final int i) {
1601      return getRandomRow(this.rand, opts.totalRows);
1602    }
1603  }
1604
1605  static class AsyncScanTest extends AsyncTableTest {
1606    private ResultScanner testScanner;
1607    private AsyncTable<?> asyncTable;
1608
1609    AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
1610      super(con, options, status);
1611    }
1612
1613    @Override
1614    void onStartup() throws IOException {
1615      this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName),
1616        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
1617    }
1618
1619    @Override
1620    void testTakedown() throws IOException {
1621      if (this.testScanner != null) {
1622        updateScanMetrics(this.testScanner.getScanMetrics());
1623        this.testScanner.close();
1624      }
1625      super.testTakedown();
1626    }
1627
1628    @Override
1629    boolean testRow(final int i, final long startTime) throws IOException {
1630      if (this.testScanner == null) {
1631        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1632          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1633          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1634        for (int family = 0; family < opts.families; family++) {
1635          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1636          if (opts.addColumns) {
1637            for (int column = 0; column < opts.columns; column++) {
1638              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1639              scan.addColumn(familyName, qualifier);
1640            }
1641          } else {
1642            scan.addFamily(familyName);
1643          }
1644        }
1645        if (opts.filterAll) {
1646          scan.setFilter(new FilterAllFilter());
1647        }
1648        this.testScanner = asyncTable.getScanner(scan);
1649      }
1650      Result r = testScanner.next();
1651      updateValueSize(r);
1652      return true;
1653    }
1654  }
1655
1656  static class AsyncSequentialReadTest extends AsyncTableTest {
1657    AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
1658      super(con, options, status);
1659    }
1660
1661    @Override
1662    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1663      Get get = new Get(format(i));
1664      for (int family = 0; family < opts.families; family++) {
1665        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1666        if (opts.addColumns) {
1667          for (int column = 0; column < opts.columns; column++) {
1668            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1669            get.addColumn(familyName, qualifier);
1670          }
1671        } else {
1672          get.addFamily(familyName);
1673        }
1674      }
1675      if (opts.filterAll) {
1676        get.setFilter(new FilterAllFilter());
1677      }
1678      try {
1679        updateValueSize(table.get(get).get());
1680      } catch (ExecutionException e) {
1681        throw new IOException(e);
1682      }
1683      return true;
1684    }
1685  }
1686
1687  static class AsyncSequentialWriteTest extends AsyncTableTest {
1688    private ArrayList<Put> puts;
1689
1690    AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
1691      super(con, options, status);
1692      if (opts.multiPut > 0) {
1693        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
1694        this.puts = new ArrayList<>(opts.multiPut);
1695      }
1696    }
1697
1698    protected byte[] generateRow(final int i) {
1699      return format(i);
1700    }
1701
1702    @Override
1703    @SuppressWarnings("ReturnValueIgnored")
1704    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1705      byte[] row = generateRow(i);
1706      Put put = new Put(row);
1707      for (int family = 0; family < opts.families; family++) {
1708        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1709        for (int column = 0; column < opts.columns; column++) {
1710          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1711          byte[] value = generateData(this.rand, getValueLength(this.rand));
1712          if (opts.useTags) {
1713            byte[] tag = generateData(this.rand, TAG_LENGTH);
1714            Tag[] tags = new Tag[opts.noOfTags];
1715            for (int n = 0; n < opts.noOfTags; n++) {
1716              Tag t = new ArrayBackedTag((byte) n, tag);
1717              tags[n] = t;
1718            }
1719            KeyValue kv =
1720              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
1721            put.add(kv);
1722            updateValueSize(kv.getValueLength());
1723          } else {
1724            put.addColumn(familyName, qualifier, value);
1725            updateValueSize(value.length);
1726          }
1727        }
1728      }
1729      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1730      try {
1731        table.put(put).get();
1732        if (opts.multiPut > 0) {
1733          this.puts.add(put);
1734          if (this.puts.size() == opts.multiPut) {
1735            this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get));
1736            this.puts.clear();
1737          } else {
1738            return false;
1739          }
1740        } else {
1741          table.put(put).get();
1742        }
1743      } catch (ExecutionException e) {
1744        throw new IOException(e);
1745      }
1746      return true;
1747    }
1748  }
1749
1750  static abstract class BufferedMutatorTest extends Test {
1751    protected BufferedMutator mutator;
1752    protected Table table;
1753
1754    BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1755      super(con, options, status);
1756    }
1757
1758    @Override
1759    void onStartup() throws IOException {
1760      BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1761      p.writeBufferSize(opts.bufferSize);
1762      this.mutator = connection.getBufferedMutator(p);
1763      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1764    }
1765
1766    @Override
1767    void onTakedown() throws IOException {
1768      mutator.close();
1769      table.close();
1770    }
1771  }
1772
1773  static class RandomSeekScanTest extends TableTest {
1774    RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1775      super(con, options, status);
1776    }
1777
1778    @Override
1779    boolean testRow(final int i, final long startTime) throws IOException {
1780      Scan scan =
1781        new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)).setCaching(opts.caching)
1782          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1783          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1784      FilterList list = new FilterList();
1785      for (int family = 0; family < opts.families; family++) {
1786        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1787        if (opts.addColumns) {
1788          for (int column = 0; column < opts.columns; column++) {
1789            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1790            scan.addColumn(familyName, qualifier);
1791          }
1792        } else {
1793          scan.addFamily(familyName);
1794        }
1795      }
1796      if (opts.filterAll) {
1797        list.addFilter(new FilterAllFilter());
1798      }
1799      list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1800      scan.setFilter(list);
1801      ResultScanner s = this.table.getScanner(scan);
1802      try {
1803        for (Result rr; (rr = s.next()) != null;) {
1804          updateValueSize(rr);
1805        }
1806      } finally {
1807        updateScanMetrics(s.getScanMetrics());
1808        s.close();
1809      }
1810      return true;
1811    }
1812
1813    @Override
1814    protected int getReportingPeriod() {
1815      int period = opts.perClientRunRows / 100;
1816      return period == 0 ? opts.perClientRunRows : period;
1817    }
1818
1819  }
1820
1821  static abstract class RandomScanWithRangeTest extends TableTest {
1822    RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1823      super(con, options, status);
1824    }
1825
1826    @Override
1827    boolean testRow(final int i, final long startTime) throws IOException {
1828      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1829      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
1830        .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
1831        .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1832        .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1833      for (int family = 0; family < opts.families; family++) {
1834        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1835        if (opts.addColumns) {
1836          for (int column = 0; column < opts.columns; column++) {
1837            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1838            scan.addColumn(familyName, qualifier);
1839          }
1840        } else {
1841          scan.addFamily(familyName);
1842        }
1843      }
1844      if (opts.filterAll) {
1845        scan.setFilter(new FilterAllFilter());
1846      }
1847      Result r = null;
1848      int count = 0;
1849      ResultScanner s = this.table.getScanner(scan);
1850      try {
1851        for (; (r = s.next()) != null;) {
1852          updateValueSize(r);
1853          count++;
1854        }
1855        if (i % 100 == 0) {
1856          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1857            Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()),
1858            count));
1859        }
1860      } finally {
1861        updateScanMetrics(s.getScanMetrics());
1862        s.close();
1863      }
1864      return true;
1865    }
1866
1867    protected abstract Pair<byte[], byte[]> getStartAndStopRow();
1868
1869    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
1870      int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
1871      int stop = start + maxRange;
1872      return new Pair<>(format(start), format(stop));
1873    }
1874
1875    @Override
1876    protected int getReportingPeriod() {
1877      int period = opts.perClientRunRows / 100;
1878      return period == 0 ? opts.perClientRunRows : period;
1879    }
1880  }
1881
1882  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1883    RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1884      super(con, options, status);
1885    }
1886
1887    @Override
1888    protected Pair<byte[], byte[]> getStartAndStopRow() {
1889      return generateStartAndStopRows(10);
1890    }
1891  }
1892
1893  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1894    RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1895      super(con, options, status);
1896    }
1897
1898    @Override
1899    protected Pair<byte[], byte[]> getStartAndStopRow() {
1900      return generateStartAndStopRows(100);
1901    }
1902  }
1903
1904  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1905    RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1906      super(con, options, status);
1907    }
1908
1909    @Override
1910    protected Pair<byte[], byte[]> getStartAndStopRow() {
1911      return generateStartAndStopRows(1000);
1912    }
1913  }
1914
1915  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1916    RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1917      super(con, options, status);
1918    }
1919
1920    @Override
1921    protected Pair<byte[], byte[]> getStartAndStopRow() {
1922      return generateStartAndStopRows(10000);
1923    }
1924  }
1925
1926  static class RandomReadTest extends TableTest {
1927    private final Consistency consistency;
1928    private ArrayList<Get> gets;
1929
1930    RandomReadTest(Connection con, TestOptions options, Status status) {
1931      super(con, options, status);
1932      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1933      if (opts.multiGet > 0) {
1934        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1935        this.gets = new ArrayList<>(opts.multiGet);
1936      }
1937    }
1938
1939    @Override
1940    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1941      if (opts.randomSleep > 0) {
1942        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
1943      }
1944      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1945      for (int family = 0; family < opts.families; family++) {
1946        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1947        if (opts.addColumns) {
1948          for (int column = 0; column < opts.columns; column++) {
1949            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1950            get.addColumn(familyName, qualifier);
1951          }
1952        } else {
1953          get.addFamily(familyName);
1954        }
1955      }
1956      if (opts.filterAll) {
1957        get.setFilter(new FilterAllFilter());
1958      }
1959      get.setConsistency(consistency);
1960      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1961      if (opts.multiGet > 0) {
1962        this.gets.add(get);
1963        if (this.gets.size() == opts.multiGet) {
1964          Result[] rs = this.table.get(this.gets);
1965          if (opts.replicas > 1) {
1966            long latency = System.nanoTime() - startTime;
1967            updateValueSize(rs, latency);
1968          } else {
1969            updateValueSize(rs);
1970          }
1971          this.gets.clear();
1972        } else {
1973          return false;
1974        }
1975      } else {
1976        if (opts.replicas > 1) {
1977          Result r = this.table.get(get);
1978          long latency = System.nanoTime() - startTime;
1979          updateValueSize(r, latency);
1980        } else {
1981          updateValueSize(this.table.get(get));
1982        }
1983      }
1984      return true;
1985    }
1986
1987    @Override
1988    protected int getReportingPeriod() {
1989      int period = opts.perClientRunRows / 10;
1990      return period == 0 ? opts.perClientRunRows : period;
1991    }
1992
1993    @Override
1994    protected void testTakedown() throws IOException {
1995      if (this.gets != null && this.gets.size() > 0) {
1996        this.table.get(gets);
1997        this.gets.clear();
1998      }
1999      super.testTakedown();
2000    }
2001  }
2002
2003  /*
2004   * Send random reads against fake regions inserted by MetaWriteTest
2005   */
2006  static class MetaRandomReadTest extends MetaTest {
2007    private Random rd = new Random();
2008    private RegionLocator regionLocator;
2009
2010    MetaRandomReadTest(Connection con, TestOptions options, Status status) {
2011      super(con, options, status);
2012      LOG.info("call getRegionLocation");
2013    }
2014
2015    @Override
2016    void onStartup() throws IOException {
2017      super.onStartup();
2018      this.regionLocator = connection.getRegionLocator(table.getName());
2019    }
2020
2021    @Override
2022    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
2023      if (opts.randomSleep > 0) {
2024        Thread.sleep(rd.nextInt(opts.randomSleep));
2025      }
2026      HRegionLocation hRegionLocation =
2027        regionLocator.getRegionLocation(getSplitKey(rd.nextInt(opts.perClientRunRows)), true);
2028      LOG.debug("get location for region: " + hRegionLocation);
2029      return true;
2030    }
2031
2032    @Override
2033    protected int getReportingPeriod() {
2034      int period = opts.perClientRunRows / 10;
2035      return period == 0 ? opts.perClientRunRows : period;
2036    }
2037
2038    @Override
2039    protected void testTakedown() throws IOException {
2040      super.testTakedown();
2041    }
2042  }
2043
2044  static class RandomWriteTest extends SequentialWriteTest {
2045    RandomWriteTest(Connection con, TestOptions options, Status status) {
2046      super(con, options, status);
2047    }
2048
2049    @Override
2050    protected byte[] generateRow(final int i) {
2051      return getRandomRow(this.rand, opts.totalRows);
2052    }
2053
2054  }
2055
2056  static class ScanTest extends TableTest {
2057    private ResultScanner testScanner;
2058
2059    ScanTest(Connection con, TestOptions options, Status status) {
2060      super(con, options, status);
2061    }
2062
2063    @Override
2064    void testTakedown() throws IOException {
2065      if (this.testScanner != null) {
2066        this.testScanner.close();
2067      }
2068      super.testTakedown();
2069    }
2070
2071    @Override
2072    boolean testRow(final int i, final long startTime) throws IOException {
2073      if (this.testScanner == null) {
2074        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
2075          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
2076          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
2077        for (int family = 0; family < opts.families; family++) {
2078          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2079          if (opts.addColumns) {
2080            for (int column = 0; column < opts.columns; column++) {
2081              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2082              scan.addColumn(familyName, qualifier);
2083            }
2084          } else {
2085            scan.addFamily(familyName);
2086          }
2087        }
2088        if (opts.filterAll) {
2089          scan.setFilter(new FilterAllFilter());
2090        }
2091        this.testScanner = table.getScanner(scan);
2092      }
2093      Result r = testScanner.next();
2094      updateValueSize(r);
2095      return true;
2096    }
2097  }
2098
2099  /**
2100   * Base class for operations that are CAS-like; that read a value and then set it based off what
2101   * they read. In this category is increment, append, checkAndPut, etc.
2102   * <p>
2103   * These operations also want some concurrency going on. Usually when these tests run, they
2104   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
2105   * same key space. We do this with our getStartRow and getLastRow overrides.
2106   */
2107  static abstract class CASTableTest extends TableTest {
2108    private final byte[] qualifier;
2109
2110    CASTableTest(Connection con, TestOptions options, Status status) {
2111      super(con, options, status);
2112      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
2113    }
2114
2115    byte[] getQualifier() {
2116      return this.qualifier;
2117    }
2118
2119    @Override
2120    int getStartRow() {
2121      return 0;
2122    }
2123
2124    @Override
2125    int getLastRow() {
2126      return opts.perClientRunRows;
2127    }
2128  }
2129
2130  static class IncrementTest extends CASTableTest {
2131    IncrementTest(Connection con, TestOptions options, Status status) {
2132      super(con, options, status);
2133    }
2134
2135    @Override
2136    boolean testRow(final int i, final long startTime) throws IOException {
2137      Increment increment = new Increment(format(i));
2138      // unlike checkAndXXX tests, which make most sense to do on a single value,
2139      // if multiple families are specified for an increment test we assume it is
2140      // meant to raise the work factor
2141      for (int family = 0; family < opts.families; family++) {
2142        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2143        increment.addColumn(familyName, getQualifier(), 1l);
2144      }
2145      updateValueSize(this.table.increment(increment));
2146      return true;
2147    }
2148  }
2149
2150  static class AppendTest extends CASTableTest {
2151    AppendTest(Connection con, TestOptions options, Status status) {
2152      super(con, options, status);
2153    }
2154
2155    @Override
2156    boolean testRow(final int i, final long startTime) throws IOException {
2157      byte[] bytes = format(i);
2158      Append append = new Append(bytes);
2159      // unlike checkAndXXX tests, which make most sense to do on a single value,
2160      // if multiple families are specified for an append test we assume it is
2161      // meant to raise the work factor
2162      for (int family = 0; family < opts.families; family++) {
2163        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2164        append.addColumn(familyName, getQualifier(), bytes);
2165      }
2166      updateValueSize(this.table.append(append));
2167      return true;
2168    }
2169  }
2170
2171  static class CheckAndMutateTest extends CASTableTest {
2172    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
2173      super(con, options, status);
2174    }
2175
2176    @Override
2177    boolean testRow(final int i, final long startTime) throws IOException {
2178      final byte[] bytes = format(i);
2179      // checkAndXXX tests operate on only a single value
2180      // Put a known value so when we go to check it, it is there.
2181      Put put = new Put(bytes);
2182      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2183      this.table.put(put);
2184      RowMutations mutations = new RowMutations(bytes);
2185      mutations.add(put);
2186      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2187        .thenMutate(mutations);
2188      return true;
2189    }
2190  }
2191
2192  static class CheckAndPutTest extends CASTableTest {
2193    CheckAndPutTest(Connection con, TestOptions options, Status status) {
2194      super(con, options, status);
2195    }
2196
2197    @Override
2198    boolean testRow(final int i, final long startTime) throws IOException {
2199      final byte[] bytes = format(i);
2200      // checkAndXXX tests operate on only a single value
2201      // Put a known value so when we go to check it, it is there.
2202      Put put = new Put(bytes);
2203      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2204      this.table.put(put);
2205      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2206        .thenPut(put);
2207      return true;
2208    }
2209  }
2210
2211  static class CheckAndDeleteTest extends CASTableTest {
2212    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
2213      super(con, options, status);
2214    }
2215
2216    @Override
2217    boolean testRow(final int i, final long startTime) throws IOException {
2218      final byte[] bytes = format(i);
2219      // checkAndXXX tests operate on only a single value
2220      // Put a known value so when we go to check it, it is there.
2221      Put put = new Put(bytes);
2222      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2223      this.table.put(put);
2224      Delete delete = new Delete(put.getRow());
2225      delete.addColumn(FAMILY_ZERO, getQualifier());
2226      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2227        .thenDelete(delete);
2228      return true;
2229    }
2230  }
2231
2232  /*
2233   * Delete all fake regions inserted to meta table by MetaWriteTest.
2234   */
2235  static class CleanMetaTest extends MetaTest {
2236    CleanMetaTest(Connection con, TestOptions options, Status status) {
2237      super(con, options, status);
2238    }
2239
2240    @Override
2241    boolean testRow(final int i, final long startTime) throws IOException {
2242      try {
2243        RegionInfo regionInfo = connection.getRegionLocator(table.getName())
2244          .getRegionLocation(getSplitKey(i), false).getRegion();
2245        LOG.debug("deleting region from meta: " + regionInfo);
2246
2247        Delete delete =
2248          MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP);
2249        try (Table t = MetaTableAccessor.getMetaHTable(connection)) {
2250          t.delete(delete);
2251        }
2252      } catch (IOException ie) {
2253        // Log and continue
2254        LOG.error("cannot find region with start key: " + i);
2255      }
2256      return true;
2257    }
2258  }
2259
2260  static class SequentialReadTest extends TableTest {
2261    SequentialReadTest(Connection con, TestOptions options, Status status) {
2262      super(con, options, status);
2263    }
2264
2265    @Override
2266    boolean testRow(final int i, final long startTime) throws IOException {
2267      Get get = new Get(format(i));
2268      for (int family = 0; family < opts.families; family++) {
2269        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2270        if (opts.addColumns) {
2271          for (int column = 0; column < opts.columns; column++) {
2272            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2273            get.addColumn(familyName, qualifier);
2274          }
2275        } else {
2276          get.addFamily(familyName);
2277        }
2278      }
2279      if (opts.filterAll) {
2280        get.setFilter(new FilterAllFilter());
2281      }
2282      updateValueSize(table.get(get));
2283      return true;
2284    }
2285  }
2286
2287  static class SequentialWriteTest extends BufferedMutatorTest {
2288    private ArrayList<Put> puts;
2289
2290    SequentialWriteTest(Connection con, TestOptions options, Status status) {
2291      super(con, options, status);
2292      if (opts.multiPut > 0) {
2293        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
2294        this.puts = new ArrayList<>(opts.multiPut);
2295      }
2296    }
2297
2298    protected byte[] generateRow(final int i) {
2299      return format(i);
2300    }
2301
2302    @Override
2303    boolean testRow(final int i, final long startTime) throws IOException {
2304      byte[] row = generateRow(i);
2305      Put put = new Put(row);
2306      for (int family = 0; family < opts.families; family++) {
2307        byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
2308        for (int column = 0; column < opts.columns; column++) {
2309          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2310          byte[] value = generateData(this.rand, getValueLength(this.rand));
2311          if (opts.useTags) {
2312            byte[] tag = generateData(this.rand, TAG_LENGTH);
2313            Tag[] tags = new Tag[opts.noOfTags];
2314            for (int n = 0; n < opts.noOfTags; n++) {
2315              Tag t = new ArrayBackedTag((byte) n, tag);
2316              tags[n] = t;
2317            }
2318            KeyValue kv =
2319              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
2320            put.add(kv);
2321            updateValueSize(kv.getValueLength());
2322          } else {
2323            put.addColumn(familyName, qualifier, value);
2324            updateValueSize(value.length);
2325          }
2326        }
2327      }
2328      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2329      if (opts.autoFlush) {
2330        if (opts.multiPut > 0) {
2331          this.puts.add(put);
2332          if (this.puts.size() == opts.multiPut) {
2333            table.put(this.puts);
2334            this.puts.clear();
2335          } else {
2336            return false;
2337          }
2338        } else {
2339          table.put(put);
2340        }
2341      } else {
2342        mutator.mutate(put);
2343      }
2344      return true;
2345    }
2346  }
2347
2348  /*
2349   * Insert fake regions into meta table with contiguous split keys.
2350   */
2351  static class MetaWriteTest extends MetaTest {
2352
2353    MetaWriteTest(Connection con, TestOptions options, Status status) {
2354      super(con, options, status);
2355    }
2356
2357    @Override
2358    boolean testRow(final int i, final long startTime) throws IOException {
2359      List<RegionInfo> regionInfos = new ArrayList<RegionInfo>();
2360      RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME))
2361        .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build());
2362      regionInfos.add(regionInfo);
2363      MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1);
2364
2365      // write the serverName columns
2366      MetaTableAccessor.updateRegionLocation(connection, regionInfo,
2367        ServerName.valueOf("localhost", 60010, rand.nextLong()), i,
2368        EnvironmentEdgeManager.currentTime());
2369      return true;
2370    }
2371  }
2372
2373  static class FilteredScanTest extends TableTest {
2374    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
2375
2376    FilteredScanTest(Connection con, TestOptions options, Status status) {
2377      super(con, options, status);
2378      if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) {
2379        LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB
2380          + ". This could take a very long time.");
2381      }
2382    }
2383
2384    @Override
2385    boolean testRow(int i, final long startTime) throws IOException {
2386      byte[] value = generateData(this.rand, getValueLength(this.rand));
2387      Scan scan = constructScan(value);
2388      ResultScanner scanner = null;
2389      try {
2390        scanner = this.table.getScanner(scan);
2391        for (Result r = null; (r = scanner.next()) != null;) {
2392          updateValueSize(r);
2393        }
2394      } finally {
2395        if (scanner != null) {
2396          updateScanMetrics(scanner.getScanMetrics());
2397          scanner.close();
2398        }
2399      }
2400      return true;
2401    }
2402
2403    protected Scan constructScan(byte[] valuePrefix) throws IOException {
2404      FilterList list = new FilterList();
2405      Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL,
2406        new BinaryComparator(valuePrefix));
2407      list.addFilter(filter);
2408      if (opts.filterAll) {
2409        list.addFilter(new FilterAllFilter());
2410      }
2411      Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2412        .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2413        .setScanMetricsEnabled(true);
2414      if (opts.addColumns) {
2415        for (int column = 0; column < opts.columns; column++) {
2416          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2417          scan.addColumn(FAMILY_ZERO, qualifier);
2418        }
2419      } else {
2420        scan.addFamily(FAMILY_ZERO);
2421      }
2422      scan.setFilter(list);
2423      return scan;
2424    }
2425  }
2426
2427  /**
2428   * Compute a throughput rate in MB/s.
2429   * @param rows   Number of records consumed.
2430   * @param timeMs Time taken in milliseconds.
2431   * @return String value with label, ie '123.76 MB/s'
2432   */
2433  private static String calculateMbps(int rows, long timeMs, final int valueSize, int families,
2434    int columns) {
2435    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH
2436      + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families);
2437    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
2438      .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT);
2439    return FMT.format(mbps) + " MB/s";
2440  }
2441
2442  /*
2443   * Format passed integer. n * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version
2444   * of passed number (Does absolute in case number is negative).
2445   */
2446  public static byte[] format(final int number) {
2447    byte[] b = new byte[ROW_LENGTH];
2448    int d = Math.abs(number);
2449    for (int i = b.length - 1; i >= 0; i--) {
2450      b[i] = (byte) ((d % 10) + '0');
2451      d /= 10;
2452    }
2453    return b;
2454  }
2455
2456  /*
2457   * This method takes some time and is done inline uploading data. For example, doing the mapfile
2458   * test, generation of the key and value consumes about 30% of CPU time.
2459   * @return Generated random value to insert into a table cell.
2460   */
2461  public static byte[] generateData(final Random r, int length) {
2462    byte[] b = new byte[length];
2463    int i;
2464
2465    for (i = 0; i < (length - 8); i += 8) {
2466      b[i] = (byte) (65 + r.nextInt(26));
2467      b[i + 1] = b[i];
2468      b[i + 2] = b[i];
2469      b[i + 3] = b[i];
2470      b[i + 4] = b[i];
2471      b[i + 5] = b[i];
2472      b[i + 6] = b[i];
2473      b[i + 7] = b[i];
2474    }
2475
2476    byte a = (byte) (65 + r.nextInt(26));
2477    for (; i < length; i++) {
2478      b[i] = a;
2479    }
2480    return b;
2481  }
2482
2483  static byte[] getRandomRow(final Random random, final int totalRows) {
2484    return format(generateRandomRow(random, totalRows));
2485  }
2486
2487  static int generateRandomRow(final Random random, final int totalRows) {
2488    return random.nextInt(Integer.MAX_VALUE) % totalRows;
2489  }
2490
2491  static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
2492    Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
2493    throws IOException, InterruptedException {
2494    status.setStatus(
2495      "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows");
2496    long totalElapsedTime;
2497
2498    final TestBase t;
2499    try {
2500      if (AsyncTest.class.isAssignableFrom(cmd)) {
2501        Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
2502        Constructor<? extends AsyncTest> constructor =
2503          newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
2504        t = constructor.newInstance(asyncCon, opts, status);
2505      } else {
2506        Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
2507        Constructor<? extends Test> constructor =
2508          newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
2509        t = constructor.newInstance(con, opts, status);
2510      }
2511    } catch (NoSuchMethodException e) {
2512      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
2513        + ".  It does not provide a constructor as described by "
2514        + "the javadoc comment.  Available constructors are: "
2515        + Arrays.toString(cmd.getConstructors()));
2516    } catch (Exception e) {
2517      throw new IllegalStateException("Failed to construct command class", e);
2518    }
2519    totalElapsedTime = t.test();
2520
2521    status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow
2522      + " for " + opts.perClientRunRows + " rows" + " ("
2523      + calculateMbps((int) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
2524        getAverageValueLength(opts), opts.families, opts.columns)
2525      + ")");
2526
2527    return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold,
2528      t.numOfReplyFromReplica, t.getLatencyHistogram());
2529  }
2530
2531  private static int getAverageValueLength(final TestOptions opts) {
2532    return opts.valueRandom ? opts.valueSize / 2 : opts.valueSize;
2533  }
2534
2535  private void runTest(final Class<? extends TestBase> cmd, TestOptions opts)
2536    throws IOException, InterruptedException, ClassNotFoundException, ExecutionException {
2537    // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2538    // the TestOptions introspection for us and dump the output in a readable format.
2539    LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
2540    Admin admin = null;
2541    Connection connection = null;
2542    try {
2543      connection = ConnectionFactory.createConnection(getConf());
2544      admin = connection.getAdmin();
2545      checkTable(admin, opts);
2546    } finally {
2547      if (admin != null) admin.close();
2548      if (connection != null) connection.close();
2549    }
2550    if (opts.nomapred) {
2551      doLocalClients(opts, getConf());
2552    } else {
2553      doMapReduce(opts, getConf());
2554    }
2555  }
2556
2557  protected void printUsage() {
2558    printUsage(PE_COMMAND_SHORTNAME, null);
2559  }
2560
2561  protected static void printUsage(final String message) {
2562    printUsage(PE_COMMAND_SHORTNAME, message);
2563  }
2564
2565  protected static void printUsageAndExit(final String message, final int exitCode) {
2566    printUsage(message);
2567    System.exit(exitCode);
2568  }
2569
2570  protected static void printUsage(final String shortName, final String message) {
2571    if (message != null && message.length() > 0) {
2572      System.err.println(message);
2573    }
2574    System.err.print("Usage: hbase " + shortName);
2575    System.err.println("  <OPTIONS> [-D<property=value>]* <command> <nclients>");
2576    System.err.println();
2577    System.err.println("General Options:");
2578    System.err.println(
2579      " nomapred        Run multiple clients using threads " + "(rather than use mapreduce)");
2580    System.err
2581      .println(" oneCon          all the threads share the same connection. Default: False");
2582    System.err.println(" connCount          connections all threads share. "
2583      + "For example, if set to 2, then all thread share 2 connection. "
2584      + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, "
2585      + "if not, connCount=thread number");
2586
2587    System.err.println(" sampleRate      Execute test on a sample of total "
2588      + "rows. Only supported by randomRead. Default: 1.0");
2589    System.err.println(" period          Report every 'period' rows: "
2590      + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10);
2591    System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
2592    System.err.println(
2593      " traceRate       Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0");
2594    System.err.println(" latency         Set to report operation latencies. Default: False");
2595    System.err.println(" latencyThreshold  Set to report number of operations with latency "
2596      + "over lantencyThreshold, unit in millisecond, default 0");
2597    System.err.println(" measureAfter    Start to measure the latency once 'measureAfter'"
2598      + " rows have been treated. Default: 0");
2599    System.err
2600      .println(" valueSize       Pass value size to use: Default: " + DEFAULT_OPTS.getValueSize());
2601    System.err.println(" valueRandom     Set if we should vary value size between 0 and "
2602      + "'valueSize'; set on read for stats on size: Default: Not set.");
2603    System.err.println(" blockEncoding   Block encoding to use. Value should be one of "
2604      + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2605    System.err.println();
2606    System.err.println("Table Creation / Write Tests:");
2607    System.err.println(" table           Alternate table name. Default: 'TestTable'");
2608    System.err.println(
2609      " rows            Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows()
2610        + ".  In case of randomReads and randomSeekScans this could"
2611        + " be specified along with --size to specify the number of rows to be scanned within"
2612        + " the total range specified by the size.");
2613    System.err.println(
2614      " size            Total size in GiB. Mutually exclusive with --rows for writes and scans"
2615        + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2616        + " use size to specify the end range and --rows"
2617        + " specifies the number of rows within that range. " + "Default: 1.0.");
2618    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2619    System.err.println(" encryption      Encryption type to use (AES, ...). Default: 'NONE'");
2620    System.err.println(
2621      " flushCommits    Used to determine if the test should flush the table. " + "Default: false");
2622    System.err.println(" valueZipf       Set if we should vary value size between 0 and "
2623      + "'valueSize' in zipf form: Default: Not set.");
2624    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
2625    System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
2626    System.err.println(" multiPut        Batch puts together into groups of N. Only supported "
2627      + "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0");
2628    System.err.println(" presplit        Create presplit table. If a table with same name exists,"
2629      + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2630      + "Recommended for accurate perf analysis (see guide). Default: disabled");
2631    System.err.println(
2632      " usetags         Writes tags along with KVs. Use with HFile V3. " + "Default: false");
2633    System.err.println(" numoftags       Specify the no of tags that would be needed. "
2634      + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2635    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
2636    System.err.println(" columns         Columns to write per row. Default: 1");
2637    System.err
2638      .println(" families        Specify number of column families for the table. Default: 1");
2639    System.err.println();
2640    System.err.println("Read Tests:");
2641    System.err.println(" filterAll       Helps to filter out all the rows on the server side"
2642      + " there by not returning any thing back to the client.  Helps to check the server side"
2643      + " performance.  Uses FilterAllFilter internally. ");
2644    System.err.println(" multiGet        Batch gets together into groups of N. Only supported "
2645      + "by randomRead. Default: disabled");
2646    System.err.println(" inmemory        Tries to keep the HFiles of the CF "
2647      + "inmemory as far as possible. Not guaranteed that reads are always served "
2648      + "from memory.  Default: false");
2649    System.err
2650      .println(" bloomFilter     Bloom filter type, one of " + Arrays.toString(BloomType.values()));
2651    System.err.println(" blockSize       Blocksize to use when writing out hfiles. ");
2652    System.err
2653      .println(" inmemoryCompaction  Makes the column family to do inmemory flushes/compactions. "
2654        + "Uses the CompactingMemstore");
2655    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
2656    System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
2657    System.err.println(
2658      " randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
2659    System.err.println(" caching         Scan caching to use. Default: 30");
2660    System.err.println(" asyncPrefetch   Enable asyncPrefetch for scan");
2661    System.err.println(" cacheBlocks     Set the cacheBlocks option for scan. Default: true");
2662    System.err.println(
2663      " scanReadType    Set the readType option for scan, stream/pread/default. Default: default");
2664    System.err.println(" bufferSize      Set the value of client side buffering. Default: 2MB");
2665    System.err.println();
2666    System.err.println(" Note: -D properties will be applied to the conf used. ");
2667    System.err.println("  For example: ");
2668    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
2669    System.err.println("   -Dmapreduce.task.timeout=60000");
2670    System.err.println();
2671    System.err.println("Command:");
2672    for (CmdDescriptor command : COMMANDS.values()) {
2673      System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
2674    }
2675    System.err.println();
2676    System.err.println("Args:");
2677    System.err.println(" nclients        Integer. Required. Total number of clients "
2678      + "(and HRegionServers) running. 1 <= value <= 500");
2679    System.err.println("Examples:");
2680    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
2681    System.err.println(" $ hbase " + shortName + " sequentialWrite 1");
2682    System.err.println(" To run 10 clients doing increments over ten rows:");
2683    System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10");
2684  }
2685
  /**
   * Parse options passed in via an arguments array. Assumes that array has been split on
   * white-space and placed into a {@code Queue}. Any unknown arguments will remain in the queue at
   * the conclusion of this method call. It's up to the caller to deal with these unrecognized
   * arguments.
   * <p>
   * Arguments are consumed from the head of the queue and matched by prefix, in the order the
   * checks appear below. Parsing stops at a help flag ({@code -h} or anything starting with
   * {@code --h}), which is put back on the queue, or at the first command name, after which the
   * next argument is consumed as the client count.
   * @param args queue of command-line arguments; consumed as options are recognized
   * @return the options populated from the consumed arguments
   * @throws IllegalArgumentException if a numeric or enum value cannot be parsed, if the
   *                                  combination of options is invalid, or if a command is not
   *                                  followed by a client count
   * @throws IllegalStateException    if {@code --size} is not greater than 1, or if both
   *                                  {@code valueZipf} and {@code valueRandom} are set
   */
  static TestOptions parseOpts(Queue<String> args) {
    TestOptions opts = new TestOptions();

    String cmd = null;
    while ((cmd = args.poll()) != null) {
      if (cmd.equals("-h") || cmd.startsWith("--h")) {
        // place item back onto queue so that caller knows parsing was incomplete
        args.add(cmd);
        break;
      }

      // Flag options are matched with startsWith as well, so any suffix after the name is ignored.
      final String nmr = "--nomapred";
      if (cmd.startsWith(nmr)) {
        opts.nomapred = true;
        continue;
      }

      final String rows = "--rows=";
      if (cmd.startsWith(rows)) {
        opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
        continue;
      }

      final String cycles = "--cycles=";
      if (cmd.startsWith(cycles)) {
        opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
        continue;
      }

      final String sampleRate = "--sampleRate=";
      if (cmd.startsWith(sampleRate)) {
        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
        continue;
      }

      final String table = "--table=";
      if (cmd.startsWith(table)) {
        opts.tableName = cmd.substring(table.length());
        continue;
      }

      final String startRow = "--startRow=";
      if (cmd.startsWith(startRow)) {
        opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
        continue;
      }

      final String compress = "--compress=";
      if (cmd.startsWith(compress)) {
        opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
        continue;
      }

      final String encryption = "--encryption=";
      if (cmd.startsWith(encryption)) {
        opts.encryption = cmd.substring(encryption.length());
        continue;
      }

      final String traceRate = "--traceRate=";
      if (cmd.startsWith(traceRate)) {
        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
        continue;
      }

      final String blockEncoding = "--blockEncoding=";
      if (cmd.startsWith(blockEncoding)) {
        opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
        continue;
      }

      final String flushCommits = "--flushCommits=";
      if (cmd.startsWith(flushCommits)) {
        opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
        continue;
      }

      final String writeToWAL = "--writeToWAL=";
      if (cmd.startsWith(writeToWAL)) {
        opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
        continue;
      }

      final String presplit = "--presplit=";
      if (cmd.startsWith(presplit)) {
        opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
        continue;
      }

      final String inMemory = "--inmemory=";
      if (cmd.startsWith(inMemory)) {
        opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
        continue;
      }

      final String autoFlush = "--autoFlush=";
      if (cmd.startsWith(autoFlush)) {
        opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
        continue;
      }

      final String onceCon = "--oneCon=";
      if (cmd.startsWith(onceCon)) {
        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
        continue;
      }

      final String connCount = "--connCount=";
      if (cmd.startsWith(connCount)) {
        opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
        continue;
      }

      final String latencyThreshold = "--latencyThreshold=";
      if (cmd.startsWith(latencyThreshold)) {
        opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length()));
        continue;
      }

      // Must stay after the --latencyThreshold= check: "--latency" is a prefix of that option and
      // would otherwise match it first.
      final String latency = "--latency";
      if (cmd.startsWith(latency)) {
        opts.reportLatency = true;
        continue;
      }

      final String multiGet = "--multiGet=";
      if (cmd.startsWith(multiGet)) {
        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
        continue;
      }

      final String multiPut = "--multiPut=";
      if (cmd.startsWith(multiPut)) {
        opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
        continue;
      }

      final String useTags = "--usetags=";
      if (cmd.startsWith(useTags)) {
        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
        continue;
      }

      final String noOfTags = "--numoftags=";
      if (cmd.startsWith(noOfTags)) {
        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
        continue;
      }

      final String replicas = "--replicas=";
      if (cmd.startsWith(replicas)) {
        opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
        continue;
      }

      final String filterOutAll = "--filterAll";
      if (cmd.startsWith(filterOutAll)) {
        opts.filterAll = true;
        continue;
      }

      final String size = "--size=";
      if (cmd.startsWith(size)) {
        opts.size = Float.parseFloat(cmd.substring(size.length()));
        // Values of exactly 1.0 are rejected too, not only smaller ones.
        if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB");
        continue;
      }

      final String splitPolicy = "--splitPolicy=";
      if (cmd.startsWith(splitPolicy)) {
        opts.splitPolicy = cmd.substring(splitPolicy.length());
        continue;
      }

      final String randomSleep = "--randomSleep=";
      if (cmd.startsWith(randomSleep)) {
        opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
        continue;
      }

      final String measureAfter = "--measureAfter=";
      if (cmd.startsWith(measureAfter)) {
        opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length()));
        continue;
      }

      final String bloomFilter = "--bloomFilter=";
      if (cmd.startsWith(bloomFilter)) {
        opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
        continue;
      }

      final String blockSize = "--blockSize=";
      if (cmd.startsWith(blockSize)) {
        opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
        continue;
      }

      final String valueSize = "--valueSize=";
      if (cmd.startsWith(valueSize)) {
        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
        continue;
      }

      final String valueRandom = "--valueRandom";
      if (cmd.startsWith(valueRandom)) {
        opts.valueRandom = true;
        continue;
      }

      final String valueZipf = "--valueZipf";
      if (cmd.startsWith(valueZipf)) {
        opts.valueZipf = true;
        continue;
      }

      final String period = "--period=";
      if (cmd.startsWith(period)) {
        opts.period = Integer.parseInt(cmd.substring(period.length()));
        continue;
      }

      final String addColumns = "--addColumns=";
      if (cmd.startsWith(addColumns)) {
        opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
        continue;
      }

      final String inMemoryCompaction = "--inmemoryCompaction=";
      if (cmd.startsWith(inMemoryCompaction)) {
        opts.inMemoryCompaction =
          MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length()));
        continue;
      }

      final String columns = "--columns=";
      if (cmd.startsWith(columns)) {
        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
        continue;
      }

      final String families = "--families=";
      if (cmd.startsWith(families)) {
        opts.families = Integer.parseInt(cmd.substring(families.length()));
        continue;
      }

      final String caching = "--caching=";
      if (cmd.startsWith(caching)) {
        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
        continue;
      }

      final String asyncPrefetch = "--asyncPrefetch";
      if (cmd.startsWith(asyncPrefetch)) {
        opts.asyncPrefetch = true;
        continue;
      }

      final String cacheBlocks = "--cacheBlocks=";
      if (cmd.startsWith(cacheBlocks)) {
        opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length()));
        continue;
      }

      final String scanReadType = "--scanReadType=";
      if (cmd.startsWith(scanReadType)) {
        // The value is upper-cased before the enum lookup, so it may be given in any case.
        opts.scanReadType =
          Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
        continue;
      }

      final String bufferSize = "--bufferSize=";
      if (cmd.startsWith(bufferSize)) {
        opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
        continue;
      }

      // Reached only when cmd matched none of the option prefixes above. The cross-option
      // constraints are checked here, before cmd is looked up as a command name.
      validateParsedOpts(opts);

      if (isCommandClass(cmd)) {
        opts.cmdName = cmd;
        try {
          // The argument that follows the command name is consumed as the client count.
          opts.numClientThreads = Integer.parseInt(args.remove());
        } catch (NoSuchElementException | NumberFormatException e) {
          throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
        }
        opts = calculateRowsAndSize(opts);
        break;
      } else {
        // printUsageAndExit ends with System.exit(exitCode).
        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
      }

      // Not matching any option or command.
      // This point is reached only if the printUsageAndExit call above returns.
      System.err.println("Error: Wrong option or command: " + cmd);
      args.add(cmd);
      break;
    }
    return opts;
  }
2992
2993  /**
2994   * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts
2995   */
2996  private static void validateParsedOpts(TestOptions opts) {
2997
2998    if (!opts.autoFlush && opts.multiPut > 0) {
2999      throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
3000    }
3001
3002    if (opts.oneCon && opts.connCount > 1) {
3003      throw new IllegalArgumentException(
3004        "oneCon is set to true, " + "connCount should not bigger than 1");
3005    }
3006
3007    if (opts.valueZipf && opts.valueRandom) {
3008      throw new IllegalStateException("Either valueZipf or valueRandom but not both");
3009    }
3010  }
3011
3012  static TestOptions calculateRowsAndSize(final TestOptions opts) {
3013    int rowsPerGB = getRowsPerGB(opts);
3014    if (
3015      (opts.getCmdName() != null
3016        && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
3017        && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows
3018    ) {
3019      opts.totalRows = (int) opts.size * rowsPerGB;
3020    } else if (opts.size != DEFAULT_OPTS.size) {
3021      // total size in GB specified
3022      opts.totalRows = (int) opts.size * rowsPerGB;
3023      opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
3024    } else {
3025      opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
3026      opts.size = opts.totalRows / rowsPerGB;
3027    }
3028    return opts;
3029  }
3030
3031  static int getRowsPerGB(final TestOptions opts) {
3032    return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies()
3033      * opts.getColumns());
3034  }
3035
3036  @Override
3037  public int run(String[] args) throws Exception {
3038    // Process command-line args. TODO: Better cmd-line processing
3039    // (but hopefully something not as painful as cli options).
3040    int errCode = -1;
3041    if (args.length < 1) {
3042      printUsage();
3043      return errCode;
3044    }
3045
3046    try {
3047      LinkedList<String> argv = new LinkedList<>();
3048      argv.addAll(Arrays.asList(args));
3049      TestOptions opts = parseOpts(argv);
3050
3051      // args remaining, print help and exit
3052      if (!argv.isEmpty()) {
3053        errCode = 0;
3054        printUsage();
3055        return errCode;
3056      }
3057
3058      // must run at least 1 client
3059      if (opts.numClientThreads <= 0) {
3060        throw new IllegalArgumentException("Number of clients must be > 0");
3061      }
3062
3063      // cmdName should not be null, print help and exit
3064      if (opts.cmdName == null) {
3065        printUsage();
3066        return errCode;
3067      }
3068
3069      Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
3070      if (cmdClass != null) {
3071        runTest(cmdClass, opts);
3072        errCode = 0;
3073      }
3074
3075    } catch (Exception e) {
3076      e.printStackTrace();
3077    }
3078
3079    return errCode;
3080  }
3081
3082  private static boolean isCommandClass(String cmd) {
3083    return COMMANDS.containsKey(cmd);
3084  }
3085
3086  private static Class<? extends TestBase> determineCommandClass(String cmd) {
3087    CmdDescriptor descriptor = COMMANDS.get(cmd);
3088    return descriptor != null ? descriptor.getCmdClass() : null;
3089  }
3090
3091  public static void main(final String[] args) throws Exception {
3092    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
3093    System.exit(res);
3094  }
3095}