001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import com.codahale.metrics.Histogram;
022import com.codahale.metrics.UniformReservoir;
023import com.fasterxml.jackson.databind.MapperFeature;
024import com.fasterxml.jackson.databind.ObjectMapper;
025
026import java.io.IOException;
027import java.io.PrintStream;
028import java.lang.reflect.Constructor;
029import java.math.BigDecimal;
030import java.math.MathContext;
031import java.text.DecimalFormat;
032import java.text.SimpleDateFormat;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Date;
036import java.util.LinkedList;
037import java.util.Locale;
038import java.util.Map;
039import java.util.NoSuchElementException;
040import java.util.Queue;
041import java.util.Random;
042import java.util.TreeMap;
043import java.util.concurrent.Callable;
044import java.util.concurrent.ExecutionException;
045import java.util.concurrent.ExecutorService;
046import java.util.concurrent.Executors;
047import java.util.concurrent.Future;
048
049import org.apache.commons.lang3.StringUtils;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.conf.Configured;
052import org.apache.hadoop.fs.FileSystem;
053import org.apache.hadoop.fs.Path;
054import org.apache.hadoop.hbase.client.Admin;
055import org.apache.hadoop.hbase.client.Append;
056import org.apache.hadoop.hbase.client.AsyncConnection;
057import org.apache.hadoop.hbase.client.AsyncTable;
058import org.apache.hadoop.hbase.client.BufferedMutator;
059import org.apache.hadoop.hbase.client.BufferedMutatorParams;
060import org.apache.hadoop.hbase.client.Connection;
061import org.apache.hadoop.hbase.client.ConnectionFactory;
062import org.apache.hadoop.hbase.client.Consistency;
063import org.apache.hadoop.hbase.client.Delete;
064import org.apache.hadoop.hbase.client.Durability;
065import org.apache.hadoop.hbase.client.Get;
066import org.apache.hadoop.hbase.client.Increment;
067import org.apache.hadoop.hbase.client.Put;
068import org.apache.hadoop.hbase.client.Result;
069import org.apache.hadoop.hbase.client.ResultScanner;
070import org.apache.hadoop.hbase.client.RowMutations;
071import org.apache.hadoop.hbase.client.Scan;
072import org.apache.hadoop.hbase.client.Table;
073import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
074import org.apache.hadoop.hbase.filter.BinaryComparator;
075import org.apache.hadoop.hbase.filter.Filter;
076import org.apache.hadoop.hbase.filter.FilterAllFilter;
077import org.apache.hadoop.hbase.filter.FilterList;
078import org.apache.hadoop.hbase.filter.PageFilter;
079import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
080import org.apache.hadoop.hbase.filter.WhileMatchFilter;
081import org.apache.hadoop.hbase.io.compress.Compression;
082import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
083import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
084import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
085import org.apache.hadoop.hbase.regionserver.BloomType;
086import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
087import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
088import org.apache.hadoop.hbase.trace.SpanReceiverHost;
089import org.apache.hadoop.hbase.trace.TraceUtil;
090import org.apache.hadoop.hbase.util.ByteArrayHashKey;
091import org.apache.hadoop.hbase.util.Bytes;
092import org.apache.hadoop.hbase.util.Hash;
093import org.apache.hadoop.hbase.util.MurmurHash;
094import org.apache.hadoop.hbase.util.Pair;
095import org.apache.hadoop.hbase.util.YammerHistogramUtils;
096import org.apache.hadoop.io.LongWritable;
097import org.apache.hadoop.io.Text;
098import org.apache.hadoop.mapreduce.Job;
099import org.apache.hadoop.mapreduce.Mapper;
100import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
101import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
102import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
103import org.apache.hadoop.util.Tool;
104import org.apache.hadoop.util.ToolRunner;
105import org.apache.htrace.core.ProbabilitySampler;
106import org.apache.htrace.core.Sampler;
107import org.apache.htrace.core.TraceScope;
108import org.apache.yetus.audience.InterfaceAudience;
109import org.slf4j.Logger;
110import org.slf4j.LoggerFactory;
111import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
112import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
113
114/**
115 * Script used evaluating HBase performance and scalability.  Runs a HBase
116 * client that steps through one of a set of hardcoded tests or 'experiments'
117 * (e.g. a random reads test, a random writes test, etc.). Pass on the
118 * command-line which test to run and how many clients are participating in
119 * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
120 *
121 * <p>This class sets up and runs the evaluation programs described in
122 * Section 7, <i>Performance Evaluation</i>, of the <a
123 * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
124 * paper, pages 8-10.
125 *
126 * <p>By default, runs as a mapreduce job where each mapper runs a single test
127 * client. Can also run as a non-mapreduce, multithreaded application by
128 * specifying {@code --nomapred}. Each client does about 1GB of data, unless
129 * specified otherwise.
130 */
131@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
132public class PerformanceEvaluation extends Configured implements Tool {
133  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
134  static final String RANDOM_READ = "randomRead";
135  static final String PE_COMMAND_SHORTNAME = "pe";
136  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
137  private static final ObjectMapper MAPPER = new ObjectMapper();
138  static {
139    MAPPER.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
140  }
141
142  public static final String TABLE_NAME = "TestTable";
143  public static final String FAMILY_NAME_BASE = "info";
144  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
145  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
146  public static final int DEFAULT_VALUE_LENGTH = 1000;
147  public static final int ROW_LENGTH = 26;
148
149  private static final int ONE_GB = 1024 * 1024 * 1000;
150  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
151  // TODO : should we make this configurable
152  private static final int TAG_LENGTH = 256;
153  private static final DecimalFormat FMT = new DecimalFormat("0.##");
154  private static final MathContext CXT = MathContext.DECIMAL64;
155  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
156  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
157  private static final TestOptions DEFAULT_OPTS = new TestOptions();
158
159  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
160  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
161
162  static {
163    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
164        "Run async random read test");
165    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
166        "Run async random write test");
167    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
168        "Run async sequential read test");
169    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
170        "Run async sequential write test");
171    addCommandDescriptor(AsyncScanTest.class, "asyncScan",
172        "Run async scan test (read every row)");
173    addCommandDescriptor(RandomReadTest.class, RANDOM_READ,
174      "Run random read test");
175    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
176      "Run random seek and scan 100 test");
177    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
178      "Run random seek scan with both start and stop row (max 10 rows)");
179    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
180      "Run random seek scan with both start and stop row (max 100 rows)");
181    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
182      "Run random seek scan with both start and stop row (max 1000 rows)");
183    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
184      "Run random seek scan with both start and stop row (max 10000 rows)");
185    addCommandDescriptor(RandomWriteTest.class, "randomWrite",
186      "Run random write test");
187    addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
188      "Run sequential read test");
189    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
190      "Run sequential write test");
191    addCommandDescriptor(ScanTest.class, "scan",
192      "Run scan test (read every row)");
193    addCommandDescriptor(FilteredScanTest.class, "filterScan",
194      "Run scan test using a filter to find a specific row based on it's value " +
195      "(make sure to use --rows=20)");
196    addCommandDescriptor(IncrementTest.class, "increment",
197      "Increment on each row; clients overlap on keyspace so some concurrent operations");
198    addCommandDescriptor(AppendTest.class, "append",
199      "Append on each row; clients overlap on keyspace so some concurrent operations");
200    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
201      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
202    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
203      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
204    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
205      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
206  }
207
208  /**
209   * Enum for map metrics.  Keep it out here rather than inside in the Map
210   * inner-class so we can find associated properties.
211   */
212  protected static enum Counter {
213    /** elapsed time */
214    ELAPSED_TIME,
215    /** number of rows */
216    ROWS
217  }
218
219  protected static class RunResult implements Comparable<RunResult> {
220    public RunResult(long duration, Histogram hist) {
221      this.duration = duration;
222      this.hist = hist;
223    }
224
225    public final long duration;
226    public final Histogram hist;
227
228    @Override
229    public String toString() {
230      return Long.toString(duration);
231    }
232
233    @Override public int compareTo(RunResult o) {
234      return Long.compare(this.duration, o.duration);
235    }
236  }
237
238  /**
239   * Constructor
240   * @param conf Configuration object
241   */
242  public PerformanceEvaluation(final Configuration conf) {
243    super(conf);
244  }
245
246  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass,
247      String name, String description) {
248    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
249    COMMANDS.put(name, cmdDescriptor);
250  }
251
252  /**
253   * Implementations can have their status set.
254   */
255  interface Status {
256    /**
257     * Sets status
258     * @param msg status message
259     * @throws IOException
260     */
261    void setStatus(final String msg) throws IOException;
262  }
263
264  /**
265   * MapReduce job that runs a performance evaluation client in each map task.
266   */
267  public static class EvaluationMapTask
268      extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
269
270    /** configuration parameter name that contains the command */
271    public final static String CMD_KEY = "EvaluationMapTask.command";
272    /** configuration parameter name that contains the PE impl */
273    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
274
275    private Class<? extends Test> cmd;
276
277    @Override
278    protected void setup(Context context) throws IOException, InterruptedException {
279      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
280
281      // this is required so that extensions of PE are instantiated within the
282      // map reduce task...
283      Class<? extends PerformanceEvaluation> peClass =
284          forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
285      try {
286        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
287      } catch (Exception e) {
288        throw new IllegalStateException("Could not instantiate PE instance", e);
289      }
290    }
291
292    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
293      try {
294        return Class.forName(className).asSubclass(type);
295      } catch (ClassNotFoundException e) {
296        throw new IllegalStateException("Could not find class for name: " + className, e);
297      }
298    }
299
300    @Override
301    protected void map(LongWritable key, Text value, final Context context)
302           throws IOException, InterruptedException {
303
304      Status status = new Status() {
305        @Override
306        public void setStatus(String msg) {
307           context.setStatus(msg);
308        }
309      };
310
311      ObjectMapper mapper = new ObjectMapper();
312      TestOptions opts = mapper.readValue(value.toString(), TestOptions.class);
313      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
314      final Connection con = ConnectionFactory.createConnection(conf);
315      AsyncConnection asyncCon = null;
316      try {
317        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
318      } catch (ExecutionException e) {
319        throw new IOException(e);
320      }
321
322      // Evaluation task
323      RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
324      // Collect how much time the thing took. Report as map output and
325      // to the ELAPSED_TIME counter.
326      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
327      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
328      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
329      context.progress();
330    }
331  }
332
333  /*
334   * If table does not already exist, create. Also create a table when
335   * {@code opts.presplitRegions} is specified or when the existing table's
336   * region replica count doesn't match {@code opts.replicas}.
337   */
338  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
339    TableName tableName = TableName.valueOf(opts.tableName);
340    boolean needsDelete = false, exists = admin.tableExists(tableName);
341    boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
342      || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
343    if (!exists && isReadCmd) {
344      throw new IllegalStateException(
345        "Must specify an existing table for read commands. Run a write command first.");
346    }
347    HTableDescriptor desc =
348      exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null;
349    byte[][] splits = getSplits(opts);
350
351    // recreate the table when user has requested presplit or when existing
352    // {RegionSplitPolicy,replica count} does not match requested, or when the
353    // number of column families does not match requested.
354    if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
355      || (!isReadCmd && desc != null &&
356          !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
357      || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)
358      || (desc != null && desc.getColumnFamilyCount() != opts.families)) {
359      needsDelete = true;
360      // wait, why did it delete my table?!?
361      LOG.debug(MoreObjects.toStringHelper("needsDelete")
362        .add("needsDelete", needsDelete)
363        .add("isReadCmd", isReadCmd)
364        .add("exists", exists)
365        .add("desc", desc)
366        .add("presplit", opts.presplitRegions)
367        .add("splitPolicy", opts.splitPolicy)
368        .add("replicas", opts.replicas)
369        .add("families", opts.families)
370        .toString());
371    }
372
373    // remove an existing table
374    if (needsDelete) {
375      if (admin.isTableEnabled(tableName)) {
376        admin.disableTable(tableName);
377      }
378      admin.deleteTable(tableName);
379    }
380
381    // table creation is necessary
382    if (!exists || needsDelete) {
383      desc = getTableDescriptor(opts);
384      if (splits != null) {
385        if (LOG.isDebugEnabled()) {
386          for (int i = 0; i < splits.length; i++) {
387            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
388          }
389        }
390      }
391      admin.createTable(desc, splits);
392      LOG.info("Table " + desc + " created");
393    }
394    return admin.tableExists(tableName);
395  }
396
397  /**
398   * Create an HTableDescriptor from provided TestOptions.
399   */
400  protected static HTableDescriptor getTableDescriptor(TestOptions opts) {
401    HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(opts.tableName));
402    for (int family = 0; family < opts.families; family++) {
403      byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
404      HColumnDescriptor familyDesc = new HColumnDescriptor(familyName);
405      familyDesc.setDataBlockEncoding(opts.blockEncoding);
406      familyDesc.setCompressionType(opts.compression);
407      familyDesc.setBloomFilterType(opts.bloomType);
408      familyDesc.setBlocksize(opts.blockSize);
409      if (opts.inMemoryCF) {
410        familyDesc.setInMemory(true);
411      }
412      familyDesc.setInMemoryCompaction(opts.inMemoryCompaction);
413      tableDesc.addFamily(familyDesc);
414    }
415    if (opts.replicas != DEFAULT_OPTS.replicas) {
416      tableDesc.setRegionReplication(opts.replicas);
417    }
418    if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
419      tableDesc.setRegionSplitPolicyClassName(opts.splitPolicy);
420    }
421    return tableDesc;
422  }
423
424  /**
425   * generates splits based on total number of rows and specified split regions
426   */
427  protected static byte[][] getSplits(TestOptions opts) {
428    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions)
429      return null;
430
431    int numSplitPoints = opts.presplitRegions - 1;
432    byte[][] splits = new byte[numSplitPoints][];
433    int jump = opts.totalRows / opts.presplitRegions;
434    for (int i = 0; i < numSplitPoints; i++) {
435      int rowkey = jump * (1 + i);
436      splits[i] = format(rowkey);
437    }
438    return splits;
439  }
440
441  static void setupConnectionCount(final TestOptions opts) {
442    if (opts.oneCon) {
443      opts.connCount = 1;
444    } else {
445      if (opts.connCount == -1) {
446        // set to thread number if connCount is not set
447        opts.connCount = opts.numClientThreads;
448      }
449    }
450  }
451
452  /*
453   * Run all clients in this vm each to its own thread.
454   */
455  static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
456      throws IOException, InterruptedException, ExecutionException {
457    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
458    assert cmd != null;
459    @SuppressWarnings("unchecked")
460    Future<RunResult>[] threads = new Future[opts.numClientThreads];
461    RunResult[] results = new RunResult[opts.numClientThreads];
462    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
463      new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
464    setupConnectionCount(opts);
465    final Connection[] cons = new Connection[opts.connCount];
466    final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
467    for (int i = 0; i < opts.connCount; i++) {
468      cons[i] = ConnectionFactory.createConnection(conf);
469      asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
470    }
471    LOG.info("Created " + opts.connCount + " connections for " +
472        opts.numClientThreads + " threads");
473    for (int i = 0; i < threads.length; i++) {
474      final int index = i;
475      threads[i] = pool.submit(new Callable<RunResult>() {
476        @Override
477        public RunResult call() throws Exception {
478          TestOptions threadOpts = new TestOptions(opts);
479          final Connection con = cons[index % cons.length];
480          final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
481          if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
482          RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
483            @Override
484            public void setStatus(final String msg) throws IOException {
485              LOG.info(msg);
486            }
487          });
488          LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration +
489            "ms over " + threadOpts.perClientRunRows + " rows");
490          return run;
491        }
492      });
493    }
494    pool.shutdown();
495
496    for (int i = 0; i < threads.length; i++) {
497      try {
498        results[i] = threads[i].get();
499      } catch (ExecutionException e) {
500        throw new IOException(e.getCause());
501      }
502    }
503    final String test = cmd.getSimpleName();
504    LOG.info("[" + test + "] Summary of timings (ms): "
505             + Arrays.toString(results));
506    Arrays.sort(results);
507    long total = 0;
508    float avgLatency = 0 ;
509    float avgTPS = 0;
510    for (RunResult result : results) {
511      total += result.duration;
512      avgLatency += result.hist.getSnapshot().getMean();
513      avgTPS += opts.perClientRunRows * 1.0f / result.duration;
514    }
515    avgTPS *= 1000; // ms to second
516    avgLatency = avgLatency / results.length;
517    LOG.info("[" + test + " duration ]"
518      + "\tMin: " + results[0] + "ms"
519      + "\tMax: " + results[results.length - 1] + "ms"
520      + "\tAvg: " + (total / results.length) + "ms");
521    LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
522    LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
523    for (int i = 0; i < opts.connCount; i++) {
524      cons[i].close();
525      asyncCons[i].close();
526    }
527
528
529    return results;
530  }
531
532  /*
533   * Run a mapreduce job.  Run as many maps as asked-for clients.
534   * Before we start up the job, write out an input file with instruction
535   * per client regards which row they are to start on.
536   * @param cmd Command to run.
537   * @throws IOException
538   */
539  static Job doMapReduce(TestOptions opts, final Configuration conf)
540      throws IOException, InterruptedException, ClassNotFoundException {
541    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
542    assert cmd != null;
543    Path inputDir = writeInputFile(conf, opts);
544    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
545    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
546    Job job = Job.getInstance(conf);
547    job.setJarByClass(PerformanceEvaluation.class);
548    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
549
550    job.setInputFormatClass(NLineInputFormat.class);
551    NLineInputFormat.setInputPaths(job, inputDir);
552    // this is default, but be explicit about it just in case.
553    NLineInputFormat.setNumLinesPerSplit(job, 1);
554
555    job.setOutputKeyClass(LongWritable.class);
556    job.setOutputValueClass(LongWritable.class);
557
558    job.setMapperClass(EvaluationMapTask.class);
559    job.setReducerClass(LongSumReducer.class);
560
561    job.setNumReduceTasks(1);
562
563    job.setOutputFormatClass(TextOutputFormat.class);
564    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
565
566    TableMapReduceUtil.addDependencyJars(job);
567    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
568      Histogram.class,     // yammer metrics
569      ObjectMapper.class,  // jackson-mapper-asl
570      FilterAllFilter.class // hbase-server tests jar
571      );
572
573    TableMapReduceUtil.initCredentials(job);
574
575    job.waitForCompletion(true);
576    return job;
577  }
578
579  /**
580   * Each client has one mapper to do the work,  and client do the resulting count in a map task.
581   */
582
583  static String JOB_INPUT_FILENAME = "input.txt";
584
585  /*
586   * Write input file of offsets-per-client for the mapreduce job.
587   * @param c Configuration
588   * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
589   * @throws IOException
590   */
591  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
592    return writeInputFile(c, opts, new Path("."));
593  }
594
595  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
596  throws IOException {
597    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
598    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
599    Path inputDir = new Path(jobdir, "inputs");
600
601    FileSystem fs = FileSystem.get(c);
602    fs.mkdirs(inputDir);
603
604    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
605    PrintStream out = new PrintStream(fs.create(inputFile));
606    // Make input random.
607    Map<Integer, String> m = new TreeMap<>();
608    Hash h = MurmurHash.getInstance();
609    int perClientRows = (opts.totalRows / opts.numClientThreads);
610    try {
611      for (int j = 0; j < opts.numClientThreads; j++) {
612        TestOptions next = new TestOptions(opts);
613        next.startRow = j * perClientRows;
614        next.perClientRunRows = perClientRows;
615        String s = MAPPER.writeValueAsString(next);
616        LOG.info("Client=" + j + ", input=" + s);
617        byte[] b = Bytes.toBytes(s);
618        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
619        m.put(hash, s);
620      }
621      for (Map.Entry<Integer, String> e: m.entrySet()) {
622        out.println(e.getValue());
623      }
624    } finally {
625      out.close();
626    }
627    return inputDir;
628  }
629
630  /**
631   * Describes a command.
632   */
633  static class CmdDescriptor {
634    private Class<? extends TestBase> cmdClass;
635    private String name;
636    private String description;
637
638    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
639      this.cmdClass = cmdClass;
640      this.name = name;
641      this.description = description;
642    }
643
644    public Class<? extends TestBase> getCmdClass() {
645      return cmdClass;
646    }
647
648    public String getName() {
649      return name;
650    }
651
652    public String getDescription() {
653      return description;
654    }
655  }
656
657  /**
658   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}.
659   * This makes tracking all these arguments a little easier.
660   * NOTE: ADDING AN OPTION, you need to add a data member, a getter/setter (to make JSON
661   * serialization of this TestOptions class behave), and you need to add to the clone constructor
662   * below copying your new option from the 'that' to the 'this'.  Look for 'clone' below.
663   */
664  static class TestOptions {
665    String cmdName = null;
666    boolean nomapred = false;
667    boolean filterAll = false;
668    int startRow = 0;
669    float size = 1.0f;
670    int perClientRunRows = DEFAULT_ROWS_PER_GB;
671    int numClientThreads = 1;
672    int totalRows = DEFAULT_ROWS_PER_GB;
673    int measureAfter = 0;
674    float sampleRate = 1.0f;
675    double traceRate = 0.0;
676    String tableName = TABLE_NAME;
677    boolean flushCommits = true;
678    boolean writeToWAL = true;
679    boolean autoFlush = false;
680    boolean oneCon = false;
681    int connCount = -1; //wil decide the actual num later
682    boolean useTags = false;
683    int noOfTags = 1;
684    boolean reportLatency = false;
685    int multiGet = 0;
686    int multiPut = 0;
687    int randomSleep = 0;
688    boolean inMemoryCF = false;
689    int presplitRegions = 0;
690    int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION;
691    String splitPolicy = null;
692    Compression.Algorithm compression = Compression.Algorithm.NONE;
693    BloomType bloomType = BloomType.ROW;
694    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
695    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
696    boolean valueRandom = false;
697    boolean valueZipf = false;
698    int valueSize = DEFAULT_VALUE_LENGTH;
699    int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10;
700    int cycles = 1;
701    int columns = 1;
702    int families = 1;
703    int caching = 30;
704    boolean addColumns = true;
705    MemoryCompactionPolicy inMemoryCompaction =
706        MemoryCompactionPolicy.valueOf(
707            CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
708    boolean asyncPrefetch = false;
709    boolean cacheBlocks = true;
710    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
711    long bufferSize = 2l * 1024l * 1024l;
712
713    public TestOptions() {}
714
715    /**
716     * Clone constructor.
717     * @param that Object to copy from.
718     */
719    public TestOptions(TestOptions that) {
720      this.cmdName = that.cmdName;
721      this.cycles = that.cycles;
722      this.nomapred = that.nomapred;
723      this.startRow = that.startRow;
724      this.size = that.size;
725      this.perClientRunRows = that.perClientRunRows;
726      this.numClientThreads = that.numClientThreads;
727      this.totalRows = that.totalRows;
728      this.sampleRate = that.sampleRate;
729      this.traceRate = that.traceRate;
730      this.tableName = that.tableName;
731      this.flushCommits = that.flushCommits;
732      this.writeToWAL = that.writeToWAL;
733      this.autoFlush = that.autoFlush;
734      this.oneCon = that.oneCon;
735      this.connCount = that.connCount;
736      this.useTags = that.useTags;
737      this.noOfTags = that.noOfTags;
738      this.reportLatency = that.reportLatency;
739      this.multiGet = that.multiGet;
740      this.multiPut = that.multiPut;
741      this.inMemoryCF = that.inMemoryCF;
742      this.presplitRegions = that.presplitRegions;
743      this.replicas = that.replicas;
744      this.splitPolicy = that.splitPolicy;
745      this.compression = that.compression;
746      this.blockEncoding = that.blockEncoding;
747      this.filterAll = that.filterAll;
748      this.bloomType = that.bloomType;
749      this.blockSize = that.blockSize;
750      this.valueRandom = that.valueRandom;
751      this.valueZipf = that.valueZipf;
752      this.valueSize = that.valueSize;
753      this.period = that.period;
754      this.randomSleep = that.randomSleep;
755      this.measureAfter = that.measureAfter;
756      this.addColumns = that.addColumns;
757      this.columns = that.columns;
758      this.families = that.families;
759      this.caching = that.caching;
760      this.inMemoryCompaction = that.inMemoryCompaction;
761      this.asyncPrefetch = that.asyncPrefetch;
762      this.cacheBlocks = that.cacheBlocks;
763      this.scanReadType = that.scanReadType;
764      this.bufferSize = that.bufferSize;
765    }
766
767    public int getCaching() {
768      return this.caching;
769    }
770
771    public void setCaching(final int caching) {
772      this.caching = caching;
773    }
774
775    public int getColumns() {
776      return this.columns;
777    }
778
779    public void setColumns(final int columns) {
780      this.columns = columns;
781    }
782
783    public int getFamilies() {
784      return this.families;
785    }
786
787    public void setFamilies(final int families) {
788      this.families = families;
789    }
790
791    public int getCycles() {
792      return this.cycles;
793    }
794
795    public void setCycles(final int cycles) {
796      this.cycles = cycles;
797    }
798
799    public boolean isValueZipf() {
800      return valueZipf;
801    }
802
803    public void setValueZipf(boolean valueZipf) {
804      this.valueZipf = valueZipf;
805    }
806
807    public String getCmdName() {
808      return cmdName;
809    }
810
811    public void setCmdName(String cmdName) {
812      this.cmdName = cmdName;
813    }
814
815    public int getRandomSleep() {
816      return randomSleep;
817    }
818
819    public void setRandomSleep(int randomSleep) {
820      this.randomSleep = randomSleep;
821    }
822
823    public int getReplicas() {
824      return replicas;
825    }
826
827    public void setReplicas(int replicas) {
828      this.replicas = replicas;
829    }
830
831    public String getSplitPolicy() {
832      return splitPolicy;
833    }
834
835    public void setSplitPolicy(String splitPolicy) {
836      this.splitPolicy = splitPolicy;
837    }
838
839    public void setNomapred(boolean nomapred) {
840      this.nomapred = nomapred;
841    }
842
843    public void setFilterAll(boolean filterAll) {
844      this.filterAll = filterAll;
845    }
846
847    public void setStartRow(int startRow) {
848      this.startRow = startRow;
849    }
850
851    public void setSize(float size) {
852      this.size = size;
853    }
854
855    public void setPerClientRunRows(int perClientRunRows) {
856      this.perClientRunRows = perClientRunRows;
857    }
858
859    public void setNumClientThreads(int numClientThreads) {
860      this.numClientThreads = numClientThreads;
861    }
862
863    public void setTotalRows(int totalRows) {
864      this.totalRows = totalRows;
865    }
866
867    public void setSampleRate(float sampleRate) {
868      this.sampleRate = sampleRate;
869    }
870
871    public void setTraceRate(double traceRate) {
872      this.traceRate = traceRate;
873    }
874
875    public void setTableName(String tableName) {
876      this.tableName = tableName;
877    }
878
879    public void setFlushCommits(boolean flushCommits) {
880      this.flushCommits = flushCommits;
881    }
882
883    public void setWriteToWAL(boolean writeToWAL) {
884      this.writeToWAL = writeToWAL;
885    }
886
887    public void setAutoFlush(boolean autoFlush) {
888      this.autoFlush = autoFlush;
889    }
890
891    public void setOneCon(boolean oneCon) {
892      this.oneCon = oneCon;
893    }
894
895    public int getConnCount() {
896      return connCount;
897    }
898
899    public void setConnCount(int connCount) {
900      this.connCount = connCount;
901    }
902
903    public void setUseTags(boolean useTags) {
904      this.useTags = useTags;
905    }
906
907    public void setNoOfTags(int noOfTags) {
908      this.noOfTags = noOfTags;
909    }
910
911    public void setReportLatency(boolean reportLatency) {
912      this.reportLatency = reportLatency;
913    }
914
915    public void setMultiGet(int multiGet) {
916      this.multiGet = multiGet;
917    }
918
919    public void setMultiPut(int multiPut) {
920      this.multiPut = multiPut;
921    }
922
923    public void setInMemoryCF(boolean inMemoryCF) {
924      this.inMemoryCF = inMemoryCF;
925    }
926
927    public void setPresplitRegions(int presplitRegions) {
928      this.presplitRegions = presplitRegions;
929    }
930
931    public void setCompression(Compression.Algorithm compression) {
932      this.compression = compression;
933    }
934
935    public void setBloomType(BloomType bloomType) {
936      this.bloomType = bloomType;
937    }
938
939    public void setBlockSize(int blockSize) {
940      this.blockSize = blockSize;
941    }
942
943    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
944      this.blockEncoding = blockEncoding;
945    }
946
947    public void setValueRandom(boolean valueRandom) {
948      this.valueRandom = valueRandom;
949    }
950
951    public void setValueSize(int valueSize) {
952      this.valueSize = valueSize;
953    }
954
955    public void setBufferSize(long bufferSize) {
956      this.bufferSize = bufferSize;
957    }
958
959    public void setPeriod(int period) {
960      this.period = period;
961    }
962
963    public boolean isNomapred() {
964      return nomapred;
965    }
966
967    public boolean isFilterAll() {
968      return filterAll;
969    }
970
971    public int getStartRow() {
972      return startRow;
973    }
974
975    public float getSize() {
976      return size;
977    }
978
979    public int getPerClientRunRows() {
980      return perClientRunRows;
981    }
982
983    public int getNumClientThreads() {
984      return numClientThreads;
985    }
986
987    public int getTotalRows() {
988      return totalRows;
989    }
990
991    public float getSampleRate() {
992      return sampleRate;
993    }
994
995    public double getTraceRate() {
996      return traceRate;
997    }
998
999    public String getTableName() {
1000      return tableName;
1001    }
1002
1003    public boolean isFlushCommits() {
1004      return flushCommits;
1005    }
1006
1007    public boolean isWriteToWAL() {
1008      return writeToWAL;
1009    }
1010
1011    public boolean isAutoFlush() {
1012      return autoFlush;
1013    }
1014
1015    public boolean isUseTags() {
1016      return useTags;
1017    }
1018
1019    public int getNoOfTags() {
1020      return noOfTags;
1021    }
1022
1023    public boolean isReportLatency() {
1024      return reportLatency;
1025    }
1026
1027    public int getMultiGet() {
1028      return multiGet;
1029    }
1030
1031    public int getMultiPut() {
1032      return multiPut;
1033    }
1034
1035    public boolean isInMemoryCF() {
1036      return inMemoryCF;
1037    }
1038
1039    public int getPresplitRegions() {
1040      return presplitRegions;
1041    }
1042
1043    public Compression.Algorithm getCompression() {
1044      return compression;
1045    }
1046
1047    public DataBlockEncoding getBlockEncoding() {
1048      return blockEncoding;
1049    }
1050
1051    public boolean isValueRandom() {
1052      return valueRandom;
1053    }
1054
1055    public int getValueSize() {
1056      return valueSize;
1057    }
1058
1059    public int getPeriod() {
1060      return period;
1061    }
1062
1063    public BloomType getBloomType() {
1064      return bloomType;
1065    }
1066
1067    public int getBlockSize() {
1068      return blockSize;
1069    }
1070
1071    public boolean isOneCon() {
1072      return oneCon;
1073    }
1074
1075    public int getMeasureAfter() {
1076      return measureAfter;
1077    }
1078
1079    public void setMeasureAfter(int measureAfter) {
1080      this.measureAfter = measureAfter;
1081    }
1082
1083    public boolean getAddColumns() {
1084      return addColumns;
1085    }
1086
1087    public void setAddColumns(boolean addColumns) {
1088      this.addColumns = addColumns;
1089    }
1090
1091    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
1092      this.inMemoryCompaction = inMemoryCompaction;
1093    }
1094
1095    public MemoryCompactionPolicy getInMemoryCompaction() {
1096      return this.inMemoryCompaction;
1097    }
1098
1099    public long getBufferSize() {
1100      return this.bufferSize;
1101    }
1102  }
1103
1104  /*
1105   * A test.
1106   * Subclass to particularize what happens per row.
1107   */
1108  static abstract class TestBase {
1109    // Below is make it so when Tests are all running in the one
1110    // jvm, that they each have a differently seeded Random.
1111    private static final Random randomSeed = new Random(System.currentTimeMillis());
1112
1113    private static long nextRandomSeed() {
1114      return randomSeed.nextLong();
1115    }
1116    private final int everyN;
1117
1118    protected final Random rand = new Random(nextRandomSeed());
1119    protected final Configuration conf;
1120    protected final TestOptions opts;
1121
1122    private final Status status;
1123    private final Sampler traceSampler;
1124    private final SpanReceiverHost receiverHost;
1125
1126    private String testName;
1127    private Histogram latencyHistogram;
1128    private Histogram valueSizeHistogram;
1129    private Histogram rpcCallsHistogram;
1130    private Histogram remoteRpcCallsHistogram;
1131    private Histogram millisBetweenNextHistogram;
1132    private Histogram regionsScannedHistogram;
1133    private Histogram bytesInResultsHistogram;
1134    private Histogram bytesInRemoteResultsHistogram;
1135    private RandomDistribution.Zipf zipf;
1136
1137    /**
1138     * Note that all subclasses of this class must provide a public constructor
1139     * that has the exact same list of arguments.
1140     */
1141    TestBase(final Configuration conf, final TestOptions options, final Status status) {
1142      this.conf = conf;
1143      this.receiverHost = this.conf == null? null: SpanReceiverHost.getInstance(conf);
1144      this.opts = options;
1145      this.status = status;
1146      this.testName = this.getClass().getSimpleName();
1147      if (options.traceRate >= 1.0) {
1148        this.traceSampler = Sampler.ALWAYS;
1149      } else if (options.traceRate > 0.0) {
1150        conf.setDouble("hbase.sampler.fraction", options.traceRate);
1151        this.traceSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf));
1152      } else {
1153        this.traceSampler = Sampler.NEVER;
1154      }
1155      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
1156      if (options.isValueZipf()) {
1157        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
1158      }
1159      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
1160    }
1161
1162    int getValueLength(final Random r) {
1163      if (this.opts.isValueRandom()) {
1164        return r.nextInt(opts.valueSize);
1165      } else if (this.opts.isValueZipf()) {
1166        return Math.abs(this.zipf.nextInt());
1167      } else {
1168        return opts.valueSize;
1169      }
1170    }
1171
1172    void updateValueSize(final Result [] rs) throws IOException {
1173      if (rs == null || !isRandomValueSize()) return;
1174      for (Result r: rs) updateValueSize(r);
1175    }
1176
1177    void updateValueSize(final Result r) throws IOException {
1178      if (r == null || !isRandomValueSize()) return;
1179      int size = 0;
1180      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1181        size += scanner.current().getValueLength();
1182      }
1183      updateValueSize(size);
1184    }
1185
1186    void updateValueSize(final int valueSize) {
1187      if (!isRandomValueSize()) return;
1188      this.valueSizeHistogram.update(valueSize);
1189    }
1190
1191    void updateScanMetrics(final ScanMetrics metrics) {
1192      if (metrics == null) return;
1193      Map<String,Long> metricsMap = metrics.getMetricsMap();
1194      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
1195      if (rpcCalls != null) {
1196        this.rpcCallsHistogram.update(rpcCalls.longValue());
1197      }
1198      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
1199      if (remoteRpcCalls != null) {
1200        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
1201      }
1202      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
1203      if (millisBetweenNext != null) {
1204        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
1205      }
1206      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
1207      if (regionsScanned != null) {
1208        this.regionsScannedHistogram.update(regionsScanned.longValue());
1209      }
1210      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
1211      if (bytesInResults != null && bytesInResults.longValue() > 0) {
1212        this.bytesInResultsHistogram.update(bytesInResults.longValue());
1213      }
1214      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
1215      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
1216        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
1217      }
1218    }
1219
1220    String generateStatus(final int sr, final int i, final int lr) {
1221      return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
1222        (!isRandomValueSize()? "": ", value size " + getShortValueSizeReport());
1223    }
1224
1225    boolean isRandomValueSize() {
1226      return opts.valueRandom;
1227    }
1228
1229    protected int getReportingPeriod() {
1230      return opts.period;
1231    }
1232
1233    /**
1234     * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1235     */
1236    public Histogram getLatencyHistogram() {
1237      return latencyHistogram;
1238    }
1239
1240    void testSetup() throws IOException {
1241      // test metrics
1242      latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1243      valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1244      // scan metrics
1245      rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1246      remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1247      millisBetweenNextHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1248      regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1249      bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1250      bytesInRemoteResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1251
1252      onStartup();
1253    }
1254
1255    abstract void onStartup() throws IOException;
1256
1257    void testTakedown() throws IOException {
1258      onTakedown();
1259      // Print all stats for this thread continuously.
1260      // Synchronize on Test.class so different threads don't intermingle the
1261      // output. We can't use 'this' here because each thread has its own instance of Test class.
1262      synchronized (Test.class) {
1263        status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
1264        status.setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(
1265            latencyHistogram));
1266        status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
1267        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
1268        if (valueSizeHistogram.getCount() > 0) {
1269          status.setStatus("ValueSize (bytes) : "
1270              + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
1271          status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
1272          status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
1273        } else {
1274          status.setStatus("No valueSize statistics available");
1275        }
1276        if (rpcCallsHistogram.getCount() > 0) {
1277          status.setStatus("rpcCalls (count): " +
1278              YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
1279        }
1280        if (remoteRpcCallsHistogram.getCount() > 0) {
1281          status.setStatus("remoteRpcCalls (count): " +
1282              YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
1283        }
1284        if (millisBetweenNextHistogram.getCount() > 0) {
1285          status.setStatus("millisBetweenNext (latency): " +
1286              YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
1287        }
1288        if (regionsScannedHistogram.getCount() > 0) {
1289          status.setStatus("regionsScanned (count): " +
1290              YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
1291        }
1292        if (bytesInResultsHistogram.getCount() > 0) {
1293          status.setStatus("bytesInResults (size): " +
1294              YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
1295        }
1296        if (bytesInRemoteResultsHistogram.getCount() > 0) {
1297          status.setStatus("bytesInRemoteResults (size): " +
1298              YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
1299        }
1300      }
1301      receiverHost.closeReceivers();
1302    }
1303
1304    abstract void onTakedown() throws IOException;
1305
1306
1307    /*
1308     * Run test
1309     * @return Elapsed time.
1310     * @throws IOException
1311     */
1312    long test() throws IOException, InterruptedException {
1313      testSetup();
1314      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1315      final long startTime = System.nanoTime();
1316      try {
1317        testTimed();
1318      } finally {
1319        testTakedown();
1320      }
1321      return (System.nanoTime() - startTime) / 1000000;
1322    }
1323
1324    int getStartRow() {
1325      return opts.startRow;
1326    }
1327
1328    int getLastRow() {
1329      return getStartRow() + opts.perClientRunRows;
1330    }
1331
1332    /**
1333     * Provides an extension point for tests that don't want a per row invocation.
1334     */
1335    void testTimed() throws IOException, InterruptedException {
1336      int startRow = getStartRow();
1337      int lastRow = getLastRow();
1338      TraceUtil.addSampler(traceSampler);
1339      // Report on completion of 1/10th of total.
1340      for (int ii = 0; ii < opts.cycles; ii++) {
1341        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
1342        for (int i = startRow; i < lastRow; i++) {
1343          if (i % everyN != 0) continue;
1344          long startTime = System.nanoTime();
1345          boolean requestSent = false;
1346          try (TraceScope scope = TraceUtil.createTrace("test row");){
1347            requestSent = testRow(i);
1348          }
1349          if ( (i - startRow) > opts.measureAfter) {
1350            // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
1351            // first 9 times and sends the actual get request in the 10th iteration.
1352            // We should only set latency when actual request is sent because otherwise
1353            // it turns out to be 0.
1354            if (requestSent) {
1355              latencyHistogram.update((System.nanoTime() - startTime) / 1000);
1356            }
1357            if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1358              status.setStatus(generateStatus(startRow, i, lastRow));
1359            }
1360          }
1361        }
1362      }
1363    }
1364
1365    /**
1366     * @return Subset of the histograms' calculation.
1367     */
1368    public String getShortLatencyReport() {
1369      return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
1370    }
1371
1372    /**
1373     * @return Subset of the histograms' calculation.
1374     */
1375    public String getShortValueSizeReport() {
1376      return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
1377    }
1378
1379
1380    /**
1381     * Test for individual row.
1382     * @param i Row index.
1383     * @return true if the row was sent to server and need to record metrics.
1384     *         False if not, multiGet and multiPut e.g., the rows are sent
1385     *         to server only if enough gets/puts are gathered.
1386     */
1387    abstract boolean testRow(final int i) throws IOException, InterruptedException;
1388  }
1389
1390  static abstract class Test extends TestBase {
1391    protected Connection connection;
1392
1393    Test(final Connection con, final TestOptions options, final Status status) {
1394      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1395      this.connection = con;
1396    }
1397  }
1398
1399  static abstract class AsyncTest extends TestBase {
1400    protected AsyncConnection connection;
1401
1402    AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
1403      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1404      this.connection = con;
1405    }
1406  }
1407
1408  static abstract class TableTest extends Test {
1409    protected Table table;
1410
1411    TableTest(Connection con, TestOptions options, Status status) {
1412      super(con, options, status);
1413    }
1414
1415    @Override
1416    void onStartup() throws IOException {
1417      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1418    }
1419
1420    @Override
1421    void onTakedown() throws IOException {
1422      table.close();
1423    }
1424  }
1425
1426  static abstract class AsyncTableTest extends AsyncTest {
1427    protected AsyncTable<?> table;
1428
1429    AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
1430      super(con, options, status);
1431    }
1432
1433    @Override
1434    void onStartup() throws IOException {
1435      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1436    }
1437
1438    @Override
1439    void onTakedown() throws IOException {
1440    }
1441  }
1442
1443  static class AsyncRandomReadTest extends AsyncTableTest {
1444    private final Consistency consistency;
1445    private ArrayList<Get> gets;
1446    private Random rd = new Random();
1447
1448    AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
1449      super(con, options, status);
1450      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1451      if (opts.multiGet > 0) {
1452        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1453        this.gets = new ArrayList<>(opts.multiGet);
1454      }
1455    }
1456
1457    @Override
1458    boolean testRow(final int i) throws IOException, InterruptedException {
1459      if (opts.randomSleep > 0) {
1460        Thread.sleep(rd.nextInt(opts.randomSleep));
1461      }
1462      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1463      for (int family = 0; family < opts.families; family++) {
1464        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1465        if (opts.addColumns) {
1466          for (int column = 0; column < opts.columns; column++) {
1467            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1468            get.addColumn(familyName, qualifier);
1469          }
1470        } else {
1471          get.addFamily(familyName);
1472        }
1473      }
1474      if (opts.filterAll) {
1475        get.setFilter(new FilterAllFilter());
1476      }
1477      get.setConsistency(consistency);
1478      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1479      try {
1480        if (opts.multiGet > 0) {
1481          this.gets.add(get);
1482          if (this.gets.size() == opts.multiGet) {
1483            Result[] rs =
1484                this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
1485            updateValueSize(rs);
1486            this.gets.clear();
1487          } else {
1488            return false;
1489          }
1490        } else {
1491          updateValueSize(this.table.get(get).get());
1492        }
1493      } catch (ExecutionException e) {
1494        throw new IOException(e);
1495      }
1496      return true;
1497    }
1498
1499    public static RuntimeException runtime(Throwable e) {
1500      if (e instanceof RuntimeException) {
1501        return (RuntimeException) e;
1502      }
1503      return new RuntimeException(e);
1504    }
1505
1506    public static <V> V propagate(Callable<V> callable) {
1507      try {
1508        return callable.call();
1509      } catch (Exception e) {
1510        throw runtime(e);
1511      }
1512    }
1513
1514    @Override
1515    protected int getReportingPeriod() {
1516      int period = opts.perClientRunRows / 10;
1517      return period == 0 ? opts.perClientRunRows : period;
1518    }
1519
1520    @Override
1521    protected void testTakedown() throws IOException {
1522      if (this.gets != null && this.gets.size() > 0) {
1523        this.table.get(gets);
1524        this.gets.clear();
1525      }
1526      super.testTakedown();
1527    }
1528  }
1529
1530  static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
1531
1532    AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
1533      super(con, options, status);
1534    }
1535
1536    @Override
1537    protected byte[] generateRow(final int i) {
1538      return getRandomRow(this.rand, opts.totalRows);
1539    }
1540  }
1541
1542  static class AsyncScanTest extends AsyncTableTest {
1543    private ResultScanner testScanner;
1544    private AsyncTable<?> asyncTable;
1545
1546    AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
1547      super(con, options, status);
1548    }
1549
1550    @Override
1551    void onStartup() throws IOException {
1552      this.asyncTable =
1553          connection.getTable(TableName.valueOf(opts.tableName),
1554            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
1555    }
1556
1557    @Override
1558    void testTakedown() throws IOException {
1559      if (this.testScanner != null) {
1560        updateScanMetrics(this.testScanner.getScanMetrics());
1561        this.testScanner.close();
1562      }
1563      super.testTakedown();
1564    }
1565
1566    @Override
1567    boolean testRow(final int i) throws IOException {
1568      if (this.testScanner == null) {
1569        Scan scan =
1570            new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1571                .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1572                .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1573        for (int family = 0; family < opts.families; family++) {
1574          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1575          if (opts.addColumns) {
1576            for (int column = 0; column < opts.columns; column++) {
1577              byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1578              scan.addColumn(familyName, qualifier);
1579            }
1580          } else {
1581            scan.addFamily(familyName);
1582          }
1583        }
1584        if (opts.filterAll) {
1585          scan.setFilter(new FilterAllFilter());
1586        }
1587        this.testScanner = asyncTable.getScanner(scan);
1588      }
1589      Result r = testScanner.next();
1590      updateValueSize(r);
1591      return true;
1592    }
1593  }
1594
1595  static class AsyncSequentialReadTest extends AsyncTableTest {
1596    AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
1597      super(con, options, status);
1598    }
1599
1600    @Override
1601    boolean testRow(final int i) throws IOException, InterruptedException {
1602      Get get = new Get(format(i));
1603      for (int family = 0; family < opts.families; family++) {
1604        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1605        if (opts.addColumns) {
1606          for (int column = 0; column < opts.columns; column++) {
1607            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1608            get.addColumn(familyName, qualifier);
1609          }
1610        } else {
1611          get.addFamily(familyName);
1612        }
1613      }
1614      if (opts.filterAll) {
1615        get.setFilter(new FilterAllFilter());
1616      }
1617      try {
1618        updateValueSize(table.get(get).get());
1619      } catch (ExecutionException e) {
1620        throw new IOException(e);
1621      }
1622      return true;
1623    }
1624  }
1625
1626  static class AsyncSequentialWriteTest extends AsyncTableTest {
1627    private ArrayList<Put> puts;
1628
1629    AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
1630      super(con, options, status);
1631      if (opts.multiPut > 0) {
1632        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
1633        this.puts = new ArrayList<>(opts.multiPut);
1634      }
1635    }
1636
1637    protected byte[] generateRow(final int i) {
1638      return format(i);
1639    }
1640
1641    @Override
1642    @SuppressWarnings("ReturnValueIgnored")
1643    boolean testRow(final int i) throws IOException, InterruptedException {
1644      byte[] row = generateRow(i);
1645      Put put = new Put(row);
1646      for (int family = 0; family < opts.families; family++) {
1647        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1648        for (int column = 0; column < opts.columns; column++) {
1649          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1650          byte[] value = generateData(this.rand, getValueLength(this.rand));
1651          if (opts.useTags) {
1652            byte[] tag = generateData(this.rand, TAG_LENGTH);
1653            Tag[] tags = new Tag[opts.noOfTags];
1654            for (int n = 0; n < opts.noOfTags; n++) {
1655              Tag t = new ArrayBackedTag((byte) n, tag);
1656              tags[n] = t;
1657            }
1658            KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
1659              value, tags);
1660            put.add(kv);
1661            updateValueSize(kv.getValueLength());
1662          } else {
1663            put.addColumn(familyName, qualifier, value);
1664            updateValueSize(value.length);
1665          }
1666        }
1667      }
1668      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1669      try {
1670        table.put(put).get();
1671        if (opts.multiPut > 0) {
1672          this.puts.add(put);
1673          if (this.puts.size() == opts.multiPut) {
1674            this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get));
1675            this.puts.clear();
1676          } else {
1677            return false;
1678          }
1679        } else {
1680          table.put(put).get();
1681        }
1682      } catch (ExecutionException e) {
1683        throw new IOException(e);
1684      }
1685      return true;
1686    }
1687  }
1688
1689  static abstract class BufferedMutatorTest extends Test {
1690    protected BufferedMutator mutator;
1691    protected Table table;
1692
1693    BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1694      super(con, options, status);
1695    }
1696
1697    @Override
1698    void onStartup() throws IOException {
1699      BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1700      p.writeBufferSize(opts.bufferSize);
1701      this.mutator = connection.getBufferedMutator(p);
1702      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1703    }
1704
1705    @Override
1706    void onTakedown() throws IOException {
1707      mutator.close();
1708      table.close();
1709    }
1710  }
1711
1712  static class RandomSeekScanTest extends TableTest {
1713    RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1714      super(con, options, status);
1715    }
1716
1717    @Override
1718    boolean testRow(final int i) throws IOException {
1719      Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
1720          .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
1721          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
1722          .setScanMetricsEnabled(true);
1723      FilterList list = new FilterList();
1724      for (int family = 0; family < opts.families; family++) {
1725        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1726        if (opts.addColumns) {
1727          for (int column = 0; column < opts.columns; column++) {
1728            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1729            scan.addColumn(familyName, qualifier);
1730          }
1731        } else {
1732          scan.addFamily(familyName);
1733        }
1734      }
1735      if (opts.filterAll) {
1736        list.addFilter(new FilterAllFilter());
1737      }
1738      list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1739      scan.setFilter(list);
1740      ResultScanner s = this.table.getScanner(scan);
1741      try {
1742        for (Result rr; (rr = s.next()) != null;) {
1743          updateValueSize(rr);
1744        }
1745      } finally {
1746        updateScanMetrics(s.getScanMetrics());
1747        s.close();
1748      }
1749      return true;
1750    }
1751
1752    @Override
1753    protected int getReportingPeriod() {
1754      int period = opts.perClientRunRows / 100;
1755      return period == 0 ? opts.perClientRunRows : period;
1756    }
1757
1758  }
1759
1760  static abstract class RandomScanWithRangeTest extends TableTest {
1761    RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1762      super(con, options, status);
1763    }
1764
1765    @Override
1766    boolean testRow(final int i) throws IOException {
1767      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1768      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
1769          .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
1770          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1771          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1772      for (int family = 0; family < opts.families; family++) {
1773        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1774        if (opts.addColumns) {
1775          for (int column = 0; column < opts.columns; column++) {
1776            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1777            scan.addColumn(familyName, qualifier);
1778          }
1779        } else {
1780          scan.addFamily(familyName);
1781        }
1782      }
1783      if (opts.filterAll) {
1784        scan.setFilter(new FilterAllFilter());
1785      }
1786      Result r = null;
1787      int count = 0;
1788      ResultScanner s = this.table.getScanner(scan);
1789      try {
1790        for (; (r = s.next()) != null;) {
1791          updateValueSize(r);
1792          count++;
1793        }
1794        if (i % 100 == 0) {
1795          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1796            Bytes.toString(startAndStopRow.getFirst()),
1797            Bytes.toString(startAndStopRow.getSecond()), count));
1798        }
1799      } finally {
1800        updateScanMetrics(s.getScanMetrics());
1801        s.close();
1802      }
1803      return true;
1804    }
1805
1806    protected abstract Pair<byte[],byte[]> getStartAndStopRow();
1807
1808    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
1809      int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
1810      int stop = start + maxRange;
1811      return new Pair<>(format(start), format(stop));
1812    }
1813
1814    @Override
1815    protected int getReportingPeriod() {
1816      int period = opts.perClientRunRows / 100;
1817      return period == 0? opts.perClientRunRows: period;
1818    }
1819  }
1820
1821  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1822    RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1823      super(con, options, status);
1824    }
1825
1826    @Override
1827    protected Pair<byte[], byte[]> getStartAndStopRow() {
1828      return generateStartAndStopRows(10);
1829    }
1830  }
1831
1832  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1833    RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1834      super(con, options, status);
1835    }
1836
1837    @Override
1838    protected Pair<byte[], byte[]> getStartAndStopRow() {
1839      return generateStartAndStopRows(100);
1840    }
1841  }
1842
1843  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1844    RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1845      super(con, options, status);
1846    }
1847
1848    @Override
1849    protected Pair<byte[], byte[]> getStartAndStopRow() {
1850      return generateStartAndStopRows(1000);
1851    }
1852  }
1853
1854  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1855    RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1856      super(con, options, status);
1857    }
1858
1859    @Override
1860    protected Pair<byte[], byte[]> getStartAndStopRow() {
1861      return generateStartAndStopRows(10000);
1862    }
1863  }
1864
1865  static class RandomReadTest extends TableTest {
1866    private final Consistency consistency;
1867    private ArrayList<Get> gets;
1868    private Random rd = new Random();
1869
1870    RandomReadTest(Connection con, TestOptions options, Status status) {
1871      super(con, options, status);
1872      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1873      if (opts.multiGet > 0) {
1874        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1875        this.gets = new ArrayList<>(opts.multiGet);
1876      }
1877    }
1878
1879    @Override
1880    boolean testRow(final int i) throws IOException, InterruptedException {
1881      if (opts.randomSleep > 0) {
1882        Thread.sleep(rd.nextInt(opts.randomSleep));
1883      }
1884      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1885      for (int family = 0; family < opts.families; family++) {
1886        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1887        if (opts.addColumns) {
1888          for (int column = 0; column < opts.columns; column++) {
1889            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1890            get.addColumn(familyName, qualifier);
1891          }
1892        } else {
1893          get.addFamily(familyName);
1894        }
1895      }
1896      if (opts.filterAll) {
1897        get.setFilter(new FilterAllFilter());
1898      }
1899      get.setConsistency(consistency);
1900      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1901      if (opts.multiGet > 0) {
1902        this.gets.add(get);
1903        if (this.gets.size() == opts.multiGet) {
1904          Result [] rs = this.table.get(this.gets);
1905          updateValueSize(rs);
1906          this.gets.clear();
1907        } else {
1908          return false;
1909        }
1910      } else {
1911        updateValueSize(this.table.get(get));
1912      }
1913      return true;
1914    }
1915
1916    @Override
1917    protected int getReportingPeriod() {
1918      int period = opts.perClientRunRows / 10;
1919      return period == 0 ? opts.perClientRunRows : period;
1920    }
1921
1922    @Override
1923    protected void testTakedown() throws IOException {
1924      if (this.gets != null && this.gets.size() > 0) {
1925        this.table.get(gets);
1926        this.gets.clear();
1927      }
1928      super.testTakedown();
1929    }
1930  }
1931
1932  static class RandomWriteTest extends SequentialWriteTest {
1933    RandomWriteTest(Connection con, TestOptions options, Status status) {
1934      super(con, options, status);
1935    }
1936
1937    @Override
1938    protected byte[] generateRow(final int i) {
1939      return getRandomRow(this.rand, opts.totalRows);
1940    }
1941
1942
1943  }
1944
1945  static class ScanTest extends TableTest {
1946    private ResultScanner testScanner;
1947
1948    ScanTest(Connection con, TestOptions options, Status status) {
1949      super(con, options, status);
1950    }
1951
1952    @Override
1953    void testTakedown() throws IOException {
1954      if (this.testScanner != null) {
1955        this.testScanner.close();
1956      }
1957      super.testTakedown();
1958    }
1959
1960
1961    @Override
1962    boolean testRow(final int i) throws IOException {
1963      if (this.testScanner == null) {
1964        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1965            .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1966            .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1967        for (int family = 0; family < opts.families; family++) {
1968          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1969          if (opts.addColumns) {
1970            for (int column = 0; column < opts.columns; column++) {
1971              byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1972              scan.addColumn(familyName, qualifier);
1973            }
1974          } else {
1975            scan.addFamily(familyName);
1976          }
1977        }
1978        if (opts.filterAll) {
1979          scan.setFilter(new FilterAllFilter());
1980        }
1981        this.testScanner = table.getScanner(scan);
1982      }
1983      Result r = testScanner.next();
1984      updateValueSize(r);
1985      return true;
1986    }
1987  }
1988
1989  /**
1990   * Base class for operations that are CAS-like; that read a value and then set it based off what
1991   * they read. In this category is increment, append, checkAndPut, etc.
1992   *
1993   * <p>These operations also want some concurrency going on. Usually when these tests run, they
1994   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
1995   * same key space. We do this with our getStartRow and getLastRow overrides.
1996   */
1997  static abstract class CASTableTest extends TableTest {
1998    private final byte [] qualifier;
1999    CASTableTest(Connection con, TestOptions options, Status status) {
2000      super(con, options, status);
2001      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
2002    }
2003
2004    byte [] getQualifier() {
2005      return this.qualifier;
2006    }
2007
2008    @Override
2009    int getStartRow() {
2010      return 0;
2011    }
2012
2013    @Override
2014    int getLastRow() {
2015      return opts.perClientRunRows;
2016    }
2017  }
2018
2019  static class IncrementTest extends CASTableTest {
2020    IncrementTest(Connection con, TestOptions options, Status status) {
2021      super(con, options, status);
2022    }
2023
2024    @Override
2025    boolean testRow(final int i) throws IOException {
2026      Increment increment = new Increment(format(i));
2027      // unlike checkAndXXX tests, which make most sense to do on a single value,
2028      // if multiple families are specified for an increment test we assume it is
2029      // meant to raise the work factor
2030      for (int family = 0; family < opts.families; family++) {
2031        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2032        increment.addColumn(familyName, getQualifier(), 1l);
2033      }
2034      updateValueSize(this.table.increment(increment));
2035      return true;
2036    }
2037  }
2038
2039  static class AppendTest extends CASTableTest {
2040    AppendTest(Connection con, TestOptions options, Status status) {
2041      super(con, options, status);
2042    }
2043
2044    @Override
2045    boolean testRow(final int i) throws IOException {
2046      byte [] bytes = format(i);
2047      Append append = new Append(bytes);
2048      // unlike checkAndXXX tests, which make most sense to do on a single value,
2049      // if multiple families are specified for an append test we assume it is
2050      // meant to raise the work factor
2051      for (int family = 0; family < opts.families; family++) {
2052        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2053        append.addColumn(familyName, getQualifier(), bytes);
2054      }
2055      updateValueSize(this.table.append(append));
2056      return true;
2057    }
2058  }
2059
2060  static class CheckAndMutateTest extends CASTableTest {
2061    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
2062      super(con, options, status);
2063    }
2064
2065    @Override
2066    boolean testRow(final int i) throws IOException {
2067      final byte [] bytes = format(i);
2068      // checkAndXXX tests operate on only a single value
2069      // Put a known value so when we go to check it, it is there.
2070      Put put = new Put(bytes);
2071      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2072      this.table.put(put);
2073      RowMutations mutations = new RowMutations(bytes);
2074      mutations.add(put);
2075      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2076          .ifEquals(bytes).thenMutate(mutations);
2077      return true;
2078    }
2079  }
2080
2081  static class CheckAndPutTest extends CASTableTest {
2082    CheckAndPutTest(Connection con, TestOptions options, Status status) {
2083      super(con, options, status);
2084    }
2085
2086    @Override
2087    boolean testRow(final int i) throws IOException {
2088      final byte [] bytes = format(i);
2089      // checkAndXXX tests operate on only a single value
2090      // Put a known value so when we go to check it, it is there.
2091      Put put = new Put(bytes);
2092      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2093      this.table.put(put);
2094      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2095          .ifEquals(bytes).thenPut(put);
2096      return true;
2097    }
2098  }
2099
2100  static class CheckAndDeleteTest extends CASTableTest {
2101    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
2102      super(con, options, status);
2103    }
2104
2105    @Override
2106    boolean testRow(final int i) throws IOException {
2107      final byte [] bytes = format(i);
2108      // checkAndXXX tests operate on only a single value
2109      // Put a known value so when we go to check it, it is there.
2110      Put put = new Put(bytes);
2111      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2112      this.table.put(put);
2113      Delete delete = new Delete(put.getRow());
2114      delete.addColumn(FAMILY_ZERO, getQualifier());
2115      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2116          .ifEquals(bytes).thenDelete(delete);
2117      return true;
2118    }
2119  }
2120
2121  static class SequentialReadTest extends TableTest {
2122    SequentialReadTest(Connection con, TestOptions options, Status status) {
2123      super(con, options, status);
2124    }
2125
2126    @Override
2127    boolean testRow(final int i) throws IOException {
2128      Get get = new Get(format(i));
2129      for (int family = 0; family < opts.families; family++) {
2130        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2131        if (opts.addColumns) {
2132          for (int column = 0; column < opts.columns; column++) {
2133            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2134            get.addColumn(familyName, qualifier);
2135          }
2136        } else {
2137          get.addFamily(familyName);
2138        }
2139      }
2140      if (opts.filterAll) {
2141        get.setFilter(new FilterAllFilter());
2142      }
2143      updateValueSize(table.get(get));
2144      return true;
2145    }
2146  }
2147
2148  static class SequentialWriteTest extends BufferedMutatorTest {
2149    private ArrayList<Put> puts;
2150
2151
2152    SequentialWriteTest(Connection con, TestOptions options, Status status) {
2153      super(con, options, status);
2154      if (opts.multiPut > 0) {
2155        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
2156        this.puts = new ArrayList<>(opts.multiPut);
2157      }
2158    }
2159
2160    protected byte[] generateRow(final int i) {
2161      return format(i);
2162    }
2163
2164    @Override
2165    boolean testRow(final int i) throws IOException {
2166      byte[] row = generateRow(i);
2167      Put put = new Put(row);
2168      for (int family = 0; family < opts.families; family++) {
2169        byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
2170        for (int column = 0; column < opts.columns; column++) {
2171          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2172          byte[] value = generateData(this.rand, getValueLength(this.rand));
2173          if (opts.useTags) {
2174            byte[] tag = generateData(this.rand, TAG_LENGTH);
2175            Tag[] tags = new Tag[opts.noOfTags];
2176            for (int n = 0; n < opts.noOfTags; n++) {
2177              Tag t = new ArrayBackedTag((byte) n, tag);
2178              tags[n] = t;
2179            }
2180            KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
2181              value, tags);
2182            put.add(kv);
2183            updateValueSize(kv.getValueLength());
2184          } else {
2185            put.addColumn(familyName, qualifier, value);
2186            updateValueSize(value.length);
2187          }
2188        }
2189      }
2190      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2191      if (opts.autoFlush) {
2192        if (opts.multiPut > 0) {
2193          this.puts.add(put);
2194          if (this.puts.size() == opts.multiPut) {
2195            table.put(this.puts);
2196            this.puts.clear();
2197          } else {
2198            return false;
2199          }
2200        } else {
2201          table.put(put);
2202        }
2203      } else {
2204        mutator.mutate(put);
2205      }
2206      return true;
2207    }
2208  }
2209
2210  static class FilteredScanTest extends TableTest {
2211    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
2212
2213    FilteredScanTest(Connection con, TestOptions options, Status status) {
2214      super(con, options, status);
2215    }
2216
2217    @Override
2218    boolean testRow(int i) throws IOException {
2219      byte[] value = generateData(this.rand, getValueLength(this.rand));
2220      Scan scan = constructScan(value);
2221      ResultScanner scanner = null;
2222      try {
2223        scanner = this.table.getScanner(scan);
2224        for (Result r = null; (r = scanner.next()) != null;) {
2225          updateValueSize(r);
2226        }
2227      } finally {
2228        if (scanner != null) {
2229          updateScanMetrics(scanner.getScanMetrics());
2230          scanner.close();
2231        }
2232      }
2233      return true;
2234    }
2235
2236    protected Scan constructScan(byte[] valuePrefix) throws IOException {
2237      FilterList list = new FilterList();
2238      Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO,
2239        CompareOperator.EQUAL, new BinaryComparator(valuePrefix));
2240      list.addFilter(filter);
2241      if (opts.filterAll) {
2242        list.addFilter(new FilterAllFilter());
2243      }
2244      Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2245          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2246          .setScanMetricsEnabled(true);
2247      if (opts.addColumns) {
2248        for (int column = 0; column < opts.columns; column++) {
2249          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2250          scan.addColumn(FAMILY_ZERO, qualifier);
2251        }
2252      } else {
2253        scan.addFamily(FAMILY_ZERO);
2254      }
2255      scan.setFilter(list);
2256      return scan;
2257    }
2258  }
2259
2260  /**
2261   * Compute a throughput rate in MB/s.
2262   * @param rows Number of records consumed.
2263   * @param timeMs Time taken in milliseconds.
2264   * @return String value with label, ie '123.76 MB/s'
2265   */
2266  private static String calculateMbps(int rows, long timeMs, final int valueSize, int families, int columns) {
2267    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH +
2268      ((valueSize + (FAMILY_NAME_BASE.length()+1) + COLUMN_ZERO.length) * columns) * families);
2269    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
2270      .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
2271      .divide(BYTES_PER_MB, CXT);
2272    return FMT.format(mbps) + " MB/s";
2273  }
2274
2275  /*
2276   * Format passed integer.
2277   * @param number
2278   * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed
2279   * number (Does absolute in case number is negative).
2280   */
2281  public static byte [] format(final int number) {
2282    byte [] b = new byte[ROW_LENGTH];
2283    int d = Math.abs(number);
2284    for (int i = b.length - 1; i >= 0; i--) {
2285      b[i] = (byte)((d % 10) + '0');
2286      d /= 10;
2287    }
2288    return b;
2289  }
2290
2291  /*
2292   * This method takes some time and is done inline uploading data.  For
2293   * example, doing the mapfile test, generation of the key and value
2294   * consumes about 30% of CPU time.
2295   * @return Generated random value to insert into a table cell.
2296   */
2297  public static byte[] generateData(final Random r, int length) {
2298    byte [] b = new byte [length];
2299    int i;
2300
2301    for(i = 0; i < (length-8); i += 8) {
2302      b[i] = (byte) (65 + r.nextInt(26));
2303      b[i+1] = b[i];
2304      b[i+2] = b[i];
2305      b[i+3] = b[i];
2306      b[i+4] = b[i];
2307      b[i+5] = b[i];
2308      b[i+6] = b[i];
2309      b[i+7] = b[i];
2310    }
2311
2312    byte a = (byte) (65 + r.nextInt(26));
2313    for(; i < length; i++) {
2314      b[i] = a;
2315    }
2316    return b;
2317  }
2318
2319  static byte [] getRandomRow(final Random random, final int totalRows) {
2320    return format(generateRandomRow(random, totalRows));
2321  }
2322
2323  static int generateRandomRow(final Random random, final int totalRows) {
2324    return random.nextInt(Integer.MAX_VALUE) % totalRows;
2325  }
2326
2327  static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
2328      Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
2329      throws IOException, InterruptedException {
2330    status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for "
2331        + opts.perClientRunRows + " rows");
2332    long totalElapsedTime;
2333
2334    final TestBase t;
2335    try {
2336      if (AsyncTest.class.isAssignableFrom(cmd)) {
2337        Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
2338        Constructor<? extends AsyncTest> constructor =
2339            newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
2340        t = constructor.newInstance(asyncCon, opts, status);
2341      } else {
2342        Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
2343        Constructor<? extends Test> constructor =
2344            newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
2345        t = constructor.newInstance(con, opts, status);
2346      }
2347    } catch (NoSuchMethodException e) {
2348      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
2349          + ".  It does not provide a constructor as described by "
2350          + "the javadoc comment.  Available constructors are: "
2351          + Arrays.toString(cmd.getConstructors()));
2352    } catch (Exception e) {
2353      throw new IllegalStateException("Failed to construct command class", e);
2354    }
2355    totalElapsedTime = t.test();
2356
2357    status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
2358      "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
2359      " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
2360          getAverageValueLength(opts), opts.families, opts.columns) + ")");
2361
2362    return new RunResult(totalElapsedTime, t.getLatencyHistogram());
2363  }
2364
2365  private static int getAverageValueLength(final TestOptions opts) {
2366    return opts.valueRandom? opts.valueSize/2: opts.valueSize;
2367  }
2368
2369  private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) throws IOException,
2370      InterruptedException, ClassNotFoundException, ExecutionException {
2371    // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2372    // the TestOptions introspection for us and dump the output in a readable format.
2373    LOG.info(cmd.getSimpleName() + " test run options=" + MAPPER.writeValueAsString(opts));
2374    Admin admin = null;
2375    Connection connection = null;
2376    try {
2377      connection = ConnectionFactory.createConnection(getConf());
2378      admin = connection.getAdmin();
2379      checkTable(admin, opts);
2380    } finally {
2381      if (admin != null) admin.close();
2382      if (connection != null) connection.close();
2383    }
2384    if (opts.nomapred) {
2385      doLocalClients(opts, getConf());
2386    } else {
2387      doMapReduce(opts, getConf());
2388    }
2389  }
2390
2391  protected void printUsage() {
2392    printUsage(PE_COMMAND_SHORTNAME, null);
2393  }
2394
2395  protected static void printUsage(final String message) {
2396    printUsage(PE_COMMAND_SHORTNAME, message);
2397  }
2398
2399  protected static void printUsageAndExit(final String message, final int exitCode) {
2400    printUsage(message);
2401    System.exit(exitCode);
2402  }
2403
2404  protected static void printUsage(final String shortName, final String message) {
2405    if (message != null && message.length() > 0) {
2406      System.err.println(message);
2407    }
2408    System.err.print("Usage: hbase " + shortName);
2409    System.err.println("  <OPTIONS> [-D<property=value>]* <command> <nclients>");
2410    System.err.println();
2411    System.err.println("General Options:");
2412    System.err.println(" nomapred        Run multiple clients using threads " +
2413      "(rather than use mapreduce)");
2414    System.err.println(" oneCon          all the threads share the same connection. Default: False");
2415    System.err.println(" connCount          connections all threads share. "
2416        + "For example, if set to 2, then all thread share 2 connection. "
2417        + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, "
2418        + "if not, connCount=thread number");
2419
2420    System.err.println(" sampleRate      Execute test on a sample of total " +
2421      "rows. Only supported by randomRead. Default: 1.0");
2422    System.err.println(" period          Report every 'period' rows: " +
2423      "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows()/10);
2424    System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
2425    System.err.println(" traceRate       Enable HTrace spans. Initiate tracing every N rows. " +
2426      "Default: 0");
2427    System.err.println(" latency         Set to report operation latencies. Default: False");
2428    System.err.println(" measureAfter    Start to measure the latency once 'measureAfter'" +
2429        " rows have been treated. Default: 0");
2430    System.err.println(" valueSize       Pass value size to use: Default: "
2431        + DEFAULT_OPTS.getValueSize());
2432    System.err.println(" valueRandom     Set if we should vary value size between 0 and " +
2433        "'valueSize'; set on read for stats on size: Default: Not set.");
2434    System.err.println(" blockEncoding   Block encoding to use. Value should be one of "
2435        + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2436    System.err.println();
2437    System.err.println("Table Creation / Write Tests:");
2438    System.err.println(" table           Alternate table name. Default: 'TestTable'");
2439    System.err.println(" rows            Rows each client runs. Default: "
2440        + DEFAULT_OPTS.getPerClientRunRows()
2441        + ".  In case of randomReads and randomSeekScans this could"
2442        + " be specified along with --size to specify the number of rows to be scanned within"
2443        + " the total range specified by the size.");
2444    System.err.println(
2445      " size            Total size in GiB. Mutually exclusive with --rows for writes and scans"
2446          + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2447          + " use size to specify the end range and --rows"
2448          + " specifies the number of rows within that range. " + "Default: 1.0.");
2449    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2450    System.err.println(" flushCommits    Used to determine if the test should flush the table. " +
2451      "Default: false");
2452    System.err.println(" valueZipf       Set if we should vary value size between 0 and " +
2453        "'valueSize' in zipf form: Default: Not set.");
2454    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
2455    System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
2456    System.err.println(" multiPut        Batch puts together into groups of N. Only supported " +
2457        "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0");
2458    System.err.println(" presplit        Create presplit table. If a table with same name exists,"
2459        + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2460        + "Recommended for accurate perf analysis (see guide). Default: disabled");
2461    System.err.println(" usetags         Writes tags along with KVs. Use with HFile V3. " +
2462      "Default: false");
2463    System.err.println(" numoftags       Specify the no of tags that would be needed. " +
2464       "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2465    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
2466    System.err.println(" columns         Columns to write per row. Default: 1");
2467    System.err.println(" families        Specify number of column families for the table. Default: 1");
2468    System.err.println();
2469    System.err.println("Read Tests:");
2470    System.err.println(" filterAll       Helps to filter out all the rows on the server side"
2471        + " there by not returning any thing back to the client.  Helps to check the server side"
2472        + " performance.  Uses FilterAllFilter internally. ");
2473    System.err.println(" multiGet        Batch gets together into groups of N. Only supported " +
2474      "by randomRead. Default: disabled");
2475    System.err.println(" inmemory        Tries to keep the HFiles of the CF " +
2476      "inmemory as far as possible. Not guaranteed that reads are always served " +
2477      "from memory.  Default: false");
2478    System.err.println(" bloomFilter     Bloom filter type, one of "
2479        + Arrays.toString(BloomType.values()));
2480    System.err.println(" blockSize       Blocksize to use when writing out hfiles. ");
2481    System.err.println(" inmemoryCompaction  Makes the column family to do inmemory flushes/compactions. "
2482        + "Uses the CompactingMemstore");
2483    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
2484    System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
2485    System.err.println(" randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
2486    System.err.println(" caching         Scan caching to use. Default: 30");
2487    System.err.println(" asyncPrefetch   Enable asyncPrefetch for scan");
2488    System.err.println(" cacheBlocks     Set the cacheBlocks option for scan. Default: true");
2489    System.err.println(" scanReadType    Set the readType option for scan, stream/pread/default. Default: default");
2490    System.err.println(" bufferSize      Set the value of client side buffering. Default: 2MB");
2491    System.err.println();
2492    System.err.println(" Note: -D properties will be applied to the conf used. ");
2493    System.err.println("  For example: ");
2494    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
2495    System.err.println("   -Dmapreduce.task.timeout=60000");
2496    System.err.println();
2497    System.err.println("Command:");
2498    for (CmdDescriptor command : COMMANDS.values()) {
2499      System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
2500    }
2501    System.err.println();
2502    System.err.println("Args:");
2503    System.err.println(" nclients        Integer. Required. Total number of clients "
2504        + "(and HRegionServers) running. 1 <= value <= 500");
2505    System.err.println("Examples:");
2506    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
2507    System.err.println(" $ hbase " + shortName + " sequentialWrite 1");
2508    System.err.println(" To run 10 clients doing increments over ten rows:");
2509    System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10");
2510  }
2511
2512  /**
2513   * Parse options passed in via an arguments array. Assumes that array has been split
2514   * on white-space and placed into a {@code Queue}. Any unknown arguments will remain
2515   * in the queue at the conclusion of this method call. It's up to the caller to deal
2516   * with these unrecognized arguments.
2517   */
2518  static TestOptions parseOpts(Queue<String> args) {
2519    TestOptions opts = new TestOptions();
2520
2521    String cmd = null;
2522    while ((cmd = args.poll()) != null) {
2523      if (cmd.equals("-h") || cmd.startsWith("--h")) {
2524        // place item back onto queue so that caller knows parsing was incomplete
2525        args.add(cmd);
2526        break;
2527      }
2528
2529      final String nmr = "--nomapred";
2530      if (cmd.startsWith(nmr)) {
2531        opts.nomapred = true;
2532        continue;
2533      }
2534
2535      final String rows = "--rows=";
2536      if (cmd.startsWith(rows)) {
2537        opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
2538        continue;
2539      }
2540
2541      final String cycles = "--cycles=";
2542      if (cmd.startsWith(cycles)) {
2543        opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
2544        continue;
2545      }
2546
2547      final String sampleRate = "--sampleRate=";
2548      if (cmd.startsWith(sampleRate)) {
2549        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
2550        continue;
2551      }
2552
2553      final String table = "--table=";
2554      if (cmd.startsWith(table)) {
2555        opts.tableName = cmd.substring(table.length());
2556        continue;
2557      }
2558
2559      final String startRow = "--startRow=";
2560      if (cmd.startsWith(startRow)) {
2561        opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
2562        continue;
2563      }
2564
2565      final String compress = "--compress=";
2566      if (cmd.startsWith(compress)) {
2567        opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
2568        continue;
2569      }
2570
2571      final String traceRate = "--traceRate=";
2572      if (cmd.startsWith(traceRate)) {
2573        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
2574        continue;
2575      }
2576
2577      final String blockEncoding = "--blockEncoding=";
2578      if (cmd.startsWith(blockEncoding)) {
2579        opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
2580        continue;
2581      }
2582
2583      final String flushCommits = "--flushCommits=";
2584      if (cmd.startsWith(flushCommits)) {
2585        opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
2586        continue;
2587      }
2588
2589      final String writeToWAL = "--writeToWAL=";
2590      if (cmd.startsWith(writeToWAL)) {
2591        opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
2592        continue;
2593      }
2594
2595      final String presplit = "--presplit=";
2596      if (cmd.startsWith(presplit)) {
2597        opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
2598        continue;
2599      }
2600
2601      final String inMemory = "--inmemory=";
2602      if (cmd.startsWith(inMemory)) {
2603        opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
2604        continue;
2605      }
2606
2607      final String autoFlush = "--autoFlush=";
2608      if (cmd.startsWith(autoFlush)) {
2609        opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
2610        if (!opts.autoFlush && opts.multiPut > 0) {
2611          throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
2612        }
2613        continue;
2614      }
2615
2616      final String onceCon = "--oneCon=";
2617      if (cmd.startsWith(onceCon)) {
2618        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
2619        if (opts.oneCon && opts.connCount > 1) {
2620          throw new IllegalArgumentException("oneCon is set to true, "
2621              + "connCount should not bigger than 1");
2622        }
2623        continue;
2624      }
2625
2626      final String connCount = "--connCount=";
2627      if (cmd.startsWith(connCount)) {
2628        opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
2629        if (opts.oneCon && opts.connCount > 1) {
2630          throw new IllegalArgumentException("oneCon is set to true, "
2631              + "connCount should not bigger than 1");
2632        }
2633        continue;
2634      }
2635
2636      final String latency = "--latency";
2637      if (cmd.startsWith(latency)) {
2638        opts.reportLatency = true;
2639        continue;
2640      }
2641
2642      final String multiGet = "--multiGet=";
2643      if (cmd.startsWith(multiGet)) {
2644        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
2645        continue;
2646      }
2647
2648      final String multiPut = "--multiPut=";
2649      if (cmd.startsWith(multiPut)) {
2650        opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
2651        if (!opts.autoFlush && opts.multiPut > 0) {
2652          throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
2653        }
2654        continue;
2655      }
2656
2657      final String useTags = "--usetags=";
2658      if (cmd.startsWith(useTags)) {
2659        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
2660        continue;
2661      }
2662
2663      final String noOfTags = "--numoftags=";
2664      if (cmd.startsWith(noOfTags)) {
2665        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
2666        continue;
2667      }
2668
2669      final String replicas = "--replicas=";
2670      if (cmd.startsWith(replicas)) {
2671        opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
2672        continue;
2673      }
2674
2675      final String filterOutAll = "--filterAll";
2676      if (cmd.startsWith(filterOutAll)) {
2677        opts.filterAll = true;
2678        continue;
2679      }
2680
2681      final String size = "--size=";
2682      if (cmd.startsWith(size)) {
2683        opts.size = Float.parseFloat(cmd.substring(size.length()));
2684        if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB");
2685        continue;
2686      }
2687
2688      final String splitPolicy = "--splitPolicy=";
2689      if (cmd.startsWith(splitPolicy)) {
2690        opts.splitPolicy = cmd.substring(splitPolicy.length());
2691        continue;
2692      }
2693
2694      final String randomSleep = "--randomSleep=";
2695      if (cmd.startsWith(randomSleep)) {
2696        opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
2697        continue;
2698      }
2699
2700      final String measureAfter = "--measureAfter=";
2701      if (cmd.startsWith(measureAfter)) {
2702        opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length()));
2703        continue;
2704      }
2705
2706      final String bloomFilter = "--bloomFilter=";
2707      if (cmd.startsWith(bloomFilter)) {
2708        opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
2709        continue;
2710      }
2711
2712      final String blockSize = "--blockSize=";
2713      if(cmd.startsWith(blockSize) ) {
2714        opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
2715      }
2716
2717      final String valueSize = "--valueSize=";
2718      if (cmd.startsWith(valueSize)) {
2719        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
2720        continue;
2721      }
2722
2723      final String valueRandom = "--valueRandom";
2724      if (cmd.startsWith(valueRandom)) {
2725        opts.valueRandom = true;
2726        if (opts.valueZipf) {
2727          throw new IllegalStateException("Either valueZipf or valueRandom but not both");
2728        }
2729        continue;
2730      }
2731
2732      final String valueZipf = "--valueZipf";
2733      if (cmd.startsWith(valueZipf)) {
2734        opts.valueZipf = true;
2735        if (opts.valueRandom) {
2736          throw new IllegalStateException("Either valueZipf or valueRandom but not both");
2737        }
2738        continue;
2739      }
2740
2741      final String period = "--period=";
2742      if (cmd.startsWith(period)) {
2743        opts.period = Integer.parseInt(cmd.substring(period.length()));
2744        continue;
2745      }
2746
2747      final String addColumns = "--addColumns=";
2748      if (cmd.startsWith(addColumns)) {
2749        opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
2750        continue;
2751      }
2752
2753      final String inMemoryCompaction = "--inmemoryCompaction=";
2754      if (cmd.startsWith(inMemoryCompaction)) {
2755        opts.inMemoryCompaction =
2756            MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length()));
2757        continue;
2758      }
2759
2760      final String columns = "--columns=";
2761      if (cmd.startsWith(columns)) {
2762        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
2763        continue;
2764      }
2765
2766      final String families = "--families=";
2767      if (cmd.startsWith(families)) {
2768        opts.families = Integer.parseInt(cmd.substring(families.length()));
2769        continue;
2770      }
2771
2772      final String caching = "--caching=";
2773      if (cmd.startsWith(caching)) {
2774        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
2775        continue;
2776      }
2777
2778      final String asyncPrefetch = "--asyncPrefetch";
2779      if (cmd.startsWith(asyncPrefetch)) {
2780        opts.asyncPrefetch = true;
2781        continue;
2782      }
2783
2784      final String cacheBlocks = "--cacheBlocks=";
2785      if (cmd.startsWith(cacheBlocks)) {
2786        opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length()));
2787        continue;
2788      }
2789
2790      final String scanReadType = "--scanReadType=";
2791      if (cmd.startsWith(scanReadType)) {
2792        opts.scanReadType =
2793            Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
2794        continue;
2795      }
2796
2797      final String bufferSize = "--bufferSize=";
2798      if (cmd.startsWith(bufferSize)) {
2799        opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
2800        continue;
2801      }
2802
2803      if (isCommandClass(cmd)) {
2804        opts.cmdName = cmd;
2805        try {
2806          opts.numClientThreads = Integer.parseInt(args.remove());
2807        } catch (NoSuchElementException | NumberFormatException e) {
2808          throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
2809        }
2810        opts = calculateRowsAndSize(opts);
2811        break;
2812      } else {
2813        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
2814      }
2815
2816      // Not matching any option or command.
2817      System.err.println("Error: Wrong option or command: " + cmd);
2818      args.add(cmd);
2819      break;
2820    }
2821    return opts;
2822  }
2823
2824  static TestOptions calculateRowsAndSize(final TestOptions opts) {
2825    int rowsPerGB = getRowsPerGB(opts);
2826    if ((opts.getCmdName() != null
2827        && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
2828        && opts.size != DEFAULT_OPTS.size
2829        && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
2830      opts.totalRows = (int) opts.size * rowsPerGB;
2831    } else if (opts.size != DEFAULT_OPTS.size) {
2832      // total size in GB specified
2833      opts.totalRows = (int) opts.size * rowsPerGB;
2834      opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
2835    } else {
2836      opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
2837      opts.size = opts.totalRows / rowsPerGB;
2838    }
2839    return opts;
2840  }
2841
2842  static int getRowsPerGB(final TestOptions opts) {
2843    return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getFamilies() *
2844        opts.getColumns());
2845  }
2846
2847  @Override
2848  public int run(String[] args) throws Exception {
2849    // Process command-line args. TODO: Better cmd-line processing
2850    // (but hopefully something not as painful as cli options).
2851    int errCode = -1;
2852    if (args.length < 1) {
2853      printUsage();
2854      return errCode;
2855    }
2856
2857    try {
2858      LinkedList<String> argv = new LinkedList<>();
2859      argv.addAll(Arrays.asList(args));
2860      TestOptions opts = parseOpts(argv);
2861
2862      // args remaining, print help and exit
2863      if (!argv.isEmpty()) {
2864        errCode = 0;
2865        printUsage();
2866        return errCode;
2867      }
2868
2869      // must run at least 1 client
2870      if (opts.numClientThreads <= 0) {
2871        throw new IllegalArgumentException("Number of clients must be > 0");
2872      }
2873
2874      // cmdName should not be null, print help and exit
2875      if (opts.cmdName == null) {
2876        printUsage();
2877        return errCode;
2878      }
2879
2880      Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
2881      if (cmdClass != null) {
2882        runTest(cmdClass, opts);
2883        errCode = 0;
2884      }
2885
2886    } catch (Exception e) {
2887      e.printStackTrace();
2888    }
2889
2890    return errCode;
2891  }
2892
2893  private static boolean isCommandClass(String cmd) {
2894    return COMMANDS.containsKey(cmd);
2895  }
2896
2897  private static Class<? extends TestBase> determineCommandClass(String cmd) {
2898    CmdDescriptor descriptor = COMMANDS.get(cmd);
2899    return descriptor != null ? descriptor.getCmdClass() : null;
2900  }
2901
2902  public static void main(final String[] args) throws Exception {
2903    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
2904    System.exit(res);
2905  }
2906}