001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rest;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.IOException;
023import java.io.PrintStream;
024import java.lang.reflect.Constructor;
025import java.text.SimpleDateFormat;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Date;
029import java.util.List;
030import java.util.Map;
031import java.util.Random;
032import java.util.TreeMap;
033import java.util.regex.Matcher;
034import java.util.regex.Pattern;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.conf.Configured;
037import org.apache.hadoop.fs.FSDataInputStream;
038import org.apache.hadoop.fs.FileStatus;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.Path;
041import org.apache.hadoop.hbase.ArrayBackedTag;
042import org.apache.hadoop.hbase.CompareOperator;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.KeyValue;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.Tag;
048import org.apache.hadoop.hbase.client.BufferedMutator;
049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
050import org.apache.hadoop.hbase.client.Connection;
051import org.apache.hadoop.hbase.client.ConnectionFactory;
052import org.apache.hadoop.hbase.client.Durability;
053import org.apache.hadoop.hbase.client.Get;
054import org.apache.hadoop.hbase.client.Put;
055import org.apache.hadoop.hbase.client.Result;
056import org.apache.hadoop.hbase.client.ResultScanner;
057import org.apache.hadoop.hbase.client.Scan;
058import org.apache.hadoop.hbase.client.Table;
059import org.apache.hadoop.hbase.client.TableDescriptor;
060import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
061import org.apache.hadoop.hbase.filter.BinaryComparator;
062import org.apache.hadoop.hbase.filter.Filter;
063import org.apache.hadoop.hbase.filter.PageFilter;
064import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
065import org.apache.hadoop.hbase.filter.WhileMatchFilter;
066import org.apache.hadoop.hbase.io.compress.Compression;
067import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
068import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
069import org.apache.hadoop.hbase.rest.client.Client;
070import org.apache.hadoop.hbase.rest.client.Cluster;
071import org.apache.hadoop.hbase.rest.client.RemoteAdmin;
072import org.apache.hadoop.hbase.util.ByteArrayHashKey;
073import org.apache.hadoop.hbase.util.Bytes;
074import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
075import org.apache.hadoop.hbase.util.Hash;
076import org.apache.hadoop.hbase.util.MurmurHash;
077import org.apache.hadoop.hbase.util.Pair;
078import org.apache.hadoop.io.LongWritable;
079import org.apache.hadoop.io.NullWritable;
080import org.apache.hadoop.io.Text;
081import org.apache.hadoop.io.Writable;
082import org.apache.hadoop.mapreduce.InputSplit;
083import org.apache.hadoop.mapreduce.Job;
084import org.apache.hadoop.mapreduce.JobContext;
085import org.apache.hadoop.mapreduce.Mapper;
086import org.apache.hadoop.mapreduce.RecordReader;
087import org.apache.hadoop.mapreduce.TaskAttemptContext;
088import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
089import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
090import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
091import org.apache.hadoop.util.LineReader;
092import org.apache.hadoop.util.Tool;
093import org.apache.hadoop.util.ToolRunner;
094import org.slf4j.Logger;
095import org.slf4j.LoggerFactory;
096
097/**
 * Script used to evaluate Stargate performance and scalability. Runs a Stargate (SG) client that
 * steps through one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a
 * random writes test, etc.). Pass on the command line which test to run and how many clients are
 * participating in this experiment. Run <code>java PerformanceEvaluation --help</code> to obtain
 * usage.
 * <p>
 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance
 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper,
 * pages 8-10.
 * <p>
 * If the number of clients is greater than one, we start a MapReduce job (or one thread per client
 * if <code>--nomapred</code> is given). Each client handles about 1GB of data.
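 * <p>
 * As a rough illustration (the flag values below are only examples; run with <code>--help</code>
 * for the authoritative list), a single-client, non-MapReduce run might look like:
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.rest.PerformanceEvaluation --nomapred --rows=10000 sequentialWrite 1
 * </pre>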
109 */
110public class PerformanceEvaluation extends Configured implements Tool {
111  protected static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class);
112
113  private static final int DEFAULT_ROW_PREFIX_LENGTH = 16;
114  private static final int ROW_LENGTH = 1000;
115  private static final int TAG_LENGTH = 256;
116  private static final int ONE_GB = 1024 * 1024 * 1000;
117  private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH;
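  // With the defaults above, ROWS_PER_GB works out to 1,048,576 rows of ROW_LENGTH (1000) bytes
  // each, which is the "one million" rows-per-client default mentioned in printUsage().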
118
119  public static final TableName TABLE_NAME = TableName.valueOf("TestTable");
120  public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
121  public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
122  private TableName tableName = TABLE_NAME;
123
124  protected TableDescriptor TABLE_DESCRIPTOR;
125  protected Map<String, CmdDescriptor> commands = new TreeMap<>();
126  protected static Cluster cluster = new Cluster();
127
128  volatile Configuration conf;
129  private boolean nomapred = false;
130  private int N = 1;
131  private int R = ROWS_PER_GB;
132  private Compression.Algorithm compression = Compression.Algorithm.NONE;
133  private DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
134  private boolean flushCommits = true;
135  private boolean writeToWAL = true;
136  private boolean inMemoryCF = false;
137  private int presplitRegions = 0;
138  private boolean useTags = false;
139  private int noOfTags = 1;
140  private Connection connection;
141
142  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
143
144  /**
   * Regex used to parse lines in the input file passed to the mapreduce task.
146   */
147  public static final Pattern LINE_PATTERN = Pattern
148    .compile("tableName=(\\w+),\\s+" + "startRow=(\\d+),\\s+" + "perClientRunRows=(\\d+),\\s+"
149      + "totalRows=(\\d+),\\s+" + "clients=(\\d+),\\s+" + "flushCommits=(\\w+),\\s+"
150      + "writeToWAL=(\\w+),\\s+" + "useTags=(\\w+),\\s+" + "noOfTags=(\\d+)");
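  // An example of the kind of (single) line this pattern is meant to match; the values are
  // illustrative only and the real lines are generated by writeInputFile():
  //   tableName=TestTable, startRow=0, perClientRunRows=1000, totalRows=10000, clients=10,
  //   flushCommits=false, writeToWAL=true, useTags=false, noOfTags=1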
151
152  /**
   * Enum for map metrics. Kept out here rather than inside the Map inner-class so we can find the
   * associated properties.
155   */
156  protected enum Counter {
157    /** elapsed time */
158    ELAPSED_TIME,
159    /** number of rows */
160    ROWS
161  }
162
163  /**
164   * Constructor
165   * @param c Configuration object
166   */
167  public PerformanceEvaluation(final Configuration c) {
168    this.conf = c;
169
170    addCommandDescriptor(RandomReadTest.class, "randomRead", "Run random read test");
171    addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
172      "Run random seek and scan 100 test");
173    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
174      "Run random seek scan with both start and stop row (max 10 rows)");
175    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
176      "Run random seek scan with both start and stop row (max 100 rows)");
177    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
178      "Run random seek scan with both start and stop row (max 1000 rows)");
179    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
180      "Run random seek scan with both start and stop row (max 10000 rows)");
181    addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
182    addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
183    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
184    addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
185    addCommandDescriptor(FilteredScanTest.class, "filterScan",
      "Run scan test using a filter to find a specific row based "
        + "on its value (make sure to use --rows=20)");
188  }
189
190  protected void addCommandDescriptor(Class<? extends Test> cmdClass, String name,
191    String description) {
192    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
193    commands.put(name, cmdDescriptor);
194  }
195
196  /**
197   * Implementations can have their status set.
198   */
199  interface Status {
200    /**
201     * Sets status
202     * @param msg status message
203     * @throws IOException if setting the status fails
204     */
205    void setStatus(final String msg) throws IOException;
206  }
207
208  /**
   * This class serves both as the InputSplit of the Performance Evaluation MapReduce InputFormat
   * and as the record value of its RecordReader. Each map task reads exactly one record from a
   * PeInputSplit; the record value is the PeInputSplit itself.
212   */
213  public static class PeInputSplit extends InputSplit implements Writable {
214    private TableName tableName;
215    private int startRow;
216    private int rows;
217    private int totalRows;
218    private int clients;
219    private boolean flushCommits;
220    private boolean writeToWAL;
221    private boolean useTags;
222    private int noOfTags;
223
224    public PeInputSplit(TableName tableName, int startRow, int rows, int totalRows, int clients,
225      boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags) {
226      this.tableName = tableName;
227      this.startRow = startRow;
228      this.rows = rows;
229      this.totalRows = totalRows;
230      this.clients = clients;
231      this.flushCommits = flushCommits;
232      this.writeToWAL = writeToWAL;
233      this.useTags = useTags;
234      this.noOfTags = noOfTags;
235    }
236
237    @Override
238    public void readFields(DataInput in) throws IOException {
239      int tableNameLen = in.readInt();
240      byte[] name = new byte[tableNameLen];
241      in.readFully(name);
242      this.tableName = TableName.valueOf(name);
243      this.startRow = in.readInt();
244      this.rows = in.readInt();
245      this.totalRows = in.readInt();
246      this.clients = in.readInt();
247      this.flushCommits = in.readBoolean();
248      this.writeToWAL = in.readBoolean();
249      this.useTags = in.readBoolean();
250      this.noOfTags = in.readInt();
251    }
252
253    @Override
254    public void write(DataOutput out) throws IOException {
255      byte[] name = this.tableName.toBytes();
256      out.writeInt(name.length);
257      out.write(name);
258      out.writeInt(startRow);
259      out.writeInt(rows);
260      out.writeInt(totalRows);
261      out.writeInt(clients);
262      out.writeBoolean(flushCommits);
263      out.writeBoolean(writeToWAL);
264      out.writeBoolean(useTags);
265      out.writeInt(noOfTags);
266    }
267
268    @Override
269    public long getLength() {
270      return 0;
271    }
272
273    @Override
274    public String[] getLocations() {
275      return new String[0];
276    }
277
278    public int getStartRow() {
279      return startRow;
280    }
281
282    public TableName getTableName() {
283      return tableName;
284    }
285
286    public int getRows() {
287      return rows;
288    }
289
290    public int getTotalRows() {
291      return totalRows;
292    }
293
294    public boolean isFlushCommits() {
295      return flushCommits;
296    }
297
298    public boolean isWriteToWAL() {
299      return writeToWAL;
300    }
301
302    public boolean isUseTags() {
303      return useTags;
304    }
305
306    public int getNoOfTags() {
307      return noOfTags;
308    }
309  }
310
311  /**
   * InputFormat for the Performance Evaluation MapReduce job. It extends FileInputFormat so that it
   * can reuse methods such as setInputPaths().
314   */
315  public static class PeInputFormat extends FileInputFormat<NullWritable, PeInputSplit> {
316    @Override
317    public List<InputSplit> getSplits(JobContext job) throws IOException {
318      // generate splits
319      List<InputSplit> splitList = new ArrayList<>();
320
321      for (FileStatus file : listStatus(job)) {
322        if (file.isDirectory()) {
323          continue;
324        }
325        Path path = file.getPath();
326        FileSystem fs = path.getFileSystem(job.getConfiguration());
327        FSDataInputStream fileIn = fs.open(path);
328        LineReader in = new LineReader(fileIn, job.getConfiguration());
329        int lineLen;
330        while (true) {
331          Text lineText = new Text();
332          lineLen = in.readLine(lineText);
333          if (lineLen <= 0) {
334            break;
335          }
336          Matcher m = LINE_PATTERN.matcher(lineText.toString());
          if (m.matches()) {
338            TableName tableName = TableName.valueOf(m.group(1));
339            int startRow = Integer.parseInt(m.group(2));
340            int rows = Integer.parseInt(m.group(3));
341            int totalRows = Integer.parseInt(m.group(4));
342            int clients = Integer.parseInt(m.group(5));
343            boolean flushCommits = Boolean.parseBoolean(m.group(6));
344            boolean writeToWAL = Boolean.parseBoolean(m.group(7));
345            boolean useTags = Boolean.parseBoolean(m.group(8));
346            int noOfTags = Integer.parseInt(m.group(9));
347
348            LOG.debug("tableName=" + tableName + " split[" + splitList.size() + "] " + " startRow="
349              + startRow + " rows=" + rows + " totalRows=" + totalRows + " clients=" + clients
350              + " flushCommits=" + flushCommits + " writeToWAL=" + writeToWAL + " useTags="
351              + useTags + " noOfTags=" + noOfTags);
352
353            PeInputSplit newSplit = new PeInputSplit(tableName, startRow, rows, totalRows, clients,
354              flushCommits, writeToWAL, useTags, noOfTags);
355            splitList.add(newSplit);
356          }
357        }
358        in.close();
359      }
360
361      LOG.info("Total # of splits: " + splitList.size());
362      return splitList;
363    }
364
365    @Override
366    public RecordReader<NullWritable, PeInputSplit> createRecordReader(InputSplit split,
367      TaskAttemptContext context) {
368      return new PeRecordReader();
369    }
370
371    public static class PeRecordReader extends RecordReader<NullWritable, PeInputSplit> {
372      private boolean readOver = false;
373      private PeInputSplit split = null;
374      private NullWritable key = null;
375      private PeInputSplit value = null;
376
377      @Override
378      public void initialize(InputSplit split, TaskAttemptContext context) {
379        this.readOver = false;
380        this.split = (PeInputSplit) split;
381      }
382
383      @Override
384      public boolean nextKeyValue() {
385        if (readOver) {
386          return false;
387        }
388
389        key = NullWritable.get();
390        value = split;
391
392        readOver = true;
393        return true;
394      }
395
396      @Override
397      public NullWritable getCurrentKey() {
398        return key;
399      }
400
401      @Override
402      public PeInputSplit getCurrentValue() {
403        return value;
404      }
405
406      @Override
407      public float getProgress() {
408        if (readOver) {
409          return 1.0f;
410        } else {
411          return 0.0f;
412        }
413      }
414
415      @Override
416      public void close() {
417        // do nothing
418      }
419    }
420  }
421
422  /**
423   * MapReduce job that runs a performance evaluation client in each map task.
424   */
425  public static class EvaluationMapTask
426    extends Mapper<NullWritable, PeInputSplit, LongWritable, LongWritable> {
427
428    /** configuration parameter name that contains the command */
429    public final static String CMD_KEY = "EvaluationMapTask.command";
430    /** configuration parameter name that contains the PE impl */
431    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
432
433    private Class<? extends Test> cmd;
434    private PerformanceEvaluation pe;
435
436    @Override
437    protected void setup(Context context) {
438      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
439
440      // this is required so that extensions of PE are instantiated within the
441      // map reduce task...
442      Class<? extends PerformanceEvaluation> peClass =
443        forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
444      try {
445        this.pe =
446          peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
447      } catch (Exception e) {
448        throw new IllegalStateException("Could not instantiate PE instance", e);
449      }
450    }
451
452    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
453      Class<? extends Type> clazz;
454      try {
455        clazz = Class.forName(className).asSubclass(type);
456      } catch (ClassNotFoundException e) {
457        throw new IllegalStateException("Could not find class for name: " + className, e);
458      }
459      return clazz;
460    }
461
462    @Override
463    protected void map(NullWritable key, PeInputSplit value, final Context context)
464      throws IOException, InterruptedException {
465      Status status = context::setStatus;
466
      // Evaluation task
      pe.tableName = value.getTableName();
      long elapsedTime;
      // Use try-with-resources so the per-task Connection is closed once the client run finishes.
      try (Connection connection =
        ConnectionFactory.createConnection(context.getConfiguration())) {
        elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(), value.getRows(),
          value.getTotalRows(), value.isFlushCommits(), value.isWriteToWAL(), value.isUseTags(),
          value.getNoOfTags(), connection, status);
      }
473      // Collect how much time the thing took. Report as map output and
474      // to the ELAPSED_TIME counter.
475      context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
476      context.getCounter(Counter.ROWS).increment(value.rows);
477      context.write(new LongWritable(value.startRow), new LongWritable(elapsedTime));
478      context.progress();
479    }
480  }
481
482  /**
   * If the table does not already exist, create it.
   * @param admin Client to use for the check.
   * @return True if the table is available after the check (and any creation) completes.
486   * @throws IOException if an operation on the table fails
487   */
488  private boolean checkTable(RemoteAdmin admin) throws IOException {
489    TableDescriptor tableDescriptor = getDescriptor();
490    if (this.presplitRegions > 0) {
491      // presplit requested
492      if (admin.isTableAvailable(tableDescriptor.getTableName().getName())) {
493        admin.deleteTable(tableDescriptor.getTableName().getName());
494      }
495
496      byte[][] splits = getSplits();
497      for (int i = 0; i < splits.length; i++) {
498        LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
499      }
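      // Note: the split points computed above are only logged for reference; they are not passed
      // to the RemoteAdmin createTable call below.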
500      admin.createTable(tableDescriptor);
501      LOG.info("Table created with " + this.presplitRegions + " splits");
502    } else {
503      boolean tableExists = admin.isTableAvailable(tableDescriptor.getTableName().getName());
504      if (!tableExists) {
505        admin.createTable(tableDescriptor);
506        LOG.info("Table " + tableDescriptor + " created");
507      }
508    }
509
510    return admin.isTableAvailable(tableDescriptor.getTableName().getName());
511  }
512
513  protected TableDescriptor getDescriptor() {
514    if (TABLE_DESCRIPTOR == null) {
515      TABLE_DESCRIPTOR =
516        TableDescriptorBuilder.newBuilder(tableName)
517          .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME)
518            .setDataBlockEncoding(blockEncoding).setCompressionType(compression)
519            .setInMemory(inMemoryCF).build())
520          .build();
521    }
522    return TABLE_DESCRIPTOR;
523  }
524
525  /**
   * Generates split keys based on the total number of rows and the requested number of presplit
   * regions.
   * @return array of split keys (byte[][])
528   */
529  protected byte[][] getSplits() {
530    if (this.presplitRegions == 0) {
531      return new byte[0][];
532    }
533
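    // For example (illustrative numbers): with R = 1,048,576 rows and presplitRegions = 4, jump is
    // 262,144 and the split points are format(262144), format(524288) and format(786432).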
534    int numSplitPoints = presplitRegions - 1;
535    byte[][] splits = new byte[numSplitPoints][];
536    int jump = this.R / this.presplitRegions;
537    for (int i = 0; i < numSplitPoints; i++) {
538      int rowkey = jump * (1 + i);
539      splits[i] = format(rowkey);
540    }
541    return splits;
542  }
543
544  /**
   * Run multiple clients concurrently. Unless --nomapred was passed, set up a mapreduce job with
   * one map per client, then run a single reduce to sum the elapsed times.
547   * @param cmd Command to run.
548   */
549  private void runNIsMoreThanOne(final Class<? extends Test> cmd)
550    throws IOException, InterruptedException, ClassNotFoundException {
551    RemoteAdmin remoteAdmin = new RemoteAdmin(new Client(cluster), getConf());
552    checkTable(remoteAdmin);
553    if (nomapred) {
554      doMultipleClients(cmd);
555    } else {
556      doMapReduce(cmd);
557    }
558  }
559
560  /**
   * Run all clients in this VM, each in its own thread.
562   * @param cmd Command to run
563   * @throws IOException if creating a connection fails
564   */
565  private void doMultipleClients(final Class<? extends Test> cmd) throws IOException {
566    final List<Thread> threads = new ArrayList<>(this.N);
567    final long[] timings = new long[this.N];
568    final int perClientRows = R / N;
569    final TableName tableName = this.tableName;
570    final DataBlockEncoding encoding = this.blockEncoding;
571    final boolean flushCommits = this.flushCommits;
572    final Compression.Algorithm compression = this.compression;
573    final boolean writeToWal = this.writeToWAL;
574    final int preSplitRegions = this.presplitRegions;
575    final boolean useTags = this.useTags;
576    final int numTags = this.noOfTags;
577    final Connection connection = ConnectionFactory.createConnection(getConf());
578    for (int i = 0; i < this.N; i++) {
579      final int index = i;
580      Thread t = new Thread("TestClient-" + i) {
581        @Override
582        public void run() {
583          super.run();
584          PerformanceEvaluation pe = new PerformanceEvaluation(getConf());
585          pe.tableName = tableName;
586          pe.blockEncoding = encoding;
587          pe.flushCommits = flushCommits;
588          pe.compression = compression;
589          pe.writeToWAL = writeToWal;
590          pe.presplitRegions = preSplitRegions;
591          pe.N = N;
592          pe.connection = connection;
593          pe.useTags = useTags;
594          pe.noOfTags = numTags;
595          try {
596            long elapsedTime = pe.runOneClient(cmd, index * perClientRows, perClientRows, R,
597              flushCommits, writeToWAL, useTags, noOfTags, connection,
598              msg -> LOG.info("client-" + getName() + " " + msg));
599            timings[index] = elapsedTime;
600            LOG.info("Finished " + getName() + " in " + elapsedTime + "ms writing " + perClientRows
601              + " rows");
602          } catch (IOException e) {
603            throw new RuntimeException(e);
604          }
605        }
606      };
607      threads.add(t);
608    }
609    for (Thread t : threads) {
610      t.start();
611    }
612    for (Thread t : threads) {
613      while (t.isAlive()) {
614        try {
615          t.join();
616        } catch (InterruptedException e) {
          LOG.debug("Interrupted, continuing: " + e.toString());
618        }
619      }
620    }
621    final String test = cmd.getSimpleName();
622    LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(timings));
623    Arrays.sort(timings);
624    long total = 0;
625    for (int i = 0; i < this.N; i++) {
626      total += timings[i];
627    }
628    LOG.info("[" + test + "]" + "\tMin: " + timings[0] + "ms" + "\tMax: " + timings[this.N - 1]
629      + "ms" + "\tAvg: " + (total / this.N) + "ms");
630  }
631
632  /**
   * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write
   * out an input file with an instruction per client regarding which row it is to start on.
635   * @param cmd Command to run.
636   */
637  private void doMapReduce(final Class<? extends Test> cmd)
638    throws IOException, InterruptedException, ClassNotFoundException {
639    Configuration conf = getConf();
640    Path inputDir = writeInputFile(conf);
641    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
642    conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
643    Job job = Job.getInstance(conf);
644    job.setJarByClass(PerformanceEvaluation.class);
645    job.setJobName("HBase Performance Evaluation");
646
647    job.setInputFormatClass(PeInputFormat.class);
648    PeInputFormat.setInputPaths(job, inputDir);
649
650    job.setOutputKeyClass(LongWritable.class);
651    job.setOutputValueClass(LongWritable.class);
652
653    job.setMapperClass(EvaluationMapTask.class);
654    job.setReducerClass(LongSumReducer.class);
655    job.setNumReduceTasks(1);
656
657    job.setOutputFormatClass(TextOutputFormat.class);
658    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
659    TableMapReduceUtil.addDependencyJars(job);
660    TableMapReduceUtil.initCredentials(job);
661    job.waitForCompletion(true);
662  }
663
664  /**
   * Write the input file of per-client offsets for the mapreduce job.
   * @param c Configuration
   * @return Directory that contains the written file.
668   * @throws IOException if creating the directory or the file fails
669   */
670  private Path writeInputFile(final Configuration c) throws IOException {
671    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
672    Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
673    Path inputDir = new Path(jobdir, "inputs");
674
675    FileSystem fs = FileSystem.get(c);
676    fs.mkdirs(inputDir);
677    Path inputFile = new Path(inputDir, "input.txt");
    // Randomize the order of the input lines by keying them on their hash in a TreeMap.
679    try (PrintStream out = new PrintStream(fs.create(inputFile))) {
680      Map<Integer, String> m = new TreeMap<>();
681      Hash h = MurmurHash.getInstance();
682      int perClientRows = (this.R / this.N);
683      for (int i = 0; i < 10; i++) {
684        for (int j = 0; j < N; j++) {
685          StringBuilder s = new StringBuilder();
686          s.append("tableName=").append(tableName);
687          s.append(", startRow=").append((j * perClientRows) + (i * (perClientRows / 10)));
688          s.append(", perClientRunRows=").append(perClientRows / 10);
689          s.append(", totalRows=").append(R);
690          s.append(", clients=").append(N);
691          s.append(", flushCommits=").append(flushCommits);
692          s.append(", writeToWAL=").append(writeToWAL);
693          s.append(", useTags=").append(useTags);
694          s.append(", noOfTags=").append(noOfTags);
695
696          byte[] b = Bytes.toBytes(s.toString());
697          int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
698          m.put(hash, s.toString());
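          // The TreeMap is keyed by the line's hash, so in the (unlikely) event of a hash
          // collision one of the colliding lines would be dropped.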
699        }
700      }
701      for (Map.Entry<Integer, String> e : m.entrySet()) {
702        out.println(e.getValue());
703      }
704    }
705    return inputDir;
706  }
707
708  /**
709   * Describes a command.
710   */
711  static class CmdDescriptor {
712    private Class<? extends Test> cmdClass;
713    private String name;
714    private String description;
715
716    CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) {
717      this.cmdClass = cmdClass;
718      this.name = name;
719      this.description = description;
720    }
721
722    public Class<? extends Test> getCmdClass() {
723      return cmdClass;
724    }
725
726    public String getName() {
727      return name;
728    }
729
730    public String getDescription() {
731      return description;
732    }
733  }
734
735  /**
   * Wraps up the options passed to the {@link PerformanceEvaluation} tests. This makes the
   * reflection logic a little easier to understand...
738   */
739  static class TestOptions {
740    private int startRow;
741    private int perClientRunRows;
742    private int totalRows;
743    private TableName tableName;
744    private boolean flushCommits;
745    private boolean writeToWAL;
746    private boolean useTags;
747    private int noOfTags;
748    private Connection connection;
749
750    TestOptions(int startRow, int perClientRunRows, int totalRows, TableName tableName,
751      boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags,
752      Connection connection) {
753      this.startRow = startRow;
754      this.perClientRunRows = perClientRunRows;
755      this.totalRows = totalRows;
756      this.tableName = tableName;
757      this.flushCommits = flushCommits;
758      this.writeToWAL = writeToWAL;
759      this.useTags = useTags;
760      this.noOfTags = noOfTags;
761      this.connection = connection;
762    }
763
764    public int getStartRow() {
765      return startRow;
766    }
767
768    public int getPerClientRunRows() {
769      return perClientRunRows;
770    }
771
772    public int getTotalRows() {
773      return totalRows;
774    }
775
776    public TableName getTableName() {
777      return tableName;
778    }
779
780    public boolean isFlushCommits() {
781      return flushCommits;
782    }
783
784    public boolean isWriteToWAL() {
785      return writeToWAL;
786    }
787
788    public Connection getConnection() {
789      return connection;
790    }
791
792    public boolean isUseTags() {
793      return this.useTags;
794    }
795
796    public int getNumTags() {
797      return this.noOfTags;
798    }
799  }
800
801  /*
802   * A test. Subclass to particularize what happens per row.
803   */
804  static abstract class Test {
    // The shared seed below ensures that when Tests all run in the one
    // JVM, each one still gets a differently seeded Random.
807    private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime());
808
809    private static long nextRandomSeed() {
810      return randomSeed.nextLong();
811    }
812
813    protected final Random rand = new Random(nextRandomSeed());
814
815    protected final int startRow;
816    protected final int perClientRunRows;
817    protected final int totalRows;
818    private final Status status;
819    protected TableName tableName;
820    protected volatile Configuration conf;
821    protected boolean writeToWAL;
822    protected boolean useTags;
823    protected int noOfTags;
824    protected Connection connection;
825
826    /**
     * Note that all subclasses of this class must provide a public constructor that has the exact
     * same list of arguments.
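     * <p>
     * As a sketch only (the class name and command name here are hypothetical), a new test would
     * follow the same shape as the existing ones, e.g.:
     *
     * <pre>
     * static class MyScanTest extends TableTest {
     *   MyScanTest(Configuration conf, TestOptions options, Status status) {
     *     super(conf, options, status);
     *   }
     *
     *   &#64;Override
     *   void testRow(final int i) throws IOException {
     *     // per-row work goes here
     *   }
     * }
     * </pre>
     *
     * and would be registered via {@code addCommandDescriptor(MyScanTest.class, "myScan", "...")}
     * in the {@code PerformanceEvaluation} constructor.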
829     */
830    Test(final Configuration conf, final TestOptions options, final Status status) {
831      super();
832      this.startRow = options.getStartRow();
833      this.perClientRunRows = options.getPerClientRunRows();
834      this.totalRows = options.getTotalRows();
835      this.status = status;
836      this.tableName = options.getTableName();
837      this.conf = conf;
838      this.writeToWAL = options.isWriteToWAL();
839      this.useTags = options.isUseTags();
840      this.noOfTags = options.getNumTags();
841      this.connection = options.getConnection();
842    }
843
844    protected String generateStatus(final int sr, final int i, final int lr) {
845      return sr + "/" + i + "/" + lr;
846    }
847
848    protected int getReportingPeriod() {
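      // Report status roughly ten times per client run; for example, with perClientRunRows set to
      // 1,048,576 the status callback fires about every 104,857 rows.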
849      int period = this.perClientRunRows / 10;
850      return period == 0 ? this.perClientRunRows : period;
851    }
852
853    abstract void testTakedown() throws IOException;
854
855    /**
     * Run the test.
     * @return Elapsed time in milliseconds.
858     * @throws IOException if something in the test fails
859     */
860    long test() throws IOException {
861      testSetup();
862      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
863      final long startTime = System.nanoTime();
864      try {
865        testTimed();
866      } finally {
867        testTakedown();
868      }
869      return (System.nanoTime() - startTime) / 1000000;
870    }
871
872    abstract void testSetup() throws IOException;
873
874    /**
     * Provides an extension point for tests that don't want a per-row invocation.
876     */
877    void testTimed() throws IOException {
878      int lastRow = this.startRow + this.perClientRunRows;
879      // Report on completion of 1/10th of total.
880      for (int i = this.startRow; i < lastRow; i++) {
881        testRow(i);
882        if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
883          status.setStatus(generateStatus(this.startRow, i, lastRow));
884        }
885      }
886    }
887
888    /**
889     * Test for individual row.
890     * @param i Row index.
891     */
892    abstract void testRow(final int i) throws IOException;
893  }
894
895  static abstract class TableTest extends Test {
896    protected Table table;
897
898    public TableTest(Configuration conf, TestOptions options, Status status) {
899      super(conf, options, status);
900    }
901
902    @Override
903    void testSetup() throws IOException {
904      this.table = connection.getTable(tableName);
905    }
906
907    @Override
908    void testTakedown() throws IOException {
909      table.close();
910    }
911  }
912
913  static abstract class BufferedMutatorTest extends Test {
914    protected BufferedMutator mutator;
915    protected boolean flushCommits;
916
917    public BufferedMutatorTest(Configuration conf, TestOptions options, Status status) {
918      super(conf, options, status);
919      this.flushCommits = options.isFlushCommits();
920    }
921
922    @Override
923    void testSetup() throws IOException {
924      this.mutator = connection.getBufferedMutator(tableName);
925    }
926
927    @Override
928    void testTakedown() throws IOException {
929      if (flushCommits) {
930        this.mutator.flush();
931      }
932      mutator.close();
933    }
934  }
935
936  static class RandomSeekScanTest extends TableTest {
937    RandomSeekScanTest(Configuration conf, TestOptions options, Status status) {
938      super(conf, options, status);
939    }
940
941    @Override
942    void testRow(final int i) throws IOException {
943      Scan scan = new Scan().withStartRow(getRandomRow(this.rand, this.totalRows));
944      scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
945      scan.setFilter(new WhileMatchFilter(new PageFilter(120)));
946      ResultScanner s = this.table.getScanner(scan);
947      s.close();
948    }
949
950    @Override
951    protected int getReportingPeriod() {
952      int period = this.perClientRunRows / 100;
953      return period == 0 ? this.perClientRunRows : period;
954    }
955  }
956
957  @SuppressWarnings("unused")
958  static abstract class RandomScanWithRangeTest extends TableTest {
959    RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) {
960      super(conf, options, status);
961    }
962
963    @Override
964    void testRow(final int i) throws IOException {
965      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
966      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
967        .withStopRow(startAndStopRow.getSecond());
968      scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
969      ResultScanner s = this.table.getScanner(scan);
970      int count = 0;
971      for (Result rr = null; (rr = s.next()) != null;) {
972        count++;
973      }
974
975      if (i % 100 == 0) {
976        LOG.info(String.format("Scan for key range %s - %s returned %s rows",
977          Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()),
978          count));
979      }
980
981      s.close();
982    }
983
984    protected abstract Pair<byte[], byte[]> getStartAndStopRow();
985
986    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
987      int start = this.rand.nextInt(Integer.MAX_VALUE) % totalRows;
988      int stop = start + maxRange;
989      return new Pair<>(format(start), format(stop));
990    }
991
992    @Override
993    protected int getReportingPeriod() {
994      int period = this.perClientRunRows / 100;
995      return period == 0 ? this.perClientRunRows : period;
996    }
997  }
998
999  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1000    RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status) {
1001      super(conf, options, status);
1002    }
1003
1004    @Override
1005    protected Pair<byte[], byte[]> getStartAndStopRow() {
1006      return generateStartAndStopRows(10);
1007    }
1008  }
1009
1010  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1011    RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status) {
1012      super(conf, options, status);
1013    }
1014
1015    @Override
1016    protected Pair<byte[], byte[]> getStartAndStopRow() {
1017      return generateStartAndStopRows(100);
1018    }
1019  }
1020
1021  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1022    RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status) {
1023      super(conf, options, status);
1024    }
1025
1026    @Override
1027    protected Pair<byte[], byte[]> getStartAndStopRow() {
1028      return generateStartAndStopRows(1000);
1029    }
1030  }
1031
1032  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1033    RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status) {
1034      super(conf, options, status);
1035    }
1036
1037    @Override
1038    protected Pair<byte[], byte[]> getStartAndStopRow() {
1039      return generateStartAndStopRows(10000);
1040    }
1041  }
1042
1043  static class RandomReadTest extends TableTest {
1044    RandomReadTest(Configuration conf, TestOptions options, Status status) {
1045      super(conf, options, status);
1046    }
1047
1048    @Override
1049    void testRow(final int i) throws IOException {
1050      Get get = new Get(getRandomRow(this.rand, this.totalRows));
1051      get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1052      this.table.get(get);
1053    }
1054
1055    @Override
1056    protected int getReportingPeriod() {
1057      int period = this.perClientRunRows / 100;
1058      return period == 0 ? this.perClientRunRows : period;
1059    }
1060  }
1061
1062  static class RandomWriteTest extends BufferedMutatorTest {
1063    RandomWriteTest(Configuration conf, TestOptions options, Status status) {
1064      super(conf, options, status);
1065    }
1066
1067    @Override
1068    void testRow(final int i) throws IOException {
1069      byte[] row = getRandomRow(this.rand, this.totalRows);
1070      Put put = new Put(row);
1071      byte[] value = generateData(this.rand, ROW_LENGTH);
1072      if (useTags) {
1073        byte[] tag = generateData(this.rand, TAG_LENGTH);
1074        Tag[] tags = new Tag[noOfTags];
1075        for (int n = 0; n < noOfTags; n++) {
1076          Tag t = new ArrayBackedTag((byte) n, tag);
1077          tags[n] = t;
1078        }
1079        KeyValue kv =
1080          new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP, value, tags);
1081        put.add(kv);
1082      } else {
1083        put.addColumn(FAMILY_NAME, QUALIFIER_NAME, value);
1084      }
1085      put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1086      mutator.mutate(put);
1087    }
1088  }
1089
1090  static class ScanTest extends TableTest {
1091    private ResultScanner testScanner;
1092
1093    ScanTest(Configuration conf, TestOptions options, Status status) {
1094      super(conf, options, status);
1095    }
1096
1097    @Override
1098    void testTakedown() throws IOException {
1099      if (this.testScanner != null) {
1100        this.testScanner.close();
1101      }
1102      super.testTakedown();
1103    }
1104
1105    @Override
1106    void testRow(final int i) throws IOException {
1107      if (this.testScanner == null) {
1108        Scan scan = new Scan().withStartRow(format(this.startRow));
1109        scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1110        this.testScanner = table.getScanner(scan);
1111      }
1112      testScanner.next();
1113    }
1114  }
1115
1116  static class SequentialReadTest extends TableTest {
1117    SequentialReadTest(Configuration conf, TestOptions options, Status status) {
1118      super(conf, options, status);
1119    }
1120
1121    @Override
1122    void testRow(final int i) throws IOException {
1123      Get get = new Get(format(i));
1124      get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1125      table.get(get);
1126    }
1127  }
1128
1129  static class SequentialWriteTest extends BufferedMutatorTest {
1130    SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
1131      super(conf, options, status);
1132    }
1133
1134    @Override
1135    void testRow(final int i) throws IOException {
1136      byte[] row = format(i);
1137      Put put = new Put(row);
1138      byte[] value = generateData(this.rand, ROW_LENGTH);
1139      if (useTags) {
1140        byte[] tag = generateData(this.rand, TAG_LENGTH);
1141        Tag[] tags = new Tag[noOfTags];
1142        for (int n = 0; n < noOfTags; n++) {
1143          Tag t = new ArrayBackedTag((byte) n, tag);
1144          tags[n] = t;
1145        }
1146        KeyValue kv =
1147          new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP, value, tags);
1148        put.add(kv);
1149      } else {
1150        put.addColumn(FAMILY_NAME, QUALIFIER_NAME, value);
1151      }
1152      put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1153      mutator.mutate(put);
1154    }
1155  }
1156
1157  static class FilteredScanTest extends TableTest {
1158    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
1159
1160    FilteredScanTest(Configuration conf, TestOptions options, Status status) {
1161      super(conf, options, status);
1162    }
1163
1164    @Override
1165    void testRow(int i) throws IOException {
1166      byte[] value = generateValue(this.rand);
1167      Scan scan = constructScan(value);
1168      try (ResultScanner scanner = this.table.getScanner(scan)) {
1169        while (scanner.next() != null) {
1170        }
1171      }
1172    }
1173
1174    protected Scan constructScan(byte[] valuePrefix) {
1175      Filter filter = new SingleColumnValueFilter(FAMILY_NAME, QUALIFIER_NAME,
1176        CompareOperator.EQUAL, new BinaryComparator(valuePrefix));
1177      Scan scan = new Scan();
1178      scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1179      scan.setFilter(filter);
1180      return scan;
1181    }
1182  }
1183
1184  /**
1185   * Format passed integer.
1186   * @param number the integer to format
   * @return Returns a zero-padded, (DEFAULT_ROW_PREFIX_LENGTH + 10)-byte wide decimal version of
   *         the passed number (the absolute value is used if the number is negative).
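   *         For example, {@code format(123)} yields the 26 ASCII bytes
   *         {@code 00000000000000000000000123}.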
1189   */
1190  public static byte[] format(final int number) {
1191    byte[] b = new byte[DEFAULT_ROW_PREFIX_LENGTH + 10];
1192    int d = Math.abs(number);
1193    for (int i = b.length - 1; i >= 0; i--) {
1194      b[i] = (byte) ((d % 10) + '0');
1195      d /= 10;
1196    }
1197    return b;
1198  }
1199
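  // Fills the buffer with 8-byte runs of a single random uppercase letter (A-Z); the repetition
  // presumably keeps the generated values compressible.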
1200  public static byte[] generateData(final Random r, int length) {
1201    byte[] b = new byte[length];
1202    int i;
1203
1204    for (i = 0; i < (length - 8); i += 8) {
1205      b[i] = (byte) (65 + r.nextInt(26));
1206      b[i + 1] = b[i];
1207      b[i + 2] = b[i];
1208      b[i + 3] = b[i];
1209      b[i + 4] = b[i];
1210      b[i + 5] = b[i];
1211      b[i + 6] = b[i];
1212      b[i + 7] = b[i];
1213    }
1214
1215    byte a = (byte) (65 + r.nextInt(26));
1216    for (; i < length; i++) {
1217      b[i] = a;
1218    }
1219    return b;
1220  }
1221
1222  public static byte[] generateValue(final Random r) {
1223    byte[] b = new byte[ROW_LENGTH];
1224    r.nextBytes(b);
1225    return b;
1226  }
1227
1228  static byte[] getRandomRow(final Random random, final int totalRows) {
1229    return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
1230  }
1231
1232  long runOneClient(final Class<? extends Test> cmd, final int startRow, final int perClientRunRows,
1233    final int totalRows, boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags,
1234    Connection connection, final Status status) throws IOException {
1235    status
1236      .setStatus("Start " + cmd + " at offset " + startRow + " for " + perClientRunRows + " rows");
1237    long totalElapsedTime;
1238
1239    TestOptions options = new TestOptions(startRow, perClientRunRows, totalRows, tableName,
1240      flushCommits, writeToWAL, useTags, noOfTags, connection);
1241    final Test t;
1242    try {
1243      Constructor<? extends Test> constructor =
1244        cmd.getDeclaredConstructor(Configuration.class, TestOptions.class, Status.class);
1245      t = constructor.newInstance(this.conf, options, status);
1246    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
        + ". It does not provide a constructor as described by "
        + "the javadoc comment. Available constructors are: "
        + Arrays.toString(cmd.getConstructors()), e);
1251    } catch (Exception e) {
1252      throw new IllegalStateException("Failed to construct command class", e);
1253    }
1254    totalElapsedTime = t.test();
1255
1256    status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + startRow
1257      + " for " + perClientRunRows + " rows");
1258    return totalElapsedTime;
1259  }
1260
1261  private void runNIsOne(final Class<? extends Test> cmd) {
1262    Status status = LOG::info;
1263
1264    RemoteAdmin admin;
1265    try {
1266      Client client = new Client(cluster);
1267      admin = new RemoteAdmin(client, getConf());
1268      checkTable(admin);
1269      runOneClient(cmd, 0, this.R, this.R, this.flushCommits, this.writeToWAL, this.useTags,
1270        this.noOfTags, this.connection, status);
1271    } catch (Exception e) {
1272      LOG.error("Failed", e);
1273    }
1274  }
1275
1276  private void runTest(final Class<? extends Test> cmd)
1277    throws IOException, InterruptedException, ClassNotFoundException {
1278    if (N == 1) {
1279      // If there is only one client and one HRegionServer, we assume nothing
1280      // has been set up at all.
1281      runNIsOne(cmd);
1282    } else {
1283      // Else, run
1284      runNIsMoreThanOne(cmd);
1285    }
1286  }
1287
1288  protected void printUsage() {
1289    printUsage(null);
1290  }
1291
1292  protected void printUsage(final String message) {
1293    if (message != null && message.length() > 0) {
1294      System.err.println(message);
1295    }
1296    System.err.println("Usage: java " + this.getClass().getName() + " \\");
1297    System.err.println("  [--nomapred] [--rows=ROWS] [--table=NAME] \\");
1298    System.err.println(
1299      "  [--compress=TYPE] [--blockEncoding=TYPE] " + "[-D<property=value>]* <command> <nclients>");
1300    System.err.println();
1301    System.err.println("General Options:");
1302    System.err.println(
1303      " nomapred        Run multiple clients using threads " + "(rather than use mapreduce)");
1304    System.err.println(" rows            Rows each client runs. Default: One million");
1305    System.err.println();
1306    System.err.println("Table Creation / Write Tests:");
1307    System.err.println(" table           Alternate table name. Default: 'TestTable'");
1308    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
1309    System.err.println(
1310      " flushCommits    Used to determine if the test should flush the table. " + "Default: false");
1311    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
1312    System.err.println(" presplit        Create presplit table. Recommended for accurate perf "
1313      + "analysis (see guide).  Default: disabled");
1314    System.err.println(
1315      " usetags         Writes tags along with KVs.  Use with HFile V3. " + "Default : false");
    System.err.println(" numoftags       Specify the number of tags that would be needed. "
1317      + "This works only if usetags is true.");
1318    System.err.println();
1319    System.err.println("Read Tests:");
    System.err.println(" inmemory        Tries to keep the HFiles of the CF in memory as far as "
      + "possible.  Not guaranteed that reads are always served from memory.  Default: false");
1322    System.err.println();
1323    System.err.println(" Note: -D properties will be applied to the conf used. ");
1324    System.err.println("  For example: ");
1325    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
1326    System.err.println("   -Dmapreduce.task.timeout=60000");
1327    System.err.println();
1328    System.err.println("Command:");
1329    for (CmdDescriptor command : commands.values()) {
1330      System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
1331    }
1332    System.err.println();
1333    System.err.println("Args:");
1334    System.err.println(
1335      " nclients      Integer. Required. Total number of " + "clients (and HRegionServers)");
1336    System.err.println("               running: 1 <= value <= 500");
1337    System.err.println("Examples:");
1338    System.err.println(" To run a single evaluation client:");
1339    System.err.println(" $ hbase " + this.getClass().getName() + " sequentialWrite 1");
1340  }
1341
1342  private void getArgs(final int start, final String[] args) {
1343    if (start + 1 > args.length) {
1344      throw new IllegalArgumentException("must supply the number of clients");
1345    }
1346    N = Integer.parseInt(args[start]);
1347    if (N < 1) {
      throw new IllegalArgumentException("Number of clients must be at least 1");
1349    }
1350    // Set total number of rows to write.
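    // For example, with the default R (ROWS_PER_GB = 1,048,576) and nclients = 3, R becomes
    // 3,145,728 rows in total.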
1351    R = R * N;
1352  }
1353
1354  @Override
1355  public int run(String[] args) throws Exception {
1356    // Process command-line args. TODO: Better cmd-line processing
1357    // (but hopefully something not as painful as cli options).
1358    int errCode = -1;
1359    if (args.length < 1) {
1360      printUsage();
1361      return errCode;
1362    }
1363
1364    try {
1365      for (int i = 0; i < args.length; i++) {
1366        String cmd = args[i];
1367        if (cmd.equals("-h") || cmd.startsWith("--h")) {
1368          printUsage();
1369          errCode = 0;
1370          break;
1371        }
1372
1373        final String nmr = "--nomapred";
1374        if (cmd.startsWith(nmr)) {
1375          nomapred = true;
1376          continue;
1377        }
1378
1379        final String rows = "--rows=";
1380        if (cmd.startsWith(rows)) {
1381          R = Integer.parseInt(cmd.substring(rows.length()));
1382          continue;
1383        }
1384
1385        final String table = "--table=";
1386        if (cmd.startsWith(table)) {
1387          this.tableName = TableName.valueOf(cmd.substring(table.length()));
1388          continue;
1389        }
1390
1391        final String compress = "--compress=";
1392        if (cmd.startsWith(compress)) {
1393          this.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
1394          continue;
1395        }
1396
1397        final String blockEncoding = "--blockEncoding=";
1398        if (cmd.startsWith(blockEncoding)) {
1399          this.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
1400          continue;
1401        }
1402
1403        final String flushCommits = "--flushCommits=";
1404        if (cmd.startsWith(flushCommits)) {
1405          this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
1406          continue;
1407        }
1408
1409        final String writeToWAL = "--writeToWAL=";
1410        if (cmd.startsWith(writeToWAL)) {
1411          this.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
1412          continue;
1413        }
1414
1415        final String presplit = "--presplit=";
1416        if (cmd.startsWith(presplit)) {
1417          this.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
1418          continue;
1419        }
1420
1421        final String inMemory = "--inmemory=";
1422        if (cmd.startsWith(inMemory)) {
1423          this.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
1424          continue;
1425        }
1426
        // Lazily create the shared connection the first time we get here instead of on every
        // remaining loop iteration.
        if (this.connection == null) {
          this.connection = ConnectionFactory.createConnection(getConf());
        }
1428
1429        final String useTags = "--usetags=";
1430        if (cmd.startsWith(useTags)) {
1431          this.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
1432          continue;
1433        }
1434
1435        final String noOfTags = "--nooftags=";
1436        if (cmd.startsWith(noOfTags)) {
1437          this.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
1438          continue;
1439        }
1440
1441        final String host = "--host=";
1442        if (cmd.startsWith(host)) {
1443          cluster.add(cmd.substring(host.length()));
1444          continue;
1445        }
1446
1447        Class<? extends Test> cmdClass = determineCommandClass(cmd);
1448        if (cmdClass != null) {
1449          getArgs(i + 1, args);
1450          if (cluster.isEmpty()) {
1451            String s = conf.get("stargate.hostname", "localhost");
1452            if (s.contains(":")) {
1453              cluster.add(s);
1454            } else {
1455              cluster.add(s, conf.getInt("stargate.port", 8080));
1456            }
1457          }
1458          runTest(cmdClass);
1459          errCode = 0;
1460          break;
1461        }
1462
1463        printUsage();
1464        break;
1465      }
1466    } catch (Exception e) {
1467      LOG.error("Failed", e);
1468    }
1469
1470    return errCode;
1471  }
1472
1473  private Class<? extends Test> determineCommandClass(String cmd) {
1474    CmdDescriptor descriptor = commands.get(cmd);
1475    return descriptor != null ? descriptor.getCmdClass() : null;
1476  }
1477
1478  public static void main(final String[] args) throws Exception {
1479    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
1480    System.exit(res);
1481  }
1482}