001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import com.codahale.metrics.Histogram;
022import com.codahale.metrics.UniformReservoir;
023import io.opentelemetry.api.trace.Span;
024import io.opentelemetry.context.Scope;
025import java.io.IOException;
026import java.io.PrintStream;
027import java.lang.reflect.Constructor;
028import java.math.BigDecimal;
029import java.math.MathContext;
030import java.text.DecimalFormat;
031import java.text.SimpleDateFormat;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Date;
035import java.util.LinkedList;
036import java.util.List;
037import java.util.Locale;
038import java.util.Map;
039import java.util.NoSuchElementException;
040import java.util.Queue;
041import java.util.Random;
042import java.util.TreeMap;
043import java.util.concurrent.Callable;
044import java.util.concurrent.ExecutionException;
045import java.util.concurrent.ExecutorService;
046import java.util.concurrent.Executors;
047import java.util.concurrent.Future;
048import org.apache.commons.lang3.StringUtils;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.conf.Configured;
051import org.apache.hadoop.fs.FileSystem;
052import org.apache.hadoop.fs.Path;
053import org.apache.hadoop.hbase.client.Admin;
054import org.apache.hadoop.hbase.client.Append;
055import org.apache.hadoop.hbase.client.AsyncConnection;
056import org.apache.hadoop.hbase.client.AsyncTable;
057import org.apache.hadoop.hbase.client.BufferedMutator;
058import org.apache.hadoop.hbase.client.BufferedMutatorParams;
059import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
060import org.apache.hadoop.hbase.client.Connection;
061import org.apache.hadoop.hbase.client.ConnectionFactory;
062import org.apache.hadoop.hbase.client.Consistency;
063import org.apache.hadoop.hbase.client.Delete;
064import org.apache.hadoop.hbase.client.Durability;
065import org.apache.hadoop.hbase.client.Get;
066import org.apache.hadoop.hbase.client.Increment;
067import org.apache.hadoop.hbase.client.Put;
068import org.apache.hadoop.hbase.client.RegionInfo;
069import org.apache.hadoop.hbase.client.RegionInfoBuilder;
070import org.apache.hadoop.hbase.client.RegionLocator;
071import org.apache.hadoop.hbase.client.Result;
072import org.apache.hadoop.hbase.client.ResultScanner;
073import org.apache.hadoop.hbase.client.RowMutations;
074import org.apache.hadoop.hbase.client.Scan;
075import org.apache.hadoop.hbase.client.Table;
076import org.apache.hadoop.hbase.client.TableDescriptor;
077import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
078import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
079import org.apache.hadoop.hbase.filter.BinaryComparator;
080import org.apache.hadoop.hbase.filter.Filter;
081import org.apache.hadoop.hbase.filter.FilterAllFilter;
082import org.apache.hadoop.hbase.filter.FilterList;
083import org.apache.hadoop.hbase.filter.PageFilter;
084import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
085import org.apache.hadoop.hbase.filter.WhileMatchFilter;
086import org.apache.hadoop.hbase.io.compress.Compression;
087import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
088import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
089import org.apache.hadoop.hbase.regionserver.BloomType;
090import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
091import org.apache.hadoop.hbase.trace.TraceUtil;
092import org.apache.hadoop.hbase.util.ByteArrayHashKey;
093import org.apache.hadoop.hbase.util.Bytes;
094import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
095import org.apache.hadoop.hbase.util.GsonUtil;
096import org.apache.hadoop.hbase.util.Hash;
097import org.apache.hadoop.hbase.util.MurmurHash;
098import org.apache.hadoop.hbase.util.Pair;
099import org.apache.hadoop.hbase.util.RandomDistribution;
100import org.apache.hadoop.hbase.util.YammerHistogramUtils;
101import org.apache.hadoop.io.LongWritable;
102import org.apache.hadoop.io.Text;
103import org.apache.hadoop.mapreduce.Job;
104import org.apache.hadoop.mapreduce.Mapper;
105import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
106import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
107import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
108import org.apache.hadoop.util.Tool;
109import org.apache.hadoop.util.ToolRunner;
110import org.apache.yetus.audience.InterfaceAudience;
111import org.slf4j.Logger;
112import org.slf4j.LoggerFactory;
113
114import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
115import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
116import org.apache.hbase.thirdparty.com.google.gson.Gson;
117
118/**
 * Script used for evaluating HBase performance and scalability.  Runs an HBase
120 * client that steps through one of a set of hardcoded tests or 'experiments'
121 * (e.g. a random reads test, a random writes test, etc.). Pass on the
122 * command-line which test to run and how many clients are participating in
123 * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
124 *
125 * <p>This class sets up and runs the evaluation programs described in
126 * Section 7, <i>Performance Evaluation</i>, of the <a
127 * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
128 * paper, pages 8-10.
129 *
130 * <p>By default, runs as a mapreduce job where each mapper runs a single test
131 * client. Can also run as a non-mapreduce, multithreaded application by
132 * specifying {@code --nomapred}. Each client does about 1GB of data, unless
133 * specified otherwise.
134 */
135@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
136public class PerformanceEvaluation extends Configured implements Tool {
  // Command names that are also referenced by constant when registering the tests below.
  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
  static final String RANDOM_READ = "randomRead";
  static final String PE_COMMAND_SHORTNAME = "pe";
  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
  // Used to serialize TestOptions to JSON for the job input file and to parse it back in the mapper.
  private static final Gson GSON = GsonUtil.createGson().create();

  // Default table name; TestOptions.tableName starts out with this value.
  public static final String TABLE_NAME = "TestTable";
  // Column family names are this prefix followed by the family index (see getTableDescriptor).
  public static final String FAMILY_NAME_BASE = "info";
  // Bytes of "info0", i.e. the name getTableDescriptor gives to family index 0.
  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
  // Bytes of the string "0".
  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
  // Default for TestOptions.valueSize.
  public static final int DEFAULT_VALUE_LENGTH = 1000;
  // NOTE(review): presumably the length of a formatted row key; the consumer is outside this view.
  public static final int ROW_LENGTH = 26;

  // Note: this is 1024 * 1024 * 1000 (1,048,576,000), not 1024^3.
  private static final int ONE_GB = 1024 * 1024 * 1000;
  // Number of DEFAULT_VALUE_LENGTH-sized values that add up to ONE_GB; the default row count.
  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
  // TODO : should we make this configurable
  private static final int TAG_LENGTH = 256;
  // Number format with at most two fraction digits.
  private static final DecimalFormat FMT = new DecimalFormat("0.##");
  private static final MathContext CXT = MathContext.DECIMAL64;
  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
  // Untouched options instance; other code compares against it to detect non-default settings.
  private static final TestOptions DEFAULT_OPTS = new TestOptions();

  // Registry of runnable commands keyed (and therefore sorted) by command name; filled by
  // addCommandDescriptor. Must be initialized before the static block below runs.
  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
  // Directory, relative to the base directory, under which writeInputFile creates job dirs.
  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
162
163  static {
164    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
165      "Run async random read test");
166    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
167      "Run async random write test");
168    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
169      "Run async sequential read test");
170    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
171      "Run async sequential write test");
172    addCommandDescriptor(AsyncScanTest.class, "asyncScan",
173      "Run async scan test (read every row)");
174    addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test");
175    addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead",
176      "Run getRegionLocation test");
177    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
178      "Run random seek and scan 100 test");
179    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
180      "Run random seek scan with both start and stop row (max 10 rows)");
181    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
182      "Run random seek scan with both start and stop row (max 100 rows)");
183    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
184      "Run random seek scan with both start and stop row (max 1000 rows)");
185    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
186      "Run random seek scan with both start and stop row (max 10000 rows)");
187    addCommandDescriptor(RandomWriteTest.class, "randomWrite",
188      "Run random write test");
189    addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
190      "Run sequential read test");
191    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
192      "Run sequential write test");
193    addCommandDescriptor(MetaWriteTest.class, "metaWrite",
194      "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta");
195    addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
196    addCommandDescriptor(FilteredScanTest.class, "filterScan",
197      "Run scan test using a filter to find a specific row based on it's value " +
198        "(make sure to use --rows=20)");
199    addCommandDescriptor(IncrementTest.class, "increment",
200      "Increment on each row; clients overlap on keyspace so some concurrent operations");
201    addCommandDescriptor(AppendTest.class, "append",
202      "Append on each row; clients overlap on keyspace so some concurrent operations");
203    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
204      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
205    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
206      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
207    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
208      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
209    addCommandDescriptor(CleanMetaTest.class, "cleanMeta",
210      "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread");
211  }
212
213  /**
214   * Enum for map metrics.  Keep it out here rather than inside in the Map
215   * inner-class so we can find associated properties.
216   */
217  protected static enum Counter {
218    /** elapsed time */
219    ELAPSED_TIME,
220    /** number of rows */
221    ROWS
222  }
223
224  protected static class RunResult implements Comparable<RunResult> {
225    public RunResult(long duration, Histogram hist) {
226      this.duration = duration;
227      this.hist = hist;
228      numbOfReplyOverThreshold = 0;
229      numOfReplyFromReplica = 0;
230    }
231
232    public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica,
233      Histogram hist) {
234      this.duration = duration;
235      this.hist = hist;
236      this.numbOfReplyOverThreshold = numbOfReplyOverThreshold;
237      this.numOfReplyFromReplica = numOfReplyFromReplica;
238    }
239
240    public final long duration;
241    public final Histogram hist;
242    public final long numbOfReplyOverThreshold;
243    public final long numOfReplyFromReplica;
244
245    @Override
246    public String toString() {
247      return Long.toString(duration);
248    }
249
250    @Override public int compareTo(RunResult o) {
251      return Long.compare(this.duration, o.duration);
252    }
253  }
254
255  /**
256   * Constructor
257   * @param conf Configuration object
258   */
259  public PerformanceEvaluation(final Configuration conf) {
260    super(conf);
261  }
262
263  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass,
264      String name, String description) {
265    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
266    COMMANDS.put(name, cmdDescriptor);
267  }
268
269  /**
270   * Implementations can have their status set.
271   */
272  interface Status {
273    /**
274     * Sets status
275     * @param msg status message
276     * @throws IOException
277     */
278    void setStatus(final String msg) throws IOException;
279  }
280
281  /**
282   * MapReduce job that runs a performance evaluation client in each map task.
283   */
284  public static class EvaluationMapTask
285      extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
286
287    /** configuration parameter name that contains the command */
288    public final static String CMD_KEY = "EvaluationMapTask.command";
289    /** configuration parameter name that contains the PE impl */
290    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
291
292    private Class<? extends Test> cmd;
293
294    @Override
295    protected void setup(Context context) throws IOException, InterruptedException {
296      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
297
298      // this is required so that extensions of PE are instantiated within the
299      // map reduce task...
300      Class<? extends PerformanceEvaluation> peClass =
301          forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
302      try {
303        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
304      } catch (Exception e) {
305        throw new IllegalStateException("Could not instantiate PE instance", e);
306      }
307    }
308
309    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
310      try {
311        return Class.forName(className).asSubclass(type);
312      } catch (ClassNotFoundException e) {
313        throw new IllegalStateException("Could not find class for name: " + className, e);
314      }
315    }
316
317    @Override
318    protected void map(LongWritable key, Text value, final Context context)
319           throws IOException, InterruptedException {
320
321      Status status = new Status() {
322        @Override
323        public void setStatus(String msg) {
324           context.setStatus(msg);
325        }
326      };
327
328      TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class);
329      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
330      final Connection con = ConnectionFactory.createConnection(conf);
331      AsyncConnection asyncCon = null;
332      try {
333        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
334      } catch (ExecutionException e) {
335        throw new IOException(e);
336      }
337
338      // Evaluation task
339      RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
340      // Collect how much time the thing took. Report as map output and
341      // to the ELAPSED_TIME counter.
342      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
343      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
344      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
345      context.progress();
346    }
347  }
348
349  /*
350   * If table does not already exist, create. Also create a table when
351   * {@code opts.presplitRegions} is specified or when the existing table's
352   * region replica count doesn't match {@code opts.replicas}.
353   */
354  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
355    TableName tableName = TableName.valueOf(opts.tableName);
356    boolean needsDelete = false, exists = admin.tableExists(tableName);
357    boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
358      || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
359    if (!exists && isReadCmd) {
360      throw new IllegalStateException(
361        "Must specify an existing table for read commands. Run a write command first.");
362    }
363    TableDescriptor desc =
364      exists ? admin.getDescriptor(TableName.valueOf(opts.tableName)) : null;
365    byte[][] splits = getSplits(opts);
366
367    // recreate the table when user has requested presplit or when existing
368    // {RegionSplitPolicy,replica count} does not match requested, or when the
369    // number of column families does not match requested.
370    if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
371      || (!isReadCmd && desc != null &&
372          !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
373      || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)
374      || (desc != null && desc.getColumnFamilyCount() != opts.families)) {
375      needsDelete = true;
376      // wait, why did it delete my table?!?
377      LOG.debug(MoreObjects.toStringHelper("needsDelete")
378        .add("needsDelete", needsDelete)
379        .add("isReadCmd", isReadCmd)
380        .add("exists", exists)
381        .add("desc", desc)
382        .add("presplit", opts.presplitRegions)
383        .add("splitPolicy", opts.splitPolicy)
384        .add("replicas", opts.replicas)
385        .add("families", opts.families)
386        .toString());
387    }
388
389    // remove an existing table
390    if (needsDelete) {
391      if (admin.isTableEnabled(tableName)) {
392        admin.disableTable(tableName);
393      }
394      admin.deleteTable(tableName);
395    }
396
397    // table creation is necessary
398    if (!exists || needsDelete) {
399      desc = getTableDescriptor(opts);
400      if (splits != null) {
401        if (LOG.isDebugEnabled()) {
402          for (int i = 0; i < splits.length; i++) {
403            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
404          }
405        }
406      }
407      if (splits != null) {
408        admin.createTable(desc, splits);
409      } else {
410        admin.createTable(desc);
411      }
412      LOG.info("Table " + desc + " created");
413    }
414    return admin.tableExists(tableName);
415  }
416
417  /**
418   * Create an HTableDescriptor from provided TestOptions.
419   */
420  protected static TableDescriptor getTableDescriptor(TestOptions opts) {
421    TableDescriptorBuilder builder =
422      TableDescriptorBuilder.newBuilder(TableName.valueOf(opts.tableName));
423
424    for (int family = 0; family < opts.families; family++) {
425      byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
426      ColumnFamilyDescriptorBuilder cfBuilder =
427        ColumnFamilyDescriptorBuilder.newBuilder(familyName);
428      cfBuilder.setDataBlockEncoding(opts.blockEncoding);
429      cfBuilder.setCompressionType(opts.compression);
430      cfBuilder.setBloomFilterType(opts.bloomType);
431      cfBuilder.setBlocksize(opts.blockSize);
432      if (opts.inMemoryCF) {
433        cfBuilder.setInMemory(true);
434      }
435      cfBuilder.setInMemoryCompaction(opts.inMemoryCompaction);
436      builder.setColumnFamily(cfBuilder.build());
437    }
438    if (opts.replicas != DEFAULT_OPTS.replicas) {
439      builder.setRegionReplication(opts.replicas);
440    }
441    if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
442      builder.setRegionSplitPolicyClassName(opts.splitPolicy);
443    }
444    return builder.build();
445  }
446
447  /**
448   * generates splits based on total number of rows and specified split regions
449   */
450  protected static byte[][] getSplits(TestOptions opts) {
451    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions)
452      return null;
453
454    int numSplitPoints = opts.presplitRegions - 1;
455    byte[][] splits = new byte[numSplitPoints][];
456    int jump = opts.totalRows / opts.presplitRegions;
457    for (int i = 0; i < numSplitPoints; i++) {
458      int rowkey = jump * (1 + i);
459      splits[i] = format(rowkey);
460    }
461    return splits;
462  }
463
464  static void setupConnectionCount(final TestOptions opts) {
465    if (opts.oneCon) {
466      opts.connCount = 1;
467    } else {
468      if (opts.connCount == -1) {
469        // set to thread number if connCount is not set
470        opts.connCount = opts.numClientThreads;
471      }
472    }
473  }
474
475  /*
476   * Run all clients in this vm each to its own thread.
477   */
478  static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
479      throws IOException, InterruptedException, ExecutionException {
480    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
481    assert cmd != null;
482    @SuppressWarnings("unchecked")
483    Future<RunResult>[] threads = new Future[opts.numClientThreads];
484    RunResult[] results = new RunResult[opts.numClientThreads];
485    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
486      new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
487    setupConnectionCount(opts);
488    final Connection[] cons = new Connection[opts.connCount];
489    final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
490    for (int i = 0; i < opts.connCount; i++) {
491      cons[i] = ConnectionFactory.createConnection(conf);
492      asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
493    }
494    LOG.info("Created " + opts.connCount + " connections for " +
495        opts.numClientThreads + " threads");
496    for (int i = 0; i < threads.length; i++) {
497      final int index = i;
498      threads[i] = pool.submit(new Callable<RunResult>() {
499        @Override
500        public RunResult call() throws Exception {
501          TestOptions threadOpts = new TestOptions(opts);
502          final Connection con = cons[index % cons.length];
503          final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
504          if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
505          RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
506            @Override
507            public void setStatus(final String msg) throws IOException {
508              LOG.info(msg);
509            }
510          });
511          LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration +
512            "ms over " + threadOpts.perClientRunRows + " rows");
513          if (opts.latencyThreshold > 0) {
514            LOG.info("Number of replies over latency threshold " + opts.latencyThreshold +
515              "(ms) is " + run.numbOfReplyOverThreshold);
516          }
517          return run;
518        }
519      });
520    }
521    pool.shutdown();
522
523    for (int i = 0; i < threads.length; i++) {
524      try {
525        results[i] = threads[i].get();
526      } catch (ExecutionException e) {
527        throw new IOException(e.getCause());
528      }
529    }
530    final String test = cmd.getSimpleName();
531    LOG.info("[" + test + "] Summary of timings (ms): "
532             + Arrays.toString(results));
533    Arrays.sort(results);
534    long total = 0;
535    float avgLatency = 0 ;
536    float avgTPS = 0;
537    long replicaWins = 0;
538    for (RunResult result : results) {
539      total += result.duration;
540      avgLatency += result.hist.getSnapshot().getMean();
541      avgTPS += opts.perClientRunRows * 1.0f / result.duration;
542      replicaWins += result.numOfReplyFromReplica;
543    }
544    avgTPS *= 1000; // ms to second
545    avgLatency = avgLatency / results.length;
546    LOG.info("[" + test + " duration ]"
547      + "\tMin: " + results[0] + "ms"
548      + "\tMax: " + results[results.length - 1] + "ms"
549      + "\tAvg: " + (total / results.length) + "ms");
550    LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
551    LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
552    if (opts.replicas > 1) {
553      LOG.info("[results from replica regions] " + replicaWins);
554    }
555
556    for (int i = 0; i < opts.connCount; i++) {
557      cons[i].close();
558      asyncCons[i].close();
559    }
560
561    return results;
562  }
563
564  /*
565   * Run a mapreduce job.  Run as many maps as asked-for clients.
566   * Before we start up the job, write out an input file with instruction
567   * per client regards which row they are to start on.
568   * @param cmd Command to run.
569   * @throws IOException
570   */
571  static Job doMapReduce(TestOptions opts, final Configuration conf)
572      throws IOException, InterruptedException, ClassNotFoundException {
573    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
574    assert cmd != null;
575    Path inputDir = writeInputFile(conf, opts);
576    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
577    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
578    Job job = Job.getInstance(conf);
579    job.setJarByClass(PerformanceEvaluation.class);
580    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
581
582    job.setInputFormatClass(NLineInputFormat.class);
583    NLineInputFormat.setInputPaths(job, inputDir);
584    // this is default, but be explicit about it just in case.
585    NLineInputFormat.setNumLinesPerSplit(job, 1);
586
587    job.setOutputKeyClass(LongWritable.class);
588    job.setOutputValueClass(LongWritable.class);
589
590    job.setMapperClass(EvaluationMapTask.class);
591    job.setReducerClass(LongSumReducer.class);
592
593    job.setNumReduceTasks(1);
594
595    job.setOutputFormatClass(TextOutputFormat.class);
596    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
597
598    TableMapReduceUtil.addDependencyJars(job);
599    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
600      Histogram.class,     // yammer metrics
601      Gson.class,  // gson
602      FilterAllFilter.class // hbase-server tests jar
603      );
604
605    TableMapReduceUtil.initCredentials(job);
606
607    job.waitForCompletion(true);
608    return job;
609  }
610
  /**
   * Name of the job input file that {@code writeInputFile} creates inside the job's
   * {@code inputs} directory. The file holds one line per client, and the MapReduce job is
   * configured with one line per split, so each client is handled by its own map task.
   */

  static String JOB_INPUT_FILENAME = "input.txt";
616
617  /*
618   * Write input file of offsets-per-client for the mapreduce job.
619   * @param c Configuration
620   * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
621   * @throws IOException
622   */
623  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
624    return writeInputFile(c, opts, new Path("."));
625  }
626
627  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
628  throws IOException {
629    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
630    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
631    Path inputDir = new Path(jobdir, "inputs");
632
633    FileSystem fs = FileSystem.get(c);
634    fs.mkdirs(inputDir);
635
636    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
637    PrintStream out = new PrintStream(fs.create(inputFile));
638    // Make input random.
639    Map<Integer, String> m = new TreeMap<>();
640    Hash h = MurmurHash.getInstance();
641    int perClientRows = (opts.totalRows / opts.numClientThreads);
642    try {
643      for (int j = 0; j < opts.numClientThreads; j++) {
644        TestOptions next = new TestOptions(opts);
645        next.startRow = j * perClientRows;
646        next.perClientRunRows = perClientRows;
647        String s = GSON.toJson(next);
648        LOG.info("Client=" + j + ", input=" + s);
649        byte[] b = Bytes.toBytes(s);
650        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
651        m.put(hash, s);
652      }
653      for (Map.Entry<Integer, String> e: m.entrySet()) {
654        out.println(e.getValue());
655      }
656    } finally {
657      out.close();
658    }
659    return inputDir;
660  }
661
662  /**
663   * Describes a command.
664   */
665  static class CmdDescriptor {
666    private Class<? extends TestBase> cmdClass;
667    private String name;
668    private String description;
669
670    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
671      this.cmdClass = cmdClass;
672      this.name = name;
673      this.description = description;
674    }
675
676    public Class<? extends TestBase> getCmdClass() {
677      return cmdClass;
678    }
679
680    public String getName() {
681      return name;
682    }
683
684    public String getDescription() {
685      return description;
686    }
687  }
688
689  /**
690   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}.
691   * This makes tracking all these arguments a little easier.
692   * NOTE: ADDING AN OPTION, you need to add a data member, a getter/setter (to make JSON
693   * serialization of this TestOptions class behave), and you need to add to the clone constructor
694   * below copying your new option from the 'that' to the 'this'.  Look for 'clone' below.
695   */
696  static class TestOptions {
697    String cmdName = null;
698    boolean nomapred = false;
699    boolean filterAll = false;
700    int startRow = 0;
701    float size = 1.0f;
702    int perClientRunRows = DEFAULT_ROWS_PER_GB;
703    int numClientThreads = 1;
704    int totalRows = DEFAULT_ROWS_PER_GB;
705    int measureAfter = 0;
706    float sampleRate = 1.0f;
707    /**
708     * @deprecated Useless after switching to OpenTelemetry
709     */
710    @Deprecated
711    double traceRate = 0.0;
712    String tableName = TABLE_NAME;
713    boolean flushCommits = true;
714    boolean writeToWAL = true;
715    boolean autoFlush = false;
716    boolean oneCon = false;
717    int connCount = -1; //wil decide the actual num later
718    boolean useTags = false;
719    int noOfTags = 1;
720    boolean reportLatency = false;
721    int multiGet = 0;
722    int multiPut = 0;
723    int randomSleep = 0;
724    boolean inMemoryCF = false;
725    int presplitRegions = 0;
726    int replicas = TableDescriptorBuilder.DEFAULT_REGION_REPLICATION;
727    String splitPolicy = null;
728    Compression.Algorithm compression = Compression.Algorithm.NONE;
729    BloomType bloomType = BloomType.ROW;
730    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
731    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
732    boolean valueRandom = false;
733    boolean valueZipf = false;
734    int valueSize = DEFAULT_VALUE_LENGTH;
735    int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10;
736    int cycles = 1;
737    int columns = 1;
738    int families = 1;
739    int caching = 30;
740    int latencyThreshold = 0; // in millsecond
741    boolean addColumns = true;
742    MemoryCompactionPolicy inMemoryCompaction =
743        MemoryCompactionPolicy.valueOf(
744            CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
745    boolean asyncPrefetch = false;
746    boolean cacheBlocks = true;
747    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
748    long bufferSize = 2l * 1024l * 1024l;
749
750    public TestOptions() {}
751
752    /**
753     * Clone constructor.
754     * @param that Object to copy from.
755     */
756    public TestOptions(TestOptions that) {
757      this.cmdName = that.cmdName;
758      this.cycles = that.cycles;
759      this.nomapred = that.nomapred;
760      this.startRow = that.startRow;
761      this.size = that.size;
762      this.perClientRunRows = that.perClientRunRows;
763      this.numClientThreads = that.numClientThreads;
764      this.totalRows = that.totalRows;
765      this.sampleRate = that.sampleRate;
766      this.traceRate = that.traceRate;
767      this.tableName = that.tableName;
768      this.flushCommits = that.flushCommits;
769      this.writeToWAL = that.writeToWAL;
770      this.autoFlush = that.autoFlush;
771      this.oneCon = that.oneCon;
772      this.connCount = that.connCount;
773      this.useTags = that.useTags;
774      this.noOfTags = that.noOfTags;
775      this.reportLatency = that.reportLatency;
776      this.latencyThreshold = that.latencyThreshold;
777      this.multiGet = that.multiGet;
778      this.multiPut = that.multiPut;
779      this.inMemoryCF = that.inMemoryCF;
780      this.presplitRegions = that.presplitRegions;
781      this.replicas = that.replicas;
782      this.splitPolicy = that.splitPolicy;
783      this.compression = that.compression;
784      this.blockEncoding = that.blockEncoding;
785      this.filterAll = that.filterAll;
786      this.bloomType = that.bloomType;
787      this.blockSize = that.blockSize;
788      this.valueRandom = that.valueRandom;
789      this.valueZipf = that.valueZipf;
790      this.valueSize = that.valueSize;
791      this.period = that.period;
792      this.randomSleep = that.randomSleep;
793      this.measureAfter = that.measureAfter;
794      this.addColumns = that.addColumns;
795      this.columns = that.columns;
796      this.families = that.families;
797      this.caching = that.caching;
798      this.inMemoryCompaction = that.inMemoryCompaction;
799      this.asyncPrefetch = that.asyncPrefetch;
800      this.cacheBlocks = that.cacheBlocks;
801      this.scanReadType = that.scanReadType;
802      this.bufferSize = that.bufferSize;
803    }
804
805    public int getCaching() {
806      return this.caching;
807    }
808
809    public void setCaching(final int caching) {
810      this.caching = caching;
811    }
812
813    public int getColumns() {
814      return this.columns;
815    }
816
817    public void setColumns(final int columns) {
818      this.columns = columns;
819    }
820
821    public int getFamilies() {
822      return this.families;
823    }
824
825    public void setFamilies(final int families) {
826      this.families = families;
827    }
828
829    public int getCycles() {
830      return this.cycles;
831    }
832
833    public void setCycles(final int cycles) {
834      this.cycles = cycles;
835    }
836
837    public boolean isValueZipf() {
838      return valueZipf;
839    }
840
841    public void setValueZipf(boolean valueZipf) {
842      this.valueZipf = valueZipf;
843    }
844
845    public String getCmdName() {
846      return cmdName;
847    }
848
849    public void setCmdName(String cmdName) {
850      this.cmdName = cmdName;
851    }
852
853    public int getRandomSleep() {
854      return randomSleep;
855    }
856
857    public void setRandomSleep(int randomSleep) {
858      this.randomSleep = randomSleep;
859    }
860
861    public int getReplicas() {
862      return replicas;
863    }
864
865    public void setReplicas(int replicas) {
866      this.replicas = replicas;
867    }
868
869    public String getSplitPolicy() {
870      return splitPolicy;
871    }
872
873    public void setSplitPolicy(String splitPolicy) {
874      this.splitPolicy = splitPolicy;
875    }
876
877    public void setNomapred(boolean nomapred) {
878      this.nomapred = nomapred;
879    }
880
881    public void setFilterAll(boolean filterAll) {
882      this.filterAll = filterAll;
883    }
884
885    public void setStartRow(int startRow) {
886      this.startRow = startRow;
887    }
888
889    public void setSize(float size) {
890      this.size = size;
891    }
892
893    public void setPerClientRunRows(int perClientRunRows) {
894      this.perClientRunRows = perClientRunRows;
895    }
896
897    public void setNumClientThreads(int numClientThreads) {
898      this.numClientThreads = numClientThreads;
899    }
900
901    public void setTotalRows(int totalRows) {
902      this.totalRows = totalRows;
903    }
904
905    public void setSampleRate(float sampleRate) {
906      this.sampleRate = sampleRate;
907    }
908
909    public void setTraceRate(double traceRate) {
910      this.traceRate = traceRate;
911    }
912
913    public void setTableName(String tableName) {
914      this.tableName = tableName;
915    }
916
917    public void setFlushCommits(boolean flushCommits) {
918      this.flushCommits = flushCommits;
919    }
920
921    public void setWriteToWAL(boolean writeToWAL) {
922      this.writeToWAL = writeToWAL;
923    }
924
925    public void setAutoFlush(boolean autoFlush) {
926      this.autoFlush = autoFlush;
927    }
928
929    public void setOneCon(boolean oneCon) {
930      this.oneCon = oneCon;
931    }
932
933    public int getConnCount() {
934      return connCount;
935    }
936
937    public void setConnCount(int connCount) {
938      this.connCount = connCount;
939    }
940
941    public void setUseTags(boolean useTags) {
942      this.useTags = useTags;
943    }
944
945    public void setNoOfTags(int noOfTags) {
946      this.noOfTags = noOfTags;
947    }
948
949    public void setReportLatency(boolean reportLatency) {
950      this.reportLatency = reportLatency;
951    }
952
953    public void setMultiGet(int multiGet) {
954      this.multiGet = multiGet;
955    }
956
957    public void setMultiPut(int multiPut) {
958      this.multiPut = multiPut;
959    }
960
961    public void setInMemoryCF(boolean inMemoryCF) {
962      this.inMemoryCF = inMemoryCF;
963    }
964
965    public void setPresplitRegions(int presplitRegions) {
966      this.presplitRegions = presplitRegions;
967    }
968
969    public void setCompression(Compression.Algorithm compression) {
970      this.compression = compression;
971    }
972
973    public void setBloomType(BloomType bloomType) {
974      this.bloomType = bloomType;
975    }
976
977    public void setBlockSize(int blockSize) {
978      this.blockSize = blockSize;
979    }
980
981    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
982      this.blockEncoding = blockEncoding;
983    }
984
985    public void setValueRandom(boolean valueRandom) {
986      this.valueRandom = valueRandom;
987    }
988
989    public void setValueSize(int valueSize) {
990      this.valueSize = valueSize;
991    }
992
993    public void setBufferSize(long bufferSize) {
994      this.bufferSize = bufferSize;
995    }
996
997    public void setPeriod(int period) {
998      this.period = period;
999    }
1000
1001    public boolean isNomapred() {
1002      return nomapred;
1003    }
1004
1005    public boolean isFilterAll() {
1006      return filterAll;
1007    }
1008
1009    public int getStartRow() {
1010      return startRow;
1011    }
1012
1013    public float getSize() {
1014      return size;
1015    }
1016
1017    public int getPerClientRunRows() {
1018      return perClientRunRows;
1019    }
1020
1021    public int getNumClientThreads() {
1022      return numClientThreads;
1023    }
1024
1025    public int getTotalRows() {
1026      return totalRows;
1027    }
1028
1029    public float getSampleRate() {
1030      return sampleRate;
1031    }
1032
1033    public double getTraceRate() {
1034      return traceRate;
1035    }
1036
1037    public String getTableName() {
1038      return tableName;
1039    }
1040
1041    public boolean isFlushCommits() {
1042      return flushCommits;
1043    }
1044
1045    public boolean isWriteToWAL() {
1046      return writeToWAL;
1047    }
1048
1049    public boolean isAutoFlush() {
1050      return autoFlush;
1051    }
1052
1053    public boolean isUseTags() {
1054      return useTags;
1055    }
1056
1057    public int getNoOfTags() {
1058      return noOfTags;
1059    }
1060
1061    public boolean isReportLatency() {
1062      return reportLatency;
1063    }
1064
1065    public int getMultiGet() {
1066      return multiGet;
1067    }
1068
1069    public int getMultiPut() {
1070      return multiPut;
1071    }
1072
1073    public boolean isInMemoryCF() {
1074      return inMemoryCF;
1075    }
1076
1077    public int getPresplitRegions() {
1078      return presplitRegions;
1079    }
1080
1081    public Compression.Algorithm getCompression() {
1082      return compression;
1083    }
1084
1085    public DataBlockEncoding getBlockEncoding() {
1086      return blockEncoding;
1087    }
1088
1089    public boolean isValueRandom() {
1090      return valueRandom;
1091    }
1092
1093    public int getValueSize() {
1094      return valueSize;
1095    }
1096
1097    public int getPeriod() {
1098      return period;
1099    }
1100
1101    public BloomType getBloomType() {
1102      return bloomType;
1103    }
1104
1105    public int getBlockSize() {
1106      return blockSize;
1107    }
1108
1109    public boolean isOneCon() {
1110      return oneCon;
1111    }
1112
1113    public int getMeasureAfter() {
1114      return measureAfter;
1115    }
1116
1117    public void setMeasureAfter(int measureAfter) {
1118      this.measureAfter = measureAfter;
1119    }
1120
1121    public boolean getAddColumns() {
1122      return addColumns;
1123    }
1124
1125    public void setAddColumns(boolean addColumns) {
1126      this.addColumns = addColumns;
1127    }
1128
1129    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
1130      this.inMemoryCompaction = inMemoryCompaction;
1131    }
1132
1133    public MemoryCompactionPolicy getInMemoryCompaction() {
1134      return this.inMemoryCompaction;
1135    }
1136
1137    public long getBufferSize() {
1138      return this.bufferSize;
1139    }
1140  }
1141
1142  /*
1143   * A test.
1144   * Subclass to particularize what happens per row.
1145   */
1146  static abstract class TestBase {
1147    // Below is make it so when Tests are all running in the one
1148    // jvm, that they each have a differently seeded Random.
1149    private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime());
1150
1151    private static long nextRandomSeed() {
1152      return randomSeed.nextLong();
1153    }
1154    private final int everyN;
1155
1156    protected final Random rand = new Random(nextRandomSeed());
1157    protected final Configuration conf;
1158    protected final TestOptions opts;
1159
1160    private final Status status;
1161
1162    private String testName;
1163    private Histogram latencyHistogram;
1164    private Histogram replicaLatencyHistogram;
1165    private Histogram valueSizeHistogram;
1166    private Histogram rpcCallsHistogram;
1167    private Histogram remoteRpcCallsHistogram;
1168    private Histogram millisBetweenNextHistogram;
1169    private Histogram regionsScannedHistogram;
1170    private Histogram bytesInResultsHistogram;
1171    private Histogram bytesInRemoteResultsHistogram;
1172    private RandomDistribution.Zipf zipf;
1173    private long numOfReplyOverLatencyThreshold = 0;
1174    private long numOfReplyFromReplica = 0;
1175
1176    /**
1177     * Note that all subclasses of this class must provide a public constructor
1178     * that has the exact same list of arguments.
1179     */
1180    TestBase(final Configuration conf, final TestOptions options, final Status status) {
1181      this.conf = conf;
1182      this.opts = options;
1183      this.status = status;
1184      this.testName = this.getClass().getSimpleName();
1185      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
1186      if (options.isValueZipf()) {
1187        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
1188      }
1189      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
1190    }
1191
1192    int getValueLength(final Random r) {
1193      if (this.opts.isValueRandom()) {
1194        return r.nextInt(opts.valueSize);
1195      } else if (this.opts.isValueZipf()) {
1196        return Math.abs(this.zipf.nextInt());
1197      } else {
1198        return opts.valueSize;
1199      }
1200    }
1201
1202    void updateValueSize(final Result [] rs) throws IOException {
1203      updateValueSize(rs, 0);
1204    }
1205
1206    void updateValueSize(final Result [] rs, final long latency) throws IOException {
1207      if (rs == null || (latency == 0)) return;
1208      for (Result r: rs) updateValueSize(r, latency);
1209    }
1210
1211    void updateValueSize(final Result r) throws IOException {
1212      updateValueSize(r, 0);
1213    }
1214
1215    void updateValueSize(final Result r, final long latency) throws IOException {
1216      if (r == null || (latency == 0)) return;
1217      int size = 0;
1218      // update replicaHistogram
1219      if (r.isStale()) {
1220        replicaLatencyHistogram.update(latency / 1000);
1221        numOfReplyFromReplica ++;
1222      }
1223      if (!isRandomValueSize()) return;
1224
1225      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1226        size += scanner.current().getValueLength();
1227      }
1228      updateValueSize(size);
1229    }
1230
1231    void updateValueSize(final int valueSize) {
1232      if (!isRandomValueSize()) return;
1233      this.valueSizeHistogram.update(valueSize);
1234    }
1235
1236    void updateScanMetrics(final ScanMetrics metrics) {
1237      if (metrics == null) return;
1238      Map<String,Long> metricsMap = metrics.getMetricsMap();
1239      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
1240      if (rpcCalls != null) {
1241        this.rpcCallsHistogram.update(rpcCalls.longValue());
1242      }
1243      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
1244      if (remoteRpcCalls != null) {
1245        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
1246      }
1247      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
1248      if (millisBetweenNext != null) {
1249        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
1250      }
1251      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
1252      if (regionsScanned != null) {
1253        this.regionsScannedHistogram.update(regionsScanned.longValue());
1254      }
1255      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
1256      if (bytesInResults != null && bytesInResults.longValue() > 0) {
1257        this.bytesInResultsHistogram.update(bytesInResults.longValue());
1258      }
1259      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
1260      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
1261        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
1262      }
1263    }
1264
1265    String generateStatus(final int sr, final int i, final int lr) {
1266      return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
1267        (!isRandomValueSize()? "": ", value size " + getShortValueSizeReport());
1268    }
1269
1270    boolean isRandomValueSize() {
1271      return opts.valueRandom;
1272    }
1273
1274    protected int getReportingPeriod() {
1275      return opts.period;
1276    }
1277
1278    /**
1279     * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1280     */
1281    public Histogram getLatencyHistogram() {
1282      return latencyHistogram;
1283    }
1284
1285    void testSetup() throws IOException {
1286      // test metrics
1287      latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1288      // If it is a replica test, set up histogram for replica.
1289      if (opts.replicas > 1) {
1290        replicaLatencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1291      }
1292      valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1293      // scan metrics
1294      rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1295      remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1296      millisBetweenNextHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1297      regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1298      bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1299      bytesInRemoteResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1300
1301      onStartup();
1302    }
1303
1304    abstract void onStartup() throws IOException;
1305
1306    void testTakedown() throws IOException {
1307      onTakedown();
1308      // Print all stats for this thread continuously.
1309      // Synchronize on Test.class so different threads don't intermingle the
1310      // output. We can't use 'this' here because each thread has its own instance of Test class.
1311      synchronized (Test.class) {
1312        status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
1313        status.setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(
1314            latencyHistogram));
1315        if (opts.replicas > 1) {
1316          status.setStatus("Latency (us) from Replica Regions: " +
1317            YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram));
1318        }
1319        status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
1320        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
1321        if (valueSizeHistogram.getCount() > 0) {
1322          status.setStatus("ValueSize (bytes) : "
1323              + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
1324          status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
1325          status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
1326        } else {
1327          status.setStatus("No valueSize statistics available");
1328        }
1329        if (rpcCallsHistogram.getCount() > 0) {
1330          status.setStatus("rpcCalls (count): " +
1331              YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
1332        }
1333        if (remoteRpcCallsHistogram.getCount() > 0) {
1334          status.setStatus("remoteRpcCalls (count): " +
1335              YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
1336        }
1337        if (millisBetweenNextHistogram.getCount() > 0) {
1338          status.setStatus("millisBetweenNext (latency): " +
1339              YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
1340        }
1341        if (regionsScannedHistogram.getCount() > 0) {
1342          status.setStatus("regionsScanned (count): " +
1343              YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
1344        }
1345        if (bytesInResultsHistogram.getCount() > 0) {
1346          status.setStatus("bytesInResults (size): " +
1347              YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
1348        }
1349        if (bytesInRemoteResultsHistogram.getCount() > 0) {
1350          status.setStatus("bytesInRemoteResults (size): " +
1351              YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
1352        }
1353      }
1354    }
1355
1356    abstract void onTakedown() throws IOException;
1357
1358
1359    /*
1360     * Run test
1361     * @return Elapsed time.
1362     * @throws IOException
1363     */
1364    long test() throws IOException, InterruptedException {
1365      testSetup();
1366      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1367      final long startTime = System.nanoTime();
1368      try {
1369        testTimed();
1370      } finally {
1371        testTakedown();
1372      }
1373      return (System.nanoTime() - startTime) / 1000000;
1374    }
1375
1376    int getStartRow() {
1377      return opts.startRow;
1378    }
1379
1380    int getLastRow() {
1381      return getStartRow() + opts.perClientRunRows;
1382    }
1383
1384    /**
1385     * Provides an extension point for tests that don't want a per row invocation.
1386     */
1387    void testTimed() throws IOException, InterruptedException {
1388      int startRow = getStartRow();
1389      int lastRow = getLastRow();
1390      // Report on completion of 1/10th of total.
1391      for (int ii = 0; ii < opts.cycles; ii++) {
1392        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
1393        for (int i = startRow; i < lastRow; i++) {
1394          if (i % everyN != 0) continue;
1395          long startTime = System.nanoTime();
1396          boolean requestSent = false;
1397          Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan();
1398          try (Scope scope = span.makeCurrent()){
1399            requestSent = testRow(i, startTime);
1400          } finally {
1401            span.end();
1402          }
1403          if ( (i - startRow) > opts.measureAfter) {
1404            // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
1405            // first 9 times and sends the actual get request in the 10th iteration.
1406            // We should only set latency when actual request is sent because otherwise
1407            // it turns out to be 0.
1408            if (requestSent) {
1409              long latency = (System.nanoTime() - startTime) / 1000;
1410              latencyHistogram.update(latency);
1411              if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) {
1412                numOfReplyOverLatencyThreshold ++;
1413              }
1414            }
1415            if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1416              status.setStatus(generateStatus(startRow, i, lastRow));
1417            }
1418          }
1419        }
1420      }
1421    }
1422
1423    /**
1424     * @return Subset of the histograms' calculation.
1425     */
1426    public String getShortLatencyReport() {
1427      return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
1428    }
1429
1430    /**
1431     * @return Subset of the histograms' calculation.
1432     */
1433    public String getShortValueSizeReport() {
1434      return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
1435    }
1436
1437
1438    /**
1439     * Test for individual row.
1440     * @param i Row index.
1441     * @return true if the row was sent to server and need to record metrics.
1442     *         False if not, multiGet and multiPut e.g., the rows are sent
1443     *         to server only if enough gets/puts are gathered.
1444     */
1445    abstract boolean testRow(final int i, final long startTime) throws IOException, InterruptedException;
1446  }
1447
1448  static abstract class Test extends TestBase {
1449    protected Connection connection;
1450
1451    Test(final Connection con, final TestOptions options, final Status status) {
1452      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1453      this.connection = con;
1454    }
1455  }
1456
1457  static abstract class AsyncTest extends TestBase {
1458    protected AsyncConnection connection;
1459
1460    AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
1461      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1462      this.connection = con;
1463    }
1464  }
1465
1466  static abstract class TableTest extends Test {
1467    protected Table table;
1468
1469    TableTest(Connection con, TestOptions options, Status status) {
1470      super(con, options, status);
1471    }
1472
1473    @Override
1474    void onStartup() throws IOException {
1475      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1476    }
1477
1478    @Override
1479    void onTakedown() throws IOException {
1480      table.close();
1481    }
1482  }
1483
1484  /*
1485  * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest
1486  */
1487  static abstract class MetaTest extends TableTest {
1488    protected int keyLength;
1489
1490    MetaTest(Connection con, TestOptions options, Status status) {
1491      super(con, options, status);
1492      keyLength = Integer.toString(opts.perClientRunRows).length();
1493    }
1494
1495    @Override
1496    void onTakedown() throws IOException {
1497      // No clean up
1498    }
1499
1500    /*
1501    Generates Lexicographically ascending strings
1502     */
1503    protected byte[] getSplitKey(final int i) {
1504      return Bytes.toBytes(String.format("%0" + keyLength + "d", i));
1505    }
1506
1507  }
1508
1509  static abstract class AsyncTableTest extends AsyncTest {
1510    protected AsyncTable<?> table;
1511
1512    AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
1513      super(con, options, status);
1514    }
1515
1516    @Override
1517    void onStartup() throws IOException {
1518      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1519    }
1520
1521    @Override
1522    void onTakedown() throws IOException {
1523    }
1524  }
1525
1526  static class AsyncRandomReadTest extends AsyncTableTest {
1527    private final Consistency consistency;
1528    private ArrayList<Get> gets;
1529    private Random rd = new Random();
1530
1531    AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
1532      super(con, options, status);
1533      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1534      if (opts.multiGet > 0) {
1535        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1536        this.gets = new ArrayList<>(opts.multiGet);
1537      }
1538    }
1539
1540    @Override
1541    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1542      if (opts.randomSleep > 0) {
1543        Thread.sleep(rd.nextInt(opts.randomSleep));
1544      }
1545      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1546      for (int family = 0; family < opts.families; family++) {
1547        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1548        if (opts.addColumns) {
1549          for (int column = 0; column < opts.columns; column++) {
1550            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1551            get.addColumn(familyName, qualifier);
1552          }
1553        } else {
1554          get.addFamily(familyName);
1555        }
1556      }
1557      if (opts.filterAll) {
1558        get.setFilter(new FilterAllFilter());
1559      }
1560      get.setConsistency(consistency);
1561      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1562      try {
1563        if (opts.multiGet > 0) {
1564          this.gets.add(get);
1565          if (this.gets.size() == opts.multiGet) {
1566            Result[] rs =
1567                this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
1568            updateValueSize(rs);
1569            this.gets.clear();
1570          } else {
1571            return false;
1572          }
1573        } else {
1574          updateValueSize(this.table.get(get).get());
1575        }
1576      } catch (ExecutionException e) {
1577        throw new IOException(e);
1578      }
1579      return true;
1580    }
1581
1582    public static RuntimeException runtime(Throwable e) {
1583      if (e instanceof RuntimeException) {
1584        return (RuntimeException) e;
1585      }
1586      return new RuntimeException(e);
1587    }
1588
1589    public static <V> V propagate(Callable<V> callable) {
1590      try {
1591        return callable.call();
1592      } catch (Exception e) {
1593        throw runtime(e);
1594      }
1595    }
1596
1597    @Override
1598    protected int getReportingPeriod() {
1599      int period = opts.perClientRunRows / 10;
1600      return period == 0 ? opts.perClientRunRows : period;
1601    }
1602
1603    @Override
1604    protected void testTakedown() throws IOException {
1605      if (this.gets != null && this.gets.size() > 0) {
1606        this.table.get(gets);
1607        this.gets.clear();
1608      }
1609      super.testTakedown();
1610    }
1611  }
1612
1613  static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
1614
1615    AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
1616      super(con, options, status);
1617    }
1618
1619    @Override
1620    protected byte[] generateRow(final int i) {
1621      return getRandomRow(this.rand, opts.totalRows);
1622    }
1623  }
1624
1625  static class AsyncScanTest extends AsyncTableTest {
1626    private ResultScanner testScanner;
1627    private AsyncTable<?> asyncTable;
1628
1629    AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
1630      super(con, options, status);
1631    }
1632
1633    @Override
1634    void onStartup() throws IOException {
1635      this.asyncTable =
1636          connection.getTable(TableName.valueOf(opts.tableName),
1637            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
1638    }
1639
1640    @Override
1641    void testTakedown() throws IOException {
1642      if (this.testScanner != null) {
1643        updateScanMetrics(this.testScanner.getScanMetrics());
1644        this.testScanner.close();
1645      }
1646      super.testTakedown();
1647    }
1648
1649    @Override
1650    boolean testRow(final int i, final long startTime) throws IOException {
1651      if (this.testScanner == null) {
1652        Scan scan =
1653            new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1654                .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1655                .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1656        for (int family = 0; family < opts.families; family++) {
1657          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1658          if (opts.addColumns) {
1659            for (int column = 0; column < opts.columns; column++) {
1660              byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1661              scan.addColumn(familyName, qualifier);
1662            }
1663          } else {
1664            scan.addFamily(familyName);
1665          }
1666        }
1667        if (opts.filterAll) {
1668          scan.setFilter(new FilterAllFilter());
1669        }
1670        this.testScanner = asyncTable.getScanner(scan);
1671      }
1672      Result r = testScanner.next();
1673      updateValueSize(r);
1674      return true;
1675    }
1676  }
1677
1678  static class AsyncSequentialReadTest extends AsyncTableTest {
1679    AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
1680      super(con, options, status);
1681    }
1682
1683    @Override
1684    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1685      Get get = new Get(format(i));
1686      for (int family = 0; family < opts.families; family++) {
1687        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1688        if (opts.addColumns) {
1689          for (int column = 0; column < opts.columns; column++) {
1690            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1691            get.addColumn(familyName, qualifier);
1692          }
1693        } else {
1694          get.addFamily(familyName);
1695        }
1696      }
1697      if (opts.filterAll) {
1698        get.setFilter(new FilterAllFilter());
1699      }
1700      try {
1701        updateValueSize(table.get(get).get());
1702      } catch (ExecutionException e) {
1703        throw new IOException(e);
1704      }
1705      return true;
1706    }
1707  }
1708
1709  static class AsyncSequentialWriteTest extends AsyncTableTest {
1710    private ArrayList<Put> puts;
1711
1712    AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
1713      super(con, options, status);
1714      if (opts.multiPut > 0) {
1715        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
1716        this.puts = new ArrayList<>(opts.multiPut);
1717      }
1718    }
1719
1720    protected byte[] generateRow(final int i) {
1721      return format(i);
1722    }
1723
1724    @Override
1725    @SuppressWarnings("ReturnValueIgnored")
1726    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1727      byte[] row = generateRow(i);
1728      Put put = new Put(row);
1729      for (int family = 0; family < opts.families; family++) {
1730        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1731        for (int column = 0; column < opts.columns; column++) {
1732          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1733          byte[] value = generateData(this.rand, getValueLength(this.rand));
1734          if (opts.useTags) {
1735            byte[] tag = generateData(this.rand, TAG_LENGTH);
1736            Tag[] tags = new Tag[opts.noOfTags];
1737            for (int n = 0; n < opts.noOfTags; n++) {
1738              Tag t = new ArrayBackedTag((byte) n, tag);
1739              tags[n] = t;
1740            }
1741            KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
1742              value, tags);
1743            put.add(kv);
1744            updateValueSize(kv.getValueLength());
1745          } else {
1746            put.addColumn(familyName, qualifier, value);
1747            updateValueSize(value.length);
1748          }
1749        }
1750      }
1751      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1752      try {
1753        table.put(put).get();
1754        if (opts.multiPut > 0) {
1755          this.puts.add(put);
1756          if (this.puts.size() == opts.multiPut) {
1757            this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get));
1758            this.puts.clear();
1759          } else {
1760            return false;
1761          }
1762        } else {
1763          table.put(put).get();
1764        }
1765      } catch (ExecutionException e) {
1766        throw new IOException(e);
1767      }
1768      return true;
1769    }
1770  }
1771
1772  static abstract class BufferedMutatorTest extends Test {
1773    protected BufferedMutator mutator;
1774    protected Table table;
1775
1776    BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1777      super(con, options, status);
1778    }
1779
1780    @Override
1781    void onStartup() throws IOException {
1782      BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1783      p.writeBufferSize(opts.bufferSize);
1784      this.mutator = connection.getBufferedMutator(p);
1785      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1786    }
1787
1788    @Override
1789    void onTakedown() throws IOException {
1790      mutator.close();
1791      table.close();
1792    }
1793  }
1794
1795  static class RandomSeekScanTest extends TableTest {
1796    RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1797      super(con, options, status);
1798    }
1799
1800    @Override
1801    boolean testRow(final int i, final long startTime) throws IOException {
1802      Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
1803          .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
1804          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
1805          .setScanMetricsEnabled(true);
1806      FilterList list = new FilterList();
1807      for (int family = 0; family < opts.families; family++) {
1808        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1809        if (opts.addColumns) {
1810          for (int column = 0; column < opts.columns; column++) {
1811            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1812            scan.addColumn(familyName, qualifier);
1813          }
1814        } else {
1815          scan.addFamily(familyName);
1816        }
1817      }
1818      if (opts.filterAll) {
1819        list.addFilter(new FilterAllFilter());
1820      }
1821      list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1822      scan.setFilter(list);
1823      ResultScanner s = this.table.getScanner(scan);
1824      try {
1825        for (Result rr; (rr = s.next()) != null;) {
1826          updateValueSize(rr);
1827        }
1828      } finally {
1829        updateScanMetrics(s.getScanMetrics());
1830        s.close();
1831      }
1832      return true;
1833    }
1834
1835    @Override
1836    protected int getReportingPeriod() {
1837      int period = opts.perClientRunRows / 100;
1838      return period == 0 ? opts.perClientRunRows : period;
1839    }
1840
1841  }
1842
1843  static abstract class RandomScanWithRangeTest extends TableTest {
1844    RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1845      super(con, options, status);
1846    }
1847
1848    @Override
1849    boolean testRow(final int i, final long startTime) throws IOException {
1850      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1851      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
1852          .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
1853          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1854          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1855      for (int family = 0; family < opts.families; family++) {
1856        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1857        if (opts.addColumns) {
1858          for (int column = 0; column < opts.columns; column++) {
1859            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1860            scan.addColumn(familyName, qualifier);
1861          }
1862        } else {
1863          scan.addFamily(familyName);
1864        }
1865      }
1866      if (opts.filterAll) {
1867        scan.setFilter(new FilterAllFilter());
1868      }
1869      Result r = null;
1870      int count = 0;
1871      ResultScanner s = this.table.getScanner(scan);
1872      try {
1873        for (; (r = s.next()) != null;) {
1874          updateValueSize(r);
1875          count++;
1876        }
1877        if (i % 100 == 0) {
1878          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1879            Bytes.toString(startAndStopRow.getFirst()),
1880            Bytes.toString(startAndStopRow.getSecond()), count));
1881        }
1882      } finally {
1883        updateScanMetrics(s.getScanMetrics());
1884        s.close();
1885      }
1886      return true;
1887    }
1888
1889    protected abstract Pair<byte[],byte[]> getStartAndStopRow();
1890
1891    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
1892      int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
1893      int stop = start + maxRange;
1894      return new Pair<>(format(start), format(stop));
1895    }
1896
1897    @Override
1898    protected int getReportingPeriod() {
1899      int period = opts.perClientRunRows / 100;
1900      return period == 0? opts.perClientRunRows: period;
1901    }
1902  }
1903
1904  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1905    RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1906      super(con, options, status);
1907    }
1908
1909    @Override
1910    protected Pair<byte[], byte[]> getStartAndStopRow() {
1911      return generateStartAndStopRows(10);
1912    }
1913  }
1914
1915  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1916    RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1917      super(con, options, status);
1918    }
1919
1920    @Override
1921    protected Pair<byte[], byte[]> getStartAndStopRow() {
1922      return generateStartAndStopRows(100);
1923    }
1924  }
1925
1926  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1927    RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1928      super(con, options, status);
1929    }
1930
1931    @Override
1932    protected Pair<byte[], byte[]> getStartAndStopRow() {
1933      return generateStartAndStopRows(1000);
1934    }
1935  }
1936
1937  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1938    RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1939      super(con, options, status);
1940    }
1941
1942    @Override
1943    protected Pair<byte[], byte[]> getStartAndStopRow() {
1944      return generateStartAndStopRows(10000);
1945    }
1946  }
1947
1948  static class RandomReadTest extends TableTest {
1949    private final Consistency consistency;
1950    private ArrayList<Get> gets;
1951    private Random rd = new Random();
1952    private long numOfReplyFromReplica = 0;
1953
1954    RandomReadTest(Connection con, TestOptions options, Status status) {
1955      super(con, options, status);
1956      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1957      if (opts.multiGet > 0) {
1958        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1959        this.gets = new ArrayList<>(opts.multiGet);
1960      }
1961    }
1962
1963    @Override
1964    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1965      if (opts.randomSleep > 0) {
1966        Thread.sleep(rd.nextInt(opts.randomSleep));
1967      }
1968      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1969      for (int family = 0; family < opts.families; family++) {
1970        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1971        if (opts.addColumns) {
1972          for (int column = 0; column < opts.columns; column++) {
1973            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1974            get.addColumn(familyName, qualifier);
1975          }
1976        } else {
1977          get.addFamily(familyName);
1978        }
1979      }
1980      if (opts.filterAll) {
1981        get.setFilter(new FilterAllFilter());
1982      }
1983      get.setConsistency(consistency);
1984      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1985      if (opts.multiGet > 0) {
1986        this.gets.add(get);
1987        if (this.gets.size() == opts.multiGet) {
1988          Result [] rs = this.table.get(this.gets);
1989          if (opts.replicas > 1) {
1990            long latency = System.nanoTime() - startTime;
1991            updateValueSize(rs, latency);
1992          } else {
1993            updateValueSize(rs);
1994          }
1995          this.gets.clear();
1996        } else {
1997          return false;
1998        }
1999      } else {
2000        if (opts.replicas > 1) {
2001          Result r = this.table.get(get);
2002          long latency = System.nanoTime() - startTime;
2003          updateValueSize(r, latency);
2004        } else {
2005          updateValueSize(this.table.get(get));
2006        }
2007      }
2008      return true;
2009    }
2010
2011    @Override
2012    protected int getReportingPeriod() {
2013      int period = opts.perClientRunRows / 10;
2014      return period == 0 ? opts.perClientRunRows : period;
2015    }
2016
2017    @Override
2018    protected void testTakedown() throws IOException {
2019      if (this.gets != null && this.gets.size() > 0) {
2020        this.table.get(gets);
2021        this.gets.clear();
2022      }
2023      super.testTakedown();
2024    }
2025  }
2026
2027  /*
2028  * Send random reads against fake regions inserted by MetaWriteTest
2029  */
2030  static class MetaRandomReadTest extends MetaTest {
2031    private RegionLocator regionLocator;
2032
2033    MetaRandomReadTest(Connection con, TestOptions options, Status status) {
2034      super(con, options, status);
2035      LOG.info("call getRegionLocation");
2036    }
2037
2038    @Override
2039    void onStartup() throws IOException {
2040      super.onStartup();
2041      this.regionLocator = connection.getRegionLocator(table.getName());
2042    }
2043
2044    @Override
2045    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
2046      if (opts.randomSleep > 0) {
2047        Thread.sleep(rand.nextInt(opts.randomSleep));
2048      }
2049      HRegionLocation hRegionLocation = regionLocator.getRegionLocation(
2050        getSplitKey(rand.nextInt(opts.perClientRunRows)), true);
2051      LOG.debug("get location for region: " + hRegionLocation);
2052      return true;
2053    }
2054
2055    @Override
2056    protected int getReportingPeriod() {
2057      int period = opts.perClientRunRows / 10;
2058      return period == 0 ? opts.perClientRunRows : period;
2059    }
2060
2061    @Override
2062    protected void testTakedown() throws IOException {
2063      super.testTakedown();
2064    }
2065  }
2066
2067  static class RandomWriteTest extends SequentialWriteTest {
2068    RandomWriteTest(Connection con, TestOptions options, Status status) {
2069      super(con, options, status);
2070    }
2071
2072    @Override
2073    protected byte[] generateRow(final int i) {
2074      return getRandomRow(this.rand, opts.totalRows);
2075    }
2076
2077
2078  }
2079
2080  static class ScanTest extends TableTest {
2081    private ResultScanner testScanner;
2082
2083    ScanTest(Connection con, TestOptions options, Status status) {
2084      super(con, options, status);
2085    }
2086
2087    @Override
2088    void testTakedown() throws IOException {
2089      if (this.testScanner != null) {
2090        this.testScanner.close();
2091      }
2092      super.testTakedown();
2093    }
2094
2095
2096    @Override
2097    boolean testRow(final int i, final long startTime) throws IOException {
2098      if (this.testScanner == null) {
2099        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
2100            .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
2101            .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
2102        for (int family = 0; family < opts.families; family++) {
2103          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2104          if (opts.addColumns) {
2105            for (int column = 0; column < opts.columns; column++) {
2106              byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2107              scan.addColumn(familyName, qualifier);
2108            }
2109          } else {
2110            scan.addFamily(familyName);
2111          }
2112        }
2113        if (opts.filterAll) {
2114          scan.setFilter(new FilterAllFilter());
2115        }
2116        this.testScanner = table.getScanner(scan);
2117      }
2118      Result r = testScanner.next();
2119      updateValueSize(r);
2120      return true;
2121    }
2122  }
2123
2124  /**
2125   * Base class for operations that are CAS-like; that read a value and then set it based off what
2126   * they read. In this category is increment, append, checkAndPut, etc.
2127   *
2128   * <p>These operations also want some concurrency going on. Usually when these tests run, they
2129   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
2130   * same key space. We do this with our getStartRow and getLastRow overrides.
2131   */
2132  static abstract class CASTableTest extends TableTest {
2133    private final byte [] qualifier;
2134    CASTableTest(Connection con, TestOptions options, Status status) {
2135      super(con, options, status);
2136      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
2137    }
2138
2139    byte [] getQualifier() {
2140      return this.qualifier;
2141    }
2142
2143    @Override
2144    int getStartRow() {
2145      return 0;
2146    }
2147
2148    @Override
2149    int getLastRow() {
2150      return opts.perClientRunRows;
2151    }
2152  }
2153
2154  static class IncrementTest extends CASTableTest {
2155    IncrementTest(Connection con, TestOptions options, Status status) {
2156      super(con, options, status);
2157    }
2158
2159    @Override
2160    boolean testRow(final int i, final long startTime) throws IOException {
2161      Increment increment = new Increment(format(i));
2162      // unlike checkAndXXX tests, which make most sense to do on a single value,
2163      // if multiple families are specified for an increment test we assume it is
2164      // meant to raise the work factor
2165      for (int family = 0; family < opts.families; family++) {
2166        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2167        increment.addColumn(familyName, getQualifier(), 1l);
2168      }
2169      updateValueSize(this.table.increment(increment));
2170      return true;
2171    }
2172  }
2173
2174  static class AppendTest extends CASTableTest {
2175    AppendTest(Connection con, TestOptions options, Status status) {
2176      super(con, options, status);
2177    }
2178
2179    @Override
2180    boolean testRow(final int i, final long startTime) throws IOException {
2181      byte [] bytes = format(i);
2182      Append append = new Append(bytes);
2183      // unlike checkAndXXX tests, which make most sense to do on a single value,
2184      // if multiple families are specified for an append test we assume it is
2185      // meant to raise the work factor
2186      for (int family = 0; family < opts.families; family++) {
2187        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2188        append.addColumn(familyName, getQualifier(), bytes);
2189      }
2190      updateValueSize(this.table.append(append));
2191      return true;
2192    }
2193  }
2194
2195  static class CheckAndMutateTest extends CASTableTest {
2196    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
2197      super(con, options, status);
2198    }
2199
2200    @Override
2201    boolean testRow(final int i, final long startTime) throws IOException {
2202      final byte [] bytes = format(i);
2203      // checkAndXXX tests operate on only a single value
2204      // Put a known value so when we go to check it, it is there.
2205      Put put = new Put(bytes);
2206      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2207      this.table.put(put);
2208      RowMutations mutations = new RowMutations(bytes);
2209      mutations.add(put);
2210      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2211          .ifEquals(bytes).thenMutate(mutations);
2212      return true;
2213    }
2214  }
2215
2216  static class CheckAndPutTest extends CASTableTest {
2217    CheckAndPutTest(Connection con, TestOptions options, Status status) {
2218      super(con, options, status);
2219    }
2220
2221    @Override
2222    boolean testRow(final int i, final long startTime) throws IOException {
2223      final byte [] bytes = format(i);
2224      // checkAndXXX tests operate on only a single value
2225      // Put a known value so when we go to check it, it is there.
2226      Put put = new Put(bytes);
2227      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2228      this.table.put(put);
2229      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2230          .ifEquals(bytes).thenPut(put);
2231      return true;
2232    }
2233  }
2234
2235  static class CheckAndDeleteTest extends CASTableTest {
2236    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
2237      super(con, options, status);
2238    }
2239
2240    @Override
2241    boolean testRow(final int i, final long startTime) throws IOException {
2242      final byte [] bytes = format(i);
2243      // checkAndXXX tests operate on only a single value
2244      // Put a known value so when we go to check it, it is there.
2245      Put put = new Put(bytes);
2246      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2247      this.table.put(put);
2248      Delete delete = new Delete(put.getRow());
2249      delete.addColumn(FAMILY_ZERO, getQualifier());
2250      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2251          .ifEquals(bytes).thenDelete(delete);
2252      return true;
2253    }
2254  }
2255
2256  /*
2257  * Delete all fake regions inserted to meta table by MetaWriteTest.
2258  */
2259  static class CleanMetaTest extends MetaTest {
2260    CleanMetaTest(Connection con, TestOptions options, Status status) {
2261      super(con, options, status);
2262    }
2263
2264    @Override
2265    boolean testRow(final int i, final long startTime) throws IOException {
2266      try {
2267        RegionInfo regionInfo = connection.getRegionLocator(table.getName())
2268          .getRegionLocation(getSplitKey(i), false).getRegion();
2269        LOG.debug("deleting region from meta: " + regionInfo);
2270
2271        Delete delete = MetaTableAccessor
2272          .makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP);
2273        try (Table t = MetaTableAccessor.getMetaHTable(connection)) {
2274          t.delete(delete);
2275        }
2276      } catch (IOException ie) {
2277        // Log and continue
2278        LOG.error("cannot find region with start key: " + i);
2279      }
2280      return true;
2281    }
2282  }
2283
2284  static class SequentialReadTest extends TableTest {
2285    SequentialReadTest(Connection con, TestOptions options, Status status) {
2286      super(con, options, status);
2287    }
2288
2289    @Override
2290    boolean testRow(final int i, final long startTime) throws IOException {
2291      Get get = new Get(format(i));
2292      for (int family = 0; family < opts.families; family++) {
2293        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2294        if (opts.addColumns) {
2295          for (int column = 0; column < opts.columns; column++) {
2296            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2297            get.addColumn(familyName, qualifier);
2298          }
2299        } else {
2300          get.addFamily(familyName);
2301        }
2302      }
2303      if (opts.filterAll) {
2304        get.setFilter(new FilterAllFilter());
2305      }
2306      updateValueSize(table.get(get));
2307      return true;
2308    }
2309  }
2310
2311  static class SequentialWriteTest extends BufferedMutatorTest {
2312    private ArrayList<Put> puts;
2313
2314
2315    SequentialWriteTest(Connection con, TestOptions options, Status status) {
2316      super(con, options, status);
2317      if (opts.multiPut > 0) {
2318        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
2319        this.puts = new ArrayList<>(opts.multiPut);
2320      }
2321    }
2322
2323    protected byte[] generateRow(final int i) {
2324      return format(i);
2325    }
2326
2327    @Override
2328    boolean testRow(final int i, final long startTime) throws IOException {
2329      byte[] row = generateRow(i);
2330      Put put = new Put(row);
2331      for (int family = 0; family < opts.families; family++) {
2332        byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
2333        for (int column = 0; column < opts.columns; column++) {
2334          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2335          byte[] value = generateData(this.rand, getValueLength(this.rand));
2336          if (opts.useTags) {
2337            byte[] tag = generateData(this.rand, TAG_LENGTH);
2338            Tag[] tags = new Tag[opts.noOfTags];
2339            for (int n = 0; n < opts.noOfTags; n++) {
2340              Tag t = new ArrayBackedTag((byte) n, tag);
2341              tags[n] = t;
2342            }
2343            KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
2344              value, tags);
2345            put.add(kv);
2346            updateValueSize(kv.getValueLength());
2347          } else {
2348            put.addColumn(familyName, qualifier, value);
2349            updateValueSize(value.length);
2350          }
2351        }
2352      }
2353      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2354      if (opts.autoFlush) {
2355        if (opts.multiPut > 0) {
2356          this.puts.add(put);
2357          if (this.puts.size() == opts.multiPut) {
2358            table.put(this.puts);
2359            this.puts.clear();
2360          } else {
2361            return false;
2362          }
2363        } else {
2364          table.put(put);
2365        }
2366      } else {
2367        mutator.mutate(put);
2368      }
2369      return true;
2370    }
2371  }
2372
2373  /*
2374  * Insert fake regions into meta table with contiguous split keys.
2375  */
2376  static class MetaWriteTest extends MetaTest {
2377
2378    MetaWriteTest(Connection con, TestOptions options, Status status) {
2379      super(con, options, status);
2380    }
2381
2382    @Override
2383    boolean testRow(final int i, final long startTime) throws IOException {
2384      List<RegionInfo> regionInfos = new ArrayList<RegionInfo>();
2385      RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME))
2386        .setStartKey(getSplitKey(i))
2387        .setEndKey(getSplitKey(i + 1))
2388        .build());
2389      regionInfos.add(regionInfo);
2390      MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1);
2391
2392      // write the serverName columns
2393      MetaTableAccessor.updateRegionLocation(connection,
2394        regionInfo, ServerName.valueOf("localhost", 60010, rand.nextLong()), i,
2395        EnvironmentEdgeManager.currentTime());
2396      return true;
2397    }
2398  }
2399  static class FilteredScanTest extends TableTest {
2400    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
2401
2402    FilteredScanTest(Connection con, TestOptions options, Status status) {
2403      super(con, options, status);
2404      if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) {
2405        LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB +
2406                ". This could take a very long time.");
2407      }
2408    }
2409
2410    @Override
2411    boolean testRow(int i, final long startTime) throws IOException {
2412      byte[] value = generateData(this.rand, getValueLength(this.rand));
2413      Scan scan = constructScan(value);
2414      ResultScanner scanner = null;
2415      try {
2416        scanner = this.table.getScanner(scan);
2417        for (Result r = null; (r = scanner.next()) != null;) {
2418          updateValueSize(r);
2419        }
2420      } finally {
2421        if (scanner != null) {
2422          updateScanMetrics(scanner.getScanMetrics());
2423          scanner.close();
2424        }
2425      }
2426      return true;
2427    }
2428
2429    protected Scan constructScan(byte[] valuePrefix) throws IOException {
2430      FilterList list = new FilterList();
2431      Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO,
2432        CompareOperator.EQUAL, new BinaryComparator(valuePrefix));
2433      list.addFilter(filter);
2434      if (opts.filterAll) {
2435        list.addFilter(new FilterAllFilter());
2436      }
2437      Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2438          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2439          .setScanMetricsEnabled(true);
2440      if (opts.addColumns) {
2441        for (int column = 0; column < opts.columns; column++) {
2442          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2443          scan.addColumn(FAMILY_ZERO, qualifier);
2444        }
2445      } else {
2446        scan.addFamily(FAMILY_ZERO);
2447      }
2448      scan.setFilter(list);
2449      return scan;
2450    }
2451  }
2452
2453  /**
2454   * Compute a throughput rate in MB/s.
2455   * @param rows Number of records consumed.
2456   * @param timeMs Time taken in milliseconds.
2457   * @return String value with label, ie '123.76 MB/s'
2458   */
2459  private static String calculateMbps(int rows, long timeMs, final int valueSize, int families, int columns) {
2460    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH +
2461      ((valueSize + (FAMILY_NAME_BASE.length()+1) + COLUMN_ZERO.length) * columns) * families);
2462    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
2463      .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
2464      .divide(BYTES_PER_MB, CXT);
2465    return FMT.format(mbps) + " MB/s";
2466  }
2467
2468  /*
2469   * Format passed integer.
2470   * @param number
2471   * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed
2472   * number (Does absolute in case number is negative).
2473   */
2474  public static byte [] format(final int number) {
2475    byte [] b = new byte[ROW_LENGTH];
2476    int d = Math.abs(number);
2477    for (int i = b.length - 1; i >= 0; i--) {
2478      b[i] = (byte)((d % 10) + '0');
2479      d /= 10;
2480    }
2481    return b;
2482  }
2483
2484  /*
2485   * This method takes some time and is done inline uploading data.  For
2486   * example, doing the mapfile test, generation of the key and value
2487   * consumes about 30% of CPU time.
2488   * @return Generated random value to insert into a table cell.
2489   */
2490  public static byte[] generateData(final Random r, int length) {
2491    byte [] b = new byte [length];
2492    int i;
2493
2494    for(i = 0; i < (length-8); i += 8) {
2495      b[i] = (byte) (65 + r.nextInt(26));
2496      b[i+1] = b[i];
2497      b[i+2] = b[i];
2498      b[i+3] = b[i];
2499      b[i+4] = b[i];
2500      b[i+5] = b[i];
2501      b[i+6] = b[i];
2502      b[i+7] = b[i];
2503    }
2504
2505    byte a = (byte) (65 + r.nextInt(26));
2506    for(; i < length; i++) {
2507      b[i] = a;
2508    }
2509    return b;
2510  }
2511
2512  static byte [] getRandomRow(final Random random, final int totalRows) {
2513    return format(generateRandomRow(random, totalRows));
2514  }
2515
2516  static int generateRandomRow(final Random random, final int totalRows) {
2517    return random.nextInt(Integer.MAX_VALUE) % totalRows;
2518  }
2519
2520  static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
2521      Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
2522      throws IOException, InterruptedException {
2523    status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for "
2524        + opts.perClientRunRows + " rows");
2525    long totalElapsedTime;
2526
2527    final TestBase t;
2528    try {
2529      if (AsyncTest.class.isAssignableFrom(cmd)) {
2530        Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
2531        Constructor<? extends AsyncTest> constructor =
2532            newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
2533        t = constructor.newInstance(asyncCon, opts, status);
2534      } else {
2535        Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
2536        Constructor<? extends Test> constructor =
2537            newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
2538        t = constructor.newInstance(con, opts, status);
2539      }
2540    } catch (NoSuchMethodException e) {
2541      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
2542          + ".  It does not provide a constructor as described by "
2543          + "the javadoc comment.  Available constructors are: "
2544          + Arrays.toString(cmd.getConstructors()));
2545    } catch (Exception e) {
2546      throw new IllegalStateException("Failed to construct command class", e);
2547    }
2548    totalElapsedTime = t.test();
2549
2550    status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
2551      "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
2552      " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
2553          getAverageValueLength(opts), opts.families, opts.columns) + ")");
2554
2555    return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold,
2556      t.numOfReplyFromReplica, t.getLatencyHistogram());
2557  }
2558
2559  private static int getAverageValueLength(final TestOptions opts) {
2560    return opts.valueRandom? opts.valueSize/2: opts.valueSize;
2561  }
2562
2563  private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) throws IOException,
2564      InterruptedException, ClassNotFoundException, ExecutionException {
2565    // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2566    // the TestOptions introspection for us and dump the output in a readable format.
2567    LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
2568    Admin admin = null;
2569    Connection connection = null;
2570    try {
2571      connection = ConnectionFactory.createConnection(getConf());
2572      admin = connection.getAdmin();
2573      checkTable(admin, opts);
2574    } finally {
2575      if (admin != null) admin.close();
2576      if (connection != null) connection.close();
2577    }
2578    if (opts.nomapred) {
2579      doLocalClients(opts, getConf());
2580    } else {
2581      doMapReduce(opts, getConf());
2582    }
2583  }
2584
2585  protected void printUsage() {
2586    printUsage(PE_COMMAND_SHORTNAME, null);
2587  }
2588
2589  protected static void printUsage(final String message) {
2590    printUsage(PE_COMMAND_SHORTNAME, message);
2591  }
2592
2593  protected static void printUsageAndExit(final String message, final int exitCode) {
2594    printUsage(message);
2595    System.exit(exitCode);
2596  }
2597
2598  protected static void printUsage(final String shortName, final String message) {
2599    if (message != null && message.length() > 0) {
2600      System.err.println(message);
2601    }
2602    System.err.print("Usage: hbase " + shortName);
2603    System.err.println("  <OPTIONS> [-D<property=value>]* <command> <nclients>");
2604    System.err.println();
2605    System.err.println("General Options:");
2606    System.err.println(" nomapred        Run multiple clients using threads " +
2607      "(rather than use mapreduce)");
2608    System.err.println(" oneCon          all the threads share the same connection. Default: False");
2609    System.err.println(" connCount          connections all threads share. "
2610        + "For example, if set to 2, then all thread share 2 connection. "
2611        + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, "
2612        + "if not, connCount=thread number");
2613
2614    System.err.println(" sampleRate      Execute test on a sample of total " +
2615      "rows. Only supported by randomRead. Default: 1.0");
2616    System.err.println(" period          Report every 'period' rows: " +
2617      "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows()/10);
2618    System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
2619    System.err.println(" traceRate       Enable HTrace spans. Initiate tracing every N rows. " +
2620      "Default: 0");
2621    System.err.println(" latency         Set to report operation latencies. Default: False");
2622    System.err.println(" latencyThreshold  Set to report number of operations with latency " +
2623      "over lantencyThreshold, unit in millisecond, default 0");
2624    System.err.println(" measureAfter    Start to measure the latency once 'measureAfter'" +
2625        " rows have been treated. Default: 0");
2626    System.err.println(" valueSize       Pass value size to use: Default: "
2627        + DEFAULT_OPTS.getValueSize());
2628    System.err.println(" valueRandom     Set if we should vary value size between 0 and " +
2629        "'valueSize'; set on read for stats on size: Default: Not set.");
2630    System.err.println(" blockEncoding   Block encoding to use. Value should be one of "
2631        + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2632    System.err.println();
2633    System.err.println("Table Creation / Write Tests:");
2634    System.err.println(" table           Alternate table name. Default: 'TestTable'");
2635    System.err.println(" rows            Rows each client runs. Default: "
2636        + DEFAULT_OPTS.getPerClientRunRows()
2637        + ".  In case of randomReads and randomSeekScans this could"
2638        + " be specified along with --size to specify the number of rows to be scanned within"
2639        + " the total range specified by the size.");
2640    System.err.println(
2641      " size            Total size in GiB. Mutually exclusive with --rows for writes and scans"
2642          + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2643          + " use size to specify the end range and --rows"
2644          + " specifies the number of rows within that range. " + "Default: 1.0.");
2645    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2646    System.err.println(" flushCommits    Used to determine if the test should flush the table. " +
2647      "Default: false");
2648    System.err.println(" valueZipf       Set if we should vary value size between 0 and " +
2649        "'valueSize' in zipf form: Default: Not set.");
2650    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
2651    System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
2652    System.err.println(" multiPut        Batch puts together into groups of N. Only supported " +
2653        "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0");
2654    System.err.println(" presplit        Create presplit table. If a table with same name exists,"
2655        + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2656        + "Recommended for accurate perf analysis (see guide). Default: disabled");
2657    System.err.println(" usetags         Writes tags along with KVs. Use with HFile V3. " +
2658      "Default: false");
2659    System.err.println(" numoftags       Specify the no of tags that would be needed. " +
2660       "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2661    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
2662    System.err.println(" columns         Columns to write per row. Default: 1");
2663    System.err.println(" families        Specify number of column families for the table. Default: 1");
2664    System.err.println();
2665    System.err.println("Read Tests:");
2666    System.err.println(" filterAll       Helps to filter out all the rows on the server side"
2667        + " there by not returning any thing back to the client.  Helps to check the server side"
2668        + " performance.  Uses FilterAllFilter internally. ");
2669    System.err.println(" multiGet        Batch gets together into groups of N. Only supported " +
2670      "by randomRead. Default: disabled");
2671    System.err.println(" inmemory        Tries to keep the HFiles of the CF " +
2672      "inmemory as far as possible. Not guaranteed that reads are always served " +
2673      "from memory.  Default: false");
2674    System.err.println(" bloomFilter     Bloom filter type, one of "
2675        + Arrays.toString(BloomType.values()));
2676    System.err.println(" blockSize       Blocksize to use when writing out hfiles. ");
2677    System.err.println(" inmemoryCompaction  Makes the column family to do inmemory flushes/compactions. "
2678        + "Uses the CompactingMemstore");
2679    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
2680    System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
2681    System.err.println(" randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
2682    System.err.println(" caching         Scan caching to use. Default: 30");
2683    System.err.println(" asyncPrefetch   Enable asyncPrefetch for scan");
2684    System.err.println(" cacheBlocks     Set the cacheBlocks option for scan. Default: true");
2685    System.err.println(" scanReadType    Set the readType option for scan, stream/pread/default. Default: default");
2686    System.err.println(" bufferSize      Set the value of client side buffering. Default: 2MB");
2687    System.err.println();
2688    System.err.println(" Note: -D properties will be applied to the conf used. ");
2689    System.err.println("  For example: ");
2690    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
2691    System.err.println("   -Dmapreduce.task.timeout=60000");
2692    System.err.println();
2693    System.err.println("Command:");
2694    for (CmdDescriptor command : COMMANDS.values()) {
2695      System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
2696    }
2697    System.err.println();
2698    System.err.println("Args:");
2699    System.err.println(" nclients        Integer. Required. Total number of clients "
2700        + "(and HRegionServers) running. 1 <= value <= 500");
2701    System.err.println("Examples:");
2702    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
2703    System.err.println(" $ hbase " + shortName + " sequentialWrite 1");
2704    System.err.println(" To run 10 clients doing increments over ten rows:");
2705    System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10");
2706  }
2707
2708  /**
2709   * Parse options passed in via an arguments array. Assumes that array has been split
2710   * on white-space and placed into a {@code Queue}. Any unknown arguments will remain
2711   * in the queue at the conclusion of this method call. It's up to the caller to deal
2712   * with these unrecognized arguments.
2713   */
2714  static TestOptions parseOpts(Queue<String> args) {
2715    TestOptions opts = new TestOptions();
2716
2717    String cmd = null;
2718    while ((cmd = args.poll()) != null) {
2719      if (cmd.equals("-h") || cmd.startsWith("--h")) {
2720        // place item back onto queue so that caller knows parsing was incomplete
2721        args.add(cmd);
2722        break;
2723      }
2724
2725      final String nmr = "--nomapred";
2726      if (cmd.startsWith(nmr)) {
2727        opts.nomapred = true;
2728        continue;
2729      }
2730
2731      final String rows = "--rows=";
2732      if (cmd.startsWith(rows)) {
2733        opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
2734        continue;
2735      }
2736
2737      final String cycles = "--cycles=";
2738      if (cmd.startsWith(cycles)) {
2739        opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
2740        continue;
2741      }
2742
2743      final String sampleRate = "--sampleRate=";
2744      if (cmd.startsWith(sampleRate)) {
2745        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
2746        continue;
2747      }
2748
2749      final String table = "--table=";
2750      if (cmd.startsWith(table)) {
2751        opts.tableName = cmd.substring(table.length());
2752        continue;
2753      }
2754
2755      final String startRow = "--startRow=";
2756      if (cmd.startsWith(startRow)) {
2757        opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
2758        continue;
2759      }
2760
2761      final String compress = "--compress=";
2762      if (cmd.startsWith(compress)) {
2763        opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
2764        continue;
2765      }
2766
2767      final String traceRate = "--traceRate=";
2768      if (cmd.startsWith(traceRate)) {
2769        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
2770        continue;
2771      }
2772
2773      final String blockEncoding = "--blockEncoding=";
2774      if (cmd.startsWith(blockEncoding)) {
2775        opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
2776        continue;
2777      }
2778
2779      final String flushCommits = "--flushCommits=";
2780      if (cmd.startsWith(flushCommits)) {
2781        opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
2782        continue;
2783      }
2784
2785      final String writeToWAL = "--writeToWAL=";
2786      if (cmd.startsWith(writeToWAL)) {
2787        opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
2788        continue;
2789      }
2790
2791      final String presplit = "--presplit=";
2792      if (cmd.startsWith(presplit)) {
2793        opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
2794        continue;
2795      }
2796
2797      final String inMemory = "--inmemory=";
2798      if (cmd.startsWith(inMemory)) {
2799        opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
2800        continue;
2801      }
2802
2803      final String autoFlush = "--autoFlush=";
2804      if (cmd.startsWith(autoFlush)) {
2805        opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
2806        continue;
2807      }
2808
2809      final String onceCon = "--oneCon=";
2810      if (cmd.startsWith(onceCon)) {
2811        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
2812        continue;
2813      }
2814
2815      final String connCount = "--connCount=";
2816      if (cmd.startsWith(connCount)) {
2817        opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
2818        continue;
2819      }
2820
2821      final String latencyThreshold = "--latencyThreshold=";
2822      if (cmd.startsWith(latencyThreshold)) {
2823        opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length()));
2824        continue;
2825      }
2826
2827      final String latency = "--latency";
2828      if (cmd.startsWith(latency)) {
2829        opts.reportLatency = true;
2830        continue;
2831      }
2832
2833      final String multiGet = "--multiGet=";
2834      if (cmd.startsWith(multiGet)) {
2835        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
2836        continue;
2837      }
2838
2839      final String multiPut = "--multiPut=";
2840      if (cmd.startsWith(multiPut)) {
2841        opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
2842        continue;
2843      }
2844
2845      final String useTags = "--usetags=";
2846      if (cmd.startsWith(useTags)) {
2847        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
2848        continue;
2849      }
2850
2851      final String noOfTags = "--numoftags=";
2852      if (cmd.startsWith(noOfTags)) {
2853        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
2854        continue;
2855      }
2856
2857      final String replicas = "--replicas=";
2858      if (cmd.startsWith(replicas)) {
2859        opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
2860        continue;
2861      }
2862
2863      final String filterOutAll = "--filterAll";
2864      if (cmd.startsWith(filterOutAll)) {
2865        opts.filterAll = true;
2866        continue;
2867      }
2868
2869      final String size = "--size=";
2870      if (cmd.startsWith(size)) {
2871        opts.size = Float.parseFloat(cmd.substring(size.length()));
2872        if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB");
2873        continue;
2874      }
2875
2876      final String splitPolicy = "--splitPolicy=";
2877      if (cmd.startsWith(splitPolicy)) {
2878        opts.splitPolicy = cmd.substring(splitPolicy.length());
2879        continue;
2880      }
2881
2882      final String randomSleep = "--randomSleep=";
2883      if (cmd.startsWith(randomSleep)) {
2884        opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
2885        continue;
2886      }
2887
2888      final String measureAfter = "--measureAfter=";
2889      if (cmd.startsWith(measureAfter)) {
2890        opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length()));
2891        continue;
2892      }
2893
2894      final String bloomFilter = "--bloomFilter=";
2895      if (cmd.startsWith(bloomFilter)) {
2896        opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
2897        continue;
2898      }
2899
2900      final String blockSize = "--blockSize=";
2901      if(cmd.startsWith(blockSize) ) {
2902        opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
2903        continue;
2904      }
2905
2906      final String valueSize = "--valueSize=";
2907      if (cmd.startsWith(valueSize)) {
2908        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
2909        continue;
2910      }
2911
2912      final String valueRandom = "--valueRandom";
2913      if (cmd.startsWith(valueRandom)) {
2914        opts.valueRandom = true;
2915        continue;
2916      }
2917
2918      final String valueZipf = "--valueZipf";
2919      if (cmd.startsWith(valueZipf)) {
2920        opts.valueZipf = true;
2921        continue;
2922      }
2923
2924      final String period = "--period=";
2925      if (cmd.startsWith(period)) {
2926        opts.period = Integer.parseInt(cmd.substring(period.length()));
2927        continue;
2928      }
2929
2930      final String addColumns = "--addColumns=";
2931      if (cmd.startsWith(addColumns)) {
2932        opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
2933        continue;
2934      }
2935
2936      final String inMemoryCompaction = "--inmemoryCompaction=";
2937      if (cmd.startsWith(inMemoryCompaction)) {
2938        opts.inMemoryCompaction =
2939            MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length()));
2940        continue;
2941      }
2942
2943      final String columns = "--columns=";
2944      if (cmd.startsWith(columns)) {
2945        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
2946        continue;
2947      }
2948
2949      final String families = "--families=";
2950      if (cmd.startsWith(families)) {
2951        opts.families = Integer.parseInt(cmd.substring(families.length()));
2952        continue;
2953      }
2954
2955      final String caching = "--caching=";
2956      if (cmd.startsWith(caching)) {
2957        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
2958        continue;
2959      }
2960
2961      final String asyncPrefetch = "--asyncPrefetch";
2962      if (cmd.startsWith(asyncPrefetch)) {
2963        opts.asyncPrefetch = true;
2964        continue;
2965      }
2966
2967      final String cacheBlocks = "--cacheBlocks=";
2968      if (cmd.startsWith(cacheBlocks)) {
2969        opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length()));
2970        continue;
2971      }
2972
2973      final String scanReadType = "--scanReadType=";
2974      if (cmd.startsWith(scanReadType)) {
2975        opts.scanReadType =
2976            Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
2977        continue;
2978      }
2979
2980      final String bufferSize = "--bufferSize=";
2981      if (cmd.startsWith(bufferSize)) {
2982        opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
2983        continue;
2984      }
2985
2986      validateParsedOpts(opts);
2987
2988      if (isCommandClass(cmd)) {
2989        opts.cmdName = cmd;
2990        try {
2991          opts.numClientThreads = Integer.parseInt(args.remove());
2992        } catch (NoSuchElementException | NumberFormatException e) {
2993          throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
2994        }
2995        opts = calculateRowsAndSize(opts);
2996        break;
2997      } else {
2998        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
2999      }
3000
3001      // Not matching any option or command.
3002      System.err.println("Error: Wrong option or command: " + cmd);
3003      args.add(cmd);
3004      break;
3005    }
3006    return opts;
3007  }
3008
3009  /**
3010  * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts
3011  */
3012  private static void validateParsedOpts(TestOptions opts)  {
3013
3014    if (!opts.autoFlush && opts.multiPut > 0) {
3015      throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
3016    }
3017
3018    if (opts.oneCon && opts.connCount > 1) {
3019      throw new IllegalArgumentException("oneCon is set to true, "
3020          + "connCount should not bigger than 1");
3021    }
3022
3023    if (opts.valueZipf && opts.valueRandom) {
3024      throw new IllegalStateException("Either valueZipf or valueRandom but not both");
3025    }
3026  }
3027
3028  static TestOptions calculateRowsAndSize(final TestOptions opts) {
3029    int rowsPerGB = getRowsPerGB(opts);
3030    if ((opts.getCmdName() != null
3031        && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
3032        && opts.size != DEFAULT_OPTS.size
3033        && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
3034      opts.totalRows = (int) opts.size * rowsPerGB;
3035    } else if (opts.size != DEFAULT_OPTS.size) {
3036      // total size in GB specified
3037      opts.totalRows = (int) opts.size * rowsPerGB;
3038      opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
3039    } else {
3040      opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
3041      opts.size = opts.totalRows / rowsPerGB;
3042    }
3043    return opts;
3044  }
3045
3046  static int getRowsPerGB(final TestOptions opts) {
3047    return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getFamilies() *
3048        opts.getColumns());
3049  }
3050
3051  @Override
3052  public int run(String[] args) throws Exception {
3053    // Process command-line args. TODO: Better cmd-line processing
3054    // (but hopefully something not as painful as cli options).
3055    int errCode = -1;
3056    if (args.length < 1) {
3057      printUsage();
3058      return errCode;
3059    }
3060
3061    try {
3062      LinkedList<String> argv = new LinkedList<>();
3063      argv.addAll(Arrays.asList(args));
3064      TestOptions opts = parseOpts(argv);
3065
3066      // args remaining, print help and exit
3067      if (!argv.isEmpty()) {
3068        errCode = 0;
3069        printUsage();
3070        return errCode;
3071      }
3072
3073      // must run at least 1 client
3074      if (opts.numClientThreads <= 0) {
3075        throw new IllegalArgumentException("Number of clients must be > 0");
3076      }
3077
3078      // cmdName should not be null, print help and exit
3079      if (opts.cmdName == null) {
3080        printUsage();
3081        return errCode;
3082      }
3083
3084      Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
3085      if (cmdClass != null) {
3086        runTest(cmdClass, opts);
3087        errCode = 0;
3088      }
3089
3090    } catch (Exception e) {
3091      e.printStackTrace();
3092    }
3093
3094    return errCode;
3095  }
3096
3097  private static boolean isCommandClass(String cmd) {
3098    return COMMANDS.containsKey(cmd);
3099  }
3100
3101  private static Class<? extends TestBase> determineCommandClass(String cmd) {
3102    CmdDescriptor descriptor = COMMANDS.get(cmd);
3103    return descriptor != null ? descriptor.getCmdClass() : null;
3104  }
3105
3106  public static void main(final String[] args) throws Exception {
3107    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
3108    System.exit(res);
3109  }
3110}