001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import com.codahale.metrics.Histogram;
022import com.codahale.metrics.UniformReservoir;
023import java.io.IOException;
024import java.io.PrintStream;
025import java.lang.reflect.Constructor;
026import java.math.BigDecimal;
027import java.math.MathContext;
028import java.text.DecimalFormat;
029import java.text.SimpleDateFormat;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.Date;
033import java.util.LinkedList;
034import java.util.List;
035import java.util.Locale;
036import java.util.Map;
037import java.util.NoSuchElementException;
038import java.util.Queue;
039import java.util.Random;
040import java.util.TreeMap;
041import java.util.concurrent.Callable;
042import java.util.concurrent.ExecutionException;
043import java.util.concurrent.ExecutorService;
044import java.util.concurrent.Executors;
045import java.util.concurrent.Future;
046import org.apache.commons.lang3.StringUtils;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.conf.Configured;
049import org.apache.hadoop.fs.FileSystem;
050import org.apache.hadoop.fs.Path;
051import org.apache.hadoop.hbase.client.Admin;
052import org.apache.hadoop.hbase.client.Append;
053import org.apache.hadoop.hbase.client.AsyncConnection;
054import org.apache.hadoop.hbase.client.AsyncTable;
055import org.apache.hadoop.hbase.client.BufferedMutator;
056import org.apache.hadoop.hbase.client.BufferedMutatorParams;
057import org.apache.hadoop.hbase.client.Connection;
058import org.apache.hadoop.hbase.client.ConnectionFactory;
059import org.apache.hadoop.hbase.client.Consistency;
060import org.apache.hadoop.hbase.client.Delete;
061import org.apache.hadoop.hbase.client.Durability;
062import org.apache.hadoop.hbase.client.Get;
063import org.apache.hadoop.hbase.client.Increment;
064import org.apache.hadoop.hbase.client.Put;
065import org.apache.hadoop.hbase.client.RegionInfo;
066import org.apache.hadoop.hbase.client.RegionInfoBuilder;
067import org.apache.hadoop.hbase.client.RegionLocator;
068import org.apache.hadoop.hbase.client.Result;
069import org.apache.hadoop.hbase.client.ResultScanner;
070import org.apache.hadoop.hbase.client.RowMutations;
071import org.apache.hadoop.hbase.client.Scan;
072import org.apache.hadoop.hbase.client.Table;
073import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
074import org.apache.hadoop.hbase.filter.BinaryComparator;
075import org.apache.hadoop.hbase.filter.Filter;
076import org.apache.hadoop.hbase.filter.FilterAllFilter;
077import org.apache.hadoop.hbase.filter.FilterList;
078import org.apache.hadoop.hbase.filter.PageFilter;
079import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
080import org.apache.hadoop.hbase.filter.WhileMatchFilter;
081import org.apache.hadoop.hbase.io.compress.Compression;
082import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
083import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
084import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
085import org.apache.hadoop.hbase.regionserver.BloomType;
086import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
087import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
088import org.apache.hadoop.hbase.trace.SpanReceiverHost;
089import org.apache.hadoop.hbase.trace.TraceUtil;
090import org.apache.hadoop.hbase.util.ByteArrayHashKey;
091import org.apache.hadoop.hbase.util.Bytes;
092import org.apache.hadoop.hbase.util.GsonUtil;
093import org.apache.hadoop.hbase.util.Hash;
094import org.apache.hadoop.hbase.util.MurmurHash;
095import org.apache.hadoop.hbase.util.Pair;
096import org.apache.hadoop.hbase.util.YammerHistogramUtils;
097import org.apache.hadoop.io.LongWritable;
098import org.apache.hadoop.io.Text;
099import org.apache.hadoop.mapreduce.Job;
100import org.apache.hadoop.mapreduce.Mapper;
101import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
102import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
103import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
104import org.apache.hadoop.util.Tool;
105import org.apache.hadoop.util.ToolRunner;
106import org.apache.htrace.core.ProbabilitySampler;
107import org.apache.htrace.core.Sampler;
108import org.apache.htrace.core.TraceScope;
109import org.apache.yetus.audience.InterfaceAudience;
110import org.slf4j.Logger;
111import org.slf4j.LoggerFactory;
112
113import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
114import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
115import org.apache.hbase.thirdparty.com.google.gson.Gson;
116
117/**
 * Script used for evaluating HBase performance and scalability.  Runs an HBase
 * client that steps through one of a set of hardcoded tests or 'experiments'
120 * (e.g. a random reads test, a random writes test, etc.). Pass on the
121 * command-line which test to run and how many clients are participating in
122 * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
123 *
124 * <p>This class sets up and runs the evaluation programs described in
125 * Section 7, <i>Performance Evaluation</i>, of the <a
126 * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
127 * paper, pages 8-10.
128 *
129 * <p>By default, runs as a mapreduce job where each mapper runs a single test
130 * client. Can also run as a non-mapreduce, multithreaded application by
131 * specifying {@code --nomapred}. Each client does about 1GB of data, unless
132 * specified otherwise.
133 */
134@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
135public class PerformanceEvaluation extends Configured implements Tool {
136  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
137  static final String RANDOM_READ = "randomRead";
138  static final String PE_COMMAND_SHORTNAME = "pe";
139  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
140  private static final Gson GSON = GsonUtil.createGson().create();
141
142  public static final String TABLE_NAME = "TestTable";
143  public static final String FAMILY_NAME_BASE = "info";
144  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
145  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
146  public static final int DEFAULT_VALUE_LENGTH = 1000;
147  public static final int ROW_LENGTH = 26;
148
149  private static final int ONE_GB = 1024 * 1024 * 1000;
150  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
151  // TODO : should we make this configurable
152  private static final int TAG_LENGTH = 256;
153  private static final DecimalFormat FMT = new DecimalFormat("0.##");
154  private static final MathContext CXT = MathContext.DECIMAL64;
155  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
156  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
157  private static final TestOptions DEFAULT_OPTS = new TestOptions();
158
159  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
160  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
161
162  static {
163    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
164      "Run async random read test");
165    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
166      "Run async random write test");
167    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
168      "Run async sequential read test");
169    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
170      "Run async sequential write test");
171    addCommandDescriptor(AsyncScanTest.class, "asyncScan",
172      "Run async scan test (read every row)");
173    addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test");
174    addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead",
175      "Run getRegionLocation test");
176    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
177      "Run random seek and scan 100 test");
178    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
179      "Run random seek scan with both start and stop row (max 10 rows)");
180    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
181      "Run random seek scan with both start and stop row (max 100 rows)");
182    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
183      "Run random seek scan with both start and stop row (max 1000 rows)");
184    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
185      "Run random seek scan with both start and stop row (max 10000 rows)");
186    addCommandDescriptor(RandomWriteTest.class, "randomWrite",
187      "Run random write test");
188    addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
189      "Run sequential read test");
190    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
191      "Run sequential write test");
192    addCommandDescriptor(MetaWriteTest.class, "metaWrite",
193      "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta");
194    addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
195    addCommandDescriptor(FilteredScanTest.class, "filterScan",
196      "Run scan test using a filter to find a specific row based on it's value " +
197        "(make sure to use --rows=20)");
198    addCommandDescriptor(IncrementTest.class, "increment",
199      "Increment on each row; clients overlap on keyspace so some concurrent operations");
200    addCommandDescriptor(AppendTest.class, "append",
201      "Append on each row; clients overlap on keyspace so some concurrent operations");
202    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
203      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
204    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
205      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
206    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
207      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
208    addCommandDescriptor(CleanMetaTest.class, "cleanMeta",
209      "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread");
210  }
211
212  /**
213   * Enum for map metrics.  Keep it out here rather than inside in the Map
214   * inner-class so we can find associated properties.
215   */
216  protected static enum Counter {
217    /** elapsed time */
218    ELAPSED_TIME,
219    /** number of rows */
220    ROWS
221  }
222
223  protected static class RunResult implements Comparable<RunResult> {
224    public RunResult(long duration, Histogram hist) {
225      this.duration = duration;
226      this.hist = hist;
227      numbOfReplyOverThreshold = 0;
228      numOfReplyFromReplica = 0;
229    }
230
231    public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica,
232      Histogram hist) {
233      this.duration = duration;
234      this.hist = hist;
235      this.numbOfReplyOverThreshold = numbOfReplyOverThreshold;
236      this.numOfReplyFromReplica = numOfReplyFromReplica;
237    }
238
239    public final long duration;
240    public final Histogram hist;
241    public final long numbOfReplyOverThreshold;
242    public final long numOfReplyFromReplica;
243
244    @Override
245    public String toString() {
246      return Long.toString(duration);
247    }
248
249    @Override public int compareTo(RunResult o) {
250      return Long.compare(this.duration, o.duration);
251    }
252  }
253
254  /**
255   * Constructor
256   * @param conf Configuration object
257   */
258  public PerformanceEvaluation(final Configuration conf) {
259    super(conf);
260  }
261
262  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass,
263      String name, String description) {
264    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
265    COMMANDS.put(name, cmdDescriptor);
266  }
267
268  /**
269   * Implementations can have their status set.
270   */
271  interface Status {
272    /**
273     * Sets status
274     * @param msg status message
275     * @throws IOException
276     */
277    void setStatus(final String msg) throws IOException;
278  }
279
280  /**
281   * MapReduce job that runs a performance evaluation client in each map task.
282   */
283  public static class EvaluationMapTask
284      extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
285
286    /** configuration parameter name that contains the command */
287    public final static String CMD_KEY = "EvaluationMapTask.command";
288    /** configuration parameter name that contains the PE impl */
289    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
290
291    private Class<? extends Test> cmd;
292
293    @Override
294    protected void setup(Context context) throws IOException, InterruptedException {
295      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
296
297      // this is required so that extensions of PE are instantiated within the
298      // map reduce task...
299      Class<? extends PerformanceEvaluation> peClass =
300          forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
301      try {
302        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
303      } catch (Exception e) {
304        throw new IllegalStateException("Could not instantiate PE instance", e);
305      }
306    }
307
308    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
309      try {
310        return Class.forName(className).asSubclass(type);
311      } catch (ClassNotFoundException e) {
312        throw new IllegalStateException("Could not find class for name: " + className, e);
313      }
314    }
315
316    @Override
317    protected void map(LongWritable key, Text value, final Context context)
318           throws IOException, InterruptedException {
319
320      Status status = new Status() {
321        @Override
322        public void setStatus(String msg) {
323           context.setStatus(msg);
324        }
325      };
326
327      TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class);
328      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
329      final Connection con = ConnectionFactory.createConnection(conf);
330      AsyncConnection asyncCon = null;
331      try {
332        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
333      } catch (ExecutionException e) {
334        throw new IOException(e);
335      }
336
337      // Evaluation task
338      RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
339      // Collect how much time the thing took. Report as map output and
340      // to the ELAPSED_TIME counter.
341      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
342      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
343      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
344      context.progress();
345    }
346  }
347
348  /*
349   * If table does not already exist, create. Also create a table when
350   * {@code opts.presplitRegions} is specified or when the existing table's
351   * region replica count doesn't match {@code opts.replicas}.
352   */
353  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
354    TableName tableName = TableName.valueOf(opts.tableName);
355    boolean needsDelete = false, exists = admin.tableExists(tableName);
356    boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
357      || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
358    if (!exists && isReadCmd) {
359      throw new IllegalStateException(
360        "Must specify an existing table for read commands. Run a write command first.");
361    }
362    HTableDescriptor desc =
363      exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null;
364    byte[][] splits = getSplits(opts);
365
366    // recreate the table when user has requested presplit or when existing
367    // {RegionSplitPolicy,replica count} does not match requested, or when the
368    // number of column families does not match requested.
369    if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
370      || (!isReadCmd && desc != null &&
371          !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
372      || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)
373      || (desc != null && desc.getColumnFamilyCount() != opts.families)) {
374      needsDelete = true;
375      // wait, why did it delete my table?!?
376      LOG.debug(MoreObjects.toStringHelper("needsDelete")
377        .add("needsDelete", needsDelete)
378        .add("isReadCmd", isReadCmd)
379        .add("exists", exists)
380        .add("desc", desc)
381        .add("presplit", opts.presplitRegions)
382        .add("splitPolicy", opts.splitPolicy)
383        .add("replicas", opts.replicas)
384        .add("families", opts.families)
385        .toString());
386    }
387
388    // remove an existing table
389    if (needsDelete) {
390      if (admin.isTableEnabled(tableName)) {
391        admin.disableTable(tableName);
392      }
393      admin.deleteTable(tableName);
394    }
395
396    // table creation is necessary
397    if (!exists || needsDelete) {
398      desc = getTableDescriptor(opts);
399      if (splits != null) {
400        if (LOG.isDebugEnabled()) {
401          for (int i = 0; i < splits.length; i++) {
402            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
403          }
404        }
405      }
406      admin.createTable(desc, splits);
407      LOG.info("Table " + desc + " created");
408    }
409    return admin.tableExists(tableName);
410  }
411
412  /**
413   * Create an HTableDescriptor from provided TestOptions.
414   */
415  protected static HTableDescriptor getTableDescriptor(TestOptions opts) {
416    HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(opts.tableName));
417    for (int family = 0; family < opts.families; family++) {
418      byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
419      HColumnDescriptor familyDesc = new HColumnDescriptor(familyName);
420      familyDesc.setDataBlockEncoding(opts.blockEncoding);
421      familyDesc.setCompressionType(opts.compression);
422      familyDesc.setBloomFilterType(opts.bloomType);
423      familyDesc.setBlocksize(opts.blockSize);
424      if (opts.inMemoryCF) {
425        familyDesc.setInMemory(true);
426      }
427      familyDesc.setInMemoryCompaction(opts.inMemoryCompaction);
428      tableDesc.addFamily(familyDesc);
429    }
430    if (opts.replicas != DEFAULT_OPTS.replicas) {
431      tableDesc.setRegionReplication(opts.replicas);
432    }
433    if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
434      tableDesc.setRegionSplitPolicyClassName(opts.splitPolicy);
435    }
436    return tableDesc;
437  }
438
439  /**
440   * generates splits based on total number of rows and specified split regions
441   */
442  protected static byte[][] getSplits(TestOptions opts) {
443    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions)
444      return null;
445
446    int numSplitPoints = opts.presplitRegions - 1;
447    byte[][] splits = new byte[numSplitPoints][];
448    int jump = opts.totalRows / opts.presplitRegions;
449    for (int i = 0; i < numSplitPoints; i++) {
450      int rowkey = jump * (1 + i);
451      splits[i] = format(rowkey);
452    }
453    return splits;
454  }
455
456  static void setupConnectionCount(final TestOptions opts) {
457    if (opts.oneCon) {
458      opts.connCount = 1;
459    } else {
460      if (opts.connCount == -1) {
461        // set to thread number if connCount is not set
462        opts.connCount = opts.numClientThreads;
463      }
464    }
465  }
466
467  /*
468   * Run all clients in this vm each to its own thread.
469   */
470  static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
471      throws IOException, InterruptedException, ExecutionException {
472    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
473    assert cmd != null;
474    @SuppressWarnings("unchecked")
475    Future<RunResult>[] threads = new Future[opts.numClientThreads];
476    RunResult[] results = new RunResult[opts.numClientThreads];
477    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
478      new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
479    setupConnectionCount(opts);
480    final Connection[] cons = new Connection[opts.connCount];
481    final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
482    for (int i = 0; i < opts.connCount; i++) {
483      cons[i] = ConnectionFactory.createConnection(conf);
484      asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
485    }
486    LOG.info("Created " + opts.connCount + " connections for " +
487        opts.numClientThreads + " threads");
488    for (int i = 0; i < threads.length; i++) {
489      final int index = i;
490      threads[i] = pool.submit(new Callable<RunResult>() {
491        @Override
492        public RunResult call() throws Exception {
493          TestOptions threadOpts = new TestOptions(opts);
494          final Connection con = cons[index % cons.length];
495          final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
496          if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
497          RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
498            @Override
499            public void setStatus(final String msg) throws IOException {
500              LOG.info(msg);
501            }
502          });
503          LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration +
504            "ms over " + threadOpts.perClientRunRows + " rows");
505          if (opts.latencyThreshold > 0) {
506            LOG.info("Number of replies over latency threshold " + opts.latencyThreshold +
507              "(ms) is " + run.numbOfReplyOverThreshold);
508          }
509          return run;
510        }
511      });
512    }
513    pool.shutdown();
514
515    for (int i = 0; i < threads.length; i++) {
516      try {
517        results[i] = threads[i].get();
518      } catch (ExecutionException e) {
519        throw new IOException(e.getCause());
520      }
521    }
522    final String test = cmd.getSimpleName();
523    LOG.info("[" + test + "] Summary of timings (ms): "
524             + Arrays.toString(results));
525    Arrays.sort(results);
526    long total = 0;
527    float avgLatency = 0 ;
528    float avgTPS = 0;
529    long replicaWins = 0;
530    for (RunResult result : results) {
531      total += result.duration;
532      avgLatency += result.hist.getSnapshot().getMean();
533      avgTPS += opts.perClientRunRows * 1.0f / result.duration;
534      replicaWins += result.numOfReplyFromReplica;
535    }
536    avgTPS *= 1000; // ms to second
537    avgLatency = avgLatency / results.length;
538    LOG.info("[" + test + " duration ]"
539      + "\tMin: " + results[0] + "ms"
540      + "\tMax: " + results[results.length - 1] + "ms"
541      + "\tAvg: " + (total / results.length) + "ms");
542    LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
543    LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
544    if (opts.replicas > 1) {
545      LOG.info("[results from replica regions] " + replicaWins);
546    }
547
548    for (int i = 0; i < opts.connCount; i++) {
549      cons[i].close();
550      asyncCons[i].close();
551    }
552
553    return results;
554  }
555
556  /*
557   * Run a mapreduce job.  Run as many maps as asked-for clients.
558   * Before we start up the job, write out an input file with instruction
559   * per client regards which row they are to start on.
560   * @param cmd Command to run.
561   * @throws IOException
562   */
563  static Job doMapReduce(TestOptions opts, final Configuration conf)
564      throws IOException, InterruptedException, ClassNotFoundException {
565    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
566    assert cmd != null;
567    Path inputDir = writeInputFile(conf, opts);
568    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
569    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
570    Job job = Job.getInstance(conf);
571    job.setJarByClass(PerformanceEvaluation.class);
572    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
573
574    job.setInputFormatClass(NLineInputFormat.class);
575    NLineInputFormat.setInputPaths(job, inputDir);
576    // this is default, but be explicit about it just in case.
577    NLineInputFormat.setNumLinesPerSplit(job, 1);
578
579    job.setOutputKeyClass(LongWritable.class);
580    job.setOutputValueClass(LongWritable.class);
581
582    job.setMapperClass(EvaluationMapTask.class);
583    job.setReducerClass(LongSumReducer.class);
584
585    job.setNumReduceTasks(1);
586
587    job.setOutputFormatClass(TextOutputFormat.class);
588    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
589
590    TableMapReduceUtil.addDependencyJars(job);
591    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
592      Histogram.class,     // yammer metrics
593      Gson.class,  // gson
594      FilterAllFilter.class // hbase-server tests jar
595      );
596
597    TableMapReduceUtil.initCredentials(job);
598
599    job.waitForCompletion(true);
600    return job;
601  }
602
603  /**
604   * Each client has one mapper to do the work,  and client do the resulting count in a map task.
605   */
606
607  static String JOB_INPUT_FILENAME = "input.txt";
608
609  /*
610   * Write input file of offsets-per-client for the mapreduce job.
611   * @param c Configuration
612   * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
613   * @throws IOException
614   */
615  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
616    return writeInputFile(c, opts, new Path("."));
617  }
618
619  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
620  throws IOException {
621    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
622    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
623    Path inputDir = new Path(jobdir, "inputs");
624
625    FileSystem fs = FileSystem.get(c);
626    fs.mkdirs(inputDir);
627
628    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
629    PrintStream out = new PrintStream(fs.create(inputFile));
630    // Make input random.
631    Map<Integer, String> m = new TreeMap<>();
632    Hash h = MurmurHash.getInstance();
633    int perClientRows = (opts.totalRows / opts.numClientThreads);
634    try {
635      for (int j = 0; j < opts.numClientThreads; j++) {
636        TestOptions next = new TestOptions(opts);
637        next.startRow = j * perClientRows;
638        next.perClientRunRows = perClientRows;
639        String s = GSON.toJson(next);
640        LOG.info("Client=" + j + ", input=" + s);
641        byte[] b = Bytes.toBytes(s);
642        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
643        m.put(hash, s);
644      }
645      for (Map.Entry<Integer, String> e: m.entrySet()) {
646        out.println(e.getValue());
647      }
648    } finally {
649      out.close();
650    }
651    return inputDir;
652  }
653
654  /**
655   * Describes a command.
656   */
657  static class CmdDescriptor {
658    private Class<? extends TestBase> cmdClass;
659    private String name;
660    private String description;
661
662    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
663      this.cmdClass = cmdClass;
664      this.name = name;
665      this.description = description;
666    }
667
668    public Class<? extends TestBase> getCmdClass() {
669      return cmdClass;
670    }
671
672    public String getName() {
673      return name;
674    }
675
676    public String getDescription() {
677      return description;
678    }
679  }
680
681  /**
682   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}.
683   * This makes tracking all these arguments a little easier.
684   * NOTE: ADDING AN OPTION, you need to add a data member, a getter/setter (to make JSON
685   * serialization of this TestOptions class behave), and you need to add to the clone constructor
686   * below copying your new option from the 'that' to the 'this'.  Look for 'clone' below.
687   */
  static class TestOptions {
    // Option fields with their defaults. Every field declared here is also copied by the
    // clone constructor below; keep the two lists in sync when adding an option.
    String cmdName = null;
    boolean nomapred = false;
    boolean filterAll = false;
    int startRow = 0;
    float size = 1.0f;
    int perClientRunRows = DEFAULT_ROWS_PER_GB;
    int numClientThreads = 1;
    int totalRows = DEFAULT_ROWS_PER_GB;
    int measureAfter = 0;
    float sampleRate = 1.0f;
    double traceRate = 0.0;
    String tableName = TABLE_NAME;
    boolean flushCommits = true;
    boolean writeToWAL = true;
    boolean autoFlush = false;
    boolean oneCon = false;
    int connCount = -1; // will decide the actual num later
    boolean useTags = false;
    int noOfTags = 1;
    boolean reportLatency = false;
    int multiGet = 0;
    int multiPut = 0;
    int randomSleep = 0;
    boolean inMemoryCF = false;
    int presplitRegions = 0;
    int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION;
    String splitPolicy = null;
    Compression.Algorithm compression = Compression.Algorithm.NONE;
    BloomType bloomType = BloomType.ROW;
    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
    boolean valueRandom = false;
    boolean valueZipf = false;
    int valueSize = DEFAULT_VALUE_LENGTH;
    // Defaults to a tenth of perClientRunRows, or perClientRunRows itself when that tenth is
    // zero. Evaluated once at construction from the default perClientRunRows; it is NOT
    // recomputed by setPerClientRunRows.
    int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10;
    int cycles = 1;
    int columns = 1;
    int families = 1;
    int caching = 30;
    int latencyThreshold = 0; // in milliseconds
    boolean addColumns = true;
    MemoryCompactionPolicy inMemoryCompaction =
        MemoryCompactionPolicy.valueOf(
            CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
    boolean asyncPrefetch = false;
    boolean cacheBlocks = true;
    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
    // 2 * 1024 * 1024 = 2097152; unit is presumably bytes -- TODO confirm against the consumer.
    long bufferSize = 2l * 1024l * 1024l;

    /** Creates options populated with the field defaults declared above. */
    public TestOptions() {}

    /**
     * Clone constructor. Copies every option field from {@code that} by plain assignment
     * (references are shared, not deep-copied).
     * @param that Object to copy from.
     */
    public TestOptions(TestOptions that) {
      this.cmdName = that.cmdName;
      this.cycles = that.cycles;
      this.nomapred = that.nomapred;
      this.startRow = that.startRow;
      this.size = that.size;
      this.perClientRunRows = that.perClientRunRows;
      this.numClientThreads = that.numClientThreads;
      this.totalRows = that.totalRows;
      this.sampleRate = that.sampleRate;
      this.traceRate = that.traceRate;
      this.tableName = that.tableName;
      this.flushCommits = that.flushCommits;
      this.writeToWAL = that.writeToWAL;
      this.autoFlush = that.autoFlush;
      this.oneCon = that.oneCon;
      this.connCount = that.connCount;
      this.useTags = that.useTags;
      this.noOfTags = that.noOfTags;
      this.reportLatency = that.reportLatency;
      this.latencyThreshold = that.latencyThreshold;
      this.multiGet = that.multiGet;
      this.multiPut = that.multiPut;
      this.inMemoryCF = that.inMemoryCF;
      this.presplitRegions = that.presplitRegions;
      this.replicas = that.replicas;
      this.splitPolicy = that.splitPolicy;
      this.compression = that.compression;
      this.blockEncoding = that.blockEncoding;
      this.filterAll = that.filterAll;
      this.bloomType = that.bloomType;
      this.blockSize = that.blockSize;
      this.valueRandom = that.valueRandom;
      this.valueZipf = that.valueZipf;
      this.valueSize = that.valueSize;
      this.period = that.period;
      this.randomSleep = that.randomSleep;
      this.measureAfter = that.measureAfter;
      this.addColumns = that.addColumns;
      this.columns = that.columns;
      this.families = that.families;
      this.caching = that.caching;
      this.inMemoryCompaction = that.inMemoryCompaction;
      this.asyncPrefetch = that.asyncPrefetch;
      this.cacheBlocks = that.cacheBlocks;
      this.scanReadType = that.scanReadType;
      this.bufferSize = that.bufferSize;
    }

    // Plain accessors follow: each getter returns its field and each setter assigns it, with
    // no validation. NOTE(review): the comment preceding this class says accessors influence
    // how TestOptions serializes -- confirm before renaming, reordering or removing any.

    public int getCaching() {
      return this.caching;
    }

    public void setCaching(final int caching) {
      this.caching = caching;
    }

    public int getColumns() {
      return this.columns;
    }

    public void setColumns(final int columns) {
      this.columns = columns;
    }

    public int getFamilies() {
      return this.families;
    }

    public void setFamilies(final int families) {
      this.families = families;
    }

    public int getCycles() {
      return this.cycles;
    }

    public void setCycles(final int cycles) {
      this.cycles = cycles;
    }

    public boolean isValueZipf() {
      return valueZipf;
    }

    public void setValueZipf(boolean valueZipf) {
      this.valueZipf = valueZipf;
    }

    public String getCmdName() {
      return cmdName;
    }

    public void setCmdName(String cmdName) {
      this.cmdName = cmdName;
    }

    public int getRandomSleep() {
      return randomSleep;
    }

    public void setRandomSleep(int randomSleep) {
      this.randomSleep = randomSleep;
    }

    public int getReplicas() {
      return replicas;
    }

    public void setReplicas(int replicas) {
      this.replicas = replicas;
    }

    public String getSplitPolicy() {
      return splitPolicy;
    }

    public void setSplitPolicy(String splitPolicy) {
      this.splitPolicy = splitPolicy;
    }

    public void setNomapred(boolean nomapred) {
      this.nomapred = nomapred;
    }

    public void setFilterAll(boolean filterAll) {
      this.filterAll = filterAll;
    }

    public void setStartRow(int startRow) {
      this.startRow = startRow;
    }

    public void setSize(float size) {
      this.size = size;
    }

    // Only updates perClientRunRows; 'period' keeps whatever value it already had.
    public void setPerClientRunRows(int perClientRunRows) {
      this.perClientRunRows = perClientRunRows;
    }

    public void setNumClientThreads(int numClientThreads) {
      this.numClientThreads = numClientThreads;
    }

    public void setTotalRows(int totalRows) {
      this.totalRows = totalRows;
    }

    public void setSampleRate(float sampleRate) {
      this.sampleRate = sampleRate;
    }

    public void setTraceRate(double traceRate) {
      this.traceRate = traceRate;
    }

    public void setTableName(String tableName) {
      this.tableName = tableName;
    }

    public void setFlushCommits(boolean flushCommits) {
      this.flushCommits = flushCommits;
    }

    public void setWriteToWAL(boolean writeToWAL) {
      this.writeToWAL = writeToWAL;
    }

    public void setAutoFlush(boolean autoFlush) {
      this.autoFlush = autoFlush;
    }

    public void setOneCon(boolean oneCon) {
      this.oneCon = oneCon;
    }

    public int getConnCount() {
      return connCount;
    }

    public void setConnCount(int connCount) {
      this.connCount = connCount;
    }

    public void setUseTags(boolean useTags) {
      this.useTags = useTags;
    }

    public void setNoOfTags(int noOfTags) {
      this.noOfTags = noOfTags;
    }

    public void setReportLatency(boolean reportLatency) {
      this.reportLatency = reportLatency;
    }

    public void setMultiGet(int multiGet) {
      this.multiGet = multiGet;
    }

    public void setMultiPut(int multiPut) {
      this.multiPut = multiPut;
    }

    public void setInMemoryCF(boolean inMemoryCF) {
      this.inMemoryCF = inMemoryCF;
    }

    public void setPresplitRegions(int presplitRegions) {
      this.presplitRegions = presplitRegions;
    }

    public void setCompression(Compression.Algorithm compression) {
      this.compression = compression;
    }

    public void setBloomType(BloomType bloomType) {
      this.bloomType = bloomType;
    }

    public void setBlockSize(int blockSize) {
      this.blockSize = blockSize;
    }

    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
      this.blockEncoding = blockEncoding;
    }

    public void setValueRandom(boolean valueRandom) {
      this.valueRandom = valueRandom;
    }

    public void setValueSize(int valueSize) {
      this.valueSize = valueSize;
    }

    public void setBufferSize(long bufferSize) {
      this.bufferSize = bufferSize;
    }

    public void setPeriod(int period) {
      this.period = period;
    }

    public boolean isNomapred() {
      return nomapred;
    }

    public boolean isFilterAll() {
      return filterAll;
    }

    public int getStartRow() {
      return startRow;
    }

    public float getSize() {
      return size;
    }

    public int getPerClientRunRows() {
      return perClientRunRows;
    }

    public int getNumClientThreads() {
      return numClientThreads;
    }

    public int getTotalRows() {
      return totalRows;
    }

    public float getSampleRate() {
      return sampleRate;
    }

    public double getTraceRate() {
      return traceRate;
    }

    public String getTableName() {
      return tableName;
    }

    public boolean isFlushCommits() {
      return flushCommits;
    }

    public boolean isWriteToWAL() {
      return writeToWAL;
    }

    public boolean isAutoFlush() {
      return autoFlush;
    }

    public boolean isUseTags() {
      return useTags;
    }

    public int getNoOfTags() {
      return noOfTags;
    }

    public boolean isReportLatency() {
      return reportLatency;
    }

    public int getMultiGet() {
      return multiGet;
    }

    public int getMultiPut() {
      return multiPut;
    }

    public boolean isInMemoryCF() {
      return inMemoryCF;
    }

    public int getPresplitRegions() {
      return presplitRegions;
    }

    public Compression.Algorithm getCompression() {
      return compression;
    }

    public DataBlockEncoding getBlockEncoding() {
      return blockEncoding;
    }

    public boolean isValueRandom() {
      return valueRandom;
    }

    public int getValueSize() {
      return valueSize;
    }

    public int getPeriod() {
      return period;
    }

    public BloomType getBloomType() {
      return bloomType;
    }

    public int getBlockSize() {
      return blockSize;
    }

    public boolean isOneCon() {
      return oneCon;
    }

    public int getMeasureAfter() {
      return measureAfter;
    }

    public void setMeasureAfter(int measureAfter) {
      this.measureAfter = measureAfter;
    }

    public boolean getAddColumns() {
      return addColumns;
    }

    public void setAddColumns(boolean addColumns) {
      this.addColumns = addColumns;
    }

    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
      this.inMemoryCompaction = inMemoryCompaction;
    }

    public MemoryCompactionPolicy getInMemoryCompaction() {
      return this.inMemoryCompaction;
    }

    public long getBufferSize() {
      return this.bufferSize;
    }
  }
1129
1130  /*
1131   * A test.
1132   * Subclass to particularize what happens per row.
1133   */
1134  static abstract class TestBase {
1135    // Below is make it so when Tests are all running in the one
1136    // jvm, that they each have a differently seeded Random.
1137    private static final Random randomSeed = new Random(System.currentTimeMillis());
1138
1139    private static long nextRandomSeed() {
1140      return randomSeed.nextLong();
1141    }
1142    private final int everyN;
1143
1144    protected final Random rand = new Random(nextRandomSeed());
1145    protected final Configuration conf;
1146    protected final TestOptions opts;
1147
1148    private final Status status;
1149    private final Sampler traceSampler;
1150    private final SpanReceiverHost receiverHost;
1151
1152    private String testName;
1153    private Histogram latencyHistogram;
1154    private Histogram replicaLatencyHistogram;
1155    private Histogram valueSizeHistogram;
1156    private Histogram rpcCallsHistogram;
1157    private Histogram remoteRpcCallsHistogram;
1158    private Histogram millisBetweenNextHistogram;
1159    private Histogram regionsScannedHistogram;
1160    private Histogram bytesInResultsHistogram;
1161    private Histogram bytesInRemoteResultsHistogram;
1162    private RandomDistribution.Zipf zipf;
1163    private long numOfReplyOverLatencyThreshold = 0;
1164    private long numOfReplyFromReplica = 0;
1165
1166    /**
1167     * Note that all subclasses of this class must provide a public constructor
1168     * that has the exact same list of arguments.
1169     */
1170    TestBase(final Configuration conf, final TestOptions options, final Status status) {
1171      this.conf = conf;
1172      this.receiverHost = this.conf == null? null: SpanReceiverHost.getInstance(conf);
1173      this.opts = options;
1174      this.status = status;
1175      this.testName = this.getClass().getSimpleName();
1176      if (options.traceRate >= 1.0) {
1177        this.traceSampler = Sampler.ALWAYS;
1178      } else if (options.traceRate > 0.0) {
1179        conf.setDouble("hbase.sampler.fraction", options.traceRate);
1180        this.traceSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf));
1181      } else {
1182        this.traceSampler = Sampler.NEVER;
1183      }
1184      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
1185      if (options.isValueZipf()) {
1186        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
1187      }
1188      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
1189    }
1190
1191    int getValueLength(final Random r) {
1192      if (this.opts.isValueRandom()) {
1193        return r.nextInt(opts.valueSize);
1194      } else if (this.opts.isValueZipf()) {
1195        return Math.abs(this.zipf.nextInt());
1196      } else {
1197        return opts.valueSize;
1198      }
1199    }
1200
1201    void updateValueSize(final Result [] rs) throws IOException {
1202      updateValueSize(rs, 0);
1203    }
1204
1205    void updateValueSize(final Result [] rs, final long latency) throws IOException {
1206      if (rs == null || (latency == 0)) return;
1207      for (Result r: rs) updateValueSize(r, latency);
1208    }
1209
1210    void updateValueSize(final Result r) throws IOException {
1211      updateValueSize(r, 0);
1212    }
1213
1214    void updateValueSize(final Result r, final long latency) throws IOException {
1215      if (r == null || (latency == 0)) return;
1216      int size = 0;
1217      // update replicaHistogram
1218      if (r.isStale()) {
1219        replicaLatencyHistogram.update(latency / 1000);
1220        numOfReplyFromReplica ++;
1221      }
1222      if (!isRandomValueSize()) return;
1223
1224      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1225        size += scanner.current().getValueLength();
1226      }
1227      updateValueSize(size);
1228    }
1229
1230    void updateValueSize(final int valueSize) {
1231      if (!isRandomValueSize()) return;
1232      this.valueSizeHistogram.update(valueSize);
1233    }
1234
1235    void updateScanMetrics(final ScanMetrics metrics) {
1236      if (metrics == null) return;
1237      Map<String,Long> metricsMap = metrics.getMetricsMap();
1238      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
1239      if (rpcCalls != null) {
1240        this.rpcCallsHistogram.update(rpcCalls.longValue());
1241      }
1242      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
1243      if (remoteRpcCalls != null) {
1244        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
1245      }
1246      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
1247      if (millisBetweenNext != null) {
1248        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
1249      }
1250      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
1251      if (regionsScanned != null) {
1252        this.regionsScannedHistogram.update(regionsScanned.longValue());
1253      }
1254      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
1255      if (bytesInResults != null && bytesInResults.longValue() > 0) {
1256        this.bytesInResultsHistogram.update(bytesInResults.longValue());
1257      }
1258      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
1259      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
1260        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
1261      }
1262    }
1263
1264    String generateStatus(final int sr, final int i, final int lr) {
1265      return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
1266        (!isRandomValueSize()? "": ", value size " + getShortValueSizeReport());
1267    }
1268
1269    boolean isRandomValueSize() {
1270      return opts.valueRandom;
1271    }
1272
1273    protected int getReportingPeriod() {
1274      return opts.period;
1275    }
1276
1277    /**
1278     * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1279     */
1280    public Histogram getLatencyHistogram() {
1281      return latencyHistogram;
1282    }
1283
1284    void testSetup() throws IOException {
1285      // test metrics
1286      latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1287      // If it is a replica test, set up histogram for replica.
1288      if (opts.replicas > 1) {
1289        replicaLatencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1290      }
1291      valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1292      // scan metrics
1293      rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1294      remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1295      millisBetweenNextHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1296      regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1297      bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1298      bytesInRemoteResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1299
1300      onStartup();
1301    }
1302
1303    abstract void onStartup() throws IOException;
1304
1305    void testTakedown() throws IOException {
1306      onTakedown();
1307      // Print all stats for this thread continuously.
1308      // Synchronize on Test.class so different threads don't intermingle the
1309      // output. We can't use 'this' here because each thread has its own instance of Test class.
1310      synchronized (Test.class) {
1311        status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
1312        status.setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(
1313            latencyHistogram));
1314        if (opts.replicas > 1) {
1315          status.setStatus("Latency (us) from Replica Regions: " +
1316            YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram));
1317        }
1318        status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
1319        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
1320        if (valueSizeHistogram.getCount() > 0) {
1321          status.setStatus("ValueSize (bytes) : "
1322              + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
1323          status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
1324          status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
1325        } else {
1326          status.setStatus("No valueSize statistics available");
1327        }
1328        if (rpcCallsHistogram.getCount() > 0) {
1329          status.setStatus("rpcCalls (count): " +
1330              YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
1331        }
1332        if (remoteRpcCallsHistogram.getCount() > 0) {
1333          status.setStatus("remoteRpcCalls (count): " +
1334              YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
1335        }
1336        if (millisBetweenNextHistogram.getCount() > 0) {
1337          status.setStatus("millisBetweenNext (latency): " +
1338              YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
1339        }
1340        if (regionsScannedHistogram.getCount() > 0) {
1341          status.setStatus("regionsScanned (count): " +
1342              YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
1343        }
1344        if (bytesInResultsHistogram.getCount() > 0) {
1345          status.setStatus("bytesInResults (size): " +
1346              YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
1347        }
1348        if (bytesInRemoteResultsHistogram.getCount() > 0) {
1349          status.setStatus("bytesInRemoteResults (size): " +
1350              YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
1351        }
1352      }
1353      receiverHost.closeReceivers();
1354    }
1355
1356    abstract void onTakedown() throws IOException;
1357
1358
1359    /*
1360     * Run test
1361     * @return Elapsed time.
1362     * @throws IOException
1363     */
1364    long test() throws IOException, InterruptedException {
1365      testSetup();
1366      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1367      final long startTime = System.nanoTime();
1368      try {
1369        testTimed();
1370      } finally {
1371        testTakedown();
1372      }
1373      return (System.nanoTime() - startTime) / 1000000;
1374    }
1375
1376    int getStartRow() {
1377      return opts.startRow;
1378    }
1379
1380    int getLastRow() {
1381      return getStartRow() + opts.perClientRunRows;
1382    }
1383
1384    /**
1385     * Provides an extension point for tests that don't want a per row invocation.
1386     */
1387    void testTimed() throws IOException, InterruptedException {
1388      int startRow = getStartRow();
1389      int lastRow = getLastRow();
1390      TraceUtil.addSampler(traceSampler);
1391      // Report on completion of 1/10th of total.
1392      for (int ii = 0; ii < opts.cycles; ii++) {
1393        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
1394        for (int i = startRow; i < lastRow; i++) {
1395          if (i % everyN != 0) continue;
1396          long startTime = System.nanoTime();
1397          boolean requestSent = false;
1398          try (TraceScope scope = TraceUtil.createTrace("test row");){
1399            requestSent = testRow(i, startTime);
1400          }
1401          if ( (i - startRow) > opts.measureAfter) {
1402            // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
1403            // first 9 times and sends the actual get request in the 10th iteration.
1404            // We should only set latency when actual request is sent because otherwise
1405            // it turns out to be 0.
1406            if (requestSent) {
1407              long latency = (System.nanoTime() - startTime) / 1000;
1408              latencyHistogram.update(latency);
1409              if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) {
1410                numOfReplyOverLatencyThreshold ++;
1411              }
1412            }
1413            if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1414              status.setStatus(generateStatus(startRow, i, lastRow));
1415            }
1416          }
1417        }
1418      }
1419    }
1420
1421    /**
1422     * @return Subset of the histograms' calculation.
1423     */
1424    public String getShortLatencyReport() {
1425      return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
1426    }
1427
1428    /**
1429     * @return Subset of the histograms' calculation.
1430     */
1431    public String getShortValueSizeReport() {
1432      return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
1433    }
1434
1435
1436    /**
1437     * Test for individual row.
1438     * @param i Row index.
1439     * @return true if the row was sent to server and need to record metrics.
1440     *         False if not, multiGet and multiPut e.g., the rows are sent
1441     *         to server only if enough gets/puts are gathered.
1442     */
1443    abstract boolean testRow(final int i, final long startTime) throws IOException, InterruptedException;
1444  }
1445
1446  static abstract class Test extends TestBase {
1447    protected Connection connection;
1448
1449    Test(final Connection con, final TestOptions options, final Status status) {
1450      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1451      this.connection = con;
1452    }
1453  }
1454
1455  static abstract class AsyncTest extends TestBase {
1456    protected AsyncConnection connection;
1457
1458    AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
1459      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1460      this.connection = con;
1461    }
1462  }
1463
1464  static abstract class TableTest extends Test {
1465    protected Table table;
1466
1467    TableTest(Connection con, TestOptions options, Status status) {
1468      super(con, options, status);
1469    }
1470
1471    @Override
1472    void onStartup() throws IOException {
1473      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1474    }
1475
1476    @Override
1477    void onTakedown() throws IOException {
1478      table.close();
1479    }
1480  }
1481
1482  /*
1483  Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest
1484   */
1485  static abstract class MetaTest extends TableTest {
1486    protected int keyLength;
1487
1488    MetaTest(Connection con, TestOptions options, Status status) {
1489      super(con, options, status);
1490      keyLength = Integer.toString(opts.perClientRunRows).length();
1491    }
1492
1493    @Override
1494    void onTakedown() throws IOException {
1495      // No clean up
1496    }
1497
1498    /*
1499    Generates Lexicographically ascending strings
1500     */
1501    protected byte[] getSplitKey(final int i) {
1502      return Bytes.toBytes(String.format("%0" + keyLength + "d", i));
1503    }
1504
1505  }
1506
1507  static abstract class AsyncTableTest extends AsyncTest {
1508    protected AsyncTable<?> table;
1509
1510    AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
1511      super(con, options, status);
1512    }
1513
1514    @Override
1515    void onStartup() throws IOException {
1516      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1517    }
1518
1519    @Override
1520    void onTakedown() throws IOException {
1521    }
1522  }
1523
1524  static class AsyncRandomReadTest extends AsyncTableTest {
1525    private final Consistency consistency;
1526    private ArrayList<Get> gets;
1527    private Random rd = new Random();
1528
1529    AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
1530      super(con, options, status);
1531      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1532      if (opts.multiGet > 0) {
1533        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1534        this.gets = new ArrayList<>(opts.multiGet);
1535      }
1536    }
1537
1538    @Override
1539    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1540      if (opts.randomSleep > 0) {
1541        Thread.sleep(rd.nextInt(opts.randomSleep));
1542      }
1543      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1544      for (int family = 0; family < opts.families; family++) {
1545        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1546        if (opts.addColumns) {
1547          for (int column = 0; column < opts.columns; column++) {
1548            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1549            get.addColumn(familyName, qualifier);
1550          }
1551        } else {
1552          get.addFamily(familyName);
1553        }
1554      }
1555      if (opts.filterAll) {
1556        get.setFilter(new FilterAllFilter());
1557      }
1558      get.setConsistency(consistency);
1559      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1560      try {
1561        if (opts.multiGet > 0) {
1562          this.gets.add(get);
1563          if (this.gets.size() == opts.multiGet) {
1564            Result[] rs =
1565                this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
1566            updateValueSize(rs);
1567            this.gets.clear();
1568          } else {
1569            return false;
1570          }
1571        } else {
1572          updateValueSize(this.table.get(get).get());
1573        }
1574      } catch (ExecutionException e) {
1575        throw new IOException(e);
1576      }
1577      return true;
1578    }
1579
1580    public static RuntimeException runtime(Throwable e) {
1581      if (e instanceof RuntimeException) {
1582        return (RuntimeException) e;
1583      }
1584      return new RuntimeException(e);
1585    }
1586
1587    public static <V> V propagate(Callable<V> callable) {
1588      try {
1589        return callable.call();
1590      } catch (Exception e) {
1591        throw runtime(e);
1592      }
1593    }
1594
1595    @Override
1596    protected int getReportingPeriod() {
1597      int period = opts.perClientRunRows / 10;
1598      return period == 0 ? opts.perClientRunRows : period;
1599    }
1600
1601    @Override
1602    protected void testTakedown() throws IOException {
1603      if (this.gets != null && this.gets.size() > 0) {
1604        this.table.get(gets);
1605        this.gets.clear();
1606      }
1607      super.testTakedown();
1608    }
1609  }
1610
1611  static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
1612
1613    AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
1614      super(con, options, status);
1615    }
1616
1617    @Override
1618    protected byte[] generateRow(final int i) {
1619      return getRandomRow(this.rand, opts.totalRows);
1620    }
1621  }
1622
1623  static class AsyncScanTest extends AsyncTableTest {
1624    private ResultScanner testScanner;
1625    private AsyncTable<?> asyncTable;
1626
1627    AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
1628      super(con, options, status);
1629    }
1630
1631    @Override
1632    void onStartup() throws IOException {
1633      this.asyncTable =
1634          connection.getTable(TableName.valueOf(opts.tableName),
1635            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
1636    }
1637
1638    @Override
1639    void testTakedown() throws IOException {
1640      if (this.testScanner != null) {
1641        updateScanMetrics(this.testScanner.getScanMetrics());
1642        this.testScanner.close();
1643      }
1644      super.testTakedown();
1645    }
1646
1647    @Override
1648    boolean testRow(final int i, final long startTime) throws IOException {
1649      if (this.testScanner == null) {
1650        Scan scan =
1651            new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1652                .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1653                .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1654        for (int family = 0; family < opts.families; family++) {
1655          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1656          if (opts.addColumns) {
1657            for (int column = 0; column < opts.columns; column++) {
1658              byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1659              scan.addColumn(familyName, qualifier);
1660            }
1661          } else {
1662            scan.addFamily(familyName);
1663          }
1664        }
1665        if (opts.filterAll) {
1666          scan.setFilter(new FilterAllFilter());
1667        }
1668        this.testScanner = asyncTable.getScanner(scan);
1669      }
1670      Result r = testScanner.next();
1671      updateValueSize(r);
1672      return true;
1673    }
1674  }
1675
1676  static class AsyncSequentialReadTest extends AsyncTableTest {
1677    AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
1678      super(con, options, status);
1679    }
1680
1681    @Override
1682    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1683      Get get = new Get(format(i));
1684      for (int family = 0; family < opts.families; family++) {
1685        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1686        if (opts.addColumns) {
1687          for (int column = 0; column < opts.columns; column++) {
1688            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1689            get.addColumn(familyName, qualifier);
1690          }
1691        } else {
1692          get.addFamily(familyName);
1693        }
1694      }
1695      if (opts.filterAll) {
1696        get.setFilter(new FilterAllFilter());
1697      }
1698      try {
1699        updateValueSize(table.get(get).get());
1700      } catch (ExecutionException e) {
1701        throw new IOException(e);
1702      }
1703      return true;
1704    }
1705  }
1706
1707  static class AsyncSequentialWriteTest extends AsyncTableTest {
1708    private ArrayList<Put> puts;
1709
1710    AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
1711      super(con, options, status);
1712      if (opts.multiPut > 0) {
1713        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
1714        this.puts = new ArrayList<>(opts.multiPut);
1715      }
1716    }
1717
1718    protected byte[] generateRow(final int i) {
1719      return format(i);
1720    }
1721
1722    @Override
1723    @SuppressWarnings("ReturnValueIgnored")
1724    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1725      byte[] row = generateRow(i);
1726      Put put = new Put(row);
1727      for (int family = 0; family < opts.families; family++) {
1728        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1729        for (int column = 0; column < opts.columns; column++) {
1730          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1731          byte[] value = generateData(this.rand, getValueLength(this.rand));
1732          if (opts.useTags) {
1733            byte[] tag = generateData(this.rand, TAG_LENGTH);
1734            Tag[] tags = new Tag[opts.noOfTags];
1735            for (int n = 0; n < opts.noOfTags; n++) {
1736              Tag t = new ArrayBackedTag((byte) n, tag);
1737              tags[n] = t;
1738            }
1739            KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
1740              value, tags);
1741            put.add(kv);
1742            updateValueSize(kv.getValueLength());
1743          } else {
1744            put.addColumn(familyName, qualifier, value);
1745            updateValueSize(value.length);
1746          }
1747        }
1748      }
1749      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1750      try {
1751        table.put(put).get();
1752        if (opts.multiPut > 0) {
1753          this.puts.add(put);
1754          if (this.puts.size() == opts.multiPut) {
1755            this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get));
1756            this.puts.clear();
1757          } else {
1758            return false;
1759          }
1760        } else {
1761          table.put(put).get();
1762        }
1763      } catch (ExecutionException e) {
1764        throw new IOException(e);
1765      }
1766      return true;
1767    }
1768  }
1769
1770  static abstract class BufferedMutatorTest extends Test {
1771    protected BufferedMutator mutator;
1772    protected Table table;
1773
1774    BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1775      super(con, options, status);
1776    }
1777
1778    @Override
1779    void onStartup() throws IOException {
1780      BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1781      p.writeBufferSize(opts.bufferSize);
1782      this.mutator = connection.getBufferedMutator(p);
1783      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1784    }
1785
1786    @Override
1787    void onTakedown() throws IOException {
1788      mutator.close();
1789      table.close();
1790    }
1791  }
1792
1793  static class RandomSeekScanTest extends TableTest {
1794    RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1795      super(con, options, status);
1796    }
1797
1798    @Override
1799    boolean testRow(final int i, final long startTime) throws IOException {
1800      Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
1801          .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
1802          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
1803          .setScanMetricsEnabled(true);
1804      FilterList list = new FilterList();
1805      for (int family = 0; family < opts.families; family++) {
1806        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1807        if (opts.addColumns) {
1808          for (int column = 0; column < opts.columns; column++) {
1809            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1810            scan.addColumn(familyName, qualifier);
1811          }
1812        } else {
1813          scan.addFamily(familyName);
1814        }
1815      }
1816      if (opts.filterAll) {
1817        list.addFilter(new FilterAllFilter());
1818      }
1819      list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1820      scan.setFilter(list);
1821      ResultScanner s = this.table.getScanner(scan);
1822      try {
1823        for (Result rr; (rr = s.next()) != null;) {
1824          updateValueSize(rr);
1825        }
1826      } finally {
1827        updateScanMetrics(s.getScanMetrics());
1828        s.close();
1829      }
1830      return true;
1831    }
1832
1833    @Override
1834    protected int getReportingPeriod() {
1835      int period = opts.perClientRunRows / 100;
1836      return period == 0 ? opts.perClientRunRows : period;
1837    }
1838
1839  }
1840
1841  static abstract class RandomScanWithRangeTest extends TableTest {
1842    RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1843      super(con, options, status);
1844    }
1845
1846    @Override
1847    boolean testRow(final int i, final long startTime) throws IOException {
1848      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1849      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
1850          .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
1851          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1852          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1853      for (int family = 0; family < opts.families; family++) {
1854        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1855        if (opts.addColumns) {
1856          for (int column = 0; column < opts.columns; column++) {
1857            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1858            scan.addColumn(familyName, qualifier);
1859          }
1860        } else {
1861          scan.addFamily(familyName);
1862        }
1863      }
1864      if (opts.filterAll) {
1865        scan.setFilter(new FilterAllFilter());
1866      }
1867      Result r = null;
1868      int count = 0;
1869      ResultScanner s = this.table.getScanner(scan);
1870      try {
1871        for (; (r = s.next()) != null;) {
1872          updateValueSize(r);
1873          count++;
1874        }
1875        if (i % 100 == 0) {
1876          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1877            Bytes.toString(startAndStopRow.getFirst()),
1878            Bytes.toString(startAndStopRow.getSecond()), count));
1879        }
1880      } finally {
1881        updateScanMetrics(s.getScanMetrics());
1882        s.close();
1883      }
1884      return true;
1885    }
1886
1887    protected abstract Pair<byte[],byte[]> getStartAndStopRow();
1888
1889    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
1890      int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
1891      int stop = start + maxRange;
1892      return new Pair<>(format(start), format(stop));
1893    }
1894
1895    @Override
1896    protected int getReportingPeriod() {
1897      int period = opts.perClientRunRows / 100;
1898      return period == 0? opts.perClientRunRows: period;
1899    }
1900  }
1901
1902  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1903    RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1904      super(con, options, status);
1905    }
1906
1907    @Override
1908    protected Pair<byte[], byte[]> getStartAndStopRow() {
1909      return generateStartAndStopRows(10);
1910    }
1911  }
1912
1913  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1914    RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1915      super(con, options, status);
1916    }
1917
1918    @Override
1919    protected Pair<byte[], byte[]> getStartAndStopRow() {
1920      return generateStartAndStopRows(100);
1921    }
1922  }
1923
1924  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1925    RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1926      super(con, options, status);
1927    }
1928
1929    @Override
1930    protected Pair<byte[], byte[]> getStartAndStopRow() {
1931      return generateStartAndStopRows(1000);
1932    }
1933  }
1934
1935  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1936    RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1937      super(con, options, status);
1938    }
1939
1940    @Override
1941    protected Pair<byte[], byte[]> getStartAndStopRow() {
1942      return generateStartAndStopRows(10000);
1943    }
1944  }
1945
1946  static class RandomReadTest extends TableTest {
1947    private final Consistency consistency;
1948    private ArrayList<Get> gets;
1949    private Random rd = new Random();
1950    private long numOfReplyFromReplica = 0;
1951
1952    RandomReadTest(Connection con, TestOptions options, Status status) {
1953      super(con, options, status);
1954      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1955      if (opts.multiGet > 0) {
1956        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1957        this.gets = new ArrayList<>(opts.multiGet);
1958      }
1959    }
1960
1961    @Override
1962    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
1963      if (opts.randomSleep > 0) {
1964        Thread.sleep(rd.nextInt(opts.randomSleep));
1965      }
1966      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1967      for (int family = 0; family < opts.families; family++) {
1968        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1969        if (opts.addColumns) {
1970          for (int column = 0; column < opts.columns; column++) {
1971            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1972            get.addColumn(familyName, qualifier);
1973          }
1974        } else {
1975          get.addFamily(familyName);
1976        }
1977      }
1978      if (opts.filterAll) {
1979        get.setFilter(new FilterAllFilter());
1980      }
1981      get.setConsistency(consistency);
1982      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1983      if (opts.multiGet > 0) {
1984        this.gets.add(get);
1985        if (this.gets.size() == opts.multiGet) {
1986          Result [] rs = this.table.get(this.gets);
1987          if (opts.replicas > 1) {
1988            long latency = System.nanoTime() - startTime;
1989            updateValueSize(rs, latency);
1990          } else {
1991            updateValueSize(rs);
1992          }
1993          this.gets.clear();
1994        } else {
1995          return false;
1996        }
1997      } else {
1998        if (opts.replicas > 1) {
1999          Result r = this.table.get(get);
2000          long latency = System.nanoTime() - startTime;
2001          updateValueSize(r, latency);
2002        } else {
2003          updateValueSize(this.table.get(get));
2004        }
2005      }
2006      return true;
2007    }
2008
2009    @Override
2010    protected int getReportingPeriod() {
2011      int period = opts.perClientRunRows / 10;
2012      return period == 0 ? opts.perClientRunRows : period;
2013    }
2014
2015    @Override
2016    protected void testTakedown() throws IOException {
2017      if (this.gets != null && this.gets.size() > 0) {
2018        this.table.get(gets);
2019        this.gets.clear();
2020      }
2021      super.testTakedown();
2022    }
2023  }
2024
2025  /*
2026  Send random reads against fake regions inserted by MetaWriteTest
2027   */
2028  static class MetaRandomReadTest extends MetaTest {
2029    private Random rd = new Random();
2030    private RegionLocator regionLocator;
2031
2032    MetaRandomReadTest(Connection con, TestOptions options, Status status) {
2033      super(con, options, status);
2034      LOG.info("call getRegionLocation");
2035    }
2036
2037    @Override
2038    void onStartup() throws IOException {
2039      super.onStartup();
2040      this.regionLocator = connection.getRegionLocator(table.getName());
2041    }
2042
2043    @Override
2044    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
2045      if (opts.randomSleep > 0) {
2046        Thread.sleep(rd.nextInt(opts.randomSleep));
2047      }
2048      HRegionLocation hRegionLocation = regionLocator.getRegionLocation(
2049        getSplitKey(rd.nextInt(opts.perClientRunRows)), true);
2050      LOG.debug("get location for region: " + hRegionLocation);
2051      return true;
2052    }
2053
2054    @Override
2055    protected int getReportingPeriod() {
2056      int period = opts.perClientRunRows / 10;
2057      return period == 0 ? opts.perClientRunRows : period;
2058    }
2059
2060    @Override
2061    protected void testTakedown() throws IOException {
2062      super.testTakedown();
2063    }
2064  }
2065
2066  static class RandomWriteTest extends SequentialWriteTest {
2067    RandomWriteTest(Connection con, TestOptions options, Status status) {
2068      super(con, options, status);
2069    }
2070
2071    @Override
2072    protected byte[] generateRow(final int i) {
2073      return getRandomRow(this.rand, opts.totalRows);
2074    }
2075
2076
2077  }
2078
2079  static class ScanTest extends TableTest {
2080    private ResultScanner testScanner;
2081
2082    ScanTest(Connection con, TestOptions options, Status status) {
2083      super(con, options, status);
2084    }
2085
2086    @Override
2087    void testTakedown() throws IOException {
2088      if (this.testScanner != null) {
2089        this.testScanner.close();
2090      }
2091      super.testTakedown();
2092    }
2093
2094
2095    @Override
2096    boolean testRow(final int i, final long startTime) throws IOException {
2097      if (this.testScanner == null) {
2098        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
2099            .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
2100            .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
2101        for (int family = 0; family < opts.families; family++) {
2102          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2103          if (opts.addColumns) {
2104            for (int column = 0; column < opts.columns; column++) {
2105              byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2106              scan.addColumn(familyName, qualifier);
2107            }
2108          } else {
2109            scan.addFamily(familyName);
2110          }
2111        }
2112        if (opts.filterAll) {
2113          scan.setFilter(new FilterAllFilter());
2114        }
2115        this.testScanner = table.getScanner(scan);
2116      }
2117      Result r = testScanner.next();
2118      updateValueSize(r);
2119      return true;
2120    }
2121  }
2122
2123  /**
2124   * Base class for operations that are CAS-like; that read a value and then set it based off what
2125   * they read. In this category is increment, append, checkAndPut, etc.
2126   *
2127   * <p>These operations also want some concurrency going on. Usually when these tests run, they
2128   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
2129   * same key space. We do this with our getStartRow and getLastRow overrides.
2130   */
2131  static abstract class CASTableTest extends TableTest {
2132    private final byte [] qualifier;
2133    CASTableTest(Connection con, TestOptions options, Status status) {
2134      super(con, options, status);
2135      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
2136    }
2137
2138    byte [] getQualifier() {
2139      return this.qualifier;
2140    }
2141
2142    @Override
2143    int getStartRow() {
2144      return 0;
2145    }
2146
2147    @Override
2148    int getLastRow() {
2149      return opts.perClientRunRows;
2150    }
2151  }
2152
2153  static class IncrementTest extends CASTableTest {
2154    IncrementTest(Connection con, TestOptions options, Status status) {
2155      super(con, options, status);
2156    }
2157
2158    @Override
2159    boolean testRow(final int i, final long startTime) throws IOException {
2160      Increment increment = new Increment(format(i));
2161      // unlike checkAndXXX tests, which make most sense to do on a single value,
2162      // if multiple families are specified for an increment test we assume it is
2163      // meant to raise the work factor
2164      for (int family = 0; family < opts.families; family++) {
2165        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2166        increment.addColumn(familyName, getQualifier(), 1l);
2167      }
2168      updateValueSize(this.table.increment(increment));
2169      return true;
2170    }
2171  }
2172
2173  static class AppendTest extends CASTableTest {
2174    AppendTest(Connection con, TestOptions options, Status status) {
2175      super(con, options, status);
2176    }
2177
2178    @Override
2179    boolean testRow(final int i, final long startTime) throws IOException {
2180      byte [] bytes = format(i);
2181      Append append = new Append(bytes);
2182      // unlike checkAndXXX tests, which make most sense to do on a single value,
2183      // if multiple families are specified for an append test we assume it is
2184      // meant to raise the work factor
2185      for (int family = 0; family < opts.families; family++) {
2186        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2187        append.addColumn(familyName, getQualifier(), bytes);
2188      }
2189      updateValueSize(this.table.append(append));
2190      return true;
2191    }
2192  }
2193
2194  static class CheckAndMutateTest extends CASTableTest {
2195    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
2196      super(con, options, status);
2197    }
2198
2199    @Override
2200    boolean testRow(final int i, final long startTime) throws IOException {
2201      final byte [] bytes = format(i);
2202      // checkAndXXX tests operate on only a single value
2203      // Put a known value so when we go to check it, it is there.
2204      Put put = new Put(bytes);
2205      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2206      this.table.put(put);
2207      RowMutations mutations = new RowMutations(bytes);
2208      mutations.add(put);
2209      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2210          .ifEquals(bytes).thenMutate(mutations);
2211      return true;
2212    }
2213  }
2214
2215  static class CheckAndPutTest extends CASTableTest {
2216    CheckAndPutTest(Connection con, TestOptions options, Status status) {
2217      super(con, options, status);
2218    }
2219
2220    @Override
2221    boolean testRow(final int i, final long startTime) throws IOException {
2222      final byte [] bytes = format(i);
2223      // checkAndXXX tests operate on only a single value
2224      // Put a known value so when we go to check it, it is there.
2225      Put put = new Put(bytes);
2226      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2227      this.table.put(put);
2228      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2229          .ifEquals(bytes).thenPut(put);
2230      return true;
2231    }
2232  }
2233
2234  static class CheckAndDeleteTest extends CASTableTest {
2235    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
2236      super(con, options, status);
2237    }
2238
2239    @Override
2240    boolean testRow(final int i, final long startTime) throws IOException {
2241      final byte [] bytes = format(i);
2242      // checkAndXXX tests operate on only a single value
2243      // Put a known value so when we go to check it, it is there.
2244      Put put = new Put(bytes);
2245      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2246      this.table.put(put);
2247      Delete delete = new Delete(put.getRow());
2248      delete.addColumn(FAMILY_ZERO, getQualifier());
2249      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2250          .ifEquals(bytes).thenDelete(delete);
2251      return true;
2252    }
2253  }
2254
2255  /*
2256  Delete all fake regions inserted to meta table by MetaWriteTest.
2257   */
2258  static class CleanMetaTest extends MetaTest {
2259    CleanMetaTest(Connection con, TestOptions options, Status status) {
2260      super(con, options, status);
2261    }
2262
2263    @Override
2264    boolean testRow(final int i, final long startTime) throws IOException {
2265      try {
2266        RegionInfo regionInfo = connection.getRegionLocator(table.getName())
2267          .getRegionLocation(getSplitKey(i), false).getRegion();
2268        LOG.debug("deleting region from meta: " + regionInfo);
2269
2270        Delete delete = MetaTableAccessor
2271          .makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP);
2272        try (Table t = MetaTableAccessor.getMetaHTable(connection)) {
2273          t.delete(delete);
2274        }
2275      } catch (IOException ie) {
2276        // Log and continue
2277        LOG.error("cannot find region with start key: " + i);
2278      }
2279      return true;
2280    }
2281  }
2282
2283  static class SequentialReadTest extends TableTest {
2284    SequentialReadTest(Connection con, TestOptions options, Status status) {
2285      super(con, options, status);
2286    }
2287
2288    @Override
2289    boolean testRow(final int i, final long startTime) throws IOException {
2290      Get get = new Get(format(i));
2291      for (int family = 0; family < opts.families; family++) {
2292        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2293        if (opts.addColumns) {
2294          for (int column = 0; column < opts.columns; column++) {
2295            byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2296            get.addColumn(familyName, qualifier);
2297          }
2298        } else {
2299          get.addFamily(familyName);
2300        }
2301      }
2302      if (opts.filterAll) {
2303        get.setFilter(new FilterAllFilter());
2304      }
2305      updateValueSize(table.get(get));
2306      return true;
2307    }
2308  }
2309
2310  static class SequentialWriteTest extends BufferedMutatorTest {
2311    private ArrayList<Put> puts;
2312
2313
2314    SequentialWriteTest(Connection con, TestOptions options, Status status) {
2315      super(con, options, status);
2316      if (opts.multiPut > 0) {
2317        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
2318        this.puts = new ArrayList<>(opts.multiPut);
2319      }
2320    }
2321
2322    protected byte[] generateRow(final int i) {
2323      return format(i);
2324    }
2325
2326    @Override
2327    boolean testRow(final int i, final long startTime) throws IOException {
2328      byte[] row = generateRow(i);
2329      Put put = new Put(row);
2330      for (int family = 0; family < opts.families; family++) {
2331        byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
2332        for (int column = 0; column < opts.columns; column++) {
2333          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2334          byte[] value = generateData(this.rand, getValueLength(this.rand));
2335          if (opts.useTags) {
2336            byte[] tag = generateData(this.rand, TAG_LENGTH);
2337            Tag[] tags = new Tag[opts.noOfTags];
2338            for (int n = 0; n < opts.noOfTags; n++) {
2339              Tag t = new ArrayBackedTag((byte) n, tag);
2340              tags[n] = t;
2341            }
2342            KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
2343              value, tags);
2344            put.add(kv);
2345            updateValueSize(kv.getValueLength());
2346          } else {
2347            put.addColumn(familyName, qualifier, value);
2348            updateValueSize(value.length);
2349          }
2350        }
2351      }
2352      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2353      if (opts.autoFlush) {
2354        if (opts.multiPut > 0) {
2355          this.puts.add(put);
2356          if (this.puts.size() == opts.multiPut) {
2357            table.put(this.puts);
2358            this.puts.clear();
2359          } else {
2360            return false;
2361          }
2362        } else {
2363          table.put(put);
2364        }
2365      } else {
2366        mutator.mutate(put);
2367      }
2368      return true;
2369    }
2370  }
2371
2372  /*
2373  Insert fake regions into meta table with contiguous split keys.
2374   */
2375  static class MetaWriteTest extends MetaTest {
2376
2377    MetaWriteTest(Connection con, TestOptions options, Status status) {
2378      super(con, options, status);
2379    }
2380
2381    @Override
2382    boolean testRow(final int i, final long startTime) throws IOException {
2383      List<RegionInfo> regionInfos = new ArrayList<RegionInfo>();
2384      RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME))
2385        .setStartKey(getSplitKey(i))
2386        .setEndKey(getSplitKey(i + 1))
2387        .build());
2388      regionInfos.add(regionInfo);
2389      MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1);
2390
2391      // write the serverName columns
2392      MetaTableAccessor.updateRegionLocation(connection,
2393        regionInfo, ServerName.valueOf("localhost", 60010, rand.nextLong()), i,
2394        System.currentTimeMillis());
2395      return true;
2396    }
2397  }
2398  static class FilteredScanTest extends TableTest {
2399    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
2400
2401    FilteredScanTest(Connection con, TestOptions options, Status status) {
2402      super(con, options, status);
2403      if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) {
2404        LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB +
2405                ". This could take a very long time.");
2406      }
2407    }
2408
2409    @Override
2410    boolean testRow(int i, final long startTime) throws IOException {
2411      byte[] value = generateData(this.rand, getValueLength(this.rand));
2412      Scan scan = constructScan(value);
2413      ResultScanner scanner = null;
2414      try {
2415        scanner = this.table.getScanner(scan);
2416        for (Result r = null; (r = scanner.next()) != null;) {
2417          updateValueSize(r);
2418        }
2419      } finally {
2420        if (scanner != null) {
2421          updateScanMetrics(scanner.getScanMetrics());
2422          scanner.close();
2423        }
2424      }
2425      return true;
2426    }
2427
2428    protected Scan constructScan(byte[] valuePrefix) throws IOException {
2429      FilterList list = new FilterList();
2430      Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO,
2431        CompareOperator.EQUAL, new BinaryComparator(valuePrefix));
2432      list.addFilter(filter);
2433      if (opts.filterAll) {
2434        list.addFilter(new FilterAllFilter());
2435      }
2436      Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2437          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2438          .setScanMetricsEnabled(true);
2439      if (opts.addColumns) {
2440        for (int column = 0; column < opts.columns; column++) {
2441          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2442          scan.addColumn(FAMILY_ZERO, qualifier);
2443        }
2444      } else {
2445        scan.addFamily(FAMILY_ZERO);
2446      }
2447      scan.setFilter(list);
2448      return scan;
2449    }
2450  }
2451
2452  /**
2453   * Compute a throughput rate in MB/s.
2454   * @param rows Number of records consumed.
2455   * @param timeMs Time taken in milliseconds.
2456   * @return String value with label, ie '123.76 MB/s'
2457   */
2458  private static String calculateMbps(int rows, long timeMs, final int valueSize, int families, int columns) {
2459    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH +
2460      ((valueSize + (FAMILY_NAME_BASE.length()+1) + COLUMN_ZERO.length) * columns) * families);
2461    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
2462      .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
2463      .divide(BYTES_PER_MB, CXT);
2464    return FMT.format(mbps) + " MB/s";
2465  }
2466
2467  /*
2468   * Format passed integer.
2469   * @param number
2470   * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed
2471   * number (Does absolute in case number is negative).
2472   */
2473  public static byte [] format(final int number) {
2474    byte [] b = new byte[ROW_LENGTH];
2475    int d = Math.abs(number);
2476    for (int i = b.length - 1; i >= 0; i--) {
2477      b[i] = (byte)((d % 10) + '0');
2478      d /= 10;
2479    }
2480    return b;
2481  }
2482
2483  /*
2484   * This method takes some time and is done inline uploading data.  For
2485   * example, doing the mapfile test, generation of the key and value
2486   * consumes about 30% of CPU time.
2487   * @return Generated random value to insert into a table cell.
2488   */
2489  public static byte[] generateData(final Random r, int length) {
2490    byte [] b = new byte [length];
2491    int i;
2492
2493    for(i = 0; i < (length-8); i += 8) {
2494      b[i] = (byte) (65 + r.nextInt(26));
2495      b[i+1] = b[i];
2496      b[i+2] = b[i];
2497      b[i+3] = b[i];
2498      b[i+4] = b[i];
2499      b[i+5] = b[i];
2500      b[i+6] = b[i];
2501      b[i+7] = b[i];
2502    }
2503
2504    byte a = (byte) (65 + r.nextInt(26));
2505    for(; i < length; i++) {
2506      b[i] = a;
2507    }
2508    return b;
2509  }
2510
2511  static byte [] getRandomRow(final Random random, final int totalRows) {
2512    return format(generateRandomRow(random, totalRows));
2513  }
2514
2515  static int generateRandomRow(final Random random, final int totalRows) {
2516    return random.nextInt(Integer.MAX_VALUE) % totalRows;
2517  }
2518
2519  static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
2520      Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
2521      throws IOException, InterruptedException {
2522    status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for "
2523        + opts.perClientRunRows + " rows");
2524    long totalElapsedTime;
2525
2526    final TestBase t;
2527    try {
2528      if (AsyncTest.class.isAssignableFrom(cmd)) {
2529        Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
2530        Constructor<? extends AsyncTest> constructor =
2531            newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
2532        t = constructor.newInstance(asyncCon, opts, status);
2533      } else {
2534        Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
2535        Constructor<? extends Test> constructor =
2536            newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
2537        t = constructor.newInstance(con, opts, status);
2538      }
2539    } catch (NoSuchMethodException e) {
2540      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
2541          + ".  It does not provide a constructor as described by "
2542          + "the javadoc comment.  Available constructors are: "
2543          + Arrays.toString(cmd.getConstructors()));
2544    } catch (Exception e) {
2545      throw new IllegalStateException("Failed to construct command class", e);
2546    }
2547    totalElapsedTime = t.test();
2548
2549    status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
2550      "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
2551      " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
2552          getAverageValueLength(opts), opts.families, opts.columns) + ")");
2553
2554    return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold,
2555      t.numOfReplyFromReplica, t.getLatencyHistogram());
2556  }
2557
2558  private static int getAverageValueLength(final TestOptions opts) {
2559    return opts.valueRandom? opts.valueSize/2: opts.valueSize;
2560  }
2561
2562  private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) throws IOException,
2563      InterruptedException, ClassNotFoundException, ExecutionException {
2564    // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2565    // the TestOptions introspection for us and dump the output in a readable format.
2566    LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
2567    Admin admin = null;
2568    Connection connection = null;
2569    try {
2570      connection = ConnectionFactory.createConnection(getConf());
2571      admin = connection.getAdmin();
2572      checkTable(admin, opts);
2573    } finally {
2574      if (admin != null) admin.close();
2575      if (connection != null) connection.close();
2576    }
2577    if (opts.nomapred) {
2578      doLocalClients(opts, getConf());
2579    } else {
2580      doMapReduce(opts, getConf());
2581    }
2582  }
2583
2584  protected void printUsage() {
2585    printUsage(PE_COMMAND_SHORTNAME, null);
2586  }
2587
2588  protected static void printUsage(final String message) {
2589    printUsage(PE_COMMAND_SHORTNAME, message);
2590  }
2591
2592  protected static void printUsageAndExit(final String message, final int exitCode) {
2593    printUsage(message);
2594    System.exit(exitCode);
2595  }
2596
2597  protected static void printUsage(final String shortName, final String message) {
2598    if (message != null && message.length() > 0) {
2599      System.err.println(message);
2600    }
2601    System.err.print("Usage: hbase " + shortName);
2602    System.err.println("  <OPTIONS> [-D<property=value>]* <command> <nclients>");
2603    System.err.println();
2604    System.err.println("General Options:");
2605    System.err.println(" nomapred        Run multiple clients using threads " +
2606      "(rather than use mapreduce)");
2607    System.err.println(" oneCon          all the threads share the same connection. Default: False");
2608    System.err.println(" connCount          connections all threads share. "
2609        + "For example, if set to 2, then all thread share 2 connection. "
2610        + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, "
2611        + "if not, connCount=thread number");
2612
2613    System.err.println(" sampleRate      Execute test on a sample of total " +
2614      "rows. Only supported by randomRead. Default: 1.0");
2615    System.err.println(" period          Report every 'period' rows: " +
2616      "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows()/10);
2617    System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
2618    System.err.println(" traceRate       Enable HTrace spans. Initiate tracing every N rows. " +
2619      "Default: 0");
2620    System.err.println(" latency         Set to report operation latencies. Default: False");
2621    System.err.println(" latencyThreshold  Set to report number of operations with latency " +
2622      "over lantencyThreshold, unit in millisecond, default 0");
2623    System.err.println(" measureAfter    Start to measure the latency once 'measureAfter'" +
2624        " rows have been treated. Default: 0");
2625    System.err.println(" valueSize       Pass value size to use: Default: "
2626        + DEFAULT_OPTS.getValueSize());
2627    System.err.println(" valueRandom     Set if we should vary value size between 0 and " +
2628        "'valueSize'; set on read for stats on size: Default: Not set.");
2629    System.err.println(" blockEncoding   Block encoding to use. Value should be one of "
2630        + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2631    System.err.println();
2632    System.err.println("Table Creation / Write Tests:");
2633    System.err.println(" table           Alternate table name. Default: 'TestTable'");
2634    System.err.println(" rows            Rows each client runs. Default: "
2635        + DEFAULT_OPTS.getPerClientRunRows()
2636        + ".  In case of randomReads and randomSeekScans this could"
2637        + " be specified along with --size to specify the number of rows to be scanned within"
2638        + " the total range specified by the size.");
2639    System.err.println(
2640      " size            Total size in GiB. Mutually exclusive with --rows for writes and scans"
2641          + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2642          + " use size to specify the end range and --rows"
2643          + " specifies the number of rows within that range. " + "Default: 1.0.");
2644    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2645    System.err.println(" flushCommits    Used to determine if the test should flush the table. " +
2646      "Default: false");
2647    System.err.println(" valueZipf       Set if we should vary value size between 0 and " +
2648        "'valueSize' in zipf form: Default: Not set.");
2649    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
2650    System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
2651    System.err.println(" multiPut        Batch puts together into groups of N. Only supported " +
2652        "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0");
2653    System.err.println(" presplit        Create presplit table. If a table with same name exists,"
2654        + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2655        + "Recommended for accurate perf analysis (see guide). Default: disabled");
2656    System.err.println(" usetags         Writes tags along with KVs. Use with HFile V3. " +
2657      "Default: false");
2658    System.err.println(" numoftags       Specify the no of tags that would be needed. " +
2659       "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2660    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
2661    System.err.println(" columns         Columns to write per row. Default: 1");
2662    System.err.println(" families        Specify number of column families for the table. Default: 1");
2663    System.err.println();
2664    System.err.println("Read Tests:");
2665    System.err.println(" filterAll       Helps to filter out all the rows on the server side"
2666        + " there by not returning any thing back to the client.  Helps to check the server side"
2667        + " performance.  Uses FilterAllFilter internally. ");
2668    System.err.println(" multiGet        Batch gets together into groups of N. Only supported " +
2669      "by randomRead. Default: disabled");
2670    System.err.println(" inmemory        Tries to keep the HFiles of the CF " +
2671      "inmemory as far as possible. Not guaranteed that reads are always served " +
2672      "from memory.  Default: false");
2673    System.err.println(" bloomFilter     Bloom filter type, one of "
2674        + Arrays.toString(BloomType.values()));
2675    System.err.println(" blockSize       Blocksize to use when writing out hfiles. ");
2676    System.err.println(" inmemoryCompaction  Makes the column family to do inmemory flushes/compactions. "
2677        + "Uses the CompactingMemstore");
2678    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
2679    System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
2680    System.err.println(" randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
2681    System.err.println(" caching         Scan caching to use. Default: 30");
2682    System.err.println(" asyncPrefetch   Enable asyncPrefetch for scan");
2683    System.err.println(" cacheBlocks     Set the cacheBlocks option for scan. Default: true");
2684    System.err.println(" scanReadType    Set the readType option for scan, stream/pread/default. Default: default");
2685    System.err.println(" bufferSize      Set the value of client side buffering. Default: 2MB");
2686    System.err.println();
2687    System.err.println(" Note: -D properties will be applied to the conf used. ");
2688    System.err.println("  For example: ");
2689    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
2690    System.err.println("   -Dmapreduce.task.timeout=60000");
2691    System.err.println();
2692    System.err.println("Command:");
2693    for (CmdDescriptor command : COMMANDS.values()) {
2694      System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
2695    }
2696    System.err.println();
2697    System.err.println("Args:");
2698    System.err.println(" nclients        Integer. Required. Total number of clients "
2699        + "(and HRegionServers) running. 1 <= value <= 500");
2700    System.err.println("Examples:");
2701    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
2702    System.err.println(" $ hbase " + shortName + " sequentialWrite 1");
2703    System.err.println(" To run 10 clients doing increments over ten rows:");
2704    System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10");
2705  }
2706
2707  /**
2708   * Parse options passed in via an arguments array. Assumes that array has been split
2709   * on white-space and placed into a {@code Queue}. Any unknown arguments will remain
2710   * in the queue at the conclusion of this method call. It's up to the caller to deal
2711   * with these unrecognized arguments.
2712   */
2713  static TestOptions parseOpts(Queue<String> args) {
2714    TestOptions opts = new TestOptions();
2715
2716    String cmd = null;
2717    while ((cmd = args.poll()) != null) {
2718      if (cmd.equals("-h") || cmd.startsWith("--h")) {
2719        // place item back onto queue so that caller knows parsing was incomplete
2720        args.add(cmd);
2721        break;
2722      }
2723
2724      final String nmr = "--nomapred";
2725      if (cmd.startsWith(nmr)) {
2726        opts.nomapred = true;
2727        continue;
2728      }
2729
2730      final String rows = "--rows=";
2731      if (cmd.startsWith(rows)) {
2732        opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
2733        continue;
2734      }
2735
2736      final String cycles = "--cycles=";
2737      if (cmd.startsWith(cycles)) {
2738        opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
2739        continue;
2740      }
2741
2742      final String sampleRate = "--sampleRate=";
2743      if (cmd.startsWith(sampleRate)) {
2744        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
2745        continue;
2746      }
2747
2748      final String table = "--table=";
2749      if (cmd.startsWith(table)) {
2750        opts.tableName = cmd.substring(table.length());
2751        continue;
2752      }
2753
2754      final String startRow = "--startRow=";
2755      if (cmd.startsWith(startRow)) {
2756        opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
2757        continue;
2758      }
2759
2760      final String compress = "--compress=";
2761      if (cmd.startsWith(compress)) {
2762        opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
2763        continue;
2764      }
2765
2766      final String traceRate = "--traceRate=";
2767      if (cmd.startsWith(traceRate)) {
2768        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
2769        continue;
2770      }
2771
2772      final String blockEncoding = "--blockEncoding=";
2773      if (cmd.startsWith(blockEncoding)) {
2774        opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
2775        continue;
2776      }
2777
2778      final String flushCommits = "--flushCommits=";
2779      if (cmd.startsWith(flushCommits)) {
2780        opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
2781        continue;
2782      }
2783
2784      final String writeToWAL = "--writeToWAL=";
2785      if (cmd.startsWith(writeToWAL)) {
2786        opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
2787        continue;
2788      }
2789
2790      final String presplit = "--presplit=";
2791      if (cmd.startsWith(presplit)) {
2792        opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
2793        continue;
2794      }
2795
2796      final String inMemory = "--inmemory=";
2797      if (cmd.startsWith(inMemory)) {
2798        opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
2799        continue;
2800      }
2801
2802      final String autoFlush = "--autoFlush=";
2803      if (cmd.startsWith(autoFlush)) {
2804        opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
2805        continue;
2806      }
2807
2808      final String onceCon = "--oneCon=";
2809      if (cmd.startsWith(onceCon)) {
2810        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
2811        continue;
2812      }
2813
2814      final String connCount = "--connCount=";
2815      if (cmd.startsWith(connCount)) {
2816        opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
2817        continue;
2818      }
2819
2820      final String latencyThreshold = "--latencyThreshold=";
2821      if (cmd.startsWith(latencyThreshold)) {
2822        opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length()));
2823        continue;
2824      }
2825
2826      final String latency = "--latency";
2827      if (cmd.startsWith(latency)) {
2828        opts.reportLatency = true;
2829        continue;
2830      }
2831
2832      final String multiGet = "--multiGet=";
2833      if (cmd.startsWith(multiGet)) {
2834        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
2835        continue;
2836      }
2837
2838      final String multiPut = "--multiPut=";
2839      if (cmd.startsWith(multiPut)) {
2840        opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
2841        continue;
2842      }
2843
2844      final String useTags = "--usetags=";
2845      if (cmd.startsWith(useTags)) {
2846        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
2847        continue;
2848      }
2849
2850      final String noOfTags = "--numoftags=";
2851      if (cmd.startsWith(noOfTags)) {
2852        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
2853        continue;
2854      }
2855
2856      final String replicas = "--replicas=";
2857      if (cmd.startsWith(replicas)) {
2858        opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
2859        continue;
2860      }
2861
2862      final String filterOutAll = "--filterAll";
2863      if (cmd.startsWith(filterOutAll)) {
2864        opts.filterAll = true;
2865        continue;
2866      }
2867
2868      final String size = "--size=";
2869      if (cmd.startsWith(size)) {
2870        opts.size = Float.parseFloat(cmd.substring(size.length()));
2871        if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB");
2872        continue;
2873      }
2874
2875      final String splitPolicy = "--splitPolicy=";
2876      if (cmd.startsWith(splitPolicy)) {
2877        opts.splitPolicy = cmd.substring(splitPolicy.length());
2878        continue;
2879      }
2880
2881      final String randomSleep = "--randomSleep=";
2882      if (cmd.startsWith(randomSleep)) {
2883        opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
2884        continue;
2885      }
2886
2887      final String measureAfter = "--measureAfter=";
2888      if (cmd.startsWith(measureAfter)) {
2889        opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length()));
2890        continue;
2891      }
2892
2893      final String bloomFilter = "--bloomFilter=";
2894      if (cmd.startsWith(bloomFilter)) {
2895        opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
2896        continue;
2897      }
2898
2899      final String blockSize = "--blockSize=";
2900      if(cmd.startsWith(blockSize) ) {
2901        opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
2902        continue;
2903      }
2904
2905      final String valueSize = "--valueSize=";
2906      if (cmd.startsWith(valueSize)) {
2907        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
2908        continue;
2909      }
2910
2911      final String valueRandom = "--valueRandom";
2912      if (cmd.startsWith(valueRandom)) {
2913        opts.valueRandom = true;
2914        continue;
2915      }
2916
2917      final String valueZipf = "--valueZipf";
2918      if (cmd.startsWith(valueZipf)) {
2919        opts.valueZipf = true;
2920        continue;
2921      }
2922
2923      final String period = "--period=";
2924      if (cmd.startsWith(period)) {
2925        opts.period = Integer.parseInt(cmd.substring(period.length()));
2926        continue;
2927      }
2928
2929      final String addColumns = "--addColumns=";
2930      if (cmd.startsWith(addColumns)) {
2931        opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
2932        continue;
2933      }
2934
2935      final String inMemoryCompaction = "--inmemoryCompaction=";
2936      if (cmd.startsWith(inMemoryCompaction)) {
2937        opts.inMemoryCompaction =
2938            MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length()));
2939        continue;
2940      }
2941
2942      final String columns = "--columns=";
2943      if (cmd.startsWith(columns)) {
2944        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
2945        continue;
2946      }
2947
2948      final String families = "--families=";
2949      if (cmd.startsWith(families)) {
2950        opts.families = Integer.parseInt(cmd.substring(families.length()));
2951        continue;
2952      }
2953
2954      final String caching = "--caching=";
2955      if (cmd.startsWith(caching)) {
2956        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
2957        continue;
2958      }
2959
2960      final String asyncPrefetch = "--asyncPrefetch";
2961      if (cmd.startsWith(asyncPrefetch)) {
2962        opts.asyncPrefetch = true;
2963        continue;
2964      }
2965
2966      final String cacheBlocks = "--cacheBlocks=";
2967      if (cmd.startsWith(cacheBlocks)) {
2968        opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length()));
2969        continue;
2970      }
2971
2972      final String scanReadType = "--scanReadType=";
2973      if (cmd.startsWith(scanReadType)) {
2974        opts.scanReadType =
2975            Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
2976        continue;
2977      }
2978
2979      final String bufferSize = "--bufferSize=";
2980      if (cmd.startsWith(bufferSize)) {
2981        opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
2982        continue;
2983      }
2984
2985      validateParsedOpts(opts);
2986
2987      if (isCommandClass(cmd)) {
2988        opts.cmdName = cmd;
2989        try {
2990          opts.numClientThreads = Integer.parseInt(args.remove());
2991        } catch (NoSuchElementException | NumberFormatException e) {
2992          throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
2993        }
2994        opts = calculateRowsAndSize(opts);
2995        break;
2996      } else {
2997        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
2998      }
2999
3000      // Not matching any option or command.
3001      System.err.println("Error: Wrong option or command: " + cmd);
3002      args.add(cmd);
3003      break;
3004    }
3005    return opts;
3006  }
3007
3008  /**
3009  * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts
3010  */
3011  private static void validateParsedOpts(TestOptions opts)  {
3012
3013    if (!opts.autoFlush && opts.multiPut > 0) {
3014      throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
3015    }
3016
3017    if (opts.oneCon && opts.connCount > 1) {
3018      throw new IllegalArgumentException("oneCon is set to true, "
3019          + "connCount should not bigger than 1");
3020    }
3021
3022    if (opts.valueZipf && opts.valueRandom) {
3023      throw new IllegalStateException("Either valueZipf or valueRandom but not both");
3024    }
3025  }
3026
3027  static TestOptions calculateRowsAndSize(final TestOptions opts) {
3028    int rowsPerGB = getRowsPerGB(opts);
3029    if ((opts.getCmdName() != null
3030        && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
3031        && opts.size != DEFAULT_OPTS.size
3032        && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
3033      opts.totalRows = (int) opts.size * rowsPerGB;
3034    } else if (opts.size != DEFAULT_OPTS.size) {
3035      // total size in GB specified
3036      opts.totalRows = (int) opts.size * rowsPerGB;
3037      opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
3038    } else {
3039      opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
3040      opts.size = opts.totalRows / rowsPerGB;
3041    }
3042    return opts;
3043  }
3044
3045  static int getRowsPerGB(final TestOptions opts) {
3046    return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getFamilies() *
3047        opts.getColumns());
3048  }
3049
3050  @Override
3051  public int run(String[] args) throws Exception {
3052    // Process command-line args. TODO: Better cmd-line processing
3053    // (but hopefully something not as painful as cli options).
3054    int errCode = -1;
3055    if (args.length < 1) {
3056      printUsage();
3057      return errCode;
3058    }
3059
3060    try {
3061      LinkedList<String> argv = new LinkedList<>();
3062      argv.addAll(Arrays.asList(args));
3063      TestOptions opts = parseOpts(argv);
3064
3065      // args remaining, print help and exit
3066      if (!argv.isEmpty()) {
3067        errCode = 0;
3068        printUsage();
3069        return errCode;
3070      }
3071
3072      // must run at least 1 client
3073      if (opts.numClientThreads <= 0) {
3074        throw new IllegalArgumentException("Number of clients must be > 0");
3075      }
3076
3077      // cmdName should not be null, print help and exit
3078      if (opts.cmdName == null) {
3079        printUsage();
3080        return errCode;
3081      }
3082
3083      Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
3084      if (cmdClass != null) {
3085        runTest(cmdClass, opts);
3086        errCode = 0;
3087      }
3088
3089    } catch (Exception e) {
3090      e.printStackTrace();
3091    }
3092
3093    return errCode;
3094  }
3095
3096  private static boolean isCommandClass(String cmd) {
3097    return COMMANDS.containsKey(cmd);
3098  }
3099
3100  private static Class<? extends TestBase> determineCommandClass(String cmd) {
3101    CmdDescriptor descriptor = COMMANDS.get(cmd);
3102    return descriptor != null ? descriptor.getCmdClass() : null;
3103  }
3104
3105  public static void main(final String[] args) throws Exception {
3106    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
3107    System.exit(res);
3108  }
3109}