001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.rest; 019 020import java.io.DataInput; 021import java.io.DataOutput; 022import java.io.IOException; 023import java.io.PrintStream; 024import java.lang.reflect.Constructor; 025import java.text.SimpleDateFormat; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Date; 029import java.util.List; 030import java.util.Map; 031import java.util.Random; 032import java.util.TreeMap; 033import java.util.regex.Matcher; 034import java.util.regex.Pattern; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.conf.Configured; 037import org.apache.hadoop.fs.FSDataInputStream; 038import org.apache.hadoop.fs.FileStatus; 039import org.apache.hadoop.fs.FileSystem; 040import org.apache.hadoop.fs.Path; 041import org.apache.hadoop.hbase.ArrayBackedTag; 042import org.apache.hadoop.hbase.CompareOperator; 043import org.apache.hadoop.hbase.HBaseConfiguration; 044import org.apache.hadoop.hbase.HColumnDescriptor; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.HTableDescriptor; 047import org.apache.hadoop.hbase.KeyValue; 048import 
org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.Tag; 050import org.apache.hadoop.hbase.client.BufferedMutator; 051import org.apache.hadoop.hbase.client.Connection; 052import org.apache.hadoop.hbase.client.ConnectionFactory; 053import org.apache.hadoop.hbase.client.Durability; 054import org.apache.hadoop.hbase.client.Get; 055import org.apache.hadoop.hbase.client.Put; 056import org.apache.hadoop.hbase.client.Result; 057import org.apache.hadoop.hbase.client.ResultScanner; 058import org.apache.hadoop.hbase.client.Scan; 059import org.apache.hadoop.hbase.client.Table; 060import org.apache.hadoop.hbase.filter.BinaryComparator; 061import org.apache.hadoop.hbase.filter.Filter; 062import org.apache.hadoop.hbase.filter.PageFilter; 063import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 064import org.apache.hadoop.hbase.filter.WhileMatchFilter; 065import org.apache.hadoop.hbase.io.compress.Compression; 066import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 067import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 068import org.apache.hadoop.hbase.rest.client.Client; 069import org.apache.hadoop.hbase.rest.client.Cluster; 070import org.apache.hadoop.hbase.rest.client.RemoteAdmin; 071import org.apache.hadoop.hbase.util.ByteArrayHashKey; 072import org.apache.hadoop.hbase.util.Bytes; 073import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 074import org.apache.hadoop.hbase.util.Hash; 075import org.apache.hadoop.hbase.util.MurmurHash; 076import org.apache.hadoop.hbase.util.Pair; 077import org.apache.hadoop.io.LongWritable; 078import org.apache.hadoop.io.NullWritable; 079import org.apache.hadoop.io.Text; 080import org.apache.hadoop.io.Writable; 081import org.apache.hadoop.mapreduce.InputSplit; 082import org.apache.hadoop.mapreduce.Job; 083import org.apache.hadoop.mapreduce.JobContext; 084import org.apache.hadoop.mapreduce.Mapper; 085import org.apache.hadoop.mapreduce.RecordReader; 086import 
org.apache.hadoop.mapreduce.TaskAttemptContext; 087import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 088import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 089import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; 090import org.apache.hadoop.util.LineReader; 091import org.apache.hadoop.util.Tool; 092import org.apache.hadoop.util.ToolRunner; 093import org.slf4j.Logger; 094import org.slf4j.LoggerFactory; 095 096/** 097 * Script used evaluating Stargate performance and scalability. Runs a SG client that steps through 098 * one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random writes test, 099 * etc.). Pass on the command-line which test to run and how many clients are participating in this 100 * experiment. Run <code>java PerformanceEvaluation --help</code> to obtain usage. 101 * <p> 102 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance 103 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper, 104 * pages 8-10. 105 * <p> 106 * If number of clients > 1, we start up a MapReduce job. Each map task runs an individual client. 107 * Each client does about 1GB of data. 
108 */ 109public class PerformanceEvaluation extends Configured implements Tool { 110 protected static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class); 111 112 private static final int DEFAULT_ROW_PREFIX_LENGTH = 16; 113 private static final int ROW_LENGTH = 1000; 114 private static final int TAG_LENGTH = 256; 115 private static final int ONE_GB = 1024 * 1024 * 1000; 116 private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH; 117 118 public static final TableName TABLE_NAME = TableName.valueOf("TestTable"); 119 public static final byte[] FAMILY_NAME = Bytes.toBytes("info"); 120 public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data"); 121 private TableName tableName = TABLE_NAME; 122 123 protected HTableDescriptor TABLE_DESCRIPTOR; 124 protected Map<String, CmdDescriptor> commands = new TreeMap<>(); 125 protected static Cluster cluster = new Cluster(); 126 127 volatile Configuration conf; 128 private boolean nomapred = false; 129 private int N = 1; 130 private int R = ROWS_PER_GB; 131 private Compression.Algorithm compression = Compression.Algorithm.NONE; 132 private DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; 133 private boolean flushCommits = true; 134 private boolean writeToWAL = true; 135 private boolean inMemoryCF = false; 136 private int presplitRegions = 0; 137 private boolean useTags = false; 138 private int noOfTags = 1; 139 private Connection connection; 140 141 private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); 142 143 /** 144 * Regex to parse lines in input file passed to mapreduce task. 145 */ 146 public static final Pattern LINE_PATTERN = Pattern 147 .compile("tableName=(\\w+),\\s+" + "startRow=(\\d+),\\s+" + "perClientRunRows=(\\d+),\\s+" 148 + "totalRows=(\\d+),\\s+" + "clients=(\\d+),\\s+" + "flushCommits=(\\w+),\\s+" 149 + "writeToWAL=(\\w+),\\s+" + "useTags=(\\w+),\\s+" + "noOfTags=(\\d+)"); 150 151 /** 152 * Enum for map metrics. 
Keep it out here rather than inside in the Map inner-class so we can find 153 * associated properties. 154 */ 155 protected enum Counter { 156 /** elapsed time */ 157 ELAPSED_TIME, 158 /** number of rows */ 159 ROWS 160 } 161 162 /** 163 * Constructor 164 * @param c Configuration object 165 */ 166 public PerformanceEvaluation(final Configuration c) { 167 this.conf = c; 168 169 addCommandDescriptor(RandomReadTest.class, "randomRead", "Run random read test"); 170 addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan", 171 "Run random seek and scan 100 test"); 172 addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", 173 "Run random seek scan with both start and stop row (max 10 rows)"); 174 addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100", 175 "Run random seek scan with both start and stop row (max 100 rows)"); 176 addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000", 177 "Run random seek scan with both start and stop row (max 1000 rows)"); 178 addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000", 179 "Run random seek scan with both start and stop row (max 10000 rows)"); 180 addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test"); 181 addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test"); 182 addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test"); 183 addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)"); 184 addCommandDescriptor(FilteredScanTest.class, "filterScan", 185 "Run scan test using a filter to find a specific row based " 186 + "on it's value (make sure to use --rows=20)"); 187 } 188 189 protected void addCommandDescriptor(Class<? 
extends Test> cmdClass, String name, 190 String description) { 191 CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description); 192 commands.put(name, cmdDescriptor); 193 } 194 195 /** 196 * Implementations can have their status set. 197 */ 198 interface Status { 199 /** 200 * Sets status 201 * @param msg status message 202 * @throws IOException if setting the status fails 203 */ 204 void setStatus(final String msg) throws IOException; 205 } 206 207 /** 208 * This class works as the InputSplit of Performance Evaluation MapReduce InputFormat, and the 209 * Record Value of RecordReader. Each map task will only read one record from a PeInputSplit, the 210 * record value is the PeInputSplit itself. 211 */ 212 public static class PeInputSplit extends InputSplit implements Writable { 213 private TableName tableName; 214 private int startRow; 215 private int rows; 216 private int totalRows; 217 private int clients; 218 private boolean flushCommits; 219 private boolean writeToWAL; 220 private boolean useTags; 221 private int noOfTags; 222 223 public PeInputSplit(TableName tableName, int startRow, int rows, int totalRows, int clients, 224 boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags) { 225 this.tableName = tableName; 226 this.startRow = startRow; 227 this.rows = rows; 228 this.totalRows = totalRows; 229 this.clients = clients; 230 this.flushCommits = flushCommits; 231 this.writeToWAL = writeToWAL; 232 this.useTags = useTags; 233 this.noOfTags = noOfTags; 234 } 235 236 @Override 237 public void readFields(DataInput in) throws IOException { 238 int tableNameLen = in.readInt(); 239 byte[] name = new byte[tableNameLen]; 240 in.readFully(name); 241 this.tableName = TableName.valueOf(name); 242 this.startRow = in.readInt(); 243 this.rows = in.readInt(); 244 this.totalRows = in.readInt(); 245 this.clients = in.readInt(); 246 this.flushCommits = in.readBoolean(); 247 this.writeToWAL = in.readBoolean(); 248 this.useTags = in.readBoolean(); 
249 this.noOfTags = in.readInt(); 250 } 251 252 @Override 253 public void write(DataOutput out) throws IOException { 254 byte[] name = this.tableName.toBytes(); 255 out.writeInt(name.length); 256 out.write(name); 257 out.writeInt(startRow); 258 out.writeInt(rows); 259 out.writeInt(totalRows); 260 out.writeInt(clients); 261 out.writeBoolean(flushCommits); 262 out.writeBoolean(writeToWAL); 263 out.writeBoolean(useTags); 264 out.writeInt(noOfTags); 265 } 266 267 @Override 268 public long getLength() { 269 return 0; 270 } 271 272 @Override 273 public String[] getLocations() { 274 return new String[0]; 275 } 276 277 public int getStartRow() { 278 return startRow; 279 } 280 281 public TableName getTableName() { 282 return tableName; 283 } 284 285 public int getRows() { 286 return rows; 287 } 288 289 public int getTotalRows() { 290 return totalRows; 291 } 292 293 public boolean isFlushCommits() { 294 return flushCommits; 295 } 296 297 public boolean isWriteToWAL() { 298 return writeToWAL; 299 } 300 301 public boolean isUseTags() { 302 return useTags; 303 } 304 305 public int getNoOfTags() { 306 return noOfTags; 307 } 308 } 309 310 /** 311 * InputFormat of Performance Evaluation MapReduce job. It extends from FileInputFormat, want to 312 * use it's methods such as setInputPaths(). 
313 */ 314 public static class PeInputFormat extends FileInputFormat<NullWritable, PeInputSplit> { 315 @Override 316 public List<InputSplit> getSplits(JobContext job) throws IOException { 317 // generate splits 318 List<InputSplit> splitList = new ArrayList<>(); 319 320 for (FileStatus file : listStatus(job)) { 321 if (file.isDirectory()) { 322 continue; 323 } 324 Path path = file.getPath(); 325 FileSystem fs = path.getFileSystem(job.getConfiguration()); 326 FSDataInputStream fileIn = fs.open(path); 327 LineReader in = new LineReader(fileIn, job.getConfiguration()); 328 int lineLen; 329 while (true) { 330 Text lineText = new Text(); 331 lineLen = in.readLine(lineText); 332 if (lineLen <= 0) { 333 break; 334 } 335 Matcher m = LINE_PATTERN.matcher(lineText.toString()); 336 if ((m != null) && m.matches()) { 337 TableName tableName = TableName.valueOf(m.group(1)); 338 int startRow = Integer.parseInt(m.group(2)); 339 int rows = Integer.parseInt(m.group(3)); 340 int totalRows = Integer.parseInt(m.group(4)); 341 int clients = Integer.parseInt(m.group(5)); 342 boolean flushCommits = Boolean.parseBoolean(m.group(6)); 343 boolean writeToWAL = Boolean.parseBoolean(m.group(7)); 344 boolean useTags = Boolean.parseBoolean(m.group(8)); 345 int noOfTags = Integer.parseInt(m.group(9)); 346 347 LOG.debug("tableName=" + tableName + " split[" + splitList.size() + "] " + " startRow=" 348 + startRow + " rows=" + rows + " totalRows=" + totalRows + " clients=" + clients 349 + " flushCommits=" + flushCommits + " writeToWAL=" + writeToWAL + " useTags=" 350 + useTags + " noOfTags=" + noOfTags); 351 352 PeInputSplit newSplit = new PeInputSplit(tableName, startRow, rows, totalRows, clients, 353 flushCommits, writeToWAL, useTags, noOfTags); 354 splitList.add(newSplit); 355 } 356 } 357 in.close(); 358 } 359 360 LOG.info("Total # of splits: " + splitList.size()); 361 return splitList; 362 } 363 364 @Override 365 public RecordReader<NullWritable, PeInputSplit> createRecordReader(InputSplit split, 
366 TaskAttemptContext context) { 367 return new PeRecordReader(); 368 } 369 370 public static class PeRecordReader extends RecordReader<NullWritable, PeInputSplit> { 371 private boolean readOver = false; 372 private PeInputSplit split = null; 373 private NullWritable key = null; 374 private PeInputSplit value = null; 375 376 @Override 377 public void initialize(InputSplit split, TaskAttemptContext context) { 378 this.readOver = false; 379 this.split = (PeInputSplit) split; 380 } 381 382 @Override 383 public boolean nextKeyValue() { 384 if (readOver) { 385 return false; 386 } 387 388 key = NullWritable.get(); 389 value = split; 390 391 readOver = true; 392 return true; 393 } 394 395 @Override 396 public NullWritable getCurrentKey() { 397 return key; 398 } 399 400 @Override 401 public PeInputSplit getCurrentValue() { 402 return value; 403 } 404 405 @Override 406 public float getProgress() { 407 if (readOver) { 408 return 1.0f; 409 } else { 410 return 0.0f; 411 } 412 } 413 414 @Override 415 public void close() { 416 // do nothing 417 } 418 } 419 } 420 421 /** 422 * MapReduce job that runs a performance evaluation client in each map task. 423 */ 424 public static class EvaluationMapTask 425 extends Mapper<NullWritable, PeInputSplit, LongWritable, LongWritable> { 426 427 /** configuration parameter name that contains the command */ 428 public final static String CMD_KEY = "EvaluationMapTask.command"; 429 /** configuration parameter name that contains the PE impl */ 430 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl"; 431 432 private Class<? extends Test> cmd; 433 private PerformanceEvaluation pe; 434 435 @Override 436 protected void setup(Context context) { 437 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class); 438 439 // this is required so that extensions of PE are instantiated within the 440 // map reduce task... 441 Class<? 
extends PerformanceEvaluation> peClass = 442 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class); 443 try { 444 this.pe = 445 peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration()); 446 } catch (Exception e) { 447 throw new IllegalStateException("Could not instantiate PE instance", e); 448 } 449 } 450 451 private <Type> Class<? extends Type> forName(String className, Class<Type> type) { 452 Class<? extends Type> clazz; 453 try { 454 clazz = Class.forName(className).asSubclass(type); 455 } catch (ClassNotFoundException e) { 456 throw new IllegalStateException("Could not find class for name: " + className, e); 457 } 458 return clazz; 459 } 460 461 @Override 462 protected void map(NullWritable key, PeInputSplit value, final Context context) 463 throws IOException, InterruptedException { 464 Status status = context::setStatus; 465 466 // Evaluation task 467 pe.tableName = value.getTableName(); 468 long elapsedTime = 469 this.pe.runOneClient(this.cmd, value.getStartRow(), value.getRows(), value.getTotalRows(), 470 value.isFlushCommits(), value.isWriteToWAL(), value.isUseTags(), value.getNoOfTags(), 471 ConnectionFactory.createConnection(context.getConfiguration()), status); 472 // Collect how much time the thing took. Report as map output and 473 // to the ELAPSED_TIME counter. 474 context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime); 475 context.getCounter(Counter.ROWS).increment(value.rows); 476 context.write(new LongWritable(value.startRow), new LongWritable(elapsedTime)); 477 context.progress(); 478 } 479 } 480 481 /** 482 * If table does not already exist, create. 483 * @param admin Client to use checking. 484 * @return True if we created the table. 
485 * @throws IOException if an operation on the table fails 486 */ 487 private boolean checkTable(RemoteAdmin admin) throws IOException { 488 HTableDescriptor tableDescriptor = getTableDescriptor(); 489 if (this.presplitRegions > 0) { 490 // presplit requested 491 if (admin.isTableAvailable(tableDescriptor.getTableName().getName())) { 492 admin.deleteTable(tableDescriptor.getTableName().getName()); 493 } 494 495 byte[][] splits = getSplits(); 496 for (int i = 0; i < splits.length; i++) { 497 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); 498 } 499 admin.createTable(tableDescriptor); 500 LOG.info("Table created with " + this.presplitRegions + " splits"); 501 } else { 502 boolean tableExists = admin.isTableAvailable(tableDescriptor.getTableName().getName()); 503 if (!tableExists) { 504 admin.createTable(tableDescriptor); 505 LOG.info("Table " + tableDescriptor + " created"); 506 } 507 } 508 509 return admin.isTableAvailable(tableDescriptor.getTableName().getName()); 510 } 511 512 protected HTableDescriptor getTableDescriptor() { 513 if (TABLE_DESCRIPTOR == null) { 514 TABLE_DESCRIPTOR = new HTableDescriptor(tableName); 515 HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME); 516 family.setDataBlockEncoding(blockEncoding); 517 family.setCompressionType(compression); 518 if (inMemoryCF) { 519 family.setInMemory(true); 520 } 521 TABLE_DESCRIPTOR.addFamily(family); 522 } 523 return TABLE_DESCRIPTOR; 524 } 525 526 /** 527 * Generates splits based on total number of rows and specified split regions 528 * @return splits : array of byte [] 529 */ 530 protected byte[][] getSplits() { 531 if (this.presplitRegions == 0) { 532 return new byte[0][]; 533 } 534 535 int numSplitPoints = presplitRegions - 1; 536 byte[][] splits = new byte[numSplitPoints][]; 537 int jump = this.R / this.presplitRegions; 538 for (int i = 0; i < numSplitPoints; i++) { 539 int rowkey = jump * (1 + i); 540 splits[i] = format(rowkey); 541 } 542 return splits; 543 } 544 545 
/** 546 * We're to run multiple clients concurrently. Setup a mapreduce job. Run one map per client. Then 547 * run a single reduce to sum the elapsed times. 548 * @param cmd Command to run. 549 */ 550 private void runNIsMoreThanOne(final Class<? extends Test> cmd) 551 throws IOException, InterruptedException, ClassNotFoundException { 552 RemoteAdmin remoteAdmin = new RemoteAdmin(new Client(cluster), getConf()); 553 checkTable(remoteAdmin); 554 if (nomapred) { 555 doMultipleClients(cmd); 556 } else { 557 doMapReduce(cmd); 558 } 559 } 560 561 /** 562 * Run all clients in this vm each to its own thread. 563 * @param cmd Command to run 564 * @throws IOException if creating a connection fails 565 */ 566 private void doMultipleClients(final Class<? extends Test> cmd) throws IOException { 567 final List<Thread> threads = new ArrayList<>(this.N); 568 final long[] timings = new long[this.N]; 569 final int perClientRows = R / N; 570 final TableName tableName = this.tableName; 571 final DataBlockEncoding encoding = this.blockEncoding; 572 final boolean flushCommits = this.flushCommits; 573 final Compression.Algorithm compression = this.compression; 574 final boolean writeToWal = this.writeToWAL; 575 final int preSplitRegions = this.presplitRegions; 576 final boolean useTags = this.useTags; 577 final int numTags = this.noOfTags; 578 final Connection connection = ConnectionFactory.createConnection(getConf()); 579 for (int i = 0; i < this.N; i++) { 580 final int index = i; 581 Thread t = new Thread("TestClient-" + i) { 582 @Override 583 public void run() { 584 super.run(); 585 PerformanceEvaluation pe = new PerformanceEvaluation(getConf()); 586 pe.tableName = tableName; 587 pe.blockEncoding = encoding; 588 pe.flushCommits = flushCommits; 589 pe.compression = compression; 590 pe.writeToWAL = writeToWal; 591 pe.presplitRegions = preSplitRegions; 592 pe.N = N; 593 pe.connection = connection; 594 pe.useTags = useTags; 595 pe.noOfTags = numTags; 596 try { 597 long elapsedTime = 
pe.runOneClient(cmd, index * perClientRows, perClientRows, R, 598 flushCommits, writeToWAL, useTags, noOfTags, connection, 599 msg -> LOG.info("client-" + getName() + " " + msg)); 600 timings[index] = elapsedTime; 601 LOG.info("Finished " + getName() + " in " + elapsedTime + "ms writing " + perClientRows 602 + " rows"); 603 } catch (IOException e) { 604 throw new RuntimeException(e); 605 } 606 } 607 }; 608 threads.add(t); 609 } 610 for (Thread t : threads) { 611 t.start(); 612 } 613 for (Thread t : threads) { 614 while (t.isAlive()) { 615 try { 616 t.join(); 617 } catch (InterruptedException e) { 618 LOG.debug("Interrupted, continuing" + e.toString()); 619 } 620 } 621 } 622 final String test = cmd.getSimpleName(); 623 LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(timings)); 624 Arrays.sort(timings); 625 long total = 0; 626 for (int i = 0; i < this.N; i++) { 627 total += timings[i]; 628 } 629 LOG.info("[" + test + "]" + "\tMin: " + timings[0] + "ms" + "\tMax: " + timings[this.N - 1] 630 + "ms" + "\tAvg: " + (total / this.N) + "ms"); 631 } 632 633 /** 634 * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write 635 * out an input file with instruction per client regards which row they are to start on. 636 * @param cmd Command to run. 637 */ 638 private void doMapReduce(final Class<? 
extends Test> cmd) 639 throws IOException, InterruptedException, ClassNotFoundException { 640 Configuration conf = getConf(); 641 Path inputDir = writeInputFile(conf); 642 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); 643 conf.set(EvaluationMapTask.PE_KEY, getClass().getName()); 644 Job job = Job.getInstance(conf); 645 job.setJarByClass(PerformanceEvaluation.class); 646 job.setJobName("HBase Performance Evaluation"); 647 648 job.setInputFormatClass(PeInputFormat.class); 649 PeInputFormat.setInputPaths(job, inputDir); 650 651 job.setOutputKeyClass(LongWritable.class); 652 job.setOutputValueClass(LongWritable.class); 653 654 job.setMapperClass(EvaluationMapTask.class); 655 job.setReducerClass(LongSumReducer.class); 656 job.setNumReduceTasks(1); 657 658 job.setOutputFormatClass(TextOutputFormat.class); 659 TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs")); 660 TableMapReduceUtil.addDependencyJars(job); 661 TableMapReduceUtil.initCredentials(job); 662 job.waitForCompletion(true); 663 } 664 665 /** 666 * Write input file of offsets-per-client for the mapreduce job. 667 * @param c Configuration 668 * @return Directory that contains file written. 669 * @throws IOException if creating the directory or the file fails 670 */ 671 private Path writeInputFile(final Configuration c) throws IOException { 672 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); 673 Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date())); 674 Path inputDir = new Path(jobdir, "inputs"); 675 676 FileSystem fs = FileSystem.get(c); 677 fs.mkdirs(inputDir); 678 Path inputFile = new Path(inputDir, "input.txt"); 679 // Make input random. 
680 try (PrintStream out = new PrintStream(fs.create(inputFile))) { 681 Map<Integer, String> m = new TreeMap<>(); 682 Hash h = MurmurHash.getInstance(); 683 int perClientRows = (this.R / this.N); 684 for (int i = 0; i < 10; i++) { 685 for (int j = 0; j < N; j++) { 686 StringBuilder s = new StringBuilder(); 687 s.append("tableName=").append(tableName); 688 s.append(", startRow=").append((j * perClientRows) + (i * (perClientRows / 10))); 689 s.append(", perClientRunRows=").append(perClientRows / 10); 690 s.append(", totalRows=").append(R); 691 s.append(", clients=").append(N); 692 s.append(", flushCommits=").append(flushCommits); 693 s.append(", writeToWAL=").append(writeToWAL); 694 s.append(", useTags=").append(useTags); 695 s.append(", noOfTags=").append(noOfTags); 696 697 byte[] b = Bytes.toBytes(s.toString()); 698 int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1); 699 m.put(hash, s.toString()); 700 } 701 } 702 for (Map.Entry<Integer, String> e : m.entrySet()) { 703 out.println(e.getValue()); 704 } 705 } 706 return inputDir; 707 } 708 709 /** 710 * Describes a command. 711 */ 712 static class CmdDescriptor { 713 private Class<? extends Test> cmdClass; 714 private String name; 715 private String description; 716 717 CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) { 718 this.cmdClass = cmdClass; 719 this.name = name; 720 this.description = description; 721 } 722 723 public Class<? extends Test> getCmdClass() { 724 return cmdClass; 725 } 726 727 public String getName() { 728 return name; 729 } 730 731 public String getDescription() { 732 return description; 733 } 734 } 735 736 /** 737 * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation} tests This 738 * makes the reflection logic a little easier to understand... 
739 */ 740 static class TestOptions { 741 private int startRow; 742 private int perClientRunRows; 743 private int totalRows; 744 private TableName tableName; 745 private boolean flushCommits; 746 private boolean writeToWAL; 747 private boolean useTags; 748 private int noOfTags; 749 private Connection connection; 750 751 TestOptions(int startRow, int perClientRunRows, int totalRows, TableName tableName, 752 boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags, 753 Connection connection) { 754 this.startRow = startRow; 755 this.perClientRunRows = perClientRunRows; 756 this.totalRows = totalRows; 757 this.tableName = tableName; 758 this.flushCommits = flushCommits; 759 this.writeToWAL = writeToWAL; 760 this.useTags = useTags; 761 this.noOfTags = noOfTags; 762 this.connection = connection; 763 } 764 765 public int getStartRow() { 766 return startRow; 767 } 768 769 public int getPerClientRunRows() { 770 return perClientRunRows; 771 } 772 773 public int getTotalRows() { 774 return totalRows; 775 } 776 777 public TableName getTableName() { 778 return tableName; 779 } 780 781 public boolean isFlushCommits() { 782 return flushCommits; 783 } 784 785 public boolean isWriteToWAL() { 786 return writeToWAL; 787 } 788 789 public Connection getConnection() { 790 return connection; 791 } 792 793 public boolean isUseTags() { 794 return this.useTags; 795 } 796 797 public int getNumTags() { 798 return this.noOfTags; 799 } 800 } 801 802 /* 803 * A test. Subclass to particularize what happens per row. 804 */ 805 static abstract class Test { 806 // Below is make it so when Tests are all running in the one 807 // jvm, that they each have a differently seeded Random. 
private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime());

    /**
     * Draws the next seed from the shared seed generator; used to seed the {@link Random} of each
     * test instance.
     */
    private static long nextRandomSeed() {
      return randomSeed.nextLong();
    }

    protected final Random rand = new Random(nextRandomSeed());

    protected final int startRow;
    protected final int perClientRunRows;
    protected final int totalRows;
    private final Status status;
    protected TableName tableName;
    protected volatile Configuration conf;
    protected boolean writeToWAL;
    protected boolean useTags;
    protected int noOfTags;
    protected Connection connection;

    /**
     * Note that all subclasses of this class must provide a constructor that has the exact same
     * list of arguments, because {@code runOneClient} instantiates them reflectively.
     */
    Test(final Configuration conf, final TestOptions options, final Status status) {
      super();
      this.startRow = options.getStartRow();
      this.perClientRunRows = options.getPerClientRunRows();
      this.totalRows = options.getTotalRows();
      this.status = status;
      this.tableName = options.getTableName();
      this.conf = conf;
      this.writeToWAL = options.isWriteToWAL();
      this.useTags = options.isUseTags();
      this.noOfTags = options.getNumTags();
      this.connection = options.getConnection();
    }

    /**
     * Builds the progress string reported through {@link Status}.
     * @param sr start row
     * @param i  current row
     * @param lr last row (exclusive)
     * @return {@code sr/i/lr}
     */
    protected String generateStatus(final int sr, final int i, final int lr) {
      return sr + "/" + i + "/" + lr;
    }

    /**
     * @return the number of rows between two status reports: a tenth of the rows this client runs,
     *         or all of them when that tenth rounds down to zero
     */
    protected int getReportingPeriod() {
      int period = this.perClientRunRows / 10;
      return period == 0 ? this.perClientRunRows : period;
    }

    /** Releases whatever {@link #testSetup()} acquired; always invoked after the timed section. */
    abstract void testTakedown() throws IOException;

    /**
     * Run test
     * @return Elapsed time of the timed section (including takedown) in milliseconds.
     * @throws IOException if something in the test fails
     */
    long test() throws IOException {
      testSetup();
      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
      final long startTime = System.nanoTime();
      try {
        testTimed();
      } finally {
        testTakedown();
      }
      return (System.nanoTime() - startTime) / 1000000;
    }

    /** Acquires the resources the test needs; runs before the timer is started. */
    abstract void testSetup() throws IOException;

    /**
     * Provides an extension point for tests that don't want a per row invocation.
     */
    void testTimed() throws IOException {
      final int lastRow = this.startRow + this.perClientRunRows;
      // The period is derived from a final field, so it is safe to compute it once up front.
      final int reportingPeriod = getReportingPeriod();
      for (int i = this.startRow; i < lastRow; i++) {
        testRow(i);
        // Report each time another reporting period worth of rows has completed.
        if (status != null && i > 0 && (i % reportingPeriod) == 0) {
          status.setStatus(generateStatus(this.startRow, i, lastRow));
        }
      }
    }

    /**
     * Test for individual row.
     * @param i Row index.
     */
    abstract void testRow(final int i) throws IOException;
  }

  /** Base class for tests that operate on a {@link Table} obtained from the connection. */
  static abstract class TableTest extends Test {
    protected Table table;

    public TableTest(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    void testSetup() throws IOException {
      this.table = connection.getTable(tableName);
    }

    @Override
    void testTakedown() throws IOException {
      table.close();
    }
  }

  /** Base class for write tests that send their puts through a {@link BufferedMutator}. */
  static abstract class BufferedMutatorTest extends Test {
    protected BufferedMutator mutator;
    protected boolean flushCommits;

    public BufferedMutatorTest(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
      this.flushCommits = options.isFlushCommits();
    }

    @Override
    void testSetup() throws IOException {
      this.mutator = connection.getBufferedMutator(tableName);
    }

    @Override
    void testTakedown() throws IOException {
      // try-with-resources guarantees the mutator is closed even when the explicit flush fails.
      try (BufferedMutator m = this.mutator) {
        if (flushCommits) {
          m.flush();
        }
      }
    }

    /**
     * Builds the put written for {@code row}: one generated value in the test column, carrying
     * {@code noOfTags} tags when tags are enabled, with durability chosen by {@code writeToWAL}.
     * @param row the row key to write
     * @return the put, ready to be handed to the mutator
     * @throws IOException if the tagged cell cannot be added to the put
     */
    protected Put buildPut(final byte[] row) throws IOException {
      Put put = new Put(row);
      byte[] value = generateData(this.rand, ROW_LENGTH);
      if (useTags) {
        // All tags of one put share the same generated payload; only the tag type differs.
        byte[] tag = generateData(this.rand, TAG_LENGTH);
        Tag[] tags = new Tag[noOfTags];
        for (int n = 0; n < noOfTags; n++) {
          tags[n] = new ArrayBackedTag((byte) n, tag);
        }
        KeyValue kv =
          new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP, value, tags);
        put.add(kv);
      } else {
        put.addColumn(FAMILY_NAME, QUALIFIER_NAME, value);
      }
      put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
      return put;
    }
  }

  /** Opens a scanner at a random row and closes it again without reading any result. */
  static class RandomSeekScanTest extends TableTest {
    RandomSeekScanTest(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    void testRow(final int i) throws IOException {
      Scan scan = new Scan(getRandomRow(this.rand, this.totalRows));
      scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
      scan.setFilter(new WhileMatchFilter(new PageFilter(120)));
      ResultScanner s = this.table.getScanner(scan);
      s.close();
    }

    @Override
    protected int getReportingPeriod() {
      int period = this.perClientRunRows / 100;
      return period == 0 ? this.perClientRunRows : period;
    }
  }

  /** Scans a key range that starts at a random row; subclasses choose the size of the range. */
  static abstract class RandomScanWithRangeTest extends TableTest {
    RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    void testRow(final int i) throws IOException {
      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
      Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
      scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
      // try-with-resources so the scanner is released even when next() throws.
      try (ResultScanner s = this.table.getScanner(scan)) {
        int count = 0;
        while (s.next() != null) {
          count++;
        }

        if (i % 100 == 0) {
          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
            Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()),
            count));
        }
      }
    }

    /** @return the start row and stop row of the next scan */
    protected abstract Pair<byte[], byte[]> getStartAndStopRow();

    /**
     * Picks a random start row below {@code totalRows} and a stop row {@code maxRange} rows later.
     * @param maxRange distance between the start row and the stop row
     * @return the formatted start and stop row keys
     */
    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
      int start = this.rand.nextInt(Integer.MAX_VALUE) % totalRows;
      int stop = start + maxRange;
      return new Pair<>(format(start), format(stop));
    }

    @Override
    protected int getReportingPeriod() {
      int period = this.perClientRunRows / 100;
      return period == 0 ? this.perClientRunRows : period;
    }
  }

  /** Random range scan over a window of 10 rows. */
  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
    RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    protected Pair<byte[], byte[]> getStartAndStopRow() {
      return generateStartAndStopRows(10);
    }
  }

  /** Random range scan over a window of 100 rows. */
  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
    RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    protected Pair<byte[], byte[]> getStartAndStopRow() {
      return generateStartAndStopRows(100);
    }
  }

  /** Random range scan over a window of 1000 rows. */
  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
    RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    protected Pair<byte[], byte[]> getStartAndStopRow() {
      return generateStartAndStopRows(1000);
    }
  }

  /** Random range scan over a window of 10000 rows. */
  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
    RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    protected Pair<byte[], byte[]> getStartAndStopRow() {
      return generateStartAndStopRows(10000);
    }
  }

  /** Issues a get for the test column of a random row. */
  static class RandomReadTest extends TableTest {
    RandomReadTest(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    void testRow(final int i) throws IOException {
      Get get = new Get(getRandomRow(this.rand, this.totalRows));
      get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
      this.table.get(get);
    }

    @Override
    protected int getReportingPeriod() {
      int period = this.perClientRunRows / 100;
      return period == 0 ? this.perClientRunRows : period;
    }
  }

  /** Writes a generated value to a random row. */
  static class RandomWriteTest extends BufferedMutatorTest {
    RandomWriteTest(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    void testRow(final int i) throws IOException {
      mutator.mutate(buildPut(getRandomRow(this.rand, this.totalRows)));
    }
  }

  /** Reads rows through one scanner that is opened lazily at the client's start row. */
  static class ScanTest extends TableTest {
    private ResultScanner testScanner;

    ScanTest(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    void testTakedown() throws IOException {
      if (this.testScanner != null) {
        this.testScanner.close();
      }
      super.testTakedown();
    }

    @Override
    void testRow(final int i) throws IOException {
      // The scanner is created on the first row and then reused for every following row.
      if (this.testScanner == null) {
        Scan scan = new Scan(format(this.startRow));
        scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
        this.testScanner = table.getScanner(scan);
      }
      testScanner.next();
    }
  }

  /** Issues a get for the test column of row {@code i}. */
  static class SequentialReadTest extends TableTest {
    SequentialReadTest(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    void testRow(final int i) throws IOException {
      Get get = new Get(format(i));
      get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
      table.get(get);
    }
  }

  /** Writes a generated value to row {@code i}. */
  static class SequentialWriteTest extends BufferedMutatorTest {
    SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    void testRow(final int i) throws IOException {
      mutator.mutate(buildPut(format(i)));
    }
  }

  /** Runs an unbounded scan filtered on the test column being equal to a random value. */
  static class FilteredScanTest extends TableTest {
    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());

    FilteredScanTest(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    void testRow(int i) throws IOException {
      byte[] value = generateValue(this.rand);
      Scan scan = constructScan(value);
      try (ResultScanner scanner = this.table.getScanner(scan)) {
        while (scanner.next() != null) {
          // Drain the scanner; the results themselves are not needed.
        }
      }
    }

    /**
     * Builds a scan over the test column that keeps only rows whose value equals the argument.
     * @param valuePrefix the value the column is compared against with {@code EQUAL}
     * @return the filtered scan
     */
    protected Scan constructScan(byte[] valuePrefix) {
      Filter filter = new SingleColumnValueFilter(FAMILY_NAME, QUALIFIER_NAME,
        CompareOperator.EQUAL, new BinaryComparator(valuePrefix));
      Scan scan = new Scan();
      scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
      scan.setFilter(filter);
      return scan;
    }
  }

  /**
   * Format passed integer.
   * @param number the integer to format
   * @return Returns zero-prefixed decimal version of passed number, {@code
   *         DEFAULT_ROW_PREFIX_LENGTH + 10} bytes wide (Does absolute in case number is negative).
   */
  public static byte[] format(final int number) {
    byte[] b = new byte[DEFAULT_ROW_PREFIX_LENGTH + 10];
    // Widen before taking the absolute value: Math.abs(Integer.MIN_VALUE) is still negative as an
    // int, which would produce bytes that are not decimal digits.
    long d = Math.abs((long) number);
    for (int i = b.length - 1; i >= 0; i--) {
      b[i] = (byte) ((d % 10) + '0');
      d /= 10;
    }
    return b;
  }

  /**
   * Generates {@code length} bytes of upper-case ASCII letters. The data is laid out in runs of
   * eight identical letters, followed by a final run that fills whatever remains.
   * @param r      source of randomness
   * @param length number of bytes to generate
   * @return the generated bytes
   */
  public static byte[] generateData(final Random r, int length) {
    byte[] b = new byte[length];
    int i;

    for (i = 0; i < (length - 8); i += 8) {
      Arrays.fill(b, i, i + 8, (byte) (65 + r.nextInt(26)));
    }

    byte a = (byte) (65 + r.nextInt(26));
    Arrays.fill(b, i, length, a);
    return b;
  }

  /**
   * @param r source of randomness
   * @return {@code ROW_LENGTH} random bytes
   */
  public static byte[] generateValue(final Random r) {
    byte[] b = new byte[ROW_LENGTH];
    r.nextBytes(b);
    return b;
  }

  /**
   * @param random    source of randomness
   * @param totalRows number of rows to choose from
   * @return the formatted key of a random row index below {@code totalRows}
   */
  static byte[] getRandomRow(final Random random, final int totalRows) {
    return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
  }

  /**
   * Instantiates the given test class reflectively and runs it for one client.
   * @param cmd              test class to run; must declare a
   *                         {@code (Configuration, TestOptions, Status)} constructor
   * @param startRow         first row this client works on
   * @param perClientRunRows number of rows this client works on
   * @param totalRows        total number of rows over all clients
   * @param flushCommits     whether write tests flush the mutator at takedown
   * @param writeToWAL       whether puts are written to the WAL
   * @param useTags          whether puts carry tags
   * @param noOfTags         number of tags per put when tags are used
   * @param connection       connection handed to the test
   * @param status           sink for progress messages
   * @return elapsed time of the test in milliseconds
   * @throws IOException              if the test itself fails
   * @throws IllegalArgumentException if {@code cmd} lacks the required constructor
   * @throws IllegalStateException    if the test instance cannot be constructed
   */
  long runOneClient(final Class<? extends Test> cmd, final int startRow, final int perClientRunRows,
    final int totalRows, boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags,
    Connection connection, final Status status) throws IOException {
    status
      .setStatus("Start " + cmd + " at offset " + startRow + " for " + perClientRunRows + " rows");
    long totalElapsedTime;

    TestOptions options = new TestOptions(startRow, perClientRunRows, totalRows, tableName,
      flushCommits, writeToWAL, useTags, noOfTags, connection);
    final Test t;
    try {
      Constructor<? extends Test> constructor =
        cmd.getDeclaredConstructor(Configuration.class, TestOptions.class, Status.class);
      t = constructor.newInstance(this.conf, options, status);
    } catch (NoSuchMethodException e) {
      // List the declared constructors: the lookup above also considers non-public ones, which
      // getConstructors() would silently leave out.
      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
        + ". It does not provide a constructor as described by "
        + "the javadoc comment. Available constructors are: "
        + Arrays.toString(cmd.getDeclaredConstructors()), e);
    } catch (Exception e) {
      throw new IllegalStateException("Failed to construct command class", e);
    }
    totalElapsedTime = t.test();

    status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + startRow
      + " for " + perClientRunRows + " rows");
    return totalElapsedTime;
  }

  /**
   * Runs the test with a single client in this thread, after checking the table through the REST
   * admin. Failures are logged rather than propagated.
   */
  private void runNIsOne(final Class<? extends Test> cmd) {
    Status status = LOG::info;

    try {
      Client client = new Client(cluster);
      RemoteAdmin admin = new RemoteAdmin(client, getConf());
      checkTable(admin);
      runOneClient(cmd, 0, this.R, this.R, this.flushCommits, this.writeToWAL, this.useTags,
        this.noOfTags, this.connection, status);
    } catch (Exception e) {
      LOG.error("Failed", e);
    }
  }

  /** Dispatches to the single-client or the multi-client runner depending on {@code N}. */
  private void runTest(final Class<? extends Test> cmd)
    throws IOException, InterruptedException, ClassNotFoundException {
    if (N == 1) {
      // If there is only one client and one HRegionServer, we assume nothing
      // has been set up at all.
      runNIsOne(cmd);
    } else {
      // Else, run
      runNIsMoreThanOne(cmd);
    }
  }

  /** Prints the usage text to standard error. */
  protected void printUsage() {
    printUsage(null);
  }

  /**
   * Prints the usage text to standard error.
   * @param message printed before the usage text when it is neither null nor empty
   */
  protected void printUsage(final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: java " + this.getClass().getName() + " \\");
    System.err.println(" [--nomapred] [--rows=ROWS] [--table=NAME] \\");
    System.err.println(
      " [--compress=TYPE] [--blockEncoding=TYPE] " + "[-D<property=value>]* <command> <nclients>");
    System.err.println();
    System.err.println("General Options:");
    System.err.println(
      " nomapred Run multiple clients using threads " + "(rather than use mapreduce)");
    System.err.println(" rows Rows each client runs. Default: One million");
    System.err.println();
    System.err.println("Table Creation / Write Tests:");
    System.err.println(" table Alternate table name. Default: 'TestTable'");
    System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'");
    System.err.println(
      " flushCommits Used to determine if the test should flush the table. " + "Default: false");
    System.err.println(" writeToWAL Set writeToWAL on puts. Default: True");
    System.err.println(" presplit Create presplit table. Recommended for accurate perf "
      + "analysis (see guide). Default: disabled");
    System.err.println(
      " usetags Writes tags along with KVs. Use with HFile V3. " + "Default : false");
    // The option parsed by run() is --nooftags=, so advertise it under that name.
    System.err.println(" nooftags Specify the no of tags that would be needed. "
      + "This works only if usetags is true.");
    System.err.println();
    System.err.println("Read Tests:");
    System.err.println(" inmemory Tries to keep the HFiles of the CF inmemory as far as "
      + "possible. Not guaranteed that reads are always served from inmemory. Default: false");
    System.err.println();
    System.err.println(" Note: -D properties will be applied to the conf used. ");
    System.err.println(" For example: ");
    System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true");
    System.err.println(" -Dmapreduce.task.timeout=60000");
    System.err.println();
    System.err.println("Command:");
    for (CmdDescriptor command : commands.values()) {
      System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
    }
    System.err.println();
    System.err.println("Args:");
    System.err.println(
      " nclients Integer. Required. Total number of " + "clients (and HRegionServers)");
    System.err.println(" running: 1 <= value <= 500");
    System.err.println("Examples:");
    System.err.println(" To run a single evaluation client:");
    System.err.println(" $ hbase " + this.getClass().getName() + " sequentialWrite 1");
  }

  /**
   * Reads the number of clients from {@code args[start]} and scales the total row count by it.
   * @param start index of the client-count argument
   * @param args  the command-line arguments
   * @throws IllegalArgumentException if the argument is missing or smaller than one
   */
  private void getArgs(final int start, final String[] args) {
    if (start + 1 > args.length) {
      throw new IllegalArgumentException("must supply the number of clients");
    }
    N = Integer.parseInt(args[start]);
    if (N < 1) {
      throw new IllegalArgumentException("Number of clients must be >= 1");
    }
    // Set total number of rows to write.
    R = R * N;
  }

  /**
   * Parses the command line and runs the requested command.
   * @param args options, followed by the command name and the number of clients
   * @return 0 when usage was requested or the command was run, -1 otherwise
   */
  @Override
  public int run(String[] args) throws Exception {
    // Process command-line args. TODO: Better cmd-line processing
    // (but hopefully something not as painful as cli options).
    int errCode = -1;
    if (args.length < 1) {
      printUsage();
      return errCode;
    }

    // Connection opened by this invocation, if any; it is closed again before returning.
    Connection createdConnection = null;
    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        final String host = "--host=";
        // "--host=" also starts with "--h"; exclude it so the option is not mistaken for help.
        if (cmd.equals("-h") || (cmd.startsWith("--h") && !cmd.startsWith(host))) {
          printUsage();
          errCode = 0;
          break;
        }

        final String nmr = "--nomapred";
        if (cmd.startsWith(nmr)) {
          nomapred = true;
          continue;
        }

        final String rows = "--rows=";
        if (cmd.startsWith(rows)) {
          R = Integer.parseInt(cmd.substring(rows.length()));
          continue;
        }

        final String table = "--table=";
        if (cmd.startsWith(table)) {
          this.tableName = TableName.valueOf(cmd.substring(table.length()));
          continue;
        }

        final String compress = "--compress=";
        if (cmd.startsWith(compress)) {
          this.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
          continue;
        }

        final String blockEncoding = "--blockEncoding=";
        if (cmd.startsWith(blockEncoding)) {
          this.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
          continue;
        }

        final String flushCommits = "--flushCommits=";
        if (cmd.startsWith(flushCommits)) {
          this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
          continue;
        }

        final String writeToWAL = "--writeToWAL=";
        if (cmd.startsWith(writeToWAL)) {
          this.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
          continue;
        }

        final String presplit = "--presplit=";
        if (cmd.startsWith(presplit)) {
          this.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
          continue;
        }

        final String inMemory = "--inmemory=";
        if (cmd.startsWith(inMemory)) {
          this.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
          continue;
        }

        final String useTags = "--usetags=";
        if (cmd.startsWith(useTags)) {
          this.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
          continue;
        }

        final String noOfTags = "--nooftags=";
        if (cmd.startsWith(noOfTags)) {
          this.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
          continue;
        }

        if (cmd.startsWith(host)) {
          cluster.add(cmd.substring(host.length()));
          continue;
        }

        Class<? extends Test> cmdClass = determineCommandClass(cmd);
        if (cmdClass != null) {
          getArgs(i + 1, args);
          if (cluster.isEmpty()) {
            String s = conf.get("stargate.hostname", "localhost");
            if (s.contains(":")) {
              cluster.add(s);
            } else {
              cluster.add(s, conf.getInt("stargate.port", 8080));
            }
          }
          // Open exactly one connection, and only once there is a command to run with it.
          createdConnection = ConnectionFactory.createConnection(getConf());
          this.connection = createdConnection;
          runTest(cmdClass);
          errCode = 0;
          break;
        }

        printUsage();
        break;
      }
    } catch (Exception e) {
      LOG.error("Failed", e);
    } finally {
      if (createdConnection != null) {
        try {
          createdConnection.close();
        } catch (IOException e) {
          LOG.warn("Failed to close connection", e);
        }
        // Do not leave a closed connection behind in the field.
        if (this.connection == createdConnection) {
          this.connection = null;
        }
      }
    }

    return errCode;
  }

  /**
   * @param cmd command name as given on the command line
   * @return the test class registered under that name, or null when there is none
   */
  private Class<? extends Test> determineCommandClass(String cmd) {
    CmdDescriptor descriptor = commands.get(cmd);
    return descriptor != null ? descriptor.getCmdClass() : null;
  }

  /** Command-line entry point; exits the JVM with the code returned by {@code run}. */
  public static void main(final String[] args) throws Exception {
    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
    System.exit(res);
  }
}