001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import com.codahale.metrics.Histogram;
022import com.codahale.metrics.UniformReservoir;
023import java.io.IOException;
024import java.io.PrintStream;
025import java.lang.reflect.Constructor;
026import java.math.BigDecimal;
027import java.math.MathContext;
028import java.text.DecimalFormat;
029import java.text.SimpleDateFormat;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.Date;
033import java.util.LinkedList;
034import java.util.Locale;
035import java.util.Map;
036import java.util.NoSuchElementException;
037import java.util.Queue;
038import java.util.Random;
039import java.util.TreeMap;
040import java.util.concurrent.Callable;
041import java.util.concurrent.ExecutionException;
042import java.util.concurrent.ExecutorService;
043import java.util.concurrent.Executors;
044import java.util.concurrent.Future;
045import org.apache.commons.lang3.StringUtils;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.conf.Configured;
048import org.apache.hadoop.fs.FileSystem;
049import org.apache.hadoop.fs.Path;
050import org.apache.hadoop.hbase.client.Admin;
051import org.apache.hadoop.hbase.client.Append;
052import org.apache.hadoop.hbase.client.AsyncConnection;
053import org.apache.hadoop.hbase.client.AsyncTable;
054import org.apache.hadoop.hbase.client.BufferedMutator;
055import org.apache.hadoop.hbase.client.BufferedMutatorParams;
056import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
057import org.apache.hadoop.hbase.client.Connection;
058import org.apache.hadoop.hbase.client.ConnectionFactory;
059import org.apache.hadoop.hbase.client.Consistency;
060import org.apache.hadoop.hbase.client.Delete;
061import org.apache.hadoop.hbase.client.Durability;
062import org.apache.hadoop.hbase.client.Get;
063import org.apache.hadoop.hbase.client.Increment;
064import org.apache.hadoop.hbase.client.Put;
065import org.apache.hadoop.hbase.client.Result;
066import org.apache.hadoop.hbase.client.ResultScanner;
067import org.apache.hadoop.hbase.client.RowMutations;
068import org.apache.hadoop.hbase.client.Scan;
069import org.apache.hadoop.hbase.client.Table;
070import org.apache.hadoop.hbase.client.TableDescriptor;
071import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
072import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
073import org.apache.hadoop.hbase.filter.BinaryComparator;
074import org.apache.hadoop.hbase.filter.Filter;
075import org.apache.hadoop.hbase.filter.FilterAllFilter;
076import org.apache.hadoop.hbase.filter.FilterList;
077import org.apache.hadoop.hbase.filter.PageFilter;
078import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
079import org.apache.hadoop.hbase.filter.WhileMatchFilter;
080import org.apache.hadoop.hbase.io.compress.Compression;
081import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
082import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
083import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
084import org.apache.hadoop.hbase.regionserver.BloomType;
085import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
086import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
087import org.apache.hadoop.hbase.trace.SpanReceiverHost;
088import org.apache.hadoop.hbase.trace.TraceUtil;
089import org.apache.hadoop.hbase.util.ByteArrayHashKey;
090import org.apache.hadoop.hbase.util.Bytes;
091import org.apache.hadoop.hbase.util.GsonUtil;
092import org.apache.hadoop.hbase.util.Hash;
093import org.apache.hadoop.hbase.util.MurmurHash;
094import org.apache.hadoop.hbase.util.Pair;
095import org.apache.hadoop.hbase.util.YammerHistogramUtils;
096import org.apache.hadoop.io.LongWritable;
097import org.apache.hadoop.io.Text;
098import org.apache.hadoop.mapreduce.Job;
099import org.apache.hadoop.mapreduce.Mapper;
100import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
101import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
102import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
103import org.apache.hadoop.util.Tool;
104import org.apache.hadoop.util.ToolRunner;
105import org.apache.htrace.core.ProbabilitySampler;
106import org.apache.htrace.core.Sampler;
107import org.apache.htrace.core.TraceScope;
108import org.apache.yetus.audience.InterfaceAudience;
109import org.slf4j.Logger;
110import org.slf4j.LoggerFactory;
111
112import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
113import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
114import org.apache.hbase.thirdparty.com.google.gson.Gson;
115
116/**
 * Script used for evaluating HBase performance and scalability.  Runs an HBase
118 * client that steps through one of a set of hardcoded tests or 'experiments'
119 * (e.g. a random reads test, a random writes test, etc.). Pass on the
120 * command-line which test to run and how many clients are participating in
121 * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
122 *
123 * <p>This class sets up and runs the evaluation programs described in
124 * Section 7, <i>Performance Evaluation</i>, of the <a
125 * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
126 * paper, pages 8-10.
127 *
128 * <p>By default, runs as a mapreduce job where each mapper runs a single test
129 * client. Can also run as a non-mapreduce, multithreaded application by
130 * specifying {@code --nomapred}. Each client does about 1GB of data, unless
131 * specified otherwise.
132 */
133@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
134public class PerformanceEvaluation extends Configured implements Tool {
  // Command names that other code refers to directly; both are registered in the static block.
  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
  static final String RANDOM_READ = "randomRead";
  static final String PE_COMMAND_SHORTNAME = "pe";
  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
  // Serializes TestOptions into the MapReduce input file and back (writeInputFile / map).
  private static final Gson GSON = GsonUtil.createGson().create();

  public static final String TABLE_NAME = "TestTable";
  // Column families are named FAMILY_NAME_BASE + index ("info0", "info1", ...).
  public static final String FAMILY_NAME_BASE = "info";
  // Same bytes as FAMILY_NAME_BASE + 0, i.e. the first family created by getTableDescriptor.
  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
  // Qualifier "0".
  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
  public static final int DEFAULT_VALUE_LENGTH = 1000;
  // NOTE(review): presumably the length of the row keys produced by format(int) — confirm.
  public static final int ROW_LENGTH = 26;

  // NOTE(review): 1024 * 1024 * 1000 bytes, i.e. neither 10^9 nor 2^30; kept as-is because
  // DEFAULT_ROWS_PER_GB (1,048,576 rows) and therefore default run sizes depend on it.
  private static final int ONE_GB = 1024 * 1024 * 1000;
  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
  // TODO : should we make this configurable
  private static final int TAG_LENGTH = 256;
  // Reporting helpers used outside this excerpt.
  // NOTE(review): DecimalFormat is not thread-safe; confirm FMT is never used concurrently by
  // the client threads started in doLocalClients.
  private static final DecimalFormat FMT = new DecimalFormat("0.##");
  private static final MathContext CXT = MathContext.DECIMAL64;
  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
  // Untouched defaults; compared against user options to detect explicitly requested settings
  // (presplit regions, replicas, split policy).
  private static final TestOptions DEFAULT_OPTS = new TestOptions();

  // Registry of runnable commands keyed by name (sorted). Must stay declared before the static
  // initializer below, which populates it via addCommandDescriptor.
  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
  // Directory (relative to the base dir) under which MapReduce input files are written.
  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
160
161  static {
162    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
163        "Run async random read test");
164    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
165        "Run async random write test");
166    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
167        "Run async sequential read test");
168    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
169        "Run async sequential write test");
170    addCommandDescriptor(AsyncScanTest.class, "asyncScan",
171        "Run async scan test (read every row)");
172    addCommandDescriptor(RandomReadTest.class, RANDOM_READ,
173      "Run random read test");
174    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
175      "Run random seek and scan 100 test");
176    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
177      "Run random seek scan with both start and stop row (max 10 rows)");
178    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
179      "Run random seek scan with both start and stop row (max 100 rows)");
180    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
181      "Run random seek scan with both start and stop row (max 1000 rows)");
182    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
183      "Run random seek scan with both start and stop row (max 10000 rows)");
184    addCommandDescriptor(RandomWriteTest.class, "randomWrite",
185      "Run random write test");
186    addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
187      "Run sequential read test");
188    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
189      "Run sequential write test");
190    addCommandDescriptor(ScanTest.class, "scan",
191      "Run scan test (read every row)");
192    addCommandDescriptor(FilteredScanTest.class, "filterScan",
193      "Run scan test using a filter to find a specific row based on it's value " +
194      "(make sure to use --rows=20)");
195    addCommandDescriptor(IncrementTest.class, "increment",
196      "Increment on each row; clients overlap on keyspace so some concurrent operations");
197    addCommandDescriptor(AppendTest.class, "append",
198      "Append on each row; clients overlap on keyspace so some concurrent operations");
199    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
200      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
201    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
202      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
203    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
204      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
205  }
206
207  /**
208   * Enum for map metrics.  Keep it out here rather than inside in the Map
209   * inner-class so we can find associated properties.
210   */
211  protected static enum Counter {
212    /** elapsed time */
213    ELAPSED_TIME,
214    /** number of rows */
215    ROWS
216  }
217
218  protected static class RunResult implements Comparable<RunResult> {
219    public RunResult(long duration, Histogram hist) {
220      this.duration = duration;
221      this.hist = hist;
222    }
223
224    public final long duration;
225    public final Histogram hist;
226
227    @Override
228    public String toString() {
229      return Long.toString(duration);
230    }
231
232    @Override public int compareTo(RunResult o) {
233      return Long.compare(this.duration, o.duration);
234    }
235  }
236
237  /**
238   * Constructor
239   * @param conf Configuration object
240   */
241  public PerformanceEvaluation(final Configuration conf) {
242    super(conf);
243  }
244
245  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass,
246      String name, String description) {
247    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
248    COMMANDS.put(name, cmdDescriptor);
249  }
250
251  /**
252   * Implementations can have their status set.
253   */
254  interface Status {
255    /**
256     * Sets status
257     * @param msg status message
258     * @throws IOException
259     */
260    void setStatus(final String msg) throws IOException;
261  }
262
263  /**
264   * MapReduce job that runs a performance evaluation client in each map task.
265   */
266  public static class EvaluationMapTask
267      extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
268
269    /** configuration parameter name that contains the command */
270    public final static String CMD_KEY = "EvaluationMapTask.command";
271    /** configuration parameter name that contains the PE impl */
272    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
273
274    private Class<? extends Test> cmd;
275
276    @Override
277    protected void setup(Context context) throws IOException, InterruptedException {
278      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
279
280      // this is required so that extensions of PE are instantiated within the
281      // map reduce task...
282      Class<? extends PerformanceEvaluation> peClass =
283          forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
284      try {
285        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
286      } catch (Exception e) {
287        throw new IllegalStateException("Could not instantiate PE instance", e);
288      }
289    }
290
291    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
292      try {
293        return Class.forName(className).asSubclass(type);
294      } catch (ClassNotFoundException e) {
295        throw new IllegalStateException("Could not find class for name: " + className, e);
296      }
297    }
298
299    @Override
300    protected void map(LongWritable key, Text value, final Context context)
301           throws IOException, InterruptedException {
302
303      Status status = new Status() {
304        @Override
305        public void setStatus(String msg) {
306           context.setStatus(msg);
307        }
308      };
309
310      TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class);
311      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
312      final Connection con = ConnectionFactory.createConnection(conf);
313      AsyncConnection asyncCon = null;
314      try {
315        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
316      } catch (ExecutionException e) {
317        throw new IOException(e);
318      }
319
320      // Evaluation task
321      RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
322      // Collect how much time the thing took. Report as map output and
323      // to the ELAPSED_TIME counter.
324      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
325      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
326      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
327      context.progress();
328    }
329  }
330
331  /*
332   * If table does not already exist, create. Also create a table when
333   * {@code opts.presplitRegions} is specified or when the existing table's
334   * region replica count doesn't match {@code opts.replicas}.
335   */
336  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
337    TableName tableName = TableName.valueOf(opts.tableName);
338    boolean needsDelete = false, exists = admin.tableExists(tableName);
339    boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
340      || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
341    if (!exists && isReadCmd) {
342      throw new IllegalStateException(
343        "Must specify an existing table for read commands. Run a write command first.");
344    }
345    TableDescriptor desc =
346      exists ? admin.getDescriptor(TableName.valueOf(opts.tableName)) : null;
347    byte[][] splits = getSplits(opts);
348
349    // recreate the table when user has requested presplit or when existing
350    // {RegionSplitPolicy,replica count} does not match requested, or when the
351    // number of column families does not match requested.
352    if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
353      || (!isReadCmd && desc != null &&
354          !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
355      || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)
356      || (desc != null && desc.getColumnFamilyCount() != opts.families)) {
357      needsDelete = true;
358      // wait, why did it delete my table?!?
359      LOG.debug(MoreObjects.toStringHelper("needsDelete")
360        .add("needsDelete", needsDelete)
361        .add("isReadCmd", isReadCmd)
362        .add("exists", exists)
363        .add("desc", desc)
364        .add("presplit", opts.presplitRegions)
365        .add("splitPolicy", opts.splitPolicy)
366        .add("replicas", opts.replicas)
367        .add("families", opts.families)
368        .toString());
369    }
370
371    // remove an existing table
372    if (needsDelete) {
373      if (admin.isTableEnabled(tableName)) {
374        admin.disableTable(tableName);
375      }
376      admin.deleteTable(tableName);
377    }
378
379    // table creation is necessary
380    if (!exists || needsDelete) {
381      desc = getTableDescriptor(opts);
382      if (splits != null) {
383        if (LOG.isDebugEnabled()) {
384          for (int i = 0; i < splits.length; i++) {
385            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
386          }
387        }
388      }
389      if (splits != null) {
390        admin.createTable(desc, splits);
391      } else {
392        admin.createTable(desc);
393      }
394      LOG.info("Table " + desc + " created");
395    }
396    return admin.tableExists(tableName);
397  }
398
399  /**
400   * Create an HTableDescriptor from provided TestOptions.
401   */
402  protected static TableDescriptor getTableDescriptor(TestOptions opts) {
403    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
404      new TableDescriptorBuilder.ModifyableTableDescriptor(TableName.valueOf(opts.tableName));
405
406    for (int family = 0; family < opts.families; family++) {
407      byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
408      ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDesc =
409        new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(familyName);
410      familyDesc.setDataBlockEncoding(opts.blockEncoding);
411      familyDesc.setCompressionType(opts.compression);
412      familyDesc.setBloomFilterType(opts.bloomType);
413      familyDesc.setBlocksize(opts.blockSize);
414      if (opts.inMemoryCF) {
415        familyDesc.setInMemory(true);
416      }
417      familyDesc.setInMemoryCompaction(opts.inMemoryCompaction);
418      tableDescriptor.setColumnFamily(familyDesc);
419    }
420    if (opts.replicas != DEFAULT_OPTS.replicas) {
421      tableDescriptor.setRegionReplication(opts.replicas);
422    }
423    if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
424      tableDescriptor.setRegionSplitPolicyClassName(opts.splitPolicy);
425    }
426    return tableDescriptor;
427  }
428
429  /**
430   * generates splits based on total number of rows and specified split regions
431   */
432  protected static byte[][] getSplits(TestOptions opts) {
433    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions)
434      return null;
435
436    int numSplitPoints = opts.presplitRegions - 1;
437    byte[][] splits = new byte[numSplitPoints][];
438    int jump = opts.totalRows / opts.presplitRegions;
439    for (int i = 0; i < numSplitPoints; i++) {
440      int rowkey = jump * (1 + i);
441      splits[i] = format(rowkey);
442    }
443    return splits;
444  }
445
446  static void setupConnectionCount(final TestOptions opts) {
447    if (opts.oneCon) {
448      opts.connCount = 1;
449    } else {
450      if (opts.connCount == -1) {
451        // set to thread number if connCount is not set
452        opts.connCount = opts.numClientThreads;
453      }
454    }
455  }
456
457  /*
458   * Run all clients in this vm each to its own thread.
459   */
460  static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
461      throws IOException, InterruptedException, ExecutionException {
462    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
463    assert cmd != null;
464    @SuppressWarnings("unchecked")
465    Future<RunResult>[] threads = new Future[opts.numClientThreads];
466    RunResult[] results = new RunResult[opts.numClientThreads];
467    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
468      new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
469    setupConnectionCount(opts);
470    final Connection[] cons = new Connection[opts.connCount];
471    final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
472    for (int i = 0; i < opts.connCount; i++) {
473      cons[i] = ConnectionFactory.createConnection(conf);
474      asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
475    }
476    LOG.info("Created " + opts.connCount + " connections for " +
477        opts.numClientThreads + " threads");
478    for (int i = 0; i < threads.length; i++) {
479      final int index = i;
480      threads[i] = pool.submit(new Callable<RunResult>() {
481        @Override
482        public RunResult call() throws Exception {
483          TestOptions threadOpts = new TestOptions(opts);
484          final Connection con = cons[index % cons.length];
485          final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
486          if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
487          RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
488            @Override
489            public void setStatus(final String msg) throws IOException {
490              LOG.info(msg);
491            }
492          });
493          LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration +
494            "ms over " + threadOpts.perClientRunRows + " rows");
495          return run;
496        }
497      });
498    }
499    pool.shutdown();
500
501    for (int i = 0; i < threads.length; i++) {
502      try {
503        results[i] = threads[i].get();
504      } catch (ExecutionException e) {
505        throw new IOException(e.getCause());
506      }
507    }
508    final String test = cmd.getSimpleName();
509    LOG.info("[" + test + "] Summary of timings (ms): "
510             + Arrays.toString(results));
511    Arrays.sort(results);
512    long total = 0;
513    float avgLatency = 0 ;
514    float avgTPS = 0;
515    for (RunResult result : results) {
516      total += result.duration;
517      avgLatency += result.hist.getSnapshot().getMean();
518      avgTPS += opts.perClientRunRows * 1.0f / result.duration;
519    }
520    avgTPS *= 1000; // ms to second
521    avgLatency = avgLatency / results.length;
522    LOG.info("[" + test + " duration ]"
523      + "\tMin: " + results[0] + "ms"
524      + "\tMax: " + results[results.length - 1] + "ms"
525      + "\tAvg: " + (total / results.length) + "ms");
526    LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
527    LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
528    for (int i = 0; i < opts.connCount; i++) {
529      cons[i].close();
530      asyncCons[i].close();
531    }
532
533
534    return results;
535  }
536
537  /*
538   * Run a mapreduce job.  Run as many maps as asked-for clients.
539   * Before we start up the job, write out an input file with instruction
540   * per client regards which row they are to start on.
541   * @param cmd Command to run.
542   * @throws IOException
543   */
544  static Job doMapReduce(TestOptions opts, final Configuration conf)
545      throws IOException, InterruptedException, ClassNotFoundException {
546    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
547    assert cmd != null;
548    Path inputDir = writeInputFile(conf, opts);
549    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
550    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
551    Job job = Job.getInstance(conf);
552    job.setJarByClass(PerformanceEvaluation.class);
553    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
554
555    job.setInputFormatClass(NLineInputFormat.class);
556    NLineInputFormat.setInputPaths(job, inputDir);
557    // this is default, but be explicit about it just in case.
558    NLineInputFormat.setNumLinesPerSplit(job, 1);
559
560    job.setOutputKeyClass(LongWritable.class);
561    job.setOutputValueClass(LongWritable.class);
562
563    job.setMapperClass(EvaluationMapTask.class);
564    job.setReducerClass(LongSumReducer.class);
565
566    job.setNumReduceTasks(1);
567
568    job.setOutputFormatClass(TextOutputFormat.class);
569    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
570
571    TableMapReduceUtil.addDependencyJars(job);
572    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
573      Histogram.class,     // yammer metrics
574      Gson.class,  // gson
575      FilterAllFilter.class // hbase-server tests jar
576      );
577
578    TableMapReduceUtil.initCredentials(job);
579
580    job.waitForCompletion(true);
581    return job;
582  }
583
  /**
   * Name of the MapReduce input file written by {@code writeInputFile}. It holds one
   * JSON-serialized TestOptions line per client, and each line is fed to its own map task.
   * NOTE(review): not final; presumably left mutable on purpose — confirm before tightening.
   */
  static String JOB_INPUT_FILENAME = "input.txt";
589
590  /*
591   * Write input file of offsets-per-client for the mapreduce job.
592   * @param c Configuration
593   * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
594   * @throws IOException
595   */
596  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
597    return writeInputFile(c, opts, new Path("."));
598  }
599
600  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
601  throws IOException {
602    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
603    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
604    Path inputDir = new Path(jobdir, "inputs");
605
606    FileSystem fs = FileSystem.get(c);
607    fs.mkdirs(inputDir);
608
609    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
610    PrintStream out = new PrintStream(fs.create(inputFile));
611    // Make input random.
612    Map<Integer, String> m = new TreeMap<>();
613    Hash h = MurmurHash.getInstance();
614    int perClientRows = (opts.totalRows / opts.numClientThreads);
615    try {
616      for (int j = 0; j < opts.numClientThreads; j++) {
617        TestOptions next = new TestOptions(opts);
618        next.startRow = j * perClientRows;
619        next.perClientRunRows = perClientRows;
620        String s = GSON.toJson(next);
621        LOG.info("Client=" + j + ", input=" + s);
622        byte[] b = Bytes.toBytes(s);
623        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
624        m.put(hash, s);
625      }
626      for (Map.Entry<Integer, String> e: m.entrySet()) {
627        out.println(e.getValue());
628      }
629    } finally {
630      out.close();
631    }
632    return inputDir;
633  }
634
635  /**
636   * Describes a command.
637   */
638  static class CmdDescriptor {
639    private Class<? extends TestBase> cmdClass;
640    private String name;
641    private String description;
642
643    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
644      this.cmdClass = cmdClass;
645      this.name = name;
646      this.description = description;
647    }
648
649    public Class<? extends TestBase> getCmdClass() {
650      return cmdClass;
651    }
652
653    public String getName() {
654      return name;
655    }
656
657    public String getDescription() {
658      return description;
659    }
660  }
661
662  /**
663   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}.
664   * This makes tracking all these arguments a little easier.
665   * NOTE: ADDING AN OPTION, you need to add a data member, a getter/setter (to make JSON
666   * serialization of this TestOptions class behave), and you need to add to the clone constructor
667   * below copying your new option from the 'that' to the 'this'.  Look for 'clone' below.
668   */
669  static class TestOptions {
    // Every field round-trips through Gson (writeInputFile -> EvaluationMapTask.map) and must
    // also be copied by the clone constructor. Declaration order matters: `period` is
    // initialized from `perClientRunRows`.

    // Command to run; resolved by determineCommandClass and inspected by checkTable, which
    // treats names containing "read"/"scan" as read commands.
    String cmdName = null;
    // true: run clients as threads in this JVM instead of a MapReduce job (--nomapred).
    boolean nomapred = false;
    boolean filterAll = false;
    // First row of this client. doLocalClients replaces 0 with index * perClientRunRows.
    int startRow = 0;
    float size = 1.0f;
    // Rows per client. writeInputFile sets it to totalRows / numClientThreads for MR runs.
    int perClientRunRows = DEFAULT_ROWS_PER_GB;
    // Number of clients: pool threads in doLocalClients, input lines / map tasks in MR runs.
    int numClientThreads = 1;
    int totalRows = DEFAULT_ROWS_PER_GB;
    int measureAfter = 0;
    float sampleRate = 1.0f;
    double traceRate = 0.0;
    String tableName = TABLE_NAME;
    boolean flushCommits = true;
    boolean writeToWAL = true;
    boolean autoFlush = false;
    // Connection sharing for local clients, see setupConnectionCount: oneCon forces a single
    // connection; connCount == -1 means "not set" and resolves to numClientThreads.
    boolean oneCon = false;
    int connCount = -1; //wil decide the actual num later
    boolean useTags = false;
    int noOfTags = 1;
    boolean reportLatency = false;
    int multiGet = 0;
    int multiPut = 0;
    int randomSleep = 0;
    // Table layout below is applied by getTableDescriptor and compared by checkTable.
    boolean inMemoryCF = false;
    // 0 (the default) means no presplit; getSplits then returns null.
    int presplitRegions = 0;
    int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION;
    String splitPolicy = null;
    Compression.Algorithm compression = Compression.Algorithm.NONE;
    BloomType bloomType = BloomType.ROW;
    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
    boolean valueRandom = false;
    boolean valueZipf = false;
    int valueSize = DEFAULT_VALUE_LENGTH;
    // A tenth of the default perClientRunRows (or all of it when that is < 10). It is computed
    // once at construction and NOT recomputed when perClientRunRows changes later.
    // NOTE(review): presumably a progress-reporting interval in rows — confirm with its users.
    int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10;
    int cycles = 1;
    int columns = 1;
    // Number of column families (info0..info{n-1}); a mismatch makes checkTable recreate.
    int families = 1;
    int caching = 30;
    boolean addColumns = true;
    MemoryCompactionPolicy inMemoryCompaction =
        MemoryCompactionPolicy.valueOf(
            CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
    boolean asyncPrefetch = false;
    boolean cacheBlocks = true;
    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
    // 2 MiB. NOTE(review): presumably the client write buffer size — confirm.
    long bufferSize = 2l * 1024l * 1024l;
717
718    public TestOptions() {}
719
720    /**
721     * Clone constructor.
722     * @param that Object to copy from.
723     */
724    public TestOptions(TestOptions that) {
725      this.cmdName = that.cmdName;
726      this.cycles = that.cycles;
727      this.nomapred = that.nomapred;
728      this.startRow = that.startRow;
729      this.size = that.size;
730      this.perClientRunRows = that.perClientRunRows;
731      this.numClientThreads = that.numClientThreads;
732      this.totalRows = that.totalRows;
733      this.sampleRate = that.sampleRate;
734      this.traceRate = that.traceRate;
735      this.tableName = that.tableName;
736      this.flushCommits = that.flushCommits;
737      this.writeToWAL = that.writeToWAL;
738      this.autoFlush = that.autoFlush;
739      this.oneCon = that.oneCon;
740      this.connCount = that.connCount;
741      this.useTags = that.useTags;
742      this.noOfTags = that.noOfTags;
743      this.reportLatency = that.reportLatency;
744      this.multiGet = that.multiGet;
745      this.multiPut = that.multiPut;
746      this.inMemoryCF = that.inMemoryCF;
747      this.presplitRegions = that.presplitRegions;
748      this.replicas = that.replicas;
749      this.splitPolicy = that.splitPolicy;
750      this.compression = that.compression;
751      this.blockEncoding = that.blockEncoding;
752      this.filterAll = that.filterAll;
753      this.bloomType = that.bloomType;
754      this.blockSize = that.blockSize;
755      this.valueRandom = that.valueRandom;
756      this.valueZipf = that.valueZipf;
757      this.valueSize = that.valueSize;
758      this.period = that.period;
759      this.randomSleep = that.randomSleep;
760      this.measureAfter = that.measureAfter;
761      this.addColumns = that.addColumns;
762      this.columns = that.columns;
763      this.families = that.families;
764      this.caching = that.caching;
765      this.inMemoryCompaction = that.inMemoryCompaction;
766      this.asyncPrefetch = that.asyncPrefetch;
767      this.cacheBlocks = that.cacheBlocks;
768      this.scanReadType = that.scanReadType;
769      this.bufferSize = that.bufferSize;
770    }
771
772    public int getCaching() {
773      return this.caching;
774    }
775
776    public void setCaching(final int caching) {
777      this.caching = caching;
778    }
779
780    public int getColumns() {
781      return this.columns;
782    }
783
784    public void setColumns(final int columns) {
785      this.columns = columns;
786    }
787
788    public int getFamilies() {
789      return this.families;
790    }
791
792    public void setFamilies(final int families) {
793      this.families = families;
794    }
795
796    public int getCycles() {
797      return this.cycles;
798    }
799
800    public void setCycles(final int cycles) {
801      this.cycles = cycles;
802    }
803
804    public boolean isValueZipf() {
805      return valueZipf;
806    }
807
808    public void setValueZipf(boolean valueZipf) {
809      this.valueZipf = valueZipf;
810    }
811
812    public String getCmdName() {
813      return cmdName;
814    }
815
816    public void setCmdName(String cmdName) {
817      this.cmdName = cmdName;
818    }
819
820    public int getRandomSleep() {
821      return randomSleep;
822    }
823
824    public void setRandomSleep(int randomSleep) {
825      this.randomSleep = randomSleep;
826    }
827
828    public int getReplicas() {
829      return replicas;
830    }
831
832    public void setReplicas(int replicas) {
833      this.replicas = replicas;
834    }
835
836    public String getSplitPolicy() {
837      return splitPolicy;
838    }
839
840    public void setSplitPolicy(String splitPolicy) {
841      this.splitPolicy = splitPolicy;
842    }
843
844    public void setNomapred(boolean nomapred) {
845      this.nomapred = nomapred;
846    }
847
848    public void setFilterAll(boolean filterAll) {
849      this.filterAll = filterAll;
850    }
851
852    public void setStartRow(int startRow) {
853      this.startRow = startRow;
854    }
855
856    public void setSize(float size) {
857      this.size = size;
858    }
859
860    public void setPerClientRunRows(int perClientRunRows) {
861      this.perClientRunRows = perClientRunRows;
862    }
863
864    public void setNumClientThreads(int numClientThreads) {
865      this.numClientThreads = numClientThreads;
866    }
867
868    public void setTotalRows(int totalRows) {
869      this.totalRows = totalRows;
870    }
871
872    public void setSampleRate(float sampleRate) {
873      this.sampleRate = sampleRate;
874    }
875
876    public void setTraceRate(double traceRate) {
877      this.traceRate = traceRate;
878    }
879
880    public void setTableName(String tableName) {
881      this.tableName = tableName;
882    }
883
884    public void setFlushCommits(boolean flushCommits) {
885      this.flushCommits = flushCommits;
886    }
887
888    public void setWriteToWAL(boolean writeToWAL) {
889      this.writeToWAL = writeToWAL;
890    }
891
892    public void setAutoFlush(boolean autoFlush) {
893      this.autoFlush = autoFlush;
894    }
895
896    public void setOneCon(boolean oneCon) {
897      this.oneCon = oneCon;
898    }
899
900    public int getConnCount() {
901      return connCount;
902    }
903
904    public void setConnCount(int connCount) {
905      this.connCount = connCount;
906    }
907
908    public void setUseTags(boolean useTags) {
909      this.useTags = useTags;
910    }
911
912    public void setNoOfTags(int noOfTags) {
913      this.noOfTags = noOfTags;
914    }
915
916    public void setReportLatency(boolean reportLatency) {
917      this.reportLatency = reportLatency;
918    }
919
920    public void setMultiGet(int multiGet) {
921      this.multiGet = multiGet;
922    }
923
924    public void setMultiPut(int multiPut) {
925      this.multiPut = multiPut;
926    }
927
928    public void setInMemoryCF(boolean inMemoryCF) {
929      this.inMemoryCF = inMemoryCF;
930    }
931
932    public void setPresplitRegions(int presplitRegions) {
933      this.presplitRegions = presplitRegions;
934    }
935
936    public void setCompression(Compression.Algorithm compression) {
937      this.compression = compression;
938    }
939
940    public void setBloomType(BloomType bloomType) {
941      this.bloomType = bloomType;
942    }
943
944    public void setBlockSize(int blockSize) {
945      this.blockSize = blockSize;
946    }
947
948    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
949      this.blockEncoding = blockEncoding;
950    }
951
952    public void setValueRandom(boolean valueRandom) {
953      this.valueRandom = valueRandom;
954    }
955
956    public void setValueSize(int valueSize) {
957      this.valueSize = valueSize;
958    }
959
960    public void setBufferSize(long bufferSize) {
961      this.bufferSize = bufferSize;
962    }
963
964    public void setPeriod(int period) {
965      this.period = period;
966    }
967
968    public boolean isNomapred() {
969      return nomapred;
970    }
971
972    public boolean isFilterAll() {
973      return filterAll;
974    }
975
976    public int getStartRow() {
977      return startRow;
978    }
979
980    public float getSize() {
981      return size;
982    }
983
984    public int getPerClientRunRows() {
985      return perClientRunRows;
986    }
987
988    public int getNumClientThreads() {
989      return numClientThreads;
990    }
991
992    public int getTotalRows() {
993      return totalRows;
994    }
995
996    public float getSampleRate() {
997      return sampleRate;
998    }
999
1000    public double getTraceRate() {
1001      return traceRate;
1002    }
1003
1004    public String getTableName() {
1005      return tableName;
1006    }
1007
1008    public boolean isFlushCommits() {
1009      return flushCommits;
1010    }
1011
1012    public boolean isWriteToWAL() {
1013      return writeToWAL;
1014    }
1015
1016    public boolean isAutoFlush() {
1017      return autoFlush;
1018    }
1019
1020    public boolean isUseTags() {
1021      return useTags;
1022    }
1023
1024    public int getNoOfTags() {
1025      return noOfTags;
1026    }
1027
1028    public boolean isReportLatency() {
1029      return reportLatency;
1030    }
1031
1032    public int getMultiGet() {
1033      return multiGet;
1034    }
1035
1036    public int getMultiPut() {
1037      return multiPut;
1038    }
1039
1040    public boolean isInMemoryCF() {
1041      return inMemoryCF;
1042    }
1043
1044    public int getPresplitRegions() {
1045      return presplitRegions;
1046    }
1047
1048    public Compression.Algorithm getCompression() {
1049      return compression;
1050    }
1051
1052    public DataBlockEncoding getBlockEncoding() {
1053      return blockEncoding;
1054    }
1055
1056    public boolean isValueRandom() {
1057      return valueRandom;
1058    }
1059
1060    public int getValueSize() {
1061      return valueSize;
1062    }
1063
1064    public int getPeriod() {
1065      return period;
1066    }
1067
1068    public BloomType getBloomType() {
1069      return bloomType;
1070    }
1071
1072    public int getBlockSize() {
1073      return blockSize;
1074    }
1075
1076    public boolean isOneCon() {
1077      return oneCon;
1078    }
1079
1080    public int getMeasureAfter() {
1081      return measureAfter;
1082    }
1083
1084    public void setMeasureAfter(int measureAfter) {
1085      this.measureAfter = measureAfter;
1086    }
1087
1088    public boolean getAddColumns() {
1089      return addColumns;
1090    }
1091
1092    public void setAddColumns(boolean addColumns) {
1093      this.addColumns = addColumns;
1094    }
1095
1096    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
1097      this.inMemoryCompaction = inMemoryCompaction;
1098    }
1099
1100    public MemoryCompactionPolicy getInMemoryCompaction() {
1101      return this.inMemoryCompaction;
1102    }
1103
1104    public long getBufferSize() {
1105      return this.bufferSize;
1106    }
1107  }
1108
1109  /*
1110   * A test.
1111   * Subclass to particularize what happens per row.
1112   */
1113  static abstract class TestBase {
1114    // Below is make it so when Tests are all running in the one
1115    // jvm, that they each have a differently seeded Random.
1116    private static final Random randomSeed = new Random(System.currentTimeMillis());
1117
1118    private static long nextRandomSeed() {
1119      return randomSeed.nextLong();
1120    }
1121    private final int everyN;
1122
1123    protected final Random rand = new Random(nextRandomSeed());
1124    protected final Configuration conf;
1125    protected final TestOptions opts;
1126
1127    private final Status status;
1128    private final Sampler traceSampler;
1129    private final SpanReceiverHost receiverHost;
1130
1131    private String testName;
1132    private Histogram latencyHistogram;
1133    private Histogram valueSizeHistogram;
1134    private Histogram rpcCallsHistogram;
1135    private Histogram remoteRpcCallsHistogram;
1136    private Histogram millisBetweenNextHistogram;
1137    private Histogram regionsScannedHistogram;
1138    private Histogram bytesInResultsHistogram;
1139    private Histogram bytesInRemoteResultsHistogram;
1140    private RandomDistribution.Zipf zipf;
1141
1142    /**
1143     * Note that all subclasses of this class must provide a public constructor
1144     * that has the exact same list of arguments.
1145     */
1146    TestBase(final Configuration conf, final TestOptions options, final Status status) {
1147      this.conf = conf;
1148      this.receiverHost = this.conf == null? null: SpanReceiverHost.getInstance(conf);
1149      this.opts = options;
1150      this.status = status;
1151      this.testName = this.getClass().getSimpleName();
1152      if (options.traceRate >= 1.0) {
1153        this.traceSampler = Sampler.ALWAYS;
1154      } else if (options.traceRate > 0.0) {
1155        conf.setDouble("hbase.sampler.fraction", options.traceRate);
1156        this.traceSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf));
1157      } else {
1158        this.traceSampler = Sampler.NEVER;
1159      }
1160      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
1161      if (options.isValueZipf()) {
1162        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
1163      }
1164      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
1165    }
1166
1167    int getValueLength(final Random r) {
1168      if (this.opts.isValueRandom()) {
1169        return r.nextInt(opts.valueSize);
1170      } else if (this.opts.isValueZipf()) {
1171        return Math.abs(this.zipf.nextInt());
1172      } else {
1173        return opts.valueSize;
1174      }
1175    }
1176
1177    void updateValueSize(final Result [] rs) throws IOException {
1178      if (rs == null || !isRandomValueSize()) return;
1179      for (Result r: rs) updateValueSize(r);
1180    }
1181
1182    void updateValueSize(final Result r) throws IOException {
1183      if (r == null || !isRandomValueSize()) return;
1184      int size = 0;
1185      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1186        size += scanner.current().getValueLength();
1187      }
1188      updateValueSize(size);
1189    }
1190
1191    void updateValueSize(final int valueSize) {
1192      if (!isRandomValueSize()) return;
1193      this.valueSizeHistogram.update(valueSize);
1194    }
1195
1196    void updateScanMetrics(final ScanMetrics metrics) {
1197      if (metrics == null) return;
1198      Map<String,Long> metricsMap = metrics.getMetricsMap();
1199      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
1200      if (rpcCalls != null) {
1201        this.rpcCallsHistogram.update(rpcCalls.longValue());
1202      }
1203      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
1204      if (remoteRpcCalls != null) {
1205        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
1206      }
1207      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
1208      if (millisBetweenNext != null) {
1209        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
1210      }
1211      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
1212      if (regionsScanned != null) {
1213        this.regionsScannedHistogram.update(regionsScanned.longValue());
1214      }
1215      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
1216      if (bytesInResults != null && bytesInResults.longValue() > 0) {
1217        this.bytesInResultsHistogram.update(bytesInResults.longValue());
1218      }
1219      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
1220      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
1221        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
1222      }
1223    }
1224
1225    String generateStatus(final int sr, final int i, final int lr) {
1226      return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
1227        (!isRandomValueSize()? "": ", value size " + getShortValueSizeReport());
1228    }
1229
1230    boolean isRandomValueSize() {
1231      return opts.valueRandom;
1232    }
1233
1234    protected int getReportingPeriod() {
1235      return opts.period;
1236    }
1237
1238    /**
1239     * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1240     */
1241    public Histogram getLatencyHistogram() {
1242      return latencyHistogram;
1243    }
1244
1245    void testSetup() throws IOException {
1246      // test metrics
1247      latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1248      valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1249      // scan metrics
1250      rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1251      remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1252      millisBetweenNextHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1253      regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1254      bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1255      bytesInRemoteResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1256
1257      onStartup();
1258    }
1259
1260    abstract void onStartup() throws IOException;
1261
1262    void testTakedown() throws IOException {
1263      onTakedown();
1264      // Print all stats for this thread continuously.
1265      // Synchronize on Test.class so different threads don't intermingle the
1266      // output. We can't use 'this' here because each thread has its own instance of Test class.
1267      synchronized (Test.class) {
1268        status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
1269        status.setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(
1270            latencyHistogram));
1271        status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
1272        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
1273        if (valueSizeHistogram.getCount() > 0) {
1274          status.setStatus("ValueSize (bytes) : "
1275              + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
1276          status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
1277          status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
1278        } else {
1279          status.setStatus("No valueSize statistics available");
1280        }
1281        if (rpcCallsHistogram.getCount() > 0) {
1282          status.setStatus("rpcCalls (count): " +
1283              YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
1284        }
1285        if (remoteRpcCallsHistogram.getCount() > 0) {
1286          status.setStatus("remoteRpcCalls (count): " +
1287              YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
1288        }
1289        if (millisBetweenNextHistogram.getCount() > 0) {
1290          status.setStatus("millisBetweenNext (latency): " +
1291              YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
1292        }
1293        if (regionsScannedHistogram.getCount() > 0) {
1294          status.setStatus("regionsScanned (count): " +
1295              YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
1296        }
1297        if (bytesInResultsHistogram.getCount() > 0) {
1298          status.setStatus("bytesInResults (size): " +
1299              YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
1300        }
1301        if (bytesInRemoteResultsHistogram.getCount() > 0) {
1302          status.setStatus("bytesInRemoteResults (size): " +
1303              YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
1304        }
1305      }
1306      receiverHost.closeReceivers();
1307    }
1308
1309    abstract void onTakedown() throws IOException;
1310
1311
1312    /*
1313     * Run test
1314     * @return Elapsed time.
1315     * @throws IOException
1316     */
1317    long test() throws IOException, InterruptedException {
1318      testSetup();
1319      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1320      final long startTime = System.nanoTime();
1321      try {
1322        testTimed();
1323      } finally {
1324        testTakedown();
1325      }
1326      return (System.nanoTime() - startTime) / 1000000;
1327    }
1328
1329    int getStartRow() {
1330      return opts.startRow;
1331    }
1332
1333    int getLastRow() {
1334      return getStartRow() + opts.perClientRunRows;
1335    }
1336
1337    /**
1338     * Provides an extension point for tests that don't want a per row invocation.
1339     */
1340    void testTimed() throws IOException, InterruptedException {
1341      int startRow = getStartRow();
1342      int lastRow = getLastRow();
1343      TraceUtil.addSampler(traceSampler);
1344      // Report on completion of 1/10th of total.
1345      for (int ii = 0; ii < opts.cycles; ii++) {
1346        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
1347        for (int i = startRow; i < lastRow; i++) {
1348          if (i % everyN != 0) continue;
1349          long startTime = System.nanoTime();
1350          boolean requestSent = false;
1351          try (TraceScope scope = TraceUtil.createTrace("test row");){
1352            requestSent = testRow(i);
1353          }
1354          if ( (i - startRow) > opts.measureAfter) {
1355            // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
1356            // first 9 times and sends the actual get request in the 10th iteration.
1357            // We should only set latency when actual request is sent because otherwise
1358            // it turns out to be 0.
1359            if (requestSent) {
1360              latencyHistogram.update((System.nanoTime() - startTime) / 1000);
1361            }
1362            if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1363              status.setStatus(generateStatus(startRow, i, lastRow));
1364            }
1365          }
1366        }
1367      }
1368    }
1369
1370    /**
1371     * @return Subset of the histograms' calculation.
1372     */
1373    public String getShortLatencyReport() {
1374      return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
1375    }
1376
1377    /**
1378     * @return Subset of the histograms' calculation.
1379     */
1380    public String getShortValueSizeReport() {
1381      return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
1382    }
1383
1384
1385    /**
1386     * Test for individual row.
1387     * @param i Row index.
1388     * @return true if the row was sent to server and need to record metrics.
1389     *         False if not, multiGet and multiPut e.g., the rows are sent
1390     *         to server only if enough gets/puts are gathered.
1391     */
1392    abstract boolean testRow(final int i) throws IOException, InterruptedException;
1393  }
1394
1395  static abstract class Test extends TestBase {
1396    protected Connection connection;
1397
1398    Test(final Connection con, final TestOptions options, final Status status) {
1399      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1400      this.connection = con;
1401    }
1402  }
1403
1404  static abstract class AsyncTest extends TestBase {
1405    protected AsyncConnection connection;
1406
1407    AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
1408      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1409      this.connection = con;
1410    }
1411  }
1412
1413  static abstract class TableTest extends Test {
1414    protected Table table;
1415
1416    TableTest(Connection con, TestOptions options, Status status) {
1417      super(con, options, status);
1418    }
1419
1420    @Override
1421    void onStartup() throws IOException {
1422      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1423    }
1424
1425    @Override
1426    void onTakedown() throws IOException {
1427      table.close();
1428    }
1429  }
1430
1431  static abstract class AsyncTableTest extends AsyncTest {
1432    protected AsyncTable<?> table;
1433
1434    AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
1435      super(con, options, status);
1436    }
1437
1438    @Override
1439    void onStartup() throws IOException {
1440      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1441    }
1442
1443    @Override
1444    void onTakedown() throws IOException {
1445    }
1446  }
1447
1448  static class AsyncRandomReadTest extends AsyncTableTest {
1449    private final Consistency consistency;
1450    private ArrayList<Get> gets;
1451    private Random rd = new Random();
1452
1453    AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
1454      super(con, options, status);
1455      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1456      if (opts.multiGet > 0) {
1457        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1458        this.gets = new ArrayList<>(opts.multiGet);
1459      }
1460    }
1461
1462    @Override
1463    boolean testRow(final int i) throws IOException, InterruptedException {
1464      if (opts.randomSleep > 0) {
1465        Thread.sleep(rd.nextInt(opts.randomSleep));
1466      }
1467      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1468      for (int family = 0; family < opts.families; family++) {
1469        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1470        if (opts.addColumns) {
1471          for (int column = 0; column < opts.columns; column++) {
1472            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1473            get.addColumn(familyName, qualifier);
1474          }
1475        } else {
1476          get.addFamily(familyName);
1477        }
1478      }
1479      if (opts.filterAll) {
1480        get.setFilter(new FilterAllFilter());
1481      }
1482      get.setConsistency(consistency);
1483      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1484      try {
1485        if (opts.multiGet > 0) {
1486          this.gets.add(get);
1487          if (this.gets.size() == opts.multiGet) {
1488            Result[] rs =
1489                this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
1490            updateValueSize(rs);
1491            this.gets.clear();
1492          } else {
1493            return false;
1494          }
1495        } else {
1496          updateValueSize(this.table.get(get).get());
1497        }
1498      } catch (ExecutionException e) {
1499        throw new IOException(e);
1500      }
1501      return true;
1502    }
1503
1504    public static RuntimeException runtime(Throwable e) {
1505      if (e instanceof RuntimeException) {
1506        return (RuntimeException) e;
1507      }
1508      return new RuntimeException(e);
1509    }
1510
1511    public static <V> V propagate(Callable<V> callable) {
1512      try {
1513        return callable.call();
1514      } catch (Exception e) {
1515        throw runtime(e);
1516      }
1517    }
1518
1519    @Override
1520    protected int getReportingPeriod() {
1521      int period = opts.perClientRunRows / 10;
1522      return period == 0 ? opts.perClientRunRows : period;
1523    }
1524
1525    @Override
1526    protected void testTakedown() throws IOException {
1527      if (this.gets != null && this.gets.size() > 0) {
1528        this.table.get(gets);
1529        this.gets.clear();
1530      }
1531      super.testTakedown();
1532    }
1533  }
1534
1535  static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
1536
1537    AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
1538      super(con, options, status);
1539    }
1540
1541    @Override
1542    protected byte[] generateRow(final int i) {
1543      return getRandomRow(this.rand, opts.totalRows);
1544    }
1545  }
1546
1547  static class AsyncScanTest extends AsyncTableTest {
1548    private ResultScanner testScanner;
1549    private AsyncTable<?> asyncTable;
1550
1551    AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
1552      super(con, options, status);
1553    }
1554
1555    @Override
1556    void onStartup() throws IOException {
1557      this.asyncTable =
1558          connection.getTable(TableName.valueOf(opts.tableName),
1559            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
1560    }
1561
1562    @Override
1563    void testTakedown() throws IOException {
1564      if (this.testScanner != null) {
1565        updateScanMetrics(this.testScanner.getScanMetrics());
1566        this.testScanner.close();
1567      }
1568      super.testTakedown();
1569    }
1570
1571    @Override
1572    boolean testRow(final int i) throws IOException {
1573      if (this.testScanner == null) {
1574        Scan scan =
1575            new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1576                .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1577                .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1578        for (int family = 0; family < opts.families; family++) {
1579          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1580          if (opts.addColumns) {
1581            for (int column = 0; column < opts.columns; column++) {
1582              byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1583              scan.addColumn(familyName, qualifier);
1584            }
1585          } else {
1586            scan.addFamily(familyName);
1587          }
1588        }
1589        if (opts.filterAll) {
1590          scan.setFilter(new FilterAllFilter());
1591        }
1592        this.testScanner = asyncTable.getScanner(scan);
1593      }
1594      Result r = testScanner.next();
1595      updateValueSize(r);
1596      return true;
1597    }
1598  }
1599
1600  static class AsyncSequentialReadTest extends AsyncTableTest {
1601    AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
1602      super(con, options, status);
1603    }
1604
1605    @Override
1606    boolean testRow(final int i) throws IOException, InterruptedException {
1607      Get get = new Get(format(i));
1608      for (int family = 0; family < opts.families; family++) {
1609        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1610        if (opts.addColumns) {
1611          for (int column = 0; column < opts.columns; column++) {
1612            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1613            get.addColumn(familyName, qualifier);
1614          }
1615        } else {
1616          get.addFamily(familyName);
1617        }
1618      }
1619      if (opts.filterAll) {
1620        get.setFilter(new FilterAllFilter());
1621      }
1622      try {
1623        updateValueSize(table.get(get).get());
1624      } catch (ExecutionException e) {
1625        throw new IOException(e);
1626      }
1627      return true;
1628    }
1629  }
1630
1631  static class AsyncSequentialWriteTest extends AsyncTableTest {
1632    private ArrayList<Put> puts;
1633
1634    AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
1635      super(con, options, status);
1636      if (opts.multiPut > 0) {
1637        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
1638        this.puts = new ArrayList<>(opts.multiPut);
1639      }
1640    }
1641
1642    protected byte[] generateRow(final int i) {
1643      return format(i);
1644    }
1645
1646    @Override
1647    @SuppressWarnings("ReturnValueIgnored")
1648    boolean testRow(final int i) throws IOException, InterruptedException {
1649      byte[] row = generateRow(i);
1650      Put put = new Put(row);
1651      for (int family = 0; family < opts.families; family++) {
1652        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1653        for (int column = 0; column < opts.columns; column++) {
1654          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1655          byte[] value = generateData(this.rand, getValueLength(this.rand));
1656          if (opts.useTags) {
1657            byte[] tag = generateData(this.rand, TAG_LENGTH);
1658            Tag[] tags = new Tag[opts.noOfTags];
1659            for (int n = 0; n < opts.noOfTags; n++) {
1660              Tag t = new ArrayBackedTag((byte) n, tag);
1661              tags[n] = t;
1662            }
1663            KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
1664              value, tags);
1665            put.add(kv);
1666            updateValueSize(kv.getValueLength());
1667          } else {
1668            put.addColumn(familyName, qualifier, value);
1669            updateValueSize(value.length);
1670          }
1671        }
1672      }
1673      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1674      try {
1675        table.put(put).get();
1676        if (opts.multiPut > 0) {
1677          this.puts.add(put);
1678          if (this.puts.size() == opts.multiPut) {
1679            this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get));
1680            this.puts.clear();
1681          } else {
1682            return false;
1683          }
1684        } else {
1685          table.put(put).get();
1686        }
1687      } catch (ExecutionException e) {
1688        throw new IOException(e);
1689      }
1690      return true;
1691    }
1692  }
1693
1694  static abstract class BufferedMutatorTest extends Test {
1695    protected BufferedMutator mutator;
1696    protected Table table;
1697
1698    BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1699      super(con, options, status);
1700    }
1701
1702    @Override
1703    void onStartup() throws IOException {
1704      BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1705      p.writeBufferSize(opts.bufferSize);
1706      this.mutator = connection.getBufferedMutator(p);
1707      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1708    }
1709
1710    @Override
1711    void onTakedown() throws IOException {
1712      mutator.close();
1713      table.close();
1714    }
1715  }
1716
1717  static class RandomSeekScanTest extends TableTest {
1718    RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1719      super(con, options, status);
1720    }
1721
1722    @Override
1723    boolean testRow(final int i) throws IOException {
1724      Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
1725          .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
1726          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
1727          .setScanMetricsEnabled(true);
1728      FilterList list = new FilterList();
1729      for (int family = 0; family < opts.families; family++) {
1730        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1731        if (opts.addColumns) {
1732          for (int column = 0; column < opts.columns; column++) {
1733            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1734            scan.addColumn(familyName, qualifier);
1735          }
1736        } else {
1737          scan.addFamily(familyName);
1738        }
1739      }
1740      if (opts.filterAll) {
1741        list.addFilter(new FilterAllFilter());
1742      }
1743      list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1744      scan.setFilter(list);
1745      ResultScanner s = this.table.getScanner(scan);
1746      try {
1747        for (Result rr; (rr = s.next()) != null;) {
1748          updateValueSize(rr);
1749        }
1750      } finally {
1751        updateScanMetrics(s.getScanMetrics());
1752        s.close();
1753      }
1754      return true;
1755    }
1756
1757    @Override
1758    protected int getReportingPeriod() {
1759      int period = opts.perClientRunRows / 100;
1760      return period == 0 ? opts.perClientRunRows : period;
1761    }
1762
1763  }
1764
1765  static abstract class RandomScanWithRangeTest extends TableTest {
1766    RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1767      super(con, options, status);
1768    }
1769
1770    @Override
1771    boolean testRow(final int i) throws IOException {
1772      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1773      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
1774          .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
1775          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1776          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1777      for (int family = 0; family < opts.families; family++) {
1778        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1779        if (opts.addColumns) {
1780          for (int column = 0; column < opts.columns; column++) {
1781            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1782            scan.addColumn(familyName, qualifier);
1783          }
1784        } else {
1785          scan.addFamily(familyName);
1786        }
1787      }
1788      if (opts.filterAll) {
1789        scan.setFilter(new FilterAllFilter());
1790      }
1791      Result r = null;
1792      int count = 0;
1793      ResultScanner s = this.table.getScanner(scan);
1794      try {
1795        for (; (r = s.next()) != null;) {
1796          updateValueSize(r);
1797          count++;
1798        }
1799        if (i % 100 == 0) {
1800          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1801            Bytes.toString(startAndStopRow.getFirst()),
1802            Bytes.toString(startAndStopRow.getSecond()), count));
1803        }
1804      } finally {
1805        updateScanMetrics(s.getScanMetrics());
1806        s.close();
1807      }
1808      return true;
1809    }
1810
1811    protected abstract Pair<byte[],byte[]> getStartAndStopRow();
1812
1813    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
1814      int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
1815      int stop = start + maxRange;
1816      return new Pair<>(format(start), format(stop));
1817    }
1818
1819    @Override
1820    protected int getReportingPeriod() {
1821      int period = opts.perClientRunRows / 100;
1822      return period == 0? opts.perClientRunRows: period;
1823    }
1824  }
1825
1826  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1827    RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1828      super(con, options, status);
1829    }
1830
1831    @Override
1832    protected Pair<byte[], byte[]> getStartAndStopRow() {
1833      return generateStartAndStopRows(10);
1834    }
1835  }
1836
1837  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1838    RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1839      super(con, options, status);
1840    }
1841
1842    @Override
1843    protected Pair<byte[], byte[]> getStartAndStopRow() {
1844      return generateStartAndStopRows(100);
1845    }
1846  }
1847
1848  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1849    RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1850      super(con, options, status);
1851    }
1852
1853    @Override
1854    protected Pair<byte[], byte[]> getStartAndStopRow() {
1855      return generateStartAndStopRows(1000);
1856    }
1857  }
1858
1859  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1860    RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1861      super(con, options, status);
1862    }
1863
1864    @Override
1865    protected Pair<byte[], byte[]> getStartAndStopRow() {
1866      return generateStartAndStopRows(10000);
1867    }
1868  }
1869
1870  static class RandomReadTest extends TableTest {
1871    private final Consistency consistency;
1872    private ArrayList<Get> gets;
1873    private Random rd = new Random();
1874
1875    RandomReadTest(Connection con, TestOptions options, Status status) {
1876      super(con, options, status);
1877      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1878      if (opts.multiGet > 0) {
1879        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1880        this.gets = new ArrayList<>(opts.multiGet);
1881      }
1882    }
1883
1884    @Override
1885    boolean testRow(final int i) throws IOException, InterruptedException {
1886      if (opts.randomSleep > 0) {
1887        Thread.sleep(rd.nextInt(opts.randomSleep));
1888      }
1889      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1890      for (int family = 0; family < opts.families; family++) {
1891        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1892        if (opts.addColumns) {
1893          for (int column = 0; column < opts.columns; column++) {
1894            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1895            get.addColumn(familyName, qualifier);
1896          }
1897        } else {
1898          get.addFamily(familyName);
1899        }
1900      }
1901      if (opts.filterAll) {
1902        get.setFilter(new FilterAllFilter());
1903      }
1904      get.setConsistency(consistency);
1905      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1906      if (opts.multiGet > 0) {
1907        this.gets.add(get);
1908        if (this.gets.size() == opts.multiGet) {
1909          Result [] rs = this.table.get(this.gets);
1910          updateValueSize(rs);
1911          this.gets.clear();
1912        } else {
1913          return false;
1914        }
1915      } else {
1916        updateValueSize(this.table.get(get));
1917      }
1918      return true;
1919    }
1920
1921    @Override
1922    protected int getReportingPeriod() {
1923      int period = opts.perClientRunRows / 10;
1924      return period == 0 ? opts.perClientRunRows : period;
1925    }
1926
1927    @Override
1928    protected void testTakedown() throws IOException {
1929      if (this.gets != null && this.gets.size() > 0) {
1930        this.table.get(gets);
1931        this.gets.clear();
1932      }
1933      super.testTakedown();
1934    }
1935  }
1936
1937  static class RandomWriteTest extends SequentialWriteTest {
1938    RandomWriteTest(Connection con, TestOptions options, Status status) {
1939      super(con, options, status);
1940    }
1941
1942    @Override
1943    protected byte[] generateRow(final int i) {
1944      return getRandomRow(this.rand, opts.totalRows);
1945    }
1946
1947
1948  }
1949
1950  static class ScanTest extends TableTest {
1951    private ResultScanner testScanner;
1952
1953    ScanTest(Connection con, TestOptions options, Status status) {
1954      super(con, options, status);
1955    }
1956
1957    @Override
1958    void testTakedown() throws IOException {
1959      if (this.testScanner != null) {
1960        this.testScanner.close();
1961      }
1962      super.testTakedown();
1963    }
1964
1965
1966    @Override
1967    boolean testRow(final int i) throws IOException {
1968      if (this.testScanner == null) {
1969        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1970            .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1971            .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1972        for (int family = 0; family < opts.families; family++) {
1973          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1974          if (opts.addColumns) {
1975            for (int column = 0; column < opts.columns; column++) {
1976              byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1977              scan.addColumn(familyName, qualifier);
1978            }
1979          } else {
1980            scan.addFamily(familyName);
1981          }
1982        }
1983        if (opts.filterAll) {
1984          scan.setFilter(new FilterAllFilter());
1985        }
1986        this.testScanner = table.getScanner(scan);
1987      }
1988      Result r = testScanner.next();
1989      updateValueSize(r);
1990      return true;
1991    }
1992  }
1993
1994  /**
1995   * Base class for operations that are CAS-like; that read a value and then set it based off what
1996   * they read. In this category is increment, append, checkAndPut, etc.
1997   *
1998   * <p>These operations also want some concurrency going on. Usually when these tests run, they
1999   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
2000   * same key space. We do this with our getStartRow and getLastRow overrides.
2001   */
2002  static abstract class CASTableTest extends TableTest {
2003    private final byte [] qualifier;
2004    CASTableTest(Connection con, TestOptions options, Status status) {
2005      super(con, options, status);
2006      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
2007    }
2008
2009    byte [] getQualifier() {
2010      return this.qualifier;
2011    }
2012
2013    @Override
2014    int getStartRow() {
2015      return 0;
2016    }
2017
2018    @Override
2019    int getLastRow() {
2020      return opts.perClientRunRows;
2021    }
2022  }
2023
2024  static class IncrementTest extends CASTableTest {
2025    IncrementTest(Connection con, TestOptions options, Status status) {
2026      super(con, options, status);
2027    }
2028
2029    @Override
2030    boolean testRow(final int i) throws IOException {
2031      Increment increment = new Increment(format(i));
2032      // unlike checkAndXXX tests, which make most sense to do on a single value,
2033      // if multiple families are specified for an increment test we assume it is
2034      // meant to raise the work factor
2035      for (int family = 0; family < opts.families; family++) {
2036        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2037        increment.addColumn(familyName, getQualifier(), 1l);
2038      }
2039      updateValueSize(this.table.increment(increment));
2040      return true;
2041    }
2042  }
2043
2044  static class AppendTest extends CASTableTest {
2045    AppendTest(Connection con, TestOptions options, Status status) {
2046      super(con, options, status);
2047    }
2048
2049    @Override
2050    boolean testRow(final int i) throws IOException {
2051      byte [] bytes = format(i);
2052      Append append = new Append(bytes);
2053      // unlike checkAndXXX tests, which make most sense to do on a single value,
2054      // if multiple families are specified for an append test we assume it is
2055      // meant to raise the work factor
2056      for (int family = 0; family < opts.families; family++) {
2057        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2058        append.addColumn(familyName, getQualifier(), bytes);
2059      }
2060      updateValueSize(this.table.append(append));
2061      return true;
2062    }
2063  }
2064
2065  static class CheckAndMutateTest extends CASTableTest {
2066    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
2067      super(con, options, status);
2068    }
2069
2070    @Override
2071    boolean testRow(final int i) throws IOException {
2072      final byte [] bytes = format(i);
2073      // checkAndXXX tests operate on only a single value
2074      // Put a known value so when we go to check it, it is there.
2075      Put put = new Put(bytes);
2076      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2077      this.table.put(put);
2078      RowMutations mutations = new RowMutations(bytes);
2079      mutations.add(put);
2080      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2081          .ifEquals(bytes).thenMutate(mutations);
2082      return true;
2083    }
2084  }
2085
2086  static class CheckAndPutTest extends CASTableTest {
2087    CheckAndPutTest(Connection con, TestOptions options, Status status) {
2088      super(con, options, status);
2089    }
2090
2091    @Override
2092    boolean testRow(final int i) throws IOException {
2093      final byte [] bytes = format(i);
2094      // checkAndXXX tests operate on only a single value
2095      // Put a known value so when we go to check it, it is there.
2096      Put put = new Put(bytes);
2097      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2098      this.table.put(put);
2099      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2100          .ifEquals(bytes).thenPut(put);
2101      return true;
2102    }
2103  }
2104
2105  static class CheckAndDeleteTest extends CASTableTest {
2106    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
2107      super(con, options, status);
2108    }
2109
2110    @Override
2111    boolean testRow(final int i) throws IOException {
2112      final byte [] bytes = format(i);
2113      // checkAndXXX tests operate on only a single value
2114      // Put a known value so when we go to check it, it is there.
2115      Put put = new Put(bytes);
2116      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2117      this.table.put(put);
2118      Delete delete = new Delete(put.getRow());
2119      delete.addColumn(FAMILY_ZERO, getQualifier());
2120      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2121          .ifEquals(bytes).thenDelete(delete);
2122      return true;
2123    }
2124  }
2125
2126  static class SequentialReadTest extends TableTest {
2127    SequentialReadTest(Connection con, TestOptions options, Status status) {
2128      super(con, options, status);
2129    }
2130
2131    @Override
2132    boolean testRow(final int i) throws IOException {
2133      Get get = new Get(format(i));
2134      for (int family = 0; family < opts.families; family++) {
2135        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2136        if (opts.addColumns) {
2137          for (int column = 0; column < opts.columns; column++) {
2138            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2139            get.addColumn(familyName, qualifier);
2140          }
2141        } else {
2142          get.addFamily(familyName);
2143        }
2144      }
2145      if (opts.filterAll) {
2146        get.setFilter(new FilterAllFilter());
2147      }
2148      updateValueSize(table.get(get));
2149      return true;
2150    }
2151  }
2152
2153  static class SequentialWriteTest extends BufferedMutatorTest {
2154    private ArrayList<Put> puts;
2155
2156
2157    SequentialWriteTest(Connection con, TestOptions options, Status status) {
2158      super(con, options, status);
2159      if (opts.multiPut > 0) {
2160        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
2161        this.puts = new ArrayList<>(opts.multiPut);
2162      }
2163    }
2164
2165    protected byte[] generateRow(final int i) {
2166      return format(i);
2167    }
2168
2169    @Override
2170    boolean testRow(final int i) throws IOException {
2171      byte[] row = generateRow(i);
2172      Put put = new Put(row);
2173      for (int family = 0; family < opts.families; family++) {
2174        byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
2175        for (int column = 0; column < opts.columns; column++) {
2176          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2177          byte[] value = generateData(this.rand, getValueLength(this.rand));
2178          if (opts.useTags) {
2179            byte[] tag = generateData(this.rand, TAG_LENGTH);
2180            Tag[] tags = new Tag[opts.noOfTags];
2181            for (int n = 0; n < opts.noOfTags; n++) {
2182              Tag t = new ArrayBackedTag((byte) n, tag);
2183              tags[n] = t;
2184            }
2185            KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
2186              value, tags);
2187            put.add(kv);
2188            updateValueSize(kv.getValueLength());
2189          } else {
2190            put.addColumn(familyName, qualifier, value);
2191            updateValueSize(value.length);
2192          }
2193        }
2194      }
2195      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2196      if (opts.autoFlush) {
2197        if (opts.multiPut > 0) {
2198          this.puts.add(put);
2199          if (this.puts.size() == opts.multiPut) {
2200            table.put(this.puts);
2201            this.puts.clear();
2202          } else {
2203            return false;
2204          }
2205        } else {
2206          table.put(put);
2207        }
2208      } else {
2209        mutator.mutate(put);
2210      }
2211      return true;
2212    }
2213  }
2214
2215  static class FilteredScanTest extends TableTest {
2216    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
2217
2218    FilteredScanTest(Connection con, TestOptions options, Status status) {
2219      super(con, options, status);
2220      if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) {
2221        LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB +
2222                ". This could take a very long time.");
2223      }
2224    }
2225
2226    @Override
2227    boolean testRow(int i) throws IOException {
2228      byte[] value = generateData(this.rand, getValueLength(this.rand));
2229      Scan scan = constructScan(value);
2230      ResultScanner scanner = null;
2231      try {
2232        scanner = this.table.getScanner(scan);
2233        for (Result r = null; (r = scanner.next()) != null;) {
2234          updateValueSize(r);
2235        }
2236      } finally {
2237        if (scanner != null) {
2238          updateScanMetrics(scanner.getScanMetrics());
2239          scanner.close();
2240        }
2241      }
2242      return true;
2243    }
2244
2245    protected Scan constructScan(byte[] valuePrefix) throws IOException {
2246      FilterList list = new FilterList();
2247      Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO,
2248        CompareOperator.EQUAL, new BinaryComparator(valuePrefix));
2249      list.addFilter(filter);
2250      if (opts.filterAll) {
2251        list.addFilter(new FilterAllFilter());
2252      }
2253      Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2254          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2255          .setScanMetricsEnabled(true);
2256      if (opts.addColumns) {
2257        for (int column = 0; column < opts.columns; column++) {
2258          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2259          scan.addColumn(FAMILY_ZERO, qualifier);
2260        }
2261      } else {
2262        scan.addFamily(FAMILY_ZERO);
2263      }
2264      scan.setFilter(list);
2265      return scan;
2266    }
2267  }
2268
2269  /**
2270   * Compute a throughput rate in MB/s.
2271   * @param rows Number of records consumed.
2272   * @param timeMs Time taken in milliseconds.
2273   * @return String value with label, ie '123.76 MB/s'
2274   */
2275  private static String calculateMbps(int rows, long timeMs, final int valueSize, int families, int columns) {
2276    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH +
2277      ((valueSize + (FAMILY_NAME_BASE.length()+1) + COLUMN_ZERO.length) * columns) * families);
2278    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
2279      .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
2280      .divide(BYTES_PER_MB, CXT);
2281    return FMT.format(mbps) + " MB/s";
2282  }
2283
2284  /*
2285   * Format passed integer.
2286   * @param number
2287   * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed
2288   * number (Does absolute in case number is negative).
2289   */
2290  public static byte [] format(final int number) {
2291    byte [] b = new byte[ROW_LENGTH];
2292    int d = Math.abs(number);
2293    for (int i = b.length - 1; i >= 0; i--) {
2294      b[i] = (byte)((d % 10) + '0');
2295      d /= 10;
2296    }
2297    return b;
2298  }
2299
2300  /*
2301   * This method takes some time and is done inline uploading data.  For
2302   * example, doing the mapfile test, generation of the key and value
2303   * consumes about 30% of CPU time.
2304   * @return Generated random value to insert into a table cell.
2305   */
2306  public static byte[] generateData(final Random r, int length) {
2307    byte [] b = new byte [length];
2308    int i;
2309
2310    for(i = 0; i < (length-8); i += 8) {
2311      b[i] = (byte) (65 + r.nextInt(26));
2312      b[i+1] = b[i];
2313      b[i+2] = b[i];
2314      b[i+3] = b[i];
2315      b[i+4] = b[i];
2316      b[i+5] = b[i];
2317      b[i+6] = b[i];
2318      b[i+7] = b[i];
2319    }
2320
2321    byte a = (byte) (65 + r.nextInt(26));
2322    for(; i < length; i++) {
2323      b[i] = a;
2324    }
2325    return b;
2326  }
2327
2328  static byte [] getRandomRow(final Random random, final int totalRows) {
2329    return format(generateRandomRow(random, totalRows));
2330  }
2331
2332  static int generateRandomRow(final Random random, final int totalRows) {
2333    return random.nextInt(Integer.MAX_VALUE) % totalRows;
2334  }
2335
2336  static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
2337      Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
2338      throws IOException, InterruptedException {
2339    status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for "
2340        + opts.perClientRunRows + " rows");
2341    long totalElapsedTime;
2342
2343    final TestBase t;
2344    try {
2345      if (AsyncTest.class.isAssignableFrom(cmd)) {
2346        Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
2347        Constructor<? extends AsyncTest> constructor =
2348            newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
2349        t = constructor.newInstance(asyncCon, opts, status);
2350      } else {
2351        Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
2352        Constructor<? extends Test> constructor =
2353            newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
2354        t = constructor.newInstance(con, opts, status);
2355      }
2356    } catch (NoSuchMethodException e) {
2357      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
2358          + ".  It does not provide a constructor as described by "
2359          + "the javadoc comment.  Available constructors are: "
2360          + Arrays.toString(cmd.getConstructors()));
2361    } catch (Exception e) {
2362      throw new IllegalStateException("Failed to construct command class", e);
2363    }
2364    totalElapsedTime = t.test();
2365
2366    status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
2367      "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
2368      " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
2369          getAverageValueLength(opts), opts.families, opts.columns) + ")");
2370
2371    return new RunResult(totalElapsedTime, t.getLatencyHistogram());
2372  }
2373
2374  private static int getAverageValueLength(final TestOptions opts) {
2375    return opts.valueRandom? opts.valueSize/2: opts.valueSize;
2376  }
2377
2378  private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) throws IOException,
2379      InterruptedException, ClassNotFoundException, ExecutionException {
2380    // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2381    // the TestOptions introspection for us and dump the output in a readable format.
2382    LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
2383    Admin admin = null;
2384    Connection connection = null;
2385    try {
2386      connection = ConnectionFactory.createConnection(getConf());
2387      admin = connection.getAdmin();
2388      checkTable(admin, opts);
2389    } finally {
2390      if (admin != null) admin.close();
2391      if (connection != null) connection.close();
2392    }
2393    if (opts.nomapred) {
2394      doLocalClients(opts, getConf());
2395    } else {
2396      doMapReduce(opts, getConf());
2397    }
2398  }
2399
2400  protected void printUsage() {
2401    printUsage(PE_COMMAND_SHORTNAME, null);
2402  }
2403
2404  protected static void printUsage(final String message) {
2405    printUsage(PE_COMMAND_SHORTNAME, message);
2406  }
2407
2408  protected static void printUsageAndExit(final String message, final int exitCode) {
2409    printUsage(message);
2410    System.exit(exitCode);
2411  }
2412
2413  protected static void printUsage(final String shortName, final String message) {
2414    if (message != null && message.length() > 0) {
2415      System.err.println(message);
2416    }
2417    System.err.print("Usage: hbase " + shortName);
2418    System.err.println("  <OPTIONS> [-D<property=value>]* <command> <nclients>");
2419    System.err.println();
2420    System.err.println("General Options:");
2421    System.err.println(" nomapred        Run multiple clients using threads " +
2422      "(rather than use mapreduce)");
2423    System.err.println(" oneCon          all the threads share the same connection. Default: False");
2424    System.err.println(" connCount          connections all threads share. "
2425        + "For example, if set to 2, then all thread share 2 connection. "
2426        + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, "
2427        + "if not, connCount=thread number");
2428
2429    System.err.println(" sampleRate      Execute test on a sample of total " +
2430      "rows. Only supported by randomRead. Default: 1.0");
2431    System.err.println(" period          Report every 'period' rows: " +
2432      "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows()/10);
2433    System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
2434    System.err.println(" traceRate       Enable HTrace spans. Initiate tracing every N rows. " +
2435      "Default: 0");
2436    System.err.println(" latency         Set to report operation latencies. Default: False");
2437    System.err.println(" measureAfter    Start to measure the latency once 'measureAfter'" +
2438        " rows have been treated. Default: 0");
2439    System.err.println(" valueSize       Pass value size to use: Default: "
2440        + DEFAULT_OPTS.getValueSize());
2441    System.err.println(" valueRandom     Set if we should vary value size between 0 and " +
2442        "'valueSize'; set on read for stats on size: Default: Not set.");
2443    System.err.println(" blockEncoding   Block encoding to use. Value should be one of "
2444        + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2445    System.err.println();
2446    System.err.println("Table Creation / Write Tests:");
2447    System.err.println(" table           Alternate table name. Default: 'TestTable'");
2448    System.err.println(" rows            Rows each client runs. Default: "
2449        + DEFAULT_OPTS.getPerClientRunRows()
2450        + ".  In case of randomReads and randomSeekScans this could"
2451        + " be specified along with --size to specify the number of rows to be scanned within"
2452        + " the total range specified by the size.");
2453    System.err.println(
2454      " size            Total size in GiB. Mutually exclusive with --rows for writes and scans"
2455          + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2456          + " use size to specify the end range and --rows"
2457          + " specifies the number of rows within that range. " + "Default: 1.0.");
2458    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2459    System.err.println(" flushCommits    Used to determine if the test should flush the table. " +
2460      "Default: false");
2461    System.err.println(" valueZipf       Set if we should vary value size between 0 and " +
2462        "'valueSize' in zipf form: Default: Not set.");
2463    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
2464    System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
2465    System.err.println(" multiPut        Batch puts together into groups of N. Only supported " +
2466        "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0");
2467    System.err.println(" presplit        Create presplit table. If a table with same name exists,"
2468        + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2469        + "Recommended for accurate perf analysis (see guide). Default: disabled");
2470    System.err.println(" usetags         Writes tags along with KVs. Use with HFile V3. " +
2471      "Default: false");
2472    System.err.println(" numoftags       Specify the no of tags that would be needed. " +
2473       "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2474    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
2475    System.err.println(" columns         Columns to write per row. Default: 1");
2476    System.err.println(" families        Specify number of column families for the table. Default: 1");
2477    System.err.println();
2478    System.err.println("Read Tests:");
2479    System.err.println(" filterAll       Helps to filter out all the rows on the server side"
2480        + " there by not returning any thing back to the client.  Helps to check the server side"
2481        + " performance.  Uses FilterAllFilter internally. ");
2482    System.err.println(" multiGet        Batch gets together into groups of N. Only supported " +
2483      "by randomRead. Default: disabled");
2484    System.err.println(" inmemory        Tries to keep the HFiles of the CF " +
2485      "inmemory as far as possible. Not guaranteed that reads are always served " +
2486      "from memory.  Default: false");
2487    System.err.println(" bloomFilter     Bloom filter type, one of "
2488        + Arrays.toString(BloomType.values()));
2489    System.err.println(" blockSize       Blocksize to use when writing out hfiles. ");
2490    System.err.println(" inmemoryCompaction  Makes the column family to do inmemory flushes/compactions. "
2491        + "Uses the CompactingMemstore");
2492    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
2493    System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
2494    System.err.println(" randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
2495    System.err.println(" caching         Scan caching to use. Default: 30");
2496    System.err.println(" asyncPrefetch   Enable asyncPrefetch for scan");
2497    System.err.println(" cacheBlocks     Set the cacheBlocks option for scan. Default: true");
2498    System.err.println(" scanReadType    Set the readType option for scan, stream/pread/default. Default: default");
2499    System.err.println(" bufferSize      Set the value of client side buffering. Default: 2MB");
2500    System.err.println();
2501    System.err.println(" Note: -D properties will be applied to the conf used. ");
2502    System.err.println("  For example: ");
2503    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
2504    System.err.println("   -Dmapreduce.task.timeout=60000");
2505    System.err.println();
2506    System.err.println("Command:");
2507    for (CmdDescriptor command : COMMANDS.values()) {
2508      System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
2509    }
2510    System.err.println();
2511    System.err.println("Args:");
2512    System.err.println(" nclients        Integer. Required. Total number of clients "
2513        + "(and HRegionServers) running. 1 <= value <= 500");
2514    System.err.println("Examples:");
2515    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
2516    System.err.println(" $ hbase " + shortName + " sequentialWrite 1");
2517    System.err.println(" To run 10 clients doing increments over ten rows:");
2518    System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10");
2519  }
2520
2521  /**
2522   * Parse options passed in via an arguments array. Assumes that array has been split
2523   * on white-space and placed into a {@code Queue}. Any unknown arguments will remain
2524   * in the queue at the conclusion of this method call. It's up to the caller to deal
2525   * with these unrecognized arguments.
2526   */
2527  static TestOptions parseOpts(Queue<String> args) {
2528    TestOptions opts = new TestOptions();
2529
2530    String cmd = null;
2531    while ((cmd = args.poll()) != null) {
2532      if (cmd.equals("-h") || cmd.startsWith("--h")) {
2533        // place item back onto queue so that caller knows parsing was incomplete
2534        args.add(cmd);
2535        break;
2536      }
2537
2538      final String nmr = "--nomapred";
2539      if (cmd.startsWith(nmr)) {
2540        opts.nomapred = true;
2541        continue;
2542      }
2543
2544      final String rows = "--rows=";
2545      if (cmd.startsWith(rows)) {
2546        opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
2547        continue;
2548      }
2549
2550      final String cycles = "--cycles=";
2551      if (cmd.startsWith(cycles)) {
2552        opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
2553        continue;
2554      }
2555
2556      final String sampleRate = "--sampleRate=";
2557      if (cmd.startsWith(sampleRate)) {
2558        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
2559        continue;
2560      }
2561
2562      final String table = "--table=";
2563      if (cmd.startsWith(table)) {
2564        opts.tableName = cmd.substring(table.length());
2565        continue;
2566      }
2567
2568      final String startRow = "--startRow=";
2569      if (cmd.startsWith(startRow)) {
2570        opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
2571        continue;
2572      }
2573
2574      final String compress = "--compress=";
2575      if (cmd.startsWith(compress)) {
2576        opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
2577        continue;
2578      }
2579
2580      final String traceRate = "--traceRate=";
2581      if (cmd.startsWith(traceRate)) {
2582        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
2583        continue;
2584      }
2585
2586      final String blockEncoding = "--blockEncoding=";
2587      if (cmd.startsWith(blockEncoding)) {
2588        opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
2589        continue;
2590      }
2591
2592      final String flushCommits = "--flushCommits=";
2593      if (cmd.startsWith(flushCommits)) {
2594        opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
2595        continue;
2596      }
2597
2598      final String writeToWAL = "--writeToWAL=";
2599      if (cmd.startsWith(writeToWAL)) {
2600        opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
2601        continue;
2602      }
2603
2604      final String presplit = "--presplit=";
2605      if (cmd.startsWith(presplit)) {
2606        opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
2607        continue;
2608      }
2609
2610      final String inMemory = "--inmemory=";
2611      if (cmd.startsWith(inMemory)) {
2612        opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
2613        continue;
2614      }
2615
2616      final String autoFlush = "--autoFlush=";
2617      if (cmd.startsWith(autoFlush)) {
2618        opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
2619        if (!opts.autoFlush && opts.multiPut > 0) {
2620          throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
2621        }
2622        continue;
2623      }
2624
2625      final String onceCon = "--oneCon=";
2626      if (cmd.startsWith(onceCon)) {
2627        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
2628        if (opts.oneCon && opts.connCount > 1) {
2629          throw new IllegalArgumentException("oneCon is set to true, "
2630              + "connCount should not bigger than 1");
2631        }
2632        continue;
2633      }
2634
2635      final String connCount = "--connCount=";
2636      if (cmd.startsWith(connCount)) {
2637        opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
2638        if (opts.oneCon && opts.connCount > 1) {
2639          throw new IllegalArgumentException("oneCon is set to true, "
2640              + "connCount should not bigger than 1");
2641        }
2642        continue;
2643      }
2644
2645      final String latency = "--latency";
2646      if (cmd.startsWith(latency)) {
2647        opts.reportLatency = true;
2648        continue;
2649      }
2650
2651      final String multiGet = "--multiGet=";
2652      if (cmd.startsWith(multiGet)) {
2653        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
2654        continue;
2655      }
2656
2657      final String multiPut = "--multiPut=";
2658      if (cmd.startsWith(multiPut)) {
2659        opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
2660        if (!opts.autoFlush && opts.multiPut > 0) {
2661          throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
2662        }
2663        continue;
2664      }
2665
2666      final String useTags = "--usetags=";
2667      if (cmd.startsWith(useTags)) {
2668        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
2669        continue;
2670      }
2671
2672      final String noOfTags = "--numoftags=";
2673      if (cmd.startsWith(noOfTags)) {
2674        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
2675        continue;
2676      }
2677
2678      final String replicas = "--replicas=";
2679      if (cmd.startsWith(replicas)) {
2680        opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
2681        continue;
2682      }
2683
2684      final String filterOutAll = "--filterAll";
2685      if (cmd.startsWith(filterOutAll)) {
2686        opts.filterAll = true;
2687        continue;
2688      }
2689
2690      final String size = "--size=";
2691      if (cmd.startsWith(size)) {
2692        opts.size = Float.parseFloat(cmd.substring(size.length()));
2693        if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB");
2694        continue;
2695      }
2696
2697      final String splitPolicy = "--splitPolicy=";
2698      if (cmd.startsWith(splitPolicy)) {
2699        opts.splitPolicy = cmd.substring(splitPolicy.length());
2700        continue;
2701      }
2702
2703      final String randomSleep = "--randomSleep=";
2704      if (cmd.startsWith(randomSleep)) {
2705        opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
2706        continue;
2707      }
2708
2709      final String measureAfter = "--measureAfter=";
2710      if (cmd.startsWith(measureAfter)) {
2711        opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length()));
2712        continue;
2713      }
2714
2715      final String bloomFilter = "--bloomFilter=";
2716      if (cmd.startsWith(bloomFilter)) {
2717        opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
2718        continue;
2719      }
2720
2721      final String blockSize = "--blockSize=";
2722      if(cmd.startsWith(blockSize) ) {
2723        opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
2724        continue;
2725      }
2726
2727      final String valueSize = "--valueSize=";
2728      if (cmd.startsWith(valueSize)) {
2729        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
2730        continue;
2731      }
2732
2733      final String valueRandom = "--valueRandom";
2734      if (cmd.startsWith(valueRandom)) {
2735        opts.valueRandom = true;
2736        if (opts.valueZipf) {
2737          throw new IllegalStateException("Either valueZipf or valueRandom but not both");
2738        }
2739        continue;
2740      }
2741
2742      final String valueZipf = "--valueZipf";
2743      if (cmd.startsWith(valueZipf)) {
2744        opts.valueZipf = true;
2745        if (opts.valueRandom) {
2746          throw new IllegalStateException("Either valueZipf or valueRandom but not both");
2747        }
2748        continue;
2749      }
2750
2751      final String period = "--period=";
2752      if (cmd.startsWith(period)) {
2753        opts.period = Integer.parseInt(cmd.substring(period.length()));
2754        continue;
2755      }
2756
2757      final String addColumns = "--addColumns=";
2758      if (cmd.startsWith(addColumns)) {
2759        opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
2760        continue;
2761      }
2762
2763      final String inMemoryCompaction = "--inmemoryCompaction=";
2764      if (cmd.startsWith(inMemoryCompaction)) {
2765        opts.inMemoryCompaction =
2766            MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length()));
2767        continue;
2768      }
2769
2770      final String columns = "--columns=";
2771      if (cmd.startsWith(columns)) {
2772        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
2773        continue;
2774      }
2775
2776      final String families = "--families=";
2777      if (cmd.startsWith(families)) {
2778        opts.families = Integer.parseInt(cmd.substring(families.length()));
2779        continue;
2780      }
2781
2782      final String caching = "--caching=";
2783      if (cmd.startsWith(caching)) {
2784        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
2785        continue;
2786      }
2787
2788      final String asyncPrefetch = "--asyncPrefetch";
2789      if (cmd.startsWith(asyncPrefetch)) {
2790        opts.asyncPrefetch = true;
2791        continue;
2792      }
2793
2794      final String cacheBlocks = "--cacheBlocks=";
2795      if (cmd.startsWith(cacheBlocks)) {
2796        opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length()));
2797        continue;
2798      }
2799
2800      final String scanReadType = "--scanReadType=";
2801      if (cmd.startsWith(scanReadType)) {
2802        opts.scanReadType =
2803            Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
2804        continue;
2805      }
2806
2807      final String bufferSize = "--bufferSize=";
2808      if (cmd.startsWith(bufferSize)) {
2809        opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
2810        continue;
2811      }
2812
2813      if (isCommandClass(cmd)) {
2814        opts.cmdName = cmd;
2815        try {
2816          opts.numClientThreads = Integer.parseInt(args.remove());
2817        } catch (NoSuchElementException | NumberFormatException e) {
2818          throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
2819        }
2820        opts = calculateRowsAndSize(opts);
2821        break;
2822      } else {
2823        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
2824      }
2825
2826      // Not matching any option or command.
2827      System.err.println("Error: Wrong option or command: " + cmd);
2828      args.add(cmd);
2829      break;
2830    }
2831    return opts;
2832  }
2833
2834  static TestOptions calculateRowsAndSize(final TestOptions opts) {
2835    int rowsPerGB = getRowsPerGB(opts);
2836    if ((opts.getCmdName() != null
2837        && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
2838        && opts.size != DEFAULT_OPTS.size
2839        && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
2840      opts.totalRows = (int) opts.size * rowsPerGB;
2841    } else if (opts.size != DEFAULT_OPTS.size) {
2842      // total size in GB specified
2843      opts.totalRows = (int) opts.size * rowsPerGB;
2844      opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
2845    } else {
2846      opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
2847      opts.size = opts.totalRows / rowsPerGB;
2848    }
2849    return opts;
2850  }
2851
2852  static int getRowsPerGB(final TestOptions opts) {
2853    return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getFamilies() *
2854        opts.getColumns());
2855  }
2856
2857  @Override
2858  public int run(String[] args) throws Exception {
2859    // Process command-line args. TODO: Better cmd-line processing
2860    // (but hopefully something not as painful as cli options).
2861    int errCode = -1;
2862    if (args.length < 1) {
2863      printUsage();
2864      return errCode;
2865    }
2866
2867    try {
2868      LinkedList<String> argv = new LinkedList<>();
2869      argv.addAll(Arrays.asList(args));
2870      TestOptions opts = parseOpts(argv);
2871
2872      // args remaining, print help and exit
2873      if (!argv.isEmpty()) {
2874        errCode = 0;
2875        printUsage();
2876        return errCode;
2877      }
2878
2879      // must run at least 1 client
2880      if (opts.numClientThreads <= 0) {
2881        throw new IllegalArgumentException("Number of clients must be > 0");
2882      }
2883
2884      // cmdName should not be null, print help and exit
2885      if (opts.cmdName == null) {
2886        printUsage();
2887        return errCode;
2888      }
2889
2890      Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
2891      if (cmdClass != null) {
2892        runTest(cmdClass, opts);
2893        errCode = 0;
2894      }
2895
2896    } catch (Exception e) {
2897      e.printStackTrace();
2898    }
2899
2900    return errCode;
2901  }
2902
2903  private static boolean isCommandClass(String cmd) {
2904    return COMMANDS.containsKey(cmd);
2905  }
2906
2907  private static Class<? extends TestBase> determineCommandClass(String cmd) {
2908    CmdDescriptor descriptor = COMMANDS.get(cmd);
2909    return descriptor != null ? descriptor.getCmdClass() : null;
2910  }
2911
2912  public static void main(final String[] args) throws Exception {
2913    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
2914    System.exit(res);
2915  }
2916}