001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import com.codahale.metrics.Histogram;
021import com.codahale.metrics.UniformReservoir;
022import io.opentelemetry.api.trace.Span;
023import io.opentelemetry.context.Scope;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.PrintStream;
027import java.lang.reflect.Constructor;
028import java.math.BigDecimal;
029import java.math.MathContext;
030import java.text.DecimalFormat;
031import java.text.SimpleDateFormat;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Date;
035import java.util.HashMap;
036import java.util.LinkedList;
037import java.util.List;
038import java.util.Locale;
039import java.util.Map;
040import java.util.NoSuchElementException;
041import java.util.Properties;
042import java.util.Queue;
043import java.util.Random;
044import java.util.TreeMap;
045import java.util.concurrent.Callable;
046import java.util.concurrent.ExecutionException;
047import java.util.concurrent.ExecutorService;
048import java.util.concurrent.Executors;
049import java.util.concurrent.Future;
050import java.util.concurrent.ThreadLocalRandom;
051import java.util.function.Consumer;
052import java.util.stream.Collectors;
053import org.apache.commons.lang3.StringUtils;
054import org.apache.hadoop.conf.Configuration;
055import org.apache.hadoop.conf.Configured;
056import org.apache.hadoop.fs.FileSystem;
057import org.apache.hadoop.fs.Path;
058import org.apache.hadoop.hbase.client.Admin;
059import org.apache.hadoop.hbase.client.Append;
060import org.apache.hadoop.hbase.client.AsyncConnection;
061import org.apache.hadoop.hbase.client.AsyncTable;
062import org.apache.hadoop.hbase.client.BufferedMutator;
063import org.apache.hadoop.hbase.client.BufferedMutatorParams;
064import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
065import org.apache.hadoop.hbase.client.Connection;
066import org.apache.hadoop.hbase.client.ConnectionFactory;
067import org.apache.hadoop.hbase.client.Consistency;
068import org.apache.hadoop.hbase.client.Delete;
069import org.apache.hadoop.hbase.client.Durability;
070import org.apache.hadoop.hbase.client.Get;
071import org.apache.hadoop.hbase.client.Increment;
072import org.apache.hadoop.hbase.client.Put;
073import org.apache.hadoop.hbase.client.RegionInfo;
074import org.apache.hadoop.hbase.client.RegionInfoBuilder;
075import org.apache.hadoop.hbase.client.RegionLocator;
076import org.apache.hadoop.hbase.client.Result;
077import org.apache.hadoop.hbase.client.ResultScanner;
078import org.apache.hadoop.hbase.client.RowMutations;
079import org.apache.hadoop.hbase.client.Scan;
080import org.apache.hadoop.hbase.client.Table;
081import org.apache.hadoop.hbase.client.TableDescriptor;
082import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
083import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
084import org.apache.hadoop.hbase.filter.BinaryComparator;
085import org.apache.hadoop.hbase.filter.Filter;
086import org.apache.hadoop.hbase.filter.FilterAllFilter;
087import org.apache.hadoop.hbase.filter.FilterList;
088import org.apache.hadoop.hbase.filter.PageFilter;
089import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
090import org.apache.hadoop.hbase.filter.WhileMatchFilter;
091import org.apache.hadoop.hbase.io.compress.Compression;
092import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
093import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
094import org.apache.hadoop.hbase.regionserver.BloomType;
095import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
096import org.apache.hadoop.hbase.trace.TraceUtil;
097import org.apache.hadoop.hbase.util.ByteArrayHashKey;
098import org.apache.hadoop.hbase.util.Bytes;
099import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
100import org.apache.hadoop.hbase.util.GsonUtil;
101import org.apache.hadoop.hbase.util.Hash;
102import org.apache.hadoop.hbase.util.MurmurHash;
103import org.apache.hadoop.hbase.util.Pair;
104import org.apache.hadoop.hbase.util.RandomDistribution;
105import org.apache.hadoop.hbase.util.YammerHistogramUtils;
106import org.apache.hadoop.io.LongWritable;
107import org.apache.hadoop.io.Text;
108import org.apache.hadoop.mapreduce.Job;
109import org.apache.hadoop.mapreduce.Mapper;
110import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
111import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
112import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
113import org.apache.hadoop.util.Tool;
114import org.apache.hadoop.util.ToolRunner;
115import org.apache.yetus.audience.InterfaceAudience;
116import org.slf4j.Logger;
117import org.slf4j.LoggerFactory;
118
119import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
120import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
121import org.apache.hbase.thirdparty.com.google.gson.Gson;
122
123/**
 * Script used for evaluating HBase performance and scalability. Runs an HBase client that steps through
125 * one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random writes test,
126 * etc.). Pass on the command-line which test to run and how many clients are participating in this
127 * experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
128 * <p>
129 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance
130 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper,
131 * pages 8-10.
132 * <p>
133 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as
134 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client does
135 * about 1GB of data, unless specified otherwise.
136 */
137@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
138public class PerformanceEvaluation extends Configured implements Tool {
  /** Command name under which RandomSeekScanTest is registered. */
  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
  /** Command name under which RandomReadTest is registered. */
  static final String RANDOM_READ = "randomRead";
  // NOTE(review): presumably the short tool name shown in usage text — confirm at its use site.
  static final String PE_COMMAND_SHORTNAME = "pe";
  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
  /** Serializes/deserializes TestOptions to and from the per-client JSON input lines. */
  private static final Gson GSON = GsonUtil.createGson().create();

  /** Default table name used by the tests. */
  public static final String TABLE_NAME = "TestTable";
  /** Prefix of column family names; family i is named FAMILY_NAME_BASE + i. */
  public static final String FAMILY_NAME_BASE = "info";
  /** Name of the first column family (FAMILY_NAME_BASE + 0). */
  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
  /** Qualifier "0". */
  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
  // NOTE(review): default value size, presumably in bytes — confirm against the writers.
  public static final int DEFAULT_VALUE_LENGTH = 1000;
  // NOTE(review): presumably the length of formatted row keys — confirm against format().
  public static final int ROW_LENGTH = 26;

  // NOTE(review): this is 1024 * 1024 * 1000, i.e. it mixes a binary MiB with a decimal
  // thousand; it is not exactly 1 GiB or 1 GB. Kept as-is because row defaults derive from it.
  private static final int ONE_GB = 1024 * 1024 * 1000;
  /** Default row count per client and in total (ONE_GB / DEFAULT_VALUE_LENGTH). */
  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
  // TODO : should we make this configurable
  private static final int TAG_LENGTH = 256;
  private static final DecimalFormat FMT = new DecimalFormat("0.##");
  private static final MathContext CXT = MathContext.DECIMAL64;
  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
  /** Pristine defaults; compared against user options to detect what was explicitly changed. */
  private static final TestOptions DEFAULT_OPTS = new TestOptions();

  /**
   * Registry of runnable commands keyed by command name (a TreeMap, so iteration is sorted by
   * name). NOTE(review): not final; confirm nothing reassigns it before making it final.
   */
  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
  /** Directory (relative to the job base dir) under which MapReduce input files are written. */
  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
164
  /*
   * Registers the built-in commands in COMMANDS. Each entry maps the command-line name to the
   * implementing test class and the one-line help description.
   * NOTE(review): the filterScan description reads "it's value" (should be "its value"); left
   * untouched here because it is user-visible runtime text.
   */
  static {
    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
      "Run async random read test");
    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
      "Run async random write test");
    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
      "Run async sequential read test");
    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
      "Run async sequential write test");
    addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)");
    addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test");
    addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test");
    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
      "Run random seek and scan 100 test");
    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
      "Run random seek scan with both start and stop row (max 10 rows)");
    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
      "Run random seek scan with both start and stop row (max 100 rows)");
    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
      "Run random seek scan with both start and stop row (max 1000 rows)");
    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
      "Run random seek scan with both start and stop row (max 10000 rows)");
    addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
    addCommandDescriptor(RandomDeleteTest.class, "randomDelete", "Run random delete test");
    addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
    addCommandDescriptor(SequentialDeleteTest.class, "sequentialDelete",
      "Run sequential delete test");
    addCommandDescriptor(MetaWriteTest.class, "metaWrite",
      "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta");
    addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
    addCommandDescriptor(ReverseScanTest.class, "reverseScan",
      "Run reverse scan test (read every row)");
    addCommandDescriptor(FilteredScanTest.class, "filterScan",
      "Run scan test using a filter to find a specific row based on it's value "
        + "(make sure to use --rows=20)");
    // The read-modify-write commands below intentionally let clients overlap on the keyspace.
    addCommandDescriptor(IncrementTest.class, "increment",
      "Increment on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(AppendTest.class, "append",
      "Append on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CleanMetaTest.class, "cleanMeta",
      "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread");
  }
214
215  /**
216   * Enum for map metrics. Keep it out here rather than inside in the Map inner-class so we can find
217   * associated properties.
218   */
219  protected static enum Counter {
220    /** elapsed time */
221    ELAPSED_TIME,
222    /** number of rows */
223    ROWS
224  }
225
226  protected static class RunResult implements Comparable<RunResult> {
227    public RunResult(long duration, Histogram hist) {
228      this.duration = duration;
229      this.hist = hist;
230      numbOfReplyOverThreshold = 0;
231      numOfReplyFromReplica = 0;
232    }
233
234    public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica,
235      Histogram hist) {
236      this.duration = duration;
237      this.hist = hist;
238      this.numbOfReplyOverThreshold = numbOfReplyOverThreshold;
239      this.numOfReplyFromReplica = numOfReplyFromReplica;
240    }
241
242    public final long duration;
243    public final Histogram hist;
244    public final long numbOfReplyOverThreshold;
245    public final long numOfReplyFromReplica;
246
247    @Override
248    public String toString() {
249      return Long.toString(duration);
250    }
251
252    @Override
253    public int compareTo(RunResult o) {
254      return Long.compare(this.duration, o.duration);
255    }
256
257    @Override
258    public boolean equals(Object obj) {
259      if (this == obj) {
260        return true;
261      }
262      if (obj == null || getClass() != obj.getClass()) {
263        return false;
264      }
265      return this.compareTo((RunResult) obj) == 0;
266    }
267
268    @Override
269    public int hashCode() {
270      return Long.hashCode(duration);
271    }
272  }
273
274  /**
275   * Constructor
276   * @param conf Configuration object
277   */
278  public PerformanceEvaluation(final Configuration conf) {
279    super(conf);
280  }
281
282  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name,
283    String description) {
284    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
285    COMMANDS.put(name, cmdDescriptor);
286  }
287
288  /**
289   * Implementations can have their status set.
290   */
291  interface Status {
292    /**
293     * Sets status
294     * @param msg status message
295     */
296    void setStatus(final String msg) throws IOException;
297  }
298
299  /**
300   * MapReduce job that runs a performance evaluation client in each map task.
301   */
302  public static class EvaluationMapTask
303    extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
304
305    /** configuration parameter name that contains the command */
306    public final static String CMD_KEY = "EvaluationMapTask.command";
307    /** configuration parameter name that contains the PE impl */
308    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
309
310    private Class<? extends Test> cmd;
311
312    @Override
313    protected void setup(Context context) throws IOException, InterruptedException {
314      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
315
316      // this is required so that extensions of PE are instantiated within the
317      // map reduce task...
318      Class<? extends PerformanceEvaluation> peClass =
319        forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
320      try {
321        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
322      } catch (Exception e) {
323        throw new IllegalStateException("Could not instantiate PE instance", e);
324      }
325    }
326
327    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
328      try {
329        return Class.forName(className).asSubclass(type);
330      } catch (ClassNotFoundException e) {
331        throw new IllegalStateException("Could not find class for name: " + className, e);
332      }
333    }
334
335    @Override
336    protected void map(LongWritable key, Text value, final Context context)
337      throws IOException, InterruptedException {
338
339      Status status = new Status() {
340        @Override
341        public void setStatus(String msg) {
342          context.setStatus(msg);
343        }
344      };
345
346      TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class);
347      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
348      final Connection con = ConnectionFactory.createConnection(conf);
349      AsyncConnection asyncCon = null;
350      try {
351        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
352      } catch (ExecutionException e) {
353        throw new IOException(e);
354      }
355
356      // Evaluation task
357      RunResult result =
358        PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
359      // Collect how much time the thing took. Report as map output and
360      // to the ELAPSED_TIME counter.
361      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
362      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
363      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
364      context.progress();
365    }
366  }
367
368  /*
369   * If table does not already exist, create. Also create a table when {@code opts.presplitRegions}
370   * is specified or when the existing table's region replica count doesn't match {@code
371   * opts.replicas}.
372   */
373  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
374    final TableName tableName = TableName.valueOf(opts.tableName);
375    final boolean exists = admin.tableExists(tableName);
376    final boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
377      || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
378    final boolean isDeleteCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("delete");
379    final boolean needsData = isReadCmd || isDeleteCmd;
380    if (!exists && needsData) {
381      throw new IllegalStateException(
382        "Must specify an existing table for read/delete commands. Run a write command first.");
383    }
384    TableDescriptor desc = exists ? admin.getDescriptor(TableName.valueOf(opts.tableName)) : null;
385    final byte[][] splits = getSplits(opts);
386
387    // recreate the table when user has requested presplit or when existing
388    // {RegionSplitPolicy,replica count} does not match requested, or when the
389    // number of column families does not match requested.
390    final boolean regionCountChanged =
391      exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions
392        && opts.presplitRegions != admin.getRegions(tableName).size();
393    final boolean splitPolicyChanged =
394      exists && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy);
395    final boolean regionReplicationChanged = exists && desc.getRegionReplication() != opts.replicas;
396    final boolean columnFamilyCountChanged = exists && desc.getColumnFamilyCount() != opts.families;
397
398    boolean needsDelete = regionCountChanged || splitPolicyChanged || regionReplicationChanged
399      || columnFamilyCountChanged;
400
401    if (needsDelete) {
402      final List<String> errors = new ArrayList<>();
403      if (columnFamilyCountChanged) {
404        final String error = String.format("--families=%d, but found %d column families",
405          opts.families, desc.getColumnFamilyCount());
406        if (needsData) {
407          // We can't proceed the test in this case
408          throw new IllegalStateException(
409            "Cannot proceed the test. Run a write command first: " + error);
410        }
411        errors.add(error);
412      }
413      if (regionCountChanged) {
414        errors.add(String.format("--presplit=%d, but found %d regions", opts.presplitRegions,
415          admin.getRegions(tableName).size()));
416      }
417      if (splitPolicyChanged) {
418        errors.add(String.format("--splitPolicy=%s, but current policy is %s", opts.splitPolicy,
419          desc.getRegionSplitPolicyClassName()));
420      }
421      if (regionReplicationChanged) {
422        errors.add(String.format("--replicas=%d, but found %d replicas", opts.replicas,
423          desc.getRegionReplication()));
424      }
425      final String reason =
426        errors.stream().map(s -> '[' + s + ']').collect(Collectors.joining(", "));
427
428      if (needsData) {
429        LOG.warn("Unexpected or incorrect options provided for {}. "
430          + "Please verify whether the detected inconsistencies are expected or ignorable: {}. "
431          + "The test will proceed, but the results may not be reliable.", opts.cmdName, reason);
432        needsDelete = false;
433      } else {
434        LOG.info("Table will be recreated: " + reason);
435      }
436    }
437
438    // remove an existing table
439    if (needsDelete) {
440      if (admin.isTableEnabled(tableName)) {
441        admin.disableTable(tableName);
442      }
443      admin.deleteTable(tableName);
444    }
445
446    // table creation is necessary
447    if (!exists || needsDelete) {
448      desc = getTableDescriptor(opts);
449      if (splits != null) {
450        if (LOG.isDebugEnabled()) {
451          for (int i = 0; i < splits.length; i++) {
452            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
453          }
454        }
455      }
456      if (splits != null) {
457        admin.createTable(desc, splits);
458      } else {
459        admin.createTable(desc);
460      }
461      LOG.info("Table " + desc + " created");
462    }
463    return admin.tableExists(tableName);
464  }
465
466  /**
467   * Create an HTableDescriptor from provided TestOptions.
468   */
469  protected static TableDescriptor getTableDescriptor(TestOptions opts) {
470    TableDescriptorBuilder builder =
471      TableDescriptorBuilder.newBuilder(TableName.valueOf(opts.tableName));
472
473    for (int family = 0; family < opts.families; family++) {
474      byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
475      ColumnFamilyDescriptorBuilder cfBuilder =
476        ColumnFamilyDescriptorBuilder.newBuilder(familyName);
477      cfBuilder.setDataBlockEncoding(opts.blockEncoding);
478      cfBuilder.setCompressionType(opts.compression);
479      cfBuilder.setEncryptionType(opts.encryption);
480      cfBuilder.setBloomFilterType(opts.bloomType);
481      cfBuilder.setBlocksize(opts.blockSize);
482      if (opts.inMemoryCF) {
483        cfBuilder.setInMemory(true);
484      }
485      cfBuilder.setInMemoryCompaction(opts.inMemoryCompaction);
486      builder.setColumnFamily(cfBuilder.build());
487    }
488    if (opts.replicas != DEFAULT_OPTS.replicas) {
489      builder.setRegionReplication(opts.replicas);
490    }
491    if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
492      builder.setRegionSplitPolicyClassName(opts.splitPolicy);
493    }
494    return builder.build();
495  }
496
497  /**
498   * generates splits based on total number of rows and specified split regions
499   */
500  protected static byte[][] getSplits(TestOptions opts) {
501    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null;
502
503    int numSplitPoints = opts.presplitRegions - 1;
504    byte[][] splits = new byte[numSplitPoints][];
505    long jump = opts.totalRows / opts.presplitRegions;
506    for (int i = 0; i < numSplitPoints; i++) {
507      long rowkey = jump * (1 + i);
508      splits[i] = format(rowkey);
509    }
510    return splits;
511  }
512
513  static void setupConnectionCount(final TestOptions opts) {
514    if (opts.oneCon) {
515      opts.connCount = 1;
516    } else {
517      if (opts.connCount == -1) {
518        // set to thread number if connCount is not set
519        opts.connCount = opts.numClientThreads;
520      }
521    }
522  }
523
524  /*
525   * Run all clients in this vm each to its own thread.
526   */
527  static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
528    throws IOException, InterruptedException, ExecutionException {
529    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
530    assert cmd != null;
531    @SuppressWarnings("unchecked")
532    Future<RunResult>[] threads = new Future[opts.numClientThreads];
533    RunResult[] results = new RunResult[opts.numClientThreads];
534    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
535      new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
536    setupConnectionCount(opts);
537    final Connection[] cons = new Connection[opts.connCount];
538    final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
539    for (int i = 0; i < opts.connCount; i++) {
540      cons[i] = ConnectionFactory.createConnection(conf);
541      asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
542    }
543    LOG
544      .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads");
545    for (int i = 0; i < threads.length; i++) {
546      final int index = i;
547      threads[i] = pool.submit(new Callable<RunResult>() {
548        @Override
549        public RunResult call() throws Exception {
550          TestOptions threadOpts = new TestOptions(opts);
551          final Connection con = cons[index % cons.length];
552          final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
553          if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
554          RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
555            @Override
556            public void setStatus(final String msg) throws IOException {
557              LOG.info(msg);
558            }
559          });
560          LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration
561            + "ms over " + threadOpts.perClientRunRows + " rows");
562          if (opts.latencyThreshold > 0) {
563            LOG.info("Number of replies over latency threshold " + opts.latencyThreshold
564              + "(ms) is " + run.numbOfReplyOverThreshold);
565          }
566          return run;
567        }
568      });
569    }
570    pool.shutdown();
571
572    for (int i = 0; i < threads.length; i++) {
573      try {
574        results[i] = threads[i].get();
575      } catch (ExecutionException e) {
576        throw new IOException(e.getCause());
577      }
578    }
579    final String test = cmd.getSimpleName();
580    LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results));
581    Arrays.sort(results);
582    long total = 0;
583    float avgLatency = 0;
584    float avgTPS = 0;
585    long replicaWins = 0;
586    for (RunResult result : results) {
587      total += result.duration;
588      avgLatency += result.hist.getSnapshot().getMean();
589      avgTPS += opts.perClientRunRows * 1.0f / result.duration;
590      replicaWins += result.numOfReplyFromReplica;
591    }
592    avgTPS *= 1000; // ms to second
593    avgLatency = avgLatency / results.length;
594    LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: "
595      + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms");
596    LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
597    LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
598    if (opts.replicas > 1) {
599      LOG.info("[results from replica regions] " + replicaWins);
600    }
601
602    for (int i = 0; i < opts.connCount; i++) {
603      cons[i].close();
604      asyncCons[i].close();
605    }
606
607    return results;
608  }
609
610  /*
611   * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write
612   * out an input file with instruction per client regards which row they are to start on.
613   * @param cmd Command to run.
614   */
615  static Job doMapReduce(TestOptions opts, final Configuration conf)
616    throws IOException, InterruptedException, ClassNotFoundException {
617    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
618    assert cmd != null;
619    Path inputDir = writeInputFile(conf, opts);
620    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
621    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
622    Job job = Job.getInstance(conf);
623    job.setJarByClass(PerformanceEvaluation.class);
624    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
625
626    job.setInputFormatClass(NLineInputFormat.class);
627    NLineInputFormat.setInputPaths(job, inputDir);
628    // this is default, but be explicit about it just in case.
629    NLineInputFormat.setNumLinesPerSplit(job, 1);
630
631    job.setOutputKeyClass(LongWritable.class);
632    job.setOutputValueClass(LongWritable.class);
633
634    job.setMapperClass(EvaluationMapTask.class);
635    job.setReducerClass(LongSumReducer.class);
636
637    job.setNumReduceTasks(1);
638
639    job.setOutputFormatClass(TextOutputFormat.class);
640    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
641
642    TableMapReduceUtil.addDependencyJars(job);
643    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Histogram.class, // yammer
644                                                                                            // metrics
645      Gson.class, // gson
646      FilterAllFilter.class // hbase-server tests jar
647    );
648
649    TableMapReduceUtil.initCredentials(job);
650
651    job.waitForCompletion(true);
652    return job;
653  }
654
655  /**
656   * Each client has one mapper to do the work, and client do the resulting count in a map task.
657   */
658
659  static String JOB_INPUT_FILENAME = "input.txt";
660
661  /*
662   * Write input file of offsets-per-client for the mapreduce job.
663   * @param c Configuration
664   * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
665   */
666  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
667    return writeInputFile(c, opts, new Path("."));
668  }
669
670  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
671    throws IOException {
672    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
673    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
674    Path inputDir = new Path(jobdir, "inputs");
675
676    FileSystem fs = FileSystem.get(c);
677    fs.mkdirs(inputDir);
678
679    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
680    PrintStream out = new PrintStream(fs.create(inputFile));
681    // Make input random.
682    Map<Integer, String> m = new TreeMap<>();
683    Hash h = MurmurHash.getInstance();
684    long perClientRows = (opts.totalRows / opts.numClientThreads);
685    try {
686      for (int j = 0; j < opts.numClientThreads; j++) {
687        TestOptions next = new TestOptions(opts);
688        next.startRow = j * perClientRows;
689        next.perClientRunRows = perClientRows;
690        String s = GSON.toJson(next);
691        LOG.info("Client=" + j + ", input=" + s);
692        byte[] b = Bytes.toBytes(s);
693        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
694        m.put(hash, s);
695      }
696      for (Map.Entry<Integer, String> e : m.entrySet()) {
697        out.println(e.getValue());
698      }
699    } finally {
700      out.close();
701    }
702    return inputDir;
703  }
704
705  /**
706   * Describes a command.
707   */
708  static class CmdDescriptor {
709    private Class<? extends TestBase> cmdClass;
710    private String name;
711    private String description;
712
713    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
714      this.cmdClass = cmdClass;
715      this.name = name;
716      this.description = description;
717    }
718
719    public Class<? extends TestBase> getCmdClass() {
720      return cmdClass;
721    }
722
723    public String getName() {
724      return name;
725    }
726
727    public String getDescription() {
728      return description;
729    }
730  }
731
732  /**
733   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes
734   * tracking all these arguments a little easier. NOTE: ADDING AN OPTION, you need to add a data
735   * member, a getter/setter (to make JSON serialization of this TestOptions class behave), and you
736   * need to add to the clone constructor below copying your new option from the 'that' to the
737   * 'this'. Look for 'clone' below.
738   */
739  static class TestOptions {
740    String cmdName = null;
741    boolean nomapred = false;
742    boolean filterAll = false;
743    long startRow = 0;
744    float size = 1.0f;
745    long perClientRunRows = DEFAULT_ROWS_PER_GB;
746    int numClientThreads = 1;
747    long totalRows = DEFAULT_ROWS_PER_GB;
748    int measureAfter = 0;
749    float sampleRate = 1.0f;
750    /**
751     * @deprecated Useless after switching to OpenTelemetry
752     */
753    @Deprecated
754    double traceRate = 0.0;
755    String tableName = TABLE_NAME;
756    boolean flushCommits = true;
757    boolean writeToWAL = true;
758    boolean autoFlush = false;
759    boolean oneCon = false;
760    int connCount = -1; // wil decide the actual num later
761    boolean useTags = false;
762    int noOfTags = 1;
763    boolean reportLatency = false;
764    int multiGet = 0;
765    int multiPut = 0;
766    int randomSleep = 0;
767    boolean inMemoryCF = false;
768    int presplitRegions = 0;
769    int replicas = TableDescriptorBuilder.DEFAULT_REGION_REPLICATION;
770    String splitPolicy = null;
771    Compression.Algorithm compression = Compression.Algorithm.NONE;
772    String encryption = null;
773    BloomType bloomType = BloomType.ROW;
774    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
775    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
776    boolean valueRandom = false;
777    boolean valueZipf = false;
778    int valueSize = DEFAULT_VALUE_LENGTH;
779    long period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10;
780    int cycles = 1;
781    int columns = 1;
782    int families = 1;
783    int caching = 30;
784    int latencyThreshold = 0; // in millsecond
785    boolean addColumns = true;
786    MemoryCompactionPolicy inMemoryCompaction =
787      MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
788    boolean asyncPrefetch = false;
789    boolean cacheBlocks = true;
790    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
791    long bufferSize = 2l * 1024l * 1024l;
792    Properties commandProperties;
793
794    public TestOptions() {
795    }
796
797    /**
798     * Clone constructor.
799     * @param that Object to copy from.
800     */
801    public TestOptions(TestOptions that) {
802      this.cmdName = that.cmdName;
803      this.cycles = that.cycles;
804      this.nomapred = that.nomapred;
805      this.startRow = that.startRow;
806      this.size = that.size;
807      this.perClientRunRows = that.perClientRunRows;
808      this.numClientThreads = that.numClientThreads;
809      this.totalRows = that.totalRows;
810      this.sampleRate = that.sampleRate;
811      this.traceRate = that.traceRate;
812      this.tableName = that.tableName;
813      this.flushCommits = that.flushCommits;
814      this.writeToWAL = that.writeToWAL;
815      this.autoFlush = that.autoFlush;
816      this.oneCon = that.oneCon;
817      this.connCount = that.connCount;
818      this.useTags = that.useTags;
819      this.noOfTags = that.noOfTags;
820      this.reportLatency = that.reportLatency;
821      this.latencyThreshold = that.latencyThreshold;
822      this.multiGet = that.multiGet;
823      this.multiPut = that.multiPut;
824      this.inMemoryCF = that.inMemoryCF;
825      this.presplitRegions = that.presplitRegions;
826      this.replicas = that.replicas;
827      this.splitPolicy = that.splitPolicy;
828      this.compression = that.compression;
829      this.encryption = that.encryption;
830      this.blockEncoding = that.blockEncoding;
831      this.filterAll = that.filterAll;
832      this.bloomType = that.bloomType;
833      this.blockSize = that.blockSize;
834      this.valueRandom = that.valueRandom;
835      this.valueZipf = that.valueZipf;
836      this.valueSize = that.valueSize;
837      this.period = that.period;
838      this.randomSleep = that.randomSleep;
839      this.measureAfter = that.measureAfter;
840      this.addColumns = that.addColumns;
841      this.columns = that.columns;
842      this.families = that.families;
843      this.caching = that.caching;
844      this.inMemoryCompaction = that.inMemoryCompaction;
845      this.asyncPrefetch = that.asyncPrefetch;
846      this.cacheBlocks = that.cacheBlocks;
847      this.scanReadType = that.scanReadType;
848      this.bufferSize = that.bufferSize;
849      this.commandProperties = that.commandProperties;
850    }
851
852    public Properties getCommandProperties() {
853      return commandProperties;
854    }
855
856    public int getCaching() {
857      return this.caching;
858    }
859
860    public void setCaching(final int caching) {
861      this.caching = caching;
862    }
863
864    public int getColumns() {
865      return this.columns;
866    }
867
868    public void setColumns(final int columns) {
869      this.columns = columns;
870    }
871
872    public int getFamilies() {
873      return this.families;
874    }
875
876    public void setFamilies(final int families) {
877      this.families = families;
878    }
879
880    public int getCycles() {
881      return this.cycles;
882    }
883
884    public void setCycles(final int cycles) {
885      this.cycles = cycles;
886    }
887
888    public boolean isValueZipf() {
889      return valueZipf;
890    }
891
892    public void setValueZipf(boolean valueZipf) {
893      this.valueZipf = valueZipf;
894    }
895
896    public String getCmdName() {
897      return cmdName;
898    }
899
900    public void setCmdName(String cmdName) {
901      this.cmdName = cmdName;
902    }
903
904    public int getRandomSleep() {
905      return randomSleep;
906    }
907
908    public void setRandomSleep(int randomSleep) {
909      this.randomSleep = randomSleep;
910    }
911
912    public int getReplicas() {
913      return replicas;
914    }
915
916    public void setReplicas(int replicas) {
917      this.replicas = replicas;
918    }
919
920    public String getSplitPolicy() {
921      return splitPolicy;
922    }
923
924    public void setSplitPolicy(String splitPolicy) {
925      this.splitPolicy = splitPolicy;
926    }
927
928    public void setNomapred(boolean nomapred) {
929      this.nomapred = nomapred;
930    }
931
932    public void setFilterAll(boolean filterAll) {
933      this.filterAll = filterAll;
934    }
935
936    public void setStartRow(long startRow) {
937      this.startRow = startRow;
938    }
939
940    public void setSize(float size) {
941      this.size = size;
942    }
943
944    public void setPerClientRunRows(int perClientRunRows) {
945      this.perClientRunRows = perClientRunRows;
946    }
947
948    public void setNumClientThreads(int numClientThreads) {
949      this.numClientThreads = numClientThreads;
950    }
951
952    public void setTotalRows(long totalRows) {
953      this.totalRows = totalRows;
954    }
955
956    public void setSampleRate(float sampleRate) {
957      this.sampleRate = sampleRate;
958    }
959
960    public void setTraceRate(double traceRate) {
961      this.traceRate = traceRate;
962    }
963
964    public void setTableName(String tableName) {
965      this.tableName = tableName;
966    }
967
968    public void setFlushCommits(boolean flushCommits) {
969      this.flushCommits = flushCommits;
970    }
971
972    public void setWriteToWAL(boolean writeToWAL) {
973      this.writeToWAL = writeToWAL;
974    }
975
976    public void setAutoFlush(boolean autoFlush) {
977      this.autoFlush = autoFlush;
978    }
979
980    public void setOneCon(boolean oneCon) {
981      this.oneCon = oneCon;
982    }
983
984    public int getConnCount() {
985      return connCount;
986    }
987
988    public void setConnCount(int connCount) {
989      this.connCount = connCount;
990    }
991
992    public void setUseTags(boolean useTags) {
993      this.useTags = useTags;
994    }
995
996    public void setNoOfTags(int noOfTags) {
997      this.noOfTags = noOfTags;
998    }
999
1000    public void setReportLatency(boolean reportLatency) {
1001      this.reportLatency = reportLatency;
1002    }
1003
1004    public void setMultiGet(int multiGet) {
1005      this.multiGet = multiGet;
1006    }
1007
1008    public void setMultiPut(int multiPut) {
1009      this.multiPut = multiPut;
1010    }
1011
1012    public void setInMemoryCF(boolean inMemoryCF) {
1013      this.inMemoryCF = inMemoryCF;
1014    }
1015
1016    public void setPresplitRegions(int presplitRegions) {
1017      this.presplitRegions = presplitRegions;
1018    }
1019
1020    public void setCompression(Compression.Algorithm compression) {
1021      this.compression = compression;
1022    }
1023
1024    public void setEncryption(String encryption) {
1025      this.encryption = encryption;
1026    }
1027
1028    public void setBloomType(BloomType bloomType) {
1029      this.bloomType = bloomType;
1030    }
1031
1032    public void setBlockSize(int blockSize) {
1033      this.blockSize = blockSize;
1034    }
1035
1036    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
1037      this.blockEncoding = blockEncoding;
1038    }
1039
1040    public void setValueRandom(boolean valueRandom) {
1041      this.valueRandom = valueRandom;
1042    }
1043
1044    public void setValueSize(int valueSize) {
1045      this.valueSize = valueSize;
1046    }
1047
1048    public void setBufferSize(long bufferSize) {
1049      this.bufferSize = bufferSize;
1050    }
1051
1052    public void setPeriod(int period) {
1053      this.period = period;
1054    }
1055
1056    public boolean isNomapred() {
1057      return nomapred;
1058    }
1059
1060    public boolean isFilterAll() {
1061      return filterAll;
1062    }
1063
1064    public long getStartRow() {
1065      return startRow;
1066    }
1067
1068    public float getSize() {
1069      return size;
1070    }
1071
1072    public long getPerClientRunRows() {
1073      return perClientRunRows;
1074    }
1075
1076    public int getNumClientThreads() {
1077      return numClientThreads;
1078    }
1079
1080    public long getTotalRows() {
1081      return totalRows;
1082    }
1083
1084    public float getSampleRate() {
1085      return sampleRate;
1086    }
1087
1088    public double getTraceRate() {
1089      return traceRate;
1090    }
1091
1092    public String getTableName() {
1093      return tableName;
1094    }
1095
1096    public boolean isFlushCommits() {
1097      return flushCommits;
1098    }
1099
1100    public boolean isWriteToWAL() {
1101      return writeToWAL;
1102    }
1103
1104    public boolean isAutoFlush() {
1105      return autoFlush;
1106    }
1107
1108    public boolean isUseTags() {
1109      return useTags;
1110    }
1111
1112    public int getNoOfTags() {
1113      return noOfTags;
1114    }
1115
1116    public boolean isReportLatency() {
1117      return reportLatency;
1118    }
1119
1120    public int getMultiGet() {
1121      return multiGet;
1122    }
1123
1124    public int getMultiPut() {
1125      return multiPut;
1126    }
1127
1128    public boolean isInMemoryCF() {
1129      return inMemoryCF;
1130    }
1131
1132    public int getPresplitRegions() {
1133      return presplitRegions;
1134    }
1135
1136    public Compression.Algorithm getCompression() {
1137      return compression;
1138    }
1139
1140    public String getEncryption() {
1141      return encryption;
1142    }
1143
1144    public DataBlockEncoding getBlockEncoding() {
1145      return blockEncoding;
1146    }
1147
1148    public boolean isValueRandom() {
1149      return valueRandom;
1150    }
1151
1152    public int getValueSize() {
1153      return valueSize;
1154    }
1155
1156    public long getPeriod() {
1157      return period;
1158    }
1159
1160    public BloomType getBloomType() {
1161      return bloomType;
1162    }
1163
1164    public int getBlockSize() {
1165      return blockSize;
1166    }
1167
1168    public boolean isOneCon() {
1169      return oneCon;
1170    }
1171
1172    public int getMeasureAfter() {
1173      return measureAfter;
1174    }
1175
1176    public void setMeasureAfter(int measureAfter) {
1177      this.measureAfter = measureAfter;
1178    }
1179
1180    public boolean getAddColumns() {
1181      return addColumns;
1182    }
1183
1184    public void setAddColumns(boolean addColumns) {
1185      this.addColumns = addColumns;
1186    }
1187
1188    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
1189      this.inMemoryCompaction = inMemoryCompaction;
1190    }
1191
1192    public MemoryCompactionPolicy getInMemoryCompaction() {
1193      return this.inMemoryCompaction;
1194    }
1195
1196    public long getBufferSize() {
1197      return this.bufferSize;
1198    }
1199  }
1200
1201  /*
1202   * A test. Subclass to particularize what happens per row.
1203   */
1204  static abstract class TestBase {
1205    private final long everyN;
1206
1207    protected final Configuration conf;
1208    protected final TestOptions opts;
1209
1210    protected final Status status;
1211
1212    private String testName;
1213    protected Histogram latencyHistogram;
1214    private Histogram replicaLatencyHistogram;
1215    private Histogram valueSizeHistogram;
1216    private Histogram rpcCallsHistogram;
1217    private Histogram remoteRpcCallsHistogram;
1218    private Histogram millisBetweenNextHistogram;
1219    private Histogram regionsScannedHistogram;
1220    private Histogram bytesInResultsHistogram;
1221    private Histogram bytesInRemoteResultsHistogram;
1222    private RandomDistribution.Zipf zipf;
1223    private long numOfReplyOverLatencyThreshold = 0;
1224    private long numOfReplyFromReplica = 0;
1225
1226    /**
1227     * Note that all subclasses of this class must provide a public constructor that has the exact
1228     * same list of arguments.
1229     */
1230    TestBase(final Configuration conf, final TestOptions options, final Status status) {
1231      this.conf = conf;
1232      this.opts = options;
1233      this.status = status;
1234      this.testName = this.getClass().getSimpleName();
1235      everyN = (long) (1 / opts.sampleRate);
1236      if (options.isValueZipf()) {
1237        this.zipf =
1238          new RandomDistribution.Zipf(ThreadLocalRandom.current(), 1, options.getValueSize(), 1.2);
1239      }
1240      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
1241    }
1242
1243    int getValueLength() {
1244      if (this.opts.isValueRandom()) {
1245        return ThreadLocalRandom.current().nextInt(opts.valueSize);
1246      } else if (this.opts.isValueZipf()) {
1247        return Math.abs(this.zipf.nextInt());
1248      } else {
1249        return opts.valueSize;
1250      }
1251    }
1252
1253    void updateValueSize(final Result[] rs) throws IOException {
1254      updateValueSize(rs, 0);
1255    }
1256
1257    void updateValueSize(final Result[] rs, final long latency) throws IOException {
1258      if (rs == null || (latency == 0)) return;
1259      for (Result r : rs)
1260        updateValueSize(r, latency);
1261    }
1262
1263    void updateValueSize(final Result r) throws IOException {
1264      updateValueSize(r, 0);
1265    }
1266
1267    void updateValueSize(final Result r, final long latency) throws IOException {
1268      if (r == null || (latency == 0)) return;
1269      int size = 0;
1270      // update replicaHistogram
1271      if (r.isStale()) {
1272        replicaLatencyHistogram.update(latency / 1000);
1273        numOfReplyFromReplica++;
1274      }
1275      if (!isRandomValueSize()) return;
1276
1277      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1278        size += scanner.current().getValueLength();
1279      }
1280      updateValueSize(size);
1281    }
1282
1283    void updateValueSize(final int valueSize) {
1284      if (!isRandomValueSize()) return;
1285      this.valueSizeHistogram.update(valueSize);
1286    }
1287
1288    void updateScanMetrics(final ScanMetrics metrics) {
1289      if (metrics == null) return;
1290      Map<String, Long> metricsMap = metrics.getMetricsMap();
1291      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
1292      if (rpcCalls != null) {
1293        this.rpcCallsHistogram.update(rpcCalls.longValue());
1294      }
1295      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
1296      if (remoteRpcCalls != null) {
1297        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
1298      }
1299      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
1300      if (millisBetweenNext != null) {
1301        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
1302      }
1303      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
1304      if (regionsScanned != null) {
1305        this.regionsScannedHistogram.update(regionsScanned.longValue());
1306      }
1307      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
1308      if (bytesInResults != null && bytesInResults.longValue() > 0) {
1309        this.bytesInResultsHistogram.update(bytesInResults.longValue());
1310      }
1311      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
1312      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
1313        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
1314      }
1315    }
1316
1317    String generateStatus(final long sr, final long i, final long lr) {
1318      return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency ["
1319        + getShortLatencyReport() + "]"
1320        + (!isRandomValueSize() ? "" : ", value size [" + getShortValueSizeReport() + "]");
1321    }
1322
1323    boolean isRandomValueSize() {
1324      return opts.valueRandom;
1325    }
1326
1327    protected long getReportingPeriod() {
1328      return opts.period;
1329    }
1330
1331    /**
1332     * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1333     */
1334    public Histogram getLatencyHistogram() {
1335      return latencyHistogram;
1336    }
1337
1338    void testSetup() throws IOException {
1339      // test metrics
1340      latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1341      // If it is a replica test, set up histogram for replica.
1342      if (opts.replicas > 1) {
1343        replicaLatencyHistogram =
1344          YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1345      }
1346      valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1347      // scan metrics
1348      rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1349      remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1350      millisBetweenNextHistogram =
1351        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1352      regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1353      bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1354      bytesInRemoteResultsHistogram =
1355        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1356
1357      onStartup();
1358    }
1359
1360    abstract void onStartup() throws IOException;
1361
1362    void testTakedown() throws IOException {
1363      onTakedown();
1364      // Print all stats for this thread continuously.
1365      // Synchronize on Test.class so different threads don't intermingle the
1366      // output. We can't use 'this' here because each thread has its own instance of Test class.
1367      synchronized (Test.class) {
1368        status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
1369        status
1370          .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram));
1371        if (opts.replicas > 1) {
1372          status.setStatus("Latency (us) from Replica Regions: "
1373            + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram));
1374        }
1375        status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
1376        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
1377        if (valueSizeHistogram.getCount() > 0) {
1378          status.setStatus(
1379            "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
1380          status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
1381          status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
1382        } else {
1383          status.setStatus("No valueSize statistics available");
1384        }
1385        if (rpcCallsHistogram.getCount() > 0) {
1386          status.setStatus(
1387            "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
1388        }
1389        if (remoteRpcCallsHistogram.getCount() > 0) {
1390          status.setStatus("remoteRpcCalls (count): "
1391            + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
1392        }
1393        if (millisBetweenNextHistogram.getCount() > 0) {
1394          status.setStatus("millisBetweenNext (latency): "
1395            + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
1396        }
1397        if (regionsScannedHistogram.getCount() > 0) {
1398          status.setStatus("regionsScanned (count): "
1399            + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
1400        }
1401        if (bytesInResultsHistogram.getCount() > 0) {
1402          status.setStatus("bytesInResults (size): "
1403            + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
1404        }
1405        if (bytesInRemoteResultsHistogram.getCount() > 0) {
1406          status.setStatus("bytesInRemoteResults (size): "
1407            + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
1408        }
1409      }
1410    }
1411
1412    abstract void onTakedown() throws IOException;
1413
1414    /*
1415     * Run test
1416     * @return Elapsed time.
1417     */
1418    long test() throws IOException, InterruptedException {
1419      testSetup();
1420      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1421      final long startTime = System.nanoTime();
1422      try {
1423        testTimed();
1424      } finally {
1425        testTakedown();
1426      }
1427      return (System.nanoTime() - startTime) / 1000000;
1428    }
1429
1430    long getStartRow() {
1431      return opts.startRow;
1432    }
1433
1434    long getLastRow() {
1435      return getStartRow() + opts.perClientRunRows;
1436    }
1437
1438    /**
1439     * Provides an extension point for tests that don't want a per row invocation.
1440     */
1441    void testTimed() throws IOException, InterruptedException {
1442      long startRow = getStartRow();
1443      long lastRow = getLastRow();
1444      // Report on completion of 1/10th of total.
1445      for (int ii = 0; ii < opts.cycles; ii++) {
1446        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
1447        for (long i = startRow; i < lastRow; i++) {
1448          if (i % everyN != 0) continue;
1449          long startTime = System.nanoTime();
1450          boolean requestSent = false;
1451          Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan();
1452          try (Scope scope = span.makeCurrent()) {
1453            requestSent = testRow(i, startTime);
1454          } finally {
1455            span.end();
1456          }
1457          if ((i - startRow) > opts.measureAfter) {
1458            // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
1459            // first 9 times and sends the actual get request in the 10th iteration.
1460            // We should only set latency when actual request is sent because otherwise
1461            // it turns out to be 0.
1462            if (requestSent) {
1463              long latency = (System.nanoTime() - startTime) / 1000;
1464              latencyHistogram.update(latency);
1465              if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) {
1466                numOfReplyOverLatencyThreshold++;
1467              }
1468            }
1469            if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1470              status.setStatus(generateStatus(startRow, i, lastRow));
1471            }
1472          }
1473        }
1474      }
1475    }
1476
1477    /** Returns Subset of the histograms' calculation. */
1478    public String getShortLatencyReport() {
1479      return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
1480    }
1481
1482    /** Returns Subset of the histograms' calculation. */
1483    public String getShortValueSizeReport() {
1484      return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
1485    }
1486
1487    /**
1488     * Test for individual row.
1489     * @param i Row index.
1490     * @return true if the row was sent to server and need to record metrics. False if not, multiGet
1491     *         and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered.
1492     */
1493    abstract boolean testRow(final long i, final long startTime)
1494      throws IOException, InterruptedException;
1495  }
1496
1497  static abstract class Test extends TestBase {
1498    protected Connection connection;
1499
1500    Test(final Connection con, final TestOptions options, final Status status) {
1501      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1502      this.connection = con;
1503    }
1504  }
1505
1506  static abstract class AsyncTest extends TestBase {
1507    protected AsyncConnection connection;
1508
1509    AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
1510      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1511      this.connection = con;
1512    }
1513  }
1514
1515  static abstract class TableTest extends Test {
1516    protected Table table;
1517
1518    TableTest(Connection con, TestOptions options, Status status) {
1519      super(con, options, status);
1520    }
1521
1522    @Override
1523    void onStartup() throws IOException {
1524      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1525    }
1526
1527    @Override
1528    void onTakedown() throws IOException {
1529      table.close();
1530    }
1531  }
1532
1533  /*
1534   * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest
1535   */
1536  static abstract class MetaTest extends TableTest {
1537    protected int keyLength;
1538
1539    MetaTest(Connection con, TestOptions options, Status status) {
1540      super(con, options, status);
1541      keyLength = Long.toString(opts.perClientRunRows).length();
1542    }
1543
1544    @Override
1545    void onTakedown() throws IOException {
1546      // No clean up
1547    }
1548
1549    /*
1550     * Generates Lexicographically ascending strings
1551     */
1552    protected byte[] getSplitKey(final long i) {
1553      return Bytes.toBytes(String.format("%0" + keyLength + "d", i));
1554    }
1555
1556  }
1557
1558  static abstract class AsyncTableTest extends AsyncTest {
1559    protected AsyncTable<?> table;
1560
1561    AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
1562      super(con, options, status);
1563    }
1564
1565    @Override
1566    void onStartup() throws IOException {
1567      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1568    }
1569
1570    @Override
1571    void onTakedown() throws IOException {
1572    }
1573  }
1574
1575  static class AsyncRandomReadTest extends AsyncTableTest {
1576    private final Consistency consistency;
1577    private ArrayList<Get> gets;
1578
1579    AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
1580      super(con, options, status);
1581      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1582      if (opts.multiGet > 0) {
1583        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1584        this.gets = new ArrayList<>(opts.multiGet);
1585      }
1586    }
1587
1588    @Override
1589    boolean testRow(final long i, final long startTime) throws IOException, InterruptedException {
1590      if (opts.randomSleep > 0) {
1591        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
1592      }
1593      Get get = new Get(getRandomRow(opts.totalRows));
1594      for (int family = 0; family < opts.families; family++) {
1595        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1596        if (opts.addColumns) {
1597          for (int column = 0; column < opts.columns; column++) {
1598            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1599            get.addColumn(familyName, qualifier);
1600          }
1601        } else {
1602          get.addFamily(familyName);
1603        }
1604      }
1605      if (opts.filterAll) {
1606        get.setFilter(new FilterAllFilter());
1607      }
1608      get.setConsistency(consistency);
1609      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1610      try {
1611        if (opts.multiGet > 0) {
1612          this.gets.add(get);
1613          if (this.gets.size() == opts.multiGet) {
1614            Result[] rs =
1615              this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
1616            updateValueSize(rs);
1617            this.gets.clear();
1618          } else {
1619            return false;
1620          }
1621        } else {
1622          updateValueSize(this.table.get(get).get());
1623        }
1624      } catch (ExecutionException e) {
1625        throw new IOException(e);
1626      }
1627      return true;
1628    }
1629
1630    public static RuntimeException runtime(Throwable e) {
1631      if (e instanceof RuntimeException) {
1632        return (RuntimeException) e;
1633      }
1634      return new RuntimeException(e);
1635    }
1636
1637    public static <V> V propagate(Callable<V> callable) {
1638      try {
1639        return callable.call();
1640      } catch (Exception e) {
1641        throw runtime(e);
1642      }
1643    }
1644
1645    @Override
1646    protected long getReportingPeriod() {
1647      long period = opts.perClientRunRows / 10;
1648      return period == 0 ? opts.perClientRunRows : period;
1649    }
1650
1651    @Override
1652    protected void testTakedown() throws IOException {
1653      if (this.gets != null && this.gets.size() > 0) {
1654        this.table.get(gets);
1655        this.gets.clear();
1656      }
1657      super.testTakedown();
1658    }
1659  }
1660
1661  static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
1662
1663    AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
1664      super(con, options, status);
1665    }
1666
1667    @Override
1668    protected byte[] generateRow(final long i) {
1669      return getRandomRow(opts.totalRows);
1670    }
1671  }
1672
1673  static class AsyncScanTest extends AsyncTableTest {
1674    private ResultScanner testScanner;
1675    private AsyncTable<?> asyncTable;
1676
1677    AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
1678      super(con, options, status);
1679    }
1680
1681    @Override
1682    void onStartup() throws IOException {
1683      this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName),
1684        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
1685    }
1686
1687    @Override
1688    void testTakedown() throws IOException {
1689      if (this.testScanner != null) {
1690        updateScanMetrics(this.testScanner.getScanMetrics());
1691        this.testScanner.close();
1692      }
1693      super.testTakedown();
1694    }
1695
1696    @Override
1697    boolean testRow(final long i, final long startTime) throws IOException {
1698      if (this.testScanner == null) {
1699        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1700          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1701          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1702        for (int family = 0; family < opts.families; family++) {
1703          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1704          if (opts.addColumns) {
1705            for (int column = 0; column < opts.columns; column++) {
1706              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1707              scan.addColumn(familyName, qualifier);
1708            }
1709          } else {
1710            scan.addFamily(familyName);
1711          }
1712        }
1713        if (opts.filterAll) {
1714          scan.setFilter(new FilterAllFilter());
1715        }
1716        this.testScanner = asyncTable.getScanner(scan);
1717      }
1718      Result r = testScanner.next();
1719      updateValueSize(r);
1720      return true;
1721    }
1722  }
1723
1724  static class AsyncSequentialReadTest extends AsyncTableTest {
1725    AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
1726      super(con, options, status);
1727    }
1728
1729    @Override
1730    boolean testRow(final long i, final long startTime) throws IOException, InterruptedException {
1731      Get get = new Get(format(i));
1732      for (int family = 0; family < opts.families; family++) {
1733        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1734        if (opts.addColumns) {
1735          for (int column = 0; column < opts.columns; column++) {
1736            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1737            get.addColumn(familyName, qualifier);
1738          }
1739        } else {
1740          get.addFamily(familyName);
1741        }
1742      }
1743      if (opts.filterAll) {
1744        get.setFilter(new FilterAllFilter());
1745      }
1746      try {
1747        updateValueSize(table.get(get).get());
1748      } catch (ExecutionException e) {
1749        throw new IOException(e);
1750      }
1751      return true;
1752    }
1753  }
1754
1755  static class AsyncSequentialWriteTest extends AsyncTableTest {
1756    private ArrayList<Put> puts;
1757
1758    AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
1759      super(con, options, status);
1760      if (opts.multiPut > 0) {
1761        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
1762        this.puts = new ArrayList<>(opts.multiPut);
1763      }
1764    }
1765
1766    protected byte[] generateRow(final long i) {
1767      return format(i);
1768    }
1769
1770    @Override
1771    @SuppressWarnings("ReturnValueIgnored")
1772    boolean testRow(final long i, final long startTime) throws IOException, InterruptedException {
1773      byte[] row = generateRow(i);
1774      Put put = new Put(row);
1775      for (int family = 0; family < opts.families; family++) {
1776        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1777        for (int column = 0; column < opts.columns; column++) {
1778          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1779          byte[] value = generateData(getValueLength());
1780          if (opts.useTags) {
1781            byte[] tag = generateData(TAG_LENGTH);
1782            Tag[] tags = new Tag[opts.noOfTags];
1783            for (int n = 0; n < opts.noOfTags; n++) {
1784              Tag t = new ArrayBackedTag((byte) n, tag);
1785              tags[n] = t;
1786            }
1787            KeyValue kv =
1788              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
1789            put.add(kv);
1790            updateValueSize(kv.getValueLength());
1791          } else {
1792            put.addColumn(familyName, qualifier, value);
1793            updateValueSize(value.length);
1794          }
1795        }
1796      }
1797      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1798      try {
1799        table.put(put).get();
1800        if (opts.multiPut > 0) {
1801          this.puts.add(put);
1802          if (this.puts.size() == opts.multiPut) {
1803            this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get));
1804            this.puts.clear();
1805          } else {
1806            return false;
1807          }
1808        } else {
1809          table.put(put).get();
1810        }
1811      } catch (ExecutionException e) {
1812        throw new IOException(e);
1813      }
1814      return true;
1815    }
1816  }
1817
1818  static abstract class BufferedMutatorTest extends Test {
1819    protected BufferedMutator mutator;
1820    protected Table table;
1821
1822    BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1823      super(con, options, status);
1824    }
1825
1826    @Override
1827    void onStartup() throws IOException {
1828      BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1829      p.writeBufferSize(opts.bufferSize);
1830      this.mutator = connection.getBufferedMutator(p);
1831      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1832    }
1833
1834    @Override
1835    void onTakedown() throws IOException {
1836      mutator.close();
1837      table.close();
1838    }
1839  }
1840
1841  static class RandomSeekScanTest extends TableTest {
1842    RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1843      super(con, options, status);
1844    }
1845
1846    @Override
1847    boolean testRow(final long i, final long startTime) throws IOException {
1848      Scan scan = new Scan().withStartRow(getRandomRow(opts.totalRows)).setCaching(opts.caching)
1849        .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1850        .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1851      FilterList list = new FilterList();
1852      for (int family = 0; family < opts.families; family++) {
1853        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1854        if (opts.addColumns) {
1855          for (int column = 0; column < opts.columns; column++) {
1856            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1857            scan.addColumn(familyName, qualifier);
1858          }
1859        } else {
1860          scan.addFamily(familyName);
1861        }
1862      }
1863      if (opts.filterAll) {
1864        list.addFilter(new FilterAllFilter());
1865      }
1866      list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1867      scan.setFilter(list);
1868      ResultScanner s = this.table.getScanner(scan);
1869      try {
1870        for (Result rr; (rr = s.next()) != null;) {
1871          updateValueSize(rr);
1872        }
1873      } finally {
1874        updateScanMetrics(s.getScanMetrics());
1875        s.close();
1876      }
1877      return true;
1878    }
1879
1880    @Override
1881    protected long getReportingPeriod() {
1882      long period = opts.perClientRunRows / 100;
1883      return period == 0 ? opts.perClientRunRows : period;
1884    }
1885
1886  }
1887
1888  static abstract class RandomScanWithRangeTest extends TableTest {
1889    RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1890      super(con, options, status);
1891    }
1892
1893    @Override
1894    boolean testRow(final long i, final long startTime) throws IOException {
1895      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1896      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
1897        .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
1898        .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1899        .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1900      for (int family = 0; family < opts.families; family++) {
1901        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1902        if (opts.addColumns) {
1903          for (int column = 0; column < opts.columns; column++) {
1904            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1905            scan.addColumn(familyName, qualifier);
1906          }
1907        } else {
1908          scan.addFamily(familyName);
1909        }
1910      }
1911      if (opts.filterAll) {
1912        scan.setFilter(new FilterAllFilter());
1913      }
1914      Result r = null;
1915      int count = 0;
1916      ResultScanner s = this.table.getScanner(scan);
1917      try {
1918        for (; (r = s.next()) != null;) {
1919          updateValueSize(r);
1920          count++;
1921        }
1922        if (i % 100 == 0) {
1923          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1924            Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()),
1925            count));
1926        }
1927      } finally {
1928        updateScanMetrics(s.getScanMetrics());
1929        s.close();
1930      }
1931      return true;
1932    }
1933
1934    protected abstract Pair<byte[], byte[]> getStartAndStopRow();
1935
1936    protected Pair<byte[], byte[]> generateStartAndStopRows(long maxRange) {
1937      long start = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE) % opts.totalRows;
1938      long stop = start + maxRange;
1939      return new Pair<>(format(start), format(stop));
1940    }
1941
1942    @Override
1943    protected long getReportingPeriod() {
1944      long period = opts.perClientRunRows / 100;
1945      return period == 0 ? opts.perClientRunRows : period;
1946    }
1947  }
1948
1949  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1950    RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1951      super(con, options, status);
1952    }
1953
1954    @Override
1955    protected Pair<byte[], byte[]> getStartAndStopRow() {
1956      return generateStartAndStopRows(10);
1957    }
1958  }
1959
1960  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1961    RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1962      super(con, options, status);
1963    }
1964
1965    @Override
1966    protected Pair<byte[], byte[]> getStartAndStopRow() {
1967      return generateStartAndStopRows(100);
1968    }
1969  }
1970
1971  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1972    RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1973      super(con, options, status);
1974    }
1975
1976    @Override
1977    protected Pair<byte[], byte[]> getStartAndStopRow() {
1978      return generateStartAndStopRows(1000);
1979    }
1980  }
1981
1982  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1983    RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1984      super(con, options, status);
1985    }
1986
1987    @Override
1988    protected Pair<byte[], byte[]> getStartAndStopRow() {
1989      return generateStartAndStopRows(10000);
1990    }
1991  }
1992
1993  static class RandomReadTest extends TableTest {
1994    private final Consistency consistency;
1995    private ArrayList<Get> gets;
1996
1997    RandomReadTest(Connection con, TestOptions options, Status status) {
1998      super(con, options, status);
1999      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
2000      if (opts.multiGet > 0) {
2001        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
2002        this.gets = new ArrayList<>(opts.multiGet);
2003      }
2004    }
2005
2006    @Override
2007    boolean testRow(final long i, final long startTime) throws IOException, InterruptedException {
2008      if (opts.randomSleep > 0) {
2009        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
2010      }
2011      Get get = new Get(getRandomRow(opts.totalRows));
2012      for (int family = 0; family < opts.families; family++) {
2013        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2014        if (opts.addColumns) {
2015          for (int column = 0; column < opts.columns; column++) {
2016            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2017            get.addColumn(familyName, qualifier);
2018          }
2019        } else {
2020          get.addFamily(familyName);
2021        }
2022      }
2023      if (opts.filterAll) {
2024        get.setFilter(new FilterAllFilter());
2025      }
2026      get.setConsistency(consistency);
2027      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
2028      if (opts.multiGet > 0) {
2029        this.gets.add(get);
2030        if (this.gets.size() == opts.multiGet) {
2031          Result[] rs = this.table.get(this.gets);
2032          if (opts.replicas > 1) {
2033            long latency = System.nanoTime() - startTime;
2034            updateValueSize(rs, latency);
2035          } else {
2036            updateValueSize(rs);
2037          }
2038          this.gets.clear();
2039        } else {
2040          return false;
2041        }
2042      } else {
2043        if (opts.replicas > 1) {
2044          Result r = this.table.get(get);
2045          long latency = System.nanoTime() - startTime;
2046          updateValueSize(r, latency);
2047        } else {
2048          updateValueSize(this.table.get(get));
2049        }
2050      }
2051      return true;
2052    }
2053
2054    @Override
2055    protected long getReportingPeriod() {
2056      long period = opts.perClientRunRows / 10;
2057      return period == 0 ? opts.perClientRunRows : period;
2058    }
2059
2060    @Override
2061    protected void testTakedown() throws IOException {
2062      if (this.gets != null && this.gets.size() > 0) {
2063        this.table.get(gets);
2064        this.gets.clear();
2065      }
2066      super.testTakedown();
2067    }
2068  }
2069
2070  /*
2071   * Send random reads against fake regions inserted by MetaWriteTest
2072   */
2073  static class MetaRandomReadTest extends MetaTest {
2074    private RegionLocator regionLocator;
2075
2076    MetaRandomReadTest(Connection con, TestOptions options, Status status) {
2077      super(con, options, status);
2078      LOG.info("call getRegionLocation");
2079    }
2080
2081    @Override
2082    void onStartup() throws IOException {
2083      super.onStartup();
2084      this.regionLocator = connection.getRegionLocator(table.getName());
2085    }
2086
2087    @Override
2088    boolean testRow(final long i, final long startTime) throws IOException, InterruptedException {
2089      if (opts.randomSleep > 0) {
2090        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
2091      }
2092      HRegionLocation hRegionLocation = regionLocator.getRegionLocation(
2093        getSplitKey(ThreadLocalRandom.current().nextLong(opts.perClientRunRows)), true);
2094      LOG.debug("get location for region: " + hRegionLocation);
2095      return true;
2096    }
2097
2098    @Override
2099    protected long getReportingPeriod() {
2100      long period = opts.perClientRunRows / 10;
2101      return period == 0 ? opts.perClientRunRows : period;
2102    }
2103
2104    @Override
2105    protected void testTakedown() throws IOException {
2106      super.testTakedown();
2107    }
2108  }
2109
2110  static class RandomWriteTest extends SequentialWriteTest {
2111    RandomWriteTest(Connection con, TestOptions options, Status status) {
2112      super(con, options, status);
2113    }
2114
2115    @Override
2116    protected byte[] generateRow(final long i) {
2117      return getRandomRow(opts.totalRows);
2118    }
2119
2120  }
2121
2122  static class RandomDeleteTest extends SequentialDeleteTest {
2123    RandomDeleteTest(Connection con, TestOptions options, Status status) {
2124      super(con, options, status);
2125    }
2126
2127    @Override
2128    protected byte[] generateRow(final long i) {
2129      return getRandomRow(opts.totalRows);
2130    }
2131
2132  }
2133
2134  static class ScanTest extends TableTest {
2135    private ResultScanner testScanner;
2136
2137    ScanTest(Connection con, TestOptions options, Status status) {
2138      super(con, options, status);
2139    }
2140
2141    @Override
2142    void testTakedown() throws IOException {
2143      if (this.testScanner != null) {
2144        this.testScanner.close();
2145      }
2146      super.testTakedown();
2147    }
2148
2149    @Override
2150    boolean testRow(final long i, final long startTime) throws IOException {
2151      if (this.testScanner == null) {
2152        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
2153          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
2154          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
2155        for (int family = 0; family < opts.families; family++) {
2156          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2157          if (opts.addColumns) {
2158            for (int column = 0; column < opts.columns; column++) {
2159              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2160              scan.addColumn(familyName, qualifier);
2161            }
2162          } else {
2163            scan.addFamily(familyName);
2164          }
2165        }
2166        if (opts.filterAll) {
2167          scan.setFilter(new FilterAllFilter());
2168        }
2169        this.testScanner = table.getScanner(scan);
2170      }
2171      Result r = testScanner.next();
2172      updateValueSize(r);
2173      return true;
2174    }
2175  }
2176
2177  static class ReverseScanTest extends TableTest {
2178    private ResultScanner testScanner;
2179
2180    ReverseScanTest(Connection con, TestOptions options, Status status) {
2181      super(con, options, status);
2182    }
2183
2184    @Override
2185    void testTakedown() throws IOException {
2186      if (this.testScanner != null) {
2187        this.testScanner.close();
2188      }
2189      super.testTakedown();
2190    }
2191
2192    @Override
2193    boolean testRow(final long i, final long startTime) throws IOException {
2194      if (this.testScanner == null) {
2195        Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2196          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2197          .setScanMetricsEnabled(true).setReversed(true);
2198        for (int family = 0; family < opts.families; family++) {
2199          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2200          if (opts.addColumns) {
2201            for (int column = 0; column < opts.columns; column++) {
2202              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2203              scan.addColumn(familyName, qualifier);
2204            }
2205          } else {
2206            scan.addFamily(familyName);
2207          }
2208        }
2209        if (opts.filterAll) {
2210          scan.setFilter(new FilterAllFilter());
2211        }
2212        this.testScanner = table.getScanner(scan);
2213      }
2214      Result r = testScanner.next();
2215      updateValueSize(r);
2216      return true;
2217    }
2218  }
2219
2220  /**
2221   * Base class for operations that are CAS-like; that read a value and then set it based off what
2222   * they read. In this category is increment, append, checkAndPut, etc.
2223   * <p>
2224   * These operations also want some concurrency going on. Usually when these tests run, they
2225   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
2226   * same key space. We do this with our getStartRow and getLastRow overrides.
2227   */
2228  static abstract class CASTableTest extends TableTest {
2229    private final byte[] qualifier;
2230
2231    CASTableTest(Connection con, TestOptions options, Status status) {
2232      super(con, options, status);
2233      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
2234    }
2235
2236    byte[] getQualifier() {
2237      return this.qualifier;
2238    }
2239
2240    @Override
2241    long getStartRow() {
2242      return 0;
2243    }
2244
2245    @Override
2246    long getLastRow() {
2247      return opts.perClientRunRows;
2248    }
2249  }
2250
2251  static class IncrementTest extends CASTableTest {
2252    IncrementTest(Connection con, TestOptions options, Status status) {
2253      super(con, options, status);
2254    }
2255
2256    @Override
2257    boolean testRow(final long i, final long startTime) throws IOException {
2258      Increment increment = new Increment(format(i));
2259      // unlike checkAndXXX tests, which make most sense to do on a single value,
2260      // if multiple families are specified for an increment test we assume it is
2261      // meant to raise the work factor
2262      for (int family = 0; family < opts.families; family++) {
2263        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2264        increment.addColumn(familyName, getQualifier(), 1l);
2265      }
2266      updateValueSize(this.table.increment(increment));
2267      return true;
2268    }
2269  }
2270
2271  static class AppendTest extends CASTableTest {
2272    AppendTest(Connection con, TestOptions options, Status status) {
2273      super(con, options, status);
2274    }
2275
2276    @Override
2277    boolean testRow(final long i, final long startTime) throws IOException {
2278      byte[] bytes = format(i);
2279      Append append = new Append(bytes);
2280      // unlike checkAndXXX tests, which make most sense to do on a single value,
2281      // if multiple families are specified for an append test we assume it is
2282      // meant to raise the work factor
2283      for (int family = 0; family < opts.families; family++) {
2284        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2285        append.addColumn(familyName, getQualifier(), bytes);
2286      }
2287      updateValueSize(this.table.append(append));
2288      return true;
2289    }
2290  }
2291
2292  static class CheckAndMutateTest extends CASTableTest {
2293    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
2294      super(con, options, status);
2295    }
2296
2297    @Override
2298    boolean testRow(final long i, final long startTime) throws IOException {
2299      final byte[] bytes = format(i);
2300      // checkAndXXX tests operate on only a single value
2301      // Put a known value so when we go to check it, it is there.
2302      Put put = new Put(bytes);
2303      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2304      this.table.put(put);
2305      RowMutations mutations = new RowMutations(bytes);
2306      mutations.add(put);
2307      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2308        .thenMutate(mutations);
2309      return true;
2310    }
2311  }
2312
2313  static class CheckAndPutTest extends CASTableTest {
2314    CheckAndPutTest(Connection con, TestOptions options, Status status) {
2315      super(con, options, status);
2316    }
2317
2318    @Override
2319    boolean testRow(final long i, final long startTime) throws IOException {
2320      final byte[] bytes = format(i);
2321      // checkAndXXX tests operate on only a single value
2322      // Put a known value so when we go to check it, it is there.
2323      Put put = new Put(bytes);
2324      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2325      this.table.put(put);
2326      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2327        .thenPut(put);
2328      return true;
2329    }
2330  }
2331
2332  static class CheckAndDeleteTest extends CASTableTest {
2333    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
2334      super(con, options, status);
2335    }
2336
2337    @Override
2338    boolean testRow(final long i, final long startTime) throws IOException {
2339      final byte[] bytes = format(i);
2340      // checkAndXXX tests operate on only a single value
2341      // Put a known value so when we go to check it, it is there.
2342      Put put = new Put(bytes);
2343      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2344      this.table.put(put);
2345      Delete delete = new Delete(put.getRow());
2346      delete.addColumn(FAMILY_ZERO, getQualifier());
2347      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2348        .thenDelete(delete);
2349      return true;
2350    }
2351  }
2352
2353  /*
2354   * Delete all fake regions inserted to meta table by MetaWriteTest.
2355   */
2356  static class CleanMetaTest extends MetaTest {
2357    CleanMetaTest(Connection con, TestOptions options, Status status) {
2358      super(con, options, status);
2359    }
2360
2361    @Override
2362    boolean testRow(final long i, final long startTime) throws IOException {
2363      try {
2364        RegionInfo regionInfo = connection.getRegionLocator(table.getName())
2365          .getRegionLocation(getSplitKey(i), false).getRegion();
2366        LOG.debug("deleting region from meta: " + regionInfo);
2367
2368        Delete delete =
2369          MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP);
2370        try (Table t = MetaTableAccessor.getMetaHTable(connection)) {
2371          t.delete(delete);
2372        }
2373      } catch (IOException ie) {
2374        // Log and continue
2375        LOG.error("cannot find region with start key: " + i);
2376      }
2377      return true;
2378    }
2379  }
2380
2381  static class SequentialReadTest extends TableTest {
2382    SequentialReadTest(Connection con, TestOptions options, Status status) {
2383      super(con, options, status);
2384    }
2385
2386    @Override
2387    boolean testRow(final long i, final long startTime) throws IOException {
2388      Get get = new Get(format(i));
2389      for (int family = 0; family < opts.families; family++) {
2390        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2391        if (opts.addColumns) {
2392          for (int column = 0; column < opts.columns; column++) {
2393            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2394            get.addColumn(familyName, qualifier);
2395          }
2396        } else {
2397          get.addFamily(familyName);
2398        }
2399      }
2400      if (opts.filterAll) {
2401        get.setFilter(new FilterAllFilter());
2402      }
2403      updateValueSize(table.get(get));
2404      return true;
2405    }
2406  }
2407
2408  static class SequentialWriteTest extends BufferedMutatorTest {
2409    private ArrayList<Put> puts;
2410
2411    SequentialWriteTest(Connection con, TestOptions options, Status status) {
2412      super(con, options, status);
2413      if (opts.multiPut > 0) {
2414        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
2415        this.puts = new ArrayList<>(opts.multiPut);
2416      }
2417    }
2418
2419    protected byte[] generateRow(final long i) {
2420      return format(i);
2421    }
2422
2423    @Override
2424    boolean testRow(final long i, final long startTime) throws IOException {
2425      byte[] row = generateRow(i);
2426      Put put = new Put(row);
2427      for (int family = 0; family < opts.families; family++) {
2428        byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
2429        for (int column = 0; column < opts.columns; column++) {
2430          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2431          byte[] value = generateData(getValueLength());
2432          if (opts.useTags) {
2433            byte[] tag = generateData(TAG_LENGTH);
2434            Tag[] tags = new Tag[opts.noOfTags];
2435            for (int n = 0; n < opts.noOfTags; n++) {
2436              Tag t = new ArrayBackedTag((byte) n, tag);
2437              tags[n] = t;
2438            }
2439            KeyValue kv =
2440              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
2441            put.add(kv);
2442            updateValueSize(kv.getValueLength());
2443          } else {
2444            put.addColumn(familyName, qualifier, value);
2445            updateValueSize(value.length);
2446          }
2447        }
2448      }
2449      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2450      if (opts.autoFlush) {
2451        if (opts.multiPut > 0) {
2452          this.puts.add(put);
2453          if (this.puts.size() == opts.multiPut) {
2454            table.put(this.puts);
2455            this.puts.clear();
2456          } else {
2457            return false;
2458          }
2459        } else {
2460          table.put(put);
2461        }
2462      } else {
2463        mutator.mutate(put);
2464      }
2465      return true;
2466    }
2467  }
2468
2469  static class SequentialDeleteTest extends BufferedMutatorTest {
2470
2471    SequentialDeleteTest(Connection con, TestOptions options, Status status) {
2472      super(con, options, status);
2473    }
2474
2475    protected byte[] generateRow(final long i) {
2476      return format(i);
2477    }
2478
2479    @Override
2480    boolean testRow(final long i, final long startTime) throws IOException {
2481      byte[] row = generateRow(i);
2482      Delete delete = new Delete(row);
2483      for (int family = 0; family < opts.families; family++) {
2484        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2485        delete.addFamily(familyName);
2486      }
2487      delete.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2488      if (opts.autoFlush) {
2489        table.delete(delete);
2490      } else {
2491        mutator.mutate(delete);
2492      }
2493      return true;
2494    }
2495  }
2496
2497  /*
2498   * Insert fake regions into meta table with contiguous split keys.
2499   */
2500  static class MetaWriteTest extends MetaTest {
2501
2502    MetaWriteTest(Connection con, TestOptions options, Status status) {
2503      super(con, options, status);
2504    }
2505
2506    @Override
2507    boolean testRow(final long i, final long startTime) throws IOException {
2508      List<RegionInfo> regionInfos = new ArrayList<RegionInfo>();
2509      RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME))
2510        .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build());
2511      regionInfos.add(regionInfo);
2512      MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1);
2513
2514      // write the serverName columns
2515      MetaTableAccessor.updateRegionLocation(connection, regionInfo,
2516        ServerName.valueOf("localhost", 60010, ThreadLocalRandom.current().nextLong()), i,
2517        EnvironmentEdgeManager.currentTime());
2518      return true;
2519    }
2520  }
2521
2522  static class FilteredScanTest extends TableTest {
2523    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
2524
2525    FilteredScanTest(Connection con, TestOptions options, Status status) {
2526      super(con, options, status);
2527      if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) {
2528        LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB
2529          + ". This could take a very long time.");
2530      }
2531    }
2532
2533    @Override
2534    boolean testRow(long i, final long startTime) throws IOException {
2535      byte[] value = generateData(getValueLength());
2536      Scan scan = constructScan(value);
2537      ResultScanner scanner = null;
2538      try {
2539        scanner = this.table.getScanner(scan);
2540        for (Result r = null; (r = scanner.next()) != null;) {
2541          updateValueSize(r);
2542        }
2543      } finally {
2544        if (scanner != null) {
2545          updateScanMetrics(scanner.getScanMetrics());
2546          scanner.close();
2547        }
2548      }
2549      return true;
2550    }
2551
2552    protected Scan constructScan(byte[] valuePrefix) throws IOException {
2553      FilterList list = new FilterList();
2554      Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL,
2555        new BinaryComparator(valuePrefix));
2556      list.addFilter(filter);
2557      if (opts.filterAll) {
2558        list.addFilter(new FilterAllFilter());
2559      }
2560      Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2561        .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2562        .setScanMetricsEnabled(true);
2563      if (opts.addColumns) {
2564        for (int column = 0; column < opts.columns; column++) {
2565          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2566          scan.addColumn(FAMILY_ZERO, qualifier);
2567        }
2568      } else {
2569        scan.addFamily(FAMILY_ZERO);
2570      }
2571      scan.setFilter(list);
2572      return scan;
2573    }
2574  }
2575
2576  /**
2577   * Compute a throughput rate in MB/s.
2578   * @param rows   Number of records consumed.
2579   * @param timeMs Time taken in milliseconds.
2580   * @return String value with label, ie '123.76 MB/s'
2581   */
2582  private static String calculateMbps(long rows, long timeMs, final int valueSize, int families,
2583    int columns) {
2584    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH
2585      + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families);
2586    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
2587      .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT);
2588    return FMT.format(mbps) + " MB/s";
2589  }
2590
2591  /*
2592   * Format passed integer.
2593   * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed number (Does
2594   * absolute in case number is negative).
2595   */
2596  public static byte[] format(final long number) {
2597    byte[] b = new byte[ROW_LENGTH];
2598    long d = Math.abs(number);
2599    for (int i = b.length - 1; i >= 0; i--) {
2600      b[i] = (byte) ((d % 10) + '0');
2601      d /= 10;
2602    }
2603    return b;
2604  }
2605
2606  /*
2607   * This method takes some time and is done inline uploading data. For example, doing the mapfile
2608   * test, generation of the key and value consumes about 30% of CPU time.
2609   * @return Generated random value to insert into a table cell.
2610   */
2611  public static byte[] generateData(int length) {
2612    byte[] b = new byte[length];
2613    int i;
2614
2615    Random r = ThreadLocalRandom.current();
2616    for (i = 0; i < (length - 8); i += 8) {
2617      b[i] = (byte) (65 + r.nextInt(26));
2618      b[i + 1] = b[i];
2619      b[i + 2] = b[i];
2620      b[i + 3] = b[i];
2621      b[i + 4] = b[i];
2622      b[i + 5] = b[i];
2623      b[i + 6] = b[i];
2624      b[i + 7] = b[i];
2625    }
2626
2627    byte a = (byte) (65 + r.nextInt(26));
2628    for (; i < length; i++) {
2629      b[i] = a;
2630    }
2631    return b;
2632  }
2633
2634  static byte[] getRandomRow(final long totalRows) {
2635    return format(generateRandomRow(totalRows));
2636  }
2637
2638  static long generateRandomRow(final long totalRows) {
2639    return ThreadLocalRandom.current().nextLong(Long.MAX_VALUE) % totalRows;
2640  }
2641
2642  static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
2643    Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
2644    throws IOException, InterruptedException {
2645    status.setStatus(
2646      "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows");
2647    long totalElapsedTime;
2648
2649    final TestBase t;
2650    try {
2651      if (AsyncTest.class.isAssignableFrom(cmd)) {
2652        Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
2653        Constructor<? extends AsyncTest> constructor =
2654          newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
2655        t = constructor.newInstance(asyncCon, opts, status);
2656      } else {
2657        Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
2658        Constructor<? extends Test> constructor =
2659          newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
2660        t = constructor.newInstance(con, opts, status);
2661      }
2662    } catch (NoSuchMethodException e) {
2663      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
2664        + ".  It does not provide a constructor as described by "
2665        + "the javadoc comment.  Available constructors are: "
2666        + Arrays.toString(cmd.getConstructors()));
2667    } catch (Exception e) {
2668      throw new IllegalStateException("Failed to construct command class", e);
2669    }
2670    totalElapsedTime = t.test();
2671
2672    status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow
2673      + " for " + opts.perClientRunRows + " rows" + " ("
2674      + calculateMbps((long) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
2675        getAverageValueLength(opts), opts.families, opts.columns)
2676      + ")");
2677
2678    return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold,
2679      t.numOfReplyFromReplica, t.getLatencyHistogram());
2680  }
2681
2682  private static int getAverageValueLength(final TestOptions opts) {
2683    return opts.valueRandom ? opts.valueSize / 2 : opts.valueSize;
2684  }
2685
2686  private void runTest(final Class<? extends TestBase> cmd, TestOptions opts)
2687    throws IOException, InterruptedException, ClassNotFoundException, ExecutionException {
2688    // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2689    // the TestOptions introspection for us and dump the output in a readable format.
2690    LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
2691    Admin admin = null;
2692    Connection connection = null;
2693    try {
2694      connection = ConnectionFactory.createConnection(getConf());
2695      admin = connection.getAdmin();
2696      checkTable(admin, opts);
2697    } finally {
2698      if (admin != null) admin.close();
2699      if (connection != null) connection.close();
2700    }
2701    if (opts.nomapred) {
2702      doLocalClients(opts, getConf());
2703    } else {
2704      doMapReduce(opts, getConf());
2705    }
2706  }
2707
2708  protected void printUsage() {
2709    printUsage(PE_COMMAND_SHORTNAME, null);
2710  }
2711
2712  protected static void printUsage(final String message) {
2713    printUsage(PE_COMMAND_SHORTNAME, message);
2714  }
2715
2716  protected static void printUsageAndExit(final String message, final int exitCode) {
2717    printUsage(message);
2718    System.exit(exitCode);
2719  }
2720
2721  protected static void printUsage(final String shortName, final String message) {
2722    if (message != null && message.length() > 0) {
2723      System.err.println(message);
2724    }
2725    System.err.print("Usage: hbase " + shortName);
2726    System.err.println("  <OPTIONS> [-D<property=value>]* <command|class> <nclients>");
2727    System.err.println();
2728    System.err.println("General Options:");
2729    System.err.println(
2730      " nomapred        Run multiple clients using threads " + "(rather than use mapreduce)");
2731    System.err
2732      .println(" oneCon          all the threads share the same connection. Default: False");
2733    System.err.println(" connCount          connections all threads share. "
2734      + "For example, if set to 2, then all thread share 2 connection. "
2735      + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, "
2736      + "if not, connCount=thread number");
2737
2738    System.err.println(" sampleRate      Execute test on a sample of total rows. Default: 1.0");
2739    System.err.println(" period          Report every 'period' rows: "
2740      + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10);
2741    System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
2742    System.err.println(
2743      " traceRate       Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0");
2744    System.err.println(" latency         Set to report operation latencies. Default: False");
2745    System.err.println(" latencyThreshold  Set to report number of operations with latency "
2746      + "over lantencyThreshold, unit in millisecond, default 0");
2747    System.err.println(" measureAfter    Start to measure the latency once 'measureAfter'"
2748      + " rows have been treated. Default: 0");
2749    System.err
2750      .println(" valueSize       Pass value size to use: Default: " + DEFAULT_OPTS.getValueSize());
2751    System.err.println(" valueRandom     Set if we should vary value size between 0 and "
2752      + "'valueSize'; set on read for stats on size: Default: Not set.");
2753    System.err.println(" blockEncoding   Block encoding to use. Value should be one of "
2754      + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2755    System.err.println();
2756    System.err.println("Table Creation / Write Tests:");
2757    System.err.println(" table           Alternate table name. Default: 'TestTable'");
2758    System.err.println(
2759      " rows            Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows()
2760        + ".  In case of randomReads and randomSeekScans this could"
2761        + " be specified along with --size to specify the number of rows to be scanned within"
2762        + " the total range specified by the size.");
2763    System.err.println(
2764      " size            Total size in GiB. Mutually exclusive with --rows for writes and scans"
2765        + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2766        + " use size to specify the end range and --rows"
2767        + " specifies the number of rows within that range. " + "Default: 1.0.");
2768    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2769    System.err.println(" encryption      Encryption type to use (AES, ...). Default: 'NONE'");
2770    System.err.println(
2771      " flushCommits    Used to determine if the test should flush the table. " + "Default: false");
2772    System.err.println(" valueZipf       Set if we should vary value size between 0 and "
2773      + "'valueSize' in zipf form: Default: Not set.");
2774    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
2775    System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
2776    System.err.println(" multiPut        Batch puts together into groups of N. Only supported "
2777      + "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0");
2778    System.err.println(" presplit        Create presplit table. If a table with same name exists,"
2779      + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2780      + "Recommended for accurate perf analysis (see guide). Default: disabled");
2781    System.err.println(
2782      " usetags         Writes tags along with KVs. Use with HFile V3. " + "Default: false");
2783    System.err.println(" numoftags       Specify the no of tags that would be needed. "
2784      + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2785    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
2786    System.err.println(" columns         Columns to write per row. Default: 1");
2787    System.err
2788      .println(" families        Specify number of column families for the table. Default: 1");
2789    System.err.println();
2790    System.err.println("Read Tests:");
2791    System.err.println(" filterAll       Helps to filter out all the rows on the server side"
2792      + " there by not returning any thing back to the client.  Helps to check the server side"
2793      + " performance.  Uses FilterAllFilter internally. ");
2794    System.err.println(" multiGet        Batch gets together into groups of N. Only supported "
2795      + "by randomRead. Default: disabled");
2796    System.err.println(" inmemory        Tries to keep the HFiles of the CF "
2797      + "inmemory as far as possible. Not guaranteed that reads are always served "
2798      + "from memory.  Default: false");
2799    System.err
2800      .println(" bloomFilter     Bloom filter type, one of " + Arrays.toString(BloomType.values()));
2801    System.err.println(" blockSize       Blocksize to use when writing out hfiles. ");
2802    System.err
2803      .println(" inmemoryCompaction  Makes the column family to do inmemory flushes/compactions. "
2804        + "Uses the CompactingMemstore");
2805    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
2806    System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
2807    System.err.println(
2808      " randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
2809    System.err.println(" caching         Scan caching to use. Default: 30");
2810    System.err.println(" asyncPrefetch   Enable asyncPrefetch for scan");
2811    System.err.println(" cacheBlocks     Set the cacheBlocks option for scan. Default: true");
2812    System.err.println(
2813      " scanReadType    Set the readType option for scan, stream/pread/default. Default: default");
2814    System.err.println(" bufferSize      Set the value of client side buffering. Default: 2MB");
2815    System.err.println();
2816    System.err.println(" Note: -D properties will be applied to the conf used. ");
2817    System.err.println("  For example: ");
2818    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
2819    System.err.println("   -Dmapreduce.task.timeout=60000");
2820    System.err.println();
2821    System.err.println("Command:");
2822    for (CmdDescriptor command : COMMANDS.values()) {
2823      System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
2824    }
2825    System.err.println();
2826    System.err.println("Class:");
2827    System.err.println("To run any custom implementation of PerformanceEvaluation.Test, "
2828      + "provide the classname of the implementaion class in place of "
2829      + "command name and it will be loaded at runtime from classpath.:");
2830    System.err.println("Please consider to contribute back "
2831      + "this custom test impl into a builtin PE command for the benefit of the community");
2832    System.err.println();
2833    System.err.println("Args:");
2834    System.err.println(" nclients        Integer. Required. Total number of clients "
2835      + "(and HRegionServers) running. 1 <= value <= 500");
2836    System.err.println("Examples:");
2837    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
2838    System.err.println(" $ hbase " + shortName + " sequentialWrite 1");
2839    System.err.println(" To run 10 clients doing increments over ten rows:");
2840    System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10");
2841  }
2842
2843  /**
2844   * Parse options passed in via an arguments array. Assumes that array has been split on
2845   * white-space and placed into a {@code Queue}. Any unknown arguments will remain in the queue at
2846   * the conclusion of this method call. It's up to the caller to deal with these unrecognized
2847   * arguments.
2848   */
2849  static TestOptions parseOpts(Queue<String> args) {
2850    final TestOptions opts = new TestOptions();
2851
2852    final Map<String, Consumer<Boolean>> flagHandlers = new HashMap<>();
2853    flagHandlers.put("nomapred", v -> opts.nomapred = v);
2854    flagHandlers.put("flushCommits", v -> opts.flushCommits = v);
2855    flagHandlers.put("writeToWAL", v -> opts.writeToWAL = v);
2856    flagHandlers.put("inmemory", v -> opts.inMemoryCF = v);
2857    flagHandlers.put("autoFlush", v -> opts.autoFlush = v);
2858    flagHandlers.put("oneCon", v -> opts.oneCon = v);
2859    flagHandlers.put("latency", v -> opts.reportLatency = v);
2860    flagHandlers.put("usetags", v -> opts.useTags = v);
2861    flagHandlers.put("filterAll", v -> opts.filterAll = v);
2862    flagHandlers.put("valueRandom", v -> opts.valueRandom = v);
2863    flagHandlers.put("valueZipf", v -> opts.valueZipf = v);
2864    flagHandlers.put("addColumns", v -> opts.addColumns = v);
2865    flagHandlers.put("asyncPrefetch", v -> opts.asyncPrefetch = v);
2866    flagHandlers.put("cacheBlocks", v -> opts.cacheBlocks = v);
2867
2868    final Map<String, Consumer<String>> handlers = new HashMap<>();
2869    handlers.put("rows", v -> opts.perClientRunRows = Long.parseLong(v));
2870    handlers.put("cycles", v -> opts.cycles = Integer.parseInt(v));
2871    handlers.put("sampleRate", v -> opts.sampleRate = Float.parseFloat(v));
2872    handlers.put("table", v -> opts.tableName = v);
2873    handlers.put("startRow", v -> opts.startRow = Long.parseLong(v));
2874    handlers.put("compress", v -> opts.compression = Compression.Algorithm.valueOf(v));
2875    handlers.put("encryption", v -> opts.encryption = v);
2876    handlers.put("traceRate", v -> opts.traceRate = Double.parseDouble(v));
2877    handlers.put("blockEncoding", v -> opts.blockEncoding = DataBlockEncoding.valueOf(v));
2878    handlers.put("presplit", v -> opts.presplitRegions = Integer.parseInt(v));
2879    handlers.put("connCount", v -> opts.connCount = Integer.parseInt(v));
2880    handlers.put("latencyThreshold", v -> opts.latencyThreshold = Integer.parseInt(v));
2881    handlers.put("multiGet", v -> opts.multiGet = Integer.parseInt(v));
2882    handlers.put("multiPut", v -> opts.multiPut = Integer.parseInt(v));
2883    handlers.put("numoftags", v -> opts.noOfTags = Integer.parseInt(v));
2884    handlers.put("replicas", v -> opts.replicas = Integer.parseInt(v));
2885    handlers.put("size", v -> {
2886      opts.size = Float.parseFloat(v);
2887      if (opts.size <= 1.0f) {
2888        throw new IllegalStateException("Size must be > 1; i.e. 1GB");
2889      }
2890    });
2891    handlers.put("splitPolicy", v -> opts.splitPolicy = v);
2892    handlers.put("randomSleep", v -> opts.randomSleep = Integer.parseInt(v));
2893    handlers.put("measureAfter", v -> opts.measureAfter = Integer.parseInt(v));
2894    handlers.put("bloomFilter", v -> opts.bloomType = BloomType.valueOf(v));
2895    handlers.put("blockSize", v -> opts.blockSize = Integer.parseInt(v));
2896    handlers.put("valueSize", v -> opts.valueSize = Integer.parseInt(v));
2897    handlers.put("period", v -> opts.period = Integer.parseInt(v));
2898    handlers.put("inmemoryCompaction",
2899      v -> opts.inMemoryCompaction = MemoryCompactionPolicy.valueOf(v));
2900    handlers.put("columns", v -> opts.columns = Integer.parseInt(v));
2901    handlers.put("families", v -> opts.families = Integer.parseInt(v));
2902    handlers.put("caching", v -> opts.caching = Integer.parseInt(v));
2903    handlers.put("scanReadType", v -> opts.scanReadType = Scan.ReadType.valueOf(v.toUpperCase()));
2904    handlers.put("bufferSize", v -> opts.bufferSize = Long.parseLong(v));
2905    handlers.put("commandPropertiesFile", fileName -> {
2906      try {
2907        final Properties properties = new Properties();
2908        final InputStream resourceStream =
2909          PerformanceEvaluation.class.getClassLoader().getResourceAsStream(fileName);
2910        if (resourceStream == null) {
2911          throw new IllegalArgumentException("Resource file not found: " + fileName);
2912        }
2913        properties.load(resourceStream);
2914        opts.commandProperties = properties;
2915      } catch (IOException e) {
2916        throw new IllegalStateException("Failed to load command properties from file: " + fileName,
2917          e);
2918      }
2919    });
2920
2921    String cmd = null;
2922    final Splitter splitter = Splitter.on("=").limit(2).trimResults();
2923    while ((cmd = args.poll()) != null) {
2924      if (cmd.equals("-h") || cmd.startsWith("--h")) {
2925        // place item back onto queue so that caller knows parsing was incomplete
2926        args.add(cmd);
2927        break;
2928      }
2929
2930      if (cmd.startsWith("--")) {
2931        final List<String> parts = splitter.splitToList(cmd.substring(2));
2932        final String key = parts.get(0);
2933
2934        try {
2935          // Boolean options can be specified as --flag or --flag=true/false
2936          final Consumer<Boolean> flagHandler = flagHandlers.get(key);
2937          if (flagHandler != null) {
2938            flagHandler.accept(parts.size() > 1 ? parseBoolean(parts.get(1)) : true);
2939            continue;
2940          }
2941
2942          // Options that require a value followed by an equals sign
2943          final Consumer<String> handler = handlers.get(key);
2944          if (handler != null) {
2945            if (parts.size() < 2) {
2946              throw new IllegalArgumentException("--" + key + " requires a value");
2947            }
2948            handler.accept(parts.get(1));
2949            continue;
2950          }
2951        } catch (RuntimeException e) {
2952          throw new IllegalArgumentException("Invalid option: " + cmd, e);
2953        }
2954      }
2955
2956      if (isCommandClass(cmd)) {
2957        opts.cmdName = cmd;
2958        try {
2959          opts.numClientThreads = Integer.parseInt(args.remove());
2960        } catch (NoSuchElementException | NumberFormatException e) {
2961          throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
2962        }
2963        calculateRowsAndSize(opts);
2964        break;
2965      }
2966
2967      printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
2968    }
2969
2970    validateParsedOpts(opts);
2971    return opts;
2972  }
2973
2974  // Boolean.parseBoolean is not strict enough for our needs, so we implement our own
2975  private static boolean parseBoolean(String value) {
2976    if ("true".equalsIgnoreCase(value)) {
2977      return true;
2978    }
2979    if ("false".equalsIgnoreCase(value)) {
2980      return false;
2981    }
2982    throw new IllegalArgumentException("Invalid boolean value: " + value);
2983  }
2984
2985  /**
2986   * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts
2987   */
2988  private static void validateParsedOpts(TestOptions opts) {
2989
2990    if (!opts.autoFlush && opts.multiPut > 0) {
2991      throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
2992    }
2993
2994    if (opts.oneCon && opts.connCount > 1) {
2995      throw new IllegalArgumentException(
2996        "oneCon is set to true, " + "connCount should not bigger than 1");
2997    }
2998
2999    if (opts.valueZipf && opts.valueRandom) {
3000      throw new IllegalStateException("Either valueZipf or valueRandom but not both");
3001    }
3002  }
3003
3004  static TestOptions calculateRowsAndSize(final TestOptions opts) {
3005    int rowsPerGB = getRowsPerGB(opts);
3006    if (
3007      (opts.getCmdName() != null
3008        && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
3009        && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows
3010    ) {
3011      opts.totalRows = (long) (opts.size * rowsPerGB);
3012    } else if (opts.size != DEFAULT_OPTS.size) {
3013      // total size in GB specified
3014      opts.totalRows = (long) (opts.size * rowsPerGB);
3015      opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
3016    } else {
3017      opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
3018      // Cast to float to ensure floating-point division
3019      opts.size = (float) opts.totalRows / rowsPerGB;
3020    }
3021    return opts;
3022  }
3023
3024  static int getRowsPerGB(final TestOptions opts) {
3025    return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies()
3026      * opts.getColumns());
3027  }
3028
3029  @Override
3030  public int run(String[] args) throws Exception {
3031    // Process command-line args. TODO: Better cmd-line processing
3032    // (but hopefully something not as painful as cli options).
3033    int errCode = -1;
3034    if (args.length < 1) {
3035      printUsage();
3036      return errCode;
3037    }
3038
3039    try {
3040      LinkedList<String> argv = new LinkedList<>();
3041      argv.addAll(Arrays.asList(args));
3042      TestOptions opts = parseOpts(argv);
3043
3044      // args remaining, print help and exit
3045      if (!argv.isEmpty()) {
3046        errCode = 0;
3047        printUsage();
3048        return errCode;
3049      }
3050
3051      // must run at least 1 client
3052      if (opts.numClientThreads <= 0) {
3053        throw new IllegalArgumentException("Number of clients must be > 0");
3054      }
3055
3056      // cmdName should not be null, print help and exit
3057      if (opts.cmdName == null) {
3058        printUsage();
3059        return errCode;
3060      }
3061
3062      Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
3063      if (cmdClass != null) {
3064        runTest(cmdClass, opts);
3065        errCode = 0;
3066      }
3067
3068    } catch (Exception e) {
3069      e.printStackTrace();
3070    }
3071
3072    return errCode;
3073  }
3074
3075  private static boolean isCommandClass(String cmd) {
3076    return COMMANDS.containsKey(cmd) || isCustomTestClass(cmd);
3077  }
3078
3079  private static boolean isCustomTestClass(String cmd) {
3080    Class<? extends Test> cmdClass;
3081    try {
3082      cmdClass =
3083        (Class<? extends Test>) PerformanceEvaluation.class.getClassLoader().loadClass(cmd);
3084      addCommandDescriptor(cmdClass, cmd, "custom command");
3085      return true;
3086    } catch (Throwable th) {
3087      LOG.info("No class found for command: " + cmd, th);
3088      return false;
3089    }
3090  }
3091
3092  private static Class<? extends TestBase> determineCommandClass(String cmd) {
3093    CmdDescriptor descriptor = COMMANDS.get(cmd);
3094    return descriptor != null ? descriptor.getCmdClass() : null;
3095  }
3096
3097  public static void main(final String[] args) throws Exception {
3098    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
3099    System.exit(res);
3100  }
3101}