001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.rest; 020 021import java.io.DataInput; 022import java.io.DataOutput; 023import java.io.IOException; 024import java.io.PrintStream; 025import java.lang.reflect.Constructor; 026import java.text.SimpleDateFormat; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Date; 030import java.util.List; 031import java.util.Map; 032import java.util.Random; 033import java.util.TreeMap; 034import java.util.regex.Matcher; 035import java.util.regex.Pattern; 036 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.conf.Configured; 039import org.apache.hadoop.fs.FSDataInputStream; 040import org.apache.hadoop.fs.FileStatus; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.hbase.ArrayBackedTag; 044import org.apache.hadoop.hbase.CompareOperator; 045import org.apache.hadoop.hbase.HBaseConfiguration; 046import org.apache.hadoop.hbase.HColumnDescriptor; 047import org.apache.hadoop.hbase.HConstants; 048import org.apache.hadoop.hbase.HTableDescriptor; 049import org.apache.hadoop.hbase.KeyValue; 050import org.apache.hadoop.hbase.TableName; 051import org.apache.hadoop.hbase.Tag; 052import org.apache.hadoop.hbase.client.BufferedMutator; 053import org.apache.hadoop.hbase.client.Connection; 054import org.apache.hadoop.hbase.client.ConnectionFactory; 055import org.apache.hadoop.hbase.client.Durability; 056import org.apache.hadoop.hbase.client.Get; 057import org.apache.hadoop.hbase.client.Put; 058import org.apache.hadoop.hbase.client.Result; 059import org.apache.hadoop.hbase.client.ResultScanner; 060import org.apache.hadoop.hbase.client.Scan; 061import org.apache.hadoop.hbase.client.Table; 062import org.apache.hadoop.hbase.filter.BinaryComparator; 063import org.apache.hadoop.hbase.filter.Filter; 064import org.apache.hadoop.hbase.filter.PageFilter; 065import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 066import org.apache.hadoop.hbase.filter.WhileMatchFilter; 067import org.apache.hadoop.hbase.io.compress.Compression; 068import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 069import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 070import org.apache.hadoop.hbase.rest.client.Client; 071import org.apache.hadoop.hbase.rest.client.Cluster; 072import org.apache.hadoop.hbase.rest.client.RemoteAdmin; 073import org.apache.hadoop.hbase.util.ByteArrayHashKey; 074import org.apache.hadoop.hbase.util.Bytes; 075import org.apache.hadoop.hbase.util.Hash; 076import org.apache.hadoop.hbase.util.MurmurHash; 077import org.apache.hadoop.hbase.util.Pair; 078 079import org.apache.hadoop.io.LongWritable; 080import org.apache.hadoop.io.NullWritable; 081import org.apache.hadoop.io.Text; 082import org.apache.hadoop.io.Writable; 083import org.apache.hadoop.mapreduce.InputSplit; 084import org.apache.hadoop.mapreduce.Job; 085import org.apache.hadoop.mapreduce.JobContext; 086import org.apache.hadoop.mapreduce.Mapper; 087import org.apache.hadoop.mapreduce.RecordReader; 088import org.apache.hadoop.mapreduce.TaskAttemptContext; 089import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 090import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 091import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; 092import org.apache.hadoop.util.LineReader; 093import org.apache.hadoop.util.Tool; 094import org.apache.hadoop.util.ToolRunner; 095 096import org.slf4j.Logger; 097import org.slf4j.LoggerFactory; 098 099/** 100 * Script used evaluating Stargate performance and scalability. Runs a SG 101 * client that steps through one of a set of hardcoded tests or 'experiments' 102 * (e.g. a random reads test, a random writes test, etc.). Pass on the 103 * command-line which test to run and how many clients are participating in 104 * this experiment. Run <code>java PerformanceEvaluation --help</code> to 105 * obtain usage. 106 * 107 * <p>This class sets up and runs the evaluation programs described in 108 * Section 7, <i>Performance Evaluation</i>, of the <a 109 * href="http://labs.google.com/papers/bigtable.html">Bigtable</a> 110 * paper, pages 8-10. 111 * 112 * <p>If number of clients > 1, we start up a MapReduce job. Each map task 113 * runs an individual client. Each client does about 1GB of data. 114 */ 115public class PerformanceEvaluation extends Configured implements Tool { 116 protected static final Logger LOG = 117 LoggerFactory.getLogger(PerformanceEvaluation.class); 118 119 private static final int DEFAULT_ROW_PREFIX_LENGTH = 16; 120 private static final int ROW_LENGTH = 1000; 121 private static final int TAG_LENGTH = 256; 122 private static final int ONE_GB = 1024 * 1024 * 1000; 123 private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH; 124 125 public static final TableName TABLE_NAME = TableName.valueOf("TestTable"); 126 public static final byte[] FAMILY_NAME = Bytes.toBytes("info"); 127 public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data"); 128 private TableName tableName = TABLE_NAME; 129 130 protected HTableDescriptor TABLE_DESCRIPTOR; 131 protected Map<String, CmdDescriptor> commands = new TreeMap<>(); 132 protected static Cluster cluster = new Cluster(); 133 134 volatile Configuration conf; 135 private boolean nomapred = false; 136 private int N = 1; 137 private int R = ROWS_PER_GB; 138 private Compression.Algorithm compression = Compression.Algorithm.NONE; 139 private DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; 140 private boolean flushCommits = true; 141 private boolean writeToWAL = true; 142 private boolean inMemoryCF = false; 143 private int presplitRegions = 0; 144 private boolean useTags = false; 145 private int noOfTags = 1; 146 private Connection connection; 147 148 private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); 149 150 /** 151 * Regex to parse lines in input file passed to mapreduce task. 152 */ 153 public static final Pattern LINE_PATTERN = 154 Pattern.compile("tableName=(\\w+),\\s+" + 155 "startRow=(\\d+),\\s+" + 156 "perClientRunRows=(\\d+),\\s+" + 157 "totalRows=(\\d+),\\s+" + 158 "clients=(\\d+),\\s+" + 159 "flushCommits=(\\w+),\\s+" + 160 "writeToWAL=(\\w+),\\s+" + 161 "useTags=(\\w+),\\s+" + 162 "noOfTags=(\\d+)"); 163 164 /** 165 * Enum for map metrics. Keep it out here rather than inside in the Map 166 * inner-class so we can find associated properties. 167 */ 168 protected enum Counter { 169 /** elapsed time */ 170 ELAPSED_TIME, 171 /** number of rows */ 172 ROWS 173 } 174 175 /** 176 * Constructor 177 * @param c Configuration object 178 */ 179 public PerformanceEvaluation(final Configuration c) { 180 this.conf = c; 181 182 addCommandDescriptor(RandomReadTest.class, "randomRead", 183 "Run random read test"); 184 addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan", 185 "Run random seek and scan 100 test"); 186 addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", 187 "Run random seek scan with both start and stop row (max 10 rows)"); 188 addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100", 189 "Run random seek scan with both start and stop row (max 100 rows)"); 190 addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000", 191 "Run random seek scan with both start and stop row (max 1000 rows)"); 192 addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000", 193 "Run random seek scan with both start and stop row (max 10000 rows)"); 194 addCommandDescriptor(RandomWriteTest.class, "randomWrite", 195 "Run random write test"); 196 addCommandDescriptor(SequentialReadTest.class, "sequentialRead", 197 "Run sequential read test"); 198 addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", 199 "Run sequential write test"); 200 addCommandDescriptor(ScanTest.class, "scan", 201 "Run scan test (read every row)"); 202 addCommandDescriptor(FilteredScanTest.class, "filterScan", 203 "Run scan test using a filter to find a specific row based " + 204 "on it's value (make sure to use --rows=20)"); 205 } 206 207 protected void addCommandDescriptor(Class<? extends Test> cmdClass, 208 String name, String description) { 209 CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description); 210 commands.put(name, cmdDescriptor); 211 } 212 213 /** 214 * Implementations can have their status set. 215 */ 216 interface Status { 217 /** 218 * Sets status 219 * @param msg status message 220 * @throws IOException if setting the status fails 221 */ 222 void setStatus(final String msg) throws IOException; 223 } 224 225 /** 226 * This class works as the InputSplit of Performance Evaluation 227 * MapReduce InputFormat, and the Record Value of RecordReader. 228 * Each map task will only read one record from a PeInputSplit, 229 * the record value is the PeInputSplit itself. 230 */ 231 public static class PeInputSplit extends InputSplit implements Writable { 232 private TableName tableName; 233 private int startRow; 234 private int rows; 235 private int totalRows; 236 private int clients; 237 private boolean flushCommits; 238 private boolean writeToWAL; 239 private boolean useTags; 240 private int noOfTags; 241 242 public PeInputSplit(TableName tableName, int startRow, int rows, int totalRows, int clients, 243 boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags) { 244 this.tableName = tableName; 245 this.startRow = startRow; 246 this.rows = rows; 247 this.totalRows = totalRows; 248 this.clients = clients; 249 this.flushCommits = flushCommits; 250 this.writeToWAL = writeToWAL; 251 this.useTags = useTags; 252 this.noOfTags = noOfTags; 253 } 254 255 @Override 256 public void readFields(DataInput in) throws IOException { 257 int tableNameLen = in.readInt(); 258 byte[] name = new byte[tableNameLen]; 259 in.readFully(name); 260 this.tableName = TableName.valueOf(name); 261 this.startRow = in.readInt(); 262 this.rows = in.readInt(); 263 this.totalRows = in.readInt(); 264 this.clients = in.readInt(); 265 this.flushCommits = in.readBoolean(); 266 this.writeToWAL = in.readBoolean(); 267 this.useTags = in.readBoolean(); 268 this.noOfTags = in.readInt(); 269 } 270 271 @Override 272 public void write(DataOutput out) throws IOException { 273 byte[] name = this.tableName.toBytes(); 274 out.writeInt(name.length); 275 out.write(name); 276 out.writeInt(startRow); 277 out.writeInt(rows); 278 out.writeInt(totalRows); 279 out.writeInt(clients); 280 out.writeBoolean(flushCommits); 281 out.writeBoolean(writeToWAL); 282 out.writeBoolean(useTags); 283 out.writeInt(noOfTags); 284 } 285 286 @Override 287 public long getLength() { 288 return 0; 289 } 290 291 @Override 292 public String[] getLocations() { 293 return new String[0]; 294 } 295 296 public int getStartRow() { 297 return startRow; 298 } 299 300 public TableName getTableName() { 301 return tableName; 302 } 303 304 public int getRows() { 305 return rows; 306 } 307 308 public int getTotalRows() { 309 return totalRows; 310 } 311 312 public boolean isFlushCommits() { 313 return flushCommits; 314 } 315 316 public boolean isWriteToWAL() { 317 return writeToWAL; 318 } 319 320 public boolean isUseTags() { 321 return useTags; 322 } 323 324 public int getNoOfTags() { 325 return noOfTags; 326 } 327 } 328 329 /** 330 * InputFormat of Performance Evaluation MapReduce job. 331 * It extends from FileInputFormat, want to use it's methods such as setInputPaths(). 332 */ 333 public static class PeInputFormat extends FileInputFormat<NullWritable, PeInputSplit> { 334 @Override 335 public List<InputSplit> getSplits(JobContext job) throws IOException { 336 // generate splits 337 List<InputSplit> splitList = new ArrayList<>(); 338 339 for (FileStatus file : listStatus(job)) { 340 if (file.isDirectory()) { 341 continue; 342 } 343 Path path = file.getPath(); 344 FileSystem fs = path.getFileSystem(job.getConfiguration()); 345 FSDataInputStream fileIn = fs.open(path); 346 LineReader in = new LineReader(fileIn, job.getConfiguration()); 347 int lineLen; 348 while (true) { 349 Text lineText = new Text(); 350 lineLen = in.readLine(lineText); 351 if (lineLen <= 0) { 352 break; 353 } 354 Matcher m = LINE_PATTERN.matcher(lineText.toString()); 355 if ((m != null) && m.matches()) { 356 TableName tableName = TableName.valueOf(m.group(1)); 357 int startRow = Integer.parseInt(m.group(2)); 358 int rows = Integer.parseInt(m.group(3)); 359 int totalRows = Integer.parseInt(m.group(4)); 360 int clients = Integer.parseInt(m.group(5)); 361 boolean flushCommits = Boolean.parseBoolean(m.group(6)); 362 boolean writeToWAL = Boolean.parseBoolean(m.group(7)); 363 boolean useTags = Boolean.parseBoolean(m.group(8)); 364 int noOfTags = Integer.parseInt(m.group(9)); 365 366 LOG.debug("tableName=" + tableName + 367 " split["+ splitList.size() + "] " + 368 " startRow=" + startRow + 369 " rows=" + rows + 370 " totalRows=" + totalRows + 371 " clients=" + clients + 372 " flushCommits=" + flushCommits + 373 " writeToWAL=" + writeToWAL + 374 " useTags=" + useTags + 375 " noOfTags=" + noOfTags); 376 377 PeInputSplit newSplit = 378 new PeInputSplit(tableName, startRow, rows, totalRows, clients, 379 flushCommits, writeToWAL, useTags, noOfTags); 380 splitList.add(newSplit); 381 } 382 } 383 in.close(); 384 } 385 386 LOG.info("Total # of splits: " + splitList.size()); 387 return splitList; 388 } 389 390 @Override 391 public RecordReader<NullWritable, PeInputSplit> createRecordReader(InputSplit split, 392 TaskAttemptContext context) { 393 return new PeRecordReader(); 394 } 395 396 public static class PeRecordReader extends RecordReader<NullWritable, PeInputSplit> { 397 private boolean readOver = false; 398 private PeInputSplit split = null; 399 private NullWritable key = null; 400 private PeInputSplit value = null; 401 402 @Override 403 public void initialize(InputSplit split, TaskAttemptContext context) { 404 this.readOver = false; 405 this.split = (PeInputSplit)split; 406 } 407 408 @Override 409 public boolean nextKeyValue() { 410 if (readOver) { 411 return false; 412 } 413 414 key = NullWritable.get(); 415 value = split; 416 417 readOver = true; 418 return true; 419 } 420 421 @Override 422 public NullWritable getCurrentKey() { 423 return key; 424 } 425 426 @Override 427 public PeInputSplit getCurrentValue() { 428 return value; 429 } 430 431 @Override 432 public float getProgress() { 433 if (readOver) { 434 return 1.0f; 435 } else { 436 return 0.0f; 437 } 438 } 439 440 @Override 441 public void close() { 442 // do nothing 443 } 444 } 445 } 446 447 /** 448 * MapReduce job that runs a performance evaluation client in each map task. 449 */ 450 public static class EvaluationMapTask 451 extends Mapper<NullWritable, PeInputSplit, LongWritable, LongWritable> { 452 453 /** configuration parameter name that contains the command */ 454 public final static String CMD_KEY = "EvaluationMapTask.command"; 455 /** configuration parameter name that contains the PE impl */ 456 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl"; 457 458 private Class<? extends Test> cmd; 459 private PerformanceEvaluation pe; 460 461 @Override 462 protected void setup(Context context) { 463 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class); 464 465 // this is required so that extensions of PE are instantiated within the 466 // map reduce task... 467 Class<? extends PerformanceEvaluation> peClass = 468 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class); 469 try { 470 this.pe = peClass.getConstructor(Configuration.class) 471 .newInstance(context.getConfiguration()); 472 } catch (Exception e) { 473 throw new IllegalStateException("Could not instantiate PE instance", e); 474 } 475 } 476 477 private <Type> Class<? extends Type> forName(String className, Class<Type> type) { 478 Class<? extends Type> clazz; 479 try { 480 clazz = Class.forName(className).asSubclass(type); 481 } catch (ClassNotFoundException e) { 482 throw new IllegalStateException("Could not find class for name: " + className, e); 483 } 484 return clazz; 485 } 486 487 @Override 488 protected void map(NullWritable key, PeInputSplit value, final Context context) 489 throws IOException, InterruptedException { 490 Status status = context::setStatus; 491 492 // Evaluation task 493 pe.tableName = value.getTableName(); 494 long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(), 495 value.getRows(), value.getTotalRows(), 496 value.isFlushCommits(), value.isWriteToWAL(), 497 value.isUseTags(), value.getNoOfTags(), 498 ConnectionFactory.createConnection(context.getConfiguration()), status); 499 // Collect how much time the thing took. Report as map output and 500 // to the ELAPSED_TIME counter. 501 context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime); 502 context.getCounter(Counter.ROWS).increment(value.rows); 503 context.write(new LongWritable(value.startRow), new LongWritable(elapsedTime)); 504 context.progress(); 505 } 506 } 507 508 /** 509 * If table does not already exist, create. 510 * @param admin Client to use checking. 511 * @return True if we created the table. 512 * @throws IOException if an operation on the table fails 513 */ 514 private boolean checkTable(RemoteAdmin admin) throws IOException { 515 HTableDescriptor tableDescriptor = getTableDescriptor(); 516 if (this.presplitRegions > 0) { 517 // presplit requested 518 if (admin.isTableAvailable(tableDescriptor.getTableName().getName())) { 519 admin.deleteTable(tableDescriptor.getTableName().getName()); 520 } 521 522 byte[][] splits = getSplits(); 523 for (int i=0; i < splits.length; i++) { 524 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); 525 } 526 admin.createTable(tableDescriptor); 527 LOG.info("Table created with " + this.presplitRegions + " splits"); 528 } else { 529 boolean tableExists = admin.isTableAvailable(tableDescriptor.getTableName().getName()); 530 if (!tableExists) { 531 admin.createTable(tableDescriptor); 532 LOG.info("Table " + tableDescriptor + " created"); 533 } 534 } 535 536 return admin.isTableAvailable(tableDescriptor.getTableName().getName()); 537 } 538 539 protected HTableDescriptor getTableDescriptor() { 540 if (TABLE_DESCRIPTOR == null) { 541 TABLE_DESCRIPTOR = new HTableDescriptor(tableName); 542 HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME); 543 family.setDataBlockEncoding(blockEncoding); 544 family.setCompressionType(compression); 545 if (inMemoryCF) { 546 family.setInMemory(true); 547 } 548 TABLE_DESCRIPTOR.addFamily(family); 549 } 550 return TABLE_DESCRIPTOR; 551 } 552 553 /** 554 * Generates splits based on total number of rows and specified split regions 555 * 556 * @return splits : array of byte [] 557 */ 558 protected byte[][] getSplits() { 559 if (this.presplitRegions == 0) { 560 return new byte[0][]; 561 } 562 563 int numSplitPoints = presplitRegions - 1; 564 byte[][] splits = new byte[numSplitPoints][]; 565 int jump = this.R / this.presplitRegions; 566 for (int i = 0; i < numSplitPoints; i++) { 567 int rowkey = jump * (1 + i); 568 splits[i] = format(rowkey); 569 } 570 return splits; 571 } 572 573 /** 574 * We're to run multiple clients concurrently. Setup a mapreduce job. Run 575 * one map per client. Then run a single reduce to sum the elapsed times. 576 * @param cmd Command to run. 577 */ 578 private void runNIsMoreThanOne(final Class<? extends Test> cmd) 579 throws IOException, InterruptedException, ClassNotFoundException { 580 RemoteAdmin remoteAdmin = new RemoteAdmin(new Client(cluster), getConf()); 581 checkTable(remoteAdmin); 582 if (nomapred) { 583 doMultipleClients(cmd); 584 } else { 585 doMapReduce(cmd); 586 } 587 } 588 589 /** 590 * Run all clients in this vm each to its own thread. 591 * @param cmd Command to run 592 * @throws IOException if creating a connection fails 593 */ 594 private void doMultipleClients(final Class<? extends Test> cmd) throws IOException { 595 final List<Thread> threads = new ArrayList<>(this.N); 596 final long[] timings = new long[this.N]; 597 final int perClientRows = R/N; 598 final TableName tableName = this.tableName; 599 final DataBlockEncoding encoding = this.blockEncoding; 600 final boolean flushCommits = this.flushCommits; 601 final Compression.Algorithm compression = this.compression; 602 final boolean writeToWal = this.writeToWAL; 603 final int preSplitRegions = this.presplitRegions; 604 final boolean useTags = this.useTags; 605 final int numTags = this.noOfTags; 606 final Connection connection = ConnectionFactory.createConnection(getConf()); 607 for (int i = 0; i < this.N; i++) { 608 final int index = i; 609 Thread t = new Thread("TestClient-" + i) { 610 @Override 611 public void run() { 612 super.run(); 613 PerformanceEvaluation pe = new PerformanceEvaluation(getConf()); 614 pe.tableName = tableName; 615 pe.blockEncoding = encoding; 616 pe.flushCommits = flushCommits; 617 pe.compression = compression; 618 pe.writeToWAL = writeToWal; 619 pe.presplitRegions = preSplitRegions; 620 pe.N = N; 621 pe.connection = connection; 622 pe.useTags = useTags; 623 pe.noOfTags = numTags; 624 try { 625 long elapsedTime = pe.runOneClient(cmd, index * perClientRows, 626 perClientRows, R, 627 flushCommits, writeToWAL, useTags, noOfTags, connection, 628 msg -> LOG.info("client-" + getName() + " " + msg)); 629 timings[index] = elapsedTime; 630 LOG.info("Finished " + getName() + " in " + elapsedTime + 631 "ms writing " + perClientRows + " rows"); 632 } catch (IOException e) { 633 throw new RuntimeException(e); 634 } 635 } 636 }; 637 threads.add(t); 638 } 639 for (Thread t : threads) { 640 t.start(); 641 } 642 for (Thread t : threads) { 643 while (t.isAlive()) { 644 try { 645 t.join(); 646 } catch (InterruptedException e) { 647 LOG.debug("Interrupted, continuing" + e.toString()); 648 } 649 } 650 } 651 final String test = cmd.getSimpleName(); 652 LOG.info("[" + test + "] Summary of timings (ms): " 653 + Arrays.toString(timings)); 654 Arrays.sort(timings); 655 long total = 0; 656 for (int i = 0; i < this.N; i++) { 657 total += timings[i]; 658 } 659 LOG.info("[" + test + "]" 660 + "\tMin: " + timings[0] + "ms" 661 + "\tMax: " + timings[this.N - 1] + "ms" 662 + "\tAvg: " + (total / this.N) + "ms"); 663 } 664 665 /** 666 * Run a mapreduce job. Run as many maps as asked-for clients. 667 * Before we start up the job, write out an input file with instruction 668 * per client regards which row they are to start on. 669 * @param cmd Command to run. 670 */ 671 private void doMapReduce(final Class<? extends Test> cmd) 672 throws IOException, InterruptedException, ClassNotFoundException { 673 Configuration conf = getConf(); 674 Path inputDir = writeInputFile(conf); 675 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); 676 conf.set(EvaluationMapTask.PE_KEY, getClass().getName()); 677 Job job = Job.getInstance(conf); 678 job.setJarByClass(PerformanceEvaluation.class); 679 job.setJobName("HBase Performance Evaluation"); 680 681 job.setInputFormatClass(PeInputFormat.class); 682 PeInputFormat.setInputPaths(job, inputDir); 683 684 job.setOutputKeyClass(LongWritable.class); 685 job.setOutputValueClass(LongWritable.class); 686 687 job.setMapperClass(EvaluationMapTask.class); 688 job.setReducerClass(LongSumReducer.class); 689 job.setNumReduceTasks(1); 690 691 job.setOutputFormatClass(TextOutputFormat.class); 692 TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs")); 693 TableMapReduceUtil.addDependencyJars(job); 694 TableMapReduceUtil.initCredentials(job); 695 job.waitForCompletion(true); 696 } 697 698 /** 699 * Write input file of offsets-per-client for the mapreduce job. 700 * @param c Configuration 701 * @return Directory that contains file written. 702 * @throws IOException if creating the directory or the file fails 703 */ 704 private Path writeInputFile(final Configuration c) throws IOException { 705 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); 706 Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date())); 707 Path inputDir = new Path(jobdir, "inputs"); 708 709 FileSystem fs = FileSystem.get(c); 710 fs.mkdirs(inputDir); 711 Path inputFile = new Path(inputDir, "input.txt"); 712 // Make input random. 713 try (PrintStream out = new PrintStream(fs.create(inputFile))) { 714 Map<Integer, String> m = new TreeMap<>(); 715 Hash h = MurmurHash.getInstance(); 716 int perClientRows = (this.R / this.N); 717 for (int i = 0; i < 10; i++) { 718 for (int j = 0; j < N; j++) { 719 StringBuilder s = new StringBuilder(); 720 s.append("tableName=").append(tableName); 721 s.append(", startRow=").append((j * perClientRows) + (i * (perClientRows / 10))); 722 s.append(", perClientRunRows=").append(perClientRows / 10); 723 s.append(", totalRows=").append(R); 724 s.append(", clients=").append(N); 725 s.append(", flushCommits=").append(flushCommits); 726 s.append(", writeToWAL=").append(writeToWAL); 727 s.append(", useTags=").append(useTags); 728 s.append(", noOfTags=").append(noOfTags); 729 730 byte[] b = Bytes.toBytes(s.toString()); 731 int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1); 732 m.put(hash, s.toString()); 733 } 734 } 735 for (Map.Entry<Integer, String> e : m.entrySet()) { 736 out.println(e.getValue()); 737 } 738 } 739 return inputDir; 740 } 741 742 /** 743 * Describes a command. 744 */ 745 static class CmdDescriptor { 746 private Class<? extends Test> cmdClass; 747 private String name; 748 private String description; 749 750 CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) { 751 this.cmdClass = cmdClass; 752 this.name = name; 753 this.description = description; 754 } 755 756 public Class<? extends Test> getCmdClass() { 757 return cmdClass; 758 } 759 760 public String getName() { 761 return name; 762 } 763 764 public String getDescription() { 765 return description; 766 } 767 } 768 769 /** 770 * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation} tests 771 * This makes the reflection logic a little easier to understand... 772 */ 773 static class TestOptions { 774 private int startRow; 775 private int perClientRunRows; 776 private int totalRows; 777 private TableName tableName; 778 private boolean flushCommits; 779 private boolean writeToWAL; 780 private boolean useTags; 781 private int noOfTags; 782 private Connection connection; 783 784 TestOptions(int startRow, int perClientRunRows, int totalRows, TableName tableName, 785 boolean flushCommits, boolean writeToWAL, boolean useTags, 786 int noOfTags, Connection connection) { 787 this.startRow = startRow; 788 this.perClientRunRows = perClientRunRows; 789 this.totalRows = totalRows; 790 this.tableName = tableName; 791 this.flushCommits = flushCommits; 792 this.writeToWAL = writeToWAL; 793 this.useTags = useTags; 794 this.noOfTags = noOfTags; 795 this.connection = connection; 796 } 797 798 public int getStartRow() { 799 return startRow; 800 } 801 802 public int getPerClientRunRows() { 803 return perClientRunRows; 804 } 805 806 public int getTotalRows() { 807 return totalRows; 808 } 809 810 public TableName getTableName() { 811 return tableName; 812 } 813 814 public boolean isFlushCommits() { 815 return flushCommits; 816 } 817 818 public boolean isWriteToWAL() { 819 return writeToWAL; 820 } 821 822 public Connection getConnection() { 823 return connection; 824 } 825 826 public boolean isUseTags() { 827 return this.useTags; 828 } 829 830 public int getNumTags() { 831 return this.noOfTags; 832 } 833 } 834 835 /* 836 * A test. 837 * Subclass to particularize what happens per row. 838 */ 839 static abstract class Test { 840 // Below is make it so when Tests are all running in the one 841 // jvm, that they each have a differently seeded Random. 842 private static final Random randomSeed = 843 new Random(System.currentTimeMillis()); 844 private static long nextRandomSeed() { 845 return randomSeed.nextLong(); 846 } 847 protected final Random rand = new Random(nextRandomSeed()); 848 849 protected final int startRow; 850 protected final int perClientRunRows; 851 protected final int totalRows; 852 private final Status status; 853 protected TableName tableName; 854 protected volatile Configuration conf; 855 protected boolean writeToWAL; 856 protected boolean useTags; 857 protected int noOfTags; 858 protected Connection connection; 859 860 /** 861 * Note that all subclasses of this class must provide a public contructor 862 * that has the exact same list of arguments. 863 */ 864 Test(final Configuration conf, final TestOptions options, final Status status) { 865 super(); 866 this.startRow = options.getStartRow(); 867 this.perClientRunRows = options.getPerClientRunRows(); 868 this.totalRows = options.getTotalRows(); 869 this.status = status; 870 this.tableName = options.getTableName(); 871 this.conf = conf; 872 this.writeToWAL = options.isWriteToWAL(); 873 this.useTags = options.isUseTags(); 874 this.noOfTags = options.getNumTags(); 875 this.connection = options.getConnection(); 876 } 877 878 protected String generateStatus(final int sr, final int i, final int lr) { 879 return sr + "/" + i + "/" + lr; 880 } 881 882 protected int getReportingPeriod() { 883 int period = this.perClientRunRows / 10; 884 return period == 0? this.perClientRunRows: period; 885 } 886 887 abstract void testTakedown() throws IOException; 888 889 /** 890 * Run test 891 * @return Elapsed time. 892 * @throws IOException if something in the test fails 893 */ 894 long test() throws IOException { 895 testSetup(); 896 LOG.info("Timed test starting in thread " + Thread.currentThread().getName()); 897 final long startTime = System.nanoTime(); 898 try { 899 testTimed(); 900 } finally { 901 testTakedown(); 902 } 903 return (System.nanoTime() - startTime) / 1000000; 904 } 905 906 abstract void testSetup() throws IOException; 907 908 /** 909 * Provides an extension point for tests that don't want a per row invocation. 910 */ 911 void testTimed() throws IOException { 912 int lastRow = this.startRow + this.perClientRunRows; 913 // Report on completion of 1/10th of total. 914 for (int i = this.startRow; i < lastRow; i++) { 915 testRow(i); 916 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { 917 status.setStatus(generateStatus(this.startRow, i, lastRow)); 918 } 919 } 920 } 921 922 /** 923 * Test for individual row. 924 * @param i Row index. 925 */ 926 abstract void testRow(final int i) throws IOException; 927 } 928 929 static abstract class TableTest extends Test { 930 protected Table table; 931 932 public TableTest(Configuration conf, TestOptions options, Status status) { 933 super(conf, options, status); 934 } 935 936 @Override 937 void testSetup() throws IOException { 938 this.table = connection.getTable(tableName); 939 } 940 941 @Override 942 void testTakedown() throws IOException { 943 table.close(); 944 } 945 } 946 947 static abstract class BufferedMutatorTest extends Test { 948 protected BufferedMutator mutator; 949 protected boolean flushCommits; 950 951 public BufferedMutatorTest(Configuration conf, TestOptions options, Status status) { 952 super(conf, options, status); 953 this.flushCommits = options.isFlushCommits(); 954 } 955 956 @Override 957 void testSetup() throws IOException { 958 this.mutator = connection.getBufferedMutator(tableName); 959 } 960 961 @Override 962 void testTakedown() throws IOException { 963 if (flushCommits) { 964 this.mutator.flush(); 965 } 966 mutator.close(); 967 } 968 } 969 970 static class RandomSeekScanTest extends TableTest { 971 RandomSeekScanTest(Configuration conf, TestOptions options, Status status) { 972 super(conf, options, status); 973 } 974 975 @Override 976 void testRow(final int i) throws IOException { 977 Scan scan = new Scan(getRandomRow(this.rand, this.totalRows)); 978 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 979 scan.setFilter(new WhileMatchFilter(new PageFilter(120))); 980 ResultScanner s = this.table.getScanner(scan); 981 s.close(); 982 } 983 984 @Override 985 protected int getReportingPeriod() { 986 int period = this.perClientRunRows / 100; 987 return period == 0? this.perClientRunRows: period; 988 } 989 } 990 991 @SuppressWarnings("unused") 992 static abstract class RandomScanWithRangeTest extends TableTest { 993 RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) { 994 super(conf, options, status); 995 } 996 997 @Override 998 void testRow(final int i) throws IOException { 999 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); 1000 Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond()); 1001 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1002 ResultScanner s = this.table.getScanner(scan); 1003 int count = 0; 1004 for (Result rr = null; (rr = s.next()) != null;) { 1005 count++; 1006 } 1007 1008 if (i % 100 == 0) { 1009 LOG.info(String.format("Scan for key range %s - %s returned %s rows", 1010 Bytes.toString(startAndStopRow.getFirst()), 1011 Bytes.toString(startAndStopRow.getSecond()), count)); 1012 } 1013 1014 s.close(); 1015 } 1016 1017 protected abstract Pair<byte[], byte[]> getStartAndStopRow(); 1018 1019 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) { 1020 int start = this.rand.nextInt(Integer.MAX_VALUE) % totalRows; 1021 int stop = start + maxRange; 1022 return new Pair<>(format(start), format(stop)); 1023 } 1024 1025 @Override 1026 protected int getReportingPeriod() { 1027 int period = this.perClientRunRows / 100; 1028 return period == 0? this.perClientRunRows: period; 1029 } 1030 } 1031 1032 static class RandomScanWithRange10Test extends RandomScanWithRangeTest { 1033 RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status) { 1034 super(conf, options, status); 1035 } 1036 1037 @Override 1038 protected Pair<byte[], byte[]> getStartAndStopRow() { 1039 return generateStartAndStopRows(10); 1040 } 1041 } 1042 1043 static class RandomScanWithRange100Test extends RandomScanWithRangeTest { 1044 RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status) { 1045 super(conf, options, status); 1046 } 1047 1048 @Override 1049 protected Pair<byte[], byte[]> getStartAndStopRow() { 1050 return generateStartAndStopRows(100); 1051 } 1052 } 1053 1054 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest { 1055 RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status) { 1056 super(conf, options, status); 1057 } 1058 1059 @Override 1060 protected Pair<byte[], byte[]> getStartAndStopRow() { 1061 return generateStartAndStopRows(1000); 1062 } 1063 } 1064 1065 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest { 1066 RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status) { 1067 super(conf, options, status); 1068 } 1069 1070 @Override 1071 protected Pair<byte[], byte[]> getStartAndStopRow() { 1072 return generateStartAndStopRows(10000); 1073 } 1074 } 1075 1076 static class RandomReadTest extends TableTest { 1077 RandomReadTest(Configuration conf, TestOptions options, Status status) { 1078 super(conf, options, status); 1079 } 1080 1081 @Override 1082 void testRow(final int i) throws IOException { 1083 Get get = new Get(getRandomRow(this.rand, this.totalRows)); 1084 get.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1085 this.table.get(get); 1086 } 1087 1088 @Override 1089 protected int getReportingPeriod() { 1090 int period = this.perClientRunRows / 100; 1091 return period == 0? this.perClientRunRows: period; 1092 } 1093 } 1094 1095 static class RandomWriteTest extends BufferedMutatorTest { 1096 RandomWriteTest(Configuration conf, TestOptions options, Status status) { 1097 super(conf, options, status); 1098 } 1099 1100 @Override 1101 void testRow(final int i) throws IOException { 1102 byte[] row = getRandomRow(this.rand, this.totalRows); 1103 Put put = new Put(row); 1104 byte[] value = generateData(this.rand, ROW_LENGTH); 1105 if (useTags) { 1106 byte[] tag = generateData(this.rand, TAG_LENGTH); 1107 Tag[] tags = new Tag[noOfTags]; 1108 for (int n = 0; n < noOfTags; n++) { 1109 Tag t = new ArrayBackedTag((byte) n, tag); 1110 tags[n] = t; 1111 } 1112 KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP, 1113 value, tags); 1114 put.add(kv); 1115 } else { 1116 put.addColumn(FAMILY_NAME, QUALIFIER_NAME, value); 1117 } 1118 put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1119 mutator.mutate(put); 1120 } 1121 } 1122 1123 static class ScanTest extends TableTest { 1124 private ResultScanner testScanner; 1125 1126 ScanTest(Configuration conf, TestOptions options, Status status) { 1127 super(conf, options, status); 1128 } 1129 1130 @Override 1131 void testTakedown() throws IOException { 1132 if (this.testScanner != null) { 1133 this.testScanner.close(); 1134 } 1135 super.testTakedown(); 1136 } 1137 1138 @Override 1139 void testRow(final int i) throws IOException { 1140 if (this.testScanner == null) { 1141 Scan scan = new Scan(format(this.startRow)); 1142 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1143 this.testScanner = table.getScanner(scan); 1144 } 1145 testScanner.next(); 1146 } 1147 } 1148 1149 static class SequentialReadTest extends TableTest { 1150 SequentialReadTest(Configuration conf, TestOptions options, Status status) { 1151 super(conf, options, status); 1152 } 1153 1154 @Override 1155 void testRow(final int i) throws IOException { 1156 Get get = new Get(format(i)); 1157 get.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1158 table.get(get); 1159 } 1160 } 1161 1162 static class SequentialWriteTest extends BufferedMutatorTest { 1163 SequentialWriteTest(Configuration conf, TestOptions options, Status status) { 1164 super(conf, options, status); 1165 } 1166 1167 @Override 1168 void testRow(final int i) throws IOException { 1169 byte[] row = format(i); 1170 Put put = new Put(row); 1171 byte[] value = generateData(this.rand, ROW_LENGTH); 1172 if (useTags) { 1173 byte[] tag = generateData(this.rand, TAG_LENGTH); 1174 Tag[] tags = new Tag[noOfTags]; 1175 for (int n = 0; n < noOfTags; n++) { 1176 Tag t = new ArrayBackedTag((byte) n, tag); 1177 tags[n] = t; 1178 } 1179 KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP, 1180 value, tags); 1181 put.add(kv); 1182 } else { 1183 put.addColumn(FAMILY_NAME, QUALIFIER_NAME, value); 1184 } 1185 put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1186 mutator.mutate(put); 1187 } 1188 } 1189 1190 static class FilteredScanTest extends TableTest { 1191 protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName()); 1192 1193 FilteredScanTest(Configuration conf, TestOptions options, Status status) { 1194 super(conf, options, status); 1195 } 1196 1197 @Override 1198 void testRow(int i) throws IOException { 1199 byte[] value = generateValue(this.rand); 1200 Scan scan = constructScan(value); 1201 try (ResultScanner scanner = this.table.getScanner(scan)) { 1202 while (scanner.next() != null) { 1203 } 1204 } 1205 } 1206 1207 protected Scan constructScan(byte[] valuePrefix) { 1208 Filter filter = new SingleColumnValueFilter( 1209 FAMILY_NAME, QUALIFIER_NAME, CompareOperator.EQUAL, 1210 new BinaryComparator(valuePrefix) 1211 ); 1212 Scan scan = new Scan(); 1213 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1214 scan.setFilter(filter); 1215 return scan; 1216 } 1217 } 1218 1219 /** 1220 * Format passed integer. 1221 * @param number the integer to format 1222 * @return Returns zero-prefixed 10-byte wide decimal version of passed number (Does absolute in 1223 * case number is negative). 1224 */ 1225 public static byte [] format(final int number) { 1226 byte[] b = new byte[DEFAULT_ROW_PREFIX_LENGTH + 10]; 1227 int d = Math.abs(number); 1228 for (int i = b.length - 1; i >= 0; i--) { 1229 b[i] = (byte)((d % 10) + '0'); 1230 d /= 10; 1231 } 1232 return b; 1233 } 1234 1235 public static byte[] generateData(final Random r, int length) { 1236 byte[] b = new byte [length]; 1237 int i; 1238 1239 for (i = 0; i < (length-8); i += 8) { 1240 b[i] = (byte) (65 + r.nextInt(26)); 1241 b[i+1] = b[i]; 1242 b[i+2] = b[i]; 1243 b[i+3] = b[i]; 1244 b[i+4] = b[i]; 1245 b[i+5] = b[i]; 1246 b[i+6] = b[i]; 1247 b[i+7] = b[i]; 1248 } 1249 1250 byte a = (byte) (65 + r.nextInt(26)); 1251 for (; i < length; i++) { 1252 b[i] = a; 1253 } 1254 return b; 1255 } 1256 1257 public static byte[] generateValue(final Random r) { 1258 byte [] b = new byte [ROW_LENGTH]; 1259 r.nextBytes(b); 1260 return b; 1261 } 1262 1263 static byte[] getRandomRow(final Random random, final int totalRows) { 1264 return format(random.nextInt(Integer.MAX_VALUE) % totalRows); 1265 } 1266 1267 long runOneClient(final Class<? extends Test> cmd, final int startRow, 1268 final int perClientRunRows, final int totalRows, 1269 boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags, 1270 Connection connection, final Status status) throws IOException { 1271 status.setStatus("Start " + cmd + " at offset " + startRow + " for " + 1272 perClientRunRows + " rows"); 1273 long totalElapsedTime; 1274 1275 TestOptions options = new TestOptions(startRow, perClientRunRows, 1276 totalRows, tableName, flushCommits, writeToWAL, useTags, noOfTags, connection); 1277 final Test t; 1278 try { 1279 Constructor<? extends Test> constructor = cmd.getDeclaredConstructor( 1280 Configuration.class, TestOptions.class, Status.class); 1281 t = constructor.newInstance(this.conf, options, status); 1282 } catch (NoSuchMethodException e) { 1283 throw new IllegalArgumentException("Invalid command class: " + 1284 cmd.getName() + ". It does not provide a constructor as described by" + 1285 "the javadoc comment. Available constructors are: " + 1286 Arrays.toString(cmd.getConstructors())); 1287 } catch (Exception e) { 1288 throw new IllegalStateException("Failed to construct command class", e); 1289 } 1290 totalElapsedTime = t.test(); 1291 1292 status.setStatus("Finished " + cmd + " in " + totalElapsedTime + 1293 "ms at offset " + startRow + " for " + perClientRunRows + " rows"); 1294 return totalElapsedTime; 1295 } 1296 1297 private void runNIsOne(final Class<? extends Test> cmd) { 1298 Status status = LOG::info; 1299 1300 RemoteAdmin admin; 1301 try { 1302 Client client = new Client(cluster); 1303 admin = new RemoteAdmin(client, getConf()); 1304 checkTable(admin); 1305 runOneClient(cmd, 0, this.R, this.R, this.flushCommits, this.writeToWAL, 1306 this.useTags, this.noOfTags, this.connection, status); 1307 } catch (Exception e) { 1308 LOG.error("Failed", e); 1309 } 1310 } 1311 1312 private void runTest(final Class<? extends Test> cmd) 1313 throws IOException, InterruptedException, ClassNotFoundException { 1314 if (N == 1) { 1315 // If there is only one client and one HRegionServer, we assume nothing 1316 // has been set up at all. 1317 runNIsOne(cmd); 1318 } else { 1319 // Else, run 1320 runNIsMoreThanOne(cmd); 1321 } 1322 } 1323 1324 protected void printUsage() { 1325 printUsage(null); 1326 } 1327 1328 protected void printUsage(final String message) { 1329 if (message != null && message.length() > 0) { 1330 System.err.println(message); 1331 } 1332 System.err.println("Usage: java " + this.getClass().getName() + " \\"); 1333 System.err.println(" [--nomapred] [--rows=ROWS] [--table=NAME] \\"); 1334 System.err.println(" [--compress=TYPE] [--blockEncoding=TYPE] " + 1335 "[-D<property=value>]* <command> <nclients>"); 1336 System.err.println(); 1337 System.err.println("General Options:"); 1338 System.err.println(" nomapred Run multiple clients using threads " + 1339 "(rather than use mapreduce)"); 1340 System.err.println(" rows Rows each client runs. Default: One million"); 1341 System.err.println(); 1342 System.err.println("Table Creation / Write Tests:"); 1343 System.err.println(" table Alternate table name. Default: 'TestTable'"); 1344 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); 1345 System.err.println(" flushCommits Used to determine if the test should flush the table. " + 1346 "Default: false"); 1347 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); 1348 System.err.println(" presplit Create presplit table. Recommended for accurate perf " + 1349 "analysis (see guide). Default: disabled"); 1350 System.err.println(" usetags Writes tags along with KVs. Use with HFile V3. " + 1351 "Default : false"); 1352 System.err.println(" numoftags Specify the no of tags that would be needed. " + 1353 "This works only if usetags is true."); 1354 System.err.println(); 1355 System.err.println("Read Tests:"); 1356 System.err.println(" inmemory Tries to keep the HFiles of the CF inmemory as far as " + 1357 "possible. Not guaranteed that reads are always served from inmemory. Default: false"); 1358 System.err.println(); 1359 System.err.println(" Note: -D properties will be applied to the conf used. "); 1360 System.err.println(" For example: "); 1361 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true"); 1362 System.err.println(" -Dmapreduce.task.timeout=60000"); 1363 System.err.println(); 1364 System.err.println("Command:"); 1365 for (CmdDescriptor command : commands.values()) { 1366 System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription())); 1367 } 1368 System.err.println(); 1369 System.err.println("Args:"); 1370 System.err.println(" nclients Integer. Required. Total number of " + 1371 "clients (and HRegionServers)"); 1372 System.err.println(" running: 1 <= value <= 500"); 1373 System.err.println("Examples:"); 1374 System.err.println(" To run a single evaluation client:"); 1375 System.err.println(" $ hbase " + this.getClass().getName() 1376 + " sequentialWrite 1"); 1377 } 1378 1379 private void getArgs(final int start, final String[] args) { 1380 if (start + 1 > args.length) { 1381 throw new IllegalArgumentException("must supply the number of clients"); 1382 } 1383 N = Integer.parseInt(args[start]); 1384 if (N < 1) { 1385 throw new IllegalArgumentException("Number of clients must be > 1"); 1386 } 1387 // Set total number of rows to write. 1388 R = R * N; 1389 } 1390 1391 @Override 1392 public int run(String[] args) throws Exception { 1393 // Process command-line args. TODO: Better cmd-line processing 1394 // (but hopefully something not as painful as cli options). 1395 int errCode = -1; 1396 if (args.length < 1) { 1397 printUsage(); 1398 return errCode; 1399 } 1400 1401 try { 1402 for (int i = 0; i < args.length; i++) { 1403 String cmd = args[i]; 1404 if (cmd.equals("-h") || cmd.startsWith("--h")) { 1405 printUsage(); 1406 errCode = 0; 1407 break; 1408 } 1409 1410 final String nmr = "--nomapred"; 1411 if (cmd.startsWith(nmr)) { 1412 nomapred = true; 1413 continue; 1414 } 1415 1416 final String rows = "--rows="; 1417 if (cmd.startsWith(rows)) { 1418 R = Integer.parseInt(cmd.substring(rows.length())); 1419 continue; 1420 } 1421 1422 final String table = "--table="; 1423 if (cmd.startsWith(table)) { 1424 this.tableName = TableName.valueOf(cmd.substring(table.length())); 1425 continue; 1426 } 1427 1428 final String compress = "--compress="; 1429 if (cmd.startsWith(compress)) { 1430 this.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); 1431 continue; 1432 } 1433 1434 final String blockEncoding = "--blockEncoding="; 1435 if (cmd.startsWith(blockEncoding)) { 1436 this.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); 1437 continue; 1438 } 1439 1440 final String flushCommits = "--flushCommits="; 1441 if (cmd.startsWith(flushCommits)) { 1442 this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); 1443 continue; 1444 } 1445 1446 final String writeToWAL = "--writeToWAL="; 1447 if (cmd.startsWith(writeToWAL)) { 1448 this.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); 1449 continue; 1450 } 1451 1452 final String presplit = "--presplit="; 1453 if (cmd.startsWith(presplit)) { 1454 this.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); 1455 continue; 1456 } 1457 1458 final String inMemory = "--inmemory="; 1459 if (cmd.startsWith(inMemory)) { 1460 this.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); 1461 continue; 1462 } 1463 1464 this.connection = ConnectionFactory.createConnection(getConf()); 1465 1466 final String useTags = "--usetags="; 1467 if (cmd.startsWith(useTags)) { 1468 this.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); 1469 continue; 1470 } 1471 1472 final String noOfTags = "--nooftags="; 1473 if (cmd.startsWith(noOfTags)) { 1474 this.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); 1475 continue; 1476 } 1477 1478 final String host = "--host="; 1479 if (cmd.startsWith(host)) { 1480 cluster.add(cmd.substring(host.length())); 1481 continue; 1482 } 1483 1484 Class<? extends Test> cmdClass = determineCommandClass(cmd); 1485 if (cmdClass != null) { 1486 getArgs(i + 1, args); 1487 if (cluster.isEmpty()) { 1488 String s = conf.get("stargate.hostname", "localhost"); 1489 if (s.contains(":")) { 1490 cluster.add(s); 1491 } else { 1492 cluster.add(s, conf.getInt("stargate.port", 8080)); 1493 } 1494 } 1495 runTest(cmdClass); 1496 errCode = 0; 1497 break; 1498 } 1499 1500 printUsage(); 1501 break; 1502 } 1503 } catch (Exception e) { 1504 LOG.error("Failed", e); 1505 } 1506 1507 return errCode; 1508 } 1509 1510 private Class<? extends Test> determineCommandClass(String cmd) { 1511 CmdDescriptor descriptor = commands.get(cmd); 1512 return descriptor != null ? descriptor.getCmdClass() : null; 1513 } 1514 1515 public static void main(final String[] args) throws Exception { 1516 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); 1517 System.exit(res); 1518 } 1519}