001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rest;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.IOException;
023import java.io.PrintStream;
024import java.lang.reflect.Constructor;
025import java.text.SimpleDateFormat;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Date;
029import java.util.List;
030import java.util.Map;
031import java.util.Random;
032import java.util.TreeMap;
033import java.util.regex.Matcher;
034import java.util.regex.Pattern;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.conf.Configured;
037import org.apache.hadoop.fs.FSDataInputStream;
038import org.apache.hadoop.fs.FileStatus;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.Path;
041import org.apache.hadoop.hbase.ArrayBackedTag;
042import org.apache.hadoop.hbase.CompareOperator;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.HColumnDescriptor;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.HTableDescriptor;
047import org.apache.hadoop.hbase.KeyValue;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.Tag;
050import org.apache.hadoop.hbase.client.BufferedMutator;
051import org.apache.hadoop.hbase.client.Connection;
052import org.apache.hadoop.hbase.client.ConnectionFactory;
053import org.apache.hadoop.hbase.client.Durability;
054import org.apache.hadoop.hbase.client.Get;
055import org.apache.hadoop.hbase.client.Put;
056import org.apache.hadoop.hbase.client.Result;
057import org.apache.hadoop.hbase.client.ResultScanner;
058import org.apache.hadoop.hbase.client.Scan;
059import org.apache.hadoop.hbase.client.Table;
060import org.apache.hadoop.hbase.filter.BinaryComparator;
061import org.apache.hadoop.hbase.filter.Filter;
062import org.apache.hadoop.hbase.filter.PageFilter;
063import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
064import org.apache.hadoop.hbase.filter.WhileMatchFilter;
065import org.apache.hadoop.hbase.io.compress.Compression;
066import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
067import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
068import org.apache.hadoop.hbase.rest.client.Client;
069import org.apache.hadoop.hbase.rest.client.Cluster;
070import org.apache.hadoop.hbase.rest.client.RemoteAdmin;
071import org.apache.hadoop.hbase.util.ByteArrayHashKey;
072import org.apache.hadoop.hbase.util.Bytes;
073import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
074import org.apache.hadoop.hbase.util.Hash;
075import org.apache.hadoop.hbase.util.MurmurHash;
076import org.apache.hadoop.hbase.util.Pair;
077import org.apache.hadoop.io.LongWritable;
078import org.apache.hadoop.io.NullWritable;
079import org.apache.hadoop.io.Text;
080import org.apache.hadoop.io.Writable;
081import org.apache.hadoop.mapreduce.InputSplit;
082import org.apache.hadoop.mapreduce.Job;
083import org.apache.hadoop.mapreduce.JobContext;
084import org.apache.hadoop.mapreduce.Mapper;
085import org.apache.hadoop.mapreduce.RecordReader;
086import org.apache.hadoop.mapreduce.TaskAttemptContext;
087import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
088import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
089import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
090import org.apache.hadoop.util.LineReader;
091import org.apache.hadoop.util.Tool;
092import org.apache.hadoop.util.ToolRunner;
093import org.slf4j.Logger;
094import org.slf4j.LoggerFactory;
095
096/**
097 * Script used evaluating Stargate performance and scalability. Runs a SG client that steps through
098 * one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random writes test,
099 * etc.). Pass on the command-line which test to run and how many clients are participating in this
100 * experiment. Run <code>java PerformanceEvaluation --help</code> to obtain usage.
101 * <p>
102 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance
103 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper,
104 * pages 8-10.
105 * <p>
106 * If number of clients > 1, we start up a MapReduce job. Each map task runs an individual client.
107 * Each client does about 1GB of data.
108 */
109public class PerformanceEvaluation extends Configured implements Tool {
110  protected static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class);
111
112  private static final int DEFAULT_ROW_PREFIX_LENGTH = 16;
113  private static final int ROW_LENGTH = 1000;
114  private static final int TAG_LENGTH = 256;
115  private static final int ONE_GB = 1024 * 1024 * 1000;
116  private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH;
117
118  public static final TableName TABLE_NAME = TableName.valueOf("TestTable");
119  public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
120  public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
121  private TableName tableName = TABLE_NAME;
122
123  protected HTableDescriptor TABLE_DESCRIPTOR;
124  protected Map<String, CmdDescriptor> commands = new TreeMap<>();
125  protected static Cluster cluster = new Cluster();
126
127  volatile Configuration conf;
128  private boolean nomapred = false;
129  private int N = 1;
130  private int R = ROWS_PER_GB;
131  private Compression.Algorithm compression = Compression.Algorithm.NONE;
132  private DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
133  private boolean flushCommits = true;
134  private boolean writeToWAL = true;
135  private boolean inMemoryCF = false;
136  private int presplitRegions = 0;
137  private boolean useTags = false;
138  private int noOfTags = 1;
139  private Connection connection;
140
141  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
142
143  /**
144   * Regex to parse lines in input file passed to mapreduce task.
145   */
146  public static final Pattern LINE_PATTERN = Pattern
147    .compile("tableName=(\\w+),\\s+" + "startRow=(\\d+),\\s+" + "perClientRunRows=(\\d+),\\s+"
148      + "totalRows=(\\d+),\\s+" + "clients=(\\d+),\\s+" + "flushCommits=(\\w+),\\s+"
149      + "writeToWAL=(\\w+),\\s+" + "useTags=(\\w+),\\s+" + "noOfTags=(\\d+)");
150
151  /**
152   * Enum for map metrics. Keep it out here rather than inside in the Map inner-class so we can find
153   * associated properties.
154   */
155  protected enum Counter {
156    /** elapsed time */
157    ELAPSED_TIME,
158    /** number of rows */
159    ROWS
160  }
161
162  /**
163   * Constructor
164   * @param c Configuration object
165   */
166  public PerformanceEvaluation(final Configuration c) {
167    this.conf = c;
168
169    addCommandDescriptor(RandomReadTest.class, "randomRead", "Run random read test");
170    addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
171      "Run random seek and scan 100 test");
172    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
173      "Run random seek scan with both start and stop row (max 10 rows)");
174    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
175      "Run random seek scan with both start and stop row (max 100 rows)");
176    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
177      "Run random seek scan with both start and stop row (max 1000 rows)");
178    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
179      "Run random seek scan with both start and stop row (max 10000 rows)");
180    addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
181    addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
182    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
183    addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
184    addCommandDescriptor(FilteredScanTest.class, "filterScan",
185      "Run scan test using a filter to find a specific row based "
186        + "on it's value (make sure to use --rows=20)");
187  }
188
189  protected void addCommandDescriptor(Class<? extends Test> cmdClass, String name,
190    String description) {
191    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
192    commands.put(name, cmdDescriptor);
193  }
194
195  /**
196   * Implementations can have their status set.
197   */
198  interface Status {
199    /**
200     * Sets status
201     * @param msg status message
202     * @throws IOException if setting the status fails
203     */
204    void setStatus(final String msg) throws IOException;
205  }
206
207  /**
208   * This class works as the InputSplit of Performance Evaluation MapReduce InputFormat, and the
209   * Record Value of RecordReader. Each map task will only read one record from a PeInputSplit, the
210   * record value is the PeInputSplit itself.
211   */
212  public static class PeInputSplit extends InputSplit implements Writable {
213    private TableName tableName;
214    private int startRow;
215    private int rows;
216    private int totalRows;
217    private int clients;
218    private boolean flushCommits;
219    private boolean writeToWAL;
220    private boolean useTags;
221    private int noOfTags;
222
223    public PeInputSplit(TableName tableName, int startRow, int rows, int totalRows, int clients,
224      boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags) {
225      this.tableName = tableName;
226      this.startRow = startRow;
227      this.rows = rows;
228      this.totalRows = totalRows;
229      this.clients = clients;
230      this.flushCommits = flushCommits;
231      this.writeToWAL = writeToWAL;
232      this.useTags = useTags;
233      this.noOfTags = noOfTags;
234    }
235
236    @Override
237    public void readFields(DataInput in) throws IOException {
238      int tableNameLen = in.readInt();
239      byte[] name = new byte[tableNameLen];
240      in.readFully(name);
241      this.tableName = TableName.valueOf(name);
242      this.startRow = in.readInt();
243      this.rows = in.readInt();
244      this.totalRows = in.readInt();
245      this.clients = in.readInt();
246      this.flushCommits = in.readBoolean();
247      this.writeToWAL = in.readBoolean();
248      this.useTags = in.readBoolean();
249      this.noOfTags = in.readInt();
250    }
251
252    @Override
253    public void write(DataOutput out) throws IOException {
254      byte[] name = this.tableName.toBytes();
255      out.writeInt(name.length);
256      out.write(name);
257      out.writeInt(startRow);
258      out.writeInt(rows);
259      out.writeInt(totalRows);
260      out.writeInt(clients);
261      out.writeBoolean(flushCommits);
262      out.writeBoolean(writeToWAL);
263      out.writeBoolean(useTags);
264      out.writeInt(noOfTags);
265    }
266
267    @Override
268    public long getLength() {
269      return 0;
270    }
271
272    @Override
273    public String[] getLocations() {
274      return new String[0];
275    }
276
277    public int getStartRow() {
278      return startRow;
279    }
280
281    public TableName getTableName() {
282      return tableName;
283    }
284
285    public int getRows() {
286      return rows;
287    }
288
289    public int getTotalRows() {
290      return totalRows;
291    }
292
293    public boolean isFlushCommits() {
294      return flushCommits;
295    }
296
297    public boolean isWriteToWAL() {
298      return writeToWAL;
299    }
300
301    public boolean isUseTags() {
302      return useTags;
303    }
304
305    public int getNoOfTags() {
306      return noOfTags;
307    }
308  }
309
310  /**
311   * InputFormat of Performance Evaluation MapReduce job. It extends from FileInputFormat, want to
312   * use it's methods such as setInputPaths().
313   */
314  public static class PeInputFormat extends FileInputFormat<NullWritable, PeInputSplit> {
315    @Override
316    public List<InputSplit> getSplits(JobContext job) throws IOException {
317      // generate splits
318      List<InputSplit> splitList = new ArrayList<>();
319
320      for (FileStatus file : listStatus(job)) {
321        if (file.isDirectory()) {
322          continue;
323        }
324        Path path = file.getPath();
325        FileSystem fs = path.getFileSystem(job.getConfiguration());
326        FSDataInputStream fileIn = fs.open(path);
327        LineReader in = new LineReader(fileIn, job.getConfiguration());
328        int lineLen;
329        while (true) {
330          Text lineText = new Text();
331          lineLen = in.readLine(lineText);
332          if (lineLen <= 0) {
333            break;
334          }
335          Matcher m = LINE_PATTERN.matcher(lineText.toString());
336          if ((m != null) && m.matches()) {
337            TableName tableName = TableName.valueOf(m.group(1));
338            int startRow = Integer.parseInt(m.group(2));
339            int rows = Integer.parseInt(m.group(3));
340            int totalRows = Integer.parseInt(m.group(4));
341            int clients = Integer.parseInt(m.group(5));
342            boolean flushCommits = Boolean.parseBoolean(m.group(6));
343            boolean writeToWAL = Boolean.parseBoolean(m.group(7));
344            boolean useTags = Boolean.parseBoolean(m.group(8));
345            int noOfTags = Integer.parseInt(m.group(9));
346
347            LOG.debug("tableName=" + tableName + " split[" + splitList.size() + "] " + " startRow="
348              + startRow + " rows=" + rows + " totalRows=" + totalRows + " clients=" + clients
349              + " flushCommits=" + flushCommits + " writeToWAL=" + writeToWAL + " useTags="
350              + useTags + " noOfTags=" + noOfTags);
351
352            PeInputSplit newSplit = new PeInputSplit(tableName, startRow, rows, totalRows, clients,
353              flushCommits, writeToWAL, useTags, noOfTags);
354            splitList.add(newSplit);
355          }
356        }
357        in.close();
358      }
359
360      LOG.info("Total # of splits: " + splitList.size());
361      return splitList;
362    }
363
364    @Override
365    public RecordReader<NullWritable, PeInputSplit> createRecordReader(InputSplit split,
366      TaskAttemptContext context) {
367      return new PeRecordReader();
368    }
369
370    public static class PeRecordReader extends RecordReader<NullWritable, PeInputSplit> {
371      private boolean readOver = false;
372      private PeInputSplit split = null;
373      private NullWritable key = null;
374      private PeInputSplit value = null;
375
376      @Override
377      public void initialize(InputSplit split, TaskAttemptContext context) {
378        this.readOver = false;
379        this.split = (PeInputSplit) split;
380      }
381
382      @Override
383      public boolean nextKeyValue() {
384        if (readOver) {
385          return false;
386        }
387
388        key = NullWritable.get();
389        value = split;
390
391        readOver = true;
392        return true;
393      }
394
395      @Override
396      public NullWritable getCurrentKey() {
397        return key;
398      }
399
400      @Override
401      public PeInputSplit getCurrentValue() {
402        return value;
403      }
404
405      @Override
406      public float getProgress() {
407        if (readOver) {
408          return 1.0f;
409        } else {
410          return 0.0f;
411        }
412      }
413
414      @Override
415      public void close() {
416        // do nothing
417      }
418    }
419  }
420
421  /**
422   * MapReduce job that runs a performance evaluation client in each map task.
423   */
424  public static class EvaluationMapTask
425    extends Mapper<NullWritable, PeInputSplit, LongWritable, LongWritable> {
426
427    /** configuration parameter name that contains the command */
428    public final static String CMD_KEY = "EvaluationMapTask.command";
429    /** configuration parameter name that contains the PE impl */
430    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
431
432    private Class<? extends Test> cmd;
433    private PerformanceEvaluation pe;
434
435    @Override
436    protected void setup(Context context) {
437      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
438
439      // this is required so that extensions of PE are instantiated within the
440      // map reduce task...
441      Class<? extends PerformanceEvaluation> peClass =
442        forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
443      try {
444        this.pe =
445          peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
446      } catch (Exception e) {
447        throw new IllegalStateException("Could not instantiate PE instance", e);
448      }
449    }
450
451    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
452      Class<? extends Type> clazz;
453      try {
454        clazz = Class.forName(className).asSubclass(type);
455      } catch (ClassNotFoundException e) {
456        throw new IllegalStateException("Could not find class for name: " + className, e);
457      }
458      return clazz;
459    }
460
461    @Override
462    protected void map(NullWritable key, PeInputSplit value, final Context context)
463      throws IOException, InterruptedException {
464      Status status = context::setStatus;
465
466      // Evaluation task
467      pe.tableName = value.getTableName();
468      long elapsedTime =
469        this.pe.runOneClient(this.cmd, value.getStartRow(), value.getRows(), value.getTotalRows(),
470          value.isFlushCommits(), value.isWriteToWAL(), value.isUseTags(), value.getNoOfTags(),
471          ConnectionFactory.createConnection(context.getConfiguration()), status);
472      // Collect how much time the thing took. Report as map output and
473      // to the ELAPSED_TIME counter.
474      context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
475      context.getCounter(Counter.ROWS).increment(value.rows);
476      context.write(new LongWritable(value.startRow), new LongWritable(elapsedTime));
477      context.progress();
478    }
479  }
480
481  /**
482   * If table does not already exist, create.
483   * @param admin Client to use checking.
484   * @return True if we created the table.
485   * @throws IOException if an operation on the table fails
486   */
487  private boolean checkTable(RemoteAdmin admin) throws IOException {
488    HTableDescriptor tableDescriptor = getTableDescriptor();
489    if (this.presplitRegions > 0) {
490      // presplit requested
491      if (admin.isTableAvailable(tableDescriptor.getTableName().getName())) {
492        admin.deleteTable(tableDescriptor.getTableName().getName());
493      }
494
495      byte[][] splits = getSplits();
496      for (int i = 0; i < splits.length; i++) {
497        LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
498      }
499      admin.createTable(tableDescriptor);
500      LOG.info("Table created with " + this.presplitRegions + " splits");
501    } else {
502      boolean tableExists = admin.isTableAvailable(tableDescriptor.getTableName().getName());
503      if (!tableExists) {
504        admin.createTable(tableDescriptor);
505        LOG.info("Table " + tableDescriptor + " created");
506      }
507    }
508
509    return admin.isTableAvailable(tableDescriptor.getTableName().getName());
510  }
511
512  protected HTableDescriptor getTableDescriptor() {
513    if (TABLE_DESCRIPTOR == null) {
514      TABLE_DESCRIPTOR = new HTableDescriptor(tableName);
515      HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
516      family.setDataBlockEncoding(blockEncoding);
517      family.setCompressionType(compression);
518      if (inMemoryCF) {
519        family.setInMemory(true);
520      }
521      TABLE_DESCRIPTOR.addFamily(family);
522    }
523    return TABLE_DESCRIPTOR;
524  }
525
526  /**
527   * Generates splits based on total number of rows and specified split regions
528   * @return splits : array of byte []
529   */
530  protected byte[][] getSplits() {
531    if (this.presplitRegions == 0) {
532      return new byte[0][];
533    }
534
535    int numSplitPoints = presplitRegions - 1;
536    byte[][] splits = new byte[numSplitPoints][];
537    int jump = this.R / this.presplitRegions;
538    for (int i = 0; i < numSplitPoints; i++) {
539      int rowkey = jump * (1 + i);
540      splits[i] = format(rowkey);
541    }
542    return splits;
543  }
544
545  /**
546   * We're to run multiple clients concurrently. Setup a mapreduce job. Run one map per client. Then
547   * run a single reduce to sum the elapsed times.
548   * @param cmd Command to run.
549   */
550  private void runNIsMoreThanOne(final Class<? extends Test> cmd)
551    throws IOException, InterruptedException, ClassNotFoundException {
552    RemoteAdmin remoteAdmin = new RemoteAdmin(new Client(cluster), getConf());
553    checkTable(remoteAdmin);
554    if (nomapred) {
555      doMultipleClients(cmd);
556    } else {
557      doMapReduce(cmd);
558    }
559  }
560
561  /**
562   * Run all clients in this vm each to its own thread.
563   * @param cmd Command to run
564   * @throws IOException if creating a connection fails
565   */
566  private void doMultipleClients(final Class<? extends Test> cmd) throws IOException {
567    final List<Thread> threads = new ArrayList<>(this.N);
568    final long[] timings = new long[this.N];
569    final int perClientRows = R / N;
570    final TableName tableName = this.tableName;
571    final DataBlockEncoding encoding = this.blockEncoding;
572    final boolean flushCommits = this.flushCommits;
573    final Compression.Algorithm compression = this.compression;
574    final boolean writeToWal = this.writeToWAL;
575    final int preSplitRegions = this.presplitRegions;
576    final boolean useTags = this.useTags;
577    final int numTags = this.noOfTags;
578    final Connection connection = ConnectionFactory.createConnection(getConf());
579    for (int i = 0; i < this.N; i++) {
580      final int index = i;
581      Thread t = new Thread("TestClient-" + i) {
582        @Override
583        public void run() {
584          super.run();
585          PerformanceEvaluation pe = new PerformanceEvaluation(getConf());
586          pe.tableName = tableName;
587          pe.blockEncoding = encoding;
588          pe.flushCommits = flushCommits;
589          pe.compression = compression;
590          pe.writeToWAL = writeToWal;
591          pe.presplitRegions = preSplitRegions;
592          pe.N = N;
593          pe.connection = connection;
594          pe.useTags = useTags;
595          pe.noOfTags = numTags;
596          try {
597            long elapsedTime = pe.runOneClient(cmd, index * perClientRows, perClientRows, R,
598              flushCommits, writeToWAL, useTags, noOfTags, connection,
599              msg -> LOG.info("client-" + getName() + " " + msg));
600            timings[index] = elapsedTime;
601            LOG.info("Finished " + getName() + " in " + elapsedTime + "ms writing " + perClientRows
602              + " rows");
603          } catch (IOException e) {
604            throw new RuntimeException(e);
605          }
606        }
607      };
608      threads.add(t);
609    }
610    for (Thread t : threads) {
611      t.start();
612    }
613    for (Thread t : threads) {
614      while (t.isAlive()) {
615        try {
616          t.join();
617        } catch (InterruptedException e) {
618          LOG.debug("Interrupted, continuing" + e.toString());
619        }
620      }
621    }
622    final String test = cmd.getSimpleName();
623    LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(timings));
624    Arrays.sort(timings);
625    long total = 0;
626    for (int i = 0; i < this.N; i++) {
627      total += timings[i];
628    }
629    LOG.info("[" + test + "]" + "\tMin: " + timings[0] + "ms" + "\tMax: " + timings[this.N - 1]
630      + "ms" + "\tAvg: " + (total / this.N) + "ms");
631  }
632
633  /**
634   * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write
635   * out an input file with instruction per client regards which row they are to start on.
636   * @param cmd Command to run.
637   */
638  private void doMapReduce(final Class<? extends Test> cmd)
639    throws IOException, InterruptedException, ClassNotFoundException {
640    Configuration conf = getConf();
641    Path inputDir = writeInputFile(conf);
642    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
643    conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
644    Job job = Job.getInstance(conf);
645    job.setJarByClass(PerformanceEvaluation.class);
646    job.setJobName("HBase Performance Evaluation");
647
648    job.setInputFormatClass(PeInputFormat.class);
649    PeInputFormat.setInputPaths(job, inputDir);
650
651    job.setOutputKeyClass(LongWritable.class);
652    job.setOutputValueClass(LongWritable.class);
653
654    job.setMapperClass(EvaluationMapTask.class);
655    job.setReducerClass(LongSumReducer.class);
656    job.setNumReduceTasks(1);
657
658    job.setOutputFormatClass(TextOutputFormat.class);
659    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
660    TableMapReduceUtil.addDependencyJars(job);
661    TableMapReduceUtil.initCredentials(job);
662    job.waitForCompletion(true);
663  }
664
665  /**
666   * Write input file of offsets-per-client for the mapreduce job.
667   * @param c Configuration
668   * @return Directory that contains file written.
669   * @throws IOException if creating the directory or the file fails
670   */
671  private Path writeInputFile(final Configuration c) throws IOException {
672    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
673    Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
674    Path inputDir = new Path(jobdir, "inputs");
675
676    FileSystem fs = FileSystem.get(c);
677    fs.mkdirs(inputDir);
678    Path inputFile = new Path(inputDir, "input.txt");
679    // Make input random.
680    try (PrintStream out = new PrintStream(fs.create(inputFile))) {
681      Map<Integer, String> m = new TreeMap<>();
682      Hash h = MurmurHash.getInstance();
683      int perClientRows = (this.R / this.N);
684      for (int i = 0; i < 10; i++) {
685        for (int j = 0; j < N; j++) {
686          StringBuilder s = new StringBuilder();
687          s.append("tableName=").append(tableName);
688          s.append(", startRow=").append((j * perClientRows) + (i * (perClientRows / 10)));
689          s.append(", perClientRunRows=").append(perClientRows / 10);
690          s.append(", totalRows=").append(R);
691          s.append(", clients=").append(N);
692          s.append(", flushCommits=").append(flushCommits);
693          s.append(", writeToWAL=").append(writeToWAL);
694          s.append(", useTags=").append(useTags);
695          s.append(", noOfTags=").append(noOfTags);
696
697          byte[] b = Bytes.toBytes(s.toString());
698          int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
699          m.put(hash, s.toString());
700        }
701      }
702      for (Map.Entry<Integer, String> e : m.entrySet()) {
703        out.println(e.getValue());
704      }
705    }
706    return inputDir;
707  }
708
709  /**
710   * Describes a command.
711   */
712  static class CmdDescriptor {
713    private Class<? extends Test> cmdClass;
714    private String name;
715    private String description;
716
717    CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) {
718      this.cmdClass = cmdClass;
719      this.name = name;
720      this.description = description;
721    }
722
723    public Class<? extends Test> getCmdClass() {
724      return cmdClass;
725    }
726
727    public String getName() {
728      return name;
729    }
730
731    public String getDescription() {
732      return description;
733    }
734  }
735
736  /**
737   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation} tests This
738   * makes the reflection logic a little easier to understand...
739   */
740  static class TestOptions {
741    private int startRow;
742    private int perClientRunRows;
743    private int totalRows;
744    private TableName tableName;
745    private boolean flushCommits;
746    private boolean writeToWAL;
747    private boolean useTags;
748    private int noOfTags;
749    private Connection connection;
750
751    TestOptions(int startRow, int perClientRunRows, int totalRows, TableName tableName,
752      boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags,
753      Connection connection) {
754      this.startRow = startRow;
755      this.perClientRunRows = perClientRunRows;
756      this.totalRows = totalRows;
757      this.tableName = tableName;
758      this.flushCommits = flushCommits;
759      this.writeToWAL = writeToWAL;
760      this.useTags = useTags;
761      this.noOfTags = noOfTags;
762      this.connection = connection;
763    }
764
765    public int getStartRow() {
766      return startRow;
767    }
768
769    public int getPerClientRunRows() {
770      return perClientRunRows;
771    }
772
773    public int getTotalRows() {
774      return totalRows;
775    }
776
777    public TableName getTableName() {
778      return tableName;
779    }
780
781    public boolean isFlushCommits() {
782      return flushCommits;
783    }
784
785    public boolean isWriteToWAL() {
786      return writeToWAL;
787    }
788
789    public Connection getConnection() {
790      return connection;
791    }
792
793    public boolean isUseTags() {
794      return this.useTags;
795    }
796
797    public int getNumTags() {
798      return this.noOfTags;
799    }
800  }
801
802  /*
803   * A test. Subclass to particularize what happens per row.
804   */
805  static abstract class Test {
806    // Below is make it so when Tests are all running in the one
807    // jvm, that they each have a differently seeded Random.
808    private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime());
809
810    private static long nextRandomSeed() {
811      return randomSeed.nextLong();
812    }
813
814    protected final Random rand = new Random(nextRandomSeed());
815
816    protected final int startRow;
817    protected final int perClientRunRows;
818    protected final int totalRows;
819    private final Status status;
820    protected TableName tableName;
821    protected volatile Configuration conf;
822    protected boolean writeToWAL;
823    protected boolean useTags;
824    protected int noOfTags;
825    protected Connection connection;
826
827    /**
828     * Note that all subclasses of this class must provide a public contructor that has the exact
829     * same list of arguments.
830     */
831    Test(final Configuration conf, final TestOptions options, final Status status) {
832      super();
833      this.startRow = options.getStartRow();
834      this.perClientRunRows = options.getPerClientRunRows();
835      this.totalRows = options.getTotalRows();
836      this.status = status;
837      this.tableName = options.getTableName();
838      this.conf = conf;
839      this.writeToWAL = options.isWriteToWAL();
840      this.useTags = options.isUseTags();
841      this.noOfTags = options.getNumTags();
842      this.connection = options.getConnection();
843    }
844
845    protected String generateStatus(final int sr, final int i, final int lr) {
846      return sr + "/" + i + "/" + lr;
847    }
848
849    protected int getReportingPeriod() {
850      int period = this.perClientRunRows / 10;
851      return period == 0 ? this.perClientRunRows : period;
852    }
853
854    abstract void testTakedown() throws IOException;
855
856    /**
857     * Run test
858     * @return Elapsed time.
859     * @throws IOException if something in the test fails
860     */
861    long test() throws IOException {
862      testSetup();
863      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
864      final long startTime = System.nanoTime();
865      try {
866        testTimed();
867      } finally {
868        testTakedown();
869      }
870      return (System.nanoTime() - startTime) / 1000000;
871    }
872
873    abstract void testSetup() throws IOException;
874
875    /**
876     * Provides an extension point for tests that don't want a per row invocation.
877     */
878    void testTimed() throws IOException {
879      int lastRow = this.startRow + this.perClientRunRows;
880      // Report on completion of 1/10th of total.
881      for (int i = this.startRow; i < lastRow; i++) {
882        testRow(i);
883        if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
884          status.setStatus(generateStatus(this.startRow, i, lastRow));
885        }
886      }
887    }
888
889    /**
890     * Test for individual row.
891     * @param i Row index.
892     */
893    abstract void testRow(final int i) throws IOException;
894  }
895
896  static abstract class TableTest extends Test {
897    protected Table table;
898
899    public TableTest(Configuration conf, TestOptions options, Status status) {
900      super(conf, options, status);
901    }
902
903    @Override
904    void testSetup() throws IOException {
905      this.table = connection.getTable(tableName);
906    }
907
908    @Override
909    void testTakedown() throws IOException {
910      table.close();
911    }
912  }
913
914  static abstract class BufferedMutatorTest extends Test {
915    protected BufferedMutator mutator;
916    protected boolean flushCommits;
917
918    public BufferedMutatorTest(Configuration conf, TestOptions options, Status status) {
919      super(conf, options, status);
920      this.flushCommits = options.isFlushCommits();
921    }
922
923    @Override
924    void testSetup() throws IOException {
925      this.mutator = connection.getBufferedMutator(tableName);
926    }
927
928    @Override
929    void testTakedown() throws IOException {
930      if (flushCommits) {
931        this.mutator.flush();
932      }
933      mutator.close();
934    }
935  }
936
937  static class RandomSeekScanTest extends TableTest {
938    RandomSeekScanTest(Configuration conf, TestOptions options, Status status) {
939      super(conf, options, status);
940    }
941
942    @Override
943    void testRow(final int i) throws IOException {
944      Scan scan = new Scan(getRandomRow(this.rand, this.totalRows));
945      scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
946      scan.setFilter(new WhileMatchFilter(new PageFilter(120)));
947      ResultScanner s = this.table.getScanner(scan);
948      s.close();
949    }
950
951    @Override
952    protected int getReportingPeriod() {
953      int period = this.perClientRunRows / 100;
954      return period == 0 ? this.perClientRunRows : period;
955    }
956  }
957
958  @SuppressWarnings("unused")
959  static abstract class RandomScanWithRangeTest extends TableTest {
960    RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) {
961      super(conf, options, status);
962    }
963
964    @Override
965    void testRow(final int i) throws IOException {
966      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
967      Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
968      scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
969      ResultScanner s = this.table.getScanner(scan);
970      int count = 0;
971      for (Result rr = null; (rr = s.next()) != null;) {
972        count++;
973      }
974
975      if (i % 100 == 0) {
976        LOG.info(String.format("Scan for key range %s - %s returned %s rows",
977          Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()),
978          count));
979      }
980
981      s.close();
982    }
983
984    protected abstract Pair<byte[], byte[]> getStartAndStopRow();
985
986    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
987      int start = this.rand.nextInt(Integer.MAX_VALUE) % totalRows;
988      int stop = start + maxRange;
989      return new Pair<>(format(start), format(stop));
990    }
991
992    @Override
993    protected int getReportingPeriod() {
994      int period = this.perClientRunRows / 100;
995      return period == 0 ? this.perClientRunRows : period;
996    }
997  }
998
999  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1000    RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status) {
1001      super(conf, options, status);
1002    }
1003
1004    @Override
1005    protected Pair<byte[], byte[]> getStartAndStopRow() {
1006      return generateStartAndStopRows(10);
1007    }
1008  }
1009
1010  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1011    RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status) {
1012      super(conf, options, status);
1013    }
1014
1015    @Override
1016    protected Pair<byte[], byte[]> getStartAndStopRow() {
1017      return generateStartAndStopRows(100);
1018    }
1019  }
1020
1021  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1022    RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status) {
1023      super(conf, options, status);
1024    }
1025
1026    @Override
1027    protected Pair<byte[], byte[]> getStartAndStopRow() {
1028      return generateStartAndStopRows(1000);
1029    }
1030  }
1031
1032  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1033    RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status) {
1034      super(conf, options, status);
1035    }
1036
1037    @Override
1038    protected Pair<byte[], byte[]> getStartAndStopRow() {
1039      return generateStartAndStopRows(10000);
1040    }
1041  }
1042
1043  static class RandomReadTest extends TableTest {
1044    RandomReadTest(Configuration conf, TestOptions options, Status status) {
1045      super(conf, options, status);
1046    }
1047
1048    @Override
1049    void testRow(final int i) throws IOException {
1050      Get get = new Get(getRandomRow(this.rand, this.totalRows));
1051      get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1052      this.table.get(get);
1053    }
1054
1055    @Override
1056    protected int getReportingPeriod() {
1057      int period = this.perClientRunRows / 100;
1058      return period == 0 ? this.perClientRunRows : period;
1059    }
1060  }
1061
1062  static class RandomWriteTest extends BufferedMutatorTest {
1063    RandomWriteTest(Configuration conf, TestOptions options, Status status) {
1064      super(conf, options, status);
1065    }
1066
1067    @Override
1068    void testRow(final int i) throws IOException {
1069      byte[] row = getRandomRow(this.rand, this.totalRows);
1070      Put put = new Put(row);
1071      byte[] value = generateData(this.rand, ROW_LENGTH);
1072      if (useTags) {
1073        byte[] tag = generateData(this.rand, TAG_LENGTH);
1074        Tag[] tags = new Tag[noOfTags];
1075        for (int n = 0; n < noOfTags; n++) {
1076          Tag t = new ArrayBackedTag((byte) n, tag);
1077          tags[n] = t;
1078        }
1079        KeyValue kv =
1080          new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP, value, tags);
1081        put.add(kv);
1082      } else {
1083        put.addColumn(FAMILY_NAME, QUALIFIER_NAME, value);
1084      }
1085      put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1086      mutator.mutate(put);
1087    }
1088  }
1089
1090  static class ScanTest extends TableTest {
1091    private ResultScanner testScanner;
1092
1093    ScanTest(Configuration conf, TestOptions options, Status status) {
1094      super(conf, options, status);
1095    }
1096
1097    @Override
1098    void testTakedown() throws IOException {
1099      if (this.testScanner != null) {
1100        this.testScanner.close();
1101      }
1102      super.testTakedown();
1103    }
1104
1105    @Override
1106    void testRow(final int i) throws IOException {
1107      if (this.testScanner == null) {
1108        Scan scan = new Scan(format(this.startRow));
1109        scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1110        this.testScanner = table.getScanner(scan);
1111      }
1112      testScanner.next();
1113    }
1114  }
1115
1116  static class SequentialReadTest extends TableTest {
1117    SequentialReadTest(Configuration conf, TestOptions options, Status status) {
1118      super(conf, options, status);
1119    }
1120
1121    @Override
1122    void testRow(final int i) throws IOException {
1123      Get get = new Get(format(i));
1124      get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1125      table.get(get);
1126    }
1127  }
1128
1129  static class SequentialWriteTest extends BufferedMutatorTest {
1130    SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
1131      super(conf, options, status);
1132    }
1133
1134    @Override
1135    void testRow(final int i) throws IOException {
1136      byte[] row = format(i);
1137      Put put = new Put(row);
1138      byte[] value = generateData(this.rand, ROW_LENGTH);
1139      if (useTags) {
1140        byte[] tag = generateData(this.rand, TAG_LENGTH);
1141        Tag[] tags = new Tag[noOfTags];
1142        for (int n = 0; n < noOfTags; n++) {
1143          Tag t = new ArrayBackedTag((byte) n, tag);
1144          tags[n] = t;
1145        }
1146        KeyValue kv =
1147          new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP, value, tags);
1148        put.add(kv);
1149      } else {
1150        put.addColumn(FAMILY_NAME, QUALIFIER_NAME, value);
1151      }
1152      put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1153      mutator.mutate(put);
1154    }
1155  }
1156
1157  static class FilteredScanTest extends TableTest {
1158    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
1159
1160    FilteredScanTest(Configuration conf, TestOptions options, Status status) {
1161      super(conf, options, status);
1162    }
1163
1164    @Override
1165    void testRow(int i) throws IOException {
1166      byte[] value = generateValue(this.rand);
1167      Scan scan = constructScan(value);
1168      try (ResultScanner scanner = this.table.getScanner(scan)) {
1169        while (scanner.next() != null) {
1170        }
1171      }
1172    }
1173
1174    protected Scan constructScan(byte[] valuePrefix) {
1175      Filter filter = new SingleColumnValueFilter(FAMILY_NAME, QUALIFIER_NAME,
1176        CompareOperator.EQUAL, new BinaryComparator(valuePrefix));
1177      Scan scan = new Scan();
1178      scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1179      scan.setFilter(filter);
1180      return scan;
1181    }
1182  }
1183
1184  /**
1185   * Format passed integer.
1186   * @param number the integer to format
1187   * @return Returns zero-prefixed 10-byte wide decimal version of passed number (Does absolute in
1188   *         case number is negative).
1189   */
1190  public static byte[] format(final int number) {
1191    byte[] b = new byte[DEFAULT_ROW_PREFIX_LENGTH + 10];
1192    int d = Math.abs(number);
1193    for (int i = b.length - 1; i >= 0; i--) {
1194      b[i] = (byte) ((d % 10) + '0');
1195      d /= 10;
1196    }
1197    return b;
1198  }
1199
1200  public static byte[] generateData(final Random r, int length) {
1201    byte[] b = new byte[length];
1202    int i;
1203
1204    for (i = 0; i < (length - 8); i += 8) {
1205      b[i] = (byte) (65 + r.nextInt(26));
1206      b[i + 1] = b[i];
1207      b[i + 2] = b[i];
1208      b[i + 3] = b[i];
1209      b[i + 4] = b[i];
1210      b[i + 5] = b[i];
1211      b[i + 6] = b[i];
1212      b[i + 7] = b[i];
1213    }
1214
1215    byte a = (byte) (65 + r.nextInt(26));
1216    for (; i < length; i++) {
1217      b[i] = a;
1218    }
1219    return b;
1220  }
1221
1222  public static byte[] generateValue(final Random r) {
1223    byte[] b = new byte[ROW_LENGTH];
1224    r.nextBytes(b);
1225    return b;
1226  }
1227
1228  static byte[] getRandomRow(final Random random, final int totalRows) {
1229    return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
1230  }
1231
1232  long runOneClient(final Class<? extends Test> cmd, final int startRow, final int perClientRunRows,
1233    final int totalRows, boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags,
1234    Connection connection, final Status status) throws IOException {
1235    status
1236      .setStatus("Start " + cmd + " at offset " + startRow + " for " + perClientRunRows + " rows");
1237    long totalElapsedTime;
1238
1239    TestOptions options = new TestOptions(startRow, perClientRunRows, totalRows, tableName,
1240      flushCommits, writeToWAL, useTags, noOfTags, connection);
1241    final Test t;
1242    try {
1243      Constructor<? extends Test> constructor =
1244        cmd.getDeclaredConstructor(Configuration.class, TestOptions.class, Status.class);
1245      t = constructor.newInstance(this.conf, options, status);
1246    } catch (NoSuchMethodException e) {
1247      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
1248        + ".  It does not provide a constructor as described by"
1249        + "the javadoc comment.  Available constructors are: "
1250        + Arrays.toString(cmd.getConstructors()));
1251    } catch (Exception e) {
1252      throw new IllegalStateException("Failed to construct command class", e);
1253    }
1254    totalElapsedTime = t.test();
1255
1256    status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + startRow
1257      + " for " + perClientRunRows + " rows");
1258    return totalElapsedTime;
1259  }
1260
1261  private void runNIsOne(final Class<? extends Test> cmd) {
1262    Status status = LOG::info;
1263
1264    RemoteAdmin admin;
1265    try {
1266      Client client = new Client(cluster);
1267      admin = new RemoteAdmin(client, getConf());
1268      checkTable(admin);
1269      runOneClient(cmd, 0, this.R, this.R, this.flushCommits, this.writeToWAL, this.useTags,
1270        this.noOfTags, this.connection, status);
1271    } catch (Exception e) {
1272      LOG.error("Failed", e);
1273    }
1274  }
1275
1276  private void runTest(final Class<? extends Test> cmd)
1277    throws IOException, InterruptedException, ClassNotFoundException {
1278    if (N == 1) {
1279      // If there is only one client and one HRegionServer, we assume nothing
1280      // has been set up at all.
1281      runNIsOne(cmd);
1282    } else {
1283      // Else, run
1284      runNIsMoreThanOne(cmd);
1285    }
1286  }
1287
1288  protected void printUsage() {
1289    printUsage(null);
1290  }
1291
1292  protected void printUsage(final String message) {
1293    if (message != null && message.length() > 0) {
1294      System.err.println(message);
1295    }
1296    System.err.println("Usage: java " + this.getClass().getName() + " \\");
1297    System.err.println("  [--nomapred] [--rows=ROWS] [--table=NAME] \\");
1298    System.err.println(
1299      "  [--compress=TYPE] [--blockEncoding=TYPE] " + "[-D<property=value>]* <command> <nclients>");
1300    System.err.println();
1301    System.err.println("General Options:");
1302    System.err.println(
1303      " nomapred        Run multiple clients using threads " + "(rather than use mapreduce)");
1304    System.err.println(" rows            Rows each client runs. Default: One million");
1305    System.err.println();
1306    System.err.println("Table Creation / Write Tests:");
1307    System.err.println(" table           Alternate table name. Default: 'TestTable'");
1308    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
1309    System.err.println(
1310      " flushCommits    Used to determine if the test should flush the table. " + "Default: false");
1311    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
1312    System.err.println(" presplit        Create presplit table. Recommended for accurate perf "
1313      + "analysis (see guide).  Default: disabled");
1314    System.err.println(
1315      " usetags         Writes tags along with KVs.  Use with HFile V3. " + "Default : false");
1316    System.err.println(" numoftags        Specify the no of tags that would be needed. "
1317      + "This works only if usetags is true.");
1318    System.err.println();
1319    System.err.println("Read Tests:");
1320    System.err.println(" inmemory        Tries to keep the HFiles of the CF inmemory as far as "
1321      + "possible.  Not guaranteed that reads are always served from inmemory.  Default: false");
1322    System.err.println();
1323    System.err.println(" Note: -D properties will be applied to the conf used. ");
1324    System.err.println("  For example: ");
1325    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
1326    System.err.println("   -Dmapreduce.task.timeout=60000");
1327    System.err.println();
1328    System.err.println("Command:");
1329    for (CmdDescriptor command : commands.values()) {
1330      System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
1331    }
1332    System.err.println();
1333    System.err.println("Args:");
1334    System.err.println(
1335      " nclients      Integer. Required. Total number of " + "clients (and HRegionServers)");
1336    System.err.println("               running: 1 <= value <= 500");
1337    System.err.println("Examples:");
1338    System.err.println(" To run a single evaluation client:");
1339    System.err.println(" $ hbase " + this.getClass().getName() + " sequentialWrite 1");
1340  }
1341
1342  private void getArgs(final int start, final String[] args) {
1343    if (start + 1 > args.length) {
1344      throw new IllegalArgumentException("must supply the number of clients");
1345    }
1346    N = Integer.parseInt(args[start]);
1347    if (N < 1) {
1348      throw new IllegalArgumentException("Number of clients must be > 1");
1349    }
1350    // Set total number of rows to write.
1351    R = R * N;
1352  }
1353
1354  @Override
1355  public int run(String[] args) throws Exception {
1356    // Process command-line args. TODO: Better cmd-line processing
1357    // (but hopefully something not as painful as cli options).
1358    int errCode = -1;
1359    if (args.length < 1) {
1360      printUsage();
1361      return errCode;
1362    }
1363
1364    try {
1365      for (int i = 0; i < args.length; i++) {
1366        String cmd = args[i];
1367        if (cmd.equals("-h") || cmd.startsWith("--h")) {
1368          printUsage();
1369          errCode = 0;
1370          break;
1371        }
1372
1373        final String nmr = "--nomapred";
1374        if (cmd.startsWith(nmr)) {
1375          nomapred = true;
1376          continue;
1377        }
1378
1379        final String rows = "--rows=";
1380        if (cmd.startsWith(rows)) {
1381          R = Integer.parseInt(cmd.substring(rows.length()));
1382          continue;
1383        }
1384
1385        final String table = "--table=";
1386        if (cmd.startsWith(table)) {
1387          this.tableName = TableName.valueOf(cmd.substring(table.length()));
1388          continue;
1389        }
1390
1391        final String compress = "--compress=";
1392        if (cmd.startsWith(compress)) {
1393          this.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
1394          continue;
1395        }
1396
1397        final String blockEncoding = "--blockEncoding=";
1398        if (cmd.startsWith(blockEncoding)) {
1399          this.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
1400          continue;
1401        }
1402
1403        final String flushCommits = "--flushCommits=";
1404        if (cmd.startsWith(flushCommits)) {
1405          this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
1406          continue;
1407        }
1408
1409        final String writeToWAL = "--writeToWAL=";
1410        if (cmd.startsWith(writeToWAL)) {
1411          this.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
1412          continue;
1413        }
1414
1415        final String presplit = "--presplit=";
1416        if (cmd.startsWith(presplit)) {
1417          this.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
1418          continue;
1419        }
1420
1421        final String inMemory = "--inmemory=";
1422        if (cmd.startsWith(inMemory)) {
1423          this.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
1424          continue;
1425        }
1426
1427        this.connection = ConnectionFactory.createConnection(getConf());
1428
1429        final String useTags = "--usetags=";
1430        if (cmd.startsWith(useTags)) {
1431          this.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
1432          continue;
1433        }
1434
1435        final String noOfTags = "--nooftags=";
1436        if (cmd.startsWith(noOfTags)) {
1437          this.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
1438          continue;
1439        }
1440
1441        final String host = "--host=";
1442        if (cmd.startsWith(host)) {
1443          cluster.add(cmd.substring(host.length()));
1444          continue;
1445        }
1446
1447        Class<? extends Test> cmdClass = determineCommandClass(cmd);
1448        if (cmdClass != null) {
1449          getArgs(i + 1, args);
1450          if (cluster.isEmpty()) {
1451            String s = conf.get("stargate.hostname", "localhost");
1452            if (s.contains(":")) {
1453              cluster.add(s);
1454            } else {
1455              cluster.add(s, conf.getInt("stargate.port", 8080));
1456            }
1457          }
1458          runTest(cmdClass);
1459          errCode = 0;
1460          break;
1461        }
1462
1463        printUsage();
1464        break;
1465      }
1466    } catch (Exception e) {
1467      LOG.error("Failed", e);
1468    }
1469
1470    return errCode;
1471  }
1472
1473  private Class<? extends Test> determineCommandClass(String cmd) {
1474    CmdDescriptor descriptor = commands.get(cmd);
1475    return descriptor != null ? descriptor.getCmdClass() : null;
1476  }
1477
1478  public static void main(final String[] args) throws Exception {
1479    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
1480    System.exit(res);
1481  }
1482}