001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.rest; 020 021import java.io.DataInput; 022import java.io.DataOutput; 023import java.io.IOException; 024import java.io.PrintStream; 025import java.lang.reflect.Constructor; 026import java.text.SimpleDateFormat; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Date; 030import java.util.List; 031import java.util.Map; 032import java.util.Random; 033import java.util.TreeMap; 034import java.util.regex.Matcher; 035import java.util.regex.Pattern; 036 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.conf.Configured; 039import org.apache.hadoop.fs.FSDataInputStream; 040import org.apache.hadoop.fs.FileStatus; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.hbase.CompareOperator; 044import org.apache.hadoop.hbase.HBaseConfiguration; 045import org.apache.hadoop.hbase.HColumnDescriptor; 046import org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.HTableDescriptor; 048import org.apache.hadoop.hbase.KeyValue; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.Tag; 051import org.apache.hadoop.hbase.ArrayBackedTag; 052import org.apache.hadoop.hbase.client.BufferedMutator; 053import org.apache.hadoop.hbase.client.Connection; 054import org.apache.hadoop.hbase.client.ConnectionFactory; 055import org.apache.hadoop.hbase.client.Durability; 056import org.apache.hadoop.hbase.client.Get; 057import org.apache.hadoop.hbase.client.Put; 058import org.apache.hadoop.hbase.client.Result; 059import org.apache.hadoop.hbase.client.ResultScanner; 060import org.apache.hadoop.hbase.client.Scan; 061import org.apache.hadoop.hbase.client.Table; 062import org.apache.hadoop.hbase.filter.BinaryComparator; 063import org.apache.hadoop.hbase.filter.Filter; 064import org.apache.hadoop.hbase.filter.PageFilter; 065import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 066import org.apache.hadoop.hbase.filter.WhileMatchFilter; 067import org.apache.hadoop.hbase.io.compress.Compression; 068import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 069import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 070import org.apache.hadoop.hbase.rest.client.Client; 071import org.apache.hadoop.hbase.rest.client.Cluster; 072import org.apache.hadoop.hbase.rest.client.RemoteAdmin; 073import org.apache.hadoop.hbase.util.ByteArrayHashKey; 074import org.apache.hadoop.hbase.util.Bytes; 075import org.apache.hadoop.hbase.util.Hash; 076import org.apache.hadoop.hbase.util.MurmurHash; 077import org.apache.hadoop.hbase.util.Pair; 078import org.apache.hadoop.io.LongWritable; 079import org.apache.hadoop.io.NullWritable; 080import org.apache.hadoop.io.Text; 081import org.apache.hadoop.io.Writable; 082import org.apache.hadoop.mapreduce.InputSplit; 083import org.apache.hadoop.mapreduce.Job; 084import org.apache.hadoop.mapreduce.JobContext; 085import org.apache.hadoop.mapreduce.Mapper; 086import org.apache.hadoop.mapreduce.RecordReader; 087import org.apache.hadoop.mapreduce.TaskAttemptContext; 088import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 089import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 090import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; 091import org.apache.hadoop.util.LineReader; 092import org.apache.hadoop.util.Tool; 093import org.apache.hadoop.util.ToolRunner; 094import org.slf4j.Logger; 095import org.slf4j.LoggerFactory; 096 097/** 098 * Script used evaluating Stargate performance and scalability. Runs a SG 099 * client that steps through one of a set of hardcoded tests or 'experiments' 100 * (e.g. a random reads test, a random writes test, etc.). Pass on the 101 * command-line which test to run and how many clients are participating in 102 * this experiment. Run <code>java PerformanceEvaluation --help</code> to 103 * obtain usage. 104 * 105 * <p>This class sets up and runs the evaluation programs described in 106 * Section 7, <i>Performance Evaluation</i>, of the <a 107 * href="http://labs.google.com/papers/bigtable.html">Bigtable</a> 108 * paper, pages 8-10. 109 * 110 * <p>If number of clients > 1, we start up a MapReduce job. Each map task 111 * runs an individual client. Each client does about 1GB of data. 112 */ 113public class PerformanceEvaluation extends Configured implements Tool { 114 protected static final Logger LOG = 115 LoggerFactory.getLogger(PerformanceEvaluation.class); 116 117 private static final int DEFAULT_ROW_PREFIX_LENGTH = 16; 118 private static final int ROW_LENGTH = 1000; 119 private static final int TAG_LENGTH = 256; 120 private static final int ONE_GB = 1024 * 1024 * 1000; 121 private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH; 122 123 public static final TableName TABLE_NAME = TableName.valueOf("TestTable"); 124 public static final byte [] FAMILY_NAME = Bytes.toBytes("info"); 125 public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data"); 126 private TableName tableName = TABLE_NAME; 127 128 protected HTableDescriptor TABLE_DESCRIPTOR; 129 protected Map<String, CmdDescriptor> commands = new TreeMap<>(); 130 protected static Cluster cluster = new Cluster(); 131 132 volatile Configuration conf; 133 private boolean nomapred = false; 134 private int N = 1; 135 private int R = ROWS_PER_GB; 136 private Compression.Algorithm compression = Compression.Algorithm.NONE; 137 private DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; 138 private boolean flushCommits = true; 139 private boolean writeToWAL = true; 140 private boolean inMemoryCF = false; 141 private int presplitRegions = 0; 142 private boolean useTags = false; 143 private int noOfTags = 1; 144 private Connection connection; 145 146 private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); 147 /** 148 * Regex to parse lines in input file passed to mapreduce task. 149 */ 150 public static final Pattern LINE_PATTERN = 151 Pattern.compile("tableName=(\\w+),\\s+" + 152 "startRow=(\\d+),\\s+" + 153 "perClientRunRows=(\\d+),\\s+" + 154 "totalRows=(\\d+),\\s+" + 155 "clients=(\\d+),\\s+" + 156 "flushCommits=(\\w+),\\s+" + 157 "writeToWAL=(\\w+),\\s+" + 158 "useTags=(\\w+),\\s+" + 159 "noOfTags=(\\d+)"); 160 161 /** 162 * Enum for map metrics. Keep it out here rather than inside in the Map 163 * inner-class so we can find associated properties. 164 */ 165 protected static enum Counter { 166 /** elapsed time */ 167 ELAPSED_TIME, 168 /** number of rows */ 169 ROWS} 170 171 /** 172 * Constructor 173 * @param c Configuration object 174 */ 175 public PerformanceEvaluation(final Configuration c) { 176 this.conf = c; 177 178 addCommandDescriptor(RandomReadTest.class, "randomRead", 179 "Run random read test"); 180 addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan", 181 "Run random seek and scan 100 test"); 182 addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", 183 "Run random seek scan with both start and stop row (max 10 rows)"); 184 addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100", 185 "Run random seek scan with both start and stop row (max 100 rows)"); 186 addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000", 187 "Run random seek scan with both start and stop row (max 1000 rows)"); 188 addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000", 189 "Run random seek scan with both start and stop row (max 10000 rows)"); 190 addCommandDescriptor(RandomWriteTest.class, "randomWrite", 191 "Run random write test"); 192 addCommandDescriptor(SequentialReadTest.class, "sequentialRead", 193 "Run sequential read test"); 194 addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", 195 "Run sequential write test"); 196 addCommandDescriptor(ScanTest.class, "scan", 197 "Run scan test (read every row)"); 198 addCommandDescriptor(FilteredScanTest.class, "filterScan", 199 "Run scan test using a filter to find a specific row based " + 200 "on it's value (make sure to use --rows=20)"); 201 } 202 203 protected void addCommandDescriptor(Class<? extends Test> cmdClass, 204 String name, String description) { 205 CmdDescriptor cmdDescriptor = 206 new CmdDescriptor(cmdClass, name, description); 207 commands.put(name, cmdDescriptor); 208 } 209 210 /** 211 * Implementations can have their status set. 212 */ 213 interface Status { 214 /** 215 * Sets status 216 * @param msg status message 217 * @throws IOException 218 */ 219 void setStatus(final String msg) throws IOException; 220 } 221 222 /** 223 * This class works as the InputSplit of Performance Evaluation 224 * MapReduce InputFormat, and the Record Value of RecordReader. 225 * Each map task will only read one record from a PeInputSplit, 226 * the record value is the PeInputSplit itself. 227 */ 228 public static class PeInputSplit extends InputSplit implements Writable { 229 private TableName tableName = TABLE_NAME; 230 private int startRow = 0; 231 private int rows = 0; 232 private int totalRows = 0; 233 private int clients = 0; 234 private boolean flushCommits = false; 235 private boolean writeToWAL = true; 236 private boolean useTags = false; 237 private int noOfTags = 0; 238 239 public PeInputSplit() { 240 } 241 242 public PeInputSplit(TableName tableName, int startRow, int rows, int totalRows, int clients, 243 boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags) { 244 this.tableName = tableName; 245 this.startRow = startRow; 246 this.rows = rows; 247 this.totalRows = totalRows; 248 this.clients = clients; 249 this.flushCommits = flushCommits; 250 this.writeToWAL = writeToWAL; 251 this.useTags = useTags; 252 this.noOfTags = noOfTags; 253 } 254 255 @Override 256 public void readFields(DataInput in) throws IOException { 257 int tableNameLen = in.readInt(); 258 byte[] name = new byte[tableNameLen]; 259 in.readFully(name); 260 this.tableName = TableName.valueOf(name); 261 this.startRow = in.readInt(); 262 this.rows = in.readInt(); 263 this.totalRows = in.readInt(); 264 this.clients = in.readInt(); 265 this.flushCommits = in.readBoolean(); 266 this.writeToWAL = in.readBoolean(); 267 this.useTags = in.readBoolean(); 268 this.noOfTags = in.readInt(); 269 } 270 271 @Override 272 public void write(DataOutput out) throws IOException { 273 byte[] name = this.tableName.toBytes(); 274 out.writeInt(name.length); 275 out.write(name); 276 out.writeInt(startRow); 277 out.writeInt(rows); 278 out.writeInt(totalRows); 279 out.writeInt(clients); 280 out.writeBoolean(flushCommits); 281 out.writeBoolean(writeToWAL); 282 out.writeBoolean(useTags); 283 out.writeInt(noOfTags); 284 } 285 286 @Override 287 public long getLength() throws IOException, InterruptedException { 288 return 0; 289 } 290 291 @Override 292 public String[] getLocations() throws IOException, InterruptedException { 293 return new String[0]; 294 } 295 296 public int getStartRow() { 297 return startRow; 298 } 299 300 public TableName getTableName() { 301 return tableName; 302 } 303 304 public int getRows() { 305 return rows; 306 } 307 308 public int getTotalRows() { 309 return totalRows; 310 } 311 312 public int getClients() { 313 return clients; 314 } 315 316 public boolean isFlushCommits() { 317 return flushCommits; 318 } 319 320 public boolean isWriteToWAL() { 321 return writeToWAL; 322 } 323 324 public boolean isUseTags() { 325 return useTags; 326 } 327 328 public int getNoOfTags() { 329 return noOfTags; 330 } 331 } 332 333 /** 334 * InputFormat of Performance Evaluation MapReduce job. 335 * It extends from FileInputFormat, want to use it's methods such as setInputPaths(). 336 */ 337 public static class PeInputFormat extends FileInputFormat<NullWritable, PeInputSplit> { 338 339 @Override 340 public List<InputSplit> getSplits(JobContext job) throws IOException { 341 // generate splits 342 List<InputSplit> splitList = new ArrayList<>(); 343 344 for (FileStatus file: listStatus(job)) { 345 if (file.isDirectory()) { 346 continue; 347 } 348 Path path = file.getPath(); 349 FileSystem fs = path.getFileSystem(job.getConfiguration()); 350 FSDataInputStream fileIn = fs.open(path); 351 LineReader in = new LineReader(fileIn, job.getConfiguration()); 352 int lineLen = 0; 353 while(true) { 354 Text lineText = new Text(); 355 lineLen = in.readLine(lineText); 356 if(lineLen <= 0) { 357 break; 358 } 359 Matcher m = LINE_PATTERN.matcher(lineText.toString()); 360 if((m != null) && m.matches()) { 361 TableName tableName = TableName.valueOf(m.group(1)); 362 int startRow = Integer.parseInt(m.group(2)); 363 int rows = Integer.parseInt(m.group(3)); 364 int totalRows = Integer.parseInt(m.group(4)); 365 int clients = Integer.parseInt(m.group(5)); 366 boolean flushCommits = Boolean.parseBoolean(m.group(6)); 367 boolean writeToWAL = Boolean.parseBoolean(m.group(7)); 368 boolean useTags = Boolean.parseBoolean(m.group(8)); 369 int noOfTags = Integer.parseInt(m.group(9)); 370 371 LOG.debug("tableName=" + tableName + 372 " split["+ splitList.size() + "] " + 373 " startRow=" + startRow + 374 " rows=" + rows + 375 " totalRows=" + totalRows + 376 " clients=" + clients + 377 " flushCommits=" + flushCommits + 378 " writeToWAL=" + writeToWAL + 379 " useTags=" + useTags + 380 " noOfTags=" + noOfTags); 381 382 PeInputSplit newSplit = 383 new PeInputSplit(tableName, startRow, rows, totalRows, clients, 384 flushCommits, writeToWAL, useTags, noOfTags); 385 splitList.add(newSplit); 386 } 387 } 388 in.close(); 389 } 390 391 LOG.info("Total # of splits: " + splitList.size()); 392 return splitList; 393 } 394 395 @Override 396 public RecordReader<NullWritable, PeInputSplit> createRecordReader(InputSplit split, 397 TaskAttemptContext context) { 398 return new PeRecordReader(); 399 } 400 401 public static class PeRecordReader extends RecordReader<NullWritable, PeInputSplit> { 402 private boolean readOver = false; 403 private PeInputSplit split = null; 404 private NullWritable key = null; 405 private PeInputSplit value = null; 406 407 @Override 408 public void initialize(InputSplit split, TaskAttemptContext context) 409 throws IOException, InterruptedException { 410 this.readOver = false; 411 this.split = (PeInputSplit)split; 412 } 413 414 @Override 415 public boolean nextKeyValue() throws IOException, InterruptedException { 416 if(readOver) { 417 return false; 418 } 419 420 key = NullWritable.get(); 421 value = split; 422 423 readOver = true; 424 return true; 425 } 426 427 @Override 428 public NullWritable getCurrentKey() throws IOException, InterruptedException { 429 return key; 430 } 431 432 @Override 433 public PeInputSplit getCurrentValue() throws IOException, InterruptedException { 434 return value; 435 } 436 437 @Override 438 public float getProgress() throws IOException, InterruptedException { 439 if(readOver) { 440 return 1.0f; 441 } else { 442 return 0.0f; 443 } 444 } 445 446 @Override 447 public void close() throws IOException { 448 // do nothing 449 } 450 } 451 } 452 453 /** 454 * MapReduce job that runs a performance evaluation client in each map task. 455 */ 456 public static class EvaluationMapTask 457 extends Mapper<NullWritable, PeInputSplit, LongWritable, LongWritable> { 458 459 /** configuration parameter name that contains the command */ 460 public final static String CMD_KEY = "EvaluationMapTask.command"; 461 /** configuration parameter name that contains the PE impl */ 462 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl"; 463 464 private Class<? extends Test> cmd; 465 private PerformanceEvaluation pe; 466 467 @Override 468 protected void setup(Context context) throws IOException, InterruptedException { 469 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class); 470 471 // this is required so that extensions of PE are instantiated within the 472 // map reduce task... 473 Class<? extends PerformanceEvaluation> peClass = 474 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class); 475 try { 476 this.pe = peClass.getConstructor(Configuration.class) 477 .newInstance(context.getConfiguration()); 478 } catch (Exception e) { 479 throw new IllegalStateException("Could not instantiate PE instance", e); 480 } 481 } 482 483 private <Type> Class<? extends Type> forName(String className, Class<Type> type) { 484 Class<? extends Type> clazz = null; 485 try { 486 clazz = Class.forName(className).asSubclass(type); 487 } catch (ClassNotFoundException e) { 488 throw new IllegalStateException("Could not find class for name: " + className, e); 489 } 490 return clazz; 491 } 492 493 @Override 494 protected void map(NullWritable key, PeInputSplit value, final Context context) 495 throws IOException, InterruptedException { 496 497 Status status = new Status() { 498 @Override 499 public void setStatus(String msg) { 500 context.setStatus(msg); 501 } 502 }; 503 504 // Evaluation task 505 pe.tableName = value.getTableName(); 506 long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(), 507 value.getRows(), value.getTotalRows(), 508 value.isFlushCommits(), value.isWriteToWAL(), 509 value.isUseTags(), value.getNoOfTags(), 510 ConnectionFactory.createConnection(context.getConfiguration()), status); 511 // Collect how much time the thing took. Report as map output and 512 // to the ELAPSED_TIME counter. 513 context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime); 514 context.getCounter(Counter.ROWS).increment(value.rows); 515 context.write(new LongWritable(value.startRow), new LongWritable(elapsedTime)); 516 context.progress(); 517 } 518 } 519 520 /* 521 * If table does not already exist, create. 522 * @param c Client to use checking. 523 * @return True if we created the table. 524 * @throws IOException 525 */ 526 private boolean checkTable(RemoteAdmin admin) throws IOException { 527 HTableDescriptor tableDescriptor = getTableDescriptor(); 528 if (this.presplitRegions > 0) { 529 // presplit requested 530 if (admin.isTableAvailable(tableDescriptor.getTableName().getName())) { 531 admin.deleteTable(tableDescriptor.getTableName().getName()); 532 } 533 534 byte[][] splits = getSplits(); 535 for (int i=0; i < splits.length; i++) { 536 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); 537 } 538 admin.createTable(tableDescriptor); 539 LOG.info ("Table created with " + this.presplitRegions + " splits"); 540 } else { 541 boolean tableExists = admin.isTableAvailable(tableDescriptor.getTableName().getName()); 542 if (!tableExists) { 543 admin.createTable(tableDescriptor); 544 LOG.info("Table " + tableDescriptor + " created"); 545 } 546 } 547 boolean tableExists = admin.isTableAvailable(tableDescriptor.getTableName().getName()); 548 return tableExists; 549 } 550 551 protected HTableDescriptor getTableDescriptor() { 552 if (TABLE_DESCRIPTOR == null) { 553 TABLE_DESCRIPTOR = new HTableDescriptor(tableName); 554 HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME); 555 family.setDataBlockEncoding(blockEncoding); 556 family.setCompressionType(compression); 557 if (inMemoryCF) { 558 family.setInMemory(true); 559 } 560 TABLE_DESCRIPTOR.addFamily(family); 561 } 562 return TABLE_DESCRIPTOR; 563 } 564 565 /** 566 * Generates splits based on total number of rows and specified split regions 567 * 568 * @return splits : array of byte [] 569 */ 570 protected byte[][] getSplits() { 571 if (this.presplitRegions == 0) 572 return new byte [0][]; 573 574 int numSplitPoints = presplitRegions - 1; 575 byte[][] splits = new byte[numSplitPoints][]; 576 int jump = this.R / this.presplitRegions; 577 for (int i=0; i < numSplitPoints; i++) { 578 int rowkey = jump * (1 + i); 579 splits[i] = format(rowkey); 580 } 581 return splits; 582 } 583 584 /* 585 * We're to run multiple clients concurrently. Setup a mapreduce job. Run 586 * one map per client. Then run a single reduce to sum the elapsed times. 587 * @param cmd Command to run. 588 * @throws IOException 589 */ 590 private void runNIsMoreThanOne(final Class<? extends Test> cmd) 591 throws IOException, InterruptedException, ClassNotFoundException { 592 RemoteAdmin remoteAdmin = new RemoteAdmin(new Client(cluster), getConf()); 593 checkTable(remoteAdmin); 594 if (nomapred) { 595 doMultipleClients(cmd); 596 } else { 597 doMapReduce(cmd); 598 } 599 } 600 601 /* 602 * Run all clients in this vm each to its own thread. 603 * @param cmd Command to run. 604 * @throws IOException 605 */ 606 private void doMultipleClients(final Class<? extends Test> cmd) throws IOException { 607 final List<Thread> threads = new ArrayList<>(this.N); 608 final long[] timings = new long[this.N]; 609 final int perClientRows = R/N; 610 final TableName tableName = this.tableName; 611 final DataBlockEncoding encoding = this.blockEncoding; 612 final boolean flushCommits = this.flushCommits; 613 final Compression.Algorithm compression = this.compression; 614 final boolean writeToWal = this.writeToWAL; 615 final int preSplitRegions = this.presplitRegions; 616 final boolean useTags = this.useTags; 617 final int numTags = this.noOfTags; 618 final Connection connection = ConnectionFactory.createConnection(getConf()); 619 for (int i = 0; i < this.N; i++) { 620 final int index = i; 621 Thread t = new Thread ("TestClient-" + i) { 622 @Override 623 public void run() { 624 super.run(); 625 PerformanceEvaluation pe = new PerformanceEvaluation(getConf()); 626 pe.tableName = tableName; 627 pe.blockEncoding = encoding; 628 pe.flushCommits = flushCommits; 629 pe.compression = compression; 630 pe.writeToWAL = writeToWal; 631 pe.presplitRegions = preSplitRegions; 632 pe.N = N; 633 pe.connection = connection; 634 pe.useTags = useTags; 635 pe.noOfTags = numTags; 636 try { 637 long elapsedTime = pe.runOneClient(cmd, index * perClientRows, 638 perClientRows, R, 639 flushCommits, writeToWAL, useTags, noOfTags, connection, new Status() { 640 @Override 641 public void setStatus(final String msg) throws IOException { 642 LOG.info("client-" + getName() + " " + msg); 643 } 644 }); 645 timings[index] = elapsedTime; 646 LOG.info("Finished " + getName() + " in " + elapsedTime + 647 "ms writing " + perClientRows + " rows"); 648 } catch (IOException e) { 649 throw new RuntimeException(e); 650 } 651 } 652 }; 653 threads.add(t); 654 } 655 for (Thread t: threads) { 656 t.start(); 657 } 658 for (Thread t: threads) { 659 while(t.isAlive()) { 660 try { 661 t.join(); 662 } catch (InterruptedException e) { 663 LOG.debug("Interrupted, continuing" + e.toString()); 664 } 665 } 666 } 667 final String test = cmd.getSimpleName(); 668 LOG.info("[" + test + "] Summary of timings (ms): " 669 + Arrays.toString(timings)); 670 Arrays.sort(timings); 671 long total = 0; 672 for (int i = 0; i < this.N; i++) { 673 total += timings[i]; 674 } 675 LOG.info("[" + test + "]" 676 + "\tMin: " + timings[0] + "ms" 677 + "\tMax: " + timings[this.N - 1] + "ms" 678 + "\tAvg: " + (total / this.N) + "ms"); 679 } 680 681 /* 682 * Run a mapreduce job. Run as many maps as asked-for clients. 683 * Before we start up the job, write out an input file with instruction 684 * per client regards which row they are to start on. 685 * @param cmd Command to run. 686 * @throws IOException 687 */ 688 private void doMapReduce(final Class<? extends Test> cmd) throws IOException, 689 InterruptedException, ClassNotFoundException { 690 Configuration conf = getConf(); 691 Path inputDir = writeInputFile(conf); 692 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); 693 conf.set(EvaluationMapTask.PE_KEY, getClass().getName()); 694 Job job = Job.getInstance(conf); 695 job.setJarByClass(PerformanceEvaluation.class); 696 job.setJobName("HBase Performance Evaluation"); 697 698 job.setInputFormatClass(PeInputFormat.class); 699 PeInputFormat.setInputPaths(job, inputDir); 700 701 job.setOutputKeyClass(LongWritable.class); 702 job.setOutputValueClass(LongWritable.class); 703 704 job.setMapperClass(EvaluationMapTask.class); 705 job.setReducerClass(LongSumReducer.class); 706 job.setNumReduceTasks(1); 707 708 job.setOutputFormatClass(TextOutputFormat.class); 709 TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs")); 710 TableMapReduceUtil.addDependencyJars(job); 711 TableMapReduceUtil.initCredentials(job); 712 job.waitForCompletion(true); 713 } 714 715 /* 716 * Write input file of offsets-per-client for the mapreduce job. 717 * @param c Configuration 718 * @return Directory that contains file written. 719 * @throws IOException 720 */ 721 private Path writeInputFile(final Configuration c) throws IOException { 722 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); 723 Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date())); 724 Path inputDir = new Path(jobdir, "inputs"); 725 726 FileSystem fs = FileSystem.get(c); 727 fs.mkdirs(inputDir); 728 Path inputFile = new Path(inputDir, "input.txt"); 729 PrintStream out = new PrintStream(fs.create(inputFile)); 730 // Make input random. 731 Map<Integer, String> m = new TreeMap<>(); 732 Hash h = MurmurHash.getInstance(); 733 int perClientRows = (this.R / this.N); 734 try { 735 for (int i = 0; i < 10; i++) { 736 for (int j = 0; j < N; j++) { 737 String s = "tableName=" + this.tableName + 738 ", startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) + 739 ", perClientRunRows=" + (perClientRows / 10) + 740 ", totalRows=" + this.R + 741 ", clients=" + this.N + 742 ", flushCommits=" + this.flushCommits + 743 ", writeToWAL=" + this.writeToWAL + 744 ", useTags=" + this.useTags + 745 ", noOfTags=" + this.noOfTags; 746 byte[] b = Bytes.toBytes(s); 747 int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1); 748 m.put(hash, s); 749 } 750 } 751 for (Map.Entry<Integer, String> e: m.entrySet()) { 752 out.println(e.getValue()); 753 } 754 } finally { 755 out.close(); 756 } 757 return inputDir; 758 } 759 760 /** 761 * Describes a command. 762 */ 763 static class CmdDescriptor { 764 private Class<? extends Test> cmdClass; 765 private String name; 766 private String description; 767 768 CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) { 769 this.cmdClass = cmdClass; 770 this.name = name; 771 this.description = description; 772 } 773 774 public Class<? extends Test> getCmdClass() { 775 return cmdClass; 776 } 777 778 public String getName() { 779 return name; 780 } 781 782 public String getDescription() { 783 return description; 784 } 785 } 786 787 /** 788 * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation.Test 789 * tests}. This makes the reflection logic a little easier to understand... 790 */ 791 static class TestOptions { 792 private int startRow; 793 private int perClientRunRows; 794 private int totalRows; 795 private int numClientThreads; 796 private TableName tableName; 797 private boolean flushCommits; 798 private boolean writeToWAL = true; 799 private boolean useTags = false; 800 private int noOfTags = 0; 801 private Connection connection; 802 803 TestOptions() { 804 } 805 806 TestOptions(int startRow, int perClientRunRows, int totalRows, int numClientThreads, 807 TableName tableName, boolean flushCommits, boolean writeToWAL, boolean useTags, 808 int noOfTags, Connection connection) { 809 this.startRow = startRow; 810 this.perClientRunRows = perClientRunRows; 811 this.totalRows = totalRows; 812 this.numClientThreads = numClientThreads; 813 this.tableName = tableName; 814 this.flushCommits = flushCommits; 815 this.writeToWAL = writeToWAL; 816 this.useTags = useTags; 817 this.noOfTags = noOfTags; 818 this.connection = connection; 819 } 820 821 public int getStartRow() { 822 return startRow; 823 } 824 825 public int getPerClientRunRows() { 826 return perClientRunRows; 827 } 828 829 public int getTotalRows() { 830 return totalRows; 831 } 832 833 public int getNumClientThreads() { 834 return numClientThreads; 835 } 836 837 public TableName getTableName() { 838 return tableName; 839 } 840 841 public boolean isFlushCommits() { 842 return flushCommits; 843 } 844 845 public boolean isWriteToWAL() { 846 return writeToWAL; 847 } 848 849 public Connection getConnection() { 850 return connection; 851 } 852 853 public boolean isUseTags() { 854 return this.useTags; 855 } 856 857 public int getNumTags() { 858 return this.noOfTags; 859 } 860 } 861 862 /* 863 * A test. 864 * Subclass to particularize what happens per row. 865 */ 866 static abstract class Test { 867 // Below is make it so when Tests are all running in the one 868 // jvm, that they each have a differently seeded Random. 869 private static final Random randomSeed = 870 new Random(System.currentTimeMillis()); 871 private static long nextRandomSeed() { 872 return randomSeed.nextLong(); 873 } 874 protected final Random rand = new Random(nextRandomSeed()); 875 876 protected final int startRow; 877 protected final int perClientRunRows; 878 protected final int totalRows; 879 private final Status status; 880 protected TableName tableName; 881 protected volatile Configuration conf; 882 protected boolean writeToWAL; 883 protected boolean useTags; 884 protected int noOfTags; 885 protected Connection connection; 886 887 /** 888 * Note that all subclasses of this class must provide a public contructor 889 * that has the exact same list of arguments. 890 */ 891 Test(final Configuration conf, final TestOptions options, final Status status) { 892 super(); 893 this.startRow = options.getStartRow(); 894 this.perClientRunRows = options.getPerClientRunRows(); 895 this.totalRows = options.getTotalRows(); 896 this.status = status; 897 this.tableName = options.getTableName(); 898 this.conf = conf; 899 this.writeToWAL = options.isWriteToWAL(); 900 this.useTags = options.isUseTags(); 901 this.noOfTags = options.getNumTags(); 902 this.connection = options.getConnection(); 903 } 904 905 protected String generateStatus(final int sr, final int i, final int lr) { 906 return sr + "/" + i + "/" + lr; 907 } 908 909 protected int getReportingPeriod() { 910 int period = this.perClientRunRows / 10; 911 return period == 0? this.perClientRunRows: period; 912 } 913 914 abstract void testTakedown() throws IOException; 915 /* 916 * Run test 917 * @return Elapsed time. 918 * @throws IOException 919 */ 920 long test() throws IOException { 921 testSetup(); 922 LOG.info("Timed test starting in thread " + Thread.currentThread().getName()); 923 final long startTime = System.nanoTime(); 924 try { 925 testTimed(); 926 } finally { 927 testTakedown(); 928 } 929 return (System.nanoTime() - startTime) / 1000000; 930 } 931 932 abstract void testSetup() throws IOException; 933 934 /** 935 * Provides an extension point for tests that don't want a per row invocation. 936 */ 937 void testTimed() throws IOException { 938 int lastRow = this.startRow + this.perClientRunRows; 939 // Report on completion of 1/10th of total. 940 for (int i = this.startRow; i < lastRow; i++) { 941 testRow(i); 942 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { 943 status.setStatus(generateStatus(this.startRow, i, lastRow)); 944 } 945 } 946 } 947 948 /* 949 * Test for individual row. 950 * @param i Row index. 951 */ 952 abstract void testRow(final int i) throws IOException; 953 } 954 955 static abstract class TableTest extends Test { 956 protected Table table; 957 958 public TableTest(Configuration conf, TestOptions options, Status status) { 959 super(conf, options, status); 960 } 961 962 @Override 963 void testSetup() throws IOException { 964 this.table = connection.getTable(tableName); 965 } 966 967 @Override 968 void testTakedown() throws IOException { 969 table.close(); 970 } 971 } 972 973 static abstract class BufferedMutatorTest extends Test { 974 protected BufferedMutator mutator; 975 protected boolean flushCommits; 976 977 public BufferedMutatorTest(Configuration conf, TestOptions options, Status status) { 978 super(conf, options, status); 979 this.flushCommits = options.isFlushCommits(); 980 } 981 982 @Override 983 void testSetup() throws IOException { 984 this.mutator = connection.getBufferedMutator(tableName); 985 } 986 987 @Override 988 void testTakedown() throws IOException { 989 if (flushCommits) { 990 this.mutator.flush(); 991 } 992 mutator.close(); 993 } 994 } 995 996 static class RandomSeekScanTest extends TableTest { 997 RandomSeekScanTest(Configuration conf, TestOptions options, Status status) { 998 super(conf, options, status); 999 } 1000 1001 @Override 1002 void testRow(final int i) throws IOException { 1003 Scan scan = new Scan(getRandomRow(this.rand, this.totalRows)); 1004 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1005 scan.setFilter(new WhileMatchFilter(new PageFilter(120))); 1006 ResultScanner s = this.table.getScanner(scan); 1007 s.close(); 1008 } 1009 1010 @Override 1011 protected int getReportingPeriod() { 1012 int period = this.perClientRunRows / 100; 1013 return period == 0? this.perClientRunRows: period; 1014 } 1015 1016 } 1017 1018 @SuppressWarnings("unused") 1019 static abstract class RandomScanWithRangeTest extends TableTest { 1020 RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) { 1021 super(conf, options, status); 1022 } 1023 1024 @Override 1025 void testRow(final int i) throws IOException { 1026 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); 1027 Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond()); 1028 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1029 ResultScanner s = this.table.getScanner(scan); 1030 int count = 0; 1031 for (Result rr = null; (rr = s.next()) != null;) { 1032 count++; 1033 } 1034 1035 if (i % 100 == 0) { 1036 LOG.info(String.format("Scan for key range %s - %s returned %s rows", 1037 Bytes.toString(startAndStopRow.getFirst()), 1038 Bytes.toString(startAndStopRow.getSecond()), count)); 1039 } 1040 1041 s.close(); 1042 } 1043 1044 protected abstract Pair<byte[],byte[]> getStartAndStopRow(); 1045 1046 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) { 1047 int start = this.rand.nextInt(Integer.MAX_VALUE) % totalRows; 1048 int stop = start + maxRange; 1049 return new Pair<>(format(start), format(stop)); 1050 } 1051 1052 @Override 1053 protected int getReportingPeriod() { 1054 int period = this.perClientRunRows / 100; 1055 return period == 0? this.perClientRunRows: period; 1056 } 1057 } 1058 1059 static class RandomScanWithRange10Test extends RandomScanWithRangeTest { 1060 RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status) { 1061 super(conf, options, status); 1062 } 1063 1064 @Override 1065 protected Pair<byte[], byte[]> getStartAndStopRow() { 1066 return generateStartAndStopRows(10); 1067 } 1068 } 1069 1070 static class RandomScanWithRange100Test extends RandomScanWithRangeTest { 1071 RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status) { 1072 super(conf, options, status); 1073 } 1074 1075 @Override 1076 protected Pair<byte[], byte[]> getStartAndStopRow() { 1077 return generateStartAndStopRows(100); 1078 } 1079 } 1080 1081 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest { 1082 RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status) { 1083 super(conf, options, status); 1084 } 1085 1086 @Override 1087 protected Pair<byte[], byte[]> getStartAndStopRow() { 1088 return generateStartAndStopRows(1000); 1089 } 1090 } 1091 1092 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest { 1093 RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status) { 1094 super(conf, options, status); 1095 } 1096 1097 @Override 1098 protected Pair<byte[], byte[]> getStartAndStopRow() { 1099 return generateStartAndStopRows(10000); 1100 } 1101 } 1102 1103 static class RandomReadTest extends TableTest { 1104 RandomReadTest(Configuration conf, TestOptions options, Status status) { 1105 super(conf, options, status); 1106 } 1107 1108 @Override 1109 void testRow(final int i) throws IOException { 1110 Get get = new Get(getRandomRow(this.rand, this.totalRows)); 1111 get.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1112 this.table.get(get); 1113 } 1114 1115 @Override 1116 protected int getReportingPeriod() { 1117 int period = this.perClientRunRows / 100; 1118 return period == 0? this.perClientRunRows: period; 1119 } 1120 1121 } 1122 1123 static class RandomWriteTest extends BufferedMutatorTest { 1124 RandomWriteTest(Configuration conf, TestOptions options, Status status) { 1125 super(conf, options, status); 1126 } 1127 1128 @Override 1129 void testRow(final int i) throws IOException { 1130 byte[] row = getRandomRow(this.rand, this.totalRows); 1131 Put put = new Put(row); 1132 byte[] value = generateData(this.rand, ROW_LENGTH); 1133 if (useTags) { 1134 byte[] tag = generateData(this.rand, TAG_LENGTH); 1135 Tag[] tags = new Tag[noOfTags]; 1136 for (int n = 0; n < noOfTags; n++) { 1137 Tag t = new ArrayBackedTag((byte) n, tag); 1138 tags[n] = t; 1139 } 1140 KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP, 1141 value, tags); 1142 put.add(kv); 1143 } else { 1144 put.addColumn(FAMILY_NAME, QUALIFIER_NAME, value); 1145 } 1146 put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1147 mutator.mutate(put); 1148 } 1149 } 1150 1151 static class ScanTest extends TableTest { 1152 private ResultScanner testScanner; 1153 1154 ScanTest(Configuration conf, TestOptions options, Status status) { 1155 super(conf, options, status); 1156 } 1157 1158 @Override 1159 void testTakedown() throws IOException { 1160 if (this.testScanner != null) { 1161 this.testScanner.close(); 1162 } 1163 super.testTakedown(); 1164 } 1165 1166 1167 @Override 1168 void testRow(final int i) throws IOException { 1169 if (this.testScanner == null) { 1170 Scan scan = new Scan(format(this.startRow)); 1171 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1172 this.testScanner = table.getScanner(scan); 1173 } 1174 testScanner.next(); 1175 } 1176 1177 } 1178 1179 static class SequentialReadTest extends TableTest { 1180 SequentialReadTest(Configuration conf, TestOptions options, Status status) { 1181 super(conf, options, status); 1182 } 1183 1184 @Override 1185 void testRow(final int i) throws IOException { 1186 Get get = new Get(format(i)); 1187 get.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1188 table.get(get); 1189 } 1190 1191 } 1192 1193 static class SequentialWriteTest extends BufferedMutatorTest { 1194 1195 SequentialWriteTest(Configuration conf, TestOptions options, Status status) { 1196 super(conf, options, status); 1197 } 1198 1199 @Override 1200 void testRow(final int i) throws IOException { 1201 byte[] row = format(i); 1202 Put put = new Put(row); 1203 byte[] value = generateData(this.rand, ROW_LENGTH); 1204 if (useTags) { 1205 byte[] tag = generateData(this.rand, TAG_LENGTH); 1206 Tag[] tags = new Tag[noOfTags]; 1207 for (int n = 0; n < noOfTags; n++) { 1208 Tag t = new ArrayBackedTag((byte) n, tag); 1209 tags[n] = t; 1210 } 1211 KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP, 1212 value, tags); 1213 put.add(kv); 1214 } else { 1215 put.addColumn(FAMILY_NAME, QUALIFIER_NAME, value); 1216 } 1217 put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1218 mutator.mutate(put); 1219 } 1220 } 1221 1222 static class FilteredScanTest extends TableTest { 1223 protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName()); 1224 1225 FilteredScanTest(Configuration conf, TestOptions options, Status status) { 1226 super(conf, options, status); 1227 } 1228 1229 @Override 1230 void testRow(int i) throws IOException { 1231 byte[] value = generateValue(this.rand); 1232 Scan scan = constructScan(value); 1233 ResultScanner scanner = null; 1234 try { 1235 scanner = this.table.getScanner(scan); 1236 while (scanner.next() != null) { 1237 } 1238 } finally { 1239 if (scanner != null) scanner.close(); 1240 } 1241 } 1242 1243 protected Scan constructScan(byte[] valuePrefix) throws IOException { 1244 Filter filter = new SingleColumnValueFilter( 1245 FAMILY_NAME, QUALIFIER_NAME, CompareOperator.EQUAL, 1246 new BinaryComparator(valuePrefix) 1247 ); 1248 Scan scan = new Scan(); 1249 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1250 scan.setFilter(filter); 1251 return scan; 1252 } 1253 } 1254 1255 /* 1256 * Format passed integer. 1257 * @param number 1258 * @return Returns zero-prefixed 10-byte wide decimal version of passed 1259 * number (Does absolute in case number is negative). 1260 */ 1261 public static byte [] format(final int number) { 1262 byte [] b = new byte[DEFAULT_ROW_PREFIX_LENGTH + 10]; 1263 int d = Math.abs(number); 1264 for (int i = b.length - 1; i >= 0; i--) { 1265 b[i] = (byte)((d % 10) + '0'); 1266 d /= 10; 1267 } 1268 return b; 1269 } 1270 1271 public static byte[] generateData(final Random r, int length) { 1272 byte [] b = new byte [length]; 1273 int i = 0; 1274 1275 for(i = 0; i < (length-8); i += 8) { 1276 b[i] = (byte) (65 + r.nextInt(26)); 1277 b[i+1] = b[i]; 1278 b[i+2] = b[i]; 1279 b[i+3] = b[i]; 1280 b[i+4] = b[i]; 1281 b[i+5] = b[i]; 1282 b[i+6] = b[i]; 1283 b[i+7] = b[i]; 1284 } 1285 1286 byte a = (byte) (65 + r.nextInt(26)); 1287 for(; i < length; i++) { 1288 b[i] = a; 1289 } 1290 return b; 1291 } 1292 1293 public static byte[] generateValue(final Random r) { 1294 byte [] b = new byte [ROW_LENGTH]; 1295 r.nextBytes(b); 1296 return b; 1297 } 1298 1299 static byte [] getRandomRow(final Random random, final int totalRows) { 1300 return format(random.nextInt(Integer.MAX_VALUE) % totalRows); 1301 } 1302 1303 long runOneClient(final Class<? extends Test> cmd, final int startRow, 1304 final int perClientRunRows, final int totalRows, 1305 boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags, 1306 Connection connection, final Status status) 1307 throws IOException { 1308 status.setStatus("Start " + cmd + " at offset " + startRow + " for " + 1309 perClientRunRows + " rows"); 1310 long totalElapsedTime = 0; 1311 1312 TestOptions options = new TestOptions(startRow, perClientRunRows, 1313 totalRows, N, tableName, flushCommits, writeToWAL, useTags, noOfTags, connection); 1314 final Test t; 1315 try { 1316 Constructor<? extends Test> constructor = cmd.getDeclaredConstructor( 1317 Configuration.class, TestOptions.class, Status.class); 1318 t = constructor.newInstance(this.conf, options, status); 1319 } catch (NoSuchMethodException e) { 1320 throw new IllegalArgumentException("Invalid command class: " + 1321 cmd.getName() + ". It does not provide a constructor as described by" + 1322 "the javadoc comment. Available constructors are: " + 1323 Arrays.toString(cmd.getConstructors())); 1324 } catch (Exception e) { 1325 throw new IllegalStateException("Failed to construct command class", e); 1326 } 1327 totalElapsedTime = t.test(); 1328 1329 status.setStatus("Finished " + cmd + " in " + totalElapsedTime + 1330 "ms at offset " + startRow + " for " + perClientRunRows + " rows"); 1331 return totalElapsedTime; 1332 } 1333 1334 private void runNIsOne(final Class<? extends Test> cmd) { 1335 Status status = new Status() { 1336 @Override 1337 public void setStatus(String msg) throws IOException { 1338 LOG.info(msg); 1339 } 1340 }; 1341 1342 RemoteAdmin admin = null; 1343 try { 1344 Client client = new Client(cluster); 1345 admin = new RemoteAdmin(client, getConf()); 1346 checkTable(admin); 1347 runOneClient(cmd, 0, this.R, this.R, this.flushCommits, this.writeToWAL, 1348 this.useTags, this.noOfTags, this.connection, status); 1349 } catch (Exception e) { 1350 LOG.error("Failed", e); 1351 } 1352 } 1353 1354 private void runTest(final Class<? extends Test> cmd) throws IOException, 1355 InterruptedException, ClassNotFoundException { 1356 if (N == 1) { 1357 // If there is only one client and one HRegionServer, we assume nothing 1358 // has been set up at all. 1359 runNIsOne(cmd); 1360 } else { 1361 // Else, run 1362 runNIsMoreThanOne(cmd); 1363 } 1364 } 1365 1366 protected void printUsage() { 1367 printUsage(null); 1368 } 1369 1370 protected void printUsage(final String message) { 1371 if (message != null && message.length() > 0) { 1372 System.err.println(message); 1373 } 1374 System.err.println("Usage: java " + this.getClass().getName() + " \\"); 1375 System.err.println(" [--nomapred] [--rows=ROWS] [--table=NAME] \\"); 1376 System.err.println(" [--compress=TYPE] [--blockEncoding=TYPE] " + 1377 "[-D<property=value>]* <command> <nclients>"); 1378 System.err.println(); 1379 System.err.println("General Options:"); 1380 System.err.println(" nomapred Run multiple clients using threads " + 1381 "(rather than use mapreduce)"); 1382 System.err.println(" rows Rows each client runs. Default: One million"); 1383 System.err.println(); 1384 System.err.println("Table Creation / Write Tests:"); 1385 System.err.println(" table Alternate table name. Default: 'TestTable'"); 1386 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); 1387 System.err.println(" flushCommits Used to determine if the test should flush the table. " + 1388 "Default: false"); 1389 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); 1390 System.err.println(" presplit Create presplit table. Recommended for accurate perf " + 1391 "analysis (see guide). Default: disabled"); 1392 System.err.println(" usetags Writes tags along with KVs. Use with HFile V3. " + 1393 "Default : false"); 1394 System.err.println(" numoftags Specify the no of tags that would be needed. " + 1395 "This works only if usetags is true."); 1396 System.err.println(); 1397 System.err.println("Read Tests:"); 1398 System.err.println(" inmemory Tries to keep the HFiles of the CF inmemory as far as " + 1399 "possible. Not guaranteed that reads are always served from inmemory. Default: false"); 1400 System.err.println(); 1401 System.err.println(" Note: -D properties will be applied to the conf used. "); 1402 System.err.println(" For example: "); 1403 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true"); 1404 System.err.println(" -Dmapreduce.task.timeout=60000"); 1405 System.err.println(); 1406 System.err.println("Command:"); 1407 for (CmdDescriptor command : commands.values()) { 1408 System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription())); 1409 } 1410 System.err.println(); 1411 System.err.println("Args:"); 1412 System.err.println(" nclients Integer. Required. Total number of " + 1413 "clients (and HRegionServers)"); 1414 System.err.println(" running: 1 <= value <= 500"); 1415 System.err.println("Examples:"); 1416 System.err.println(" To run a single evaluation client:"); 1417 System.err.println(" $ hbase " + this.getClass().getName() 1418 + " sequentialWrite 1"); 1419 } 1420 1421 private void getArgs(final int start, final String[] args) { 1422 if(start + 1 > args.length) { 1423 throw new IllegalArgumentException("must supply the number of clients"); 1424 } 1425 N = Integer.parseInt(args[start]); 1426 if (N < 1) { 1427 throw new IllegalArgumentException("Number of clients must be > 1"); 1428 } 1429 // Set total number of rows to write. 1430 R = R * N; 1431 } 1432 1433 @Override 1434 public int run(String[] args) throws Exception { 1435 // Process command-line args. TODO: Better cmd-line processing 1436 // (but hopefully something not as painful as cli options). 1437 int errCode = -1; 1438 if (args.length < 1) { 1439 printUsage(); 1440 return errCode; 1441 } 1442 1443 try { 1444 for (int i = 0; i < args.length; i++) { 1445 String cmd = args[i]; 1446 if (cmd.equals("-h") || cmd.startsWith("--h")) { 1447 printUsage(); 1448 errCode = 0; 1449 break; 1450 } 1451 1452 final String nmr = "--nomapred"; 1453 if (cmd.startsWith(nmr)) { 1454 nomapred = true; 1455 continue; 1456 } 1457 1458 final String rows = "--rows="; 1459 if (cmd.startsWith(rows)) { 1460 R = Integer.parseInt(cmd.substring(rows.length())); 1461 continue; 1462 } 1463 1464 final String table = "--table="; 1465 if (cmd.startsWith(table)) { 1466 this.tableName = TableName.valueOf(cmd.substring(table.length())); 1467 continue; 1468 } 1469 1470 final String compress = "--compress="; 1471 if (cmd.startsWith(compress)) { 1472 this.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); 1473 continue; 1474 } 1475 1476 final String blockEncoding = "--blockEncoding="; 1477 if (cmd.startsWith(blockEncoding)) { 1478 this.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); 1479 continue; 1480 } 1481 1482 final String flushCommits = "--flushCommits="; 1483 if (cmd.startsWith(flushCommits)) { 1484 this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); 1485 continue; 1486 } 1487 1488 final String writeToWAL = "--writeToWAL="; 1489 if (cmd.startsWith(writeToWAL)) { 1490 this.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); 1491 continue; 1492 } 1493 1494 final String presplit = "--presplit="; 1495 if (cmd.startsWith(presplit)) { 1496 this.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); 1497 continue; 1498 } 1499 1500 final String inMemory = "--inmemory="; 1501 if (cmd.startsWith(inMemory)) { 1502 this.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); 1503 continue; 1504 } 1505 1506 this.connection = ConnectionFactory.createConnection(getConf()); 1507 1508 final String useTags = "--usetags="; 1509 if (cmd.startsWith(useTags)) { 1510 this.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); 1511 continue; 1512 } 1513 1514 final String noOfTags = "--nooftags="; 1515 if (cmd.startsWith(noOfTags)) { 1516 this.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); 1517 continue; 1518 } 1519 1520 final String host = "--host="; 1521 if (cmd.startsWith(host)) { 1522 cluster.add(cmd.substring(host.length())); 1523 continue; 1524 } 1525 1526 Class<? extends Test> cmdClass = determineCommandClass(cmd); 1527 if (cmdClass != null) { 1528 getArgs(i + 1, args); 1529 if (cluster.isEmpty()) { 1530 String s = conf.get("stargate.hostname", "localhost"); 1531 if (s.contains(":")) { 1532 cluster.add(s); 1533 } else { 1534 cluster.add(s, conf.getInt("stargate.port", 8080)); 1535 } 1536 } 1537 runTest(cmdClass); 1538 errCode = 0; 1539 break; 1540 } 1541 1542 printUsage(); 1543 break; 1544 } 1545 } catch (Exception e) { 1546 LOG.error("Failed", e); 1547 } 1548 1549 return errCode; 1550 } 1551 1552 private Class<? extends Test> determineCommandClass(String cmd) { 1553 CmdDescriptor descriptor = commands.get(cmd); 1554 return descriptor != null ? descriptor.getCmdClass() : null; 1555 } 1556 1557 /** 1558 * @param args 1559 */ 1560 public static void main(final String[] args) throws Exception { 1561 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); 1562 System.exit(res); 1563 } 1564}