001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import com.codahale.metrics.Histogram;
022import com.codahale.metrics.UniformReservoir;
023import java.io.IOException;
024import java.io.PrintStream;
025import java.lang.reflect.Constructor;
026import java.math.BigDecimal;
027import java.math.MathContext;
028import java.text.DecimalFormat;
029import java.text.SimpleDateFormat;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.Date;
033import java.util.LinkedList;
034import java.util.Locale;
035import java.util.Map;
036import java.util.NoSuchElementException;
037import java.util.Queue;
038import java.util.Random;
039import java.util.TreeMap;
040import java.util.concurrent.Callable;
041import java.util.concurrent.ExecutionException;
042import java.util.concurrent.ExecutorService;
043import java.util.concurrent.Executors;
044import java.util.concurrent.Future;
045import org.apache.commons.lang3.StringUtils;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.conf.Configured;
048import org.apache.hadoop.fs.FileSystem;
049import org.apache.hadoop.fs.Path;
050import org.apache.hadoop.hbase.client.Admin;
051import org.apache.hadoop.hbase.client.Append;
052import org.apache.hadoop.hbase.client.AsyncConnection;
053import org.apache.hadoop.hbase.client.AsyncTable;
054import org.apache.hadoop.hbase.client.BufferedMutator;
055import org.apache.hadoop.hbase.client.BufferedMutatorParams;
056import org.apache.hadoop.hbase.client.Connection;
057import org.apache.hadoop.hbase.client.ConnectionFactory;
058import org.apache.hadoop.hbase.client.Consistency;
059import org.apache.hadoop.hbase.client.Delete;
060import org.apache.hadoop.hbase.client.Durability;
061import org.apache.hadoop.hbase.client.Get;
062import org.apache.hadoop.hbase.client.Increment;
063import org.apache.hadoop.hbase.client.Put;
064import org.apache.hadoop.hbase.client.Result;
065import org.apache.hadoop.hbase.client.ResultScanner;
066import org.apache.hadoop.hbase.client.RowMutations;
067import org.apache.hadoop.hbase.client.Scan;
068import org.apache.hadoop.hbase.client.Table;
069import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
070import org.apache.hadoop.hbase.filter.BinaryComparator;
071import org.apache.hadoop.hbase.filter.Filter;
072import org.apache.hadoop.hbase.filter.FilterAllFilter;
073import org.apache.hadoop.hbase.filter.FilterList;
074import org.apache.hadoop.hbase.filter.PageFilter;
075import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
076import org.apache.hadoop.hbase.filter.WhileMatchFilter;
077import org.apache.hadoop.hbase.io.compress.Compression;
078import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
079import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
080import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
081import org.apache.hadoop.hbase.regionserver.BloomType;
082import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
083import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
084import org.apache.hadoop.hbase.trace.SpanReceiverHost;
085import org.apache.hadoop.hbase.trace.TraceUtil;
086import org.apache.hadoop.hbase.util.ByteArrayHashKey;
087import org.apache.hadoop.hbase.util.Bytes;
088import org.apache.hadoop.hbase.util.GsonUtil;
089import org.apache.hadoop.hbase.util.Hash;
090import org.apache.hadoop.hbase.util.MurmurHash;
091import org.apache.hadoop.hbase.util.Pair;
092import org.apache.hadoop.hbase.util.YammerHistogramUtils;
093import org.apache.hadoop.io.LongWritable;
094import org.apache.hadoop.io.Text;
095import org.apache.hadoop.mapreduce.Job;
096import org.apache.hadoop.mapreduce.Mapper;
097import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
098import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
099import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
100import org.apache.hadoop.util.Tool;
101import org.apache.hadoop.util.ToolRunner;
102import org.apache.htrace.core.ProbabilitySampler;
103import org.apache.htrace.core.Sampler;
104import org.apache.htrace.core.TraceScope;
105import org.apache.yetus.audience.InterfaceAudience;
106import org.slf4j.Logger;
107import org.slf4j.LoggerFactory;
108
109import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
110import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
111import org.apache.hbase.thirdparty.com.google.gson.Gson;
112
113/**
 * Script used to evaluate HBase performance and scalability. Runs an HBase
115 * client that steps through one of a set of hardcoded tests or 'experiments'
116 * (e.g. a random reads test, a random writes test, etc.). Pass on the
117 * command-line which test to run and how many clients are participating in
118 * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
119 *
120 * <p>This class sets up and runs the evaluation programs described in
121 * Section 7, <i>Performance Evaluation</i>, of the <a
122 * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
123 * paper, pages 8-10.
124 *
125 * <p>By default, runs as a mapreduce job where each mapper runs a single test
126 * client. Can also run as a non-mapreduce, multithreaded application by
127 * specifying {@code --nomapred}. Each client does about 1GB of data, unless
128 * specified otherwise.
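 *
 * <p>A minimal illustrative way to launch the tool from Java (the flag values shown are
 * examples only; run {@code --help} for the authoritative list of options):
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * int exitCode = ToolRunner.run(conf, new PerformanceEvaluation(conf),
 *     new String[] { "--nomapred", "--rows=100000", "randomWrite", "4" });
 * }</pre>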
129 */
130@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
131public class PerformanceEvaluation extends Configured implements Tool {
132  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
133  static final String RANDOM_READ = "randomRead";
134  static final String PE_COMMAND_SHORTNAME = "pe";
135  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
136  private static final Gson GSON = GsonUtil.createGson().create();
137
138  public static final String TABLE_NAME = "TestTable";
139  public static final String FAMILY_NAME_BASE = "info";
140  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
141  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
142  public static final int DEFAULT_VALUE_LENGTH = 1000;
143  public static final int ROW_LENGTH = 26;
144
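  // Note: ONE_GB below is 1000 * 1024 * 1024 bytes, so with the default 1000-byte value
  // length, DEFAULT_ROWS_PER_GB works out to a round 1024 * 1024 rows.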
145  private static final int ONE_GB = 1024 * 1024 * 1000;
146  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
  // TODO: should we make this configurable?
148  private static final int TAG_LENGTH = 256;
149  private static final DecimalFormat FMT = new DecimalFormat("0.##");
150  private static final MathContext CXT = MathContext.DECIMAL64;
151  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
152  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
153  private static final TestOptions DEFAULT_OPTS = new TestOptions();
154
  private static final Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
156  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
157
158  static {
159    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
160        "Run async random read test");
161    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
162        "Run async random write test");
163    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
164        "Run async sequential read test");
165    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
166        "Run async sequential write test");
167    addCommandDescriptor(AsyncScanTest.class, "asyncScan",
168        "Run async scan test (read every row)");
169    addCommandDescriptor(RandomReadTest.class, RANDOM_READ,
170      "Run random read test");
171    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
172      "Run random seek and scan 100 test");
173    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
174      "Run random seek scan with both start and stop row (max 10 rows)");
175    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
176      "Run random seek scan with both start and stop row (max 100 rows)");
177    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
178      "Run random seek scan with both start and stop row (max 1000 rows)");
179    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
180      "Run random seek scan with both start and stop row (max 10000 rows)");
181    addCommandDescriptor(RandomWriteTest.class, "randomWrite",
182      "Run random write test");
183    addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
184      "Run sequential read test");
185    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
186      "Run sequential write test");
187    addCommandDescriptor(ScanTest.class, "scan",
188      "Run scan test (read every row)");
189    addCommandDescriptor(FilteredScanTest.class, "filterScan",
      "Run scan test using a filter to find a specific row based on its value " +
191      "(make sure to use --rows=20)");
192    addCommandDescriptor(IncrementTest.class, "increment",
193      "Increment on each row; clients overlap on keyspace so some concurrent operations");
194    addCommandDescriptor(AppendTest.class, "append",
195      "Append on each row; clients overlap on keyspace so some concurrent operations");
196    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
197      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
198    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
199      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
200    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
201      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
202  }
203
204  /**
   * Enum for map metrics. Keep it out here rather than inside the Map
   * inner-class so we can find associated properties.
207   */
  protected enum Counter {
209    /** elapsed time */
210    ELAPSED_TIME,
211    /** number of rows */
212    ROWS
213  }
214
215  protected static class RunResult implements Comparable<RunResult> {
216    public RunResult(long duration, Histogram hist) {
217      this.duration = duration;
218      this.hist = hist;
219    }
220
221    public final long duration;
222    public final Histogram hist;
223
224    @Override
225    public String toString() {
226      return Long.toString(duration);
227    }
228
    @Override
    public int compareTo(RunResult o) {
230      return Long.compare(this.duration, o.duration);
231    }
232  }
233
234  /**
235   * Constructor
236   * @param conf Configuration object
237   */
238  public PerformanceEvaluation(final Configuration conf) {
239    super(conf);
240  }
241
242  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass,
243      String name, String description) {
244    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
245    COMMANDS.put(name, cmdDescriptor);
246  }
247
248  /**
249   * Implementations can have their status set.
250   */
251  interface Status {
252    /**
253     * Sets status
254     * @param msg status message
255     * @throws IOException
256     */
257    void setStatus(final String msg) throws IOException;
258  }
259
260  /**
261   * MapReduce job that runs a performance evaluation client in each map task.
262   */
263  public static class EvaluationMapTask
264      extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
265
266    /** configuration parameter name that contains the command */
267    public final static String CMD_KEY = "EvaluationMapTask.command";
268    /** configuration parameter name that contains the PE impl */
269    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
270
271    private Class<? extends Test> cmd;
272
273    @Override
274    protected void setup(Context context) throws IOException, InterruptedException {
275      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
276
277      // this is required so that extensions of PE are instantiated within the
278      // map reduce task...
279      Class<? extends PerformanceEvaluation> peClass =
280          forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
281      try {
282        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
283      } catch (Exception e) {
284        throw new IllegalStateException("Could not instantiate PE instance", e);
285      }
286    }
287
288    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
289      try {
290        return Class.forName(className).asSubclass(type);
291      } catch (ClassNotFoundException e) {
292        throw new IllegalStateException("Could not find class for name: " + className, e);
293      }
294    }
295
296    @Override
297    protected void map(LongWritable key, Text value, final Context context)
298           throws IOException, InterruptedException {
299
300      Status status = new Status() {
301        @Override
302        public void setStatus(String msg) {
303           context.setStatus(msg);
304        }
305      };
306
307      TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class);
308      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
309      final Connection con = ConnectionFactory.createConnection(conf);
310      AsyncConnection asyncCon = null;
311      try {
312        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
313      } catch (ExecutionException e) {
314        throw new IOException(e);
315      }
316
317      // Evaluation task
318      RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
319      // Collect how much time the thing took. Report as map output and
320      // to the ELAPSED_TIME counter.
321      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
322      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
323      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
324      context.progress();
325    }
326  }
327
328  /*
   * If the table does not already exist, create it. Also recreate the table when
   * {@code opts.presplitRegions} is specified or when the existing table's
   * region replica count doesn't match {@code opts.replicas}.
332   */
333  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
334    TableName tableName = TableName.valueOf(opts.tableName);
335    boolean needsDelete = false, exists = admin.tableExists(tableName);
336    boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
337      || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
338    if (!exists && isReadCmd) {
339      throw new IllegalStateException(
340        "Must specify an existing table for read commands. Run a write command first.");
341    }
342    HTableDescriptor desc =
343      exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null;
344    byte[][] splits = getSplits(opts);
345
346    // recreate the table when user has requested presplit or when existing
347    // {RegionSplitPolicy,replica count} does not match requested, or when the
348    // number of column families does not match requested.
349    if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
350      || (!isReadCmd && desc != null &&
351          !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
352      || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)
353      || (desc != null && desc.getColumnFamilyCount() != opts.families)) {
354      needsDelete = true;
355      // wait, why did it delete my table?!?
356      LOG.debug(MoreObjects.toStringHelper("needsDelete")
357        .add("needsDelete", needsDelete)
358        .add("isReadCmd", isReadCmd)
359        .add("exists", exists)
360        .add("desc", desc)
361        .add("presplit", opts.presplitRegions)
362        .add("splitPolicy", opts.splitPolicy)
363        .add("replicas", opts.replicas)
364        .add("families", opts.families)
365        .toString());
366    }
367
368    // remove an existing table
369    if (needsDelete) {
370      if (admin.isTableEnabled(tableName)) {
371        admin.disableTable(tableName);
372      }
373      admin.deleteTable(tableName);
374    }
375
376    // table creation is necessary
377    if (!exists || needsDelete) {
378      desc = getTableDescriptor(opts);
379      if (splits != null) {
380        if (LOG.isDebugEnabled()) {
381          for (int i = 0; i < splits.length; i++) {
382            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
383          }
384        }
385      }
386      admin.createTable(desc, splits);
387      LOG.info("Table " + desc + " created");
388    }
389    return admin.tableExists(tableName);
390  }
391
392  /**
   * Create an HTableDescriptor from the provided TestOptions.
394   */
395  protected static HTableDescriptor getTableDescriptor(TestOptions opts) {
396    HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(opts.tableName));
397    for (int family = 0; family < opts.families; family++) {
398      byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
399      HColumnDescriptor familyDesc = new HColumnDescriptor(familyName);
400      familyDesc.setDataBlockEncoding(opts.blockEncoding);
401      familyDesc.setCompressionType(opts.compression);
402      familyDesc.setBloomFilterType(opts.bloomType);
403      familyDesc.setBlocksize(opts.blockSize);
404      if (opts.inMemoryCF) {
405        familyDesc.setInMemory(true);
406      }
407      familyDesc.setInMemoryCompaction(opts.inMemoryCompaction);
408      tableDesc.addFamily(familyDesc);
409    }
410    if (opts.replicas != DEFAULT_OPTS.replicas) {
411      tableDesc.setRegionReplication(opts.replicas);
412    }
413    if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
414      tableDesc.setRegionSplitPolicyClassName(opts.splitPolicy);
415    }
416    return tableDesc;
417  }
418
419  /**
   * Generates splits based on the total number of rows and the requested number of
   * presplit regions.
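   * For example, with {@code presplitRegions = 4} and {@code totalRows = 1000}, three split
   * points are generated at rows 250, 500 and 750 (each encoded with {@code format}).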
421   */
422  protected static byte[][] getSplits(TestOptions opts) {
    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) {
      return null;
    }
425
426    int numSplitPoints = opts.presplitRegions - 1;
427    byte[][] splits = new byte[numSplitPoints][];
428    int jump = opts.totalRows / opts.presplitRegions;
429    for (int i = 0; i < numSplitPoints; i++) {
430      int rowkey = jump * (1 + i);
431      splits[i] = format(rowkey);
432    }
433    return splits;
434  }
435
436  static void setupConnectionCount(final TestOptions opts) {
437    if (opts.oneCon) {
438      opts.connCount = 1;
439    } else {
440      if (opts.connCount == -1) {
441        // set to thread number if connCount is not set
442        opts.connCount = opts.numClientThreads;
443      }
444    }
445  }
446
447  /*
   * Run all clients in this VM, each in its own thread.
449   */
450  static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
451      throws IOException, InterruptedException, ExecutionException {
452    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
453    assert cmd != null;
454    @SuppressWarnings("unchecked")
455    Future<RunResult>[] threads = new Future[opts.numClientThreads];
456    RunResult[] results = new RunResult[opts.numClientThreads];
457    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
458      new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
459    setupConnectionCount(opts);
460    final Connection[] cons = new Connection[opts.connCount];
461    final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
462    for (int i = 0; i < opts.connCount; i++) {
463      cons[i] = ConnectionFactory.createConnection(conf);
464      asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
465    }
466    LOG.info("Created " + opts.connCount + " connections for " +
467        opts.numClientThreads + " threads");
468    for (int i = 0; i < threads.length; i++) {
469      final int index = i;
470      threads[i] = pool.submit(new Callable<RunResult>() {
471        @Override
472        public RunResult call() throws Exception {
473          TestOptions threadOpts = new TestOptions(opts);
474          final Connection con = cons[index % cons.length];
475          final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
476          if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
477          RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
478            @Override
479            public void setStatus(final String msg) throws IOException {
480              LOG.info(msg);
481            }
482          });
483          LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration +
484            "ms over " + threadOpts.perClientRunRows + " rows");
485          return run;
486        }
487      });
488    }
489    pool.shutdown();
490
491    for (int i = 0; i < threads.length; i++) {
492      try {
493        results[i] = threads[i].get();
494      } catch (ExecutionException e) {
495        throw new IOException(e.getCause());
496      }
497    }
498    final String test = cmd.getSimpleName();
499    LOG.info("[" + test + "] Summary of timings (ms): "
500             + Arrays.toString(results));
501    Arrays.sort(results);
502    long total = 0;
    float avgLatency = 0;
504    float avgTPS = 0;
505    for (RunResult result : results) {
506      total += result.duration;
507      avgLatency += result.hist.getSnapshot().getMean();
508      avgTPS += opts.perClientRunRows * 1.0f / result.duration;
509    }
510    avgTPS *= 1000; // ms to second
511    avgLatency = avgLatency / results.length;
512    LOG.info("[" + test + " duration ]"
513      + "\tMin: " + results[0] + "ms"
514      + "\tMax: " + results[results.length - 1] + "ms"
515      + "\tAvg: " + (total / results.length) + "ms");
516    LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
    LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t rows per second");
518    for (int i = 0; i < opts.connCount; i++) {
519      cons[i].close();
520      asyncCons[i].close();
521    }
522
523
524    return results;
525  }
526
527  /*
528   * Run a mapreduce job.  Run as many maps as asked-for clients.
   * Before we start up the job, write out an input file with an instruction
   * per client regarding which row it is to start on.
531   * @param cmd Command to run.
532   * @throws IOException
533   */
534  static Job doMapReduce(TestOptions opts, final Configuration conf)
535      throws IOException, InterruptedException, ClassNotFoundException {
536    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
537    assert cmd != null;
538    Path inputDir = writeInputFile(conf, opts);
539    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
540    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
541    Job job = Job.getInstance(conf);
542    job.setJarByClass(PerformanceEvaluation.class);
543    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
544
545    job.setInputFormatClass(NLineInputFormat.class);
546    NLineInputFormat.setInputPaths(job, inputDir);
547    // this is default, but be explicit about it just in case.
548    NLineInputFormat.setNumLinesPerSplit(job, 1);
549
550    job.setOutputKeyClass(LongWritable.class);
551    job.setOutputValueClass(LongWritable.class);
552
553    job.setMapperClass(EvaluationMapTask.class);
554    job.setReducerClass(LongSumReducer.class);
555
556    job.setNumReduceTasks(1);
557
558    job.setOutputFormatClass(TextOutputFormat.class);
559    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
560
561    TableMapReduceUtil.addDependencyJars(job);
562    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
563      Histogram.class,     // yammer metrics
564      Gson.class,  // gson
565      FilterAllFilter.class // hbase-server tests jar
566      );
567
568    TableMapReduceUtil.initCredentials(job);
569
570    job.waitForCompletion(true);
571    return job;
572  }
573
574  /**
   * Each client has one mapper to do the work. Each mapper emits its client's elapsed
   * time as the map output and also reports it via the {@link Counter#ELAPSED_TIME} counter.
576   */
577
578  static String JOB_INPUT_FILENAME = "input.txt";
579
580  /*
581   * Write input file of offsets-per-client for the mapreduce job.
582   * @param c Configuration
   * @return Directory that contains the written file, whose name is JOB_INPUT_FILENAME
584   * @throws IOException
585   */
586  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
587    return writeInputFile(c, opts, new Path("."));
588  }
589
590  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
      throws IOException {
592    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
593    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
594    Path inputDir = new Path(jobdir, "inputs");
595
596    FileSystem fs = FileSystem.get(c);
597    fs.mkdirs(inputDir);
598
599    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
600    PrintStream out = new PrintStream(fs.create(inputFile));
601    // Make input random.
602    Map<Integer, String> m = new TreeMap<>();
603    Hash h = MurmurHash.getInstance();
604    int perClientRows = (opts.totalRows / opts.numClientThreads);
605    try {
606      for (int j = 0; j < opts.numClientThreads; j++) {
607        TestOptions next = new TestOptions(opts);
608        next.startRow = j * perClientRows;
609        next.perClientRunRows = perClientRows;
610        String s = GSON.toJson(next);
611        LOG.info("Client=" + j + ", input=" + s);
612        byte[] b = Bytes.toBytes(s);
613        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
614        m.put(hash, s);
615      }
616      for (Map.Entry<Integer, String> e: m.entrySet()) {
617        out.println(e.getValue());
618      }
619    } finally {
620      out.close();
621    }
622    return inputDir;
623  }
624
625  /**
626   * Describes a command.
627   */
628  static class CmdDescriptor {
629    private Class<? extends TestBase> cmdClass;
630    private String name;
631    private String description;
632
633    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
634      this.cmdClass = cmdClass;
635      this.name = name;
636      this.description = description;
637    }
638
639    public Class<? extends TestBase> getCmdClass() {
640      return cmdClass;
641    }
642
643    public String getName() {
644      return name;
645    }
646
647    public String getDescription() {
648      return description;
649    }
650  }
651
652  /**
653   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}.
654   * This makes tracking all these arguments a little easier.
   * NOTE: when adding an option, you need to add a data member, a getter/setter (to make JSON
   * serialization of this TestOptions class behave), and you need to copy the new option from
   * 'that' to 'this' in the clone constructor below. Look for 'clone' below.
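   * <p>For illustration only (the option name {@code myNewOption} below is hypothetical),
   * a new boolean option would typically involve these three edits:
   * <pre>{@code
   * boolean myNewOption = false;                                     // 1. the data member
   * public boolean isMyNewOption() { return myNewOption; }           // 2. getter ...
   * public void setMyNewOption(boolean v) { this.myNewOption = v; }  //    ... and setter
   * this.myNewOption = that.myNewOption;                             // 3. copy in the clone constructor
   * }</pre>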
658   */
659  static class TestOptions {
660    String cmdName = null;
661    boolean nomapred = false;
662    boolean filterAll = false;
663    int startRow = 0;
664    float size = 1.0f;
665    int perClientRunRows = DEFAULT_ROWS_PER_GB;
666    int numClientThreads = 1;
667    int totalRows = DEFAULT_ROWS_PER_GB;
668    int measureAfter = 0;
669    float sampleRate = 1.0f;
670    double traceRate = 0.0;
671    String tableName = TABLE_NAME;
672    boolean flushCommits = true;
673    boolean writeToWAL = true;
674    boolean autoFlush = false;
675    boolean oneCon = false;
    int connCount = -1; // will decide the actual number later
677    boolean useTags = false;
678    int noOfTags = 1;
679    boolean reportLatency = false;
680    int multiGet = 0;
681    int multiPut = 0;
682    int randomSleep = 0;
683    boolean inMemoryCF = false;
684    int presplitRegions = 0;
685    int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION;
686    String splitPolicy = null;
687    Compression.Algorithm compression = Compression.Algorithm.NONE;
688    BloomType bloomType = BloomType.ROW;
689    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
690    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
691    boolean valueRandom = false;
692    boolean valueZipf = false;
693    int valueSize = DEFAULT_VALUE_LENGTH;
694    int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10;
695    int cycles = 1;
696    int columns = 1;
697    int families = 1;
698    int caching = 30;
699    boolean addColumns = true;
700    MemoryCompactionPolicy inMemoryCompaction =
701        MemoryCompactionPolicy.valueOf(
702            CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
703    boolean asyncPrefetch = false;
704    boolean cacheBlocks = true;
705    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
    long bufferSize = 2L * 1024L * 1024L;
707
708    public TestOptions() {}
709
710    /**
711     * Clone constructor.
712     * @param that Object to copy from.
713     */
714    public TestOptions(TestOptions that) {
715      this.cmdName = that.cmdName;
716      this.cycles = that.cycles;
717      this.nomapred = that.nomapred;
718      this.startRow = that.startRow;
719      this.size = that.size;
720      this.perClientRunRows = that.perClientRunRows;
721      this.numClientThreads = that.numClientThreads;
722      this.totalRows = that.totalRows;
723      this.sampleRate = that.sampleRate;
724      this.traceRate = that.traceRate;
725      this.tableName = that.tableName;
726      this.flushCommits = that.flushCommits;
727      this.writeToWAL = that.writeToWAL;
728      this.autoFlush = that.autoFlush;
729      this.oneCon = that.oneCon;
730      this.connCount = that.connCount;
731      this.useTags = that.useTags;
732      this.noOfTags = that.noOfTags;
733      this.reportLatency = that.reportLatency;
734      this.multiGet = that.multiGet;
735      this.multiPut = that.multiPut;
736      this.inMemoryCF = that.inMemoryCF;
737      this.presplitRegions = that.presplitRegions;
738      this.replicas = that.replicas;
739      this.splitPolicy = that.splitPolicy;
740      this.compression = that.compression;
741      this.blockEncoding = that.blockEncoding;
742      this.filterAll = that.filterAll;
743      this.bloomType = that.bloomType;
744      this.blockSize = that.blockSize;
745      this.valueRandom = that.valueRandom;
746      this.valueZipf = that.valueZipf;
747      this.valueSize = that.valueSize;
748      this.period = that.period;
749      this.randomSleep = that.randomSleep;
750      this.measureAfter = that.measureAfter;
751      this.addColumns = that.addColumns;
752      this.columns = that.columns;
753      this.families = that.families;
754      this.caching = that.caching;
755      this.inMemoryCompaction = that.inMemoryCompaction;
756      this.asyncPrefetch = that.asyncPrefetch;
757      this.cacheBlocks = that.cacheBlocks;
758      this.scanReadType = that.scanReadType;
759      this.bufferSize = that.bufferSize;
760    }
761
762    public int getCaching() {
763      return this.caching;
764    }
765
766    public void setCaching(final int caching) {
767      this.caching = caching;
768    }
769
770    public int getColumns() {
771      return this.columns;
772    }
773
774    public void setColumns(final int columns) {
775      this.columns = columns;
776    }
777
778    public int getFamilies() {
779      return this.families;
780    }
781
782    public void setFamilies(final int families) {
783      this.families = families;
784    }
785
786    public int getCycles() {
787      return this.cycles;
788    }
789
790    public void setCycles(final int cycles) {
791      this.cycles = cycles;
792    }
793
794    public boolean isValueZipf() {
795      return valueZipf;
796    }
797
798    public void setValueZipf(boolean valueZipf) {
799      this.valueZipf = valueZipf;
800    }
801
802    public String getCmdName() {
803      return cmdName;
804    }
805
806    public void setCmdName(String cmdName) {
807      this.cmdName = cmdName;
808    }
809
810    public int getRandomSleep() {
811      return randomSleep;
812    }
813
814    public void setRandomSleep(int randomSleep) {
815      this.randomSleep = randomSleep;
816    }
817
818    public int getReplicas() {
819      return replicas;
820    }
821
822    public void setReplicas(int replicas) {
823      this.replicas = replicas;
824    }
825
826    public String getSplitPolicy() {
827      return splitPolicy;
828    }
829
830    public void setSplitPolicy(String splitPolicy) {
831      this.splitPolicy = splitPolicy;
832    }
833
834    public void setNomapred(boolean nomapred) {
835      this.nomapred = nomapred;
836    }
837
838    public void setFilterAll(boolean filterAll) {
839      this.filterAll = filterAll;
840    }
841
842    public void setStartRow(int startRow) {
843      this.startRow = startRow;
844    }
845
846    public void setSize(float size) {
847      this.size = size;
848    }
849
850    public void setPerClientRunRows(int perClientRunRows) {
851      this.perClientRunRows = perClientRunRows;
852    }
853
854    public void setNumClientThreads(int numClientThreads) {
855      this.numClientThreads = numClientThreads;
856    }
857
858    public void setTotalRows(int totalRows) {
859      this.totalRows = totalRows;
860    }
861
862    public void setSampleRate(float sampleRate) {
863      this.sampleRate = sampleRate;
864    }
865
866    public void setTraceRate(double traceRate) {
867      this.traceRate = traceRate;
868    }
869
870    public void setTableName(String tableName) {
871      this.tableName = tableName;
872    }
873
874    public void setFlushCommits(boolean flushCommits) {
875      this.flushCommits = flushCommits;
876    }
877
878    public void setWriteToWAL(boolean writeToWAL) {
879      this.writeToWAL = writeToWAL;
880    }
881
882    public void setAutoFlush(boolean autoFlush) {
883      this.autoFlush = autoFlush;
884    }
885
886    public void setOneCon(boolean oneCon) {
887      this.oneCon = oneCon;
888    }
889
890    public int getConnCount() {
891      return connCount;
892    }
893
894    public void setConnCount(int connCount) {
895      this.connCount = connCount;
896    }
897
898    public void setUseTags(boolean useTags) {
899      this.useTags = useTags;
900    }
901
902    public void setNoOfTags(int noOfTags) {
903      this.noOfTags = noOfTags;
904    }
905
906    public void setReportLatency(boolean reportLatency) {
907      this.reportLatency = reportLatency;
908    }
909
910    public void setMultiGet(int multiGet) {
911      this.multiGet = multiGet;
912    }
913
914    public void setMultiPut(int multiPut) {
915      this.multiPut = multiPut;
916    }
917
918    public void setInMemoryCF(boolean inMemoryCF) {
919      this.inMemoryCF = inMemoryCF;
920    }
921
922    public void setPresplitRegions(int presplitRegions) {
923      this.presplitRegions = presplitRegions;
924    }
925
926    public void setCompression(Compression.Algorithm compression) {
927      this.compression = compression;
928    }
929
930    public void setBloomType(BloomType bloomType) {
931      this.bloomType = bloomType;
932    }
933
934    public void setBlockSize(int blockSize) {
935      this.blockSize = blockSize;
936    }
937
938    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
939      this.blockEncoding = blockEncoding;
940    }
941
942    public void setValueRandom(boolean valueRandom) {
943      this.valueRandom = valueRandom;
944    }
945
946    public void setValueSize(int valueSize) {
947      this.valueSize = valueSize;
948    }
949
950    public void setBufferSize(long bufferSize) {
951      this.bufferSize = bufferSize;
952    }
953
954    public void setPeriod(int period) {
955      this.period = period;
956    }
957
958    public boolean isNomapred() {
959      return nomapred;
960    }
961
962    public boolean isFilterAll() {
963      return filterAll;
964    }
965
966    public int getStartRow() {
967      return startRow;
968    }
969
970    public float getSize() {
971      return size;
972    }
973
974    public int getPerClientRunRows() {
975      return perClientRunRows;
976    }
977
978    public int getNumClientThreads() {
979      return numClientThreads;
980    }
981
982    public int getTotalRows() {
983      return totalRows;
984    }
985
986    public float getSampleRate() {
987      return sampleRate;
988    }
989
990    public double getTraceRate() {
991      return traceRate;
992    }
993
994    public String getTableName() {
995      return tableName;
996    }
997
998    public boolean isFlushCommits() {
999      return flushCommits;
1000    }
1001
1002    public boolean isWriteToWAL() {
1003      return writeToWAL;
1004    }
1005
1006    public boolean isAutoFlush() {
1007      return autoFlush;
1008    }
1009
1010    public boolean isUseTags() {
1011      return useTags;
1012    }
1013
1014    public int getNoOfTags() {
1015      return noOfTags;
1016    }
1017
1018    public boolean isReportLatency() {
1019      return reportLatency;
1020    }
1021
1022    public int getMultiGet() {
1023      return multiGet;
1024    }
1025
1026    public int getMultiPut() {
1027      return multiPut;
1028    }
1029
1030    public boolean isInMemoryCF() {
1031      return inMemoryCF;
1032    }
1033
1034    public int getPresplitRegions() {
1035      return presplitRegions;
1036    }
1037
1038    public Compression.Algorithm getCompression() {
1039      return compression;
1040    }
1041
1042    public DataBlockEncoding getBlockEncoding() {
1043      return blockEncoding;
1044    }
1045
1046    public boolean isValueRandom() {
1047      return valueRandom;
1048    }
1049
1050    public int getValueSize() {
1051      return valueSize;
1052    }
1053
1054    public int getPeriod() {
1055      return period;
1056    }
1057
1058    public BloomType getBloomType() {
1059      return bloomType;
1060    }
1061
1062    public int getBlockSize() {
1063      return blockSize;
1064    }
1065
1066    public boolean isOneCon() {
1067      return oneCon;
1068    }
1069
1070    public int getMeasureAfter() {
1071      return measureAfter;
1072    }
1073
1074    public void setMeasureAfter(int measureAfter) {
1075      this.measureAfter = measureAfter;
1076    }
1077
1078    public boolean getAddColumns() {
1079      return addColumns;
1080    }
1081
1082    public void setAddColumns(boolean addColumns) {
1083      this.addColumns = addColumns;
1084    }
1085
1086    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
1087      this.inMemoryCompaction = inMemoryCompaction;
1088    }
1089
1090    public MemoryCompactionPolicy getInMemoryCompaction() {
1091      return this.inMemoryCompaction;
1092    }
1093
1094    public long getBufferSize() {
1095      return this.bufferSize;
1096    }
1097  }
1098
1099  /*
1100   * A test.
1101   * Subclass to particularize what happens per row.
1102   */
1103  static abstract class TestBase {
    // The below makes it so that when Tests are all running in the one
    // JVM, they each have a differently seeded Random.
1106    private static final Random randomSeed = new Random(System.currentTimeMillis());
1107
1108    private static long nextRandomSeed() {
1109      return randomSeed.nextLong();
1110    }
1111    private final int everyN;
1112
1113    protected final Random rand = new Random(nextRandomSeed());
1114    protected final Configuration conf;
1115    protected final TestOptions opts;
1116
1117    private final Status status;
1118    private final Sampler traceSampler;
1119    private final SpanReceiverHost receiverHost;
1120
1121    private String testName;
1122    private Histogram latencyHistogram;
1123    private Histogram valueSizeHistogram;
1124    private Histogram rpcCallsHistogram;
1125    private Histogram remoteRpcCallsHistogram;
1126    private Histogram millisBetweenNextHistogram;
1127    private Histogram regionsScannedHistogram;
1128    private Histogram bytesInResultsHistogram;
1129    private Histogram bytesInRemoteResultsHistogram;
1130    private RandomDistribution.Zipf zipf;
1131
1132    /**
1133     * Note that all subclasses of this class must provide a public constructor
1134     * that has the exact same list of arguments.
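     * For example, a hypothetical subclass of {@link TableTest} would declare:
     * <pre>{@code
     * MyReadTest(Connection con, TestOptions options, Status status) {
     *   super(con, options, status);
     * }
     * }</pre>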
1135     */
1136    TestBase(final Configuration conf, final TestOptions options, final Status status) {
1137      this.conf = conf;
1138      this.receiverHost = this.conf == null? null: SpanReceiverHost.getInstance(conf);
1139      this.opts = options;
1140      this.status = status;
1141      this.testName = this.getClass().getSimpleName();
1142      if (options.traceRate >= 1.0) {
1143        this.traceSampler = Sampler.ALWAYS;
1144      } else if (options.traceRate > 0.0) {
1145        conf.setDouble("hbase.sampler.fraction", options.traceRate);
1146        this.traceSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf));
1147      } else {
1148        this.traceSampler = Sampler.NEVER;
1149      }
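      // e.g. a sampleRate of 0.5 yields everyN == 2, so only every second row is exercised.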
1150      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
1151      if (options.isValueZipf()) {
1152        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
1153      }
1154      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
1155    }
1156
1157    int getValueLength(final Random r) {
1158      if (this.opts.isValueRandom()) {
1159        return r.nextInt(opts.valueSize);
1160      } else if (this.opts.isValueZipf()) {
1161        return Math.abs(this.zipf.nextInt());
1162      } else {
1163        return opts.valueSize;
1164      }
1165    }
1166
1167    void updateValueSize(final Result [] rs) throws IOException {
1168      if (rs == null || !isRandomValueSize()) return;
1169      for (Result r: rs) updateValueSize(r);
1170    }
1171
1172    void updateValueSize(final Result r) throws IOException {
1173      if (r == null || !isRandomValueSize()) return;
1174      int size = 0;
1175      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1176        size += scanner.current().getValueLength();
1177      }
1178      updateValueSize(size);
1179    }
1180
1181    void updateValueSize(final int valueSize) {
1182      if (!isRandomValueSize()) return;
1183      this.valueSizeHistogram.update(valueSize);
1184    }
1185
1186    void updateScanMetrics(final ScanMetrics metrics) {
1187      if (metrics == null) return;
1188      Map<String,Long> metricsMap = metrics.getMetricsMap();
1189      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
1190      if (rpcCalls != null) {
1191        this.rpcCallsHistogram.update(rpcCalls.longValue());
1192      }
1193      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
1194      if (remoteRpcCalls != null) {
1195        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
1196      }
1197      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
1198      if (millisBetweenNext != null) {
1199        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
1200      }
1201      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
1202      if (regionsScanned != null) {
1203        this.regionsScannedHistogram.update(regionsScanned.longValue());
1204      }
1205      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
1206      if (bytesInResults != null && bytesInResults.longValue() > 0) {
1207        this.bytesInResultsHistogram.update(bytesInResults.longValue());
1208      }
1209      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
1210      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
1211        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
1212      }
1213    }
1214
1215    String generateStatus(final int sr, final int i, final int lr) {
1216      return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
1217        (!isRandomValueSize()? "": ", value size " + getShortValueSizeReport());
1218    }
1219
1220    boolean isRandomValueSize() {
1221      return opts.valueRandom;
1222    }
1223
1224    protected int getReportingPeriod() {
1225      return opts.period;
1226    }
1227
1228    /**
1229     * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1230     */
1231    public Histogram getLatencyHistogram() {
1232      return latencyHistogram;
1233    }
1234
1235    void testSetup() throws IOException {
1236      // test metrics
1237      latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1238      valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1239      // scan metrics
1240      rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1241      remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1242      millisBetweenNextHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1243      regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1244      bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1245      bytesInRemoteResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1246
1247      onStartup();
1248    }
1249
1250    abstract void onStartup() throws IOException;
1251
1252    void testTakedown() throws IOException {
1253      onTakedown();
1254      // Print all stats for this thread continuously.
1255      // Synchronize on Test.class so different threads don't intermingle the
1256      // output. We can't use 'this' here because each thread has its own instance of Test class.
1257      synchronized (Test.class) {
1258        status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
1259        status.setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(
1260            latencyHistogram));
1261        status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
1262        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
1263        if (valueSizeHistogram.getCount() > 0) {
1264          status.setStatus("ValueSize (bytes) : "
1265              + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
1266          status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
1267          status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
1268        } else {
1269          status.setStatus("No valueSize statistics available");
1270        }
1271        if (rpcCallsHistogram.getCount() > 0) {
1272          status.setStatus("rpcCalls (count): " +
1273              YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
1274        }
1275        if (remoteRpcCallsHistogram.getCount() > 0) {
1276          status.setStatus("remoteRpcCalls (count): " +
1277              YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
1278        }
1279        if (millisBetweenNextHistogram.getCount() > 0) {
1280          status.setStatus("millisBetweenNext (latency): " +
1281              YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
1282        }
1283        if (regionsScannedHistogram.getCount() > 0) {
1284          status.setStatus("regionsScanned (count): " +
1285              YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
1286        }
1287        if (bytesInResultsHistogram.getCount() > 0) {
1288          status.setStatus("bytesInResults (size): " +
1289              YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
1290        }
1291        if (bytesInRemoteResultsHistogram.getCount() > 0) {
1292          status.setStatus("bytesInRemoteResults (size): " +
1293              YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
1294        }
1295      }
1296      receiverHost.closeReceivers();
1297    }
1298
1299    abstract void onTakedown() throws IOException;
1300
1301
1302    /*
1303     * Run test
     * @return Elapsed time in milliseconds.
1305     * @throws IOException
1306     */
1307    long test() throws IOException, InterruptedException {
1308      testSetup();
1309      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1310      final long startTime = System.nanoTime();
1311      try {
1312        testTimed();
1313      } finally {
1314        testTakedown();
1315      }
1316      return (System.nanoTime() - startTime) / 1000000;
1317    }
1318
1319    int getStartRow() {
1320      return opts.startRow;
1321    }
1322
1323    int getLastRow() {
1324      return getStartRow() + opts.perClientRunRows;
1325    }
1326
1327    /**
1328     * Provides an extension point for tests that don't want a per row invocation.
1329     */
1330    void testTimed() throws IOException, InterruptedException {
1331      int startRow = getStartRow();
1332      int lastRow = getLastRow();
1333      TraceUtil.addSampler(traceSampler);
1334      // Report on completion of 1/10th of total.
1335      for (int ii = 0; ii < opts.cycles; ii++) {
1336        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
1337        for (int i = startRow; i < lastRow; i++) {
1338          if (i % everyN != 0) continue;
1339          long startTime = System.nanoTime();
1340          boolean requestSent = false;
          try (TraceScope scope = TraceUtil.createTrace("test row")) {
            requestSent = testRow(i);
          }
          if ((i - startRow) > opts.measureAfter) {
1345            // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
1346            // first 9 times and sends the actual get request in the 10th iteration.
1347            // We should only set latency when actual request is sent because otherwise
1348            // it turns out to be 0.
1349            if (requestSent) {
1350              latencyHistogram.update((System.nanoTime() - startTime) / 1000);
1351            }
1352            if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1353              status.setStatus(generateStatus(startRow, i, lastRow));
1354            }
1355          }
1356        }
1357      }
1358    }
1359
1360    /**
     * @return A summary containing a subset of the latency histogram's statistics.
1362     */
1363    public String getShortLatencyReport() {
1364      return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
1365    }
1366
1367    /**
     * @return A summary containing a subset of the value-size histogram's statistics.
1369     */
1370    public String getShortValueSizeReport() {
1371      return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
1372    }
1373
1374
1375    /**
1376     * Test for individual row.
1377     * @param i Row index.
     * @return true if the row was sent to the server and metrics should be recorded;
     *         false otherwise (e.g. with multiGet and multiPut, rows are sent to the
     *         server only once enough gets/puts have been gathered).
1381     */
1382    abstract boolean testRow(final int i) throws IOException, InterruptedException;
1383  }
1384
1385  static abstract class Test extends TestBase {
1386    protected Connection connection;
1387
1388    Test(final Connection con, final TestOptions options, final Status status) {
1389      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1390      this.connection = con;
1391    }
1392  }
1393
1394  static abstract class AsyncTest extends TestBase {
1395    protected AsyncConnection connection;
1396
1397    AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
1398      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1399      this.connection = con;
1400    }
1401  }
1402
1403  static abstract class TableTest extends Test {
1404    protected Table table;
1405
1406    TableTest(Connection con, TestOptions options, Status status) {
1407      super(con, options, status);
1408    }
1409
1410    @Override
1411    void onStartup() throws IOException {
1412      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1413    }
1414
1415    @Override
1416    void onTakedown() throws IOException {
1417      table.close();
1418    }
1419  }
1420
1421  static abstract class AsyncTableTest extends AsyncTest {
1422    protected AsyncTable<?> table;
1423
1424    AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
1425      super(con, options, status);
1426    }
1427
1428    @Override
1429    void onStartup() throws IOException {
1430      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1431    }
1432
1433    @Override
1434    void onTakedown() throws IOException {
1435    }
1436  }
1437
1438  static class AsyncRandomReadTest extends AsyncTableTest {
1439    private final Consistency consistency;
1440    private ArrayList<Get> gets;
1441    private Random rd = new Random();
1442
1443    AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
1444      super(con, options, status);
1445      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1446      if (opts.multiGet > 0) {
1447        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1448        this.gets = new ArrayList<>(opts.multiGet);
1449      }
1450    }
1451
1452    @Override
1453    boolean testRow(final int i) throws IOException, InterruptedException {
1454      if (opts.randomSleep > 0) {
1455        Thread.sleep(rd.nextInt(opts.randomSleep));
1456      }
1457      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1458      for (int family = 0; family < opts.families; family++) {
1459        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1460        if (opts.addColumns) {
1461          for (int column = 0; column < opts.columns; column++) {
1462            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1463            get.addColumn(familyName, qualifier);
1464          }
1465        } else {
1466          get.addFamily(familyName);
1467        }
1468      }
1469      if (opts.filterAll) {
1470        get.setFilter(new FilterAllFilter());
1471      }
1472      get.setConsistency(consistency);
1473      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1474      try {
1475        if (opts.multiGet > 0) {
1476          this.gets.add(get);
1477          if (this.gets.size() == opts.multiGet) {
1478            Result[] rs =
1479                this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
1480            updateValueSize(rs);
1481            this.gets.clear();
1482          } else {
1483            return false;
1484          }
1485        } else {
1486          updateValueSize(this.table.get(get).get());
1487        }
1488      } catch (ExecutionException e) {
1489        throw new IOException(e);
1490      }
1491      return true;
1492    }
1493
1494    public static RuntimeException runtime(Throwable e) {
1495      if (e instanceof RuntimeException) {
1496        return (RuntimeException) e;
1497      }
1498      return new RuntimeException(e);
1499    }
1500
1501    public static <V> V propagate(Callable<V> callable) {
1502      try {
1503        return callable.call();
1504      } catch (Exception e) {
1505        throw runtime(e);
1506      }
1507    }
1508
1509    @Override
1510    protected int getReportingPeriod() {
1511      int period = opts.perClientRunRows / 10;
1512      return period == 0 ? opts.perClientRunRows : period;
1513    }
1514
1515    @Override
1516    protected void testTakedown() throws IOException {
      if (this.gets != null && this.gets.size() > 0) {
        // Flush any partially filled multiGet batch and wait for the results before takedown.
        this.table.get(gets).forEach(f -> propagate(f::get));
        this.gets.clear();
      }
1521      super.testTakedown();
1522    }
1523  }
1524
1525  static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
1526
1527    AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
1528      super(con, options, status);
1529    }
1530
1531    @Override
1532    protected byte[] generateRow(final int i) {
1533      return getRandomRow(this.rand, opts.totalRows);
1534    }
1535  }
1536
1537  static class AsyncScanTest extends AsyncTableTest {
1538    private ResultScanner testScanner;
1539    private AsyncTable<?> asyncTable;
1540
1541    AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
1542      super(con, options, status);
1543    }
1544
1545    @Override
1546    void onStartup() throws IOException {
1547      this.asyncTable =
1548          connection.getTable(TableName.valueOf(opts.tableName),
1549            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
1550    }
1551
1552    @Override
1553    void testTakedown() throws IOException {
1554      if (this.testScanner != null) {
1555        updateScanMetrics(this.testScanner.getScanMetrics());
1556        this.testScanner.close();
1557      }
1558      super.testTakedown();
1559    }
1560
1561    @Override
1562    boolean testRow(final int i) throws IOException {
1563      if (this.testScanner == null) {
1564        Scan scan =
1565            new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1566                .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1567                .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1568        for (int family = 0; family < opts.families; family++) {
1569          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1570          if (opts.addColumns) {
1571            for (int column = 0; column < opts.columns; column++) {
1572              byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1573              scan.addColumn(familyName, qualifier);
1574            }
1575          } else {
1576            scan.addFamily(familyName);
1577          }
1578        }
1579        if (opts.filterAll) {
1580          scan.setFilter(new FilterAllFilter());
1581        }
1582        this.testScanner = asyncTable.getScanner(scan);
1583      }
1584      Result r = testScanner.next();
1585      updateValueSize(r);
1586      return true;
1587    }
1588  }
1589
1590  static class AsyncSequentialReadTest extends AsyncTableTest {
1591    AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
1592      super(con, options, status);
1593    }
1594
1595    @Override
1596    boolean testRow(final int i) throws IOException, InterruptedException {
1597      Get get = new Get(format(i));
1598      for (int family = 0; family < opts.families; family++) {
1599        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1600        if (opts.addColumns) {
1601          for (int column = 0; column < opts.columns; column++) {
1602            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1603            get.addColumn(familyName, qualifier);
1604          }
1605        } else {
1606          get.addFamily(familyName);
1607        }
1608      }
1609      if (opts.filterAll) {
1610        get.setFilter(new FilterAllFilter());
1611      }
1612      try {
1613        updateValueSize(table.get(get).get());
1614      } catch (ExecutionException e) {
1615        throw new IOException(e);
1616      }
1617      return true;
1618    }
1619  }
1620
1621  static class AsyncSequentialWriteTest extends AsyncTableTest {
1622    private ArrayList<Put> puts;
1623
1624    AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
1625      super(con, options, status);
1626      if (opts.multiPut > 0) {
1627        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
1628        this.puts = new ArrayList<>(opts.multiPut);
1629      }
1630    }
1631
1632    protected byte[] generateRow(final int i) {
1633      return format(i);
1634    }
1635
1636    @Override
1637    @SuppressWarnings("ReturnValueIgnored")
1638    boolean testRow(final int i) throws IOException, InterruptedException {
1639      byte[] row = generateRow(i);
1640      Put put = new Put(row);
1641      for (int family = 0; family < opts.families; family++) {
1642        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1643        for (int column = 0; column < opts.columns; column++) {
1644          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1645          byte[] value = generateData(this.rand, getValueLength(this.rand));
1646          if (opts.useTags) {
1647            byte[] tag = generateData(this.rand, TAG_LENGTH);
1648            Tag[] tags = new Tag[opts.noOfTags];
1649            for (int n = 0; n < opts.noOfTags; n++) {
1650              Tag t = new ArrayBackedTag((byte) n, tag);
1651              tags[n] = t;
1652            }
1653            KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
1654              value, tags);
1655            put.add(kv);
1656            updateValueSize(kv.getValueLength());
1657          } else {
1658            put.addColumn(familyName, qualifier, value);
1659            updateValueSize(value.length);
1660          }
1661        }
1662      }
1663      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
      try {
        if (opts.multiPut > 0) {
          this.puts.add(put);
          if (this.puts.size() == opts.multiPut) {
            // Issue the whole batch and block until every put future has completed.
            this.table.put(puts).forEach(f -> AsyncRandomReadTest.propagate(f::get));
            this.puts.clear();
          } else {
            return false;
          }
        } else {
          table.put(put).get();
        }
1677      } catch (ExecutionException e) {
1678        throw new IOException(e);
1679      }
1680      return true;
1681    }
1682  }
1683
  abstract static class BufferedMutatorTest extends Test {
1685    protected BufferedMutator mutator;
1686    protected Table table;
1687
1688    BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1689      super(con, options, status);
1690    }
1691
1692    @Override
1693    void onStartup() throws IOException {
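      // Open both a BufferedMutator and a plain Table against the target table; write tests use
      // the Table when autoFlush is enabled and the BufferedMutator otherwise.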
1694      BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1695      p.writeBufferSize(opts.bufferSize);
1696      this.mutator = connection.getBufferedMutator(p);
1697      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1698    }
1699
1700    @Override
1701    void onTakedown() throws IOException {
1702      mutator.close();
1703      table.close();
1704    }
1705  }
1706
1707  static class RandomSeekScanTest extends TableTest {
1708    RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1709      super(con, options, status);
1710    }
1711
1712    @Override
1713    boolean testRow(final int i) throws IOException {
1714      Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
1715          .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
1716          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
1717          .setScanMetricsEnabled(true);
1718      FilterList list = new FilterList();
1719      for (int family = 0; family < opts.families; family++) {
1720        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1721        if (opts.addColumns) {
1722          for (int column = 0; column < opts.columns; column++) {
1723            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1724            scan.addColumn(familyName, qualifier);
1725          }
1726        } else {
1727          scan.addFamily(familyName);
1728        }
1729      }
1730      if (opts.filterAll) {
1731        list.addFilter(new FilterAllFilter());
1732      }
1733      list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1734      scan.setFilter(list);
1735      ResultScanner s = this.table.getScanner(scan);
1736      try {
1737        for (Result rr; (rr = s.next()) != null;) {
1738          updateValueSize(rr);
1739        }
1740      } finally {
1741        updateScanMetrics(s.getScanMetrics());
1742        s.close();
1743      }
1744      return true;
1745    }
1746
1747    @Override
1748    protected int getReportingPeriod() {
1749      int period = opts.perClientRunRows / 100;
1750      return period == 0 ? opts.perClientRunRows : period;
1751    }
1752
1753  }
1754
  abstract static class RandomScanWithRangeTest extends TableTest {
1756    RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1757      super(con, options, status);
1758    }
1759
1760    @Override
1761    boolean testRow(final int i) throws IOException {
1762      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1763      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
1764          .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
1765          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1766          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1767      for (int family = 0; family < opts.families; family++) {
1768        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1769        if (opts.addColumns) {
1770          for (int column = 0; column < opts.columns; column++) {
1771            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1772            scan.addColumn(familyName, qualifier);
1773          }
1774        } else {
1775          scan.addFamily(familyName);
1776        }
1777      }
1778      if (opts.filterAll) {
1779        scan.setFilter(new FilterAllFilter());
1780      }
1781      Result r = null;
1782      int count = 0;
1783      ResultScanner s = this.table.getScanner(scan);
1784      try {
1785        for (; (r = s.next()) != null;) {
1786          updateValueSize(r);
1787          count++;
1788        }
1789        if (i % 100 == 0) {
1790          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1791            Bytes.toString(startAndStopRow.getFirst()),
1792            Bytes.toString(startAndStopRow.getSecond()), count));
1793        }
1794      } finally {
1795        updateScanMetrics(s.getScanMetrics());
1796        s.close();
1797      }
1798      return true;
1799    }
1800
1801    protected abstract Pair<byte[],byte[]> getStartAndStopRow();
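
    // Picks a random start row in [0, totalRows) and returns [start, start + maxRange) as
    // formatted row keys; the stop row may point past the last written row.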
1802
1803    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
1804      int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
1805      int stop = start + maxRange;
1806      return new Pair<>(format(start), format(stop));
1807    }
1808
1809    @Override
1810    protected int getReportingPeriod() {
1811      int period = opts.perClientRunRows / 100;
1812      return period == 0? opts.perClientRunRows: period;
1813    }
1814  }
1815
1816  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1817    RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1818      super(con, options, status);
1819    }
1820
1821    @Override
1822    protected Pair<byte[], byte[]> getStartAndStopRow() {
1823      return generateStartAndStopRows(10);
1824    }
1825  }
1826
1827  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1828    RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1829      super(con, options, status);
1830    }
1831
1832    @Override
1833    protected Pair<byte[], byte[]> getStartAndStopRow() {
1834      return generateStartAndStopRows(100);
1835    }
1836  }
1837
1838  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1839    RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1840      super(con, options, status);
1841    }
1842
1843    @Override
1844    protected Pair<byte[], byte[]> getStartAndStopRow() {
1845      return generateStartAndStopRows(1000);
1846    }
1847  }
1848
1849  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1850    RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1851      super(con, options, status);
1852    }
1853
1854    @Override
1855    protected Pair<byte[], byte[]> getStartAndStopRow() {
1856      return generateStartAndStopRows(10000);
1857    }
1858  }
1859
1860  static class RandomReadTest extends TableTest {
1861    private final Consistency consistency;
1862    private ArrayList<Get> gets;
1863    private Random rd = new Random();
1864
1865    RandomReadTest(Connection con, TestOptions options, Status status) {
1866      super(con, options, status);
1867      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1868      if (opts.multiGet > 0) {
1869        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1870        this.gets = new ArrayList<>(opts.multiGet);
1871      }
1872    }
1873
1874    @Override
1875    boolean testRow(final int i) throws IOException, InterruptedException {
1876      if (opts.randomSleep > 0) {
1877        Thread.sleep(rd.nextInt(opts.randomSleep));
1878      }
1879      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1880      for (int family = 0; family < opts.families; family++) {
1881        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1882        if (opts.addColumns) {
1883          for (int column = 0; column < opts.columns; column++) {
1884            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1885            get.addColumn(familyName, qualifier);
1886          }
1887        } else {
1888          get.addFamily(familyName);
1889        }
1890      }
1891      if (opts.filterAll) {
1892        get.setFilter(new FilterAllFilter());
1893      }
1894      get.setConsistency(consistency);
1895      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1896      if (opts.multiGet > 0) {
1897        this.gets.add(get);
1898        if (this.gets.size() == opts.multiGet) {
1899          Result [] rs = this.table.get(this.gets);
1900          updateValueSize(rs);
1901          this.gets.clear();
1902        } else {
1903          return false;
1904        }
1905      } else {
1906        updateValueSize(this.table.get(get));
1907      }
1908      return true;
1909    }
1910
1911    @Override
1912    protected int getReportingPeriod() {
1913      int period = opts.perClientRunRows / 10;
1914      return period == 0 ? opts.perClientRunRows : period;
1915    }
1916
1917    @Override
1918    protected void testTakedown() throws IOException {
1919      if (this.gets != null && this.gets.size() > 0) {
1920        this.table.get(gets);
1921        this.gets.clear();
1922      }
1923      super.testTakedown();
1924    }
1925  }
1926
1927  static class RandomWriteTest extends SequentialWriteTest {
1928    RandomWriteTest(Connection con, TestOptions options, Status status) {
1929      super(con, options, status);
1930    }
1931
1932    @Override
1933    protected byte[] generateRow(final int i) {
1934      return getRandomRow(this.rand, opts.totalRows);
1935    }
1936
1937
1938  }
1939
1940  static class ScanTest extends TableTest {
1941    private ResultScanner testScanner;
1942
1943    ScanTest(Connection con, TestOptions options, Status status) {
1944      super(con, options, status);
1945    }
1946
1947    @Override
1948    void testTakedown() throws IOException {
1949      if (this.testScanner != null) {
1950        this.testScanner.close();
1951      }
1952      super.testTakedown();
1953    }
1954
1955
1956    @Override
1957    boolean testRow(final int i) throws IOException {
1958      if (this.testScanner == null) {
1959        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1960            .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1961            .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1962        for (int family = 0; family < opts.families; family++) {
1963          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1964          if (opts.addColumns) {
1965            for (int column = 0; column < opts.columns; column++) {
1966              byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1967              scan.addColumn(familyName, qualifier);
1968            }
1969          } else {
1970            scan.addFamily(familyName);
1971          }
1972        }
1973        if (opts.filterAll) {
1974          scan.setFilter(new FilterAllFilter());
1975        }
1976        this.testScanner = table.getScanner(scan);
1977      }
1978      Result r = testScanner.next();
1979      updateValueSize(r);
1980      return true;
1981    }
1982  }
1983
1984  /**
1985   * Base class for operations that are CAS-like; that read a value and then set it based off what
1986   * they read. In this category is increment, append, checkAndPut, etc.
1987   *
1988   * <p>These operations also want some concurrency going on. Usually when these tests run, they
1989   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
1990   * same key space. We do this with our getStartRow and getLastRow overrides.
1991   */
  abstract static class CASTableTest extends TableTest {
1993    private final byte [] qualifier;
1994    CASTableTest(Connection con, TestOptions options, Status status) {
1995      super(con, options, status);
1996      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
1997    }
1998
1999    byte [] getQualifier() {
2000      return this.qualifier;
2001    }
2002
2003    @Override
2004    int getStartRow() {
2005      return 0;
2006    }
2007
2008    @Override
2009    int getLastRow() {
2010      return opts.perClientRunRows;
2011    }
2012  }
2013
2014  static class IncrementTest extends CASTableTest {
2015    IncrementTest(Connection con, TestOptions options, Status status) {
2016      super(con, options, status);
2017    }
2018
2019    @Override
2020    boolean testRow(final int i) throws IOException {
2021      Increment increment = new Increment(format(i));
2022      // unlike checkAndXXX tests, which make most sense to do on a single value,
2023      // if multiple families are specified for an increment test we assume it is
2024      // meant to raise the work factor
2025      for (int family = 0; family < opts.families; family++) {
2026        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
        increment.addColumn(familyName, getQualifier(), 1L);
2028      }
2029      updateValueSize(this.table.increment(increment));
2030      return true;
2031    }
2032  }
2033
2034  static class AppendTest extends CASTableTest {
2035    AppendTest(Connection con, TestOptions options, Status status) {
2036      super(con, options, status);
2037    }
2038
2039    @Override
2040    boolean testRow(final int i) throws IOException {
2041      byte [] bytes = format(i);
2042      Append append = new Append(bytes);
2043      // unlike checkAndXXX tests, which make most sense to do on a single value,
2044      // if multiple families are specified for an append test we assume it is
2045      // meant to raise the work factor
2046      for (int family = 0; family < opts.families; family++) {
2047        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2048        append.addColumn(familyName, getQualifier(), bytes);
2049      }
2050      updateValueSize(this.table.append(append));
2051      return true;
2052    }
2053  }
2054
2055  static class CheckAndMutateTest extends CASTableTest {
2056    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
2057      super(con, options, status);
2058    }
2059
2060    @Override
2061    boolean testRow(final int i) throws IOException {
2062      final byte [] bytes = format(i);
2063      // checkAndXXX tests operate on only a single value
2064      // Put a known value so when we go to check it, it is there.
2065      Put put = new Put(bytes);
2066      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2067      this.table.put(put);
2068      RowMutations mutations = new RowMutations(bytes);
2069      mutations.add(put);
2070      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2071          .ifEquals(bytes).thenMutate(mutations);
2072      return true;
2073    }
2074  }
2075
2076  static class CheckAndPutTest extends CASTableTest {
2077    CheckAndPutTest(Connection con, TestOptions options, Status status) {
2078      super(con, options, status);
2079    }
2080
2081    @Override
2082    boolean testRow(final int i) throws IOException {
2083      final byte [] bytes = format(i);
2084      // checkAndXXX tests operate on only a single value
2085      // Put a known value so when we go to check it, it is there.
2086      Put put = new Put(bytes);
2087      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2088      this.table.put(put);
2089      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2090          .ifEquals(bytes).thenPut(put);
2091      return true;
2092    }
2093  }
2094
2095  static class CheckAndDeleteTest extends CASTableTest {
2096    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
2097      super(con, options, status);
2098    }
2099
2100    @Override
2101    boolean testRow(final int i) throws IOException {
2102      final byte [] bytes = format(i);
2103      // checkAndXXX tests operate on only a single value
2104      // Put a known value so when we go to check it, it is there.
2105      Put put = new Put(bytes);
2106      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2107      this.table.put(put);
2108      Delete delete = new Delete(put.getRow());
2109      delete.addColumn(FAMILY_ZERO, getQualifier());
2110      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2111          .ifEquals(bytes).thenDelete(delete);
2112      return true;
2113    }
2114  }
2115
2116  static class SequentialReadTest extends TableTest {
2117    SequentialReadTest(Connection con, TestOptions options, Status status) {
2118      super(con, options, status);
2119    }
2120
2121    @Override
2122    boolean testRow(final int i) throws IOException {
2123      Get get = new Get(format(i));
2124      for (int family = 0; family < opts.families; family++) {
2125        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2126        if (opts.addColumns) {
2127          for (int column = 0; column < opts.columns; column++) {
2128            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2129            get.addColumn(familyName, qualifier);
2130          }
2131        } else {
2132          get.addFamily(familyName);
2133        }
2134      }
2135      if (opts.filterAll) {
2136        get.setFilter(new FilterAllFilter());
2137      }
2138      updateValueSize(table.get(get));
2139      return true;
2140    }
2141  }
2142
2143  static class SequentialWriteTest extends BufferedMutatorTest {
    private ArrayList<Put> puts;

2147    SequentialWriteTest(Connection con, TestOptions options, Status status) {
2148      super(con, options, status);
2149      if (opts.multiPut > 0) {
2150        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
2151        this.puts = new ArrayList<>(opts.multiPut);
2152      }
2153    }
2154
2155    protected byte[] generateRow(final int i) {
2156      return format(i);
2157    }
2158
2159    @Override
2160    boolean testRow(final int i) throws IOException {
2161      byte[] row = generateRow(i);
2162      Put put = new Put(row);
2163      for (int family = 0; family < opts.families; family++) {
        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2165        for (int column = 0; column < opts.columns; column++) {
2166          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2167          byte[] value = generateData(this.rand, getValueLength(this.rand));
2168          if (opts.useTags) {
2169            byte[] tag = generateData(this.rand, TAG_LENGTH);
2170            Tag[] tags = new Tag[opts.noOfTags];
2171            for (int n = 0; n < opts.noOfTags; n++) {
2172              Tag t = new ArrayBackedTag((byte) n, tag);
2173              tags[n] = t;
2174            }
2175            KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
2176              value, tags);
2177            put.add(kv);
2178            updateValueSize(kv.getValueLength());
2179          } else {
2180            put.addColumn(familyName, qualifier, value);
2181            updateValueSize(value.length);
2182          }
2183        }
2184      }
2185      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
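      // With autoFlush the put goes straight to the Table (optionally batched via multiPut);
      // otherwise it is queued on the BufferedMutator and flushed by its write buffer.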
2186      if (opts.autoFlush) {
2187        if (opts.multiPut > 0) {
2188          this.puts.add(put);
2189          if (this.puts.size() == opts.multiPut) {
2190            table.put(this.puts);
2191            this.puts.clear();
2192          } else {
2193            return false;
2194          }
2195        } else {
2196          table.put(put);
2197        }
2198      } else {
2199        mutator.mutate(put);
2200      }
2201      return true;
2202    }
2203  }
2204
2205  static class FilteredScanTest extends TableTest {
2206    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
2207
2208    FilteredScanTest(Connection con, TestOptions options, Status status) {
2209      super(con, options, status);
2210      if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) {
2211        LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB +
2212                ". This could take a very long time.");
2213      }
2214    }
2215
2216    @Override
2217    boolean testRow(int i) throws IOException {
2218      byte[] value = generateData(this.rand, getValueLength(this.rand));
2219      Scan scan = constructScan(value);
2220      ResultScanner scanner = null;
2221      try {
2222        scanner = this.table.getScanner(scan);
2223        for (Result r = null; (r = scanner.next()) != null;) {
2224          updateValueSize(r);
2225        }
2226      } finally {
2227        if (scanner != null) {
2228          updateScanMetrics(scanner.getScanMetrics());
2229          scanner.close();
2230        }
2231      }
2232      return true;
2233    }
2234
2235    protected Scan constructScan(byte[] valuePrefix) throws IOException {
2236      FilterList list = new FilterList();
2237      Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO,
2238        CompareOperator.EQUAL, new BinaryComparator(valuePrefix));
2239      list.addFilter(filter);
2240      if (opts.filterAll) {
2241        list.addFilter(new FilterAllFilter());
2242      }
2243      Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2244          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2245          .setScanMetricsEnabled(true);
2246      if (opts.addColumns) {
2247        for (int column = 0; column < opts.columns; column++) {
2248          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2249          scan.addColumn(FAMILY_ZERO, qualifier);
2250        }
2251      } else {
2252        scan.addFamily(FAMILY_ZERO);
2253      }
2254      scan.setFilter(list);
2255      return scan;
2256    }
2257  }
2258
2259  /**
2260   * Compute a throughput rate in MB/s.
2261   * @param rows Number of records consumed.
2262   * @param timeMs Time taken in milliseconds.
2263   * @return String value with label, ie '123.76 MB/s'
2264   */
2265  private static String calculateMbps(int rows, long timeMs, final int valueSize, int families, int columns) {
2266    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH +
2267      ((valueSize + (FAMILY_NAME_BASE.length()+1) + COLUMN_ZERO.length) * columns) * families);
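    // Throughput = rows * rowSize bytes, converted from milliseconds to seconds and from bytes
    // to megabytes.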
2268    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
2269      .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
2270      .divide(BYTES_PER_MB, CXT);
2271    return FMT.format(mbps) + " MB/s";
2272  }
2273
2274  /*
2275   * Format passed integer.
2276   * @param number
2277   * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed
2278   * number (Does absolute in case number is negative).
2279   */
2280  public static byte [] format(final int number) {
2281    byte [] b = new byte[ROW_LENGTH];
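    // Fill digits from least significant to most; e.g. format(123) yields a ROW_LENGTH-wide,
    // zero-padded decimal key ending in "123".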
2282    int d = Math.abs(number);
2283    for (int i = b.length - 1; i >= 0; i--) {
2284      b[i] = (byte)((d % 10) + '0');
2285      d /= 10;
2286    }
2287    return b;
2288  }
2289
2290  /*
2291   * This method takes some time and is done inline uploading data.  For
2292   * example, doing the mapfile test, generation of the key and value
2293   * consumes about 30% of CPU time.
2294   * @return Generated random value to insert into a table cell.
2295   */
2296  public static byte[] generateData(final Random r, int length) {
2297    byte [] b = new byte [length];
2298    int i;
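    // Write the value in 8-byte runs of a single random ASCII letter (A-Z), then fill any
    // remaining tail bytes with one more random letter.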
2299
2300    for(i = 0; i < (length-8); i += 8) {
2301      b[i] = (byte) (65 + r.nextInt(26));
2302      b[i+1] = b[i];
2303      b[i+2] = b[i];
2304      b[i+3] = b[i];
2305      b[i+4] = b[i];
2306      b[i+5] = b[i];
2307      b[i+6] = b[i];
2308      b[i+7] = b[i];
2309    }
2310
2311    byte a = (byte) (65 + r.nextInt(26));
2312    for(; i < length; i++) {
2313      b[i] = a;
2314    }
2315    return b;
2316  }
2317
2318  static byte [] getRandomRow(final Random random, final int totalRows) {
2319    return format(generateRandomRow(random, totalRows));
2320  }
2321
2322  static int generateRandomRow(final Random random, final int totalRows) {
2323    return random.nextInt(Integer.MAX_VALUE) % totalRows;
2324  }
2325
2326  static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
2327      Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
2328      throws IOException, InterruptedException {
2329    status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for "
2330        + opts.perClientRunRows + " rows");
2331    long totalElapsedTime;
2332
2333    final TestBase t;
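    // Instantiate the requested test reflectively: async tests take an AsyncConnection,
    // synchronous tests a Connection; both also take TestOptions and Status.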
2334    try {
2335      if (AsyncTest.class.isAssignableFrom(cmd)) {
2336        Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
2337        Constructor<? extends AsyncTest> constructor =
2338            newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
2339        t = constructor.newInstance(asyncCon, opts, status);
2340      } else {
2341        Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
2342        Constructor<? extends Test> constructor =
2343            newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
2344        t = constructor.newInstance(con, opts, status);
2345      }
2346    } catch (NoSuchMethodException e) {
2347      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
2348          + ".  It does not provide a constructor as described by "
2349          + "the javadoc comment.  Available constructors are: "
2350          + Arrays.toString(cmd.getConstructors()));
2351    } catch (Exception e) {
2352      throw new IllegalStateException("Failed to construct command class", e);
2353    }
2354    totalElapsedTime = t.test();
2355
2356    status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
2357      "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
2358      " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
2359          getAverageValueLength(opts), opts.families, opts.columns) + ")");
2360
2361    return new RunResult(totalElapsedTime, t.getLatencyHistogram());
2362  }
2363
2364  private static int getAverageValueLength(final TestOptions opts) {
2365    return opts.valueRandom? opts.valueSize/2: opts.valueSize;
2366  }
2367
2368  private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) throws IOException,
2369      InterruptedException, ClassNotFoundException, ExecutionException {
2370    // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2371    // the TestOptions introspection for us and dump the output in a readable format.
2372    LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
2373    Admin admin = null;
2374    Connection connection = null;
2375    try {
2376      connection = ConnectionFactory.createConnection(getConf());
2377      admin = connection.getAdmin();
2378      checkTable(admin, opts);
2379    } finally {
2380      if (admin != null) admin.close();
2381      if (connection != null) connection.close();
2382    }
2383    if (opts.nomapred) {
2384      doLocalClients(opts, getConf());
2385    } else {
2386      doMapReduce(opts, getConf());
2387    }
2388  }
2389
2390  protected void printUsage() {
2391    printUsage(PE_COMMAND_SHORTNAME, null);
2392  }
2393
2394  protected static void printUsage(final String message) {
2395    printUsage(PE_COMMAND_SHORTNAME, message);
2396  }
2397
2398  protected static void printUsageAndExit(final String message, final int exitCode) {
2399    printUsage(message);
2400    System.exit(exitCode);
2401  }
2402
2403  protected static void printUsage(final String shortName, final String message) {
2404    if (message != null && message.length() > 0) {
2405      System.err.println(message);
2406    }
2407    System.err.print("Usage: hbase " + shortName);
2408    System.err.println("  <OPTIONS> [-D<property=value>]* <command> <nclients>");
2409    System.err.println();
2410    System.err.println("General Options:");
2411    System.err.println(" nomapred        Run multiple clients using threads " +
2412      "(rather than use mapreduce)");
    System.err.println(" oneCon          All the threads share the same connection. Default: False");
    System.err.println(" connCount       Number of connections all threads share. "
        + "For example, if set to 2, then all threads share 2 connections. "
        + "Default: depends on the oneCon parameter. If oneCon is set to true, then connCount=1; "
        + "if not, connCount equals the number of threads.");
2418
2419    System.err.println(" sampleRate      Execute test on a sample of total " +
2420      "rows. Only supported by randomRead. Default: 1.0");
2421    System.err.println(" period          Report every 'period' rows: " +
2422      "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows()/10);
2423    System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
2424    System.err.println(" traceRate       Enable HTrace spans. Initiate tracing every N rows. " +
2425      "Default: 0");
2426    System.err.println(" latency         Set to report operation latencies. Default: False");
2427    System.err.println(" measureAfter    Start to measure the latency once 'measureAfter'" +
2428        " rows have been treated. Default: 0");
2429    System.err.println(" valueSize       Pass value size to use: Default: "
2430        + DEFAULT_OPTS.getValueSize());
2431    System.err.println(" valueRandom     Set if we should vary value size between 0 and " +
2432        "'valueSize'; set on read for stats on size: Default: Not set.");
2433    System.err.println(" blockEncoding   Block encoding to use. Value should be one of "
2434        + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2435    System.err.println();
2436    System.err.println("Table Creation / Write Tests:");
2437    System.err.println(" table           Alternate table name. Default: 'TestTable'");
2438    System.err.println(" rows            Rows each client runs. Default: "
2439        + DEFAULT_OPTS.getPerClientRunRows()
2440        + ".  In case of randomReads and randomSeekScans this could"
2441        + " be specified along with --size to specify the number of rows to be scanned within"
2442        + " the total range specified by the size.");
2443    System.err.println(
2444      " size            Total size in GiB. Mutually exclusive with --rows for writes and scans"
2445          + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2446          + " use size to specify the end range and --rows"
2447          + " specifies the number of rows within that range. " + "Default: 1.0.");
2448    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2449    System.err.println(" flushCommits    Used to determine if the test should flush the table. " +
2450      "Default: false");
2451    System.err.println(" valueZipf       Set if we should vary value size between 0 and " +
2452        "'valueSize' in zipf form: Default: Not set.");
2453    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
2454    System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
    System.err.println(" multiPut        Batch puts together into groups of N. Only supported " +
        "by write. If multiPut is bigger than 0, autoFlush needs to be set to true. Default: 0");
2457    System.err.println(" presplit        Create presplit table. If a table with same name exists,"
2458        + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2459        + "Recommended for accurate perf analysis (see guide). Default: disabled");
2460    System.err.println(" usetags         Writes tags along with KVs. Use with HFile V3. " +
2461      "Default: false");
    System.err.println(" numoftags       Specify the number of tags to add to each cell. " +
      "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2464    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
2465    System.err.println(" columns         Columns to write per row. Default: 1");
2466    System.err.println(" families        Specify number of column families for the table. Default: 1");
2467    System.err.println();
2468    System.err.println("Read Tests:");
    System.err.println(" filterAll       Helps to filter out all the rows on the server side,"
        + " thereby not returning anything back to the client. Helps to check the server-side"
        + " performance. Uses FilterAllFilter internally.");
2472    System.err.println(" multiGet        Batch gets together into groups of N. Only supported " +
2473      "by randomRead. Default: disabled");
    System.err.println(" inmemory        Tries to keep the HFiles of the CF " +
      "in memory as far as possible. Not guaranteed that reads are always served " +
      "from memory. Default: false");
2477    System.err.println(" bloomFilter     Bloom filter type, one of "
2478        + Arrays.toString(BloomType.values()));
2479    System.err.println(" blockSize       Blocksize to use when writing out hfiles. ");
    System.err.println(" inmemoryCompaction  Makes the column family do in-memory flushes/compactions. "
        + "Uses the CompactingMemstore.");
2482    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
2483    System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
2484    System.err.println(" randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
2485    System.err.println(" caching         Scan caching to use. Default: 30");
2486    System.err.println(" asyncPrefetch   Enable asyncPrefetch for scan");
2487    System.err.println(" cacheBlocks     Set the cacheBlocks option for scan. Default: true");
2488    System.err.println(" scanReadType    Set the readType option for scan, stream/pread/default. Default: default");
2489    System.err.println(" bufferSize      Set the value of client side buffering. Default: 2MB");
2490    System.err.println();
2491    System.err.println(" Note: -D properties will be applied to the conf used. ");
2492    System.err.println("  For example: ");
2493    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
2494    System.err.println("   -Dmapreduce.task.timeout=60000");
2495    System.err.println();
2496    System.err.println("Command:");
2497    for (CmdDescriptor command : COMMANDS.values()) {
2498      System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
2499    }
2500    System.err.println();
2501    System.err.println("Args:");
2502    System.err.println(" nclients        Integer. Required. Total number of clients "
2503        + "(and HRegionServers) running. 1 <= value <= 500");
2504    System.err.println("Examples:");
2505    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
2506    System.err.println(" $ hbase " + shortName + " sequentialWrite 1");
2507    System.err.println(" To run 10 clients doing increments over ten rows:");
2508    System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10");
2509  }
2510
2511  /**
2512   * Parse options passed in via an arguments array. Assumes that array has been split
2513   * on white-space and placed into a {@code Queue}. Any unknown arguments will remain
2514   * in the queue at the conclusion of this method call. It's up to the caller to deal
2515   * with these unrecognized arguments.
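   * <p>For example (illustrative only), {@code --nomapred --rows=100 randomRead 4} sets
   * {@code nomapred}, {@code perClientRunRows=100}, {@code cmdName=randomRead} and
   * {@code numClientThreads=4}, leaving the queue empty.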
2516   */
2517  static TestOptions parseOpts(Queue<String> args) {
2518    TestOptions opts = new TestOptions();
2519
2520    String cmd = null;
2521    while ((cmd = args.poll()) != null) {
2522      if (cmd.equals("-h") || cmd.startsWith("--h")) {
2523        // place item back onto queue so that caller knows parsing was incomplete
2524        args.add(cmd);
2525        break;
2526      }
2527
2528      final String nmr = "--nomapred";
2529      if (cmd.startsWith(nmr)) {
2530        opts.nomapred = true;
2531        continue;
2532      }
2533
2534      final String rows = "--rows=";
2535      if (cmd.startsWith(rows)) {
2536        opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
2537        continue;
2538      }
2539
2540      final String cycles = "--cycles=";
2541      if (cmd.startsWith(cycles)) {
2542        opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
2543        continue;
2544      }
2545
2546      final String sampleRate = "--sampleRate=";
2547      if (cmd.startsWith(sampleRate)) {
2548        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
2549        continue;
2550      }
2551
2552      final String table = "--table=";
2553      if (cmd.startsWith(table)) {
2554        opts.tableName = cmd.substring(table.length());
2555        continue;
2556      }
2557
2558      final String startRow = "--startRow=";
2559      if (cmd.startsWith(startRow)) {
2560        opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
2561        continue;
2562      }
2563
2564      final String compress = "--compress=";
2565      if (cmd.startsWith(compress)) {
2566        opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
2567        continue;
2568      }
2569
2570      final String traceRate = "--traceRate=";
2571      if (cmd.startsWith(traceRate)) {
2572        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
2573        continue;
2574      }
2575
2576      final String blockEncoding = "--blockEncoding=";
2577      if (cmd.startsWith(blockEncoding)) {
2578        opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
2579        continue;
2580      }
2581
2582      final String flushCommits = "--flushCommits=";
2583      if (cmd.startsWith(flushCommits)) {
2584        opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
2585        continue;
2586      }
2587
2588      final String writeToWAL = "--writeToWAL=";
2589      if (cmd.startsWith(writeToWAL)) {
2590        opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
2591        continue;
2592      }
2593
2594      final String presplit = "--presplit=";
2595      if (cmd.startsWith(presplit)) {
2596        opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
2597        continue;
2598      }
2599
2600      final String inMemory = "--inmemory=";
2601      if (cmd.startsWith(inMemory)) {
2602        opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
2603        continue;
2604      }
2605
2606      final String autoFlush = "--autoFlush=";
2607      if (cmd.startsWith(autoFlush)) {
2608        opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
2609        if (!opts.autoFlush && opts.multiPut > 0) {
2610          throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
2611        }
2612        continue;
2613      }
2614
2615      final String onceCon = "--oneCon=";
2616      if (cmd.startsWith(onceCon)) {
2617        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
2618        if (opts.oneCon && opts.connCount > 1) {
          throw new IllegalArgumentException("oneCon is set to true, "
              + "so connCount must not be bigger than 1");
2621        }
2622        continue;
2623      }
2624
2625      final String connCount = "--connCount=";
2626      if (cmd.startsWith(connCount)) {
2627        opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
2628        if (opts.oneCon && opts.connCount > 1) {
          throw new IllegalArgumentException("oneCon is set to true, "
              + "so connCount must not be bigger than 1");
2631        }
2632        continue;
2633      }
2634
2635      final String latency = "--latency";
2636      if (cmd.startsWith(latency)) {
2637        opts.reportLatency = true;
2638        continue;
2639      }
2640
2641      final String multiGet = "--multiGet=";
2642      if (cmd.startsWith(multiGet)) {
2643        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
2644        continue;
2645      }
2646
2647      final String multiPut = "--multiPut=";
2648      if (cmd.startsWith(multiPut)) {
2649        opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
2650        if (!opts.autoFlush && opts.multiPut > 0) {
2651          throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
2652        }
2653        continue;
2654      }
2655
2656      final String useTags = "--usetags=";
2657      if (cmd.startsWith(useTags)) {
2658        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
2659        continue;
2660      }
2661
2662      final String noOfTags = "--numoftags=";
2663      if (cmd.startsWith(noOfTags)) {
2664        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
2665        continue;
2666      }
2667
2668      final String replicas = "--replicas=";
2669      if (cmd.startsWith(replicas)) {
2670        opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
2671        continue;
2672      }
2673
2674      final String filterOutAll = "--filterAll";
2675      if (cmd.startsWith(filterOutAll)) {
2676        opts.filterAll = true;
2677        continue;
2678      }
2679
2680      final String size = "--size=";
2681      if (cmd.startsWith(size)) {
2682        opts.size = Float.parseFloat(cmd.substring(size.length()));
        if (opts.size <= 1.0f) {
          throw new IllegalStateException("Size must be > 1; i.e. more than 1GB");
        }
2684        continue;
2685      }
2686
2687      final String splitPolicy = "--splitPolicy=";
2688      if (cmd.startsWith(splitPolicy)) {
2689        opts.splitPolicy = cmd.substring(splitPolicy.length());
2690        continue;
2691      }
2692
2693      final String randomSleep = "--randomSleep=";
2694      if (cmd.startsWith(randomSleep)) {
2695        opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
2696        continue;
2697      }
2698
2699      final String measureAfter = "--measureAfter=";
2700      if (cmd.startsWith(measureAfter)) {
2701        opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length()));
2702        continue;
2703      }
2704
2705      final String bloomFilter = "--bloomFilter=";
2706      if (cmd.startsWith(bloomFilter)) {
2707        opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
2708        continue;
2709      }
2710
2711      final String blockSize = "--blockSize=";
2712      if(cmd.startsWith(blockSize) ) {
2713        opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
2714        continue;
2715      }
2716
2717      final String valueSize = "--valueSize=";
2718      if (cmd.startsWith(valueSize)) {
2719        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
2720        continue;
2721      }
2722
2723      final String valueRandom = "--valueRandom";
2724      if (cmd.startsWith(valueRandom)) {
2725        opts.valueRandom = true;
2726        if (opts.valueZipf) {
2727          throw new IllegalStateException("Either valueZipf or valueRandom but not both");
2728        }
2729        continue;
2730      }
2731
2732      final String valueZipf = "--valueZipf";
2733      if (cmd.startsWith(valueZipf)) {
2734        opts.valueZipf = true;
2735        if (opts.valueRandom) {
2736          throw new IllegalStateException("Either valueZipf or valueRandom but not both");
2737        }
2738        continue;
2739      }
2740
2741      final String period = "--period=";
2742      if (cmd.startsWith(period)) {
2743        opts.period = Integer.parseInt(cmd.substring(period.length()));
2744        continue;
2745      }
2746
2747      final String addColumns = "--addColumns=";
2748      if (cmd.startsWith(addColumns)) {
2749        opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
2750        continue;
2751      }
2752
2753      final String inMemoryCompaction = "--inmemoryCompaction=";
2754      if (cmd.startsWith(inMemoryCompaction)) {
2755        opts.inMemoryCompaction =
2756            MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length()));
2757        continue;
2758      }
2759
2760      final String columns = "--columns=";
2761      if (cmd.startsWith(columns)) {
2762        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
2763        continue;
2764      }
2765
2766      final String families = "--families=";
2767      if (cmd.startsWith(families)) {
2768        opts.families = Integer.parseInt(cmd.substring(families.length()));
2769        continue;
2770      }
2771
2772      final String caching = "--caching=";
2773      if (cmd.startsWith(caching)) {
2774        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
2775        continue;
2776      }
2777
2778      final String asyncPrefetch = "--asyncPrefetch";
2779      if (cmd.startsWith(asyncPrefetch)) {
2780        opts.asyncPrefetch = true;
2781        continue;
2782      }
2783
2784      final String cacheBlocks = "--cacheBlocks=";
2785      if (cmd.startsWith(cacheBlocks)) {
2786        opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length()));
2787        continue;
2788      }
2789
2790      final String scanReadType = "--scanReadType=";
2791      if (cmd.startsWith(scanReadType)) {
2792        opts.scanReadType =
2793            Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
2794        continue;
2795      }
2796
2797      final String bufferSize = "--bufferSize=";
2798      if (cmd.startsWith(bufferSize)) {
2799        opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
2800        continue;
2801      }
2802
2803      if (isCommandClass(cmd)) {
2804        opts.cmdName = cmd;
2805        try {
2806          opts.numClientThreads = Integer.parseInt(args.remove());
2807        } catch (NoSuchElementException | NumberFormatException e) {
2808          throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
2809        }
2810        opts = calculateRowsAndSize(opts);
2811        break;
      } else {
        // printUsageAndExit terminates the JVM, so nothing runs past this point for an
        // unrecognized option or command.
        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
      }
2820    }
2821    return opts;
2822  }
2823
2824  static TestOptions calculateRowsAndSize(final TestOptions opts) {
2825    int rowsPerGB = getRowsPerGB(opts);
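    // Three cases: (1) randomRead/randomSeekScan with both --size and --rows: --size fixes the
    // total key range while --rows stays as given; (2) only --size given: derive total and
    // per-client rows from it; (3) otherwise derive total rows and size from --rows.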
2826    if ((opts.getCmdName() != null
2827        && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
2828        && opts.size != DEFAULT_OPTS.size
2829        && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
      opts.totalRows = (int) (opts.size * rowsPerGB);
    } else if (opts.size != DEFAULT_OPTS.size) {
      // total size in GB specified
      opts.totalRows = (int) (opts.size * rowsPerGB);
      opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
    } else {
      opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
      opts.size = (float) opts.totalRows / rowsPerGB;
2838    }
2839    return opts;
2840  }
2841
2842  static int getRowsPerGB(final TestOptions opts) {
2843    return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getFamilies() *
2844        opts.getColumns());
2845  }
2846
2847  @Override
2848  public int run(String[] args) throws Exception {
2849    // Process command-line args. TODO: Better cmd-line processing
2850    // (but hopefully something not as painful as cli options).
2851    int errCode = -1;
2852    if (args.length < 1) {
2853      printUsage();
2854      return errCode;
2855    }
2856
2857    try {
2858      LinkedList<String> argv = new LinkedList<>();
2859      argv.addAll(Arrays.asList(args));
2860      TestOptions opts = parseOpts(argv);
2861
2862      // args remaining, print help and exit
2863      if (!argv.isEmpty()) {
2864        errCode = 0;
2865        printUsage();
2866        return errCode;
2867      }
2868
2869      // must run at least 1 client
2870      if (opts.numClientThreads <= 0) {
2871        throw new IllegalArgumentException("Number of clients must be > 0");
2872      }
2873
2874      // cmdName should not be null, print help and exit
2875      if (opts.cmdName == null) {
2876        printUsage();
2877        return errCode;
2878      }
2879
2880      Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
2881      if (cmdClass != null) {
2882        runTest(cmdClass, opts);
2883        errCode = 0;
2884      }
2885
2886    } catch (Exception e) {
2887      e.printStackTrace();
2888    }
2889
2890    return errCode;
2891  }
2892
2893  private static boolean isCommandClass(String cmd) {
2894    return COMMANDS.containsKey(cmd);
2895  }
2896
2897  private static Class<? extends TestBase> determineCommandClass(String cmd) {
2898    CmdDescriptor descriptor = COMMANDS.get(cmd);
2899    return descriptor != null ? descriptor.getCmdClass() : null;
2900  }
2901
2902  public static void main(final String[] args) throws Exception {
2903    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
2904    System.exit(res);
2905  }
2906}