/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.UniformReservoir;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.reflect.Constructor;
import java.math.BigDecimal;
import java.math.MathContext;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterAllFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ByteArrayHashKey;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.hadoop.hbase.util.Hash;
import org.apache.hadoop.hbase.util.MurmurHash;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RandomDistribution;
import org.apache.hadoop.hbase.util.YammerHistogramUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.gson.Gson;

/**
 * Script used to evaluate HBase performance and scalability. Runs an HBase client that steps
 * through one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random
 * writes test, etc.). Pass on the command-line which test to run and how many clients are
 * participating in this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
 * <p>
 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance
 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper,
 * pages 8-10.
 * <p>
 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as
 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client reads
 * or writes about 1GB of data, unless specified otherwise.
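 * <p>
 * For example, an illustrative (not canonical) invocation that runs the random write test from
 * four local client threads, one million rows per client, might look like:
 * {@code hbase pe --nomapred --rows=1000000 randomWrite 4}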
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class PerformanceEvaluation extends Configured implements Tool {
  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
  static final String RANDOM_READ = "randomRead";
  static final String PE_COMMAND_SHORTNAME = "pe";
  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
  private static final Gson GSON = GsonUtil.createGson().create();

  public static final String TABLE_NAME = "TestTable";
  public static final String FAMILY_NAME_BASE = "info";
  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
  public static final int DEFAULT_VALUE_LENGTH = 1000;
  public static final int ROW_LENGTH = 26;

  private static final int ONE_GB = 1024 * 1024 * 1000;
  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
  // TODO : should we make this configurable
  private static final int TAG_LENGTH = 256;
  private static final DecimalFormat FMT = new DecimalFormat("0.##");
  private static final MathContext CXT = MathContext.DECIMAL64;
  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
  private static final TestOptions DEFAULT_OPTS = new TestOptions();

  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");

  static {
    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
      "Run async random read test");
    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
      "Run async random write test");
    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
      "Run async sequential read test");
    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
      "Run async sequential write test");
    addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)");
    addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test");
    addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test");
    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
      "Run random seek and scan 100 test");
    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
      "Run random seek scan with both start and stop row (max 10 rows)");
    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
      "Run random seek scan with both start and stop row (max 100 rows)");
    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
      "Run random seek scan with both start and stop row (max 1000 rows)");
    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
      "Run random seek scan with both start and stop row (max 10000 rows)");
    addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
    addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
    addCommandDescriptor(MetaWriteTest.class, "metaWrite",
      "Populate the meta table; used with 1 thread; to be cleaned up by cleanMeta");
    addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
    addCommandDescriptor(FilteredScanTest.class, "filterScan",
      "Run scan test using a filter to find a specific row based on its value "
        + "(make sure to use --rows=20)");
    addCommandDescriptor(IncrementTest.class, "increment",
      "Increment on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(AppendTest.class, "append",
      "Append on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CleanMetaTest.class, "cleanMeta",
      "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread");
  }

  /**
   * Enum for map metrics. Keep it out here rather than inside the Map inner-class so we can find
   * associated properties.
   */
  protected static enum Counter {
    /** elapsed time */
    ELAPSED_TIME,
    /** number of rows */
    ROWS
  }

  protected static class RunResult implements Comparable<RunResult> {
    public RunResult(long duration, Histogram hist) {
      this.duration = duration;
      this.hist = hist;
      numbOfReplyOverThreshold = 0;
      numOfReplyFromReplica = 0;
    }

    public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica,
      Histogram hist) {
      this.duration = duration;
      this.hist = hist;
      this.numbOfReplyOverThreshold = numbOfReplyOverThreshold;
      this.numOfReplyFromReplica = numOfReplyFromReplica;
    }

    public final long duration;
    public final Histogram hist;
    public final long numbOfReplyOverThreshold;
    public final long numOfReplyFromReplica;

    @Override
    public String toString() {
      return Long.toString(duration);
    }

    @Override
    public int compareTo(RunResult o) {
      return Long.compare(this.duration, o.duration);
    }
  }

  /**
   * Constructor
   * @param conf Configuration object
   */
  public PerformanceEvaluation(final Configuration conf) {
    super(conf);
  }

  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name,
    String description) {
    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
    COMMANDS.put(name, cmdDescriptor);
  }

  /**
   * Implementations can have their status set.
   */
  interface Status {
    /**
     * Sets status
     * @param msg status message
     */
    void setStatus(final String msg) throws IOException;
  }

  /**
   * MapReduce job that runs a performance evaluation client in each map task.
   */
  public static class EvaluationMapTask
    extends Mapper<LongWritable, Text, LongWritable, LongWritable> {

    /** configuration parameter name that contains the command */
    public final static String CMD_KEY = "EvaluationMapTask.command";
    /** configuration parameter name that contains the PE impl */
    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";

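    // The Test subclass this map task will run; resolved in setup() from the CMD_KEY value that
    // doMapReduce() wrote into the job configuration.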
    private Class<? extends Test> cmd;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);

      // this is required so that extensions of PE are instantiated within the
      // map reduce task...
      Class<? extends PerformanceEvaluation> peClass =
        forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
      try {
        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
      } catch (Exception e) {
        throw new IllegalStateException("Could not instantiate PE instance", e);
      }
    }

    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
      try {
        return Class.forName(className).asSubclass(type);
      } catch (ClassNotFoundException e) {
        throw new IllegalStateException("Could not find class for name: " + className, e);
      }
    }

    @Override
    protected void map(LongWritable key, Text value, final Context context)
      throws IOException, InterruptedException {

      Status status = new Status() {
        @Override
        public void setStatus(String msg) {
          context.setStatus(msg);
        }
      };

      TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class);
      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
      final Connection con = ConnectionFactory.createConnection(conf);
      AsyncConnection asyncCon = null;
      try {
        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
      } catch (ExecutionException e) {
        throw new IOException(e);
      }

      // Evaluation task
      RunResult result =
        PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
      // Collect how much time the thing took. Report as map output and
      // to the ELAPSED_TIME counter.
      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
      context.progress();
    }
  }

  /*
   * If the table does not already exist, create it. Also recreate the table when {@code
   * opts.presplitRegions} is specified or when the existing table's region replica count doesn't
   * match {@code opts.replicas}.
   */
  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
    TableName tableName = TableName.valueOf(opts.tableName);
    boolean needsDelete = false, exists = admin.tableExists(tableName);
    boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
      || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
    if (!exists && isReadCmd) {
      throw new IllegalStateException(
        "Must specify an existing table for read commands. Run a write command first.");
    }
    TableDescriptor desc = exists ? admin.getDescriptor(TableName.valueOf(opts.tableName)) : null;
    byte[][] splits = getSplits(opts);

    // recreate the table when user has requested presplit or when existing
    // {RegionSplitPolicy,replica count} does not match requested, or when the
    // number of column families does not match requested.
    if (
      (exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
        || (!isReadCmd && desc != null
          && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
        || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)
        || (desc != null && desc.getColumnFamilyCount() != opts.families)
    ) {
      needsDelete = true;
      // wait, why did it delete my table?!?
      LOG.debug(MoreObjects.toStringHelper("needsDelete").add("needsDelete", needsDelete)
        .add("isReadCmd", isReadCmd).add("exists", exists).add("desc", desc)
        .add("presplit", opts.presplitRegions).add("splitPolicy", opts.splitPolicy)
        .add("replicas", opts.replicas).add("families", opts.families).toString());
    }

    // remove an existing table
    if (needsDelete) {
      if (admin.isTableEnabled(tableName)) {
        admin.disableTable(tableName);
      }
      admin.deleteTable(tableName);
    }

    // table creation is necessary
    if (!exists || needsDelete) {
      desc = getTableDescriptor(opts);
      if (splits != null) {
        if (LOG.isDebugEnabled()) {
          for (int i = 0; i < splits.length; i++) {
            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
          }
        }
      }
      if (splits != null) {
        admin.createTable(desc, splits);
      } else {
        admin.createTable(desc);
      }
      LOG.info("Table " + desc + " created");
    }
    return admin.tableExists(tableName);
  }

  /**
   * Create a TableDescriptor from the provided TestOptions.
   */
  protected static TableDescriptor getTableDescriptor(TestOptions opts) {
    TableDescriptorBuilder builder =
      TableDescriptorBuilder.newBuilder(TableName.valueOf(opts.tableName));

    for (int family = 0; family < opts.families; family++) {
      byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
      ColumnFamilyDescriptorBuilder cfBuilder =
        ColumnFamilyDescriptorBuilder.newBuilder(familyName);
      cfBuilder.setDataBlockEncoding(opts.blockEncoding);
      cfBuilder.setCompressionType(opts.compression);
      cfBuilder.setEncryptionType(opts.encryption);
      cfBuilder.setBloomFilterType(opts.bloomType);
      cfBuilder.setBlocksize(opts.blockSize);
      if (opts.inMemoryCF) {
        cfBuilder.setInMemory(true);
      }
      cfBuilder.setInMemoryCompaction(opts.inMemoryCompaction);
      builder.setColumnFamily(cfBuilder.build());
    }
    if (opts.replicas != DEFAULT_OPTS.replicas) {
      builder.setRegionReplication(opts.replicas);
    }
    if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
      builder.setRegionSplitPolicyClassName(opts.splitPolicy);
    }
    return builder.build();
  }

  /**
   * Generates splits based on the total number of rows and the specified number of split regions.
   */
  protected static byte[][] getSplits(TestOptions opts) {
    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null;

    int numSplitPoints = opts.presplitRegions - 1;
    byte[][] splits = new byte[numSplitPoints][];
    int jump = opts.totalRows / opts.presplitRegions;
    for (int i = 0; i < numSplitPoints; i++) {
      int rowkey = jump * (1 + i);
      splits[i] = format(rowkey);
    }
    return splits;
  }

  static void setupConnectionCount(final TestOptions opts) {
    if (opts.oneCon) {
      opts.connCount = 1;
    } else {
      if (opts.connCount == -1) {
        // set to thread number if connCount is not set
        opts.connCount = opts.numClientThreads;
      }
    }
  }

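  // A note on connection sharing (a description of the behaviour implemented below, not new
  // policy): when opts.oneCon is true, every client thread shares a single
  // Connection/AsyncConnection; otherwise connCount defaults to the thread count and
  // doLocalClients() hands thread i the connection at index i % connCount.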
thread. 465 */ 466 static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf) 467 throws IOException, InterruptedException, ExecutionException { 468 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName); 469 assert cmd != null; 470 @SuppressWarnings("unchecked") 471 Future<RunResult>[] threads = new Future[opts.numClientThreads]; 472 RunResult[] results = new RunResult[opts.numClientThreads]; 473 ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, 474 new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build()); 475 setupConnectionCount(opts); 476 final Connection[] cons = new Connection[opts.connCount]; 477 final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount]; 478 for (int i = 0; i < opts.connCount; i++) { 479 cons[i] = ConnectionFactory.createConnection(conf); 480 asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get(); 481 } 482 LOG 483 .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads"); 484 for (int i = 0; i < threads.length; i++) { 485 final int index = i; 486 threads[i] = pool.submit(new Callable<RunResult>() { 487 @Override 488 public RunResult call() throws Exception { 489 TestOptions threadOpts = new TestOptions(opts); 490 final Connection con = cons[index % cons.length]; 491 final AsyncConnection asyncCon = asyncCons[index % asyncCons.length]; 492 if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows; 493 RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() { 494 @Override 495 public void setStatus(final String msg) throws IOException { 496 LOG.info(msg); 497 } 498 }); 499 LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration 500 + "ms over " + threadOpts.perClientRunRows + " rows"); 501 if (opts.latencyThreshold > 0) { 502 LOG.info("Number of replies over latency threshold " + opts.latencyThreshold 503 + "(ms) is " + run.numbOfReplyOverThreshold); 504 } 505 return run; 506 } 507 }); 508 } 509 pool.shutdown(); 510 511 for (int i = 0; i < threads.length; i++) { 512 try { 513 results[i] = threads[i].get(); 514 } catch (ExecutionException e) { 515 throw new IOException(e.getCause()); 516 } 517 } 518 final String test = cmd.getSimpleName(); 519 LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results)); 520 Arrays.sort(results); 521 long total = 0; 522 float avgLatency = 0; 523 float avgTPS = 0; 524 long replicaWins = 0; 525 for (RunResult result : results) { 526 total += result.duration; 527 avgLatency += result.hist.getSnapshot().getMean(); 528 avgTPS += opts.perClientRunRows * 1.0f / result.duration; 529 replicaWins += result.numOfReplyFromReplica; 530 } 531 avgTPS *= 1000; // ms to second 532 avgLatency = avgLatency / results.length; 533 LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: " 534 + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms"); 535 LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency)); 536 LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second"); 537 if (opts.replicas > 1) { 538 LOG.info("[results from replica regions] " + replicaWins); 539 } 540 541 for (int i = 0; i < opts.connCount; i++) { 542 cons[i].close(); 543 asyncCons[i].close(); 544 } 545 546 return results; 547 } 548 549 /* 550 * Run a mapreduce job. Run as many maps as asked-for clients. 
  /*
   * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write
   * out an input file with instructions per client regarding which row each is to start on.
   * @param cmd Command to run.
   */
  static Job doMapReduce(TestOptions opts, final Configuration conf)
    throws IOException, InterruptedException, ClassNotFoundException {
    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
    assert cmd != null;
    Path inputDir = writeInputFile(conf, opts);
    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
    Job job = Job.getInstance(conf);
    job.setJarByClass(PerformanceEvaluation.class);
    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);

    job.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.setInputPaths(job, inputDir);
    // this is default, but be explicit about it just in case.
    NLineInputFormat.setNumLinesPerSplit(job, 1);

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(LongWritable.class);

    job.setMapperClass(EvaluationMapTask.class);
    job.setReducerClass(LongSumReducer.class);

    job.setNumReduceTasks(1);

    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Histogram.class, // yammer
                                                                                            // metrics
      Gson.class, // gson
      FilterAllFilter.class // hbase-server tests jar
    );

    TableMapReduceUtil.initCredentials(job);

    job.waitForCompletion(true);
    return job;
  }

  /**
   * Each client has one mapper to do the work; the per-client results are summed in a single
   * reduce task.
   */

  static String JOB_INPUT_FILENAME = "input.txt";

  /*
   * Write input file of offsets-per-client for the mapreduce job.
   * @param c Configuration
   * @return Directory that contains the written file, whose name is JOB_INPUT_FILENAME
   */
  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
    return writeInputFile(c, opts, new Path("."));
  }

  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
    throws IOException {
    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
    Path inputDir = new Path(jobdir, "inputs");

    FileSystem fs = FileSystem.get(c);
    fs.mkdirs(inputDir);

    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
    PrintStream out = new PrintStream(fs.create(inputFile));
    // Make input random.
    Map<Integer, String> m = new TreeMap<>();
    Hash h = MurmurHash.getInstance();
    int perClientRows = (opts.totalRows / opts.numClientThreads);
    try {
      for (int j = 0; j < opts.numClientThreads; j++) {
        TestOptions next = new TestOptions(opts);
        next.startRow = j * perClientRows;
        next.perClientRunRows = perClientRows;
        String s = GSON.toJson(next);
        LOG.info("Client=" + j + ", input=" + s);
        byte[] b = Bytes.toBytes(s);
        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
        m.put(hash, s);
      }
      for (Map.Entry<Integer, String> e : m.entrySet()) {
        out.println(e.getValue());
      }
    } finally {
      out.close();
    }
    return inputDir;
  }

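  /*
   * For reference, each line of the generated input file is one GSON-serialized TestOptions. An
   * abbreviated, purely illustrative line (field values are examples, not defaults) looks like:
   * {"cmdName":"randomWrite","startRow":1048576,"perClientRunRows":1048576,"numClientThreads":10,...}
   */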
  /**
   * Describes a command.
   */
  static class CmdDescriptor {
    private Class<? extends TestBase> cmdClass;
    private String name;
    private String description;

    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
      this.cmdClass = cmdClass;
      this.name = name;
      this.description = description;
    }

    public Class<? extends TestBase> getCmdClass() {
      return cmdClass;
    }

    public String getName() {
      return name;
    }

    public String getDescription() {
      return description;
    }
  }

  /**
   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes
   * tracking all these arguments a little easier. NOTE: when ADDING AN OPTION, you need to add a
   * data member, a getter/setter (to make JSON serialization of this TestOptions class behave),
   * and an assignment in the clone constructor below copying your new option from the 'that' to
   * the 'this'. Look for 'clone' below.
   */
  static class TestOptions {
    String cmdName = null;
    boolean nomapred = false;
    boolean filterAll = false;
    int startRow = 0;
    float size = 1.0f;
    int perClientRunRows = DEFAULT_ROWS_PER_GB;
    int numClientThreads = 1;
    int totalRows = DEFAULT_ROWS_PER_GB;
    int measureAfter = 0;
    float sampleRate = 1.0f;
    /**
     * @deprecated Useless after switching to OpenTelemetry
     */
    @Deprecated
    double traceRate = 0.0;
    String tableName = TABLE_NAME;
    boolean flushCommits = true;
    boolean writeToWAL = true;
    boolean autoFlush = false;
    boolean oneCon = false;
    int connCount = -1; // will decide the actual number later
    boolean useTags = false;
    int noOfTags = 1;
    boolean reportLatency = false;
    int multiGet = 0;
    int multiPut = 0;
    int randomSleep = 0;
    boolean inMemoryCF = false;
    int presplitRegions = 0;
    int replicas = TableDescriptorBuilder.DEFAULT_REGION_REPLICATION;
    String splitPolicy = null;
    Compression.Algorithm compression = Compression.Algorithm.NONE;
    String encryption = null;
    BloomType bloomType = BloomType.ROW;
    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
    boolean valueRandom = false;
    boolean valueZipf = false;
    int valueSize = DEFAULT_VALUE_LENGTH;
    int period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10;
    int cycles = 1;
    int columns = 1;
    int families = 1;
    int caching = 30;
    int latencyThreshold = 0; // in milliseconds
    boolean addColumns = true;
    MemoryCompactionPolicy inMemoryCompaction =
      MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
    boolean asyncPrefetch = false;
    boolean cacheBlocks = true;
    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
    long bufferSize = 2L * 1024L * 1024L;

    public TestOptions() {
    }

    /**
     * Clone constructor.
     * @param that Object to copy from.
738 */ 739 public TestOptions(TestOptions that) { 740 this.cmdName = that.cmdName; 741 this.cycles = that.cycles; 742 this.nomapred = that.nomapred; 743 this.startRow = that.startRow; 744 this.size = that.size; 745 this.perClientRunRows = that.perClientRunRows; 746 this.numClientThreads = that.numClientThreads; 747 this.totalRows = that.totalRows; 748 this.sampleRate = that.sampleRate; 749 this.traceRate = that.traceRate; 750 this.tableName = that.tableName; 751 this.flushCommits = that.flushCommits; 752 this.writeToWAL = that.writeToWAL; 753 this.autoFlush = that.autoFlush; 754 this.oneCon = that.oneCon; 755 this.connCount = that.connCount; 756 this.useTags = that.useTags; 757 this.noOfTags = that.noOfTags; 758 this.reportLatency = that.reportLatency; 759 this.latencyThreshold = that.latencyThreshold; 760 this.multiGet = that.multiGet; 761 this.multiPut = that.multiPut; 762 this.inMemoryCF = that.inMemoryCF; 763 this.presplitRegions = that.presplitRegions; 764 this.replicas = that.replicas; 765 this.splitPolicy = that.splitPolicy; 766 this.compression = that.compression; 767 this.encryption = that.encryption; 768 this.blockEncoding = that.blockEncoding; 769 this.filterAll = that.filterAll; 770 this.bloomType = that.bloomType; 771 this.blockSize = that.blockSize; 772 this.valueRandom = that.valueRandom; 773 this.valueZipf = that.valueZipf; 774 this.valueSize = that.valueSize; 775 this.period = that.period; 776 this.randomSleep = that.randomSleep; 777 this.measureAfter = that.measureAfter; 778 this.addColumns = that.addColumns; 779 this.columns = that.columns; 780 this.families = that.families; 781 this.caching = that.caching; 782 this.inMemoryCompaction = that.inMemoryCompaction; 783 this.asyncPrefetch = that.asyncPrefetch; 784 this.cacheBlocks = that.cacheBlocks; 785 this.scanReadType = that.scanReadType; 786 this.bufferSize = that.bufferSize; 787 } 788 789 public int getCaching() { 790 return this.caching; 791 } 792 793 public void setCaching(final int caching) { 794 this.caching = caching; 795 } 796 797 public int getColumns() { 798 return this.columns; 799 } 800 801 public void setColumns(final int columns) { 802 this.columns = columns; 803 } 804 805 public int getFamilies() { 806 return this.families; 807 } 808 809 public void setFamilies(final int families) { 810 this.families = families; 811 } 812 813 public int getCycles() { 814 return this.cycles; 815 } 816 817 public void setCycles(final int cycles) { 818 this.cycles = cycles; 819 } 820 821 public boolean isValueZipf() { 822 return valueZipf; 823 } 824 825 public void setValueZipf(boolean valueZipf) { 826 this.valueZipf = valueZipf; 827 } 828 829 public String getCmdName() { 830 return cmdName; 831 } 832 833 public void setCmdName(String cmdName) { 834 this.cmdName = cmdName; 835 } 836 837 public int getRandomSleep() { 838 return randomSleep; 839 } 840 841 public void setRandomSleep(int randomSleep) { 842 this.randomSleep = randomSleep; 843 } 844 845 public int getReplicas() { 846 return replicas; 847 } 848 849 public void setReplicas(int replicas) { 850 this.replicas = replicas; 851 } 852 853 public String getSplitPolicy() { 854 return splitPolicy; 855 } 856 857 public void setSplitPolicy(String splitPolicy) { 858 this.splitPolicy = splitPolicy; 859 } 860 861 public void setNomapred(boolean nomapred) { 862 this.nomapred = nomapred; 863 } 864 865 public void setFilterAll(boolean filterAll) { 866 this.filterAll = filterAll; 867 } 868 869 public void setStartRow(int startRow) { 870 this.startRow = startRow; 871 } 872 873 public 
void setSize(float size) { 874 this.size = size; 875 } 876 877 public void setPerClientRunRows(int perClientRunRows) { 878 this.perClientRunRows = perClientRunRows; 879 } 880 881 public void setNumClientThreads(int numClientThreads) { 882 this.numClientThreads = numClientThreads; 883 } 884 885 public void setTotalRows(int totalRows) { 886 this.totalRows = totalRows; 887 } 888 889 public void setSampleRate(float sampleRate) { 890 this.sampleRate = sampleRate; 891 } 892 893 public void setTraceRate(double traceRate) { 894 this.traceRate = traceRate; 895 } 896 897 public void setTableName(String tableName) { 898 this.tableName = tableName; 899 } 900 901 public void setFlushCommits(boolean flushCommits) { 902 this.flushCommits = flushCommits; 903 } 904 905 public void setWriteToWAL(boolean writeToWAL) { 906 this.writeToWAL = writeToWAL; 907 } 908 909 public void setAutoFlush(boolean autoFlush) { 910 this.autoFlush = autoFlush; 911 } 912 913 public void setOneCon(boolean oneCon) { 914 this.oneCon = oneCon; 915 } 916 917 public int getConnCount() { 918 return connCount; 919 } 920 921 public void setConnCount(int connCount) { 922 this.connCount = connCount; 923 } 924 925 public void setUseTags(boolean useTags) { 926 this.useTags = useTags; 927 } 928 929 public void setNoOfTags(int noOfTags) { 930 this.noOfTags = noOfTags; 931 } 932 933 public void setReportLatency(boolean reportLatency) { 934 this.reportLatency = reportLatency; 935 } 936 937 public void setMultiGet(int multiGet) { 938 this.multiGet = multiGet; 939 } 940 941 public void setMultiPut(int multiPut) { 942 this.multiPut = multiPut; 943 } 944 945 public void setInMemoryCF(boolean inMemoryCF) { 946 this.inMemoryCF = inMemoryCF; 947 } 948 949 public void setPresplitRegions(int presplitRegions) { 950 this.presplitRegions = presplitRegions; 951 } 952 953 public void setCompression(Compression.Algorithm compression) { 954 this.compression = compression; 955 } 956 957 public void setEncryption(String encryption) { 958 this.encryption = encryption; 959 } 960 961 public void setBloomType(BloomType bloomType) { 962 this.bloomType = bloomType; 963 } 964 965 public void setBlockSize(int blockSize) { 966 this.blockSize = blockSize; 967 } 968 969 public void setBlockEncoding(DataBlockEncoding blockEncoding) { 970 this.blockEncoding = blockEncoding; 971 } 972 973 public void setValueRandom(boolean valueRandom) { 974 this.valueRandom = valueRandom; 975 } 976 977 public void setValueSize(int valueSize) { 978 this.valueSize = valueSize; 979 } 980 981 public void setBufferSize(long bufferSize) { 982 this.bufferSize = bufferSize; 983 } 984 985 public void setPeriod(int period) { 986 this.period = period; 987 } 988 989 public boolean isNomapred() { 990 return nomapred; 991 } 992 993 public boolean isFilterAll() { 994 return filterAll; 995 } 996 997 public int getStartRow() { 998 return startRow; 999 } 1000 1001 public float getSize() { 1002 return size; 1003 } 1004 1005 public int getPerClientRunRows() { 1006 return perClientRunRows; 1007 } 1008 1009 public int getNumClientThreads() { 1010 return numClientThreads; 1011 } 1012 1013 public int getTotalRows() { 1014 return totalRows; 1015 } 1016 1017 public float getSampleRate() { 1018 return sampleRate; 1019 } 1020 1021 public double getTraceRate() { 1022 return traceRate; 1023 } 1024 1025 public String getTableName() { 1026 return tableName; 1027 } 1028 1029 public boolean isFlushCommits() { 1030 return flushCommits; 1031 } 1032 1033 public boolean isWriteToWAL() { 1034 return writeToWAL; 1035 } 1036 1037 
    public boolean isAutoFlush() {
      return autoFlush;
    }

    public boolean isUseTags() {
      return useTags;
    }

    public int getNoOfTags() {
      return noOfTags;
    }

    public boolean isReportLatency() {
      return reportLatency;
    }

    public int getMultiGet() {
      return multiGet;
    }

    public int getMultiPut() {
      return multiPut;
    }

    public boolean isInMemoryCF() {
      return inMemoryCF;
    }

    public int getPresplitRegions() {
      return presplitRegions;
    }

    public Compression.Algorithm getCompression() {
      return compression;
    }

    public String getEncryption() {
      return encryption;
    }

    public DataBlockEncoding getBlockEncoding() {
      return blockEncoding;
    }

    public boolean isValueRandom() {
      return valueRandom;
    }

    public int getValueSize() {
      return valueSize;
    }

    public int getPeriod() {
      return period;
    }

    public BloomType getBloomType() {
      return bloomType;
    }

    public int getBlockSize() {
      return blockSize;
    }

    public boolean isOneCon() {
      return oneCon;
    }

    public int getMeasureAfter() {
      return measureAfter;
    }

    public void setMeasureAfter(int measureAfter) {
      this.measureAfter = measureAfter;
    }

    public boolean getAddColumns() {
      return addColumns;
    }

    public void setAddColumns(boolean addColumns) {
      this.addColumns = addColumns;
    }

    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
      this.inMemoryCompaction = inMemoryCompaction;
    }

    public MemoryCompactionPolicy getInMemoryCompaction() {
      return this.inMemoryCompaction;
    }

    public long getBufferSize() {
      return this.bufferSize;
    }
  }

  /*
   * A test. Subclass to particularize what happens per row.
   */
  static abstract class TestBase {
    // The below makes it so that when Tests are all running in the one JVM, they each have a
    // differently seeded Random.
    private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime());

    private static long nextRandomSeed() {
      return randomSeed.nextLong();
    }

    private final int everyN;

    protected final Random rand = new Random(nextRandomSeed());
    protected final Configuration conf;
    protected final TestOptions opts;

    private final Status status;

    private String testName;
    private Histogram latencyHistogram;
    private Histogram replicaLatencyHistogram;
    private Histogram valueSizeHistogram;
    private Histogram rpcCallsHistogram;
    private Histogram remoteRpcCallsHistogram;
    private Histogram millisBetweenNextHistogram;
    private Histogram regionsScannedHistogram;
    private Histogram bytesInResultsHistogram;
    private Histogram bytesInRemoteResultsHistogram;
    private RandomDistribution.Zipf zipf;
    private long numOfReplyOverLatencyThreshold = 0;
    private long numOfReplyFromReplica = 0;

    /**
     * Note that all subclasses of this class must provide a public constructor that has the exact
     * same list of arguments.
1171 */ 1172 TestBase(final Configuration conf, final TestOptions options, final Status status) { 1173 this.conf = conf; 1174 this.opts = options; 1175 this.status = status; 1176 this.testName = this.getClass().getSimpleName(); 1177 everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate)); 1178 if (options.isValueZipf()) { 1179 this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2); 1180 } 1181 LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows."); 1182 } 1183 1184 int getValueLength(final Random r) { 1185 if (this.opts.isValueRandom()) { 1186 return r.nextInt(opts.valueSize); 1187 } else if (this.opts.isValueZipf()) { 1188 return Math.abs(this.zipf.nextInt()); 1189 } else { 1190 return opts.valueSize; 1191 } 1192 } 1193 1194 void updateValueSize(final Result[] rs) throws IOException { 1195 updateValueSize(rs, 0); 1196 } 1197 1198 void updateValueSize(final Result[] rs, final long latency) throws IOException { 1199 if (rs == null || (latency == 0)) return; 1200 for (Result r : rs) 1201 updateValueSize(r, latency); 1202 } 1203 1204 void updateValueSize(final Result r) throws IOException { 1205 updateValueSize(r, 0); 1206 } 1207 1208 void updateValueSize(final Result r, final long latency) throws IOException { 1209 if (r == null || (latency == 0)) return; 1210 int size = 0; 1211 // update replicaHistogram 1212 if (r.isStale()) { 1213 replicaLatencyHistogram.update(latency / 1000); 1214 numOfReplyFromReplica++; 1215 } 1216 if (!isRandomValueSize()) return; 1217 1218 for (CellScanner scanner = r.cellScanner(); scanner.advance();) { 1219 size += scanner.current().getValueLength(); 1220 } 1221 updateValueSize(size); 1222 } 1223 1224 void updateValueSize(final int valueSize) { 1225 if (!isRandomValueSize()) return; 1226 this.valueSizeHistogram.update(valueSize); 1227 } 1228 1229 void updateScanMetrics(final ScanMetrics metrics) { 1230 if (metrics == null) return; 1231 Map<String, Long> metricsMap = metrics.getMetricsMap(); 1232 Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); 1233 if (rpcCalls != null) { 1234 this.rpcCallsHistogram.update(rpcCalls.longValue()); 1235 } 1236 Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME); 1237 if (remoteRpcCalls != null) { 1238 this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue()); 1239 } 1240 Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME); 1241 if (millisBetweenNext != null) { 1242 this.millisBetweenNextHistogram.update(millisBetweenNext.longValue()); 1243 } 1244 Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); 1245 if (regionsScanned != null) { 1246 this.regionsScannedHistogram.update(regionsScanned.longValue()); 1247 } 1248 Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME); 1249 if (bytesInResults != null && bytesInResults.longValue() > 0) { 1250 this.bytesInResultsHistogram.update(bytesInResults.longValue()); 1251 } 1252 Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME); 1253 if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) { 1254 this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue()); 1255 } 1256 } 1257 1258 String generateStatus(final int sr, final int i, final int lr) { 1259 return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency [" 1260 + getShortLatencyReport() + "]" 1261 + (!isRandomValueSize() ? 
"" : ", value size [" + getShortValueSizeReport() + "]"); 1262 } 1263 1264 boolean isRandomValueSize() { 1265 return opts.valueRandom; 1266 } 1267 1268 protected int getReportingPeriod() { 1269 return opts.period; 1270 } 1271 1272 /** 1273 * Populated by testTakedown. Only implemented by RandomReadTest at the moment. 1274 */ 1275 public Histogram getLatencyHistogram() { 1276 return latencyHistogram; 1277 } 1278 1279 void testSetup() throws IOException { 1280 // test metrics 1281 latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1282 // If it is a replica test, set up histogram for replica. 1283 if (opts.replicas > 1) { 1284 replicaLatencyHistogram = 1285 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1286 } 1287 valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1288 // scan metrics 1289 rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1290 remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1291 millisBetweenNextHistogram = 1292 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1293 regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1294 bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1295 bytesInRemoteResultsHistogram = 1296 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1297 1298 onStartup(); 1299 } 1300 1301 abstract void onStartup() throws IOException; 1302 1303 void testTakedown() throws IOException { 1304 onTakedown(); 1305 // Print all stats for this thread continuously. 1306 // Synchronize on Test.class so different threads don't intermingle the 1307 // output. We can't use 'this' here because each thread has its own instance of Test class. 
1308 synchronized (Test.class) { 1309 status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName()); 1310 status 1311 .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram)); 1312 if (opts.replicas > 1) { 1313 status.setStatus("Latency (us) from Replica Regions: " 1314 + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram)); 1315 } 1316 status.setStatus("Num measures (latency) : " + latencyHistogram.getCount()); 1317 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram)); 1318 if (valueSizeHistogram.getCount() > 0) { 1319 status.setStatus( 1320 "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram)); 1321 status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount()); 1322 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram)); 1323 } else { 1324 status.setStatus("No valueSize statistics available"); 1325 } 1326 if (rpcCallsHistogram.getCount() > 0) { 1327 status.setStatus( 1328 "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram)); 1329 } 1330 if (remoteRpcCallsHistogram.getCount() > 0) { 1331 status.setStatus("remoteRpcCalls (count): " 1332 + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram)); 1333 } 1334 if (millisBetweenNextHistogram.getCount() > 0) { 1335 status.setStatus("millisBetweenNext (latency): " 1336 + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram)); 1337 } 1338 if (regionsScannedHistogram.getCount() > 0) { 1339 status.setStatus("regionsScanned (count): " 1340 + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram)); 1341 } 1342 if (bytesInResultsHistogram.getCount() > 0) { 1343 status.setStatus("bytesInResults (size): " 1344 + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram)); 1345 } 1346 if (bytesInRemoteResultsHistogram.getCount() > 0) { 1347 status.setStatus("bytesInRemoteResults (size): " 1348 + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram)); 1349 } 1350 } 1351 } 1352 1353 abstract void onTakedown() throws IOException; 1354 1355 /* 1356 * Run test 1357 * @return Elapsed time. 1358 */ 1359 long test() throws IOException, InterruptedException { 1360 testSetup(); 1361 LOG.info("Timed test starting in thread " + Thread.currentThread().getName()); 1362 final long startTime = System.nanoTime(); 1363 try { 1364 testTimed(); 1365 } finally { 1366 testTakedown(); 1367 } 1368 return (System.nanoTime() - startTime) / 1000000; 1369 } 1370 1371 int getStartRow() { 1372 return opts.startRow; 1373 } 1374 1375 int getLastRow() { 1376 return getStartRow() + opts.perClientRunRows; 1377 } 1378 1379 /** 1380 * Provides an extension point for tests that don't want a per row invocation. 1381 */ 1382 void testTimed() throws IOException, InterruptedException { 1383 int startRow = getStartRow(); 1384 int lastRow = getLastRow(); 1385 // Report on completion of 1/10th of total. 
1386 for (int ii = 0; ii < opts.cycles; ii++) { 1387 if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles); 1388 for (int i = startRow; i < lastRow; i++) { 1389 if (i % everyN != 0) continue; 1390 long startTime = System.nanoTime(); 1391 boolean requestSent = false; 1392 Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan(); 1393 try (Scope scope = span.makeCurrent()) { 1394 requestSent = testRow(i, startTime); 1395 } finally { 1396 span.end(); 1397 } 1398 if ((i - startRow) > opts.measureAfter) { 1399 // If multiget or multiput is enabled, say set to 10, testRow() returns immediately 1400 // first 9 times and sends the actual get request in the 10th iteration. 1401 // We should only set latency when actual request is sent because otherwise 1402 // it turns out to be 0. 1403 if (requestSent) { 1404 long latency = (System.nanoTime() - startTime) / 1000; 1405 latencyHistogram.update(latency); 1406 if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) { 1407 numOfReplyOverLatencyThreshold++; 1408 } 1409 } 1410 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { 1411 status.setStatus(generateStatus(startRow, i, lastRow)); 1412 } 1413 } 1414 } 1415 } 1416 } 1417 1418 /** Returns Subset of the histograms' calculation. */ 1419 public String getShortLatencyReport() { 1420 return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram); 1421 } 1422 1423 /** Returns Subset of the histograms' calculation. */ 1424 public String getShortValueSizeReport() { 1425 return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram); 1426 } 1427 1428 /** 1429 * Test for individual row. 1430 * @param i Row index. 1431 * @return true if the row was sent to server and need to record metrics. False if not, multiGet 1432 * and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered. 1433 */ 1434 abstract boolean testRow(final int i, final long startTime) 1435 throws IOException, InterruptedException; 1436 } 1437 1438 static abstract class Test extends TestBase { 1439 protected Connection connection; 1440 1441 Test(final Connection con, final TestOptions options, final Status status) { 1442 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); 1443 this.connection = con; 1444 } 1445 } 1446 1447 static abstract class AsyncTest extends TestBase { 1448 protected AsyncConnection connection; 1449 1450 AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) { 1451 super(con == null ? 
HBaseConfiguration.create() : con.getConfiguration(), options, status); 1452 this.connection = con; 1453 } 1454 } 1455 1456 static abstract class TableTest extends Test { 1457 protected Table table; 1458 1459 TableTest(Connection con, TestOptions options, Status status) { 1460 super(con, options, status); 1461 } 1462 1463 @Override 1464 void onStartup() throws IOException { 1465 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1466 } 1467 1468 @Override 1469 void onTakedown() throws IOException { 1470 table.close(); 1471 } 1472 } 1473 1474 /* 1475 * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest 1476 */ 1477 static abstract class MetaTest extends TableTest { 1478 protected int keyLength; 1479 1480 MetaTest(Connection con, TestOptions options, Status status) { 1481 super(con, options, status); 1482 keyLength = Integer.toString(opts.perClientRunRows).length(); 1483 } 1484 1485 @Override 1486 void onTakedown() throws IOException { 1487 // No clean up 1488 } 1489 1490 /* 1491 * Generates Lexicographically ascending strings 1492 */ 1493 protected byte[] getSplitKey(final int i) { 1494 return Bytes.toBytes(String.format("%0" + keyLength + "d", i)); 1495 } 1496 1497 } 1498 1499 static abstract class AsyncTableTest extends AsyncTest { 1500 protected AsyncTable<?> table; 1501 1502 AsyncTableTest(AsyncConnection con, TestOptions options, Status status) { 1503 super(con, options, status); 1504 } 1505 1506 @Override 1507 void onStartup() throws IOException { 1508 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1509 } 1510 1511 @Override 1512 void onTakedown() throws IOException { 1513 } 1514 } 1515 1516 static class AsyncRandomReadTest extends AsyncTableTest { 1517 private final Consistency consistency; 1518 private ArrayList<Get> gets; 1519 1520 AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) { 1521 super(con, options, status); 1522 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1523 if (opts.multiGet > 0) { 1524 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1525 this.gets = new ArrayList<>(opts.multiGet); 1526 } 1527 } 1528 1529 @Override 1530 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1531 if (opts.randomSleep > 0) { 1532 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 1533 } 1534 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1535 for (int family = 0; family < opts.families; family++) { 1536 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1537 if (opts.addColumns) { 1538 for (int column = 0; column < opts.columns; column++) { 1539 byte[] qualifier = column == 0 ? 
COLUMN_ZERO : Bytes.toBytes("" + column); 1540 get.addColumn(familyName, qualifier); 1541 } 1542 } else { 1543 get.addFamily(familyName); 1544 } 1545 } 1546 if (opts.filterAll) { 1547 get.setFilter(new FilterAllFilter()); 1548 } 1549 get.setConsistency(consistency); 1550 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1551 try { 1552 if (opts.multiGet > 0) { 1553 this.gets.add(get); 1554 if (this.gets.size() == opts.multiGet) { 1555 Result[] rs = 1556 this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new); 1557 updateValueSize(rs); 1558 this.gets.clear(); 1559 } else { 1560 return false; 1561 } 1562 } else { 1563 updateValueSize(this.table.get(get).get()); 1564 } 1565 } catch (ExecutionException e) { 1566 throw new IOException(e); 1567 } 1568 return true; 1569 } 1570 1571 public static RuntimeException runtime(Throwable e) { 1572 if (e instanceof RuntimeException) { 1573 return (RuntimeException) e; 1574 } 1575 return new RuntimeException(e); 1576 } 1577 1578 public static <V> V propagate(Callable<V> callable) { 1579 try { 1580 return callable.call(); 1581 } catch (Exception e) { 1582 throw runtime(e); 1583 } 1584 } 1585 1586 @Override 1587 protected int getReportingPeriod() { 1588 int period = opts.perClientRunRows / 10; 1589 return period == 0 ? opts.perClientRunRows : period; 1590 } 1591 1592 @Override 1593 protected void testTakedown() throws IOException { 1594 if (this.gets != null && this.gets.size() > 0) { 1595 this.table.get(gets); 1596 this.gets.clear(); 1597 } 1598 super.testTakedown(); 1599 } 1600 } 1601 1602 static class AsyncRandomWriteTest extends AsyncSequentialWriteTest { 1603 1604 AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) { 1605 super(con, options, status); 1606 } 1607 1608 @Override 1609 protected byte[] generateRow(final int i) { 1610 return getRandomRow(this.rand, opts.totalRows); 1611 } 1612 } 1613 1614 static class AsyncScanTest extends AsyncTableTest { 1615 private ResultScanner testScanner; 1616 private AsyncTable<?> asyncTable; 1617 1618 AsyncScanTest(AsyncConnection con, TestOptions options, Status status) { 1619 super(con, options, status); 1620 } 1621 1622 @Override 1623 void onStartup() throws IOException { 1624 this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName), 1625 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); 1626 } 1627 1628 @Override 1629 void testTakedown() throws IOException { 1630 if (this.testScanner != null) { 1631 updateScanMetrics(this.testScanner.getScanMetrics()); 1632 this.testScanner.close(); 1633 } 1634 super.testTakedown(); 1635 } 1636 1637 @Override 1638 boolean testRow(final int i, final long startTime) throws IOException { 1639 if (this.testScanner == null) { 1640 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 1641 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1642 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1643 for (int family = 0; family < opts.families; family++) { 1644 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1645 if (opts.addColumns) { 1646 for (int column = 0; column < opts.columns; column++) { 1647 byte[] qualifier = column == 0 ? 
COLUMN_ZERO : Bytes.toBytes("" + column); 1648 scan.addColumn(familyName, qualifier); 1649 } 1650 } else { 1651 scan.addFamily(familyName); 1652 } 1653 } 1654 if (opts.filterAll) { 1655 scan.setFilter(new FilterAllFilter()); 1656 } 1657 this.testScanner = asyncTable.getScanner(scan); 1658 } 1659 Result r = testScanner.next(); 1660 updateValueSize(r); 1661 return true; 1662 } 1663 } 1664 1665 static class AsyncSequentialReadTest extends AsyncTableTest { 1666 AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) { 1667 super(con, options, status); 1668 } 1669 1670 @Override 1671 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1672 Get get = new Get(format(i)); 1673 for (int family = 0; family < opts.families; family++) { 1674 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1675 if (opts.addColumns) { 1676 for (int column = 0; column < opts.columns; column++) { 1677 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1678 get.addColumn(familyName, qualifier); 1679 } 1680 } else { 1681 get.addFamily(familyName); 1682 } 1683 } 1684 if (opts.filterAll) { 1685 get.setFilter(new FilterAllFilter()); 1686 } 1687 try { 1688 updateValueSize(table.get(get).get()); 1689 } catch (ExecutionException e) { 1690 throw new IOException(e); 1691 } 1692 return true; 1693 } 1694 } 1695 1696 static class AsyncSequentialWriteTest extends AsyncTableTest { 1697 private ArrayList<Put> puts; 1698 1699 AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) { 1700 super(con, options, status); 1701 if (opts.multiPut > 0) { 1702 LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + "."); 1703 this.puts = new ArrayList<>(opts.multiPut); 1704 } 1705 } 1706 1707 protected byte[] generateRow(final int i) { 1708 return format(i); 1709 } 1710 1711 @Override 1712 @SuppressWarnings("ReturnValueIgnored") 1713 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1714 byte[] row = generateRow(i); 1715 Put put = new Put(row); 1716 for (int family = 0; family < opts.families; family++) { 1717 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1718 for (int column = 0; column < opts.columns; column++) { 1719 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1720 byte[] value = generateData(this.rand, getValueLength(this.rand)); 1721 if (opts.useTags) { 1722 byte[] tag = generateData(this.rand, TAG_LENGTH); 1723 Tag[] tags = new Tag[opts.noOfTags]; 1724 for (int n = 0; n < opts.noOfTags; n++) { 1725 Tag t = new ArrayBackedTag((byte) n, tag); 1726 tags[n] = t; 1727 } 1728 KeyValue kv = 1729 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 1730 put.add(kv); 1731 updateValueSize(kv.getValueLength()); 1732 } else { 1733 put.addColumn(familyName, qualifier, value); 1734 updateValueSize(value.length); 1735 } 1736 } 1737 } 1738 put.setDurability(opts.writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL); 1739 try { 1740 if (opts.multiPut > 0) { 1741 this.puts.add(put); 1742 if (this.puts.size() == opts.multiPut) { 1743 // block on every future in the batch so the reported timing covers completed writes 1744 this.table.put(puts).forEach(f -> AsyncRandomReadTest.propagate(f::get)); 1745 this.puts.clear(); 1746 } else { 1747 return false; 1748 } 1749 } else { 1750 table.put(put).get(); 1751 } 1752 } catch (ExecutionException e) { 1753 throw new IOException(e); 1754 } 1755 return true; 1756 } 1757 } 1758 1759 static abstract class BufferedMutatorTest extends Test { 1760 protected BufferedMutator mutator; 1761 protected Table table; 1762 1763 BufferedMutatorTest(Connection con, TestOptions options, Status status) { 1764 super(con, options, status); 1765 } 1766 1767 @Override 1768 void onStartup() throws IOException { 1769 BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName)); 1770 p.writeBufferSize(opts.bufferSize); 1771 this.mutator = connection.getBufferedMutator(p); 1772 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1773 } 1774 1775 @Override 1776 void onTakedown() throws IOException { 1777 mutator.close(); 1778 table.close(); 1779 } 1780 } 1781 1782 static class RandomSeekScanTest extends TableTest { 1783 RandomSeekScanTest(Connection con, TestOptions options, Status status) { 1784 super(con, options, status); 1785 } 1786 1787 @Override 1788 boolean testRow(final int i, final long startTime) throws IOException { 1789 Scan scan = 1790 new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)).setCaching(opts.caching) 1791 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1792 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1793 FilterList list = new FilterList(); 1794 for (int family = 0; family < opts.families; family++) { 1795 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1796 if (opts.addColumns) { 1797 for (int column = 0; column < opts.columns; column++) { 1798 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1799 scan.addColumn(familyName, qualifier); 1800 } 1801 } else { 1802 scan.addFamily(familyName); 1803 } 1804 } 1805 if (opts.filterAll) { 1806 list.addFilter(new FilterAllFilter()); 1807 } 1808 list.addFilter(new WhileMatchFilter(new PageFilter(120))); 1809 scan.setFilter(list); 1810 ResultScanner s = this.table.getScanner(scan); 1811 try { 1812 for (Result rr; (rr = s.next()) != null;) { 1813 updateValueSize(rr); 1814 } 1815 } finally { 1816 updateScanMetrics(s.getScanMetrics()); 1817 s.close(); 1818 } 1819 return true; 1820 } 1821 1822 @Override 1823 protected int getReportingPeriod() { 1824 int period = opts.perClientRunRows / 100; 1825 return period == 0 ?
opts.perClientRunRows : period; 1826 } 1827 1828 } 1829 1830 static abstract class RandomScanWithRangeTest extends TableTest { 1831 RandomScanWithRangeTest(Connection con, TestOptions options, Status status) { 1832 super(con, options, status); 1833 } 1834 1835 @Override 1836 boolean testRow(final int i, final long startTime) throws IOException { 1837 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); 1838 Scan scan = new Scan().withStartRow(startAndStopRow.getFirst()) 1839 .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching) 1840 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1841 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1842 for (int family = 0; family < opts.families; family++) { 1843 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1844 if (opts.addColumns) { 1845 for (int column = 0; column < opts.columns; column++) { 1846 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1847 scan.addColumn(familyName, qualifier); 1848 } 1849 } else { 1850 scan.addFamily(familyName); 1851 } 1852 } 1853 if (opts.filterAll) { 1854 scan.setFilter(new FilterAllFilter()); 1855 } 1856 Result r = null; 1857 int count = 0; 1858 ResultScanner s = this.table.getScanner(scan); 1859 try { 1860 for (; (r = s.next()) != null;) { 1861 updateValueSize(r); 1862 count++; 1863 } 1864 if (i % 100 == 0) { 1865 LOG.info(String.format("Scan for key range %s - %s returned %s rows", 1866 Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()), 1867 count)); 1868 } 1869 } finally { 1870 updateScanMetrics(s.getScanMetrics()); 1871 s.close(); 1872 } 1873 return true; 1874 } 1875 1876 protected abstract Pair<byte[], byte[]> getStartAndStopRow(); 1877 1878 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) { 1879 int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows; 1880 int stop = start + maxRange; 1881 return new Pair<>(format(start), format(stop)); 1882 } 1883 1884 @Override 1885 protected int getReportingPeriod() { 1886 int period = opts.perClientRunRows / 100; 1887 return period == 0 ? 
opts.perClientRunRows : period; 1888 } 1889 } 1890 1891 static class RandomScanWithRange10Test extends RandomScanWithRangeTest { 1892 RandomScanWithRange10Test(Connection con, TestOptions options, Status status) { 1893 super(con, options, status); 1894 } 1895 1896 @Override 1897 protected Pair<byte[], byte[]> getStartAndStopRow() { 1898 return generateStartAndStopRows(10); 1899 } 1900 } 1901 1902 static class RandomScanWithRange100Test extends RandomScanWithRangeTest { 1903 RandomScanWithRange100Test(Connection con, TestOptions options, Status status) { 1904 super(con, options, status); 1905 } 1906 1907 @Override 1908 protected Pair<byte[], byte[]> getStartAndStopRow() { 1909 return generateStartAndStopRows(100); 1910 } 1911 } 1912 1913 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest { 1914 RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) { 1915 super(con, options, status); 1916 } 1917 1918 @Override 1919 protected Pair<byte[], byte[]> getStartAndStopRow() { 1920 return generateStartAndStopRows(1000); 1921 } 1922 } 1923 1924 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest { 1925 RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) { 1926 super(con, options, status); 1927 } 1928 1929 @Override 1930 protected Pair<byte[], byte[]> getStartAndStopRow() { 1931 return generateStartAndStopRows(10000); 1932 } 1933 } 1934 1935 static class RandomReadTest extends TableTest { 1936 private final Consistency consistency; 1937 private ArrayList<Get> gets; 1938 1939 RandomReadTest(Connection con, TestOptions options, Status status) { 1940 super(con, options, status); 1941 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1942 if (opts.multiGet > 0) { 1943 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1944 this.gets = new ArrayList<>(opts.multiGet); 1945 } 1946 } 1947 1948 @Override 1949 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1950 if (opts.randomSleep > 0) { 1951 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 1952 } 1953 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1954 for (int family = 0; family < opts.families; family++) { 1955 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1956 if (opts.addColumns) { 1957 for (int column = 0; column < opts.columns; column++) { 1958 byte[] qualifier = column == 0 ? 
COLUMN_ZERO : Bytes.toBytes("" + column); 1959 get.addColumn(familyName, qualifier); 1960 } 1961 } else { 1962 get.addFamily(familyName); 1963 } 1964 } 1965 if (opts.filterAll) { 1966 get.setFilter(new FilterAllFilter()); 1967 } 1968 get.setConsistency(consistency); 1969 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1970 if (opts.multiGet > 0) { 1971 this.gets.add(get); 1972 if (this.gets.size() == opts.multiGet) { 1973 Result[] rs = this.table.get(this.gets); 1974 if (opts.replicas > 1) { 1975 long latency = System.nanoTime() - startTime; 1976 updateValueSize(rs, latency); 1977 } else { 1978 updateValueSize(rs); 1979 } 1980 this.gets.clear(); 1981 } else { 1982 return false; 1983 } 1984 } else { 1985 if (opts.replicas > 1) { 1986 Result r = this.table.get(get); 1987 long latency = System.nanoTime() - startTime; 1988 updateValueSize(r, latency); 1989 } else { 1990 updateValueSize(this.table.get(get)); 1991 } 1992 } 1993 return true; 1994 } 1995 1996 @Override 1997 protected int getReportingPeriod() { 1998 int period = opts.perClientRunRows / 10; 1999 return period == 0 ? opts.perClientRunRows : period; 2000 } 2001 2002 @Override 2003 protected void testTakedown() throws IOException { 2004 if (this.gets != null && this.gets.size() > 0) { 2005 this.table.get(gets); 2006 this.gets.clear(); 2007 } 2008 super.testTakedown(); 2009 } 2010 } 2011 2012 /* 2013 * Send random reads against fake regions inserted by MetaWriteTest 2014 */ 2015 static class MetaRandomReadTest extends MetaTest { 2016 private RegionLocator regionLocator; 2017 2018 MetaRandomReadTest(Connection con, TestOptions options, Status status) { 2019 super(con, options, status); 2020 LOG.info("call getRegionLocation"); 2021 } 2022 2023 @Override 2024 void onStartup() throws IOException { 2025 super.onStartup(); 2026 this.regionLocator = connection.getRegionLocator(table.getName()); 2027 } 2028 2029 @Override 2030 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 2031 if (opts.randomSleep > 0) { 2032 Thread.sleep(rand.nextInt(opts.randomSleep)); 2033 } 2034 HRegionLocation hRegionLocation = 2035 regionLocator.getRegionLocation(getSplitKey(rand.nextInt(opts.perClientRunRows)), true); 2036 LOG.debug("get location for region: " + hRegionLocation); 2037 return true; 2038 } 2039 2040 @Override 2041 protected int getReportingPeriod() { 2042 int period = opts.perClientRunRows / 10; 2043 return period == 0 ? 
opts.perClientRunRows : period; 2044 } 2045 2046 @Override 2047 protected void testTakedown() throws IOException { 2048 super.testTakedown(); 2049 } 2050 } 2051 2052 static class RandomWriteTest extends SequentialWriteTest { 2053 RandomWriteTest(Connection con, TestOptions options, Status status) { 2054 super(con, options, status); 2055 } 2056 2057 @Override 2058 protected byte[] generateRow(final int i) { 2059 return getRandomRow(this.rand, opts.totalRows); 2060 } 2061 2062 } 2063 2064 static class ScanTest extends TableTest { 2065 private ResultScanner testScanner; 2066 2067 ScanTest(Connection con, TestOptions options, Status status) { 2068 super(con, options, status); 2069 } 2070 2071 @Override 2072 void testTakedown() throws IOException { 2073 if (this.testScanner != null) { 2074 this.testScanner.close(); 2075 } 2076 super.testTakedown(); 2077 } 2078 2079 @Override 2080 boolean testRow(final int i, final long startTime) throws IOException { 2081 if (this.testScanner == null) { 2082 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 2083 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 2084 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 2085 for (int family = 0; family < opts.families; family++) { 2086 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2087 if (opts.addColumns) { 2088 for (int column = 0; column < opts.columns; column++) { 2089 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2090 scan.addColumn(familyName, qualifier); 2091 } 2092 } else { 2093 scan.addFamily(familyName); 2094 } 2095 } 2096 if (opts.filterAll) { 2097 scan.setFilter(new FilterAllFilter()); 2098 } 2099 this.testScanner = table.getScanner(scan); 2100 } 2101 Result r = testScanner.next(); 2102 updateValueSize(r); 2103 return true; 2104 } 2105 } 2106 2107 /** 2108 * Base class for operations that are CAS-like; that read a value and then set it based off what 2109 * they read. In this category is increment, append, checkAndPut, etc. 2110 * <p> 2111 * These operations also want some concurrency going on. Usually when these tests run, they 2112 * operate in their own part of the key range. In CASTest, we will have them all overlap on the 2113 * same key space. We do this with our getStartRow and getLastRow overrides. 
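 * <p>
 * Rough illustration of the difference (the numbers are made up for the example, not defaults):
 * with {@code --nomapred --rows=1000000 increment 10}, a non-CAS test would give client {@code j}
 * its own slice of roughly {@code [j * 1000000, (j + 1) * 1000000)}, whereas every CAS client here
 * iterates rows {@code [0, 1000000)}, so all ten clients contend on the same keys.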
2114 */ 2115 static abstract class CASTableTest extends TableTest { 2116 private final byte[] qualifier; 2117 2118 CASTableTest(Connection con, TestOptions options, Status status) { 2119 super(con, options, status); 2120 qualifier = Bytes.toBytes(this.getClass().getSimpleName()); 2121 } 2122 2123 byte[] getQualifier() { 2124 return this.qualifier; 2125 } 2126 2127 @Override 2128 int getStartRow() { 2129 return 0; 2130 } 2131 2132 @Override 2133 int getLastRow() { 2134 return opts.perClientRunRows; 2135 } 2136 } 2137 2138 static class IncrementTest extends CASTableTest { 2139 IncrementTest(Connection con, TestOptions options, Status status) { 2140 super(con, options, status); 2141 } 2142 2143 @Override 2144 boolean testRow(final int i, final long startTime) throws IOException { 2145 Increment increment = new Increment(format(i)); 2146 // unlike checkAndXXX tests, which make most sense to do on a single value, 2147 // if multiple families are specified for an increment test we assume it is 2148 // meant to raise the work factor 2149 for (int family = 0; family < opts.families; family++) { 2150 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2151 increment.addColumn(familyName, getQualifier(), 1l); 2152 } 2153 updateValueSize(this.table.increment(increment)); 2154 return true; 2155 } 2156 } 2157 2158 static class AppendTest extends CASTableTest { 2159 AppendTest(Connection con, TestOptions options, Status status) { 2160 super(con, options, status); 2161 } 2162 2163 @Override 2164 boolean testRow(final int i, final long startTime) throws IOException { 2165 byte[] bytes = format(i); 2166 Append append = new Append(bytes); 2167 // unlike checkAndXXX tests, which make most sense to do on a single value, 2168 // if multiple families are specified for an append test we assume it is 2169 // meant to raise the work factor 2170 for (int family = 0; family < opts.families; family++) { 2171 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2172 append.addColumn(familyName, getQualifier(), bytes); 2173 } 2174 updateValueSize(this.table.append(append)); 2175 return true; 2176 } 2177 } 2178 2179 static class CheckAndMutateTest extends CASTableTest { 2180 CheckAndMutateTest(Connection con, TestOptions options, Status status) { 2181 super(con, options, status); 2182 } 2183 2184 @Override 2185 boolean testRow(final int i, final long startTime) throws IOException { 2186 final byte[] bytes = format(i); 2187 // checkAndXXX tests operate on only a single value 2188 // Put a known value so when we go to check it, it is there. 2189 Put put = new Put(bytes); 2190 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2191 this.table.put(put); 2192 RowMutations mutations = new RowMutations(bytes); 2193 mutations.add(put); 2194 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2195 .thenMutate(mutations); 2196 return true; 2197 } 2198 } 2199 2200 static class CheckAndPutTest extends CASTableTest { 2201 CheckAndPutTest(Connection con, TestOptions options, Status status) { 2202 super(con, options, status); 2203 } 2204 2205 @Override 2206 boolean testRow(final int i, final long startTime) throws IOException { 2207 final byte[] bytes = format(i); 2208 // checkAndXXX tests operate on only a single value 2209 // Put a known value so when we go to check it, it is there. 
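      // Because ifEquals(bytes) below compares against the value written here, the thenPut branch
      // is expected to apply on every iteration (barring a concurrent writer); the test measures
      // the cost of the full check-and-mutate round trip rather than the failure path.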
2210 Put put = new Put(bytes); 2211 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2212 this.table.put(put); 2213 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2214 .thenPut(put); 2215 return true; 2216 } 2217 } 2218 2219 static class CheckAndDeleteTest extends CASTableTest { 2220 CheckAndDeleteTest(Connection con, TestOptions options, Status status) { 2221 super(con, options, status); 2222 } 2223 2224 @Override 2225 boolean testRow(final int i, final long startTime) throws IOException { 2226 final byte[] bytes = format(i); 2227 // checkAndXXX tests operate on only a single value 2228 // Put a known value so when we go to check it, it is there. 2229 Put put = new Put(bytes); 2230 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2231 this.table.put(put); 2232 Delete delete = new Delete(put.getRow()); 2233 delete.addColumn(FAMILY_ZERO, getQualifier()); 2234 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2235 .thenDelete(delete); 2236 return true; 2237 } 2238 } 2239 2240 /* 2241 * Delete all fake regions inserted to meta table by MetaWriteTest. 2242 */ 2243 static class CleanMetaTest extends MetaTest { 2244 CleanMetaTest(Connection con, TestOptions options, Status status) { 2245 super(con, options, status); 2246 } 2247 2248 @Override 2249 boolean testRow(final int i, final long startTime) throws IOException { 2250 try { 2251 RegionInfo regionInfo = connection.getRegionLocator(table.getName()) 2252 .getRegionLocation(getSplitKey(i), false).getRegion(); 2253 LOG.debug("deleting region from meta: " + regionInfo); 2254 2255 Delete delete = 2256 MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP); 2257 try (Table t = MetaTableAccessor.getMetaHTable(connection)) { 2258 t.delete(delete); 2259 } 2260 } catch (IOException ie) { 2261 // Log and continue 2262 LOG.error("cannot find region with start key: " + i); 2263 } 2264 return true; 2265 } 2266 } 2267 2268 static class SequentialReadTest extends TableTest { 2269 SequentialReadTest(Connection con, TestOptions options, Status status) { 2270 super(con, options, status); 2271 } 2272 2273 @Override 2274 boolean testRow(final int i, final long startTime) throws IOException { 2275 Get get = new Get(format(i)); 2276 for (int family = 0; family < opts.families; family++) { 2277 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2278 if (opts.addColumns) { 2279 for (int column = 0; column < opts.columns; column++) { 2280 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2281 get.addColumn(familyName, qualifier); 2282 } 2283 } else { 2284 get.addFamily(familyName); 2285 } 2286 } 2287 if (opts.filterAll) { 2288 get.setFilter(new FilterAllFilter()); 2289 } 2290 updateValueSize(table.get(get)); 2291 return true; 2292 } 2293 } 2294 2295 static class SequentialWriteTest extends BufferedMutatorTest { 2296 private ArrayList<Put> puts; 2297 2298 SequentialWriteTest(Connection con, TestOptions options, Status status) { 2299 super(con, options, status); 2300 if (opts.multiPut > 0) { 2301 LOG.info("MultiPut enabled. 
Sending PUTs in batches of " + opts.multiPut + "."); 2302 this.puts = new ArrayList<>(opts.multiPut); 2303 } 2304 } 2305 2306 protected byte[] generateRow(final int i) { 2307 return format(i); 2308 } 2309 2310 @Override 2311 boolean testRow(final int i, final long startTime) throws IOException { 2312 byte[] row = generateRow(i); 2313 Put put = new Put(row); 2314 for (int family = 0; family < opts.families; family++) { 2315 byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family); 2316 for (int column = 0; column < opts.columns; column++) { 2317 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2318 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2319 if (opts.useTags) { 2320 byte[] tag = generateData(this.rand, TAG_LENGTH); 2321 Tag[] tags = new Tag[opts.noOfTags]; 2322 for (int n = 0; n < opts.noOfTags; n++) { 2323 Tag t = new ArrayBackedTag((byte) n, tag); 2324 tags[n] = t; 2325 } 2326 KeyValue kv = 2327 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 2328 put.add(kv); 2329 updateValueSize(kv.getValueLength()); 2330 } else { 2331 put.addColumn(familyName, qualifier, value); 2332 updateValueSize(value.length); 2333 } 2334 } 2335 } 2336 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 2337 if (opts.autoFlush) { 2338 if (opts.multiPut > 0) { 2339 this.puts.add(put); 2340 if (this.puts.size() == opts.multiPut) { 2341 table.put(this.puts); 2342 this.puts.clear(); 2343 } else { 2344 return false; 2345 } 2346 } else { 2347 table.put(put); 2348 } 2349 } else { 2350 mutator.mutate(put); 2351 } 2352 return true; 2353 } 2354 } 2355 2356 /* 2357 * Insert fake regions into meta table with contiguous split keys. 2358 */ 2359 static class MetaWriteTest extends MetaTest { 2360 2361 MetaWriteTest(Connection con, TestOptions options, Status status) { 2362 super(con, options, status); 2363 } 2364 2365 @Override 2366 boolean testRow(final int i, final long startTime) throws IOException { 2367 List<RegionInfo> regionInfos = new ArrayList<RegionInfo>(); 2368 RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME)) 2369 .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build()); 2370 regionInfos.add(regionInfo); 2371 MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1); 2372 2373 // write the serverName columns 2374 MetaTableAccessor.updateRegionLocation(connection, regionInfo, 2375 ServerName.valueOf("localhost", 60010, rand.nextLong()), i, 2376 EnvironmentEdgeManager.currentTime()); 2377 return true; 2378 } 2379 } 2380 2381 static class FilteredScanTest extends TableTest { 2382 protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName()); 2383 2384 FilteredScanTest(Connection con, TestOptions options, Status status) { 2385 super(con, options, status); 2386 if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) { 2387 LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB 2388 + ". 
This could take a very long time."); 2389 } 2390 } 2391 2392 @Override 2393 boolean testRow(int i, final long startTime) throws IOException { 2394 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2395 Scan scan = constructScan(value); 2396 ResultScanner scanner = null; 2397 try { 2398 scanner = this.table.getScanner(scan); 2399 for (Result r = null; (r = scanner.next()) != null;) { 2400 updateValueSize(r); 2401 } 2402 } finally { 2403 if (scanner != null) { 2404 updateScanMetrics(scanner.getScanMetrics()); 2405 scanner.close(); 2406 } 2407 } 2408 return true; 2409 } 2410 2411 protected Scan constructScan(byte[] valuePrefix) throws IOException { 2412 FilterList list = new FilterList(); 2413 Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL, 2414 new BinaryComparator(valuePrefix)); 2415 list.addFilter(filter); 2416 if (opts.filterAll) { 2417 list.addFilter(new FilterAllFilter()); 2418 } 2419 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 2420 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 2421 .setScanMetricsEnabled(true); 2422 if (opts.addColumns) { 2423 for (int column = 0; column < opts.columns; column++) { 2424 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2425 scan.addColumn(FAMILY_ZERO, qualifier); 2426 } 2427 } else { 2428 scan.addFamily(FAMILY_ZERO); 2429 } 2430 scan.setFilter(list); 2431 return scan; 2432 } 2433 } 2434 2435 /** 2436 * Compute a throughput rate in MB/s. 2437 * @param rows Number of records consumed. 2438 * @param timeMs Time taken in milliseconds. 2439 * @return String value with label, ie '123.76 MB/s' 2440 */ 2441 private static String calculateMbps(int rows, long timeMs, final int valueSize, int families, 2442 int columns) { 2443 BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH 2444 + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families); 2445 BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT) 2446 .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT); 2447 return FMT.format(mbps) + " MB/s"; 2448 } 2449 2450 /* 2451 * Format passed integer. 2452 * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed number (Does 2453 * absolute in case number is negative). 2454 */ 2455 public static byte[] format(final int number) { 2456 byte[] b = new byte[ROW_LENGTH]; 2457 int d = Math.abs(number); 2458 for (int i = b.length - 1; i >= 0; i--) { 2459 b[i] = (byte) ((d % 10) + '0'); 2460 d /= 10; 2461 } 2462 return b; 2463 } 2464 2465 /* 2466 * This method takes some time and is done inline uploading data. For example, doing the mapfile 2467 * test, generation of the key and value consumes about 30% of CPU time. 2468 * @return Generated random value to insert into a table cell. 
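 * <p>
 * Illustrative output (the letters are random, so treat this as an example only): a call such as
 * generateData(rand, 20) yields something like "QQQQQQQQTTTTTTTTLLLL", i.e. uppercase letters
 * repeated in 8-byte runs, with a final run padding out the remaining bytes.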
2469 */ 2470 public static byte[] generateData(final Random r, int length) { 2471 byte[] b = new byte[length]; 2472 int i; 2473 2474 for (i = 0; i < (length - 8); i += 8) { 2475 b[i] = (byte) (65 + r.nextInt(26)); 2476 b[i + 1] = b[i]; 2477 b[i + 2] = b[i]; 2478 b[i + 3] = b[i]; 2479 b[i + 4] = b[i]; 2480 b[i + 5] = b[i]; 2481 b[i + 6] = b[i]; 2482 b[i + 7] = b[i]; 2483 } 2484 2485 byte a = (byte) (65 + r.nextInt(26)); 2486 for (; i < length; i++) { 2487 b[i] = a; 2488 } 2489 return b; 2490 } 2491 2492 static byte[] getRandomRow(final Random random, final int totalRows) { 2493 return format(generateRandomRow(random, totalRows)); 2494 } 2495 2496 static int generateRandomRow(final Random random, final int totalRows) { 2497 return random.nextInt(Integer.MAX_VALUE) % totalRows; 2498 } 2499 2500 static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf, 2501 Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status) 2502 throws IOException, InterruptedException { 2503 status.setStatus( 2504 "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows"); 2505 long totalElapsedTime; 2506 2507 final TestBase t; 2508 try { 2509 if (AsyncTest.class.isAssignableFrom(cmd)) { 2510 Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd; 2511 Constructor<? extends AsyncTest> constructor = 2512 newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class); 2513 t = constructor.newInstance(asyncCon, opts, status); 2514 } else { 2515 Class<? extends Test> newCmd = (Class<? extends Test>) cmd; 2516 Constructor<? extends Test> constructor = 2517 newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class); 2518 t = constructor.newInstance(con, opts, status); 2519 } 2520 } catch (NoSuchMethodException e) { 2521 throw new IllegalArgumentException("Invalid command class: " + cmd.getName() 2522 + ". It does not provide a constructor as described by " 2523 + "the javadoc comment. Available constructors are: " 2524 + Arrays.toString(cmd.getConstructors())); 2525 } catch (Exception e) { 2526 throw new IllegalStateException("Failed to construct command class", e); 2527 } 2528 totalElapsedTime = t.test(); 2529 2530 status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow 2531 + " for " + opts.perClientRunRows + " rows" + " (" 2532 + calculateMbps((int) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime, 2533 getAverageValueLength(opts), opts.families, opts.columns) 2534 + ")"); 2535 2536 return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold, 2537 t.numOfReplyFromReplica, t.getLatencyHistogram()); 2538 } 2539 2540 private static int getAverageValueLength(final TestOptions opts) { 2541 return opts.valueRandom ? opts.valueSize / 2 : opts.valueSize; 2542 } 2543 2544 private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) 2545 throws IOException, InterruptedException, ClassNotFoundException, ExecutionException { 2546 // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do 2547 // the TestOptions introspection for us and dump the output in a readable format. 
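    // A typical line looks roughly like the following (the exact fields come from TestOptions and
    // are shown here only as an illustration):
    //   RandomReadTest test run options={"cmdName":"randomRead","nomapred":true,"perClientRunRows":1048576,...}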
2548 LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts)); 2549 Admin admin = null; 2550 Connection connection = null; 2551 try { 2552 connection = ConnectionFactory.createConnection(getConf()); 2553 admin = connection.getAdmin(); 2554 checkTable(admin, opts); 2555 } finally { 2556 if (admin != null) admin.close(); 2557 if (connection != null) connection.close(); 2558 } 2559 if (opts.nomapred) { 2560 doLocalClients(opts, getConf()); 2561 } else { 2562 doMapReduce(opts, getConf()); 2563 } 2564 } 2565 2566 protected void printUsage() { 2567 printUsage(PE_COMMAND_SHORTNAME, null); 2568 } 2569 2570 protected static void printUsage(final String message) { 2571 printUsage(PE_COMMAND_SHORTNAME, message); 2572 } 2573 2574 protected static void printUsageAndExit(final String message, final int exitCode) { 2575 printUsage(message); 2576 System.exit(exitCode); 2577 } 2578 2579 protected static void printUsage(final String shortName, final String message) { 2580 if (message != null && message.length() > 0) { 2581 System.err.println(message); 2582 } 2583 System.err.print("Usage: hbase " + shortName); 2584 System.err.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>"); 2585 System.err.println(); 2586 System.err.println("General Options:"); 2587 System.err.println( 2588 " nomapred Run multiple clients using threads " + "(rather than via mapreduce)"); 2589 System.err 2590 .println(" oneCon All the threads share the same connection. Default: False"); 2591 System.err.println(" connCount Number of connections all threads share. " 2592 + "For example, if set to 2, then all threads share 2 connections. " 2593 + "Default: depends on the oneCon parameter. If oneCon is set to true, then connCount=1; " 2594 + "if not, connCount equals the number of threads"); 2595 2596 System.err.println(" sampleRate Execute test on a sample of total " 2597 + "rows. Only supported by randomRead. Default: 1.0"); 2598 System.err.println(" period Report every 'period' rows: " 2599 + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10); 2600 System.err.println(" cycles How many times to cycle the test. Default: 1."); 2601 System.err.println( 2602 " traceRate Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0"); 2603 System.err.println(" latency Set to report operation latencies. Default: False"); 2604 System.err.println(" latencyThreshold Set to report the number of operations with latency " 2605 + "over latencyThreshold, in milliseconds. Default: 0"); 2606 System.err.println(" measureAfter Start to measure the latency once 'measureAfter'" 2607 + " rows have been processed. Default: 0"); 2608 System.err 2609 .println(" valueSize Pass value size to use. Default: " + DEFAULT_OPTS.getValueSize()); 2610 System.err.println(" valueRandom Set if we should vary value size between 0 and " 2611 + "'valueSize'; set on read for stats on size. Default: Not set."); 2612 System.err.println(" blockEncoding Block encoding to use. Value should be one of " 2613 + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE"); 2614 System.err.println(); 2615 System.err.println("Table Creation / Write Tests:"); 2616 System.err.println(" table Alternate table name. Default: 'TestTable'"); 2617 System.err.println( 2618 " rows Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows() 2619 + ". 
In case of randomReads and randomSeekScans this could" 2620 + " be specified along with --size to specify the number of rows to be scanned within" 2621 + " the total range specified by the size."); 2622 System.err.println( 2623 " size Total size in GiB. Mutually exclusive with --rows for writes and scans" 2624 + ". For randomReads and randomSeekScans, when you use --size together with --rows, --size" 2625 + " specifies the end of the range and --rows" 2626 + " specifies the number of rows within that range. " + "Default: 1.0."); 2627 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); 2628 System.err.println(" encryption Encryption type to use (AES, ...). Default: 'NONE'"); 2629 System.err.println( 2630 " flushCommits Used to determine if the test should flush the table. " + "Default: false"); 2631 System.err.println(" valueZipf Set if we should vary value size between 0 and " 2632 + "'valueSize' in Zipf form. Default: Not set."); 2633 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); 2634 System.err.println(" autoFlush Set autoFlush on htable. Default: False"); 2635 System.err.println(" multiPut Batch puts together into groups of N. Only supported " 2636 + "by write. If multiPut is bigger than 0, autoFlush needs to be set to true. Default: 0"); 2637 System.err.println(" presplit Create presplit table. If a table with the same name exists," 2638 + " it'll be deleted and recreated (instead of verifying count of its existing regions). " 2639 + "Recommended for accurate perf analysis (see guide). Default: disabled"); 2640 System.err.println( 2641 " usetags Writes tags along with KVs. Use with HFile V3. " + "Default: false"); 2642 System.err.println(" numoftags Specify the number of tags needed. " 2643 + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags); 2644 System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table."); 2645 System.err.println(" columns Columns to write per row. Default: 1"); 2646 System.err 2647 .println(" families Specify number of column families for the table. Default: 1"); 2648 System.err.println(); 2649 System.err.println("Read Tests:"); 2650 System.err.println(" filterAll Helps to filter out all the rows on the server side," 2651 + " thereby not returning anything back to the client. Helps to check the server-side" 2652 + " performance. Uses FilterAllFilter internally. "); 2653 System.err.println(" multiGet Batch gets together into groups of N. Only supported " 2654 + "by randomRead. Default: disabled"); 2655 System.err.println(" inmemory Tries to keep the HFiles of the CF " 2656 + "in memory as far as possible. Reads are not guaranteed to always be served " 2657 + "from memory. Default: false"); 2658 System.err 2659 .println(" bloomFilter Bloom filter type, one of " + Arrays.toString(BloomType.values())); 2660 System.err.println(" blockSize Block size to use when writing out hfiles. "); 2661 System.err 2662 .println(" inmemoryCompaction Makes the column family do in-memory flushes/compactions. " 2663 + "Uses the CompactingMemStore"); 2664 System.err.println(" addColumns Adds columns to scans/gets explicitly. Default: true"); 2665 System.err.println(" replicas Enable region replica testing. Default: 1."); 2666 System.err.println( 2667 " randomSleep Do a random sleep before each get between 0 and the entered value. Default: 0"); 2668 System.err.println(" caching Scan caching to use. 
Default: 30"); 2669 System.err.println(" asyncPrefetch Enable asyncPrefetch for scan"); 2670 System.err.println(" cacheBlocks Set the cacheBlocks option for scan. Default: true"); 2671 System.err.println( 2672 " scanReadType Set the readType option for scan, stream/pread/default. Default: default"); 2673 System.err.println(" bufferSize Set the value of client side buffering. Default: 2MB"); 2674 System.err.println(); 2675 System.err.println(" Note: -D properties will be applied to the conf used. "); 2676 System.err.println(" For example: "); 2677 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true"); 2678 System.err.println(" -Dmapreduce.task.timeout=60000"); 2679 System.err.println(); 2680 System.err.println("Command:"); 2681 for (CmdDescriptor command : COMMANDS.values()) { 2682 System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription())); 2683 } 2684 System.err.println(); 2685 System.err.println("Args:"); 2686 System.err.println(" nclients Integer. Required. Total number of clients " 2687 + "(and HRegionServers) running. 1 <= value <= 500"); 2688 System.err.println("Examples:"); 2689 System.err.println(" To run a single client doing the default 1M sequentialWrites:"); 2690 System.err.println(" $ hbase " + shortName + " sequentialWrite 1"); 2691 System.err.println(" To run 10 clients doing increments over ten rows:"); 2692 System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10"); 2693 } 2694 2695 /** 2696 * Parse options passed in via an arguments array. Assumes that array has been split on 2697 * white-space and placed into a {@code Queue}. Any unknown arguments will remain in the queue at 2698 * the conclusion of this method call. It's up to the caller to deal with these unrecognized 2699 * arguments. 
2700 */ 2701 static TestOptions parseOpts(Queue<String> args) { 2702 TestOptions opts = new TestOptions(); 2703 2704 String cmd = null; 2705 while ((cmd = args.poll()) != null) { 2706 if (cmd.equals("-h") || cmd.startsWith("--h")) { 2707 // place item back onto queue so that caller knows parsing was incomplete 2708 args.add(cmd); 2709 break; 2710 } 2711 2712 final String nmr = "--nomapred"; 2713 if (cmd.startsWith(nmr)) { 2714 opts.nomapred = true; 2715 continue; 2716 } 2717 2718 final String rows = "--rows="; 2719 if (cmd.startsWith(rows)) { 2720 opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length())); 2721 continue; 2722 } 2723 2724 final String cycles = "--cycles="; 2725 if (cmd.startsWith(cycles)) { 2726 opts.cycles = Integer.parseInt(cmd.substring(cycles.length())); 2727 continue; 2728 } 2729 2730 final String sampleRate = "--sampleRate="; 2731 if (cmd.startsWith(sampleRate)) { 2732 opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); 2733 continue; 2734 } 2735 2736 final String table = "--table="; 2737 if (cmd.startsWith(table)) { 2738 opts.tableName = cmd.substring(table.length()); 2739 continue; 2740 } 2741 2742 final String startRow = "--startRow="; 2743 if (cmd.startsWith(startRow)) { 2744 opts.startRow = Integer.parseInt(cmd.substring(startRow.length())); 2745 continue; 2746 } 2747 2748 final String compress = "--compress="; 2749 if (cmd.startsWith(compress)) { 2750 opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); 2751 continue; 2752 } 2753 2754 final String encryption = "--encryption="; 2755 if (cmd.startsWith(encryption)) { 2756 opts.encryption = cmd.substring(encryption.length()); 2757 continue; 2758 } 2759 2760 final String traceRate = "--traceRate="; 2761 if (cmd.startsWith(traceRate)) { 2762 opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length())); 2763 continue; 2764 } 2765 2766 final String blockEncoding = "--blockEncoding="; 2767 if (cmd.startsWith(blockEncoding)) { 2768 opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); 2769 continue; 2770 } 2771 2772 final String flushCommits = "--flushCommits="; 2773 if (cmd.startsWith(flushCommits)) { 2774 opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); 2775 continue; 2776 } 2777 2778 final String writeToWAL = "--writeToWAL="; 2779 if (cmd.startsWith(writeToWAL)) { 2780 opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); 2781 continue; 2782 } 2783 2784 final String presplit = "--presplit="; 2785 if (cmd.startsWith(presplit)) { 2786 opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); 2787 continue; 2788 } 2789 2790 final String inMemory = "--inmemory="; 2791 if (cmd.startsWith(inMemory)) { 2792 opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); 2793 continue; 2794 } 2795 2796 final String autoFlush = "--autoFlush="; 2797 if (cmd.startsWith(autoFlush)) { 2798 opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length())); 2799 continue; 2800 } 2801 2802 final String onceCon = "--oneCon="; 2803 if (cmd.startsWith(onceCon)) { 2804 opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length())); 2805 continue; 2806 } 2807 2808 final String connCount = "--connCount="; 2809 if (cmd.startsWith(connCount)) { 2810 opts.connCount = Integer.parseInt(cmd.substring(connCount.length())); 2811 continue; 2812 } 2813 2814 final String latencyThreshold = "--latencyThreshold="; 2815 if (cmd.startsWith(latencyThreshold)) { 
2816 opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length())); 2817 continue; 2818 } 2819 2820 final String latency = "--latency"; 2821 if (cmd.startsWith(latency)) { 2822 opts.reportLatency = true; 2823 continue; 2824 } 2825 2826 final String multiGet = "--multiGet="; 2827 if (cmd.startsWith(multiGet)) { 2828 opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); 2829 continue; 2830 } 2831 2832 final String multiPut = "--multiPut="; 2833 if (cmd.startsWith(multiPut)) { 2834 opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length())); 2835 continue; 2836 } 2837 2838 final String useTags = "--usetags="; 2839 if (cmd.startsWith(useTags)) { 2840 opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); 2841 continue; 2842 } 2843 2844 final String noOfTags = "--numoftags="; 2845 if (cmd.startsWith(noOfTags)) { 2846 opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); 2847 continue; 2848 } 2849 2850 final String replicas = "--replicas="; 2851 if (cmd.startsWith(replicas)) { 2852 opts.replicas = Integer.parseInt(cmd.substring(replicas.length())); 2853 continue; 2854 } 2855 2856 final String filterOutAll = "--filterAll"; 2857 if (cmd.startsWith(filterOutAll)) { 2858 opts.filterAll = true; 2859 continue; 2860 } 2861 2862 final String size = "--size="; 2863 if (cmd.startsWith(size)) { 2864 opts.size = Float.parseFloat(cmd.substring(size.length())); 2865 if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB"); 2866 continue; 2867 } 2868 2869 final String splitPolicy = "--splitPolicy="; 2870 if (cmd.startsWith(splitPolicy)) { 2871 opts.splitPolicy = cmd.substring(splitPolicy.length()); 2872 continue; 2873 } 2874 2875 final String randomSleep = "--randomSleep="; 2876 if (cmd.startsWith(randomSleep)) { 2877 opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length())); 2878 continue; 2879 } 2880 2881 final String measureAfter = "--measureAfter="; 2882 if (cmd.startsWith(measureAfter)) { 2883 opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length())); 2884 continue; 2885 } 2886 2887 final String bloomFilter = "--bloomFilter="; 2888 if (cmd.startsWith(bloomFilter)) { 2889 opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length())); 2890 continue; 2891 } 2892 2893 final String blockSize = "--blockSize="; 2894 if (cmd.startsWith(blockSize)) { 2895 opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length())); 2896 continue; 2897 } 2898 2899 final String valueSize = "--valueSize="; 2900 if (cmd.startsWith(valueSize)) { 2901 opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length())); 2902 continue; 2903 } 2904 2905 final String valueRandom = "--valueRandom"; 2906 if (cmd.startsWith(valueRandom)) { 2907 opts.valueRandom = true; 2908 continue; 2909 } 2910 2911 final String valueZipf = "--valueZipf"; 2912 if (cmd.startsWith(valueZipf)) { 2913 opts.valueZipf = true; 2914 continue; 2915 } 2916 2917 final String period = "--period="; 2918 if (cmd.startsWith(period)) { 2919 opts.period = Integer.parseInt(cmd.substring(period.length())); 2920 continue; 2921 } 2922 2923 final String addColumns = "--addColumns="; 2924 if (cmd.startsWith(addColumns)) { 2925 opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length())); 2926 continue; 2927 } 2928 2929 final String inMemoryCompaction = "--inmemoryCompaction="; 2930 if (cmd.startsWith(inMemoryCompaction)) { 2931 opts.inMemoryCompaction = 2932 
MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length())); 2933 continue; 2934 } 2935 2936 final String columns = "--columns="; 2937 if (cmd.startsWith(columns)) { 2938 opts.columns = Integer.parseInt(cmd.substring(columns.length())); 2939 continue; 2940 } 2941 2942 final String families = "--families="; 2943 if (cmd.startsWith(families)) { 2944 opts.families = Integer.parseInt(cmd.substring(families.length())); 2945 continue; 2946 } 2947 2948 final String caching = "--caching="; 2949 if (cmd.startsWith(caching)) { 2950 opts.caching = Integer.parseInt(cmd.substring(caching.length())); 2951 continue; 2952 } 2953 2954 final String asyncPrefetch = "--asyncPrefetch"; 2955 if (cmd.startsWith(asyncPrefetch)) { 2956 opts.asyncPrefetch = true; 2957 continue; 2958 } 2959 2960 final String cacheBlocks = "--cacheBlocks="; 2961 if (cmd.startsWith(cacheBlocks)) { 2962 opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length())); 2963 continue; 2964 } 2965 2966 final String scanReadType = "--scanReadType="; 2967 if (cmd.startsWith(scanReadType)) { 2968 opts.scanReadType = 2969 Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase()); 2970 continue; 2971 } 2972 2973 final String bufferSize = "--bufferSize="; 2974 if (cmd.startsWith(bufferSize)) { 2975 opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length())); 2976 continue; 2977 } 2978 2979 validateParsedOpts(opts); 2980 2981 if (isCommandClass(cmd)) { 2982 opts.cmdName = cmd; 2983 try { 2984 opts.numClientThreads = Integer.parseInt(args.remove()); 2985 } catch (NoSuchElementException | NumberFormatException e) { 2986 throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e); 2987 } 2988 opts = calculateRowsAndSize(opts); 2989 break; 2990 } else { 2991 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); 2992 } 2993 2994 // Not matching any option or command. 
2995 System.err.println("Error: Wrong option or command: " + cmd); 2996 args.add(cmd); 2997 break; 2998 } 2999 return opts; 3000 } 3001 3002 /** 3003 * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts 3004 */ 3005 private static void validateParsedOpts(TestOptions opts) { 3006 3007 if (!opts.autoFlush && opts.multiPut > 0) { 3008 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0"); 3009 } 3010 3011 if (opts.oneCon && opts.connCount > 1) { 3012 throw new IllegalArgumentException( 3013 "oneCon is set to true, " + "connCount should not bigger than 1"); 3014 } 3015 3016 if (opts.valueZipf && opts.valueRandom) { 3017 throw new IllegalStateException("Either valueZipf or valueRandom but not both"); 3018 } 3019 } 3020 3021 static TestOptions calculateRowsAndSize(final TestOptions opts) { 3022 int rowsPerGB = getRowsPerGB(opts); 3023 if ( 3024 (opts.getCmdName() != null 3025 && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN))) 3026 && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows 3027 ) { 3028 opts.totalRows = (int) opts.size * rowsPerGB; 3029 } else if (opts.size != DEFAULT_OPTS.size) { 3030 // total size in GB specified 3031 opts.totalRows = (int) opts.size * rowsPerGB; 3032 opts.perClientRunRows = opts.totalRows / opts.numClientThreads; 3033 } else { 3034 opts.totalRows = opts.perClientRunRows * opts.numClientThreads; 3035 opts.size = opts.totalRows / rowsPerGB; 3036 } 3037 return opts; 3038 } 3039 3040 static int getRowsPerGB(final TestOptions opts) { 3041 return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies() 3042 * opts.getColumns()); 3043 } 3044 3045 @Override 3046 public int run(String[] args) throws Exception { 3047 // Process command-line args. TODO: Better cmd-line processing 3048 // (but hopefully something not as painful as cli options). 3049 int errCode = -1; 3050 if (args.length < 1) { 3051 printUsage(); 3052 return errCode; 3053 } 3054 3055 try { 3056 LinkedList<String> argv = new LinkedList<>(); 3057 argv.addAll(Arrays.asList(args)); 3058 TestOptions opts = parseOpts(argv); 3059 3060 // args remaining, print help and exit 3061 if (!argv.isEmpty()) { 3062 errCode = 0; 3063 printUsage(); 3064 return errCode; 3065 } 3066 3067 // must run at least 1 client 3068 if (opts.numClientThreads <= 0) { 3069 throw new IllegalArgumentException("Number of clients must be > 0"); 3070 } 3071 3072 // cmdName should not be null, print help and exit 3073 if (opts.cmdName == null) { 3074 printUsage(); 3075 return errCode; 3076 } 3077 3078 Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName); 3079 if (cmdClass != null) { 3080 runTest(cmdClass, opts); 3081 errCode = 0; 3082 } 3083 3084 } catch (Exception e) { 3085 e.printStackTrace(); 3086 } 3087 3088 return errCode; 3089 } 3090 3091 private static boolean isCommandClass(String cmd) { 3092 return COMMANDS.containsKey(cmd); 3093 } 3094 3095 private static Class<? extends TestBase> determineCommandClass(String cmd) { 3096 CmdDescriptor descriptor = COMMANDS.get(cmd); 3097 return descriptor != null ? descriptor.getCmdClass() : null; 3098 } 3099 3100 public static void main(final String[] args) throws Exception { 3101 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); 3102 System.exit(res); 3103 } 3104}