001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import com.codahale.metrics.Histogram;
022import com.codahale.metrics.UniformReservoir;
023import java.io.IOException;
024import java.io.PrintStream;
025import java.lang.reflect.Constructor;
026import java.math.BigDecimal;
027import java.math.MathContext;
028import java.text.DecimalFormat;
029import java.text.SimpleDateFormat;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.Date;
033import java.util.LinkedList;
034import java.util.Locale;
035import java.util.Map;
036import java.util.NoSuchElementException;
037import java.util.Queue;
038import java.util.Random;
039import java.util.TreeMap;
040import java.util.concurrent.Callable;
041import java.util.concurrent.ExecutionException;
042import java.util.concurrent.ExecutorService;
043import java.util.concurrent.Executors;
044import java.util.concurrent.Future;
045import org.apache.commons.lang3.StringUtils;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.conf.Configured;
048import org.apache.hadoop.fs.FileSystem;
049import org.apache.hadoop.fs.Path;
050import org.apache.hadoop.hbase.client.Admin;
051import org.apache.hadoop.hbase.client.Append;
052import org.apache.hadoop.hbase.client.AsyncConnection;
053import org.apache.hadoop.hbase.client.AsyncTable;
054import org.apache.hadoop.hbase.client.BufferedMutator;
055import org.apache.hadoop.hbase.client.BufferedMutatorParams;
056import org.apache.hadoop.hbase.client.Connection;
057import org.apache.hadoop.hbase.client.ConnectionFactory;
058import org.apache.hadoop.hbase.client.Consistency;
059import org.apache.hadoop.hbase.client.Delete;
060import org.apache.hadoop.hbase.client.Durability;
061import org.apache.hadoop.hbase.client.Get;
062import org.apache.hadoop.hbase.client.Increment;
063import org.apache.hadoop.hbase.client.Put;
064import org.apache.hadoop.hbase.client.Result;
065import org.apache.hadoop.hbase.client.ResultScanner;
066import org.apache.hadoop.hbase.client.RowMutations;
067import org.apache.hadoop.hbase.client.Scan;
068import org.apache.hadoop.hbase.client.Table;
069import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
070import org.apache.hadoop.hbase.filter.BinaryComparator;
071import org.apache.hadoop.hbase.filter.Filter;
072import org.apache.hadoop.hbase.filter.FilterAllFilter;
073import org.apache.hadoop.hbase.filter.FilterList;
074import org.apache.hadoop.hbase.filter.PageFilter;
075import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
076import org.apache.hadoop.hbase.filter.WhileMatchFilter;
077import org.apache.hadoop.hbase.io.compress.Compression;
078import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
079import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
080import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
081import org.apache.hadoop.hbase.regionserver.BloomType;
082import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
083import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
084import org.apache.hadoop.hbase.trace.SpanReceiverHost;
085import org.apache.hadoop.hbase.trace.TraceUtil;
086import org.apache.hadoop.hbase.util.ByteArrayHashKey;
087import org.apache.hadoop.hbase.util.Bytes;
088import org.apache.hadoop.hbase.util.GsonUtil;
089import org.apache.hadoop.hbase.util.Hash;
090import org.apache.hadoop.hbase.util.MurmurHash;
091import org.apache.hadoop.hbase.util.Pair;
092import org.apache.hadoop.hbase.util.YammerHistogramUtils;
093import org.apache.hadoop.io.LongWritable;
094import org.apache.hadoop.io.Text;
095import org.apache.hadoop.mapreduce.Job;
096import org.apache.hadoop.mapreduce.Mapper;
097import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
098import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
099import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
100import org.apache.hadoop.util.Tool;
101import org.apache.hadoop.util.ToolRunner;
102import org.apache.htrace.core.ProbabilitySampler;
103import org.apache.htrace.core.Sampler;
104import org.apache.htrace.core.TraceScope;
105import org.apache.yetus.audience.InterfaceAudience;
106import org.slf4j.Logger;
107import org.slf4j.LoggerFactory;
108
109import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
110import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
111import org.apache.hbase.thirdparty.com.google.gson.Gson;
112
113/**
 * Script used for evaluating HBase performance and scalability. Runs an HBase
115 * client that steps through one of a set of hardcoded tests or 'experiments'
116 * (e.g. a random reads test, a random writes test, etc.). Pass on the
117 * command-line which test to run and how many clients are participating in
118 * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
119 *
120 * <p>This class sets up and runs the evaluation programs described in
121 * Section 7, <i>Performance Evaluation</i>, of the <a
122 * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
123 * paper, pages 8-10.
124 *
125 * <p>By default, runs as a mapreduce job where each mapper runs a single test
126 * client. Can also run as a non-mapreduce, multithreaded application by
127 * specifying {@code --nomapred}. Each client does about 1GB of data, unless
128 * specified otherwise.
129 */
130@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
131public class PerformanceEvaluation extends Configured implements Tool {
  // Command names that are also referenced as constants (registered in the static block below).
  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
  static final String RANDOM_READ = "randomRead";
  // NOTE(review): short command name of this tool; its use is outside this view — confirm.
  static final String PE_COMMAND_SHORTNAME = "pe";
  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
  // Serializes TestOptions to/from the JSON lines of the MapReduce input file.
  private static final Gson GSON = GsonUtil.createGson().create();

  // Default test table name (TestOptions.tableName).
  public static final String TABLE_NAME = "TestTable";
  // Column families are named FAMILY_NAME_BASE + index ("info0", "info1", ...).
  public static final String FAMILY_NAME_BASE = "info";
  // Name of the first column family, i.e. FAMILY_NAME_BASE + 0.
  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
  // Default value size (TestOptions.valueSize).
  public static final int DEFAULT_VALUE_LENGTH = 1000;
  // NOTE(review): presumably the length of generated row keys — confirm against format().
  public static final int ROW_LENGTH = 26;

  // Note: 1024 * 1024 * 1000 bytes, not 2^30.
  private static final int ONE_GB = 1024 * 1024 * 1000;
  // Default row count (perClientRunRows / totalRows): one "GB" of default-sized values.
  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
  // TODO : should we make this configurable
  private static final int TAG_LENGTH = 256;
  // NOTE(review): formatting/arithmetic helpers not used in this view; presumably for the
  // throughput figures reported elsewhere in this file.
  private static final DecimalFormat FMT = new DecimalFormat("0.##");
  private static final MathContext CXT = MathContext.DECIMAL64;
  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
  // Pristine defaults, used to detect which options the user changed (presplit, replicas, ...).
  private static final TestOptions DEFAULT_OPTS = new TestOptions();

  // Command name -> descriptor, filled by addCommandDescriptor; sorted by name.
  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
  // Directory (under the chosen base dir) that holds per-run MapReduce input/output dirs.
  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
157
158  static {
159    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
160        "Run async random read test");
161    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
162        "Run async random write test");
163    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
164        "Run async sequential read test");
165    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
166        "Run async sequential write test");
167    addCommandDescriptor(AsyncScanTest.class, "asyncScan",
168        "Run async scan test (read every row)");
169    addCommandDescriptor(RandomReadTest.class, RANDOM_READ,
170      "Run random read test");
171    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
172      "Run random seek and scan 100 test");
173    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
174      "Run random seek scan with both start and stop row (max 10 rows)");
175    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
176      "Run random seek scan with both start and stop row (max 100 rows)");
177    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
178      "Run random seek scan with both start and stop row (max 1000 rows)");
179    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
180      "Run random seek scan with both start and stop row (max 10000 rows)");
181    addCommandDescriptor(RandomWriteTest.class, "randomWrite",
182      "Run random write test");
183    addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
184      "Run sequential read test");
185    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
186      "Run sequential write test");
187    addCommandDescriptor(ScanTest.class, "scan",
188      "Run scan test (read every row)");
189    addCommandDescriptor(FilteredScanTest.class, "filterScan",
190      "Run scan test using a filter to find a specific row based on it's value " +
191      "(make sure to use --rows=20)");
192    addCommandDescriptor(IncrementTest.class, "increment",
193      "Increment on each row; clients overlap on keyspace so some concurrent operations");
194    addCommandDescriptor(AppendTest.class, "append",
195      "Append on each row; clients overlap on keyspace so some concurrent operations");
196    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
197      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
198    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
199      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
200    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
201      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
202  }
203
204  /**
205   * Enum for map metrics.  Keep it out here rather than inside in the Map
206   * inner-class so we can find associated properties.
207   */
208  protected static enum Counter {
209    /** elapsed time */
210    ELAPSED_TIME,
211    /** number of rows */
212    ROWS
213  }
214
215  protected static class RunResult implements Comparable<RunResult> {
216    public RunResult(long duration, Histogram hist) {
217      this.duration = duration;
218      this.hist = hist;
219    }
220
221    public final long duration;
222    public final Histogram hist;
223
224    @Override
225    public String toString() {
226      return Long.toString(duration);
227    }
228
229    @Override public int compareTo(RunResult o) {
230      return Long.compare(this.duration, o.duration);
231    }
232  }
233
234  /**
235   * Constructor
236   * @param conf Configuration object
237   */
238  public PerformanceEvaluation(final Configuration conf) {
239    super(conf);
240  }
241
242  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass,
243      String name, String description) {
244    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
245    COMMANDS.put(name, cmdDescriptor);
246  }
247
248  /**
249   * Implementations can have their status set.
250   */
251  interface Status {
252    /**
253     * Sets status
254     * @param msg status message
255     * @throws IOException
256     */
257    void setStatus(final String msg) throws IOException;
258  }
259
260  /**
261   * MapReduce job that runs a performance evaluation client in each map task.
262   */
263  public static class EvaluationMapTask
264      extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
265
266    /** configuration parameter name that contains the command */
267    public final static String CMD_KEY = "EvaluationMapTask.command";
268    /** configuration parameter name that contains the PE impl */
269    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
270
271    private Class<? extends Test> cmd;
272
273    @Override
274    protected void setup(Context context) throws IOException, InterruptedException {
275      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
276
277      // this is required so that extensions of PE are instantiated within the
278      // map reduce task...
279      Class<? extends PerformanceEvaluation> peClass =
280          forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
281      try {
282        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
283      } catch (Exception e) {
284        throw new IllegalStateException("Could not instantiate PE instance", e);
285      }
286    }
287
288    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
289      try {
290        return Class.forName(className).asSubclass(type);
291      } catch (ClassNotFoundException e) {
292        throw new IllegalStateException("Could not find class for name: " + className, e);
293      }
294    }
295
296    @Override
297    protected void map(LongWritable key, Text value, final Context context)
298           throws IOException, InterruptedException {
299
300      Status status = new Status() {
301        @Override
302        public void setStatus(String msg) {
303           context.setStatus(msg);
304        }
305      };
306
307      TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class);
308      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
309      final Connection con = ConnectionFactory.createConnection(conf);
310      AsyncConnection asyncCon = null;
311      try {
312        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
313      } catch (ExecutionException e) {
314        throw new IOException(e);
315      }
316
317      // Evaluation task
318      RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
319      // Collect how much time the thing took. Report as map output and
320      // to the ELAPSED_TIME counter.
321      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
322      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
323      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
324      context.progress();
325    }
326  }
327
328  /*
329   * If table does not already exist, create. Also create a table when
330   * {@code opts.presplitRegions} is specified or when the existing table's
331   * region replica count doesn't match {@code opts.replicas}.
332   */
333  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
334    TableName tableName = TableName.valueOf(opts.tableName);
335    boolean needsDelete = false, exists = admin.tableExists(tableName);
336    boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
337      || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
338    if (!exists && isReadCmd) {
339      throw new IllegalStateException(
340        "Must specify an existing table for read commands. Run a write command first.");
341    }
342    HTableDescriptor desc =
343      exists ? new HTableDescriptor(admin.getDescriptor(TableName.valueOf(opts.tableName))) : null;
344    byte[][] splits = getSplits(opts);
345
346    // recreate the table when user has requested presplit or when existing
347    // {RegionSplitPolicy,replica count} does not match requested, or when the
348    // number of column families does not match requested.
349    if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
350      || (!isReadCmd && desc != null &&
351          !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
352      || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)
353      || (desc != null && desc.getColumnFamilyCount() != opts.families)) {
354      needsDelete = true;
355      // wait, why did it delete my table?!?
356      LOG.debug(MoreObjects.toStringHelper("needsDelete")
357        .add("needsDelete", needsDelete)
358        .add("isReadCmd", isReadCmd)
359        .add("exists", exists)
360        .add("desc", desc)
361        .add("presplit", opts.presplitRegions)
362        .add("splitPolicy", opts.splitPolicy)
363        .add("replicas", opts.replicas)
364        .add("families", opts.families)
365        .toString());
366    }
367
368    // remove an existing table
369    if (needsDelete) {
370      if (admin.isTableEnabled(tableName)) {
371        admin.disableTable(tableName);
372      }
373      admin.deleteTable(tableName);
374    }
375
376    // table creation is necessary
377    if (!exists || needsDelete) {
378      desc = getTableDescriptor(opts);
379      if (splits != null) {
380        if (LOG.isDebugEnabled()) {
381          for (int i = 0; i < splits.length; i++) {
382            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
383          }
384        }
385      }
386      if (splits != null) {
387        admin.createTable(desc, splits);
388      } else {
389        admin.createTable(desc);
390      }
391      LOG.info("Table " + desc + " created");
392    }
393    return admin.tableExists(tableName);
394  }
395
396  /**
397   * Create an HTableDescriptor from provided TestOptions.
398   */
399  protected static HTableDescriptor getTableDescriptor(TestOptions opts) {
400    HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(opts.tableName));
401    for (int family = 0; family < opts.families; family++) {
402      byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
403      HColumnDescriptor familyDesc = new HColumnDescriptor(familyName);
404      familyDesc.setDataBlockEncoding(opts.blockEncoding);
405      familyDesc.setCompressionType(opts.compression);
406      familyDesc.setBloomFilterType(opts.bloomType);
407      familyDesc.setBlocksize(opts.blockSize);
408      if (opts.inMemoryCF) {
409        familyDesc.setInMemory(true);
410      }
411      familyDesc.setInMemoryCompaction(opts.inMemoryCompaction);
412      tableDesc.addFamily(familyDesc);
413    }
414    if (opts.replicas != DEFAULT_OPTS.replicas) {
415      tableDesc.setRegionReplication(opts.replicas);
416    }
417    if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
418      tableDesc.setRegionSplitPolicyClassName(opts.splitPolicy);
419    }
420    return tableDesc;
421  }
422
423  /**
424   * generates splits based on total number of rows and specified split regions
425   */
426  protected static byte[][] getSplits(TestOptions opts) {
427    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions)
428      return null;
429
430    int numSplitPoints = opts.presplitRegions - 1;
431    byte[][] splits = new byte[numSplitPoints][];
432    int jump = opts.totalRows / opts.presplitRegions;
433    for (int i = 0; i < numSplitPoints; i++) {
434      int rowkey = jump * (1 + i);
435      splits[i] = format(rowkey);
436    }
437    return splits;
438  }
439
440  static void setupConnectionCount(final TestOptions opts) {
441    if (opts.oneCon) {
442      opts.connCount = 1;
443    } else {
444      if (opts.connCount == -1) {
445        // set to thread number if connCount is not set
446        opts.connCount = opts.numClientThreads;
447      }
448    }
449  }
450
451  /*
452   * Run all clients in this vm each to its own thread.
453   */
454  static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
455      throws IOException, InterruptedException, ExecutionException {
456    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
457    assert cmd != null;
458    @SuppressWarnings("unchecked")
459    Future<RunResult>[] threads = new Future[opts.numClientThreads];
460    RunResult[] results = new RunResult[opts.numClientThreads];
461    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
462      new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
463    setupConnectionCount(opts);
464    final Connection[] cons = new Connection[opts.connCount];
465    final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
466    for (int i = 0; i < opts.connCount; i++) {
467      cons[i] = ConnectionFactory.createConnection(conf);
468      asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
469    }
470    LOG.info("Created " + opts.connCount + " connections for " +
471        opts.numClientThreads + " threads");
472    for (int i = 0; i < threads.length; i++) {
473      final int index = i;
474      threads[i] = pool.submit(new Callable<RunResult>() {
475        @Override
476        public RunResult call() throws Exception {
477          TestOptions threadOpts = new TestOptions(opts);
478          final Connection con = cons[index % cons.length];
479          final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
480          if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
481          RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
482            @Override
483            public void setStatus(final String msg) throws IOException {
484              LOG.info(msg);
485            }
486          });
487          LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration +
488            "ms over " + threadOpts.perClientRunRows + " rows");
489          return run;
490        }
491      });
492    }
493    pool.shutdown();
494
495    for (int i = 0; i < threads.length; i++) {
496      try {
497        results[i] = threads[i].get();
498      } catch (ExecutionException e) {
499        throw new IOException(e.getCause());
500      }
501    }
502    final String test = cmd.getSimpleName();
503    LOG.info("[" + test + "] Summary of timings (ms): "
504             + Arrays.toString(results));
505    Arrays.sort(results);
506    long total = 0;
507    float avgLatency = 0 ;
508    float avgTPS = 0;
509    for (RunResult result : results) {
510      total += result.duration;
511      avgLatency += result.hist.getSnapshot().getMean();
512      avgTPS += opts.perClientRunRows * 1.0f / result.duration;
513    }
514    avgTPS *= 1000; // ms to second
515    avgLatency = avgLatency / results.length;
516    LOG.info("[" + test + " duration ]"
517      + "\tMin: " + results[0] + "ms"
518      + "\tMax: " + results[results.length - 1] + "ms"
519      + "\tAvg: " + (total / results.length) + "ms");
520    LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
521    LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
522    for (int i = 0; i < opts.connCount; i++) {
523      cons[i].close();
524      asyncCons[i].close();
525    }
526
527
528    return results;
529  }
530
531  /*
532   * Run a mapreduce job.  Run as many maps as asked-for clients.
533   * Before we start up the job, write out an input file with instruction
534   * per client regards which row they are to start on.
535   * @param cmd Command to run.
536   * @throws IOException
537   */
538  static Job doMapReduce(TestOptions opts, final Configuration conf)
539      throws IOException, InterruptedException, ClassNotFoundException {
540    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
541    assert cmd != null;
542    Path inputDir = writeInputFile(conf, opts);
543    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
544    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
545    Job job = Job.getInstance(conf);
546    job.setJarByClass(PerformanceEvaluation.class);
547    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
548
549    job.setInputFormatClass(NLineInputFormat.class);
550    NLineInputFormat.setInputPaths(job, inputDir);
551    // this is default, but be explicit about it just in case.
552    NLineInputFormat.setNumLinesPerSplit(job, 1);
553
554    job.setOutputKeyClass(LongWritable.class);
555    job.setOutputValueClass(LongWritable.class);
556
557    job.setMapperClass(EvaluationMapTask.class);
558    job.setReducerClass(LongSumReducer.class);
559
560    job.setNumReduceTasks(1);
561
562    job.setOutputFormatClass(TextOutputFormat.class);
563    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
564
565    TableMapReduceUtil.addDependencyJars(job);
566    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
567      Histogram.class,     // yammer metrics
568      Gson.class,  // gson
569      FilterAllFilter.class // hbase-server tests jar
570      );
571
572    TableMapReduceUtil.initCredentials(job);
573
574    job.waitForCompletion(true);
575    return job;
576  }
577
  /**
   * Name of the MapReduce input file written by writeInputFile into the job's input directory.
   * It holds one JSON-serialized TestOptions line per client. doMapReduce reads it with
   * NLineInputFormat at one line per split, so each client runs in its own map task.
   */
  static String JOB_INPUT_FILENAME = "input.txt";
583
584  /*
585   * Write input file of offsets-per-client for the mapreduce job.
586   * @param c Configuration
587   * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
588   * @throws IOException
589   */
590  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
591    return writeInputFile(c, opts, new Path("."));
592  }
593
594  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
595  throws IOException {
596    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
597    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
598    Path inputDir = new Path(jobdir, "inputs");
599
600    FileSystem fs = FileSystem.get(c);
601    fs.mkdirs(inputDir);
602
603    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
604    PrintStream out = new PrintStream(fs.create(inputFile));
605    // Make input random.
606    Map<Integer, String> m = new TreeMap<>();
607    Hash h = MurmurHash.getInstance();
608    int perClientRows = (opts.totalRows / opts.numClientThreads);
609    try {
610      for (int j = 0; j < opts.numClientThreads; j++) {
611        TestOptions next = new TestOptions(opts);
612        next.startRow = j * perClientRows;
613        next.perClientRunRows = perClientRows;
614        String s = GSON.toJson(next);
615        LOG.info("Client=" + j + ", input=" + s);
616        byte[] b = Bytes.toBytes(s);
617        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
618        m.put(hash, s);
619      }
620      for (Map.Entry<Integer, String> e: m.entrySet()) {
621        out.println(e.getValue());
622      }
623    } finally {
624      out.close();
625    }
626    return inputDir;
627  }
628
629  /**
630   * Describes a command.
631   */
632  static class CmdDescriptor {
633    private Class<? extends TestBase> cmdClass;
634    private String name;
635    private String description;
636
637    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
638      this.cmdClass = cmdClass;
639      this.name = name;
640      this.description = description;
641    }
642
643    public Class<? extends TestBase> getCmdClass() {
644      return cmdClass;
645    }
646
647    public String getName() {
648      return name;
649    }
650
651    public String getDescription() {
652      return description;
653    }
654  }
655
656  /**
657   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}.
658   * This makes tracking all these arguments a little easier.
659   * NOTE: ADDING AN OPTION, you need to add a data member, a getter/setter (to make JSON
660   * serialization of this TestOptions class behave), and you need to add to the clone constructor
661   * below copying your new option from the 'that' to the 'this'.  Look for 'clone' below.
662   */
663  static class TestOptions {
    // Name of the command to run; resolved by determineCommandClass and inspected by checkTable.
    String cmdName = null;
    // NOTE(review): presumably reflects --nomapred (local threads instead of MapReduce) — confirm.
    boolean nomapred = false;
    boolean filterAll = false;
    // First row of this client's slice; doLocalClients derives it from the client index when 0.
    int startRow = 0;
    float size = 1.0f;
    // Rows processed by a single client.
    int perClientRunRows = DEFAULT_ROWS_PER_GB;
    // Number of clients: local threads, or lines/map tasks in MapReduce mode.
    int numClientThreads = 1;
    // Total rows; divided across clients by writeInputFile and used to place pre-split keys.
    int totalRows = DEFAULT_ROWS_PER_GB;
    int measureAfter = 0;
    float sampleRate = 1.0f;
    double traceRate = 0.0;
    // Name of the test table.
    String tableName = TABLE_NAME;
    boolean flushCommits = true;
    boolean writeToWAL = true;
    boolean autoFlush = false;
    // When true, setupConnectionCount forces connCount to 1.
    boolean oneCon = false;
    // -1 means unset; setupConnectionCount then uses one connection per client thread.
    int connCount = -1; //wil decide the actual num later
    boolean useTags = false;
    int noOfTags = 1;
    boolean reportLatency = false;
    int multiGet = 0;
    int multiPut = 0;
    int randomSleep = 0;
    // Table layout options consumed by getTableDescriptor / getSplits / checkTable.
    boolean inMemoryCF = false;
    int presplitRegions = 0;
    int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION;
    String splitPolicy = null;
    Compression.Algorithm compression = Compression.Algorithm.NONE;
    BloomType bloomType = BloomType.ROW;
    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
    boolean valueRandom = false;
    boolean valueZipf = false;
    int valueSize = DEFAULT_VALUE_LENGTH;
    // Defaults to a tenth of the default perClientRunRows (or all of it when that is < 10).
    // Computed once at construction, so later changes to perClientRunRows do not update it.
    int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10;
    int cycles = 1;
    int columns = 1;
    // Number of column families, named FAMILY_NAME_BASE + index.
    int families = 1;
    int caching = 30;
    boolean addColumns = true;
    MemoryCompactionPolicy inMemoryCompaction =
        MemoryCompactionPolicy.valueOf(
            CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
    boolean asyncPrefetch = false;
    boolean cacheBlocks = true;
    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
    // 2 MiB. NOTE(review): presumably the client write buffer size — confirm where it is used.
    long bufferSize = 2l * 1024l * 1024l;
711
712    public TestOptions() {}
713
714    /**
715     * Clone constructor.
716     * @param that Object to copy from.
717     */
718    public TestOptions(TestOptions that) {
719      this.cmdName = that.cmdName;
720      this.cycles = that.cycles;
721      this.nomapred = that.nomapred;
722      this.startRow = that.startRow;
723      this.size = that.size;
724      this.perClientRunRows = that.perClientRunRows;
725      this.numClientThreads = that.numClientThreads;
726      this.totalRows = that.totalRows;
727      this.sampleRate = that.sampleRate;
728      this.traceRate = that.traceRate;
729      this.tableName = that.tableName;
730      this.flushCommits = that.flushCommits;
731      this.writeToWAL = that.writeToWAL;
732      this.autoFlush = that.autoFlush;
733      this.oneCon = that.oneCon;
734      this.connCount = that.connCount;
735      this.useTags = that.useTags;
736      this.noOfTags = that.noOfTags;
737      this.reportLatency = that.reportLatency;
738      this.multiGet = that.multiGet;
739      this.multiPut = that.multiPut;
740      this.inMemoryCF = that.inMemoryCF;
741      this.presplitRegions = that.presplitRegions;
742      this.replicas = that.replicas;
743      this.splitPolicy = that.splitPolicy;
744      this.compression = that.compression;
745      this.blockEncoding = that.blockEncoding;
746      this.filterAll = that.filterAll;
747      this.bloomType = that.bloomType;
748      this.blockSize = that.blockSize;
749      this.valueRandom = that.valueRandom;
750      this.valueZipf = that.valueZipf;
751      this.valueSize = that.valueSize;
752      this.period = that.period;
753      this.randomSleep = that.randomSleep;
754      this.measureAfter = that.measureAfter;
755      this.addColumns = that.addColumns;
756      this.columns = that.columns;
757      this.families = that.families;
758      this.caching = that.caching;
759      this.inMemoryCompaction = that.inMemoryCompaction;
760      this.asyncPrefetch = that.asyncPrefetch;
761      this.cacheBlocks = that.cacheBlocks;
762      this.scanReadType = that.scanReadType;
763      this.bufferSize = that.bufferSize;
764    }
765
766    public int getCaching() {
767      return this.caching;
768    }
769
770    public void setCaching(final int caching) {
771      this.caching = caching;
772    }
773
774    public int getColumns() {
775      return this.columns;
776    }
777
778    public void setColumns(final int columns) {
779      this.columns = columns;
780    }
781
782    public int getFamilies() {
783      return this.families;
784    }
785
786    public void setFamilies(final int families) {
787      this.families = families;
788    }
789
790    public int getCycles() {
791      return this.cycles;
792    }
793
794    public void setCycles(final int cycles) {
795      this.cycles = cycles;
796    }
797
798    public boolean isValueZipf() {
799      return valueZipf;
800    }
801
802    public void setValueZipf(boolean valueZipf) {
803      this.valueZipf = valueZipf;
804    }
805
806    public String getCmdName() {
807      return cmdName;
808    }
809
810    public void setCmdName(String cmdName) {
811      this.cmdName = cmdName;
812    }
813
814    public int getRandomSleep() {
815      return randomSleep;
816    }
817
818    public void setRandomSleep(int randomSleep) {
819      this.randomSleep = randomSleep;
820    }
821
822    public int getReplicas() {
823      return replicas;
824    }
825
826    public void setReplicas(int replicas) {
827      this.replicas = replicas;
828    }
829
830    public String getSplitPolicy() {
831      return splitPolicy;
832    }
833
834    public void setSplitPolicy(String splitPolicy) {
835      this.splitPolicy = splitPolicy;
836    }
837
838    public void setNomapred(boolean nomapred) {
839      this.nomapred = nomapred;
840    }
841
842    public void setFilterAll(boolean filterAll) {
843      this.filterAll = filterAll;
844    }
845
846    public void setStartRow(int startRow) {
847      this.startRow = startRow;
848    }
849
850    public void setSize(float size) {
851      this.size = size;
852    }
853
854    public void setPerClientRunRows(int perClientRunRows) {
855      this.perClientRunRows = perClientRunRows;
856    }
857
858    public void setNumClientThreads(int numClientThreads) {
859      this.numClientThreads = numClientThreads;
860    }
861
862    public void setTotalRows(int totalRows) {
863      this.totalRows = totalRows;
864    }
865
866    public void setSampleRate(float sampleRate) {
867      this.sampleRate = sampleRate;
868    }
869
870    public void setTraceRate(double traceRate) {
871      this.traceRate = traceRate;
872    }
873
874    public void setTableName(String tableName) {
875      this.tableName = tableName;
876    }
877
878    public void setFlushCommits(boolean flushCommits) {
879      this.flushCommits = flushCommits;
880    }
881
882    public void setWriteToWAL(boolean writeToWAL) {
883      this.writeToWAL = writeToWAL;
884    }
885
886    public void setAutoFlush(boolean autoFlush) {
887      this.autoFlush = autoFlush;
888    }
889
890    public void setOneCon(boolean oneCon) {
891      this.oneCon = oneCon;
892    }
893
894    public int getConnCount() {
895      return connCount;
896    }
897
898    public void setConnCount(int connCount) {
899      this.connCount = connCount;
900    }
901
902    public void setUseTags(boolean useTags) {
903      this.useTags = useTags;
904    }
905
906    public void setNoOfTags(int noOfTags) {
907      this.noOfTags = noOfTags;
908    }
909
910    public void setReportLatency(boolean reportLatency) {
911      this.reportLatency = reportLatency;
912    }
913
914    public void setMultiGet(int multiGet) {
915      this.multiGet = multiGet;
916    }
917
918    public void setMultiPut(int multiPut) {
919      this.multiPut = multiPut;
920    }
921
922    public void setInMemoryCF(boolean inMemoryCF) {
923      this.inMemoryCF = inMemoryCF;
924    }
925
926    public void setPresplitRegions(int presplitRegions) {
927      this.presplitRegions = presplitRegions;
928    }
929
930    public void setCompression(Compression.Algorithm compression) {
931      this.compression = compression;
932    }
933
934    public void setBloomType(BloomType bloomType) {
935      this.bloomType = bloomType;
936    }
937
938    public void setBlockSize(int blockSize) {
939      this.blockSize = blockSize;
940    }
941
942    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
943      this.blockEncoding = blockEncoding;
944    }
945
946    public void setValueRandom(boolean valueRandom) {
947      this.valueRandom = valueRandom;
948    }
949
950    public void setValueSize(int valueSize) {
951      this.valueSize = valueSize;
952    }
953
954    public void setBufferSize(long bufferSize) {
955      this.bufferSize = bufferSize;
956    }
957
958    public void setPeriod(int period) {
959      this.period = period;
960    }
961
962    public boolean isNomapred() {
963      return nomapred;
964    }
965
966    public boolean isFilterAll() {
967      return filterAll;
968    }
969
970    public int getStartRow() {
971      return startRow;
972    }
973
974    public float getSize() {
975      return size;
976    }
977
978    public int getPerClientRunRows() {
979      return perClientRunRows;
980    }
981
982    public int getNumClientThreads() {
983      return numClientThreads;
984    }
985
986    public int getTotalRows() {
987      return totalRows;
988    }
989
990    public float getSampleRate() {
991      return sampleRate;
992    }
993
994    public double getTraceRate() {
995      return traceRate;
996    }
997
998    public String getTableName() {
999      return tableName;
1000    }
1001
1002    public boolean isFlushCommits() {
1003      return flushCommits;
1004    }
1005
1006    public boolean isWriteToWAL() {
1007      return writeToWAL;
1008    }
1009
1010    public boolean isAutoFlush() {
1011      return autoFlush;
1012    }
1013
1014    public boolean isUseTags() {
1015      return useTags;
1016    }
1017
1018    public int getNoOfTags() {
1019      return noOfTags;
1020    }
1021
1022    public boolean isReportLatency() {
1023      return reportLatency;
1024    }
1025
1026    public int getMultiGet() {
1027      return multiGet;
1028    }
1029
1030    public int getMultiPut() {
1031      return multiPut;
1032    }
1033
1034    public boolean isInMemoryCF() {
1035      return inMemoryCF;
1036    }
1037
1038    public int getPresplitRegions() {
1039      return presplitRegions;
1040    }
1041
1042    public Compression.Algorithm getCompression() {
1043      return compression;
1044    }
1045
1046    public DataBlockEncoding getBlockEncoding() {
1047      return blockEncoding;
1048    }
1049
1050    public boolean isValueRandom() {
1051      return valueRandom;
1052    }
1053
1054    public int getValueSize() {
1055      return valueSize;
1056    }
1057
1058    public int getPeriod() {
1059      return period;
1060    }
1061
1062    public BloomType getBloomType() {
1063      return bloomType;
1064    }
1065
1066    public int getBlockSize() {
1067      return blockSize;
1068    }
1069
1070    public boolean isOneCon() {
1071      return oneCon;
1072    }
1073
1074    public int getMeasureAfter() {
1075      return measureAfter;
1076    }
1077
1078    public void setMeasureAfter(int measureAfter) {
1079      this.measureAfter = measureAfter;
1080    }
1081
1082    public boolean getAddColumns() {
1083      return addColumns;
1084    }
1085
1086    public void setAddColumns(boolean addColumns) {
1087      this.addColumns = addColumns;
1088    }
1089
1090    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
1091      this.inMemoryCompaction = inMemoryCompaction;
1092    }
1093
1094    public MemoryCompactionPolicy getInMemoryCompaction() {
1095      return this.inMemoryCompaction;
1096    }
1097
1098    public long getBufferSize() {
1099      return this.bufferSize;
1100    }
1101  }
1102
1103  /*
1104   * A test.
1105   * Subclass to particularize what happens per row.
1106   */
1107  static abstract class TestBase {
1108    // Below is make it so when Tests are all running in the one
1109    // jvm, that they each have a differently seeded Random.
1110    private static final Random randomSeed = new Random(System.currentTimeMillis());
1111
1112    private static long nextRandomSeed() {
1113      return randomSeed.nextLong();
1114    }
1115    private final int everyN;
1116
1117    protected final Random rand = new Random(nextRandomSeed());
1118    protected final Configuration conf;
1119    protected final TestOptions opts;
1120
1121    private final Status status;
1122    private final Sampler traceSampler;
1123    private final SpanReceiverHost receiverHost;
1124
1125    private String testName;
1126    private Histogram latencyHistogram;
1127    private Histogram valueSizeHistogram;
1128    private Histogram rpcCallsHistogram;
1129    private Histogram remoteRpcCallsHistogram;
1130    private Histogram millisBetweenNextHistogram;
1131    private Histogram regionsScannedHistogram;
1132    private Histogram bytesInResultsHistogram;
1133    private Histogram bytesInRemoteResultsHistogram;
1134    private RandomDistribution.Zipf zipf;
1135
1136    /**
1137     * Note that all subclasses of this class must provide a public constructor
1138     * that has the exact same list of arguments.
1139     */
1140    TestBase(final Configuration conf, final TestOptions options, final Status status) {
1141      this.conf = conf;
1142      this.receiverHost = this.conf == null? null: SpanReceiverHost.getInstance(conf);
1143      this.opts = options;
1144      this.status = status;
1145      this.testName = this.getClass().getSimpleName();
1146      if (options.traceRate >= 1.0) {
1147        this.traceSampler = Sampler.ALWAYS;
1148      } else if (options.traceRate > 0.0) {
1149        conf.setDouble("hbase.sampler.fraction", options.traceRate);
1150        this.traceSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf));
1151      } else {
1152        this.traceSampler = Sampler.NEVER;
1153      }
1154      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
1155      if (options.isValueZipf()) {
1156        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
1157      }
1158      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
1159    }
1160
1161    int getValueLength(final Random r) {
1162      if (this.opts.isValueRandom()) {
1163        return r.nextInt(opts.valueSize);
1164      } else if (this.opts.isValueZipf()) {
1165        return Math.abs(this.zipf.nextInt());
1166      } else {
1167        return opts.valueSize;
1168      }
1169    }
1170
1171    void updateValueSize(final Result [] rs) throws IOException {
1172      if (rs == null || !isRandomValueSize()) return;
1173      for (Result r: rs) updateValueSize(r);
1174    }
1175
1176    void updateValueSize(final Result r) throws IOException {
1177      if (r == null || !isRandomValueSize()) return;
1178      int size = 0;
1179      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1180        size += scanner.current().getValueLength();
1181      }
1182      updateValueSize(size);
1183    }
1184
1185    void updateValueSize(final int valueSize) {
1186      if (!isRandomValueSize()) return;
1187      this.valueSizeHistogram.update(valueSize);
1188    }
1189
1190    void updateScanMetrics(final ScanMetrics metrics) {
1191      if (metrics == null) return;
1192      Map<String,Long> metricsMap = metrics.getMetricsMap();
1193      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
1194      if (rpcCalls != null) {
1195        this.rpcCallsHistogram.update(rpcCalls.longValue());
1196      }
1197      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
1198      if (remoteRpcCalls != null) {
1199        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
1200      }
1201      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
1202      if (millisBetweenNext != null) {
1203        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
1204      }
1205      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
1206      if (regionsScanned != null) {
1207        this.regionsScannedHistogram.update(regionsScanned.longValue());
1208      }
1209      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
1210      if (bytesInResults != null && bytesInResults.longValue() > 0) {
1211        this.bytesInResultsHistogram.update(bytesInResults.longValue());
1212      }
1213      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
1214      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
1215        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
1216      }
1217    }
1218
1219    String generateStatus(final int sr, final int i, final int lr) {
1220      return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
1221        (!isRandomValueSize()? "": ", value size " + getShortValueSizeReport());
1222    }
1223
1224    boolean isRandomValueSize() {
1225      return opts.valueRandom;
1226    }
1227
1228    protected int getReportingPeriod() {
1229      return opts.period;
1230    }
1231
1232    /**
1233     * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1234     */
1235    public Histogram getLatencyHistogram() {
1236      return latencyHistogram;
1237    }
1238
1239    void testSetup() throws IOException {
1240      // test metrics
1241      latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1242      valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1243      // scan metrics
1244      rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1245      remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1246      millisBetweenNextHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1247      regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1248      bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1249      bytesInRemoteResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1250
1251      onStartup();
1252    }
1253
1254    abstract void onStartup() throws IOException;
1255
1256    void testTakedown() throws IOException {
1257      onTakedown();
1258      // Print all stats for this thread continuously.
1259      // Synchronize on Test.class so different threads don't intermingle the
1260      // output. We can't use 'this' here because each thread has its own instance of Test class.
1261      synchronized (Test.class) {
1262        status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
1263        status.setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(
1264            latencyHistogram));
1265        status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
1266        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
1267        if (valueSizeHistogram.getCount() > 0) {
1268          status.setStatus("ValueSize (bytes) : "
1269              + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
1270          status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
1271          status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
1272        } else {
1273          status.setStatus("No valueSize statistics available");
1274        }
1275        if (rpcCallsHistogram.getCount() > 0) {
1276          status.setStatus("rpcCalls (count): " +
1277              YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
1278        }
1279        if (remoteRpcCallsHistogram.getCount() > 0) {
1280          status.setStatus("remoteRpcCalls (count): " +
1281              YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
1282        }
1283        if (millisBetweenNextHistogram.getCount() > 0) {
1284          status.setStatus("millisBetweenNext (latency): " +
1285              YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
1286        }
1287        if (regionsScannedHistogram.getCount() > 0) {
1288          status.setStatus("regionsScanned (count): " +
1289              YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
1290        }
1291        if (bytesInResultsHistogram.getCount() > 0) {
1292          status.setStatus("bytesInResults (size): " +
1293              YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
1294        }
1295        if (bytesInRemoteResultsHistogram.getCount() > 0) {
1296          status.setStatus("bytesInRemoteResults (size): " +
1297              YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
1298        }
1299      }
1300      receiverHost.closeReceivers();
1301    }
1302
1303    abstract void onTakedown() throws IOException;
1304
1305
1306    /*
1307     * Run test
1308     * @return Elapsed time.
1309     * @throws IOException
1310     */
1311    long test() throws IOException, InterruptedException {
1312      testSetup();
1313      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1314      final long startTime = System.nanoTime();
1315      try {
1316        testTimed();
1317      } finally {
1318        testTakedown();
1319      }
1320      return (System.nanoTime() - startTime) / 1000000;
1321    }
1322
1323    int getStartRow() {
1324      return opts.startRow;
1325    }
1326
1327    int getLastRow() {
1328      return getStartRow() + opts.perClientRunRows;
1329    }
1330
1331    /**
1332     * Provides an extension point for tests that don't want a per row invocation.
1333     */
1334    void testTimed() throws IOException, InterruptedException {
1335      int startRow = getStartRow();
1336      int lastRow = getLastRow();
1337      TraceUtil.addSampler(traceSampler);
1338      // Report on completion of 1/10th of total.
1339      for (int ii = 0; ii < opts.cycles; ii++) {
1340        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
1341        for (int i = startRow; i < lastRow; i++) {
1342          if (i % everyN != 0) continue;
1343          long startTime = System.nanoTime();
1344          boolean requestSent = false;
1345          try (TraceScope scope = TraceUtil.createTrace("test row");){
1346            requestSent = testRow(i);
1347          }
1348          if ( (i - startRow) > opts.measureAfter) {
1349            // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
1350            // first 9 times and sends the actual get request in the 10th iteration.
1351            // We should only set latency when actual request is sent because otherwise
1352            // it turns out to be 0.
1353            if (requestSent) {
1354              latencyHistogram.update((System.nanoTime() - startTime) / 1000);
1355            }
1356            if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1357              status.setStatus(generateStatus(startRow, i, lastRow));
1358            }
1359          }
1360        }
1361      }
1362    }
1363
1364    /**
1365     * @return Subset of the histograms' calculation.
1366     */
1367    public String getShortLatencyReport() {
1368      return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
1369    }
1370
1371    /**
1372     * @return Subset of the histograms' calculation.
1373     */
1374    public String getShortValueSizeReport() {
1375      return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
1376    }
1377
1378
1379    /**
1380     * Test for individual row.
1381     * @param i Row index.
1382     * @return true if the row was sent to server and need to record metrics.
1383     *         False if not, multiGet and multiPut e.g., the rows are sent
1384     *         to server only if enough gets/puts are gathered.
1385     */
1386    abstract boolean testRow(final int i) throws IOException, InterruptedException;
1387  }
1388
1389  static abstract class Test extends TestBase {
1390    protected Connection connection;
1391
1392    Test(final Connection con, final TestOptions options, final Status status) {
1393      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1394      this.connection = con;
1395    }
1396  }
1397
1398  static abstract class AsyncTest extends TestBase {
1399    protected AsyncConnection connection;
1400
1401    AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
1402      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1403      this.connection = con;
1404    }
1405  }
1406
1407  static abstract class TableTest extends Test {
1408    protected Table table;
1409
1410    TableTest(Connection con, TestOptions options, Status status) {
1411      super(con, options, status);
1412    }
1413
1414    @Override
1415    void onStartup() throws IOException {
1416      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1417    }
1418
1419    @Override
1420    void onTakedown() throws IOException {
1421      table.close();
1422    }
1423  }
1424
1425  static abstract class AsyncTableTest extends AsyncTest {
1426    protected AsyncTable<?> table;
1427
1428    AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
1429      super(con, options, status);
1430    }
1431
1432    @Override
1433    void onStartup() throws IOException {
1434      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1435    }
1436
1437    @Override
1438    void onTakedown() throws IOException {
1439    }
1440  }
1441
1442  static class AsyncRandomReadTest extends AsyncTableTest {
1443    private final Consistency consistency;
1444    private ArrayList<Get> gets;
1445    private Random rd = new Random();
1446
1447    AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
1448      super(con, options, status);
1449      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1450      if (opts.multiGet > 0) {
1451        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1452        this.gets = new ArrayList<>(opts.multiGet);
1453      }
1454    }
1455
1456    @Override
1457    boolean testRow(final int i) throws IOException, InterruptedException {
1458      if (opts.randomSleep > 0) {
1459        Thread.sleep(rd.nextInt(opts.randomSleep));
1460      }
1461      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1462      for (int family = 0; family < opts.families; family++) {
1463        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1464        if (opts.addColumns) {
1465          for (int column = 0; column < opts.columns; column++) {
1466            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1467            get.addColumn(familyName, qualifier);
1468          }
1469        } else {
1470          get.addFamily(familyName);
1471        }
1472      }
1473      if (opts.filterAll) {
1474        get.setFilter(new FilterAllFilter());
1475      }
1476      get.setConsistency(consistency);
1477      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1478      try {
1479        if (opts.multiGet > 0) {
1480          this.gets.add(get);
1481          if (this.gets.size() == opts.multiGet) {
1482            Result[] rs =
1483                this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
1484            updateValueSize(rs);
1485            this.gets.clear();
1486          } else {
1487            return false;
1488          }
1489        } else {
1490          updateValueSize(this.table.get(get).get());
1491        }
1492      } catch (ExecutionException e) {
1493        throw new IOException(e);
1494      }
1495      return true;
1496    }
1497
1498    public static RuntimeException runtime(Throwable e) {
1499      if (e instanceof RuntimeException) {
1500        return (RuntimeException) e;
1501      }
1502      return new RuntimeException(e);
1503    }
1504
1505    public static <V> V propagate(Callable<V> callable) {
1506      try {
1507        return callable.call();
1508      } catch (Exception e) {
1509        throw runtime(e);
1510      }
1511    }
1512
1513    @Override
1514    protected int getReportingPeriod() {
1515      int period = opts.perClientRunRows / 10;
1516      return period == 0 ? opts.perClientRunRows : period;
1517    }
1518
1519    @Override
1520    protected void testTakedown() throws IOException {
1521      if (this.gets != null && this.gets.size() > 0) {
1522        this.table.get(gets);
1523        this.gets.clear();
1524      }
1525      super.testTakedown();
1526    }
1527  }
1528
1529  static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
1530
1531    AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
1532      super(con, options, status);
1533    }
1534
1535    @Override
1536    protected byte[] generateRow(final int i) {
1537      return getRandomRow(this.rand, opts.totalRows);
1538    }
1539  }
1540
1541  static class AsyncScanTest extends AsyncTableTest {
1542    private ResultScanner testScanner;
1543    private AsyncTable<?> asyncTable;
1544
1545    AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
1546      super(con, options, status);
1547    }
1548
1549    @Override
1550    void onStartup() throws IOException {
1551      this.asyncTable =
1552          connection.getTable(TableName.valueOf(opts.tableName),
1553            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
1554    }
1555
1556    @Override
1557    void testTakedown() throws IOException {
1558      if (this.testScanner != null) {
1559        updateScanMetrics(this.testScanner.getScanMetrics());
1560        this.testScanner.close();
1561      }
1562      super.testTakedown();
1563    }
1564
1565    @Override
1566    boolean testRow(final int i) throws IOException {
1567      if (this.testScanner == null) {
1568        Scan scan =
1569            new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1570                .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1571                .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1572        for (int family = 0; family < opts.families; family++) {
1573          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1574          if (opts.addColumns) {
1575            for (int column = 0; column < opts.columns; column++) {
1576              byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1577              scan.addColumn(familyName, qualifier);
1578            }
1579          } else {
1580            scan.addFamily(familyName);
1581          }
1582        }
1583        if (opts.filterAll) {
1584          scan.setFilter(new FilterAllFilter());
1585        }
1586        this.testScanner = asyncTable.getScanner(scan);
1587      }
1588      Result r = testScanner.next();
1589      updateValueSize(r);
1590      return true;
1591    }
1592  }
1593
1594  static class AsyncSequentialReadTest extends AsyncTableTest {
1595    AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
1596      super(con, options, status);
1597    }
1598
1599    @Override
1600    boolean testRow(final int i) throws IOException, InterruptedException {
1601      Get get = new Get(format(i));
1602      for (int family = 0; family < opts.families; family++) {
1603        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1604        if (opts.addColumns) {
1605          for (int column = 0; column < opts.columns; column++) {
1606            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1607            get.addColumn(familyName, qualifier);
1608          }
1609        } else {
1610          get.addFamily(familyName);
1611        }
1612      }
1613      if (opts.filterAll) {
1614        get.setFilter(new FilterAllFilter());
1615      }
1616      try {
1617        updateValueSize(table.get(get).get());
1618      } catch (ExecutionException e) {
1619        throw new IOException(e);
1620      }
1621      return true;
1622    }
1623  }
1624
1625  static class AsyncSequentialWriteTest extends AsyncTableTest {
1626    private ArrayList<Put> puts;
1627
1628    AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
1629      super(con, options, status);
1630      if (opts.multiPut > 0) {
1631        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
1632        this.puts = new ArrayList<>(opts.multiPut);
1633      }
1634    }
1635
1636    protected byte[] generateRow(final int i) {
1637      return format(i);
1638    }
1639
1640    @Override
1641    @SuppressWarnings("ReturnValueIgnored")
1642    boolean testRow(final int i) throws IOException, InterruptedException {
1643      byte[] row = generateRow(i);
1644      Put put = new Put(row);
1645      for (int family = 0; family < opts.families; family++) {
1646        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1647        for (int column = 0; column < opts.columns; column++) {
1648          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1649          byte[] value = generateData(this.rand, getValueLength(this.rand));
1650          if (opts.useTags) {
1651            byte[] tag = generateData(this.rand, TAG_LENGTH);
1652            Tag[] tags = new Tag[opts.noOfTags];
1653            for (int n = 0; n < opts.noOfTags; n++) {
1654              Tag t = new ArrayBackedTag((byte) n, tag);
1655              tags[n] = t;
1656            }
1657            KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
1658              value, tags);
1659            put.add(kv);
1660            updateValueSize(kv.getValueLength());
1661          } else {
1662            put.addColumn(familyName, qualifier, value);
1663            updateValueSize(value.length);
1664          }
1665        }
1666      }
1667      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1668      try {
1669        table.put(put).get();
1670        if (opts.multiPut > 0) {
1671          this.puts.add(put);
1672          if (this.puts.size() == opts.multiPut) {
1673            this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get));
1674            this.puts.clear();
1675          } else {
1676            return false;
1677          }
1678        } else {
1679          table.put(put).get();
1680        }
1681      } catch (ExecutionException e) {
1682        throw new IOException(e);
1683      }
1684      return true;
1685    }
1686  }
1687
1688  static abstract class BufferedMutatorTest extends Test {
1689    protected BufferedMutator mutator;
1690    protected Table table;
1691
1692    BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1693      super(con, options, status);
1694    }
1695
1696    @Override
1697    void onStartup() throws IOException {
1698      BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1699      p.writeBufferSize(opts.bufferSize);
1700      this.mutator = connection.getBufferedMutator(p);
1701      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1702    }
1703
1704    @Override
1705    void onTakedown() throws IOException {
1706      mutator.close();
1707      table.close();
1708    }
1709  }
1710
1711  static class RandomSeekScanTest extends TableTest {
1712    RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1713      super(con, options, status);
1714    }
1715
1716    @Override
1717    boolean testRow(final int i) throws IOException {
1718      Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
1719          .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
1720          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
1721          .setScanMetricsEnabled(true);
1722      FilterList list = new FilterList();
1723      for (int family = 0; family < opts.families; family++) {
1724        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1725        if (opts.addColumns) {
1726          for (int column = 0; column < opts.columns; column++) {
1727            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1728            scan.addColumn(familyName, qualifier);
1729          }
1730        } else {
1731          scan.addFamily(familyName);
1732        }
1733      }
1734      if (opts.filterAll) {
1735        list.addFilter(new FilterAllFilter());
1736      }
1737      list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1738      scan.setFilter(list);
1739      ResultScanner s = this.table.getScanner(scan);
1740      try {
1741        for (Result rr; (rr = s.next()) != null;) {
1742          updateValueSize(rr);
1743        }
1744      } finally {
1745        updateScanMetrics(s.getScanMetrics());
1746        s.close();
1747      }
1748      return true;
1749    }
1750
1751    @Override
1752    protected int getReportingPeriod() {
1753      int period = opts.perClientRunRows / 100;
1754      return period == 0 ? opts.perClientRunRows : period;
1755    }
1756
1757  }
1758
1759  static abstract class RandomScanWithRangeTest extends TableTest {
1760    RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1761      super(con, options, status);
1762    }
1763
1764    @Override
1765    boolean testRow(final int i) throws IOException {
1766      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1767      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
1768          .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
1769          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1770          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1771      for (int family = 0; family < opts.families; family++) {
1772        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1773        if (opts.addColumns) {
1774          for (int column = 0; column < opts.columns; column++) {
1775            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1776            scan.addColumn(familyName, qualifier);
1777          }
1778        } else {
1779          scan.addFamily(familyName);
1780        }
1781      }
1782      if (opts.filterAll) {
1783        scan.setFilter(new FilterAllFilter());
1784      }
1785      Result r = null;
1786      int count = 0;
1787      ResultScanner s = this.table.getScanner(scan);
1788      try {
1789        for (; (r = s.next()) != null;) {
1790          updateValueSize(r);
1791          count++;
1792        }
1793        if (i % 100 == 0) {
1794          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1795            Bytes.toString(startAndStopRow.getFirst()),
1796            Bytes.toString(startAndStopRow.getSecond()), count));
1797        }
1798      } finally {
1799        updateScanMetrics(s.getScanMetrics());
1800        s.close();
1801      }
1802      return true;
1803    }
1804
1805    protected abstract Pair<byte[],byte[]> getStartAndStopRow();
1806
1807    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
1808      int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
1809      int stop = start + maxRange;
1810      return new Pair<>(format(start), format(stop));
1811    }
1812
1813    @Override
1814    protected int getReportingPeriod() {
1815      int period = opts.perClientRunRows / 100;
1816      return period == 0? opts.perClientRunRows: period;
1817    }
1818  }
1819
1820  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1821    RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1822      super(con, options, status);
1823    }
1824
1825    @Override
1826    protected Pair<byte[], byte[]> getStartAndStopRow() {
1827      return generateStartAndStopRows(10);
1828    }
1829  }
1830
1831  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1832    RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1833      super(con, options, status);
1834    }
1835
1836    @Override
1837    protected Pair<byte[], byte[]> getStartAndStopRow() {
1838      return generateStartAndStopRows(100);
1839    }
1840  }
1841
1842  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1843    RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1844      super(con, options, status);
1845    }
1846
1847    @Override
1848    protected Pair<byte[], byte[]> getStartAndStopRow() {
1849      return generateStartAndStopRows(1000);
1850    }
1851  }
1852
1853  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1854    RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1855      super(con, options, status);
1856    }
1857
1858    @Override
1859    protected Pair<byte[], byte[]> getStartAndStopRow() {
1860      return generateStartAndStopRows(10000);
1861    }
1862  }
1863
1864  static class RandomReadTest extends TableTest {
1865    private final Consistency consistency;
1866    private ArrayList<Get> gets;
1867    private Random rd = new Random();
1868
1869    RandomReadTest(Connection con, TestOptions options, Status status) {
1870      super(con, options, status);
1871      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1872      if (opts.multiGet > 0) {
1873        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1874        this.gets = new ArrayList<>(opts.multiGet);
1875      }
1876    }
1877
1878    @Override
1879    boolean testRow(final int i) throws IOException, InterruptedException {
1880      if (opts.randomSleep > 0) {
1881        Thread.sleep(rd.nextInt(opts.randomSleep));
1882      }
1883      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1884      for (int family = 0; family < opts.families; family++) {
1885        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1886        if (opts.addColumns) {
1887          for (int column = 0; column < opts.columns; column++) {
1888            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1889            get.addColumn(familyName, qualifier);
1890          }
1891        } else {
1892          get.addFamily(familyName);
1893        }
1894      }
1895      if (opts.filterAll) {
1896        get.setFilter(new FilterAllFilter());
1897      }
1898      get.setConsistency(consistency);
1899      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1900      if (opts.multiGet > 0) {
1901        this.gets.add(get);
1902        if (this.gets.size() == opts.multiGet) {
1903          Result [] rs = this.table.get(this.gets);
1904          updateValueSize(rs);
1905          this.gets.clear();
1906        } else {
1907          return false;
1908        }
1909      } else {
1910        updateValueSize(this.table.get(get));
1911      }
1912      return true;
1913    }
1914
1915    @Override
1916    protected int getReportingPeriod() {
1917      int period = opts.perClientRunRows / 10;
1918      return period == 0 ? opts.perClientRunRows : period;
1919    }
1920
1921    @Override
1922    protected void testTakedown() throws IOException {
1923      if (this.gets != null && this.gets.size() > 0) {
1924        this.table.get(gets);
1925        this.gets.clear();
1926      }
1927      super.testTakedown();
1928    }
1929  }
1930
1931  static class RandomWriteTest extends SequentialWriteTest {
1932    RandomWriteTest(Connection con, TestOptions options, Status status) {
1933      super(con, options, status);
1934    }
1935
1936    @Override
1937    protected byte[] generateRow(final int i) {
1938      return getRandomRow(this.rand, opts.totalRows);
1939    }
1940
1941
1942  }
1943
1944  static class ScanTest extends TableTest {
1945    private ResultScanner testScanner;
1946
1947    ScanTest(Connection con, TestOptions options, Status status) {
1948      super(con, options, status);
1949    }
1950
1951    @Override
1952    void testTakedown() throws IOException {
1953      if (this.testScanner != null) {
1954        this.testScanner.close();
1955      }
1956      super.testTakedown();
1957    }
1958
1959
1960    @Override
1961    boolean testRow(final int i) throws IOException {
1962      if (this.testScanner == null) {
1963        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1964            .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1965            .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1966        for (int family = 0; family < opts.families; family++) {
1967          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1968          if (opts.addColumns) {
1969            for (int column = 0; column < opts.columns; column++) {
1970              byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1971              scan.addColumn(familyName, qualifier);
1972            }
1973          } else {
1974            scan.addFamily(familyName);
1975          }
1976        }
1977        if (opts.filterAll) {
1978          scan.setFilter(new FilterAllFilter());
1979        }
1980        this.testScanner = table.getScanner(scan);
1981      }
1982      Result r = testScanner.next();
1983      updateValueSize(r);
1984      return true;
1985    }
1986  }
1987
1988  /**
1989   * Base class for operations that are CAS-like; that read a value and then set it based off what
1990   * they read. In this category is increment, append, checkAndPut, etc.
1991   *
1992   * <p>These operations also want some concurrency going on. Usually when these tests run, they
1993   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
1994   * same key space. We do this with our getStartRow and getLastRow overrides.
1995   */
1996  static abstract class CASTableTest extends TableTest {
1997    private final byte [] qualifier;
1998    CASTableTest(Connection con, TestOptions options, Status status) {
1999      super(con, options, status);
2000      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
2001    }
2002
2003    byte [] getQualifier() {
2004      return this.qualifier;
2005    }
2006
2007    @Override
2008    int getStartRow() {
2009      return 0;
2010    }
2011
2012    @Override
2013    int getLastRow() {
2014      return opts.perClientRunRows;
2015    }
2016  }
2017
2018  static class IncrementTest extends CASTableTest {
2019    IncrementTest(Connection con, TestOptions options, Status status) {
2020      super(con, options, status);
2021    }
2022
2023    @Override
2024    boolean testRow(final int i) throws IOException {
2025      Increment increment = new Increment(format(i));
2026      // unlike checkAndXXX tests, which make most sense to do on a single value,
2027      // if multiple families are specified for an increment test we assume it is
2028      // meant to raise the work factor
2029      for (int family = 0; family < opts.families; family++) {
2030        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2031        increment.addColumn(familyName, getQualifier(), 1l);
2032      }
2033      updateValueSize(this.table.increment(increment));
2034      return true;
2035    }
2036  }
2037
2038  static class AppendTest extends CASTableTest {
2039    AppendTest(Connection con, TestOptions options, Status status) {
2040      super(con, options, status);
2041    }
2042
2043    @Override
2044    boolean testRow(final int i) throws IOException {
2045      byte [] bytes = format(i);
2046      Append append = new Append(bytes);
2047      // unlike checkAndXXX tests, which make most sense to do on a single value,
2048      // if multiple families are specified for an append test we assume it is
2049      // meant to raise the work factor
2050      for (int family = 0; family < opts.families; family++) {
2051        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2052        append.addColumn(familyName, getQualifier(), bytes);
2053      }
2054      updateValueSize(this.table.append(append));
2055      return true;
2056    }
2057  }
2058
2059  static class CheckAndMutateTest extends CASTableTest {
2060    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
2061      super(con, options, status);
2062    }
2063
2064    @Override
2065    boolean testRow(final int i) throws IOException {
2066      final byte [] bytes = format(i);
2067      // checkAndXXX tests operate on only a single value
2068      // Put a known value so when we go to check it, it is there.
2069      Put put = new Put(bytes);
2070      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2071      this.table.put(put);
2072      RowMutations mutations = new RowMutations(bytes);
2073      mutations.add(put);
2074      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2075          .ifEquals(bytes).thenMutate(mutations);
2076      return true;
2077    }
2078  }
2079
2080  static class CheckAndPutTest extends CASTableTest {
2081    CheckAndPutTest(Connection con, TestOptions options, Status status) {
2082      super(con, options, status);
2083    }
2084
2085    @Override
2086    boolean testRow(final int i) throws IOException {
2087      final byte [] bytes = format(i);
2088      // checkAndXXX tests operate on only a single value
2089      // Put a known value so when we go to check it, it is there.
2090      Put put = new Put(bytes);
2091      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2092      this.table.put(put);
2093      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2094          .ifEquals(bytes).thenPut(put);
2095      return true;
2096    }
2097  }
2098
2099  static class CheckAndDeleteTest extends CASTableTest {
2100    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
2101      super(con, options, status);
2102    }
2103
2104    @Override
2105    boolean testRow(final int i) throws IOException {
2106      final byte [] bytes = format(i);
2107      // checkAndXXX tests operate on only a single value
2108      // Put a known value so when we go to check it, it is there.
2109      Put put = new Put(bytes);
2110      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2111      this.table.put(put);
2112      Delete delete = new Delete(put.getRow());
2113      delete.addColumn(FAMILY_ZERO, getQualifier());
2114      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2115          .ifEquals(bytes).thenDelete(delete);
2116      return true;
2117    }
2118  }
2119
2120  static class SequentialReadTest extends TableTest {
2121    SequentialReadTest(Connection con, TestOptions options, Status status) {
2122      super(con, options, status);
2123    }
2124
2125    @Override
2126    boolean testRow(final int i) throws IOException {
2127      Get get = new Get(format(i));
2128      for (int family = 0; family < opts.families; family++) {
2129        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2130        if (opts.addColumns) {
2131          for (int column = 0; column < opts.columns; column++) {
2132            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2133            get.addColumn(familyName, qualifier);
2134          }
2135        } else {
2136          get.addFamily(familyName);
2137        }
2138      }
2139      if (opts.filterAll) {
2140        get.setFilter(new FilterAllFilter());
2141      }
2142      updateValueSize(table.get(get));
2143      return true;
2144    }
2145  }
2146
2147  static class SequentialWriteTest extends BufferedMutatorTest {
2148    private ArrayList<Put> puts;
2149
2150
2151    SequentialWriteTest(Connection con, TestOptions options, Status status) {
2152      super(con, options, status);
2153      if (opts.multiPut > 0) {
2154        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
2155        this.puts = new ArrayList<>(opts.multiPut);
2156      }
2157    }
2158
2159    protected byte[] generateRow(final int i) {
2160      return format(i);
2161    }
2162
2163    @Override
2164    boolean testRow(final int i) throws IOException {
2165      byte[] row = generateRow(i);
2166      Put put = new Put(row);
2167      for (int family = 0; family < opts.families; family++) {
2168        byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
2169        for (int column = 0; column < opts.columns; column++) {
2170          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2171          byte[] value = generateData(this.rand, getValueLength(this.rand));
2172          if (opts.useTags) {
2173            byte[] tag = generateData(this.rand, TAG_LENGTH);
2174            Tag[] tags = new Tag[opts.noOfTags];
2175            for (int n = 0; n < opts.noOfTags; n++) {
2176              Tag t = new ArrayBackedTag((byte) n, tag);
2177              tags[n] = t;
2178            }
2179            KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
2180              value, tags);
2181            put.add(kv);
2182            updateValueSize(kv.getValueLength());
2183          } else {
2184            put.addColumn(familyName, qualifier, value);
2185            updateValueSize(value.length);
2186          }
2187        }
2188      }
2189      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2190      if (opts.autoFlush) {
2191        if (opts.multiPut > 0) {
2192          this.puts.add(put);
2193          if (this.puts.size() == opts.multiPut) {
2194            table.put(this.puts);
2195            this.puts.clear();
2196          } else {
2197            return false;
2198          }
2199        } else {
2200          table.put(put);
2201        }
2202      } else {
2203        mutator.mutate(put);
2204      }
2205      return true;
2206    }
2207  }
2208
2209  static class FilteredScanTest extends TableTest {
2210    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
2211
2212    FilteredScanTest(Connection con, TestOptions options, Status status) {
2213      super(con, options, status);
2214      if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) {
2215        LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB +
2216                ". This could take a very long time.");
2217      }
2218    }
2219
2220    @Override
2221    boolean testRow(int i) throws IOException {
2222      byte[] value = generateData(this.rand, getValueLength(this.rand));
2223      Scan scan = constructScan(value);
2224      ResultScanner scanner = null;
2225      try {
2226        scanner = this.table.getScanner(scan);
2227        for (Result r = null; (r = scanner.next()) != null;) {
2228          updateValueSize(r);
2229        }
2230      } finally {
2231        if (scanner != null) {
2232          updateScanMetrics(scanner.getScanMetrics());
2233          scanner.close();
2234        }
2235      }
2236      return true;
2237    }
2238
2239    protected Scan constructScan(byte[] valuePrefix) throws IOException {
2240      FilterList list = new FilterList();
2241      Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO,
2242        CompareOperator.EQUAL, new BinaryComparator(valuePrefix));
2243      list.addFilter(filter);
2244      if (opts.filterAll) {
2245        list.addFilter(new FilterAllFilter());
2246      }
2247      Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2248          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2249          .setScanMetricsEnabled(true);
2250      if (opts.addColumns) {
2251        for (int column = 0; column < opts.columns; column++) {
2252          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2253          scan.addColumn(FAMILY_ZERO, qualifier);
2254        }
2255      } else {
2256        scan.addFamily(FAMILY_ZERO);
2257      }
2258      scan.setFilter(list);
2259      return scan;
2260    }
2261  }
2262
2263  /**
2264   * Compute a throughput rate in MB/s.
2265   * @param rows Number of records consumed.
2266   * @param timeMs Time taken in milliseconds.
2267   * @return String value with label, ie '123.76 MB/s'
2268   */
2269  private static String calculateMbps(int rows, long timeMs, final int valueSize, int families, int columns) {
2270    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH +
2271      ((valueSize + (FAMILY_NAME_BASE.length()+1) + COLUMN_ZERO.length) * columns) * families);
2272    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
2273      .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
2274      .divide(BYTES_PER_MB, CXT);
2275    return FMT.format(mbps) + " MB/s";
2276  }
2277
2278  /*
2279   * Format passed integer.
2280   * @param number
2281   * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed
2282   * number (Does absolute in case number is negative).
2283   */
2284  public static byte [] format(final int number) {
2285    byte [] b = new byte[ROW_LENGTH];
2286    int d = Math.abs(number);
2287    for (int i = b.length - 1; i >= 0; i--) {
2288      b[i] = (byte)((d % 10) + '0');
2289      d /= 10;
2290    }
2291    return b;
2292  }
2293
2294  /*
2295   * This method takes some time and is done inline uploading data.  For
2296   * example, doing the mapfile test, generation of the key and value
2297   * consumes about 30% of CPU time.
2298   * @return Generated random value to insert into a table cell.
2299   */
2300  public static byte[] generateData(final Random r, int length) {
2301    byte [] b = new byte [length];
2302    int i;
2303
2304    for(i = 0; i < (length-8); i += 8) {
2305      b[i] = (byte) (65 + r.nextInt(26));
2306      b[i+1] = b[i];
2307      b[i+2] = b[i];
2308      b[i+3] = b[i];
2309      b[i+4] = b[i];
2310      b[i+5] = b[i];
2311      b[i+6] = b[i];
2312      b[i+7] = b[i];
2313    }
2314
2315    byte a = (byte) (65 + r.nextInt(26));
2316    for(; i < length; i++) {
2317      b[i] = a;
2318    }
2319    return b;
2320  }
2321
2322  static byte [] getRandomRow(final Random random, final int totalRows) {
2323    return format(generateRandomRow(random, totalRows));
2324  }
2325
2326  static int generateRandomRow(final Random random, final int totalRows) {
2327    return random.nextInt(Integer.MAX_VALUE) % totalRows;
2328  }
2329
2330  static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
2331      Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
2332      throws IOException, InterruptedException {
2333    status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for "
2334        + opts.perClientRunRows + " rows");
2335    long totalElapsedTime;
2336
2337    final TestBase t;
2338    try {
2339      if (AsyncTest.class.isAssignableFrom(cmd)) {
2340        Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
2341        Constructor<? extends AsyncTest> constructor =
2342            newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
2343        t = constructor.newInstance(asyncCon, opts, status);
2344      } else {
2345        Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
2346        Constructor<? extends Test> constructor =
2347            newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
2348        t = constructor.newInstance(con, opts, status);
2349      }
2350    } catch (NoSuchMethodException e) {
2351      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
2352          + ".  It does not provide a constructor as described by "
2353          + "the javadoc comment.  Available constructors are: "
2354          + Arrays.toString(cmd.getConstructors()));
2355    } catch (Exception e) {
2356      throw new IllegalStateException("Failed to construct command class", e);
2357    }
2358    totalElapsedTime = t.test();
2359
2360    status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
2361      "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
2362      " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
2363          getAverageValueLength(opts), opts.families, opts.columns) + ")");
2364
2365    return new RunResult(totalElapsedTime, t.getLatencyHistogram());
2366  }
2367
2368  private static int getAverageValueLength(final TestOptions opts) {
2369    return opts.valueRandom? opts.valueSize/2: opts.valueSize;
2370  }
2371
2372  private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) throws IOException,
2373      InterruptedException, ClassNotFoundException, ExecutionException {
2374    // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2375    // the TestOptions introspection for us and dump the output in a readable format.
2376    LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
2377    Admin admin = null;
2378    Connection connection = null;
2379    try {
2380      connection = ConnectionFactory.createConnection(getConf());
2381      admin = connection.getAdmin();
2382      checkTable(admin, opts);
2383    } finally {
2384      if (admin != null) admin.close();
2385      if (connection != null) connection.close();
2386    }
2387    if (opts.nomapred) {
2388      doLocalClients(opts, getConf());
2389    } else {
2390      doMapReduce(opts, getConf());
2391    }
2392  }
2393
2394  protected void printUsage() {
2395    printUsage(PE_COMMAND_SHORTNAME, null);
2396  }
2397
2398  protected static void printUsage(final String message) {
2399    printUsage(PE_COMMAND_SHORTNAME, message);
2400  }
2401
2402  protected static void printUsageAndExit(final String message, final int exitCode) {
2403    printUsage(message);
2404    System.exit(exitCode);
2405  }
2406
2407  protected static void printUsage(final String shortName, final String message) {
2408    if (message != null && message.length() > 0) {
2409      System.err.println(message);
2410    }
2411    System.err.print("Usage: hbase " + shortName);
2412    System.err.println("  <OPTIONS> [-D<property=value>]* <command> <nclients>");
2413    System.err.println();
2414    System.err.println("General Options:");
2415    System.err.println(" nomapred        Run multiple clients using threads " +
2416      "(rather than use mapreduce)");
2417    System.err.println(" oneCon          all the threads share the same connection. Default: False");
2418    System.err.println(" connCount          connections all threads share. "
2419        + "For example, if set to 2, then all thread share 2 connection. "
2420        + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, "
2421        + "if not, connCount=thread number");
2422
2423    System.err.println(" sampleRate      Execute test on a sample of total " +
2424      "rows. Only supported by randomRead. Default: 1.0");
2425    System.err.println(" period          Report every 'period' rows: " +
2426      "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows()/10);
2427    System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
2428    System.err.println(" traceRate       Enable HTrace spans. Initiate tracing every N rows. " +
2429      "Default: 0");
2430    System.err.println(" latency         Set to report operation latencies. Default: False");
2431    System.err.println(" measureAfter    Start to measure the latency once 'measureAfter'" +
2432        " rows have been treated. Default: 0");
2433    System.err.println(" valueSize       Pass value size to use: Default: "
2434        + DEFAULT_OPTS.getValueSize());
2435    System.err.println(" valueRandom     Set if we should vary value size between 0 and " +
2436        "'valueSize'; set on read for stats on size: Default: Not set.");
2437    System.err.println(" blockEncoding   Block encoding to use. Value should be one of "
2438        + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2439    System.err.println();
2440    System.err.println("Table Creation / Write Tests:");
2441    System.err.println(" table           Alternate table name. Default: 'TestTable'");
2442    System.err.println(" rows            Rows each client runs. Default: "
2443        + DEFAULT_OPTS.getPerClientRunRows()
2444        + ".  In case of randomReads and randomSeekScans this could"
2445        + " be specified along with --size to specify the number of rows to be scanned within"
2446        + " the total range specified by the size.");
2447    System.err.println(
2448      " size            Total size in GiB. Mutually exclusive with --rows for writes and scans"
2449          + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2450          + " use size to specify the end range and --rows"
2451          + " specifies the number of rows within that range. " + "Default: 1.0.");
2452    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2453    System.err.println(" flushCommits    Used to determine if the test should flush the table. " +
2454      "Default: false");
2455    System.err.println(" valueZipf       Set if we should vary value size between 0 and " +
2456        "'valueSize' in zipf form: Default: Not set.");
2457    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
2458    System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
2459    System.err.println(" multiPut        Batch puts together into groups of N. Only supported " +
2460        "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0");
2461    System.err.println(" presplit        Create presplit table. If a table with same name exists,"
2462        + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2463        + "Recommended for accurate perf analysis (see guide). Default: disabled");
2464    System.err.println(" usetags         Writes tags along with KVs. Use with HFile V3. " +
2465      "Default: false");
2466    System.err.println(" numoftags       Specify the no of tags that would be needed. " +
2467       "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2468    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
2469    System.err.println(" columns         Columns to write per row. Default: 1");
2470    System.err.println(" families        Specify number of column families for the table. Default: 1");
2471    System.err.println();
2472    System.err.println("Read Tests:");
2473    System.err.println(" filterAll       Helps to filter out all the rows on the server side"
2474        + " there by not returning any thing back to the client.  Helps to check the server side"
2475        + " performance.  Uses FilterAllFilter internally. ");
2476    System.err.println(" multiGet        Batch gets together into groups of N. Only supported " +
2477      "by randomRead. Default: disabled");
2478    System.err.println(" inmemory        Tries to keep the HFiles of the CF " +
2479      "inmemory as far as possible. Not guaranteed that reads are always served " +
2480      "from memory.  Default: false");
2481    System.err.println(" bloomFilter     Bloom filter type, one of "
2482        + Arrays.toString(BloomType.values()));
2483    System.err.println(" blockSize       Blocksize to use when writing out hfiles. ");
2484    System.err.println(" inmemoryCompaction  Makes the column family to do inmemory flushes/compactions. "
2485        + "Uses the CompactingMemstore");
2486    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
2487    System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
2488    System.err.println(" randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
2489    System.err.println(" caching         Scan caching to use. Default: 30");
2490    System.err.println(" asyncPrefetch   Enable asyncPrefetch for scan");
2491    System.err.println(" cacheBlocks     Set the cacheBlocks option for scan. Default: true");
2492    System.err.println(" scanReadType    Set the readType option for scan, stream/pread/default. Default: default");
2493    System.err.println(" bufferSize      Set the value of client side buffering. Default: 2MB");
2494    System.err.println();
2495    System.err.println(" Note: -D properties will be applied to the conf used. ");
2496    System.err.println("  For example: ");
2497    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
2498    System.err.println("   -Dmapreduce.task.timeout=60000");
2499    System.err.println();
2500    System.err.println("Command:");
2501    for (CmdDescriptor command : COMMANDS.values()) {
2502      System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
2503    }
2504    System.err.println();
2505    System.err.println("Args:");
2506    System.err.println(" nclients        Integer. Required. Total number of clients "
2507        + "(and HRegionServers) running. 1 <= value <= 500");
2508    System.err.println("Examples:");
2509    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
2510    System.err.println(" $ hbase " + shortName + " sequentialWrite 1");
2511    System.err.println(" To run 10 clients doing increments over ten rows:");
2512    System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10");
2513  }
2514
2515  /**
2516   * Parse options passed in via an arguments array. Assumes that array has been split
2517   * on white-space and placed into a {@code Queue}. Any unknown arguments will remain
2518   * in the queue at the conclusion of this method call. It's up to the caller to deal
2519   * with these unrecognized arguments.
2520   */
2521  static TestOptions parseOpts(Queue<String> args) {
2522    TestOptions opts = new TestOptions();
2523
2524    String cmd = null;
2525    while ((cmd = args.poll()) != null) {
2526      if (cmd.equals("-h") || cmd.startsWith("--h")) {
2527        // place item back onto queue so that caller knows parsing was incomplete
2528        args.add(cmd);
2529        break;
2530      }
2531
2532      final String nmr = "--nomapred";
2533      if (cmd.startsWith(nmr)) {
2534        opts.nomapred = true;
2535        continue;
2536      }
2537
2538      final String rows = "--rows=";
2539      if (cmd.startsWith(rows)) {
2540        opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
2541        continue;
2542      }
2543
2544      final String cycles = "--cycles=";
2545      if (cmd.startsWith(cycles)) {
2546        opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
2547        continue;
2548      }
2549
2550      final String sampleRate = "--sampleRate=";
2551      if (cmd.startsWith(sampleRate)) {
2552        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
2553        continue;
2554      }
2555
2556      final String table = "--table=";
2557      if (cmd.startsWith(table)) {
2558        opts.tableName = cmd.substring(table.length());
2559        continue;
2560      }
2561
2562      final String startRow = "--startRow=";
2563      if (cmd.startsWith(startRow)) {
2564        opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
2565        continue;
2566      }
2567
2568      final String compress = "--compress=";
2569      if (cmd.startsWith(compress)) {
2570        opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
2571        continue;
2572      }
2573
2574      final String traceRate = "--traceRate=";
2575      if (cmd.startsWith(traceRate)) {
2576        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
2577        continue;
2578      }
2579
2580      final String blockEncoding = "--blockEncoding=";
2581      if (cmd.startsWith(blockEncoding)) {
2582        opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
2583        continue;
2584      }
2585
2586      final String flushCommits = "--flushCommits=";
2587      if (cmd.startsWith(flushCommits)) {
2588        opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
2589        continue;
2590      }
2591
2592      final String writeToWAL = "--writeToWAL=";
2593      if (cmd.startsWith(writeToWAL)) {
2594        opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
2595        continue;
2596      }
2597
2598      final String presplit = "--presplit=";
2599      if (cmd.startsWith(presplit)) {
2600        opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
2601        continue;
2602      }
2603
2604      final String inMemory = "--inmemory=";
2605      if (cmd.startsWith(inMemory)) {
2606        opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
2607        continue;
2608      }
2609
2610      final String autoFlush = "--autoFlush=";
2611      if (cmd.startsWith(autoFlush)) {
2612        opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
2613        if (!opts.autoFlush && opts.multiPut > 0) {
2614          throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
2615        }
2616        continue;
2617      }
2618
2619      final String onceCon = "--oneCon=";
2620      if (cmd.startsWith(onceCon)) {
2621        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
2622        if (opts.oneCon && opts.connCount > 1) {
2623          throw new IllegalArgumentException("oneCon is set to true, "
2624              + "connCount should not bigger than 1");
2625        }
2626        continue;
2627      }
2628
2629      final String connCount = "--connCount=";
2630      if (cmd.startsWith(connCount)) {
2631        opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
2632        if (opts.oneCon && opts.connCount > 1) {
2633          throw new IllegalArgumentException("oneCon is set to true, "
2634              + "connCount should not bigger than 1");
2635        }
2636        continue;
2637      }
2638
2639      final String latency = "--latency";
2640      if (cmd.startsWith(latency)) {
2641        opts.reportLatency = true;
2642        continue;
2643      }
2644
2645      final String multiGet = "--multiGet=";
2646      if (cmd.startsWith(multiGet)) {
2647        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
2648        continue;
2649      }
2650
2651      final String multiPut = "--multiPut=";
2652      if (cmd.startsWith(multiPut)) {
2653        opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
2654        if (!opts.autoFlush && opts.multiPut > 0) {
2655          throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
2656        }
2657        continue;
2658      }
2659
2660      final String useTags = "--usetags=";
2661      if (cmd.startsWith(useTags)) {
2662        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
2663        continue;
2664      }
2665
2666      final String noOfTags = "--numoftags=";
2667      if (cmd.startsWith(noOfTags)) {
2668        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
2669        continue;
2670      }
2671
2672      final String replicas = "--replicas=";
2673      if (cmd.startsWith(replicas)) {
2674        opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
2675        continue;
2676      }
2677
2678      final String filterOutAll = "--filterAll";
2679      if (cmd.startsWith(filterOutAll)) {
2680        opts.filterAll = true;
2681        continue;
2682      }
2683
2684      final String size = "--size=";
2685      if (cmd.startsWith(size)) {
2686        opts.size = Float.parseFloat(cmd.substring(size.length()));
2687        if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB");
2688        continue;
2689      }
2690
2691      final String splitPolicy = "--splitPolicy=";
2692      if (cmd.startsWith(splitPolicy)) {
2693        opts.splitPolicy = cmd.substring(splitPolicy.length());
2694        continue;
2695      }
2696
2697      final String randomSleep = "--randomSleep=";
2698      if (cmd.startsWith(randomSleep)) {
2699        opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
2700        continue;
2701      }
2702
2703      final String measureAfter = "--measureAfter=";
2704      if (cmd.startsWith(measureAfter)) {
2705        opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length()));
2706        continue;
2707      }
2708
2709      final String bloomFilter = "--bloomFilter=";
2710      if (cmd.startsWith(bloomFilter)) {
2711        opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
2712        continue;
2713      }
2714
2715      final String blockSize = "--blockSize=";
2716      if(cmd.startsWith(blockSize) ) {
2717        opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
2718      }
2719
2720      final String valueSize = "--valueSize=";
2721      if (cmd.startsWith(valueSize)) {
2722        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
2723        continue;
2724      }
2725
2726      final String valueRandom = "--valueRandom";
2727      if (cmd.startsWith(valueRandom)) {
2728        opts.valueRandom = true;
2729        if (opts.valueZipf) {
2730          throw new IllegalStateException("Either valueZipf or valueRandom but not both");
2731        }
2732        continue;
2733      }
2734
2735      final String valueZipf = "--valueZipf";
2736      if (cmd.startsWith(valueZipf)) {
2737        opts.valueZipf = true;
2738        if (opts.valueRandom) {
2739          throw new IllegalStateException("Either valueZipf or valueRandom but not both");
2740        }
2741        continue;
2742      }
2743
2744      final String period = "--period=";
2745      if (cmd.startsWith(period)) {
2746        opts.period = Integer.parseInt(cmd.substring(period.length()));
2747        continue;
2748      }
2749
2750      final String addColumns = "--addColumns=";
2751      if (cmd.startsWith(addColumns)) {
2752        opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
2753        continue;
2754      }
2755
2756      final String inMemoryCompaction = "--inmemoryCompaction=";
2757      if (cmd.startsWith(inMemoryCompaction)) {
2758        opts.inMemoryCompaction =
2759            MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length()));
2760        continue;
2761      }
2762
2763      final String columns = "--columns=";
2764      if (cmd.startsWith(columns)) {
2765        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
2766        continue;
2767      }
2768
2769      final String families = "--families=";
2770      if (cmd.startsWith(families)) {
2771        opts.families = Integer.parseInt(cmd.substring(families.length()));
2772        continue;
2773      }
2774
2775      final String caching = "--caching=";
2776      if (cmd.startsWith(caching)) {
2777        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
2778        continue;
2779      }
2780
2781      final String asyncPrefetch = "--asyncPrefetch";
2782      if (cmd.startsWith(asyncPrefetch)) {
2783        opts.asyncPrefetch = true;
2784        continue;
2785      }
2786
2787      final String cacheBlocks = "--cacheBlocks=";
2788      if (cmd.startsWith(cacheBlocks)) {
2789        opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length()));
2790        continue;
2791      }
2792
2793      final String scanReadType = "--scanReadType=";
2794      if (cmd.startsWith(scanReadType)) {
2795        opts.scanReadType =
2796            Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
2797        continue;
2798      }
2799
2800      final String bufferSize = "--bufferSize=";
2801      if (cmd.startsWith(bufferSize)) {
2802        opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
2803        continue;
2804      }
2805
2806      if (isCommandClass(cmd)) {
2807        opts.cmdName = cmd;
2808        try {
2809          opts.numClientThreads = Integer.parseInt(args.remove());
2810        } catch (NoSuchElementException | NumberFormatException e) {
2811          throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
2812        }
2813        opts = calculateRowsAndSize(opts);
2814        break;
2815      } else {
2816        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
2817      }
2818
2819      // Not matching any option or command.
2820      System.err.println("Error: Wrong option or command: " + cmd);
2821      args.add(cmd);
2822      break;
2823    }
2824    return opts;
2825  }
2826
2827  static TestOptions calculateRowsAndSize(final TestOptions opts) {
2828    int rowsPerGB = getRowsPerGB(opts);
2829    if ((opts.getCmdName() != null
2830        && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
2831        && opts.size != DEFAULT_OPTS.size
2832        && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
2833      opts.totalRows = (int) opts.size * rowsPerGB;
2834    } else if (opts.size != DEFAULT_OPTS.size) {
2835      // total size in GB specified
2836      opts.totalRows = (int) opts.size * rowsPerGB;
2837      opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
2838    } else {
2839      opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
2840      opts.size = opts.totalRows / rowsPerGB;
2841    }
2842    return opts;
2843  }
2844
2845  static int getRowsPerGB(final TestOptions opts) {
2846    return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getFamilies() *
2847        opts.getColumns());
2848  }
2849
2850  @Override
2851  public int run(String[] args) throws Exception {
2852    // Process command-line args. TODO: Better cmd-line processing
2853    // (but hopefully something not as painful as cli options).
2854    int errCode = -1;
2855    if (args.length < 1) {
2856      printUsage();
2857      return errCode;
2858    }
2859
2860    try {
2861      LinkedList<String> argv = new LinkedList<>();
2862      argv.addAll(Arrays.asList(args));
2863      TestOptions opts = parseOpts(argv);
2864
2865      // args remaining, print help and exit
2866      if (!argv.isEmpty()) {
2867        errCode = 0;
2868        printUsage();
2869        return errCode;
2870      }
2871
2872      // must run at least 1 client
2873      if (opts.numClientThreads <= 0) {
2874        throw new IllegalArgumentException("Number of clients must be > 0");
2875      }
2876
2877      // cmdName should not be null, print help and exit
2878      if (opts.cmdName == null) {
2879        printUsage();
2880        return errCode;
2881      }
2882
2883      Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
2884      if (cmdClass != null) {
2885        runTest(cmdClass, opts);
2886        errCode = 0;
2887      }
2888
2889    } catch (Exception e) {
2890      e.printStackTrace();
2891    }
2892
2893    return errCode;
2894  }
2895
2896  private static boolean isCommandClass(String cmd) {
2897    return COMMANDS.containsKey(cmd);
2898  }
2899
2900  private static Class<? extends TestBase> determineCommandClass(String cmd) {
2901    CmdDescriptor descriptor = COMMANDS.get(cmd);
2902    return descriptor != null ? descriptor.getCmdClass() : null;
2903  }
2904
2905  public static void main(final String[] args) throws Exception {
2906    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
2907    System.exit(res);
2908  }
2909}