001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase; 020 021import com.codahale.metrics.Histogram; 022import com.codahale.metrics.UniformReservoir; 023import java.io.IOException; 024import java.io.PrintStream; 025import java.lang.reflect.Constructor; 026import java.math.BigDecimal; 027import java.math.MathContext; 028import java.text.DecimalFormat; 029import java.text.SimpleDateFormat; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.Date; 033import java.util.LinkedList; 034import java.util.List; 035import java.util.Locale; 036import java.util.Map; 037import java.util.NoSuchElementException; 038import java.util.Queue; 039import java.util.Random; 040import java.util.TreeMap; 041import java.util.concurrent.Callable; 042import java.util.concurrent.ExecutionException; 043import java.util.concurrent.ExecutorService; 044import java.util.concurrent.Executors; 045import java.util.concurrent.Future; 046import org.apache.commons.lang3.StringUtils; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.conf.Configured; 049import org.apache.hadoop.fs.FileSystem; 050import org.apache.hadoop.fs.Path; 
051import org.apache.hadoop.hbase.client.Admin; 052import org.apache.hadoop.hbase.client.Append; 053import org.apache.hadoop.hbase.client.AsyncConnection; 054import org.apache.hadoop.hbase.client.AsyncTable; 055import org.apache.hadoop.hbase.client.BufferedMutator; 056import org.apache.hadoop.hbase.client.BufferedMutatorParams; 057import org.apache.hadoop.hbase.client.Connection; 058import org.apache.hadoop.hbase.client.ConnectionFactory; 059import org.apache.hadoop.hbase.client.Consistency; 060import org.apache.hadoop.hbase.client.Delete; 061import org.apache.hadoop.hbase.client.Durability; 062import org.apache.hadoop.hbase.client.Get; 063import org.apache.hadoop.hbase.client.Increment; 064import org.apache.hadoop.hbase.client.Put; 065import org.apache.hadoop.hbase.client.RegionInfo; 066import org.apache.hadoop.hbase.client.RegionInfoBuilder; 067import org.apache.hadoop.hbase.client.RegionLocator; 068import org.apache.hadoop.hbase.client.Result; 069import org.apache.hadoop.hbase.client.ResultScanner; 070import org.apache.hadoop.hbase.client.RowMutations; 071import org.apache.hadoop.hbase.client.Scan; 072import org.apache.hadoop.hbase.client.Table; 073import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 074import org.apache.hadoop.hbase.filter.BinaryComparator; 075import org.apache.hadoop.hbase.filter.Filter; 076import org.apache.hadoop.hbase.filter.FilterAllFilter; 077import org.apache.hadoop.hbase.filter.FilterList; 078import org.apache.hadoop.hbase.filter.PageFilter; 079import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 080import org.apache.hadoop.hbase.filter.WhileMatchFilter; 081import org.apache.hadoop.hbase.io.compress.Compression; 082import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 083import org.apache.hadoop.hbase.io.hfile.RandomDistribution; 084import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 085import org.apache.hadoop.hbase.regionserver.BloomType; 086import 
org.apache.hadoop.hbase.regionserver.CompactingMemStore; 087import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration; 088import org.apache.hadoop.hbase.trace.SpanReceiverHost; 089import org.apache.hadoop.hbase.trace.TraceUtil; 090import org.apache.hadoop.hbase.util.ByteArrayHashKey; 091import org.apache.hadoop.hbase.util.Bytes; 092import org.apache.hadoop.hbase.util.GsonUtil; 093import org.apache.hadoop.hbase.util.Hash; 094import org.apache.hadoop.hbase.util.MurmurHash; 095import org.apache.hadoop.hbase.util.Pair; 096import org.apache.hadoop.hbase.util.YammerHistogramUtils; 097import org.apache.hadoop.io.LongWritable; 098import org.apache.hadoop.io.Text; 099import org.apache.hadoop.mapreduce.Job; 100import org.apache.hadoop.mapreduce.Mapper; 101import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; 102import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 103import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; 104import org.apache.hadoop.util.Tool; 105import org.apache.hadoop.util.ToolRunner; 106import org.apache.htrace.core.ProbabilitySampler; 107import org.apache.htrace.core.Sampler; 108import org.apache.htrace.core.TraceScope; 109import org.apache.yetus.audience.InterfaceAudience; 110import org.slf4j.Logger; 111import org.slf4j.LoggerFactory; 112 113import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 114import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 115import org.apache.hbase.thirdparty.com.google.gson.Gson; 116 117/** 118 * Script used evaluating HBase performance and scalability. Runs a HBase 119 * client that steps through one of a set of hardcoded tests or 'experiments' 120 * (e.g. a random reads test, a random writes test, etc.). Pass on the 121 * command-line which test to run and how many clients are participating in 122 * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage. 
123 * 124 * <p>This class sets up and runs the evaluation programs described in 125 * Section 7, <i>Performance Evaluation</i>, of the <a 126 * href="http://labs.google.com/papers/bigtable.html">Bigtable</a> 127 * paper, pages 8-10. 128 * 129 * <p>By default, runs as a mapreduce job where each mapper runs a single test 130 * client. Can also run as a non-mapreduce, multithreaded application by 131 * specifying {@code --nomapred}. Each client does about 1GB of data, unless 132 * specified otherwise. 133 */ 134@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 135public class PerformanceEvaluation extends Configured implements Tool { 136 static final String RANDOM_SEEK_SCAN = "randomSeekScan"; 137 static final String RANDOM_READ = "randomRead"; 138 static final String PE_COMMAND_SHORTNAME = "pe"; 139 private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName()); 140 private static final Gson GSON = GsonUtil.createGson().create(); 141 142 public static final String TABLE_NAME = "TestTable"; 143 public static final String FAMILY_NAME_BASE = "info"; 144 public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0"); 145 public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0); 146 public static final int DEFAULT_VALUE_LENGTH = 1000; 147 public static final int ROW_LENGTH = 26; 148 149 private static final int ONE_GB = 1024 * 1024 * 1000; 150 private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH; 151 // TODO : should we make this configurable 152 private static final int TAG_LENGTH = 256; 153 private static final DecimalFormat FMT = new DecimalFormat("0.##"); 154 private static final MathContext CXT = MathContext.DECIMAL64; 155 private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000); 156 private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024); 157 private static final TestOptions DEFAULT_OPTS = new TestOptions(); 158 159 private static Map<String, 
CmdDescriptor> COMMANDS = new TreeMap<>(); 160 private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); 161 162 static { 163 addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead", 164 "Run async random read test"); 165 addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite", 166 "Run async random write test"); 167 addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead", 168 "Run async sequential read test"); 169 addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite", 170 "Run async sequential write test"); 171 addCommandDescriptor(AsyncScanTest.class, "asyncScan", 172 "Run async scan test (read every row)"); 173 addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test"); 174 addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", 175 "Run getRegionLocation test"); 176 addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN, 177 "Run random seek and scan 100 test"); 178 addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", 179 "Run random seek scan with both start and stop row (max 10 rows)"); 180 addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100", 181 "Run random seek scan with both start and stop row (max 100 rows)"); 182 addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000", 183 "Run random seek scan with both start and stop row (max 1000 rows)"); 184 addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000", 185 "Run random seek scan with both start and stop row (max 10000 rows)"); 186 addCommandDescriptor(RandomWriteTest.class, "randomWrite", 187 "Run random write test"); 188 addCommandDescriptor(SequentialReadTest.class, "sequentialRead", 189 "Run sequential read test"); 190 addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", 191 "Run sequential write test"); 192 addCommandDescriptor(MetaWriteTest.class, "metaWrite", 193 "Populate meta table;used with 1 
thread; to be cleaned up by cleanMeta"); 194 addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)"); 195 addCommandDescriptor(FilteredScanTest.class, "filterScan", 196 "Run scan test using a filter to find a specific row based on it's value " + 197 "(make sure to use --rows=20)"); 198 addCommandDescriptor(IncrementTest.class, "increment", 199 "Increment on each row; clients overlap on keyspace so some concurrent operations"); 200 addCommandDescriptor(AppendTest.class, "append", 201 "Append on each row; clients overlap on keyspace so some concurrent operations"); 202 addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate", 203 "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations"); 204 addCommandDescriptor(CheckAndPutTest.class, "checkAndPut", 205 "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations"); 206 addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete", 207 "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations"); 208 addCommandDescriptor(CleanMetaTest.class, "cleanMeta", 209 "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread"); 210 } 211 212 /** 213 * Enum for map metrics. Keep it out here rather than inside in the Map 214 * inner-class so we can find associated properties. 
215 */ 216 protected static enum Counter { 217 /** elapsed time */ 218 ELAPSED_TIME, 219 /** number of rows */ 220 ROWS 221 } 222 223 protected static class RunResult implements Comparable<RunResult> { 224 public RunResult(long duration, Histogram hist) { 225 this.duration = duration; 226 this.hist = hist; 227 numbOfReplyOverThreshold = 0; 228 numOfReplyFromReplica = 0; 229 } 230 231 public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica, 232 Histogram hist) { 233 this.duration = duration; 234 this.hist = hist; 235 this.numbOfReplyOverThreshold = numbOfReplyOverThreshold; 236 this.numOfReplyFromReplica = numOfReplyFromReplica; 237 } 238 239 public final long duration; 240 public final Histogram hist; 241 public final long numbOfReplyOverThreshold; 242 public final long numOfReplyFromReplica; 243 244 @Override 245 public String toString() { 246 return Long.toString(duration); 247 } 248 249 @Override public int compareTo(RunResult o) { 250 return Long.compare(this.duration, o.duration); 251 } 252 } 253 254 /** 255 * Constructor 256 * @param conf Configuration object 257 */ 258 public PerformanceEvaluation(final Configuration conf) { 259 super(conf); 260 } 261 262 protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, 263 String name, String description) { 264 CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description); 265 COMMANDS.put(name, cmdDescriptor); 266 } 267 268 /** 269 * Implementations can have their status set. 270 */ 271 interface Status { 272 /** 273 * Sets status 274 * @param msg status message 275 * @throws IOException 276 */ 277 void setStatus(final String msg) throws IOException; 278 } 279 280 /** 281 * MapReduce job that runs a performance evaluation client in each map task. 
282 */ 283 public static class EvaluationMapTask 284 extends Mapper<LongWritable, Text, LongWritable, LongWritable> { 285 286 /** configuration parameter name that contains the command */ 287 public final static String CMD_KEY = "EvaluationMapTask.command"; 288 /** configuration parameter name that contains the PE impl */ 289 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl"; 290 291 private Class<? extends Test> cmd; 292 293 @Override 294 protected void setup(Context context) throws IOException, InterruptedException { 295 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class); 296 297 // this is required so that extensions of PE are instantiated within the 298 // map reduce task... 299 Class<? extends PerformanceEvaluation> peClass = 300 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class); 301 try { 302 peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration()); 303 } catch (Exception e) { 304 throw new IllegalStateException("Could not instantiate PE instance", e); 305 } 306 } 307 308 private <Type> Class<? 
extends Type> forName(String className, Class<Type> type) { 309 try { 310 return Class.forName(className).asSubclass(type); 311 } catch (ClassNotFoundException e) { 312 throw new IllegalStateException("Could not find class for name: " + className, e); 313 } 314 } 315 316 @Override 317 protected void map(LongWritable key, Text value, final Context context) 318 throws IOException, InterruptedException { 319 320 Status status = new Status() { 321 @Override 322 public void setStatus(String msg) { 323 context.setStatus(msg); 324 } 325 }; 326 327 TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class); 328 Configuration conf = HBaseConfiguration.create(context.getConfiguration()); 329 final Connection con = ConnectionFactory.createConnection(conf); 330 AsyncConnection asyncCon = null; 331 try { 332 asyncCon = ConnectionFactory.createAsyncConnection(conf).get(); 333 } catch (ExecutionException e) { 334 throw new IOException(e); 335 } 336 337 // Evaluation task 338 RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status); 339 // Collect how much time the thing took. Report as map output and 340 // to the ELAPSED_TIME counter. 341 context.getCounter(Counter.ELAPSED_TIME).increment(result.duration); 342 context.getCounter(Counter.ROWS).increment(opts.perClientRunRows); 343 context.write(new LongWritable(opts.startRow), new LongWritable(result.duration)); 344 context.progress(); 345 } 346 } 347 348 /* 349 * If table does not already exist, create. Also create a table when 350 * {@code opts.presplitRegions} is specified or when the existing table's 351 * region replica count doesn't match {@code opts.replicas}. 
352 */ 353 static boolean checkTable(Admin admin, TestOptions opts) throws IOException { 354 TableName tableName = TableName.valueOf(opts.tableName); 355 boolean needsDelete = false, exists = admin.tableExists(tableName); 356 boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read") 357 || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan"); 358 if (!exists && isReadCmd) { 359 throw new IllegalStateException( 360 "Must specify an existing table for read commands. Run a write command first."); 361 } 362 HTableDescriptor desc = 363 exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null; 364 byte[][] splits = getSplits(opts); 365 366 // recreate the table when user has requested presplit or when existing 367 // {RegionSplitPolicy,replica count} does not match requested, or when the 368 // number of column families does not match requested. 369 if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions) 370 || (!isReadCmd && desc != null && 371 !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy)) 372 || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas) 373 || (desc != null && desc.getColumnFamilyCount() != opts.families)) { 374 needsDelete = true; 375 // wait, why did it delete my table?!? 
376 LOG.debug(MoreObjects.toStringHelper("needsDelete") 377 .add("needsDelete", needsDelete) 378 .add("isReadCmd", isReadCmd) 379 .add("exists", exists) 380 .add("desc", desc) 381 .add("presplit", opts.presplitRegions) 382 .add("splitPolicy", opts.splitPolicy) 383 .add("replicas", opts.replicas) 384 .add("families", opts.families) 385 .toString()); 386 } 387 388 // remove an existing table 389 if (needsDelete) { 390 if (admin.isTableEnabled(tableName)) { 391 admin.disableTable(tableName); 392 } 393 admin.deleteTable(tableName); 394 } 395 396 // table creation is necessary 397 if (!exists || needsDelete) { 398 desc = getTableDescriptor(opts); 399 if (splits != null) { 400 if (LOG.isDebugEnabled()) { 401 for (int i = 0; i < splits.length; i++) { 402 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); 403 } 404 } 405 } 406 admin.createTable(desc, splits); 407 LOG.info("Table " + desc + " created"); 408 } 409 return admin.tableExists(tableName); 410 } 411 412 /** 413 * Create an HTableDescriptor from provided TestOptions. 
414 */ 415 protected static HTableDescriptor getTableDescriptor(TestOptions opts) { 416 HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(opts.tableName)); 417 for (int family = 0; family < opts.families; family++) { 418 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 419 HColumnDescriptor familyDesc = new HColumnDescriptor(familyName); 420 familyDesc.setDataBlockEncoding(opts.blockEncoding); 421 familyDesc.setCompressionType(opts.compression); 422 familyDesc.setBloomFilterType(opts.bloomType); 423 familyDesc.setBlocksize(opts.blockSize); 424 if (opts.inMemoryCF) { 425 familyDesc.setInMemory(true); 426 } 427 familyDesc.setInMemoryCompaction(opts.inMemoryCompaction); 428 tableDesc.addFamily(familyDesc); 429 } 430 if (opts.replicas != DEFAULT_OPTS.replicas) { 431 tableDesc.setRegionReplication(opts.replicas); 432 } 433 if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) { 434 tableDesc.setRegionSplitPolicyClassName(opts.splitPolicy); 435 } 436 return tableDesc; 437 } 438 439 /** 440 * generates splits based on total number of rows and specified split regions 441 */ 442 protected static byte[][] getSplits(TestOptions opts) { 443 if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) 444 return null; 445 446 int numSplitPoints = opts.presplitRegions - 1; 447 byte[][] splits = new byte[numSplitPoints][]; 448 int jump = opts.totalRows / opts.presplitRegions; 449 for (int i = 0; i < numSplitPoints; i++) { 450 int rowkey = jump * (1 + i); 451 splits[i] = format(rowkey); 452 } 453 return splits; 454 } 455 456 static void setupConnectionCount(final TestOptions opts) { 457 if (opts.oneCon) { 458 opts.connCount = 1; 459 } else { 460 if (opts.connCount == -1) { 461 // set to thread number if connCount is not set 462 opts.connCount = opts.numClientThreads; 463 } 464 } 465 } 466 467 /* 468 * Run all clients in this vm each to its own thread. 
469 */ 470 static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf) 471 throws IOException, InterruptedException, ExecutionException { 472 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName); 473 assert cmd != null; 474 @SuppressWarnings("unchecked") 475 Future<RunResult>[] threads = new Future[opts.numClientThreads]; 476 RunResult[] results = new RunResult[opts.numClientThreads]; 477 ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, 478 new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build()); 479 setupConnectionCount(opts); 480 final Connection[] cons = new Connection[opts.connCount]; 481 final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount]; 482 for (int i = 0; i < opts.connCount; i++) { 483 cons[i] = ConnectionFactory.createConnection(conf); 484 asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get(); 485 } 486 LOG.info("Created " + opts.connCount + " connections for " + 487 opts.numClientThreads + " threads"); 488 for (int i = 0; i < threads.length; i++) { 489 final int index = i; 490 threads[i] = pool.submit(new Callable<RunResult>() { 491 @Override 492 public RunResult call() throws Exception { 493 TestOptions threadOpts = new TestOptions(opts); 494 final Connection con = cons[index % cons.length]; 495 final AsyncConnection asyncCon = asyncCons[index % asyncCons.length]; 496 if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows; 497 RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() { 498 @Override 499 public void setStatus(final String msg) throws IOException { 500 LOG.info(msg); 501 } 502 }); 503 LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration + 504 "ms over " + threadOpts.perClientRunRows + " rows"); 505 if (opts.latencyThreshold > 0) { 506 LOG.info("Number of replies over latency threshold " + opts.latencyThreshold + 507 "(ms) is " + 
run.numbOfReplyOverThreshold); 508 } 509 return run; 510 } 511 }); 512 } 513 pool.shutdown(); 514 515 for (int i = 0; i < threads.length; i++) { 516 try { 517 results[i] = threads[i].get(); 518 } catch (ExecutionException e) { 519 throw new IOException(e.getCause()); 520 } 521 } 522 final String test = cmd.getSimpleName(); 523 LOG.info("[" + test + "] Summary of timings (ms): " 524 + Arrays.toString(results)); 525 Arrays.sort(results); 526 long total = 0; 527 float avgLatency = 0 ; 528 float avgTPS = 0; 529 long replicaWins = 0; 530 for (RunResult result : results) { 531 total += result.duration; 532 avgLatency += result.hist.getSnapshot().getMean(); 533 avgTPS += opts.perClientRunRows * 1.0f / result.duration; 534 replicaWins += result.numOfReplyFromReplica; 535 } 536 avgTPS *= 1000; // ms to second 537 avgLatency = avgLatency / results.length; 538 LOG.info("[" + test + " duration ]" 539 + "\tMin: " + results[0] + "ms" 540 + "\tMax: " + results[results.length - 1] + "ms" 541 + "\tAvg: " + (total / results.length) + "ms"); 542 LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency)); 543 LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second"); 544 if (opts.replicas > 1) { 545 LOG.info("[results from replica regions] " + replicaWins); 546 } 547 548 for (int i = 0; i < opts.connCount; i++) { 549 cons[i].close(); 550 asyncCons[i].close(); 551 } 552 553 return results; 554 } 555 556 /* 557 * Run a mapreduce job. Run as many maps as asked-for clients. 558 * Before we start up the job, write out an input file with instruction 559 * per client regards which row they are to start on. 560 * @param cmd Command to run. 561 * @throws IOException 562 */ 563 static Job doMapReduce(TestOptions opts, final Configuration conf) 564 throws IOException, InterruptedException, ClassNotFoundException { 565 final Class<? 
extends TestBase> cmd = determineCommandClass(opts.cmdName); 566 assert cmd != null; 567 Path inputDir = writeInputFile(conf, opts); 568 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); 569 conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName()); 570 Job job = Job.getInstance(conf); 571 job.setJarByClass(PerformanceEvaluation.class); 572 job.setJobName("HBase Performance Evaluation - " + opts.cmdName); 573 574 job.setInputFormatClass(NLineInputFormat.class); 575 NLineInputFormat.setInputPaths(job, inputDir); 576 // this is default, but be explicit about it just in case. 577 NLineInputFormat.setNumLinesPerSplit(job, 1); 578 579 job.setOutputKeyClass(LongWritable.class); 580 job.setOutputValueClass(LongWritable.class); 581 582 job.setMapperClass(EvaluationMapTask.class); 583 job.setReducerClass(LongSumReducer.class); 584 585 job.setNumReduceTasks(1); 586 587 job.setOutputFormatClass(TextOutputFormat.class); 588 TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs")); 589 590 TableMapReduceUtil.addDependencyJars(job); 591 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), 592 Histogram.class, // yammer metrics 593 Gson.class, // gson 594 FilterAllFilter.class // hbase-server tests jar 595 ); 596 597 TableMapReduceUtil.initCredentials(job); 598 599 job.waitForCompletion(true); 600 return job; 601 } 602 603 /** 604 * Each client has one mapper to do the work, and client do the resulting count in a map task. 605 */ 606 607 static String JOB_INPUT_FILENAME = "input.txt"; 608 609 /* 610 * Write input file of offsets-per-client for the mapreduce job. 
611 * @param c Configuration 612 * @return Directory that contains file written whose name is JOB_INPUT_FILENAME 613 * @throws IOException 614 */ 615 static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException { 616 return writeInputFile(c, opts, new Path(".")); 617 } 618 619 static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir) 620 throws IOException { 621 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); 622 Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date())); 623 Path inputDir = new Path(jobdir, "inputs"); 624 625 FileSystem fs = FileSystem.get(c); 626 fs.mkdirs(inputDir); 627 628 Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME); 629 PrintStream out = new PrintStream(fs.create(inputFile)); 630 // Make input random. 631 Map<Integer, String> m = new TreeMap<>(); 632 Hash h = MurmurHash.getInstance(); 633 int perClientRows = (opts.totalRows / opts.numClientThreads); 634 try { 635 for (int j = 0; j < opts.numClientThreads; j++) { 636 TestOptions next = new TestOptions(opts); 637 next.startRow = j * perClientRows; 638 next.perClientRunRows = perClientRows; 639 String s = GSON.toJson(next); 640 LOG.info("Client=" + j + ", input=" + s); 641 byte[] b = Bytes.toBytes(s); 642 int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1); 643 m.put(hash, s); 644 } 645 for (Map.Entry<Integer, String> e: m.entrySet()) { 646 out.println(e.getValue()); 647 } 648 } finally { 649 out.close(); 650 } 651 return inputDir; 652 } 653 654 /** 655 * Describes a command. 656 */ 657 static class CmdDescriptor { 658 private Class<? extends TestBase> cmdClass; 659 private String name; 660 private String description; 661 662 CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) { 663 this.cmdClass = cmdClass; 664 this.name = name; 665 this.description = description; 666 } 667 668 public Class<? 
extends TestBase> getCmdClass() { 669 return cmdClass; 670 } 671 672 public String getName() { 673 return name; 674 } 675 676 public String getDescription() { 677 return description; 678 } 679 } 680 681 /** 682 * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. 683 * This makes tracking all these arguments a little easier. 684 * NOTE: ADDING AN OPTION, you need to add a data member, a getter/setter (to make JSON 685 * serialization of this TestOptions class behave), and you need to add to the clone constructor 686 * below copying your new option from the 'that' to the 'this'. Look for 'clone' below. 687 */ 688 static class TestOptions { 689 String cmdName = null; 690 boolean nomapred = false; 691 boolean filterAll = false; 692 int startRow = 0; 693 float size = 1.0f; 694 int perClientRunRows = DEFAULT_ROWS_PER_GB; 695 int numClientThreads = 1; 696 int totalRows = DEFAULT_ROWS_PER_GB; 697 int measureAfter = 0; 698 float sampleRate = 1.0f; 699 double traceRate = 0.0; 700 String tableName = TABLE_NAME; 701 boolean flushCommits = true; 702 boolean writeToWAL = true; 703 boolean autoFlush = false; 704 boolean oneCon = false; 705 int connCount = -1; //wil decide the actual num later 706 boolean useTags = false; 707 int noOfTags = 1; 708 boolean reportLatency = false; 709 int multiGet = 0; 710 int multiPut = 0; 711 int randomSleep = 0; 712 boolean inMemoryCF = false; 713 int presplitRegions = 0; 714 int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION; 715 String splitPolicy = null; 716 Compression.Algorithm compression = Compression.Algorithm.NONE; 717 BloomType bloomType = BloomType.ROW; 718 int blockSize = HConstants.DEFAULT_BLOCKSIZE; 719 DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; 720 boolean valueRandom = false; 721 boolean valueZipf = false; 722 int valueSize = DEFAULT_VALUE_LENGTH; 723 int period = (this.perClientRunRows / 10) == 0? 
perClientRunRows: perClientRunRows / 10; 724 int cycles = 1; 725 int columns = 1; 726 int families = 1; 727 int caching = 30; 728 int latencyThreshold = 0; // in millsecond 729 boolean addColumns = true; 730 MemoryCompactionPolicy inMemoryCompaction = 731 MemoryCompactionPolicy.valueOf( 732 CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT); 733 boolean asyncPrefetch = false; 734 boolean cacheBlocks = true; 735 Scan.ReadType scanReadType = Scan.ReadType.DEFAULT; 736 long bufferSize = 2l * 1024l * 1024l; 737 738 public TestOptions() {} 739 740 /** 741 * Clone constructor. 742 * @param that Object to copy from. 743 */ 744 public TestOptions(TestOptions that) { 745 this.cmdName = that.cmdName; 746 this.cycles = that.cycles; 747 this.nomapred = that.nomapred; 748 this.startRow = that.startRow; 749 this.size = that.size; 750 this.perClientRunRows = that.perClientRunRows; 751 this.numClientThreads = that.numClientThreads; 752 this.totalRows = that.totalRows; 753 this.sampleRate = that.sampleRate; 754 this.traceRate = that.traceRate; 755 this.tableName = that.tableName; 756 this.flushCommits = that.flushCommits; 757 this.writeToWAL = that.writeToWAL; 758 this.autoFlush = that.autoFlush; 759 this.oneCon = that.oneCon; 760 this.connCount = that.connCount; 761 this.useTags = that.useTags; 762 this.noOfTags = that.noOfTags; 763 this.reportLatency = that.reportLatency; 764 this.latencyThreshold = that.latencyThreshold; 765 this.multiGet = that.multiGet; 766 this.multiPut = that.multiPut; 767 this.inMemoryCF = that.inMemoryCF; 768 this.presplitRegions = that.presplitRegions; 769 this.replicas = that.replicas; 770 this.splitPolicy = that.splitPolicy; 771 this.compression = that.compression; 772 this.blockEncoding = that.blockEncoding; 773 this.filterAll = that.filterAll; 774 this.bloomType = that.bloomType; 775 this.blockSize = that.blockSize; 776 this.valueRandom = that.valueRandom; 777 this.valueZipf = that.valueZipf; 778 this.valueSize = that.valueSize; 779 this.period = 
that.period; 780 this.randomSleep = that.randomSleep; 781 this.measureAfter = that.measureAfter; 782 this.addColumns = that.addColumns; 783 this.columns = that.columns; 784 this.families = that.families; 785 this.caching = that.caching; 786 this.inMemoryCompaction = that.inMemoryCompaction; 787 this.asyncPrefetch = that.asyncPrefetch; 788 this.cacheBlocks = that.cacheBlocks; 789 this.scanReadType = that.scanReadType; 790 this.bufferSize = that.bufferSize; 791 } 792 793 public int getCaching() { 794 return this.caching; 795 } 796 797 public void setCaching(final int caching) { 798 this.caching = caching; 799 } 800 801 public int getColumns() { 802 return this.columns; 803 } 804 805 public void setColumns(final int columns) { 806 this.columns = columns; 807 } 808 809 public int getFamilies() { 810 return this.families; 811 } 812 813 public void setFamilies(final int families) { 814 this.families = families; 815 } 816 817 public int getCycles() { 818 return this.cycles; 819 } 820 821 public void setCycles(final int cycles) { 822 this.cycles = cycles; 823 } 824 825 public boolean isValueZipf() { 826 return valueZipf; 827 } 828 829 public void setValueZipf(boolean valueZipf) { 830 this.valueZipf = valueZipf; 831 } 832 833 public String getCmdName() { 834 return cmdName; 835 } 836 837 public void setCmdName(String cmdName) { 838 this.cmdName = cmdName; 839 } 840 841 public int getRandomSleep() { 842 return randomSleep; 843 } 844 845 public void setRandomSleep(int randomSleep) { 846 this.randomSleep = randomSleep; 847 } 848 849 public int getReplicas() { 850 return replicas; 851 } 852 853 public void setReplicas(int replicas) { 854 this.replicas = replicas; 855 } 856 857 public String getSplitPolicy() { 858 return splitPolicy; 859 } 860 861 public void setSplitPolicy(String splitPolicy) { 862 this.splitPolicy = splitPolicy; 863 } 864 865 public void setNomapred(boolean nomapred) { 866 this.nomapred = nomapred; 867 } 868 869 public void setFilterAll(boolean filterAll) { 
870 this.filterAll = filterAll; 871 } 872 873 public void setStartRow(int startRow) { 874 this.startRow = startRow; 875 } 876 877 public void setSize(float size) { 878 this.size = size; 879 } 880 881 public void setPerClientRunRows(int perClientRunRows) { 882 this.perClientRunRows = perClientRunRows; 883 } 884 885 public void setNumClientThreads(int numClientThreads) { 886 this.numClientThreads = numClientThreads; 887 } 888 889 public void setTotalRows(int totalRows) { 890 this.totalRows = totalRows; 891 } 892 893 public void setSampleRate(float sampleRate) { 894 this.sampleRate = sampleRate; 895 } 896 897 public void setTraceRate(double traceRate) { 898 this.traceRate = traceRate; 899 } 900 901 public void setTableName(String tableName) { 902 this.tableName = tableName; 903 } 904 905 public void setFlushCommits(boolean flushCommits) { 906 this.flushCommits = flushCommits; 907 } 908 909 public void setWriteToWAL(boolean writeToWAL) { 910 this.writeToWAL = writeToWAL; 911 } 912 913 public void setAutoFlush(boolean autoFlush) { 914 this.autoFlush = autoFlush; 915 } 916 917 public void setOneCon(boolean oneCon) { 918 this.oneCon = oneCon; 919 } 920 921 public int getConnCount() { 922 return connCount; 923 } 924 925 public void setConnCount(int connCount) { 926 this.connCount = connCount; 927 } 928 929 public void setUseTags(boolean useTags) { 930 this.useTags = useTags; 931 } 932 933 public void setNoOfTags(int noOfTags) { 934 this.noOfTags = noOfTags; 935 } 936 937 public void setReportLatency(boolean reportLatency) { 938 this.reportLatency = reportLatency; 939 } 940 941 public void setMultiGet(int multiGet) { 942 this.multiGet = multiGet; 943 } 944 945 public void setMultiPut(int multiPut) { 946 this.multiPut = multiPut; 947 } 948 949 public void setInMemoryCF(boolean inMemoryCF) { 950 this.inMemoryCF = inMemoryCF; 951 } 952 953 public void setPresplitRegions(int presplitRegions) { 954 this.presplitRegions = presplitRegions; 955 } 956 957 public void 
setCompression(Compression.Algorithm compression) { 958 this.compression = compression; 959 } 960 961 public void setBloomType(BloomType bloomType) { 962 this.bloomType = bloomType; 963 } 964 965 public void setBlockSize(int blockSize) { 966 this.blockSize = blockSize; 967 } 968 969 public void setBlockEncoding(DataBlockEncoding blockEncoding) { 970 this.blockEncoding = blockEncoding; 971 } 972 973 public void setValueRandom(boolean valueRandom) { 974 this.valueRandom = valueRandom; 975 } 976 977 public void setValueSize(int valueSize) { 978 this.valueSize = valueSize; 979 } 980 981 public void setBufferSize(long bufferSize) { 982 this.bufferSize = bufferSize; 983 } 984 985 public void setPeriod(int period) { 986 this.period = period; 987 } 988 989 public boolean isNomapred() { 990 return nomapred; 991 } 992 993 public boolean isFilterAll() { 994 return filterAll; 995 } 996 997 public int getStartRow() { 998 return startRow; 999 } 1000 1001 public float getSize() { 1002 return size; 1003 } 1004 1005 public int getPerClientRunRows() { 1006 return perClientRunRows; 1007 } 1008 1009 public int getNumClientThreads() { 1010 return numClientThreads; 1011 } 1012 1013 public int getTotalRows() { 1014 return totalRows; 1015 } 1016 1017 public float getSampleRate() { 1018 return sampleRate; 1019 } 1020 1021 public double getTraceRate() { 1022 return traceRate; 1023 } 1024 1025 public String getTableName() { 1026 return tableName; 1027 } 1028 1029 public boolean isFlushCommits() { 1030 return flushCommits; 1031 } 1032 1033 public boolean isWriteToWAL() { 1034 return writeToWAL; 1035 } 1036 1037 public boolean isAutoFlush() { 1038 return autoFlush; 1039 } 1040 1041 public boolean isUseTags() { 1042 return useTags; 1043 } 1044 1045 public int getNoOfTags() { 1046 return noOfTags; 1047 } 1048 1049 public boolean isReportLatency() { 1050 return reportLatency; 1051 } 1052 1053 public int getMultiGet() { 1054 return multiGet; 1055 } 1056 1057 public int getMultiPut() { 1058 return 
multiPut; 1059 } 1060 1061 public boolean isInMemoryCF() { 1062 return inMemoryCF; 1063 } 1064 1065 public int getPresplitRegions() { 1066 return presplitRegions; 1067 } 1068 1069 public Compression.Algorithm getCompression() { 1070 return compression; 1071 } 1072 1073 public DataBlockEncoding getBlockEncoding() { 1074 return blockEncoding; 1075 } 1076 1077 public boolean isValueRandom() { 1078 return valueRandom; 1079 } 1080 1081 public int getValueSize() { 1082 return valueSize; 1083 } 1084 1085 public int getPeriod() { 1086 return period; 1087 } 1088 1089 public BloomType getBloomType() { 1090 return bloomType; 1091 } 1092 1093 public int getBlockSize() { 1094 return blockSize; 1095 } 1096 1097 public boolean isOneCon() { 1098 return oneCon; 1099 } 1100 1101 public int getMeasureAfter() { 1102 return measureAfter; 1103 } 1104 1105 public void setMeasureAfter(int measureAfter) { 1106 this.measureAfter = measureAfter; 1107 } 1108 1109 public boolean getAddColumns() { 1110 return addColumns; 1111 } 1112 1113 public void setAddColumns(boolean addColumns) { 1114 this.addColumns = addColumns; 1115 } 1116 1117 public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) { 1118 this.inMemoryCompaction = inMemoryCompaction; 1119 } 1120 1121 public MemoryCompactionPolicy getInMemoryCompaction() { 1122 return this.inMemoryCompaction; 1123 } 1124 1125 public long getBufferSize() { 1126 return this.bufferSize; 1127 } 1128 } 1129 1130 /* 1131 * A test. 1132 * Subclass to particularize what happens per row. 1133 */ 1134 static abstract class TestBase { 1135 // Below is make it so when Tests are all running in the one 1136 // jvm, that they each have a differently seeded Random. 
private static final Random randomSeed = new Random(System.currentTimeMillis());

    private static long nextRandomSeed() {
      return randomSeed.nextLong();
    }

    /** Only row indexes that are multiples of everyN are exercised (see testTimed). */
    private final int everyN;

    protected final Random rand = new Random(nextRandomSeed());
    protected final Configuration conf;
    protected final TestOptions opts;

    private final Status status;
    private final Sampler traceSampler;
    /** Null when this test was constructed without a Configuration. */
    private final SpanReceiverHost receiverHost;

    private String testName;
    private Histogram latencyHistogram;
    private Histogram replicaLatencyHistogram;
    private Histogram valueSizeHistogram;
    private Histogram rpcCallsHistogram;
    private Histogram remoteRpcCallsHistogram;
    private Histogram millisBetweenNextHistogram;
    private Histogram regionsScannedHistogram;
    private Histogram bytesInResultsHistogram;
    private Histogram bytesInRemoteResultsHistogram;
    private RandomDistribution.Zipf zipf;
    private long numOfReplyOverLatencyThreshold = 0;
    private long numOfReplyFromReplica = 0;

    /**
     * Note that all subclasses of this class must provide a public constructor
     * that has the exact same list of arguments.
     *
     * @param conf configuration used for tracing setup; may be null, in which case no
     *   span receiver host is obtained
     * @param options benchmark options
     * @param status sink for progress and final statistics
     */
    TestBase(final Configuration conf, final TestOptions options, final Status status) {
      this.conf = conf;
      this.receiverHost = this.conf == null ? null : SpanReceiverHost.getInstance(conf);
      this.opts = options;
      this.status = status;
      this.testName = this.getClass().getSimpleName();
      if (options.traceRate >= 1.0) {
        this.traceSampler = Sampler.ALWAYS;
      } else if (options.traceRate > 0.0) {
        conf.setDouble("hbase.sampler.fraction", options.traceRate);
        this.traceSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf));
      } else {
        this.traceSampler = Sampler.NEVER;
      }
      // testTimed() evaluates i % everyN, so it must never be 0. Degenerate inputs
      // (totalRows == 0 gives NaN, sampleRate > 1 gives < 1) would otherwise cast to 0.
      everyN = Math.max(1, (int) (opts.totalRows / (opts.totalRows * opts.sampleRate)));
      if (options.isValueZipf()) {
        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
      }
      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
    }

    /**
     * @param r source of randomness for the random-size case
     * @return the value length to write: uniformly random below valueSize, Zipf-distributed,
     *   or the fixed valueSize, depending on the options
     */
    int getValueLength(final Random r) {
      if (this.opts.isValueRandom()) {
        return r.nextInt(opts.valueSize);
      } else if (this.opts.isValueZipf()) {
        return Math.abs(this.zipf.nextInt());
      } else {
        return opts.valueSize;
      }
    }

    /** Records value sizes for a batch of results, without replica-latency accounting. */
    void updateValueSize(final Result [] rs) throws IOException {
      updateValueSize(rs, 0);
    }

    /**
     * Records value sizes for every result in {@code rs}.
     * @param latency request latency in nanoseconds, or 0 when the caller did not measure it
     */
    void updateValueSize(final Result [] rs, final long latency) throws IOException {
      if (rs == null) return;
      for (Result r : rs) {
        updateValueSize(r, latency);
      }
    }

    /** Records the value size of a single result, without replica-latency accounting. */
    void updateValueSize(final Result r) throws IOException {
      updateValueSize(r, 0);
    }

    /**
     * Records the total value size of {@code r} (when random value sizes are enabled) and, if
     * a latency was supplied and the result is stale, counts it as a replica reply.
     * @param latency request latency in nanoseconds, or 0 when the caller did not measure it
     */
    void updateValueSize(final Result r, final long latency) throws IOException {
      if (r == null) return;
      // A zero latency only means "not measured"; it must not suppress value-size recording,
      // which previously made every single-argument overload a no-op.
      if (latency != 0 && r.isStale() && replicaLatencyHistogram != null) {
        replicaLatencyHistogram.update(latency / 1000); // ns -> us, matching latencyHistogram
        numOfReplyFromReplica++;
      }
      if (!isRandomValueSize()) return;

      int size = 0;
      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
        size += scanner.current().getValueLength();
      }
      updateValueSize(size);
    }

    /** Records one value size; a no-op unless random value sizes are enabled. */
    void
updateValueSize(final int valueSize) { 1231 if (!isRandomValueSize()) return; 1232 this.valueSizeHistogram.update(valueSize); 1233 } 1234 1235 void updateScanMetrics(final ScanMetrics metrics) { 1236 if (metrics == null) return; 1237 Map<String,Long> metricsMap = metrics.getMetricsMap(); 1238 Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); 1239 if (rpcCalls != null) { 1240 this.rpcCallsHistogram.update(rpcCalls.longValue()); 1241 } 1242 Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME); 1243 if (remoteRpcCalls != null) { 1244 this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue()); 1245 } 1246 Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME); 1247 if (millisBetweenNext != null) { 1248 this.millisBetweenNextHistogram.update(millisBetweenNext.longValue()); 1249 } 1250 Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); 1251 if (regionsScanned != null) { 1252 this.regionsScannedHistogram.update(regionsScanned.longValue()); 1253 } 1254 Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME); 1255 if (bytesInResults != null && bytesInResults.longValue() > 0) { 1256 this.bytesInResultsHistogram.update(bytesInResults.longValue()); 1257 } 1258 Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME); 1259 if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) { 1260 this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue()); 1261 } 1262 } 1263 1264 String generateStatus(final int sr, final int i, final int lr) { 1265 return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() + 1266 (!isRandomValueSize()? 
"": ", value size " + getShortValueSizeReport()); 1267 } 1268 1269 boolean isRandomValueSize() { 1270 return opts.valueRandom; 1271 } 1272 1273 protected int getReportingPeriod() { 1274 return opts.period; 1275 } 1276 1277 /** 1278 * Populated by testTakedown. Only implemented by RandomReadTest at the moment. 1279 */ 1280 public Histogram getLatencyHistogram() { 1281 return latencyHistogram; 1282 } 1283 1284 void testSetup() throws IOException { 1285 // test metrics 1286 latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1287 // If it is a replica test, set up histogram for replica. 1288 if (opts.replicas > 1) { 1289 replicaLatencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1290 } 1291 valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1292 // scan metrics 1293 rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1294 remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1295 millisBetweenNextHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1296 regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1297 bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1298 bytesInRemoteResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1299 1300 onStartup(); 1301 } 1302 1303 abstract void onStartup() throws IOException; 1304 1305 void testTakedown() throws IOException { 1306 onTakedown(); 1307 // Print all stats for this thread continuously. 1308 // Synchronize on Test.class so different threads don't intermingle the 1309 // output. We can't use 'this' here because each thread has its own instance of Test class. 
1310 synchronized (Test.class) { 1311 status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName()); 1312 status.setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport( 1313 latencyHistogram)); 1314 if (opts.replicas > 1) { 1315 status.setStatus("Latency (us) from Replica Regions: " + 1316 YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram)); 1317 } 1318 status.setStatus("Num measures (latency) : " + latencyHistogram.getCount()); 1319 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram)); 1320 if (valueSizeHistogram.getCount() > 0) { 1321 status.setStatus("ValueSize (bytes) : " 1322 + YammerHistogramUtils.getHistogramReport(valueSizeHistogram)); 1323 status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount()); 1324 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram)); 1325 } else { 1326 status.setStatus("No valueSize statistics available"); 1327 } 1328 if (rpcCallsHistogram.getCount() > 0) { 1329 status.setStatus("rpcCalls (count): " + 1330 YammerHistogramUtils.getHistogramReport(rpcCallsHistogram)); 1331 } 1332 if (remoteRpcCallsHistogram.getCount() > 0) { 1333 status.setStatus("remoteRpcCalls (count): " + 1334 YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram)); 1335 } 1336 if (millisBetweenNextHistogram.getCount() > 0) { 1337 status.setStatus("millisBetweenNext (latency): " + 1338 YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram)); 1339 } 1340 if (regionsScannedHistogram.getCount() > 0) { 1341 status.setStatus("regionsScanned (count): " + 1342 YammerHistogramUtils.getHistogramReport(regionsScannedHistogram)); 1343 } 1344 if (bytesInResultsHistogram.getCount() > 0) { 1345 status.setStatus("bytesInResults (size): " + 1346 YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram)); 1347 } 1348 if (bytesInRemoteResultsHistogram.getCount() > 0) { 1349 status.setStatus("bytesInRemoteResults (size): 
" + 1350 YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram)); 1351 } 1352 } 1353 receiverHost.closeReceivers(); 1354 } 1355 1356 abstract void onTakedown() throws IOException; 1357 1358 1359 /* 1360 * Run test 1361 * @return Elapsed time. 1362 * @throws IOException 1363 */ 1364 long test() throws IOException, InterruptedException { 1365 testSetup(); 1366 LOG.info("Timed test starting in thread " + Thread.currentThread().getName()); 1367 final long startTime = System.nanoTime(); 1368 try { 1369 testTimed(); 1370 } finally { 1371 testTakedown(); 1372 } 1373 return (System.nanoTime() - startTime) / 1000000; 1374 } 1375 1376 int getStartRow() { 1377 return opts.startRow; 1378 } 1379 1380 int getLastRow() { 1381 return getStartRow() + opts.perClientRunRows; 1382 } 1383 1384 /** 1385 * Provides an extension point for tests that don't want a per row invocation. 1386 */ 1387 void testTimed() throws IOException, InterruptedException { 1388 int startRow = getStartRow(); 1389 int lastRow = getLastRow(); 1390 TraceUtil.addSampler(traceSampler); 1391 // Report on completion of 1/10th of total. 1392 for (int ii = 0; ii < opts.cycles; ii++) { 1393 if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles); 1394 for (int i = startRow; i < lastRow; i++) { 1395 if (i % everyN != 0) continue; 1396 long startTime = System.nanoTime(); 1397 boolean requestSent = false; 1398 try (TraceScope scope = TraceUtil.createTrace("test row");){ 1399 requestSent = testRow(i, startTime); 1400 } 1401 if ( (i - startRow) > opts.measureAfter) { 1402 // If multiget or multiput is enabled, say set to 10, testRow() returns immediately 1403 // first 9 times and sends the actual get request in the 10th iteration. 1404 // We should only set latency when actual request is sent because otherwise 1405 // it turns out to be 0. 
1406 if (requestSent) { 1407 long latency = (System.nanoTime() - startTime) / 1000; 1408 latencyHistogram.update(latency); 1409 if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) { 1410 numOfReplyOverLatencyThreshold ++; 1411 } 1412 } 1413 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { 1414 status.setStatus(generateStatus(startRow, i, lastRow)); 1415 } 1416 } 1417 } 1418 } 1419 } 1420 1421 /** 1422 * @return Subset of the histograms' calculation. 1423 */ 1424 public String getShortLatencyReport() { 1425 return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram); 1426 } 1427 1428 /** 1429 * @return Subset of the histograms' calculation. 1430 */ 1431 public String getShortValueSizeReport() { 1432 return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram); 1433 } 1434 1435 1436 /** 1437 * Test for individual row. 1438 * @param i Row index. 1439 * @return true if the row was sent to server and need to record metrics. 1440 * False if not, multiGet and multiPut e.g., the rows are sent 1441 * to server only if enough gets/puts are gathered. 1442 */ 1443 abstract boolean testRow(final int i, final long startTime) throws IOException, InterruptedException; 1444 } 1445 1446 static abstract class Test extends TestBase { 1447 protected Connection connection; 1448 1449 Test(final Connection con, final TestOptions options, final Status status) { 1450 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); 1451 this.connection = con; 1452 } 1453 } 1454 1455 static abstract class AsyncTest extends TestBase { 1456 protected AsyncConnection connection; 1457 1458 AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) { 1459 super(con == null ? 
HBaseConfiguration.create() : con.getConfiguration(), options, status); 1460 this.connection = con; 1461 } 1462 } 1463 1464 static abstract class TableTest extends Test { 1465 protected Table table; 1466 1467 TableTest(Connection con, TestOptions options, Status status) { 1468 super(con, options, status); 1469 } 1470 1471 @Override 1472 void onStartup() throws IOException { 1473 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1474 } 1475 1476 @Override 1477 void onTakedown() throws IOException { 1478 table.close(); 1479 } 1480 } 1481 1482 /* 1483 Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest 1484 */ 1485 static abstract class MetaTest extends TableTest { 1486 protected int keyLength; 1487 1488 MetaTest(Connection con, TestOptions options, Status status) { 1489 super(con, options, status); 1490 keyLength = Integer.toString(opts.perClientRunRows).length(); 1491 } 1492 1493 @Override 1494 void onTakedown() throws IOException { 1495 // No clean up 1496 } 1497 1498 /* 1499 Generates Lexicographically ascending strings 1500 */ 1501 protected byte[] getSplitKey(final int i) { 1502 return Bytes.toBytes(String.format("%0" + keyLength + "d", i)); 1503 } 1504 1505 } 1506 1507 static abstract class AsyncTableTest extends AsyncTest { 1508 protected AsyncTable<?> table; 1509 1510 AsyncTableTest(AsyncConnection con, TestOptions options, Status status) { 1511 super(con, options, status); 1512 } 1513 1514 @Override 1515 void onStartup() throws IOException { 1516 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1517 } 1518 1519 @Override 1520 void onTakedown() throws IOException { 1521 } 1522 } 1523 1524 static class AsyncRandomReadTest extends AsyncTableTest { 1525 private final Consistency consistency; 1526 private ArrayList<Get> gets; 1527 private Random rd = new Random(); 1528 1529 AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) { 1530 super(con, options, status); 1531 
consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1532 if (opts.multiGet > 0) { 1533 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1534 this.gets = new ArrayList<>(opts.multiGet); 1535 } 1536 } 1537 1538 @Override 1539 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1540 if (opts.randomSleep > 0) { 1541 Thread.sleep(rd.nextInt(opts.randomSleep)); 1542 } 1543 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1544 for (int family = 0; family < opts.families; family++) { 1545 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1546 if (opts.addColumns) { 1547 for (int column = 0; column < opts.columns; column++) { 1548 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1549 get.addColumn(familyName, qualifier); 1550 } 1551 } else { 1552 get.addFamily(familyName); 1553 } 1554 } 1555 if (opts.filterAll) { 1556 get.setFilter(new FilterAllFilter()); 1557 } 1558 get.setConsistency(consistency); 1559 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1560 try { 1561 if (opts.multiGet > 0) { 1562 this.gets.add(get); 1563 if (this.gets.size() == opts.multiGet) { 1564 Result[] rs = 1565 this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new); 1566 updateValueSize(rs); 1567 this.gets.clear(); 1568 } else { 1569 return false; 1570 } 1571 } else { 1572 updateValueSize(this.table.get(get).get()); 1573 } 1574 } catch (ExecutionException e) { 1575 throw new IOException(e); 1576 } 1577 return true; 1578 } 1579 1580 public static RuntimeException runtime(Throwable e) { 1581 if (e instanceof RuntimeException) { 1582 return (RuntimeException) e; 1583 } 1584 return new RuntimeException(e); 1585 } 1586 1587 public static <V> V propagate(Callable<V> callable) { 1588 try { 1589 return callable.call(); 1590 } catch (Exception e) { 1591 throw runtime(e); 1592 } 1593 } 1594 1595 @Override 1596 protected 
int getReportingPeriod() { 1597 int period = opts.perClientRunRows / 10; 1598 return period == 0 ? opts.perClientRunRows : period; 1599 } 1600 1601 @Override 1602 protected void testTakedown() throws IOException { 1603 if (this.gets != null && this.gets.size() > 0) { 1604 this.table.get(gets); 1605 this.gets.clear(); 1606 } 1607 super.testTakedown(); 1608 } 1609 } 1610 1611 static class AsyncRandomWriteTest extends AsyncSequentialWriteTest { 1612 1613 AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) { 1614 super(con, options, status); 1615 } 1616 1617 @Override 1618 protected byte[] generateRow(final int i) { 1619 return getRandomRow(this.rand, opts.totalRows); 1620 } 1621 } 1622 1623 static class AsyncScanTest extends AsyncTableTest { 1624 private ResultScanner testScanner; 1625 private AsyncTable<?> asyncTable; 1626 1627 AsyncScanTest(AsyncConnection con, TestOptions options, Status status) { 1628 super(con, options, status); 1629 } 1630 1631 @Override 1632 void onStartup() throws IOException { 1633 this.asyncTable = 1634 connection.getTable(TableName.valueOf(opts.tableName), 1635 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); 1636 } 1637 1638 @Override 1639 void testTakedown() throws IOException { 1640 if (this.testScanner != null) { 1641 updateScanMetrics(this.testScanner.getScanMetrics()); 1642 this.testScanner.close(); 1643 } 1644 super.testTakedown(); 1645 } 1646 1647 @Override 1648 boolean testRow(final int i, final long startTime) throws IOException { 1649 if (this.testScanner == null) { 1650 Scan scan = 1651 new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 1652 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1653 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1654 for (int family = 0; family < opts.families; family++) { 1655 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1656 if (opts.addColumns) { 1657 for (int column = 0; 
column < opts.columns; column++) { 1658 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1659 scan.addColumn(familyName, qualifier); 1660 } 1661 } else { 1662 scan.addFamily(familyName); 1663 } 1664 } 1665 if (opts.filterAll) { 1666 scan.setFilter(new FilterAllFilter()); 1667 } 1668 this.testScanner = asyncTable.getScanner(scan); 1669 } 1670 Result r = testScanner.next(); 1671 updateValueSize(r); 1672 return true; 1673 } 1674 } 1675 1676 static class AsyncSequentialReadTest extends AsyncTableTest { 1677 AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) { 1678 super(con, options, status); 1679 } 1680 1681 @Override 1682 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1683 Get get = new Get(format(i)); 1684 for (int family = 0; family < opts.families; family++) { 1685 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1686 if (opts.addColumns) { 1687 for (int column = 0; column < opts.columns; column++) { 1688 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1689 get.addColumn(familyName, qualifier); 1690 } 1691 } else { 1692 get.addFamily(familyName); 1693 } 1694 } 1695 if (opts.filterAll) { 1696 get.setFilter(new FilterAllFilter()); 1697 } 1698 try { 1699 updateValueSize(table.get(get).get()); 1700 } catch (ExecutionException e) { 1701 throw new IOException(e); 1702 } 1703 return true; 1704 } 1705 } 1706 1707 static class AsyncSequentialWriteTest extends AsyncTableTest { 1708 private ArrayList<Put> puts; 1709 1710 AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) { 1711 super(con, options, status); 1712 if (opts.multiPut > 0) { 1713 LOG.info("MultiPut enabled. 
Sending PUTs in batches of " + opts.multiPut + "."); 1714 this.puts = new ArrayList<>(opts.multiPut); 1715 } 1716 } 1717 1718 protected byte[] generateRow(final int i) { 1719 return format(i); 1720 } 1721 1722 @Override 1723 @SuppressWarnings("ReturnValueIgnored") 1724 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1725 byte[] row = generateRow(i); 1726 Put put = new Put(row); 1727 for (int family = 0; family < opts.families; family++) { 1728 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1729 for (int column = 0; column < opts.columns; column++) { 1730 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1731 byte[] value = generateData(this.rand, getValueLength(this.rand)); 1732 if (opts.useTags) { 1733 byte[] tag = generateData(this.rand, TAG_LENGTH); 1734 Tag[] tags = new Tag[opts.noOfTags]; 1735 for (int n = 0; n < opts.noOfTags; n++) { 1736 Tag t = new ArrayBackedTag((byte) n, tag); 1737 tags[n] = t; 1738 } 1739 KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, 1740 value, tags); 1741 put.add(kv); 1742 updateValueSize(kv.getValueLength()); 1743 } else { 1744 put.addColumn(familyName, qualifier, value); 1745 updateValueSize(value.length); 1746 } 1747 } 1748 } 1749 put.setDurability(opts.writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL); 1750 try { 1751 table.put(put).get(); 1752 if (opts.multiPut > 0) { 1753 this.puts.add(put); 1754 if (this.puts.size() == opts.multiPut) { 1755 this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get)); 1756 this.puts.clear(); 1757 } else { 1758 return false; 1759 } 1760 } else { 1761 table.put(put).get(); 1762 } 1763 } catch (ExecutionException e) { 1764 throw new IOException(e); 1765 } 1766 return true; 1767 } 1768 } 1769 1770 static abstract class BufferedMutatorTest extends Test { 1771 protected BufferedMutator mutator; 1772 protected Table table; 1773 1774 BufferedMutatorTest(Connection con, TestOptions options, Status status) { 1775 super(con, options, status); 1776 } 1777 1778 @Override 1779 void onStartup() throws IOException { 1780 BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName)); 1781 p.writeBufferSize(opts.bufferSize); 1782 this.mutator = connection.getBufferedMutator(p); 1783 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1784 } 1785 1786 @Override 1787 void onTakedown() throws IOException { 1788 mutator.close(); 1789 table.close(); 1790 } 1791 } 1792 1793 static class RandomSeekScanTest extends TableTest { 1794 RandomSeekScanTest(Connection con, TestOptions options, Status status) { 1795 super(con, options, status); 1796 } 1797 1798 @Override 1799 boolean testRow(final int i, final long startTime) throws IOException { 1800 Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)) 1801 .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 1802 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 1803 .setScanMetricsEnabled(true); 1804 FilterList list = new FilterList(); 1805 for (int family = 0; family < opts.families; family++) { 1806 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1807 if (opts.addColumns) { 1808 for (int column = 0; column < opts.columns; column++) { 1809 
byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1810 scan.addColumn(familyName, qualifier); 1811 } 1812 } else { 1813 scan.addFamily(familyName); 1814 } 1815 } 1816 if (opts.filterAll) { 1817 list.addFilter(new FilterAllFilter()); 1818 } 1819 list.addFilter(new WhileMatchFilter(new PageFilter(120))); 1820 scan.setFilter(list); 1821 ResultScanner s = this.table.getScanner(scan); 1822 try { 1823 for (Result rr; (rr = s.next()) != null;) { 1824 updateValueSize(rr); 1825 } 1826 } finally { 1827 updateScanMetrics(s.getScanMetrics()); 1828 s.close(); 1829 } 1830 return true; 1831 } 1832 1833 @Override 1834 protected int getReportingPeriod() { 1835 int period = opts.perClientRunRows / 100; 1836 return period == 0 ? opts.perClientRunRows : period; 1837 } 1838 1839 } 1840 1841 static abstract class RandomScanWithRangeTest extends TableTest { 1842 RandomScanWithRangeTest(Connection con, TestOptions options, Status status) { 1843 super(con, options, status); 1844 } 1845 1846 @Override 1847 boolean testRow(final int i, final long startTime) throws IOException { 1848 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); 1849 Scan scan = new Scan().withStartRow(startAndStopRow.getFirst()) 1850 .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching) 1851 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1852 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1853 for (int family = 0; family < opts.families; family++) { 1854 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1855 if (opts.addColumns) { 1856 for (int column = 0; column < opts.columns; column++) { 1857 byte [] qualifier = column == 0? 
COLUMN_ZERO: Bytes.toBytes("" + column); 1858 scan.addColumn(familyName, qualifier); 1859 } 1860 } else { 1861 scan.addFamily(familyName); 1862 } 1863 } 1864 if (opts.filterAll) { 1865 scan.setFilter(new FilterAllFilter()); 1866 } 1867 Result r = null; 1868 int count = 0; 1869 ResultScanner s = this.table.getScanner(scan); 1870 try { 1871 for (; (r = s.next()) != null;) { 1872 updateValueSize(r); 1873 count++; 1874 } 1875 if (i % 100 == 0) { 1876 LOG.info(String.format("Scan for key range %s - %s returned %s rows", 1877 Bytes.toString(startAndStopRow.getFirst()), 1878 Bytes.toString(startAndStopRow.getSecond()), count)); 1879 } 1880 } finally { 1881 updateScanMetrics(s.getScanMetrics()); 1882 s.close(); 1883 } 1884 return true; 1885 } 1886 1887 protected abstract Pair<byte[],byte[]> getStartAndStopRow(); 1888 1889 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) { 1890 int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows; 1891 int stop = start + maxRange; 1892 return new Pair<>(format(start), format(stop)); 1893 } 1894 1895 @Override 1896 protected int getReportingPeriod() { 1897 int period = opts.perClientRunRows / 100; 1898 return period == 0? 
opts.perClientRunRows: period; 1899 } 1900 } 1901 1902 static class RandomScanWithRange10Test extends RandomScanWithRangeTest { 1903 RandomScanWithRange10Test(Connection con, TestOptions options, Status status) { 1904 super(con, options, status); 1905 } 1906 1907 @Override 1908 protected Pair<byte[], byte[]> getStartAndStopRow() { 1909 return generateStartAndStopRows(10); 1910 } 1911 } 1912 1913 static class RandomScanWithRange100Test extends RandomScanWithRangeTest { 1914 RandomScanWithRange100Test(Connection con, TestOptions options, Status status) { 1915 super(con, options, status); 1916 } 1917 1918 @Override 1919 protected Pair<byte[], byte[]> getStartAndStopRow() { 1920 return generateStartAndStopRows(100); 1921 } 1922 } 1923 1924 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest { 1925 RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) { 1926 super(con, options, status); 1927 } 1928 1929 @Override 1930 protected Pair<byte[], byte[]> getStartAndStopRow() { 1931 return generateStartAndStopRows(1000); 1932 } 1933 } 1934 1935 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest { 1936 RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) { 1937 super(con, options, status); 1938 } 1939 1940 @Override 1941 protected Pair<byte[], byte[]> getStartAndStopRow() { 1942 return generateStartAndStopRows(10000); 1943 } 1944 } 1945 1946 static class RandomReadTest extends TableTest { 1947 private final Consistency consistency; 1948 private ArrayList<Get> gets; 1949 private Random rd = new Random(); 1950 private long numOfReplyFromReplica = 0; 1951 1952 RandomReadTest(Connection con, TestOptions options, Status status) { 1953 super(con, options, status); 1954 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1955 if (opts.multiGet > 0) { 1956 LOG.info("MultiGet enabled. 
Sending GETs in batches of " + opts.multiGet + "."); 1957 this.gets = new ArrayList<>(opts.multiGet); 1958 } 1959 } 1960 1961 @Override 1962 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1963 if (opts.randomSleep > 0) { 1964 Thread.sleep(rd.nextInt(opts.randomSleep)); 1965 } 1966 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1967 for (int family = 0; family < opts.families; family++) { 1968 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1969 if (opts.addColumns) { 1970 for (int column = 0; column < opts.columns; column++) { 1971 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 1972 get.addColumn(familyName, qualifier); 1973 } 1974 } else { 1975 get.addFamily(familyName); 1976 } 1977 } 1978 if (opts.filterAll) { 1979 get.setFilter(new FilterAllFilter()); 1980 } 1981 get.setConsistency(consistency); 1982 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1983 if (opts.multiGet > 0) { 1984 this.gets.add(get); 1985 if (this.gets.size() == opts.multiGet) { 1986 Result [] rs = this.table.get(this.gets); 1987 if (opts.replicas > 1) { 1988 long latency = System.nanoTime() - startTime; 1989 updateValueSize(rs, latency); 1990 } else { 1991 updateValueSize(rs); 1992 } 1993 this.gets.clear(); 1994 } else { 1995 return false; 1996 } 1997 } else { 1998 if (opts.replicas > 1) { 1999 Result r = this.table.get(get); 2000 long latency = System.nanoTime() - startTime; 2001 updateValueSize(r, latency); 2002 } else { 2003 updateValueSize(this.table.get(get)); 2004 } 2005 } 2006 return true; 2007 } 2008 2009 @Override 2010 protected int getReportingPeriod() { 2011 int period = opts.perClientRunRows / 10; 2012 return period == 0 ? 
opts.perClientRunRows : period; 2013 } 2014 2015 @Override 2016 protected void testTakedown() throws IOException { 2017 if (this.gets != null && this.gets.size() > 0) { 2018 this.table.get(gets); 2019 this.gets.clear(); 2020 } 2021 super.testTakedown(); 2022 } 2023 } 2024 2025 /* 2026 Send random reads against fake regions inserted by MetaWriteTest 2027 */ 2028 static class MetaRandomReadTest extends MetaTest { 2029 private Random rd = new Random(); 2030 private RegionLocator regionLocator; 2031 2032 MetaRandomReadTest(Connection con, TestOptions options, Status status) { 2033 super(con, options, status); 2034 LOG.info("call getRegionLocation"); 2035 } 2036 2037 @Override 2038 void onStartup() throws IOException { 2039 super.onStartup(); 2040 this.regionLocator = connection.getRegionLocator(table.getName()); 2041 } 2042 2043 @Override 2044 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 2045 if (opts.randomSleep > 0) { 2046 Thread.sleep(rd.nextInt(opts.randomSleep)); 2047 } 2048 HRegionLocation hRegionLocation = regionLocator.getRegionLocation( 2049 getSplitKey(rd.nextInt(opts.perClientRunRows)), true); 2050 LOG.debug("get location for region: " + hRegionLocation); 2051 return true; 2052 } 2053 2054 @Override 2055 protected int getReportingPeriod() { 2056 int period = opts.perClientRunRows / 10; 2057 return period == 0 ? 
opts.perClientRunRows : period; 2058 } 2059 2060 @Override 2061 protected void testTakedown() throws IOException { 2062 super.testTakedown(); 2063 } 2064 } 2065 2066 static class RandomWriteTest extends SequentialWriteTest { 2067 RandomWriteTest(Connection con, TestOptions options, Status status) { 2068 super(con, options, status); 2069 } 2070 2071 @Override 2072 protected byte[] generateRow(final int i) { 2073 return getRandomRow(this.rand, opts.totalRows); 2074 } 2075 2076 2077 } 2078 2079 static class ScanTest extends TableTest { 2080 private ResultScanner testScanner; 2081 2082 ScanTest(Connection con, TestOptions options, Status status) { 2083 super(con, options, status); 2084 } 2085 2086 @Override 2087 void testTakedown() throws IOException { 2088 if (this.testScanner != null) { 2089 this.testScanner.close(); 2090 } 2091 super.testTakedown(); 2092 } 2093 2094 2095 @Override 2096 boolean testRow(final int i, final long startTime) throws IOException { 2097 if (this.testScanner == null) { 2098 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 2099 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 2100 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 2101 for (int family = 0; family < opts.families; family++) { 2102 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2103 if (opts.addColumns) { 2104 for (int column = 0; column < opts.columns; column++) { 2105 byte [] qualifier = column == 0? 
COLUMN_ZERO: Bytes.toBytes("" + column); 2106 scan.addColumn(familyName, qualifier); 2107 } 2108 } else { 2109 scan.addFamily(familyName); 2110 } 2111 } 2112 if (opts.filterAll) { 2113 scan.setFilter(new FilterAllFilter()); 2114 } 2115 this.testScanner = table.getScanner(scan); 2116 } 2117 Result r = testScanner.next(); 2118 updateValueSize(r); 2119 return true; 2120 } 2121 } 2122 2123 /** 2124 * Base class for operations that are CAS-like; that read a value and then set it based off what 2125 * they read. In this category is increment, append, checkAndPut, etc. 2126 * 2127 * <p>These operations also want some concurrency going on. Usually when these tests run, they 2128 * operate in their own part of the key range. In CASTest, we will have them all overlap on the 2129 * same key space. We do this with our getStartRow and getLastRow overrides. 2130 */ 2131 static abstract class CASTableTest extends TableTest { 2132 private final byte [] qualifier; 2133 CASTableTest(Connection con, TestOptions options, Status status) { 2134 super(con, options, status); 2135 qualifier = Bytes.toBytes(this.getClass().getSimpleName()); 2136 } 2137 2138 byte [] getQualifier() { 2139 return this.qualifier; 2140 } 2141 2142 @Override 2143 int getStartRow() { 2144 return 0; 2145 } 2146 2147 @Override 2148 int getLastRow() { 2149 return opts.perClientRunRows; 2150 } 2151 } 2152 2153 static class IncrementTest extends CASTableTest { 2154 IncrementTest(Connection con, TestOptions options, Status status) { 2155 super(con, options, status); 2156 } 2157 2158 @Override 2159 boolean testRow(final int i, final long startTime) throws IOException { 2160 Increment increment = new Increment(format(i)); 2161 // unlike checkAndXXX tests, which make most sense to do on a single value, 2162 // if multiple families are specified for an increment test we assume it is 2163 // meant to raise the work factor 2164 for (int family = 0; family < opts.families; family++) { 2165 byte[] familyName = 
Bytes.toBytes(FAMILY_NAME_BASE + family); 2166 increment.addColumn(familyName, getQualifier(), 1l); 2167 } 2168 updateValueSize(this.table.increment(increment)); 2169 return true; 2170 } 2171 } 2172 2173 static class AppendTest extends CASTableTest { 2174 AppendTest(Connection con, TestOptions options, Status status) { 2175 super(con, options, status); 2176 } 2177 2178 @Override 2179 boolean testRow(final int i, final long startTime) throws IOException { 2180 byte [] bytes = format(i); 2181 Append append = new Append(bytes); 2182 // unlike checkAndXXX tests, which make most sense to do on a single value, 2183 // if multiple families are specified for an append test we assume it is 2184 // meant to raise the work factor 2185 for (int family = 0; family < opts.families; family++) { 2186 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2187 append.addColumn(familyName, getQualifier(), bytes); 2188 } 2189 updateValueSize(this.table.append(append)); 2190 return true; 2191 } 2192 } 2193 2194 static class CheckAndMutateTest extends CASTableTest { 2195 CheckAndMutateTest(Connection con, TestOptions options, Status status) { 2196 super(con, options, status); 2197 } 2198 2199 @Override 2200 boolean testRow(final int i, final long startTime) throws IOException { 2201 final byte [] bytes = format(i); 2202 // checkAndXXX tests operate on only a single value 2203 // Put a known value so when we go to check it, it is there. 
2204 Put put = new Put(bytes); 2205 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2206 this.table.put(put); 2207 RowMutations mutations = new RowMutations(bytes); 2208 mutations.add(put); 2209 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()) 2210 .ifEquals(bytes).thenMutate(mutations); 2211 return true; 2212 } 2213 } 2214 2215 static class CheckAndPutTest extends CASTableTest { 2216 CheckAndPutTest(Connection con, TestOptions options, Status status) { 2217 super(con, options, status); 2218 } 2219 2220 @Override 2221 boolean testRow(final int i, final long startTime) throws IOException { 2222 final byte [] bytes = format(i); 2223 // checkAndXXX tests operate on only a single value 2224 // Put a known value so when we go to check it, it is there. 2225 Put put = new Put(bytes); 2226 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2227 this.table.put(put); 2228 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()) 2229 .ifEquals(bytes).thenPut(put); 2230 return true; 2231 } 2232 } 2233 2234 static class CheckAndDeleteTest extends CASTableTest { 2235 CheckAndDeleteTest(Connection con, TestOptions options, Status status) { 2236 super(con, options, status); 2237 } 2238 2239 @Override 2240 boolean testRow(final int i, final long startTime) throws IOException { 2241 final byte [] bytes = format(i); 2242 // checkAndXXX tests operate on only a single value 2243 // Put a known value so when we go to check it, it is there. 2244 Put put = new Put(bytes); 2245 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2246 this.table.put(put); 2247 Delete delete = new Delete(put.getRow()); 2248 delete.addColumn(FAMILY_ZERO, getQualifier()); 2249 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()) 2250 .ifEquals(bytes).thenDelete(delete); 2251 return true; 2252 } 2253 } 2254 2255 /* 2256 Delete all fake regions inserted to meta table by MetaWriteTest. 
2257 */ 2258 static class CleanMetaTest extends MetaTest { 2259 CleanMetaTest(Connection con, TestOptions options, Status status) { 2260 super(con, options, status); 2261 } 2262 2263 @Override 2264 boolean testRow(final int i, final long startTime) throws IOException { 2265 try { 2266 RegionInfo regionInfo = connection.getRegionLocator(table.getName()) 2267 .getRegionLocation(getSplitKey(i), false).getRegion(); 2268 LOG.debug("deleting region from meta: " + regionInfo); 2269 2270 Delete delete = MetaTableAccessor 2271 .makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP); 2272 try (Table t = MetaTableAccessor.getMetaHTable(connection)) { 2273 t.delete(delete); 2274 } 2275 } catch (IOException ie) { 2276 // Log and continue 2277 LOG.error("cannot find region with start key: " + i); 2278 } 2279 return true; 2280 } 2281 } 2282 2283 static class SequentialReadTest extends TableTest { 2284 SequentialReadTest(Connection con, TestOptions options, Status status) { 2285 super(con, options, status); 2286 } 2287 2288 @Override 2289 boolean testRow(final int i, final long startTime) throws IOException { 2290 Get get = new Get(format(i)); 2291 for (int family = 0; family < opts.families; family++) { 2292 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2293 if (opts.addColumns) { 2294 for (int column = 0; column < opts.columns; column++) { 2295 byte [] qualifier = column == 0? 
COLUMN_ZERO: Bytes.toBytes("" + column); 2296 get.addColumn(familyName, qualifier); 2297 } 2298 } else { 2299 get.addFamily(familyName); 2300 } 2301 } 2302 if (opts.filterAll) { 2303 get.setFilter(new FilterAllFilter()); 2304 } 2305 updateValueSize(table.get(get)); 2306 return true; 2307 } 2308 } 2309 2310 static class SequentialWriteTest extends BufferedMutatorTest { 2311 private ArrayList<Put> puts; 2312 2313 2314 SequentialWriteTest(Connection con, TestOptions options, Status status) { 2315 super(con, options, status); 2316 if (opts.multiPut > 0) { 2317 LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + "."); 2318 this.puts = new ArrayList<>(opts.multiPut); 2319 } 2320 } 2321 2322 protected byte[] generateRow(final int i) { 2323 return format(i); 2324 } 2325 2326 @Override 2327 boolean testRow(final int i, final long startTime) throws IOException { 2328 byte[] row = generateRow(i); 2329 Put put = new Put(row); 2330 for (int family = 0; family < opts.families; family++) { 2331 byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family); 2332 for (int column = 0; column < opts.columns; column++) { 2333 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 2334 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2335 if (opts.useTags) { 2336 byte[] tag = generateData(this.rand, TAG_LENGTH); 2337 Tag[] tags = new Tag[opts.noOfTags]; 2338 for (int n = 0; n < opts.noOfTags; n++) { 2339 Tag t = new ArrayBackedTag((byte) n, tag); 2340 tags[n] = t; 2341 } 2342 KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, 2343 value, tags); 2344 put.add(kv); 2345 updateValueSize(kv.getValueLength()); 2346 } else { 2347 put.addColumn(familyName, qualifier, value); 2348 updateValueSize(value.length); 2349 } 2350 } 2351 } 2352 put.setDurability(opts.writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL); 2353 if (opts.autoFlush) { 2354 if (opts.multiPut > 0) { 2355 this.puts.add(put); 2356 if (this.puts.size() == opts.multiPut) { 2357 table.put(this.puts); 2358 this.puts.clear(); 2359 } else { 2360 return false; 2361 } 2362 } else { 2363 table.put(put); 2364 } 2365 } else { 2366 mutator.mutate(put); 2367 } 2368 return true; 2369 } 2370 } 2371 2372 /* 2373 Insert fake regions into meta table with contiguous split keys. 2374 */ 2375 static class MetaWriteTest extends MetaTest { 2376 2377 MetaWriteTest(Connection con, TestOptions options, Status status) { 2378 super(con, options, status); 2379 } 2380 2381 @Override 2382 boolean testRow(final int i, final long startTime) throws IOException { 2383 List<RegionInfo> regionInfos = new ArrayList<RegionInfo>(); 2384 RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME)) 2385 .setStartKey(getSplitKey(i)) 2386 .setEndKey(getSplitKey(i + 1)) 2387 .build()); 2388 regionInfos.add(regionInfo); 2389 MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1); 2390 2391 // write the serverName columns 2392 MetaTableAccessor.updateRegionLocation(connection, 2393 regionInfo, ServerName.valueOf("localhost", 60010, rand.nextLong()), i, 2394 System.currentTimeMillis()); 2395 return true; 2396 } 2397 } 2398 static class FilteredScanTest extends TableTest { 2399 protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName()); 2400 2401 FilteredScanTest(Connection con, TestOptions options, Status status) { 2402 super(con, options, status); 2403 if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) { 2404 LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB + 2405 ". 
This could take a very long time."); 2406 } 2407 } 2408 2409 @Override 2410 boolean testRow(int i, final long startTime) throws IOException { 2411 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2412 Scan scan = constructScan(value); 2413 ResultScanner scanner = null; 2414 try { 2415 scanner = this.table.getScanner(scan); 2416 for (Result r = null; (r = scanner.next()) != null;) { 2417 updateValueSize(r); 2418 } 2419 } finally { 2420 if (scanner != null) { 2421 updateScanMetrics(scanner.getScanMetrics()); 2422 scanner.close(); 2423 } 2424 } 2425 return true; 2426 } 2427 2428 protected Scan constructScan(byte[] valuePrefix) throws IOException { 2429 FilterList list = new FilterList(); 2430 Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, 2431 CompareOperator.EQUAL, new BinaryComparator(valuePrefix)); 2432 list.addFilter(filter); 2433 if (opts.filterAll) { 2434 list.addFilter(new FilterAllFilter()); 2435 } 2436 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 2437 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 2438 .setScanMetricsEnabled(true); 2439 if (opts.addColumns) { 2440 for (int column = 0; column < opts.columns; column++) { 2441 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); 2442 scan.addColumn(FAMILY_ZERO, qualifier); 2443 } 2444 } else { 2445 scan.addFamily(FAMILY_ZERO); 2446 } 2447 scan.setFilter(list); 2448 return scan; 2449 } 2450 } 2451 2452 /** 2453 * Compute a throughput rate in MB/s. 2454 * @param rows Number of records consumed. 2455 * @param timeMs Time taken in milliseconds. 
2456 * @return String value with label, ie '123.76 MB/s' 2457 */ 2458 private static String calculateMbps(int rows, long timeMs, final int valueSize, int families, int columns) { 2459 BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH + 2460 ((valueSize + (FAMILY_NAME_BASE.length()+1) + COLUMN_ZERO.length) * columns) * families); 2461 BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT) 2462 .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT) 2463 .divide(BYTES_PER_MB, CXT); 2464 return FMT.format(mbps) + " MB/s"; 2465 } 2466 2467 /* 2468 * Format passed integer. 2469 * @param number 2470 * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed 2471 * number (Does absolute in case number is negative). 2472 */ 2473 public static byte [] format(final int number) { 2474 byte [] b = new byte[ROW_LENGTH]; 2475 int d = Math.abs(number); 2476 for (int i = b.length - 1; i >= 0; i--) { 2477 b[i] = (byte)((d % 10) + '0'); 2478 d /= 10; 2479 } 2480 return b; 2481 } 2482 2483 /* 2484 * This method takes some time and is done inline uploading data. For 2485 * example, doing the mapfile test, generation of the key and value 2486 * consumes about 30% of CPU time. 2487 * @return Generated random value to insert into a table cell. 
2488 */ 2489 public static byte[] generateData(final Random r, int length) { 2490 byte [] b = new byte [length]; 2491 int i; 2492 2493 for(i = 0; i < (length-8); i += 8) { 2494 b[i] = (byte) (65 + r.nextInt(26)); 2495 b[i+1] = b[i]; 2496 b[i+2] = b[i]; 2497 b[i+3] = b[i]; 2498 b[i+4] = b[i]; 2499 b[i+5] = b[i]; 2500 b[i+6] = b[i]; 2501 b[i+7] = b[i]; 2502 } 2503 2504 byte a = (byte) (65 + r.nextInt(26)); 2505 for(; i < length; i++) { 2506 b[i] = a; 2507 } 2508 return b; 2509 } 2510 2511 static byte [] getRandomRow(final Random random, final int totalRows) { 2512 return format(generateRandomRow(random, totalRows)); 2513 } 2514 2515 static int generateRandomRow(final Random random, final int totalRows) { 2516 return random.nextInt(Integer.MAX_VALUE) % totalRows; 2517 } 2518 2519 static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf, 2520 Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status) 2521 throws IOException, InterruptedException { 2522 status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " 2523 + opts.perClientRunRows + " rows"); 2524 long totalElapsedTime; 2525 2526 final TestBase t; 2527 try { 2528 if (AsyncTest.class.isAssignableFrom(cmd)) { 2529 Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd; 2530 Constructor<? extends AsyncTest> constructor = 2531 newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class); 2532 t = constructor.newInstance(asyncCon, opts, status); 2533 } else { 2534 Class<? extends Test> newCmd = (Class<? extends Test>) cmd; 2535 Constructor<? extends Test> constructor = 2536 newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class); 2537 t = constructor.newInstance(con, opts, status); 2538 } 2539 } catch (NoSuchMethodException e) { 2540 throw new IllegalArgumentException("Invalid command class: " + cmd.getName() 2541 + ". 
It does not provide a constructor as described by " 2542 + "the javadoc comment. Available constructors are: " 2543 + Arrays.toString(cmd.getConstructors())); 2544 } catch (Exception e) { 2545 throw new IllegalStateException("Failed to construct command class", e); 2546 } 2547 totalElapsedTime = t.test(); 2548 2549 status.setStatus("Finished " + cmd + " in " + totalElapsedTime + 2550 "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" + 2551 " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime, 2552 getAverageValueLength(opts), opts.families, opts.columns) + ")"); 2553 2554 return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold, 2555 t.numOfReplyFromReplica, t.getLatencyHistogram()); 2556 } 2557 2558 private static int getAverageValueLength(final TestOptions opts) { 2559 return opts.valueRandom? opts.valueSize/2: opts.valueSize; 2560 } 2561 2562 private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) throws IOException, 2563 InterruptedException, ClassNotFoundException, ExecutionException { 2564 // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do 2565 // the TestOptions introspection for us and dump the output in a readable format. 
2566 LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts)); 2567 Admin admin = null; 2568 Connection connection = null; 2569 try { 2570 connection = ConnectionFactory.createConnection(getConf()); 2571 admin = connection.getAdmin(); 2572 checkTable(admin, opts); 2573 } finally { 2574 if (admin != null) admin.close(); 2575 if (connection != null) connection.close(); 2576 } 2577 if (opts.nomapred) { 2578 doLocalClients(opts, getConf()); 2579 } else { 2580 doMapReduce(opts, getConf()); 2581 } 2582 } 2583 2584 protected void printUsage() { 2585 printUsage(PE_COMMAND_SHORTNAME, null); 2586 } 2587 2588 protected static void printUsage(final String message) { 2589 printUsage(PE_COMMAND_SHORTNAME, message); 2590 } 2591 2592 protected static void printUsageAndExit(final String message, final int exitCode) { 2593 printUsage(message); 2594 System.exit(exitCode); 2595 } 2596 2597 protected static void printUsage(final String shortName, final String message) { 2598 if (message != null && message.length() > 0) { 2599 System.err.println(message); 2600 } 2601 System.err.print("Usage: hbase " + shortName); 2602 System.err.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>"); 2603 System.err.println(); 2604 System.err.println("General Options:"); 2605 System.err.println(" nomapred Run multiple clients using threads " + 2606 "(rather than use mapreduce)"); 2607 System.err.println(" oneCon all the threads share the same connection. Default: False"); 2608 System.err.println(" connCount connections all threads share. " 2609 + "For example, if set to 2, then all thread share 2 connection. " 2610 + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, " 2611 + "if not, connCount=thread number"); 2612 2613 System.err.println(" sampleRate Execute test on a sample of total " + 2614 "rows. Only supported by randomRead. 
Default: 1.0"); 2615 System.err.println(" period Report every 'period' rows: " + 2616 "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows()/10); 2617 System.err.println(" cycles How many times to cycle the test. Defaults: 1."); 2618 System.err.println(" traceRate Enable HTrace spans. Initiate tracing every N rows. " + 2619 "Default: 0"); 2620 System.err.println(" latency Set to report operation latencies. Default: False"); 2621 System.err.println(" latencyThreshold Set to report number of operations with latency " + 2622 "over lantencyThreshold, unit in millisecond, default 0"); 2623 System.err.println(" measureAfter Start to measure the latency once 'measureAfter'" + 2624 " rows have been treated. Default: 0"); 2625 System.err.println(" valueSize Pass value size to use: Default: " 2626 + DEFAULT_OPTS.getValueSize()); 2627 System.err.println(" valueRandom Set if we should vary value size between 0 and " + 2628 "'valueSize'; set on read for stats on size: Default: Not set."); 2629 System.err.println(" blockEncoding Block encoding to use. Value should be one of " 2630 + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE"); 2631 System.err.println(); 2632 System.err.println("Table Creation / Write Tests:"); 2633 System.err.println(" table Alternate table name. Default: 'TestTable'"); 2634 System.err.println(" rows Rows each client runs. Default: " 2635 + DEFAULT_OPTS.getPerClientRunRows() 2636 + ". In case of randomReads and randomSeekScans this could" 2637 + " be specified along with --size to specify the number of rows to be scanned within" 2638 + " the total range specified by the size."); 2639 System.err.println( 2640 " size Total size in GiB. Mutually exclusive with --rows for writes and scans" 2641 + ". But for randomReads and randomSeekScans when you use size with --rows you could" 2642 + " use size to specify the end range and --rows" 2643 + " specifies the number of rows within that range. 
" + "Default: 1.0."); 2644 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); 2645 System.err.println(" flushCommits Used to determine if the test should flush the table. " + 2646 "Default: false"); 2647 System.err.println(" valueZipf Set if we should vary value size between 0 and " + 2648 "'valueSize' in zipf form: Default: Not set."); 2649 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); 2650 System.err.println(" autoFlush Set autoFlush on htable. Default: False"); 2651 System.err.println(" multiPut Batch puts together into groups of N. Only supported " + 2652 "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0"); 2653 System.err.println(" presplit Create presplit table. If a table with same name exists," 2654 + " it'll be deleted and recreated (instead of verifying count of its existing regions). " 2655 + "Recommended for accurate perf analysis (see guide). Default: disabled"); 2656 System.err.println(" usetags Writes tags along with KVs. Use with HFile V3. " + 2657 "Default: false"); 2658 System.err.println(" numoftags Specify the no of tags that would be needed. " + 2659 "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags); 2660 System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table."); 2661 System.err.println(" columns Columns to write per row. Default: 1"); 2662 System.err.println(" families Specify number of column families for the table. Default: 1"); 2663 System.err.println(); 2664 System.err.println("Read Tests:"); 2665 System.err.println(" filterAll Helps to filter out all the rows on the server side" 2666 + " there by not returning any thing back to the client. Helps to check the server side" 2667 + " performance. Uses FilterAllFilter internally. "); 2668 System.err.println(" multiGet Batch gets together into groups of N. Only supported " + 2669 "by randomRead. 
Default: disabled"); 2670 System.err.println(" inmemory Tries to keep the HFiles of the CF " + 2671 "inmemory as far as possible. Not guaranteed that reads are always served " + 2672 "from memory. Default: false"); 2673 System.err.println(" bloomFilter Bloom filter type, one of " 2674 + Arrays.toString(BloomType.values())); 2675 System.err.println(" blockSize Blocksize to use when writing out hfiles. "); 2676 System.err.println(" inmemoryCompaction Makes the column family to do inmemory flushes/compactions. " 2677 + "Uses the CompactingMemstore"); 2678 System.err.println(" addColumns Adds columns to scans/gets explicitly. Default: true"); 2679 System.err.println(" replicas Enable region replica testing. Defaults: 1."); 2680 System.err.println(" randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0"); 2681 System.err.println(" caching Scan caching to use. Default: 30"); 2682 System.err.println(" asyncPrefetch Enable asyncPrefetch for scan"); 2683 System.err.println(" cacheBlocks Set the cacheBlocks option for scan. Default: true"); 2684 System.err.println(" scanReadType Set the readType option for scan, stream/pread/default. Default: default"); 2685 System.err.println(" bufferSize Set the value of client side buffering. Default: 2MB"); 2686 System.err.println(); 2687 System.err.println(" Note: -D properties will be applied to the conf used. "); 2688 System.err.println(" For example: "); 2689 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true"); 2690 System.err.println(" -Dmapreduce.task.timeout=60000"); 2691 System.err.println(); 2692 System.err.println("Command:"); 2693 for (CmdDescriptor command : COMMANDS.values()) { 2694 System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription())); 2695 } 2696 System.err.println(); 2697 System.err.println("Args:"); 2698 System.err.println(" nclients Integer. Required. Total number of clients " 2699 + "(and HRegionServers) running. 
1 <= value <= 500"); 2700 System.err.println("Examples:"); 2701 System.err.println(" To run a single client doing the default 1M sequentialWrites:"); 2702 System.err.println(" $ hbase " + shortName + " sequentialWrite 1"); 2703 System.err.println(" To run 10 clients doing increments over ten rows:"); 2704 System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10"); 2705 } 2706 2707 /** 2708 * Parse options passed in via an arguments array. Assumes that array has been split 2709 * on white-space and placed into a {@code Queue}. Any unknown arguments will remain 2710 * in the queue at the conclusion of this method call. It's up to the caller to deal 2711 * with these unrecognized arguments. 2712 */ 2713 static TestOptions parseOpts(Queue<String> args) { 2714 TestOptions opts = new TestOptions(); 2715 2716 String cmd = null; 2717 while ((cmd = args.poll()) != null) { 2718 if (cmd.equals("-h") || cmd.startsWith("--h")) { 2719 // place item back onto queue so that caller knows parsing was incomplete 2720 args.add(cmd); 2721 break; 2722 } 2723 2724 final String nmr = "--nomapred"; 2725 if (cmd.startsWith(nmr)) { 2726 opts.nomapred = true; 2727 continue; 2728 } 2729 2730 final String rows = "--rows="; 2731 if (cmd.startsWith(rows)) { 2732 opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length())); 2733 continue; 2734 } 2735 2736 final String cycles = "--cycles="; 2737 if (cmd.startsWith(cycles)) { 2738 opts.cycles = Integer.parseInt(cmd.substring(cycles.length())); 2739 continue; 2740 } 2741 2742 final String sampleRate = "--sampleRate="; 2743 if (cmd.startsWith(sampleRate)) { 2744 opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); 2745 continue; 2746 } 2747 2748 final String table = "--table="; 2749 if (cmd.startsWith(table)) { 2750 opts.tableName = cmd.substring(table.length()); 2751 continue; 2752 } 2753 2754 final String startRow = "--startRow="; 2755 if (cmd.startsWith(startRow)) { 2756 opts.startRow = 
Integer.parseInt(cmd.substring(startRow.length())); 2757 continue; 2758 } 2759 2760 final String compress = "--compress="; 2761 if (cmd.startsWith(compress)) { 2762 opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); 2763 continue; 2764 } 2765 2766 final String traceRate = "--traceRate="; 2767 if (cmd.startsWith(traceRate)) { 2768 opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length())); 2769 continue; 2770 } 2771 2772 final String blockEncoding = "--blockEncoding="; 2773 if (cmd.startsWith(blockEncoding)) { 2774 opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); 2775 continue; 2776 } 2777 2778 final String flushCommits = "--flushCommits="; 2779 if (cmd.startsWith(flushCommits)) { 2780 opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); 2781 continue; 2782 } 2783 2784 final String writeToWAL = "--writeToWAL="; 2785 if (cmd.startsWith(writeToWAL)) { 2786 opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); 2787 continue; 2788 } 2789 2790 final String presplit = "--presplit="; 2791 if (cmd.startsWith(presplit)) { 2792 opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); 2793 continue; 2794 } 2795 2796 final String inMemory = "--inmemory="; 2797 if (cmd.startsWith(inMemory)) { 2798 opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); 2799 continue; 2800 } 2801 2802 final String autoFlush = "--autoFlush="; 2803 if (cmd.startsWith(autoFlush)) { 2804 opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length())); 2805 continue; 2806 } 2807 2808 final String onceCon = "--oneCon="; 2809 if (cmd.startsWith(onceCon)) { 2810 opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length())); 2811 continue; 2812 } 2813 2814 final String connCount = "--connCount="; 2815 if (cmd.startsWith(connCount)) { 2816 opts.connCount = Integer.parseInt(cmd.substring(connCount.length())); 2817 continue; 
2818 } 2819 2820 final String latencyThreshold = "--latencyThreshold="; 2821 if (cmd.startsWith(latencyThreshold)) { 2822 opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length())); 2823 continue; 2824 } 2825 2826 final String latency = "--latency"; 2827 if (cmd.startsWith(latency)) { 2828 opts.reportLatency = true; 2829 continue; 2830 } 2831 2832 final String multiGet = "--multiGet="; 2833 if (cmd.startsWith(multiGet)) { 2834 opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); 2835 continue; 2836 } 2837 2838 final String multiPut = "--multiPut="; 2839 if (cmd.startsWith(multiPut)) { 2840 opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length())); 2841 continue; 2842 } 2843 2844 final String useTags = "--usetags="; 2845 if (cmd.startsWith(useTags)) { 2846 opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); 2847 continue; 2848 } 2849 2850 final String noOfTags = "--numoftags="; 2851 if (cmd.startsWith(noOfTags)) { 2852 opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); 2853 continue; 2854 } 2855 2856 final String replicas = "--replicas="; 2857 if (cmd.startsWith(replicas)) { 2858 opts.replicas = Integer.parseInt(cmd.substring(replicas.length())); 2859 continue; 2860 } 2861 2862 final String filterOutAll = "--filterAll"; 2863 if (cmd.startsWith(filterOutAll)) { 2864 opts.filterAll = true; 2865 continue; 2866 } 2867 2868 final String size = "--size="; 2869 if (cmd.startsWith(size)) { 2870 opts.size = Float.parseFloat(cmd.substring(size.length())); 2871 if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 
1GB"); 2872 continue; 2873 } 2874 2875 final String splitPolicy = "--splitPolicy="; 2876 if (cmd.startsWith(splitPolicy)) { 2877 opts.splitPolicy = cmd.substring(splitPolicy.length()); 2878 continue; 2879 } 2880 2881 final String randomSleep = "--randomSleep="; 2882 if (cmd.startsWith(randomSleep)) { 2883 opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length())); 2884 continue; 2885 } 2886 2887 final String measureAfter = "--measureAfter="; 2888 if (cmd.startsWith(measureAfter)) { 2889 opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length())); 2890 continue; 2891 } 2892 2893 final String bloomFilter = "--bloomFilter="; 2894 if (cmd.startsWith(bloomFilter)) { 2895 opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length())); 2896 continue; 2897 } 2898 2899 final String blockSize = "--blockSize="; 2900 if(cmd.startsWith(blockSize) ) { 2901 opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length())); 2902 continue; 2903 } 2904 2905 final String valueSize = "--valueSize="; 2906 if (cmd.startsWith(valueSize)) { 2907 opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length())); 2908 continue; 2909 } 2910 2911 final String valueRandom = "--valueRandom"; 2912 if (cmd.startsWith(valueRandom)) { 2913 opts.valueRandom = true; 2914 continue; 2915 } 2916 2917 final String valueZipf = "--valueZipf"; 2918 if (cmd.startsWith(valueZipf)) { 2919 opts.valueZipf = true; 2920 continue; 2921 } 2922 2923 final String period = "--period="; 2924 if (cmd.startsWith(period)) { 2925 opts.period = Integer.parseInt(cmd.substring(period.length())); 2926 continue; 2927 } 2928 2929 final String addColumns = "--addColumns="; 2930 if (cmd.startsWith(addColumns)) { 2931 opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length())); 2932 continue; 2933 } 2934 2935 final String inMemoryCompaction = "--inmemoryCompaction="; 2936 if (cmd.startsWith(inMemoryCompaction)) { 2937 opts.inMemoryCompaction = 2938 
MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length())); 2939 continue; 2940 } 2941 2942 final String columns = "--columns="; 2943 if (cmd.startsWith(columns)) { 2944 opts.columns = Integer.parseInt(cmd.substring(columns.length())); 2945 continue; 2946 } 2947 2948 final String families = "--families="; 2949 if (cmd.startsWith(families)) { 2950 opts.families = Integer.parseInt(cmd.substring(families.length())); 2951 continue; 2952 } 2953 2954 final String caching = "--caching="; 2955 if (cmd.startsWith(caching)) { 2956 opts.caching = Integer.parseInt(cmd.substring(caching.length())); 2957 continue; 2958 } 2959 2960 final String asyncPrefetch = "--asyncPrefetch"; 2961 if (cmd.startsWith(asyncPrefetch)) { 2962 opts.asyncPrefetch = true; 2963 continue; 2964 } 2965 2966 final String cacheBlocks = "--cacheBlocks="; 2967 if (cmd.startsWith(cacheBlocks)) { 2968 opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length())); 2969 continue; 2970 } 2971 2972 final String scanReadType = "--scanReadType="; 2973 if (cmd.startsWith(scanReadType)) { 2974 opts.scanReadType = 2975 Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase()); 2976 continue; 2977 } 2978 2979 final String bufferSize = "--bufferSize="; 2980 if (cmd.startsWith(bufferSize)) { 2981 opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length())); 2982 continue; 2983 } 2984 2985 validateParsedOpts(opts); 2986 2987 if (isCommandClass(cmd)) { 2988 opts.cmdName = cmd; 2989 try { 2990 opts.numClientThreads = Integer.parseInt(args.remove()); 2991 } catch (NoSuchElementException | NumberFormatException e) { 2992 throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e); 2993 } 2994 opts = calculateRowsAndSize(opts); 2995 break; 2996 } else { 2997 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); 2998 } 2999 3000 // Not matching any option or command. 
3001 System.err.println("Error: Wrong option or command: " + cmd); 3002 args.add(cmd); 3003 break; 3004 } 3005 return opts; 3006 } 3007 3008 /** 3009 * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts 3010 */ 3011 private static void validateParsedOpts(TestOptions opts) { 3012 3013 if (!opts.autoFlush && opts.multiPut > 0) { 3014 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0"); 3015 } 3016 3017 if (opts.oneCon && opts.connCount > 1) { 3018 throw new IllegalArgumentException("oneCon is set to true, " 3019 + "connCount should not bigger than 1"); 3020 } 3021 3022 if (opts.valueZipf && opts.valueRandom) { 3023 throw new IllegalStateException("Either valueZipf or valueRandom but not both"); 3024 } 3025 } 3026 3027 static TestOptions calculateRowsAndSize(final TestOptions opts) { 3028 int rowsPerGB = getRowsPerGB(opts); 3029 if ((opts.getCmdName() != null 3030 && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN))) 3031 && opts.size != DEFAULT_OPTS.size 3032 && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) { 3033 opts.totalRows = (int) opts.size * rowsPerGB; 3034 } else if (opts.size != DEFAULT_OPTS.size) { 3035 // total size in GB specified 3036 opts.totalRows = (int) opts.size * rowsPerGB; 3037 opts.perClientRunRows = opts.totalRows / opts.numClientThreads; 3038 } else { 3039 opts.totalRows = opts.perClientRunRows * opts.numClientThreads; 3040 opts.size = opts.totalRows / rowsPerGB; 3041 } 3042 return opts; 3043 } 3044 3045 static int getRowsPerGB(final TestOptions opts) { 3046 return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getFamilies() * 3047 opts.getColumns()); 3048 } 3049 3050 @Override 3051 public int run(String[] args) throws Exception { 3052 // Process command-line args. TODO: Better cmd-line processing 3053 // (but hopefully something not as painful as cli options). 
3054 int errCode = -1; 3055 if (args.length < 1) { 3056 printUsage(); 3057 return errCode; 3058 } 3059 3060 try { 3061 LinkedList<String> argv = new LinkedList<>(); 3062 argv.addAll(Arrays.asList(args)); 3063 TestOptions opts = parseOpts(argv); 3064 3065 // args remaining, print help and exit 3066 if (!argv.isEmpty()) { 3067 errCode = 0; 3068 printUsage(); 3069 return errCode; 3070 } 3071 3072 // must run at least 1 client 3073 if (opts.numClientThreads <= 0) { 3074 throw new IllegalArgumentException("Number of clients must be > 0"); 3075 } 3076 3077 // cmdName should not be null, print help and exit 3078 if (opts.cmdName == null) { 3079 printUsage(); 3080 return errCode; 3081 } 3082 3083 Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName); 3084 if (cmdClass != null) { 3085 runTest(cmdClass, opts); 3086 errCode = 0; 3087 } 3088 3089 } catch (Exception e) { 3090 e.printStackTrace(); 3091 } 3092 3093 return errCode; 3094 } 3095 3096 private static boolean isCommandClass(String cmd) { 3097 return COMMANDS.containsKey(cmd); 3098 } 3099 3100 private static Class<? extends TestBase> determineCommandClass(String cmd) { 3101 CmdDescriptor descriptor = COMMANDS.get(cmd); 3102 return descriptor != null ? descriptor.getCmdClass() : null; 3103 } 3104 3105 public static void main(final String[] args) throws Exception { 3106 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); 3107 System.exit(res); 3108 } 3109}