001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import com.codahale.metrics.Histogram; 021import com.codahale.metrics.UniformReservoir; 022import io.opentelemetry.api.trace.Span; 023import io.opentelemetry.context.Scope; 024import java.io.IOException; 025import java.io.PrintStream; 026import java.lang.reflect.Constructor; 027import java.math.BigDecimal; 028import java.math.MathContext; 029import java.text.DecimalFormat; 030import java.text.SimpleDateFormat; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.Date; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Locale; 037import java.util.Map; 038import java.util.NoSuchElementException; 039import java.util.Properties; 040import java.util.Queue; 041import java.util.Random; 042import java.util.TreeMap; 043import java.util.concurrent.Callable; 044import java.util.concurrent.ExecutionException; 045import java.util.concurrent.ExecutorService; 046import java.util.concurrent.Executors; 047import java.util.concurrent.Future; 048import java.util.concurrent.ThreadLocalRandom; 049import org.apache.commons.lang3.StringUtils; 050import org.apache.hadoop.conf.Configuration; 051import org.apache.hadoop.conf.Configured; 052import org.apache.hadoop.fs.FileSystem; 053import org.apache.hadoop.fs.Path; 054import org.apache.hadoop.hbase.client.Admin; 055import org.apache.hadoop.hbase.client.Append; 056import org.apache.hadoop.hbase.client.AsyncConnection; 057import org.apache.hadoop.hbase.client.AsyncTable; 058import org.apache.hadoop.hbase.client.BufferedMutator; 059import org.apache.hadoop.hbase.client.BufferedMutatorParams; 060import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 061import org.apache.hadoop.hbase.client.Connection; 062import org.apache.hadoop.hbase.client.ConnectionFactory; 063import org.apache.hadoop.hbase.client.Consistency; 064import org.apache.hadoop.hbase.client.Delete; 065import org.apache.hadoop.hbase.client.Durability; 066import org.apache.hadoop.hbase.client.Get; 067import org.apache.hadoop.hbase.client.Increment; 068import org.apache.hadoop.hbase.client.Put; 069import org.apache.hadoop.hbase.client.RegionInfo; 070import org.apache.hadoop.hbase.client.RegionInfoBuilder; 071import org.apache.hadoop.hbase.client.RegionLocator; 072import org.apache.hadoop.hbase.client.Result; 073import org.apache.hadoop.hbase.client.ResultScanner; 074import org.apache.hadoop.hbase.client.RowMutations; 075import org.apache.hadoop.hbase.client.Scan; 076import org.apache.hadoop.hbase.client.Table; 077import org.apache.hadoop.hbase.client.TableDescriptor; 078import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 079import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 080import org.apache.hadoop.hbase.filter.BinaryComparator; 081import org.apache.hadoop.hbase.filter.Filter; 082import org.apache.hadoop.hbase.filter.FilterAllFilter; 083import org.apache.hadoop.hbase.filter.FilterList; 084import org.apache.hadoop.hbase.filter.PageFilter; 085import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 086import org.apache.hadoop.hbase.filter.WhileMatchFilter; 087import org.apache.hadoop.hbase.io.compress.Compression; 088import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 089import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 090import org.apache.hadoop.hbase.regionserver.BloomType; 091import org.apache.hadoop.hbase.regionserver.CompactingMemStore; 092import org.apache.hadoop.hbase.trace.TraceUtil; 093import org.apache.hadoop.hbase.util.ByteArrayHashKey; 094import org.apache.hadoop.hbase.util.Bytes; 095import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 096import org.apache.hadoop.hbase.util.GsonUtil; 097import org.apache.hadoop.hbase.util.Hash; 098import org.apache.hadoop.hbase.util.MurmurHash; 099import org.apache.hadoop.hbase.util.Pair; 100import org.apache.hadoop.hbase.util.RandomDistribution; 101import org.apache.hadoop.hbase.util.YammerHistogramUtils; 102import org.apache.hadoop.io.LongWritable; 103import org.apache.hadoop.io.Text; 104import org.apache.hadoop.mapreduce.Job; 105import org.apache.hadoop.mapreduce.Mapper; 106import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; 107import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 108import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; 109import org.apache.hadoop.util.Tool; 110import org.apache.hadoop.util.ToolRunner; 111import org.apache.yetus.audience.InterfaceAudience; 112import org.slf4j.Logger; 113import org.slf4j.LoggerFactory; 114 115import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 116import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 117import org.apache.hbase.thirdparty.com.google.gson.Gson; 118 119/** 120 * Script used evaluating HBase performance and scalability. Runs a HBase client that steps through 121 * one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random writes test, 122 * etc.). Pass on the command-line which test to run and how many clients are participating in this 123 * experiment. Run {@code PerformanceEvaluation --help} to obtain usage. 124 * <p> 125 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance 126 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper, 127 * pages 8-10. 128 * <p> 129 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as 130 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client does 131 * about 1GB of data, unless specified otherwise. 132 */ 133@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 134public class PerformanceEvaluation extends Configured implements Tool { 135 static final String RANDOM_SEEK_SCAN = "randomSeekScan"; 136 static final String RANDOM_READ = "randomRead"; 137 static final String PE_COMMAND_SHORTNAME = "pe"; 138 private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName()); 139 private static final Gson GSON = GsonUtil.createGson().create(); 140 141 public static final String TABLE_NAME = "TestTable"; 142 public static final String FAMILY_NAME_BASE = "info"; 143 public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0"); 144 public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0); 145 public static final int DEFAULT_VALUE_LENGTH = 1000; 146 public static final int ROW_LENGTH = 26; 147 148 private static final int ONE_GB = 1024 * 1024 * 1000; 149 private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH; 150 // TODO : should we make this configurable 151 private static final int TAG_LENGTH = 256; 152 private static final DecimalFormat FMT = new DecimalFormat("0.##"); 153 private static final MathContext CXT = MathContext.DECIMAL64; 154 private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000); 155 private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024); 156 private static final TestOptions DEFAULT_OPTS = new TestOptions(); 157 158 private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>(); 159 private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); 160 161 static { 162 addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead", 163 "Run async random read test"); 164 addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite", 165 "Run async random write test"); 166 addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead", 167 "Run async sequential read test"); 168 addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite", 169 "Run async sequential write test"); 170 addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)"); 171 addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test"); 172 addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test"); 173 addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN, 174 "Run random seek and scan 100 test"); 175 addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", 176 "Run random seek scan with both start and stop row (max 10 rows)"); 177 addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100", 178 "Run random seek scan with both start and stop row (max 100 rows)"); 179 addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000", 180 "Run random seek scan with both start and stop row (max 1000 rows)"); 181 addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000", 182 "Run random seek scan with both start and stop row (max 10000 rows)"); 183 addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test"); 184 addCommandDescriptor(RandomDeleteTest.class, "randomDelete", "Run random delete test"); 185 addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test"); 186 addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test"); 187 addCommandDescriptor(SequentialDeleteTest.class, "sequentialDelete", 188 "Run sequential delete test"); 189 addCommandDescriptor(MetaWriteTest.class, "metaWrite", 190 "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta"); 191 addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)"); 192 addCommandDescriptor(ReverseScanTest.class, "reverseScan", 193 "Run reverse scan test (read every row)"); 194 addCommandDescriptor(FilteredScanTest.class, "filterScan", 195 "Run scan test using a filter to find a specific row based on it's value " 196 + "(make sure to use --rows=20)"); 197 addCommandDescriptor(IncrementTest.class, "increment", 198 "Increment on each row; clients overlap on keyspace so some concurrent operations"); 199 addCommandDescriptor(AppendTest.class, "append", 200 "Append on each row; clients overlap on keyspace so some concurrent operations"); 201 addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate", 202 "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations"); 203 addCommandDescriptor(CheckAndPutTest.class, "checkAndPut", 204 "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations"); 205 addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete", 206 "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations"); 207 addCommandDescriptor(CleanMetaTest.class, "cleanMeta", 208 "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread"); 209 } 210 211 /** 212 * Enum for map metrics. Keep it out here rather than inside in the Map inner-class so we can find 213 * associated properties. 214 */ 215 protected static enum Counter { 216 /** elapsed time */ 217 ELAPSED_TIME, 218 /** number of rows */ 219 ROWS 220 } 221 222 protected static class RunResult implements Comparable<RunResult> { 223 public RunResult(long duration, Histogram hist) { 224 this.duration = duration; 225 this.hist = hist; 226 numbOfReplyOverThreshold = 0; 227 numOfReplyFromReplica = 0; 228 } 229 230 public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica, 231 Histogram hist) { 232 this.duration = duration; 233 this.hist = hist; 234 this.numbOfReplyOverThreshold = numbOfReplyOverThreshold; 235 this.numOfReplyFromReplica = numOfReplyFromReplica; 236 } 237 238 public final long duration; 239 public final Histogram hist; 240 public final long numbOfReplyOverThreshold; 241 public final long numOfReplyFromReplica; 242 243 @Override 244 public String toString() { 245 return Long.toString(duration); 246 } 247 248 @Override 249 public int compareTo(RunResult o) { 250 return Long.compare(this.duration, o.duration); 251 } 252 253 @Override 254 public boolean equals(Object obj) { 255 if (this == obj) { 256 return true; 257 } 258 if (obj == null || getClass() != obj.getClass()) { 259 return false; 260 } 261 return this.compareTo((RunResult) obj) == 0; 262 } 263 264 @Override 265 public int hashCode() { 266 return Long.hashCode(duration); 267 } 268 } 269 270 /** 271 * Constructor 272 * @param conf Configuration object 273 */ 274 public PerformanceEvaluation(final Configuration conf) { 275 super(conf); 276 } 277 278 protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name, 279 String description) { 280 CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description); 281 COMMANDS.put(name, cmdDescriptor); 282 } 283 284 /** 285 * Implementations can have their status set. 286 */ 287 interface Status { 288 /** 289 * Sets status 290 * @param msg status message 291 */ 292 void setStatus(final String msg) throws IOException; 293 } 294 295 /** 296 * MapReduce job that runs a performance evaluation client in each map task. 297 */ 298 public static class EvaluationMapTask 299 extends Mapper<LongWritable, Text, LongWritable, LongWritable> { 300 301 /** configuration parameter name that contains the command */ 302 public final static String CMD_KEY = "EvaluationMapTask.command"; 303 /** configuration parameter name that contains the PE impl */ 304 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl"; 305 306 private Class<? extends Test> cmd; 307 308 @Override 309 protected void setup(Context context) throws IOException, InterruptedException { 310 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class); 311 312 // this is required so that extensions of PE are instantiated within the 313 // map reduce task... 314 Class<? extends PerformanceEvaluation> peClass = 315 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class); 316 try { 317 peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration()); 318 } catch (Exception e) { 319 throw new IllegalStateException("Could not instantiate PE instance", e); 320 } 321 } 322 323 private <Type> Class<? extends Type> forName(String className, Class<Type> type) { 324 try { 325 return Class.forName(className).asSubclass(type); 326 } catch (ClassNotFoundException e) { 327 throw new IllegalStateException("Could not find class for name: " + className, e); 328 } 329 } 330 331 @Override 332 protected void map(LongWritable key, Text value, final Context context) 333 throws IOException, InterruptedException { 334 335 Status status = new Status() { 336 @Override 337 public void setStatus(String msg) { 338 context.setStatus(msg); 339 } 340 }; 341 342 TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class); 343 Configuration conf = HBaseConfiguration.create(context.getConfiguration()); 344 final Connection con = ConnectionFactory.createConnection(conf); 345 AsyncConnection asyncCon = null; 346 try { 347 asyncCon = ConnectionFactory.createAsyncConnection(conf).get(); 348 } catch (ExecutionException e) { 349 throw new IOException(e); 350 } 351 352 // Evaluation task 353 RunResult result = 354 PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status); 355 // Collect how much time the thing took. Report as map output and 356 // to the ELAPSED_TIME counter. 357 context.getCounter(Counter.ELAPSED_TIME).increment(result.duration); 358 context.getCounter(Counter.ROWS).increment(opts.perClientRunRows); 359 context.write(new LongWritable(opts.startRow), new LongWritable(result.duration)); 360 context.progress(); 361 } 362 } 363 364 /* 365 * If table does not already exist, create. Also create a table when {@code opts.presplitRegions} 366 * is specified or when the existing table's region replica count doesn't match {@code 367 * opts.replicas}. 368 */ 369 static boolean checkTable(Admin admin, TestOptions opts) throws IOException { 370 TableName tableName = TableName.valueOf(opts.tableName); 371 boolean needsDelete = false, exists = admin.tableExists(tableName); 372 boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read") 373 || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan"); 374 boolean isDeleteCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("delete"); 375 if (!exists && (isReadCmd || isDeleteCmd)) { 376 throw new IllegalStateException( 377 "Must specify an existing table for read commands. Run a write command first."); 378 } 379 TableDescriptor desc = exists ? admin.getDescriptor(TableName.valueOf(opts.tableName)) : null; 380 byte[][] splits = getSplits(opts); 381 382 // recreate the table when user has requested presplit or when existing 383 // {RegionSplitPolicy,replica count} does not match requested, or when the 384 // number of column families does not match requested. 385 if ( 386 (exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions 387 && opts.presplitRegions != admin.getRegions(tableName).size()) 388 || (!isReadCmd && desc != null 389 && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy)) 390 || (!(isReadCmd || isDeleteCmd) && desc != null 391 && desc.getRegionReplication() != opts.replicas) 392 || (desc != null && desc.getColumnFamilyCount() != opts.families) 393 ) { 394 needsDelete = true; 395 // wait, why did it delete my table?!? 396 LOG.debug(MoreObjects.toStringHelper("needsDelete").add("needsDelete", needsDelete) 397 .add("isReadCmd", isReadCmd).add("exists", exists).add("desc", desc) 398 .add("presplit", opts.presplitRegions).add("splitPolicy", opts.splitPolicy) 399 .add("replicas", opts.replicas).add("families", opts.families).toString()); 400 } 401 402 // remove an existing table 403 if (needsDelete) { 404 if (admin.isTableEnabled(tableName)) { 405 admin.disableTable(tableName); 406 } 407 admin.deleteTable(tableName); 408 } 409 410 // table creation is necessary 411 if (!exists || needsDelete) { 412 desc = getTableDescriptor(opts); 413 if (splits != null) { 414 if (LOG.isDebugEnabled()) { 415 for (int i = 0; i < splits.length; i++) { 416 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); 417 } 418 } 419 } 420 if (splits != null) { 421 admin.createTable(desc, splits); 422 } else { 423 admin.createTable(desc); 424 } 425 LOG.info("Table " + desc + " created"); 426 } 427 return admin.tableExists(tableName); 428 } 429 430 /** 431 * Create an HTableDescriptor from provided TestOptions. 432 */ 433 protected static TableDescriptor getTableDescriptor(TestOptions opts) { 434 TableDescriptorBuilder builder = 435 TableDescriptorBuilder.newBuilder(TableName.valueOf(opts.tableName)); 436 437 for (int family = 0; family < opts.families; family++) { 438 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 439 ColumnFamilyDescriptorBuilder cfBuilder = 440 ColumnFamilyDescriptorBuilder.newBuilder(familyName); 441 cfBuilder.setDataBlockEncoding(opts.blockEncoding); 442 cfBuilder.setCompressionType(opts.compression); 443 cfBuilder.setEncryptionType(opts.encryption); 444 cfBuilder.setBloomFilterType(opts.bloomType); 445 cfBuilder.setBlocksize(opts.blockSize); 446 if (opts.inMemoryCF) { 447 cfBuilder.setInMemory(true); 448 } 449 cfBuilder.setInMemoryCompaction(opts.inMemoryCompaction); 450 builder.setColumnFamily(cfBuilder.build()); 451 } 452 if (opts.replicas != DEFAULT_OPTS.replicas) { 453 builder.setRegionReplication(opts.replicas); 454 } 455 if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) { 456 builder.setRegionSplitPolicyClassName(opts.splitPolicy); 457 } 458 return builder.build(); 459 } 460 461 /** 462 * generates splits based on total number of rows and specified split regions 463 */ 464 protected static byte[][] getSplits(TestOptions opts) { 465 if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null; 466 467 int numSplitPoints = opts.presplitRegions - 1; 468 byte[][] splits = new byte[numSplitPoints][]; 469 int jump = opts.totalRows / opts.presplitRegions; 470 for (int i = 0; i < numSplitPoints; i++) { 471 int rowkey = jump * (1 + i); 472 splits[i] = format(rowkey); 473 } 474 return splits; 475 } 476 477 static void setupConnectionCount(final TestOptions opts) { 478 if (opts.oneCon) { 479 opts.connCount = 1; 480 } else { 481 if (opts.connCount == -1) { 482 // set to thread number if connCount is not set 483 opts.connCount = opts.numClientThreads; 484 } 485 } 486 } 487 488 /* 489 * Run all clients in this vm each to its own thread. 490 */ 491 static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf) 492 throws IOException, InterruptedException, ExecutionException { 493 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName); 494 assert cmd != null; 495 @SuppressWarnings("unchecked") 496 Future<RunResult>[] threads = new Future[opts.numClientThreads]; 497 RunResult[] results = new RunResult[opts.numClientThreads]; 498 ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, 499 new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build()); 500 setupConnectionCount(opts); 501 final Connection[] cons = new Connection[opts.connCount]; 502 final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount]; 503 for (int i = 0; i < opts.connCount; i++) { 504 cons[i] = ConnectionFactory.createConnection(conf); 505 asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get(); 506 } 507 LOG 508 .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads"); 509 for (int i = 0; i < threads.length; i++) { 510 final int index = i; 511 threads[i] = pool.submit(new Callable<RunResult>() { 512 @Override 513 public RunResult call() throws Exception { 514 TestOptions threadOpts = new TestOptions(opts); 515 final Connection con = cons[index % cons.length]; 516 final AsyncConnection asyncCon = asyncCons[index % asyncCons.length]; 517 if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows; 518 RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() { 519 @Override 520 public void setStatus(final String msg) throws IOException { 521 LOG.info(msg); 522 } 523 }); 524 LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration 525 + "ms over " + threadOpts.perClientRunRows + " rows"); 526 if (opts.latencyThreshold > 0) { 527 LOG.info("Number of replies over latency threshold " + opts.latencyThreshold 528 + "(ms) is " + run.numbOfReplyOverThreshold); 529 } 530 return run; 531 } 532 }); 533 } 534 pool.shutdown(); 535 536 for (int i = 0; i < threads.length; i++) { 537 try { 538 results[i] = threads[i].get(); 539 } catch (ExecutionException e) { 540 throw new IOException(e.getCause()); 541 } 542 } 543 final String test = cmd.getSimpleName(); 544 LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results)); 545 Arrays.sort(results); 546 long total = 0; 547 float avgLatency = 0; 548 float avgTPS = 0; 549 long replicaWins = 0; 550 for (RunResult result : results) { 551 total += result.duration; 552 avgLatency += result.hist.getSnapshot().getMean(); 553 avgTPS += opts.perClientRunRows * 1.0f / result.duration; 554 replicaWins += result.numOfReplyFromReplica; 555 } 556 avgTPS *= 1000; // ms to second 557 avgLatency = avgLatency / results.length; 558 LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: " 559 + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms"); 560 LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency)); 561 LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second"); 562 if (opts.replicas > 1) { 563 LOG.info("[results from replica regions] " + replicaWins); 564 } 565 566 for (int i = 0; i < opts.connCount; i++) { 567 cons[i].close(); 568 asyncCons[i].close(); 569 } 570 571 return results; 572 } 573 574 /* 575 * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write 576 * out an input file with instruction per client regards which row they are to start on. 577 * @param cmd Command to run. 578 */ 579 static Job doMapReduce(TestOptions opts, final Configuration conf) 580 throws IOException, InterruptedException, ClassNotFoundException { 581 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName); 582 assert cmd != null; 583 Path inputDir = writeInputFile(conf, opts); 584 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); 585 conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName()); 586 Job job = Job.getInstance(conf); 587 job.setJarByClass(PerformanceEvaluation.class); 588 job.setJobName("HBase Performance Evaluation - " + opts.cmdName); 589 590 job.setInputFormatClass(NLineInputFormat.class); 591 NLineInputFormat.setInputPaths(job, inputDir); 592 // this is default, but be explicit about it just in case. 593 NLineInputFormat.setNumLinesPerSplit(job, 1); 594 595 job.setOutputKeyClass(LongWritable.class); 596 job.setOutputValueClass(LongWritable.class); 597 598 job.setMapperClass(EvaluationMapTask.class); 599 job.setReducerClass(LongSumReducer.class); 600 601 job.setNumReduceTasks(1); 602 603 job.setOutputFormatClass(TextOutputFormat.class); 604 TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs")); 605 606 TableMapReduceUtil.addDependencyJars(job); 607 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Histogram.class, // yammer 608 // metrics 609 Gson.class, // gson 610 FilterAllFilter.class // hbase-server tests jar 611 ); 612 613 TableMapReduceUtil.initCredentials(job); 614 615 job.waitForCompletion(true); 616 return job; 617 } 618 619 /** 620 * Each client has one mapper to do the work, and client do the resulting count in a map task. 621 */ 622 623 static String JOB_INPUT_FILENAME = "input.txt"; 624 625 /* 626 * Write input file of offsets-per-client for the mapreduce job. 627 * @param c Configuration 628 * @return Directory that contains file written whose name is JOB_INPUT_FILENAME 629 */ 630 static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException { 631 return writeInputFile(c, opts, new Path(".")); 632 } 633 634 static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir) 635 throws IOException { 636 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); 637 Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date())); 638 Path inputDir = new Path(jobdir, "inputs"); 639 640 FileSystem fs = FileSystem.get(c); 641 fs.mkdirs(inputDir); 642 643 Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME); 644 PrintStream out = new PrintStream(fs.create(inputFile)); 645 // Make input random. 646 Map<Integer, String> m = new TreeMap<>(); 647 Hash h = MurmurHash.getInstance(); 648 int perClientRows = (opts.totalRows / opts.numClientThreads); 649 try { 650 for (int j = 0; j < opts.numClientThreads; j++) { 651 TestOptions next = new TestOptions(opts); 652 next.startRow = j * perClientRows; 653 next.perClientRunRows = perClientRows; 654 String s = GSON.toJson(next); 655 LOG.info("Client=" + j + ", input=" + s); 656 byte[] b = Bytes.toBytes(s); 657 int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1); 658 m.put(hash, s); 659 } 660 for (Map.Entry<Integer, String> e : m.entrySet()) { 661 out.println(e.getValue()); 662 } 663 } finally { 664 out.close(); 665 } 666 return inputDir; 667 } 668 669 /** 670 * Describes a command. 671 */ 672 static class CmdDescriptor { 673 private Class<? extends TestBase> cmdClass; 674 private String name; 675 private String description; 676 677 CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) { 678 this.cmdClass = cmdClass; 679 this.name = name; 680 this.description = description; 681 } 682 683 public Class<? extends TestBase> getCmdClass() { 684 return cmdClass; 685 } 686 687 public String getName() { 688 return name; 689 } 690 691 public String getDescription() { 692 return description; 693 } 694 } 695 696 /** 697 * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes 698 * tracking all these arguments a little easier. NOTE: ADDING AN OPTION, you need to add a data 699 * member, a getter/setter (to make JSON serialization of this TestOptions class behave), and you 700 * need to add to the clone constructor below copying your new option from the 'that' to the 701 * 'this'. Look for 'clone' below. 702 */ 703 static class TestOptions { 704 String cmdName = null; 705 boolean nomapred = false; 706 boolean filterAll = false; 707 int startRow = 0; 708 float size = 1.0f; 709 int perClientRunRows = DEFAULT_ROWS_PER_GB; 710 int numClientThreads = 1; 711 int totalRows = DEFAULT_ROWS_PER_GB; 712 int measureAfter = 0; 713 float sampleRate = 1.0f; 714 /** 715 * @deprecated Useless after switching to OpenTelemetry 716 */ 717 @Deprecated 718 double traceRate = 0.0; 719 String tableName = TABLE_NAME; 720 boolean flushCommits = true; 721 boolean writeToWAL = true; 722 boolean autoFlush = false; 723 boolean oneCon = false; 724 int connCount = -1; // wil decide the actual num later 725 boolean useTags = false; 726 int noOfTags = 1; 727 boolean reportLatency = false; 728 int multiGet = 0; 729 int multiPut = 0; 730 int randomSleep = 0; 731 boolean inMemoryCF = false; 732 int presplitRegions = 0; 733 int replicas = TableDescriptorBuilder.DEFAULT_REGION_REPLICATION; 734 String splitPolicy = null; 735 Compression.Algorithm compression = Compression.Algorithm.NONE; 736 String encryption = null; 737 BloomType bloomType = BloomType.ROW; 738 int blockSize = HConstants.DEFAULT_BLOCKSIZE; 739 DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; 740 boolean valueRandom = false; 741 boolean valueZipf = false; 742 int valueSize = DEFAULT_VALUE_LENGTH; 743 int period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10; 744 int cycles = 1; 745 int columns = 1; 746 int families = 1; 747 int caching = 30; 748 int latencyThreshold = 0; // in millsecond 749 boolean addColumns = true; 750 MemoryCompactionPolicy inMemoryCompaction = 751 MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT); 752 boolean asyncPrefetch = false; 753 boolean cacheBlocks = true; 754 Scan.ReadType scanReadType = Scan.ReadType.DEFAULT; 755 long bufferSize = 2l * 1024l * 1024l; 756 Properties commandProperties; 757 758 public TestOptions() { 759 } 760 761 /** 762 * Clone constructor. 763 * @param that Object to copy from. 764 */ 765 public TestOptions(TestOptions that) { 766 this.cmdName = that.cmdName; 767 this.cycles = that.cycles; 768 this.nomapred = that.nomapred; 769 this.startRow = that.startRow; 770 this.size = that.size; 771 this.perClientRunRows = that.perClientRunRows; 772 this.numClientThreads = that.numClientThreads; 773 this.totalRows = that.totalRows; 774 this.sampleRate = that.sampleRate; 775 this.traceRate = that.traceRate; 776 this.tableName = that.tableName; 777 this.flushCommits = that.flushCommits; 778 this.writeToWAL = that.writeToWAL; 779 this.autoFlush = that.autoFlush; 780 this.oneCon = that.oneCon; 781 this.connCount = that.connCount; 782 this.useTags = that.useTags; 783 this.noOfTags = that.noOfTags; 784 this.reportLatency = that.reportLatency; 785 this.latencyThreshold = that.latencyThreshold; 786 this.multiGet = that.multiGet; 787 this.multiPut = that.multiPut; 788 this.inMemoryCF = that.inMemoryCF; 789 this.presplitRegions = that.presplitRegions; 790 this.replicas = that.replicas; 791 this.splitPolicy = that.splitPolicy; 792 this.compression = that.compression; 793 this.encryption = that.encryption; 794 this.blockEncoding = that.blockEncoding; 795 this.filterAll = that.filterAll; 796 this.bloomType = that.bloomType; 797 this.blockSize = that.blockSize; 798 this.valueRandom = that.valueRandom; 799 this.valueZipf = that.valueZipf; 800 this.valueSize = that.valueSize; 801 this.period = that.period; 802 this.randomSleep = that.randomSleep; 803 this.measureAfter = that.measureAfter; 804 this.addColumns = that.addColumns; 805 this.columns = that.columns; 806 this.families = that.families; 807 this.caching = that.caching; 808 this.inMemoryCompaction = that.inMemoryCompaction; 809 this.asyncPrefetch = that.asyncPrefetch; 810 this.cacheBlocks = that.cacheBlocks; 811 this.scanReadType = that.scanReadType; 812 this.bufferSize = that.bufferSize; 813 this.commandProperties = that.commandProperties; 814 } 815 816 public Properties getCommandProperties() { 817 return commandProperties; 818 } 819 820 public int getCaching() { 821 return this.caching; 822 } 823 824 public void setCaching(final int caching) { 825 this.caching = caching; 826 } 827 828 public int getColumns() { 829 return this.columns; 830 } 831 832 public void setColumns(final int columns) { 833 this.columns = columns; 834 } 835 836 public int getFamilies() { 837 return this.families; 838 } 839 840 public void setFamilies(final int families) { 841 this.families = families; 842 } 843 844 public int getCycles() { 845 return this.cycles; 846 } 847 848 public void setCycles(final int cycles) { 849 this.cycles = cycles; 850 } 851 852 public boolean isValueZipf() { 853 return valueZipf; 854 } 855 856 public void setValueZipf(boolean valueZipf) { 857 this.valueZipf = valueZipf; 858 } 859 860 public String getCmdName() { 861 return cmdName; 862 } 863 864 public void setCmdName(String cmdName) { 865 this.cmdName = cmdName; 866 } 867 868 public int getRandomSleep() { 869 return randomSleep; 870 } 871 872 public void setRandomSleep(int randomSleep) { 873 this.randomSleep = randomSleep; 874 } 875 876 public int getReplicas() { 877 return replicas; 878 } 879 880 public void setReplicas(int replicas) { 881 this.replicas = replicas; 882 } 883 884 public String getSplitPolicy() { 885 return splitPolicy; 886 } 887 888 public void setSplitPolicy(String splitPolicy) { 889 this.splitPolicy = splitPolicy; 890 } 891 892 public void setNomapred(boolean nomapred) { 893 this.nomapred = nomapred; 894 } 895 896 public void setFilterAll(boolean filterAll) { 897 this.filterAll = filterAll; 898 } 899 900 public void setStartRow(int startRow) { 901 this.startRow = startRow; 902 } 903 904 public void setSize(float size) { 905 this.size = size; 906 } 907 908 public void setPerClientRunRows(int perClientRunRows) { 909 this.perClientRunRows = perClientRunRows; 910 } 911 912 public void setNumClientThreads(int numClientThreads) { 913 this.numClientThreads = numClientThreads; 914 } 915 916 public void setTotalRows(int totalRows) { 917 this.totalRows = totalRows; 918 } 919 920 public void setSampleRate(float sampleRate) { 921 this.sampleRate = sampleRate; 922 } 923 924 public void setTraceRate(double traceRate) { 925 this.traceRate = traceRate; 926 } 927 928 public void setTableName(String tableName) { 929 this.tableName = tableName; 930 } 931 932 public void setFlushCommits(boolean flushCommits) { 933 this.flushCommits = flushCommits; 934 } 935 936 public void setWriteToWAL(boolean writeToWAL) { 937 this.writeToWAL = writeToWAL; 938 } 939 940 public void setAutoFlush(boolean autoFlush) { 941 this.autoFlush = autoFlush; 942 } 943 944 public void setOneCon(boolean oneCon) { 945 this.oneCon = oneCon; 946 } 947 948 public int getConnCount() { 949 return connCount; 950 } 951 952 public void setConnCount(int connCount) { 953 this.connCount = connCount; 954 } 955 956 public void setUseTags(boolean useTags) { 957 this.useTags = useTags; 958 } 959 960 public void setNoOfTags(int noOfTags) { 961 this.noOfTags = noOfTags; 962 } 963 964 public void setReportLatency(boolean reportLatency) { 965 this.reportLatency = reportLatency; 966 } 967 968 public void setMultiGet(int multiGet) { 969 this.multiGet = multiGet; 970 } 971 972 public void setMultiPut(int multiPut) { 973 this.multiPut = multiPut; 974 } 975 976 public void setInMemoryCF(boolean inMemoryCF) { 977 this.inMemoryCF = inMemoryCF; 978 } 979 980 public void setPresplitRegions(int presplitRegions) { 981 this.presplitRegions = presplitRegions; 982 } 983 984 public void setCompression(Compression.Algorithm compression) { 985 this.compression = compression; 986 } 987 988 public void setEncryption(String encryption) { 989 this.encryption = encryption; 990 } 991 992 public void setBloomType(BloomType bloomType) { 993 this.bloomType = bloomType; 994 } 995 996 public void setBlockSize(int blockSize) { 997 this.blockSize = blockSize; 998 } 999 1000 public void setBlockEncoding(DataBlockEncoding blockEncoding) { 1001 this.blockEncoding = blockEncoding; 1002 } 1003 1004 public void setValueRandom(boolean valueRandom) { 1005 this.valueRandom = valueRandom; 1006 } 1007 1008 public void setValueSize(int valueSize) { 1009 this.valueSize = valueSize; 1010 } 1011 1012 public void setBufferSize(long bufferSize) { 1013 this.bufferSize = bufferSize; 1014 } 1015 1016 public void setPeriod(int period) { 1017 this.period = period; 1018 } 1019 1020 public boolean isNomapred() { 1021 return nomapred; 1022 } 1023 1024 public boolean isFilterAll() { 1025 return filterAll; 1026 } 1027 1028 public int getStartRow() { 1029 return startRow; 1030 } 1031 1032 public float getSize() { 1033 return size; 1034 } 1035 1036 public int getPerClientRunRows() { 1037 return perClientRunRows; 1038 } 1039 1040 public int getNumClientThreads() { 1041 return numClientThreads; 1042 } 1043 1044 public int getTotalRows() { 1045 return totalRows; 1046 } 1047 1048 public float getSampleRate() { 1049 return sampleRate; 1050 } 1051 1052 public double getTraceRate() { 1053 return traceRate; 1054 } 1055 1056 public String getTableName() { 1057 return tableName; 1058 } 1059 1060 public boolean isFlushCommits() { 1061 return flushCommits; 1062 } 1063 1064 public boolean isWriteToWAL() { 1065 return writeToWAL; 1066 } 1067 1068 public boolean isAutoFlush() { 1069 return autoFlush; 1070 } 1071 1072 public boolean isUseTags() { 1073 return useTags; 1074 } 1075 1076 public int getNoOfTags() { 1077 return noOfTags; 1078 } 1079 1080 public boolean isReportLatency() { 1081 return reportLatency; 1082 } 1083 1084 public int getMultiGet() { 1085 return multiGet; 1086 } 1087 1088 public int getMultiPut() { 1089 return multiPut; 1090 } 1091 1092 public boolean isInMemoryCF() { 1093 return inMemoryCF; 1094 } 1095 1096 public int getPresplitRegions() { 1097 return presplitRegions; 1098 } 1099 1100 public Compression.Algorithm getCompression() { 1101 return compression; 1102 } 1103 1104 public String getEncryption() { 1105 return encryption; 1106 } 1107 1108 public DataBlockEncoding getBlockEncoding() { 1109 return blockEncoding; 1110 } 1111 1112 public boolean isValueRandom() { 1113 return valueRandom; 1114 } 1115 1116 public int getValueSize() { 1117 return valueSize; 1118 } 1119 1120 public int getPeriod() { 1121 return period; 1122 } 1123 1124 public BloomType getBloomType() { 1125 return bloomType; 1126 } 1127 1128 public int getBlockSize() { 1129 return blockSize; 1130 } 1131 1132 public boolean isOneCon() { 1133 return oneCon; 1134 } 1135 1136 public int getMeasureAfter() { 1137 return measureAfter; 1138 } 1139 1140 public void setMeasureAfter(int measureAfter) { 1141 this.measureAfter = measureAfter; 1142 } 1143 1144 public boolean getAddColumns() { 1145 return addColumns; 1146 } 1147 1148 public void setAddColumns(boolean addColumns) { 1149 this.addColumns = addColumns; 1150 } 1151 1152 public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) { 1153 this.inMemoryCompaction = inMemoryCompaction; 1154 } 1155 1156 public MemoryCompactionPolicy getInMemoryCompaction() { 1157 return this.inMemoryCompaction; 1158 } 1159 1160 public long getBufferSize() { 1161 return this.bufferSize; 1162 } 1163 } 1164 1165 /* 1166 * A test. Subclass to particularize what happens per row. 1167 */ 1168 static abstract class TestBase { 1169 // Below is make it so when Tests are all running in the one 1170 // jvm, that they each have a differently seeded Random. 1171 private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime()); 1172 1173 private static long nextRandomSeed() { 1174 return randomSeed.nextLong(); 1175 } 1176 1177 private final int everyN; 1178 1179 protected final Random rand = new Random(nextRandomSeed()); 1180 protected final Configuration conf; 1181 protected final TestOptions opts; 1182 1183 protected final Status status; 1184 1185 private String testName; 1186 protected Histogram latencyHistogram; 1187 private Histogram replicaLatencyHistogram; 1188 private Histogram valueSizeHistogram; 1189 private Histogram rpcCallsHistogram; 1190 private Histogram remoteRpcCallsHistogram; 1191 private Histogram millisBetweenNextHistogram; 1192 private Histogram regionsScannedHistogram; 1193 private Histogram bytesInResultsHistogram; 1194 private Histogram bytesInRemoteResultsHistogram; 1195 private RandomDistribution.Zipf zipf; 1196 private long numOfReplyOverLatencyThreshold = 0; 1197 private long numOfReplyFromReplica = 0; 1198 1199 /** 1200 * Note that all subclasses of this class must provide a public constructor that has the exact 1201 * same list of arguments. 1202 */ 1203 TestBase(final Configuration conf, final TestOptions options, final Status status) { 1204 this.conf = conf; 1205 this.opts = options; 1206 this.status = status; 1207 this.testName = this.getClass().getSimpleName(); 1208 everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate)); 1209 if (options.isValueZipf()) { 1210 this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2); 1211 } 1212 LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows."); 1213 } 1214 1215 int getValueLength(final Random r) { 1216 if (this.opts.isValueRandom()) { 1217 return r.nextInt(opts.valueSize); 1218 } else if (this.opts.isValueZipf()) { 1219 return Math.abs(this.zipf.nextInt()); 1220 } else { 1221 return opts.valueSize; 1222 } 1223 } 1224 1225 void updateValueSize(final Result[] rs) throws IOException { 1226 updateValueSize(rs, 0); 1227 } 1228 1229 void updateValueSize(final Result[] rs, final long latency) throws IOException { 1230 if (rs == null || (latency == 0)) return; 1231 for (Result r : rs) 1232 updateValueSize(r, latency); 1233 } 1234 1235 void updateValueSize(final Result r) throws IOException { 1236 updateValueSize(r, 0); 1237 } 1238 1239 void updateValueSize(final Result r, final long latency) throws IOException { 1240 if (r == null || (latency == 0)) return; 1241 int size = 0; 1242 // update replicaHistogram 1243 if (r.isStale()) { 1244 replicaLatencyHistogram.update(latency / 1000); 1245 numOfReplyFromReplica++; 1246 } 1247 if (!isRandomValueSize()) return; 1248 1249 for (CellScanner scanner = r.cellScanner(); scanner.advance();) { 1250 size += scanner.current().getValueLength(); 1251 } 1252 updateValueSize(size); 1253 } 1254 1255 void updateValueSize(final int valueSize) { 1256 if (!isRandomValueSize()) return; 1257 this.valueSizeHistogram.update(valueSize); 1258 } 1259 1260 void updateScanMetrics(final ScanMetrics metrics) { 1261 if (metrics == null) return; 1262 Map<String, Long> metricsMap = metrics.getMetricsMap(); 1263 Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); 1264 if (rpcCalls != null) { 1265 this.rpcCallsHistogram.update(rpcCalls.longValue()); 1266 } 1267 Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME); 1268 if (remoteRpcCalls != null) { 1269 this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue()); 1270 } 1271 Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME); 1272 if (millisBetweenNext != null) { 1273 this.millisBetweenNextHistogram.update(millisBetweenNext.longValue()); 1274 } 1275 Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); 1276 if (regionsScanned != null) { 1277 this.regionsScannedHistogram.update(regionsScanned.longValue()); 1278 } 1279 Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME); 1280 if (bytesInResults != null && bytesInResults.longValue() > 0) { 1281 this.bytesInResultsHistogram.update(bytesInResults.longValue()); 1282 } 1283 Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME); 1284 if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) { 1285 this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue()); 1286 } 1287 } 1288 1289 String generateStatus(final int sr, final int i, final int lr) { 1290 return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency [" 1291 + getShortLatencyReport() + "]" 1292 + (!isRandomValueSize() ? "" : ", value size [" + getShortValueSizeReport() + "]"); 1293 } 1294 1295 boolean isRandomValueSize() { 1296 return opts.valueRandom; 1297 } 1298 1299 protected int getReportingPeriod() { 1300 return opts.period; 1301 } 1302 1303 /** 1304 * Populated by testTakedown. Only implemented by RandomReadTest at the moment. 1305 */ 1306 public Histogram getLatencyHistogram() { 1307 return latencyHistogram; 1308 } 1309 1310 void testSetup() throws IOException { 1311 // test metrics 1312 latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1313 // If it is a replica test, set up histogram for replica. 1314 if (opts.replicas > 1) { 1315 replicaLatencyHistogram = 1316 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1317 } 1318 valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1319 // scan metrics 1320 rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1321 remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1322 millisBetweenNextHistogram = 1323 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1324 regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1325 bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1326 bytesInRemoteResultsHistogram = 1327 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1328 1329 onStartup(); 1330 } 1331 1332 abstract void onStartup() throws IOException; 1333 1334 void testTakedown() throws IOException { 1335 onTakedown(); 1336 // Print all stats for this thread continuously. 1337 // Synchronize on Test.class so different threads don't intermingle the 1338 // output. We can't use 'this' here because each thread has its own instance of Test class. 1339 synchronized (Test.class) { 1340 status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName()); 1341 status 1342 .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram)); 1343 if (opts.replicas > 1) { 1344 status.setStatus("Latency (us) from Replica Regions: " 1345 + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram)); 1346 } 1347 status.setStatus("Num measures (latency) : " + latencyHistogram.getCount()); 1348 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram)); 1349 if (valueSizeHistogram.getCount() > 0) { 1350 status.setStatus( 1351 "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram)); 1352 status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount()); 1353 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram)); 1354 } else { 1355 status.setStatus("No valueSize statistics available"); 1356 } 1357 if (rpcCallsHistogram.getCount() > 0) { 1358 status.setStatus( 1359 "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram)); 1360 } 1361 if (remoteRpcCallsHistogram.getCount() > 0) { 1362 status.setStatus("remoteRpcCalls (count): " 1363 + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram)); 1364 } 1365 if (millisBetweenNextHistogram.getCount() > 0) { 1366 status.setStatus("millisBetweenNext (latency): " 1367 + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram)); 1368 } 1369 if (regionsScannedHistogram.getCount() > 0) { 1370 status.setStatus("regionsScanned (count): " 1371 + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram)); 1372 } 1373 if (bytesInResultsHistogram.getCount() > 0) { 1374 status.setStatus("bytesInResults (size): " 1375 + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram)); 1376 } 1377 if (bytesInRemoteResultsHistogram.getCount() > 0) { 1378 status.setStatus("bytesInRemoteResults (size): " 1379 + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram)); 1380 } 1381 } 1382 } 1383 1384 abstract void onTakedown() throws IOException; 1385 1386 /* 1387 * Run test 1388 * @return Elapsed time. 1389 */ 1390 long test() throws IOException, InterruptedException { 1391 testSetup(); 1392 LOG.info("Timed test starting in thread " + Thread.currentThread().getName()); 1393 final long startTime = System.nanoTime(); 1394 try { 1395 testTimed(); 1396 } finally { 1397 testTakedown(); 1398 } 1399 return (System.nanoTime() - startTime) / 1000000; 1400 } 1401 1402 int getStartRow() { 1403 return opts.startRow; 1404 } 1405 1406 int getLastRow() { 1407 return getStartRow() + opts.perClientRunRows; 1408 } 1409 1410 /** 1411 * Provides an extension point for tests that don't want a per row invocation. 1412 */ 1413 void testTimed() throws IOException, InterruptedException { 1414 int startRow = getStartRow(); 1415 int lastRow = getLastRow(); 1416 // Report on completion of 1/10th of total. 1417 for (int ii = 0; ii < opts.cycles; ii++) { 1418 if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles); 1419 for (int i = startRow; i < lastRow; i++) { 1420 if (i % everyN != 0) continue; 1421 long startTime = System.nanoTime(); 1422 boolean requestSent = false; 1423 Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan(); 1424 try (Scope scope = span.makeCurrent()) { 1425 requestSent = testRow(i, startTime); 1426 } finally { 1427 span.end(); 1428 } 1429 if ((i - startRow) > opts.measureAfter) { 1430 // If multiget or multiput is enabled, say set to 10, testRow() returns immediately 1431 // first 9 times and sends the actual get request in the 10th iteration. 1432 // We should only set latency when actual request is sent because otherwise 1433 // it turns out to be 0. 1434 if (requestSent) { 1435 long latency = (System.nanoTime() - startTime) / 1000; 1436 latencyHistogram.update(latency); 1437 if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) { 1438 numOfReplyOverLatencyThreshold++; 1439 } 1440 } 1441 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { 1442 status.setStatus(generateStatus(startRow, i, lastRow)); 1443 } 1444 } 1445 } 1446 } 1447 } 1448 1449 /** Returns Subset of the histograms' calculation. */ 1450 public String getShortLatencyReport() { 1451 return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram); 1452 } 1453 1454 /** Returns Subset of the histograms' calculation. */ 1455 public String getShortValueSizeReport() { 1456 return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram); 1457 } 1458 1459 /** 1460 * Test for individual row. 1461 * @param i Row index. 1462 * @return true if the row was sent to server and need to record metrics. False if not, multiGet 1463 * and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered. 1464 */ 1465 abstract boolean testRow(final int i, final long startTime) 1466 throws IOException, InterruptedException; 1467 } 1468 1469 static abstract class Test extends TestBase { 1470 protected Connection connection; 1471 1472 Test(final Connection con, final TestOptions options, final Status status) { 1473 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); 1474 this.connection = con; 1475 } 1476 } 1477 1478 static abstract class AsyncTest extends TestBase { 1479 protected AsyncConnection connection; 1480 1481 AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) { 1482 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); 1483 this.connection = con; 1484 } 1485 } 1486 1487 static abstract class TableTest extends Test { 1488 protected Table table; 1489 1490 TableTest(Connection con, TestOptions options, Status status) { 1491 super(con, options, status); 1492 } 1493 1494 @Override 1495 void onStartup() throws IOException { 1496 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1497 } 1498 1499 @Override 1500 void onTakedown() throws IOException { 1501 table.close(); 1502 } 1503 } 1504 1505 /* 1506 * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest 1507 */ 1508 static abstract class MetaTest extends TableTest { 1509 protected int keyLength; 1510 1511 MetaTest(Connection con, TestOptions options, Status status) { 1512 super(con, options, status); 1513 keyLength = Integer.toString(opts.perClientRunRows).length(); 1514 } 1515 1516 @Override 1517 void onTakedown() throws IOException { 1518 // No clean up 1519 } 1520 1521 /* 1522 * Generates Lexicographically ascending strings 1523 */ 1524 protected byte[] getSplitKey(final int i) { 1525 return Bytes.toBytes(String.format("%0" + keyLength + "d", i)); 1526 } 1527 1528 } 1529 1530 static abstract class AsyncTableTest extends AsyncTest { 1531 protected AsyncTable<?> table; 1532 1533 AsyncTableTest(AsyncConnection con, TestOptions options, Status status) { 1534 super(con, options, status); 1535 } 1536 1537 @Override 1538 void onStartup() throws IOException { 1539 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1540 } 1541 1542 @Override 1543 void onTakedown() throws IOException { 1544 } 1545 } 1546 1547 static class AsyncRandomReadTest extends AsyncTableTest { 1548 private final Consistency consistency; 1549 private ArrayList<Get> gets; 1550 1551 AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) { 1552 super(con, options, status); 1553 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1554 if (opts.multiGet > 0) { 1555 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1556 this.gets = new ArrayList<>(opts.multiGet); 1557 } 1558 } 1559 1560 @Override 1561 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1562 if (opts.randomSleep > 0) { 1563 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 1564 } 1565 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1566 for (int family = 0; family < opts.families; family++) { 1567 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1568 if (opts.addColumns) { 1569 for (int column = 0; column < opts.columns; column++) { 1570 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1571 get.addColumn(familyName, qualifier); 1572 } 1573 } else { 1574 get.addFamily(familyName); 1575 } 1576 } 1577 if (opts.filterAll) { 1578 get.setFilter(new FilterAllFilter()); 1579 } 1580 get.setConsistency(consistency); 1581 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1582 try { 1583 if (opts.multiGet > 0) { 1584 this.gets.add(get); 1585 if (this.gets.size() == opts.multiGet) { 1586 Result[] rs = 1587 this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new); 1588 updateValueSize(rs); 1589 this.gets.clear(); 1590 } else { 1591 return false; 1592 } 1593 } else { 1594 updateValueSize(this.table.get(get).get()); 1595 } 1596 } catch (ExecutionException e) { 1597 throw new IOException(e); 1598 } 1599 return true; 1600 } 1601 1602 public static RuntimeException runtime(Throwable e) { 1603 if (e instanceof RuntimeException) { 1604 return (RuntimeException) e; 1605 } 1606 return new RuntimeException(e); 1607 } 1608 1609 public static <V> V propagate(Callable<V> callable) { 1610 try { 1611 return callable.call(); 1612 } catch (Exception e) { 1613 throw runtime(e); 1614 } 1615 } 1616 1617 @Override 1618 protected int getReportingPeriod() { 1619 int period = opts.perClientRunRows / 10; 1620 return period == 0 ? opts.perClientRunRows : period; 1621 } 1622 1623 @Override 1624 protected void testTakedown() throws IOException { 1625 if (this.gets != null && this.gets.size() > 0) { 1626 this.table.get(gets); 1627 this.gets.clear(); 1628 } 1629 super.testTakedown(); 1630 } 1631 } 1632 1633 static class AsyncRandomWriteTest extends AsyncSequentialWriteTest { 1634 1635 AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) { 1636 super(con, options, status); 1637 } 1638 1639 @Override 1640 protected byte[] generateRow(final int i) { 1641 return getRandomRow(this.rand, opts.totalRows); 1642 } 1643 } 1644 1645 static class AsyncScanTest extends AsyncTableTest { 1646 private ResultScanner testScanner; 1647 private AsyncTable<?> asyncTable; 1648 1649 AsyncScanTest(AsyncConnection con, TestOptions options, Status status) { 1650 super(con, options, status); 1651 } 1652 1653 @Override 1654 void onStartup() throws IOException { 1655 this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName), 1656 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); 1657 } 1658 1659 @Override 1660 void testTakedown() throws IOException { 1661 if (this.testScanner != null) { 1662 updateScanMetrics(this.testScanner.getScanMetrics()); 1663 this.testScanner.close(); 1664 } 1665 super.testTakedown(); 1666 } 1667 1668 @Override 1669 boolean testRow(final int i, final long startTime) throws IOException { 1670 if (this.testScanner == null) { 1671 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 1672 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1673 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1674 for (int family = 0; family < opts.families; family++) { 1675 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1676 if (opts.addColumns) { 1677 for (int column = 0; column < opts.columns; column++) { 1678 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1679 scan.addColumn(familyName, qualifier); 1680 } 1681 } else { 1682 scan.addFamily(familyName); 1683 } 1684 } 1685 if (opts.filterAll) { 1686 scan.setFilter(new FilterAllFilter()); 1687 } 1688 this.testScanner = asyncTable.getScanner(scan); 1689 } 1690 Result r = testScanner.next(); 1691 updateValueSize(r); 1692 return true; 1693 } 1694 } 1695 1696 static class AsyncSequentialReadTest extends AsyncTableTest { 1697 AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) { 1698 super(con, options, status); 1699 } 1700 1701 @Override 1702 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1703 Get get = new Get(format(i)); 1704 for (int family = 0; family < opts.families; family++) { 1705 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1706 if (opts.addColumns) { 1707 for (int column = 0; column < opts.columns; column++) { 1708 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1709 get.addColumn(familyName, qualifier); 1710 } 1711 } else { 1712 get.addFamily(familyName); 1713 } 1714 } 1715 if (opts.filterAll) { 1716 get.setFilter(new FilterAllFilter()); 1717 } 1718 try { 1719 updateValueSize(table.get(get).get()); 1720 } catch (ExecutionException e) { 1721 throw new IOException(e); 1722 } 1723 return true; 1724 } 1725 } 1726 1727 static class AsyncSequentialWriteTest extends AsyncTableTest { 1728 private ArrayList<Put> puts; 1729 1730 AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) { 1731 super(con, options, status); 1732 if (opts.multiPut > 0) { 1733 LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + "."); 1734 this.puts = new ArrayList<>(opts.multiPut); 1735 } 1736 } 1737 1738 protected byte[] generateRow(final int i) { 1739 return format(i); 1740 } 1741 1742 @Override 1743 @SuppressWarnings("ReturnValueIgnored") 1744 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1745 byte[] row = generateRow(i); 1746 Put put = new Put(row); 1747 for (int family = 0; family < opts.families; family++) { 1748 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1749 for (int column = 0; column < opts.columns; column++) { 1750 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1751 byte[] value = generateData(this.rand, getValueLength(this.rand)); 1752 if (opts.useTags) { 1753 byte[] tag = generateData(this.rand, TAG_LENGTH); 1754 Tag[] tags = new Tag[opts.noOfTags]; 1755 for (int n = 0; n < opts.noOfTags; n++) { 1756 Tag t = new ArrayBackedTag((byte) n, tag); 1757 tags[n] = t; 1758 } 1759 KeyValue kv = 1760 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 1761 put.add(kv); 1762 updateValueSize(kv.getValueLength()); 1763 } else { 1764 put.addColumn(familyName, qualifier, value); 1765 updateValueSize(value.length); 1766 } 1767 } 1768 } 1769 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1770 try { 1771 table.put(put).get(); 1772 if (opts.multiPut > 0) { 1773 this.puts.add(put); 1774 if (this.puts.size() == opts.multiPut) { 1775 this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get)); 1776 this.puts.clear(); 1777 } else { 1778 return false; 1779 } 1780 } else { 1781 table.put(put).get(); 1782 } 1783 } catch (ExecutionException e) { 1784 throw new IOException(e); 1785 } 1786 return true; 1787 } 1788 } 1789 1790 static abstract class BufferedMutatorTest extends Test { 1791 protected BufferedMutator mutator; 1792 protected Table table; 1793 1794 BufferedMutatorTest(Connection con, TestOptions options, Status status) { 1795 super(con, options, status); 1796 } 1797 1798 @Override 1799 void onStartup() throws IOException { 1800 BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName)); 1801 p.writeBufferSize(opts.bufferSize); 1802 this.mutator = connection.getBufferedMutator(p); 1803 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1804 } 1805 1806 @Override 1807 void onTakedown() throws IOException { 1808 mutator.close(); 1809 table.close(); 1810 } 1811 } 1812 1813 static class RandomSeekScanTest extends TableTest { 1814 RandomSeekScanTest(Connection con, TestOptions options, Status status) { 1815 super(con, options, status); 1816 } 1817 1818 @Override 1819 boolean testRow(final int i, final long startTime) throws IOException { 1820 Scan scan = 1821 new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)).setCaching(opts.caching) 1822 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1823 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1824 FilterList list = new FilterList(); 1825 for (int family = 0; family < opts.families; family++) { 1826 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1827 if (opts.addColumns) { 1828 for (int column = 0; column < opts.columns; column++) { 1829 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1830 scan.addColumn(familyName, qualifier); 1831 } 1832 } else { 1833 scan.addFamily(familyName); 1834 } 1835 } 1836 if (opts.filterAll) { 1837 list.addFilter(new FilterAllFilter()); 1838 } 1839 list.addFilter(new WhileMatchFilter(new PageFilter(120))); 1840 scan.setFilter(list); 1841 ResultScanner s = this.table.getScanner(scan); 1842 try { 1843 for (Result rr; (rr = s.next()) != null;) { 1844 updateValueSize(rr); 1845 } 1846 } finally { 1847 updateScanMetrics(s.getScanMetrics()); 1848 s.close(); 1849 } 1850 return true; 1851 } 1852 1853 @Override 1854 protected int getReportingPeriod() { 1855 int period = opts.perClientRunRows / 100; 1856 return period == 0 ? opts.perClientRunRows : period; 1857 } 1858 1859 } 1860 1861 static abstract class RandomScanWithRangeTest extends TableTest { 1862 RandomScanWithRangeTest(Connection con, TestOptions options, Status status) { 1863 super(con, options, status); 1864 } 1865 1866 @Override 1867 boolean testRow(final int i, final long startTime) throws IOException { 1868 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); 1869 Scan scan = new Scan().withStartRow(startAndStopRow.getFirst()) 1870 .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching) 1871 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1872 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1873 for (int family = 0; family < opts.families; family++) { 1874 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1875 if (opts.addColumns) { 1876 for (int column = 0; column < opts.columns; column++) { 1877 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1878 scan.addColumn(familyName, qualifier); 1879 } 1880 } else { 1881 scan.addFamily(familyName); 1882 } 1883 } 1884 if (opts.filterAll) { 1885 scan.setFilter(new FilterAllFilter()); 1886 } 1887 Result r = null; 1888 int count = 0; 1889 ResultScanner s = this.table.getScanner(scan); 1890 try { 1891 for (; (r = s.next()) != null;) { 1892 updateValueSize(r); 1893 count++; 1894 } 1895 if (i % 100 == 0) { 1896 LOG.info(String.format("Scan for key range %s - %s returned %s rows", 1897 Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()), 1898 count)); 1899 } 1900 } finally { 1901 updateScanMetrics(s.getScanMetrics()); 1902 s.close(); 1903 } 1904 return true; 1905 } 1906 1907 protected abstract Pair<byte[], byte[]> getStartAndStopRow(); 1908 1909 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) { 1910 int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows; 1911 int stop = start + maxRange; 1912 return new Pair<>(format(start), format(stop)); 1913 } 1914 1915 @Override 1916 protected int getReportingPeriod() { 1917 int period = opts.perClientRunRows / 100; 1918 return period == 0 ? opts.perClientRunRows : period; 1919 } 1920 } 1921 1922 static class RandomScanWithRange10Test extends RandomScanWithRangeTest { 1923 RandomScanWithRange10Test(Connection con, TestOptions options, Status status) { 1924 super(con, options, status); 1925 } 1926 1927 @Override 1928 protected Pair<byte[], byte[]> getStartAndStopRow() { 1929 return generateStartAndStopRows(10); 1930 } 1931 } 1932 1933 static class RandomScanWithRange100Test extends RandomScanWithRangeTest { 1934 RandomScanWithRange100Test(Connection con, TestOptions options, Status status) { 1935 super(con, options, status); 1936 } 1937 1938 @Override 1939 protected Pair<byte[], byte[]> getStartAndStopRow() { 1940 return generateStartAndStopRows(100); 1941 } 1942 } 1943 1944 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest { 1945 RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) { 1946 super(con, options, status); 1947 } 1948 1949 @Override 1950 protected Pair<byte[], byte[]> getStartAndStopRow() { 1951 return generateStartAndStopRows(1000); 1952 } 1953 } 1954 1955 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest { 1956 RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) { 1957 super(con, options, status); 1958 } 1959 1960 @Override 1961 protected Pair<byte[], byte[]> getStartAndStopRow() { 1962 return generateStartAndStopRows(10000); 1963 } 1964 } 1965 1966 static class RandomReadTest extends TableTest { 1967 private final Consistency consistency; 1968 private ArrayList<Get> gets; 1969 1970 RandomReadTest(Connection con, TestOptions options, Status status) { 1971 super(con, options, status); 1972 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1973 if (opts.multiGet > 0) { 1974 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1975 this.gets = new ArrayList<>(opts.multiGet); 1976 } 1977 } 1978 1979 @Override 1980 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1981 if (opts.randomSleep > 0) { 1982 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 1983 } 1984 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1985 for (int family = 0; family < opts.families; family++) { 1986 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1987 if (opts.addColumns) { 1988 for (int column = 0; column < opts.columns; column++) { 1989 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1990 get.addColumn(familyName, qualifier); 1991 } 1992 } else { 1993 get.addFamily(familyName); 1994 } 1995 } 1996 if (opts.filterAll) { 1997 get.setFilter(new FilterAllFilter()); 1998 } 1999 get.setConsistency(consistency); 2000 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 2001 if (opts.multiGet > 0) { 2002 this.gets.add(get); 2003 if (this.gets.size() == opts.multiGet) { 2004 Result[] rs = this.table.get(this.gets); 2005 if (opts.replicas > 1) { 2006 long latency = System.nanoTime() - startTime; 2007 updateValueSize(rs, latency); 2008 } else { 2009 updateValueSize(rs); 2010 } 2011 this.gets.clear(); 2012 } else { 2013 return false; 2014 } 2015 } else { 2016 if (opts.replicas > 1) { 2017 Result r = this.table.get(get); 2018 long latency = System.nanoTime() - startTime; 2019 updateValueSize(r, latency); 2020 } else { 2021 updateValueSize(this.table.get(get)); 2022 } 2023 } 2024 return true; 2025 } 2026 2027 @Override 2028 protected int getReportingPeriod() { 2029 int period = opts.perClientRunRows / 10; 2030 return period == 0 ? opts.perClientRunRows : period; 2031 } 2032 2033 @Override 2034 protected void testTakedown() throws IOException { 2035 if (this.gets != null && this.gets.size() > 0) { 2036 this.table.get(gets); 2037 this.gets.clear(); 2038 } 2039 super.testTakedown(); 2040 } 2041 } 2042 2043 /* 2044 * Send random reads against fake regions inserted by MetaWriteTest 2045 */ 2046 static class MetaRandomReadTest extends MetaTest { 2047 private RegionLocator regionLocator; 2048 2049 MetaRandomReadTest(Connection con, TestOptions options, Status status) { 2050 super(con, options, status); 2051 LOG.info("call getRegionLocation"); 2052 } 2053 2054 @Override 2055 void onStartup() throws IOException { 2056 super.onStartup(); 2057 this.regionLocator = connection.getRegionLocator(table.getName()); 2058 } 2059 2060 @Override 2061 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 2062 if (opts.randomSleep > 0) { 2063 Thread.sleep(rand.nextInt(opts.randomSleep)); 2064 } 2065 HRegionLocation hRegionLocation = 2066 regionLocator.getRegionLocation(getSplitKey(rand.nextInt(opts.perClientRunRows)), true); 2067 LOG.debug("get location for region: " + hRegionLocation); 2068 return true; 2069 } 2070 2071 @Override 2072 protected int getReportingPeriod() { 2073 int period = opts.perClientRunRows / 10; 2074 return period == 0 ? opts.perClientRunRows : period; 2075 } 2076 2077 @Override 2078 protected void testTakedown() throws IOException { 2079 super.testTakedown(); 2080 } 2081 } 2082 2083 static class RandomWriteTest extends SequentialWriteTest { 2084 RandomWriteTest(Connection con, TestOptions options, Status status) { 2085 super(con, options, status); 2086 } 2087 2088 @Override 2089 protected byte[] generateRow(final int i) { 2090 return getRandomRow(this.rand, opts.totalRows); 2091 } 2092 2093 } 2094 2095 static class RandomDeleteTest extends SequentialDeleteTest { 2096 RandomDeleteTest(Connection con, TestOptions options, Status status) { 2097 super(con, options, status); 2098 } 2099 2100 @Override 2101 protected byte[] generateRow(final int i) { 2102 return getRandomRow(this.rand, opts.totalRows); 2103 } 2104 2105 } 2106 2107 static class ScanTest extends TableTest { 2108 private ResultScanner testScanner; 2109 2110 ScanTest(Connection con, TestOptions options, Status status) { 2111 super(con, options, status); 2112 } 2113 2114 @Override 2115 void testTakedown() throws IOException { 2116 if (this.testScanner != null) { 2117 this.testScanner.close(); 2118 } 2119 super.testTakedown(); 2120 } 2121 2122 @Override 2123 boolean testRow(final int i, final long startTime) throws IOException { 2124 if (this.testScanner == null) { 2125 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 2126 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 2127 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 2128 for (int family = 0; family < opts.families; family++) { 2129 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2130 if (opts.addColumns) { 2131 for (int column = 0; column < opts.columns; column++) { 2132 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2133 scan.addColumn(familyName, qualifier); 2134 } 2135 } else { 2136 scan.addFamily(familyName); 2137 } 2138 } 2139 if (opts.filterAll) { 2140 scan.setFilter(new FilterAllFilter()); 2141 } 2142 this.testScanner = table.getScanner(scan); 2143 } 2144 Result r = testScanner.next(); 2145 updateValueSize(r); 2146 return true; 2147 } 2148 } 2149 2150 static class ReverseScanTest extends TableTest { 2151 private ResultScanner testScanner; 2152 2153 ReverseScanTest(Connection con, TestOptions options, Status status) { 2154 super(con, options, status); 2155 } 2156 2157 @Override 2158 void testTakedown() throws IOException { 2159 if (this.testScanner != null) { 2160 this.testScanner.close(); 2161 } 2162 super.testTakedown(); 2163 } 2164 2165 @Override 2166 boolean testRow(final int i, final long startTime) throws IOException { 2167 if (this.testScanner == null) { 2168 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 2169 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 2170 .setScanMetricsEnabled(true).setReversed(true); 2171 for (int family = 0; family < opts.families; family++) { 2172 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2173 if (opts.addColumns) { 2174 for (int column = 0; column < opts.columns; column++) { 2175 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2176 scan.addColumn(familyName, qualifier); 2177 } 2178 } else { 2179 scan.addFamily(familyName); 2180 } 2181 } 2182 if (opts.filterAll) { 2183 scan.setFilter(new FilterAllFilter()); 2184 } 2185 this.testScanner = table.getScanner(scan); 2186 } 2187 Result r = testScanner.next(); 2188 updateValueSize(r); 2189 return true; 2190 } 2191 } 2192 2193 /** 2194 * Base class for operations that are CAS-like; that read a value and then set it based off what 2195 * they read. In this category is increment, append, checkAndPut, etc. 2196 * <p> 2197 * These operations also want some concurrency going on. Usually when these tests run, they 2198 * operate in their own part of the key range. In CASTest, we will have them all overlap on the 2199 * same key space. We do this with our getStartRow and getLastRow overrides. 2200 */ 2201 static abstract class CASTableTest extends TableTest { 2202 private final byte[] qualifier; 2203 2204 CASTableTest(Connection con, TestOptions options, Status status) { 2205 super(con, options, status); 2206 qualifier = Bytes.toBytes(this.getClass().getSimpleName()); 2207 } 2208 2209 byte[] getQualifier() { 2210 return this.qualifier; 2211 } 2212 2213 @Override 2214 int getStartRow() { 2215 return 0; 2216 } 2217 2218 @Override 2219 int getLastRow() { 2220 return opts.perClientRunRows; 2221 } 2222 } 2223 2224 static class IncrementTest extends CASTableTest { 2225 IncrementTest(Connection con, TestOptions options, Status status) { 2226 super(con, options, status); 2227 } 2228 2229 @Override 2230 boolean testRow(final int i, final long startTime) throws IOException { 2231 Increment increment = new Increment(format(i)); 2232 // unlike checkAndXXX tests, which make most sense to do on a single value, 2233 // if multiple families are specified for an increment test we assume it is 2234 // meant to raise the work factor 2235 for (int family = 0; family < opts.families; family++) { 2236 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2237 increment.addColumn(familyName, getQualifier(), 1l); 2238 } 2239 updateValueSize(this.table.increment(increment)); 2240 return true; 2241 } 2242 } 2243 2244 static class AppendTest extends CASTableTest { 2245 AppendTest(Connection con, TestOptions options, Status status) { 2246 super(con, options, status); 2247 } 2248 2249 @Override 2250 boolean testRow(final int i, final long startTime) throws IOException { 2251 byte[] bytes = format(i); 2252 Append append = new Append(bytes); 2253 // unlike checkAndXXX tests, which make most sense to do on a single value, 2254 // if multiple families are specified for an append test we assume it is 2255 // meant to raise the work factor 2256 for (int family = 0; family < opts.families; family++) { 2257 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2258 append.addColumn(familyName, getQualifier(), bytes); 2259 } 2260 updateValueSize(this.table.append(append)); 2261 return true; 2262 } 2263 } 2264 2265 static class CheckAndMutateTest extends CASTableTest { 2266 CheckAndMutateTest(Connection con, TestOptions options, Status status) { 2267 super(con, options, status); 2268 } 2269 2270 @Override 2271 boolean testRow(final int i, final long startTime) throws IOException { 2272 final byte[] bytes = format(i); 2273 // checkAndXXX tests operate on only a single value 2274 // Put a known value so when we go to check it, it is there. 2275 Put put = new Put(bytes); 2276 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2277 this.table.put(put); 2278 RowMutations mutations = new RowMutations(bytes); 2279 mutations.add(put); 2280 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2281 .thenMutate(mutations); 2282 return true; 2283 } 2284 } 2285 2286 static class CheckAndPutTest extends CASTableTest { 2287 CheckAndPutTest(Connection con, TestOptions options, Status status) { 2288 super(con, options, status); 2289 } 2290 2291 @Override 2292 boolean testRow(final int i, final long startTime) throws IOException { 2293 final byte[] bytes = format(i); 2294 // checkAndXXX tests operate on only a single value 2295 // Put a known value so when we go to check it, it is there. 2296 Put put = new Put(bytes); 2297 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2298 this.table.put(put); 2299 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2300 .thenPut(put); 2301 return true; 2302 } 2303 } 2304 2305 static class CheckAndDeleteTest extends CASTableTest { 2306 CheckAndDeleteTest(Connection con, TestOptions options, Status status) { 2307 super(con, options, status); 2308 } 2309 2310 @Override 2311 boolean testRow(final int i, final long startTime) throws IOException { 2312 final byte[] bytes = format(i); 2313 // checkAndXXX tests operate on only a single value 2314 // Put a known value so when we go to check it, it is there. 2315 Put put = new Put(bytes); 2316 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2317 this.table.put(put); 2318 Delete delete = new Delete(put.getRow()); 2319 delete.addColumn(FAMILY_ZERO, getQualifier()); 2320 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2321 .thenDelete(delete); 2322 return true; 2323 } 2324 } 2325 2326 /* 2327 * Delete all fake regions inserted to meta table by MetaWriteTest. 2328 */ 2329 static class CleanMetaTest extends MetaTest { 2330 CleanMetaTest(Connection con, TestOptions options, Status status) { 2331 super(con, options, status); 2332 } 2333 2334 @Override 2335 boolean testRow(final int i, final long startTime) throws IOException { 2336 try { 2337 RegionInfo regionInfo = connection.getRegionLocator(table.getName()) 2338 .getRegionLocation(getSplitKey(i), false).getRegion(); 2339 LOG.debug("deleting region from meta: " + regionInfo); 2340 2341 Delete delete = 2342 MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP); 2343 try (Table t = MetaTableAccessor.getMetaHTable(connection)) { 2344 t.delete(delete); 2345 } 2346 } catch (IOException ie) { 2347 // Log and continue 2348 LOG.error("cannot find region with start key: " + i); 2349 } 2350 return true; 2351 } 2352 } 2353 2354 static class SequentialReadTest extends TableTest { 2355 SequentialReadTest(Connection con, TestOptions options, Status status) { 2356 super(con, options, status); 2357 } 2358 2359 @Override 2360 boolean testRow(final int i, final long startTime) throws IOException { 2361 Get get = new Get(format(i)); 2362 for (int family = 0; family < opts.families; family++) { 2363 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2364 if (opts.addColumns) { 2365 for (int column = 0; column < opts.columns; column++) { 2366 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2367 get.addColumn(familyName, qualifier); 2368 } 2369 } else { 2370 get.addFamily(familyName); 2371 } 2372 } 2373 if (opts.filterAll) { 2374 get.setFilter(new FilterAllFilter()); 2375 } 2376 updateValueSize(table.get(get)); 2377 return true; 2378 } 2379 } 2380 2381 static class SequentialWriteTest extends BufferedMutatorTest { 2382 private ArrayList<Put> puts; 2383 2384 SequentialWriteTest(Connection con, TestOptions options, Status status) { 2385 super(con, options, status); 2386 if (opts.multiPut > 0) { 2387 LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + "."); 2388 this.puts = new ArrayList<>(opts.multiPut); 2389 } 2390 } 2391 2392 protected byte[] generateRow(final int i) { 2393 return format(i); 2394 } 2395 2396 @Override 2397 boolean testRow(final int i, final long startTime) throws IOException { 2398 byte[] row = generateRow(i); 2399 Put put = new Put(row); 2400 for (int family = 0; family < opts.families; family++) { 2401 byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family); 2402 for (int column = 0; column < opts.columns; column++) { 2403 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2404 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2405 if (opts.useTags) { 2406 byte[] tag = generateData(this.rand, TAG_LENGTH); 2407 Tag[] tags = new Tag[opts.noOfTags]; 2408 for (int n = 0; n < opts.noOfTags; n++) { 2409 Tag t = new ArrayBackedTag((byte) n, tag); 2410 tags[n] = t; 2411 } 2412 KeyValue kv = 2413 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 2414 put.add(kv); 2415 updateValueSize(kv.getValueLength()); 2416 } else { 2417 put.addColumn(familyName, qualifier, value); 2418 updateValueSize(value.length); 2419 } 2420 } 2421 } 2422 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 2423 if (opts.autoFlush) { 2424 if (opts.multiPut > 0) { 2425 this.puts.add(put); 2426 if (this.puts.size() == opts.multiPut) { 2427 table.put(this.puts); 2428 this.puts.clear(); 2429 } else { 2430 return false; 2431 } 2432 } else { 2433 table.put(put); 2434 } 2435 } else { 2436 mutator.mutate(put); 2437 } 2438 return true; 2439 } 2440 } 2441 2442 static class SequentialDeleteTest extends BufferedMutatorTest { 2443 2444 SequentialDeleteTest(Connection con, TestOptions options, Status status) { 2445 super(con, options, status); 2446 } 2447 2448 protected byte[] generateRow(final int i) { 2449 return format(i); 2450 } 2451 2452 @Override 2453 boolean testRow(final int i, final long startTime) throws IOException { 2454 byte[] row = generateRow(i); 2455 Delete delete = new Delete(row); 2456 for (int family = 0; family < opts.families; family++) { 2457 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2458 delete.addFamily(familyName); 2459 } 2460 delete.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 2461 if (opts.autoFlush) { 2462 table.delete(delete); 2463 } else { 2464 mutator.mutate(delete); 2465 } 2466 return true; 2467 } 2468 } 2469 2470 /* 2471 * Insert fake regions into meta table with contiguous split keys. 2472 */ 2473 static class MetaWriteTest extends MetaTest { 2474 2475 MetaWriteTest(Connection con, TestOptions options, Status status) { 2476 super(con, options, status); 2477 } 2478 2479 @Override 2480 boolean testRow(final int i, final long startTime) throws IOException { 2481 List<RegionInfo> regionInfos = new ArrayList<RegionInfo>(); 2482 RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME)) 2483 .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build()); 2484 regionInfos.add(regionInfo); 2485 MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1); 2486 2487 // write the serverName columns 2488 MetaTableAccessor.updateRegionLocation(connection, regionInfo, 2489 ServerName.valueOf("localhost", 60010, rand.nextLong()), i, 2490 EnvironmentEdgeManager.currentTime()); 2491 return true; 2492 } 2493 } 2494 2495 static class FilteredScanTest extends TableTest { 2496 protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName()); 2497 2498 FilteredScanTest(Connection con, TestOptions options, Status status) { 2499 super(con, options, status); 2500 if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) { 2501 LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB 2502 + ". This could take a very long time."); 2503 } 2504 } 2505 2506 @Override 2507 boolean testRow(int i, final long startTime) throws IOException { 2508 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2509 Scan scan = constructScan(value); 2510 ResultScanner scanner = null; 2511 try { 2512 scanner = this.table.getScanner(scan); 2513 for (Result r = null; (r = scanner.next()) != null;) { 2514 updateValueSize(r); 2515 } 2516 } finally { 2517 if (scanner != null) { 2518 updateScanMetrics(scanner.getScanMetrics()); 2519 scanner.close(); 2520 } 2521 } 2522 return true; 2523 } 2524 2525 protected Scan constructScan(byte[] valuePrefix) throws IOException { 2526 FilterList list = new FilterList(); 2527 Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL, 2528 new BinaryComparator(valuePrefix)); 2529 list.addFilter(filter); 2530 if (opts.filterAll) { 2531 list.addFilter(new FilterAllFilter()); 2532 } 2533 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 2534 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 2535 .setScanMetricsEnabled(true); 2536 if (opts.addColumns) { 2537 for (int column = 0; column < opts.columns; column++) { 2538 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2539 scan.addColumn(FAMILY_ZERO, qualifier); 2540 } 2541 } else { 2542 scan.addFamily(FAMILY_ZERO); 2543 } 2544 scan.setFilter(list); 2545 return scan; 2546 } 2547 } 2548 2549 /** 2550 * Compute a throughput rate in MB/s. 2551 * @param rows Number of records consumed. 2552 * @param timeMs Time taken in milliseconds. 2553 * @return String value with label, ie '123.76 MB/s' 2554 */ 2555 private static String calculateMbps(int rows, long timeMs, final int valueSize, int families, 2556 int columns) { 2557 BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH 2558 + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families); 2559 BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT) 2560 .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT); 2561 return FMT.format(mbps) + " MB/s"; 2562 } 2563 2564 /* 2565 * Format passed integer. 2566 * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed number (Does 2567 * absolute in case number is negative). 2568 */ 2569 public static byte[] format(final int number) { 2570 byte[] b = new byte[ROW_LENGTH]; 2571 int d = Math.abs(number); 2572 for (int i = b.length - 1; i >= 0; i--) { 2573 b[i] = (byte) ((d % 10) + '0'); 2574 d /= 10; 2575 } 2576 return b; 2577 } 2578 2579 /* 2580 * This method takes some time and is done inline uploading data. For example, doing the mapfile 2581 * test, generation of the key and value consumes about 30% of CPU time. 2582 * @return Generated random value to insert into a table cell. 2583 */ 2584 public static byte[] generateData(final Random r, int length) { 2585 byte[] b = new byte[length]; 2586 int i; 2587 2588 for (i = 0; i < (length - 8); i += 8) { 2589 b[i] = (byte) (65 + r.nextInt(26)); 2590 b[i + 1] = b[i]; 2591 b[i + 2] = b[i]; 2592 b[i + 3] = b[i]; 2593 b[i + 4] = b[i]; 2594 b[i + 5] = b[i]; 2595 b[i + 6] = b[i]; 2596 b[i + 7] = b[i]; 2597 } 2598 2599 byte a = (byte) (65 + r.nextInt(26)); 2600 for (; i < length; i++) { 2601 b[i] = a; 2602 } 2603 return b; 2604 } 2605 2606 static byte[] getRandomRow(final Random random, final int totalRows) { 2607 return format(generateRandomRow(random, totalRows)); 2608 } 2609 2610 static int generateRandomRow(final Random random, final int totalRows) { 2611 return random.nextInt(Integer.MAX_VALUE) % totalRows; 2612 } 2613 2614 static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf, 2615 Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status) 2616 throws IOException, InterruptedException { 2617 status.setStatus( 2618 "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows"); 2619 long totalElapsedTime; 2620 2621 final TestBase t; 2622 try { 2623 if (AsyncTest.class.isAssignableFrom(cmd)) { 2624 Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd; 2625 Constructor<? extends AsyncTest> constructor = 2626 newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class); 2627 t = constructor.newInstance(asyncCon, opts, status); 2628 } else { 2629 Class<? extends Test> newCmd = (Class<? extends Test>) cmd; 2630 Constructor<? extends Test> constructor = 2631 newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class); 2632 t = constructor.newInstance(con, opts, status); 2633 } 2634 } catch (NoSuchMethodException e) { 2635 throw new IllegalArgumentException("Invalid command class: " + cmd.getName() 2636 + ". It does not provide a constructor as described by " 2637 + "the javadoc comment. Available constructors are: " 2638 + Arrays.toString(cmd.getConstructors())); 2639 } catch (Exception e) { 2640 throw new IllegalStateException("Failed to construct command class", e); 2641 } 2642 totalElapsedTime = t.test(); 2643 2644 status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow 2645 + " for " + opts.perClientRunRows + " rows" + " (" 2646 + calculateMbps((int) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime, 2647 getAverageValueLength(opts), opts.families, opts.columns) 2648 + ")"); 2649 2650 return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold, 2651 t.numOfReplyFromReplica, t.getLatencyHistogram()); 2652 } 2653 2654 private static int getAverageValueLength(final TestOptions opts) { 2655 return opts.valueRandom ? opts.valueSize / 2 : opts.valueSize; 2656 } 2657 2658 private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) 2659 throws IOException, InterruptedException, ClassNotFoundException, ExecutionException { 2660 // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do 2661 // the TestOptions introspection for us and dump the output in a readable format. 2662 LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts)); 2663 Admin admin = null; 2664 Connection connection = null; 2665 try { 2666 connection = ConnectionFactory.createConnection(getConf()); 2667 admin = connection.getAdmin(); 2668 checkTable(admin, opts); 2669 } finally { 2670 if (admin != null) admin.close(); 2671 if (connection != null) connection.close(); 2672 } 2673 if (opts.nomapred) { 2674 doLocalClients(opts, getConf()); 2675 } else { 2676 doMapReduce(opts, getConf()); 2677 } 2678 } 2679 2680 protected void printUsage() { 2681 printUsage(PE_COMMAND_SHORTNAME, null); 2682 } 2683 2684 protected static void printUsage(final String message) { 2685 printUsage(PE_COMMAND_SHORTNAME, message); 2686 } 2687 2688 protected static void printUsageAndExit(final String message, final int exitCode) { 2689 printUsage(message); 2690 System.exit(exitCode); 2691 } 2692 2693 protected static void printUsage(final String shortName, final String message) { 2694 if (message != null && message.length() > 0) { 2695 System.err.println(message); 2696 } 2697 System.err.print("Usage: hbase " + shortName); 2698 System.err.println(" <OPTIONS> [-D<property=value>]* <command|class> <nclients>"); 2699 System.err.println(); 2700 System.err.println("General Options:"); 2701 System.err.println( 2702 " nomapred Run multiple clients using threads " + "(rather than use mapreduce)"); 2703 System.err 2704 .println(" oneCon all the threads share the same connection. Default: False"); 2705 System.err.println(" connCount connections all threads share. " 2706 + "For example, if set to 2, then all thread share 2 connection. " 2707 + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, " 2708 + "if not, connCount=thread number"); 2709 2710 System.err.println(" sampleRate Execute test on a sample of total " 2711 + "rows. Only supported by randomRead. Default: 1.0"); 2712 System.err.println(" period Report every 'period' rows: " 2713 + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10); 2714 System.err.println(" cycles How many times to cycle the test. Defaults: 1."); 2715 System.err.println( 2716 " traceRate Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0"); 2717 System.err.println(" latency Set to report operation latencies. Default: False"); 2718 System.err.println(" latencyThreshold Set to report number of operations with latency " 2719 + "over lantencyThreshold, unit in millisecond, default 0"); 2720 System.err.println(" measureAfter Start to measure the latency once 'measureAfter'" 2721 + " rows have been treated. Default: 0"); 2722 System.err 2723 .println(" valueSize Pass value size to use: Default: " + DEFAULT_OPTS.getValueSize()); 2724 System.err.println(" valueRandom Set if we should vary value size between 0 and " 2725 + "'valueSize'; set on read for stats on size: Default: Not set."); 2726 System.err.println(" blockEncoding Block encoding to use. Value should be one of " 2727 + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE"); 2728 System.err.println(); 2729 System.err.println("Table Creation / Write Tests:"); 2730 System.err.println(" table Alternate table name. Default: 'TestTable'"); 2731 System.err.println( 2732 " rows Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows() 2733 + ". In case of randomReads and randomSeekScans this could" 2734 + " be specified along with --size to specify the number of rows to be scanned within" 2735 + " the total range specified by the size."); 2736 System.err.println( 2737 " size Total size in GiB. Mutually exclusive with --rows for writes and scans" 2738 + ". But for randomReads and randomSeekScans when you use size with --rows you could" 2739 + " use size to specify the end range and --rows" 2740 + " specifies the number of rows within that range. " + "Default: 1.0."); 2741 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); 2742 System.err.println(" encryption Encryption type to use (AES, ...). Default: 'NONE'"); 2743 System.err.println( 2744 " flushCommits Used to determine if the test should flush the table. " + "Default: false"); 2745 System.err.println(" valueZipf Set if we should vary value size between 0 and " 2746 + "'valueSize' in zipf form: Default: Not set."); 2747 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); 2748 System.err.println(" autoFlush Set autoFlush on htable. Default: False"); 2749 System.err.println(" multiPut Batch puts together into groups of N. Only supported " 2750 + "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0"); 2751 System.err.println(" presplit Create presplit table. If a table with same name exists," 2752 + " it'll be deleted and recreated (instead of verifying count of its existing regions). " 2753 + "Recommended for accurate perf analysis (see guide). Default: disabled"); 2754 System.err.println( 2755 " usetags Writes tags along with KVs. Use with HFile V3. " + "Default: false"); 2756 System.err.println(" numoftags Specify the no of tags that would be needed. " 2757 + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags); 2758 System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table."); 2759 System.err.println(" columns Columns to write per row. Default: 1"); 2760 System.err 2761 .println(" families Specify number of column families for the table. Default: 1"); 2762 System.err.println(); 2763 System.err.println("Read Tests:"); 2764 System.err.println(" filterAll Helps to filter out all the rows on the server side" 2765 + " there by not returning any thing back to the client. Helps to check the server side" 2766 + " performance. Uses FilterAllFilter internally. "); 2767 System.err.println(" multiGet Batch gets together into groups of N. Only supported " 2768 + "by randomRead. Default: disabled"); 2769 System.err.println(" inmemory Tries to keep the HFiles of the CF " 2770 + "inmemory as far as possible. Not guaranteed that reads are always served " 2771 + "from memory. Default: false"); 2772 System.err 2773 .println(" bloomFilter Bloom filter type, one of " + Arrays.toString(BloomType.values())); 2774 System.err.println(" blockSize Blocksize to use when writing out hfiles. "); 2775 System.err 2776 .println(" inmemoryCompaction Makes the column family to do inmemory flushes/compactions. " 2777 + "Uses the CompactingMemstore"); 2778 System.err.println(" addColumns Adds columns to scans/gets explicitly. Default: true"); 2779 System.err.println(" replicas Enable region replica testing. Defaults: 1."); 2780 System.err.println( 2781 " randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0"); 2782 System.err.println(" caching Scan caching to use. Default: 30"); 2783 System.err.println(" asyncPrefetch Enable asyncPrefetch for scan"); 2784 System.err.println(" cacheBlocks Set the cacheBlocks option for scan. Default: true"); 2785 System.err.println( 2786 " scanReadType Set the readType option for scan, stream/pread/default. Default: default"); 2787 System.err.println(" bufferSize Set the value of client side buffering. Default: 2MB"); 2788 System.err.println(); 2789 System.err.println(" Note: -D properties will be applied to the conf used. "); 2790 System.err.println(" For example: "); 2791 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true"); 2792 System.err.println(" -Dmapreduce.task.timeout=60000"); 2793 System.err.println(); 2794 System.err.println("Command:"); 2795 for (CmdDescriptor command : COMMANDS.values()) { 2796 System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription())); 2797 } 2798 System.err.println(); 2799 System.err.println("Class:"); 2800 System.err.println("To run any custom implementation of PerformanceEvaluation.Test, " 2801 + "provide the classname of the implementaion class in place of " 2802 + "command name and it will be loaded at runtime from classpath.:"); 2803 System.err.println("Please consider to contribute back " 2804 + "this custom test impl into a builtin PE command for the benefit of the community"); 2805 System.err.println(); 2806 System.err.println("Args:"); 2807 System.err.println(" nclients Integer. Required. Total number of clients " 2808 + "(and HRegionServers) running. 1 <= value <= 500"); 2809 System.err.println("Examples:"); 2810 System.err.println(" To run a single client doing the default 1M sequentialWrites:"); 2811 System.err.println(" $ hbase " + shortName + " sequentialWrite 1"); 2812 System.err.println(" To run 10 clients doing increments over ten rows:"); 2813 System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10"); 2814 } 2815 2816 /** 2817 * Parse options passed in via an arguments array. Assumes that array has been split on 2818 * white-space and placed into a {@code Queue}. Any unknown arguments will remain in the queue at 2819 * the conclusion of this method call. It's up to the caller to deal with these unrecognized 2820 * arguments. 2821 */ 2822 static TestOptions parseOpts(Queue<String> args) { 2823 TestOptions opts = new TestOptions(); 2824 2825 String cmd = null; 2826 while ((cmd = args.poll()) != null) { 2827 if (cmd.equals("-h") || cmd.startsWith("--h")) { 2828 // place item back onto queue so that caller knows parsing was incomplete 2829 args.add(cmd); 2830 break; 2831 } 2832 2833 final String nmr = "--nomapred"; 2834 if (cmd.startsWith(nmr)) { 2835 opts.nomapred = true; 2836 continue; 2837 } 2838 2839 final String rows = "--rows="; 2840 if (cmd.startsWith(rows)) { 2841 opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length())); 2842 continue; 2843 } 2844 2845 final String cycles = "--cycles="; 2846 if (cmd.startsWith(cycles)) { 2847 opts.cycles = Integer.parseInt(cmd.substring(cycles.length())); 2848 continue; 2849 } 2850 2851 final String sampleRate = "--sampleRate="; 2852 if (cmd.startsWith(sampleRate)) { 2853 opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); 2854 continue; 2855 } 2856 2857 final String table = "--table="; 2858 if (cmd.startsWith(table)) { 2859 opts.tableName = cmd.substring(table.length()); 2860 continue; 2861 } 2862 2863 final String startRow = "--startRow="; 2864 if (cmd.startsWith(startRow)) { 2865 opts.startRow = Integer.parseInt(cmd.substring(startRow.length())); 2866 continue; 2867 } 2868 2869 final String compress = "--compress="; 2870 if (cmd.startsWith(compress)) { 2871 opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); 2872 continue; 2873 } 2874 2875 final String encryption = "--encryption="; 2876 if (cmd.startsWith(encryption)) { 2877 opts.encryption = cmd.substring(encryption.length()); 2878 continue; 2879 } 2880 2881 final String traceRate = "--traceRate="; 2882 if (cmd.startsWith(traceRate)) { 2883 opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length())); 2884 continue; 2885 } 2886 2887 final String blockEncoding = "--blockEncoding="; 2888 if (cmd.startsWith(blockEncoding)) { 2889 opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); 2890 continue; 2891 } 2892 2893 final String flushCommits = "--flushCommits="; 2894 if (cmd.startsWith(flushCommits)) { 2895 opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); 2896 continue; 2897 } 2898 2899 final String writeToWAL = "--writeToWAL="; 2900 if (cmd.startsWith(writeToWAL)) { 2901 opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); 2902 continue; 2903 } 2904 2905 final String presplit = "--presplit="; 2906 if (cmd.startsWith(presplit)) { 2907 opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); 2908 continue; 2909 } 2910 2911 final String inMemory = "--inmemory="; 2912 if (cmd.startsWith(inMemory)) { 2913 opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); 2914 continue; 2915 } 2916 2917 final String autoFlush = "--autoFlush="; 2918 if (cmd.startsWith(autoFlush)) { 2919 opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length())); 2920 continue; 2921 } 2922 2923 final String onceCon = "--oneCon="; 2924 if (cmd.startsWith(onceCon)) { 2925 opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length())); 2926 continue; 2927 } 2928 2929 final String connCount = "--connCount="; 2930 if (cmd.startsWith(connCount)) { 2931 opts.connCount = Integer.parseInt(cmd.substring(connCount.length())); 2932 continue; 2933 } 2934 2935 final String latencyThreshold = "--latencyThreshold="; 2936 if (cmd.startsWith(latencyThreshold)) { 2937 opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length())); 2938 continue; 2939 } 2940 2941 final String latency = "--latency"; 2942 if (cmd.startsWith(latency)) { 2943 opts.reportLatency = true; 2944 continue; 2945 } 2946 2947 final String multiGet = "--multiGet="; 2948 if (cmd.startsWith(multiGet)) { 2949 opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); 2950 continue; 2951 } 2952 2953 final String multiPut = "--multiPut="; 2954 if (cmd.startsWith(multiPut)) { 2955 opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length())); 2956 continue; 2957 } 2958 2959 final String useTags = "--usetags="; 2960 if (cmd.startsWith(useTags)) { 2961 opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); 2962 continue; 2963 } 2964 2965 final String noOfTags = "--numoftags="; 2966 if (cmd.startsWith(noOfTags)) { 2967 opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); 2968 continue; 2969 } 2970 2971 final String replicas = "--replicas="; 2972 if (cmd.startsWith(replicas)) { 2973 opts.replicas = Integer.parseInt(cmd.substring(replicas.length())); 2974 continue; 2975 } 2976 2977 final String filterOutAll = "--filterAll"; 2978 if (cmd.startsWith(filterOutAll)) { 2979 opts.filterAll = true; 2980 continue; 2981 } 2982 2983 final String size = "--size="; 2984 if (cmd.startsWith(size)) { 2985 opts.size = Float.parseFloat(cmd.substring(size.length())); 2986 if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB"); 2987 continue; 2988 } 2989 2990 final String splitPolicy = "--splitPolicy="; 2991 if (cmd.startsWith(splitPolicy)) { 2992 opts.splitPolicy = cmd.substring(splitPolicy.length()); 2993 continue; 2994 } 2995 2996 final String randomSleep = "--randomSleep="; 2997 if (cmd.startsWith(randomSleep)) { 2998 opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length())); 2999 continue; 3000 } 3001 3002 final String measureAfter = "--measureAfter="; 3003 if (cmd.startsWith(measureAfter)) { 3004 opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length())); 3005 continue; 3006 } 3007 3008 final String bloomFilter = "--bloomFilter="; 3009 if (cmd.startsWith(bloomFilter)) { 3010 opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length())); 3011 continue; 3012 } 3013 3014 final String blockSize = "--blockSize="; 3015 if (cmd.startsWith(blockSize)) { 3016 opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length())); 3017 continue; 3018 } 3019 3020 final String valueSize = "--valueSize="; 3021 if (cmd.startsWith(valueSize)) { 3022 opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length())); 3023 continue; 3024 } 3025 3026 final String valueRandom = "--valueRandom"; 3027 if (cmd.startsWith(valueRandom)) { 3028 opts.valueRandom = true; 3029 continue; 3030 } 3031 3032 final String valueZipf = "--valueZipf"; 3033 if (cmd.startsWith(valueZipf)) { 3034 opts.valueZipf = true; 3035 continue; 3036 } 3037 3038 final String period = "--period="; 3039 if (cmd.startsWith(period)) { 3040 opts.period = Integer.parseInt(cmd.substring(period.length())); 3041 continue; 3042 } 3043 3044 final String addColumns = "--addColumns="; 3045 if (cmd.startsWith(addColumns)) { 3046 opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length())); 3047 continue; 3048 } 3049 3050 final String inMemoryCompaction = "--inmemoryCompaction="; 3051 if (cmd.startsWith(inMemoryCompaction)) { 3052 opts.inMemoryCompaction = 3053 MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length())); 3054 continue; 3055 } 3056 3057 final String columns = "--columns="; 3058 if (cmd.startsWith(columns)) { 3059 opts.columns = Integer.parseInt(cmd.substring(columns.length())); 3060 continue; 3061 } 3062 3063 final String families = "--families="; 3064 if (cmd.startsWith(families)) { 3065 opts.families = Integer.parseInt(cmd.substring(families.length())); 3066 continue; 3067 } 3068 3069 final String caching = "--caching="; 3070 if (cmd.startsWith(caching)) { 3071 opts.caching = Integer.parseInt(cmd.substring(caching.length())); 3072 continue; 3073 } 3074 3075 final String asyncPrefetch = "--asyncPrefetch"; 3076 if (cmd.startsWith(asyncPrefetch)) { 3077 opts.asyncPrefetch = true; 3078 continue; 3079 } 3080 3081 final String cacheBlocks = "--cacheBlocks="; 3082 if (cmd.startsWith(cacheBlocks)) { 3083 opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length())); 3084 continue; 3085 } 3086 3087 final String scanReadType = "--scanReadType="; 3088 if (cmd.startsWith(scanReadType)) { 3089 opts.scanReadType = 3090 Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase()); 3091 continue; 3092 } 3093 3094 final String bufferSize = "--bufferSize="; 3095 if (cmd.startsWith(bufferSize)) { 3096 opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length())); 3097 continue; 3098 } 3099 3100 final String commandPropertiesFile = "--commandPropertiesFile="; 3101 if (cmd.startsWith(commandPropertiesFile)) { 3102 String fileName = String.valueOf(cmd.substring(commandPropertiesFile.length())); 3103 Properties properties = new Properties(); 3104 try { 3105 properties 3106 .load(PerformanceEvaluation.class.getClassLoader().getResourceAsStream(fileName)); 3107 opts.commandProperties = properties; 3108 } catch (IOException e) { 3109 LOG.error("Failed to load metricIds from properties file", e); 3110 } 3111 continue; 3112 } 3113 3114 validateParsedOpts(opts); 3115 3116 if (isCommandClass(cmd)) { 3117 opts.cmdName = cmd; 3118 try { 3119 opts.numClientThreads = Integer.parseInt(args.remove()); 3120 } catch (NoSuchElementException | NumberFormatException e) { 3121 throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e); 3122 } 3123 opts = calculateRowsAndSize(opts); 3124 break; 3125 } else { 3126 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); 3127 } 3128 3129 // Not matching any option or command. 3130 System.err.println("Error: Wrong option or command: " + cmd); 3131 args.add(cmd); 3132 break; 3133 } 3134 return opts; 3135 } 3136 3137 /** 3138 * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts 3139 */ 3140 private static void validateParsedOpts(TestOptions opts) { 3141 3142 if (!opts.autoFlush && opts.multiPut > 0) { 3143 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0"); 3144 } 3145 3146 if (opts.oneCon && opts.connCount > 1) { 3147 throw new IllegalArgumentException( 3148 "oneCon is set to true, " + "connCount should not bigger than 1"); 3149 } 3150 3151 if (opts.valueZipf && opts.valueRandom) { 3152 throw new IllegalStateException("Either valueZipf or valueRandom but not both"); 3153 } 3154 } 3155 3156 static TestOptions calculateRowsAndSize(final TestOptions opts) { 3157 int rowsPerGB = getRowsPerGB(opts); 3158 if ( 3159 (opts.getCmdName() != null 3160 && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN))) 3161 && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows 3162 ) { 3163 opts.totalRows = (int) (opts.size * rowsPerGB); 3164 } else if (opts.size != DEFAULT_OPTS.size) { 3165 // total size in GB specified 3166 opts.totalRows = (int) (opts.size * rowsPerGB); 3167 opts.perClientRunRows = opts.totalRows / opts.numClientThreads; 3168 } else { 3169 opts.totalRows = opts.perClientRunRows * opts.numClientThreads; 3170 // Cast to float to ensure floating-point division 3171 opts.size = (float) opts.totalRows / rowsPerGB; 3172 } 3173 return opts; 3174 } 3175 3176 static int getRowsPerGB(final TestOptions opts) { 3177 return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies() 3178 * opts.getColumns()); 3179 } 3180 3181 @Override 3182 public int run(String[] args) throws Exception { 3183 // Process command-line args. TODO: Better cmd-line processing 3184 // (but hopefully something not as painful as cli options). 3185 int errCode = -1; 3186 if (args.length < 1) { 3187 printUsage(); 3188 return errCode; 3189 } 3190 3191 try { 3192 LinkedList<String> argv = new LinkedList<>(); 3193 argv.addAll(Arrays.asList(args)); 3194 TestOptions opts = parseOpts(argv); 3195 3196 // args remaining, print help and exit 3197 if (!argv.isEmpty()) { 3198 errCode = 0; 3199 printUsage(); 3200 return errCode; 3201 } 3202 3203 // must run at least 1 client 3204 if (opts.numClientThreads <= 0) { 3205 throw new IllegalArgumentException("Number of clients must be > 0"); 3206 } 3207 3208 // cmdName should not be null, print help and exit 3209 if (opts.cmdName == null) { 3210 printUsage(); 3211 return errCode; 3212 } 3213 3214 Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName); 3215 if (cmdClass != null) { 3216 runTest(cmdClass, opts); 3217 errCode = 0; 3218 } 3219 3220 } catch (Exception e) { 3221 e.printStackTrace(); 3222 } 3223 3224 return errCode; 3225 } 3226 3227 private static boolean isCommandClass(String cmd) { 3228 return COMMANDS.containsKey(cmd) || isCustomTestClass(cmd); 3229 } 3230 3231 private static boolean isCustomTestClass(String cmd) { 3232 Class<? extends Test> cmdClass; 3233 try { 3234 cmdClass = 3235 (Class<? extends Test>) PerformanceEvaluation.class.getClassLoader().loadClass(cmd); 3236 addCommandDescriptor(cmdClass, cmd, "custom command"); 3237 return true; 3238 } catch (Throwable th) { 3239 LOG.info("No class found for command: " + cmd, th); 3240 return false; 3241 } 3242 } 3243 3244 private static Class<? extends TestBase> determineCommandClass(String cmd) { 3245 CmdDescriptor descriptor = COMMANDS.get(cmd); 3246 return descriptor != null ? descriptor.getCmdClass() : null; 3247 } 3248 3249 public static void main(final String[] args) throws Exception { 3250 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); 3251 System.exit(res); 3252 } 3253}