001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase; 020 021import com.codahale.metrics.Histogram; 022import com.codahale.metrics.UniformReservoir; 023import com.fasterxml.jackson.databind.MapperFeature; 024import com.fasterxml.jackson.databind.ObjectMapper; 025 026import java.io.IOException; 027import java.io.PrintStream; 028import java.lang.reflect.Constructor; 029import java.math.BigDecimal; 030import java.math.MathContext; 031import java.text.DecimalFormat; 032import java.text.SimpleDateFormat; 033import java.util.ArrayList; 034import java.util.Arrays; 035import java.util.Date; 036import java.util.LinkedList; 037import java.util.Locale; 038import java.util.Map; 039import java.util.NoSuchElementException; 040import java.util.Queue; 041import java.util.Random; 042import java.util.TreeMap; 043import java.util.concurrent.Callable; 044import java.util.concurrent.ExecutionException; 045import java.util.concurrent.ExecutorService; 046import java.util.concurrent.Executors; 047import java.util.concurrent.Future; 048 049import org.apache.commons.lang3.StringUtils; 050import org.apache.hadoop.conf.Configuration; 051import org.apache.hadoop.conf.Configured; 052import org.apache.hadoop.fs.FileSystem; 053import org.apache.hadoop.fs.Path; 054import org.apache.hadoop.hbase.client.Admin; 055import org.apache.hadoop.hbase.client.Append; 056import org.apache.hadoop.hbase.client.AsyncConnection; 057import org.apache.hadoop.hbase.client.AsyncTable; 058import org.apache.hadoop.hbase.client.BufferedMutator; 059import org.apache.hadoop.hbase.client.BufferedMutatorParams; 060import org.apache.hadoop.hbase.client.Connection; 061import org.apache.hadoop.hbase.client.ConnectionFactory; 062import org.apache.hadoop.hbase.client.Consistency; 063import org.apache.hadoop.hbase.client.Delete; 064import org.apache.hadoop.hbase.client.Durability; 065import org.apache.hadoop.hbase.client.Get; 066import org.apache.hadoop.hbase.client.Increment; 067import org.apache.hadoop.hbase.client.Put; 068import org.apache.hadoop.hbase.client.Result; 069import org.apache.hadoop.hbase.client.ResultScanner; 070import org.apache.hadoop.hbase.client.RowMutations; 071import org.apache.hadoop.hbase.client.Scan; 072import org.apache.hadoop.hbase.client.Table; 073import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 074import org.apache.hadoop.hbase.filter.BinaryComparator; 075import org.apache.hadoop.hbase.filter.Filter; 076import org.apache.hadoop.hbase.filter.FilterAllFilter; 077import org.apache.hadoop.hbase.filter.FilterList; 078import org.apache.hadoop.hbase.filter.PageFilter; 079import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 080import org.apache.hadoop.hbase.filter.WhileMatchFilter; 081import 
org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ByteArrayHashKey;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Hash;
import org.apache.hadoop.hbase.util.MurmurHash;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.YammerHistogramUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.htrace.core.ProbabilitySampler;
import org.apache.htrace.core.Sampler;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Script used to evaluate HBase performance and scalability. Runs an HBase
 * client that steps through one of a set of hardcoded tests or 'experiments'
 * (e.g. a random reads test, a random writes test, etc.). Pass on the
 * command line which test to run and how many clients are participating in
 * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
 *
 * <p>This class sets up and runs the evaluation programs described in
 * Section 7, <i>Performance Evaluation</i>, of the <a
 * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
 * paper, pages 8-10.
 *
 * <p>By default, runs as a mapreduce job where each mapper runs a single test
 * client. Can also run as a non-mapreduce, multithreaded application by
 * specifying {@code --nomapred}. Each client reads or writes about 1GB of data
 * unless specified otherwise.
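 *
 * <p>For example, a run might look like the following (a sketch only; it assumes the
 * standard {@code hbase} launcher script, and the authoritative list of commands and flags
 * comes from {@code --help}):
 * <pre>
 *   # write ~1GB per client from 10 clients, then read the rows back at random
 *   hbase org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 10
 *   hbase org.apache.hadoop.hbase.PerformanceEvaluation randomRead 10
 *
 *   # the same write, but multithreaded in one JVM instead of a mapreduce job
 *   hbase org.apache.hadoop.hbase.PerformanceEvaluation --nomapred randomWrite 4
 * </pre>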
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class PerformanceEvaluation extends Configured implements Tool {
  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
  static final String RANDOM_READ = "randomRead";
  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
  private static final ObjectMapper MAPPER = new ObjectMapper();
  static {
    MAPPER.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
  }

  public static final String TABLE_NAME = "TestTable";
  public static final String FAMILY_NAME_BASE = "info";
  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
  public static final int DEFAULT_VALUE_LENGTH = 1000;
  public static final int ROW_LENGTH = 26;

  private static final int ONE_GB = 1024 * 1024 * 1000;
  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
  // TODO : should we make this configurable
  private static final int TAG_LENGTH = 256;
  private static final DecimalFormat FMT = new DecimalFormat("0.##");
  private static final MathContext CXT = MathContext.DECIMAL64;
  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
  private static final TestOptions DEFAULT_OPTS = new TestOptions();

  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");

  static {
    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
      "Run async random read test");
    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
      "Run async random write test");
    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
      "Run async sequential read test");
    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
      "Run async sequential write test");
    addCommandDescriptor(AsyncScanTest.class, "asyncScan",
      "Run async scan test (read every row)");
    addCommandDescriptor(RandomReadTest.class, RANDOM_READ,
      "Run random read test");
    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
      "Run random seek and scan 100 test");
    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
      "Run random seek scan with both start and stop row (max 10 rows)");
    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
      "Run random seek scan with both start and stop row (max 100 rows)");
    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
      "Run random seek scan with both start and stop row (max 1000 rows)");
    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
      "Run random seek scan with both start and stop row (max 10000 rows)");
    addCommandDescriptor(RandomWriteTest.class, "randomWrite",
      "Run random write test");
    addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
      "Run sequential read test");
    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
      "Run sequential write test");
    addCommandDescriptor(ScanTest.class, "scan",
      "Run scan test (read every row)");
    addCommandDescriptor(FilteredScanTest.class, "filterScan",
      "Run scan test using a filter to find a specific row based on its value " +
194 "(make sure to use --rows=20)"); 195 addCommandDescriptor(IncrementTest.class, "increment", 196 "Increment on each row; clients overlap on keyspace so some concurrent operations"); 197 addCommandDescriptor(AppendTest.class, "append", 198 "Append on each row; clients overlap on keyspace so some concurrent operations"); 199 addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate", 200 "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations"); 201 addCommandDescriptor(CheckAndPutTest.class, "checkAndPut", 202 "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations"); 203 addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete", 204 "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations"); 205 } 206 207 /** 208 * Enum for map metrics. Keep it out here rather than inside in the Map 209 * inner-class so we can find associated properties. 210 */ 211 protected static enum Counter { 212 /** elapsed time */ 213 ELAPSED_TIME, 214 /** number of rows */ 215 ROWS 216 } 217 218 protected static class RunResult implements Comparable<RunResult> { 219 public RunResult(long duration, Histogram hist) { 220 this.duration = duration; 221 this.hist = hist; 222 } 223 224 public final long duration; 225 public final Histogram hist; 226 227 @Override 228 public String toString() { 229 return Long.toString(duration); 230 } 231 232 @Override public int compareTo(RunResult o) { 233 return Long.compare(this.duration, o.duration); 234 } 235 } 236 237 /** 238 * Constructor 239 * @param conf Configuration object 240 */ 241 public PerformanceEvaluation(final Configuration conf) { 242 super(conf); 243 } 244 245 protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, 246 String name, String description) { 247 CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description); 248 COMMANDS.put(name, cmdDescriptor); 249 } 250 251 /** 252 * Implementations can have their status set. 253 */ 254 interface Status { 255 /** 256 * Sets status 257 * @param msg status message 258 * @throws IOException 259 */ 260 void setStatus(final String msg) throws IOException; 261 } 262 263 /** 264 * MapReduce job that runs a performance evaluation client in each map task. 265 */ 266 public static class EvaluationMapTask 267 extends Mapper<LongWritable, Text, LongWritable, LongWritable> { 268 269 /** configuration parameter name that contains the command */ 270 public final static String CMD_KEY = "EvaluationMapTask.command"; 271 /** configuration parameter name that contains the PE impl */ 272 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl"; 273 274 private Class<? extends Test> cmd; 275 276 @Override 277 protected void setup(Context context) throws IOException, InterruptedException { 278 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class); 279 280 // this is required so that extensions of PE are instantiated within the 281 // map reduce task... 282 Class<? extends PerformanceEvaluation> peClass = 283 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class); 284 try { 285 peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration()); 286 } catch (Exception e) { 287 throw new IllegalStateException("Could not instantiate PE instance", e); 288 } 289 } 290 291 private <Type> Class<? 
extends Type> forName(String className, Class<Type> type) { 292 try { 293 return Class.forName(className).asSubclass(type); 294 } catch (ClassNotFoundException e) { 295 throw new IllegalStateException("Could not find class for name: " + className, e); 296 } 297 } 298 299 @Override 300 protected void map(LongWritable key, Text value, final Context context) 301 throws IOException, InterruptedException { 302 303 Status status = new Status() { 304 @Override 305 public void setStatus(String msg) { 306 context.setStatus(msg); 307 } 308 }; 309 310 ObjectMapper mapper = new ObjectMapper(); 311 TestOptions opts = mapper.readValue(value.toString(), TestOptions.class); 312 Configuration conf = HBaseConfiguration.create(context.getConfiguration()); 313 final Connection con = ConnectionFactory.createConnection(conf); 314 AsyncConnection asyncCon = null; 315 try { 316 asyncCon = ConnectionFactory.createAsyncConnection(conf).get(); 317 } catch (ExecutionException e) { 318 throw new IOException(e); 319 } 320 321 // Evaluation task 322 RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status); 323 // Collect how much time the thing took. Report as map output and 324 // to the ELAPSED_TIME counter. 325 context.getCounter(Counter.ELAPSED_TIME).increment(result.duration); 326 context.getCounter(Counter.ROWS).increment(opts.perClientRunRows); 327 context.write(new LongWritable(opts.startRow), new LongWritable(result.duration)); 328 context.progress(); 329 } 330 } 331 332 /* 333 * If table does not already exist, create. Also create a table when 334 * {@code opts.presplitRegions} is specified or when the existing table's 335 * region replica count doesn't match {@code opts.replicas}. 336 */ 337 static boolean checkTable(Admin admin, TestOptions opts) throws IOException { 338 TableName tableName = TableName.valueOf(opts.tableName); 339 boolean needsDelete = false, exists = admin.tableExists(tableName); 340 boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read") 341 || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan"); 342 if (!exists && isReadCmd) { 343 throw new IllegalStateException( 344 "Must specify an existing table for read commands. Run a write command first."); 345 } 346 HTableDescriptor desc = 347 exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null; 348 byte[][] splits = getSplits(opts); 349 350 // recreate the table when user has requested presplit or when existing 351 // {RegionSplitPolicy,replica count} does not match requested, or when the 352 // number of column families does not match requested. 353 if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions) 354 || (!isReadCmd && desc != null && 355 !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy)) 356 || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas) 357 || (desc != null && desc.getColumnFamilyCount() != opts.families)) { 358 needsDelete = true; 359 // wait, why did it delete my table?!? 
360 LOG.debug(MoreObjects.toStringHelper("needsDelete") 361 .add("needsDelete", needsDelete) 362 .add("isReadCmd", isReadCmd) 363 .add("exists", exists) 364 .add("desc", desc) 365 .add("presplit", opts.presplitRegions) 366 .add("splitPolicy", opts.splitPolicy) 367 .add("replicas", opts.replicas) 368 .add("families", opts.families) 369 .toString()); 370 } 371 372 // remove an existing table 373 if (needsDelete) { 374 if (admin.isTableEnabled(tableName)) { 375 admin.disableTable(tableName); 376 } 377 admin.deleteTable(tableName); 378 } 379 380 // table creation is necessary 381 if (!exists || needsDelete) { 382 desc = getTableDescriptor(opts); 383 if (splits != null) { 384 if (LOG.isDebugEnabled()) { 385 for (int i = 0; i < splits.length; i++) { 386 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); 387 } 388 } 389 } 390 admin.createTable(desc, splits); 391 LOG.info("Table " + desc + " created"); 392 } 393 return admin.tableExists(tableName); 394 } 395 396 /** 397 * Create an HTableDescriptor from provided TestOptions. 398 */ 399 protected static HTableDescriptor getTableDescriptor(TestOptions opts) { 400 HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(opts.tableName)); 401 for (int family = 0; family < opts.families; family++) { 402 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 403 HColumnDescriptor familyDesc = new HColumnDescriptor(familyName); 404 familyDesc.setDataBlockEncoding(opts.blockEncoding); 405 familyDesc.setCompressionType(opts.compression); 406 familyDesc.setBloomFilterType(opts.bloomType); 407 familyDesc.setBlocksize(opts.blockSize); 408 if (opts.inMemoryCF) { 409 familyDesc.setInMemory(true); 410 } 411 familyDesc.setInMemoryCompaction(opts.inMemoryCompaction); 412 tableDesc.addFamily(familyDesc); 413 } 414 if (opts.replicas != DEFAULT_OPTS.replicas) { 415 tableDesc.setRegionReplication(opts.replicas); 416 } 417 if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) { 418 tableDesc.setRegionSplitPolicyClassName(opts.splitPolicy); 419 } 420 return tableDesc; 421 } 422 423 /** 424 * generates splits based on total number of rows and specified split regions 425 */ 426 protected static byte[][] getSplits(TestOptions opts) { 427 if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) 428 return null; 429 430 int numSplitPoints = opts.presplitRegions - 1; 431 byte[][] splits = new byte[numSplitPoints][]; 432 int jump = opts.totalRows / opts.presplitRegions; 433 for (int i = 0; i < numSplitPoints; i++) { 434 int rowkey = jump * (1 + i); 435 splits[i] = format(rowkey); 436 } 437 return splits; 438 } 439 440 /* 441 * Run all clients in this vm each to its own thread. 442 */ 443 static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf) 444 throws IOException, InterruptedException, ExecutionException { 445 final Class<? 
extends TestBase> cmd = determineCommandClass(opts.cmdName); 446 assert cmd != null; 447 @SuppressWarnings("unchecked") 448 Future<RunResult>[] threads = new Future[opts.numClientThreads]; 449 RunResult[] results = new RunResult[opts.numClientThreads]; 450 ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, 451 new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build()); 452 final Connection con = ConnectionFactory.createConnection(conf); 453 final AsyncConnection asyncCon = ConnectionFactory.createAsyncConnection(conf).get(); 454 for (int i = 0; i < threads.length; i++) { 455 final int index = i; 456 threads[i] = pool.submit(new Callable<RunResult>() { 457 @Override 458 public RunResult call() throws Exception { 459 TestOptions threadOpts = new TestOptions(opts); 460 if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows; 461 RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() { 462 @Override 463 public void setStatus(final String msg) throws IOException { 464 LOG.info(msg); 465 } 466 }); 467 LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration + 468 "ms over " + threadOpts.perClientRunRows + " rows"); 469 return run; 470 } 471 }); 472 } 473 pool.shutdown(); 474 475 for (int i = 0; i < threads.length; i++) { 476 try { 477 results[i] = threads[i].get(); 478 } catch (ExecutionException e) { 479 throw new IOException(e.getCause()); 480 } 481 } 482 final String test = cmd.getSimpleName(); 483 LOG.info("[" + test + "] Summary of timings (ms): " 484 + Arrays.toString(results)); 485 Arrays.sort(results); 486 long total = 0; 487 for (RunResult result : results) { 488 total += result.duration; 489 } 490 LOG.info("[" + test + "]" 491 + "\tMin: " + results[0] + "ms" 492 + "\tMax: " + results[results.length - 1] + "ms" 493 + "\tAvg: " + (total / results.length) + "ms"); 494 495 con.close(); 496 asyncCon.close(); 497 498 return results; 499 } 500 501 /* 502 * Run a mapreduce job. Run as many maps as asked-for clients. 503 * Before we start up the job, write out an input file with instruction 504 * per client regards which row they are to start on. 505 * @param cmd Command to run. 506 * @throws IOException 507 */ 508 static Job doMapReduce(TestOptions opts, final Configuration conf) 509 throws IOException, InterruptedException, ClassNotFoundException { 510 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName); 511 assert cmd != null; 512 Path inputDir = writeInputFile(conf, opts); 513 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); 514 conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName()); 515 Job job = Job.getInstance(conf); 516 job.setJarByClass(PerformanceEvaluation.class); 517 job.setJobName("HBase Performance Evaluation - " + opts.cmdName); 518 519 job.setInputFormatClass(NLineInputFormat.class); 520 NLineInputFormat.setInputPaths(job, inputDir); 521 // this is default, but be explicit about it just in case. 
    NLineInputFormat.setNumLinesPerSplit(job, 1);

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(LongWritable.class);

    job.setMapperClass(EvaluationMapTask.class);
    job.setReducerClass(LongSumReducer.class);

    job.setNumReduceTasks(1);

    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
      Histogram.class,      // yammer metrics
      ObjectMapper.class,   // jackson-mapper-asl
      FilterAllFilter.class // hbase-server tests jar
      );

    TableMapReduceUtil.initCredentials(job);

    job.waitForCompletion(true);
    return job;
  }

  /**
   * Each client has one mapper to do the work and reports its elapsed time and row count
   * from that map task.
   */

  static String JOB_INPUT_FILENAME = "input.txt";

  /*
   * Write input file of offsets-per-client for the mapreduce job.
   * @param c Configuration
   * @return Directory that contains the written file, whose name is JOB_INPUT_FILENAME
   * @throws IOException
   */
  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
    return writeInputFile(c, opts, new Path("."));
  }

  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
      throws IOException {
    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
    Path inputDir = new Path(jobdir, "inputs");

    FileSystem fs = FileSystem.get(c);
    fs.mkdirs(inputDir);

    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
    PrintStream out = new PrintStream(fs.create(inputFile));
    // Make input random: write the per-client lines ordered by a hash of their contents.
    Map<Integer, String> m = new TreeMap<>();
    Hash h = MurmurHash.getInstance();
    int perClientRows = (opts.totalRows / opts.numClientThreads);
    try {
      for (int j = 0; j < opts.numClientThreads; j++) {
        TestOptions next = new TestOptions(opts);
        next.startRow = j * perClientRows;
        next.perClientRunRows = perClientRows;
        String s = MAPPER.writeValueAsString(next);
        LOG.info("Client=" + j + ", input=" + s);
        byte[] b = Bytes.toBytes(s);
        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
        m.put(hash, s);
      }
      for (Map.Entry<Integer, String> e: m.entrySet()) {
        out.println(e.getValue());
      }
    } finally {
      out.close();
    }
    return inputDir;
  }

  /**
   * Describes a command.
   */
  static class CmdDescriptor {
    private Class<? extends TestBase> cmdClass;
    private String name;
    private String description;

    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
      this.cmdClass = cmdClass;
      this.name = name;
      this.description = description;
    }

    public Class<? extends TestBase> getCmdClass() {
      return cmdClass;
    }

    public String getName() {
      return name;
    }

    public String getDescription() {
      return description;
    }
  }

  /**
   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}.
   * This makes tracking all these arguments a little easier.
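   *
   * <p>Instances are serialized to JSON with the shared {@code MAPPER} and written, one per
   * line, into the mapreduce job's input file (see {@code writeInputFile} above), so each map
   * task is handed its own copy.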
629 * NOTE: ADDING AN OPTION, you need to add a data member, a getter/setter (to make JSON 630 * serialization of this TestOptions class behave), and you need to add to the clone constructor 631 * below copying your new option from the 'that' to the 'this'. Look for 'clone' below. 632 */ 633 static class TestOptions { 634 String cmdName = null; 635 boolean nomapred = false; 636 boolean filterAll = false; 637 int startRow = 0; 638 float size = 1.0f; 639 int perClientRunRows = DEFAULT_ROWS_PER_GB; 640 int numClientThreads = 1; 641 int totalRows = DEFAULT_ROWS_PER_GB; 642 int measureAfter = 0; 643 float sampleRate = 1.0f; 644 double traceRate = 0.0; 645 String tableName = TABLE_NAME; 646 boolean flushCommits = true; 647 boolean writeToWAL = true; 648 boolean autoFlush = false; 649 boolean oneCon = false; 650 boolean useTags = false; 651 int noOfTags = 1; 652 boolean reportLatency = false; 653 int multiGet = 0; 654 int randomSleep = 0; 655 boolean inMemoryCF = false; 656 int presplitRegions = 0; 657 int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION; 658 String splitPolicy = null; 659 Compression.Algorithm compression = Compression.Algorithm.NONE; 660 BloomType bloomType = BloomType.ROW; 661 int blockSize = HConstants.DEFAULT_BLOCKSIZE; 662 DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; 663 boolean valueRandom = false; 664 boolean valueZipf = false; 665 int valueSize = DEFAULT_VALUE_LENGTH; 666 int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10; 667 int cycles = 1; 668 int columns = 1; 669 int families = 1; 670 int caching = 30; 671 boolean addColumns = true; 672 MemoryCompactionPolicy inMemoryCompaction = 673 MemoryCompactionPolicy.valueOf( 674 CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT); 675 boolean asyncPrefetch = false; 676 boolean cacheBlocks = true; 677 Scan.ReadType scanReadType = Scan.ReadType.DEFAULT; 678 long bufferSize = 2l * 1024l * 1024l; 679 680 public TestOptions() {} 681 682 /** 683 * Clone constructor. 684 * @param that Object to copy from. 
685 */ 686 public TestOptions(TestOptions that) { 687 this.cmdName = that.cmdName; 688 this.cycles = that.cycles; 689 this.nomapred = that.nomapred; 690 this.startRow = that.startRow; 691 this.size = that.size; 692 this.perClientRunRows = that.perClientRunRows; 693 this.numClientThreads = that.numClientThreads; 694 this.totalRows = that.totalRows; 695 this.sampleRate = that.sampleRate; 696 this.traceRate = that.traceRate; 697 this.tableName = that.tableName; 698 this.flushCommits = that.flushCommits; 699 this.writeToWAL = that.writeToWAL; 700 this.autoFlush = that.autoFlush; 701 this.oneCon = that.oneCon; 702 this.useTags = that.useTags; 703 this.noOfTags = that.noOfTags; 704 this.reportLatency = that.reportLatency; 705 this.multiGet = that.multiGet; 706 this.inMemoryCF = that.inMemoryCF; 707 this.presplitRegions = that.presplitRegions; 708 this.replicas = that.replicas; 709 this.splitPolicy = that.splitPolicy; 710 this.compression = that.compression; 711 this.blockEncoding = that.blockEncoding; 712 this.filterAll = that.filterAll; 713 this.bloomType = that.bloomType; 714 this.blockSize = that.blockSize; 715 this.valueRandom = that.valueRandom; 716 this.valueZipf = that.valueZipf; 717 this.valueSize = that.valueSize; 718 this.period = that.period; 719 this.randomSleep = that.randomSleep; 720 this.measureAfter = that.measureAfter; 721 this.addColumns = that.addColumns; 722 this.columns = that.columns; 723 this.families = that.families; 724 this.caching = that.caching; 725 this.inMemoryCompaction = that.inMemoryCompaction; 726 this.asyncPrefetch = that.asyncPrefetch; 727 this.cacheBlocks = that.cacheBlocks; 728 this.scanReadType = that.scanReadType; 729 this.bufferSize = that.bufferSize; 730 } 731 732 public int getCaching() { 733 return this.caching; 734 } 735 736 public void setCaching(final int caching) { 737 this.caching = caching; 738 } 739 740 public int getColumns() { 741 return this.columns; 742 } 743 744 public void setColumns(final int columns) { 745 this.columns = columns; 746 } 747 748 public int getFamilies() { 749 return this.families; 750 } 751 752 public void setFamilies(final int families) { 753 this.families = families; 754 } 755 756 public int getCycles() { 757 return this.cycles; 758 } 759 760 public void setCycles(final int cycles) { 761 this.cycles = cycles; 762 } 763 764 public boolean isValueZipf() { 765 return valueZipf; 766 } 767 768 public void setValueZipf(boolean valueZipf) { 769 this.valueZipf = valueZipf; 770 } 771 772 public String getCmdName() { 773 return cmdName; 774 } 775 776 public void setCmdName(String cmdName) { 777 this.cmdName = cmdName; 778 } 779 780 public int getRandomSleep() { 781 return randomSleep; 782 } 783 784 public void setRandomSleep(int randomSleep) { 785 this.randomSleep = randomSleep; 786 } 787 788 public int getReplicas() { 789 return replicas; 790 } 791 792 public void setReplicas(int replicas) { 793 this.replicas = replicas; 794 } 795 796 public String getSplitPolicy() { 797 return splitPolicy; 798 } 799 800 public void setSplitPolicy(String splitPolicy) { 801 this.splitPolicy = splitPolicy; 802 } 803 804 public void setNomapred(boolean nomapred) { 805 this.nomapred = nomapred; 806 } 807 808 public void setFilterAll(boolean filterAll) { 809 this.filterAll = filterAll; 810 } 811 812 public void setStartRow(int startRow) { 813 this.startRow = startRow; 814 } 815 816 public void setSize(float size) { 817 this.size = size; 818 } 819 820 public void setPerClientRunRows(int perClientRunRows) { 821 this.perClientRunRows = perClientRunRows; 
822 } 823 824 public void setNumClientThreads(int numClientThreads) { 825 this.numClientThreads = numClientThreads; 826 } 827 828 public void setTotalRows(int totalRows) { 829 this.totalRows = totalRows; 830 } 831 832 public void setSampleRate(float sampleRate) { 833 this.sampleRate = sampleRate; 834 } 835 836 public void setTraceRate(double traceRate) { 837 this.traceRate = traceRate; 838 } 839 840 public void setTableName(String tableName) { 841 this.tableName = tableName; 842 } 843 844 public void setFlushCommits(boolean flushCommits) { 845 this.flushCommits = flushCommits; 846 } 847 848 public void setWriteToWAL(boolean writeToWAL) { 849 this.writeToWAL = writeToWAL; 850 } 851 852 public void setAutoFlush(boolean autoFlush) { 853 this.autoFlush = autoFlush; 854 } 855 856 public void setOneCon(boolean oneCon) { 857 this.oneCon = oneCon; 858 } 859 860 public void setUseTags(boolean useTags) { 861 this.useTags = useTags; 862 } 863 864 public void setNoOfTags(int noOfTags) { 865 this.noOfTags = noOfTags; 866 } 867 868 public void setReportLatency(boolean reportLatency) { 869 this.reportLatency = reportLatency; 870 } 871 872 public void setMultiGet(int multiGet) { 873 this.multiGet = multiGet; 874 } 875 876 public void setInMemoryCF(boolean inMemoryCF) { 877 this.inMemoryCF = inMemoryCF; 878 } 879 880 public void setPresplitRegions(int presplitRegions) { 881 this.presplitRegions = presplitRegions; 882 } 883 884 public void setCompression(Compression.Algorithm compression) { 885 this.compression = compression; 886 } 887 888 public void setBloomType(BloomType bloomType) { 889 this.bloomType = bloomType; 890 } 891 892 public void setBlockSize(int blockSize) { 893 this.blockSize = blockSize; 894 } 895 896 public void setBlockEncoding(DataBlockEncoding blockEncoding) { 897 this.blockEncoding = blockEncoding; 898 } 899 900 public void setValueRandom(boolean valueRandom) { 901 this.valueRandom = valueRandom; 902 } 903 904 public void setValueSize(int valueSize) { 905 this.valueSize = valueSize; 906 } 907 908 public void setBufferSize(long bufferSize) { 909 this.bufferSize = bufferSize; 910 } 911 912 public void setPeriod(int period) { 913 this.period = period; 914 } 915 916 public boolean isNomapred() { 917 return nomapred; 918 } 919 920 public boolean isFilterAll() { 921 return filterAll; 922 } 923 924 public int getStartRow() { 925 return startRow; 926 } 927 928 public float getSize() { 929 return size; 930 } 931 932 public int getPerClientRunRows() { 933 return perClientRunRows; 934 } 935 936 public int getNumClientThreads() { 937 return numClientThreads; 938 } 939 940 public int getTotalRows() { 941 return totalRows; 942 } 943 944 public float getSampleRate() { 945 return sampleRate; 946 } 947 948 public double getTraceRate() { 949 return traceRate; 950 } 951 952 public String getTableName() { 953 return tableName; 954 } 955 956 public boolean isFlushCommits() { 957 return flushCommits; 958 } 959 960 public boolean isWriteToWAL() { 961 return writeToWAL; 962 } 963 964 public boolean isAutoFlush() { 965 return autoFlush; 966 } 967 968 public boolean isUseTags() { 969 return useTags; 970 } 971 972 public int getNoOfTags() { 973 return noOfTags; 974 } 975 976 public boolean isReportLatency() { 977 return reportLatency; 978 } 979 980 public int getMultiGet() { 981 return multiGet; 982 } 983 984 public boolean isInMemoryCF() { 985 return inMemoryCF; 986 } 987 988 public int getPresplitRegions() { 989 return presplitRegions; 990 } 991 992 public Compression.Algorithm getCompression() { 993 return 
compression;
    }

    public DataBlockEncoding getBlockEncoding() {
      return blockEncoding;
    }

    public boolean isValueRandom() {
      return valueRandom;
    }

    public int getValueSize() {
      return valueSize;
    }

    public int getPeriod() {
      return period;
    }

    public BloomType getBloomType() {
      return bloomType;
    }

    public int getBlockSize() {
      return blockSize;
    }

    public boolean isOneCon() {
      return oneCon;
    }

    public int getMeasureAfter() {
      return measureAfter;
    }

    public void setMeasureAfter(int measureAfter) {
      this.measureAfter = measureAfter;
    }

    public boolean getAddColumns() {
      return addColumns;
    }

    public void setAddColumns(boolean addColumns) {
      this.addColumns = addColumns;
    }

    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
      this.inMemoryCompaction = inMemoryCompaction;
    }

    public MemoryCompactionPolicy getInMemoryCompaction() {
      return this.inMemoryCompaction;
    }

    public long getBufferSize() {
      return this.bufferSize;
    }
  }

  /*
   * A test.
   * Subclass to particularize what happens per row.
   */
  static abstract class TestBase {
    // The static seed below makes it so that when Tests all run in the one JVM,
    // each still gets a differently seeded Random.
    private static final Random randomSeed = new Random(System.currentTimeMillis());

    private static long nextRandomSeed() {
      return randomSeed.nextLong();
    }
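    // Note on sampling: the everyN factor declared below works out to roughly 1/sampleRate
    // (totalRows / (totalRows * sampleRate)), and testTimed() only runs and times row indexes
    // divisible by it, so e.g. a sampleRate of 0.1 exercises every 10th row.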
    private final int everyN;

    protected final Random rand = new Random(nextRandomSeed());
    protected final Configuration conf;
    protected final TestOptions opts;

    private final Status status;
    private final Sampler traceSampler;
    private final SpanReceiverHost receiverHost;

    private String testName;
    private Histogram latencyHistogram;
    private Histogram valueSizeHistogram;
    private Histogram rpcCallsHistogram;
    private Histogram remoteRpcCallsHistogram;
    private Histogram millisBetweenNextHistogram;
    private Histogram regionsScannedHistogram;
    private Histogram bytesInResultsHistogram;
    private Histogram bytesInRemoteResultsHistogram;
    private RandomDistribution.Zipf zipf;

    /**
     * Note that all subclasses of this class must provide a public constructor
     * that has the exact same list of arguments.
     */
    TestBase(final Configuration conf, final TestOptions options, final Status status) {
      this.conf = conf;
      this.receiverHost = this.conf == null? null: SpanReceiverHost.getInstance(conf);
      this.opts = options;
      this.status = status;
      this.testName = this.getClass().getSimpleName();
      if (options.traceRate >= 1.0) {
        this.traceSampler = Sampler.ALWAYS;
      } else if (options.traceRate > 0.0) {
        conf.setDouble("hbase.sampler.fraction", options.traceRate);
        this.traceSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf));
      } else {
        this.traceSampler = Sampler.NEVER;
      }
      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
      if (options.isValueZipf()) {
        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
      }
      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
    }

    int getValueLength(final Random r) {
      if (this.opts.isValueRandom()) {
        return r.nextInt(opts.valueSize);
      } else if (this.opts.isValueZipf()) {
        return Math.abs(this.zipf.nextInt());
      } else {
        return opts.valueSize;
      }
    }

    void updateValueSize(final Result [] rs) throws IOException {
      if (rs == null || !isRandomValueSize()) return;
      for (Result r: rs) updateValueSize(r);
    }

    void updateValueSize(final Result r) throws IOException {
      if (r == null || !isRandomValueSize()) return;
      int size = 0;
      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
        size += scanner.current().getValueLength();
      }
      updateValueSize(size);
    }

    void updateValueSize(final int valueSize) {
      if (!isRandomValueSize()) return;
      this.valueSizeHistogram.update(valueSize);
    }

    void updateScanMetrics(final ScanMetrics metrics) {
      if (metrics == null) return;
      Map<String,Long> metricsMap = metrics.getMetricsMap();
      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
      if (rpcCalls != null) {
        this.rpcCallsHistogram.update(rpcCalls.longValue());
      }
      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
      if (remoteRpcCalls != null) {
        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
      }
      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
      if (millisBetweenNext != null) {
        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
      }
      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
      if (regionsScanned != null) {
        this.regionsScannedHistogram.update(regionsScanned.longValue());
      }
      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
      if (bytesInResults != null && bytesInResults.longValue() > 0) {
        this.bytesInResultsHistogram.update(bytesInResults.longValue());
      }
      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
      }
    }

    String generateStatus(final int sr, final int i, final int lr) {
      return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
        (!isRandomValueSize()?
"": ", value size " + getShortValueSizeReport()); 1172 } 1173 1174 boolean isRandomValueSize() { 1175 return opts.valueRandom; 1176 } 1177 1178 protected int getReportingPeriod() { 1179 return opts.period; 1180 } 1181 1182 /** 1183 * Populated by testTakedown. Only implemented by RandomReadTest at the moment. 1184 */ 1185 public Histogram getLatencyHistogram() { 1186 return latencyHistogram; 1187 } 1188 1189 void testSetup() throws IOException { 1190 // test metrics 1191 latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1192 valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1193 // scan metrics 1194 rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1195 remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1196 millisBetweenNextHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1197 regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1198 bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1199 bytesInRemoteResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1200 1201 createConnection(); 1202 onStartup(); 1203 } 1204 1205 abstract void createConnection() throws IOException; 1206 1207 abstract void onStartup() throws IOException; 1208 1209 void testTakedown() throws IOException { 1210 onTakedown(); 1211 // Print all stats for this thread continuously. 1212 // Synchronize on Test.class so different threads don't intermingle the 1213 // output. We can't use 'this' here because each thread has its own instance of Test class. 1214 synchronized (Test.class) { 1215 status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName()); 1216 status.setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport( 1217 latencyHistogram)); 1218 status.setStatus("Num measures (latency) : " + latencyHistogram.getCount()); 1219 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram)); 1220 status.setStatus("ValueSize (bytes) : " 1221 + YammerHistogramUtils.getHistogramReport(valueSizeHistogram)); 1222 status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount()); 1223 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram)); 1224 if (rpcCallsHistogram.getCount() > 0) { 1225 status.setStatus("rpcCalls (count): " + 1226 YammerHistogramUtils.getHistogramReport(rpcCallsHistogram)); 1227 } 1228 if (remoteRpcCallsHistogram.getCount() > 0) { 1229 status.setStatus("remoteRpcCalls (count): " + 1230 YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram)); 1231 } 1232 if (millisBetweenNextHistogram.getCount() > 0) { 1233 status.setStatus("millisBetweenNext (latency): " + 1234 YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram)); 1235 } 1236 if (regionsScannedHistogram.getCount() > 0) { 1237 status.setStatus("regionsScanned (count): " + 1238 YammerHistogramUtils.getHistogramReport(regionsScannedHistogram)); 1239 } 1240 if (bytesInResultsHistogram.getCount() > 0) { 1241 status.setStatus("bytesInResults (size): " + 1242 YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram)); 1243 } 1244 if (bytesInRemoteResultsHistogram.getCount() > 0) { 1245 status.setStatus("bytesInRemoteResults (size): " + 1246 YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram)); 1247 } 1248 } 
1249 closeConnection(); 1250 receiverHost.closeReceivers(); 1251 } 1252 1253 abstract void onTakedown() throws IOException; 1254 1255 abstract void closeConnection() throws IOException; 1256 1257 /* 1258 * Run test 1259 * @return Elapsed time. 1260 * @throws IOException 1261 */ 1262 long test() throws IOException, InterruptedException { 1263 testSetup(); 1264 LOG.info("Timed test starting in thread " + Thread.currentThread().getName()); 1265 final long startTime = System.nanoTime(); 1266 try { 1267 testTimed(); 1268 } finally { 1269 testTakedown(); 1270 } 1271 return (System.nanoTime() - startTime) / 1000000; 1272 } 1273 1274 int getStartRow() { 1275 return opts.startRow; 1276 } 1277 1278 int getLastRow() { 1279 return getStartRow() + opts.perClientRunRows; 1280 } 1281 1282 /** 1283 * Provides an extension point for tests that don't want a per row invocation. 1284 */ 1285 void testTimed() throws IOException, InterruptedException { 1286 int startRow = getStartRow(); 1287 int lastRow = getLastRow(); 1288 TraceUtil.addSampler(traceSampler); 1289 // Report on completion of 1/10th of total. 1290 for (int ii = 0; ii < opts.cycles; ii++) { 1291 if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles); 1292 for (int i = startRow; i < lastRow; i++) { 1293 if (i % everyN != 0) continue; 1294 long startTime = System.nanoTime(); 1295 try (TraceScope scope = TraceUtil.createTrace("test row");){ 1296 testRow(i); 1297 } 1298 if ( (i - startRow) > opts.measureAfter) { 1299 // If multiget is enabled, say set to 10, testRow() returns immediately first 9 times 1300 // and sends the actual get request in the 10th iteration. We should only set latency 1301 // when actual request is sent because otherwise it turns out to be 0. 1302 if (opts.multiGet == 0 || (i - startRow + 1) % opts.multiGet == 0) { 1303 latencyHistogram.update((System.nanoTime() - startTime) / 1000); 1304 } 1305 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { 1306 status.setStatus(generateStatus(startRow, i, lastRow)); 1307 } 1308 } 1309 } 1310 } 1311 } 1312 1313 /** 1314 * @return Subset of the histograms' calculation. 1315 */ 1316 public String getShortLatencyReport() { 1317 return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram); 1318 } 1319 1320 /** 1321 * @return Subset of the histograms' calculation. 1322 */ 1323 public String getShortValueSizeReport() { 1324 return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram); 1325 } 1326 1327 /* 1328 * Test for individual row. 1329 * @param i Row index. 1330 */ 1331 abstract void testRow(final int i) throws IOException, InterruptedException; 1332 } 1333 1334 static abstract class Test extends TestBase { 1335 protected Connection connection; 1336 1337 Test(final Connection con, final TestOptions options, final Status status) { 1338 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); 1339 this.connection = con; 1340 } 1341 1342 @Override 1343 void createConnection() throws IOException { 1344 if (!opts.isOneCon()) { 1345 this.connection = ConnectionFactory.createConnection(conf); 1346 } 1347 } 1348 1349 @Override 1350 void closeConnection() throws IOException { 1351 if (!opts.isOneCon()) { 1352 this.connection.close(); 1353 } 1354 } 1355 } 1356 1357 static abstract class AsyncTest extends TestBase { 1358 protected AsyncConnection connection; 1359 1360 AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) { 1361 super(con == null ? 
HBaseConfiguration.create() : con.getConfiguration(), options, status); 1362 this.connection = con; 1363 } 1364 1365 @Override 1366 void createConnection() { 1367 if (!opts.isOneCon()) { 1368 try { 1369 this.connection = ConnectionFactory.createAsyncConnection(conf).get(); 1370 } catch (InterruptedException | ExecutionException e) { 1371 LOG.error("Failed to create async connection", e); 1372 } 1373 } 1374 } 1375 1376 @Override 1377 void closeConnection() throws IOException { 1378 if (!opts.isOneCon()) { 1379 this.connection.close(); 1380 } 1381 } 1382 } 1383 1384 static abstract class TableTest extends Test { 1385 protected Table table; 1386 1387 TableTest(Connection con, TestOptions options, Status status) { 1388 super(con, options, status); 1389 } 1390 1391 @Override 1392 void onStartup() throws IOException { 1393 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1394 } 1395 1396 @Override 1397 void onTakedown() throws IOException { 1398 table.close(); 1399 } 1400 } 1401 1402 static abstract class AsyncTableTest extends AsyncTest { 1403 protected AsyncTable<?> table; 1404 1405 AsyncTableTest(AsyncConnection con, TestOptions options, Status status) { 1406 super(con, options, status); 1407 } 1408 1409 @Override 1410 void onStartup() throws IOException { 1411 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1412 } 1413 1414 @Override 1415 void onTakedown() throws IOException { 1416 } 1417 } 1418 1419 static class AsyncRandomReadTest extends AsyncTableTest { 1420 private final Consistency consistency; 1421 private ArrayList<Get> gets; 1422 private Random rd = new Random(); 1423 1424 AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) { 1425 super(con, options, status); 1426 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1427 if (opts.multiGet > 0) { 1428 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1429 this.gets = new ArrayList<>(opts.multiGet); 1430 } 1431 } 1432 1433 @Override 1434 void testRow(final int i) throws IOException, InterruptedException { 1435 if (opts.randomSleep > 0) { 1436 Thread.sleep(rd.nextInt(opts.randomSleep)); 1437 } 1438 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1439 for (int family = 0; family < opts.families; family++) { 1440 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1441 if (opts.addColumns) { 1442 for (int column = 0; column < opts.columns; column++) { 1443 byte [] qualifier = column == 0? 
COLUMN_ZERO: Bytes.toBytes("" + column); 1444 get.addColumn(familyName, qualifier); 1445 } 1446 } else { 1447 get.addFamily(familyName); 1448 } 1449 } 1450 if (opts.filterAll) { 1451 get.setFilter(new FilterAllFilter()); 1452 } 1453 get.setConsistency(consistency); 1454 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1455 try { 1456 if (opts.multiGet > 0) { 1457 this.gets.add(get); 1458 if (this.gets.size() == opts.multiGet) { 1459 Result[] rs = 1460 this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new); 1461 updateValueSize(rs); 1462 this.gets.clear(); 1463 } 1464 } else { 1465 updateValueSize(this.table.get(get).get()); 1466 } 1467 } catch (ExecutionException e) { 1468 throw new IOException(e); 1469 } 1470 } 1471 1472 public static RuntimeException runtime(Throwable e) { 1473 if (e instanceof RuntimeException) { 1474 return (RuntimeException) e; 1475 } 1476 return new RuntimeException(e); 1477 } 1478 1479 public static <V> V propagate(Callable<V> callable) { 1480 try { 1481 return callable.call(); 1482 } catch (Exception e) { 1483 throw runtime(e); 1484 } 1485 } 1486 1487 @Override 1488 protected int getReportingPeriod() { 1489 int period = opts.perClientRunRows / 10; 1490 return period == 0 ? opts.perClientRunRows : period; 1491 } 1492 1493 @Override 1494 protected void testTakedown() throws IOException { 1495 if (this.gets != null && this.gets.size() > 0) { 1496 this.table.get(gets); 1497 this.gets.clear(); 1498 } 1499 super.testTakedown(); 1500 } 1501 } 1502 1503 static class AsyncRandomWriteTest extends AsyncTableTest { 1504 AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) { 1505 super(con, options, status); 1506 } 1507 1508 @Override 1509 void testRow(final int i) throws IOException, InterruptedException { 1510 byte[] row = getRandomRow(this.rand, opts.totalRows); 1511 Put put = new Put(row); 1512 for (int family = 0; family < opts.families; family++) { 1513 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1514 for (int column = 0; column < opts.columns; column++) { 1515 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1516 byte[] value = generateData(this.rand, getValueLength(this.rand)); 1517 if (opts.useTags) { 1518 byte[] tag = generateData(this.rand, TAG_LENGTH); 1519 Tag[] tags = new Tag[opts.noOfTags]; 1520 for (int n = 0; n < opts.noOfTags; n++) { 1521 Tag t = new ArrayBackedTag((byte) n, tag); 1522 tags[n] = t; 1523 } 1524 KeyValue kv = 1525 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 1526 put.add(kv); 1527 updateValueSize(kv.getValueLength()); 1528 } else { 1529 put.addColumn(familyName, qualifier, value); 1530 updateValueSize(value.length); 1531 } 1532 } 1533 } 1534 put.setDurability(opts.writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL); 1535 try { 1536 table.put(put).get(); 1537 } catch (ExecutionException e) { 1538 throw new IOException(e); 1539 } 1540 } 1541 } 1542 1543 static class AsyncScanTest extends AsyncTableTest { 1544 private ResultScanner testScanner; 1545 private AsyncTable<?> asyncTable; 1546 1547 AsyncScanTest(AsyncConnection con, TestOptions options, Status status) { 1548 super(con, options, status); 1549 } 1550 1551 @Override 1552 void onStartup() throws IOException { 1553 this.asyncTable = 1554 connection.getTable(TableName.valueOf(opts.tableName), 1555 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); 1556 } 1557 1558 @Override 1559 void testTakedown() throws IOException { 1560 if (this.testScanner != null) { 1561 updateScanMetrics(this.testScanner.getScanMetrics()); 1562 this.testScanner.close(); 1563 } 1564 super.testTakedown(); 1565 } 1566 1567 @Override 1568 void testRow(final int i) throws IOException { 1569 if (this.testScanner == null) { 1570 Scan scan = 1571 new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 1572 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1573 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1574 for (int family = 0; family < opts.families; family++) { 1575 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1576 if (opts.addColumns) { 1577 for (int column = 0; column < opts.columns; column++) { 1578 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1579 scan.addColumn(familyName, qualifier); 1580 } 1581 } else { 1582 scan.addFamily(familyName); 1583 } 1584 } 1585 if (opts.filterAll) { 1586 scan.setFilter(new FilterAllFilter()); 1587 } 1588 this.testScanner = asyncTable.getScanner(scan); 1589 } 1590 Result r = testScanner.next(); 1591 updateValueSize(r); 1592 } 1593 } 1594 1595 static class AsyncSequentialReadTest extends AsyncTableTest { 1596 AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) { 1597 super(con, options, status); 1598 } 1599 1600 @Override 1601 void testRow(final int i) throws IOException, InterruptedException { 1602 Get get = new Get(format(i)); 1603 for (int family = 0; family < opts.families; family++) { 1604 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1605 if (opts.addColumns) { 1606 for (int column = 0; column < opts.columns; column++) { 1607 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1608 get.addColumn(familyName, qualifier); 1609 } 1610 } else { 1611 get.addFamily(familyName); 1612 } 1613 } 1614 if (opts.filterAll) { 1615 get.setFilter(new FilterAllFilter()); 1616 } 1617 try { 1618 updateValueSize(table.get(get).get()); 1619 } catch (ExecutionException e) { 1620 throw new IOException(e); 1621 } 1622 } 1623 } 1624 1625 static class AsyncSequentialWriteTest extends AsyncTableTest { 1626 AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) { 1627 super(con, options, status); 1628 } 1629 1630 @Override 1631 void testRow(final int i) throws IOException, InterruptedException { 1632 byte[] row = format(i); 1633 Put put = new Put(row); 1634 for (int family = 0; family < opts.families; family++) { 1635 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1636 for (int column = 0; column < opts.columns; column++) { 1637 byte [] qualifier = column == 0? 
COLUMN_ZERO: Bytes.toBytes("" + column); 1638 byte[] value = generateData(this.rand, getValueLength(this.rand)); 1639 if (opts.useTags) { 1640 byte[] tag = generateData(this.rand, TAG_LENGTH); 1641 Tag[] tags = new Tag[opts.noOfTags]; 1642 for (int n = 0; n < opts.noOfTags; n++) { 1643 Tag t = new ArrayBackedTag((byte) n, tag); 1644 tags[n] = t; 1645 } 1646 KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, 1647 value, tags); 1648 put.add(kv); 1649 updateValueSize(kv.getValueLength()); 1650 } else { 1651 put.addColumn(familyName, qualifier, value); 1652 updateValueSize(value.length); 1653 } 1654 } 1655 } 1656 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1657 try { 1658 table.put(put).get(); 1659 } catch (ExecutionException e) { 1660 throw new IOException(e); 1661 } 1662 } 1663 } 1664 1665 static abstract class BufferedMutatorTest extends Test { 1666 protected BufferedMutator mutator; 1667 protected Table table; 1668 1669 BufferedMutatorTest(Connection con, TestOptions options, Status status) { 1670 super(con, options, status); 1671 } 1672 1673 @Override 1674 void onStartup() throws IOException { 1675 BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName)); 1676 p.writeBufferSize(opts.bufferSize); 1677 this.mutator = connection.getBufferedMutator(p); 1678 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1679 } 1680 1681 @Override 1682 void onTakedown() throws IOException { 1683 mutator.close(); 1684 table.close(); 1685 } 1686 } 1687 1688 static class RandomSeekScanTest extends TableTest { 1689 RandomSeekScanTest(Connection con, TestOptions options, Status status) { 1690 super(con, options, status); 1691 } 1692 1693 @Override 1694 void testRow(final int i) throws IOException { 1695 Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)) 1696 .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 1697 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 1698 .setScanMetricsEnabled(true); 1699 FilterList list = new FilterList(); 1700 for (int family = 0; family < opts.families; family++) { 1701 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1702 if (opts.addColumns) { 1703 for (int column = 0; column < opts.columns; column++) { 1704 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1705 scan.addColumn(familyName, qualifier); 1706 } 1707 } else { 1708 scan.addFamily(familyName); 1709 } 1710 } 1711 if (opts.filterAll) { 1712 list.addFilter(new FilterAllFilter()); 1713 } 1714 list.addFilter(new WhileMatchFilter(new PageFilter(120))); 1715 scan.setFilter(list); 1716 ResultScanner s = this.table.getScanner(scan); 1717 try { 1718 for (Result rr; (rr = s.next()) != null;) { 1719 updateValueSize(rr); 1720 } 1721 } finally { 1722 updateScanMetrics(s.getScanMetrics()); 1723 s.close(); 1724 } 1725 } 1726 1727 @Override 1728 protected int getReportingPeriod() { 1729 int period = opts.perClientRunRows / 100; 1730 return period == 0 ? 
opts.perClientRunRows : period; 1731 } 1732 1733 } 1734 1735 static abstract class RandomScanWithRangeTest extends TableTest { 1736 RandomScanWithRangeTest(Connection con, TestOptions options, Status status) { 1737 super(con, options, status); 1738 } 1739 1740 @Override 1741 void testRow(final int i) throws IOException { 1742 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); 1743 Scan scan = new Scan().withStartRow(startAndStopRow.getFirst()) 1744 .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching) 1745 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1746 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1747 for (int family = 0; family < opts.families; family++) { 1748 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1749 if (opts.addColumns) { 1750 for (int column = 0; column < opts.columns; column++) { 1751 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1752 scan.addColumn(familyName, qualifier); 1753 } 1754 } else { 1755 scan.addFamily(familyName); 1756 } 1757 } 1758 if (opts.filterAll) { 1759 scan.setFilter(new FilterAllFilter()); 1760 } 1761 Result r = null; 1762 int count = 0; 1763 ResultScanner s = this.table.getScanner(scan); 1764 try { 1765 for (; (r = s.next()) != null;) { 1766 updateValueSize(r); 1767 count++; 1768 } 1769 if (i % 100 == 0) { 1770 LOG.info(String.format("Scan for key range %s - %s returned %s rows", 1771 Bytes.toString(startAndStopRow.getFirst()), 1772 Bytes.toString(startAndStopRow.getSecond()), count)); 1773 } 1774 } finally { 1775 updateScanMetrics(s.getScanMetrics()); 1776 s.close(); 1777 } 1778 } 1779 1780 protected abstract Pair<byte[],byte[]> getStartAndStopRow(); 1781 1782 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) { 1783 int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows; 1784 int stop = start + maxRange; 1785 return new Pair<>(format(start), format(stop)); 1786 } 1787 1788 @Override 1789 protected int getReportingPeriod() { 1790 int period = opts.perClientRunRows / 100; 1791 return period == 0? 
opts.perClientRunRows: period; 1792 } 1793 } 1794 1795 static class RandomScanWithRange10Test extends RandomScanWithRangeTest { 1796 RandomScanWithRange10Test(Connection con, TestOptions options, Status status) { 1797 super(con, options, status); 1798 } 1799 1800 @Override 1801 protected Pair<byte[], byte[]> getStartAndStopRow() { 1802 return generateStartAndStopRows(10); 1803 } 1804 } 1805 1806 static class RandomScanWithRange100Test extends RandomScanWithRangeTest { 1807 RandomScanWithRange100Test(Connection con, TestOptions options, Status status) { 1808 super(con, options, status); 1809 } 1810 1811 @Override 1812 protected Pair<byte[], byte[]> getStartAndStopRow() { 1813 return generateStartAndStopRows(100); 1814 } 1815 } 1816 1817 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest { 1818 RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) { 1819 super(con, options, status); 1820 } 1821 1822 @Override 1823 protected Pair<byte[], byte[]> getStartAndStopRow() { 1824 return generateStartAndStopRows(1000); 1825 } 1826 } 1827 1828 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest { 1829 RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) { 1830 super(con, options, status); 1831 } 1832 1833 @Override 1834 protected Pair<byte[], byte[]> getStartAndStopRow() { 1835 return generateStartAndStopRows(10000); 1836 } 1837 } 1838 1839 static class RandomReadTest extends TableTest { 1840 private final Consistency consistency; 1841 private ArrayList<Get> gets; 1842 private Random rd = new Random(); 1843 1844 RandomReadTest(Connection con, TestOptions options, Status status) { 1845 super(con, options, status); 1846 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1847 if (opts.multiGet > 0) { 1848 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1849 this.gets = new ArrayList<>(opts.multiGet); 1850 } 1851 } 1852 1853 @Override 1854 void testRow(final int i) throws IOException, InterruptedException { 1855 if (opts.randomSleep > 0) { 1856 Thread.sleep(rd.nextInt(opts.randomSleep)); 1857 } 1858 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1859 for (int family = 0; family < opts.families; family++) { 1860 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1861 if (opts.addColumns) { 1862 for (int column = 0; column < opts.columns; column++) { 1863 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1864 get.addColumn(familyName, qualifier); 1865 } 1866 } else { 1867 get.addFamily(familyName); 1868 } 1869 } 1870 if (opts.filterAll) { 1871 get.setFilter(new FilterAllFilter()); 1872 } 1873 get.setConsistency(consistency); 1874 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1875 if (opts.multiGet > 0) { 1876 this.gets.add(get); 1877 if (this.gets.size() == opts.multiGet) { 1878 Result [] rs = this.table.get(this.gets); 1879 updateValueSize(rs); 1880 this.gets.clear(); 1881 } 1882 } else { 1883 updateValueSize(this.table.get(get)); 1884 } 1885 } 1886 1887 @Override 1888 protected int getReportingPeriod() { 1889 int period = opts.perClientRunRows / 10; 1890 return period == 0 ? 
opts.perClientRunRows : period; 1891 } 1892 1893 @Override 1894 protected void testTakedown() throws IOException { 1895 if (this.gets != null && this.gets.size() > 0) { 1896 this.table.get(gets); 1897 this.gets.clear(); 1898 } 1899 super.testTakedown(); 1900 } 1901 } 1902 1903 static class RandomWriteTest extends BufferedMutatorTest { 1904 RandomWriteTest(Connection con, TestOptions options, Status status) { 1905 super(con, options, status); 1906 } 1907 1908 @Override 1909 void testRow(final int i) throws IOException { 1910 byte[] row = getRandomRow(this.rand, opts.totalRows); 1911 Put put = new Put(row); 1912 for (int family = 0; family < opts.families; family++) { 1913 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1914 for (int column = 0; column < opts.columns; column++) { 1915 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1916 byte[] value = generateData(this.rand, getValueLength(this.rand)); 1917 if (opts.useTags) { 1918 byte[] tag = generateData(this.rand, TAG_LENGTH); 1919 Tag[] tags = new Tag[opts.noOfTags]; 1920 for (int n = 0; n < opts.noOfTags; n++) { 1921 Tag t = new ArrayBackedTag((byte) n, tag); 1922 tags[n] = t; 1923 } 1924 KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, 1925 value, tags); 1926 put.add(kv); 1927 updateValueSize(kv.getValueLength()); 1928 } else { 1929 put.addColumn(familyName, qualifier, value); 1930 updateValueSize(value.length); 1931 } 1932 } 1933 } 1934 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1935 if (opts.autoFlush) { 1936 table.put(put); 1937 } else { 1938 mutator.mutate(put); 1939 } 1940 } 1941 } 1942 1943 static class ScanTest extends TableTest { 1944 private ResultScanner testScanner; 1945 1946 ScanTest(Connection con, TestOptions options, Status status) { 1947 super(con, options, status); 1948 } 1949 1950 @Override 1951 void testTakedown() throws IOException { 1952 if (this.testScanner != null) { 1953 this.testScanner.close(); 1954 } 1955 super.testTakedown(); 1956 } 1957 1958 1959 @Override 1960 void testRow(final int i) throws IOException { 1961 if (this.testScanner == null) { 1962 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 1963 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1964 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1965 for (int family = 0; family < opts.families; family++) { 1966 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1967 if (opts.addColumns) { 1968 for (int column = 0; column < opts.columns; column++) { 1969 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1970 scan.addColumn(familyName, qualifier); 1971 } 1972 } else { 1973 scan.addFamily(familyName); 1974 } 1975 } 1976 if (opts.filterAll) { 1977 scan.setFilter(new FilterAllFilter()); 1978 } 1979 this.testScanner = table.getScanner(scan); 1980 } 1981 Result r = testScanner.next(); 1982 updateValueSize(r); 1983 } 1984 } 1985 1986 /** 1987 * Base class for operations that are CAS-like; that read a value and then set it based off what 1988 * they read. In this category is increment, append, checkAndPut, etc. 1989 * 1990 * <p>These operations also want some concurrency going on. Usually when these tests run, they 1991 * operate in their own part of the key range. In CASTest, we will have them all overlap on the 1992 * same key space. We do this with our getStartRow and getLastRow overrides. 
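 * <p>Concretely, getStartRow() returns 0 and getLastRow() returns opts.perClientRunRows, so
 * every client thread works the same range [0, perClientRunRows) rather than its own offset
 * slice. With more than one client the increments, appends and checkAndXXX calls therefore
 * contend for the same rows and exercise the server-side read-modify-write path under
 * concurrency.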
1993 */ 1994 static abstract class CASTableTest extends TableTest { 1995 private final byte [] qualifier; 1996 CASTableTest(Connection con, TestOptions options, Status status) { 1997 super(con, options, status); 1998 qualifier = Bytes.toBytes(this.getClass().getSimpleName()); 1999 } 2000 2001 byte [] getQualifier() { 2002 return this.qualifier; 2003 } 2004 2005 @Override 2006 int getStartRow() { 2007 return 0; 2008 } 2009 2010 @Override 2011 int getLastRow() { 2012 return opts.perClientRunRows; 2013 } 2014 } 2015 2016 static class IncrementTest extends CASTableTest { 2017 IncrementTest(Connection con, TestOptions options, Status status) { 2018 super(con, options, status); 2019 } 2020 2021 @Override 2022 void testRow(final int i) throws IOException { 2023 Increment increment = new Increment(format(i)); 2024 // unlike checkAndXXX tests, which make most sense to do on a single value, 2025 // if multiple families are specified for an increment test we assume it is 2026 // meant to raise the work factor 2027 for (int family = 0; family < opts.families; family++) { 2028 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2029 increment.addColumn(familyName, getQualifier(), 1l); 2030 } 2031 updateValueSize(this.table.increment(increment)); 2032 } 2033 } 2034 2035 static class AppendTest extends CASTableTest { 2036 AppendTest(Connection con, TestOptions options, Status status) { 2037 super(con, options, status); 2038 } 2039 2040 @Override 2041 void testRow(final int i) throws IOException { 2042 byte [] bytes = format(i); 2043 Append append = new Append(bytes); 2044 // unlike checkAndXXX tests, which make most sense to do on a single value, 2045 // if multiple families are specified for an append test we assume it is 2046 // meant to raise the work factor 2047 for (int family = 0; family < opts.families; family++) { 2048 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2049 append.addColumn(familyName, getQualifier(), bytes); 2050 } 2051 updateValueSize(this.table.append(append)); 2052 } 2053 } 2054 2055 static class CheckAndMutateTest extends CASTableTest { 2056 CheckAndMutateTest(Connection con, TestOptions options, Status status) { 2057 super(con, options, status); 2058 } 2059 2060 @Override 2061 void testRow(final int i) throws IOException { 2062 final byte [] bytes = format(i); 2063 // checkAndXXX tests operate on only a single value 2064 // Put a known value so when we go to check it, it is there. 2065 Put put = new Put(bytes); 2066 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2067 this.table.put(put); 2068 RowMutations mutations = new RowMutations(bytes); 2069 mutations.add(put); 2070 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()) 2071 .ifEquals(bytes).thenMutate(mutations); 2072 } 2073 } 2074 2075 static class CheckAndPutTest extends CASTableTest { 2076 CheckAndPutTest(Connection con, TestOptions options, Status status) { 2077 super(con, options, status); 2078 } 2079 2080 @Override 2081 void testRow(final int i) throws IOException { 2082 final byte [] bytes = format(i); 2083 // checkAndXXX tests operate on only a single value 2084 // Put a known value so when we go to check it, it is there. 
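      // The put below stores 'bytes' first and the following checkAndMutate compares against
      // that same value, so the ifEquals() check should always succeed: the test measures the
      // cost of the check-and-put round trip itself, not of failed compare-and-swap attempts.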
2085 Put put = new Put(bytes); 2086 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2087 this.table.put(put); 2088 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()) 2089 .ifEquals(bytes).thenPut(put); 2090 } 2091 } 2092 2093 static class CheckAndDeleteTest extends CASTableTest { 2094 CheckAndDeleteTest(Connection con, TestOptions options, Status status) { 2095 super(con, options, status); 2096 } 2097 2098 @Override 2099 void testRow(final int i) throws IOException { 2100 final byte [] bytes = format(i); 2101 // checkAndXXX tests operate on only a single value 2102 // Put a known value so when we go to check it, it is there. 2103 Put put = new Put(bytes); 2104 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2105 this.table.put(put); 2106 Delete delete = new Delete(put.getRow()); 2107 delete.addColumn(FAMILY_ZERO, getQualifier()); 2108 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()) 2109 .ifEquals(bytes).thenDelete(delete); 2110 } 2111 } 2112 2113 static class SequentialReadTest extends TableTest { 2114 SequentialReadTest(Connection con, TestOptions options, Status status) { 2115 super(con, options, status); 2116 } 2117 2118 @Override 2119 void testRow(final int i) throws IOException { 2120 Get get = new Get(format(i)); 2121 for (int family = 0; family < opts.families; family++) { 2122 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2123 if (opts.addColumns) { 2124 for (int column = 0; column < opts.columns; column++) { 2125 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 2126 get.addColumn(familyName, qualifier); 2127 } 2128 } else { 2129 get.addFamily(familyName); 2130 } 2131 } 2132 if (opts.filterAll) { 2133 get.setFilter(new FilterAllFilter()); 2134 } 2135 updateValueSize(table.get(get)); 2136 } 2137 } 2138 2139 static class SequentialWriteTest extends BufferedMutatorTest { 2140 SequentialWriteTest(Connection con, TestOptions options, Status status) { 2141 super(con, options, status); 2142 } 2143 2144 @Override 2145 void testRow(final int i) throws IOException { 2146 byte[] row = format(i); 2147 Put put = new Put(row); 2148 for (int family = 0; family < opts.families; family++) { 2149 byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family); 2150 for (int column = 0; column < opts.columns; column++) { 2151 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 2152 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2153 if (opts.useTags) { 2154 byte[] tag = generateData(this.rand, TAG_LENGTH); 2155 Tag[] tags = new Tag[opts.noOfTags]; 2156 for (int n = 0; n < opts.noOfTags; n++) { 2157 Tag t = new ArrayBackedTag((byte) n, tag); 2158 tags[n] = t; 2159 } 2160 KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, 2161 value, tags); 2162 put.add(kv); 2163 updateValueSize(kv.getValueLength()); 2164 } else { 2165 put.addColumn(familyName, qualifier, value); 2166 updateValueSize(value.length); 2167 } 2168 } 2169 } 2170 put.setDurability(opts.writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL); 2171 if (opts.autoFlush) { 2172 table.put(put); 2173 } else { 2174 mutator.mutate(put); 2175 } 2176 } 2177 } 2178 2179 static class FilteredScanTest extends TableTest { 2180 protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName()); 2181 2182 FilteredScanTest(Connection con, TestOptions options, Status status) { 2183 super(con, options, status); 2184 } 2185 2186 @Override 2187 void testRow(int i) throws IOException { 2188 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2189 Scan scan = constructScan(value); 2190 ResultScanner scanner = null; 2191 try { 2192 scanner = this.table.getScanner(scan); 2193 for (Result r = null; (r = scanner.next()) != null;) { 2194 updateValueSize(r); 2195 } 2196 } finally { 2197 if (scanner != null) { 2198 updateScanMetrics(scanner.getScanMetrics()); 2199 scanner.close(); 2200 } 2201 } 2202 } 2203 2204 protected Scan constructScan(byte[] valuePrefix) throws IOException { 2205 FilterList list = new FilterList(); 2206 Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, 2207 CompareOperator.EQUAL, new BinaryComparator(valuePrefix)); 2208 list.addFilter(filter); 2209 if (opts.filterAll) { 2210 list.addFilter(new FilterAllFilter()); 2211 } 2212 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 2213 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 2214 .setScanMetricsEnabled(true); 2215 if (opts.addColumns) { 2216 for (int column = 0; column < opts.columns; column++) { 2217 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 2218 scan.addColumn(FAMILY_ZERO, qualifier); 2219 } 2220 } else { 2221 scan.addFamily(FAMILY_ZERO); 2222 } 2223 scan.setFilter(list); 2224 return scan; 2225 } 2226 } 2227 2228 /** 2229 * Compute a throughput rate in MB/s. 2230 * @param rows Number of records consumed. 2231 * @param timeMs Time taken in milliseconds. * @param valueSize Length in bytes of each cell value. * @param families Number of column families written per row. * @param columns Number of columns written per family. 2232 * @return String value with label, e.g. '123.76 MB/s' 2233 */ 2234 private static String calculateMbps(int rows, long timeMs, final int valueSize, int families, int columns) { 2235 BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH + 2236 ((valueSize + (FAMILY_NAME_BASE.length()+1) + COLUMN_ZERO.length) * columns) * families); 2237 BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT) 2238 .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT) 2239 .divide(BYTES_PER_MB, CXT); 2240 return FMT.format(mbps) + " MB/s"; 2241 } 2242 2243 /* 2244 * Format passed integer. 2245 * @param number Integer to encode. 2246 * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed 2247 * number (uses the absolute value in case the number is negative). 2248 */ 2249 public static byte [] format(final int number) { 2250 byte [] b = new byte[ROW_LENGTH]; 2251 int d = Math.abs(number); 2252 for (int i = b.length - 1; i >= 0; i--) { 2253 b[i] = (byte)((d % 10) + '0'); 2254 d /= 10; 2255 } 2256 return b; 2257 } 2258 2259 /* 2260 * This method takes some time and is done inline while uploading data. For 2261 * example, in the mapfile test, generating the key and value 2262 * consumes about 30% of CPU time. 2263 * @return Generated random value to insert into a table cell. 
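 * <p>Note on the data pattern: the value is not uniformly random. It is written as 8-byte runs
 * of a single random letter (A-Z), with any remaining tail filled by one more letter (e.g. a
 * 20-byte value is two full runs plus four trailing bytes). This keeps generation cheap and
 * yields values that are far more compressible than true random data, which is worth keeping
 * in mind when the compress or blockEncoding options are enabled.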
2264 */ 2265 public static byte[] generateData(final Random r, int length) { 2266 byte [] b = new byte [length]; 2267 int i; 2268 2269 for(i = 0; i < (length-8); i += 8) { 2270 b[i] = (byte) (65 + r.nextInt(26)); 2271 b[i+1] = b[i]; 2272 b[i+2] = b[i]; 2273 b[i+3] = b[i]; 2274 b[i+4] = b[i]; 2275 b[i+5] = b[i]; 2276 b[i+6] = b[i]; 2277 b[i+7] = b[i]; 2278 } 2279 2280 byte a = (byte) (65 + r.nextInt(26)); 2281 for(; i < length; i++) { 2282 b[i] = a; 2283 } 2284 return b; 2285 } 2286 2287 static byte [] getRandomRow(final Random random, final int totalRows) { 2288 return format(generateRandomRow(random, totalRows)); 2289 } 2290 2291 static int generateRandomRow(final Random random, final int totalRows) { 2292 return random.nextInt(Integer.MAX_VALUE) % totalRows; 2293 } 2294 2295 static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf, 2296 Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status) 2297 throws IOException, InterruptedException { 2298 status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " 2299 + opts.perClientRunRows + " rows"); 2300 long totalElapsedTime; 2301 2302 final TestBase t; 2303 try { 2304 if (AsyncTest.class.isAssignableFrom(cmd)) { 2305 Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd; 2306 Constructor<? extends AsyncTest> constructor = 2307 newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class); 2308 t = constructor.newInstance(asyncCon, opts, status); 2309 } else { 2310 Class<? extends Test> newCmd = (Class<? extends Test>) cmd; 2311 Constructor<? extends Test> constructor = 2312 newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class); 2313 t = constructor.newInstance(con, opts, status); 2314 } 2315 } catch (NoSuchMethodException e) { 2316 throw new IllegalArgumentException("Invalid command class: " + cmd.getName() 2317 + ". It does not provide a constructor as described by " 2318 + "the javadoc comment. Available constructors are: " 2319 + Arrays.toString(cmd.getConstructors())); 2320 } catch (Exception e) { 2321 throw new IllegalStateException("Failed to construct command class", e); 2322 } 2323 totalElapsedTime = t.test(); 2324 2325 status.setStatus("Finished " + cmd + " in " + totalElapsedTime + 2326 "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" + 2327 " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime, 2328 getAverageValueLength(opts), opts.families, opts.columns) + ")"); 2329 2330 return new RunResult(totalElapsedTime, t.getLatencyHistogram()); 2331 } 2332 2333 private static int getAverageValueLength(final TestOptions opts) { 2334 return opts.valueRandom? opts.valueSize/2: opts.valueSize; 2335 } 2336 2337 private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) throws IOException, 2338 InterruptedException, ClassNotFoundException, ExecutionException { 2339 // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do 2340 // the TestOptions introspection for us and dump the output in a readable format. 
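    // The Connection/Admin pair below is opened only long enough to run checkTable() against
    // the target table before any clients start; the load itself is then driven either by
    // local client threads (--nomapred) or by a MapReduce job.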
2341 LOG.info(cmd.getSimpleName() + " test run options=" + MAPPER.writeValueAsString(opts)); 2342 Admin admin = null; 2343 Connection connection = null; 2344 try { 2345 connection = ConnectionFactory.createConnection(getConf()); 2346 admin = connection.getAdmin(); 2347 checkTable(admin, opts); 2348 } finally { 2349 if (admin != null) admin.close(); 2350 if (connection != null) connection.close(); 2351 } 2352 if (opts.nomapred) { 2353 doLocalClients(opts, getConf()); 2354 } else { 2355 doMapReduce(opts, getConf()); 2356 } 2357 } 2358 2359 protected void printUsage() { 2360 printUsage(this.getClass().getName(), null); 2361 } 2362 2363 protected static void printUsage(final String message) { 2364 printUsage(PerformanceEvaluation.class.getName(), message); 2365 } 2366 2367 protected static void printUsageAndExit(final String message, final int exitCode) { 2368 printUsage(message); 2369 System.exit(exitCode); 2370 } 2371 2372 protected static void printUsage(final String className, final String message) { 2373 if (message != null && message.length() > 0) { 2374 System.err.println(message); 2375 } 2376 System.err.println("Usage: java " + className + " \\"); 2377 System.err.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>"); 2378 System.err.println(); 2379 System.err.println("General Options:"); 2380 System.err.println(" nomapred Run multiple clients using threads " + 2381 "(rather than use mapreduce)"); 2382 System.err.println(" oneCon all the threads share the same connection. Default: False"); 2383 System.err.println(" sampleRate Execute test on a sample of total " + 2384 "rows. Only supported by randomRead. Default: 1.0"); 2385 System.err.println(" period Report every 'period' rows: " + 2386 "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows()/10); 2387 System.err.println(" cycles How many times to cycle the test. Defaults: 1."); 2388 System.err.println(" traceRate Enable HTrace spans. Initiate tracing every N rows. " + 2389 "Default: 0"); 2390 System.err.println(" latency Set to report operation latencies. Default: False"); 2391 System.err.println(" measureAfter Start to measure the latency once 'measureAfter'" + 2392 " rows have been treated. Default: 0"); 2393 System.err.println(" valueSize Pass value size to use: Default: " 2394 + DEFAULT_OPTS.getValueSize()); 2395 System.err.println(" valueRandom Set if we should vary value size between 0 and " + 2396 "'valueSize'; set on read for stats on size: Default: Not set."); 2397 System.err.println(" blockEncoding Block encoding to use. Value should be one of " 2398 + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE"); 2399 System.err.println(); 2400 System.err.println("Table Creation / Write Tests:"); 2401 System.err.println(" table Alternate table name. Default: 'TestTable'"); 2402 System.err.println(" rows Rows each client runs. Default: " 2403 + DEFAULT_OPTS.getPerClientRunRows() 2404 + ". In case of randomReads and randomSeekScans this could" 2405 + " be specified along with --size to specify the number of rows to be scanned within" 2406 + " the total range specified by the size."); 2407 System.err.println( 2408 " size Total size in GiB. Mutually exclusive with --rows for writes and scans" 2409 + ". But for randomReads and randomSeekScans when you use size with --rows you could" 2410 + " use size to specify the end range and --rows" 2411 + " specifies the number of rows within that range. " + "Default: 1.0."); 2412 System.err.println(" compress Compression type to use (GZ, LZO, ...). 
Default: 'NONE'"); 2413 System.err.println(" flushCommits Used to determine if the test should flush the table. " + 2414 "Default: false"); 2415 System.err.println(" valueZipf Set if we should vary value size between 0 and " + 2416 "'valueSize' in zipf form: Default: Not set."); 2417 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); 2418 System.err.println(" autoFlush Set autoFlush on htable. Default: False"); 2419 System.err.println(" presplit Create presplit table. If a table with same name exists," 2420 + " it'll be deleted and recreated (instead of verifying count of its existing regions). " 2421 + "Recommended for accurate perf analysis (see guide). Default: disabled"); 2422 System.err.println(" usetags Writes tags along with KVs. Use with HFile V3. " + 2423 "Default: false"); 2424 System.err.println(" numoftags Specify the no of tags that would be needed. " + 2425 "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags); 2426 System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table."); 2427 System.err.println(" columns Columns to write per row. Default: 1"); 2428 System.err.println(" families Specify number of column families for the table. Default: 1"); 2429 System.err.println(); 2430 System.err.println("Read Tests:"); 2431 System.err.println(" filterAll Helps to filter out all the rows on the server side" 2432 + " there by not returning any thing back to the client. Helps to check the server side" 2433 + " performance. Uses FilterAllFilter internally. "); 2434 System.err.println(" multiGet Batch gets together into groups of N. Only supported " + 2435 "by randomRead. Default: disabled"); 2436 System.err.println(" inmemory Tries to keep the HFiles of the CF " + 2437 "inmemory as far as possible. Not guaranteed that reads are always served " + 2438 "from memory. Default: false"); 2439 System.err.println(" bloomFilter Bloom filter type, one of " 2440 + Arrays.toString(BloomType.values())); 2441 System.err.println(" blockSize Blocksize to use when writing out hfiles. "); 2442 System.err.println(" inmemoryCompaction Makes the column family to do inmemory flushes/compactions. " 2443 + "Uses the CompactingMemstore"); 2444 System.err.println(" addColumns Adds columns to scans/gets explicitly. Default: true"); 2445 System.err.println(" replicas Enable region replica testing. Defaults: 1."); 2446 System.err.println(" randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0"); 2447 System.err.println(" caching Scan caching to use. Default: 30"); 2448 System.err.println(" asyncPrefetch Enable asyncPrefetch for scan"); 2449 System.err.println(" cacheBlocks Set the cacheBlocks option for scan. Default: true"); 2450 System.err.println(" scanReadType Set the readType option for scan, stream/pread/default. Default: default"); 2451 System.err.println(" bufferSize Set the value of client side buffering. Default: 2MB"); 2452 System.err.println(); 2453 System.err.println(" Note: -D properties will be applied to the conf used. 
"); 2454 System.err.println(" For example: "); 2455 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true"); 2456 System.err.println(" -Dmapreduce.task.timeout=60000"); 2457 System.err.println(); 2458 System.err.println("Command:"); 2459 for (CmdDescriptor command : COMMANDS.values()) { 2460 System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription())); 2461 } 2462 System.err.println(); 2463 System.err.println("Args:"); 2464 System.err.println(" nclients Integer. Required. Total number of clients " 2465 + "(and HRegionServers) running. 1 <= value <= 500"); 2466 System.err.println("Examples:"); 2467 System.err.println(" To run a single client doing the default 1M sequentialWrites:"); 2468 System.err.println(" $ hbase " + className + " sequentialWrite 1"); 2469 System.err.println(" To run 10 clients doing increments over ten rows:"); 2470 System.err.println(" $ hbase " + className + " --rows=10 --nomapred increment 10"); 2471 } 2472 2473 /** 2474 * Parse options passed in via an arguments array. Assumes that array has been split 2475 * on white-space and placed into a {@code Queue}. Any unknown arguments will remain 2476 * in the queue at the conclusion of this method call. It's up to the caller to deal 2477 * with these unrecognized arguments. 2478 */ 2479 static TestOptions parseOpts(Queue<String> args) { 2480 TestOptions opts = new TestOptions(); 2481 2482 String cmd = null; 2483 while ((cmd = args.poll()) != null) { 2484 if (cmd.equals("-h") || cmd.startsWith("--h")) { 2485 // place item back onto queue so that caller knows parsing was incomplete 2486 args.add(cmd); 2487 break; 2488 } 2489 2490 final String nmr = "--nomapred"; 2491 if (cmd.startsWith(nmr)) { 2492 opts.nomapred = true; 2493 continue; 2494 } 2495 2496 final String rows = "--rows="; 2497 if (cmd.startsWith(rows)) { 2498 opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length())); 2499 continue; 2500 } 2501 2502 final String cycles = "--cycles="; 2503 if (cmd.startsWith(cycles)) { 2504 opts.cycles = Integer.parseInt(cmd.substring(cycles.length())); 2505 continue; 2506 } 2507 2508 final String sampleRate = "--sampleRate="; 2509 if (cmd.startsWith(sampleRate)) { 2510 opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); 2511 continue; 2512 } 2513 2514 final String table = "--table="; 2515 if (cmd.startsWith(table)) { 2516 opts.tableName = cmd.substring(table.length()); 2517 continue; 2518 } 2519 2520 final String startRow = "--startRow="; 2521 if (cmd.startsWith(startRow)) { 2522 opts.startRow = Integer.parseInt(cmd.substring(startRow.length())); 2523 continue; 2524 } 2525 2526 final String compress = "--compress="; 2527 if (cmd.startsWith(compress)) { 2528 opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); 2529 continue; 2530 } 2531 2532 final String traceRate = "--traceRate="; 2533 if (cmd.startsWith(traceRate)) { 2534 opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length())); 2535 continue; 2536 } 2537 2538 final String blockEncoding = "--blockEncoding="; 2539 if (cmd.startsWith(blockEncoding)) { 2540 opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); 2541 continue; 2542 } 2543 2544 final String flushCommits = "--flushCommits="; 2545 if (cmd.startsWith(flushCommits)) { 2546 opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); 2547 continue; 2548 } 2549 2550 final String writeToWAL = "--writeToWAL="; 2551 if (cmd.startsWith(writeToWAL)) { 2552 
opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); 2553 continue; 2554 } 2555 2556 final String presplit = "--presplit="; 2557 if (cmd.startsWith(presplit)) { 2558 opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); 2559 continue; 2560 } 2561 2562 final String inMemory = "--inmemory="; 2563 if (cmd.startsWith(inMemory)) { 2564 opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); 2565 continue; 2566 } 2567 2568 final String autoFlush = "--autoFlush="; 2569 if (cmd.startsWith(autoFlush)) { 2570 opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length())); 2571 continue; 2572 } 2573 2574 final String onceCon = "--oneCon="; 2575 if (cmd.startsWith(onceCon)) { 2576 opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length())); 2577 continue; 2578 } 2579 2580 final String latency = "--latency"; 2581 if (cmd.startsWith(latency)) { 2582 opts.reportLatency = true; 2583 continue; 2584 } 2585 2586 final String multiGet = "--multiGet="; 2587 if (cmd.startsWith(multiGet)) { 2588 opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); 2589 continue; 2590 } 2591 2592 final String useTags = "--usetags="; 2593 if (cmd.startsWith(useTags)) { 2594 opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); 2595 continue; 2596 } 2597 2598 final String noOfTags = "--numoftags="; 2599 if (cmd.startsWith(noOfTags)) { 2600 opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); 2601 continue; 2602 } 2603 2604 final String replicas = "--replicas="; 2605 if (cmd.startsWith(replicas)) { 2606 opts.replicas = Integer.parseInt(cmd.substring(replicas.length())); 2607 continue; 2608 } 2609 2610 final String filterOutAll = "--filterAll"; 2611 if (cmd.startsWith(filterOutAll)) { 2612 opts.filterAll = true; 2613 continue; 2614 } 2615 2616 final String size = "--size="; 2617 if (cmd.startsWith(size)) { 2618 opts.size = Float.parseFloat(cmd.substring(size.length())); 2619 if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 
1GB"); 2620 continue; 2621 } 2622 2623 final String splitPolicy = "--splitPolicy="; 2624 if (cmd.startsWith(splitPolicy)) { 2625 opts.splitPolicy = cmd.substring(splitPolicy.length()); 2626 continue; 2627 } 2628 2629 final String randomSleep = "--randomSleep="; 2630 if (cmd.startsWith(randomSleep)) { 2631 opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length())); 2632 continue; 2633 } 2634 2635 final String measureAfter = "--measureAfter="; 2636 if (cmd.startsWith(measureAfter)) { 2637 opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length())); 2638 continue; 2639 } 2640 2641 final String bloomFilter = "--bloomFilter="; 2642 if (cmd.startsWith(bloomFilter)) { 2643 opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length())); 2644 continue; 2645 } 2646 2647 final String blockSize = "--blockSize="; 2648 if(cmd.startsWith(blockSize) ) { 2649 opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length())); 2650 continue; 2651 } 2652 2653 final String valueSize = "--valueSize="; 2654 if (cmd.startsWith(valueSize)) { 2655 opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length())); 2656 continue; 2657 } 2658 2659 final String valueRandom = "--valueRandom"; 2660 if (cmd.startsWith(valueRandom)) { 2661 opts.valueRandom = true; 2662 if (opts.valueZipf) { 2663 throw new IllegalStateException("Either valueZipf or valueRandom but not both"); 2664 } 2665 continue; 2666 } 2667 2668 final String valueZipf = "--valueZipf"; 2669 if (cmd.startsWith(valueZipf)) { 2670 opts.valueZipf = true; 2671 if (opts.valueRandom) { 2672 throw new IllegalStateException("Either valueZipf or valueRandom but not both"); 2673 } 2674 continue; 2675 } 2676 2677 final String period = "--period="; 2678 if (cmd.startsWith(period)) { 2679 opts.period = Integer.parseInt(cmd.substring(period.length())); 2680 continue; 2681 } 2682 2683 final String addColumns = "--addColumns="; 2684 if (cmd.startsWith(addColumns)) { 2685 opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length())); 2686 continue; 2687 } 2688 2689 final String inMemoryCompaction = "--inmemoryCompaction="; 2690 if (cmd.startsWith(inMemoryCompaction)) { 2691 opts.inMemoryCompaction = 2692 MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length())); 2693 continue; 2694 } 2695 2696 final String columns = "--columns="; 2697 if (cmd.startsWith(columns)) { 2698 opts.columns = Integer.parseInt(cmd.substring(columns.length())); 2699 continue; 2700 } 2701 2702 final String families = "--families="; 2703 if (cmd.startsWith(families)) { 2704 opts.families = Integer.parseInt(cmd.substring(families.length())); 2705 continue; 2706 } 2707 2708 final String caching = "--caching="; 2709 if (cmd.startsWith(caching)) { 2710 opts.caching = Integer.parseInt(cmd.substring(caching.length())); 2711 continue; 2712 } 2713 2714 final String asyncPrefetch = "--asyncPrefetch"; 2715 if (cmd.startsWith(asyncPrefetch)) { 2716 opts.asyncPrefetch = true; 2717 continue; 2718 } 2719 2720 final String cacheBlocks = "--cacheBlocks="; 2721 if (cmd.startsWith(cacheBlocks)) { 2722 opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length())); 2723 continue; 2724 } 2725 2726 final String scanReadType = "--scanReadType="; 2727 if (cmd.startsWith(scanReadType)) { 2728 opts.scanReadType = 2729 Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase()); 2730 continue; 2731 } 2732 2733 final String bufferSize = "--bufferSize="; 2734 if (cmd.startsWith(bufferSize)) { 2735 opts.bufferSize = 
Long.parseLong(cmd.substring(bufferSize.length())); 2736 continue; 2737 } 2738 2739 if (isCommandClass(cmd)) { 2740 opts.cmdName = cmd; 2741 try { 2742 opts.numClientThreads = Integer.parseInt(args.remove()); 2743 } catch (NoSuchElementException | NumberFormatException e) { 2744 throw new IllegalArgumentException("Command " + cmd + " is missing the number of client threads", e); 2745 } 2746 opts = calculateRowsAndSize(opts); 2747 break; 2748 } else { // printUsageAndExit terminates the JVM, so nothing more to do for an unrecognized token 2749 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); 2750 } 2751 2756 } 2757 return opts; 2758 } 2759 2760 static TestOptions calculateRowsAndSize(final TestOptions opts) { 2761 int rowsPerGB = getRowsPerGB(opts); 2762 if ((opts.getCmdName() != null 2763 && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN))) 2764 && opts.size != DEFAULT_OPTS.size 2765 && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) { // multiply before casting so fractional --size values (e.g. 1.5) are not truncated to whole GBs 2766 opts.totalRows = (int) (opts.size * rowsPerGB); 2767 } else if (opts.size != DEFAULT_OPTS.size) { 2768 // total size in GB specified 2769 opts.totalRows = (int) (opts.size * rowsPerGB); 2770 opts.perClientRunRows = opts.totalRows / opts.numClientThreads; 2771 } else { 2772 opts.totalRows = opts.perClientRunRows * opts.numClientThreads; // use floating-point division so the derived size keeps its fractional part 2773 opts.size = (float) opts.totalRows / rowsPerGB; 2774 } 2775 return opts; 2776 } 2777 2778 static int getRowsPerGB(final TestOptions opts) { 2779 return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getFamilies() * 2780 opts.getColumns()); 2781 } 2782 2783 @Override 2784 public int run(String[] args) throws Exception { 2785 // Process command-line args. TODO: Better cmd-line processing 2786 // (but hopefully something not as painful as cli options). 2787 int errCode = -1; 2788 if (args.length < 1) { 2789 printUsage(); 2790 return errCode; 2791 } 2792 2793 try { 2794 LinkedList<String> argv = new LinkedList<>(); 2795 argv.addAll(Arrays.asList(args)); 2796 TestOptions opts = parseOpts(argv); 2797 2798 // args remaining, print help and exit 2799 if (!argv.isEmpty()) { 2800 errCode = 0; 2801 printUsage(); 2802 return errCode; 2803 } 2804 2805 // must run at least 1 client 2806 if (opts.numClientThreads <= 0) { 2807 throw new IllegalArgumentException("Number of clients must be > 0"); 2808 } 2809 2810 // cmdName should not be null, print help and exit 2811 if (opts.cmdName == null) { 2812 printUsage(); 2813 return errCode; 2814 } 2815 2816 Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName); 2817 if (cmdClass != null) { 2818 runTest(cmdClass, opts); 2819 errCode = 0; 2820 } 2821 2822 } catch (Exception e) { 2823 e.printStackTrace(); 2824 } 2825 2826 return errCode; 2827 } 2828 2829 private static boolean isCommandClass(String cmd) { 2830 return COMMANDS.containsKey(cmd); 2831 } 2832 2833 private static Class<? extends TestBase> determineCommandClass(String cmd) { 2834 CmdDescriptor descriptor = COMMANDS.get(cmd); 2835 return descriptor != null ? descriptor.getCmdClass() : null; 2836 } 2837 2838 public static void main(final String[] args) throws Exception { 2839 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); 2840 System.exit(res); 2841 } 2842}