001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.rest; 020 021import java.io.DataInput; 022import java.io.DataOutput; 023import java.io.IOException; 024import java.io.PrintStream; 025import java.lang.reflect.Constructor; 026import java.text.SimpleDateFormat; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Date; 030import java.util.List; 031import java.util.Map; 032import java.util.Random; 033import java.util.TreeMap; 034import java.util.regex.Matcher; 035import java.util.regex.Pattern; 036 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.conf.Configured; 039import org.apache.hadoop.fs.FSDataInputStream; 040import org.apache.hadoop.fs.FileStatus; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.hbase.ArrayBackedTag; 044import org.apache.hadoop.hbase.CompareOperator; 045import org.apache.hadoop.hbase.HBaseConfiguration; 046import org.apache.hadoop.hbase.HColumnDescriptor; 047import org.apache.hadoop.hbase.HConstants; 048import org.apache.hadoop.hbase.HTableDescriptor; 049import org.apache.hadoop.hbase.KeyValue; 050import org.apache.hadoop.hbase.TableName; 051import org.apache.hadoop.hbase.Tag; 052import org.apache.hadoop.hbase.client.BufferedMutator; 053import org.apache.hadoop.hbase.client.Connection; 054import org.apache.hadoop.hbase.client.ConnectionFactory; 055import org.apache.hadoop.hbase.client.Durability; 056import org.apache.hadoop.hbase.client.Get; 057import org.apache.hadoop.hbase.client.Put; 058import org.apache.hadoop.hbase.client.Result; 059import org.apache.hadoop.hbase.client.ResultScanner; 060import org.apache.hadoop.hbase.client.Scan; 061import org.apache.hadoop.hbase.client.Table; 062import org.apache.hadoop.hbase.filter.BinaryComparator; 063import org.apache.hadoop.hbase.filter.Filter; 064import org.apache.hadoop.hbase.filter.PageFilter; 065import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 066import org.apache.hadoop.hbase.filter.WhileMatchFilter; 067import org.apache.hadoop.hbase.io.compress.Compression; 068import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 069import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 070import org.apache.hadoop.hbase.rest.client.Client; 071import org.apache.hadoop.hbase.rest.client.Cluster; 072import org.apache.hadoop.hbase.rest.client.RemoteAdmin; 073import org.apache.hadoop.hbase.util.ByteArrayHashKey; 074import org.apache.hadoop.hbase.util.Bytes; 075import org.apache.hadoop.hbase.util.Hash; 076import org.apache.hadoop.hbase.util.MurmurHash; 077import org.apache.hadoop.hbase.util.Pair; 078import org.apache.hadoop.io.LongWritable; 079import org.apache.hadoop.io.NullWritable; 080import org.apache.hadoop.io.Text; 081import org.apache.hadoop.io.Writable; 082import org.apache.hadoop.mapreduce.InputSplit; 083import org.apache.hadoop.mapreduce.Job; 084import org.apache.hadoop.mapreduce.JobContext; 085import org.apache.hadoop.mapreduce.Mapper; 086import org.apache.hadoop.mapreduce.RecordReader; 087import org.apache.hadoop.mapreduce.TaskAttemptContext; 088import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 089import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 090import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; 091import org.apache.hadoop.util.LineReader; 092import org.apache.hadoop.util.Tool; 093import org.apache.hadoop.util.ToolRunner; 094import org.slf4j.Logger; 095import org.slf4j.LoggerFactory; 096 097/** 098 * Script used evaluating Stargate performance and scalability. Runs a SG 099 * client that steps through one of a set of hardcoded tests or 'experiments' 100 * (e.g. a random reads test, a random writes test, etc.). Pass on the 101 * command-line which test to run and how many clients are participating in 102 * this experiment. Run <code>java PerformanceEvaluation --help</code> to 103 * obtain usage. 104 * 105 * <p>This class sets up and runs the evaluation programs described in 106 * Section 7, <i>Performance Evaluation</i>, of the <a 107 * href="http://labs.google.com/papers/bigtable.html">Bigtable</a> 108 * paper, pages 8-10. 109 * 110 * <p>If number of clients > 1, we start up a MapReduce job. Each map task 111 * runs an individual client. Each client does about 1GB of data. 112 */ 113public class PerformanceEvaluation extends Configured implements Tool { 114 protected static final Logger LOG = 115 LoggerFactory.getLogger(PerformanceEvaluation.class); 116 117 private static final int DEFAULT_ROW_PREFIX_LENGTH = 16; 118 private static final int ROW_LENGTH = 1000; 119 private static final int TAG_LENGTH = 256; 120 private static final int ONE_GB = 1024 * 1024 * 1000; 121 private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH; 122 123 public static final TableName TABLE_NAME = TableName.valueOf("TestTable"); 124 public static final byte[] FAMILY_NAME = Bytes.toBytes("info"); 125 public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data"); 126 private TableName tableName = TABLE_NAME; 127 128 protected HTableDescriptor TABLE_DESCRIPTOR; 129 protected Map<String, CmdDescriptor> commands = new TreeMap<>(); 130 protected static Cluster cluster = new Cluster(); 131 132 volatile Configuration conf; 133 private boolean nomapred = false; 134 private int N = 1; 135 private int R = ROWS_PER_GB; 136 private Compression.Algorithm compression = Compression.Algorithm.NONE; 137 private DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; 138 private boolean flushCommits = true; 139 private boolean writeToWAL = true; 140 private boolean inMemoryCF = false; 141 private int presplitRegions = 0; 142 private boolean useTags = false; 143 private int noOfTags = 1; 144 private Connection connection; 145 146 private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); 147 148 /** 149 * Regex to parse lines in input file passed to mapreduce task. 150 */ 151 public static final Pattern LINE_PATTERN = 152 Pattern.compile("tableName=(\\w+),\\s+" + 153 "startRow=(\\d+),\\s+" + 154 "perClientRunRows=(\\d+),\\s+" + 155 "totalRows=(\\d+),\\s+" + 156 "clients=(\\d+),\\s+" + 157 "flushCommits=(\\w+),\\s+" + 158 "writeToWAL=(\\w+),\\s+" + 159 "useTags=(\\w+),\\s+" + 160 "noOfTags=(\\d+)"); 161 162 /** 163 * Enum for map metrics. Keep it out here rather than inside in the Map 164 * inner-class so we can find associated properties. 165 */ 166 protected enum Counter { 167 /** elapsed time */ 168 ELAPSED_TIME, 169 /** number of rows */ 170 ROWS 171 } 172 173 /** 174 * Constructor 175 * @param c Configuration object 176 */ 177 public PerformanceEvaluation(final Configuration c) { 178 this.conf = c; 179 180 addCommandDescriptor(RandomReadTest.class, "randomRead", 181 "Run random read test"); 182 addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan", 183 "Run random seek and scan 100 test"); 184 addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", 185 "Run random seek scan with both start and stop row (max 10 rows)"); 186 addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100", 187 "Run random seek scan with both start and stop row (max 100 rows)"); 188 addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000", 189 "Run random seek scan with both start and stop row (max 1000 rows)"); 190 addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000", 191 "Run random seek scan with both start and stop row (max 10000 rows)"); 192 addCommandDescriptor(RandomWriteTest.class, "randomWrite", 193 "Run random write test"); 194 addCommandDescriptor(SequentialReadTest.class, "sequentialRead", 195 "Run sequential read test"); 196 addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", 197 "Run sequential write test"); 198 addCommandDescriptor(ScanTest.class, "scan", 199 "Run scan test (read every row)"); 200 addCommandDescriptor(FilteredScanTest.class, "filterScan", 201 "Run scan test using a filter to find a specific row based " + 202 "on it's value (make sure to use --rows=20)"); 203 } 204 205 protected void addCommandDescriptor(Class<? extends Test> cmdClass, 206 String name, String description) { 207 CmdDescriptor cmdDescriptor = 208 new CmdDescriptor(cmdClass, name, description); 209 commands.put(name, cmdDescriptor); 210 } 211 212 /** 213 * Implementations can have their status set. 214 */ 215 interface Status { 216 /** 217 * Sets status 218 * @param msg status message 219 * @throws IOException if setting the status fails 220 */ 221 void setStatus(final String msg) throws IOException; 222 } 223 224 /** 225 * This class works as the InputSplit of Performance Evaluation 226 * MapReduce InputFormat, and the Record Value of RecordReader. 227 * Each map task will only read one record from a PeInputSplit, 228 * the record value is the PeInputSplit itself. 229 */ 230 public static class PeInputSplit extends InputSplit implements Writable { 231 private TableName tableName; 232 private int startRow; 233 private int rows; 234 private int totalRows; 235 private int clients; 236 private boolean flushCommits; 237 private boolean writeToWAL; 238 private boolean useTags; 239 private int noOfTags; 240 241 public PeInputSplit(TableName tableName, int startRow, int rows, int totalRows, int clients, 242 boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags) { 243 this.tableName = tableName; 244 this.startRow = startRow; 245 this.rows = rows; 246 this.totalRows = totalRows; 247 this.clients = clients; 248 this.flushCommits = flushCommits; 249 this.writeToWAL = writeToWAL; 250 this.useTags = useTags; 251 this.noOfTags = noOfTags; 252 } 253 254 @Override 255 public void readFields(DataInput in) throws IOException { 256 int tableNameLen = in.readInt(); 257 byte[] name = new byte[tableNameLen]; 258 in.readFully(name); 259 this.tableName = TableName.valueOf(name); 260 this.startRow = in.readInt(); 261 this.rows = in.readInt(); 262 this.totalRows = in.readInt(); 263 this.clients = in.readInt(); 264 this.flushCommits = in.readBoolean(); 265 this.writeToWAL = in.readBoolean(); 266 this.useTags = in.readBoolean(); 267 this.noOfTags = in.readInt(); 268 } 269 270 @Override 271 public void write(DataOutput out) throws IOException { 272 byte[] name = this.tableName.toBytes(); 273 out.writeInt(name.length); 274 out.write(name); 275 out.writeInt(startRow); 276 out.writeInt(rows); 277 out.writeInt(totalRows); 278 out.writeInt(clients); 279 out.writeBoolean(flushCommits); 280 out.writeBoolean(writeToWAL); 281 out.writeBoolean(useTags); 282 out.writeInt(noOfTags); 283 } 284 285 @Override 286 public long getLength() { 287 return 0; 288 } 289 290 @Override 291 public String[] getLocations() { 292 return new String[0]; 293 } 294 295 public int getStartRow() { 296 return startRow; 297 } 298 299 public TableName getTableName() { 300 return tableName; 301 } 302 303 public int getRows() { 304 return rows; 305 } 306 307 public int getTotalRows() { 308 return totalRows; 309 } 310 311 public boolean isFlushCommits() { 312 return flushCommits; 313 } 314 315 public boolean isWriteToWAL() { 316 return writeToWAL; 317 } 318 319 public boolean isUseTags() { 320 return useTags; 321 } 322 323 public int getNoOfTags() { 324 return noOfTags; 325 } 326 } 327 328 /** 329 * InputFormat of Performance Evaluation MapReduce job. 330 * It extends from FileInputFormat, want to use it's methods such as setInputPaths(). 331 */ 332 public static class PeInputFormat extends FileInputFormat<NullWritable, PeInputSplit> { 333 @Override 334 public List<InputSplit> getSplits(JobContext job) throws IOException { 335 // generate splits 336 List<InputSplit> splitList = new ArrayList<>(); 337 338 for (FileStatus file: listStatus(job)) { 339 if (file.isDirectory()) { 340 continue; 341 } 342 Path path = file.getPath(); 343 FileSystem fs = path.getFileSystem(job.getConfiguration()); 344 FSDataInputStream fileIn = fs.open(path); 345 LineReader in = new LineReader(fileIn, job.getConfiguration()); 346 int lineLen; 347 while(true) { 348 Text lineText = new Text(); 349 lineLen = in.readLine(lineText); 350 if(lineLen <= 0) { 351 break; 352 } 353 Matcher m = LINE_PATTERN.matcher(lineText.toString()); 354 if ((m != null) && m.matches()) { 355 TableName tableName = TableName.valueOf(m.group(1)); 356 int startRow = Integer.parseInt(m.group(2)); 357 int rows = Integer.parseInt(m.group(3)); 358 int totalRows = Integer.parseInt(m.group(4)); 359 int clients = Integer.parseInt(m.group(5)); 360 boolean flushCommits = Boolean.parseBoolean(m.group(6)); 361 boolean writeToWAL = Boolean.parseBoolean(m.group(7)); 362 boolean useTags = Boolean.parseBoolean(m.group(8)); 363 int noOfTags = Integer.parseInt(m.group(9)); 364 365 LOG.debug("tableName=" + tableName + 366 " split["+ splitList.size() + "] " + 367 " startRow=" + startRow + 368 " rows=" + rows + 369 " totalRows=" + totalRows + 370 " clients=" + clients + 371 " flushCommits=" + flushCommits + 372 " writeToWAL=" + writeToWAL + 373 " useTags=" + useTags + 374 " noOfTags=" + noOfTags); 375 376 PeInputSplit newSplit = 377 new PeInputSplit(tableName, startRow, rows, totalRows, clients, 378 flushCommits, writeToWAL, useTags, noOfTags); 379 splitList.add(newSplit); 380 } 381 } 382 in.close(); 383 } 384 385 LOG.info("Total # of splits: " + splitList.size()); 386 return splitList; 387 } 388 389 @Override 390 public RecordReader<NullWritable, PeInputSplit> createRecordReader(InputSplit split, 391 TaskAttemptContext context) { 392 return new PeRecordReader(); 393 } 394 395 public static class PeRecordReader extends RecordReader<NullWritable, PeInputSplit> { 396 private boolean readOver = false; 397 private PeInputSplit split = null; 398 private NullWritable key = null; 399 private PeInputSplit value = null; 400 401 @Override 402 public void initialize(InputSplit split, TaskAttemptContext context) { 403 this.readOver = false; 404 this.split = (PeInputSplit)split; 405 } 406 407 @Override 408 public boolean nextKeyValue() { 409 if(readOver) { 410 return false; 411 } 412 413 key = NullWritable.get(); 414 value = split; 415 416 readOver = true; 417 return true; 418 } 419 420 @Override 421 public NullWritable getCurrentKey() { 422 return key; 423 } 424 425 @Override 426 public PeInputSplit getCurrentValue() { 427 return value; 428 } 429 430 @Override 431 public float getProgress() { 432 if(readOver) { 433 return 1.0f; 434 } else { 435 return 0.0f; 436 } 437 } 438 439 @Override 440 public void close() { 441 // do nothing 442 } 443 } 444 } 445 446 /** 447 * MapReduce job that runs a performance evaluation client in each map task. 448 */ 449 public static class EvaluationMapTask 450 extends Mapper<NullWritable, PeInputSplit, LongWritable, LongWritable> { 451 452 /** configuration parameter name that contains the command */ 453 public final static String CMD_KEY = "EvaluationMapTask.command"; 454 /** configuration parameter name that contains the PE impl */ 455 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl"; 456 457 private Class<? extends Test> cmd; 458 private PerformanceEvaluation pe; 459 460 @Override 461 protected void setup(Context context) { 462 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class); 463 464 // this is required so that extensions of PE are instantiated within the 465 // map reduce task... 466 Class<? extends PerformanceEvaluation> peClass = 467 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class); 468 try { 469 this.pe = peClass.getConstructor(Configuration.class) 470 .newInstance(context.getConfiguration()); 471 } catch (Exception e) { 472 throw new IllegalStateException("Could not instantiate PE instance", e); 473 } 474 } 475 476 private <Type> Class<? extends Type> forName(String className, Class<Type> type) { 477 Class<? extends Type> clazz; 478 try { 479 clazz = Class.forName(className).asSubclass(type); 480 } catch (ClassNotFoundException e) { 481 throw new IllegalStateException("Could not find class for name: " + className, e); 482 } 483 return clazz; 484 } 485 486 @Override 487 protected void map(NullWritable key, PeInputSplit value, final Context context) 488 throws IOException, InterruptedException { 489 Status status = context::setStatus; 490 491 // Evaluation task 492 pe.tableName = value.getTableName(); 493 long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(), 494 value.getRows(), value.getTotalRows(), 495 value.isFlushCommits(), value.isWriteToWAL(), 496 value.isUseTags(), value.getNoOfTags(), 497 ConnectionFactory.createConnection(context.getConfiguration()), status); 498 // Collect how much time the thing took. Report as map output and 499 // to the ELAPSED_TIME counter. 500 context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime); 501 context.getCounter(Counter.ROWS).increment(value.rows); 502 context.write(new LongWritable(value.startRow), new LongWritable(elapsedTime)); 503 context.progress(); 504 } 505 } 506 507 /** 508 * If table does not already exist, create. 509 * @param admin Client to use checking. 510 * @return True if we created the table. 511 * @throws IOException if an operation on the table fails 512 */ 513 private boolean checkTable(RemoteAdmin admin) throws IOException { 514 HTableDescriptor tableDescriptor = getTableDescriptor(); 515 if (this.presplitRegions > 0) { 516 // presplit requested 517 if (admin.isTableAvailable(tableDescriptor.getTableName().getName())) { 518 admin.deleteTable(tableDescriptor.getTableName().getName()); 519 } 520 521 byte[][] splits = getSplits(); 522 for (int i=0; i < splits.length; i++) { 523 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); 524 } 525 admin.createTable(tableDescriptor); 526 LOG.info("Table created with " + this.presplitRegions + " splits"); 527 } else { 528 boolean tableExists = admin.isTableAvailable(tableDescriptor.getTableName().getName()); 529 if (!tableExists) { 530 admin.createTable(tableDescriptor); 531 LOG.info("Table " + tableDescriptor + " created"); 532 } 533 } 534 535 return admin.isTableAvailable(tableDescriptor.getTableName().getName()); 536 } 537 538 protected HTableDescriptor getTableDescriptor() { 539 if (TABLE_DESCRIPTOR == null) { 540 TABLE_DESCRIPTOR = new HTableDescriptor(tableName); 541 HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME); 542 family.setDataBlockEncoding(blockEncoding); 543 family.setCompressionType(compression); 544 if (inMemoryCF) { 545 family.setInMemory(true); 546 } 547 TABLE_DESCRIPTOR.addFamily(family); 548 } 549 return TABLE_DESCRIPTOR; 550 } 551 552 /** 553 * Generates splits based on total number of rows and specified split regions 554 * 555 * @return splits : array of byte [] 556 */ 557 protected byte[][] getSplits() { 558 if (this.presplitRegions == 0) { 559 return new byte[0][]; 560 } 561 562 int numSplitPoints = presplitRegions - 1; 563 byte[][] splits = new byte[numSplitPoints][]; 564 int jump = this.R / this.presplitRegions; 565 for (int i=0; i < numSplitPoints; i++) { 566 int rowkey = jump * (1 + i); 567 splits[i] = format(rowkey); 568 } 569 return splits; 570 } 571 572 /** 573 * We're to run multiple clients concurrently. Setup a mapreduce job. Run 574 * one map per client. Then run a single reduce to sum the elapsed times. 575 * @param cmd Command to run. 576 */ 577 private void runNIsMoreThanOne(final Class<? extends Test> cmd) 578 throws IOException, InterruptedException, ClassNotFoundException { 579 RemoteAdmin remoteAdmin = new RemoteAdmin(new Client(cluster), getConf()); 580 checkTable(remoteAdmin); 581 if (nomapred) { 582 doMultipleClients(cmd); 583 } else { 584 doMapReduce(cmd); 585 } 586 } 587 588 /** 589 * Run all clients in this vm each to its own thread. 590 * @param cmd Command to run 591 * @throws IOException if creating a connection fails 592 */ 593 private void doMultipleClients(final Class<? extends Test> cmd) throws IOException { 594 final List<Thread> threads = new ArrayList<>(this.N); 595 final long[] timings = new long[this.N]; 596 final int perClientRows = R/N; 597 final TableName tableName = this.tableName; 598 final DataBlockEncoding encoding = this.blockEncoding; 599 final boolean flushCommits = this.flushCommits; 600 final Compression.Algorithm compression = this.compression; 601 final boolean writeToWal = this.writeToWAL; 602 final int preSplitRegions = this.presplitRegions; 603 final boolean useTags = this.useTags; 604 final int numTags = this.noOfTags; 605 final Connection connection = ConnectionFactory.createConnection(getConf()); 606 for (int i = 0; i < this.N; i++) { 607 final int index = i; 608 Thread t = new Thread("TestClient-" + i) { 609 @Override 610 public void run() { 611 super.run(); 612 PerformanceEvaluation pe = new PerformanceEvaluation(getConf()); 613 pe.tableName = tableName; 614 pe.blockEncoding = encoding; 615 pe.flushCommits = flushCommits; 616 pe.compression = compression; 617 pe.writeToWAL = writeToWal; 618 pe.presplitRegions = preSplitRegions; 619 pe.N = N; 620 pe.connection = connection; 621 pe.useTags = useTags; 622 pe.noOfTags = numTags; 623 try { 624 long elapsedTime = pe.runOneClient(cmd, index * perClientRows, 625 perClientRows, R, 626 flushCommits, writeToWAL, useTags, noOfTags, connection, 627 msg -> LOG.info("client-" + getName() + " " + msg)); 628 timings[index] = elapsedTime; 629 LOG.info("Finished " + getName() + " in " + elapsedTime + 630 "ms writing " + perClientRows + " rows"); 631 } catch (IOException e) { 632 throw new RuntimeException(e); 633 } 634 } 635 }; 636 threads.add(t); 637 } 638 for (Thread t: threads) { 639 t.start(); 640 } 641 for (Thread t: threads) { 642 while(t.isAlive()) { 643 try { 644 t.join(); 645 } catch (InterruptedException e) { 646 LOG.debug("Interrupted, continuing" + e.toString()); 647 } 648 } 649 } 650 final String test = cmd.getSimpleName(); 651 LOG.info("[" + test + "] Summary of timings (ms): " 652 + Arrays.toString(timings)); 653 Arrays.sort(timings); 654 long total = 0; 655 for (int i = 0; i < this.N; i++) { 656 total += timings[i]; 657 } 658 LOG.info("[" + test + "]" 659 + "\tMin: " + timings[0] + "ms" 660 + "\tMax: " + timings[this.N - 1] + "ms" 661 + "\tAvg: " + (total / this.N) + "ms"); 662 } 663 664 /** 665 * Run a mapreduce job. Run as many maps as asked-for clients. 666 * Before we start up the job, write out an input file with instruction 667 * per client regards which row they are to start on. 668 * @param cmd Command to run. 669 */ 670 private void doMapReduce(final Class<? extends Test> cmd) 671 throws IOException, InterruptedException, ClassNotFoundException { 672 Configuration conf = getConf(); 673 Path inputDir = writeInputFile(conf); 674 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); 675 conf.set(EvaluationMapTask.PE_KEY, getClass().getName()); 676 Job job = Job.getInstance(conf); 677 job.setJarByClass(PerformanceEvaluation.class); 678 job.setJobName("HBase Performance Evaluation"); 679 680 job.setInputFormatClass(PeInputFormat.class); 681 PeInputFormat.setInputPaths(job, inputDir); 682 683 job.setOutputKeyClass(LongWritable.class); 684 job.setOutputValueClass(LongWritable.class); 685 686 job.setMapperClass(EvaluationMapTask.class); 687 job.setReducerClass(LongSumReducer.class); 688 job.setNumReduceTasks(1); 689 690 job.setOutputFormatClass(TextOutputFormat.class); 691 TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs")); 692 TableMapReduceUtil.addDependencyJars(job); 693 TableMapReduceUtil.initCredentials(job); 694 job.waitForCompletion(true); 695 } 696 697 /** 698 * Write input file of offsets-per-client for the mapreduce job. 699 * @param c Configuration 700 * @return Directory that contains file written. 701 * @throws IOException if creating the directory or the file fails 702 */ 703 private Path writeInputFile(final Configuration c) throws IOException { 704 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); 705 Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date())); 706 Path inputDir = new Path(jobdir, "inputs"); 707 708 FileSystem fs = FileSystem.get(c); 709 fs.mkdirs(inputDir); 710 Path inputFile = new Path(inputDir, "input.txt"); 711 PrintStream out = new PrintStream(fs.create(inputFile)); 712 // Make input random. 713 Map<Integer, String> m = new TreeMap<>(); 714 Hash h = MurmurHash.getInstance(); 715 int perClientRows = (this.R / this.N); 716 try { 717 for (int i = 0; i < 10; i++) { 718 for (int j = 0; j < N; j++) { 719 String s = "tableName=" + this.tableName + 720 ", startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) + 721 ", perClientRunRows=" + (perClientRows / 10) + 722 ", totalRows=" + this.R + 723 ", clients=" + this.N + 724 ", flushCommits=" + this.flushCommits + 725 ", writeToWAL=" + this.writeToWAL + 726 ", useTags=" + this.useTags + 727 ", noOfTags=" + this.noOfTags; 728 byte[] b = Bytes.toBytes(s); 729 int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1); 730 m.put(hash, s); 731 } 732 } 733 for (Map.Entry<Integer, String> e: m.entrySet()) { 734 out.println(e.getValue()); 735 } 736 } finally { 737 out.close(); 738 } 739 return inputDir; 740 } 741 742 /** 743 * Describes a command. 744 */ 745 static class CmdDescriptor { 746 private Class<? extends Test> cmdClass; 747 private String name; 748 private String description; 749 750 CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) { 751 this.cmdClass = cmdClass; 752 this.name = name; 753 this.description = description; 754 } 755 756 public Class<? extends Test> getCmdClass() { 757 return cmdClass; 758 } 759 760 public String getName() { 761 return name; 762 } 763 764 public String getDescription() { 765 return description; 766 } 767 } 768 769 /** 770 * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation} tests 771 * This makes the reflection logic a little easier to understand... 772 */ 773 static class TestOptions { 774 private int startRow; 775 private int perClientRunRows; 776 private int totalRows; 777 private TableName tableName; 778 private boolean flushCommits; 779 private boolean writeToWAL; 780 private boolean useTags; 781 private int noOfTags; 782 private Connection connection; 783 784 TestOptions(int startRow, int perClientRunRows, int totalRows, TableName tableName, 785 boolean flushCommits, boolean writeToWAL, boolean useTags, 786 int noOfTags, Connection connection) { 787 this.startRow = startRow; 788 this.perClientRunRows = perClientRunRows; 789 this.totalRows = totalRows; 790 this.tableName = tableName; 791 this.flushCommits = flushCommits; 792 this.writeToWAL = writeToWAL; 793 this.useTags = useTags; 794 this.noOfTags = noOfTags; 795 this.connection = connection; 796 } 797 798 public int getStartRow() { 799 return startRow; 800 } 801 802 public int getPerClientRunRows() { 803 return perClientRunRows; 804 } 805 806 public int getTotalRows() { 807 return totalRows; 808 } 809 810 public TableName getTableName() { 811 return tableName; 812 } 813 814 public boolean isFlushCommits() { 815 return flushCommits; 816 } 817 818 public boolean isWriteToWAL() { 819 return writeToWAL; 820 } 821 822 public Connection getConnection() { 823 return connection; 824 } 825 826 public boolean isUseTags() { 827 return this.useTags; 828 } 829 830 public int getNumTags() { 831 return this.noOfTags; 832 } 833 } 834 835 /* 836 * A test. 837 * Subclass to particularize what happens per row. 838 */ 839 static abstract class Test { 840 // Below is make it so when Tests are all running in the one 841 // jvm, that they each have a differently seeded Random. 842 private static final Random randomSeed = 843 new Random(System.currentTimeMillis()); 844 private static long nextRandomSeed() { 845 return randomSeed.nextLong(); 846 } 847 protected final Random rand = new Random(nextRandomSeed()); 848 849 protected final int startRow; 850 protected final int perClientRunRows; 851 protected final int totalRows; 852 private final Status status; 853 protected TableName tableName; 854 protected volatile Configuration conf; 855 protected boolean writeToWAL; 856 protected boolean useTags; 857 protected int noOfTags; 858 protected Connection connection; 859 860 /** 861 * Note that all subclasses of this class must provide a public contructor 862 * that has the exact same list of arguments. 863 */ 864 Test(final Configuration conf, final TestOptions options, final Status status) { 865 super(); 866 this.startRow = options.getStartRow(); 867 this.perClientRunRows = options.getPerClientRunRows(); 868 this.totalRows = options.getTotalRows(); 869 this.status = status; 870 this.tableName = options.getTableName(); 871 this.conf = conf; 872 this.writeToWAL = options.isWriteToWAL(); 873 this.useTags = options.isUseTags(); 874 this.noOfTags = options.getNumTags(); 875 this.connection = options.getConnection(); 876 } 877 878 protected String generateStatus(final int sr, final int i, final int lr) { 879 return sr + "/" + i + "/" + lr; 880 } 881 882 protected int getReportingPeriod() { 883 int period = this.perClientRunRows / 10; 884 return period == 0? this.perClientRunRows: period; 885 } 886 887 abstract void testTakedown() throws IOException; 888 889 /** 890 * Run test 891 * @return Elapsed time. 892 * @throws IOException if something in the test fails 893 */ 894 long test() throws IOException { 895 testSetup(); 896 LOG.info("Timed test starting in thread " + Thread.currentThread().getName()); 897 final long startTime = System.nanoTime(); 898 try { 899 testTimed(); 900 } finally { 901 testTakedown(); 902 } 903 return (System.nanoTime() - startTime) / 1000000; 904 } 905 906 abstract void testSetup() throws IOException; 907 908 /** 909 * Provides an extension point for tests that don't want a per row invocation. 910 */ 911 void testTimed() throws IOException { 912 int lastRow = this.startRow + this.perClientRunRows; 913 // Report on completion of 1/10th of total. 914 for (int i = this.startRow; i < lastRow; i++) { 915 testRow(i); 916 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { 917 status.setStatus(generateStatus(this.startRow, i, lastRow)); 918 } 919 } 920 } 921 922 /** 923 * Test for individual row. 924 * @param i Row index. 925 */ 926 abstract void testRow(final int i) throws IOException; 927 } 928 929 static abstract class TableTest extends Test { 930 protected Table table; 931 932 public TableTest(Configuration conf, TestOptions options, Status status) { 933 super(conf, options, status); 934 } 935 936 @Override 937 void testSetup() throws IOException { 938 this.table = connection.getTable(tableName); 939 } 940 941 @Override 942 void testTakedown() throws IOException { 943 table.close(); 944 } 945 } 946 947 static abstract class BufferedMutatorTest extends Test { 948 protected BufferedMutator mutator; 949 protected boolean flushCommits; 950 951 public BufferedMutatorTest(Configuration conf, TestOptions options, Status status) { 952 super(conf, options, status); 953 this.flushCommits = options.isFlushCommits(); 954 } 955 956 @Override 957 void testSetup() throws IOException { 958 this.mutator = connection.getBufferedMutator(tableName); 959 } 960 961 @Override 962 void testTakedown() throws IOException { 963 if (flushCommits) { 964 this.mutator.flush(); 965 } 966 mutator.close(); 967 } 968 } 969 970 static class RandomSeekScanTest extends TableTest { 971 RandomSeekScanTest(Configuration conf, TestOptions options, Status status) { 972 super(conf, options, status); 973 } 974 975 @Override 976 void testRow(final int i) throws IOException { 977 Scan scan = new Scan(getRandomRow(this.rand, this.totalRows)); 978 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 979 scan.setFilter(new WhileMatchFilter(new PageFilter(120))); 980 ResultScanner s = this.table.getScanner(scan); 981 s.close(); 982 } 983 984 @Override 985 protected int getReportingPeriod() { 986 int period = this.perClientRunRows / 100; 987 return period == 0? this.perClientRunRows: period; 988 } 989 } 990 991 @SuppressWarnings("unused") 992 static abstract class RandomScanWithRangeTest extends TableTest { 993 RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) { 994 super(conf, options, status); 995 } 996 997 @Override 998 void testRow(final int i) throws IOException { 999 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); 1000 Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond()); 1001 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1002 ResultScanner s = this.table.getScanner(scan); 1003 int count = 0; 1004 for (Result rr = null; (rr = s.next()) != null;) { 1005 count++; 1006 } 1007 1008 if (i % 100 == 0) { 1009 LOG.info(String.format("Scan for key range %s - %s returned %s rows", 1010 Bytes.toString(startAndStopRow.getFirst()), 1011 Bytes.toString(startAndStopRow.getSecond()), count)); 1012 } 1013 1014 s.close(); 1015 } 1016 1017 protected abstract Pair<byte[], byte[]> getStartAndStopRow(); 1018 1019 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) { 1020 int start = this.rand.nextInt(Integer.MAX_VALUE) % totalRows; 1021 int stop = start + maxRange; 1022 return new Pair<>(format(start), format(stop)); 1023 } 1024 1025 @Override 1026 protected int getReportingPeriod() { 1027 int period = this.perClientRunRows / 100; 1028 return period == 0? this.perClientRunRows: period; 1029 } 1030 } 1031 1032 static class RandomScanWithRange10Test extends RandomScanWithRangeTest { 1033 RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status) { 1034 super(conf, options, status); 1035 } 1036 1037 @Override 1038 protected Pair<byte[], byte[]> getStartAndStopRow() { 1039 return generateStartAndStopRows(10); 1040 } 1041 } 1042 1043 static class RandomScanWithRange100Test extends RandomScanWithRangeTest { 1044 RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status) { 1045 super(conf, options, status); 1046 } 1047 1048 @Override 1049 protected Pair<byte[], byte[]> getStartAndStopRow() { 1050 return generateStartAndStopRows(100); 1051 } 1052 } 1053 1054 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest { 1055 RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status) { 1056 super(conf, options, status); 1057 } 1058 1059 @Override 1060 protected Pair<byte[], byte[]> getStartAndStopRow() { 1061 return generateStartAndStopRows(1000); 1062 } 1063 } 1064 1065 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest { 1066 RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status) { 1067 super(conf, options, status); 1068 } 1069 1070 @Override 1071 protected Pair<byte[], byte[]> getStartAndStopRow() { 1072 return generateStartAndStopRows(10000); 1073 } 1074 } 1075 1076 static class RandomReadTest extends TableTest { 1077 RandomReadTest(Configuration conf, TestOptions options, Status status) { 1078 super(conf, options, status); 1079 } 1080 1081 @Override 1082 void testRow(final int i) throws IOException { 1083 Get get = new Get(getRandomRow(this.rand, this.totalRows)); 1084 get.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1085 this.table.get(get); 1086 } 1087 1088 @Override 1089 protected int getReportingPeriod() { 1090 int period = this.perClientRunRows / 100; 1091 return period == 0? this.perClientRunRows: period; 1092 } 1093 } 1094 1095 static class RandomWriteTest extends BufferedMutatorTest { 1096 RandomWriteTest(Configuration conf, TestOptions options, Status status) { 1097 super(conf, options, status); 1098 } 1099 1100 @Override 1101 void testRow(final int i) throws IOException { 1102 byte[] row = getRandomRow(this.rand, this.totalRows); 1103 Put put = new Put(row); 1104 byte[] value = generateData(this.rand, ROW_LENGTH); 1105 if (useTags) { 1106 byte[] tag = generateData(this.rand, TAG_LENGTH); 1107 Tag[] tags = new Tag[noOfTags]; 1108 for (int n = 0; n < noOfTags; n++) { 1109 Tag t = new ArrayBackedTag((byte) n, tag); 1110 tags[n] = t; 1111 } 1112 KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP, 1113 value, tags); 1114 put.add(kv); 1115 } else { 1116 put.addColumn(FAMILY_NAME, QUALIFIER_NAME, value); 1117 } 1118 put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1119 mutator.mutate(put); 1120 } 1121 } 1122 1123 static class ScanTest extends TableTest { 1124 private ResultScanner testScanner; 1125 1126 ScanTest(Configuration conf, TestOptions options, Status status) { 1127 super(conf, options, status); 1128 } 1129 1130 @Override 1131 void testTakedown() throws IOException { 1132 if (this.testScanner != null) { 1133 this.testScanner.close(); 1134 } 1135 super.testTakedown(); 1136 } 1137 1138 @Override 1139 void testRow(final int i) throws IOException { 1140 if (this.testScanner == null) { 1141 Scan scan = new Scan(format(this.startRow)); 1142 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1143 this.testScanner = table.getScanner(scan); 1144 } 1145 testScanner.next(); 1146 } 1147 } 1148 1149 static class SequentialReadTest extends TableTest { 1150 SequentialReadTest(Configuration conf, TestOptions options, Status status) { 1151 super(conf, options, status); 1152 } 1153 1154 @Override 1155 void testRow(final int i) throws IOException { 1156 Get get = new Get(format(i)); 1157 get.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1158 table.get(get); 1159 } 1160 } 1161 1162 static class SequentialWriteTest extends BufferedMutatorTest { 1163 SequentialWriteTest(Configuration conf, TestOptions options, Status status) { 1164 super(conf, options, status); 1165 } 1166 1167 @Override 1168 void testRow(final int i) throws IOException { 1169 byte[] row = format(i); 1170 Put put = new Put(row); 1171 byte[] value = generateData(this.rand, ROW_LENGTH); 1172 if (useTags) { 1173 byte[] tag = generateData(this.rand, TAG_LENGTH); 1174 Tag[] tags = new Tag[noOfTags]; 1175 for (int n = 0; n < noOfTags; n++) { 1176 Tag t = new ArrayBackedTag((byte) n, tag); 1177 tags[n] = t; 1178 } 1179 KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP, 1180 value, tags); 1181 put.add(kv); 1182 } else { 1183 put.addColumn(FAMILY_NAME, QUALIFIER_NAME, value); 1184 } 1185 put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1186 mutator.mutate(put); 1187 } 1188 } 1189 1190 static class FilteredScanTest extends TableTest { 1191 protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName()); 1192 1193 FilteredScanTest(Configuration conf, TestOptions options, Status status) { 1194 super(conf, options, status); 1195 } 1196 1197 @Override 1198 void testRow(int i) throws IOException { 1199 byte[] value = generateValue(this.rand); 1200 Scan scan = constructScan(value); 1201 ResultScanner scanner = null; 1202 try { 1203 scanner = this.table.getScanner(scan); 1204 while (scanner.next() != null) { 1205 } 1206 } finally { 1207 if (scanner != null) { 1208 scanner.close(); 1209 } 1210 } 1211 } 1212 1213 protected Scan constructScan(byte[] valuePrefix) { 1214 Filter filter = new SingleColumnValueFilter( 1215 FAMILY_NAME, QUALIFIER_NAME, CompareOperator.EQUAL, 1216 new BinaryComparator(valuePrefix) 1217 ); 1218 Scan scan = new Scan(); 1219 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1220 scan.setFilter(filter); 1221 return scan; 1222 } 1223 } 1224 1225 /** 1226 * Format passed integer. 1227 * @param number the integer to format 1228 * @return Returns zero-prefixed 10-byte wide decimal version of passed number (Does absolute in 1229 * case number is negative). 1230 */ 1231 public static byte [] format(final int number) { 1232 byte[] b = new byte[DEFAULT_ROW_PREFIX_LENGTH + 10]; 1233 int d = Math.abs(number); 1234 for (int i = b.length - 1; i >= 0; i--) { 1235 b[i] = (byte)((d % 10) + '0'); 1236 d /= 10; 1237 } 1238 return b; 1239 } 1240 1241 public static byte[] generateData(final Random r, int length) { 1242 byte[] b = new byte [length]; 1243 int i; 1244 1245 for (i = 0; i < (length-8); i += 8) { 1246 b[i] = (byte) (65 + r.nextInt(26)); 1247 b[i+1] = b[i]; 1248 b[i+2] = b[i]; 1249 b[i+3] = b[i]; 1250 b[i+4] = b[i]; 1251 b[i+5] = b[i]; 1252 b[i+6] = b[i]; 1253 b[i+7] = b[i]; 1254 } 1255 1256 byte a = (byte) (65 + r.nextInt(26)); 1257 for (; i < length; i++) { 1258 b[i] = a; 1259 } 1260 return b; 1261 } 1262 1263 public static byte[] generateValue(final Random r) { 1264 byte [] b = new byte [ROW_LENGTH]; 1265 r.nextBytes(b); 1266 return b; 1267 } 1268 1269 static byte[] getRandomRow(final Random random, final int totalRows) { 1270 return format(random.nextInt(Integer.MAX_VALUE) % totalRows); 1271 } 1272 1273 long runOneClient(final Class<? extends Test> cmd, final int startRow, 1274 final int perClientRunRows, final int totalRows, 1275 boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags, 1276 Connection connection, final Status status) throws IOException { 1277 status.setStatus("Start " + cmd + " at offset " + startRow + " for " + 1278 perClientRunRows + " rows"); 1279 long totalElapsedTime; 1280 1281 TestOptions options = new TestOptions(startRow, perClientRunRows, 1282 totalRows, tableName, flushCommits, writeToWAL, useTags, noOfTags, connection); 1283 final Test t; 1284 try { 1285 Constructor<? extends Test> constructor = cmd.getDeclaredConstructor( 1286 Configuration.class, TestOptions.class, Status.class); 1287 t = constructor.newInstance(this.conf, options, status); 1288 } catch (NoSuchMethodException e) { 1289 throw new IllegalArgumentException("Invalid command class: " + 1290 cmd.getName() + ". It does not provide a constructor as described by" + 1291 "the javadoc comment. Available constructors are: " + 1292 Arrays.toString(cmd.getConstructors())); 1293 } catch (Exception e) { 1294 throw new IllegalStateException("Failed to construct command class", e); 1295 } 1296 totalElapsedTime = t.test(); 1297 1298 status.setStatus("Finished " + cmd + " in " + totalElapsedTime + 1299 "ms at offset " + startRow + " for " + perClientRunRows + " rows"); 1300 return totalElapsedTime; 1301 } 1302 1303 private void runNIsOne(final Class<? extends Test> cmd) { 1304 Status status = LOG::info; 1305 1306 RemoteAdmin admin; 1307 try { 1308 Client client = new Client(cluster); 1309 admin = new RemoteAdmin(client, getConf()); 1310 checkTable(admin); 1311 runOneClient(cmd, 0, this.R, this.R, this.flushCommits, this.writeToWAL, 1312 this.useTags, this.noOfTags, this.connection, status); 1313 } catch (Exception e) { 1314 LOG.error("Failed", e); 1315 } 1316 } 1317 1318 private void runTest(final Class<? extends Test> cmd) 1319 throws IOException, InterruptedException, ClassNotFoundException { 1320 if (N == 1) { 1321 // If there is only one client and one HRegionServer, we assume nothing 1322 // has been set up at all. 1323 runNIsOne(cmd); 1324 } else { 1325 // Else, run 1326 runNIsMoreThanOne(cmd); 1327 } 1328 } 1329 1330 protected void printUsage() { 1331 printUsage(null); 1332 } 1333 1334 protected void printUsage(final String message) { 1335 if (message != null && message.length() > 0) { 1336 System.err.println(message); 1337 } 1338 System.err.println("Usage: java " + this.getClass().getName() + " \\"); 1339 System.err.println(" [--nomapred] [--rows=ROWS] [--table=NAME] \\"); 1340 System.err.println(" [--compress=TYPE] [--blockEncoding=TYPE] " + 1341 "[-D<property=value>]* <command> <nclients>"); 1342 System.err.println(); 1343 System.err.println("General Options:"); 1344 System.err.println(" nomapred Run multiple clients using threads " + 1345 "(rather than use mapreduce)"); 1346 System.err.println(" rows Rows each client runs. Default: One million"); 1347 System.err.println(); 1348 System.err.println("Table Creation / Write Tests:"); 1349 System.err.println(" table Alternate table name. Default: 'TestTable'"); 1350 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); 1351 System.err.println(" flushCommits Used to determine if the test should flush the table. " + 1352 "Default: false"); 1353 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); 1354 System.err.println(" presplit Create presplit table. Recommended for accurate perf " + 1355 "analysis (see guide). Default: disabled"); 1356 System.err.println(" usetags Writes tags along with KVs. Use with HFile V3. " + 1357 "Default : false"); 1358 System.err.println(" numoftags Specify the no of tags that would be needed. " + 1359 "This works only if usetags is true."); 1360 System.err.println(); 1361 System.err.println("Read Tests:"); 1362 System.err.println(" inmemory Tries to keep the HFiles of the CF inmemory as far as " + 1363 "possible. Not guaranteed that reads are always served from inmemory. Default: false"); 1364 System.err.println(); 1365 System.err.println(" Note: -D properties will be applied to the conf used. "); 1366 System.err.println(" For example: "); 1367 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true"); 1368 System.err.println(" -Dmapreduce.task.timeout=60000"); 1369 System.err.println(); 1370 System.err.println("Command:"); 1371 for (CmdDescriptor command : commands.values()) { 1372 System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription())); 1373 } 1374 System.err.println(); 1375 System.err.println("Args:"); 1376 System.err.println(" nclients Integer. Required. Total number of " + 1377 "clients (and HRegionServers)"); 1378 System.err.println(" running: 1 <= value <= 500"); 1379 System.err.println("Examples:"); 1380 System.err.println(" To run a single evaluation client:"); 1381 System.err.println(" $ hbase " + this.getClass().getName() 1382 + " sequentialWrite 1"); 1383 } 1384 1385 private void getArgs(final int start, final String[] args) { 1386 if (start + 1 > args.length) { 1387 throw new IllegalArgumentException("must supply the number of clients"); 1388 } 1389 N = Integer.parseInt(args[start]); 1390 if (N < 1) { 1391 throw new IllegalArgumentException("Number of clients must be > 1"); 1392 } 1393 // Set total number of rows to write. 1394 R = R * N; 1395 } 1396 1397 @Override 1398 public int run(String[] args) throws Exception { 1399 // Process command-line args. TODO: Better cmd-line processing 1400 // (but hopefully something not as painful as cli options). 1401 int errCode = -1; 1402 if (args.length < 1) { 1403 printUsage(); 1404 return errCode; 1405 } 1406 1407 try { 1408 for (int i = 0; i < args.length; i++) { 1409 String cmd = args[i]; 1410 if (cmd.equals("-h") || cmd.startsWith("--h")) { 1411 printUsage(); 1412 errCode = 0; 1413 break; 1414 } 1415 1416 final String nmr = "--nomapred"; 1417 if (cmd.startsWith(nmr)) { 1418 nomapred = true; 1419 continue; 1420 } 1421 1422 final String rows = "--rows="; 1423 if (cmd.startsWith(rows)) { 1424 R = Integer.parseInt(cmd.substring(rows.length())); 1425 continue; 1426 } 1427 1428 final String table = "--table="; 1429 if (cmd.startsWith(table)) { 1430 this.tableName = TableName.valueOf(cmd.substring(table.length())); 1431 continue; 1432 } 1433 1434 final String compress = "--compress="; 1435 if (cmd.startsWith(compress)) { 1436 this.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); 1437 continue; 1438 } 1439 1440 final String blockEncoding = "--blockEncoding="; 1441 if (cmd.startsWith(blockEncoding)) { 1442 this.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); 1443 continue; 1444 } 1445 1446 final String flushCommits = "--flushCommits="; 1447 if (cmd.startsWith(flushCommits)) { 1448 this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); 1449 continue; 1450 } 1451 1452 final String writeToWAL = "--writeToWAL="; 1453 if (cmd.startsWith(writeToWAL)) { 1454 this.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); 1455 continue; 1456 } 1457 1458 final String presplit = "--presplit="; 1459 if (cmd.startsWith(presplit)) { 1460 this.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); 1461 continue; 1462 } 1463 1464 final String inMemory = "--inmemory="; 1465 if (cmd.startsWith(inMemory)) { 1466 this.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); 1467 continue; 1468 } 1469 1470 this.connection = ConnectionFactory.createConnection(getConf()); 1471 1472 final String useTags = "--usetags="; 1473 if (cmd.startsWith(useTags)) { 1474 this.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); 1475 continue; 1476 } 1477 1478 final String noOfTags = "--nooftags="; 1479 if (cmd.startsWith(noOfTags)) { 1480 this.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); 1481 continue; 1482 } 1483 1484 final String host = "--host="; 1485 if (cmd.startsWith(host)) { 1486 cluster.add(cmd.substring(host.length())); 1487 continue; 1488 } 1489 1490 Class<? extends Test> cmdClass = determineCommandClass(cmd); 1491 if (cmdClass != null) { 1492 getArgs(i + 1, args); 1493 if (cluster.isEmpty()) { 1494 String s = conf.get("stargate.hostname", "localhost"); 1495 if (s.contains(":")) { 1496 cluster.add(s); 1497 } else { 1498 cluster.add(s, conf.getInt("stargate.port", 8080)); 1499 } 1500 } 1501 runTest(cmdClass); 1502 errCode = 0; 1503 break; 1504 } 1505 1506 printUsage(); 1507 break; 1508 } 1509 } catch (Exception e) { 1510 LOG.error("Failed", e); 1511 } 1512 1513 return errCode; 1514 } 1515 1516 private Class<? extends Test> determineCommandClass(String cmd) { 1517 CmdDescriptor descriptor = commands.get(cmd); 1518 return descriptor != null ? descriptor.getCmdClass() : null; 1519 } 1520 1521 public static void main(final String[] args) throws Exception { 1522 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); 1523 System.exit(res); 1524 } 1525}