001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase; 020 021import com.codahale.metrics.Histogram; 022import com.codahale.metrics.UniformReservoir; 023import com.fasterxml.jackson.databind.MapperFeature; 024import com.fasterxml.jackson.databind.ObjectMapper; 025 026import java.io.IOException; 027import java.io.PrintStream; 028import java.lang.reflect.Constructor; 029import java.math.BigDecimal; 030import java.math.MathContext; 031import java.text.DecimalFormat; 032import java.text.SimpleDateFormat; 033import java.util.ArrayList; 034import java.util.Arrays; 035import java.util.Date; 036import java.util.LinkedList; 037import java.util.Locale; 038import java.util.Map; 039import java.util.NoSuchElementException; 040import java.util.Queue; 041import java.util.Random; 042import java.util.TreeMap; 043import java.util.concurrent.Callable; 044import java.util.concurrent.ExecutionException; 045import java.util.concurrent.ExecutorService; 046import java.util.concurrent.Executors; 047import java.util.concurrent.Future; 048 049import org.apache.commons.lang3.StringUtils; 050import org.apache.hadoop.conf.Configuration; 051import 
org.apache.hadoop.conf.Configured; 052import org.apache.hadoop.fs.FileSystem; 053import org.apache.hadoop.fs.Path; 054import org.apache.hadoop.hbase.client.Admin; 055import org.apache.hadoop.hbase.client.Append; 056import org.apache.hadoop.hbase.client.AsyncConnection; 057import org.apache.hadoop.hbase.client.AsyncTable; 058import org.apache.hadoop.hbase.client.BufferedMutator; 059import org.apache.hadoop.hbase.client.BufferedMutatorParams; 060import org.apache.hadoop.hbase.client.Connection; 061import org.apache.hadoop.hbase.client.ConnectionFactory; 062import org.apache.hadoop.hbase.client.Consistency; 063import org.apache.hadoop.hbase.client.Delete; 064import org.apache.hadoop.hbase.client.Durability; 065import org.apache.hadoop.hbase.client.Get; 066import org.apache.hadoop.hbase.client.Increment; 067import org.apache.hadoop.hbase.client.Put; 068import org.apache.hadoop.hbase.client.Result; 069import org.apache.hadoop.hbase.client.ResultScanner; 070import org.apache.hadoop.hbase.client.RowMutations; 071import org.apache.hadoop.hbase.client.Scan; 072import org.apache.hadoop.hbase.client.Table; 073import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 074import org.apache.hadoop.hbase.filter.BinaryComparator; 075import org.apache.hadoop.hbase.filter.Filter; 076import org.apache.hadoop.hbase.filter.FilterAllFilter; 077import org.apache.hadoop.hbase.filter.FilterList; 078import org.apache.hadoop.hbase.filter.PageFilter; 079import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 080import org.apache.hadoop.hbase.filter.WhileMatchFilter; 081import org.apache.hadoop.hbase.io.compress.Compression; 082import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 083import org.apache.hadoop.hbase.io.hfile.RandomDistribution; 084import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 085import org.apache.hadoop.hbase.regionserver.BloomType; 086import org.apache.hadoop.hbase.regionserver.CompactingMemStore; 087import 
org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration; 088import org.apache.hadoop.hbase.trace.SpanReceiverHost; 089import org.apache.hadoop.hbase.trace.TraceUtil; 090import org.apache.hadoop.hbase.util.ByteArrayHashKey; 091import org.apache.hadoop.hbase.util.Bytes; 092import org.apache.hadoop.hbase.util.Hash; 093import org.apache.hadoop.hbase.util.MurmurHash; 094import org.apache.hadoop.hbase.util.Pair; 095import org.apache.hadoop.hbase.util.YammerHistogramUtils; 096import org.apache.hadoop.io.LongWritable; 097import org.apache.hadoop.io.Text; 098import org.apache.hadoop.mapreduce.Job; 099import org.apache.hadoop.mapreduce.Mapper; 100import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; 101import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 102import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; 103import org.apache.hadoop.util.Tool; 104import org.apache.hadoop.util.ToolRunner; 105import org.apache.htrace.core.ProbabilitySampler; 106import org.apache.htrace.core.Sampler; 107import org.apache.htrace.core.TraceScope; 108import org.apache.yetus.audience.InterfaceAudience; 109import org.slf4j.Logger; 110import org.slf4j.LoggerFactory; 111import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 112import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 113 114/** 115 * Script used for evaluating HBase performance and scalability. Runs an HBase 116 * client that steps through one of a set of hardcoded tests or 'experiments' 117 * (e.g. a random reads test, a random writes test, etc.). Pass on the 118 * command-line which test to run and how many clients are participating in 119 * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage. 120 * 121 * <p>This class sets up and runs the evaluation programs described in 122 * Section 7, <i>Performance Evaluation</i>, of the <a 123 * href="http://labs.google.com/papers/bigtable.html">Bigtable</a> 124 * paper, pages 8-10.
125 * 126 * <p>By default, runs as a mapreduce job where each mapper runs a single test 127 * client. Can also run as a non-mapreduce, multithreaded application by 128 * specifying {@code --nomapred}. Each client does about 1GB of data, unless 129 * specified otherwise. 130 */ 131@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 132public class PerformanceEvaluation extends Configured implements Tool { 133 static final String RANDOM_SEEK_SCAN = "randomSeekScan"; 134 static final String RANDOM_READ = "randomRead"; 135 static final String PE_COMMAND_SHORTNAME = "pe"; 136 private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName()); 137 private static final ObjectMapper MAPPER = new ObjectMapper(); 138 static { 139 MAPPER.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true); 140 } 141 142 public static final String TABLE_NAME = "TestTable"; 143 public static final String FAMILY_NAME_BASE = "info"; 144 public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0"); 145 public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0); 146 public static final int DEFAULT_VALUE_LENGTH = 1000; 147 public static final int ROW_LENGTH = 26; 148 149 private static final int ONE_GB = 1024 * 1024 * 1000; 150 private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH; 151 // TODO : should we make this configurable 152 private static final int TAG_LENGTH = 256; 153 private static final DecimalFormat FMT = new DecimalFormat("0.##"); 154 private static final MathContext CXT = MathContext.DECIMAL64; 155 private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000); 156 private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024); 157 private static final TestOptions DEFAULT_OPTS = new TestOptions(); 158 159 private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>(); 160 private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); 161 162 static { 163 
addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead", 164 "Run async random read test"); 165 addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite", 166 "Run async random write test"); 167 addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead", 168 "Run async sequential read test"); 169 addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite", 170 "Run async sequential write test"); 171 addCommandDescriptor(AsyncScanTest.class, "asyncScan", 172 "Run async scan test (read every row)"); 173 addCommandDescriptor(RandomReadTest.class, RANDOM_READ, 174 "Run random read test"); 175 addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN, 176 "Run random seek and scan 100 test"); 177 addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", 178 "Run random seek scan with both start and stop row (max 10 rows)"); 179 addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100", 180 "Run random seek scan with both start and stop row (max 100 rows)"); 181 addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000", 182 "Run random seek scan with both start and stop row (max 1000 rows)"); 183 addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000", 184 "Run random seek scan with both start and stop row (max 10000 rows)"); 185 addCommandDescriptor(RandomWriteTest.class, "randomWrite", 186 "Run random write test"); 187 addCommandDescriptor(SequentialReadTest.class, "sequentialRead", 188 "Run sequential read test"); 189 addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", 190 "Run sequential write test"); 191 addCommandDescriptor(ScanTest.class, "scan", 192 "Run scan test (read every row)"); 193 addCommandDescriptor(FilteredScanTest.class, "filterScan", 194 "Run scan test using a filter to find a specific row based on it's value " + 195 "(make sure to use --rows=20)"); 196 addCommandDescriptor(IncrementTest.class, "increment", 197 "Increment on 
each row; clients overlap on keyspace so some concurrent operations"); 198 addCommandDescriptor(AppendTest.class, "append", 199 "Append on each row; clients overlap on keyspace so some concurrent operations"); 200 addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate", 201 "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations"); 202 addCommandDescriptor(CheckAndPutTest.class, "checkAndPut", 203 "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations"); 204 addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete", 205 "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations"); 206 } 207 208 /** 209 * Enum for map metrics. Keep it out here rather than inside in the Map 210 * inner-class so we can find associated properties. 211 */ 212 protected static enum Counter { 213 /** elapsed time */ 214 ELAPSED_TIME, 215 /** number of rows */ 216 ROWS 217 } 218 219 protected static class RunResult implements Comparable<RunResult> { 220 public RunResult(long duration, Histogram hist) { 221 this.duration = duration; 222 this.hist = hist; 223 } 224 225 public final long duration; 226 public final Histogram hist; 227 228 @Override 229 public String toString() { 230 return Long.toString(duration); 231 } 232 233 @Override public int compareTo(RunResult o) { 234 return Long.compare(this.duration, o.duration); 235 } 236 } 237 238 /** 239 * Constructor 240 * @param conf Configuration object 241 */ 242 public PerformanceEvaluation(final Configuration conf) { 243 super(conf); 244 } 245 246 protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, 247 String name, String description) { 248 CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description); 249 COMMANDS.put(name, cmdDescriptor); 250 } 251 252 /** 253 * Implementations can have their status set. 
254 */ 255 interface Status { 256 /** 257 * Sets status 258 * @param msg status message 259 * @throws IOException 260 */ 261 void setStatus(final String msg) throws IOException; 262 } 263 264 /** 265 * MapReduce job that runs a performance evaluation client in each map task. 266 */ 267 public static class EvaluationMapTask 268 extends Mapper<LongWritable, Text, LongWritable, LongWritable> { 269 270 /** configuration parameter name that contains the command */ 271 public final static String CMD_KEY = "EvaluationMapTask.command"; 272 /** configuration parameter name that contains the PE impl */ 273 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl"; 274 275 private Class<? extends Test> cmd; 276 277 @Override 278 protected void setup(Context context) throws IOException, InterruptedException { 279 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class); 280 281 // this is required so that extensions of PE are instantiated within the 282 // map reduce task... 283 Class<? extends PerformanceEvaluation> peClass = 284 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class); 285 try { 286 peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration()); 287 } catch (Exception e) { 288 throw new IllegalStateException("Could not instantiate PE instance", e); 289 } 290 } 291 292 private <Type> Class<? 
extends Type> forName(String className, Class<Type> type) { 293 try { 294 return Class.forName(className).asSubclass(type); 295 } catch (ClassNotFoundException e) { 296 throw new IllegalStateException("Could not find class for name: " + className, e); 297 } 298 } 299 300 @Override 301 protected void map(LongWritable key, Text value, final Context context) 302 throws IOException, InterruptedException { 303 304 Status status = new Status() { 305 @Override 306 public void setStatus(String msg) { 307 context.setStatus(msg); 308 } 309 }; 310 311 ObjectMapper mapper = new ObjectMapper(); 312 TestOptions opts = mapper.readValue(value.toString(), TestOptions.class); 313 Configuration conf = HBaseConfiguration.create(context.getConfiguration()); 314 final Connection con = ConnectionFactory.createConnection(conf); 315 AsyncConnection asyncCon = null; 316 try { 317 asyncCon = ConnectionFactory.createAsyncConnection(conf).get(); 318 } catch (ExecutionException e) { 319 throw new IOException(e); 320 } 321 322 // Evaluation task 323 RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status); 324 // Collect how much time the thing took. Report as map output and 325 // to the ELAPSED_TIME counter. 326 context.getCounter(Counter.ELAPSED_TIME).increment(result.duration); 327 context.getCounter(Counter.ROWS).increment(opts.perClientRunRows); 328 context.write(new LongWritable(opts.startRow), new LongWritable(result.duration)); 329 context.progress(); 330 } 331 } 332 333 /* 334 * If table does not already exist, create. Also create a table when 335 * {@code opts.presplitRegions} is specified or when the existing table's 336 * region replica count doesn't match {@code opts.replicas}. 
337 */ 338 static boolean checkTable(Admin admin, TestOptions opts) throws IOException { 339 TableName tableName = TableName.valueOf(opts.tableName); 340 boolean needsDelete = false, exists = admin.tableExists(tableName); 341 boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read") 342 || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan"); 343 if (!exists && isReadCmd) { 344 throw new IllegalStateException( 345 "Must specify an existing table for read commands. Run a write command first."); 346 } 347 HTableDescriptor desc = 348 exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null; 349 byte[][] splits = getSplits(opts); 350 351 // recreate the table when user has requested presplit or when existing 352 // {RegionSplitPolicy,replica count} does not match requested, or when the 353 // number of column families does not match requested. 354 if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions) 355 || (!isReadCmd && desc != null && 356 !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy)) 357 || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas) 358 || (desc != null && desc.getColumnFamilyCount() != opts.families)) { 359 needsDelete = true; 360 // wait, why did it delete my table?!? 
361 LOG.debug(MoreObjects.toStringHelper("needsDelete") 362 .add("needsDelete", needsDelete) 363 .add("isReadCmd", isReadCmd) 364 .add("exists", exists) 365 .add("desc", desc) 366 .add("presplit", opts.presplitRegions) 367 .add("splitPolicy", opts.splitPolicy) 368 .add("replicas", opts.replicas) 369 .add("families", opts.families) 370 .toString()); 371 } 372 373 // remove an existing table 374 if (needsDelete) { 375 if (admin.isTableEnabled(tableName)) { 376 admin.disableTable(tableName); 377 } 378 admin.deleteTable(tableName); 379 } 380 381 // table creation is necessary 382 if (!exists || needsDelete) { 383 desc = getTableDescriptor(opts); 384 if (splits != null) { 385 if (LOG.isDebugEnabled()) { 386 for (int i = 0; i < splits.length; i++) { 387 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); 388 } 389 } 390 } 391 admin.createTable(desc, splits); 392 LOG.info("Table " + desc + " created"); 393 } 394 return admin.tableExists(tableName); 395 } 396 397 /** 398 * Create an HTableDescriptor from provided TestOptions. 
399 */ 400 protected static HTableDescriptor getTableDescriptor(TestOptions opts) { 401 HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(opts.tableName)); 402 for (int family = 0; family < opts.families; family++) { 403 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 404 HColumnDescriptor familyDesc = new HColumnDescriptor(familyName); 405 familyDesc.setDataBlockEncoding(opts.blockEncoding); 406 familyDesc.setCompressionType(opts.compression); 407 familyDesc.setBloomFilterType(opts.bloomType); 408 familyDesc.setBlocksize(opts.blockSize); 409 if (opts.inMemoryCF) { 410 familyDesc.setInMemory(true); 411 } 412 familyDesc.setInMemoryCompaction(opts.inMemoryCompaction); 413 tableDesc.addFamily(familyDesc); 414 } 415 if (opts.replicas != DEFAULT_OPTS.replicas) { 416 tableDesc.setRegionReplication(opts.replicas); 417 } 418 if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) { 419 tableDesc.setRegionSplitPolicyClassName(opts.splitPolicy); 420 } 421 return tableDesc; 422 } 423 424 /** 425 * generates splits based on total number of rows and specified split regions 426 */ 427 protected static byte[][] getSplits(TestOptions opts) { 428 if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) 429 return null; 430 431 int numSplitPoints = opts.presplitRegions - 1; 432 byte[][] splits = new byte[numSplitPoints][]; 433 int jump = opts.totalRows / opts.presplitRegions; 434 for (int i = 0; i < numSplitPoints; i++) { 435 int rowkey = jump * (1 + i); 436 splits[i] = format(rowkey); 437 } 438 return splits; 439 } 440 441 static void setupConnectionCount(final TestOptions opts) { 442 if (opts.oneCon) { 443 opts.connCount = 1; 444 } else { 445 if (opts.connCount == -1) { 446 // set to thread number if connCount is not set 447 opts.connCount = opts.numClientThreads; 448 } 449 } 450 } 451 452 /* 453 * Run all clients in this vm each to its own thread. 
454 */ 455 static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf) 456 throws IOException, InterruptedException, ExecutionException { 457 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName); 458 assert cmd != null; 459 @SuppressWarnings("unchecked") 460 Future<RunResult>[] threads = new Future[opts.numClientThreads]; 461 RunResult[] results = new RunResult[opts.numClientThreads]; 462 ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, 463 new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build()); 464 setupConnectionCount(opts); 465 final Connection[] cons = new Connection[opts.connCount]; 466 final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount]; 467 for (int i = 0; i < opts.connCount; i++) { 468 cons[i] = ConnectionFactory.createConnection(conf); 469 asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get(); 470 } 471 LOG.info("Created " + opts.connCount + " connections for " + 472 opts.numClientThreads + " threads"); 473 for (int i = 0; i < threads.length; i++) { 474 final int index = i; 475 threads[i] = pool.submit(new Callable<RunResult>() { 476 @Override 477 public RunResult call() throws Exception { 478 TestOptions threadOpts = new TestOptions(opts); 479 final Connection con = cons[index % cons.length]; 480 final AsyncConnection asyncCon = asyncCons[index % asyncCons.length]; 481 if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows; 482 RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() { 483 @Override 484 public void setStatus(final String msg) throws IOException { 485 LOG.info(msg); 486 } 487 }); 488 LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration + 489 "ms over " + threadOpts.perClientRunRows + " rows"); 490 return run; 491 } 492 }); 493 } 494 pool.shutdown(); 495 496 for (int i = 0; i < threads.length; i++) { 497 try { 498 results[i] = threads[i].get(); 
499 } catch (ExecutionException e) { 500 throw new IOException(e.getCause()); 501 } 502 } 503 final String test = cmd.getSimpleName(); 504 LOG.info("[" + test + "] Summary of timings (ms): " 505 + Arrays.toString(results)); 506 Arrays.sort(results); 507 long total = 0; 508 float avgLatency = 0 ; 509 float avgTPS = 0; 510 for (RunResult result : results) { 511 total += result.duration; 512 avgLatency += result.hist.getSnapshot().getMean(); 513 avgTPS += opts.perClientRunRows * 1.0f / result.duration; 514 } 515 avgTPS *= 1000; // ms to second 516 avgLatency = avgLatency / results.length; 517 LOG.info("[" + test + " duration ]" 518 + "\tMin: " + results[0] + "ms" 519 + "\tMax: " + results[results.length - 1] + "ms" 520 + "\tAvg: " + (total / results.length) + "ms"); 521 LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency)); 522 LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second"); 523 for (int i = 0; i < opts.connCount; i++) { 524 cons[i].close(); 525 asyncCons[i].close(); 526 } 527 528 529 return results; 530 } 531 532 /* 533 * Run a mapreduce job. Run as many maps as asked-for clients. 534 * Before we start up the job, write out an input file with instruction 535 * per client regards which row they are to start on. 536 * @param cmd Command to run. 537 * @throws IOException 538 */ 539 static Job doMapReduce(TestOptions opts, final Configuration conf) 540 throws IOException, InterruptedException, ClassNotFoundException { 541 final Class<? 
extends TestBase> cmd = determineCommandClass(opts.cmdName); 542 assert cmd != null; 543 Path inputDir = writeInputFile(conf, opts); 544 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); 545 conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName()); 546 Job job = Job.getInstance(conf); 547 job.setJarByClass(PerformanceEvaluation.class); 548 job.setJobName("HBase Performance Evaluation - " + opts.cmdName); 549 550 job.setInputFormatClass(NLineInputFormat.class); 551 NLineInputFormat.setInputPaths(job, inputDir); 552 // this is default, but be explicit about it just in case. 553 NLineInputFormat.setNumLinesPerSplit(job, 1); 554 555 job.setOutputKeyClass(LongWritable.class); 556 job.setOutputValueClass(LongWritable.class); 557 558 job.setMapperClass(EvaluationMapTask.class); 559 job.setReducerClass(LongSumReducer.class); 560 561 job.setNumReduceTasks(1); 562 563 job.setOutputFormatClass(TextOutputFormat.class); 564 TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs")); 565 566 TableMapReduceUtil.addDependencyJars(job); 567 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), 568 Histogram.class, // yammer metrics 569 ObjectMapper.class, // jackson-mapper-asl 570 FilterAllFilter.class // hbase-server tests jar 571 ); 572 573 TableMapReduceUtil.initCredentials(job); 574 575 job.waitForCompletion(true); 576 return job; 577 } 578 579 /** 580 * Each client has one mapper to do the work, and client do the resulting count in a map task. 581 */ 582 583 static String JOB_INPUT_FILENAME = "input.txt"; 584 585 /* 586 * Write input file of offsets-per-client for the mapreduce job. 
587 * @param c Configuration 588 * @return Directory that contains file written whose name is JOB_INPUT_FILENAME 589 * @throws IOException 590 */ 591 static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException { 592 return writeInputFile(c, opts, new Path(".")); 593 } 594 595 static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir) 596 throws IOException { 597 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); 598 Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date())); 599 Path inputDir = new Path(jobdir, "inputs"); 600 601 FileSystem fs = FileSystem.get(c); 602 fs.mkdirs(inputDir); 603 604 Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME); 605 PrintStream out = new PrintStream(fs.create(inputFile)); 606 // Make input random. 607 Map<Integer, String> m = new TreeMap<>(); 608 Hash h = MurmurHash.getInstance(); 609 int perClientRows = (opts.totalRows / opts.numClientThreads); 610 try { 611 for (int j = 0; j < opts.numClientThreads; j++) { 612 TestOptions next = new TestOptions(opts); 613 next.startRow = j * perClientRows; 614 next.perClientRunRows = perClientRows; 615 String s = MAPPER.writeValueAsString(next); 616 LOG.info("Client=" + j + ", input=" + s); 617 byte[] b = Bytes.toBytes(s); 618 int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1); 619 m.put(hash, s); 620 } 621 for (Map.Entry<Integer, String> e: m.entrySet()) { 622 out.println(e.getValue()); 623 } 624 } finally { 625 out.close(); 626 } 627 return inputDir; 628 } 629 630 /** 631 * Describes a command. 632 */ 633 static class CmdDescriptor { 634 private Class<? extends TestBase> cmdClass; 635 private String name; 636 private String description; 637 638 CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) { 639 this.cmdClass = cmdClass; 640 this.name = name; 641 this.description = description; 642 } 643 644 public Class<? 
extends TestBase> getCmdClass() { 645 return cmdClass; 646 } 647 648 public String getName() { 649 return name; 650 } 651 652 public String getDescription() { 653 return description; 654 } 655 } 656 657 /** 658 * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. 659 * This makes tracking all these arguments a little easier. 660 * NOTE: ADDING AN OPTION, you need to add a data member, a getter/setter (to make JSON 661 * serialization of this TestOptions class behave), and you need to add to the clone constructor 662 * below copying your new option from the 'that' to the 'this'. Look for 'clone' below. 663 */ 664 static class TestOptions { 665 String cmdName = null; 666 boolean nomapred = false; 667 boolean filterAll = false; 668 int startRow = 0; 669 float size = 1.0f; 670 int perClientRunRows = DEFAULT_ROWS_PER_GB; 671 int numClientThreads = 1; 672 int totalRows = DEFAULT_ROWS_PER_GB; 673 int measureAfter = 0; 674 float sampleRate = 1.0f; 675 double traceRate = 0.0; 676 String tableName = TABLE_NAME; 677 boolean flushCommits = true; 678 boolean writeToWAL = true; 679 boolean autoFlush = false; 680 boolean oneCon = false; 681 int connCount = -1; //wil decide the actual num later 682 boolean useTags = false; 683 int noOfTags = 1; 684 boolean reportLatency = false; 685 int multiGet = 0; 686 int multiPut = 0; 687 int randomSleep = 0; 688 boolean inMemoryCF = false; 689 int presplitRegions = 0; 690 int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION; 691 String splitPolicy = null; 692 Compression.Algorithm compression = Compression.Algorithm.NONE; 693 BloomType bloomType = BloomType.ROW; 694 int blockSize = HConstants.DEFAULT_BLOCKSIZE; 695 DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; 696 boolean valueRandom = false; 697 boolean valueZipf = false; 698 int valueSize = DEFAULT_VALUE_LENGTH; 699 int period = (this.perClientRunRows / 10) == 0? 
perClientRunRows: perClientRunRows / 10; 700 int cycles = 1; 701 int columns = 1; 702 int families = 1; 703 int caching = 30; 704 boolean addColumns = true; 705 MemoryCompactionPolicy inMemoryCompaction = 706 MemoryCompactionPolicy.valueOf( 707 CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT); 708 boolean asyncPrefetch = false; 709 boolean cacheBlocks = true; 710 Scan.ReadType scanReadType = Scan.ReadType.DEFAULT; 711 long bufferSize = 2l * 1024l * 1024l; 712 713 public TestOptions() {} 714 715 /** 716 * Clone constructor. 717 * @param that Object to copy from. 718 */ 719 public TestOptions(TestOptions that) { 720 this.cmdName = that.cmdName; 721 this.cycles = that.cycles; 722 this.nomapred = that.nomapred; 723 this.startRow = that.startRow; 724 this.size = that.size; 725 this.perClientRunRows = that.perClientRunRows; 726 this.numClientThreads = that.numClientThreads; 727 this.totalRows = that.totalRows; 728 this.sampleRate = that.sampleRate; 729 this.traceRate = that.traceRate; 730 this.tableName = that.tableName; 731 this.flushCommits = that.flushCommits; 732 this.writeToWAL = that.writeToWAL; 733 this.autoFlush = that.autoFlush; 734 this.oneCon = that.oneCon; 735 this.connCount = that.connCount; 736 this.useTags = that.useTags; 737 this.noOfTags = that.noOfTags; 738 this.reportLatency = that.reportLatency; 739 this.multiGet = that.multiGet; 740 this.multiPut = that.multiPut; 741 this.inMemoryCF = that.inMemoryCF; 742 this.presplitRegions = that.presplitRegions; 743 this.replicas = that.replicas; 744 this.splitPolicy = that.splitPolicy; 745 this.compression = that.compression; 746 this.blockEncoding = that.blockEncoding; 747 this.filterAll = that.filterAll; 748 this.bloomType = that.bloomType; 749 this.blockSize = that.blockSize; 750 this.valueRandom = that.valueRandom; 751 this.valueZipf = that.valueZipf; 752 this.valueSize = that.valueSize; 753 this.period = that.period; 754 this.randomSleep = that.randomSleep; 755 this.measureAfter = that.measureAfter; 756 
this.addColumns = that.addColumns; 757 this.columns = that.columns; 758 this.families = that.families; 759 this.caching = that.caching; 760 this.inMemoryCompaction = that.inMemoryCompaction; 761 this.asyncPrefetch = that.asyncPrefetch; 762 this.cacheBlocks = that.cacheBlocks; 763 this.scanReadType = that.scanReadType; 764 this.bufferSize = that.bufferSize; 765 } 766 767 public int getCaching() { 768 return this.caching; 769 } 770 771 public void setCaching(final int caching) { 772 this.caching = caching; 773 } 774 775 public int getColumns() { 776 return this.columns; 777 } 778 779 public void setColumns(final int columns) { 780 this.columns = columns; 781 } 782 783 public int getFamilies() { 784 return this.families; 785 } 786 787 public void setFamilies(final int families) { 788 this.families = families; 789 } 790 791 public int getCycles() { 792 return this.cycles; 793 } 794 795 public void setCycles(final int cycles) { 796 this.cycles = cycles; 797 } 798 799 public boolean isValueZipf() { 800 return valueZipf; 801 } 802 803 public void setValueZipf(boolean valueZipf) { 804 this.valueZipf = valueZipf; 805 } 806 807 public String getCmdName() { 808 return cmdName; 809 } 810 811 public void setCmdName(String cmdName) { 812 this.cmdName = cmdName; 813 } 814 815 public int getRandomSleep() { 816 return randomSleep; 817 } 818 819 public void setRandomSleep(int randomSleep) { 820 this.randomSleep = randomSleep; 821 } 822 823 public int getReplicas() { 824 return replicas; 825 } 826 827 public void setReplicas(int replicas) { 828 this.replicas = replicas; 829 } 830 831 public String getSplitPolicy() { 832 return splitPolicy; 833 } 834 835 public void setSplitPolicy(String splitPolicy) { 836 this.splitPolicy = splitPolicy; 837 } 838 839 public void setNomapred(boolean nomapred) { 840 this.nomapred = nomapred; 841 } 842 843 public void setFilterAll(boolean filterAll) { 844 this.filterAll = filterAll; 845 } 846 847 public void setStartRow(int startRow) { 848 this.startRow 
= startRow; 849 } 850 851 public void setSize(float size) { 852 this.size = size; 853 } 854 855 public void setPerClientRunRows(int perClientRunRows) { 856 this.perClientRunRows = perClientRunRows; 857 } 858 859 public void setNumClientThreads(int numClientThreads) { 860 this.numClientThreads = numClientThreads; 861 } 862 863 public void setTotalRows(int totalRows) { 864 this.totalRows = totalRows; 865 } 866 867 public void setSampleRate(float sampleRate) { 868 this.sampleRate = sampleRate; 869 } 870 871 public void setTraceRate(double traceRate) { 872 this.traceRate = traceRate; 873 } 874 875 public void setTableName(String tableName) { 876 this.tableName = tableName; 877 } 878 879 public void setFlushCommits(boolean flushCommits) { 880 this.flushCommits = flushCommits; 881 } 882 883 public void setWriteToWAL(boolean writeToWAL) { 884 this.writeToWAL = writeToWAL; 885 } 886 887 public void setAutoFlush(boolean autoFlush) { 888 this.autoFlush = autoFlush; 889 } 890 891 public void setOneCon(boolean oneCon) { 892 this.oneCon = oneCon; 893 } 894 895 public int getConnCount() { 896 return connCount; 897 } 898 899 public void setConnCount(int connCount) { 900 this.connCount = connCount; 901 } 902 903 public void setUseTags(boolean useTags) { 904 this.useTags = useTags; 905 } 906 907 public void setNoOfTags(int noOfTags) { 908 this.noOfTags = noOfTags; 909 } 910 911 public void setReportLatency(boolean reportLatency) { 912 this.reportLatency = reportLatency; 913 } 914 915 public void setMultiGet(int multiGet) { 916 this.multiGet = multiGet; 917 } 918 919 public void setMultiPut(int multiPut) { 920 this.multiPut = multiPut; 921 } 922 923 public void setInMemoryCF(boolean inMemoryCF) { 924 this.inMemoryCF = inMemoryCF; 925 } 926 927 public void setPresplitRegions(int presplitRegions) { 928 this.presplitRegions = presplitRegions; 929 } 930 931 public void setCompression(Compression.Algorithm compression) { 932 this.compression = compression; 933 } 934 935 public void 
setBloomType(BloomType bloomType) { 936 this.bloomType = bloomType; 937 } 938 939 public void setBlockSize(int blockSize) { 940 this.blockSize = blockSize; 941 } 942 943 public void setBlockEncoding(DataBlockEncoding blockEncoding) { 944 this.blockEncoding = blockEncoding; 945 } 946 947 public void setValueRandom(boolean valueRandom) { 948 this.valueRandom = valueRandom; 949 } 950 951 public void setValueSize(int valueSize) { 952 this.valueSize = valueSize; 953 } 954 955 public void setBufferSize(long bufferSize) { 956 this.bufferSize = bufferSize; 957 } 958 959 public void setPeriod(int period) { 960 this.period = period; 961 } 962 963 public boolean isNomapred() { 964 return nomapred; 965 } 966 967 public boolean isFilterAll() { 968 return filterAll; 969 } 970 971 public int getStartRow() { 972 return startRow; 973 } 974 975 public float getSize() { 976 return size; 977 } 978 979 public int getPerClientRunRows() { 980 return perClientRunRows; 981 } 982 983 public int getNumClientThreads() { 984 return numClientThreads; 985 } 986 987 public int getTotalRows() { 988 return totalRows; 989 } 990 991 public float getSampleRate() { 992 return sampleRate; 993 } 994 995 public double getTraceRate() { 996 return traceRate; 997 } 998 999 public String getTableName() { 1000 return tableName; 1001 } 1002 1003 public boolean isFlushCommits() { 1004 return flushCommits; 1005 } 1006 1007 public boolean isWriteToWAL() { 1008 return writeToWAL; 1009 } 1010 1011 public boolean isAutoFlush() { 1012 return autoFlush; 1013 } 1014 1015 public boolean isUseTags() { 1016 return useTags; 1017 } 1018 1019 public int getNoOfTags() { 1020 return noOfTags; 1021 } 1022 1023 public boolean isReportLatency() { 1024 return reportLatency; 1025 } 1026 1027 public int getMultiGet() { 1028 return multiGet; 1029 } 1030 1031 public int getMultiPut() { 1032 return multiPut; 1033 } 1034 1035 public boolean isInMemoryCF() { 1036 return inMemoryCF; 1037 } 1038 1039 public int getPresplitRegions() { 1040 
return presplitRegions; 1041 } 1042 1043 public Compression.Algorithm getCompression() { 1044 return compression; 1045 } 1046 1047 public DataBlockEncoding getBlockEncoding() { 1048 return blockEncoding; 1049 } 1050 1051 public boolean isValueRandom() { 1052 return valueRandom; 1053 } 1054 1055 public int getValueSize() { 1056 return valueSize; 1057 } 1058 1059 public int getPeriod() { 1060 return period; 1061 } 1062 1063 public BloomType getBloomType() { 1064 return bloomType; 1065 } 1066 1067 public int getBlockSize() { 1068 return blockSize; 1069 } 1070 1071 public boolean isOneCon() { 1072 return oneCon; 1073 } 1074 1075 public int getMeasureAfter() { 1076 return measureAfter; 1077 } 1078 1079 public void setMeasureAfter(int measureAfter) { 1080 this.measureAfter = measureAfter; 1081 } 1082 1083 public boolean getAddColumns() { 1084 return addColumns; 1085 } 1086 1087 public void setAddColumns(boolean addColumns) { 1088 this.addColumns = addColumns; 1089 } 1090 1091 public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) { 1092 this.inMemoryCompaction = inMemoryCompaction; 1093 } 1094 1095 public MemoryCompactionPolicy getInMemoryCompaction() { 1096 return this.inMemoryCompaction; 1097 } 1098 1099 public long getBufferSize() { 1100 return this.bufferSize; 1101 } 1102 } 1103 1104 /* 1105 * A test. 1106 * Subclass to particularize what happens per row. 1107 */ 1108 static abstract class TestBase { 1109 // Below is make it so when Tests are all running in the one 1110 // jvm, that they each have a differently seeded Random. 
1111 private static final Random randomSeed = new Random(System.currentTimeMillis()); 1112 1113 private static long nextRandomSeed() { 1114 return randomSeed.nextLong(); 1115 } 1116 private final int everyN; 1117 1118 protected final Random rand = new Random(nextRandomSeed()); 1119 protected final Configuration conf; 1120 protected final TestOptions opts; 1121 1122 private final Status status; 1123 private final Sampler traceSampler; 1124 private final SpanReceiverHost receiverHost; 1125 1126 private String testName; 1127 private Histogram latencyHistogram; 1128 private Histogram valueSizeHistogram; 1129 private Histogram rpcCallsHistogram; 1130 private Histogram remoteRpcCallsHistogram; 1131 private Histogram millisBetweenNextHistogram; 1132 private Histogram regionsScannedHistogram; 1133 private Histogram bytesInResultsHistogram; 1134 private Histogram bytesInRemoteResultsHistogram; 1135 private RandomDistribution.Zipf zipf; 1136 1137 /** 1138 * Note that all subclasses of this class must provide a public constructor 1139 * that has the exact same list of arguments. 1140 */ 1141 TestBase(final Configuration conf, final TestOptions options, final Status status) { 1142 this.conf = conf; 1143 this.receiverHost = this.conf == null? 
null: SpanReceiverHost.getInstance(conf); 1144 this.opts = options; 1145 this.status = status; 1146 this.testName = this.getClass().getSimpleName(); 1147 if (options.traceRate >= 1.0) { 1148 this.traceSampler = Sampler.ALWAYS; 1149 } else if (options.traceRate > 0.0) { 1150 conf.setDouble("hbase.sampler.fraction", options.traceRate); 1151 this.traceSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf)); 1152 } else { 1153 this.traceSampler = Sampler.NEVER; 1154 } 1155 everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate)); 1156 if (options.isValueZipf()) { 1157 this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2); 1158 } 1159 LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows."); 1160 } 1161 1162 int getValueLength(final Random r) { 1163 if (this.opts.isValueRandom()) { 1164 return r.nextInt(opts.valueSize); 1165 } else if (this.opts.isValueZipf()) { 1166 return Math.abs(this.zipf.nextInt()); 1167 } else { 1168 return opts.valueSize; 1169 } 1170 } 1171 1172 void updateValueSize(final Result [] rs) throws IOException { 1173 if (rs == null || !isRandomValueSize()) return; 1174 for (Result r: rs) updateValueSize(r); 1175 } 1176 1177 void updateValueSize(final Result r) throws IOException { 1178 if (r == null || !isRandomValueSize()) return; 1179 int size = 0; 1180 for (CellScanner scanner = r.cellScanner(); scanner.advance();) { 1181 size += scanner.current().getValueLength(); 1182 } 1183 updateValueSize(size); 1184 } 1185 1186 void updateValueSize(final int valueSize) { 1187 if (!isRandomValueSize()) return; 1188 this.valueSizeHistogram.update(valueSize); 1189 } 1190 1191 void updateScanMetrics(final ScanMetrics metrics) { 1192 if (metrics == null) return; 1193 Map<String,Long> metricsMap = metrics.getMetricsMap(); 1194 Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); 1195 if (rpcCalls != null) { 1196 this.rpcCallsHistogram.update(rpcCalls.longValue()); 
1197 } 1198 Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME); 1199 if (remoteRpcCalls != null) { 1200 this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue()); 1201 } 1202 Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME); 1203 if (millisBetweenNext != null) { 1204 this.millisBetweenNextHistogram.update(millisBetweenNext.longValue()); 1205 } 1206 Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); 1207 if (regionsScanned != null) { 1208 this.regionsScannedHistogram.update(regionsScanned.longValue()); 1209 } 1210 Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME); 1211 if (bytesInResults != null && bytesInResults.longValue() > 0) { 1212 this.bytesInResultsHistogram.update(bytesInResults.longValue()); 1213 } 1214 Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME); 1215 if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) { 1216 this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue()); 1217 } 1218 } 1219 1220 String generateStatus(final int sr, final int i, final int lr) { 1221 return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() + 1222 (!isRandomValueSize()? "": ", value size " + getShortValueSizeReport()); 1223 } 1224 1225 boolean isRandomValueSize() { 1226 return opts.valueRandom; 1227 } 1228 1229 protected int getReportingPeriod() { 1230 return opts.period; 1231 } 1232 1233 /** 1234 * Populated by testTakedown. Only implemented by RandomReadTest at the moment. 
1235 */ 1236 public Histogram getLatencyHistogram() { 1237 return latencyHistogram; 1238 } 1239 1240 void testSetup() throws IOException { 1241 // test metrics 1242 latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1243 valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1244 // scan metrics 1245 rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1246 remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1247 millisBetweenNextHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1248 regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1249 bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1250 bytesInRemoteResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1251 1252 onStartup(); 1253 } 1254 1255 abstract void onStartup() throws IOException; 1256 1257 void testTakedown() throws IOException { 1258 onTakedown(); 1259 // Print all stats for this thread continuously. 1260 // Synchronize on Test.class so different threads don't intermingle the 1261 // output. We can't use 'this' here because each thread has its own instance of Test class. 
1262 synchronized (Test.class) { 1263 status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName()); 1264 status.setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport( 1265 latencyHistogram)); 1266 status.setStatus("Num measures (latency) : " + latencyHistogram.getCount()); 1267 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram)); 1268 if (valueSizeHistogram.getCount() > 0) { 1269 status.setStatus("ValueSize (bytes) : " 1270 + YammerHistogramUtils.getHistogramReport(valueSizeHistogram)); 1271 status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount()); 1272 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram)); 1273 } else { 1274 status.setStatus("No valueSize statistics available"); 1275 } 1276 if (rpcCallsHistogram.getCount() > 0) { 1277 status.setStatus("rpcCalls (count): " + 1278 YammerHistogramUtils.getHistogramReport(rpcCallsHistogram)); 1279 } 1280 if (remoteRpcCallsHistogram.getCount() > 0) { 1281 status.setStatus("remoteRpcCalls (count): " + 1282 YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram)); 1283 } 1284 if (millisBetweenNextHistogram.getCount() > 0) { 1285 status.setStatus("millisBetweenNext (latency): " + 1286 YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram)); 1287 } 1288 if (regionsScannedHistogram.getCount() > 0) { 1289 status.setStatus("regionsScanned (count): " + 1290 YammerHistogramUtils.getHistogramReport(regionsScannedHistogram)); 1291 } 1292 if (bytesInResultsHistogram.getCount() > 0) { 1293 status.setStatus("bytesInResults (size): " + 1294 YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram)); 1295 } 1296 if (bytesInRemoteResultsHistogram.getCount() > 0) { 1297 status.setStatus("bytesInRemoteResults (size): " + 1298 YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram)); 1299 } 1300 } 1301 receiverHost.closeReceivers(); 1302 } 1303 1304 abstract void 
onTakedown() throws IOException; 1305 1306 1307 /* 1308 * Run test 1309 * @return Elapsed time. 1310 * @throws IOException 1311 */ 1312 long test() throws IOException, InterruptedException { 1313 testSetup(); 1314 LOG.info("Timed test starting in thread " + Thread.currentThread().getName()); 1315 final long startTime = System.nanoTime(); 1316 try { 1317 testTimed(); 1318 } finally { 1319 testTakedown(); 1320 } 1321 return (System.nanoTime() - startTime) / 1000000; 1322 } 1323 1324 int getStartRow() { 1325 return opts.startRow; 1326 } 1327 1328 int getLastRow() { 1329 return getStartRow() + opts.perClientRunRows; 1330 } 1331 1332 /** 1333 * Provides an extension point for tests that don't want a per row invocation. 1334 */ 1335 void testTimed() throws IOException, InterruptedException { 1336 int startRow = getStartRow(); 1337 int lastRow = getLastRow(); 1338 TraceUtil.addSampler(traceSampler); 1339 // Report on completion of 1/10th of total. 1340 for (int ii = 0; ii < opts.cycles; ii++) { 1341 if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles); 1342 for (int i = startRow; i < lastRow; i++) { 1343 if (i % everyN != 0) continue; 1344 long startTime = System.nanoTime(); 1345 boolean requestSent = false; 1346 try (TraceScope scope = TraceUtil.createTrace("test row");){ 1347 requestSent = testRow(i); 1348 } 1349 if ( (i - startRow) > opts.measureAfter) { 1350 // If multiget or multiput is enabled, say set to 10, testRow() returns immediately 1351 // first 9 times and sends the actual get request in the 10th iteration. 1352 // We should only set latency when actual request is sent because otherwise 1353 // it turns out to be 0. 
1354 if (requestSent) { 1355 latencyHistogram.update((System.nanoTime() - startTime) / 1000); 1356 } 1357 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { 1358 status.setStatus(generateStatus(startRow, i, lastRow)); 1359 } 1360 } 1361 } 1362 } 1363 } 1364 1365 /** 1366 * @return Subset of the histograms' calculation. 1367 */ 1368 public String getShortLatencyReport() { 1369 return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram); 1370 } 1371 1372 /** 1373 * @return Subset of the histograms' calculation. 1374 */ 1375 public String getShortValueSizeReport() { 1376 return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram); 1377 } 1378 1379 1380 /** 1381 * Test for individual row. 1382 * @param i Row index. 1383 * @return true if the row was sent to server and need to record metrics. 1384 * False if not, multiGet and multiPut e.g., the rows are sent 1385 * to server only if enough gets/puts are gathered. 1386 */ 1387 abstract boolean testRow(final int i) throws IOException, InterruptedException; 1388 } 1389 1390 static abstract class Test extends TestBase { 1391 protected Connection connection; 1392 1393 Test(final Connection con, final TestOptions options, final Status status) { 1394 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); 1395 this.connection = con; 1396 } 1397 } 1398 1399 static abstract class AsyncTest extends TestBase { 1400 protected AsyncConnection connection; 1401 1402 AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) { 1403 super(con == null ? 
HBaseConfiguration.create() : con.getConfiguration(), options, status); 1404 this.connection = con; 1405 } 1406 } 1407 1408 static abstract class TableTest extends Test { 1409 protected Table table; 1410 1411 TableTest(Connection con, TestOptions options, Status status) { 1412 super(con, options, status); 1413 } 1414 1415 @Override 1416 void onStartup() throws IOException { 1417 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1418 } 1419 1420 @Override 1421 void onTakedown() throws IOException { 1422 table.close(); 1423 } 1424 } 1425 1426 static abstract class AsyncTableTest extends AsyncTest { 1427 protected AsyncTable<?> table; 1428 1429 AsyncTableTest(AsyncConnection con, TestOptions options, Status status) { 1430 super(con, options, status); 1431 } 1432 1433 @Override 1434 void onStartup() throws IOException { 1435 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1436 } 1437 1438 @Override 1439 void onTakedown() throws IOException { 1440 } 1441 } 1442 1443 static class AsyncRandomReadTest extends AsyncTableTest { 1444 private final Consistency consistency; 1445 private ArrayList<Get> gets; 1446 private Random rd = new Random(); 1447 1448 AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) { 1449 super(con, options, status); 1450 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1451 if (opts.multiGet > 0) { 1452 LOG.info("MultiGet enabled. 
Sending GETs in batches of " + opts.multiGet + "."); 1453 this.gets = new ArrayList<>(opts.multiGet); 1454 } 1455 } 1456 1457 @Override 1458 boolean testRow(final int i) throws IOException, InterruptedException { 1459 if (opts.randomSleep > 0) { 1460 Thread.sleep(rd.nextInt(opts.randomSleep)); 1461 } 1462 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1463 for (int family = 0; family < opts.families; family++) { 1464 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1465 if (opts.addColumns) { 1466 for (int column = 0; column < opts.columns; column++) { 1467 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1468 get.addColumn(familyName, qualifier); 1469 } 1470 } else { 1471 get.addFamily(familyName); 1472 } 1473 } 1474 if (opts.filterAll) { 1475 get.setFilter(new FilterAllFilter()); 1476 } 1477 get.setConsistency(consistency); 1478 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1479 try { 1480 if (opts.multiGet > 0) { 1481 this.gets.add(get); 1482 if (this.gets.size() == opts.multiGet) { 1483 Result[] rs = 1484 this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new); 1485 updateValueSize(rs); 1486 this.gets.clear(); 1487 } else { 1488 return false; 1489 } 1490 } else { 1491 updateValueSize(this.table.get(get).get()); 1492 } 1493 } catch (ExecutionException e) { 1494 throw new IOException(e); 1495 } 1496 return true; 1497 } 1498 1499 public static RuntimeException runtime(Throwable e) { 1500 if (e instanceof RuntimeException) { 1501 return (RuntimeException) e; 1502 } 1503 return new RuntimeException(e); 1504 } 1505 1506 public static <V> V propagate(Callable<V> callable) { 1507 try { 1508 return callable.call(); 1509 } catch (Exception e) { 1510 throw runtime(e); 1511 } 1512 } 1513 1514 @Override 1515 protected int getReportingPeriod() { 1516 int period = opts.perClientRunRows / 10; 1517 return period == 0 ? 
opts.perClientRunRows : period; 1518 } 1519 1520 @Override 1521 protected void testTakedown() throws IOException { 1522 if (this.gets != null && this.gets.size() > 0) { 1523 this.table.get(gets); 1524 this.gets.clear(); 1525 } 1526 super.testTakedown(); 1527 } 1528 } 1529 1530 static class AsyncRandomWriteTest extends AsyncSequentialWriteTest { 1531 1532 AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) { 1533 super(con, options, status); 1534 } 1535 1536 @Override 1537 protected byte[] generateRow(final int i) { 1538 return getRandomRow(this.rand, opts.totalRows); 1539 } 1540 } 1541 1542 static class AsyncScanTest extends AsyncTableTest { 1543 private ResultScanner testScanner; 1544 private AsyncTable<?> asyncTable; 1545 1546 AsyncScanTest(AsyncConnection con, TestOptions options, Status status) { 1547 super(con, options, status); 1548 } 1549 1550 @Override 1551 void onStartup() throws IOException { 1552 this.asyncTable = 1553 connection.getTable(TableName.valueOf(opts.tableName), 1554 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); 1555 } 1556 1557 @Override 1558 void testTakedown() throws IOException { 1559 if (this.testScanner != null) { 1560 updateScanMetrics(this.testScanner.getScanMetrics()); 1561 this.testScanner.close(); 1562 } 1563 super.testTakedown(); 1564 } 1565 1566 @Override 1567 boolean testRow(final int i) throws IOException { 1568 if (this.testScanner == null) { 1569 Scan scan = 1570 new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 1571 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1572 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1573 for (int family = 0; family < opts.families; family++) { 1574 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1575 if (opts.addColumns) { 1576 for (int column = 0; column < opts.columns; column++) { 1577 byte [] qualifier = column == 0? 
COLUMN_ZERO: Bytes.toBytes("" + column); 1578 scan.addColumn(familyName, qualifier); 1579 } 1580 } else { 1581 scan.addFamily(familyName); 1582 } 1583 } 1584 if (opts.filterAll) { 1585 scan.setFilter(new FilterAllFilter()); 1586 } 1587 this.testScanner = asyncTable.getScanner(scan); 1588 } 1589 Result r = testScanner.next(); 1590 updateValueSize(r); 1591 return true; 1592 } 1593 } 1594 1595 static class AsyncSequentialReadTest extends AsyncTableTest { 1596 AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) { 1597 super(con, options, status); 1598 } 1599 1600 @Override 1601 boolean testRow(final int i) throws IOException, InterruptedException { 1602 Get get = new Get(format(i)); 1603 for (int family = 0; family < opts.families; family++) { 1604 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1605 if (opts.addColumns) { 1606 for (int column = 0; column < opts.columns; column++) { 1607 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1608 get.addColumn(familyName, qualifier); 1609 } 1610 } else { 1611 get.addFamily(familyName); 1612 } 1613 } 1614 if (opts.filterAll) { 1615 get.setFilter(new FilterAllFilter()); 1616 } 1617 try { 1618 updateValueSize(table.get(get).get()); 1619 } catch (ExecutionException e) { 1620 throw new IOException(e); 1621 } 1622 return true; 1623 } 1624 } 1625 1626 static class AsyncSequentialWriteTest extends AsyncTableTest { 1627 private ArrayList<Put> puts; 1628 1629 AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) { 1630 super(con, options, status); 1631 if (opts.multiPut > 0) { 1632 LOG.info("MultiPut enabled. 
Sending PUTs in batches of " + opts.multiPut + "."); 1633 this.puts = new ArrayList<>(opts.multiPut); 1634 } 1635 } 1636 1637 protected byte[] generateRow(final int i) { 1638 return format(i); 1639 } 1640 1641 @Override 1642 @SuppressWarnings("ReturnValueIgnored") 1643 boolean testRow(final int i) throws IOException, InterruptedException { 1644 byte[] row = generateRow(i); 1645 Put put = new Put(row); 1646 for (int family = 0; family < opts.families; family++) { 1647 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1648 for (int column = 0; column < opts.columns; column++) { 1649 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1650 byte[] value = generateData(this.rand, getValueLength(this.rand)); 1651 if (opts.useTags) { 1652 byte[] tag = generateData(this.rand, TAG_LENGTH); 1653 Tag[] tags = new Tag[opts.noOfTags]; 1654 for (int n = 0; n < opts.noOfTags; n++) { 1655 Tag t = new ArrayBackedTag((byte) n, tag); 1656 tags[n] = t; 1657 } 1658 KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, 1659 value, tags); 1660 put.add(kv); 1661 updateValueSize(kv.getValueLength()); 1662 } else { 1663 put.addColumn(familyName, qualifier, value); 1664 updateValueSize(value.length); 1665 } 1666 } 1667 } 1668 put.setDurability(opts.writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL); 1669 try { 1670 table.put(put).get(); 1671 if (opts.multiPut > 0) { 1672 this.puts.add(put); 1673 if (this.puts.size() == opts.multiPut) { 1674 this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get)); 1675 this.puts.clear(); 1676 } else { 1677 return false; 1678 } 1679 } else { 1680 table.put(put).get(); 1681 } 1682 } catch (ExecutionException e) { 1683 throw new IOException(e); 1684 } 1685 return true; 1686 } 1687 } 1688 1689 static abstract class BufferedMutatorTest extends Test { 1690 protected BufferedMutator mutator; 1691 protected Table table; 1692 1693 BufferedMutatorTest(Connection con, TestOptions options, Status status) { 1694 super(con, options, status); 1695 } 1696 1697 @Override 1698 void onStartup() throws IOException { 1699 BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName)); 1700 p.writeBufferSize(opts.bufferSize); 1701 this.mutator = connection.getBufferedMutator(p); 1702 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1703 } 1704 1705 @Override 1706 void onTakedown() throws IOException { 1707 mutator.close(); 1708 table.close(); 1709 } 1710 } 1711 1712 static class RandomSeekScanTest extends TableTest { 1713 RandomSeekScanTest(Connection con, TestOptions options, Status status) { 1714 super(con, options, status); 1715 } 1716 1717 @Override 1718 boolean testRow(final int i) throws IOException { 1719 Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)) 1720 .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 1721 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 1722 .setScanMetricsEnabled(true); 1723 FilterList list = new FilterList(); 1724 for (int family = 0; family < opts.families; family++) { 1725 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1726 if (opts.addColumns) { 1727 for (int column = 0; column < opts.columns; column++) { 1728 byte [] qualifier = 
column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1729 scan.addColumn(familyName, qualifier); 1730 } 1731 } else { 1732 scan.addFamily(familyName); 1733 } 1734 } 1735 if (opts.filterAll) { 1736 list.addFilter(new FilterAllFilter()); 1737 } 1738 list.addFilter(new WhileMatchFilter(new PageFilter(120))); 1739 scan.setFilter(list); 1740 ResultScanner s = this.table.getScanner(scan); 1741 try { 1742 for (Result rr; (rr = s.next()) != null;) { 1743 updateValueSize(rr); 1744 } 1745 } finally { 1746 updateScanMetrics(s.getScanMetrics()); 1747 s.close(); 1748 } 1749 return true; 1750 } 1751 1752 @Override 1753 protected int getReportingPeriod() { 1754 int period = opts.perClientRunRows / 100; 1755 return period == 0 ? opts.perClientRunRows : period; 1756 } 1757 1758 } 1759 1760 static abstract class RandomScanWithRangeTest extends TableTest { 1761 RandomScanWithRangeTest(Connection con, TestOptions options, Status status) { 1762 super(con, options, status); 1763 } 1764 1765 @Override 1766 boolean testRow(final int i) throws IOException { 1767 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); 1768 Scan scan = new Scan().withStartRow(startAndStopRow.getFirst()) 1769 .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching) 1770 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1771 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1772 for (int family = 0; family < opts.families; family++) { 1773 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1774 if (opts.addColumns) { 1775 for (int column = 0; column < opts.columns; column++) { 1776 byte [] qualifier = column == 0? 
COLUMN_ZERO: Bytes.toBytes("" + column); 1777 scan.addColumn(familyName, qualifier); 1778 } 1779 } else { 1780 scan.addFamily(familyName); 1781 } 1782 } 1783 if (opts.filterAll) { 1784 scan.setFilter(new FilterAllFilter()); 1785 } 1786 Result r = null; 1787 int count = 0; 1788 ResultScanner s = this.table.getScanner(scan); 1789 try { 1790 for (; (r = s.next()) != null;) { 1791 updateValueSize(r); 1792 count++; 1793 } 1794 if (i % 100 == 0) { 1795 LOG.info(String.format("Scan for key range %s - %s returned %s rows", 1796 Bytes.toString(startAndStopRow.getFirst()), 1797 Bytes.toString(startAndStopRow.getSecond()), count)); 1798 } 1799 } finally { 1800 updateScanMetrics(s.getScanMetrics()); 1801 s.close(); 1802 } 1803 return true; 1804 } 1805 1806 protected abstract Pair<byte[],byte[]> getStartAndStopRow(); 1807 1808 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) { 1809 int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows; 1810 int stop = start + maxRange; 1811 return new Pair<>(format(start), format(stop)); 1812 } 1813 1814 @Override 1815 protected int getReportingPeriod() { 1816 int period = opts.perClientRunRows / 100; 1817 return period == 0? 
opts.perClientRunRows: period; 1818 } 1819 } 1820 1821 static class RandomScanWithRange10Test extends RandomScanWithRangeTest { 1822 RandomScanWithRange10Test(Connection con, TestOptions options, Status status) { 1823 super(con, options, status); 1824 } 1825 1826 @Override 1827 protected Pair<byte[], byte[]> getStartAndStopRow() { 1828 return generateStartAndStopRows(10); 1829 } 1830 } 1831 1832 static class RandomScanWithRange100Test extends RandomScanWithRangeTest { 1833 RandomScanWithRange100Test(Connection con, TestOptions options, Status status) { 1834 super(con, options, status); 1835 } 1836 1837 @Override 1838 protected Pair<byte[], byte[]> getStartAndStopRow() { 1839 return generateStartAndStopRows(100); 1840 } 1841 } 1842 1843 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest { 1844 RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) { 1845 super(con, options, status); 1846 } 1847 1848 @Override 1849 protected Pair<byte[], byte[]> getStartAndStopRow() { 1850 return generateStartAndStopRows(1000); 1851 } 1852 } 1853 1854 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest { 1855 RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) { 1856 super(con, options, status); 1857 } 1858 1859 @Override 1860 protected Pair<byte[], byte[]> getStartAndStopRow() { 1861 return generateStartAndStopRows(10000); 1862 } 1863 } 1864 1865 static class RandomReadTest extends TableTest { 1866 private final Consistency consistency; 1867 private ArrayList<Get> gets; 1868 private Random rd = new Random(); 1869 1870 RandomReadTest(Connection con, TestOptions options, Status status) { 1871 super(con, options, status); 1872 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1873 if (opts.multiGet > 0) { 1874 LOG.info("MultiGet enabled. 
Sending GETs in batches of " + opts.multiGet + "."); 1875 this.gets = new ArrayList<>(opts.multiGet); 1876 } 1877 } 1878 1879 @Override 1880 boolean testRow(final int i) throws IOException, InterruptedException { 1881 if (opts.randomSleep > 0) { 1882 Thread.sleep(rd.nextInt(opts.randomSleep)); 1883 } 1884 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1885 for (int family = 0; family < opts.families; family++) { 1886 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1887 if (opts.addColumns) { 1888 for (int column = 0; column < opts.columns; column++) { 1889 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1890 get.addColumn(familyName, qualifier); 1891 } 1892 } else { 1893 get.addFamily(familyName); 1894 } 1895 } 1896 if (opts.filterAll) { 1897 get.setFilter(new FilterAllFilter()); 1898 } 1899 get.setConsistency(consistency); 1900 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1901 if (opts.multiGet > 0) { 1902 this.gets.add(get); 1903 if (this.gets.size() == opts.multiGet) { 1904 Result [] rs = this.table.get(this.gets); 1905 updateValueSize(rs); 1906 this.gets.clear(); 1907 } else { 1908 return false; 1909 } 1910 } else { 1911 updateValueSize(this.table.get(get)); 1912 } 1913 return true; 1914 } 1915 1916 @Override 1917 protected int getReportingPeriod() { 1918 int period = opts.perClientRunRows / 10; 1919 return period == 0 ? 
opts.perClientRunRows : period; 1920 } 1921 1922 @Override 1923 protected void testTakedown() throws IOException { 1924 if (this.gets != null && this.gets.size() > 0) { 1925 this.table.get(gets); 1926 this.gets.clear(); 1927 } 1928 super.testTakedown(); 1929 } 1930 } 1931 1932 static class RandomWriteTest extends SequentialWriteTest { 1933 RandomWriteTest(Connection con, TestOptions options, Status status) { 1934 super(con, options, status); 1935 } 1936 1937 @Override 1938 protected byte[] generateRow(final int i) { 1939 return getRandomRow(this.rand, opts.totalRows); 1940 } 1941 1942 1943 } 1944 1945 static class ScanTest extends TableTest { 1946 private ResultScanner testScanner; 1947 1948 ScanTest(Connection con, TestOptions options, Status status) { 1949 super(con, options, status); 1950 } 1951 1952 @Override 1953 void testTakedown() throws IOException { 1954 if (this.testScanner != null) { 1955 this.testScanner.close(); 1956 } 1957 super.testTakedown(); 1958 } 1959 1960 1961 @Override 1962 boolean testRow(final int i) throws IOException { 1963 if (this.testScanner == null) { 1964 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 1965 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1966 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1967 for (int family = 0; family < opts.families; family++) { 1968 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1969 if (opts.addColumns) { 1970 for (int column = 0; column < opts.columns; column++) { 1971 byte [] qualifier = column == 0? 
COLUMN_ZERO: Bytes.toBytes("" + column); 1972 scan.addColumn(familyName, qualifier); 1973 } 1974 } else { 1975 scan.addFamily(familyName); 1976 } 1977 } 1978 if (opts.filterAll) { 1979 scan.setFilter(new FilterAllFilter()); 1980 } 1981 this.testScanner = table.getScanner(scan); 1982 } 1983 Result r = testScanner.next(); 1984 updateValueSize(r); 1985 return true; 1986 } 1987 } 1988 1989 /** 1990 * Base class for operations that are CAS-like; that read a value and then set it based off what 1991 * they read. In this category is increment, append, checkAndPut, etc. 1992 * 1993 * <p>These operations also want some concurrency going on. Usually when these tests run, they 1994 * operate in their own part of the key range. In CASTest, we will have them all overlap on the 1995 * same key space. We do this with our getStartRow and getLastRow overrides. 1996 */ 1997 static abstract class CASTableTest extends TableTest { 1998 private final byte [] qualifier; 1999 CASTableTest(Connection con, TestOptions options, Status status) { 2000 super(con, options, status); 2001 qualifier = Bytes.toBytes(this.getClass().getSimpleName()); 2002 } 2003 2004 byte [] getQualifier() { 2005 return this.qualifier; 2006 } 2007 2008 @Override 2009 int getStartRow() { 2010 return 0; 2011 } 2012 2013 @Override 2014 int getLastRow() { 2015 return opts.perClientRunRows; 2016 } 2017 } 2018 2019 static class IncrementTest extends CASTableTest { 2020 IncrementTest(Connection con, TestOptions options, Status status) { 2021 super(con, options, status); 2022 } 2023 2024 @Override 2025 boolean testRow(final int i) throws IOException { 2026 Increment increment = new Increment(format(i)); 2027 // unlike checkAndXXX tests, which make most sense to do on a single value, 2028 // if multiple families are specified for an increment test we assume it is 2029 // meant to raise the work factor 2030 for (int family = 0; family < opts.families; family++) { 2031 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + 
family); 2032 increment.addColumn(familyName, getQualifier(), 1l); 2033 } 2034 updateValueSize(this.table.increment(increment)); 2035 return true; 2036 } 2037 } 2038 2039 static class AppendTest extends CASTableTest { 2040 AppendTest(Connection con, TestOptions options, Status status) { 2041 super(con, options, status); 2042 } 2043 2044 @Override 2045 boolean testRow(final int i) throws IOException { 2046 byte [] bytes = format(i); 2047 Append append = new Append(bytes); 2048 // unlike checkAndXXX tests, which make most sense to do on a single value, 2049 // if multiple families are specified for an append test we assume it is 2050 // meant to raise the work factor 2051 for (int family = 0; family < opts.families; family++) { 2052 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2053 append.addColumn(familyName, getQualifier(), bytes); 2054 } 2055 updateValueSize(this.table.append(append)); 2056 return true; 2057 } 2058 } 2059 2060 static class CheckAndMutateTest extends CASTableTest { 2061 CheckAndMutateTest(Connection con, TestOptions options, Status status) { 2062 super(con, options, status); 2063 } 2064 2065 @Override 2066 boolean testRow(final int i) throws IOException { 2067 final byte [] bytes = format(i); 2068 // checkAndXXX tests operate on only a single value 2069 // Put a known value so when we go to check it, it is there. 
2070 Put put = new Put(bytes); 2071 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2072 this.table.put(put); 2073 RowMutations mutations = new RowMutations(bytes); 2074 mutations.add(put); 2075 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()) 2076 .ifEquals(bytes).thenMutate(mutations); 2077 return true; 2078 } 2079 } 2080 2081 static class CheckAndPutTest extends CASTableTest { 2082 CheckAndPutTest(Connection con, TestOptions options, Status status) { 2083 super(con, options, status); 2084 } 2085 2086 @Override 2087 boolean testRow(final int i) throws IOException { 2088 final byte [] bytes = format(i); 2089 // checkAndXXX tests operate on only a single value 2090 // Put a known value so when we go to check it, it is there. 2091 Put put = new Put(bytes); 2092 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2093 this.table.put(put); 2094 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()) 2095 .ifEquals(bytes).thenPut(put); 2096 return true; 2097 } 2098 } 2099 2100 static class CheckAndDeleteTest extends CASTableTest { 2101 CheckAndDeleteTest(Connection con, TestOptions options, Status status) { 2102 super(con, options, status); 2103 } 2104 2105 @Override 2106 boolean testRow(final int i) throws IOException { 2107 final byte [] bytes = format(i); 2108 // checkAndXXX tests operate on only a single value 2109 // Put a known value so when we go to check it, it is there. 
2110 Put put = new Put(bytes); 2111 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2112 this.table.put(put); 2113 Delete delete = new Delete(put.getRow()); 2114 delete.addColumn(FAMILY_ZERO, getQualifier()); 2115 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()) 2116 .ifEquals(bytes).thenDelete(delete); 2117 return true; 2118 } 2119 } 2120 2121 static class SequentialReadTest extends TableTest { 2122 SequentialReadTest(Connection con, TestOptions options, Status status) { 2123 super(con, options, status); 2124 } 2125 2126 @Override 2127 boolean testRow(final int i) throws IOException { 2128 Get get = new Get(format(i)); 2129 for (int family = 0; family < opts.families; family++) { 2130 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2131 if (opts.addColumns) { 2132 for (int column = 0; column < opts.columns; column++) { 2133 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 2134 get.addColumn(familyName, qualifier); 2135 } 2136 } else { 2137 get.addFamily(familyName); 2138 } 2139 } 2140 if (opts.filterAll) { 2141 get.setFilter(new FilterAllFilter()); 2142 } 2143 updateValueSize(table.get(get)); 2144 return true; 2145 } 2146 } 2147 2148 static class SequentialWriteTest extends BufferedMutatorTest { 2149 private ArrayList<Put> puts; 2150 2151 2152 SequentialWriteTest(Connection con, TestOptions options, Status status) { 2153 super(con, options, status); 2154 if (opts.multiPut > 0) { 2155 LOG.info("MultiPut enabled. 
Sending PUTs in batches of " + opts.multiPut + "."); 2156 this.puts = new ArrayList<>(opts.multiPut); 2157 } 2158 } 2159 2160 protected byte[] generateRow(final int i) { 2161 return format(i); 2162 } 2163 2164 @Override 2165 boolean testRow(final int i) throws IOException { 2166 byte[] row = generateRow(i); 2167 Put put = new Put(row); 2168 for (int family = 0; family < opts.families; family++) { 2169 byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family); 2170 for (int column = 0; column < opts.columns; column++) { 2171 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 2172 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2173 if (opts.useTags) { 2174 byte[] tag = generateData(this.rand, TAG_LENGTH); 2175 Tag[] tags = new Tag[opts.noOfTags]; 2176 for (int n = 0; n < opts.noOfTags; n++) { 2177 Tag t = new ArrayBackedTag((byte) n, tag); 2178 tags[n] = t; 2179 } 2180 KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, 2181 value, tags); 2182 put.add(kv); 2183 updateValueSize(kv.getValueLength()); 2184 } else { 2185 put.addColumn(familyName, qualifier, value); 2186 updateValueSize(value.length); 2187 } 2188 } 2189 } 2190 put.setDurability(opts.writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL); 2191 if (opts.autoFlush) { 2192 if (opts.multiPut > 0) { 2193 this.puts.add(put); 2194 if (this.puts.size() == opts.multiPut) { 2195 table.put(this.puts); 2196 this.puts.clear(); 2197 } else { 2198 return false; 2199 } 2200 } else { 2201 table.put(put); 2202 } 2203 } else { 2204 mutator.mutate(put); 2205 } 2206 return true; 2207 } 2208 } 2209 2210 static class FilteredScanTest extends TableTest { 2211 protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName()); 2212 2213 FilteredScanTest(Connection con, TestOptions options, Status status) { 2214 super(con, options, status); 2215 } 2216 2217 @Override 2218 boolean testRow(int i) throws IOException { 2219 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2220 Scan scan = constructScan(value); 2221 ResultScanner scanner = null; 2222 try { 2223 scanner = this.table.getScanner(scan); 2224 for (Result r = null; (r = scanner.next()) != null;) { 2225 updateValueSize(r); 2226 } 2227 } finally { 2228 if (scanner != null) { 2229 updateScanMetrics(scanner.getScanMetrics()); 2230 scanner.close(); 2231 } 2232 } 2233 return true; 2234 } 2235 2236 protected Scan constructScan(byte[] valuePrefix) throws IOException { 2237 FilterList list = new FilterList(); 2238 Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, 2239 CompareOperator.EQUAL, new BinaryComparator(valuePrefix)); 2240 list.addFilter(filter); 2241 if (opts.filterAll) { 2242 list.addFilter(new FilterAllFilter()); 2243 } 2244 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 2245 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 2246 .setScanMetricsEnabled(true); 2247 if (opts.addColumns) { 2248 for (int column = 0; column < opts.columns; column++) { 2249 byte [] qualifier = column == 0? 
COLUMN_ZERO: Bytes.toBytes("" + column); 2250 scan.addColumn(FAMILY_ZERO, qualifier); 2251 } 2252 } else { 2253 scan.addFamily(FAMILY_ZERO); 2254 } 2255 scan.setFilter(list); 2256 return scan; 2257 } 2258 } 2259 2260 /** 2261 * Compute a throughput rate in MB/s. 2262 * @param rows Number of records consumed. 2263 * @param timeMs Time taken in milliseconds. 2264 * @return String value with label, ie '123.76 MB/s' 2265 */ 2266 private static String calculateMbps(int rows, long timeMs, final int valueSize, int families, int columns) { 2267 BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH + 2268 ((valueSize + (FAMILY_NAME_BASE.length()+1) + COLUMN_ZERO.length) * columns) * families); 2269 BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT) 2270 .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT) 2271 .divide(BYTES_PER_MB, CXT); 2272 return FMT.format(mbps) + " MB/s"; 2273 } 2274 2275 /* 2276 * Format passed integer. 2277 * @param number 2278 * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed 2279 * number (Does absolute in case number is negative). 2280 */ 2281 public static byte [] format(final int number) { 2282 byte [] b = new byte[ROW_LENGTH]; 2283 int d = Math.abs(number); 2284 for (int i = b.length - 1; i >= 0; i--) { 2285 b[i] = (byte)((d % 10) + '0'); 2286 d /= 10; 2287 } 2288 return b; 2289 } 2290 2291 /* 2292 * This method takes some time and is done inline uploading data. For 2293 * example, doing the mapfile test, generation of the key and value 2294 * consumes about 30% of CPU time. 2295 * @return Generated random value to insert into a table cell. 
2296 */ 2297 public static byte[] generateData(final Random r, int length) { 2298 byte [] b = new byte [length]; 2299 int i; 2300 2301 for(i = 0; i < (length-8); i += 8) { 2302 b[i] = (byte) (65 + r.nextInt(26)); 2303 b[i+1] = b[i]; 2304 b[i+2] = b[i]; 2305 b[i+3] = b[i]; 2306 b[i+4] = b[i]; 2307 b[i+5] = b[i]; 2308 b[i+6] = b[i]; 2309 b[i+7] = b[i]; 2310 } 2311 2312 byte a = (byte) (65 + r.nextInt(26)); 2313 for(; i < length; i++) { 2314 b[i] = a; 2315 } 2316 return b; 2317 } 2318 2319 static byte [] getRandomRow(final Random random, final int totalRows) { 2320 return format(generateRandomRow(random, totalRows)); 2321 } 2322 2323 static int generateRandomRow(final Random random, final int totalRows) { 2324 return random.nextInt(Integer.MAX_VALUE) % totalRows; 2325 } 2326 2327 static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf, 2328 Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status) 2329 throws IOException, InterruptedException { 2330 status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " 2331 + opts.perClientRunRows + " rows"); 2332 long totalElapsedTime; 2333 2334 final TestBase t; 2335 try { 2336 if (AsyncTest.class.isAssignableFrom(cmd)) { 2337 Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd; 2338 Constructor<? extends AsyncTest> constructor = 2339 newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class); 2340 t = constructor.newInstance(asyncCon, opts, status); 2341 } else { 2342 Class<? extends Test> newCmd = (Class<? extends Test>) cmd; 2343 Constructor<? extends Test> constructor = 2344 newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class); 2345 t = constructor.newInstance(con, opts, status); 2346 } 2347 } catch (NoSuchMethodException e) { 2348 throw new IllegalArgumentException("Invalid command class: " + cmd.getName() 2349 + ". 
It does not provide a constructor as described by " 2350 + "the javadoc comment. Available constructors are: " 2351 + Arrays.toString(cmd.getConstructors())); 2352 } catch (Exception e) { 2353 throw new IllegalStateException("Failed to construct command class", e); 2354 } 2355 totalElapsedTime = t.test(); 2356 2357 status.setStatus("Finished " + cmd + " in " + totalElapsedTime + 2358 "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" + 2359 " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime, 2360 getAverageValueLength(opts), opts.families, opts.columns) + ")"); 2361 2362 return new RunResult(totalElapsedTime, t.getLatencyHistogram()); 2363 } 2364 2365 private static int getAverageValueLength(final TestOptions opts) { 2366 return opts.valueRandom? opts.valueSize/2: opts.valueSize; 2367 } 2368 2369 private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) throws IOException, 2370 InterruptedException, ClassNotFoundException, ExecutionException { 2371 // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do 2372 // the TestOptions introspection for us and dump the output in a readable format. 
2373 LOG.info(cmd.getSimpleName() + " test run options=" + MAPPER.writeValueAsString(opts)); 2374 Admin admin = null; 2375 Connection connection = null; 2376 try { 2377 connection = ConnectionFactory.createConnection(getConf()); 2378 admin = connection.getAdmin(); 2379 checkTable(admin, opts); 2380 } finally { 2381 if (admin != null) admin.close(); 2382 if (connection != null) connection.close(); 2383 } 2384 if (opts.nomapred) { 2385 doLocalClients(opts, getConf()); 2386 } else { 2387 doMapReduce(opts, getConf()); 2388 } 2389 } 2390 2391 protected void printUsage() { 2392 printUsage(PE_COMMAND_SHORTNAME, null); 2393 } 2394 2395 protected static void printUsage(final String message) { 2396 printUsage(PE_COMMAND_SHORTNAME, message); 2397 } 2398 2399 protected static void printUsageAndExit(final String message, final int exitCode) { 2400 printUsage(message); 2401 System.exit(exitCode); 2402 } 2403 2404 protected static void printUsage(final String shortName, final String message) { 2405 if (message != null && message.length() > 0) { 2406 System.err.println(message); 2407 } 2408 System.err.print("Usage: hbase " + shortName); 2409 System.err.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>"); 2410 System.err.println(); 2411 System.err.println("General Options:"); 2412 System.err.println(" nomapred Run multiple clients using threads " + 2413 "(rather than use mapreduce)"); 2414 System.err.println(" oneCon all the threads share the same connection. Default: False"); 2415 System.err.println(" connCount connections all threads share. " 2416 + "For example, if set to 2, then all thread share 2 connection. " 2417 + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, " 2418 + "if not, connCount=thread number"); 2419 2420 System.err.println(" sampleRate Execute test on a sample of total " + 2421 "rows. Only supported by randomRead. 
Default: 1.0"); 2422 System.err.println(" period Report every 'period' rows: " + 2423 "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows()/10); 2424 System.err.println(" cycles How many times to cycle the test. Defaults: 1."); 2425 System.err.println(" traceRate Enable HTrace spans. Initiate tracing every N rows. " + 2426 "Default: 0"); 2427 System.err.println(" latency Set to report operation latencies. Default: False"); 2428 System.err.println(" measureAfter Start to measure the latency once 'measureAfter'" + 2429 " rows have been treated. Default: 0"); 2430 System.err.println(" valueSize Pass value size to use: Default: " 2431 + DEFAULT_OPTS.getValueSize()); 2432 System.err.println(" valueRandom Set if we should vary value size between 0 and " + 2433 "'valueSize'; set on read for stats on size: Default: Not set."); 2434 System.err.println(" blockEncoding Block encoding to use. Value should be one of " 2435 + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE"); 2436 System.err.println(); 2437 System.err.println("Table Creation / Write Tests:"); 2438 System.err.println(" table Alternate table name. Default: 'TestTable'"); 2439 System.err.println(" rows Rows each client runs. Default: " 2440 + DEFAULT_OPTS.getPerClientRunRows() 2441 + ". In case of randomReads and randomSeekScans this could" 2442 + " be specified along with --size to specify the number of rows to be scanned within" 2443 + " the total range specified by the size."); 2444 System.err.println( 2445 " size Total size in GiB. Mutually exclusive with --rows for writes and scans" 2446 + ". But for randomReads and randomSeekScans when you use size with --rows you could" 2447 + " use size to specify the end range and --rows" 2448 + " specifies the number of rows within that range. " + "Default: 1.0."); 2449 System.err.println(" compress Compression type to use (GZ, LZO, ...). 
Default: 'NONE'"); 2450 System.err.println(" flushCommits Used to determine if the test should flush the table. " + 2451 "Default: false"); 2452 System.err.println(" valueZipf Set if we should vary value size between 0 and " + 2453 "'valueSize' in zipf form: Default: Not set."); 2454 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); 2455 System.err.println(" autoFlush Set autoFlush on htable. Default: False"); 2456 System.err.println(" multiPut Batch puts together into groups of N. Only supported " + 2457 "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0"); 2458 System.err.println(" presplit Create presplit table. If a table with same name exists," 2459 + " it'll be deleted and recreated (instead of verifying count of its existing regions). " 2460 + "Recommended for accurate perf analysis (see guide). Default: disabled"); 2461 System.err.println(" usetags Writes tags along with KVs. Use with HFile V3. " + 2462 "Default: false"); 2463 System.err.println(" numoftags Specify the no of tags that would be needed. " + 2464 "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags); 2465 System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table."); 2466 System.err.println(" columns Columns to write per row. Default: 1"); 2467 System.err.println(" families Specify number of column families for the table. Default: 1"); 2468 System.err.println(); 2469 System.err.println("Read Tests:"); 2470 System.err.println(" filterAll Helps to filter out all the rows on the server side" 2471 + " there by not returning any thing back to the client. Helps to check the server side" 2472 + " performance. Uses FilterAllFilter internally. "); 2473 System.err.println(" multiGet Batch gets together into groups of N. Only supported " + 2474 "by randomRead. Default: disabled"); 2475 System.err.println(" inmemory Tries to keep the HFiles of the CF " + 2476 "inmemory as far as possible. 
Not guaranteed that reads are always served " + 2477 "from memory. Default: false"); 2478 System.err.println(" bloomFilter Bloom filter type, one of " 2479 + Arrays.toString(BloomType.values())); 2480 System.err.println(" blockSize Blocksize to use when writing out hfiles. "); 2481 System.err.println(" inmemoryCompaction Makes the column family to do inmemory flushes/compactions. " 2482 + "Uses the CompactingMemstore"); 2483 System.err.println(" addColumns Adds columns to scans/gets explicitly. Default: true"); 2484 System.err.println(" replicas Enable region replica testing. Defaults: 1."); 2485 System.err.println(" randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0"); 2486 System.err.println(" caching Scan caching to use. Default: 30"); 2487 System.err.println(" asyncPrefetch Enable asyncPrefetch for scan"); 2488 System.err.println(" cacheBlocks Set the cacheBlocks option for scan. Default: true"); 2489 System.err.println(" scanReadType Set the readType option for scan, stream/pread/default. Default: default"); 2490 System.err.println(" bufferSize Set the value of client side buffering. Default: 2MB"); 2491 System.err.println(); 2492 System.err.println(" Note: -D properties will be applied to the conf used. "); 2493 System.err.println(" For example: "); 2494 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true"); 2495 System.err.println(" -Dmapreduce.task.timeout=60000"); 2496 System.err.println(); 2497 System.err.println("Command:"); 2498 for (CmdDescriptor command : COMMANDS.values()) { 2499 System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription())); 2500 } 2501 System.err.println(); 2502 System.err.println("Args:"); 2503 System.err.println(" nclients Integer. Required. Total number of clients " 2504 + "(and HRegionServers) running. 
1 <= value <= 500"); 2505 System.err.println("Examples:"); 2506 System.err.println(" To run a single client doing the default 1M sequentialWrites:"); 2507 System.err.println(" $ hbase " + shortName + " sequentialWrite 1"); 2508 System.err.println(" To run 10 clients doing increments over ten rows:"); 2509 System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10"); 2510 } 2511 2512 /** 2513 * Parse options passed in via an arguments array. Assumes that array has been split 2514 * on white-space and placed into a {@code Queue}. Any unknown arguments will remain 2515 * in the queue at the conclusion of this method call. It's up to the caller to deal 2516 * with these unrecognized arguments. 2517 */ 2518 static TestOptions parseOpts(Queue<String> args) { 2519 TestOptions opts = new TestOptions(); 2520 2521 String cmd = null; 2522 while ((cmd = args.poll()) != null) { 2523 if (cmd.equals("-h") || cmd.startsWith("--h")) { 2524 // place item back onto queue so that caller knows parsing was incomplete 2525 args.add(cmd); 2526 break; 2527 } 2528 2529 final String nmr = "--nomapred"; 2530 if (cmd.startsWith(nmr)) { 2531 opts.nomapred = true; 2532 continue; 2533 } 2534 2535 final String rows = "--rows="; 2536 if (cmd.startsWith(rows)) { 2537 opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length())); 2538 continue; 2539 } 2540 2541 final String cycles = "--cycles="; 2542 if (cmd.startsWith(cycles)) { 2543 opts.cycles = Integer.parseInt(cmd.substring(cycles.length())); 2544 continue; 2545 } 2546 2547 final String sampleRate = "--sampleRate="; 2548 if (cmd.startsWith(sampleRate)) { 2549 opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); 2550 continue; 2551 } 2552 2553 final String table = "--table="; 2554 if (cmd.startsWith(table)) { 2555 opts.tableName = cmd.substring(table.length()); 2556 continue; 2557 } 2558 2559 final String startRow = "--startRow="; 2560 if (cmd.startsWith(startRow)) { 2561 opts.startRow = 
Integer.parseInt(cmd.substring(startRow.length())); 2562 continue; 2563 } 2564 2565 final String compress = "--compress="; 2566 if (cmd.startsWith(compress)) { 2567 opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); 2568 continue; 2569 } 2570 2571 final String traceRate = "--traceRate="; 2572 if (cmd.startsWith(traceRate)) { 2573 opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length())); 2574 continue; 2575 } 2576 2577 final String blockEncoding = "--blockEncoding="; 2578 if (cmd.startsWith(blockEncoding)) { 2579 opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); 2580 continue; 2581 } 2582 2583 final String flushCommits = "--flushCommits="; 2584 if (cmd.startsWith(flushCommits)) { 2585 opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); 2586 continue; 2587 } 2588 2589 final String writeToWAL = "--writeToWAL="; 2590 if (cmd.startsWith(writeToWAL)) { 2591 opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); 2592 continue; 2593 } 2594 2595 final String presplit = "--presplit="; 2596 if (cmd.startsWith(presplit)) { 2597 opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); 2598 continue; 2599 } 2600 2601 final String inMemory = "--inmemory="; 2602 if (cmd.startsWith(inMemory)) { 2603 opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); 2604 continue; 2605 } 2606 2607 final String autoFlush = "--autoFlush="; 2608 if (cmd.startsWith(autoFlush)) { 2609 opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length())); 2610 if (!opts.autoFlush && opts.multiPut > 0) { 2611 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0"); 2612 } 2613 continue; 2614 } 2615 2616 final String onceCon = "--oneCon="; 2617 if (cmd.startsWith(onceCon)) { 2618 opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length())); 2619 if (opts.oneCon && opts.connCount > 1) { 2620 
throw new IllegalArgumentException("oneCon is set to true, " 2621 + "connCount should not bigger than 1"); 2622 } 2623 continue; 2624 } 2625 2626 final String connCount = "--connCount="; 2627 if (cmd.startsWith(connCount)) { 2628 opts.connCount = Integer.parseInt(cmd.substring(connCount.length())); 2629 if (opts.oneCon && opts.connCount > 1) { 2630 throw new IllegalArgumentException("oneCon is set to true, " 2631 + "connCount should not bigger than 1"); 2632 } 2633 continue; 2634 } 2635 2636 final String latency = "--latency"; 2637 if (cmd.startsWith(latency)) { 2638 opts.reportLatency = true; 2639 continue; 2640 } 2641 2642 final String multiGet = "--multiGet="; 2643 if (cmd.startsWith(multiGet)) { 2644 opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); 2645 continue; 2646 } 2647 2648 final String multiPut = "--multiPut="; 2649 if (cmd.startsWith(multiPut)) { 2650 opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length())); 2651 if (!opts.autoFlush && opts.multiPut > 0) { 2652 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0"); 2653 } 2654 continue; 2655 } 2656 2657 final String useTags = "--usetags="; 2658 if (cmd.startsWith(useTags)) { 2659 opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); 2660 continue; 2661 } 2662 2663 final String noOfTags = "--numoftags="; 2664 if (cmd.startsWith(noOfTags)) { 2665 opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); 2666 continue; 2667 } 2668 2669 final String replicas = "--replicas="; 2670 if (cmd.startsWith(replicas)) { 2671 opts.replicas = Integer.parseInt(cmd.substring(replicas.length())); 2672 continue; 2673 } 2674 2675 final String filterOutAll = "--filterAll"; 2676 if (cmd.startsWith(filterOutAll)) { 2677 opts.filterAll = true; 2678 continue; 2679 } 2680 2681 final String size = "--size="; 2682 if (cmd.startsWith(size)) { 2683 opts.size = Float.parseFloat(cmd.substring(size.length())); 2684 if (opts.size <= 1.0f) throw 
new IllegalStateException("Size must be > 1; i.e. 1GB"); 2685 continue; 2686 } 2687 2688 final String splitPolicy = "--splitPolicy="; 2689 if (cmd.startsWith(splitPolicy)) { 2690 opts.splitPolicy = cmd.substring(splitPolicy.length()); 2691 continue; 2692 } 2693 2694 final String randomSleep = "--randomSleep="; 2695 if (cmd.startsWith(randomSleep)) { 2696 opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length())); 2697 continue; 2698 } 2699 2700 final String measureAfter = "--measureAfter="; 2701 if (cmd.startsWith(measureAfter)) { 2702 opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length())); 2703 continue; 2704 } 2705 2706 final String bloomFilter = "--bloomFilter="; 2707 if (cmd.startsWith(bloomFilter)) { 2708 opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length())); 2709 continue; 2710 } 2711 2712 final String blockSize = "--blockSize="; 2713 if(cmd.startsWith(blockSize) ) { 2714 opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length())); 2715 continue; 2716 } 2717 2718 final String valueSize = "--valueSize="; 2719 if (cmd.startsWith(valueSize)) { 2720 opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length())); 2721 continue; 2722 } 2723 2724 final String valueRandom = "--valueRandom"; 2725 if (cmd.startsWith(valueRandom)) { 2726 opts.valueRandom = true; 2727 if (opts.valueZipf) { 2728 throw new IllegalStateException("Either valueZipf or valueRandom but not both"); 2729 } 2730 continue; 2731 } 2732 2733 final String valueZipf = "--valueZipf"; 2734 if (cmd.startsWith(valueZipf)) { 2735 opts.valueZipf = true; 2736 if (opts.valueRandom) { 2737 throw new IllegalStateException("Either valueZipf or valueRandom but not both"); 2738 } 2739 continue; 2740 } 2741 2742 final String period = "--period="; 2743 if (cmd.startsWith(period)) { 2744 opts.period = Integer.parseInt(cmd.substring(period.length())); 2745 continue; 2746 } 2747 2748 final String addColumns = "--addColumns="; 2749 if 
(cmd.startsWith(addColumns)) { 2750 opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length())); 2751 continue; 2752 } 2753 2754 final String inMemoryCompaction = "--inmemoryCompaction="; 2755 if (cmd.startsWith(inMemoryCompaction)) { 2756 opts.inMemoryCompaction = 2757 MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length())); 2758 continue; 2759 } 2760 2761 final String columns = "--columns="; 2762 if (cmd.startsWith(columns)) { 2763 opts.columns = Integer.parseInt(cmd.substring(columns.length())); 2764 continue; 2765 } 2766 2767 final String families = "--families="; 2768 if (cmd.startsWith(families)) { 2769 opts.families = Integer.parseInt(cmd.substring(families.length())); 2770 continue; 2771 } 2772 2773 final String caching = "--caching="; 2774 if (cmd.startsWith(caching)) { 2775 opts.caching = Integer.parseInt(cmd.substring(caching.length())); 2776 continue; 2777 } 2778 2779 final String asyncPrefetch = "--asyncPrefetch"; 2780 if (cmd.startsWith(asyncPrefetch)) { 2781 opts.asyncPrefetch = true; 2782 continue; 2783 } 2784 2785 final String cacheBlocks = "--cacheBlocks="; 2786 if (cmd.startsWith(cacheBlocks)) { 2787 opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length())); 2788 continue; 2789 } 2790 2791 final String scanReadType = "--scanReadType="; 2792 if (cmd.startsWith(scanReadType)) { 2793 opts.scanReadType = 2794 Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase()); 2795 continue; 2796 } 2797 2798 final String bufferSize = "--bufferSize="; 2799 if (cmd.startsWith(bufferSize)) { 2800 opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length())); 2801 continue; 2802 } 2803 2804 if (isCommandClass(cmd)) { 2805 opts.cmdName = cmd; 2806 try { 2807 opts.numClientThreads = Integer.parseInt(args.remove()); 2808 } catch (NoSuchElementException | NumberFormatException e) { 2809 throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e); 2810 } 2811 opts 
= calculateRowsAndSize(opts); 2812 break; 2813 } else { 2814 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); 2815 } 2816 2817 // Not matching any option or command. 2818 System.err.println("Error: Wrong option or command: " + cmd); 2819 args.add(cmd); 2820 break; 2821 } 2822 return opts; 2823 } 2824 2825 static TestOptions calculateRowsAndSize(final TestOptions opts) { 2826 int rowsPerGB = getRowsPerGB(opts); 2827 if ((opts.getCmdName() != null 2828 && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN))) 2829 && opts.size != DEFAULT_OPTS.size 2830 && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) { 2831 opts.totalRows = (int) opts.size * rowsPerGB; 2832 } else if (opts.size != DEFAULT_OPTS.size) { 2833 // total size in GB specified 2834 opts.totalRows = (int) opts.size * rowsPerGB; 2835 opts.perClientRunRows = opts.totalRows / opts.numClientThreads; 2836 } else { 2837 opts.totalRows = opts.perClientRunRows * opts.numClientThreads; 2838 opts.size = opts.totalRows / rowsPerGB; 2839 } 2840 return opts; 2841 } 2842 2843 static int getRowsPerGB(final TestOptions opts) { 2844 return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getFamilies() * 2845 opts.getColumns()); 2846 } 2847 2848 @Override 2849 public int run(String[] args) throws Exception { 2850 // Process command-line args. TODO: Better cmd-line processing 2851 // (but hopefully something not as painful as cli options). 
2852 int errCode = -1; 2853 if (args.length < 1) { 2854 printUsage(); 2855 return errCode; 2856 } 2857 2858 try { 2859 LinkedList<String> argv = new LinkedList<>(); 2860 argv.addAll(Arrays.asList(args)); 2861 TestOptions opts = parseOpts(argv); 2862 2863 // args remaining, print help and exit 2864 if (!argv.isEmpty()) { 2865 errCode = 0; 2866 printUsage(); 2867 return errCode; 2868 } 2869 2870 // must run at least 1 client 2871 if (opts.numClientThreads <= 0) { 2872 throw new IllegalArgumentException("Number of clients must be > 0"); 2873 } 2874 2875 // cmdName should not be null, print help and exit 2876 if (opts.cmdName == null) { 2877 printUsage(); 2878 return errCode; 2879 } 2880 2881 Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName); 2882 if (cmdClass != null) { 2883 runTest(cmdClass, opts); 2884 errCode = 0; 2885 } 2886 2887 } catch (Exception e) { 2888 e.printStackTrace(); 2889 } 2890 2891 return errCode; 2892 } 2893 2894 private static boolean isCommandClass(String cmd) { 2895 return COMMANDS.containsKey(cmd); 2896 } 2897 2898 private static Class<? extends TestBase> determineCommandClass(String cmd) { 2899 CmdDescriptor descriptor = COMMANDS.get(cmd); 2900 return descriptor != null ? descriptor.getCmdClass() : null; 2901 } 2902 2903 public static void main(final String[] args) throws Exception { 2904 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); 2905 System.exit(res); 2906 } 2907}