001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import com.codahale.metrics.Histogram;
022import com.codahale.metrics.UniformReservoir;
023import com.fasterxml.jackson.databind.MapperFeature;
024import com.fasterxml.jackson.databind.ObjectMapper;
025
026import java.io.IOException;
027import java.io.PrintStream;
028import java.lang.reflect.Constructor;
029import java.math.BigDecimal;
030import java.math.MathContext;
031import java.text.DecimalFormat;
032import java.text.SimpleDateFormat;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Date;
036import java.util.LinkedList;
037import java.util.Locale;
038import java.util.Map;
039import java.util.NoSuchElementException;
040import java.util.Queue;
041import java.util.Random;
042import java.util.TreeMap;
043import java.util.concurrent.Callable;
044import java.util.concurrent.ExecutionException;
045import java.util.concurrent.ExecutorService;
046import java.util.concurrent.Executors;
047import java.util.concurrent.Future;
048
049import org.apache.commons.lang3.StringUtils;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.conf.Configured;
052import org.apache.hadoop.fs.FileSystem;
053import org.apache.hadoop.fs.Path;
054import org.apache.hadoop.hbase.client.Admin;
055import org.apache.hadoop.hbase.client.Append;
056import org.apache.hadoop.hbase.client.AsyncConnection;
057import org.apache.hadoop.hbase.client.AsyncTable;
058import org.apache.hadoop.hbase.client.BufferedMutator;
059import org.apache.hadoop.hbase.client.BufferedMutatorParams;
060import org.apache.hadoop.hbase.client.Connection;
061import org.apache.hadoop.hbase.client.ConnectionFactory;
062import org.apache.hadoop.hbase.client.Consistency;
063import org.apache.hadoop.hbase.client.Delete;
064import org.apache.hadoop.hbase.client.Durability;
065import org.apache.hadoop.hbase.client.Get;
066import org.apache.hadoop.hbase.client.Increment;
067import org.apache.hadoop.hbase.client.Put;
068import org.apache.hadoop.hbase.client.Result;
069import org.apache.hadoop.hbase.client.ResultScanner;
070import org.apache.hadoop.hbase.client.RowMutations;
071import org.apache.hadoop.hbase.client.Scan;
072import org.apache.hadoop.hbase.client.Table;
073import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
074import org.apache.hadoop.hbase.filter.BinaryComparator;
075import org.apache.hadoop.hbase.filter.Filter;
076import org.apache.hadoop.hbase.filter.FilterAllFilter;
077import org.apache.hadoop.hbase.filter.FilterList;
078import org.apache.hadoop.hbase.filter.PageFilter;
079import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
080import org.apache.hadoop.hbase.filter.WhileMatchFilter;
081import org.apache.hadoop.hbase.io.compress.Compression;
082import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
083import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
084import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
085import org.apache.hadoop.hbase.regionserver.BloomType;
086import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
087import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
088import org.apache.hadoop.hbase.trace.SpanReceiverHost;
089import org.apache.hadoop.hbase.trace.TraceUtil;
090import org.apache.hadoop.hbase.util.ByteArrayHashKey;
091import org.apache.hadoop.hbase.util.Bytes;
092import org.apache.hadoop.hbase.util.Hash;
093import org.apache.hadoop.hbase.util.MurmurHash;
094import org.apache.hadoop.hbase.util.Pair;
095import org.apache.hadoop.hbase.util.YammerHistogramUtils;
096import org.apache.hadoop.io.LongWritable;
097import org.apache.hadoop.io.Text;
098import org.apache.hadoop.mapreduce.Job;
099import org.apache.hadoop.mapreduce.Mapper;
100import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
101import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
102import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
103import org.apache.hadoop.util.Tool;
104import org.apache.hadoop.util.ToolRunner;
105import org.apache.htrace.core.ProbabilitySampler;
106import org.apache.htrace.core.Sampler;
107import org.apache.htrace.core.TraceScope;
108import org.apache.yetus.audience.InterfaceAudience;
109import org.slf4j.Logger;
110import org.slf4j.LoggerFactory;
111import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
112import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
113
114/**
 * Script used for evaluating HBase performance and scalability.  Runs an HBase
116 * client that steps through one of a set of hardcoded tests or 'experiments'
117 * (e.g. a random reads test, a random writes test, etc.). Pass on the
118 * command-line which test to run and how many clients are participating in
119 * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
120 *
121 * <p>This class sets up and runs the evaluation programs described in
122 * Section 7, <i>Performance Evaluation</i>, of the <a
123 * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
124 * paper, pages 8-10.
125 *
126 * <p>By default, runs as a mapreduce job where each mapper runs a single test
127 * client. Can also run as a non-mapreduce, multithreaded application by
128 * specifying {@code --nomapred}. Each client does about 1GB of data, unless
129 * specified otherwise.
130 */
131@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
132public class PerformanceEvaluation extends Configured implements Tool {
  /** Command name under which {@code RandomSeekScanTest} is registered. */
  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
  /** Command name under which {@code RandomReadTest} is registered. */
  static final String RANDOM_READ = "randomRead";
  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
  /** Shared JSON mapper; used to serialize the per-client options written to the job input file. */
  private static final ObjectMapper MAPPER = new ObjectMapper();
  static {
    // Emit properties in alphabetical order when serializing.
    MAPPER.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
  }

  /** Default value of {@code TestOptions.tableName}. */
  public static final String TABLE_NAME = "TestTable";
  /** Prefix of column family names; the family index is appended to it. */
  public static final String FAMILY_NAME_BASE = "info";
  /** Name of the first column family ({@code FAMILY_NAME_BASE} followed by 0). */
  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
  /** The bytes of the string "0". */
  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
  /** Default value of {@code TestOptions.valueSize}. */
  public static final int DEFAULT_VALUE_LENGTH = 1000;
  // NOTE(review): presumably the length of a formatted row key -- confirm against format().
  public static final int ROW_LENGTH = 26;

  // Note: this is 1024 * 1024 * 1000 (1,048,576,000), not 2^30.
  private static final int ONE_GB = 1024 * 1024 * 1000;
  // Evaluates to 1,048,576 rows with the default value length of 1000.
  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
  // TODO : should we make this configurable
  private static final int TAG_LENGTH = 256;
  /** Number format with at most two fraction digits. */
  private static final DecimalFormat FMT = new DecimalFormat("0.##");
  private static final MathContext CXT = MathContext.DECIMAL64;
  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
  /** Options with every field left at its default; compared against to detect user overrides. */
  private static final TestOptions DEFAULT_OPTS = new TestOptions();

  /** Registered commands keyed by command name; a TreeMap, so iteration is sorted by name. */
  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
  /** Directory, relative to the job base directory, under which per-run job dirs are created. */
  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
160
161  static {
162    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
163        "Run async random read test");
164    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
165        "Run async random write test");
166    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
167        "Run async sequential read test");
168    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
169        "Run async sequential write test");
170    addCommandDescriptor(AsyncScanTest.class, "asyncScan",
171        "Run async scan test (read every row)");
172    addCommandDescriptor(RandomReadTest.class, RANDOM_READ,
173      "Run random read test");
174    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
175      "Run random seek and scan 100 test");
176    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
177      "Run random seek scan with both start and stop row (max 10 rows)");
178    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
179      "Run random seek scan with both start and stop row (max 100 rows)");
180    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
181      "Run random seek scan with both start and stop row (max 1000 rows)");
182    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
183      "Run random seek scan with both start and stop row (max 10000 rows)");
184    addCommandDescriptor(RandomWriteTest.class, "randomWrite",
185      "Run random write test");
186    addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
187      "Run sequential read test");
188    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
189      "Run sequential write test");
190    addCommandDescriptor(ScanTest.class, "scan",
191      "Run scan test (read every row)");
192    addCommandDescriptor(FilteredScanTest.class, "filterScan",
193      "Run scan test using a filter to find a specific row based on it's value " +
194      "(make sure to use --rows=20)");
195    addCommandDescriptor(IncrementTest.class, "increment",
196      "Increment on each row; clients overlap on keyspace so some concurrent operations");
197    addCommandDescriptor(AppendTest.class, "append",
198      "Append on each row; clients overlap on keyspace so some concurrent operations");
199    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
200      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
201    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
202      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
203    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
204      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
205  }
206
207  /**
208   * Enum for map metrics.  Keep it out here rather than inside in the Map
209   * inner-class so we can find associated properties.
210   */
211  protected static enum Counter {
212    /** elapsed time */
213    ELAPSED_TIME,
214    /** number of rows */
215    ROWS
216  }
217
218  protected static class RunResult implements Comparable<RunResult> {
219    public RunResult(long duration, Histogram hist) {
220      this.duration = duration;
221      this.hist = hist;
222    }
223
224    public final long duration;
225    public final Histogram hist;
226
227    @Override
228    public String toString() {
229      return Long.toString(duration);
230    }
231
232    @Override public int compareTo(RunResult o) {
233      return Long.compare(this.duration, o.duration);
234    }
235  }
236
  /**
   * Creates an evaluation tool bound to the given configuration.
   * @param conf Configuration object, handed to the {@link Configured} superclass
   */
  public PerformanceEvaluation(final Configuration conf) {
    super(conf);
  }
244
245  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass,
246      String name, String description) {
247    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
248    COMMANDS.put(name, cmdDescriptor);
249  }
250
251  /**
252   * Implementations can have their status set.
253   */
254  interface Status {
255    /**
256     * Sets status
257     * @param msg status message
258     * @throws IOException
259     */
260    void setStatus(final String msg) throws IOException;
261  }
262
263  /**
264   * MapReduce job that runs a performance evaluation client in each map task.
265   */
266  public static class EvaluationMapTask
267      extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
268
269    /** configuration parameter name that contains the command */
270    public final static String CMD_KEY = "EvaluationMapTask.command";
271    /** configuration parameter name that contains the PE impl */
272    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
273
274    private Class<? extends Test> cmd;
275
276    @Override
277    protected void setup(Context context) throws IOException, InterruptedException {
278      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
279
280      // this is required so that extensions of PE are instantiated within the
281      // map reduce task...
282      Class<? extends PerformanceEvaluation> peClass =
283          forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
284      try {
285        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
286      } catch (Exception e) {
287        throw new IllegalStateException("Could not instantiate PE instance", e);
288      }
289    }
290
291    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
292      try {
293        return Class.forName(className).asSubclass(type);
294      } catch (ClassNotFoundException e) {
295        throw new IllegalStateException("Could not find class for name: " + className, e);
296      }
297    }
298
299    @Override
300    protected void map(LongWritable key, Text value, final Context context)
301           throws IOException, InterruptedException {
302
303      Status status = new Status() {
304        @Override
305        public void setStatus(String msg) {
306           context.setStatus(msg);
307        }
308      };
309
310      ObjectMapper mapper = new ObjectMapper();
311      TestOptions opts = mapper.readValue(value.toString(), TestOptions.class);
312      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
313      final Connection con = ConnectionFactory.createConnection(conf);
314      AsyncConnection asyncCon = null;
315      try {
316        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
317      } catch (ExecutionException e) {
318        throw new IOException(e);
319      }
320
321      // Evaluation task
322      RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
323      // Collect how much time the thing took. Report as map output and
324      // to the ELAPSED_TIME counter.
325      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
326      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
327      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
328      context.progress();
329    }
330  }
331
332  /*
333   * If table does not already exist, create. Also create a table when
334   * {@code opts.presplitRegions} is specified or when the existing table's
335   * region replica count doesn't match {@code opts.replicas}.
336   */
337  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
338    TableName tableName = TableName.valueOf(opts.tableName);
339    boolean needsDelete = false, exists = admin.tableExists(tableName);
340    boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
341      || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
342    if (!exists && isReadCmd) {
343      throw new IllegalStateException(
344        "Must specify an existing table for read commands. Run a write command first.");
345    }
346    HTableDescriptor desc =
347      exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null;
348    byte[][] splits = getSplits(opts);
349
350    // recreate the table when user has requested presplit or when existing
351    // {RegionSplitPolicy,replica count} does not match requested, or when the
352    // number of column families does not match requested.
353    if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
354      || (!isReadCmd && desc != null &&
355          !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
356      || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)
357      || (desc != null && desc.getColumnFamilyCount() != opts.families)) {
358      needsDelete = true;
359      // wait, why did it delete my table?!?
360      LOG.debug(MoreObjects.toStringHelper("needsDelete")
361        .add("needsDelete", needsDelete)
362        .add("isReadCmd", isReadCmd)
363        .add("exists", exists)
364        .add("desc", desc)
365        .add("presplit", opts.presplitRegions)
366        .add("splitPolicy", opts.splitPolicy)
367        .add("replicas", opts.replicas)
368        .add("families", opts.families)
369        .toString());
370    }
371
372    // remove an existing table
373    if (needsDelete) {
374      if (admin.isTableEnabled(tableName)) {
375        admin.disableTable(tableName);
376      }
377      admin.deleteTable(tableName);
378    }
379
380    // table creation is necessary
381    if (!exists || needsDelete) {
382      desc = getTableDescriptor(opts);
383      if (splits != null) {
384        if (LOG.isDebugEnabled()) {
385          for (int i = 0; i < splits.length; i++) {
386            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
387          }
388        }
389      }
390      admin.createTable(desc, splits);
391      LOG.info("Table " + desc + " created");
392    }
393    return admin.tableExists(tableName);
394  }
395
396  /**
397   * Create an HTableDescriptor from provided TestOptions.
398   */
399  protected static HTableDescriptor getTableDescriptor(TestOptions opts) {
400    HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(opts.tableName));
401    for (int family = 0; family < opts.families; family++) {
402      byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
403      HColumnDescriptor familyDesc = new HColumnDescriptor(familyName);
404      familyDesc.setDataBlockEncoding(opts.blockEncoding);
405      familyDesc.setCompressionType(opts.compression);
406      familyDesc.setBloomFilterType(opts.bloomType);
407      familyDesc.setBlocksize(opts.blockSize);
408      if (opts.inMemoryCF) {
409        familyDesc.setInMemory(true);
410      }
411      familyDesc.setInMemoryCompaction(opts.inMemoryCompaction);
412      tableDesc.addFamily(familyDesc);
413    }
414    if (opts.replicas != DEFAULT_OPTS.replicas) {
415      tableDesc.setRegionReplication(opts.replicas);
416    }
417    if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
418      tableDesc.setRegionSplitPolicyClassName(opts.splitPolicy);
419    }
420    return tableDesc;
421  }
422
423  /**
424   * generates splits based on total number of rows and specified split regions
425   */
426  protected static byte[][] getSplits(TestOptions opts) {
427    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions)
428      return null;
429
430    int numSplitPoints = opts.presplitRegions - 1;
431    byte[][] splits = new byte[numSplitPoints][];
432    int jump = opts.totalRows / opts.presplitRegions;
433    for (int i = 0; i < numSplitPoints; i++) {
434      int rowkey = jump * (1 + i);
435      splits[i] = format(rowkey);
436    }
437    return splits;
438  }
439
440  /*
441   * Run all clients in this vm each to its own thread.
442   */
443  static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
444      throws IOException, InterruptedException, ExecutionException {
445    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
446    assert cmd != null;
447    @SuppressWarnings("unchecked")
448    Future<RunResult>[] threads = new Future[opts.numClientThreads];
449    RunResult[] results = new RunResult[opts.numClientThreads];
450    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
451      new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
452    final Connection con = ConnectionFactory.createConnection(conf);
453    final AsyncConnection asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
454    for (int i = 0; i < threads.length; i++) {
455      final int index = i;
456      threads[i] = pool.submit(new Callable<RunResult>() {
457        @Override
458        public RunResult call() throws Exception {
459          TestOptions threadOpts = new TestOptions(opts);
460          if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
461          RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
462            @Override
463            public void setStatus(final String msg) throws IOException {
464              LOG.info(msg);
465            }
466          });
467          LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration +
468            "ms over " + threadOpts.perClientRunRows + " rows");
469          return run;
470        }
471      });
472    }
473    pool.shutdown();
474
475    for (int i = 0; i < threads.length; i++) {
476      try {
477        results[i] = threads[i].get();
478      } catch (ExecutionException e) {
479        throw new IOException(e.getCause());
480      }
481    }
482    final String test = cmd.getSimpleName();
483    LOG.info("[" + test + "] Summary of timings (ms): "
484             + Arrays.toString(results));
485    Arrays.sort(results);
486    long total = 0;
487    for (RunResult result : results) {
488      total += result.duration;
489    }
490    LOG.info("[" + test + "]"
491      + "\tMin: " + results[0] + "ms"
492      + "\tMax: " + results[results.length - 1] + "ms"
493      + "\tAvg: " + (total / results.length) + "ms");
494
495    con.close();
496    asyncCon.close();
497
498    return results;
499  }
500
  /**
   * Run a mapreduce job.  Run as many maps as asked-for clients.
   * Before we start up the job, write out an input file with instruction
   * per client regards which row they are to start on.
   * @param opts options of the run; {@code opts.cmdName} selects the test class
   * @param conf configuration the job is created from; the command and PE class names
   *          are set on it before the job is instantiated
   * @return the job, after waiting for it to complete
   * @throws IOException
   */
  static Job doMapReduce(TestOptions opts, final Configuration conf)
      throws IOException, InterruptedException, ClassNotFoundException {
    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
    assert cmd != null;
    Path inputDir = writeInputFile(conf, opts);
    // Must be set before Job.getInstance(conf) below, which is handed this configuration.
    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
    Job job = Job.getInstance(conf);
    job.setJarByClass(PerformanceEvaluation.class);
    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);

    // One input line per split, i.e. one map task per client.
    job.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.setInputPaths(job, inputDir);
    // this is default, but be explicit about it just in case.
    NLineInputFormat.setNumLinesPerSplit(job, 1);

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(LongWritable.class);

    job.setMapperClass(EvaluationMapTask.class);
    job.setReducerClass(LongSumReducer.class);

    job.setNumReduceTasks(1);

    // Output goes to an "outputs" directory next to the input directory.
    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
      Histogram.class,     // metrics (com.codahale.metrics)
      ObjectMapper.class,  // jackson-databind
      FilterAllFilter.class // hbase-server tests jar
      );

    TableMapReduceUtil.initCredentials(job);

    // NOTE(review): the success flag returned by waitForCompletion is not checked here;
    // callers have to inspect the returned job.
    job.waitForCompletion(true);
    return job;
  }
547
  /**
   * Name of the file, inside the job's input directory, that holds one line of serialized
   * options per client. The mapreduce job reads it with one line per split, so each client
   * has one mapper to do the work.
   */
  static String JOB_INPUT_FILENAME = "input.txt";
553
554  /*
555   * Write input file of offsets-per-client for the mapreduce job.
556   * @param c Configuration
557   * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
558   * @throws IOException
559   */
560  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
561    return writeInputFile(c, opts, new Path("."));
562  }
563
564  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
565  throws IOException {
566    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
567    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
568    Path inputDir = new Path(jobdir, "inputs");
569
570    FileSystem fs = FileSystem.get(c);
571    fs.mkdirs(inputDir);
572
573    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
574    PrintStream out = new PrintStream(fs.create(inputFile));
575    // Make input random.
576    Map<Integer, String> m = new TreeMap<>();
577    Hash h = MurmurHash.getInstance();
578    int perClientRows = (opts.totalRows / opts.numClientThreads);
579    try {
580      for (int j = 0; j < opts.numClientThreads; j++) {
581        TestOptions next = new TestOptions(opts);
582        next.startRow = j * perClientRows;
583        next.perClientRunRows = perClientRows;
584        String s = MAPPER.writeValueAsString(next);
585        LOG.info("Client=" + j + ", input=" + s);
586        byte[] b = Bytes.toBytes(s);
587        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
588        m.put(hash, s);
589      }
590      for (Map.Entry<Integer, String> e: m.entrySet()) {
591        out.println(e.getValue());
592      }
593    } finally {
594      out.close();
595    }
596    return inputDir;
597  }
598
599  /**
600   * Describes a command.
601   */
602  static class CmdDescriptor {
603    private Class<? extends TestBase> cmdClass;
604    private String name;
605    private String description;
606
607    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
608      this.cmdClass = cmdClass;
609      this.name = name;
610      this.description = description;
611    }
612
613    public Class<? extends TestBase> getCmdClass() {
614      return cmdClass;
615    }
616
617    public String getName() {
618      return name;
619    }
620
621    public String getDescription() {
622      return description;
623    }
624  }
625
626  /**
627   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}.
628   * This makes tracking all these arguments a little easier.
629   * NOTE: ADDING AN OPTION, you need to add a data member, a getter/setter (to make JSON
630   * serialization of this TestOptions class behave), and you need to add to the clone constructor
631   * below copying your new option from the 'that' to the 'this'.  Look for 'clone' below.
632   */
633  static class TestOptions {
634    String cmdName = null;
635    boolean nomapred = false;
636    boolean filterAll = false;
637    int startRow = 0;
638    float size = 1.0f;
639    int perClientRunRows = DEFAULT_ROWS_PER_GB;
640    int numClientThreads = 1;
641    int totalRows = DEFAULT_ROWS_PER_GB;
642    int measureAfter = 0;
643    float sampleRate = 1.0f;
644    double traceRate = 0.0;
645    String tableName = TABLE_NAME;
646    boolean flushCommits = true;
647    boolean writeToWAL = true;
648    boolean autoFlush = false;
649    boolean oneCon = false;
650    boolean useTags = false;
651    int noOfTags = 1;
652    boolean reportLatency = false;
653    int multiGet = 0;
654    int randomSleep = 0;
655    boolean inMemoryCF = false;
656    int presplitRegions = 0;
657    int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION;
658    String splitPolicy = null;
659    Compression.Algorithm compression = Compression.Algorithm.NONE;
660    BloomType bloomType = BloomType.ROW;
661    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
662    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
663    boolean valueRandom = false;
664    boolean valueZipf = false;
665    int valueSize = DEFAULT_VALUE_LENGTH;
666    int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10;
667    int cycles = 1;
668    int columns = 1;
669    int families = 1;
670    int caching = 30;
671    boolean addColumns = true;
672    MemoryCompactionPolicy inMemoryCompaction =
673        MemoryCompactionPolicy.valueOf(
674            CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
675    boolean asyncPrefetch = false;
676    boolean cacheBlocks = true;
677    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
678    long bufferSize = 2l * 1024l * 1024l;
679
680    public TestOptions() {}
681
682    /**
683     * Clone constructor.
684     * @param that Object to copy from.
685     */
686    public TestOptions(TestOptions that) {
687      this.cmdName = that.cmdName;
688      this.cycles = that.cycles;
689      this.nomapred = that.nomapred;
690      this.startRow = that.startRow;
691      this.size = that.size;
692      this.perClientRunRows = that.perClientRunRows;
693      this.numClientThreads = that.numClientThreads;
694      this.totalRows = that.totalRows;
695      this.sampleRate = that.sampleRate;
696      this.traceRate = that.traceRate;
697      this.tableName = that.tableName;
698      this.flushCommits = that.flushCommits;
699      this.writeToWAL = that.writeToWAL;
700      this.autoFlush = that.autoFlush;
701      this.oneCon = that.oneCon;
702      this.useTags = that.useTags;
703      this.noOfTags = that.noOfTags;
704      this.reportLatency = that.reportLatency;
705      this.multiGet = that.multiGet;
706      this.inMemoryCF = that.inMemoryCF;
707      this.presplitRegions = that.presplitRegions;
708      this.replicas = that.replicas;
709      this.splitPolicy = that.splitPolicy;
710      this.compression = that.compression;
711      this.blockEncoding = that.blockEncoding;
712      this.filterAll = that.filterAll;
713      this.bloomType = that.bloomType;
714      this.blockSize = that.blockSize;
715      this.valueRandom = that.valueRandom;
716      this.valueZipf = that.valueZipf;
717      this.valueSize = that.valueSize;
718      this.period = that.period;
719      this.randomSleep = that.randomSleep;
720      this.measureAfter = that.measureAfter;
721      this.addColumns = that.addColumns;
722      this.columns = that.columns;
723      this.families = that.families;
724      this.caching = that.caching;
725      this.inMemoryCompaction = that.inMemoryCompaction;
726      this.asyncPrefetch = that.asyncPrefetch;
727      this.cacheBlocks = that.cacheBlocks;
728      this.scanReadType = that.scanReadType;
729      this.bufferSize = that.bufferSize;
730    }
731
732    public int getCaching() {
733      return this.caching;
734    }
735
736    public void setCaching(final int caching) {
737      this.caching = caching;
738    }
739
740    public int getColumns() {
741      return this.columns;
742    }
743
744    public void setColumns(final int columns) {
745      this.columns = columns;
746    }
747
748    public int getFamilies() {
749      return this.families;
750    }
751
752    public void setFamilies(final int families) {
753      this.families = families;
754    }
755
756    public int getCycles() {
757      return this.cycles;
758    }
759
    /** Sets how many times {@code TestBase.testTimed} repeats its pass over the row range. */
    public void setCycles(final int cycles) {
      this.cycles = cycles;
    }
763
    /** Returns whether value lengths are drawn from a Zipf distribution. */
    public boolean isValueZipf() {
      return valueZipf;
    }
767
    /** Sets whether value lengths are drawn from a Zipf distribution. */
    public void setValueZipf(boolean valueZipf) {
      this.valueZipf = valueZipf;
    }
771
    /** Returns the command name held by these options. */
    public String getCmdName() {
      return cmdName;
    }
775
    /** Sets the command name held by these options. */
    public void setCmdName(String cmdName) {
      this.cmdName = cmdName;
    }
779
    /**
     * Returns the sleep bound; when positive, {@code AsyncRandomReadTest} sleeps a random number
     * of milliseconds below it before each read.
     */
    public int getRandomSleep() {
      return randomSleep;
    }
783
    /**
     * Sets the sleep bound; when positive, {@code AsyncRandomReadTest} sleeps a random number
     * of milliseconds below it before each read.
     */
    public void setRandomSleep(int randomSleep) {
      this.randomSleep = randomSleep;
    }
787
    /**
     * Returns the {@code replicas} option; {@code AsyncRandomReadTest} reads with
     * {@code Consistency.TIMELINE} when it differs from the default options' value.
     */
    public int getReplicas() {
      return replicas;
    }
791
    /**
     * Sets the {@code replicas} option; {@code AsyncRandomReadTest} reads with
     * {@code Consistency.TIMELINE} when it differs from the default options' value.
     */
    public void setReplicas(int replicas) {
      this.replicas = replicas;
    }
795
    /** Returns the {@code splitPolicy} option. */
    public String getSplitPolicy() {
      return splitPolicy;
    }
799
    /** Sets the {@code splitPolicy} option. */
    public void setSplitPolicy(String splitPolicy) {
      this.splitPolicy = splitPolicy;
    }
803
    /** Sets the {@code nomapred} flag. */
    public void setNomapred(boolean nomapred) {
      this.nomapred = nomapred;
    }
807
    /** Sets whether the read and scan tests install a {@code FilterAllFilter}. */
    public void setFilterAll(boolean filterAll) {
      this.filterAll = filterAll;
    }
811
    /** Sets the first row index of the row range a test iterates over. */
    public void setStartRow(int startRow) {
      this.startRow = startRow;
    }
815
    /** Sets the {@code size} option. */
    public void setSize(float size) {
      this.size = size;
    }
819
    /** Sets the number of rows in the range a test iterates over, counted from the start row. */
    public void setPerClientRunRows(int perClientRunRows) {
      this.perClientRunRows = perClientRunRows;
    }
823
    /** Sets the {@code numClientThreads} option. */
    public void setNumClientThreads(int numClientThreads) {
      this.numClientThreads = numClientThreads;
    }
827
    /** Sets the total row count; the random tests pass it to {@code getRandomRow}. */
    public void setTotalRows(int totalRows) {
      this.totalRows = totalRows;
    }
831
    /** Sets the sample rate from which {@code TestBase} derives its row sampling interval. */
    public void setSampleRate(float sampleRate) {
      this.sampleRate = sampleRate;
    }
835
    /** Sets the trace rate {@code TestBase} uses to choose its trace sampler. */
    public void setTraceRate(double traceRate) {
      this.traceRate = traceRate;
    }
839
    /** Sets the name of the table the tests operate on. */
    public void setTableName(String tableName) {
      this.tableName = tableName;
    }
843
    /** Sets the {@code flushCommits} flag. */
    public void setFlushCommits(boolean flushCommits) {
      this.flushCommits = flushCommits;
    }
847
    /** Sets whether puts use {@code Durability.SYNC_WAL} (true) or {@code Durability.SKIP_WAL}. */
    public void setWriteToWAL(boolean writeToWAL) {
      this.writeToWAL = writeToWAL;
    }
851
    /** Sets the {@code autoFlush} flag. */
    public void setAutoFlush(boolean autoFlush) {
      this.autoFlush = autoFlush;
    }
855
    /** Sets whether tests keep the connection they were given instead of creating their own. */
    public void setOneCon(boolean oneCon) {
      this.oneCon = oneCon;
    }
859
    /** Sets whether the write tests attach tags to the cells they write. */
    public void setUseTags(boolean useTags) {
      this.useTags = useTags;
    }
863
    /** Sets how many tags the write tests attach to each cell when tags are enabled. */
    public void setNoOfTags(int noOfTags) {
      this.noOfTags = noOfTags;
    }
867
    /** Sets the {@code reportLatency} flag. */
    public void setReportLatency(boolean reportLatency) {
      this.reportLatency = reportLatency;
    }
871
    /** Sets the GET batch size; when not positive, each GET is sent individually. */
    public void setMultiGet(int multiGet) {
      this.multiGet = multiGet;
    }
875
    /** Sets the {@code inMemoryCF} flag. */
    public void setInMemoryCF(boolean inMemoryCF) {
      this.inMemoryCF = inMemoryCF;
    }
879
    /** Sets the {@code presplitRegions} option. */
    public void setPresplitRegions(int presplitRegions) {
      this.presplitRegions = presplitRegions;
    }
883
    /** Sets the {@code compression} algorithm option. */
    public void setCompression(Compression.Algorithm compression) {
      this.compression = compression;
    }
887
    /** Sets the {@code bloomType} option. */
    public void setBloomType(BloomType bloomType) {
      this.bloomType = bloomType;
    }
891
    /** Sets the {@code blockSize} option. */
    public void setBlockSize(int blockSize) {
      this.blockSize = blockSize;
    }
895
    /** Sets the {@code blockEncoding} option. */
    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
      this.blockEncoding = blockEncoding;
    }
899
    /** Sets whether value lengths are picked at random below {@code valueSize}. */
    public void setValueRandom(boolean valueRandom) {
      this.valueRandom = valueRandom;
    }
903
    /** Sets the value size; it is the exclusive upper bound when random value lengths are on. */
    public void setValueSize(int valueSize) {
      this.valueSize = valueSize;
    }
907
    /** Sets the write buffer size that {@code BufferedMutatorTest} hands to its mutator. */
    public void setBufferSize(long bufferSize) {
      this.bufferSize = bufferSize;
    }
911
    /** Sets the default status reporting period, measured in rows. */
    public void setPeriod(int period) {
      this.period = period;
    }
915
    /** Returns the {@code nomapred} flag. */
    public boolean isNomapred() {
      return nomapred;
    }
919
    /** Returns whether the read and scan tests install a {@code FilterAllFilter}. */
    public boolean isFilterAll() {
      return filterAll;
    }
923
    /** Returns the first row index of the row range a test iterates over. */
    public int getStartRow() {
      return startRow;
    }
927
    /** Returns the {@code size} option. */
    public float getSize() {
      return size;
    }
931
    /** Returns the number of rows in the range a test iterates over, counted from the start row. */
    public int getPerClientRunRows() {
      return perClientRunRows;
    }
935
    /** Returns the {@code numClientThreads} option. */
    public int getNumClientThreads() {
      return numClientThreads;
    }
939
    /** Returns the total row count; the random tests pass it to {@code getRandomRow}. */
    public int getTotalRows() {
      return totalRows;
    }
943
    /** Returns the sample rate from which {@code TestBase} derives its row sampling interval. */
    public float getSampleRate() {
      return sampleRate;
    }
947
    /** Returns the trace rate {@code TestBase} uses to choose its trace sampler. */
    public double getTraceRate() {
      return traceRate;
    }
951
    /** Returns the name of the table the tests operate on. */
    public String getTableName() {
      return tableName;
    }
955
    /** Returns the {@code flushCommits} flag. */
    public boolean isFlushCommits() {
      return flushCommits;
    }
959
    /** Returns whether puts use {@code Durability.SYNC_WAL} (true) or {@code Durability.SKIP_WAL}. */
    public boolean isWriteToWAL() {
      return writeToWAL;
    }
963
    /** Returns the {@code autoFlush} flag. */
    public boolean isAutoFlush() {
      return autoFlush;
    }
967
    /** Returns whether the write tests attach tags to the cells they write. */
    public boolean isUseTags() {
      return useTags;
    }
971
    /** Returns how many tags the write tests attach to each cell when tags are enabled. */
    public int getNoOfTags() {
      return noOfTags;
    }
975
    /** Returns the {@code reportLatency} flag. */
    public boolean isReportLatency() {
      return reportLatency;
    }
979
    /** Returns the GET batch size; when not positive, each GET is sent individually. */
    public int getMultiGet() {
      return multiGet;
    }
983
    /** Returns the {@code inMemoryCF} flag. */
    public boolean isInMemoryCF() {
      return inMemoryCF;
    }
987
    /** Returns the {@code presplitRegions} option. */
    public int getPresplitRegions() {
      return presplitRegions;
    }
991
    /** Returns the {@code compression} algorithm option. */
    public Compression.Algorithm getCompression() {
      return compression;
    }
995
    /** Returns the {@code blockEncoding} option. */
    public DataBlockEncoding getBlockEncoding() {
      return blockEncoding;
    }
999
    /** Returns whether value lengths are picked at random below {@code valueSize}. */
    public boolean isValueRandom() {
      return valueRandom;
    }
1003
    /** Returns the value size; it is the exclusive upper bound when random value lengths are on. */
    public int getValueSize() {
      return valueSize;
    }
1007
    /** Returns the default status reporting period, measured in rows. */
    public int getPeriod() {
      return period;
    }
1011
    /** Returns the {@code bloomType} option. */
    public BloomType getBloomType() {
      return bloomType;
    }
1015
    /** Returns the {@code blockSize} option. */
    public int getBlockSize() {
      return blockSize;
    }
1019
    /** Returns whether tests keep the connection they were given instead of creating their own. */
    public boolean isOneCon() {
      return oneCon;
    }
1023
    /** Returns the row offset that must be exceeded before {@code TestBase} records latency. */
    public int getMeasureAfter() {
      return measureAfter;
    }
1027
    /** Sets the row offset that must be exceeded before {@code TestBase} records latency. */
    public void setMeasureAfter(int measureAfter) {
      this.measureAfter = measureAfter;
    }
1031
    /** Returns whether reads and scans name each column explicitly instead of whole families. */
    public boolean getAddColumns() {
      return addColumns;
    }
1035
    /** Sets whether reads and scans name each column explicitly instead of whole families. */
    public void setAddColumns(boolean addColumns) {
      this.addColumns = addColumns;
    }
1039
    /** Sets the {@code inMemoryCompaction} policy option. */
    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
      this.inMemoryCompaction = inMemoryCompaction;
    }
1043
    /** Returns the {@code inMemoryCompaction} policy option. */
    public MemoryCompactionPolicy getInMemoryCompaction() {
      return this.inMemoryCompaction;
    }
1047
    /** Returns the write buffer size that {@code BufferedMutatorTest} hands to its mutator. */
    public long getBufferSize() {
      return this.bufferSize;
    }
1051  }
1052
1053  /*
1054   * A test.
1055   * Subclass to particularize what happens per row.
1056   */
1057  static abstract class TestBase {
1058    // Below is make it so when Tests are all running in the one
1059    // jvm, that they each have a differently seeded Random.
1060    private static final Random randomSeed = new Random(System.currentTimeMillis());
1061
1062    private static long nextRandomSeed() {
1063      return randomSeed.nextLong();
1064    }
1065    private final int everyN;
1066
1067    protected final Random rand = new Random(nextRandomSeed());
1068    protected final Configuration conf;
1069    protected final TestOptions opts;
1070
1071    private final Status status;
1072    private final Sampler traceSampler;
1073    private final SpanReceiverHost receiverHost;
1074
1075    private String testName;
1076    private Histogram latencyHistogram;
1077    private Histogram valueSizeHistogram;
1078    private Histogram rpcCallsHistogram;
1079    private Histogram remoteRpcCallsHistogram;
1080    private Histogram millisBetweenNextHistogram;
1081    private Histogram regionsScannedHistogram;
1082    private Histogram bytesInResultsHistogram;
1083    private Histogram bytesInRemoteResultsHistogram;
1084    private RandomDistribution.Zipf zipf;
1085
1086    /**
1087     * Note that all subclasses of this class must provide a public constructor
1088     * that has the exact same list of arguments.
1089     */
1090    TestBase(final Configuration conf, final TestOptions options, final Status status) {
1091      this.conf = conf;
1092      this.receiverHost = this.conf == null? null: SpanReceiverHost.getInstance(conf);
1093      this.opts = options;
1094      this.status = status;
1095      this.testName = this.getClass().getSimpleName();
1096      if (options.traceRate >= 1.0) {
1097        this.traceSampler = Sampler.ALWAYS;
1098      } else if (options.traceRate > 0.0) {
1099        conf.setDouble("hbase.sampler.fraction", options.traceRate);
1100        this.traceSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf));
1101      } else {
1102        this.traceSampler = Sampler.NEVER;
1103      }
1104      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
1105      if (options.isValueZipf()) {
1106        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
1107      }
1108      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
1109    }
1110
1111    int getValueLength(final Random r) {
1112      if (this.opts.isValueRandom()) {
1113        return r.nextInt(opts.valueSize);
1114      } else if (this.opts.isValueZipf()) {
1115        return Math.abs(this.zipf.nextInt());
1116      } else {
1117        return opts.valueSize;
1118      }
1119    }
1120
1121    void updateValueSize(final Result [] rs) throws IOException {
1122      if (rs == null || !isRandomValueSize()) return;
1123      for (Result r: rs) updateValueSize(r);
1124    }
1125
1126    void updateValueSize(final Result r) throws IOException {
1127      if (r == null || !isRandomValueSize()) return;
1128      int size = 0;
1129      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1130        size += scanner.current().getValueLength();
1131      }
1132      updateValueSize(size);
1133    }
1134
1135    void updateValueSize(final int valueSize) {
1136      if (!isRandomValueSize()) return;
1137      this.valueSizeHistogram.update(valueSize);
1138    }
1139
1140    void updateScanMetrics(final ScanMetrics metrics) {
1141      if (metrics == null) return;
1142      Map<String,Long> metricsMap = metrics.getMetricsMap();
1143      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
1144      if (rpcCalls != null) {
1145        this.rpcCallsHistogram.update(rpcCalls.longValue());
1146      }
1147      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
1148      if (remoteRpcCalls != null) {
1149        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
1150      }
1151      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
1152      if (millisBetweenNext != null) {
1153        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
1154      }
1155      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
1156      if (regionsScanned != null) {
1157        this.regionsScannedHistogram.update(regionsScanned.longValue());
1158      }
1159      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
1160      if (bytesInResults != null && bytesInResults.longValue() > 0) {
1161        this.bytesInResultsHistogram.update(bytesInResults.longValue());
1162      }
1163      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
1164      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
1165        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
1166      }
1167    }
1168
1169    String generateStatus(final int sr, final int i, final int lr) {
1170      return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
1171        (!isRandomValueSize()? "": ", value size " + getShortValueSizeReport());
1172    }
1173
1174    boolean isRandomValueSize() {
1175      return opts.valueRandom;
1176    }
1177
1178    protected int getReportingPeriod() {
1179      return opts.period;
1180    }
1181
1182    /**
1183     * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1184     */
1185    public Histogram getLatencyHistogram() {
1186      return latencyHistogram;
1187    }
1188
1189    void testSetup() throws IOException {
1190      // test metrics
1191      latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1192      valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1193      // scan metrics
1194      rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1195      remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1196      millisBetweenNextHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1197      regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1198      bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1199      bytesInRemoteResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1200
1201      createConnection();
1202      onStartup();
1203    }
1204
1205    abstract void createConnection() throws IOException;
1206
1207    abstract void onStartup() throws IOException;
1208
1209    void testTakedown() throws IOException {
1210      onTakedown();
1211      // Print all stats for this thread continuously.
1212      // Synchronize on Test.class so different threads don't intermingle the
1213      // output. We can't use 'this' here because each thread has its own instance of Test class.
1214      synchronized (Test.class) {
1215        status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
1216        status.setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(
1217            latencyHistogram));
1218        status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
1219        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
1220        status.setStatus("ValueSize (bytes) : "
1221            + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
1222        status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
1223        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
1224        if (rpcCallsHistogram.getCount() > 0) {
1225          status.setStatus("rpcCalls (count): " +
1226              YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
1227        }
1228        if (remoteRpcCallsHistogram.getCount() > 0) {
1229          status.setStatus("remoteRpcCalls (count): " +
1230              YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
1231        }
1232        if (millisBetweenNextHistogram.getCount() > 0) {
1233          status.setStatus("millisBetweenNext (latency): " +
1234              YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
1235        }
1236        if (regionsScannedHistogram.getCount() > 0) {
1237          status.setStatus("regionsScanned (count): " +
1238              YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
1239        }
1240        if (bytesInResultsHistogram.getCount() > 0) {
1241          status.setStatus("bytesInResults (size): " +
1242              YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
1243        }
1244        if (bytesInRemoteResultsHistogram.getCount() > 0) {
1245          status.setStatus("bytesInRemoteResults (size): " +
1246              YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
1247        }
1248      }
1249      closeConnection();
1250      receiverHost.closeReceivers();
1251    }
1252
1253    abstract void onTakedown() throws IOException;
1254
1255    abstract void closeConnection() throws IOException;
1256
1257    /*
1258     * Run test
1259     * @return Elapsed time.
1260     * @throws IOException
1261     */
1262    long test() throws IOException, InterruptedException {
1263      testSetup();
1264      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1265      final long startTime = System.nanoTime();
1266      try {
1267        testTimed();
1268      } finally {
1269        testTakedown();
1270      }
1271      return (System.nanoTime() - startTime) / 1000000;
1272    }
1273
1274    int getStartRow() {
1275      return opts.startRow;
1276    }
1277
1278    int getLastRow() {
1279      return getStartRow() + opts.perClientRunRows;
1280    }
1281
1282    /**
1283     * Provides an extension point for tests that don't want a per row invocation.
1284     */
1285    void testTimed() throws IOException, InterruptedException {
1286      int startRow = getStartRow();
1287      int lastRow = getLastRow();
1288      TraceUtil.addSampler(traceSampler);
1289      // Report on completion of 1/10th of total.
1290      for (int ii = 0; ii < opts.cycles; ii++) {
1291        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
1292        for (int i = startRow; i < lastRow; i++) {
1293          if (i % everyN != 0) continue;
1294          long startTime = System.nanoTime();
1295          try (TraceScope scope = TraceUtil.createTrace("test row");){
1296            testRow(i);
1297          }
1298          if ( (i - startRow) > opts.measureAfter) {
1299            // If multiget is enabled, say set to 10, testRow() returns immediately first 9 times
1300            // and sends the actual get request in the 10th iteration. We should only set latency
1301            // when actual request is sent because otherwise it turns out to be 0.
1302            if (opts.multiGet == 0 || (i - startRow + 1) % opts.multiGet == 0) {
1303              latencyHistogram.update((System.nanoTime() - startTime) / 1000);
1304            }
1305            if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1306              status.setStatus(generateStatus(startRow, i, lastRow));
1307            }
1308          }
1309        }
1310      }
1311    }
1312
1313    /**
1314     * @return Subset of the histograms' calculation.
1315     */
1316    public String getShortLatencyReport() {
1317      return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
1318    }
1319
1320    /**
1321     * @return Subset of the histograms' calculation.
1322     */
1323    public String getShortValueSizeReport() {
1324      return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
1325    }
1326
1327    /*
1328    * Test for individual row.
1329    * @param i Row index.
1330    */
1331    abstract void testRow(final int i) throws IOException, InterruptedException;
1332  }
1333
1334  static abstract class Test extends TestBase {
1335    protected Connection connection;
1336
1337    Test(final Connection con, final TestOptions options, final Status status) {
1338      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1339      this.connection = con;
1340    }
1341
1342    @Override
1343    void createConnection() throws IOException {
1344      if (!opts.isOneCon()) {
1345        this.connection = ConnectionFactory.createConnection(conf);
1346      }
1347    }
1348
1349    @Override
1350    void closeConnection() throws IOException {
1351      if (!opts.isOneCon()) {
1352        this.connection.close();
1353      }
1354    }
1355  }
1356
1357  static abstract class AsyncTest extends TestBase {
1358    protected AsyncConnection connection;
1359
1360    AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
1361      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1362      this.connection = con;
1363    }
1364
1365    @Override
1366    void createConnection() {
1367      if (!opts.isOneCon()) {
1368        try {
1369          this.connection = ConnectionFactory.createAsyncConnection(conf).get();
1370        } catch (InterruptedException | ExecutionException e) {
1371          LOG.error("Failed to create async connection", e);
1372        }
1373      }
1374    }
1375
1376    @Override
1377    void closeConnection() throws IOException {
1378      if (!opts.isOneCon()) {
1379        this.connection.close();
1380      }
1381    }
1382  }
1383
1384  static abstract class TableTest extends Test {
1385    protected Table table;
1386
1387    TableTest(Connection con, TestOptions options, Status status) {
1388      super(con, options, status);
1389    }
1390
1391    @Override
1392    void onStartup() throws IOException {
1393      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1394    }
1395
1396    @Override
1397    void onTakedown() throws IOException {
1398      table.close();
1399    }
1400  }
1401
1402  static abstract class AsyncTableTest extends AsyncTest {
1403    protected AsyncTable<?> table;
1404
1405    AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
1406      super(con, options, status);
1407    }
1408
1409    @Override
1410    void onStartup() throws IOException {
1411      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1412    }
1413
1414    @Override
1415    void onTakedown() throws IOException {
1416    }
1417  }
1418
1419  static class AsyncRandomReadTest extends AsyncTableTest {
1420    private final Consistency consistency;
1421    private ArrayList<Get> gets;
1422    private Random rd = new Random();
1423
1424    AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
1425      super(con, options, status);
1426      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1427      if (opts.multiGet > 0) {
1428        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1429        this.gets = new ArrayList<>(opts.multiGet);
1430      }
1431    }
1432
1433    @Override
1434    void testRow(final int i) throws IOException, InterruptedException {
1435      if (opts.randomSleep > 0) {
1436        Thread.sleep(rd.nextInt(opts.randomSleep));
1437      }
1438      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1439      for (int family = 0; family < opts.families; family++) {
1440        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1441        if (opts.addColumns) {
1442          for (int column = 0; column < opts.columns; column++) {
1443            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1444            get.addColumn(familyName, qualifier);
1445          }
1446        } else {
1447          get.addFamily(familyName);
1448        }
1449      }
1450      if (opts.filterAll) {
1451        get.setFilter(new FilterAllFilter());
1452      }
1453      get.setConsistency(consistency);
1454      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1455      try {
1456        if (opts.multiGet > 0) {
1457          this.gets.add(get);
1458          if (this.gets.size() == opts.multiGet) {
1459            Result[] rs =
1460                this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
1461            updateValueSize(rs);
1462            this.gets.clear();
1463          }
1464        } else {
1465          updateValueSize(this.table.get(get).get());
1466        }
1467      } catch (ExecutionException e) {
1468        throw new IOException(e);
1469      }
1470    }
1471
1472    public static RuntimeException runtime(Throwable e) {
1473      if (e instanceof RuntimeException) {
1474        return (RuntimeException) e;
1475      }
1476      return new RuntimeException(e);
1477    }
1478
1479    public static <V> V propagate(Callable<V> callable) {
1480      try {
1481        return callable.call();
1482      } catch (Exception e) {
1483        throw runtime(e);
1484      }
1485    }
1486
1487    @Override
1488    protected int getReportingPeriod() {
1489      int period = opts.perClientRunRows / 10;
1490      return period == 0 ? opts.perClientRunRows : period;
1491    }
1492
1493    @Override
1494    protected void testTakedown() throws IOException {
1495      if (this.gets != null && this.gets.size() > 0) {
1496        this.table.get(gets);
1497        this.gets.clear();
1498      }
1499      super.testTakedown();
1500    }
1501  }
1502
1503  static class AsyncRandomWriteTest extends AsyncTableTest {
1504    AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
1505      super(con, options, status);
1506    }
1507
1508    @Override
1509    void testRow(final int i) throws IOException, InterruptedException {
1510      byte[] row = getRandomRow(this.rand, opts.totalRows);
1511      Put put = new Put(row);
1512      for (int family = 0; family < opts.families; family++) {
1513        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1514        for (int column = 0; column < opts.columns; column++) {
1515          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1516          byte[] value = generateData(this.rand, getValueLength(this.rand));
1517          if (opts.useTags) {
1518            byte[] tag = generateData(this.rand, TAG_LENGTH);
1519            Tag[] tags = new Tag[opts.noOfTags];
1520            for (int n = 0; n < opts.noOfTags; n++) {
1521              Tag t = new ArrayBackedTag((byte) n, tag);
1522              tags[n] = t;
1523            }
1524            KeyValue kv =
1525              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
1526            put.add(kv);
1527            updateValueSize(kv.getValueLength());
1528          } else {
1529            put.addColumn(familyName, qualifier, value);
1530            updateValueSize(value.length);
1531          }
1532        }
1533      }
1534      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1535      try {
1536        table.put(put).get();
1537      } catch (ExecutionException e) {
1538        throw new IOException(e);
1539      }
1540    }
1541  }
1542
1543  static class AsyncScanTest extends AsyncTableTest {
1544    private ResultScanner testScanner;
1545    private AsyncTable<?> asyncTable;
1546
1547    AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
1548      super(con, options, status);
1549    }
1550
1551    @Override
1552    void onStartup() throws IOException {
1553      this.asyncTable =
1554          connection.getTable(TableName.valueOf(opts.tableName),
1555            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
1556    }
1557
1558    @Override
1559    void testTakedown() throws IOException {
1560      if (this.testScanner != null) {
1561        updateScanMetrics(this.testScanner.getScanMetrics());
1562        this.testScanner.close();
1563      }
1564      super.testTakedown();
1565    }
1566
1567    @Override
1568    void testRow(final int i) throws IOException {
1569      if (this.testScanner == null) {
1570        Scan scan =
1571            new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1572                .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1573                .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1574        for (int family = 0; family < opts.families; family++) {
1575          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1576          if (opts.addColumns) {
1577            for (int column = 0; column < opts.columns; column++) {
1578              byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1579              scan.addColumn(familyName, qualifier);
1580            }
1581          } else {
1582            scan.addFamily(familyName);
1583          }
1584        }
1585        if (opts.filterAll) {
1586          scan.setFilter(new FilterAllFilter());
1587        }
1588        this.testScanner = asyncTable.getScanner(scan);
1589      }
1590      Result r = testScanner.next();
1591      updateValueSize(r);
1592    }
1593  }
1594
1595  static class AsyncSequentialReadTest extends AsyncTableTest {
1596    AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
1597      super(con, options, status);
1598    }
1599
1600    @Override
1601    void testRow(final int i) throws IOException, InterruptedException {
1602      Get get = new Get(format(i));
1603      for (int family = 0; family < opts.families; family++) {
1604        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1605        if (opts.addColumns) {
1606          for (int column = 0; column < opts.columns; column++) {
1607            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1608            get.addColumn(familyName, qualifier);
1609          }
1610        } else {
1611          get.addFamily(familyName);
1612        }
1613      }
1614      if (opts.filterAll) {
1615        get.setFilter(new FilterAllFilter());
1616      }
1617      try {
1618        updateValueSize(table.get(get).get());
1619      } catch (ExecutionException e) {
1620        throw new IOException(e);
1621      }
1622    }
1623  }
1624
1625  static class AsyncSequentialWriteTest extends AsyncTableTest {
1626    AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
1627      super(con, options, status);
1628    }
1629
1630    @Override
1631    void testRow(final int i) throws IOException, InterruptedException {
1632      byte[] row = format(i);
1633      Put put = new Put(row);
1634      for (int family = 0; family < opts.families; family++) {
1635        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1636        for (int column = 0; column < opts.columns; column++) {
1637          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1638          byte[] value = generateData(this.rand, getValueLength(this.rand));
1639          if (opts.useTags) {
1640            byte[] tag = generateData(this.rand, TAG_LENGTH);
1641            Tag[] tags = new Tag[opts.noOfTags];
1642            for (int n = 0; n < opts.noOfTags; n++) {
1643              Tag t = new ArrayBackedTag((byte) n, tag);
1644              tags[n] = t;
1645            }
1646            KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
1647              value, tags);
1648            put.add(kv);
1649            updateValueSize(kv.getValueLength());
1650          } else {
1651            put.addColumn(familyName, qualifier, value);
1652            updateValueSize(value.length);
1653          }
1654        }
1655      }
1656      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1657      try {
1658        table.put(put).get();
1659      } catch (ExecutionException e) {
1660        throw new IOException(e);
1661      }
1662    }
1663  }
1664
1665  static abstract class BufferedMutatorTest extends Test {
1666    protected BufferedMutator mutator;
1667    protected Table table;
1668
1669    BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1670      super(con, options, status);
1671    }
1672
1673    @Override
1674    void onStartup() throws IOException {
1675      BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1676      p.writeBufferSize(opts.bufferSize);
1677      this.mutator = connection.getBufferedMutator(p);
1678      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1679    }
1680
1681    @Override
1682    void onTakedown() throws IOException {
1683      mutator.close();
1684      table.close();
1685    }
1686  }
1687
1688  static class RandomSeekScanTest extends TableTest {
1689    RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1690      super(con, options, status);
1691    }
1692
1693    @Override
1694    void testRow(final int i) throws IOException {
1695      Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
1696          .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
1697          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
1698          .setScanMetricsEnabled(true);
1699      FilterList list = new FilterList();
1700      for (int family = 0; family < opts.families; family++) {
1701        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1702        if (opts.addColumns) {
1703          for (int column = 0; column < opts.columns; column++) {
1704            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1705            scan.addColumn(familyName, qualifier);
1706          }
1707        } else {
1708          scan.addFamily(familyName);
1709        }
1710      }
1711      if (opts.filterAll) {
1712        list.addFilter(new FilterAllFilter());
1713      }
1714      list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1715      scan.setFilter(list);
1716      ResultScanner s = this.table.getScanner(scan);
1717      try {
1718        for (Result rr; (rr = s.next()) != null;) {
1719          updateValueSize(rr);
1720        }
1721      } finally {
1722        updateScanMetrics(s.getScanMetrics());
1723        s.close();
1724      }
1725    }
1726
1727    @Override
1728    protected int getReportingPeriod() {
1729      int period = opts.perClientRunRows / 100;
1730      return period == 0 ? opts.perClientRunRows : period;
1731    }
1732
1733  }
1734
1735  static abstract class RandomScanWithRangeTest extends TableTest {
1736    RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1737      super(con, options, status);
1738    }
1739
1740    @Override
1741    void testRow(final int i) throws IOException {
1742      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1743      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
1744          .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
1745          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1746          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1747      for (int family = 0; family < opts.families; family++) {
1748        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1749        if (opts.addColumns) {
1750          for (int column = 0; column < opts.columns; column++) {
1751            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1752            scan.addColumn(familyName, qualifier);
1753          }
1754        } else {
1755          scan.addFamily(familyName);
1756        }
1757      }
1758      if (opts.filterAll) {
1759        scan.setFilter(new FilterAllFilter());
1760      }
1761      Result r = null;
1762      int count = 0;
1763      ResultScanner s = this.table.getScanner(scan);
1764      try {
1765        for (; (r = s.next()) != null;) {
1766          updateValueSize(r);
1767          count++;
1768        }
1769        if (i % 100 == 0) {
1770          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1771            Bytes.toString(startAndStopRow.getFirst()),
1772            Bytes.toString(startAndStopRow.getSecond()), count));
1773        }
1774      } finally {
1775        updateScanMetrics(s.getScanMetrics());
1776        s.close();
1777      }
1778    }
1779
1780    protected abstract Pair<byte[],byte[]> getStartAndStopRow();
1781
1782    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
1783      int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
1784      int stop = start + maxRange;
1785      return new Pair<>(format(start), format(stop));
1786    }
1787
1788    @Override
1789    protected int getReportingPeriod() {
1790      int period = opts.perClientRunRows / 100;
1791      return period == 0? opts.perClientRunRows: period;
1792    }
1793  }
1794
1795  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1796    RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1797      super(con, options, status);
1798    }
1799
1800    @Override
1801    protected Pair<byte[], byte[]> getStartAndStopRow() {
1802      return generateStartAndStopRows(10);
1803    }
1804  }
1805
1806  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1807    RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1808      super(con, options, status);
1809    }
1810
1811    @Override
1812    protected Pair<byte[], byte[]> getStartAndStopRow() {
1813      return generateStartAndStopRows(100);
1814    }
1815  }
1816
1817  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1818    RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1819      super(con, options, status);
1820    }
1821
1822    @Override
1823    protected Pair<byte[], byte[]> getStartAndStopRow() {
1824      return generateStartAndStopRows(1000);
1825    }
1826  }
1827
1828  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1829    RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1830      super(con, options, status);
1831    }
1832
1833    @Override
1834    protected Pair<byte[], byte[]> getStartAndStopRow() {
1835      return generateStartAndStopRows(10000);
1836    }
1837  }
1838
1839  static class RandomReadTest extends TableTest {
1840    private final Consistency consistency;
1841    private ArrayList<Get> gets;
1842    private Random rd = new Random();
1843
1844    RandomReadTest(Connection con, TestOptions options, Status status) {
1845      super(con, options, status);
1846      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1847      if (opts.multiGet > 0) {
1848        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1849        this.gets = new ArrayList<>(opts.multiGet);
1850      }
1851    }
1852
1853    @Override
1854    void testRow(final int i) throws IOException, InterruptedException {
1855      if (opts.randomSleep > 0) {
1856        Thread.sleep(rd.nextInt(opts.randomSleep));
1857      }
1858      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1859      for (int family = 0; family < opts.families; family++) {
1860        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1861        if (opts.addColumns) {
1862          for (int column = 0; column < opts.columns; column++) {
1863            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1864            get.addColumn(familyName, qualifier);
1865          }
1866        } else {
1867          get.addFamily(familyName);
1868        }
1869      }
1870      if (opts.filterAll) {
1871        get.setFilter(new FilterAllFilter());
1872      }
1873      get.setConsistency(consistency);
1874      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1875      if (opts.multiGet > 0) {
1876        this.gets.add(get);
1877        if (this.gets.size() == opts.multiGet) {
1878          Result [] rs = this.table.get(this.gets);
1879          updateValueSize(rs);
1880          this.gets.clear();
1881        }
1882      } else {
1883        updateValueSize(this.table.get(get));
1884      }
1885    }
1886
1887    @Override
1888    protected int getReportingPeriod() {
1889      int period = opts.perClientRunRows / 10;
1890      return period == 0 ? opts.perClientRunRows : period;
1891    }
1892
1893    @Override
1894    protected void testTakedown() throws IOException {
1895      if (this.gets != null && this.gets.size() > 0) {
1896        this.table.get(gets);
1897        this.gets.clear();
1898      }
1899      super.testTakedown();
1900    }
1901  }
1902
1903  static class RandomWriteTest extends BufferedMutatorTest {
1904    RandomWriteTest(Connection con, TestOptions options, Status status) {
1905      super(con, options, status);
1906    }
1907
1908    @Override
1909    void testRow(final int i) throws IOException {
1910      byte[] row = getRandomRow(this.rand, opts.totalRows);
1911      Put put = new Put(row);
1912      for (int family = 0; family < opts.families; family++) {
1913        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1914        for (int column = 0; column < opts.columns; column++) {
1915          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1916          byte[] value = generateData(this.rand, getValueLength(this.rand));
1917          if (opts.useTags) {
1918            byte[] tag = generateData(this.rand, TAG_LENGTH);
1919            Tag[] tags = new Tag[opts.noOfTags];
1920            for (int n = 0; n < opts.noOfTags; n++) {
1921              Tag t = new ArrayBackedTag((byte) n, tag);
1922              tags[n] = t;
1923            }
1924            KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
1925              value, tags);
1926            put.add(kv);
1927            updateValueSize(kv.getValueLength());
1928          } else {
1929            put.addColumn(familyName, qualifier, value);
1930            updateValueSize(value.length);
1931          }
1932        }
1933      }
1934      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1935      if (opts.autoFlush) {
1936        table.put(put);
1937      } else {
1938        mutator.mutate(put);
1939      }
1940    }
1941  }
1942
1943  static class ScanTest extends TableTest {
1944    private ResultScanner testScanner;
1945
1946    ScanTest(Connection con, TestOptions options, Status status) {
1947      super(con, options, status);
1948    }
1949
1950    @Override
1951    void testTakedown() throws IOException {
1952      if (this.testScanner != null) {
1953        this.testScanner.close();
1954      }
1955      super.testTakedown();
1956    }
1957
1958
1959    @Override
1960    void testRow(final int i) throws IOException {
1961      if (this.testScanner == null) {
1962        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1963            .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1964            .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1965        for (int family = 0; family < opts.families; family++) {
1966          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1967          if (opts.addColumns) {
1968            for (int column = 0; column < opts.columns; column++) {
1969              byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1970              scan.addColumn(familyName, qualifier);
1971            }
1972          } else {
1973            scan.addFamily(familyName);
1974          }
1975        }
1976        if (opts.filterAll) {
1977          scan.setFilter(new FilterAllFilter());
1978        }
1979        this.testScanner = table.getScanner(scan);
1980      }
1981      Result r = testScanner.next();
1982      updateValueSize(r);
1983    }
1984  }
1985
1986  /**
1987   * Base class for operations that are CAS-like; that read a value and then set it based off what
1988   * they read. In this category is increment, append, checkAndPut, etc.
1989   *
1990   * <p>These operations also want some concurrency going on. Usually when these tests run, they
1991   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
1992   * same key space. We do this with our getStartRow and getLastRow overrides.
1993   */
1994  static abstract class CASTableTest extends TableTest {
1995    private final byte [] qualifier;
1996    CASTableTest(Connection con, TestOptions options, Status status) {
1997      super(con, options, status);
1998      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
1999    }
2000
2001    byte [] getQualifier() {
2002      return this.qualifier;
2003    }
2004
2005    @Override
2006    int getStartRow() {
2007      return 0;
2008    }
2009
2010    @Override
2011    int getLastRow() {
2012      return opts.perClientRunRows;
2013    }
2014  }
2015
2016  static class IncrementTest extends CASTableTest {
2017    IncrementTest(Connection con, TestOptions options, Status status) {
2018      super(con, options, status);
2019    }
2020
2021    @Override
2022    void testRow(final int i) throws IOException {
2023      Increment increment = new Increment(format(i));
2024      // unlike checkAndXXX tests, which make most sense to do on a single value,
2025      // if multiple families are specified for an increment test we assume it is
2026      // meant to raise the work factor
2027      for (int family = 0; family < opts.families; family++) {
2028        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2029        increment.addColumn(familyName, getQualifier(), 1l);
2030      }
2031      updateValueSize(this.table.increment(increment));
2032    }
2033  }
2034
2035  static class AppendTest extends CASTableTest {
2036    AppendTest(Connection con, TestOptions options, Status status) {
2037      super(con, options, status);
2038    }
2039
2040    @Override
2041    void testRow(final int i) throws IOException {
2042      byte [] bytes = format(i);
2043      Append append = new Append(bytes);
2044      // unlike checkAndXXX tests, which make most sense to do on a single value,
2045      // if multiple families are specified for an append test we assume it is
2046      // meant to raise the work factor
2047      for (int family = 0; family < opts.families; family++) {
2048        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2049        append.addColumn(familyName, getQualifier(), bytes);
2050      }
2051      updateValueSize(this.table.append(append));
2052    }
2053  }
2054
2055  static class CheckAndMutateTest extends CASTableTest {
2056    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
2057      super(con, options, status);
2058    }
2059
2060    @Override
2061    void testRow(final int i) throws IOException {
2062      final byte [] bytes = format(i);
2063      // checkAndXXX tests operate on only a single value
2064      // Put a known value so when we go to check it, it is there.
2065      Put put = new Put(bytes);
2066      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2067      this.table.put(put);
2068      RowMutations mutations = new RowMutations(bytes);
2069      mutations.add(put);
2070      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2071          .ifEquals(bytes).thenMutate(mutations);
2072    }
2073  }
2074
2075  static class CheckAndPutTest extends CASTableTest {
2076    CheckAndPutTest(Connection con, TestOptions options, Status status) {
2077      super(con, options, status);
2078    }
2079
2080    @Override
2081    void testRow(final int i) throws IOException {
2082      final byte [] bytes = format(i);
2083      // checkAndXXX tests operate on only a single value
2084      // Put a known value so when we go to check it, it is there.
2085      Put put = new Put(bytes);
2086      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2087      this.table.put(put);
2088      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2089          .ifEquals(bytes).thenPut(put);
2090    }
2091  }
2092
2093  static class CheckAndDeleteTest extends CASTableTest {
2094    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
2095      super(con, options, status);
2096    }
2097
2098    @Override
2099    void testRow(final int i) throws IOException {
2100      final byte [] bytes = format(i);
2101      // checkAndXXX tests operate on only a single value
2102      // Put a known value so when we go to check it, it is there.
2103      Put put = new Put(bytes);
2104      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2105      this.table.put(put);
2106      Delete delete = new Delete(put.getRow());
2107      delete.addColumn(FAMILY_ZERO, getQualifier());
2108      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2109          .ifEquals(bytes).thenDelete(delete);
2110    }
2111  }
2112
2113  static class SequentialReadTest extends TableTest {
2114    SequentialReadTest(Connection con, TestOptions options, Status status) {
2115      super(con, options, status);
2116    }
2117
2118    @Override
2119    void testRow(final int i) throws IOException {
2120      Get get = new Get(format(i));
2121      for (int family = 0; family < opts.families; family++) {
2122        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2123        if (opts.addColumns) {
2124          for (int column = 0; column < opts.columns; column++) {
2125            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2126            get.addColumn(familyName, qualifier);
2127          }
2128        } else {
2129          get.addFamily(familyName);
2130        }
2131      }
2132      if (opts.filterAll) {
2133        get.setFilter(new FilterAllFilter());
2134      }
2135      updateValueSize(table.get(get));
2136    }
2137  }
2138
2139  static class SequentialWriteTest extends BufferedMutatorTest {
2140    SequentialWriteTest(Connection con, TestOptions options, Status status) {
2141      super(con, options, status);
2142    }
2143
2144    @Override
2145    void testRow(final int i) throws IOException {
2146      byte[] row = format(i);
2147      Put put = new Put(row);
2148      for (int family = 0; family < opts.families; family++) {
2149        byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
2150        for (int column = 0; column < opts.columns; column++) {
2151          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2152          byte[] value = generateData(this.rand, getValueLength(this.rand));
2153          if (opts.useTags) {
2154            byte[] tag = generateData(this.rand, TAG_LENGTH);
2155            Tag[] tags = new Tag[opts.noOfTags];
2156            for (int n = 0; n < opts.noOfTags; n++) {
2157              Tag t = new ArrayBackedTag((byte) n, tag);
2158              tags[n] = t;
2159            }
2160            KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
2161              value, tags);
2162            put.add(kv);
2163            updateValueSize(kv.getValueLength());
2164          } else {
2165            put.addColumn(familyName, qualifier, value);
2166            updateValueSize(value.length);
2167          }
2168        }
2169      }
2170      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2171      if (opts.autoFlush) {
2172        table.put(put);
2173      } else {
2174        mutator.mutate(put);
2175      }
2176    }
2177  }
2178
2179  static class FilteredScanTest extends TableTest {
2180    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
2181
2182    FilteredScanTest(Connection con, TestOptions options, Status status) {
2183      super(con, options, status);
2184    }
2185
2186    @Override
2187    void testRow(int i) throws IOException {
2188      byte[] value = generateData(this.rand, getValueLength(this.rand));
2189      Scan scan = constructScan(value);
2190      ResultScanner scanner = null;
2191      try {
2192        scanner = this.table.getScanner(scan);
2193        for (Result r = null; (r = scanner.next()) != null;) {
2194          updateValueSize(r);
2195        }
2196      } finally {
2197        if (scanner != null) {
2198          updateScanMetrics(scanner.getScanMetrics());
2199          scanner.close();
2200        }
2201      }
2202    }
2203
2204    protected Scan constructScan(byte[] valuePrefix) throws IOException {
2205      FilterList list = new FilterList();
2206      Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO,
2207        CompareOperator.EQUAL, new BinaryComparator(valuePrefix));
2208      list.addFilter(filter);
2209      if (opts.filterAll) {
2210        list.addFilter(new FilterAllFilter());
2211      }
2212      Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2213          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2214          .setScanMetricsEnabled(true);
2215      if (opts.addColumns) {
2216        for (int column = 0; column < opts.columns; column++) {
2217          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2218          scan.addColumn(FAMILY_ZERO, qualifier);
2219        }
2220      } else {
2221        scan.addFamily(FAMILY_ZERO);
2222      }
2223      scan.setFilter(list);
2224      return scan;
2225    }
2226  }
2227
2228  /**
2229   * Compute a throughput rate in MB/s.
2230   * @param rows Number of records consumed.
2231   * @param timeMs Time taken in milliseconds.
2232   * @return String value with label, ie '123.76 MB/s'
2233   */
2234  private static String calculateMbps(int rows, long timeMs, final int valueSize, int families, int columns) {
2235    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH +
2236      ((valueSize + (FAMILY_NAME_BASE.length()+1) + COLUMN_ZERO.length) * columns) * families);
2237    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
2238      .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
2239      .divide(BYTES_PER_MB, CXT);
2240    return FMT.format(mbps) + " MB/s";
2241  }
2242
2243  /*
2244   * Format passed integer.
2245   * @param number
2246   * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed
2247   * number (Does absolute in case number is negative).
2248   */
2249  public static byte [] format(final int number) {
2250    byte [] b = new byte[ROW_LENGTH];
2251    int d = Math.abs(number);
2252    for (int i = b.length - 1; i >= 0; i--) {
2253      b[i] = (byte)((d % 10) + '0');
2254      d /= 10;
2255    }
2256    return b;
2257  }
2258
2259  /*
2260   * This method takes some time and is done inline uploading data.  For
2261   * example, doing the mapfile test, generation of the key and value
2262   * consumes about 30% of CPU time.
2263   * @return Generated random value to insert into a table cell.
2264   */
2265  public static byte[] generateData(final Random r, int length) {
2266    byte [] b = new byte [length];
2267    int i;
2268
2269    for(i = 0; i < (length-8); i += 8) {
2270      b[i] = (byte) (65 + r.nextInt(26));
2271      b[i+1] = b[i];
2272      b[i+2] = b[i];
2273      b[i+3] = b[i];
2274      b[i+4] = b[i];
2275      b[i+5] = b[i];
2276      b[i+6] = b[i];
2277      b[i+7] = b[i];
2278    }
2279
2280    byte a = (byte) (65 + r.nextInt(26));
2281    for(; i < length; i++) {
2282      b[i] = a;
2283    }
2284    return b;
2285  }
2286
2287  static byte [] getRandomRow(final Random random, final int totalRows) {
2288    return format(generateRandomRow(random, totalRows));
2289  }
2290
2291  static int generateRandomRow(final Random random, final int totalRows) {
2292    return random.nextInt(Integer.MAX_VALUE) % totalRows;
2293  }
2294
2295  static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
2296      Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
2297      throws IOException, InterruptedException {
2298    status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for "
2299        + opts.perClientRunRows + " rows");
2300    long totalElapsedTime;
2301
2302    final TestBase t;
2303    try {
2304      if (AsyncTest.class.isAssignableFrom(cmd)) {
2305        Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
2306        Constructor<? extends AsyncTest> constructor =
2307            newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
2308        t = constructor.newInstance(asyncCon, opts, status);
2309      } else {
2310        Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
2311        Constructor<? extends Test> constructor =
2312            newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
2313        t = constructor.newInstance(con, opts, status);
2314      }
2315    } catch (NoSuchMethodException e) {
2316      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
2317          + ".  It does not provide a constructor as described by "
2318          + "the javadoc comment.  Available constructors are: "
2319          + Arrays.toString(cmd.getConstructors()));
2320    } catch (Exception e) {
2321      throw new IllegalStateException("Failed to construct command class", e);
2322    }
2323    totalElapsedTime = t.test();
2324
2325    status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
2326      "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
2327      " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
2328          getAverageValueLength(opts), opts.families, opts.columns) + ")");
2329
2330    return new RunResult(totalElapsedTime, t.getLatencyHistogram());
2331  }
2332
2333  private static int getAverageValueLength(final TestOptions opts) {
2334    return opts.valueRandom? opts.valueSize/2: opts.valueSize;
2335  }
2336
2337  private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) throws IOException,
2338      InterruptedException, ClassNotFoundException, ExecutionException {
2339    // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2340    // the TestOptions introspection for us and dump the output in a readable format.
2341    LOG.info(cmd.getSimpleName() + " test run options=" + MAPPER.writeValueAsString(opts));
2342    Admin admin = null;
2343    Connection connection = null;
2344    try {
2345      connection = ConnectionFactory.createConnection(getConf());
2346      admin = connection.getAdmin();
2347      checkTable(admin, opts);
2348    } finally {
2349      if (admin != null) admin.close();
2350      if (connection != null) connection.close();
2351    }
2352    if (opts.nomapred) {
2353      doLocalClients(opts, getConf());
2354    } else {
2355      doMapReduce(opts, getConf());
2356    }
2357  }
2358
2359  protected void printUsage() {
2360    printUsage(this.getClass().getName(), null);
2361  }
2362
2363  protected static void printUsage(final String message) {
2364    printUsage(PerformanceEvaluation.class.getName(), message);
2365  }
2366
2367  protected static void printUsageAndExit(final String message, final int exitCode) {
2368    printUsage(message);
2369    System.exit(exitCode);
2370  }
2371
2372  protected static void printUsage(final String className, final String message) {
2373    if (message != null && message.length() > 0) {
2374      System.err.println(message);
2375    }
2376    System.err.println("Usage: java " + className + " \\");
2377    System.err.println("  <OPTIONS> [-D<property=value>]* <command> <nclients>");
2378    System.err.println();
2379    System.err.println("General Options:");
2380    System.err.println(" nomapred        Run multiple clients using threads " +
2381      "(rather than use mapreduce)");
2382    System.err.println(" oneCon          all the threads share the same connection. Default: False");
2383    System.err.println(" sampleRate      Execute test on a sample of total " +
2384      "rows. Only supported by randomRead. Default: 1.0");
2385    System.err.println(" period          Report every 'period' rows: " +
2386      "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows()/10);
2387    System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
2388    System.err.println(" traceRate       Enable HTrace spans. Initiate tracing every N rows. " +
2389      "Default: 0");
2390    System.err.println(" latency         Set to report operation latencies. Default: False");
2391    System.err.println(" measureAfter    Start to measure the latency once 'measureAfter'" +
2392        " rows have been treated. Default: 0");
2393    System.err.println(" valueSize       Pass value size to use: Default: "
2394        + DEFAULT_OPTS.getValueSize());
2395    System.err.println(" valueRandom     Set if we should vary value size between 0 and " +
2396        "'valueSize'; set on read for stats on size: Default: Not set.");
2397    System.err.println(" blockEncoding   Block encoding to use. Value should be one of "
2398        + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2399    System.err.println();
2400    System.err.println("Table Creation / Write Tests:");
2401    System.err.println(" table           Alternate table name. Default: 'TestTable'");
2402    System.err.println(" rows            Rows each client runs. Default: "
2403        + DEFAULT_OPTS.getPerClientRunRows()
2404        + ".  In case of randomReads and randomSeekScans this could"
2405        + " be specified along with --size to specify the number of rows to be scanned within"
2406        + " the total range specified by the size.");
2407    System.err.println(
2408      " size            Total size in GiB. Mutually exclusive with --rows for writes and scans"
2409          + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2410          + " use size to specify the end range and --rows"
2411          + " specifies the number of rows within that range. " + "Default: 1.0.");
2412    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2413    System.err.println(" flushCommits    Used to determine if the test should flush the table. " +
2414      "Default: false");
2415    System.err.println(" valueZipf       Set if we should vary value size between 0 and " +
2416        "'valueSize' in zipf form: Default: Not set.");
2417    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
2418    System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
2419    System.err.println(" presplit        Create presplit table. If a table with same name exists,"
2420        + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2421        + "Recommended for accurate perf analysis (see guide). Default: disabled");
2422    System.err.println(" usetags         Writes tags along with KVs. Use with HFile V3. " +
2423      "Default: false");
2424    System.err.println(" numoftags       Specify the no of tags that would be needed. " +
2425       "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2426    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
2427    System.err.println(" columns         Columns to write per row. Default: 1");
2428    System.err.println(" families        Specify number of column families for the table. Default: 1");
2429    System.err.println();
2430    System.err.println("Read Tests:");
2431    System.err.println(" filterAll       Helps to filter out all the rows on the server side"
2432        + " there by not returning any thing back to the client.  Helps to check the server side"
2433        + " performance.  Uses FilterAllFilter internally. ");
2434    System.err.println(" multiGet        Batch gets together into groups of N. Only supported " +
2435      "by randomRead. Default: disabled");
2436    System.err.println(" inmemory        Tries to keep the HFiles of the CF " +
2437      "inmemory as far as possible. Not guaranteed that reads are always served " +
2438      "from memory.  Default: false");
2439    System.err.println(" bloomFilter     Bloom filter type, one of "
2440        + Arrays.toString(BloomType.values()));
2441    System.err.println(" blockSize       Blocksize to use when writing out hfiles. ");
2442    System.err.println(" inmemoryCompaction  Makes the column family to do inmemory flushes/compactions. "
2443        + "Uses the CompactingMemstore");
2444    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
2445    System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
2446    System.err.println(" randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
2447    System.err.println(" caching         Scan caching to use. Default: 30");
2448    System.err.println(" asyncPrefetch   Enable asyncPrefetch for scan");
2449    System.err.println(" cacheBlocks     Set the cacheBlocks option for scan. Default: true");
2450    System.err.println(" scanReadType    Set the readType option for scan, stream/pread/default. Default: default");
2451    System.err.println(" bufferSize      Set the value of client side buffering. Default: 2MB");
2452    System.err.println();
2453    System.err.println(" Note: -D properties will be applied to the conf used. ");
2454    System.err.println("  For example: ");
2455    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
2456    System.err.println("   -Dmapreduce.task.timeout=60000");
2457    System.err.println();
2458    System.err.println("Command:");
2459    for (CmdDescriptor command : COMMANDS.values()) {
2460      System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
2461    }
2462    System.err.println();
2463    System.err.println("Args:");
2464    System.err.println(" nclients        Integer. Required. Total number of clients "
2465        + "(and HRegionServers) running. 1 <= value <= 500");
2466    System.err.println("Examples:");
2467    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
2468    System.err.println(" $ hbase " + className + " sequentialWrite 1");
2469    System.err.println(" To run 10 clients doing increments over ten rows:");
2470    System.err.println(" $ hbase " + className + " --rows=10 --nomapred increment 10");
2471  }
2472
2473  /**
2474   * Parse options passed in via an arguments array. Assumes that array has been split
2475   * on white-space and placed into a {@code Queue}. Any unknown arguments will remain
2476   * in the queue at the conclusion of this method call. It's up to the caller to deal
2477   * with these unrecognized arguments.
2478   */
2479  static TestOptions parseOpts(Queue<String> args) {
2480    TestOptions opts = new TestOptions();
2481
2482    String cmd = null;
2483    while ((cmd = args.poll()) != null) {
2484      if (cmd.equals("-h") || cmd.startsWith("--h")) {
2485        // place item back onto queue so that caller knows parsing was incomplete
2486        args.add(cmd);
2487        break;
2488      }
2489
2490      final String nmr = "--nomapred";
2491      if (cmd.startsWith(nmr)) {
2492        opts.nomapred = true;
2493        continue;
2494      }
2495
2496      final String rows = "--rows=";
2497      if (cmd.startsWith(rows)) {
2498        opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
2499        continue;
2500      }
2501
2502      final String cycles = "--cycles=";
2503      if (cmd.startsWith(cycles)) {
2504        opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
2505        continue;
2506      }
2507
2508      final String sampleRate = "--sampleRate=";
2509      if (cmd.startsWith(sampleRate)) {
2510        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
2511        continue;
2512      }
2513
2514      final String table = "--table=";
2515      if (cmd.startsWith(table)) {
2516        opts.tableName = cmd.substring(table.length());
2517        continue;
2518      }
2519
2520      final String startRow = "--startRow=";
2521      if (cmd.startsWith(startRow)) {
2522        opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
2523        continue;
2524      }
2525
2526      final String compress = "--compress=";
2527      if (cmd.startsWith(compress)) {
2528        opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
2529        continue;
2530      }
2531
2532      final String traceRate = "--traceRate=";
2533      if (cmd.startsWith(traceRate)) {
2534        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
2535        continue;
2536      }
2537
2538      final String blockEncoding = "--blockEncoding=";
2539      if (cmd.startsWith(blockEncoding)) {
2540        opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
2541        continue;
2542      }
2543
2544      final String flushCommits = "--flushCommits=";
2545      if (cmd.startsWith(flushCommits)) {
2546        opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
2547        continue;
2548      }
2549
2550      final String writeToWAL = "--writeToWAL=";
2551      if (cmd.startsWith(writeToWAL)) {
2552        opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
2553        continue;
2554      }
2555
2556      final String presplit = "--presplit=";
2557      if (cmd.startsWith(presplit)) {
2558        opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
2559        continue;
2560      }
2561
2562      final String inMemory = "--inmemory=";
2563      if (cmd.startsWith(inMemory)) {
2564        opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
2565        continue;
2566      }
2567
2568      final String autoFlush = "--autoFlush=";
2569      if (cmd.startsWith(autoFlush)) {
2570        opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
2571        continue;
2572      }
2573
2574      final String onceCon = "--oneCon=";
2575      if (cmd.startsWith(onceCon)) {
2576        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
2577        continue;
2578      }
2579
2580      final String latency = "--latency";
2581      if (cmd.startsWith(latency)) {
2582        opts.reportLatency = true;
2583        continue;
2584      }
2585
2586      final String multiGet = "--multiGet=";
2587      if (cmd.startsWith(multiGet)) {
2588        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
2589        continue;
2590      }
2591
2592      final String useTags = "--usetags=";
2593      if (cmd.startsWith(useTags)) {
2594        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
2595        continue;
2596      }
2597
2598      final String noOfTags = "--numoftags=";
2599      if (cmd.startsWith(noOfTags)) {
2600        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
2601        continue;
2602      }
2603
2604      final String replicas = "--replicas=";
2605      if (cmd.startsWith(replicas)) {
2606        opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
2607        continue;
2608      }
2609
2610      final String filterOutAll = "--filterAll";
2611      if (cmd.startsWith(filterOutAll)) {
2612        opts.filterAll = true;
2613        continue;
2614      }
2615
2616      final String size = "--size=";
2617      if (cmd.startsWith(size)) {
2618        opts.size = Float.parseFloat(cmd.substring(size.length()));
2619        if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB");
2620        continue;
2621      }
2622
2623      final String splitPolicy = "--splitPolicy=";
2624      if (cmd.startsWith(splitPolicy)) {
2625        opts.splitPolicy = cmd.substring(splitPolicy.length());
2626        continue;
2627      }
2628
2629      final String randomSleep = "--randomSleep=";
2630      if (cmd.startsWith(randomSleep)) {
2631        opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
2632        continue;
2633      }
2634
2635      final String measureAfter = "--measureAfter=";
2636      if (cmd.startsWith(measureAfter)) {
2637        opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length()));
2638        continue;
2639      }
2640
2641      final String bloomFilter = "--bloomFilter=";
2642      if (cmd.startsWith(bloomFilter)) {
2643        opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
2644        continue;
2645      }
2646
2647      final String blockSize = "--blockSize=";
2648      if(cmd.startsWith(blockSize) ) {
2649        opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
2650        continue;
2651      }
2652
2653      final String valueSize = "--valueSize=";
2654      if (cmd.startsWith(valueSize)) {
2655        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
2656        continue;
2657      }
2658
2659      final String valueRandom = "--valueRandom";
2660      if (cmd.startsWith(valueRandom)) {
2661        opts.valueRandom = true;
2662        if (opts.valueZipf) {
2663          throw new IllegalStateException("Either valueZipf or valueRandom but not both");
2664        }
2665        continue;
2666      }
2667
2668      final String valueZipf = "--valueZipf";
2669      if (cmd.startsWith(valueZipf)) {
2670        opts.valueZipf = true;
2671        if (opts.valueRandom) {
2672          throw new IllegalStateException("Either valueZipf or valueRandom but not both");
2673        }
2674        continue;
2675      }
2676
2677      final String period = "--period=";
2678      if (cmd.startsWith(period)) {
2679        opts.period = Integer.parseInt(cmd.substring(period.length()));
2680        continue;
2681      }
2682
2683      final String addColumns = "--addColumns=";
2684      if (cmd.startsWith(addColumns)) {
2685        opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
2686        continue;
2687      }
2688
2689      final String inMemoryCompaction = "--inmemoryCompaction=";
2690      if (cmd.startsWith(inMemoryCompaction)) {
2691        opts.inMemoryCompaction =
2692            MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length()));
2693        continue;
2694      }
2695
2696      final String columns = "--columns=";
2697      if (cmd.startsWith(columns)) {
2698        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
2699        continue;
2700      }
2701
2702      final String families = "--families=";
2703      if (cmd.startsWith(families)) {
2704        opts.families = Integer.parseInt(cmd.substring(families.length()));
2705        continue;
2706      }
2707
2708      final String caching = "--caching=";
2709      if (cmd.startsWith(caching)) {
2710        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
2711        continue;
2712      }
2713
2714      final String asyncPrefetch = "--asyncPrefetch";
2715      if (cmd.startsWith(asyncPrefetch)) {
2716        opts.asyncPrefetch = true;
2717        continue;
2718      }
2719
2720      final String cacheBlocks = "--cacheBlocks=";
2721      if (cmd.startsWith(cacheBlocks)) {
2722        opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length()));
2723        continue;
2724      }
2725
2726      final String scanReadType = "--scanReadType=";
2727      if (cmd.startsWith(scanReadType)) {
2728        opts.scanReadType =
2729            Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
2730        continue;
2731      }
2732
2733      final String bufferSize = "--bufferSize=";
2734      if (cmd.startsWith(bufferSize)) {
2735        opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
2736        continue;
2737      }
2738
2739      if (isCommandClass(cmd)) {
2740        opts.cmdName = cmd;
2741        try {
2742          opts.numClientThreads = Integer.parseInt(args.remove());
2743        } catch (NoSuchElementException | NumberFormatException e) {
2744          throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
2745        }
2746        opts = calculateRowsAndSize(opts);
2747        break;
2748      } else {
2749        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
2750      }
2751
2752      // Not matching any option or command.
2753      System.err.println("Error: Wrong option or command: " + cmd);
2754      args.add(cmd);
2755      break;
2756    }
2757    return opts;
2758  }
2759
2760  static TestOptions calculateRowsAndSize(final TestOptions opts) {
2761    int rowsPerGB = getRowsPerGB(opts);
2762    if ((opts.getCmdName() != null
2763        && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
2764        && opts.size != DEFAULT_OPTS.size
2765        && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
2766      opts.totalRows = (int) opts.size * rowsPerGB;
2767    } else if (opts.size != DEFAULT_OPTS.size) {
2768      // total size in GB specified
2769      opts.totalRows = (int) opts.size * rowsPerGB;
2770      opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
2771    } else {
2772      opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
2773      opts.size = opts.totalRows / rowsPerGB;
2774    }
2775    return opts;
2776  }
2777
2778  static int getRowsPerGB(final TestOptions opts) {
2779    return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getFamilies() *
2780        opts.getColumns());
2781  }
2782
2783  @Override
2784  public int run(String[] args) throws Exception {
2785    // Process command-line args. TODO: Better cmd-line processing
2786    // (but hopefully something not as painful as cli options).
2787    int errCode = -1;
2788    if (args.length < 1) {
2789      printUsage();
2790      return errCode;
2791    }
2792
2793    try {
2794      LinkedList<String> argv = new LinkedList<>();
2795      argv.addAll(Arrays.asList(args));
2796      TestOptions opts = parseOpts(argv);
2797
2798      // args remaining, print help and exit
2799      if (!argv.isEmpty()) {
2800        errCode = 0;
2801        printUsage();
2802        return errCode;
2803      }
2804
2805      // must run at least 1 client
2806      if (opts.numClientThreads <= 0) {
2807        throw new IllegalArgumentException("Number of clients must be > 0");
2808      }
2809
2810      // cmdName should not be null, print help and exit
2811      if (opts.cmdName == null) {
2812        printUsage();
2813        return errCode;
2814      }
2815
2816      Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
2817      if (cmdClass != null) {
2818        runTest(cmdClass, opts);
2819        errCode = 0;
2820      }
2821
2822    } catch (Exception e) {
2823      e.printStackTrace();
2824    }
2825
2826    return errCode;
2827  }
2828
2829  private static boolean isCommandClass(String cmd) {
2830    return COMMANDS.containsKey(cmd);
2831  }
2832
2833  private static Class<? extends TestBase> determineCommandClass(String cmd) {
2834    CmdDescriptor descriptor = COMMANDS.get(cmd);
2835    return descriptor != null ? descriptor.getCmdClass() : null;
2836  }
2837
2838  public static void main(final String[] args) throws Exception {
2839    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
2840    System.exit(res);
2841  }
2842}