/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.UniformReservoir;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.reflect.Constructor;
import java.math.BigDecimal;
import java.math.MathContext;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedList;
import java.util.Locale;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterAllFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ByteArrayHashKey;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.hadoop.hbase.util.Hash;
import org.apache.hadoop.hbase.util.MurmurHash;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.YammerHistogramUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.htrace.core.ProbabilitySampler;
import org.apache.htrace.core.Sampler;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.gson.Gson;

/**
 * Script used to evaluate HBase performance and scalability. Runs an HBase
 * client that steps through one of a set of hardcoded tests or 'experiments'
 * (e.g. a random reads test, a random writes test, etc.). Pass on the
 * command-line which test to run and how many clients are participating in
 * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
 *
 * <p>This class sets up and runs the evaluation programs described in
 * Section 7, <i>Performance Evaluation</i>, of the <a
 * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
 * paper, pages 8-10.
 *
 * <p>By default, runs as a mapreduce job where each mapper runs a single test
 * client. Can also run as a non-mapreduce, multithreaded application by
 * specifying {@code --nomapred}. Each client does about 1GB of data, unless
 * specified otherwise.
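 *
 * <p>For example, a small local (non-mapreduce) run might be invoked as follows. The
 * invocations are illustrative only; run with {@code --help} for the authoritative
 * option list:
 * <pre>
 *   hbase pe --nomapred --rows=100000 sequentialWrite 1
 *   hbase pe --nomapred --rows=100000 randomRead 4
 * </pre>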
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class PerformanceEvaluation extends Configured implements Tool {
  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
  static final String RANDOM_READ = "randomRead";
  static final String PE_COMMAND_SHORTNAME = "pe";
  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
  private static final Gson GSON = GsonUtil.createGson().create();

  public static final String TABLE_NAME = "TestTable";
  public static final String FAMILY_NAME_BASE = "info";
  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
  public static final int DEFAULT_VALUE_LENGTH = 1000;
  public static final int ROW_LENGTH = 26;

  private static final int ONE_GB = 1024 * 1024 * 1000;
  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
  // TODO : should we make this configurable
  private static final int TAG_LENGTH = 256;
  private static final DecimalFormat FMT = new DecimalFormat("0.##");
  private static final MathContext CXT = MathContext.DECIMAL64;
  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
  private static final TestOptions DEFAULT_OPTS = new TestOptions();

  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");

  static {
    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
      "Run async random read test");
    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
      "Run async random write test");
    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
      "Run async sequential read test");
    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
      "Run async sequential write test");
    addCommandDescriptor(AsyncScanTest.class, "asyncScan",
      "Run async scan test (read every row)");
    addCommandDescriptor(RandomReadTest.class, RANDOM_READ,
      "Run random read test");
    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
      "Run random seek and scan 100 test");
    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
      "Run random seek scan with both start and stop row (max 10 rows)");
    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
      "Run random seek scan with both start and stop row (max 100 rows)");
    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
      "Run random seek scan with both start and stop row (max 1000 rows)");
    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
      "Run random seek scan with both start and stop row (max 10000 rows)");
    addCommandDescriptor(RandomWriteTest.class, "randomWrite",
      "Run random write test");
    addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
      "Run sequential read test");
    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
      "Run sequential write test");
    addCommandDescriptor(ScanTest.class, "scan",
      "Run scan test (read every row)");
    addCommandDescriptor(FilteredScanTest.class, "filterScan",
      "Run scan test using a filter to find a specific row based on its value " +
      "(make sure to use --rows=20)");
    addCommandDescriptor(IncrementTest.class, "increment",
      "Increment on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(AppendTest.class, "append",
      "Append on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
  }
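
  // A new command/experiment is registered by appending to the static block above: each entry
  // maps a command-line name to a TestBase subclass plus a short help string. A minimal sketch,
  // assuming a hypothetical MyCustomTest subclass defined elsewhere (illustrative only):
  //
  //   addCommandDescriptor(MyCustomTest.class, "myCustom",
  //     "Run my custom per-row test");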

  /**
   * Enum for map metrics. Keep it out here rather than inside the Map
   * inner-class so we can find associated properties.
   */
  protected static enum Counter {
    /** elapsed time */
    ELAPSED_TIME,
    /** number of rows */
    ROWS
  }

  protected static class RunResult implements Comparable<RunResult> {
    public RunResult(long duration, Histogram hist) {
      this.duration = duration;
      this.hist = hist;
    }

    public final long duration;
    public final Histogram hist;

    @Override
    public String toString() {
      return Long.toString(duration);
    }

    @Override public int compareTo(RunResult o) {
      return Long.compare(this.duration, o.duration);
    }
  }

  /**
   * Constructor
   * @param conf Configuration object
   */
  public PerformanceEvaluation(final Configuration conf) {
    super(conf);
  }

  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass,
      String name, String description) {
    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
    COMMANDS.put(name, cmdDescriptor);
  }

  /**
   * Implementations can have their status set.
   */
  interface Status {
    /**
     * Sets the status message.
     * @param msg status message
     * @throws IOException if the status cannot be reported
     */
    void setStatus(final String msg) throws IOException;
  }

  /**
   * MapReduce job that runs a performance evaluation client in each map task.
   */
  public static class EvaluationMapTask
      extends Mapper<LongWritable, Text, LongWritable, LongWritable> {

    /** configuration parameter name that contains the command */
    public final static String CMD_KEY = "EvaluationMapTask.command";
    /** configuration parameter name that contains the PE impl */
    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";

    private Class<? extends Test> cmd;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);

      // this is required so that extensions of PE are instantiated within the
      // map reduce task...
      Class<? extends PerformanceEvaluation> peClass =
          forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
      try {
        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
      } catch (Exception e) {
        throw new IllegalStateException("Could not instantiate PE instance", e);
      }
    }

    private <Type> Class<?
extends Type> forName(String className, Class<Type> type) { 289 try { 290 return Class.forName(className).asSubclass(type); 291 } catch (ClassNotFoundException e) { 292 throw new IllegalStateException("Could not find class for name: " + className, e); 293 } 294 } 295 296 @Override 297 protected void map(LongWritable key, Text value, final Context context) 298 throws IOException, InterruptedException { 299 300 Status status = new Status() { 301 @Override 302 public void setStatus(String msg) { 303 context.setStatus(msg); 304 } 305 }; 306 307 TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class); 308 Configuration conf = HBaseConfiguration.create(context.getConfiguration()); 309 final Connection con = ConnectionFactory.createConnection(conf); 310 AsyncConnection asyncCon = null; 311 try { 312 asyncCon = ConnectionFactory.createAsyncConnection(conf).get(); 313 } catch (ExecutionException e) { 314 throw new IOException(e); 315 } 316 317 // Evaluation task 318 RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status); 319 // Collect how much time the thing took. Report as map output and 320 // to the ELAPSED_TIME counter. 321 context.getCounter(Counter.ELAPSED_TIME).increment(result.duration); 322 context.getCounter(Counter.ROWS).increment(opts.perClientRunRows); 323 context.write(new LongWritable(opts.startRow), new LongWritable(result.duration)); 324 context.progress(); 325 } 326 } 327 328 /* 329 * If table does not already exist, create. Also create a table when 330 * {@code opts.presplitRegions} is specified or when the existing table's 331 * region replica count doesn't match {@code opts.replicas}. 332 */ 333 static boolean checkTable(Admin admin, TestOptions opts) throws IOException { 334 TableName tableName = TableName.valueOf(opts.tableName); 335 boolean needsDelete = false, exists = admin.tableExists(tableName); 336 boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read") 337 || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan"); 338 if (!exists && isReadCmd) { 339 throw new IllegalStateException( 340 "Must specify an existing table for read commands. Run a write command first."); 341 } 342 HTableDescriptor desc = 343 exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null; 344 byte[][] splits = getSplits(opts); 345 346 // recreate the table when user has requested presplit or when existing 347 // {RegionSplitPolicy,replica count} does not match requested, or when the 348 // number of column families does not match requested. 349 if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions) 350 || (!isReadCmd && desc != null && 351 !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy)) 352 || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas) 353 || (desc != null && desc.getColumnFamilyCount() != opts.families)) { 354 needsDelete = true; 355 // wait, why did it delete my table?!? 
356 LOG.debug(MoreObjects.toStringHelper("needsDelete") 357 .add("needsDelete", needsDelete) 358 .add("isReadCmd", isReadCmd) 359 .add("exists", exists) 360 .add("desc", desc) 361 .add("presplit", opts.presplitRegions) 362 .add("splitPolicy", opts.splitPolicy) 363 .add("replicas", opts.replicas) 364 .add("families", opts.families) 365 .toString()); 366 } 367 368 // remove an existing table 369 if (needsDelete) { 370 if (admin.isTableEnabled(tableName)) { 371 admin.disableTable(tableName); 372 } 373 admin.deleteTable(tableName); 374 } 375 376 // table creation is necessary 377 if (!exists || needsDelete) { 378 desc = getTableDescriptor(opts); 379 if (splits != null) { 380 if (LOG.isDebugEnabled()) { 381 for (int i = 0; i < splits.length; i++) { 382 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); 383 } 384 } 385 } 386 admin.createTable(desc, splits); 387 LOG.info("Table " + desc + " created"); 388 } 389 return admin.tableExists(tableName); 390 } 391 392 /** 393 * Create an HTableDescriptor from provided TestOptions. 394 */ 395 protected static HTableDescriptor getTableDescriptor(TestOptions opts) { 396 HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(opts.tableName)); 397 for (int family = 0; family < opts.families; family++) { 398 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 399 HColumnDescriptor familyDesc = new HColumnDescriptor(familyName); 400 familyDesc.setDataBlockEncoding(opts.blockEncoding); 401 familyDesc.setCompressionType(opts.compression); 402 familyDesc.setBloomFilterType(opts.bloomType); 403 familyDesc.setBlocksize(opts.blockSize); 404 if (opts.inMemoryCF) { 405 familyDesc.setInMemory(true); 406 } 407 familyDesc.setInMemoryCompaction(opts.inMemoryCompaction); 408 tableDesc.addFamily(familyDesc); 409 } 410 if (opts.replicas != DEFAULT_OPTS.replicas) { 411 tableDesc.setRegionReplication(opts.replicas); 412 } 413 if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) { 414 tableDesc.setRegionSplitPolicyClassName(opts.splitPolicy); 415 } 416 return tableDesc; 417 } 418 419 /** 420 * generates splits based on total number of rows and specified split regions 421 */ 422 protected static byte[][] getSplits(TestOptions opts) { 423 if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) 424 return null; 425 426 int numSplitPoints = opts.presplitRegions - 1; 427 byte[][] splits = new byte[numSplitPoints][]; 428 int jump = opts.totalRows / opts.presplitRegions; 429 for (int i = 0; i < numSplitPoints; i++) { 430 int rowkey = jump * (1 + i); 431 splits[i] = format(rowkey); 432 } 433 return splits; 434 } 435 436 static void setupConnectionCount(final TestOptions opts) { 437 if (opts.oneCon) { 438 opts.connCount = 1; 439 } else { 440 if (opts.connCount == -1) { 441 // set to thread number if connCount is not set 442 opts.connCount = opts.numClientThreads; 443 } 444 } 445 } 446 447 /* 448 * Run all clients in this vm each to its own thread. 449 */ 450 static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf) 451 throws IOException, InterruptedException, ExecutionException { 452 final Class<? 
extends TestBase> cmd = determineCommandClass(opts.cmdName); 453 assert cmd != null; 454 @SuppressWarnings("unchecked") 455 Future<RunResult>[] threads = new Future[opts.numClientThreads]; 456 RunResult[] results = new RunResult[opts.numClientThreads]; 457 ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, 458 new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build()); 459 setupConnectionCount(opts); 460 final Connection[] cons = new Connection[opts.connCount]; 461 final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount]; 462 for (int i = 0; i < opts.connCount; i++) { 463 cons[i] = ConnectionFactory.createConnection(conf); 464 asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get(); 465 } 466 LOG.info("Created " + opts.connCount + " connections for " + 467 opts.numClientThreads + " threads"); 468 for (int i = 0; i < threads.length; i++) { 469 final int index = i; 470 threads[i] = pool.submit(new Callable<RunResult>() { 471 @Override 472 public RunResult call() throws Exception { 473 TestOptions threadOpts = new TestOptions(opts); 474 final Connection con = cons[index % cons.length]; 475 final AsyncConnection asyncCon = asyncCons[index % asyncCons.length]; 476 if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows; 477 RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() { 478 @Override 479 public void setStatus(final String msg) throws IOException { 480 LOG.info(msg); 481 } 482 }); 483 LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration + 484 "ms over " + threadOpts.perClientRunRows + " rows"); 485 return run; 486 } 487 }); 488 } 489 pool.shutdown(); 490 491 for (int i = 0; i < threads.length; i++) { 492 try { 493 results[i] = threads[i].get(); 494 } catch (ExecutionException e) { 495 throw new IOException(e.getCause()); 496 } 497 } 498 final String test = cmd.getSimpleName(); 499 LOG.info("[" + test + "] Summary of timings (ms): " 500 + Arrays.toString(results)); 501 Arrays.sort(results); 502 long total = 0; 503 float avgLatency = 0 ; 504 float avgTPS = 0; 505 for (RunResult result : results) { 506 total += result.duration; 507 avgLatency += result.hist.getSnapshot().getMean(); 508 avgTPS += opts.perClientRunRows * 1.0f / result.duration; 509 } 510 avgTPS *= 1000; // ms to second 511 avgLatency = avgLatency / results.length; 512 LOG.info("[" + test + " duration ]" 513 + "\tMin: " + results[0] + "ms" 514 + "\tMax: " + results[results.length - 1] + "ms" 515 + "\tAvg: " + (total / results.length) + "ms"); 516 LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency)); 517 LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second"); 518 for (int i = 0; i < opts.connCount; i++) { 519 cons[i].close(); 520 asyncCons[i].close(); 521 } 522 523 524 return results; 525 } 526 527 /* 528 * Run a mapreduce job. Run as many maps as asked-for clients. 529 * Before we start up the job, write out an input file with instruction 530 * per client regards which row they are to start on. 531 * @param cmd Command to run. 532 * @throws IOException 533 */ 534 static Job doMapReduce(TestOptions opts, final Configuration conf) 535 throws IOException, InterruptedException, ClassNotFoundException { 536 final Class<? 
        extends TestBase> cmd = determineCommandClass(opts.cmdName);
    assert cmd != null;
    Path inputDir = writeInputFile(conf, opts);
    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
    Job job = Job.getInstance(conf);
    job.setJarByClass(PerformanceEvaluation.class);
    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);

    job.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.setInputPaths(job, inputDir);
    // this is default, but be explicit about it just in case.
    NLineInputFormat.setNumLinesPerSplit(job, 1);

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(LongWritable.class);

    job.setMapperClass(EvaluationMapTask.class);
    job.setReducerClass(LongSumReducer.class);

    job.setNumReduceTasks(1);

    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
      Histogram.class, // yammer metrics
      Gson.class, // gson
      FilterAllFilter.class // hbase-server tests jar
      );

    TableMapReduceUtil.initCredentials(job);

    job.waitForCompletion(true);
    return job;
  }

  /**
   * Each client runs in its own map task; per-client elapsed times are emitted as map output
   * and aggregated by the reducer.
   */

  static String JOB_INPUT_FILENAME = "input.txt";

  /*
   * Write input file of offsets-per-client for the mapreduce job.
   * @param c Configuration
   * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
   * @throws IOException
   */
  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
    return writeInputFile(c, opts, new Path("."));
  }

  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
      throws IOException {
    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
    Path inputDir = new Path(jobdir, "inputs");

    FileSystem fs = FileSystem.get(c);
    fs.mkdirs(inputDir);

    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
    PrintStream out = new PrintStream(fs.create(inputFile));
    // Make input random.
    Map<Integer, String> m = new TreeMap<>();
    Hash h = MurmurHash.getInstance();
    int perClientRows = (opts.totalRows / opts.numClientThreads);
    try {
      for (int j = 0; j < opts.numClientThreads; j++) {
        TestOptions next = new TestOptions(opts);
        next.startRow = j * perClientRows;
        next.perClientRunRows = perClientRows;
        String s = GSON.toJson(next);
        LOG.info("Client=" + j + ", input=" + s);
        byte[] b = Bytes.toBytes(s);
        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
        m.put(hash, s);
      }
      for (Map.Entry<Integer, String> e: m.entrySet()) {
        out.println(e.getValue());
      }
    } finally {
      out.close();
    }
    return inputDir;
  }
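
  // Each line of the generated input file is the GSON-serialized TestOptions for one client,
  // for example (abridged; a real line carries every TestOptions field):
  //
  //   {"cmdName":"randomWrite","startRow":0,"perClientRunRows":262144,"totalRows":1048576,...}
  //
  // Lines are keyed by a MurmurHash of their content so that clients pick up their row ranges
  // in effectively random order.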

  /**
   * Describes a command.
   */
  static class CmdDescriptor {
    private Class<? extends TestBase> cmdClass;
    private String name;
    private String description;

    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
      this.cmdClass = cmdClass;
      this.name = name;
      this.description = description;
    }

    public Class<? extends TestBase> getCmdClass() {
      return cmdClass;
    }

    public String getName() {
      return name;
    }

    public String getDescription() {
      return description;
    }
  }

  /**
   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}.
   * This makes tracking all these arguments a little easier.
   * NOTE: When adding a new option you need to add a data member, a getter/setter (so that JSON
   * serialization of this TestOptions class behaves), and you need to copy the new option from
   * 'that' to 'this' in the clone constructor below. Look for 'clone' below.
   */
  static class TestOptions {
    String cmdName = null;
    boolean nomapred = false;
    boolean filterAll = false;
    int startRow = 0;
    float size = 1.0f;
    int perClientRunRows = DEFAULT_ROWS_PER_GB;
    int numClientThreads = 1;
    int totalRows = DEFAULT_ROWS_PER_GB;
    int measureAfter = 0;
    float sampleRate = 1.0f;
    double traceRate = 0.0;
    String tableName = TABLE_NAME;
    boolean flushCommits = true;
    boolean writeToWAL = true;
    boolean autoFlush = false;
    boolean oneCon = false;
    int connCount = -1; // actual count is decided later; see setupConnectionCount
    boolean useTags = false;
    int noOfTags = 1;
    boolean reportLatency = false;
    int multiGet = 0;
    int multiPut = 0;
    int randomSleep = 0;
    boolean inMemoryCF = false;
    int presplitRegions = 0;
    int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION;
    String splitPolicy = null;
    Compression.Algorithm compression = Compression.Algorithm.NONE;
    BloomType bloomType = BloomType.ROW;
    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
    boolean valueRandom = false;
    boolean valueZipf = false;
    int valueSize = DEFAULT_VALUE_LENGTH;
    int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10;
    int cycles = 1;
    int columns = 1;
    int families = 1;
    int caching = 30;
    boolean addColumns = true;
    MemoryCompactionPolicy inMemoryCompaction =
        MemoryCompactionPolicy.valueOf(
            CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
    boolean asyncPrefetch = false;
    boolean cacheBlocks = true;
    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
    long bufferSize = 2L * 1024L * 1024L;

    public TestOptions() {}

    /**
     * Clone constructor.
     * @param that Object to copy from.
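     *
     * <p>A minimal sketch of the "add an option" pattern described in the class comment
     * above, using a hypothetical {@code myNewOption} field (illustrative only, not an
     * existing option):
     * <pre>
     * boolean myNewOption = false;                                     // 1. data member
     * public boolean isMyNewOption() { return myNewOption; }           // 2. getter
     * public void setMyNewOption(boolean v) { this.myNewOption = v; }  // 2. setter
     * this.myNewOption = that.myNewOption;                             // 3. copy in this constructor
     * </pre>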
713 */ 714 public TestOptions(TestOptions that) { 715 this.cmdName = that.cmdName; 716 this.cycles = that.cycles; 717 this.nomapred = that.nomapred; 718 this.startRow = that.startRow; 719 this.size = that.size; 720 this.perClientRunRows = that.perClientRunRows; 721 this.numClientThreads = that.numClientThreads; 722 this.totalRows = that.totalRows; 723 this.sampleRate = that.sampleRate; 724 this.traceRate = that.traceRate; 725 this.tableName = that.tableName; 726 this.flushCommits = that.flushCommits; 727 this.writeToWAL = that.writeToWAL; 728 this.autoFlush = that.autoFlush; 729 this.oneCon = that.oneCon; 730 this.connCount = that.connCount; 731 this.useTags = that.useTags; 732 this.noOfTags = that.noOfTags; 733 this.reportLatency = that.reportLatency; 734 this.multiGet = that.multiGet; 735 this.multiPut = that.multiPut; 736 this.inMemoryCF = that.inMemoryCF; 737 this.presplitRegions = that.presplitRegions; 738 this.replicas = that.replicas; 739 this.splitPolicy = that.splitPolicy; 740 this.compression = that.compression; 741 this.blockEncoding = that.blockEncoding; 742 this.filterAll = that.filterAll; 743 this.bloomType = that.bloomType; 744 this.blockSize = that.blockSize; 745 this.valueRandom = that.valueRandom; 746 this.valueZipf = that.valueZipf; 747 this.valueSize = that.valueSize; 748 this.period = that.period; 749 this.randomSleep = that.randomSleep; 750 this.measureAfter = that.measureAfter; 751 this.addColumns = that.addColumns; 752 this.columns = that.columns; 753 this.families = that.families; 754 this.caching = that.caching; 755 this.inMemoryCompaction = that.inMemoryCompaction; 756 this.asyncPrefetch = that.asyncPrefetch; 757 this.cacheBlocks = that.cacheBlocks; 758 this.scanReadType = that.scanReadType; 759 this.bufferSize = that.bufferSize; 760 } 761 762 public int getCaching() { 763 return this.caching; 764 } 765 766 public void setCaching(final int caching) { 767 this.caching = caching; 768 } 769 770 public int getColumns() { 771 return this.columns; 772 } 773 774 public void setColumns(final int columns) { 775 this.columns = columns; 776 } 777 778 public int getFamilies() { 779 return this.families; 780 } 781 782 public void setFamilies(final int families) { 783 this.families = families; 784 } 785 786 public int getCycles() { 787 return this.cycles; 788 } 789 790 public void setCycles(final int cycles) { 791 this.cycles = cycles; 792 } 793 794 public boolean isValueZipf() { 795 return valueZipf; 796 } 797 798 public void setValueZipf(boolean valueZipf) { 799 this.valueZipf = valueZipf; 800 } 801 802 public String getCmdName() { 803 return cmdName; 804 } 805 806 public void setCmdName(String cmdName) { 807 this.cmdName = cmdName; 808 } 809 810 public int getRandomSleep() { 811 return randomSleep; 812 } 813 814 public void setRandomSleep(int randomSleep) { 815 this.randomSleep = randomSleep; 816 } 817 818 public int getReplicas() { 819 return replicas; 820 } 821 822 public void setReplicas(int replicas) { 823 this.replicas = replicas; 824 } 825 826 public String getSplitPolicy() { 827 return splitPolicy; 828 } 829 830 public void setSplitPolicy(String splitPolicy) { 831 this.splitPolicy = splitPolicy; 832 } 833 834 public void setNomapred(boolean nomapred) { 835 this.nomapred = nomapred; 836 } 837 838 public void setFilterAll(boolean filterAll) { 839 this.filterAll = filterAll; 840 } 841 842 public void setStartRow(int startRow) { 843 this.startRow = startRow; 844 } 845 846 public void setSize(float size) { 847 this.size = size; 848 } 849 850 public void 
setPerClientRunRows(int perClientRunRows) { 851 this.perClientRunRows = perClientRunRows; 852 } 853 854 public void setNumClientThreads(int numClientThreads) { 855 this.numClientThreads = numClientThreads; 856 } 857 858 public void setTotalRows(int totalRows) { 859 this.totalRows = totalRows; 860 } 861 862 public void setSampleRate(float sampleRate) { 863 this.sampleRate = sampleRate; 864 } 865 866 public void setTraceRate(double traceRate) { 867 this.traceRate = traceRate; 868 } 869 870 public void setTableName(String tableName) { 871 this.tableName = tableName; 872 } 873 874 public void setFlushCommits(boolean flushCommits) { 875 this.flushCommits = flushCommits; 876 } 877 878 public void setWriteToWAL(boolean writeToWAL) { 879 this.writeToWAL = writeToWAL; 880 } 881 882 public void setAutoFlush(boolean autoFlush) { 883 this.autoFlush = autoFlush; 884 } 885 886 public void setOneCon(boolean oneCon) { 887 this.oneCon = oneCon; 888 } 889 890 public int getConnCount() { 891 return connCount; 892 } 893 894 public void setConnCount(int connCount) { 895 this.connCount = connCount; 896 } 897 898 public void setUseTags(boolean useTags) { 899 this.useTags = useTags; 900 } 901 902 public void setNoOfTags(int noOfTags) { 903 this.noOfTags = noOfTags; 904 } 905 906 public void setReportLatency(boolean reportLatency) { 907 this.reportLatency = reportLatency; 908 } 909 910 public void setMultiGet(int multiGet) { 911 this.multiGet = multiGet; 912 } 913 914 public void setMultiPut(int multiPut) { 915 this.multiPut = multiPut; 916 } 917 918 public void setInMemoryCF(boolean inMemoryCF) { 919 this.inMemoryCF = inMemoryCF; 920 } 921 922 public void setPresplitRegions(int presplitRegions) { 923 this.presplitRegions = presplitRegions; 924 } 925 926 public void setCompression(Compression.Algorithm compression) { 927 this.compression = compression; 928 } 929 930 public void setBloomType(BloomType bloomType) { 931 this.bloomType = bloomType; 932 } 933 934 public void setBlockSize(int blockSize) { 935 this.blockSize = blockSize; 936 } 937 938 public void setBlockEncoding(DataBlockEncoding blockEncoding) { 939 this.blockEncoding = blockEncoding; 940 } 941 942 public void setValueRandom(boolean valueRandom) { 943 this.valueRandom = valueRandom; 944 } 945 946 public void setValueSize(int valueSize) { 947 this.valueSize = valueSize; 948 } 949 950 public void setBufferSize(long bufferSize) { 951 this.bufferSize = bufferSize; 952 } 953 954 public void setPeriod(int period) { 955 this.period = period; 956 } 957 958 public boolean isNomapred() { 959 return nomapred; 960 } 961 962 public boolean isFilterAll() { 963 return filterAll; 964 } 965 966 public int getStartRow() { 967 return startRow; 968 } 969 970 public float getSize() { 971 return size; 972 } 973 974 public int getPerClientRunRows() { 975 return perClientRunRows; 976 } 977 978 public int getNumClientThreads() { 979 return numClientThreads; 980 } 981 982 public int getTotalRows() { 983 return totalRows; 984 } 985 986 public float getSampleRate() { 987 return sampleRate; 988 } 989 990 public double getTraceRate() { 991 return traceRate; 992 } 993 994 public String getTableName() { 995 return tableName; 996 } 997 998 public boolean isFlushCommits() { 999 return flushCommits; 1000 } 1001 1002 public boolean isWriteToWAL() { 1003 return writeToWAL; 1004 } 1005 1006 public boolean isAutoFlush() { 1007 return autoFlush; 1008 } 1009 1010 public boolean isUseTags() { 1011 return useTags; 1012 } 1013 1014 public int getNoOfTags() { 1015 return noOfTags; 1016 } 1017 
1018 public boolean isReportLatency() { 1019 return reportLatency; 1020 } 1021 1022 public int getMultiGet() { 1023 return multiGet; 1024 } 1025 1026 public int getMultiPut() { 1027 return multiPut; 1028 } 1029 1030 public boolean isInMemoryCF() { 1031 return inMemoryCF; 1032 } 1033 1034 public int getPresplitRegions() { 1035 return presplitRegions; 1036 } 1037 1038 public Compression.Algorithm getCompression() { 1039 return compression; 1040 } 1041 1042 public DataBlockEncoding getBlockEncoding() { 1043 return blockEncoding; 1044 } 1045 1046 public boolean isValueRandom() { 1047 return valueRandom; 1048 } 1049 1050 public int getValueSize() { 1051 return valueSize; 1052 } 1053 1054 public int getPeriod() { 1055 return period; 1056 } 1057 1058 public BloomType getBloomType() { 1059 return bloomType; 1060 } 1061 1062 public int getBlockSize() { 1063 return blockSize; 1064 } 1065 1066 public boolean isOneCon() { 1067 return oneCon; 1068 } 1069 1070 public int getMeasureAfter() { 1071 return measureAfter; 1072 } 1073 1074 public void setMeasureAfter(int measureAfter) { 1075 this.measureAfter = measureAfter; 1076 } 1077 1078 public boolean getAddColumns() { 1079 return addColumns; 1080 } 1081 1082 public void setAddColumns(boolean addColumns) { 1083 this.addColumns = addColumns; 1084 } 1085 1086 public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) { 1087 this.inMemoryCompaction = inMemoryCompaction; 1088 } 1089 1090 public MemoryCompactionPolicy getInMemoryCompaction() { 1091 return this.inMemoryCompaction; 1092 } 1093 1094 public long getBufferSize() { 1095 return this.bufferSize; 1096 } 1097 } 1098 1099 /* 1100 * A test. 1101 * Subclass to particularize what happens per row. 1102 */ 1103 static abstract class TestBase { 1104 // Below is make it so when Tests are all running in the one 1105 // jvm, that they each have a differently seeded Random. 1106 private static final Random randomSeed = new Random(System.currentTimeMillis()); 1107 1108 private static long nextRandomSeed() { 1109 return randomSeed.nextLong(); 1110 } 1111 private final int everyN; 1112 1113 protected final Random rand = new Random(nextRandomSeed()); 1114 protected final Configuration conf; 1115 protected final TestOptions opts; 1116 1117 private final Status status; 1118 private final Sampler traceSampler; 1119 private final SpanReceiverHost receiverHost; 1120 1121 private String testName; 1122 private Histogram latencyHistogram; 1123 private Histogram valueSizeHistogram; 1124 private Histogram rpcCallsHistogram; 1125 private Histogram remoteRpcCallsHistogram; 1126 private Histogram millisBetweenNextHistogram; 1127 private Histogram regionsScannedHistogram; 1128 private Histogram bytesInResultsHistogram; 1129 private Histogram bytesInRemoteResultsHistogram; 1130 private RandomDistribution.Zipf zipf; 1131 1132 /** 1133 * Note that all subclasses of this class must provide a public constructor 1134 * that has the exact same list of arguments. 1135 */ 1136 TestBase(final Configuration conf, final TestOptions options, final Status status) { 1137 this.conf = conf; 1138 this.receiverHost = this.conf == null? 
null: SpanReceiverHost.getInstance(conf); 1139 this.opts = options; 1140 this.status = status; 1141 this.testName = this.getClass().getSimpleName(); 1142 if (options.traceRate >= 1.0) { 1143 this.traceSampler = Sampler.ALWAYS; 1144 } else if (options.traceRate > 0.0) { 1145 conf.setDouble("hbase.sampler.fraction", options.traceRate); 1146 this.traceSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf)); 1147 } else { 1148 this.traceSampler = Sampler.NEVER; 1149 } 1150 everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate)); 1151 if (options.isValueZipf()) { 1152 this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2); 1153 } 1154 LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows."); 1155 } 1156 1157 int getValueLength(final Random r) { 1158 if (this.opts.isValueRandom()) { 1159 return r.nextInt(opts.valueSize); 1160 } else if (this.opts.isValueZipf()) { 1161 return Math.abs(this.zipf.nextInt()); 1162 } else { 1163 return opts.valueSize; 1164 } 1165 } 1166 1167 void updateValueSize(final Result [] rs) throws IOException { 1168 if (rs == null || !isRandomValueSize()) return; 1169 for (Result r: rs) updateValueSize(r); 1170 } 1171 1172 void updateValueSize(final Result r) throws IOException { 1173 if (r == null || !isRandomValueSize()) return; 1174 int size = 0; 1175 for (CellScanner scanner = r.cellScanner(); scanner.advance();) { 1176 size += scanner.current().getValueLength(); 1177 } 1178 updateValueSize(size); 1179 } 1180 1181 void updateValueSize(final int valueSize) { 1182 if (!isRandomValueSize()) return; 1183 this.valueSizeHistogram.update(valueSize); 1184 } 1185 1186 void updateScanMetrics(final ScanMetrics metrics) { 1187 if (metrics == null) return; 1188 Map<String,Long> metricsMap = metrics.getMetricsMap(); 1189 Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); 1190 if (rpcCalls != null) { 1191 this.rpcCallsHistogram.update(rpcCalls.longValue()); 1192 } 1193 Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME); 1194 if (remoteRpcCalls != null) { 1195 this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue()); 1196 } 1197 Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME); 1198 if (millisBetweenNext != null) { 1199 this.millisBetweenNextHistogram.update(millisBetweenNext.longValue()); 1200 } 1201 Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); 1202 if (regionsScanned != null) { 1203 this.regionsScannedHistogram.update(regionsScanned.longValue()); 1204 } 1205 Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME); 1206 if (bytesInResults != null && bytesInResults.longValue() > 0) { 1207 this.bytesInResultsHistogram.update(bytesInResults.longValue()); 1208 } 1209 Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME); 1210 if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) { 1211 this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue()); 1212 } 1213 } 1214 1215 String generateStatus(final int sr, final int i, final int lr) { 1216 return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() + 1217 (!isRandomValueSize()? 
"": ", value size " + getShortValueSizeReport()); 1218 } 1219 1220 boolean isRandomValueSize() { 1221 return opts.valueRandom; 1222 } 1223 1224 protected int getReportingPeriod() { 1225 return opts.period; 1226 } 1227 1228 /** 1229 * Populated by testTakedown. Only implemented by RandomReadTest at the moment. 1230 */ 1231 public Histogram getLatencyHistogram() { 1232 return latencyHistogram; 1233 } 1234 1235 void testSetup() throws IOException { 1236 // test metrics 1237 latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1238 valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1239 // scan metrics 1240 rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1241 remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1242 millisBetweenNextHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1243 regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1244 bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1245 bytesInRemoteResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1246 1247 onStartup(); 1248 } 1249 1250 abstract void onStartup() throws IOException; 1251 1252 void testTakedown() throws IOException { 1253 onTakedown(); 1254 // Print all stats for this thread continuously. 1255 // Synchronize on Test.class so different threads don't intermingle the 1256 // output. We can't use 'this' here because each thread has its own instance of Test class. 1257 synchronized (Test.class) { 1258 status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName()); 1259 status.setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport( 1260 latencyHistogram)); 1261 status.setStatus("Num measures (latency) : " + latencyHistogram.getCount()); 1262 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram)); 1263 if (valueSizeHistogram.getCount() > 0) { 1264 status.setStatus("ValueSize (bytes) : " 1265 + YammerHistogramUtils.getHistogramReport(valueSizeHistogram)); 1266 status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount()); 1267 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram)); 1268 } else { 1269 status.setStatus("No valueSize statistics available"); 1270 } 1271 if (rpcCallsHistogram.getCount() > 0) { 1272 status.setStatus("rpcCalls (count): " + 1273 YammerHistogramUtils.getHistogramReport(rpcCallsHistogram)); 1274 } 1275 if (remoteRpcCallsHistogram.getCount() > 0) { 1276 status.setStatus("remoteRpcCalls (count): " + 1277 YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram)); 1278 } 1279 if (millisBetweenNextHistogram.getCount() > 0) { 1280 status.setStatus("millisBetweenNext (latency): " + 1281 YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram)); 1282 } 1283 if (regionsScannedHistogram.getCount() > 0) { 1284 status.setStatus("regionsScanned (count): " + 1285 YammerHistogramUtils.getHistogramReport(regionsScannedHistogram)); 1286 } 1287 if (bytesInResultsHistogram.getCount() > 0) { 1288 status.setStatus("bytesInResults (size): " + 1289 YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram)); 1290 } 1291 if (bytesInRemoteResultsHistogram.getCount() > 0) { 1292 status.setStatus("bytesInRemoteResults (size): " + 1293 
YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram)); 1294 } 1295 } 1296 receiverHost.closeReceivers(); 1297 } 1298 1299 abstract void onTakedown() throws IOException; 1300 1301 1302 /* 1303 * Run test 1304 * @return Elapsed time. 1305 * @throws IOException 1306 */ 1307 long test() throws IOException, InterruptedException { 1308 testSetup(); 1309 LOG.info("Timed test starting in thread " + Thread.currentThread().getName()); 1310 final long startTime = System.nanoTime(); 1311 try { 1312 testTimed(); 1313 } finally { 1314 testTakedown(); 1315 } 1316 return (System.nanoTime() - startTime) / 1000000; 1317 } 1318 1319 int getStartRow() { 1320 return opts.startRow; 1321 } 1322 1323 int getLastRow() { 1324 return getStartRow() + opts.perClientRunRows; 1325 } 1326 1327 /** 1328 * Provides an extension point for tests that don't want a per row invocation. 1329 */ 1330 void testTimed() throws IOException, InterruptedException { 1331 int startRow = getStartRow(); 1332 int lastRow = getLastRow(); 1333 TraceUtil.addSampler(traceSampler); 1334 // Report on completion of 1/10th of total. 1335 for (int ii = 0; ii < opts.cycles; ii++) { 1336 if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles); 1337 for (int i = startRow; i < lastRow; i++) { 1338 if (i % everyN != 0) continue; 1339 long startTime = System.nanoTime(); 1340 boolean requestSent = false; 1341 try (TraceScope scope = TraceUtil.createTrace("test row");){ 1342 requestSent = testRow(i); 1343 } 1344 if ( (i - startRow) > opts.measureAfter) { 1345 // If multiget or multiput is enabled, say set to 10, testRow() returns immediately 1346 // first 9 times and sends the actual get request in the 10th iteration. 1347 // We should only set latency when actual request is sent because otherwise 1348 // it turns out to be 0. 1349 if (requestSent) { 1350 latencyHistogram.update((System.nanoTime() - startTime) / 1000); 1351 } 1352 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { 1353 status.setStatus(generateStatus(startRow, i, lastRow)); 1354 } 1355 } 1356 } 1357 } 1358 } 1359 1360 /** 1361 * @return Subset of the histograms' calculation. 1362 */ 1363 public String getShortLatencyReport() { 1364 return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram); 1365 } 1366 1367 /** 1368 * @return Subset of the histograms' calculation. 1369 */ 1370 public String getShortValueSizeReport() { 1371 return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram); 1372 } 1373 1374 1375 /** 1376 * Test for individual row. 1377 * @param i Row index. 1378 * @return true if the row was sent to server and need to record metrics. 1379 * False if not, multiGet and multiPut e.g., the rows are sent 1380 * to server only if enough gets/puts are gathered. 1381 */ 1382 abstract boolean testRow(final int i) throws IOException, InterruptedException; 1383 } 1384 1385 static abstract class Test extends TestBase { 1386 protected Connection connection; 1387 1388 Test(final Connection con, final TestOptions options, final Status status) { 1389 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); 1390 this.connection = con; 1391 } 1392 } 1393 1394 static abstract class AsyncTest extends TestBase { 1395 protected AsyncConnection connection; 1396 1397 AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) { 1398 super(con == null ? 
HBaseConfiguration.create() : con.getConfiguration(), options, status); 1399 this.connection = con; 1400 } 1401 } 1402 1403 static abstract class TableTest extends Test { 1404 protected Table table; 1405 1406 TableTest(Connection con, TestOptions options, Status status) { 1407 super(con, options, status); 1408 } 1409 1410 @Override 1411 void onStartup() throws IOException { 1412 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1413 } 1414 1415 @Override 1416 void onTakedown() throws IOException { 1417 table.close(); 1418 } 1419 } 1420 1421 static abstract class AsyncTableTest extends AsyncTest { 1422 protected AsyncTable<?> table; 1423 1424 AsyncTableTest(AsyncConnection con, TestOptions options, Status status) { 1425 super(con, options, status); 1426 } 1427 1428 @Override 1429 void onStartup() throws IOException { 1430 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1431 } 1432 1433 @Override 1434 void onTakedown() throws IOException { 1435 } 1436 } 1437 1438 static class AsyncRandomReadTest extends AsyncTableTest { 1439 private final Consistency consistency; 1440 private ArrayList<Get> gets; 1441 private Random rd = new Random(); 1442 1443 AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) { 1444 super(con, options, status); 1445 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1446 if (opts.multiGet > 0) { 1447 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1448 this.gets = new ArrayList<>(opts.multiGet); 1449 } 1450 } 1451 1452 @Override 1453 boolean testRow(final int i) throws IOException, InterruptedException { 1454 if (opts.randomSleep > 0) { 1455 Thread.sleep(rd.nextInt(opts.randomSleep)); 1456 } 1457 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1458 for (int family = 0; family < opts.families; family++) { 1459 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1460 if (opts.addColumns) { 1461 for (int column = 0; column < opts.columns; column++) { 1462 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1463 get.addColumn(familyName, qualifier); 1464 } 1465 } else { 1466 get.addFamily(familyName); 1467 } 1468 } 1469 if (opts.filterAll) { 1470 get.setFilter(new FilterAllFilter()); 1471 } 1472 get.setConsistency(consistency); 1473 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1474 try { 1475 if (opts.multiGet > 0) { 1476 this.gets.add(get); 1477 if (this.gets.size() == opts.multiGet) { 1478 Result[] rs = 1479 this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new); 1480 updateValueSize(rs); 1481 this.gets.clear(); 1482 } else { 1483 return false; 1484 } 1485 } else { 1486 updateValueSize(this.table.get(get).get()); 1487 } 1488 } catch (ExecutionException e) { 1489 throw new IOException(e); 1490 } 1491 return true; 1492 } 1493 1494 public static RuntimeException runtime(Throwable e) { 1495 if (e instanceof RuntimeException) { 1496 return (RuntimeException) e; 1497 } 1498 return new RuntimeException(e); 1499 } 1500 1501 public static <V> V propagate(Callable<V> callable) { 1502 try { 1503 return callable.call(); 1504 } catch (Exception e) { 1505 throw runtime(e); 1506 } 1507 } 1508 1509 @Override 1510 protected int getReportingPeriod() { 1511 int period = opts.perClientRunRows / 10; 1512 return period == 0 ? 
opts.perClientRunRows : period; 1513 } 1514 1515 @Override 1516 protected void testTakedown() throws IOException { 1517 if (this.gets != null && this.gets.size() > 0) { 1518 this.table.get(gets); 1519 this.gets.clear(); 1520 } 1521 super.testTakedown(); 1522 } 1523 } 1524 1525 static class AsyncRandomWriteTest extends AsyncSequentialWriteTest { 1526 1527 AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) { 1528 super(con, options, status); 1529 } 1530 1531 @Override 1532 protected byte[] generateRow(final int i) { 1533 return getRandomRow(this.rand, opts.totalRows); 1534 } 1535 } 1536 1537 static class AsyncScanTest extends AsyncTableTest { 1538 private ResultScanner testScanner; 1539 private AsyncTable<?> asyncTable; 1540 1541 AsyncScanTest(AsyncConnection con, TestOptions options, Status status) { 1542 super(con, options, status); 1543 } 1544 1545 @Override 1546 void onStartup() throws IOException { 1547 this.asyncTable = 1548 connection.getTable(TableName.valueOf(opts.tableName), 1549 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); 1550 } 1551 1552 @Override 1553 void testTakedown() throws IOException { 1554 if (this.testScanner != null) { 1555 updateScanMetrics(this.testScanner.getScanMetrics()); 1556 this.testScanner.close(); 1557 } 1558 super.testTakedown(); 1559 } 1560 1561 @Override 1562 boolean testRow(final int i) throws IOException { 1563 if (this.testScanner == null) { 1564 Scan scan = 1565 new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 1566 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1567 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1568 for (int family = 0; family < opts.families; family++) { 1569 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1570 if (opts.addColumns) { 1571 for (int column = 0; column < opts.columns; column++) { 1572 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1573 scan.addColumn(familyName, qualifier); 1574 } 1575 } else { 1576 scan.addFamily(familyName); 1577 } 1578 } 1579 if (opts.filterAll) { 1580 scan.setFilter(new FilterAllFilter()); 1581 } 1582 this.testScanner = asyncTable.getScanner(scan); 1583 } 1584 Result r = testScanner.next(); 1585 updateValueSize(r); 1586 return true; 1587 } 1588 } 1589 1590 static class AsyncSequentialReadTest extends AsyncTableTest { 1591 AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) { 1592 super(con, options, status); 1593 } 1594 1595 @Override 1596 boolean testRow(final int i) throws IOException, InterruptedException { 1597 Get get = new Get(format(i)); 1598 for (int family = 0; family < opts.families; family++) { 1599 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1600 if (opts.addColumns) { 1601 for (int column = 0; column < opts.columns; column++) { 1602 byte [] qualifier = column == 0? 
COLUMN_ZERO: Bytes.toBytes("" + column); 1603 get.addColumn(familyName, qualifier); 1604 } 1605 } else { 1606 get.addFamily(familyName); 1607 } 1608 } 1609 if (opts.filterAll) { 1610 get.setFilter(new FilterAllFilter()); 1611 } 1612 try { 1613 updateValueSize(table.get(get).get()); 1614 } catch (ExecutionException e) { 1615 throw new IOException(e); 1616 } 1617 return true; 1618 } 1619 } 1620 1621 static class AsyncSequentialWriteTest extends AsyncTableTest { 1622 private ArrayList<Put> puts; 1623 1624 AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) { 1625 super(con, options, status); 1626 if (opts.multiPut > 0) { 1627 LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + "."); 1628 this.puts = new ArrayList<>(opts.multiPut); 1629 } 1630 } 1631 1632 protected byte[] generateRow(final int i) { 1633 return format(i); 1634 } 1635 1636 @Override 1637 @SuppressWarnings("ReturnValueIgnored") 1638 boolean testRow(final int i) throws IOException, InterruptedException { 1639 byte[] row = generateRow(i); 1640 Put put = new Put(row); 1641 for (int family = 0; family < opts.families; family++) { 1642 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1643 for (int column = 0; column < opts.columns; column++) { 1644 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1645 byte[] value = generateData(this.rand, getValueLength(this.rand)); 1646 if (opts.useTags) { 1647 byte[] tag = generateData(this.rand, TAG_LENGTH); 1648 Tag[] tags = new Tag[opts.noOfTags]; 1649 for (int n = 0; n < opts.noOfTags; n++) { 1650 Tag t = new ArrayBackedTag((byte) n, tag); 1651 tags[n] = t; 1652 } 1653 KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, 1654 value, tags); 1655 put.add(kv); 1656 updateValueSize(kv.getValueLength()); 1657 } else { 1658 put.addColumn(familyName, qualifier, value); 1659 updateValueSize(value.length); 1660 } 1661 } 1662 } 1663 put.setDurability(opts.writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL);
1664 try {
1665 // With multiPut enabled, rows are buffered and flushed in batches below; otherwise each row is written and waited on individually.
1666 if (opts.multiPut > 0) {
1667 this.puts.add(put);
1668 if (this.puts.size() == opts.multiPut) {
1669 this.table.put(puts).forEach(f -> AsyncRandomReadTest.propagate(f::get)); // wait on each batched put
1670 this.puts.clear();
1671 } else {
1672 return false;
1673 }
1674 } else {
1675 table.put(put).get();
1676 }
1677 } catch (ExecutionException e) {
1678 throw new IOException(e);
1679 }
1680 return true;
1681 }
1682 }
1683
1684 static abstract class BufferedMutatorTest extends Test {
1685 protected BufferedMutator mutator;
1686 protected Table table;
1687
1688 BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1689 super(con, options, status);
1690 }
1691
1692 @Override
1693 void onStartup() throws IOException {
1694 BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1695 p.writeBufferSize(opts.bufferSize);
1696 this.mutator = connection.getBufferedMutator(p);
1697 this.table = connection.getTable(TableName.valueOf(opts.tableName));
1698 }
1699
1700 @Override
1701 void onTakedown() throws IOException {
1702 mutator.close();
1703 table.close();
1704 }
1705 }
1706
1707 static class RandomSeekScanTest extends TableTest {
1708 RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1709 super(con, options, status);
1710 }
1711
1712 @Override
1713 boolean testRow(final int i) throws IOException {
1714 Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
1715 .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
1716 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
1717 .setScanMetricsEnabled(true);
1718 FilterList list = new FilterList();
1719 for (int family = 0; family < opts.families; family++) {
1720 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1721 if (opts.addColumns) {
1722 for (int column = 0; column < opts.columns; column++) {
1723 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1724 scan.addColumn(familyName, qualifier);
1725 }
1726 } else {
1727 scan.addFamily(familyName);
1728 }
1729 }
1730 if (opts.filterAll) {
1731 list.addFilter(new FilterAllFilter());
1732 }
1733 list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1734 scan.setFilter(list);
1735 ResultScanner s = this.table.getScanner(scan);
1736 try {
1737 for (Result rr; (rr = s.next()) != null;) {
1738 updateValueSize(rr);
1739 }
1740 } finally {
1741 updateScanMetrics(s.getScanMetrics());
1742 s.close();
1743 }
1744 return true;
1745 }
1746
1747 @Override
1748 protected int getReportingPeriod() {
1749 int period = opts.perClientRunRows / 100;
1750 return period == 0 ?
opts.perClientRunRows : period; 1751 } 1752 1753 } 1754 1755 static abstract class RandomScanWithRangeTest extends TableTest { 1756 RandomScanWithRangeTest(Connection con, TestOptions options, Status status) { 1757 super(con, options, status); 1758 } 1759 1760 @Override 1761 boolean testRow(final int i) throws IOException { 1762 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); 1763 Scan scan = new Scan().withStartRow(startAndStopRow.getFirst()) 1764 .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching) 1765 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1766 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1767 for (int family = 0; family < opts.families; family++) { 1768 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1769 if (opts.addColumns) { 1770 for (int column = 0; column < opts.columns; column++) { 1771 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1772 scan.addColumn(familyName, qualifier); 1773 } 1774 } else { 1775 scan.addFamily(familyName); 1776 } 1777 } 1778 if (opts.filterAll) { 1779 scan.setFilter(new FilterAllFilter()); 1780 } 1781 Result r = null; 1782 int count = 0; 1783 ResultScanner s = this.table.getScanner(scan); 1784 try { 1785 for (; (r = s.next()) != null;) { 1786 updateValueSize(r); 1787 count++; 1788 } 1789 if (i % 100 == 0) { 1790 LOG.info(String.format("Scan for key range %s - %s returned %s rows", 1791 Bytes.toString(startAndStopRow.getFirst()), 1792 Bytes.toString(startAndStopRow.getSecond()), count)); 1793 } 1794 } finally { 1795 updateScanMetrics(s.getScanMetrics()); 1796 s.close(); 1797 } 1798 return true; 1799 } 1800 1801 protected abstract Pair<byte[],byte[]> getStartAndStopRow(); 1802 1803 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) { 1804 int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows; 1805 int stop = start + maxRange; 1806 return new Pair<>(format(start), format(stop)); 1807 } 1808 1809 @Override 1810 protected int getReportingPeriod() { 1811 int period = opts.perClientRunRows / 100; 1812 return period == 0? 
opts.perClientRunRows: period; 1813 } 1814 } 1815 1816 static class RandomScanWithRange10Test extends RandomScanWithRangeTest { 1817 RandomScanWithRange10Test(Connection con, TestOptions options, Status status) { 1818 super(con, options, status); 1819 } 1820 1821 @Override 1822 protected Pair<byte[], byte[]> getStartAndStopRow() { 1823 return generateStartAndStopRows(10); 1824 } 1825 } 1826 1827 static class RandomScanWithRange100Test extends RandomScanWithRangeTest { 1828 RandomScanWithRange100Test(Connection con, TestOptions options, Status status) { 1829 super(con, options, status); 1830 } 1831 1832 @Override 1833 protected Pair<byte[], byte[]> getStartAndStopRow() { 1834 return generateStartAndStopRows(100); 1835 } 1836 } 1837 1838 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest { 1839 RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) { 1840 super(con, options, status); 1841 } 1842 1843 @Override 1844 protected Pair<byte[], byte[]> getStartAndStopRow() { 1845 return generateStartAndStopRows(1000); 1846 } 1847 } 1848 1849 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest { 1850 RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) { 1851 super(con, options, status); 1852 } 1853 1854 @Override 1855 protected Pair<byte[], byte[]> getStartAndStopRow() { 1856 return generateStartAndStopRows(10000); 1857 } 1858 } 1859 1860 static class RandomReadTest extends TableTest { 1861 private final Consistency consistency; 1862 private ArrayList<Get> gets; 1863 private Random rd = new Random(); 1864 1865 RandomReadTest(Connection con, TestOptions options, Status status) { 1866 super(con, options, status); 1867 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1868 if (opts.multiGet > 0) { 1869 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1870 this.gets = new ArrayList<>(opts.multiGet); 1871 } 1872 } 1873 1874 @Override 1875 boolean testRow(final int i) throws IOException, InterruptedException { 1876 if (opts.randomSleep > 0) { 1877 Thread.sleep(rd.nextInt(opts.randomSleep)); 1878 } 1879 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1880 for (int family = 0; family < opts.families; family++) { 1881 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1882 if (opts.addColumns) { 1883 for (int column = 0; column < opts.columns; column++) { 1884 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1885 get.addColumn(familyName, qualifier); 1886 } 1887 } else { 1888 get.addFamily(familyName); 1889 } 1890 } 1891 if (opts.filterAll) { 1892 get.setFilter(new FilterAllFilter()); 1893 } 1894 get.setConsistency(consistency); 1895 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1896 if (opts.multiGet > 0) { 1897 this.gets.add(get); 1898 if (this.gets.size() == opts.multiGet) { 1899 Result [] rs = this.table.get(this.gets); 1900 updateValueSize(rs); 1901 this.gets.clear(); 1902 } else { 1903 return false; 1904 } 1905 } else { 1906 updateValueSize(this.table.get(get)); 1907 } 1908 return true; 1909 } 1910 1911 @Override 1912 protected int getReportingPeriod() { 1913 int period = opts.perClientRunRows / 10; 1914 return period == 0 ? 
opts.perClientRunRows : period; 1915 } 1916 1917 @Override 1918 protected void testTakedown() throws IOException { 1919 if (this.gets != null && this.gets.size() > 0) { 1920 this.table.get(gets); 1921 this.gets.clear(); 1922 } 1923 super.testTakedown(); 1924 } 1925 } 1926 1927 static class RandomWriteTest extends SequentialWriteTest { 1928 RandomWriteTest(Connection con, TestOptions options, Status status) { 1929 super(con, options, status); 1930 } 1931 1932 @Override 1933 protected byte[] generateRow(final int i) { 1934 return getRandomRow(this.rand, opts.totalRows); 1935 } 1936 1937 1938 } 1939 1940 static class ScanTest extends TableTest { 1941 private ResultScanner testScanner; 1942 1943 ScanTest(Connection con, TestOptions options, Status status) { 1944 super(con, options, status); 1945 } 1946 1947 @Override 1948 void testTakedown() throws IOException { 1949 if (this.testScanner != null) { 1950 this.testScanner.close(); 1951 } 1952 super.testTakedown(); 1953 } 1954 1955 1956 @Override 1957 boolean testRow(final int i) throws IOException { 1958 if (this.testScanner == null) { 1959 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 1960 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1961 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1962 for (int family = 0; family < opts.families; family++) { 1963 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1964 if (opts.addColumns) { 1965 for (int column = 0; column < opts.columns; column++) { 1966 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1967 scan.addColumn(familyName, qualifier); 1968 } 1969 } else { 1970 scan.addFamily(familyName); 1971 } 1972 } 1973 if (opts.filterAll) { 1974 scan.setFilter(new FilterAllFilter()); 1975 } 1976 this.testScanner = table.getScanner(scan); 1977 } 1978 Result r = testScanner.next(); 1979 updateValueSize(r); 1980 return true; 1981 } 1982 } 1983 1984 /** 1985 * Base class for operations that are CAS-like; that read a value and then set it based off what 1986 * they read. In this category is increment, append, checkAndPut, etc. 1987 * 1988 * <p>These operations also want some concurrency going on. Usually when these tests run, they 1989 * operate in their own part of the key range. In CASTest, we will have them all overlap on the 1990 * same key space. We do this with our getStartRow and getLastRow overrides. 
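 *
 * <p>Concretely: {@code getStartRow()} below returns 0 and {@code getLastRow()} returns
 * {@code opts.perClientRunRows}, so every CAS client walks the same rows. For example, ten
 * increment clients started with {@code --rows=1000} all operate over the same first 1000 rows
 * of the key space and contend on the same cells rather than on disjoint per-client ranges.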
1991 */ 1992 static abstract class CASTableTest extends TableTest { 1993 private final byte [] qualifier; 1994 CASTableTest(Connection con, TestOptions options, Status status) { 1995 super(con, options, status); 1996 qualifier = Bytes.toBytes(this.getClass().getSimpleName()); 1997 } 1998 1999 byte [] getQualifier() { 2000 return this.qualifier; 2001 } 2002 2003 @Override 2004 int getStartRow() { 2005 return 0; 2006 } 2007 2008 @Override 2009 int getLastRow() { 2010 return opts.perClientRunRows; 2011 } 2012 } 2013 2014 static class IncrementTest extends CASTableTest { 2015 IncrementTest(Connection con, TestOptions options, Status status) { 2016 super(con, options, status); 2017 } 2018 2019 @Override 2020 boolean testRow(final int i) throws IOException { 2021 Increment increment = new Increment(format(i)); 2022 // unlike checkAndXXX tests, which make most sense to do on a single value, 2023 // if multiple families are specified for an increment test we assume it is 2024 // meant to raise the work factor 2025 for (int family = 0; family < opts.families; family++) { 2026 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2027 increment.addColumn(familyName, getQualifier(), 1l); 2028 } 2029 updateValueSize(this.table.increment(increment)); 2030 return true; 2031 } 2032 } 2033 2034 static class AppendTest extends CASTableTest { 2035 AppendTest(Connection con, TestOptions options, Status status) { 2036 super(con, options, status); 2037 } 2038 2039 @Override 2040 boolean testRow(final int i) throws IOException { 2041 byte [] bytes = format(i); 2042 Append append = new Append(bytes); 2043 // unlike checkAndXXX tests, which make most sense to do on a single value, 2044 // if multiple families are specified for an append test we assume it is 2045 // meant to raise the work factor 2046 for (int family = 0; family < opts.families; family++) { 2047 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2048 append.addColumn(familyName, getQualifier(), bytes); 2049 } 2050 updateValueSize(this.table.append(append)); 2051 return true; 2052 } 2053 } 2054 2055 static class CheckAndMutateTest extends CASTableTest { 2056 CheckAndMutateTest(Connection con, TestOptions options, Status status) { 2057 super(con, options, status); 2058 } 2059 2060 @Override 2061 boolean testRow(final int i) throws IOException { 2062 final byte [] bytes = format(i); 2063 // checkAndXXX tests operate on only a single value 2064 // Put a known value so when we go to check it, it is there. 2065 Put put = new Put(bytes); 2066 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2067 this.table.put(put); 2068 RowMutations mutations = new RowMutations(bytes); 2069 mutations.add(put); 2070 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()) 2071 .ifEquals(bytes).thenMutate(mutations); 2072 return true; 2073 } 2074 } 2075 2076 static class CheckAndPutTest extends CASTableTest { 2077 CheckAndPutTest(Connection con, TestOptions options, Status status) { 2078 super(con, options, status); 2079 } 2080 2081 @Override 2082 boolean testRow(final int i) throws IOException { 2083 final byte [] bytes = format(i); 2084 // checkAndXXX tests operate on only a single value 2085 // Put a known value so when we go to check it, it is there. 
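      // Note: each testRow issues two RPCs (the priming put below plus the checkAndMutate that
      // follows), so any per-row timing of the checkAnd* tests covers the setup write as well as
      // the CAS operation itself.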
2086 Put put = new Put(bytes); 2087 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2088 this.table.put(put); 2089 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()) 2090 .ifEquals(bytes).thenPut(put); 2091 return true; 2092 } 2093 } 2094 2095 static class CheckAndDeleteTest extends CASTableTest { 2096 CheckAndDeleteTest(Connection con, TestOptions options, Status status) { 2097 super(con, options, status); 2098 } 2099 2100 @Override 2101 boolean testRow(final int i) throws IOException { 2102 final byte [] bytes = format(i); 2103 // checkAndXXX tests operate on only a single value 2104 // Put a known value so when we go to check it, it is there. 2105 Put put = new Put(bytes); 2106 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2107 this.table.put(put); 2108 Delete delete = new Delete(put.getRow()); 2109 delete.addColumn(FAMILY_ZERO, getQualifier()); 2110 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()) 2111 .ifEquals(bytes).thenDelete(delete); 2112 return true; 2113 } 2114 } 2115 2116 static class SequentialReadTest extends TableTest { 2117 SequentialReadTest(Connection con, TestOptions options, Status status) { 2118 super(con, options, status); 2119 } 2120 2121 @Override 2122 boolean testRow(final int i) throws IOException { 2123 Get get = new Get(format(i)); 2124 for (int family = 0; family < opts.families; family++) { 2125 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2126 if (opts.addColumns) { 2127 for (int column = 0; column < opts.columns; column++) { 2128 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 2129 get.addColumn(familyName, qualifier); 2130 } 2131 } else { 2132 get.addFamily(familyName); 2133 } 2134 } 2135 if (opts.filterAll) { 2136 get.setFilter(new FilterAllFilter()); 2137 } 2138 updateValueSize(table.get(get)); 2139 return true; 2140 } 2141 } 2142 2143 static class SequentialWriteTest extends BufferedMutatorTest { 2144 private ArrayList<Put> puts; 2145 2146 2147 SequentialWriteTest(Connection con, TestOptions options, Status status) { 2148 super(con, options, status); 2149 if (opts.multiPut > 0) { 2150 LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + "."); 2151 this.puts = new ArrayList<>(opts.multiPut); 2152 } 2153 } 2154 2155 protected byte[] generateRow(final int i) { 2156 return format(i); 2157 } 2158 2159 @Override 2160 boolean testRow(final int i) throws IOException { 2161 byte[] row = generateRow(i); 2162 Put put = new Put(row); 2163 for (int family = 0; family < opts.families; family++) { 2164 byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family); 2165 for (int column = 0; column < opts.columns; column++) { 2166 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 2167 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2168 if (opts.useTags) { 2169 byte[] tag = generateData(this.rand, TAG_LENGTH); 2170 Tag[] tags = new Tag[opts.noOfTags]; 2171 for (int n = 0; n < opts.noOfTags; n++) { 2172 Tag t = new ArrayBackedTag((byte) n, tag); 2173 tags[n] = t; 2174 } 2175 KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, 2176 value, tags); 2177 put.add(kv); 2178 updateValueSize(kv.getValueLength()); 2179 } else { 2180 put.addColumn(familyName, qualifier, value); 2181 updateValueSize(value.length); 2182 } 2183 } 2184 } 2185 put.setDurability(opts.writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL); 2186 if (opts.autoFlush) { 2187 if (opts.multiPut > 0) { 2188 this.puts.add(put); 2189 if (this.puts.size() == opts.multiPut) { 2190 table.put(this.puts); 2191 this.puts.clear(); 2192 } else { 2193 return false; 2194 } 2195 } else { 2196 table.put(put); 2197 } 2198 } else { 2199 mutator.mutate(put); 2200 } 2201 return true; 2202 } 2203 } 2204 2205 static class FilteredScanTest extends TableTest { 2206 protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName()); 2207 2208 FilteredScanTest(Connection con, TestOptions options, Status status) { 2209 super(con, options, status); 2210 if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) { 2211 LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB + 2212 ". This could take a very long time."); 2213 } 2214 } 2215 2216 @Override 2217 boolean testRow(int i) throws IOException { 2218 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2219 Scan scan = constructScan(value); 2220 ResultScanner scanner = null; 2221 try { 2222 scanner = this.table.getScanner(scan); 2223 for (Result r = null; (r = scanner.next()) != null;) { 2224 updateValueSize(r); 2225 } 2226 } finally { 2227 if (scanner != null) { 2228 updateScanMetrics(scanner.getScanMetrics()); 2229 scanner.close(); 2230 } 2231 } 2232 return true; 2233 } 2234 2235 protected Scan constructScan(byte[] valuePrefix) throws IOException { 2236 FilterList list = new FilterList(); 2237 Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, 2238 CompareOperator.EQUAL, new BinaryComparator(valuePrefix)); 2239 list.addFilter(filter); 2240 if (opts.filterAll) { 2241 list.addFilter(new FilterAllFilter()); 2242 } 2243 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 2244 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 2245 .setScanMetricsEnabled(true); 2246 if (opts.addColumns) { 2247 for (int column = 0; column < opts.columns; column++) { 2248 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 2249 scan.addColumn(FAMILY_ZERO, qualifier); 2250 } 2251 } else { 2252 scan.addFamily(FAMILY_ZERO); 2253 } 2254 scan.setFilter(list); 2255 return scan; 2256 } 2257 } 2258 2259 /** 2260 * Compute a throughput rate in MB/s. 2261 * @param rows Number of records consumed. 2262 * @param timeMs Time taken in milliseconds. 2263 * @return String value with label, ie '123.76 MB/s' 2264 */ 2265 private static String calculateMbps(int rows, long timeMs, final int valueSize, int families, int columns) { 2266 BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH + 2267 ((valueSize + (FAMILY_NAME_BASE.length()+1) + COLUMN_ZERO.length) * columns) * families); 2268 BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT) 2269 .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT) 2270 .divide(BYTES_PER_MB, CXT); 2271 return FMT.format(mbps) + " MB/s"; 2272 } 2273 2274 /* 2275 * Format passed integer. 2276 * @param number 2277 * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed 2278 * number (Does absolute in case number is negative). 2279 */ 2280 public static byte [] format(final int number) { 2281 byte [] b = new byte[ROW_LENGTH]; 2282 int d = Math.abs(number); 2283 for (int i = b.length - 1; i >= 0; i--) { 2284 b[i] = (byte)((d % 10) + '0'); 2285 d /= 10; 2286 } 2287 return b; 2288 } 2289 2290 /* 2291 * This method takes some time and is done inline uploading data. 
For 2292 * example, doing the mapfile test, generation of the key and value 2293 * consumes about 30% of CPU time. 2294 * @return Generated random value to insert into a table cell. 2295 */ 2296 public static byte[] generateData(final Random r, int length) { 2297 byte [] b = new byte [length]; 2298 int i; 2299 2300 for(i = 0; i < (length-8); i += 8) { 2301 b[i] = (byte) (65 + r.nextInt(26)); 2302 b[i+1] = b[i]; 2303 b[i+2] = b[i]; 2304 b[i+3] = b[i]; 2305 b[i+4] = b[i]; 2306 b[i+5] = b[i]; 2307 b[i+6] = b[i]; 2308 b[i+7] = b[i]; 2309 } 2310 2311 byte a = (byte) (65 + r.nextInt(26)); 2312 for(; i < length; i++) { 2313 b[i] = a; 2314 } 2315 return b; 2316 } 2317 2318 static byte [] getRandomRow(final Random random, final int totalRows) { 2319 return format(generateRandomRow(random, totalRows)); 2320 } 2321 2322 static int generateRandomRow(final Random random, final int totalRows) { 2323 return random.nextInt(Integer.MAX_VALUE) % totalRows; 2324 } 2325 2326 static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf, 2327 Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status) 2328 throws IOException, InterruptedException { 2329 status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " 2330 + opts.perClientRunRows + " rows"); 2331 long totalElapsedTime; 2332 2333 final TestBase t; 2334 try { 2335 if (AsyncTest.class.isAssignableFrom(cmd)) { 2336 Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd; 2337 Constructor<? extends AsyncTest> constructor = 2338 newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class); 2339 t = constructor.newInstance(asyncCon, opts, status); 2340 } else { 2341 Class<? extends Test> newCmd = (Class<? extends Test>) cmd; 2342 Constructor<? extends Test> constructor = 2343 newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class); 2344 t = constructor.newInstance(con, opts, status); 2345 } 2346 } catch (NoSuchMethodException e) { 2347 throw new IllegalArgumentException("Invalid command class: " + cmd.getName() 2348 + ". It does not provide a constructor as described by " 2349 + "the javadoc comment. Available constructors are: " 2350 + Arrays.toString(cmd.getConstructors())); 2351 } catch (Exception e) { 2352 throw new IllegalStateException("Failed to construct command class", e); 2353 } 2354 totalElapsedTime = t.test(); 2355 2356 status.setStatus("Finished " + cmd + " in " + totalElapsedTime + 2357 "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" + 2358 " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime, 2359 getAverageValueLength(opts), opts.families, opts.columns) + ")"); 2360 2361 return new RunResult(totalElapsedTime, t.getLatencyHistogram()); 2362 } 2363 2364 private static int getAverageValueLength(final TestOptions opts) { 2365 return opts.valueRandom? opts.valueSize/2: opts.valueSize; 2366 } 2367 2368 private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) throws IOException, 2369 InterruptedException, ClassNotFoundException, ExecutionException { 2370 // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do 2371 // the TestOptions introspection for us and dump the output in a readable format. 
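    // Illustrative only (the exact field set is whatever TestOptions serializes), but the
    // resulting log line looks roughly like:
    //   RandomReadTest test run options={"cmdName":"randomRead","nomapred":true,"filterAll":false,
    //     "startRow":0,"size":1.0,"perClientRunRows":...,...}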
LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
2373 Admin admin = null;
2374 Connection connection = null;
2375 try {
2376 connection = ConnectionFactory.createConnection(getConf());
2377 admin = connection.getAdmin();
2378 checkTable(admin, opts);
2379 } finally {
2380 if (admin != null) admin.close();
2381 if (connection != null) connection.close();
2382 }
2383 if (opts.nomapred) {
2384 doLocalClients(opts, getConf());
2385 } else {
2386 doMapReduce(opts, getConf());
2387 }
2388 }
2389
2390 protected void printUsage() {
2391 printUsage(PE_COMMAND_SHORTNAME, null);
2392 }
2393
2394 protected static void printUsage(final String message) {
2395 printUsage(PE_COMMAND_SHORTNAME, message);
2396 }
2397
2398 protected static void printUsageAndExit(final String message, final int exitCode) {
2399 printUsage(message);
2400 System.exit(exitCode);
2401 }
2402
2403 protected static void printUsage(final String shortName, final String message) {
2404 if (message != null && message.length() > 0) {
2405 System.err.println(message);
2406 }
2407 System.err.print("Usage: hbase " + shortName);
2408 System.err.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>");
2409 System.err.println();
2410 System.err.println("General Options:");
2411 System.err.println(" nomapred Run multiple clients using threads " +
2412 "(rather than using mapreduce)");
2413 System.err.println(" oneCon All the threads share the same connection. Default: False");
2414 System.err.println(" connCount Number of connections all threads share. "
2415 + "For example, if set to 2, then all threads share 2 connections. "
2416 + "Default: depends on the oneCon parameter. If oneCon is true, then connCount=1; "
2417 + "otherwise, connCount equals the number of threads");
2418
2419 System.err.println(" sampleRate Execute the test on a sample of total " +
2420 "rows. Only supported by randomRead. Default: 1.0");
2421 System.err.println(" period Report every 'period' rows. " +
2422 "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows()/10);
2423 System.err.println(" cycles How many times to cycle the test. Default: 1.");
2424 System.err.println(" traceRate Enable HTrace spans. Initiate tracing every N rows. " +
2425 "Default: 0");
2426 System.err.println(" latency Set to report operation latencies. Default: False");
2427 System.err.println(" measureAfter Start measuring the latency once 'measureAfter'" +
2428 " rows have been processed. Default: 0");
2429 System.err.println(" valueSize Value size to use. Default: "
2430 + DEFAULT_OPTS.getValueSize());
2431 System.err.println(" valueRandom Set if we should vary value size between 0 and " +
2432 "'valueSize'; set on read for stats on size. Default: Not set.");
2433 System.err.println(" blockEncoding Block encoding to use. Value should be one of "
2434 + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2435 System.err.println();
2436 System.err.println("Table Creation / Write Tests:");
2437 System.err.println(" table Alternate table name. Default: 'TestTable'");
2438 System.err.println(" rows Rows each client runs. Default: "
2439 + DEFAULT_OPTS.getPerClientRunRows()
2440 + ". For randomReads and randomSeekScans this can"
2441 + " be specified along with --size to set the number of rows to be scanned within"
2442 + " the total range specified by the size.");
2443 System.err.println(
2444 " size Total size in GiB. Mutually exclusive with --rows for writes and scans"
2445 + ". 
However, for randomReads and randomSeekScans you can combine --size with --rows:"
2446 + " --size sets the end of the key range and --rows"
2447 + " sets the number of rows read within that range. " + "Default: 1.0.");
2448 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2449 System.err.println(" flushCommits Used to determine if the test should flush the table. " +
2450 "Default: false");
2451 System.err.println(" valueZipf Set if we should vary value size between 0 and " +
2452 "'valueSize' in Zipf form. Default: Not set.");
2453 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True");
2454 System.err.println(" autoFlush Set autoFlush on htable. Default: False");
2455 System.err.println(" multiPut Batch puts together into groups of N. Only supported " +
2456 "by write. If multiPut is bigger than 0, autoFlush needs to be set to true. Default: 0");
2457 System.err.println(" presplit Create presplit table. If a table with the same name exists,"
2458 + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2459 + "Recommended for accurate perf analysis (see guide). Default: disabled");
2460 System.err.println(" usetags Writes tags along with KVs. Use with HFile V3. " +
2461 "Default: false");
2462 System.err.println(" numoftags Specify the number of tags needed. " +
2463 "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2464 System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table.");
2465 System.err.println(" columns Columns to write per row. Default: 1");
2466 System.err.println(" families Specify number of column families for the table. Default: 1");
2467 System.err.println();
2468 System.err.println("Read Tests:");
2469 System.err.println(" filterAll Helps to filter out all the rows on the server side,"
2470 + " thereby not returning anything back to the client. Useful for checking server-side"
2471 + " performance. Uses FilterAllFilter internally. ");
2472 System.err.println(" multiGet Batch gets together into groups of N. Only supported " +
2473 "by randomRead. Default: disabled");
2474 System.err.println(" inmemory Tries to keep the HFiles of the CF " +
2475 "in memory as far as possible. Not guaranteed that reads are always served " +
2476 "from memory. Default: false");
2477 System.err.println(" bloomFilter Bloom filter type, one of "
2478 + Arrays.toString(BloomType.values()));
2479 System.err.println(" blockSize Block size to use when writing out hfiles. ");
2480 System.err.println(" inmemoryCompaction Makes the column family do in-memory flushes/compactions. "
2481 + "Uses the CompactingMemStore");
2482 System.err.println(" addColumns Adds columns to scans/gets explicitly. Default: true");
2483 System.err.println(" replicas Enable region replica testing. Default: 1.");
2484 System.err.println(" randomSleep Do a random sleep before each get, between 0 and the entered value. Default: 0");
2485 System.err.println(" caching Scan caching to use. Default: 30");
2486 System.err.println(" asyncPrefetch Enable asyncPrefetch for scan");
2487 System.err.println(" cacheBlocks Set the cacheBlocks option for scan. Default: true");
2488 System.err.println(" scanReadType Set the readType option for scan, stream/pread/default. Default: default");
2489 System.err.println(" bufferSize Set the size of the client-side write buffer. Default: 2MB");
2490 System.err.println();
2491 System.err.println(" Note: -D properties will be applied to the conf used. 
"); 2492 System.err.println(" For example: "); 2493 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true"); 2494 System.err.println(" -Dmapreduce.task.timeout=60000"); 2495 System.err.println(); 2496 System.err.println("Command:"); 2497 for (CmdDescriptor command : COMMANDS.values()) { 2498 System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription())); 2499 } 2500 System.err.println(); 2501 System.err.println("Args:"); 2502 System.err.println(" nclients Integer. Required. Total number of clients " 2503 + "(and HRegionServers) running. 1 <= value <= 500"); 2504 System.err.println("Examples:"); 2505 System.err.println(" To run a single client doing the default 1M sequentialWrites:"); 2506 System.err.println(" $ hbase " + shortName + " sequentialWrite 1"); 2507 System.err.println(" To run 10 clients doing increments over ten rows:"); 2508 System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10"); 2509 } 2510 2511 /** 2512 * Parse options passed in via an arguments array. Assumes that array has been split 2513 * on white-space and placed into a {@code Queue}. Any unknown arguments will remain 2514 * in the queue at the conclusion of this method call. It's up to the caller to deal 2515 * with these unrecognized arguments. 2516 */ 2517 static TestOptions parseOpts(Queue<String> args) { 2518 TestOptions opts = new TestOptions(); 2519 2520 String cmd = null; 2521 while ((cmd = args.poll()) != null) { 2522 if (cmd.equals("-h") || cmd.startsWith("--h")) { 2523 // place item back onto queue so that caller knows parsing was incomplete 2524 args.add(cmd); 2525 break; 2526 } 2527 2528 final String nmr = "--nomapred"; 2529 if (cmd.startsWith(nmr)) { 2530 opts.nomapred = true; 2531 continue; 2532 } 2533 2534 final String rows = "--rows="; 2535 if (cmd.startsWith(rows)) { 2536 opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length())); 2537 continue; 2538 } 2539 2540 final String cycles = "--cycles="; 2541 if (cmd.startsWith(cycles)) { 2542 opts.cycles = Integer.parseInt(cmd.substring(cycles.length())); 2543 continue; 2544 } 2545 2546 final String sampleRate = "--sampleRate="; 2547 if (cmd.startsWith(sampleRate)) { 2548 opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); 2549 continue; 2550 } 2551 2552 final String table = "--table="; 2553 if (cmd.startsWith(table)) { 2554 opts.tableName = cmd.substring(table.length()); 2555 continue; 2556 } 2557 2558 final String startRow = "--startRow="; 2559 if (cmd.startsWith(startRow)) { 2560 opts.startRow = Integer.parseInt(cmd.substring(startRow.length())); 2561 continue; 2562 } 2563 2564 final String compress = "--compress="; 2565 if (cmd.startsWith(compress)) { 2566 opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); 2567 continue; 2568 } 2569 2570 final String traceRate = "--traceRate="; 2571 if (cmd.startsWith(traceRate)) { 2572 opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length())); 2573 continue; 2574 } 2575 2576 final String blockEncoding = "--blockEncoding="; 2577 if (cmd.startsWith(blockEncoding)) { 2578 opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); 2579 continue; 2580 } 2581 2582 final String flushCommits = "--flushCommits="; 2583 if (cmd.startsWith(flushCommits)) { 2584 opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); 2585 continue; 2586 } 2587 2588 final String writeToWAL = "--writeToWAL="; 2589 if (cmd.startsWith(writeToWAL)) { 2590 
opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); 2591 continue; 2592 } 2593 2594 final String presplit = "--presplit="; 2595 if (cmd.startsWith(presplit)) { 2596 opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); 2597 continue; 2598 } 2599 2600 final String inMemory = "--inmemory="; 2601 if (cmd.startsWith(inMemory)) { 2602 opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); 2603 continue; 2604 } 2605 2606 final String autoFlush = "--autoFlush="; 2607 if (cmd.startsWith(autoFlush)) { 2608 opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length())); 2609 if (!opts.autoFlush && opts.multiPut > 0) { 2610 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0"); 2611 } 2612 continue; 2613 } 2614 2615 final String onceCon = "--oneCon="; 2616 if (cmd.startsWith(onceCon)) { 2617 opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length())); 2618 if (opts.oneCon && opts.connCount > 1) { 2619 throw new IllegalArgumentException("oneCon is set to true, " 2620 + "connCount should not bigger than 1"); 2621 } 2622 continue; 2623 } 2624 2625 final String connCount = "--connCount="; 2626 if (cmd.startsWith(connCount)) { 2627 opts.connCount = Integer.parseInt(cmd.substring(connCount.length())); 2628 if (opts.oneCon && opts.connCount > 1) { 2629 throw new IllegalArgumentException("oneCon is set to true, " 2630 + "connCount should not bigger than 1"); 2631 } 2632 continue; 2633 } 2634 2635 final String latency = "--latency"; 2636 if (cmd.startsWith(latency)) { 2637 opts.reportLatency = true; 2638 continue; 2639 } 2640 2641 final String multiGet = "--multiGet="; 2642 if (cmd.startsWith(multiGet)) { 2643 opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); 2644 continue; 2645 } 2646 2647 final String multiPut = "--multiPut="; 2648 if (cmd.startsWith(multiPut)) { 2649 opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length())); 2650 if (!opts.autoFlush && opts.multiPut > 0) { 2651 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0"); 2652 } 2653 continue; 2654 } 2655 2656 final String useTags = "--usetags="; 2657 if (cmd.startsWith(useTags)) { 2658 opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); 2659 continue; 2660 } 2661 2662 final String noOfTags = "--numoftags="; 2663 if (cmd.startsWith(noOfTags)) { 2664 opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); 2665 continue; 2666 } 2667 2668 final String replicas = "--replicas="; 2669 if (cmd.startsWith(replicas)) { 2670 opts.replicas = Integer.parseInt(cmd.substring(replicas.length())); 2671 continue; 2672 } 2673 2674 final String filterOutAll = "--filterAll"; 2675 if (cmd.startsWith(filterOutAll)) { 2676 opts.filterAll = true; 2677 continue; 2678 } 2679 2680 final String size = "--size="; 2681 if (cmd.startsWith(size)) { 2682 opts.size = Float.parseFloat(cmd.substring(size.length())); 2683 if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 
1GB"); 2684 continue; 2685 } 2686 2687 final String splitPolicy = "--splitPolicy="; 2688 if (cmd.startsWith(splitPolicy)) { 2689 opts.splitPolicy = cmd.substring(splitPolicy.length()); 2690 continue; 2691 } 2692 2693 final String randomSleep = "--randomSleep="; 2694 if (cmd.startsWith(randomSleep)) { 2695 opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length())); 2696 continue; 2697 } 2698 2699 final String measureAfter = "--measureAfter="; 2700 if (cmd.startsWith(measureAfter)) { 2701 opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length())); 2702 continue; 2703 } 2704 2705 final String bloomFilter = "--bloomFilter="; 2706 if (cmd.startsWith(bloomFilter)) { 2707 opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length())); 2708 continue; 2709 } 2710 2711 final String blockSize = "--blockSize="; 2712 if(cmd.startsWith(blockSize) ) { 2713 opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length())); 2714 continue; 2715 } 2716 2717 final String valueSize = "--valueSize="; 2718 if (cmd.startsWith(valueSize)) { 2719 opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length())); 2720 continue; 2721 } 2722 2723 final String valueRandom = "--valueRandom"; 2724 if (cmd.startsWith(valueRandom)) { 2725 opts.valueRandom = true; 2726 if (opts.valueZipf) { 2727 throw new IllegalStateException("Either valueZipf or valueRandom but not both"); 2728 } 2729 continue; 2730 } 2731 2732 final String valueZipf = "--valueZipf"; 2733 if (cmd.startsWith(valueZipf)) { 2734 opts.valueZipf = true; 2735 if (opts.valueRandom) { 2736 throw new IllegalStateException("Either valueZipf or valueRandom but not both"); 2737 } 2738 continue; 2739 } 2740 2741 final String period = "--period="; 2742 if (cmd.startsWith(period)) { 2743 opts.period = Integer.parseInt(cmd.substring(period.length())); 2744 continue; 2745 } 2746 2747 final String addColumns = "--addColumns="; 2748 if (cmd.startsWith(addColumns)) { 2749 opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length())); 2750 continue; 2751 } 2752 2753 final String inMemoryCompaction = "--inmemoryCompaction="; 2754 if (cmd.startsWith(inMemoryCompaction)) { 2755 opts.inMemoryCompaction = 2756 MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length())); 2757 continue; 2758 } 2759 2760 final String columns = "--columns="; 2761 if (cmd.startsWith(columns)) { 2762 opts.columns = Integer.parseInt(cmd.substring(columns.length())); 2763 continue; 2764 } 2765 2766 final String families = "--families="; 2767 if (cmd.startsWith(families)) { 2768 opts.families = Integer.parseInt(cmd.substring(families.length())); 2769 continue; 2770 } 2771 2772 final String caching = "--caching="; 2773 if (cmd.startsWith(caching)) { 2774 opts.caching = Integer.parseInt(cmd.substring(caching.length())); 2775 continue; 2776 } 2777 2778 final String asyncPrefetch = "--asyncPrefetch"; 2779 if (cmd.startsWith(asyncPrefetch)) { 2780 opts.asyncPrefetch = true; 2781 continue; 2782 } 2783 2784 final String cacheBlocks = "--cacheBlocks="; 2785 if (cmd.startsWith(cacheBlocks)) { 2786 opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length())); 2787 continue; 2788 } 2789 2790 final String scanReadType = "--scanReadType="; 2791 if (cmd.startsWith(scanReadType)) { 2792 opts.scanReadType = 2793 Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase()); 2794 continue; 2795 } 2796 2797 final String bufferSize = "--bufferSize="; 2798 if (cmd.startsWith(bufferSize)) { 2799 opts.bufferSize = 
Long.parseLong(cmd.substring(bufferSize.length())); 2800 continue; 2801 } 2802 2803 if (isCommandClass(cmd)) { 2804 opts.cmdName = cmd; 2805 try { 2806 opts.numClientThreads = Integer.parseInt(args.remove()); 2807 } catch (NoSuchElementException | NumberFormatException e) { 2808 throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e); 2809 } 2810 opts = calculateRowsAndSize(opts); 2811 break; 2812 } else { 2813 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); 2814 } 2815 2816 // Not matching any option or command. 2817 System.err.println("Error: Wrong option or command: " + cmd); 2818 args.add(cmd); 2819 break; 2820 } 2821 return opts; 2822 } 2823 2824 static TestOptions calculateRowsAndSize(final TestOptions opts) { 2825 int rowsPerGB = getRowsPerGB(opts); 2826 if ((opts.getCmdName() != null 2827 && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN))) 2828 && opts.size != DEFAULT_OPTS.size 2829 && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) { 2830 opts.totalRows = (int) opts.size * rowsPerGB; 2831 } else if (opts.size != DEFAULT_OPTS.size) { 2832 // total size in GB specified 2833 opts.totalRows = (int) opts.size * rowsPerGB; 2834 opts.perClientRunRows = opts.totalRows / opts.numClientThreads; 2835 } else { 2836 opts.totalRows = opts.perClientRunRows * opts.numClientThreads; 2837 opts.size = opts.totalRows / rowsPerGB; 2838 } 2839 return opts; 2840 } 2841 2842 static int getRowsPerGB(final TestOptions opts) { 2843 return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getFamilies() * 2844 opts.getColumns()); 2845 } 2846 2847 @Override 2848 public int run(String[] args) throws Exception { 2849 // Process command-line args. TODO: Better cmd-line processing 2850 // (but hopefully something not as painful as cli options). 2851 int errCode = -1; 2852 if (args.length < 1) { 2853 printUsage(); 2854 return errCode; 2855 } 2856 2857 try { 2858 LinkedList<String> argv = new LinkedList<>(); 2859 argv.addAll(Arrays.asList(args)); 2860 TestOptions opts = parseOpts(argv); 2861 2862 // args remaining, print help and exit 2863 if (!argv.isEmpty()) { 2864 errCode = 0; 2865 printUsage(); 2866 return errCode; 2867 } 2868 2869 // must run at least 1 client 2870 if (opts.numClientThreads <= 0) { 2871 throw new IllegalArgumentException("Number of clients must be > 0"); 2872 } 2873 2874 // cmdName should not be null, print help and exit 2875 if (opts.cmdName == null) { 2876 printUsage(); 2877 return errCode; 2878 } 2879 2880 Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName); 2881 if (cmdClass != null) { 2882 runTest(cmdClass, opts); 2883 errCode = 0; 2884 } 2885 2886 } catch (Exception e) { 2887 e.printStackTrace(); 2888 } 2889 2890 return errCode; 2891 } 2892 2893 private static boolean isCommandClass(String cmd) { 2894 return COMMANDS.containsKey(cmd); 2895 } 2896 2897 private static Class<? extends TestBase> determineCommandClass(String cmd) { 2898 CmdDescriptor descriptor = COMMANDS.get(cmd); 2899 return descriptor != null ? descriptor.getCmdClass() : null; 2900 } 2901 2902 public static void main(final String[] args) throws Exception { 2903 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); 2904 System.exit(res); 2905 } 2906}
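// A minimal sketch of driving this tool programmatically, equivalent to what main() does via
// ToolRunner; the option values below are purely illustrative:
//
//   Configuration conf = HBaseConfiguration.create();
//   int exitCode = ToolRunner.run(new PerformanceEvaluation(conf),
//       new String[] { "--nomapred", "--rows=100000", "sequentialWrite", "1" });
//   System.exit(exitCode);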