001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import com.codahale.metrics.Histogram; 021import com.codahale.metrics.UniformReservoir; 022import io.opentelemetry.api.trace.Span; 023import io.opentelemetry.context.Scope; 024import java.io.IOException; 025import java.io.PrintStream; 026import java.lang.reflect.Constructor; 027import java.math.BigDecimal; 028import java.math.MathContext; 029import java.text.DecimalFormat; 030import java.text.SimpleDateFormat; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.Date; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Locale; 037import java.util.Map; 038import java.util.NoSuchElementException; 039import java.util.Queue; 040import java.util.Random; 041import java.util.TreeMap; 042import java.util.concurrent.Callable; 043import java.util.concurrent.ExecutionException; 044import java.util.concurrent.ExecutorService; 045import java.util.concurrent.Executors; 046import java.util.concurrent.Future; 047import java.util.concurrent.ThreadLocalRandom; 048import org.apache.commons.lang3.StringUtils; 049import org.apache.hadoop.conf.Configuration; 
050import org.apache.hadoop.conf.Configured; 051import org.apache.hadoop.fs.FileSystem; 052import org.apache.hadoop.fs.Path; 053import org.apache.hadoop.hbase.client.Admin; 054import org.apache.hadoop.hbase.client.Append; 055import org.apache.hadoop.hbase.client.AsyncConnection; 056import org.apache.hadoop.hbase.client.AsyncTable; 057import org.apache.hadoop.hbase.client.BufferedMutator; 058import org.apache.hadoop.hbase.client.BufferedMutatorParams; 059import org.apache.hadoop.hbase.client.Connection; 060import org.apache.hadoop.hbase.client.ConnectionFactory; 061import org.apache.hadoop.hbase.client.Consistency; 062import org.apache.hadoop.hbase.client.Delete; 063import org.apache.hadoop.hbase.client.Durability; 064import org.apache.hadoop.hbase.client.Get; 065import org.apache.hadoop.hbase.client.Increment; 066import org.apache.hadoop.hbase.client.Put; 067import org.apache.hadoop.hbase.client.RegionInfo; 068import org.apache.hadoop.hbase.client.RegionInfoBuilder; 069import org.apache.hadoop.hbase.client.RegionLocator; 070import org.apache.hadoop.hbase.client.Result; 071import org.apache.hadoop.hbase.client.ResultScanner; 072import org.apache.hadoop.hbase.client.RowMutations; 073import org.apache.hadoop.hbase.client.Scan; 074import org.apache.hadoop.hbase.client.Table; 075import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 076import org.apache.hadoop.hbase.filter.BinaryComparator; 077import org.apache.hadoop.hbase.filter.Filter; 078import org.apache.hadoop.hbase.filter.FilterAllFilter; 079import org.apache.hadoop.hbase.filter.FilterList; 080import org.apache.hadoop.hbase.filter.PageFilter; 081import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 082import org.apache.hadoop.hbase.filter.WhileMatchFilter; 083import org.apache.hadoop.hbase.io.compress.Compression; 084import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 085import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 086import org.apache.hadoop.hbase.regionserver.BloomType; 
087import org.apache.hadoop.hbase.regionserver.CompactingMemStore; 088import org.apache.hadoop.hbase.trace.TraceUtil; 089import org.apache.hadoop.hbase.util.ByteArrayHashKey; 090import org.apache.hadoop.hbase.util.Bytes; 091import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 092import org.apache.hadoop.hbase.util.GsonUtil; 093import org.apache.hadoop.hbase.util.Hash; 094import org.apache.hadoop.hbase.util.MurmurHash; 095import org.apache.hadoop.hbase.util.Pair; 096import org.apache.hadoop.hbase.util.RandomDistribution; 097import org.apache.hadoop.hbase.util.YammerHistogramUtils; 098import org.apache.hadoop.io.LongWritable; 099import org.apache.hadoop.io.Text; 100import org.apache.hadoop.mapreduce.Job; 101import org.apache.hadoop.mapreduce.Mapper; 102import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; 103import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 104import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; 105import org.apache.hadoop.util.Tool; 106import org.apache.hadoop.util.ToolRunner; 107import org.apache.yetus.audience.InterfaceAudience; 108import org.slf4j.Logger; 109import org.slf4j.LoggerFactory; 110 111import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 112import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 113import org.apache.hbase.thirdparty.com.google.gson.Gson; 114 115/** 116 * Script used evaluating HBase performance and scalability. Runs a HBase client that steps through 117 * one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random writes test, 118 * etc.). Pass on the command-line which test to run and how many clients are participating in this 119 * experiment. Run {@code PerformanceEvaluation --help} to obtain usage. 
120 * <p> 121 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance 122 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper, 123 * pages 8-10. 124 * <p> 125 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as 126 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client does 127 * about 1GB of data, unless specified otherwise. 128 */ 129@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 130public class PerformanceEvaluation extends Configured implements Tool { 131 static final String RANDOM_SEEK_SCAN = "randomSeekScan"; 132 static final String RANDOM_READ = "randomRead"; 133 static final String PE_COMMAND_SHORTNAME = "pe"; 134 private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName()); 135 private static final Gson GSON = GsonUtil.createGson().create(); 136 137 public static final String TABLE_NAME = "TestTable"; 138 public static final String FAMILY_NAME_BASE = "info"; 139 public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0"); 140 public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0); 141 public static final int DEFAULT_VALUE_LENGTH = 1000; 142 public static final int ROW_LENGTH = 26; 143 144 private static final int ONE_GB = 1024 * 1024 * 1000; 145 private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH; 146 // TODO : should we make this configurable 147 private static final int TAG_LENGTH = 256; 148 private static final DecimalFormat FMT = new DecimalFormat("0.##"); 149 private static final MathContext CXT = MathContext.DECIMAL64; 150 private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000); 151 private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024); 152 private static final TestOptions DEFAULT_OPTS = new TestOptions(); 153 154 private static Map<String, CmdDescriptor> 
COMMANDS = new TreeMap<>(); 155 private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); 156 157 static { 158 addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead", 159 "Run async random read test"); 160 addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite", 161 "Run async random write test"); 162 addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead", 163 "Run async sequential read test"); 164 addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite", 165 "Run async sequential write test"); 166 addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)"); 167 addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test"); 168 addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test"); 169 addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN, 170 "Run random seek and scan 100 test"); 171 addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", 172 "Run random seek scan with both start and stop row (max 10 rows)"); 173 addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100", 174 "Run random seek scan with both start and stop row (max 100 rows)"); 175 addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000", 176 "Run random seek scan with both start and stop row (max 1000 rows)"); 177 addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000", 178 "Run random seek scan with both start and stop row (max 10000 rows)"); 179 addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test"); 180 addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test"); 181 addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test"); 182 addCommandDescriptor(MetaWriteTest.class, "metaWrite", 183 "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta"); 
184 addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)"); 185 addCommandDescriptor(FilteredScanTest.class, "filterScan", 186 "Run scan test using a filter to find a specific row based on it's value " 187 + "(make sure to use --rows=20)"); 188 addCommandDescriptor(IncrementTest.class, "increment", 189 "Increment on each row; clients overlap on keyspace so some concurrent operations"); 190 addCommandDescriptor(AppendTest.class, "append", 191 "Append on each row; clients overlap on keyspace so some concurrent operations"); 192 addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate", 193 "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations"); 194 addCommandDescriptor(CheckAndPutTest.class, "checkAndPut", 195 "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations"); 196 addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete", 197 "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations"); 198 addCommandDescriptor(CleanMetaTest.class, "cleanMeta", 199 "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread"); 200 } 201 202 /** 203 * Enum for map metrics. Keep it out here rather than inside in the Map inner-class so we can find 204 * associated properties. 
205 */ 206 protected static enum Counter { 207 /** elapsed time */ 208 ELAPSED_TIME, 209 /** number of rows */ 210 ROWS 211 } 212 213 protected static class RunResult implements Comparable<RunResult> { 214 public RunResult(long duration, Histogram hist) { 215 this.duration = duration; 216 this.hist = hist; 217 numbOfReplyOverThreshold = 0; 218 numOfReplyFromReplica = 0; 219 } 220 221 public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica, 222 Histogram hist) { 223 this.duration = duration; 224 this.hist = hist; 225 this.numbOfReplyOverThreshold = numbOfReplyOverThreshold; 226 this.numOfReplyFromReplica = numOfReplyFromReplica; 227 } 228 229 public final long duration; 230 public final Histogram hist; 231 public final long numbOfReplyOverThreshold; 232 public final long numOfReplyFromReplica; 233 234 @Override 235 public String toString() { 236 return Long.toString(duration); 237 } 238 239 @Override 240 public int compareTo(RunResult o) { 241 return Long.compare(this.duration, o.duration); 242 } 243 } 244 245 /** 246 * Constructor 247 * @param conf Configuration object 248 */ 249 public PerformanceEvaluation(final Configuration conf) { 250 super(conf); 251 } 252 253 protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name, 254 String description) { 255 CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description); 256 COMMANDS.put(name, cmdDescriptor); 257 } 258 259 /** 260 * Implementations can have their status set. 261 */ 262 interface Status { 263 /** 264 * Sets status 265 * @param msg status message n 266 */ 267 void setStatus(final String msg) throws IOException; 268 } 269 270 /** 271 * MapReduce job that runs a performance evaluation client in each map task. 
272 */ 273 public static class EvaluationMapTask 274 extends Mapper<LongWritable, Text, LongWritable, LongWritable> { 275 276 /** configuration parameter name that contains the command */ 277 public final static String CMD_KEY = "EvaluationMapTask.command"; 278 /** configuration parameter name that contains the PE impl */ 279 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl"; 280 281 private Class<? extends Test> cmd; 282 283 @Override 284 protected void setup(Context context) throws IOException, InterruptedException { 285 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class); 286 287 // this is required so that extensions of PE are instantiated within the 288 // map reduce task... 289 Class<? extends PerformanceEvaluation> peClass = 290 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class); 291 try { 292 peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration()); 293 } catch (Exception e) { 294 throw new IllegalStateException("Could not instantiate PE instance", e); 295 } 296 } 297 298 private <Type> Class<? 
extends Type> forName(String className, Class<Type> type) { 299 try { 300 return Class.forName(className).asSubclass(type); 301 } catch (ClassNotFoundException e) { 302 throw new IllegalStateException("Could not find class for name: " + className, e); 303 } 304 } 305 306 @Override 307 protected void map(LongWritable key, Text value, final Context context) 308 throws IOException, InterruptedException { 309 310 Status status = new Status() { 311 @Override 312 public void setStatus(String msg) { 313 context.setStatus(msg); 314 } 315 }; 316 317 TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class); 318 Configuration conf = HBaseConfiguration.create(context.getConfiguration()); 319 final Connection con = ConnectionFactory.createConnection(conf); 320 AsyncConnection asyncCon = null; 321 try { 322 asyncCon = ConnectionFactory.createAsyncConnection(conf).get(); 323 } catch (ExecutionException e) { 324 throw new IOException(e); 325 } 326 327 // Evaluation task 328 RunResult result = 329 PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status); 330 // Collect how much time the thing took. Report as map output and 331 // to the ELAPSED_TIME counter. 332 context.getCounter(Counter.ELAPSED_TIME).increment(result.duration); 333 context.getCounter(Counter.ROWS).increment(opts.perClientRunRows); 334 context.write(new LongWritable(opts.startRow), new LongWritable(result.duration)); 335 context.progress(); 336 } 337 } 338 339 /* 340 * If table does not already exist, create. Also create a table when {@code opts.presplitRegions} 341 * is specified or when the existing table's region replica count doesn't match {@code 342 * opts.replicas}. 
343 */ 344 static boolean checkTable(Admin admin, TestOptions opts) throws IOException { 345 TableName tableName = TableName.valueOf(opts.tableName); 346 boolean needsDelete = false, exists = admin.tableExists(tableName); 347 boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read") 348 || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan"); 349 if (!exists && isReadCmd) { 350 throw new IllegalStateException( 351 "Must specify an existing table for read commands. Run a write command first."); 352 } 353 HTableDescriptor desc = 354 exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null; 355 byte[][] splits = getSplits(opts); 356 357 // recreate the table when user has requested presplit or when existing 358 // {RegionSplitPolicy,replica count} does not match requested, or when the 359 // number of column families does not match requested. 360 if ( 361 (exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions) 362 || (!isReadCmd && desc != null 363 && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy)) 364 || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas) 365 || (desc != null && desc.getColumnFamilyCount() != opts.families) 366 ) { 367 needsDelete = true; 368 // wait, why did it delete my table?!? 
369 LOG.debug(MoreObjects.toStringHelper("needsDelete").add("needsDelete", needsDelete) 370 .add("isReadCmd", isReadCmd).add("exists", exists).add("desc", desc) 371 .add("presplit", opts.presplitRegions).add("splitPolicy", opts.splitPolicy) 372 .add("replicas", opts.replicas).add("families", opts.families).toString()); 373 } 374 375 // remove an existing table 376 if (needsDelete) { 377 if (admin.isTableEnabled(tableName)) { 378 admin.disableTable(tableName); 379 } 380 admin.deleteTable(tableName); 381 } 382 383 // table creation is necessary 384 if (!exists || needsDelete) { 385 desc = getTableDescriptor(opts); 386 if (splits != null) { 387 if (LOG.isDebugEnabled()) { 388 for (int i = 0; i < splits.length; i++) { 389 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); 390 } 391 } 392 } 393 admin.createTable(desc, splits); 394 LOG.info("Table " + desc + " created"); 395 } 396 return admin.tableExists(tableName); 397 } 398 399 /** 400 * Create an HTableDescriptor from provided TestOptions. 
401 */ 402 protected static HTableDescriptor getTableDescriptor(TestOptions opts) { 403 HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(opts.tableName)); 404 for (int family = 0; family < opts.families; family++) { 405 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 406 HColumnDescriptor familyDesc = new HColumnDescriptor(familyName); 407 familyDesc.setDataBlockEncoding(opts.blockEncoding); 408 familyDesc.setCompressionType(opts.compression); 409 familyDesc.setEncryptionType(opts.encryption); 410 familyDesc.setBloomFilterType(opts.bloomType); 411 familyDesc.setBlocksize(opts.blockSize); 412 if (opts.inMemoryCF) { 413 familyDesc.setInMemory(true); 414 } 415 familyDesc.setInMemoryCompaction(opts.inMemoryCompaction); 416 tableDesc.addFamily(familyDesc); 417 } 418 if (opts.replicas != DEFAULT_OPTS.replicas) { 419 tableDesc.setRegionReplication(opts.replicas); 420 } 421 if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) { 422 tableDesc.setRegionSplitPolicyClassName(opts.splitPolicy); 423 } 424 return tableDesc; 425 } 426 427 /** 428 * generates splits based on total number of rows and specified split regions 429 */ 430 protected static byte[][] getSplits(TestOptions opts) { 431 if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null; 432 433 int numSplitPoints = opts.presplitRegions - 1; 434 byte[][] splits = new byte[numSplitPoints][]; 435 int jump = opts.totalRows / opts.presplitRegions; 436 for (int i = 0; i < numSplitPoints; i++) { 437 int rowkey = jump * (1 + i); 438 splits[i] = format(rowkey); 439 } 440 return splits; 441 } 442 443 static void setupConnectionCount(final TestOptions opts) { 444 if (opts.oneCon) { 445 opts.connCount = 1; 446 } else { 447 if (opts.connCount == -1) { 448 // set to thread number if connCount is not set 449 opts.connCount = opts.numClientThreads; 450 } 451 } 452 } 453 454 /* 455 * Run all clients in this vm each to its own thread. 
456 */ 457 static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf) 458 throws IOException, InterruptedException, ExecutionException { 459 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName); 460 assert cmd != null; 461 @SuppressWarnings("unchecked") 462 Future<RunResult>[] threads = new Future[opts.numClientThreads]; 463 RunResult[] results = new RunResult[opts.numClientThreads]; 464 ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, 465 new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build()); 466 setupConnectionCount(opts); 467 final Connection[] cons = new Connection[opts.connCount]; 468 final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount]; 469 for (int i = 0; i < opts.connCount; i++) { 470 cons[i] = ConnectionFactory.createConnection(conf); 471 asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get(); 472 } 473 LOG 474 .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads"); 475 for (int i = 0; i < threads.length; i++) { 476 final int index = i; 477 threads[i] = pool.submit(new Callable<RunResult>() { 478 @Override 479 public RunResult call() throws Exception { 480 TestOptions threadOpts = new TestOptions(opts); 481 final Connection con = cons[index % cons.length]; 482 final AsyncConnection asyncCon = asyncCons[index % asyncCons.length]; 483 if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows; 484 RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() { 485 @Override 486 public void setStatus(final String msg) throws IOException { 487 LOG.info(msg); 488 } 489 }); 490 LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration 491 + "ms over " + threadOpts.perClientRunRows + " rows"); 492 if (opts.latencyThreshold > 0) { 493 LOG.info("Number of replies over latency threshold " + opts.latencyThreshold 494 + "(ms) is " + 
run.numbOfReplyOverThreshold); 495 } 496 return run; 497 } 498 }); 499 } 500 pool.shutdown(); 501 502 for (int i = 0; i < threads.length; i++) { 503 try { 504 results[i] = threads[i].get(); 505 } catch (ExecutionException e) { 506 throw new IOException(e.getCause()); 507 } 508 } 509 final String test = cmd.getSimpleName(); 510 LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results)); 511 Arrays.sort(results); 512 long total = 0; 513 float avgLatency = 0; 514 float avgTPS = 0; 515 long replicaWins = 0; 516 for (RunResult result : results) { 517 total += result.duration; 518 avgLatency += result.hist.getSnapshot().getMean(); 519 avgTPS += opts.perClientRunRows * 1.0f / result.duration; 520 replicaWins += result.numOfReplyFromReplica; 521 } 522 avgTPS *= 1000; // ms to second 523 avgLatency = avgLatency / results.length; 524 LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: " 525 + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms"); 526 LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency)); 527 LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second"); 528 if (opts.replicas > 1) { 529 LOG.info("[results from replica regions] " + replicaWins); 530 } 531 532 for (int i = 0; i < opts.connCount; i++) { 533 cons[i].close(); 534 asyncCons[i].close(); 535 } 536 537 return results; 538 } 539 540 /* 541 * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write 542 * out an input file with instruction per client regards which row they are to start on. 543 * @param cmd Command to run. n 544 */ 545 static Job doMapReduce(TestOptions opts, final Configuration conf) 546 throws IOException, InterruptedException, ClassNotFoundException { 547 final Class<? 
extends TestBase> cmd = determineCommandClass(opts.cmdName); 548 assert cmd != null; 549 Path inputDir = writeInputFile(conf, opts); 550 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); 551 conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName()); 552 Job job = Job.getInstance(conf); 553 job.setJarByClass(PerformanceEvaluation.class); 554 job.setJobName("HBase Performance Evaluation - " + opts.cmdName); 555 556 job.setInputFormatClass(NLineInputFormat.class); 557 NLineInputFormat.setInputPaths(job, inputDir); 558 // this is default, but be explicit about it just in case. 559 NLineInputFormat.setNumLinesPerSplit(job, 1); 560 561 job.setOutputKeyClass(LongWritable.class); 562 job.setOutputValueClass(LongWritable.class); 563 564 job.setMapperClass(EvaluationMapTask.class); 565 job.setReducerClass(LongSumReducer.class); 566 567 job.setNumReduceTasks(1); 568 569 job.setOutputFormatClass(TextOutputFormat.class); 570 TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs")); 571 572 TableMapReduceUtil.addDependencyJars(job); 573 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Histogram.class, // yammer 574 // metrics 575 Gson.class, // gson 576 FilterAllFilter.class // hbase-server tests jar 577 ); 578 579 TableMapReduceUtil.initCredentials(job); 580 581 job.waitForCompletion(true); 582 return job; 583 } 584 585 /** 586 * Each client has one mapper to do the work, and client do the resulting count in a map task. 587 */ 588 589 static String JOB_INPUT_FILENAME = "input.txt"; 590 591 /* 592 * Write input file of offsets-per-client for the mapreduce job. 
593 * @param c Configuration 594 * @return Directory that contains file written whose name is JOB_INPUT_FILENAME n 595 */ 596 static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException { 597 return writeInputFile(c, opts, new Path(".")); 598 } 599 600 static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir) 601 throws IOException { 602 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); 603 Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date())); 604 Path inputDir = new Path(jobdir, "inputs"); 605 606 FileSystem fs = FileSystem.get(c); 607 fs.mkdirs(inputDir); 608 609 Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME); 610 PrintStream out = new PrintStream(fs.create(inputFile)); 611 // Make input random. 612 Map<Integer, String> m = new TreeMap<>(); 613 Hash h = MurmurHash.getInstance(); 614 int perClientRows = (opts.totalRows / opts.numClientThreads); 615 try { 616 for (int j = 0; j < opts.numClientThreads; j++) { 617 TestOptions next = new TestOptions(opts); 618 next.startRow = j * perClientRows; 619 next.perClientRunRows = perClientRows; 620 String s = GSON.toJson(next); 621 LOG.info("Client=" + j + ", input=" + s); 622 byte[] b = Bytes.toBytes(s); 623 int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1); 624 m.put(hash, s); 625 } 626 for (Map.Entry<Integer, String> e : m.entrySet()) { 627 out.println(e.getValue()); 628 } 629 } finally { 630 out.close(); 631 } 632 return inputDir; 633 } 634 635 /** 636 * Describes a command. 637 */ 638 static class CmdDescriptor { 639 private Class<? extends TestBase> cmdClass; 640 private String name; 641 private String description; 642 643 CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) { 644 this.cmdClass = cmdClass; 645 this.name = name; 646 this.description = description; 647 } 648 649 public Class<? 
extends TestBase> getCmdClass() { 650 return cmdClass; 651 } 652 653 public String getName() { 654 return name; 655 } 656 657 public String getDescription() { 658 return description; 659 } 660 } 661 662 /** 663 * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes 664 * tracking all these arguments a little easier. NOTE: ADDING AN OPTION, you need to add a data 665 * member, a getter/setter (to make JSON serialization of this TestOptions class behave), and you 666 * need to add to the clone constructor below copying your new option from the 'that' to the 667 * 'this'. Look for 'clone' below. 668 */ 669 static class TestOptions { 670 String cmdName = null; 671 boolean nomapred = false; 672 boolean filterAll = false; 673 int startRow = 0; 674 float size = 1.0f; 675 int perClientRunRows = DEFAULT_ROWS_PER_GB; 676 int numClientThreads = 1; 677 int totalRows = DEFAULT_ROWS_PER_GB; 678 int measureAfter = 0; 679 float sampleRate = 1.0f; 680 /** 681 * @deprecated Useless after switching to OpenTelemetry 682 */ 683 @Deprecated 684 double traceRate = 0.0; 685 String tableName = TABLE_NAME; 686 boolean flushCommits = true; 687 boolean writeToWAL = true; 688 boolean autoFlush = false; 689 boolean oneCon = false; 690 int connCount = -1; // wil decide the actual num later 691 boolean useTags = false; 692 int noOfTags = 1; 693 boolean reportLatency = false; 694 int multiGet = 0; 695 int multiPut = 0; 696 int randomSleep = 0; 697 boolean inMemoryCF = false; 698 int presplitRegions = 0; 699 int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION; 700 String splitPolicy = null; 701 Compression.Algorithm compression = Compression.Algorithm.NONE; 702 String encryption = null; 703 BloomType bloomType = BloomType.ROW; 704 int blockSize = HConstants.DEFAULT_BLOCKSIZE; 705 DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; 706 boolean valueRandom = false; 707 boolean valueZipf = false; 708 int valueSize = DEFAULT_VALUE_LENGTH; 709 int 
period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10; 710 int cycles = 1; 711 int columns = 1; 712 int families = 1; 713 int caching = 30; 714 int latencyThreshold = 0; // in millsecond 715 boolean addColumns = true; 716 MemoryCompactionPolicy inMemoryCompaction = 717 MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT); 718 boolean asyncPrefetch = false; 719 boolean cacheBlocks = true; 720 Scan.ReadType scanReadType = Scan.ReadType.DEFAULT; 721 long bufferSize = 2l * 1024l * 1024l; 722 723 public TestOptions() { 724 } 725 726 /** 727 * Clone constructor. 728 * @param that Object to copy from. 729 */ 730 public TestOptions(TestOptions that) { 731 this.cmdName = that.cmdName; 732 this.cycles = that.cycles; 733 this.nomapred = that.nomapred; 734 this.startRow = that.startRow; 735 this.size = that.size; 736 this.perClientRunRows = that.perClientRunRows; 737 this.numClientThreads = that.numClientThreads; 738 this.totalRows = that.totalRows; 739 this.sampleRate = that.sampleRate; 740 this.traceRate = that.traceRate; 741 this.tableName = that.tableName; 742 this.flushCommits = that.flushCommits; 743 this.writeToWAL = that.writeToWAL; 744 this.autoFlush = that.autoFlush; 745 this.oneCon = that.oneCon; 746 this.connCount = that.connCount; 747 this.useTags = that.useTags; 748 this.noOfTags = that.noOfTags; 749 this.reportLatency = that.reportLatency; 750 this.latencyThreshold = that.latencyThreshold; 751 this.multiGet = that.multiGet; 752 this.multiPut = that.multiPut; 753 this.inMemoryCF = that.inMemoryCF; 754 this.presplitRegions = that.presplitRegions; 755 this.replicas = that.replicas; 756 this.splitPolicy = that.splitPolicy; 757 this.compression = that.compression; 758 this.encryption = that.encryption; 759 this.blockEncoding = that.blockEncoding; 760 this.filterAll = that.filterAll; 761 this.bloomType = that.bloomType; 762 this.blockSize = that.blockSize; 763 this.valueRandom = that.valueRandom; 764 
this.valueZipf = that.valueZipf; 765 this.valueSize = that.valueSize; 766 this.period = that.period; 767 this.randomSleep = that.randomSleep; 768 this.measureAfter = that.measureAfter; 769 this.addColumns = that.addColumns; 770 this.columns = that.columns; 771 this.families = that.families; 772 this.caching = that.caching; 773 this.inMemoryCompaction = that.inMemoryCompaction; 774 this.asyncPrefetch = that.asyncPrefetch; 775 this.cacheBlocks = that.cacheBlocks; 776 this.scanReadType = that.scanReadType; 777 this.bufferSize = that.bufferSize; 778 } 779 780 public int getCaching() { 781 return this.caching; 782 } 783 784 public void setCaching(final int caching) { 785 this.caching = caching; 786 } 787 788 public int getColumns() { 789 return this.columns; 790 } 791 792 public void setColumns(final int columns) { 793 this.columns = columns; 794 } 795 796 public int getFamilies() { 797 return this.families; 798 } 799 800 public void setFamilies(final int families) { 801 this.families = families; 802 } 803 804 public int getCycles() { 805 return this.cycles; 806 } 807 808 public void setCycles(final int cycles) { 809 this.cycles = cycles; 810 } 811 812 public boolean isValueZipf() { 813 return valueZipf; 814 } 815 816 public void setValueZipf(boolean valueZipf) { 817 this.valueZipf = valueZipf; 818 } 819 820 public String getCmdName() { 821 return cmdName; 822 } 823 824 public void setCmdName(String cmdName) { 825 this.cmdName = cmdName; 826 } 827 828 public int getRandomSleep() { 829 return randomSleep; 830 } 831 832 public void setRandomSleep(int randomSleep) { 833 this.randomSleep = randomSleep; 834 } 835 836 public int getReplicas() { 837 return replicas; 838 } 839 840 public void setReplicas(int replicas) { 841 this.replicas = replicas; 842 } 843 844 public String getSplitPolicy() { 845 return splitPolicy; 846 } 847 848 public void setSplitPolicy(String splitPolicy) { 849 this.splitPolicy = splitPolicy; 850 } 851 852 public void setNomapred(boolean nomapred) { 853 
this.nomapred = nomapred; 854 } 855 856 public void setFilterAll(boolean filterAll) { 857 this.filterAll = filterAll; 858 } 859 860 public void setStartRow(int startRow) { 861 this.startRow = startRow; 862 } 863 864 public void setSize(float size) { 865 this.size = size; 866 } 867 868 public void setPerClientRunRows(int perClientRunRows) { 869 this.perClientRunRows = perClientRunRows; 870 } 871 872 public void setNumClientThreads(int numClientThreads) { 873 this.numClientThreads = numClientThreads; 874 } 875 876 public void setTotalRows(int totalRows) { 877 this.totalRows = totalRows; 878 } 879 880 public void setSampleRate(float sampleRate) { 881 this.sampleRate = sampleRate; 882 } 883 884 public void setTraceRate(double traceRate) { 885 this.traceRate = traceRate; 886 } 887 888 public void setTableName(String tableName) { 889 this.tableName = tableName; 890 } 891 892 public void setFlushCommits(boolean flushCommits) { 893 this.flushCommits = flushCommits; 894 } 895 896 public void setWriteToWAL(boolean writeToWAL) { 897 this.writeToWAL = writeToWAL; 898 } 899 900 public void setAutoFlush(boolean autoFlush) { 901 this.autoFlush = autoFlush; 902 } 903 904 public void setOneCon(boolean oneCon) { 905 this.oneCon = oneCon; 906 } 907 908 public int getConnCount() { 909 return connCount; 910 } 911 912 public void setConnCount(int connCount) { 913 this.connCount = connCount; 914 } 915 916 public void setUseTags(boolean useTags) { 917 this.useTags = useTags; 918 } 919 920 public void setNoOfTags(int noOfTags) { 921 this.noOfTags = noOfTags; 922 } 923 924 public void setReportLatency(boolean reportLatency) { 925 this.reportLatency = reportLatency; 926 } 927 928 public void setMultiGet(int multiGet) { 929 this.multiGet = multiGet; 930 } 931 932 public void setMultiPut(int multiPut) { 933 this.multiPut = multiPut; 934 } 935 936 public void setInMemoryCF(boolean inMemoryCF) { 937 this.inMemoryCF = inMemoryCF; 938 } 939 940 public void setPresplitRegions(int presplitRegions) { 
941 this.presplitRegions = presplitRegions; 942 } 943 944 public void setCompression(Compression.Algorithm compression) { 945 this.compression = compression; 946 } 947 948 public void setEncryption(String encryption) { 949 this.encryption = encryption; 950 } 951 952 public void setBloomType(BloomType bloomType) { 953 this.bloomType = bloomType; 954 } 955 956 public void setBlockSize(int blockSize) { 957 this.blockSize = blockSize; 958 } 959 960 public void setBlockEncoding(DataBlockEncoding blockEncoding) { 961 this.blockEncoding = blockEncoding; 962 } 963 964 public void setValueRandom(boolean valueRandom) { 965 this.valueRandom = valueRandom; 966 } 967 968 public void setValueSize(int valueSize) { 969 this.valueSize = valueSize; 970 } 971 972 public void setBufferSize(long bufferSize) { 973 this.bufferSize = bufferSize; 974 } 975 976 public void setPeriod(int period) { 977 this.period = period; 978 } 979 980 public boolean isNomapred() { 981 return nomapred; 982 } 983 984 public boolean isFilterAll() { 985 return filterAll; 986 } 987 988 public int getStartRow() { 989 return startRow; 990 } 991 992 public float getSize() { 993 return size; 994 } 995 996 public int getPerClientRunRows() { 997 return perClientRunRows; 998 } 999 1000 public int getNumClientThreads() { 1001 return numClientThreads; 1002 } 1003 1004 public int getTotalRows() { 1005 return totalRows; 1006 } 1007 1008 public float getSampleRate() { 1009 return sampleRate; 1010 } 1011 1012 public double getTraceRate() { 1013 return traceRate; 1014 } 1015 1016 public String getTableName() { 1017 return tableName; 1018 } 1019 1020 public boolean isFlushCommits() { 1021 return flushCommits; 1022 } 1023 1024 public boolean isWriteToWAL() { 1025 return writeToWAL; 1026 } 1027 1028 public boolean isAutoFlush() { 1029 return autoFlush; 1030 } 1031 1032 public boolean isUseTags() { 1033 return useTags; 1034 } 1035 1036 public int getNoOfTags() { 1037 return noOfTags; 1038 } 1039 1040 public boolean 
isReportLatency() { 1041 return reportLatency; 1042 } 1043 1044 public int getMultiGet() { 1045 return multiGet; 1046 } 1047 1048 public int getMultiPut() { 1049 return multiPut; 1050 } 1051 1052 public boolean isInMemoryCF() { 1053 return inMemoryCF; 1054 } 1055 1056 public int getPresplitRegions() { 1057 return presplitRegions; 1058 } 1059 1060 public Compression.Algorithm getCompression() { 1061 return compression; 1062 } 1063 1064 public String getEncryption() { 1065 return encryption; 1066 } 1067 1068 public DataBlockEncoding getBlockEncoding() { 1069 return blockEncoding; 1070 } 1071 1072 public boolean isValueRandom() { 1073 return valueRandom; 1074 } 1075 1076 public int getValueSize() { 1077 return valueSize; 1078 } 1079 1080 public int getPeriod() { 1081 return period; 1082 } 1083 1084 public BloomType getBloomType() { 1085 return bloomType; 1086 } 1087 1088 public int getBlockSize() { 1089 return blockSize; 1090 } 1091 1092 public boolean isOneCon() { 1093 return oneCon; 1094 } 1095 1096 public int getMeasureAfter() { 1097 return measureAfter; 1098 } 1099 1100 public void setMeasureAfter(int measureAfter) { 1101 this.measureAfter = measureAfter; 1102 } 1103 1104 public boolean getAddColumns() { 1105 return addColumns; 1106 } 1107 1108 public void setAddColumns(boolean addColumns) { 1109 this.addColumns = addColumns; 1110 } 1111 1112 public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) { 1113 this.inMemoryCompaction = inMemoryCompaction; 1114 } 1115 1116 public MemoryCompactionPolicy getInMemoryCompaction() { 1117 return this.inMemoryCompaction; 1118 } 1119 1120 public long getBufferSize() { 1121 return this.bufferSize; 1122 } 1123 } 1124 1125 /* 1126 * A test. Subclass to particularize what happens per row. 1127 */ 1128 static abstract class TestBase { 1129 // Below is make it so when Tests are all running in the one 1130 // jvm, that they each have a differently seeded Random. 
1131 private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime()); 1132 1133 private static long nextRandomSeed() { 1134 return randomSeed.nextLong(); 1135 } 1136 1137 private final int everyN; 1138 1139 protected final Random rand = new Random(nextRandomSeed()); 1140 protected final Configuration conf; 1141 protected final TestOptions opts; 1142 1143 private final Status status; 1144 1145 private String testName; 1146 private Histogram latencyHistogram; 1147 private Histogram replicaLatencyHistogram; 1148 private Histogram valueSizeHistogram; 1149 private Histogram rpcCallsHistogram; 1150 private Histogram remoteRpcCallsHistogram; 1151 private Histogram millisBetweenNextHistogram; 1152 private Histogram regionsScannedHistogram; 1153 private Histogram bytesInResultsHistogram; 1154 private Histogram bytesInRemoteResultsHistogram; 1155 private RandomDistribution.Zipf zipf; 1156 private long numOfReplyOverLatencyThreshold = 0; 1157 private long numOfReplyFromReplica = 0; 1158 1159 /** 1160 * Note that all subclasses of this class must provide a public constructor that has the exact 1161 * same list of arguments. 
1162 */ 1163 TestBase(final Configuration conf, final TestOptions options, final Status status) { 1164 this.conf = conf; 1165 this.opts = options; 1166 this.status = status; 1167 this.testName = this.getClass().getSimpleName(); 1168 everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate)); 1169 if (options.isValueZipf()) { 1170 this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2); 1171 } 1172 LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows."); 1173 } 1174 1175 int getValueLength(final Random r) { 1176 if (this.opts.isValueRandom()) { 1177 return r.nextInt(opts.valueSize); 1178 } else if (this.opts.isValueZipf()) { 1179 return Math.abs(this.zipf.nextInt()); 1180 } else { 1181 return opts.valueSize; 1182 } 1183 } 1184 1185 void updateValueSize(final Result[] rs) throws IOException { 1186 updateValueSize(rs, 0); 1187 } 1188 1189 void updateValueSize(final Result[] rs, final long latency) throws IOException { 1190 if (rs == null || (latency == 0)) return; 1191 for (Result r : rs) 1192 updateValueSize(r, latency); 1193 } 1194 1195 void updateValueSize(final Result r) throws IOException { 1196 updateValueSize(r, 0); 1197 } 1198 1199 void updateValueSize(final Result r, final long latency) throws IOException { 1200 if (r == null || (latency == 0)) return; 1201 int size = 0; 1202 // update replicaHistogram 1203 if (r.isStale()) { 1204 replicaLatencyHistogram.update(latency / 1000); 1205 numOfReplyFromReplica++; 1206 } 1207 if (!isRandomValueSize()) return; 1208 1209 for (CellScanner scanner = r.cellScanner(); scanner.advance();) { 1210 size += scanner.current().getValueLength(); 1211 } 1212 updateValueSize(size); 1213 } 1214 1215 void updateValueSize(final int valueSize) { 1216 if (!isRandomValueSize()) return; 1217 this.valueSizeHistogram.update(valueSize); 1218 } 1219 1220 void updateScanMetrics(final ScanMetrics metrics) { 1221 if (metrics == null) return; 1222 Map<String, Long> 
metricsMap = metrics.getMetricsMap(); 1223 Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); 1224 if (rpcCalls != null) { 1225 this.rpcCallsHistogram.update(rpcCalls.longValue()); 1226 } 1227 Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME); 1228 if (remoteRpcCalls != null) { 1229 this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue()); 1230 } 1231 Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME); 1232 if (millisBetweenNext != null) { 1233 this.millisBetweenNextHistogram.update(millisBetweenNext.longValue()); 1234 } 1235 Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); 1236 if (regionsScanned != null) { 1237 this.regionsScannedHistogram.update(regionsScanned.longValue()); 1238 } 1239 Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME); 1240 if (bytesInResults != null && bytesInResults.longValue() > 0) { 1241 this.bytesInResultsHistogram.update(bytesInResults.longValue()); 1242 } 1243 Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME); 1244 if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) { 1245 this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue()); 1246 } 1247 } 1248 1249 String generateStatus(final int sr, final int i, final int lr) { 1250 return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency [" 1251 + getShortLatencyReport() + "]" 1252 + (!isRandomValueSize() ? "" : ", value size [" + getShortValueSizeReport() + "]"); 1253 } 1254 1255 boolean isRandomValueSize() { 1256 return opts.valueRandom; 1257 } 1258 1259 protected int getReportingPeriod() { 1260 return opts.period; 1261 } 1262 1263 /** 1264 * Populated by testTakedown. Only implemented by RandomReadTest at the moment. 
1265 */ 1266 public Histogram getLatencyHistogram() { 1267 return latencyHistogram; 1268 } 1269 1270 void testSetup() throws IOException { 1271 // test metrics 1272 latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1273 // If it is a replica test, set up histogram for replica. 1274 if (opts.replicas > 1) { 1275 replicaLatencyHistogram = 1276 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1277 } 1278 valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1279 // scan metrics 1280 rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1281 remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1282 millisBetweenNextHistogram = 1283 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1284 regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1285 bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1286 bytesInRemoteResultsHistogram = 1287 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1288 1289 onStartup(); 1290 } 1291 1292 abstract void onStartup() throws IOException; 1293 1294 void testTakedown() throws IOException { 1295 onTakedown(); 1296 // Print all stats for this thread continuously. 1297 // Synchronize on Test.class so different threads don't intermingle the 1298 // output. We can't use 'this' here because each thread has its own instance of Test class. 
1299 synchronized (Test.class) { 1300 status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName()); 1301 status 1302 .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram)); 1303 if (opts.replicas > 1) { 1304 status.setStatus("Latency (us) from Replica Regions: " 1305 + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram)); 1306 } 1307 status.setStatus("Num measures (latency) : " + latencyHistogram.getCount()); 1308 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram)); 1309 if (valueSizeHistogram.getCount() > 0) { 1310 status.setStatus( 1311 "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram)); 1312 status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount()); 1313 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram)); 1314 } else { 1315 status.setStatus("No valueSize statistics available"); 1316 } 1317 if (rpcCallsHistogram.getCount() > 0) { 1318 status.setStatus( 1319 "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram)); 1320 } 1321 if (remoteRpcCallsHistogram.getCount() > 0) { 1322 status.setStatus("remoteRpcCalls (count): " 1323 + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram)); 1324 } 1325 if (millisBetweenNextHistogram.getCount() > 0) { 1326 status.setStatus("millisBetweenNext (latency): " 1327 + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram)); 1328 } 1329 if (regionsScannedHistogram.getCount() > 0) { 1330 status.setStatus("regionsScanned (count): " 1331 + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram)); 1332 } 1333 if (bytesInResultsHistogram.getCount() > 0) { 1334 status.setStatus("bytesInResults (size): " 1335 + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram)); 1336 } 1337 if (bytesInRemoteResultsHistogram.getCount() > 0) { 1338 status.setStatus("bytesInRemoteResults 
(size): " 1339 + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram)); 1340 } 1341 } 1342 } 1343 1344 abstract void onTakedown() throws IOException; 1345 1346 /* 1347 * Run test 1348 * @return Elapsed time. n 1349 */ 1350 long test() throws IOException, InterruptedException { 1351 testSetup(); 1352 LOG.info("Timed test starting in thread " + Thread.currentThread().getName()); 1353 final long startTime = System.nanoTime(); 1354 try { 1355 testTimed(); 1356 } finally { 1357 testTakedown(); 1358 } 1359 return (System.nanoTime() - startTime) / 1000000; 1360 } 1361 1362 int getStartRow() { 1363 return opts.startRow; 1364 } 1365 1366 int getLastRow() { 1367 return getStartRow() + opts.perClientRunRows; 1368 } 1369 1370 /** 1371 * Provides an extension point for tests that don't want a per row invocation. 1372 */ 1373 void testTimed() throws IOException, InterruptedException { 1374 int startRow = getStartRow(); 1375 int lastRow = getLastRow(); 1376 // Report on completion of 1/10th of total. 1377 for (int ii = 0; ii < opts.cycles; ii++) { 1378 if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles); 1379 for (int i = startRow; i < lastRow; i++) { 1380 if (i % everyN != 0) continue; 1381 long startTime = System.nanoTime(); 1382 boolean requestSent = false; 1383 Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan(); 1384 try (Scope scope = span.makeCurrent()) { 1385 requestSent = testRow(i, startTime); 1386 } finally { 1387 span.end(); 1388 } 1389 if ((i - startRow) > opts.measureAfter) { 1390 // If multiget or multiput is enabled, say set to 10, testRow() returns immediately 1391 // first 9 times and sends the actual get request in the 10th iteration. 1392 // We should only set latency when actual request is sent because otherwise 1393 // it turns out to be 0. 
1394 if (requestSent) { 1395 long latency = (System.nanoTime() - startTime) / 1000; 1396 latencyHistogram.update(latency); 1397 if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) { 1398 numOfReplyOverLatencyThreshold++; 1399 } 1400 } 1401 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { 1402 status.setStatus(generateStatus(startRow, i, lastRow)); 1403 } 1404 } 1405 } 1406 } 1407 } 1408 1409 /** Returns Subset of the histograms' calculation. */ 1410 public String getShortLatencyReport() { 1411 return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram); 1412 } 1413 1414 /** Returns Subset of the histograms' calculation. */ 1415 public String getShortValueSizeReport() { 1416 return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram); 1417 } 1418 1419 /** 1420 * Test for individual row. 1421 * @param i Row index. 1422 * @return true if the row was sent to server and need to record metrics. False if not, multiGet 1423 * and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered. 1424 */ 1425 abstract boolean testRow(final int i, final long startTime) 1426 throws IOException, InterruptedException; 1427 } 1428 1429 static abstract class Test extends TestBase { 1430 protected Connection connection; 1431 1432 Test(final Connection con, final TestOptions options, final Status status) { 1433 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); 1434 this.connection = con; 1435 } 1436 } 1437 1438 static abstract class AsyncTest extends TestBase { 1439 protected AsyncConnection connection; 1440 1441 AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) { 1442 super(con == null ? 
HBaseConfiguration.create() : con.getConfiguration(), options, status); 1443 this.connection = con; 1444 } 1445 } 1446 1447 static abstract class TableTest extends Test { 1448 protected Table table; 1449 1450 TableTest(Connection con, TestOptions options, Status status) { 1451 super(con, options, status); 1452 } 1453 1454 @Override 1455 void onStartup() throws IOException { 1456 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1457 } 1458 1459 @Override 1460 void onTakedown() throws IOException { 1461 table.close(); 1462 } 1463 } 1464 1465 /* 1466 * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest 1467 */ 1468 static abstract class MetaTest extends TableTest { 1469 protected int keyLength; 1470 1471 MetaTest(Connection con, TestOptions options, Status status) { 1472 super(con, options, status); 1473 keyLength = Integer.toString(opts.perClientRunRows).length(); 1474 } 1475 1476 @Override 1477 void onTakedown() throws IOException { 1478 // No clean up 1479 } 1480 1481 /* 1482 * Generates Lexicographically ascending strings 1483 */ 1484 protected byte[] getSplitKey(final int i) { 1485 return Bytes.toBytes(String.format("%0" + keyLength + "d", i)); 1486 } 1487 1488 } 1489 1490 static abstract class AsyncTableTest extends AsyncTest { 1491 protected AsyncTable<?> table; 1492 1493 AsyncTableTest(AsyncConnection con, TestOptions options, Status status) { 1494 super(con, options, status); 1495 } 1496 1497 @Override 1498 void onStartup() throws IOException { 1499 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1500 } 1501 1502 @Override 1503 void onTakedown() throws IOException { 1504 } 1505 } 1506 1507 static class AsyncRandomReadTest extends AsyncTableTest { 1508 private final Consistency consistency; 1509 private ArrayList<Get> gets; 1510 1511 AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) { 1512 super(con, options, status); 1513 consistency = options.replicas == 
DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1514 if (opts.multiGet > 0) { 1515 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1516 this.gets = new ArrayList<>(opts.multiGet); 1517 } 1518 } 1519 1520 @Override 1521 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1522 if (opts.randomSleep > 0) { 1523 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 1524 } 1525 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1526 for (int family = 0; family < opts.families; family++) { 1527 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1528 if (opts.addColumns) { 1529 for (int column = 0; column < opts.columns; column++) { 1530 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1531 get.addColumn(familyName, qualifier); 1532 } 1533 } else { 1534 get.addFamily(familyName); 1535 } 1536 } 1537 if (opts.filterAll) { 1538 get.setFilter(new FilterAllFilter()); 1539 } 1540 get.setConsistency(consistency); 1541 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1542 try { 1543 if (opts.multiGet > 0) { 1544 this.gets.add(get); 1545 if (this.gets.size() == opts.multiGet) { 1546 Result[] rs = 1547 this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new); 1548 updateValueSize(rs); 1549 this.gets.clear(); 1550 } else { 1551 return false; 1552 } 1553 } else { 1554 updateValueSize(this.table.get(get).get()); 1555 } 1556 } catch (ExecutionException e) { 1557 throw new IOException(e); 1558 } 1559 return true; 1560 } 1561 1562 public static RuntimeException runtime(Throwable e) { 1563 if (e instanceof RuntimeException) { 1564 return (RuntimeException) e; 1565 } 1566 return new RuntimeException(e); 1567 } 1568 1569 public static <V> V propagate(Callable<V> callable) { 1570 try { 1571 return callable.call(); 1572 } catch (Exception e) { 1573 throw runtime(e); 1574 } 1575 } 1576 1577 @Override 1578 protected int 
getReportingPeriod() { 1579 int period = opts.perClientRunRows / 10; 1580 return period == 0 ? opts.perClientRunRows : period; 1581 } 1582 1583 @Override 1584 protected void testTakedown() throws IOException { 1585 if (this.gets != null && this.gets.size() > 0) { 1586 this.table.get(gets); 1587 this.gets.clear(); 1588 } 1589 super.testTakedown(); 1590 } 1591 } 1592 1593 static class AsyncRandomWriteTest extends AsyncSequentialWriteTest { 1594 1595 AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) { 1596 super(con, options, status); 1597 } 1598 1599 @Override 1600 protected byte[] generateRow(final int i) { 1601 return getRandomRow(this.rand, opts.totalRows); 1602 } 1603 } 1604 1605 static class AsyncScanTest extends AsyncTableTest { 1606 private ResultScanner testScanner; 1607 private AsyncTable<?> asyncTable; 1608 1609 AsyncScanTest(AsyncConnection con, TestOptions options, Status status) { 1610 super(con, options, status); 1611 } 1612 1613 @Override 1614 void onStartup() throws IOException { 1615 this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName), 1616 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); 1617 } 1618 1619 @Override 1620 void testTakedown() throws IOException { 1621 if (this.testScanner != null) { 1622 updateScanMetrics(this.testScanner.getScanMetrics()); 1623 this.testScanner.close(); 1624 } 1625 super.testTakedown(); 1626 } 1627 1628 @Override 1629 boolean testRow(final int i, final long startTime) throws IOException { 1630 if (this.testScanner == null) { 1631 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 1632 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1633 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1634 for (int family = 0; family < opts.families; family++) { 1635 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1636 if (opts.addColumns) { 1637 for (int column = 0; column < 
opts.columns; column++) { 1638 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1639 scan.addColumn(familyName, qualifier); 1640 } 1641 } else { 1642 scan.addFamily(familyName); 1643 } 1644 } 1645 if (opts.filterAll) { 1646 scan.setFilter(new FilterAllFilter()); 1647 } 1648 this.testScanner = asyncTable.getScanner(scan); 1649 } 1650 Result r = testScanner.next(); 1651 updateValueSize(r); 1652 return true; 1653 } 1654 } 1655 1656 static class AsyncSequentialReadTest extends AsyncTableTest { 1657 AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) { 1658 super(con, options, status); 1659 } 1660 1661 @Override 1662 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1663 Get get = new Get(format(i)); 1664 for (int family = 0; family < opts.families; family++) { 1665 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1666 if (opts.addColumns) { 1667 for (int column = 0; column < opts.columns; column++) { 1668 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1669 get.addColumn(familyName, qualifier); 1670 } 1671 } else { 1672 get.addFamily(familyName); 1673 } 1674 } 1675 if (opts.filterAll) { 1676 get.setFilter(new FilterAllFilter()); 1677 } 1678 try { 1679 updateValueSize(table.get(get).get()); 1680 } catch (ExecutionException e) { 1681 throw new IOException(e); 1682 } 1683 return true; 1684 } 1685 } 1686 1687 static class AsyncSequentialWriteTest extends AsyncTableTest { 1688 private ArrayList<Put> puts; 1689 1690 AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) { 1691 super(con, options, status); 1692 if (opts.multiPut > 0) { 1693 LOG.info("MultiPut enabled. 
Sending PUTs in batches of " + opts.multiPut + "."); 1694 this.puts = new ArrayList<>(opts.multiPut); 1695 } 1696 } 1697 1698 protected byte[] generateRow(final int i) { 1699 return format(i); 1700 } 1701 1702 @Override 1703 @SuppressWarnings("ReturnValueIgnored") 1704 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1705 byte[] row = generateRow(i); 1706 Put put = new Put(row); 1707 for (int family = 0; family < opts.families; family++) { 1708 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1709 for (int column = 0; column < opts.columns; column++) { 1710 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1711 byte[] value = generateData(this.rand, getValueLength(this.rand)); 1712 if (opts.useTags) { 1713 byte[] tag = generateData(this.rand, TAG_LENGTH); 1714 Tag[] tags = new Tag[opts.noOfTags]; 1715 for (int n = 0; n < opts.noOfTags; n++) { 1716 Tag t = new ArrayBackedTag((byte) n, tag); 1717 tags[n] = t; 1718 } 1719 KeyValue kv = 1720 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 1721 put.add(kv); 1722 updateValueSize(kv.getValueLength()); 1723 } else { 1724 put.addColumn(familyName, qualifier, value); 1725 updateValueSize(value.length); 1726 } 1727 } 1728 } 1729 put.setDurability(opts.writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL); 1730 try { 1731 table.put(put).get(); 1732 if (opts.multiPut > 0) { 1733 this.puts.add(put); 1734 if (this.puts.size() == opts.multiPut) { 1735 this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get)); 1736 this.puts.clear(); 1737 } else { 1738 return false; 1739 } 1740 } else { 1741 table.put(put).get(); 1742 } 1743 } catch (ExecutionException e) { 1744 throw new IOException(e); 1745 } 1746 return true; 1747 } 1748 } 1749 1750 static abstract class BufferedMutatorTest extends Test { 1751 protected BufferedMutator mutator; 1752 protected Table table; 1753 1754 BufferedMutatorTest(Connection con, TestOptions options, Status status) { 1755 super(con, options, status); 1756 } 1757 1758 @Override 1759 void onStartup() throws IOException { 1760 BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName)); 1761 p.writeBufferSize(opts.bufferSize); 1762 this.mutator = connection.getBufferedMutator(p); 1763 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1764 } 1765 1766 @Override 1767 void onTakedown() throws IOException { 1768 mutator.close(); 1769 table.close(); 1770 } 1771 } 1772 1773 static class RandomSeekScanTest extends TableTest { 1774 RandomSeekScanTest(Connection con, TestOptions options, Status status) { 1775 super(con, options, status); 1776 } 1777 1778 @Override 1779 boolean testRow(final int i, final long startTime) throws IOException { 1780 Scan scan = 1781 new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)).setCaching(opts.caching) 1782 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1783 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1784 FilterList list = new FilterList(); 1785 for (int family = 0; family < opts.families; family++) { 1786 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1787 if (opts.addColumns) { 1788 for (int column = 0; column < opts.columns; column++) { 1789 
byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1790 scan.addColumn(familyName, qualifier); 1791 } 1792 } else { 1793 scan.addFamily(familyName); 1794 } 1795 } 1796 if (opts.filterAll) { 1797 list.addFilter(new FilterAllFilter()); 1798 } 1799 list.addFilter(new WhileMatchFilter(new PageFilter(120))); 1800 scan.setFilter(list); 1801 ResultScanner s = this.table.getScanner(scan); 1802 try { 1803 for (Result rr; (rr = s.next()) != null;) { 1804 updateValueSize(rr); 1805 } 1806 } finally { 1807 updateScanMetrics(s.getScanMetrics()); 1808 s.close(); 1809 } 1810 return true; 1811 } 1812 1813 @Override 1814 protected int getReportingPeriod() { 1815 int period = opts.perClientRunRows / 100; 1816 return period == 0 ? opts.perClientRunRows : period; 1817 } 1818 1819 }

  /**
   * Scans a randomly positioned, bounded row range on every iteration. Subclasses choose the width
   * of the range by implementing {@link #getStartAndStopRow()}.
   */
  static abstract class RandomScanWithRangeTest extends TableTest {
    RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    boolean testRow(final int i, final long startTime) throws IOException {
      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
        .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
        .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
        .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
      for (int family = 0; family < opts.families; family++) {
        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
        if (opts.addColumns) {
          for (int column = 0; column < opts.columns; column++) {
            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
            scan.addColumn(familyName, qualifier);
          }
        } else {
          scan.addFamily(familyName);
        }
      }
      if (opts.filterAll) {
        scan.setFilter(new FilterAllFilter());
      }
      int count = 0;
      ResultScanner s = this.table.getScanner(scan);
      try {
        for (Result r; (r = s.next()) != null;) {
          updateValueSize(r);
          count++;
        }
        // Only log every 100th range to keep the output volume manageable.
        if (i % 100 == 0) {
          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
            Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()),
            count));
        }
      } finally {
        // Metrics are read before close(), as in the other scan tests.
        updateScanMetrics(s.getScanMetrics());
        s.close();
      }
      return true;
    }

    /** Returns the start row (first) and stop row (second) for the next scan. */
    protected abstract Pair<byte[], byte[]> getStartAndStopRow();

    /**
     * Picks a random start row in [0, totalRows) and a stop row {@code maxRange} rows after it.
     * The stop row is not capped, so it may lie beyond {@code opts.totalRows}.
     * @param maxRange distance, in rows, between the start row and the stop row
     * @return formatted (start, stop) row keys
     */
    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
      int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
      int stop = start + maxRange;
      return new Pair<>(format(start), format(stop));
    }

    /** Reports every 1% of the client's rows (or once at the end for fewer than 100 rows). */
    @Override
    protected int getReportingPeriod() {
      int period = opts.perClientRunRows / 100;
      return period == 0 ? opts.perClientRunRows : period;
    }
  }

  /** Random range scans spanning 10 rows. */
  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
    RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    protected Pair<byte[], byte[]> getStartAndStopRow() {
      return generateStartAndStopRows(10);
    }
  }

  /** Random range scans spanning 100 rows. */
  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
    RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    protected Pair<byte[], byte[]> getStartAndStopRow() {
      return generateStartAndStopRows(100);
    }
  }

  /** Random range scans spanning 1000 rows. */
  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
    RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    protected Pair<byte[], byte[]> getStartAndStopRow() {
      return generateStartAndStopRows(1000);
    }
  }

  /** Random range scans spanning 10000 rows. */
  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
    RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    protected Pair<byte[], byte[]> getStartAndStopRow() {
      return generateStartAndStopRows(10000);
    }
  }

  /**
   * Issues a Get for a random row each iteration, optionally batching them into multi-gets of
   * {@code opts.multiGet} requests.
   */
  static class RandomReadTest extends TableTest {
    // null for the default replica count; TIMELINE when region replicas are being tested.
    private final Consistency consistency;
    // Pending batch; only allocated when multiGet is enabled.
    private ArrayList<Get> gets;

    RandomReadTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
      if (opts.multiGet > 0) {
        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
        this.gets = new ArrayList<>(opts.multiGet);
      }
    }

    /**
     * @return false while a multi-get batch is still filling (no RPC was sent for this row),
     *         true otherwise
     */
    @Override
    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
      if (opts.randomSleep > 0) {
        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
      }
      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
      for (int family = 0; family < opts.families; family++) {
        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
        if (opts.addColumns) {
          for (int column = 0; column < opts.columns; column++) {
            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
            get.addColumn(familyName, qualifier);
          }
        } else {
          get.addFamily(familyName);
        }
      }
      if (opts.filterAll) {
        get.setFilter(new FilterAllFilter());
      }
      get.setConsistency(consistency);
      if (LOG.isTraceEnabled()) {
        LOG.trace(get.toString());
      }
      if (opts.multiGet > 0) {
        this.gets.add(get);
        if (this.gets.size() != opts.multiGet) {
          return false;
        }
        Result[] rs = this.table.get(this.gets);
        if (opts.replicas > 1) {
          long latency = System.nanoTime() - startTime;
          updateValueSize(rs, latency);
        } else {
          updateValueSize(rs);
        }
        this.gets.clear();
      } else if (opts.replicas > 1) {
        Result r = this.table.get(get);
        long latency = System.nanoTime() - startTime;
        updateValueSize(r, latency);
      } else {
        updateValueSize(this.table.get(get));
      }
      return true;
    }

    /** Reports every 10% of the client's rows (or once at the end for fewer than 10 rows). */
    @Override
    protected int getReportingPeriod() {
      int period = opts.perClientRunRows / 10;
      return period == 0 ? opts.perClientRunRows : period;
    }

    /** Sends any partially filled multi-get batch before the table is released. */
    @Override
    protected void testTakedown() throws IOException {
      if (this.gets != null && this.gets.size() > 0) {
        this.table.get(gets);
        this.gets.clear();
      }
      super.testTakedown();
    }
  }

  /*
   * Send random reads against fake regions inserted by MetaWriteTest
   */
  static class MetaRandomReadTest extends MetaTest {
    private final Random rd = new Random();
    // Obtained in onStartup(); closed in testTakedown().
    private RegionLocator regionLocator;

    MetaRandomReadTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
      LOG.info("call getRegionLocation");
    }

    @Override
    void onStartup() throws IOException {
      super.onStartup();
      this.regionLocator = connection.getRegionLocator(table.getName());
    }

    /** Looks up (with reload=true) the location of a random fake region. */
    @Override
    boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
      if (opts.randomSleep > 0) {
        Thread.sleep(rd.nextInt(opts.randomSleep));
      }
      HRegionLocation hRegionLocation =
        regionLocator.getRegionLocation(getSplitKey(rd.nextInt(opts.perClientRunRows)), true);
      LOG.debug("get location for region: " + hRegionLocation);
      return true;
    }

    /** Reports every 10% of the client's rows (or once at the end for fewer than 10 rows). */
    @Override
    protected int getReportingPeriod() {
      int period = opts.perClientRunRows / 10;
      return period == 0 ? opts.perClientRunRows : period;
    }

    /**
     * Closes the RegionLocator opened in onStartup(). The try/finally guarantees the base takedown
     * still runs if closing the locator fails.
     */
    @Override
    protected void testTakedown() throws IOException {
      try {
        if (this.regionLocator != null) {
          this.regionLocator.close();
          this.regionLocator = null;
        }
      } finally {
        super.testTakedown();
      }
    }
  }

  /** Same as SequentialWriteTest, but each row key is chosen at random within totalRows. */
  static class RandomWriteTest extends SequentialWriteTest {
    RandomWriteTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    protected byte[] generateRow(final int i) {
      return getRandomRow(this.rand, opts.totalRows);
    }

  }

  /**
   * Walks the table with one long-lived scanner, fetching a single Result per iteration. The
   * scanner is opened lazily at {@code opts.startRow} on the first call.
   */
  static class ScanTest extends TableTest {
    private ResultScanner testScanner;

    ScanTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    void testTakedown() throws IOException {
      if (this.testScanner != null) {
        // Scan metrics are enabled on this scanner; record them before closing, as the other
        // scan tests do, so they are not silently dropped.
        updateScanMetrics(this.testScanner.getScanMetrics());
        this.testScanner.close();
      }
      super.testTakedown();
    }

    @Override
    boolean testRow(final int i, final long startTime) throws IOException {
      if (this.testScanner == null) {
        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
        for (int family = 0; family < opts.families; family++) {
          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
          if (opts.addColumns) {
            for (int column = 0; column < opts.columns; column++) {
              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
              scan.addColumn(familyName, qualifier);
            }
          } else {
            scan.addFamily(familyName);
          }
        }
        if (opts.filterAll) {
          scan.setFilter(new FilterAllFilter());
        }
        this.testScanner = table.getScanner(scan);
      }
      // NOTE(review): next() returns null once the scanner is exhausted; this relies on
      // updateValueSize(Result) tolerating null — confirm.
      Result r = testScanner.next();
      updateValueSize(r);
      return true;
    }
  }

  /** 2100 * Base class for operations that are CAS-like; that read a value and then set it based off what 2101 * they read.
In this category is increment, append, checkAndPut, etc. 2102 * <p> 2103 * These operations also want some concurrency going on. Usually when these tests run, they 2104 * operate in their own part of the key range. In CASTest, we will have them all overlap on the 2105 * same key space. We do this with our getStartRow and getLastRow overrides. 2106 */ 2107 static abstract class CASTableTest extends TableTest { 2108 private final byte[] qualifier; 2109 2110 CASTableTest(Connection con, TestOptions options, Status status) { 2111 super(con, options, status); 2112 qualifier = Bytes.toBytes(this.getClass().getSimpleName()); 2113 } 2114 2115 byte[] getQualifier() { 2116 return this.qualifier; 2117 } 2118 2119 @Override 2120 int getStartRow() { 2121 return 0; 2122 } 2123 2124 @Override 2125 int getLastRow() { 2126 return opts.perClientRunRows; 2127 } 2128 } 2129 2130 static class IncrementTest extends CASTableTest { 2131 IncrementTest(Connection con, TestOptions options, Status status) { 2132 super(con, options, status); 2133 } 2134 2135 @Override 2136 boolean testRow(final int i, final long startTime) throws IOException { 2137 Increment increment = new Increment(format(i)); 2138 // unlike checkAndXXX tests, which make most sense to do on a single value, 2139 // if multiple families are specified for an increment test we assume it is 2140 // meant to raise the work factor 2141 for (int family = 0; family < opts.families; family++) { 2142 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2143 increment.addColumn(familyName, getQualifier(), 1l); 2144 } 2145 updateValueSize(this.table.increment(increment)); 2146 return true; 2147 } 2148 } 2149 2150 static class AppendTest extends CASTableTest { 2151 AppendTest(Connection con, TestOptions options, Status status) { 2152 super(con, options, status); 2153 } 2154 2155 @Override 2156 boolean testRow(final int i, final long startTime) throws IOException { 2157 byte[] bytes = format(i); 2158 Append append = new 
Append(bytes); 2159 // unlike checkAndXXX tests, which make most sense to do on a single value, 2160 // if multiple families are specified for an append test we assume it is 2161 // meant to raise the work factor 2162 for (int family = 0; family < opts.families; family++) { 2163 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2164 append.addColumn(familyName, getQualifier(), bytes); 2165 } 2166 updateValueSize(this.table.append(append)); 2167 return true; 2168 } 2169 } 2170 2171 static class CheckAndMutateTest extends CASTableTest { 2172 CheckAndMutateTest(Connection con, TestOptions options, Status status) { 2173 super(con, options, status); 2174 } 2175 2176 @Override 2177 boolean testRow(final int i, final long startTime) throws IOException { 2178 final byte[] bytes = format(i); 2179 // checkAndXXX tests operate on only a single value 2180 // Put a known value so when we go to check it, it is there. 2181 Put put = new Put(bytes); 2182 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2183 this.table.put(put); 2184 RowMutations mutations = new RowMutations(bytes); 2185 mutations.add(put); 2186 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2187 .thenMutate(mutations); 2188 return true; 2189 } 2190 } 2191 2192 static class CheckAndPutTest extends CASTableTest { 2193 CheckAndPutTest(Connection con, TestOptions options, Status status) { 2194 super(con, options, status); 2195 } 2196 2197 @Override 2198 boolean testRow(final int i, final long startTime) throws IOException { 2199 final byte[] bytes = format(i); 2200 // checkAndXXX tests operate on only a single value 2201 // Put a known value so when we go to check it, it is there. 
2202 Put put = new Put(bytes); 2203 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2204 this.table.put(put); 2205 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2206 .thenPut(put); 2207 return true; 2208 } 2209 } 2210 2211 static class CheckAndDeleteTest extends CASTableTest { 2212 CheckAndDeleteTest(Connection con, TestOptions options, Status status) { 2213 super(con, options, status); 2214 } 2215 2216 @Override 2217 boolean testRow(final int i, final long startTime) throws IOException { 2218 final byte[] bytes = format(i); 2219 // checkAndXXX tests operate on only a single value 2220 // Put a known value so when we go to check it, it is there. 2221 Put put = new Put(bytes); 2222 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2223 this.table.put(put); 2224 Delete delete = new Delete(put.getRow()); 2225 delete.addColumn(FAMILY_ZERO, getQualifier()); 2226 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2227 .thenDelete(delete); 2228 return true; 2229 } 2230 } 2231 2232 /* 2233 * Delete all fake regions inserted to meta table by MetaWriteTest. 
2234 */ 2235 static class CleanMetaTest extends MetaTest { 2236 CleanMetaTest(Connection con, TestOptions options, Status status) { 2237 super(con, options, status); 2238 } 2239 2240 @Override 2241 boolean testRow(final int i, final long startTime) throws IOException { 2242 try { 2243 RegionInfo regionInfo = connection.getRegionLocator(table.getName()) 2244 .getRegionLocation(getSplitKey(i), false).getRegion(); 2245 LOG.debug("deleting region from meta: " + regionInfo); 2246 2247 Delete delete = 2248 MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP); 2249 try (Table t = MetaTableAccessor.getMetaHTable(connection)) { 2250 t.delete(delete); 2251 } 2252 } catch (IOException ie) { 2253 // Log and continue 2254 LOG.error("cannot find region with start key: " + i); 2255 } 2256 return true; 2257 } 2258 } 2259 2260 static class SequentialReadTest extends TableTest { 2261 SequentialReadTest(Connection con, TestOptions options, Status status) { 2262 super(con, options, status); 2263 } 2264 2265 @Override 2266 boolean testRow(final int i, final long startTime) throws IOException { 2267 Get get = new Get(format(i)); 2268 for (int family = 0; family < opts.families; family++) { 2269 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2270 if (opts.addColumns) { 2271 for (int column = 0; column < opts.columns; column++) { 2272 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2273 get.addColumn(familyName, qualifier); 2274 } 2275 } else { 2276 get.addFamily(familyName); 2277 } 2278 } 2279 if (opts.filterAll) { 2280 get.setFilter(new FilterAllFilter()); 2281 } 2282 updateValueSize(table.get(get)); 2283 return true; 2284 } 2285 } 2286 2287 static class SequentialWriteTest extends BufferedMutatorTest { 2288 private ArrayList<Put> puts; 2289 2290 SequentialWriteTest(Connection con, TestOptions options, Status status) { 2291 super(con, options, status); 2292 if (opts.multiPut > 0) { 2293 LOG.info("MultiPut enabled. 
Sending PUTs in batches of " + opts.multiPut + "."); 2294 this.puts = new ArrayList<>(opts.multiPut); 2295 } 2296 } 2297 2298 protected byte[] generateRow(final int i) { 2299 return format(i); 2300 } 2301 2302 @Override 2303 boolean testRow(final int i, final long startTime) throws IOException { 2304 byte[] row = generateRow(i); 2305 Put put = new Put(row); 2306 for (int family = 0; family < opts.families; family++) { 2307 byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family); 2308 for (int column = 0; column < opts.columns; column++) { 2309 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2310 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2311 if (opts.useTags) { 2312 byte[] tag = generateData(this.rand, TAG_LENGTH); 2313 Tag[] tags = new Tag[opts.noOfTags]; 2314 for (int n = 0; n < opts.noOfTags; n++) { 2315 Tag t = new ArrayBackedTag((byte) n, tag); 2316 tags[n] = t; 2317 } 2318 KeyValue kv = 2319 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 2320 put.add(kv); 2321 updateValueSize(kv.getValueLength()); 2322 } else { 2323 put.addColumn(familyName, qualifier, value); 2324 updateValueSize(value.length); 2325 } 2326 } 2327 } 2328 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 2329 if (opts.autoFlush) { 2330 if (opts.multiPut > 0) { 2331 this.puts.add(put); 2332 if (this.puts.size() == opts.multiPut) { 2333 table.put(this.puts); 2334 this.puts.clear(); 2335 } else { 2336 return false; 2337 } 2338 } else { 2339 table.put(put); 2340 } 2341 } else { 2342 mutator.mutate(put); 2343 } 2344 return true; 2345 } 2346 } 2347 2348 /* 2349 * Insert fake regions into meta table with contiguous split keys. 
2350 */ 2351 static class MetaWriteTest extends MetaTest { 2352 2353 MetaWriteTest(Connection con, TestOptions options, Status status) { 2354 super(con, options, status); 2355 } 2356 2357 @Override 2358 boolean testRow(final int i, final long startTime) throws IOException { 2359 List<RegionInfo> regionInfos = new ArrayList<RegionInfo>(); 2360 RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME)) 2361 .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build()); 2362 regionInfos.add(regionInfo); 2363 MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1); 2364 2365 // write the serverName columns 2366 MetaTableAccessor.updateRegionLocation(connection, regionInfo, 2367 ServerName.valueOf("localhost", 60010, rand.nextLong()), i, 2368 EnvironmentEdgeManager.currentTime()); 2369 return true; 2370 } 2371 }

  /**
   * Scans {@code FAMILY_ZERO} with a SingleColumnValueFilter on {@code COLUMN_ZERO} for a freshly
   * generated random value, so the scan is server-side filter heavy.
   */
  static class FilteredScanTest extends TableTest {
    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());

    FilteredScanTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
      if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) {
        LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB
          + ". This could take a very long time.");
      }
    }

    @Override
    boolean testRow(int i, final long startTime) throws IOException {
      byte[] value = generateData(this.rand, getValueLength(this.rand));
      Scan scan = constructScan(value);
      ResultScanner scanner = null;
      try {
        scanner = this.table.getScanner(scan);
        for (Result r; (r = scanner.next()) != null;) {
          updateValueSize(r);
        }
      } finally {
        if (scanner != null) {
          // Metrics are read before close(), as in the other scan tests.
          updateScanMetrics(scanner.getScanMetrics());
          scanner.close();
        }
      }
      return true;
    }

    /**
     * Builds the filtered scan. Despite the parameter name, the filter is an exact match: EQUAL
     * with a BinaryComparator on the whole value.
     * @param valuePrefix value that {@code FAMILY_ZERO:COLUMN_ZERO} must equal
     */
    protected Scan constructScan(byte[] valuePrefix) throws IOException {
      FilterList list = new FilterList();
      Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL,
        new BinaryComparator(valuePrefix));
      list.addFilter(filter);
      if (opts.filterAll) {
        list.addFilter(new FilterAllFilter());
      }
      Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
        .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
        .setScanMetricsEnabled(true);
      if (opts.addColumns) {
        for (int column = 0; column < opts.columns; column++) {
          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
          scan.addColumn(FAMILY_ZERO, qualifier);
        }
      } else {
        scan.addFamily(FAMILY_ZERO);
      }
      scan.setFilter(list);
      return scan;
    }
  }

  /**
   * Compute a throughput rate in MB/s.
   * @param rows Number of records consumed.
   * @param timeMs Time taken in milliseconds. Values below 1 are treated as 1 ms, because a run
   *          that finishes within the clock resolution would otherwise make
   *          {@link BigDecimal#divide} throw on a zero divisor.
   * @param valueSize average value size in bytes
   * @param families number of column families written per row
   * @param columns number of columns written per family
   * @return String value with label, ie '123.76 MB/s'
   */
  private static String calculateMbps(int rows, long timeMs, final int valueSize, int families,
    int columns) {
    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH
      + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families);
    BigDecimal elapsedMs = BigDecimal.valueOf(Math.max(timeMs, 1L));
    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT).divide(elapsedMs, CXT)
      .multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT);
    return FMT.format(mbps) + " MB/s";
  }

  /*
   * Format passed integer.
   * @param number value to format; its absolute value is used
   * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed number (Does
   * absolute in case number is negative). Digits beyond ROW_LENGTH are dropped from the left.
   */
  public static byte[] format(final int number) {
    byte[] b = new byte[ROW_LENGTH];
    // Widen before abs(): Math.abs(Integer.MIN_VALUE) is still negative and would otherwise
    // produce non-digit bytes.
    long d = Math.abs((long) number);
    for (int i = b.length - 1; i >= 0; i--) {
      b[i] = (byte) ((d % 10) + '0');
      d /= 10;
    }
    return b;
  }

  /*
   * This method takes some time and is done inline uploading data. For example, doing the mapfile
   * test, generation of the key and value consumes about 30% of CPU time.
   * The value is made of 8-byte runs of one random upper-case ASCII letter ('A'..'Z'); the final
   * run (up to 8 bytes) uses one more random letter.
   * @return Generated random value to insert into a table cell.
   */
  public static byte[] generateData(final Random r, int length) {
    byte[] b = new byte[length];
    int i = 0;
    for (; i < length - 8; i += 8) {
      Arrays.fill(b, i, i + 8, (byte) (65 + r.nextInt(26)));
    }
    // Always draws one more letter (even when length is 0), keeping the RNG sequence unchanged.
    Arrays.fill(b, i, length, (byte) (65 + r.nextInt(26)));
    return b;
  }

  /** Returns the formatted key of a random row in [0, totalRows). */
  static byte[] getRandomRow(final Random random, final int totalRows) {
    return format(generateRandomRow(random, totalRows));
  }

  /** Returns a random row index in [0, totalRows); totalRows must be positive. */
  static int generateRandomRow(final Random random, final int totalRows) {
    return random.nextInt(Integer.MAX_VALUE) % totalRows;
  }

  static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf, 2492 Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status) 2493 throws IOException, InterruptedException { 2494 status.setStatus( 2495 "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows"); 2496 long totalElapsedTime; 2497 2498 final TestBase t; 2499 try { 2500 if (AsyncTest.class.isAssignableFrom(cmd)) { 2501 Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd; 2502 Constructor<? extends AsyncTest> constructor = 2503 newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class); 2504 t = constructor.newInstance(asyncCon, opts, status); 2505 } else { 2506 Class<? extends Test> newCmd = (Class<? extends Test>) cmd; 2507 Constructor<? extends Test> constructor = 2508 newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class); 2509 t = constructor.newInstance(con, opts, status); 2510 } 2511 } catch (NoSuchMethodException e) { 2512 throw new IllegalArgumentException("Invalid command class: " + cmd.getName() 2513 + ".
It does not provide a constructor as described by " 2514 + "the javadoc comment. Available constructors are: " 2515 + Arrays.toString(cmd.getConstructors())); 2516 } catch (Exception e) { 2517 throw new IllegalStateException("Failed to construct command class", e); 2518 } 2519 totalElapsedTime = t.test(); 2520 2521 status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow 2522 + " for " + opts.perClientRunRows + " rows" + " (" 2523 + calculateMbps((int) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime, 2524 getAverageValueLength(opts), opts.families, opts.columns) 2525 + ")"); 2526 2527 return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold, 2528 t.numOfReplyFromReplica, t.getLatencyHistogram()); 2529 } 2530 2531 private static int getAverageValueLength(final TestOptions opts) { 2532 return opts.valueRandom ? opts.valueSize / 2 : opts.valueSize; 2533 } 2534 2535 private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) 2536 throws IOException, InterruptedException, ClassNotFoundException, ExecutionException { 2537 // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do 2538 // the TestOptions introspection for us and dump the output in a readable format. 
2539 LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts)); 2540 Admin admin = null; 2541 Connection connection = null; 2542 try { 2543 connection = ConnectionFactory.createConnection(getConf()); 2544 admin = connection.getAdmin(); 2545 checkTable(admin, opts); 2546 } finally { 2547 if (admin != null) admin.close(); 2548 if (connection != null) connection.close(); 2549 } 2550 if (opts.nomapred) { 2551 doLocalClients(opts, getConf()); 2552 } else { 2553 doMapReduce(opts, getConf()); 2554 } 2555 } 2556 2557 protected void printUsage() { 2558 printUsage(PE_COMMAND_SHORTNAME, null); 2559 } 2560 2561 protected static void printUsage(final String message) { 2562 printUsage(PE_COMMAND_SHORTNAME, message); 2563 } 2564 2565 protected static void printUsageAndExit(final String message, final int exitCode) { 2566 printUsage(message); 2567 System.exit(exitCode); 2568 } 2569 2570 protected static void printUsage(final String shortName, final String message) { 2571 if (message != null && message.length() > 0) { 2572 System.err.println(message); 2573 } 2574 System.err.print("Usage: hbase " + shortName); 2575 System.err.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>"); 2576 System.err.println(); 2577 System.err.println("General Options:"); 2578 System.err.println( 2579 " nomapred Run multiple clients using threads " + "(rather than use mapreduce)"); 2580 System.err 2581 .println(" oneCon all the threads share the same connection. Default: False"); 2582 System.err.println(" connCount connections all threads share. " 2583 + "For example, if set to 2, then all thread share 2 connection. " 2584 + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, " 2585 + "if not, connCount=thread number"); 2586 2587 System.err.println(" sampleRate Execute test on a sample of total " 2588 + "rows. Only supported by randomRead. 
Default: 1.0"); 2589 System.err.println(" period Report every 'period' rows: " 2590 + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10); 2591 System.err.println(" cycles How many times to cycle the test. Defaults: 1."); 2592 System.err.println( 2593 " traceRate Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0"); 2594 System.err.println(" latency Set to report operation latencies. Default: False"); 2595 System.err.println(" latencyThreshold Set to report number of operations with latency " 2596 + "over lantencyThreshold, unit in millisecond, default 0"); 2597 System.err.println(" measureAfter Start to measure the latency once 'measureAfter'" 2598 + " rows have been treated. Default: 0"); 2599 System.err 2600 .println(" valueSize Pass value size to use: Default: " + DEFAULT_OPTS.getValueSize()); 2601 System.err.println(" valueRandom Set if we should vary value size between 0 and " 2602 + "'valueSize'; set on read for stats on size: Default: Not set."); 2603 System.err.println(" blockEncoding Block encoding to use. Value should be one of " 2604 + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE"); 2605 System.err.println(); 2606 System.err.println("Table Creation / Write Tests:"); 2607 System.err.println(" table Alternate table name. Default: 'TestTable'"); 2608 System.err.println( 2609 " rows Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows() 2610 + ". In case of randomReads and randomSeekScans this could" 2611 + " be specified along with --size to specify the number of rows to be scanned within" 2612 + " the total range specified by the size."); 2613 System.err.println( 2614 " size Total size in GiB. Mutually exclusive with --rows for writes and scans" 2615 + ". But for randomReads and randomSeekScans when you use size with --rows you could" 2616 + " use size to specify the end range and --rows" 2617 + " specifies the number of rows within that range. 
" + "Default: 1.0."); 2618 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); 2619 System.err.println(" encryption Encryption type to use (AES, ...). Default: 'NONE'"); 2620 System.err.println( 2621 " flushCommits Used to determine if the test should flush the table. " + "Default: false"); 2622 System.err.println(" valueZipf Set if we should vary value size between 0 and " 2623 + "'valueSize' in zipf form: Default: Not set."); 2624 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); 2625 System.err.println(" autoFlush Set autoFlush on htable. Default: False"); 2626 System.err.println(" multiPut Batch puts together into groups of N. Only supported " 2627 + "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0"); 2628 System.err.println(" presplit Create presplit table. If a table with same name exists," 2629 + " it'll be deleted and recreated (instead of verifying count of its existing regions). " 2630 + "Recommended for accurate perf analysis (see guide). Default: disabled"); 2631 System.err.println( 2632 " usetags Writes tags along with KVs. Use with HFile V3. " + "Default: false"); 2633 System.err.println(" numoftags Specify the no of tags that would be needed. " 2634 + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags); 2635 System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table."); 2636 System.err.println(" columns Columns to write per row. Default: 1"); 2637 System.err 2638 .println(" families Specify number of column families for the table. Default: 1"); 2639 System.err.println(); 2640 System.err.println("Read Tests:"); 2641 System.err.println(" filterAll Helps to filter out all the rows on the server side" 2642 + " there by not returning any thing back to the client. Helps to check the server side" 2643 + " performance. Uses FilterAllFilter internally. 
"); 2644 System.err.println(" multiGet Batch gets together into groups of N. Only supported " 2645 + "by randomRead. Default: disabled"); 2646 System.err.println(" inmemory Tries to keep the HFiles of the CF " 2647 + "inmemory as far as possible. Not guaranteed that reads are always served " 2648 + "from memory. Default: false"); 2649 System.err 2650 .println(" bloomFilter Bloom filter type, one of " + Arrays.toString(BloomType.values())); 2651 System.err.println(" blockSize Blocksize to use when writing out hfiles. "); 2652 System.err 2653 .println(" inmemoryCompaction Makes the column family to do inmemory flushes/compactions. " 2654 + "Uses the CompactingMemstore"); 2655 System.err.println(" addColumns Adds columns to scans/gets explicitly. Default: true"); 2656 System.err.println(" replicas Enable region replica testing. Defaults: 1."); 2657 System.err.println( 2658 " randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0"); 2659 System.err.println(" caching Scan caching to use. Default: 30"); 2660 System.err.println(" asyncPrefetch Enable asyncPrefetch for scan"); 2661 System.err.println(" cacheBlocks Set the cacheBlocks option for scan. Default: true"); 2662 System.err.println( 2663 " scanReadType Set the readType option for scan, stream/pread/default. Default: default"); 2664 System.err.println(" bufferSize Set the value of client side buffering. Default: 2MB"); 2665 System.err.println(); 2666 System.err.println(" Note: -D properties will be applied to the conf used. 
"); 2667 System.err.println(" For example: "); 2668 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true"); 2669 System.err.println(" -Dmapreduce.task.timeout=60000"); 2670 System.err.println(); 2671 System.err.println("Command:"); 2672 for (CmdDescriptor command : COMMANDS.values()) { 2673 System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription())); 2674 } 2675 System.err.println(); 2676 System.err.println("Args:"); 2677 System.err.println(" nclients Integer. Required. Total number of clients " 2678 + "(and HRegionServers) running. 1 <= value <= 500"); 2679 System.err.println("Examples:"); 2680 System.err.println(" To run a single client doing the default 1M sequentialWrites:"); 2681 System.err.println(" $ hbase " + shortName + " sequentialWrite 1"); 2682 System.err.println(" To run 10 clients doing increments over ten rows:"); 2683 System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10"); 2684 } 2685 2686 /** 2687 * Parse options passed in via an arguments array. Assumes that array has been split on 2688 * white-space and placed into a {@code Queue}. Any unknown arguments will remain in the queue at 2689 * the conclusion of this method call. It's up to the caller to deal with these unrecognized 2690 * arguments. 
2691 */ 2692 static TestOptions parseOpts(Queue<String> args) { 2693 TestOptions opts = new TestOptions(); 2694 2695 String cmd = null; 2696 while ((cmd = args.poll()) != null) { 2697 if (cmd.equals("-h") || cmd.startsWith("--h")) { 2698 // place item back onto queue so that caller knows parsing was incomplete 2699 args.add(cmd); 2700 break; 2701 } 2702 2703 final String nmr = "--nomapred"; 2704 if (cmd.startsWith(nmr)) { 2705 opts.nomapred = true; 2706 continue; 2707 } 2708 2709 final String rows = "--rows="; 2710 if (cmd.startsWith(rows)) { 2711 opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length())); 2712 continue; 2713 } 2714 2715 final String cycles = "--cycles="; 2716 if (cmd.startsWith(cycles)) { 2717 opts.cycles = Integer.parseInt(cmd.substring(cycles.length())); 2718 continue; 2719 } 2720 2721 final String sampleRate = "--sampleRate="; 2722 if (cmd.startsWith(sampleRate)) { 2723 opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); 2724 continue; 2725 } 2726 2727 final String table = "--table="; 2728 if (cmd.startsWith(table)) { 2729 opts.tableName = cmd.substring(table.length()); 2730 continue; 2731 } 2732 2733 final String startRow = "--startRow="; 2734 if (cmd.startsWith(startRow)) { 2735 opts.startRow = Integer.parseInt(cmd.substring(startRow.length())); 2736 continue; 2737 } 2738 2739 final String compress = "--compress="; 2740 if (cmd.startsWith(compress)) { 2741 opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); 2742 continue; 2743 } 2744 2745 final String encryption = "--encryption="; 2746 if (cmd.startsWith(encryption)) { 2747 opts.encryption = cmd.substring(encryption.length()); 2748 continue; 2749 } 2750 2751 final String traceRate = "--traceRate="; 2752 if (cmd.startsWith(traceRate)) { 2753 opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length())); 2754 continue; 2755 } 2756 2757 final String blockEncoding = "--blockEncoding="; 2758 if (cmd.startsWith(blockEncoding)) 
{ 2759 opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); 2760 continue; 2761 } 2762 2763 final String flushCommits = "--flushCommits="; 2764 if (cmd.startsWith(flushCommits)) { 2765 opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); 2766 continue; 2767 } 2768 2769 final String writeToWAL = "--writeToWAL="; 2770 if (cmd.startsWith(writeToWAL)) { 2771 opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); 2772 continue; 2773 } 2774 2775 final String presplit = "--presplit="; 2776 if (cmd.startsWith(presplit)) { 2777 opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); 2778 continue; 2779 } 2780 2781 final String inMemory = "--inmemory="; 2782 if (cmd.startsWith(inMemory)) { 2783 opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); 2784 continue; 2785 } 2786 2787 final String autoFlush = "--autoFlush="; 2788 if (cmd.startsWith(autoFlush)) { 2789 opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length())); 2790 continue; 2791 } 2792 2793 final String onceCon = "--oneCon="; 2794 if (cmd.startsWith(onceCon)) { 2795 opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length())); 2796 continue; 2797 } 2798 2799 final String connCount = "--connCount="; 2800 if (cmd.startsWith(connCount)) { 2801 opts.connCount = Integer.parseInt(cmd.substring(connCount.length())); 2802 continue; 2803 } 2804 2805 final String latencyThreshold = "--latencyThreshold="; 2806 if (cmd.startsWith(latencyThreshold)) { 2807 opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length())); 2808 continue; 2809 } 2810 2811 final String latency = "--latency"; 2812 if (cmd.startsWith(latency)) { 2813 opts.reportLatency = true; 2814 continue; 2815 } 2816 2817 final String multiGet = "--multiGet="; 2818 if (cmd.startsWith(multiGet)) { 2819 opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); 2820 continue; 2821 } 2822 2823 final 
String multiPut = "--multiPut="; 2824 if (cmd.startsWith(multiPut)) { 2825 opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length())); 2826 continue; 2827 } 2828 2829 final String useTags = "--usetags="; 2830 if (cmd.startsWith(useTags)) { 2831 opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); 2832 continue; 2833 } 2834 2835 final String noOfTags = "--numoftags="; 2836 if (cmd.startsWith(noOfTags)) { 2837 opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); 2838 continue; 2839 } 2840 2841 final String replicas = "--replicas="; 2842 if (cmd.startsWith(replicas)) { 2843 opts.replicas = Integer.parseInt(cmd.substring(replicas.length())); 2844 continue; 2845 } 2846 2847 final String filterOutAll = "--filterAll"; 2848 if (cmd.startsWith(filterOutAll)) { 2849 opts.filterAll = true; 2850 continue; 2851 } 2852 2853 final String size = "--size="; 2854 if (cmd.startsWith(size)) { 2855 opts.size = Float.parseFloat(cmd.substring(size.length())); 2856 if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 
1GB"); 2857 continue; 2858 } 2859 2860 final String splitPolicy = "--splitPolicy="; 2861 if (cmd.startsWith(splitPolicy)) { 2862 opts.splitPolicy = cmd.substring(splitPolicy.length()); 2863 continue; 2864 } 2865 2866 final String randomSleep = "--randomSleep="; 2867 if (cmd.startsWith(randomSleep)) { 2868 opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length())); 2869 continue; 2870 } 2871 2872 final String measureAfter = "--measureAfter="; 2873 if (cmd.startsWith(measureAfter)) { 2874 opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length())); 2875 continue; 2876 } 2877 2878 final String bloomFilter = "--bloomFilter="; 2879 if (cmd.startsWith(bloomFilter)) { 2880 opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length())); 2881 continue; 2882 } 2883 2884 final String blockSize = "--blockSize="; 2885 if (cmd.startsWith(blockSize)) { 2886 opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length())); 2887 continue; 2888 } 2889 2890 final String valueSize = "--valueSize="; 2891 if (cmd.startsWith(valueSize)) { 2892 opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length())); 2893 continue; 2894 } 2895 2896 final String valueRandom = "--valueRandom"; 2897 if (cmd.startsWith(valueRandom)) { 2898 opts.valueRandom = true; 2899 continue; 2900 } 2901 2902 final String valueZipf = "--valueZipf"; 2903 if (cmd.startsWith(valueZipf)) { 2904 opts.valueZipf = true; 2905 continue; 2906 } 2907 2908 final String period = "--period="; 2909 if (cmd.startsWith(period)) { 2910 opts.period = Integer.parseInt(cmd.substring(period.length())); 2911 continue; 2912 } 2913 2914 final String addColumns = "--addColumns="; 2915 if (cmd.startsWith(addColumns)) { 2916 opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length())); 2917 continue; 2918 } 2919 2920 final String inMemoryCompaction = "--inmemoryCompaction="; 2921 if (cmd.startsWith(inMemoryCompaction)) { 2922 opts.inMemoryCompaction = 2923 
MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length())); 2924 continue; 2925 } 2926 2927 final String columns = "--columns="; 2928 if (cmd.startsWith(columns)) { 2929 opts.columns = Integer.parseInt(cmd.substring(columns.length())); 2930 continue; 2931 } 2932 2933 final String families = "--families="; 2934 if (cmd.startsWith(families)) { 2935 opts.families = Integer.parseInt(cmd.substring(families.length())); 2936 continue; 2937 } 2938 2939 final String caching = "--caching="; 2940 if (cmd.startsWith(caching)) { 2941 opts.caching = Integer.parseInt(cmd.substring(caching.length())); 2942 continue; 2943 } 2944 2945 final String asyncPrefetch = "--asyncPrefetch"; 2946 if (cmd.startsWith(asyncPrefetch)) { 2947 opts.asyncPrefetch = true; 2948 continue; 2949 } 2950 2951 final String cacheBlocks = "--cacheBlocks="; 2952 if (cmd.startsWith(cacheBlocks)) { 2953 opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length())); 2954 continue; 2955 } 2956 2957 final String scanReadType = "--scanReadType="; 2958 if (cmd.startsWith(scanReadType)) { 2959 opts.scanReadType = 2960 Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase()); 2961 continue; 2962 } 2963 2964 final String bufferSize = "--bufferSize="; 2965 if (cmd.startsWith(bufferSize)) { 2966 opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length())); 2967 continue; 2968 } 2969 2970 validateParsedOpts(opts); 2971 2972 if (isCommandClass(cmd)) { 2973 opts.cmdName = cmd; 2974 try { 2975 opts.numClientThreads = Integer.parseInt(args.remove()); 2976 } catch (NoSuchElementException | NumberFormatException e) { 2977 throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e); 2978 } 2979 opts = calculateRowsAndSize(opts); 2980 break; 2981 } else { 2982 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); 2983 } 2984 2985 // Not matching any option or command. 
2986 System.err.println("Error: Wrong option or command: " + cmd); 2987 args.add(cmd); 2988 break; 2989 } 2990 return opts; 2991 } 2992 2993 /** 2994 * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts 2995 */ 2996 private static void validateParsedOpts(TestOptions opts) { 2997 2998 if (!opts.autoFlush && opts.multiPut > 0) { 2999 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0"); 3000 } 3001 3002 if (opts.oneCon && opts.connCount > 1) { 3003 throw new IllegalArgumentException( 3004 "oneCon is set to true, " + "connCount should not bigger than 1"); 3005 } 3006 3007 if (opts.valueZipf && opts.valueRandom) { 3008 throw new IllegalStateException("Either valueZipf or valueRandom but not both"); 3009 } 3010 } 3011 3012 static TestOptions calculateRowsAndSize(final TestOptions opts) { 3013 int rowsPerGB = getRowsPerGB(opts); 3014 if ( 3015 (opts.getCmdName() != null 3016 && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN))) 3017 && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows 3018 ) { 3019 opts.totalRows = (int) opts.size * rowsPerGB; 3020 } else if (opts.size != DEFAULT_OPTS.size) { 3021 // total size in GB specified 3022 opts.totalRows = (int) opts.size * rowsPerGB; 3023 opts.perClientRunRows = opts.totalRows / opts.numClientThreads; 3024 } else { 3025 opts.totalRows = opts.perClientRunRows * opts.numClientThreads; 3026 opts.size = opts.totalRows / rowsPerGB; 3027 } 3028 return opts; 3029 } 3030 3031 static int getRowsPerGB(final TestOptions opts) { 3032 return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies() 3033 * opts.getColumns()); 3034 } 3035 3036 @Override 3037 public int run(String[] args) throws Exception { 3038 // Process command-line args. TODO: Better cmd-line processing 3039 // (but hopefully something not as painful as cli options). 
3040 int errCode = -1; 3041 if (args.length < 1) { 3042 printUsage(); 3043 return errCode; 3044 } 3045 3046 try { 3047 LinkedList<String> argv = new LinkedList<>(); 3048 argv.addAll(Arrays.asList(args)); 3049 TestOptions opts = parseOpts(argv); 3050 3051 // args remaining, print help and exit 3052 if (!argv.isEmpty()) { 3053 errCode = 0; 3054 printUsage(); 3055 return errCode; 3056 } 3057 3058 // must run at least 1 client 3059 if (opts.numClientThreads <= 0) { 3060 throw new IllegalArgumentException("Number of clients must be > 0"); 3061 } 3062 3063 // cmdName should not be null, print help and exit 3064 if (opts.cmdName == null) { 3065 printUsage(); 3066 return errCode; 3067 } 3068 3069 Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName); 3070 if (cmdClass != null) { 3071 runTest(cmdClass, opts); 3072 errCode = 0; 3073 } 3074 3075 } catch (Exception e) { 3076 e.printStackTrace(); 3077 } 3078 3079 return errCode; 3080 } 3081 3082 private static boolean isCommandClass(String cmd) { 3083 return COMMANDS.containsKey(cmd); 3084 } 3085 3086 private static Class<? extends TestBase> determineCommandClass(String cmd) { 3087 CmdDescriptor descriptor = COMMANDS.get(cmd); 3088 return descriptor != null ? descriptor.getCmdClass() : null; 3089 } 3090 3091 public static void main(final String[] args) throws Exception { 3092 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); 3093 System.exit(res); 3094 } 3095}