001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import com.codahale.metrics.Histogram; 021import com.codahale.metrics.UniformReservoir; 022import io.opentelemetry.api.trace.Span; 023import io.opentelemetry.context.Scope; 024import java.io.IOException; 025import java.io.PrintStream; 026import java.lang.reflect.Constructor; 027import java.math.BigDecimal; 028import java.math.MathContext; 029import java.text.DecimalFormat; 030import java.text.SimpleDateFormat; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.Date; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Locale; 037import java.util.Map; 038import java.util.NoSuchElementException; 039import java.util.Queue; 040import java.util.Random; 041import java.util.TreeMap; 042import java.util.concurrent.Callable; 043import java.util.concurrent.ExecutionException; 044import java.util.concurrent.ExecutorService; 045import java.util.concurrent.Executors; 046import java.util.concurrent.Future; 047import java.util.concurrent.ThreadLocalRandom; 048import org.apache.commons.lang3.StringUtils; 049import org.apache.hadoop.conf.Configuration; 050import org.apache.hadoop.conf.Configured; 051import org.apache.hadoop.fs.FileSystem; 052import org.apache.hadoop.fs.Path; 053import org.apache.hadoop.hbase.client.Admin; 054import org.apache.hadoop.hbase.client.Append; 055import org.apache.hadoop.hbase.client.AsyncConnection; 056import org.apache.hadoop.hbase.client.AsyncTable; 057import org.apache.hadoop.hbase.client.BufferedMutator; 058import org.apache.hadoop.hbase.client.BufferedMutatorParams; 059import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 060import org.apache.hadoop.hbase.client.Connection; 061import org.apache.hadoop.hbase.client.ConnectionFactory; 062import org.apache.hadoop.hbase.client.Consistency; 063import org.apache.hadoop.hbase.client.Delete; 064import org.apache.hadoop.hbase.client.Durability; 065import org.apache.hadoop.hbase.client.Get; 066import org.apache.hadoop.hbase.client.Increment; 067import org.apache.hadoop.hbase.client.Put; 068import org.apache.hadoop.hbase.client.RegionInfo; 069import org.apache.hadoop.hbase.client.RegionInfoBuilder; 070import org.apache.hadoop.hbase.client.RegionLocator; 071import org.apache.hadoop.hbase.client.Result; 072import org.apache.hadoop.hbase.client.ResultScanner; 073import org.apache.hadoop.hbase.client.RowMutations; 074import org.apache.hadoop.hbase.client.Scan; 075import org.apache.hadoop.hbase.client.Table; 076import org.apache.hadoop.hbase.client.TableDescriptor; 077import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 078import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 
079import org.apache.hadoop.hbase.filter.BinaryComparator; 080import org.apache.hadoop.hbase.filter.Filter; 081import org.apache.hadoop.hbase.filter.FilterAllFilter; 082import org.apache.hadoop.hbase.filter.FilterList; 083import org.apache.hadoop.hbase.filter.PageFilter; 084import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 085import org.apache.hadoop.hbase.filter.WhileMatchFilter; 086import org.apache.hadoop.hbase.io.compress.Compression; 087import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 088import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 089import org.apache.hadoop.hbase.regionserver.BloomType; 090import org.apache.hadoop.hbase.regionserver.CompactingMemStore; 091import org.apache.hadoop.hbase.trace.TraceUtil; 092import org.apache.hadoop.hbase.util.ByteArrayHashKey; 093import org.apache.hadoop.hbase.util.Bytes; 094import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 095import org.apache.hadoop.hbase.util.GsonUtil; 096import org.apache.hadoop.hbase.util.Hash; 097import org.apache.hadoop.hbase.util.MurmurHash; 098import org.apache.hadoop.hbase.util.Pair; 099import org.apache.hadoop.hbase.util.RandomDistribution; 100import org.apache.hadoop.hbase.util.YammerHistogramUtils; 101import org.apache.hadoop.io.LongWritable; 102import org.apache.hadoop.io.Text; 103import org.apache.hadoop.mapreduce.Job; 104import org.apache.hadoop.mapreduce.Mapper; 105import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; 106import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 107import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; 108import org.apache.hadoop.util.Tool; 109import org.apache.hadoop.util.ToolRunner; 110import org.apache.yetus.audience.InterfaceAudience; 111import org.slf4j.Logger; 112import org.slf4j.LoggerFactory; 113 114import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 115import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 116import org.apache.hbase.thirdparty.com.google.gson.Gson; 117 118/** 119 * Script used evaluating HBase performance and scalability. Runs a HBase client that steps through 120 * one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random writes test, 121 * etc.). Pass on the command-line which test to run and how many clients are participating in this 122 * experiment. Run {@code PerformanceEvaluation --help} to obtain usage. 123 * <p> 124 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance 125 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper, 126 * pages 8-10. 127 * <p> 128 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as 129 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client does 130 * about 1GB of data, unless specified otherwise. 
131 */ 132@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 133public class PerformanceEvaluation extends Configured implements Tool { 134 static final String RANDOM_SEEK_SCAN = "randomSeekScan"; 135 static final String RANDOM_READ = "randomRead"; 136 static final String PE_COMMAND_SHORTNAME = "pe"; 137 private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName()); 138 private static final Gson GSON = GsonUtil.createGson().create(); 139 140 public static final String TABLE_NAME = "TestTable"; 141 public static final String FAMILY_NAME_BASE = "info"; 142 public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0"); 143 public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0); 144 public static final int DEFAULT_VALUE_LENGTH = 1000; 145 public static final int ROW_LENGTH = 26; 146 147 private static final int ONE_GB = 1024 * 1024 * 1000; 148 private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH; 149 // TODO : should we make this configurable 150 private static final int TAG_LENGTH = 256; 151 private static final DecimalFormat FMT = new DecimalFormat("0.##"); 152 private static final MathContext CXT = MathContext.DECIMAL64; 153 private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000); 154 private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024); 155 private static final TestOptions DEFAULT_OPTS = new TestOptions(); 156 157 private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>(); 158 private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); 159 160 static { 161 addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead", 162 "Run async random read test"); 163 addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite", 164 "Run async random write test"); 165 addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead", 166 "Run async sequential read test"); 167 addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite", 168 "Run async sequential write test"); 169 addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)"); 170 addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test"); 171 addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test"); 172 addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN, 173 "Run random seek and scan 100 test"); 174 addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", 175 "Run random seek scan with both start and stop row (max 10 rows)"); 176 addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100", 177 "Run random seek scan with both start and stop row (max 100 rows)"); 178 addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000", 179 "Run random seek scan with both start and stop row (max 1000 rows)"); 180 addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000", 181 "Run random seek scan with both start and stop row (max 10000 rows)"); 182 addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test"); 183 addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test"); 184 addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test"); 185 addCommandDescriptor(MetaWriteTest.class, "metaWrite", 186 "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta"); 187 addCommandDescriptor(ScanTest.class, "scan", "Run 
scan test (read every row)"); 188 addCommandDescriptor(FilteredScanTest.class, "filterScan", 189 "Run scan test using a filter to find a specific row based on it's value " 190 + "(make sure to use --rows=20)"); 191 addCommandDescriptor(IncrementTest.class, "increment", 192 "Increment on each row; clients overlap on keyspace so some concurrent operations"); 193 addCommandDescriptor(AppendTest.class, "append", 194 "Append on each row; clients overlap on keyspace so some concurrent operations"); 195 addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate", 196 "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations"); 197 addCommandDescriptor(CheckAndPutTest.class, "checkAndPut", 198 "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations"); 199 addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete", 200 "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations"); 201 addCommandDescriptor(CleanMetaTest.class, "cleanMeta", 202 "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread"); 203 } 204 205 /** 206 * Enum for map metrics. Keep it out here rather than inside in the Map inner-class so we can find 207 * associated properties. 208 */ 209 protected static enum Counter { 210 /** elapsed time */ 211 ELAPSED_TIME, 212 /** number of rows */ 213 ROWS 214 } 215 216 protected static class RunResult implements Comparable<RunResult> { 217 public RunResult(long duration, Histogram hist) { 218 this.duration = duration; 219 this.hist = hist; 220 numbOfReplyOverThreshold = 0; 221 numOfReplyFromReplica = 0; 222 } 223 224 public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica, 225 Histogram hist) { 226 this.duration = duration; 227 this.hist = hist; 228 this.numbOfReplyOverThreshold = numbOfReplyOverThreshold; 229 this.numOfReplyFromReplica = numOfReplyFromReplica; 230 } 231 232 public final long duration; 233 public final Histogram hist; 234 public final long numbOfReplyOverThreshold; 235 public final long numOfReplyFromReplica; 236 237 @Override 238 public String toString() { 239 return Long.toString(duration); 240 } 241 242 @Override 243 public int compareTo(RunResult o) { 244 return Long.compare(this.duration, o.duration); 245 } 246 } 247 248 /** 249 * Constructor 250 * @param conf Configuration object 251 */ 252 public PerformanceEvaluation(final Configuration conf) { 253 super(conf); 254 } 255 256 protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name, 257 String description) { 258 CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description); 259 COMMANDS.put(name, cmdDescriptor); 260 } 261 262 /** 263 * Implementations can have their status set. 264 */ 265 interface Status { 266 /** 267 * Sets status 268 * @param msg status message n 269 */ 270 void setStatus(final String msg) throws IOException; 271 } 272 273 /** 274 * MapReduce job that runs a performance evaluation client in each map task. 275 */ 276 public static class EvaluationMapTask 277 extends Mapper<LongWritable, Text, LongWritable, LongWritable> { 278 279 /** configuration parameter name that contains the command */ 280 public final static String CMD_KEY = "EvaluationMapTask.command"; 281 /** configuration parameter name that contains the PE impl */ 282 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl"; 283 284 private Class<? 
extends Test> cmd; 285 286 @Override 287 protected void setup(Context context) throws IOException, InterruptedException { 288 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class); 289 290 // this is required so that extensions of PE are instantiated within the 291 // map reduce task... 292 Class<? extends PerformanceEvaluation> peClass = 293 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class); 294 try { 295 peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration()); 296 } catch (Exception e) { 297 throw new IllegalStateException("Could not instantiate PE instance", e); 298 } 299 } 300 301 private <Type> Class<? extends Type> forName(String className, Class<Type> type) { 302 try { 303 return Class.forName(className).asSubclass(type); 304 } catch (ClassNotFoundException e) { 305 throw new IllegalStateException("Could not find class for name: " + className, e); 306 } 307 } 308 309 @Override 310 protected void map(LongWritable key, Text value, final Context context) 311 throws IOException, InterruptedException { 312 313 Status status = new Status() { 314 @Override 315 public void setStatus(String msg) { 316 context.setStatus(msg); 317 } 318 }; 319 320 TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class); 321 Configuration conf = HBaseConfiguration.create(context.getConfiguration()); 322 final Connection con = ConnectionFactory.createConnection(conf); 323 AsyncConnection asyncCon = null; 324 try { 325 asyncCon = ConnectionFactory.createAsyncConnection(conf).get(); 326 } catch (ExecutionException e) { 327 throw new IOException(e); 328 } 329 330 // Evaluation task 331 RunResult result = 332 PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status); 333 // Collect how much time the thing took. Report as map output and 334 // to the ELAPSED_TIME counter. 335 context.getCounter(Counter.ELAPSED_TIME).increment(result.duration); 336 context.getCounter(Counter.ROWS).increment(opts.perClientRunRows); 337 context.write(new LongWritable(opts.startRow), new LongWritable(result.duration)); 338 context.progress(); 339 } 340 } 341 342 /* 343 * If table does not already exist, create. Also create a table when {@code opts.presplitRegions} 344 * is specified or when the existing table's region replica count doesn't match {@code 345 * opts.replicas}. 346 */ 347 static boolean checkTable(Admin admin, TestOptions opts) throws IOException { 348 TableName tableName = TableName.valueOf(opts.tableName); 349 boolean needsDelete = false, exists = admin.tableExists(tableName); 350 boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read") 351 || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan"); 352 if (!exists && isReadCmd) { 353 throw new IllegalStateException( 354 "Must specify an existing table for read commands. Run a write command first."); 355 } 356 TableDescriptor desc = exists ? admin.getDescriptor(TableName.valueOf(opts.tableName)) : null; 357 byte[][] splits = getSplits(opts); 358 359 // recreate the table when user has requested presplit or when existing 360 // {RegionSplitPolicy,replica count} does not match requested, or when the 361 // number of column families does not match requested. 
362 if ( 363 (exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions) 364 || (!isReadCmd && desc != null 365 && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy)) 366 || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas) 367 || (desc != null && desc.getColumnFamilyCount() != opts.families) 368 ) { 369 needsDelete = true; 370 // wait, why did it delete my table?!? 371 LOG.debug(MoreObjects.toStringHelper("needsDelete").add("needsDelete", needsDelete) 372 .add("isReadCmd", isReadCmd).add("exists", exists).add("desc", desc) 373 .add("presplit", opts.presplitRegions).add("splitPolicy", opts.splitPolicy) 374 .add("replicas", opts.replicas).add("families", opts.families).toString()); 375 } 376 377 // remove an existing table 378 if (needsDelete) { 379 if (admin.isTableEnabled(tableName)) { 380 admin.disableTable(tableName); 381 } 382 admin.deleteTable(tableName); 383 } 384 385 // table creation is necessary 386 if (!exists || needsDelete) { 387 desc = getTableDescriptor(opts); 388 if (splits != null) { 389 if (LOG.isDebugEnabled()) { 390 for (int i = 0; i < splits.length; i++) { 391 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); 392 } 393 } 394 } 395 if (splits != null) { 396 admin.createTable(desc, splits); 397 } else { 398 admin.createTable(desc); 399 } 400 LOG.info("Table " + desc + " created"); 401 } 402 return admin.tableExists(tableName); 403 } 404 405 /** 406 * Create an HTableDescriptor from provided TestOptions. 407 */ 408 protected static TableDescriptor getTableDescriptor(TestOptions opts) { 409 TableDescriptorBuilder builder = 410 TableDescriptorBuilder.newBuilder(TableName.valueOf(opts.tableName)); 411 412 for (int family = 0; family < opts.families; family++) { 413 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 414 ColumnFamilyDescriptorBuilder cfBuilder = 415 ColumnFamilyDescriptorBuilder.newBuilder(familyName); 416 cfBuilder.setDataBlockEncoding(opts.blockEncoding); 417 cfBuilder.setCompressionType(opts.compression); 418 cfBuilder.setBloomFilterType(opts.bloomType); 419 cfBuilder.setBlocksize(opts.blockSize); 420 if (opts.inMemoryCF) { 421 cfBuilder.setInMemory(true); 422 } 423 cfBuilder.setInMemoryCompaction(opts.inMemoryCompaction); 424 builder.setColumnFamily(cfBuilder.build()); 425 } 426 if (opts.replicas != DEFAULT_OPTS.replicas) { 427 builder.setRegionReplication(opts.replicas); 428 } 429 if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) { 430 builder.setRegionSplitPolicyClassName(opts.splitPolicy); 431 } 432 return builder.build(); 433 } 434 435 /** 436 * generates splits based on total number of rows and specified split regions 437 */ 438 protected static byte[][] getSplits(TestOptions opts) { 439 if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null; 440 441 int numSplitPoints = opts.presplitRegions - 1; 442 byte[][] splits = new byte[numSplitPoints][]; 443 int jump = opts.totalRows / opts.presplitRegions; 444 for (int i = 0; i < numSplitPoints; i++) { 445 int rowkey = jump * (1 + i); 446 splits[i] = format(rowkey); 447 } 448 return splits; 449 } 450 451 static void setupConnectionCount(final TestOptions opts) { 452 if (opts.oneCon) { 453 opts.connCount = 1; 454 } else { 455 if (opts.connCount == -1) { 456 // set to thread number if connCount is not set 457 opts.connCount = opts.numClientThreads; 458 } 459 } 460 } 461 462 /* 463 * Run all clients in this vm each to its own thread. 
464 */ 465 static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf) 466 throws IOException, InterruptedException, ExecutionException { 467 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName); 468 assert cmd != null; 469 @SuppressWarnings("unchecked") 470 Future<RunResult>[] threads = new Future[opts.numClientThreads]; 471 RunResult[] results = new RunResult[opts.numClientThreads]; 472 ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, 473 new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build()); 474 setupConnectionCount(opts); 475 final Connection[] cons = new Connection[opts.connCount]; 476 final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount]; 477 for (int i = 0; i < opts.connCount; i++) { 478 cons[i] = ConnectionFactory.createConnection(conf); 479 asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get(); 480 } 481 LOG 482 .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads"); 483 for (int i = 0; i < threads.length; i++) { 484 final int index = i; 485 threads[i] = pool.submit(new Callable<RunResult>() { 486 @Override 487 public RunResult call() throws Exception { 488 TestOptions threadOpts = new TestOptions(opts); 489 final Connection con = cons[index % cons.length]; 490 final AsyncConnection asyncCon = asyncCons[index % asyncCons.length]; 491 if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows; 492 RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() { 493 @Override 494 public void setStatus(final String msg) throws IOException { 495 LOG.info(msg); 496 } 497 }); 498 LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration 499 + "ms over " + threadOpts.perClientRunRows + " rows"); 500 if (opts.latencyThreshold > 0) { 501 LOG.info("Number of replies over latency threshold " + opts.latencyThreshold 502 + "(ms) is " + run.numbOfReplyOverThreshold); 503 } 504 return run; 505 } 506 }); 507 } 508 pool.shutdown(); 509 510 for (int i = 0; i < threads.length; i++) { 511 try { 512 results[i] = threads[i].get(); 513 } catch (ExecutionException e) { 514 throw new IOException(e.getCause()); 515 } 516 } 517 final String test = cmd.getSimpleName(); 518 LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results)); 519 Arrays.sort(results); 520 long total = 0; 521 float avgLatency = 0; 522 float avgTPS = 0; 523 long replicaWins = 0; 524 for (RunResult result : results) { 525 total += result.duration; 526 avgLatency += result.hist.getSnapshot().getMean(); 527 avgTPS += opts.perClientRunRows * 1.0f / result.duration; 528 replicaWins += result.numOfReplyFromReplica; 529 } 530 avgTPS *= 1000; // ms to second 531 avgLatency = avgLatency / results.length; 532 LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: " 533 + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms"); 534 LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency)); 535 LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second"); 536 if (opts.replicas > 1) { 537 LOG.info("[results from replica regions] " + replicaWins); 538 } 539 540 for (int i = 0; i < opts.connCount; i++) { 541 cons[i].close(); 542 asyncCons[i].close(); 543 } 544 545 return results; 546 } 547 548 /* 549 * Run a mapreduce job. Run as many maps as asked-for clients. 
Before we start up the job, write 550 * out an input file with instruction per client regards which row they are to start on. 551 * @param cmd Command to run. n 552 */ 553 static Job doMapReduce(TestOptions opts, final Configuration conf) 554 throws IOException, InterruptedException, ClassNotFoundException { 555 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName); 556 assert cmd != null; 557 Path inputDir = writeInputFile(conf, opts); 558 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); 559 conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName()); 560 Job job = Job.getInstance(conf); 561 job.setJarByClass(PerformanceEvaluation.class); 562 job.setJobName("HBase Performance Evaluation - " + opts.cmdName); 563 564 job.setInputFormatClass(NLineInputFormat.class); 565 NLineInputFormat.setInputPaths(job, inputDir); 566 // this is default, but be explicit about it just in case. 567 NLineInputFormat.setNumLinesPerSplit(job, 1); 568 569 job.setOutputKeyClass(LongWritable.class); 570 job.setOutputValueClass(LongWritable.class); 571 572 job.setMapperClass(EvaluationMapTask.class); 573 job.setReducerClass(LongSumReducer.class); 574 575 job.setNumReduceTasks(1); 576 577 job.setOutputFormatClass(TextOutputFormat.class); 578 TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs")); 579 580 TableMapReduceUtil.addDependencyJars(job); 581 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Histogram.class, // yammer 582 // metrics 583 Gson.class, // gson 584 FilterAllFilter.class // hbase-server tests jar 585 ); 586 587 TableMapReduceUtil.initCredentials(job); 588 589 job.waitForCompletion(true); 590 return job; 591 } 592 593 /** 594 * Each client has one mapper to do the work, and client do the resulting count in a map task. 595 */ 596 597 static String JOB_INPUT_FILENAME = "input.txt"; 598 599 /* 600 * Write input file of offsets-per-client for the mapreduce job. 601 * @param c Configuration 602 * @return Directory that contains file written whose name is JOB_INPUT_FILENAME n 603 */ 604 static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException { 605 return writeInputFile(c, opts, new Path(".")); 606 } 607 608 static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir) 609 throws IOException { 610 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); 611 Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date())); 612 Path inputDir = new Path(jobdir, "inputs"); 613 614 FileSystem fs = FileSystem.get(c); 615 fs.mkdirs(inputDir); 616 617 Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME); 618 PrintStream out = new PrintStream(fs.create(inputFile)); 619 // Make input random. 620 Map<Integer, String> m = new TreeMap<>(); 621 Hash h = MurmurHash.getInstance(); 622 int perClientRows = (opts.totalRows / opts.numClientThreads); 623 try { 624 for (int j = 0; j < opts.numClientThreads; j++) { 625 TestOptions next = new TestOptions(opts); 626 next.startRow = j * perClientRows; 627 next.perClientRunRows = perClientRows; 628 String s = GSON.toJson(next); 629 LOG.info("Client=" + j + ", input=" + s); 630 byte[] b = Bytes.toBytes(s); 631 int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1); 632 m.put(hash, s); 633 } 634 for (Map.Entry<Integer, String> e : m.entrySet()) { 635 out.println(e.getValue()); 636 } 637 } finally { 638 out.close(); 639 } 640 return inputDir; 641 } 642 643 /** 644 * Describes a command. 
645 */ 646 static class CmdDescriptor { 647 private Class<? extends TestBase> cmdClass; 648 private String name; 649 private String description; 650 651 CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) { 652 this.cmdClass = cmdClass; 653 this.name = name; 654 this.description = description; 655 } 656 657 public Class<? extends TestBase> getCmdClass() { 658 return cmdClass; 659 } 660 661 public String getName() { 662 return name; 663 } 664 665 public String getDescription() { 666 return description; 667 } 668 } 669 670 /** 671 * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes 672 * tracking all these arguments a little easier. NOTE: ADDING AN OPTION, you need to add a data 673 * member, a getter/setter (to make JSON serialization of this TestOptions class behave), and you 674 * need to add to the clone constructor below copying your new option from the 'that' to the 675 * 'this'. Look for 'clone' below. 676 */ 677 static class TestOptions { 678 String cmdName = null; 679 boolean nomapred = false; 680 boolean filterAll = false; 681 int startRow = 0; 682 float size = 1.0f; 683 int perClientRunRows = DEFAULT_ROWS_PER_GB; 684 int numClientThreads = 1; 685 int totalRows = DEFAULT_ROWS_PER_GB; 686 int measureAfter = 0; 687 float sampleRate = 1.0f; 688 /** 689 * @deprecated Useless after switching to OpenTelemetry 690 */ 691 @Deprecated 692 double traceRate = 0.0; 693 String tableName = TABLE_NAME; 694 boolean flushCommits = true; 695 boolean writeToWAL = true; 696 boolean autoFlush = false; 697 boolean oneCon = false; 698 int connCount = -1; // wil decide the actual num later 699 boolean useTags = false; 700 int noOfTags = 1; 701 boolean reportLatency = false; 702 int multiGet = 0; 703 int multiPut = 0; 704 int randomSleep = 0; 705 boolean inMemoryCF = false; 706 int presplitRegions = 0; 707 int replicas = TableDescriptorBuilder.DEFAULT_REGION_REPLICATION; 708 String splitPolicy = null; 709 Compression.Algorithm compression = Compression.Algorithm.NONE; 710 BloomType bloomType = BloomType.ROW; 711 int blockSize = HConstants.DEFAULT_BLOCKSIZE; 712 DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; 713 boolean valueRandom = false; 714 boolean valueZipf = false; 715 int valueSize = DEFAULT_VALUE_LENGTH; 716 int period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10; 717 int cycles = 1; 718 int columns = 1; 719 int families = 1; 720 int caching = 30; 721 int latencyThreshold = 0; // in millsecond 722 boolean addColumns = true; 723 MemoryCompactionPolicy inMemoryCompaction = 724 MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT); 725 boolean asyncPrefetch = false; 726 boolean cacheBlocks = true; 727 Scan.ReadType scanReadType = Scan.ReadType.DEFAULT; 728 long bufferSize = 2l * 1024l * 1024l; 729 730 public TestOptions() { 731 } 732 733 /** 734 * Clone constructor. 735 * @param that Object to copy from. 
736 */ 737 public TestOptions(TestOptions that) { 738 this.cmdName = that.cmdName; 739 this.cycles = that.cycles; 740 this.nomapred = that.nomapred; 741 this.startRow = that.startRow; 742 this.size = that.size; 743 this.perClientRunRows = that.perClientRunRows; 744 this.numClientThreads = that.numClientThreads; 745 this.totalRows = that.totalRows; 746 this.sampleRate = that.sampleRate; 747 this.traceRate = that.traceRate; 748 this.tableName = that.tableName; 749 this.flushCommits = that.flushCommits; 750 this.writeToWAL = that.writeToWAL; 751 this.autoFlush = that.autoFlush; 752 this.oneCon = that.oneCon; 753 this.connCount = that.connCount; 754 this.useTags = that.useTags; 755 this.noOfTags = that.noOfTags; 756 this.reportLatency = that.reportLatency; 757 this.latencyThreshold = that.latencyThreshold; 758 this.multiGet = that.multiGet; 759 this.multiPut = that.multiPut; 760 this.inMemoryCF = that.inMemoryCF; 761 this.presplitRegions = that.presplitRegions; 762 this.replicas = that.replicas; 763 this.splitPolicy = that.splitPolicy; 764 this.compression = that.compression; 765 this.blockEncoding = that.blockEncoding; 766 this.filterAll = that.filterAll; 767 this.bloomType = that.bloomType; 768 this.blockSize = that.blockSize; 769 this.valueRandom = that.valueRandom; 770 this.valueZipf = that.valueZipf; 771 this.valueSize = that.valueSize; 772 this.period = that.period; 773 this.randomSleep = that.randomSleep; 774 this.measureAfter = that.measureAfter; 775 this.addColumns = that.addColumns; 776 this.columns = that.columns; 777 this.families = that.families; 778 this.caching = that.caching; 779 this.inMemoryCompaction = that.inMemoryCompaction; 780 this.asyncPrefetch = that.asyncPrefetch; 781 this.cacheBlocks = that.cacheBlocks; 782 this.scanReadType = that.scanReadType; 783 this.bufferSize = that.bufferSize; 784 } 785 786 public int getCaching() { 787 return this.caching; 788 } 789 790 public void setCaching(final int caching) { 791 this.caching = caching; 792 } 793 794 public int getColumns() { 795 return this.columns; 796 } 797 798 public void setColumns(final int columns) { 799 this.columns = columns; 800 } 801 802 public int getFamilies() { 803 return this.families; 804 } 805 806 public void setFamilies(final int families) { 807 this.families = families; 808 } 809 810 public int getCycles() { 811 return this.cycles; 812 } 813 814 public void setCycles(final int cycles) { 815 this.cycles = cycles; 816 } 817 818 public boolean isValueZipf() { 819 return valueZipf; 820 } 821 822 public void setValueZipf(boolean valueZipf) { 823 this.valueZipf = valueZipf; 824 } 825 826 public String getCmdName() { 827 return cmdName; 828 } 829 830 public void setCmdName(String cmdName) { 831 this.cmdName = cmdName; 832 } 833 834 public int getRandomSleep() { 835 return randomSleep; 836 } 837 838 public void setRandomSleep(int randomSleep) { 839 this.randomSleep = randomSleep; 840 } 841 842 public int getReplicas() { 843 return replicas; 844 } 845 846 public void setReplicas(int replicas) { 847 this.replicas = replicas; 848 } 849 850 public String getSplitPolicy() { 851 return splitPolicy; 852 } 853 854 public void setSplitPolicy(String splitPolicy) { 855 this.splitPolicy = splitPolicy; 856 } 857 858 public void setNomapred(boolean nomapred) { 859 this.nomapred = nomapred; 860 } 861 862 public void setFilterAll(boolean filterAll) { 863 this.filterAll = filterAll; 864 } 865 866 public void setStartRow(int startRow) { 867 this.startRow = startRow; 868 } 869 870 public void setSize(float size) { 871 this.size = 
size; 872 } 873 874 public void setPerClientRunRows(int perClientRunRows) { 875 this.perClientRunRows = perClientRunRows; 876 } 877 878 public void setNumClientThreads(int numClientThreads) { 879 this.numClientThreads = numClientThreads; 880 } 881 882 public void setTotalRows(int totalRows) { 883 this.totalRows = totalRows; 884 } 885 886 public void setSampleRate(float sampleRate) { 887 this.sampleRate = sampleRate; 888 } 889 890 public void setTraceRate(double traceRate) { 891 this.traceRate = traceRate; 892 } 893 894 public void setTableName(String tableName) { 895 this.tableName = tableName; 896 } 897 898 public void setFlushCommits(boolean flushCommits) { 899 this.flushCommits = flushCommits; 900 } 901 902 public void setWriteToWAL(boolean writeToWAL) { 903 this.writeToWAL = writeToWAL; 904 } 905 906 public void setAutoFlush(boolean autoFlush) { 907 this.autoFlush = autoFlush; 908 } 909 910 public void setOneCon(boolean oneCon) { 911 this.oneCon = oneCon; 912 } 913 914 public int getConnCount() { 915 return connCount; 916 } 917 918 public void setConnCount(int connCount) { 919 this.connCount = connCount; 920 } 921 922 public void setUseTags(boolean useTags) { 923 this.useTags = useTags; 924 } 925 926 public void setNoOfTags(int noOfTags) { 927 this.noOfTags = noOfTags; 928 } 929 930 public void setReportLatency(boolean reportLatency) { 931 this.reportLatency = reportLatency; 932 } 933 934 public void setMultiGet(int multiGet) { 935 this.multiGet = multiGet; 936 } 937 938 public void setMultiPut(int multiPut) { 939 this.multiPut = multiPut; 940 } 941 942 public void setInMemoryCF(boolean inMemoryCF) { 943 this.inMemoryCF = inMemoryCF; 944 } 945 946 public void setPresplitRegions(int presplitRegions) { 947 this.presplitRegions = presplitRegions; 948 } 949 950 public void setCompression(Compression.Algorithm compression) { 951 this.compression = compression; 952 } 953 954 public void setBloomType(BloomType bloomType) { 955 this.bloomType = bloomType; 956 } 957 958 public void setBlockSize(int blockSize) { 959 this.blockSize = blockSize; 960 } 961 962 public void setBlockEncoding(DataBlockEncoding blockEncoding) { 963 this.blockEncoding = blockEncoding; 964 } 965 966 public void setValueRandom(boolean valueRandom) { 967 this.valueRandom = valueRandom; 968 } 969 970 public void setValueSize(int valueSize) { 971 this.valueSize = valueSize; 972 } 973 974 public void setBufferSize(long bufferSize) { 975 this.bufferSize = bufferSize; 976 } 977 978 public void setPeriod(int period) { 979 this.period = period; 980 } 981 982 public boolean isNomapred() { 983 return nomapred; 984 } 985 986 public boolean isFilterAll() { 987 return filterAll; 988 } 989 990 public int getStartRow() { 991 return startRow; 992 } 993 994 public float getSize() { 995 return size; 996 } 997 998 public int getPerClientRunRows() { 999 return perClientRunRows; 1000 } 1001 1002 public int getNumClientThreads() { 1003 return numClientThreads; 1004 } 1005 1006 public int getTotalRows() { 1007 return totalRows; 1008 } 1009 1010 public float getSampleRate() { 1011 return sampleRate; 1012 } 1013 1014 public double getTraceRate() { 1015 return traceRate; 1016 } 1017 1018 public String getTableName() { 1019 return tableName; 1020 } 1021 1022 public boolean isFlushCommits() { 1023 return flushCommits; 1024 } 1025 1026 public boolean isWriteToWAL() { 1027 return writeToWAL; 1028 } 1029 1030 public boolean isAutoFlush() { 1031 return autoFlush; 1032 } 1033 1034 public boolean isUseTags() { 1035 return useTags; 1036 } 1037 1038 public 
int getNoOfTags() { 1039 return noOfTags; 1040 } 1041 1042 public boolean isReportLatency() { 1043 return reportLatency; 1044 } 1045 1046 public int getMultiGet() { 1047 return multiGet; 1048 } 1049 1050 public int getMultiPut() { 1051 return multiPut; 1052 } 1053 1054 public boolean isInMemoryCF() { 1055 return inMemoryCF; 1056 } 1057 1058 public int getPresplitRegions() { 1059 return presplitRegions; 1060 } 1061 1062 public Compression.Algorithm getCompression() { 1063 return compression; 1064 } 1065 1066 public DataBlockEncoding getBlockEncoding() { 1067 return blockEncoding; 1068 } 1069 1070 public boolean isValueRandom() { 1071 return valueRandom; 1072 } 1073 1074 public int getValueSize() { 1075 return valueSize; 1076 } 1077 1078 public int getPeriod() { 1079 return period; 1080 } 1081 1082 public BloomType getBloomType() { 1083 return bloomType; 1084 } 1085 1086 public int getBlockSize() { 1087 return blockSize; 1088 } 1089 1090 public boolean isOneCon() { 1091 return oneCon; 1092 } 1093 1094 public int getMeasureAfter() { 1095 return measureAfter; 1096 } 1097 1098 public void setMeasureAfter(int measureAfter) { 1099 this.measureAfter = measureAfter; 1100 } 1101 1102 public boolean getAddColumns() { 1103 return addColumns; 1104 } 1105 1106 public void setAddColumns(boolean addColumns) { 1107 this.addColumns = addColumns; 1108 } 1109 1110 public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) { 1111 this.inMemoryCompaction = inMemoryCompaction; 1112 } 1113 1114 public MemoryCompactionPolicy getInMemoryCompaction() { 1115 return this.inMemoryCompaction; 1116 } 1117 1118 public long getBufferSize() { 1119 return this.bufferSize; 1120 } 1121 } 1122 1123 /* 1124 * A test. Subclass to particularize what happens per row. 1125 */ 1126 static abstract class TestBase { 1127 // Below is make it so when Tests are all running in the one 1128 // jvm, that they each have a differently seeded Random. 1129 private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime()); 1130 1131 private static long nextRandomSeed() { 1132 return randomSeed.nextLong(); 1133 } 1134 1135 private final int everyN; 1136 1137 protected final Random rand = new Random(nextRandomSeed()); 1138 protected final Configuration conf; 1139 protected final TestOptions opts; 1140 1141 private final Status status; 1142 1143 private String testName; 1144 private Histogram latencyHistogram; 1145 private Histogram replicaLatencyHistogram; 1146 private Histogram valueSizeHistogram; 1147 private Histogram rpcCallsHistogram; 1148 private Histogram remoteRpcCallsHistogram; 1149 private Histogram millisBetweenNextHistogram; 1150 private Histogram regionsScannedHistogram; 1151 private Histogram bytesInResultsHistogram; 1152 private Histogram bytesInRemoteResultsHistogram; 1153 private RandomDistribution.Zipf zipf; 1154 private long numOfReplyOverLatencyThreshold = 0; 1155 private long numOfReplyFromReplica = 0; 1156 1157 /** 1158 * Note that all subclasses of this class must provide a public constructor that has the exact 1159 * same list of arguments. 
1160 */ 1161 TestBase(final Configuration conf, final TestOptions options, final Status status) { 1162 this.conf = conf; 1163 this.opts = options; 1164 this.status = status; 1165 this.testName = this.getClass().getSimpleName(); 1166 everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate)); 1167 if (options.isValueZipf()) { 1168 this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2); 1169 } 1170 LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows."); 1171 } 1172 1173 int getValueLength(final Random r) { 1174 if (this.opts.isValueRandom()) { 1175 return r.nextInt(opts.valueSize); 1176 } else if (this.opts.isValueZipf()) { 1177 return Math.abs(this.zipf.nextInt()); 1178 } else { 1179 return opts.valueSize; 1180 } 1181 } 1182 1183 void updateValueSize(final Result[] rs) throws IOException { 1184 updateValueSize(rs, 0); 1185 } 1186 1187 void updateValueSize(final Result[] rs, final long latency) throws IOException { 1188 if (rs == null || (latency == 0)) return; 1189 for (Result r : rs) 1190 updateValueSize(r, latency); 1191 } 1192 1193 void updateValueSize(final Result r) throws IOException { 1194 updateValueSize(r, 0); 1195 } 1196 1197 void updateValueSize(final Result r, final long latency) throws IOException { 1198 if (r == null || (latency == 0)) return; 1199 int size = 0; 1200 // update replicaHistogram 1201 if (r.isStale()) { 1202 replicaLatencyHistogram.update(latency / 1000); 1203 numOfReplyFromReplica++; 1204 } 1205 if (!isRandomValueSize()) return; 1206 1207 for (CellScanner scanner = r.cellScanner(); scanner.advance();) { 1208 size += scanner.current().getValueLength(); 1209 } 1210 updateValueSize(size); 1211 } 1212 1213 void updateValueSize(final int valueSize) { 1214 if (!isRandomValueSize()) return; 1215 this.valueSizeHistogram.update(valueSize); 1216 } 1217 1218 void updateScanMetrics(final ScanMetrics metrics) { 1219 if (metrics == null) return; 1220 Map<String, Long> metricsMap = metrics.getMetricsMap(); 1221 Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); 1222 if (rpcCalls != null) { 1223 this.rpcCallsHistogram.update(rpcCalls.longValue()); 1224 } 1225 Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME); 1226 if (remoteRpcCalls != null) { 1227 this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue()); 1228 } 1229 Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME); 1230 if (millisBetweenNext != null) { 1231 this.millisBetweenNextHistogram.update(millisBetweenNext.longValue()); 1232 } 1233 Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); 1234 if (regionsScanned != null) { 1235 this.regionsScannedHistogram.update(regionsScanned.longValue()); 1236 } 1237 Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME); 1238 if (bytesInResults != null && bytesInResults.longValue() > 0) { 1239 this.bytesInResultsHistogram.update(bytesInResults.longValue()); 1240 } 1241 Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME); 1242 if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) { 1243 this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue()); 1244 } 1245 } 1246 1247 String generateStatus(final int sr, final int i, final int lr) { 1248 return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency [" 1249 + getShortLatencyReport() + "]" 1250 + (!isRandomValueSize() ? 
"" : ", value size [" + getShortValueSizeReport() + "]"); 1251 } 1252 1253 boolean isRandomValueSize() { 1254 return opts.valueRandom; 1255 } 1256 1257 protected int getReportingPeriod() { 1258 return opts.period; 1259 } 1260 1261 /** 1262 * Populated by testTakedown. Only implemented by RandomReadTest at the moment. 1263 */ 1264 public Histogram getLatencyHistogram() { 1265 return latencyHistogram; 1266 } 1267 1268 void testSetup() throws IOException { 1269 // test metrics 1270 latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1271 // If it is a replica test, set up histogram for replica. 1272 if (opts.replicas > 1) { 1273 replicaLatencyHistogram = 1274 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1275 } 1276 valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1277 // scan metrics 1278 rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1279 remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1280 millisBetweenNextHistogram = 1281 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1282 regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1283 bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1284 bytesInRemoteResultsHistogram = 1285 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1286 1287 onStartup(); 1288 } 1289 1290 abstract void onStartup() throws IOException; 1291 1292 void testTakedown() throws IOException { 1293 onTakedown(); 1294 // Print all stats for this thread continuously. 1295 // Synchronize on Test.class so different threads don't intermingle the 1296 // output. We can't use 'this' here because each thread has its own instance of Test class. 
1297 synchronized (Test.class) { 1298 status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName()); 1299 status 1300 .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram)); 1301 if (opts.replicas > 1) { 1302 status.setStatus("Latency (us) from Replica Regions: " 1303 + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram)); 1304 } 1305 status.setStatus("Num measures (latency) : " + latencyHistogram.getCount()); 1306 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram)); 1307 if (valueSizeHistogram.getCount() > 0) { 1308 status.setStatus( 1309 "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram)); 1310 status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount()); 1311 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram)); 1312 } else { 1313 status.setStatus("No valueSize statistics available"); 1314 } 1315 if (rpcCallsHistogram.getCount() > 0) { 1316 status.setStatus( 1317 "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram)); 1318 } 1319 if (remoteRpcCallsHistogram.getCount() > 0) { 1320 status.setStatus("remoteRpcCalls (count): " 1321 + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram)); 1322 } 1323 if (millisBetweenNextHistogram.getCount() > 0) { 1324 status.setStatus("millisBetweenNext (latency): " 1325 + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram)); 1326 } 1327 if (regionsScannedHistogram.getCount() > 0) { 1328 status.setStatus("regionsScanned (count): " 1329 + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram)); 1330 } 1331 if (bytesInResultsHistogram.getCount() > 0) { 1332 status.setStatus("bytesInResults (size): " 1333 + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram)); 1334 } 1335 if (bytesInRemoteResultsHistogram.getCount() > 0) { 1336 status.setStatus("bytesInRemoteResults (size): " 1337 + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram)); 1338 } 1339 } 1340 } 1341 1342 abstract void onTakedown() throws IOException; 1343 1344 /* 1345 * Run test 1346 * @return Elapsed time. n 1347 */ 1348 long test() throws IOException, InterruptedException { 1349 testSetup(); 1350 LOG.info("Timed test starting in thread " + Thread.currentThread().getName()); 1351 final long startTime = System.nanoTime(); 1352 try { 1353 testTimed(); 1354 } finally { 1355 testTakedown(); 1356 } 1357 return (System.nanoTime() - startTime) / 1000000; 1358 } 1359 1360 int getStartRow() { 1361 return opts.startRow; 1362 } 1363 1364 int getLastRow() { 1365 return getStartRow() + opts.perClientRunRows; 1366 } 1367 1368 /** 1369 * Provides an extension point for tests that don't want a per row invocation. 1370 */ 1371 void testTimed() throws IOException, InterruptedException { 1372 int startRow = getStartRow(); 1373 int lastRow = getLastRow(); 1374 // Report on completion of 1/10th of total. 
1375 for (int ii = 0; ii < opts.cycles; ii++) { 1376 if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles); 1377 for (int i = startRow; i < lastRow; i++) { 1378 if (i % everyN != 0) continue; 1379 long startTime = System.nanoTime(); 1380 boolean requestSent = false; 1381 Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan(); 1382 try (Scope scope = span.makeCurrent()) { 1383 requestSent = testRow(i, startTime); 1384 } finally { 1385 span.end(); 1386 } 1387 if ((i - startRow) > opts.measureAfter) { 1388 // If multiget or multiput is enabled, say set to 10, testRow() returns immediately 1389 // first 9 times and sends the actual get request in the 10th iteration. 1390 // We should only set latency when actual request is sent because otherwise 1391 // it turns out to be 0. 1392 if (requestSent) { 1393 long latency = (System.nanoTime() - startTime) / 1000; 1394 latencyHistogram.update(latency); 1395 if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) { 1396 numOfReplyOverLatencyThreshold++; 1397 } 1398 } 1399 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { 1400 status.setStatus(generateStatus(startRow, i, lastRow)); 1401 } 1402 } 1403 } 1404 } 1405 } 1406 1407 /** 1408 * @return Subset of the histograms' calculation. 1409 */ 1410 public String getShortLatencyReport() { 1411 return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram); 1412 } 1413 1414 /** 1415 * @return Subset of the histograms' calculation. 1416 */ 1417 public String getShortValueSizeReport() { 1418 return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram); 1419 } 1420 1421 /** 1422 * Test for individual row. 1423 * @param i Row index. 1424 * @return true if the row was sent to server and need to record metrics. False if not, multiGet 1425 * and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered. 1426 */ 1427 abstract boolean testRow(final int i, final long startTime) 1428 throws IOException, InterruptedException; 1429 } 1430 1431 static abstract class Test extends TestBase { 1432 protected Connection connection; 1433 1434 Test(final Connection con, final TestOptions options, final Status status) { 1435 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); 1436 this.connection = con; 1437 } 1438 } 1439 1440 static abstract class AsyncTest extends TestBase { 1441 protected AsyncConnection connection; 1442 1443 AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) { 1444 super(con == null ? 
HBaseConfiguration.create() : con.getConfiguration(), options, status); 1445 this.connection = con; 1446 } 1447 } 1448 1449 static abstract class TableTest extends Test { 1450 protected Table table; 1451 1452 TableTest(Connection con, TestOptions options, Status status) { 1453 super(con, options, status); 1454 } 1455 1456 @Override 1457 void onStartup() throws IOException { 1458 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1459 } 1460 1461 @Override 1462 void onTakedown() throws IOException { 1463 table.close(); 1464 } 1465 } 1466 1467 /* 1468 * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest 1469 */ 1470 static abstract class MetaTest extends TableTest { 1471 protected int keyLength; 1472 1473 MetaTest(Connection con, TestOptions options, Status status) { 1474 super(con, options, status); 1475 keyLength = Integer.toString(opts.perClientRunRows).length(); 1476 } 1477 1478 @Override 1479 void onTakedown() throws IOException { 1480 // No clean up 1481 } 1482 1483 /* 1484 * Generates Lexicographically ascending strings 1485 */ 1486 protected byte[] getSplitKey(final int i) { 1487 return Bytes.toBytes(String.format("%0" + keyLength + "d", i)); 1488 } 1489 1490 } 1491 1492 static abstract class AsyncTableTest extends AsyncTest { 1493 protected AsyncTable<?> table; 1494 1495 AsyncTableTest(AsyncConnection con, TestOptions options, Status status) { 1496 super(con, options, status); 1497 } 1498 1499 @Override 1500 void onStartup() throws IOException { 1501 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1502 } 1503 1504 @Override 1505 void onTakedown() throws IOException { 1506 } 1507 } 1508 1509 static class AsyncRandomReadTest extends AsyncTableTest { 1510 private final Consistency consistency; 1511 private ArrayList<Get> gets; 1512 1513 AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) { 1514 super(con, options, status); 1515 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1516 if (opts.multiGet > 0) { 1517 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1518 this.gets = new ArrayList<>(opts.multiGet); 1519 } 1520 } 1521 1522 @Override 1523 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1524 if (opts.randomSleep > 0) { 1525 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 1526 } 1527 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1528 for (int family = 0; family < opts.families; family++) { 1529 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1530 if (opts.addColumns) { 1531 for (int column = 0; column < opts.columns; column++) { 1532 byte[] qualifier = column == 0 ? 
COLUMN_ZERO : Bytes.toBytes("" + column); 1533 get.addColumn(familyName, qualifier); 1534 } 1535 } else { 1536 get.addFamily(familyName); 1537 } 1538 } 1539 if (opts.filterAll) { 1540 get.setFilter(new FilterAllFilter()); 1541 } 1542 get.setConsistency(consistency); 1543 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1544 try { 1545 if (opts.multiGet > 0) { 1546 this.gets.add(get); 1547 if (this.gets.size() == opts.multiGet) { 1548 Result[] rs = 1549 this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new); 1550 updateValueSize(rs); 1551 this.gets.clear(); 1552 } else { 1553 return false; 1554 } 1555 } else { 1556 updateValueSize(this.table.get(get).get()); 1557 } 1558 } catch (ExecutionException e) { 1559 throw new IOException(e); 1560 } 1561 return true; 1562 } 1563 1564 public static RuntimeException runtime(Throwable e) { 1565 if (e instanceof RuntimeException) { 1566 return (RuntimeException) e; 1567 } 1568 return new RuntimeException(e); 1569 } 1570 1571 public static <V> V propagate(Callable<V> callable) { 1572 try { 1573 return callable.call(); 1574 } catch (Exception e) { 1575 throw runtime(e); 1576 } 1577 } 1578 1579 @Override 1580 protected int getReportingPeriod() { 1581 int period = opts.perClientRunRows / 10; 1582 return period == 0 ? opts.perClientRunRows : period; 1583 } 1584 1585 @Override 1586 protected void testTakedown() throws IOException { 1587 if (this.gets != null && this.gets.size() > 0) { 1588 this.table.get(gets); 1589 this.gets.clear(); 1590 } 1591 super.testTakedown(); 1592 } 1593 } 1594 1595 static class AsyncRandomWriteTest extends AsyncSequentialWriteTest { 1596 1597 AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) { 1598 super(con, options, status); 1599 } 1600 1601 @Override 1602 protected byte[] generateRow(final int i) { 1603 return getRandomRow(this.rand, opts.totalRows); 1604 } 1605 } 1606 1607 static class AsyncScanTest extends AsyncTableTest { 1608 private ResultScanner testScanner; 1609 private AsyncTable<?> asyncTable; 1610 1611 AsyncScanTest(AsyncConnection con, TestOptions options, Status status) { 1612 super(con, options, status); 1613 } 1614 1615 @Override 1616 void onStartup() throws IOException { 1617 this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName), 1618 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); 1619 } 1620 1621 @Override 1622 void testTakedown() throws IOException { 1623 if (this.testScanner != null) { 1624 updateScanMetrics(this.testScanner.getScanMetrics()); 1625 this.testScanner.close(); 1626 } 1627 super.testTakedown(); 1628 } 1629 1630 @Override 1631 boolean testRow(final int i, final long startTime) throws IOException { 1632 if (this.testScanner == null) { 1633 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 1634 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1635 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1636 for (int family = 0; family < opts.families; family++) { 1637 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1638 if (opts.addColumns) { 1639 for (int column = 0; column < opts.columns; column++) { 1640 byte[] qualifier = column == 0 ? 
COLUMN_ZERO : Bytes.toBytes("" + column); 1641 scan.addColumn(familyName, qualifier); 1642 } 1643 } else { 1644 scan.addFamily(familyName); 1645 } 1646 } 1647 if (opts.filterAll) { 1648 scan.setFilter(new FilterAllFilter()); 1649 } 1650 this.testScanner = asyncTable.getScanner(scan); 1651 } 1652 Result r = testScanner.next(); 1653 updateValueSize(r); 1654 return true; 1655 } 1656 } 1657 1658 static class AsyncSequentialReadTest extends AsyncTableTest { 1659 AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) { 1660 super(con, options, status); 1661 } 1662 1663 @Override 1664 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1665 Get get = new Get(format(i)); 1666 for (int family = 0; family < opts.families; family++) { 1667 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1668 if (opts.addColumns) { 1669 for (int column = 0; column < opts.columns; column++) { 1670 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1671 get.addColumn(familyName, qualifier); 1672 } 1673 } else { 1674 get.addFamily(familyName); 1675 } 1676 } 1677 if (opts.filterAll) { 1678 get.setFilter(new FilterAllFilter()); 1679 } 1680 try { 1681 updateValueSize(table.get(get).get()); 1682 } catch (ExecutionException e) { 1683 throw new IOException(e); 1684 } 1685 return true; 1686 } 1687 } 1688 1689 static class AsyncSequentialWriteTest extends AsyncTableTest { 1690 private ArrayList<Put> puts; 1691 1692 AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) { 1693 super(con, options, status); 1694 if (opts.multiPut > 0) { 1695 LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + "."); 1696 this.puts = new ArrayList<>(opts.multiPut); 1697 } 1698 } 1699 1700 protected byte[] generateRow(final int i) { 1701 return format(i); 1702 } 1703 1704 @Override 1705 @SuppressWarnings("ReturnValueIgnored") 1706 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1707 byte[] row = generateRow(i); 1708 Put put = new Put(row); 1709 for (int family = 0; family < opts.families; family++) { 1710 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1711 for (int column = 0; column < opts.columns; column++) { 1712 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1713 byte[] value = generateData(this.rand, getValueLength(this.rand)); 1714 if (opts.useTags) { 1715 byte[] tag = generateData(this.rand, TAG_LENGTH); 1716 Tag[] tags = new Tag[opts.noOfTags]; 1717 for (int n = 0; n < opts.noOfTags; n++) { 1718 Tag t = new ArrayBackedTag((byte) n, tag); 1719 tags[n] = t; 1720 } 1721 KeyValue kv = 1722 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 1723 put.add(kv); 1724 updateValueSize(kv.getValueLength()); 1725 } else { 1726 put.addColumn(familyName, qualifier, value); 1727 updateValueSize(value.length); 1728 } 1729 } 1730 } 1731 put.setDurability(opts.writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL);
1732 try {
1734 if (opts.multiPut > 0) {
1735 this.puts.add(put);
1736 if (this.puts.size() == opts.multiPut) {
1737 this.table.put(puts).stream().forEach(f -> AsyncRandomReadTest.propagate(f::get));
1738 this.puts.clear();
1739 } else {
1740 return false;
1741 }
1742 } else {
1743 table.put(put).get();
1744 }
1745 } catch (ExecutionException e) {
1746 throw new IOException(e);
1747 }
1748 return true;
1749 }
1750 }
1751
1752 static abstract class BufferedMutatorTest extends Test {
1753 protected BufferedMutator mutator;
1754 protected Table table;
1755
1756 BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1757 super(con, options, status);
1758 }
1759
1760 @Override
1761 void onStartup() throws IOException {
1762 BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1763 p.writeBufferSize(opts.bufferSize);
1764 this.mutator = connection.getBufferedMutator(p);
1765 this.table = connection.getTable(TableName.valueOf(opts.tableName));
1766 }
1767
1768 @Override
1769 void onTakedown() throws IOException {
1770 mutator.close();
1771 table.close();
1772 }
1773 }
1774
1775 static class RandomSeekScanTest extends TableTest {
1776 RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1777 super(con, options, status);
1778 }
1779
1780 @Override
1781 boolean testRow(final int i, final long startTime) throws IOException {
1782 Scan scan =
1783 new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)).setCaching(opts.caching)
1784 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1785 .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1786 FilterList list = new FilterList();
1787 for (int family = 0; family < opts.families; family++) {
1788 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1789 if (opts.addColumns) {
1790 for (int column = 0; column < opts.columns; column++) {
1791 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1792 scan.addColumn(familyName, qualifier);
1793 }
1794 } else {
1795 scan.addFamily(familyName);
1796 }
1797 }
1798 if (opts.filterAll) {
1799 list.addFilter(new FilterAllFilter());
1800 }
1801 list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1802 scan.setFilter(list);
1803 ResultScanner s = this.table.getScanner(scan);
1804 try {
1805 for (Result rr; (rr = s.next()) != null;) {
1806 updateValueSize(rr);
1807 }
1808 } finally {
1809 updateScanMetrics(s.getScanMetrics());
1810 s.close();
1811 }
1812 return true;
1813 }
1814
1815 @Override
1816 protected int getReportingPeriod() {
1817 int period = opts.perClientRunRows / 100;
1818 return period == 0 ?
opts.perClientRunRows : period; 1819 } 1820 1821 } 1822 1823 static abstract class RandomScanWithRangeTest extends TableTest { 1824 RandomScanWithRangeTest(Connection con, TestOptions options, Status status) { 1825 super(con, options, status); 1826 } 1827 1828 @Override 1829 boolean testRow(final int i, final long startTime) throws IOException { 1830 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); 1831 Scan scan = new Scan().withStartRow(startAndStopRow.getFirst()) 1832 .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching) 1833 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1834 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1835 for (int family = 0; family < opts.families; family++) { 1836 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1837 if (opts.addColumns) { 1838 for (int column = 0; column < opts.columns; column++) { 1839 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1840 scan.addColumn(familyName, qualifier); 1841 } 1842 } else { 1843 scan.addFamily(familyName); 1844 } 1845 } 1846 if (opts.filterAll) { 1847 scan.setFilter(new FilterAllFilter()); 1848 } 1849 Result r = null; 1850 int count = 0; 1851 ResultScanner s = this.table.getScanner(scan); 1852 try { 1853 for (; (r = s.next()) != null;) { 1854 updateValueSize(r); 1855 count++; 1856 } 1857 if (i % 100 == 0) { 1858 LOG.info(String.format("Scan for key range %s - %s returned %s rows", 1859 Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()), 1860 count)); 1861 } 1862 } finally { 1863 updateScanMetrics(s.getScanMetrics()); 1864 s.close(); 1865 } 1866 return true; 1867 } 1868 1869 protected abstract Pair<byte[], byte[]> getStartAndStopRow(); 1870 1871 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) { 1872 int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows; 1873 int stop = start + maxRange; 1874 return new Pair<>(format(start), format(stop)); 1875 } 1876 1877 @Override 1878 protected int getReportingPeriod() { 1879 int period = opts.perClientRunRows / 100; 1880 return period == 0 ? 
opts.perClientRunRows : period; 1881 } 1882 } 1883 1884 static class RandomScanWithRange10Test extends RandomScanWithRangeTest { 1885 RandomScanWithRange10Test(Connection con, TestOptions options, Status status) { 1886 super(con, options, status); 1887 } 1888 1889 @Override 1890 protected Pair<byte[], byte[]> getStartAndStopRow() { 1891 return generateStartAndStopRows(10); 1892 } 1893 } 1894 1895 static class RandomScanWithRange100Test extends RandomScanWithRangeTest { 1896 RandomScanWithRange100Test(Connection con, TestOptions options, Status status) { 1897 super(con, options, status); 1898 } 1899 1900 @Override 1901 protected Pair<byte[], byte[]> getStartAndStopRow() { 1902 return generateStartAndStopRows(100); 1903 } 1904 } 1905 1906 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest { 1907 RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) { 1908 super(con, options, status); 1909 } 1910 1911 @Override 1912 protected Pair<byte[], byte[]> getStartAndStopRow() { 1913 return generateStartAndStopRows(1000); 1914 } 1915 } 1916 1917 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest { 1918 RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) { 1919 super(con, options, status); 1920 } 1921 1922 @Override 1923 protected Pair<byte[], byte[]> getStartAndStopRow() { 1924 return generateStartAndStopRows(10000); 1925 } 1926 } 1927 1928 static class RandomReadTest extends TableTest { 1929 private final Consistency consistency; 1930 private ArrayList<Get> gets; 1931 1932 RandomReadTest(Connection con, TestOptions options, Status status) { 1933 super(con, options, status); 1934 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1935 if (opts.multiGet > 0) { 1936 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1937 this.gets = new ArrayList<>(opts.multiGet); 1938 } 1939 } 1940 1941 @Override 1942 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1943 if (opts.randomSleep > 0) { 1944 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 1945 } 1946 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1947 for (int family = 0; family < opts.families; family++) { 1948 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1949 if (opts.addColumns) { 1950 for (int column = 0; column < opts.columns; column++) { 1951 byte[] qualifier = column == 0 ? 
COLUMN_ZERO : Bytes.toBytes("" + column); 1952 get.addColumn(familyName, qualifier); 1953 } 1954 } else { 1955 get.addFamily(familyName); 1956 } 1957 } 1958 if (opts.filterAll) { 1959 get.setFilter(new FilterAllFilter()); 1960 } 1961 get.setConsistency(consistency); 1962 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1963 if (opts.multiGet > 0) { 1964 this.gets.add(get); 1965 if (this.gets.size() == opts.multiGet) { 1966 Result[] rs = this.table.get(this.gets); 1967 if (opts.replicas > 1) { 1968 long latency = System.nanoTime() - startTime; 1969 updateValueSize(rs, latency); 1970 } else { 1971 updateValueSize(rs); 1972 } 1973 this.gets.clear(); 1974 } else { 1975 return false; 1976 } 1977 } else { 1978 if (opts.replicas > 1) { 1979 Result r = this.table.get(get); 1980 long latency = System.nanoTime() - startTime; 1981 updateValueSize(r, latency); 1982 } else { 1983 updateValueSize(this.table.get(get)); 1984 } 1985 } 1986 return true; 1987 } 1988 1989 @Override 1990 protected int getReportingPeriod() { 1991 int period = opts.perClientRunRows / 10; 1992 return period == 0 ? opts.perClientRunRows : period; 1993 } 1994 1995 @Override 1996 protected void testTakedown() throws IOException { 1997 if (this.gets != null && this.gets.size() > 0) { 1998 this.table.get(gets); 1999 this.gets.clear(); 2000 } 2001 super.testTakedown(); 2002 } 2003 } 2004 2005 /* 2006 * Send random reads against fake regions inserted by MetaWriteTest 2007 */ 2008 static class MetaRandomReadTest extends MetaTest { 2009 private RegionLocator regionLocator; 2010 2011 MetaRandomReadTest(Connection con, TestOptions options, Status status) { 2012 super(con, options, status); 2013 LOG.info("call getRegionLocation"); 2014 } 2015 2016 @Override 2017 void onStartup() throws IOException { 2018 super.onStartup(); 2019 this.regionLocator = connection.getRegionLocator(table.getName()); 2020 } 2021 2022 @Override 2023 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 2024 if (opts.randomSleep > 0) { 2025 Thread.sleep(rand.nextInt(opts.randomSleep)); 2026 } 2027 HRegionLocation hRegionLocation = 2028 regionLocator.getRegionLocation(getSplitKey(rand.nextInt(opts.perClientRunRows)), true); 2029 LOG.debug("get location for region: " + hRegionLocation); 2030 return true; 2031 } 2032 2033 @Override 2034 protected int getReportingPeriod() { 2035 int period = opts.perClientRunRows / 10; 2036 return period == 0 ? 
opts.perClientRunRows : period; 2037 } 2038 2039 @Override 2040 protected void testTakedown() throws IOException { 2041 super.testTakedown(); 2042 } 2043 } 2044 2045 static class RandomWriteTest extends SequentialWriteTest { 2046 RandomWriteTest(Connection con, TestOptions options, Status status) { 2047 super(con, options, status); 2048 } 2049 2050 @Override 2051 protected byte[] generateRow(final int i) { 2052 return getRandomRow(this.rand, opts.totalRows); 2053 } 2054 2055 } 2056 2057 static class ScanTest extends TableTest { 2058 private ResultScanner testScanner; 2059 2060 ScanTest(Connection con, TestOptions options, Status status) { 2061 super(con, options, status); 2062 } 2063 2064 @Override 2065 void testTakedown() throws IOException { 2066 if (this.testScanner != null) { 2067 this.testScanner.close(); 2068 } 2069 super.testTakedown(); 2070 } 2071 2072 @Override 2073 boolean testRow(final int i, final long startTime) throws IOException { 2074 if (this.testScanner == null) { 2075 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 2076 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 2077 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 2078 for (int family = 0; family < opts.families; family++) { 2079 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2080 if (opts.addColumns) { 2081 for (int column = 0; column < opts.columns; column++) { 2082 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2083 scan.addColumn(familyName, qualifier); 2084 } 2085 } else { 2086 scan.addFamily(familyName); 2087 } 2088 } 2089 if (opts.filterAll) { 2090 scan.setFilter(new FilterAllFilter()); 2091 } 2092 this.testScanner = table.getScanner(scan); 2093 } 2094 Result r = testScanner.next(); 2095 updateValueSize(r); 2096 return true; 2097 } 2098 } 2099 2100 /** 2101 * Base class for operations that are CAS-like; that read a value and then set it based off what 2102 * they read. In this category is increment, append, checkAndPut, etc. 2103 * <p> 2104 * These operations also want some concurrency going on. Usually when these tests run, they 2105 * operate in their own part of the key range. In CASTest, we will have them all overlap on the 2106 * same key space. We do this with our getStartRow and getLastRow overrides. 
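   * <p>
   * Concretely, the getStartRow()/getLastRow() overrides below pin every CAS-style client to rows
   * {@code [0, perClientRunRows)} instead of a disjoint per-client slice, so concurrent clients
   * contend on the same cells. A minimal illustration, assuming the usual {@code hbase pe}
   * launcher shortname shown in the usage examples:
   *
   * <pre>{@code
   * # ten threads all incrementing the same ten rows, maximizing CAS contention
   * hbase pe --nomapred --rows=10 increment 10
   * }</pre>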
2107 */ 2108 static abstract class CASTableTest extends TableTest { 2109 private final byte[] qualifier; 2110 2111 CASTableTest(Connection con, TestOptions options, Status status) { 2112 super(con, options, status); 2113 qualifier = Bytes.toBytes(this.getClass().getSimpleName()); 2114 } 2115 2116 byte[] getQualifier() { 2117 return this.qualifier; 2118 } 2119 2120 @Override 2121 int getStartRow() { 2122 return 0; 2123 } 2124 2125 @Override 2126 int getLastRow() { 2127 return opts.perClientRunRows; 2128 } 2129 } 2130 2131 static class IncrementTest extends CASTableTest { 2132 IncrementTest(Connection con, TestOptions options, Status status) { 2133 super(con, options, status); 2134 } 2135 2136 @Override 2137 boolean testRow(final int i, final long startTime) throws IOException { 2138 Increment increment = new Increment(format(i)); 2139 // unlike checkAndXXX tests, which make most sense to do on a single value, 2140 // if multiple families are specified for an increment test we assume it is 2141 // meant to raise the work factor 2142 for (int family = 0; family < opts.families; family++) { 2143 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2144 increment.addColumn(familyName, getQualifier(), 1l); 2145 } 2146 updateValueSize(this.table.increment(increment)); 2147 return true; 2148 } 2149 } 2150 2151 static class AppendTest extends CASTableTest { 2152 AppendTest(Connection con, TestOptions options, Status status) { 2153 super(con, options, status); 2154 } 2155 2156 @Override 2157 boolean testRow(final int i, final long startTime) throws IOException { 2158 byte[] bytes = format(i); 2159 Append append = new Append(bytes); 2160 // unlike checkAndXXX tests, which make most sense to do on a single value, 2161 // if multiple families are specified for an append test we assume it is 2162 // meant to raise the work factor 2163 for (int family = 0; family < opts.families; family++) { 2164 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2165 append.addColumn(familyName, getQualifier(), bytes); 2166 } 2167 updateValueSize(this.table.append(append)); 2168 return true; 2169 } 2170 } 2171 2172 static class CheckAndMutateTest extends CASTableTest { 2173 CheckAndMutateTest(Connection con, TestOptions options, Status status) { 2174 super(con, options, status); 2175 } 2176 2177 @Override 2178 boolean testRow(final int i, final long startTime) throws IOException { 2179 final byte[] bytes = format(i); 2180 // checkAndXXX tests operate on only a single value 2181 // Put a known value so when we go to check it, it is there. 2182 Put put = new Put(bytes); 2183 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2184 this.table.put(put); 2185 RowMutations mutations = new RowMutations(bytes); 2186 mutations.add(put); 2187 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2188 .thenMutate(mutations); 2189 return true; 2190 } 2191 } 2192 2193 static class CheckAndPutTest extends CASTableTest { 2194 CheckAndPutTest(Connection con, TestOptions options, Status status) { 2195 super(con, options, status); 2196 } 2197 2198 @Override 2199 boolean testRow(final int i, final long startTime) throws IOException { 2200 final byte[] bytes = format(i); 2201 // checkAndXXX tests operate on only a single value 2202 // Put a known value so when we go to check it, it is there. 
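      // Sketch of the flow below: the row is first written with a value equal to its own key, then
      // checkAndMutate() re-applies the same Put only if the stored value still equals that key.
      // The check should therefore always succeed, and each call exercises one full check-and-put
      // round trip against FAMILY_ZERO and this test's qualifier.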
2203 Put put = new Put(bytes); 2204 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2205 this.table.put(put); 2206 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2207 .thenPut(put); 2208 return true; 2209 } 2210 } 2211 2212 static class CheckAndDeleteTest extends CASTableTest { 2213 CheckAndDeleteTest(Connection con, TestOptions options, Status status) { 2214 super(con, options, status); 2215 } 2216 2217 @Override 2218 boolean testRow(final int i, final long startTime) throws IOException { 2219 final byte[] bytes = format(i); 2220 // checkAndXXX tests operate on only a single value 2221 // Put a known value so when we go to check it, it is there. 2222 Put put = new Put(bytes); 2223 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2224 this.table.put(put); 2225 Delete delete = new Delete(put.getRow()); 2226 delete.addColumn(FAMILY_ZERO, getQualifier()); 2227 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2228 .thenDelete(delete); 2229 return true; 2230 } 2231 } 2232 2233 /* 2234 * Delete all fake regions inserted to meta table by MetaWriteTest. 2235 */ 2236 static class CleanMetaTest extends MetaTest { 2237 CleanMetaTest(Connection con, TestOptions options, Status status) { 2238 super(con, options, status); 2239 } 2240 2241 @Override 2242 boolean testRow(final int i, final long startTime) throws IOException { 2243 try { 2244 RegionInfo regionInfo = connection.getRegionLocator(table.getName()) 2245 .getRegionLocation(getSplitKey(i), false).getRegion(); 2246 LOG.debug("deleting region from meta: " + regionInfo); 2247 2248 Delete delete = 2249 MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP); 2250 try (Table t = MetaTableAccessor.getMetaHTable(connection)) { 2251 t.delete(delete); 2252 } 2253 } catch (IOException ie) { 2254 // Log and continue 2255 LOG.error("cannot find region with start key: " + i); 2256 } 2257 return true; 2258 } 2259 } 2260 2261 static class SequentialReadTest extends TableTest { 2262 SequentialReadTest(Connection con, TestOptions options, Status status) { 2263 super(con, options, status); 2264 } 2265 2266 @Override 2267 boolean testRow(final int i, final long startTime) throws IOException { 2268 Get get = new Get(format(i)); 2269 for (int family = 0; family < opts.families; family++) { 2270 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2271 if (opts.addColumns) { 2272 for (int column = 0; column < opts.columns; column++) { 2273 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2274 get.addColumn(familyName, qualifier); 2275 } 2276 } else { 2277 get.addFamily(familyName); 2278 } 2279 } 2280 if (opts.filterAll) { 2281 get.setFilter(new FilterAllFilter()); 2282 } 2283 updateValueSize(table.get(get)); 2284 return true; 2285 } 2286 } 2287 2288 static class SequentialWriteTest extends BufferedMutatorTest { 2289 private ArrayList<Put> puts; 2290 2291 SequentialWriteTest(Connection con, TestOptions options, Status status) { 2292 super(con, options, status); 2293 if (opts.multiPut > 0) { 2294 LOG.info("MultiPut enabled. 
Sending PUTs in batches of " + opts.multiPut + "."); 2295 this.puts = new ArrayList<>(opts.multiPut); 2296 } 2297 } 2298 2299 protected byte[] generateRow(final int i) { 2300 return format(i); 2301 } 2302 2303 @Override 2304 boolean testRow(final int i, final long startTime) throws IOException { 2305 byte[] row = generateRow(i); 2306 Put put = new Put(row); 2307 for (int family = 0; family < opts.families; family++) { 2308 byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family); 2309 for (int column = 0; column < opts.columns; column++) { 2310 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2311 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2312 if (opts.useTags) { 2313 byte[] tag = generateData(this.rand, TAG_LENGTH); 2314 Tag[] tags = new Tag[opts.noOfTags]; 2315 for (int n = 0; n < opts.noOfTags; n++) { 2316 Tag t = new ArrayBackedTag((byte) n, tag); 2317 tags[n] = t; 2318 } 2319 KeyValue kv = 2320 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 2321 put.add(kv); 2322 updateValueSize(kv.getValueLength()); 2323 } else { 2324 put.addColumn(familyName, qualifier, value); 2325 updateValueSize(value.length); 2326 } 2327 } 2328 } 2329 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 2330 if (opts.autoFlush) { 2331 if (opts.multiPut > 0) { 2332 this.puts.add(put); 2333 if (this.puts.size() == opts.multiPut) { 2334 table.put(this.puts); 2335 this.puts.clear(); 2336 } else { 2337 return false; 2338 } 2339 } else { 2340 table.put(put); 2341 } 2342 } else { 2343 mutator.mutate(put); 2344 } 2345 return true; 2346 } 2347 } 2348 2349 /* 2350 * Insert fake regions into meta table with contiguous split keys. 2351 */ 2352 static class MetaWriteTest extends MetaTest { 2353 2354 MetaWriteTest(Connection con, TestOptions options, Status status) { 2355 super(con, options, status); 2356 } 2357 2358 @Override 2359 boolean testRow(final int i, final long startTime) throws IOException { 2360 List<RegionInfo> regionInfos = new ArrayList<RegionInfo>(); 2361 RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME)) 2362 .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build()); 2363 regionInfos.add(regionInfo); 2364 MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1); 2365 2366 // write the serverName columns 2367 MetaTableAccessor.updateRegionLocation(connection, regionInfo, 2368 ServerName.valueOf("localhost", 60010, rand.nextLong()), i, 2369 EnvironmentEdgeManager.currentTime()); 2370 return true; 2371 } 2372 } 2373 2374 static class FilteredScanTest extends TableTest { 2375 protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName()); 2376 2377 FilteredScanTest(Connection con, TestOptions options, Status status) { 2378 super(con, options, status); 2379 if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) { 2380 LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB 2381 + ". 
This could take a very long time."); 2382 } 2383 } 2384 2385 @Override 2386 boolean testRow(int i, final long startTime) throws IOException { 2387 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2388 Scan scan = constructScan(value); 2389 ResultScanner scanner = null; 2390 try { 2391 scanner = this.table.getScanner(scan); 2392 for (Result r = null; (r = scanner.next()) != null;) { 2393 updateValueSize(r); 2394 } 2395 } finally { 2396 if (scanner != null) { 2397 updateScanMetrics(scanner.getScanMetrics()); 2398 scanner.close(); 2399 } 2400 } 2401 return true; 2402 } 2403 2404 protected Scan constructScan(byte[] valuePrefix) throws IOException { 2405 FilterList list = new FilterList(); 2406 Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL, 2407 new BinaryComparator(valuePrefix)); 2408 list.addFilter(filter); 2409 if (opts.filterAll) { 2410 list.addFilter(new FilterAllFilter()); 2411 } 2412 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 2413 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 2414 .setScanMetricsEnabled(true); 2415 if (opts.addColumns) { 2416 for (int column = 0; column < opts.columns; column++) { 2417 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2418 scan.addColumn(FAMILY_ZERO, qualifier); 2419 } 2420 } else { 2421 scan.addFamily(FAMILY_ZERO); 2422 } 2423 scan.setFilter(list); 2424 return scan; 2425 } 2426 } 2427 2428 /** 2429 * Compute a throughput rate in MB/s. 2430 * @param rows Number of records consumed. 2431 * @param timeMs Time taken in milliseconds. 2432 * @return String value with label, ie '123.76 MB/s' 2433 */ 2434 private static String calculateMbps(int rows, long timeMs, final int valueSize, int families, 2435 int columns) { 2436 BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH 2437 + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families); 2438 BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT) 2439 .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT); 2440 return FMT.format(mbps) + " MB/s"; 2441 } 2442 2443 /* 2444 * Format passed integer. n * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version 2445 * of passed number (Does absolute in case number is negative). 2446 */ 2447 public static byte[] format(final int number) { 2448 byte[] b = new byte[ROW_LENGTH]; 2449 int d = Math.abs(number); 2450 for (int i = b.length - 1; i >= 0; i--) { 2451 b[i] = (byte) ((d % 10) + '0'); 2452 d /= 10; 2453 } 2454 return b; 2455 } 2456 2457 /* 2458 * This method takes some time and is done inline uploading data. For example, doing the mapfile 2459 * test, generation of the key and value consumes about 30% of CPU time. 2460 * @return Generated random value to insert into a table cell. 
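   * The value is built from runs of eight identical random uppercase letters ('A'..'Z'), with any
   * remaining tail bytes filled by one more random letter; a 20-byte request might come back as
   * something like "GGGGGGGGKKKKKKKKTTTT" (illustrative only). The repetition keeps generation
   * cheap and tends to make the payload compressible.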
2461 */ 2462 public static byte[] generateData(final Random r, int length) { 2463 byte[] b = new byte[length]; 2464 int i; 2465 2466 for (i = 0; i < (length - 8); i += 8) { 2467 b[i] = (byte) (65 + r.nextInt(26)); 2468 b[i + 1] = b[i]; 2469 b[i + 2] = b[i]; 2470 b[i + 3] = b[i]; 2471 b[i + 4] = b[i]; 2472 b[i + 5] = b[i]; 2473 b[i + 6] = b[i]; 2474 b[i + 7] = b[i]; 2475 } 2476 2477 byte a = (byte) (65 + r.nextInt(26)); 2478 for (; i < length; i++) { 2479 b[i] = a; 2480 } 2481 return b; 2482 } 2483 2484 static byte[] getRandomRow(final Random random, final int totalRows) { 2485 return format(generateRandomRow(random, totalRows)); 2486 } 2487 2488 static int generateRandomRow(final Random random, final int totalRows) { 2489 return random.nextInt(Integer.MAX_VALUE) % totalRows; 2490 } 2491 2492 static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf, 2493 Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status) 2494 throws IOException, InterruptedException { 2495 status.setStatus( 2496 "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows"); 2497 long totalElapsedTime; 2498 2499 final TestBase t; 2500 try { 2501 if (AsyncTest.class.isAssignableFrom(cmd)) { 2502 Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd; 2503 Constructor<? extends AsyncTest> constructor = 2504 newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class); 2505 t = constructor.newInstance(asyncCon, opts, status); 2506 } else { 2507 Class<? extends Test> newCmd = (Class<? extends Test>) cmd; 2508 Constructor<? extends Test> constructor = 2509 newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class); 2510 t = constructor.newInstance(con, opts, status); 2511 } 2512 } catch (NoSuchMethodException e) { 2513 throw new IllegalArgumentException("Invalid command class: " + cmd.getName() 2514 + ". It does not provide a constructor as described by " 2515 + "the javadoc comment. Available constructors are: " 2516 + Arrays.toString(cmd.getConstructors())); 2517 } catch (Exception e) { 2518 throw new IllegalStateException("Failed to construct command class", e); 2519 } 2520 totalElapsedTime = t.test(); 2521 2522 status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow 2523 + " for " + opts.perClientRunRows + " rows" + " (" 2524 + calculateMbps((int) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime, 2525 getAverageValueLength(opts), opts.families, opts.columns) 2526 + ")"); 2527 2528 return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold, 2529 t.numOfReplyFromReplica, t.getLatencyHistogram()); 2530 } 2531 2532 private static int getAverageValueLength(final TestOptions opts) { 2533 return opts.valueRandom ? opts.valueSize / 2 : opts.valueSize; 2534 } 2535 2536 private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) 2537 throws IOException, InterruptedException, ClassNotFoundException, ExecutionException { 2538 // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do 2539 // the TestOptions introspection for us and dump the output in a readable format. 
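    // The resulting log line looks roughly like the following (field names from TestOptions,
    // values illustrative only):
    //   RandomReadTest test run options={"cmdName":"randomRead","nomapred":true,"numClientThreads":10,...}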
2540 LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
2541 Admin admin = null;
2542 Connection connection = null;
2543 try {
2544 connection = ConnectionFactory.createConnection(getConf());
2545 admin = connection.getAdmin();
2546 checkTable(admin, opts);
2547 } finally {
2548 if (admin != null) admin.close();
2549 if (connection != null) connection.close();
2550 }
2551 if (opts.nomapred) {
2552 doLocalClients(opts, getConf());
2553 } else {
2554 doMapReduce(opts, getConf());
2555 }
2556 }
2557
2558 protected void printUsage() {
2559 printUsage(PE_COMMAND_SHORTNAME, null);
2560 }
2561
2562 protected static void printUsage(final String message) {
2563 printUsage(PE_COMMAND_SHORTNAME, message);
2564 }
2565
2566 protected static void printUsageAndExit(final String message, final int exitCode) {
2567 printUsage(message);
2568 System.exit(exitCode);
2569 }
2570
2571 protected static void printUsage(final String shortName, final String message) {
2572 if (message != null && message.length() > 0) {
2573 System.err.println(message);
2574 }
2575 System.err.print("Usage: hbase " + shortName);
2576 System.err.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>");
2577 System.err.println();
2578 System.err.println("General Options:");
2579 System.err.println(
2580 " nomapred Run multiple clients using threads " + "(rather than via mapreduce)");
2581 System.err
2582 .println(" oneCon All the threads share the same connection. Default: False");
2583 System.err.println(" connCount Number of connections all threads share. "
2584 + "For example, if set to 2, all threads share 2 connections. "
2585 + "Default: depends on the oneCon parameter. If oneCon is true, connCount=1; "
2586 + "otherwise connCount equals the number of threads");
2587
2588 System.err.println(" sampleRate Execute test on a sample of total "
2589 + "rows. Only supported by randomRead. Default: 1.0");
2590 System.err.println(" period Report every 'period' rows: "
2591 + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10);
2592 System.err.println(" cycles How many times to cycle the test. Default: 1.");
2593 System.err.println(
2594 " traceRate Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0");
2595 System.err.println(" latency Set to report operation latencies. Default: False");
2596 System.err.println(" latencyThreshold Set to report the number of operations with latency "
2597 + "over latencyThreshold, in milliseconds. Default: 0");
2598 System.err.println(" measureAfter Start to measure the latency once 'measureAfter'"
2599 + " rows have been processed. Default: 0");
2600 System.err
2601 .println(" valueSize Pass value size to use. Default: " + DEFAULT_OPTS.getValueSize());
2602 System.err.println(" valueRandom Set if we should vary value size between 0 and "
2603 + "'valueSize'; set on read for stats on size. Default: Not set.");
2604 System.err.println(" blockEncoding Block encoding to use. Value should be one of "
2605 + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2606 System.err.println();
2607 System.err.println("Table Creation / Write Tests:");
2608 System.err.println(" table Alternate table name. Default: 'TestTable'");
2609 System.err.println(
2610 " rows Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows()
2611 + ". For randomRead and randomSeekScan this can"
2612 + " be specified along with --size to set the number of rows to be scanned within"
2613 + " the total range specified by the size.");
2614 System.err.println(
2615 " size Total size in GiB. Mutually exclusive with --rows for writes and scans"
2616 + ". But for randomRead and randomSeekScan, when you use --size with --rows,"
2617 + " --size specifies the end of the range and --rows"
2618 + " specifies the number of rows within that range. " + "Default: 1.0.");
2619 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2620 System.err.println(
2621 " flushCommits Used to determine if the test should flush the table. " + "Default: false");
2622 System.err.println(" valueZipf Set if we should vary value size between 0 and "
2623 + "'valueSize' in Zipf form. Default: Not set.");
2624 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True");
2625 System.err.println(" autoFlush Set autoFlush on htable. Default: False");
2626 System.err.println(" multiPut Batch puts together into groups of N. Only supported "
2627 + "by write. If multiPut is bigger than 0, autoFlush needs to be set to true. Default: 0");
2628 System.err.println(" presplit Create presplit table. If a table with the same name exists,"
2629 + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2630 + "Recommended for accurate perf analysis (see guide). Default: disabled");
2631 System.err.println(
2632 " usetags Writes tags along with KVs. Use with HFile V3. " + "Default: false");
2633 System.err.println(" numoftags Specify the number of tags to write per cell. "
2634 + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2635 System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table.");
2636 System.err.println(" columns Columns to write per row. Default: 1");
2637 System.err
2638 .println(" families Specify number of column families for the table. Default: 1");
2639 System.err.println();
2640 System.err.println("Read Tests:");
2641 System.err.println(" filterAll Filters out all rows on the server side,"
2642 + " thereby returning nothing back to the client. Helps to check server side"
2643 + " performance. Uses FilterAllFilter internally. ");
2644 System.err.println(" multiGet Batch gets together into groups of N. Only supported "
2645 + "by randomRead. Default: disabled");
2646 System.err.println(" inmemory Tries to keep the HFiles of the CF "
2647 + "in memory as much as possible. Reads are not guaranteed to always be served "
2648 + "from memory. Default: false");
2649 System.err
2650 .println(" bloomFilter Bloom filter type, one of " + Arrays.toString(BloomType.values()));
2651 System.err.println(" blockSize Block size to use when writing out hfiles. ");
2652 System.err
2653 .println(" inmemoryCompaction Makes the column family do in-memory flushes/compactions. "
2654 + "Uses the CompactingMemStore");
2655 System.err.println(" addColumns Adds columns to scans/gets explicitly. Default: true");
2656 System.err.println(" replicas Enable region replica testing. Default: 1.");
2657 System.err.println(
2658 " randomSleep Do a random sleep before each get, between 0 and the entered value. Default: 0");
2659 System.err.println(" caching Scan caching to use. Default: 30");
2660 System.err.println(" asyncPrefetch Enable asyncPrefetch for scan");
2661 System.err.println(" cacheBlocks Set the cacheBlocks option for scan.
Default: true"); 2662 System.err.println( 2663 " scanReadType Set the readType option for scan, stream/pread/default. Default: default"); 2664 System.err.println(" bufferSize Set the value of client side buffering. Default: 2MB"); 2665 System.err.println(); 2666 System.err.println(" Note: -D properties will be applied to the conf used. "); 2667 System.err.println(" For example: "); 2668 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true"); 2669 System.err.println(" -Dmapreduce.task.timeout=60000"); 2670 System.err.println(); 2671 System.err.println("Command:"); 2672 for (CmdDescriptor command : COMMANDS.values()) { 2673 System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription())); 2674 } 2675 System.err.println(); 2676 System.err.println("Args:"); 2677 System.err.println(" nclients Integer. Required. Total number of clients " 2678 + "(and HRegionServers) running. 1 <= value <= 500"); 2679 System.err.println("Examples:"); 2680 System.err.println(" To run a single client doing the default 1M sequentialWrites:"); 2681 System.err.println(" $ hbase " + shortName + " sequentialWrite 1"); 2682 System.err.println(" To run 10 clients doing increments over ten rows:"); 2683 System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10"); 2684 } 2685 2686 /** 2687 * Parse options passed in via an arguments array. Assumes that array has been split on 2688 * white-space and placed into a {@code Queue}. Any unknown arguments will remain in the queue at 2689 * the conclusion of this method call. It's up to the caller to deal with these unrecognized 2690 * arguments. 2691 */ 2692 static TestOptions parseOpts(Queue<String> args) { 2693 TestOptions opts = new TestOptions(); 2694 2695 String cmd = null; 2696 while ((cmd = args.poll()) != null) { 2697 if (cmd.equals("-h") || cmd.startsWith("--h")) { 2698 // place item back onto queue so that caller knows parsing was incomplete 2699 args.add(cmd); 2700 break; 2701 } 2702 2703 final String nmr = "--nomapred"; 2704 if (cmd.startsWith(nmr)) { 2705 opts.nomapred = true; 2706 continue; 2707 } 2708 2709 final String rows = "--rows="; 2710 if (cmd.startsWith(rows)) { 2711 opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length())); 2712 continue; 2713 } 2714 2715 final String cycles = "--cycles="; 2716 if (cmd.startsWith(cycles)) { 2717 opts.cycles = Integer.parseInt(cmd.substring(cycles.length())); 2718 continue; 2719 } 2720 2721 final String sampleRate = "--sampleRate="; 2722 if (cmd.startsWith(sampleRate)) { 2723 opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); 2724 continue; 2725 } 2726 2727 final String table = "--table="; 2728 if (cmd.startsWith(table)) { 2729 opts.tableName = cmd.substring(table.length()); 2730 continue; 2731 } 2732 2733 final String startRow = "--startRow="; 2734 if (cmd.startsWith(startRow)) { 2735 opts.startRow = Integer.parseInt(cmd.substring(startRow.length())); 2736 continue; 2737 } 2738 2739 final String compress = "--compress="; 2740 if (cmd.startsWith(compress)) { 2741 opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); 2742 continue; 2743 } 2744 2745 final String traceRate = "--traceRate="; 2746 if (cmd.startsWith(traceRate)) { 2747 opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length())); 2748 continue; 2749 } 2750 2751 final String blockEncoding = "--blockEncoding="; 2752 if (cmd.startsWith(blockEncoding)) { 2753 opts.blockEncoding = 
DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); 2754 continue; 2755 } 2756 2757 final String flushCommits = "--flushCommits="; 2758 if (cmd.startsWith(flushCommits)) { 2759 opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); 2760 continue; 2761 } 2762 2763 final String writeToWAL = "--writeToWAL="; 2764 if (cmd.startsWith(writeToWAL)) { 2765 opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); 2766 continue; 2767 } 2768 2769 final String presplit = "--presplit="; 2770 if (cmd.startsWith(presplit)) { 2771 opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); 2772 continue; 2773 } 2774 2775 final String inMemory = "--inmemory="; 2776 if (cmd.startsWith(inMemory)) { 2777 opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); 2778 continue; 2779 } 2780 2781 final String autoFlush = "--autoFlush="; 2782 if (cmd.startsWith(autoFlush)) { 2783 opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length())); 2784 continue; 2785 } 2786 2787 final String onceCon = "--oneCon="; 2788 if (cmd.startsWith(onceCon)) { 2789 opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length())); 2790 continue; 2791 } 2792 2793 final String connCount = "--connCount="; 2794 if (cmd.startsWith(connCount)) { 2795 opts.connCount = Integer.parseInt(cmd.substring(connCount.length())); 2796 continue; 2797 } 2798 2799 final String latencyThreshold = "--latencyThreshold="; 2800 if (cmd.startsWith(latencyThreshold)) { 2801 opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length())); 2802 continue; 2803 } 2804 2805 final String latency = "--latency"; 2806 if (cmd.startsWith(latency)) { 2807 opts.reportLatency = true; 2808 continue; 2809 } 2810 2811 final String multiGet = "--multiGet="; 2812 if (cmd.startsWith(multiGet)) { 2813 opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); 2814 continue; 2815 } 2816 2817 final String multiPut = "--multiPut="; 2818 if (cmd.startsWith(multiPut)) { 2819 opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length())); 2820 continue; 2821 } 2822 2823 final String useTags = "--usetags="; 2824 if (cmd.startsWith(useTags)) { 2825 opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); 2826 continue; 2827 } 2828 2829 final String noOfTags = "--numoftags="; 2830 if (cmd.startsWith(noOfTags)) { 2831 opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); 2832 continue; 2833 } 2834 2835 final String replicas = "--replicas="; 2836 if (cmd.startsWith(replicas)) { 2837 opts.replicas = Integer.parseInt(cmd.substring(replicas.length())); 2838 continue; 2839 } 2840 2841 final String filterOutAll = "--filterAll"; 2842 if (cmd.startsWith(filterOutAll)) { 2843 opts.filterAll = true; 2844 continue; 2845 } 2846 2847 final String size = "--size="; 2848 if (cmd.startsWith(size)) { 2849 opts.size = Float.parseFloat(cmd.substring(size.length())); 2850 if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 
1GB"); 2851 continue; 2852 } 2853 2854 final String splitPolicy = "--splitPolicy="; 2855 if (cmd.startsWith(splitPolicy)) { 2856 opts.splitPolicy = cmd.substring(splitPolicy.length()); 2857 continue; 2858 } 2859 2860 final String randomSleep = "--randomSleep="; 2861 if (cmd.startsWith(randomSleep)) { 2862 opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length())); 2863 continue; 2864 } 2865 2866 final String measureAfter = "--measureAfter="; 2867 if (cmd.startsWith(measureAfter)) { 2868 opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length())); 2869 continue; 2870 } 2871 2872 final String bloomFilter = "--bloomFilter="; 2873 if (cmd.startsWith(bloomFilter)) { 2874 opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length())); 2875 continue; 2876 } 2877 2878 final String blockSize = "--blockSize="; 2879 if (cmd.startsWith(blockSize)) { 2880 opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length())); 2881 continue; 2882 } 2883 2884 final String valueSize = "--valueSize="; 2885 if (cmd.startsWith(valueSize)) { 2886 opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length())); 2887 continue; 2888 } 2889 2890 final String valueRandom = "--valueRandom"; 2891 if (cmd.startsWith(valueRandom)) { 2892 opts.valueRandom = true; 2893 continue; 2894 } 2895 2896 final String valueZipf = "--valueZipf"; 2897 if (cmd.startsWith(valueZipf)) { 2898 opts.valueZipf = true; 2899 continue; 2900 } 2901 2902 final String period = "--period="; 2903 if (cmd.startsWith(period)) { 2904 opts.period = Integer.parseInt(cmd.substring(period.length())); 2905 continue; 2906 } 2907 2908 final String addColumns = "--addColumns="; 2909 if (cmd.startsWith(addColumns)) { 2910 opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length())); 2911 continue; 2912 } 2913 2914 final String inMemoryCompaction = "--inmemoryCompaction="; 2915 if (cmd.startsWith(inMemoryCompaction)) { 2916 opts.inMemoryCompaction = 2917 MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length())); 2918 continue; 2919 } 2920 2921 final String columns = "--columns="; 2922 if (cmd.startsWith(columns)) { 2923 opts.columns = Integer.parseInt(cmd.substring(columns.length())); 2924 continue; 2925 } 2926 2927 final String families = "--families="; 2928 if (cmd.startsWith(families)) { 2929 opts.families = Integer.parseInt(cmd.substring(families.length())); 2930 continue; 2931 } 2932 2933 final String caching = "--caching="; 2934 if (cmd.startsWith(caching)) { 2935 opts.caching = Integer.parseInt(cmd.substring(caching.length())); 2936 continue; 2937 } 2938 2939 final String asyncPrefetch = "--asyncPrefetch"; 2940 if (cmd.startsWith(asyncPrefetch)) { 2941 opts.asyncPrefetch = true; 2942 continue; 2943 } 2944 2945 final String cacheBlocks = "--cacheBlocks="; 2946 if (cmd.startsWith(cacheBlocks)) { 2947 opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length())); 2948 continue; 2949 } 2950 2951 final String scanReadType = "--scanReadType="; 2952 if (cmd.startsWith(scanReadType)) { 2953 opts.scanReadType = 2954 Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase()); 2955 continue; 2956 } 2957 2958 final String bufferSize = "--bufferSize="; 2959 if (cmd.startsWith(bufferSize)) { 2960 opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length())); 2961 continue; 2962 } 2963 2964 validateParsedOpts(opts); 2965 2966 if (isCommandClass(cmd)) { 2967 opts.cmdName = cmd; 2968 try { 2969 opts.numClientThreads = Integer.parseInt(args.remove()); 2970 } 
catch (NoSuchElementException | NumberFormatException e) {
2971 throw new IllegalArgumentException(
 "Command " + cmd + " does not specify the number of client threads", e);
2972 }
2973 opts = calculateRowsAndSize(opts);
2974 break;
2975 } else {
2976 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
2977 }
2983 }
2984 return opts;
2985 }
2986
2987 /**
2988 * Validates opts after all the opts are parsed, so that the caller need not maintain the order
2989 * of opts
 */
2990 private static void validateParsedOpts(TestOptions opts) {
2991
2992 if (!opts.autoFlush && opts.multiPut > 0) {
2993 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
2994 }
2995
2996 if (opts.oneCon && opts.connCount > 1) {
2997 throw new IllegalArgumentException(
2998 "oneCon is set to true, " + "connCount should not be bigger than 1");
2999 }
3000
3001 if (opts.valueZipf && opts.valueRandom) {
3002 throw new IllegalStateException("Either valueZipf or valueRandom but not both");
3003 }
3004 }
3005
3006 static TestOptions calculateRowsAndSize(final TestOptions opts) {
3007 int rowsPerGB = getRowsPerGB(opts);
3008 if (
3009 (opts.getCmdName() != null
3010 && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
3011 && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows
3012 ) {
3013 opts.totalRows = (int) (opts.size * rowsPerGB);
3014 } else if (opts.size != DEFAULT_OPTS.size) {
3015 // total size in GB specified
3016 opts.totalRows = (int) (opts.size * rowsPerGB);
3017 opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
3018 } else {
3019 opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
3020 opts.size = (float) opts.totalRows / rowsPerGB;
3021 }
3022 return opts;
3023 }
3024
3025 static int getRowsPerGB(final TestOptions opts) {
3026 return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies()
3027 * opts.getColumns());
3028 }
3029
3030 @Override
3031 public int run(String[] args) throws Exception {
3032 // Process command-line args. TODO: Better cmd-line processing
3033 // (but hopefully something not as painful as cli options).
3034 int errCode = -1;
3035 if (args.length < 1) {
3036 printUsage();
3037 return errCode;
3038 }
3039
3040 try {
3041 LinkedList<String> argv = new LinkedList<>();
3042 argv.addAll(Arrays.asList(args));
3043 TestOptions opts = parseOpts(argv);
3044
3045 // args remaining, print help and exit
3046 if (!argv.isEmpty()) {
3047 errCode = 0;
3048 printUsage();
3049 return errCode;
3050 }
3051
3052 // must run at least 1 client
3053 if (opts.numClientThreads <= 0) {
3054 throw new IllegalArgumentException("Number of clients must be > 0");
3055 }
3056
3057 // cmdName should not be null, print help and exit
3058 if (opts.cmdName == null) {
3059 printUsage();
3060 return errCode;
3061 }
3062
3063 Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
3064 if (cmdClass != null) {
3065 runTest(cmdClass, opts);
3066 errCode = 0;
3067 }
3068
3069 } catch (Exception e) {
3070 e.printStackTrace();
3071 }
3072
3073 return errCode;
3074 }
3075
3076 private static boolean isCommandClass(String cmd) {
3077 return COMMANDS.containsKey(cmd);
3078 }
3079
3080 private static Class<?
extends TestBase> determineCommandClass(String cmd) { 3081 CmdDescriptor descriptor = COMMANDS.get(cmd); 3082 return descriptor != null ? descriptor.getCmdClass() : null; 3083 } 3084 3085 public static void main(final String[] args) throws Exception { 3086 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); 3087 System.exit(res); 3088 } 3089}