001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import com.codahale.metrics.Histogram; 021import com.codahale.metrics.UniformReservoir; 022import io.opentelemetry.api.trace.Span; 023import io.opentelemetry.context.Scope; 024import java.io.IOException; 025import java.io.PrintStream; 026import java.lang.reflect.Constructor; 027import java.math.BigDecimal; 028import java.math.MathContext; 029import java.text.DecimalFormat; 030import java.text.SimpleDateFormat; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.Date; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Locale; 037import java.util.Map; 038import java.util.NoSuchElementException; 039import java.util.Properties; 040import java.util.Queue; 041import java.util.Random; 042import java.util.TreeMap; 043import java.util.concurrent.Callable; 044import java.util.concurrent.ExecutionException; 045import java.util.concurrent.ExecutorService; 046import java.util.concurrent.Executors; 047import java.util.concurrent.Future; 048import java.util.concurrent.ThreadLocalRandom; 049import java.util.stream.Collectors; 050import org.apache.commons.lang3.StringUtils; 051import org.apache.hadoop.conf.Configuration; 052import org.apache.hadoop.conf.Configured; 053import org.apache.hadoop.fs.FileSystem; 054import org.apache.hadoop.fs.Path; 055import org.apache.hadoop.hbase.client.Admin; 056import org.apache.hadoop.hbase.client.Append; 057import org.apache.hadoop.hbase.client.AsyncConnection; 058import org.apache.hadoop.hbase.client.AsyncTable; 059import org.apache.hadoop.hbase.client.BufferedMutator; 060import org.apache.hadoop.hbase.client.BufferedMutatorParams; 061import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 062import org.apache.hadoop.hbase.client.Connection; 063import org.apache.hadoop.hbase.client.ConnectionFactory; 064import org.apache.hadoop.hbase.client.Consistency; 065import org.apache.hadoop.hbase.client.Delete; 066import org.apache.hadoop.hbase.client.Durability; 067import org.apache.hadoop.hbase.client.Get; 068import org.apache.hadoop.hbase.client.Increment; 069import org.apache.hadoop.hbase.client.Put; 070import org.apache.hadoop.hbase.client.RegionInfo; 071import org.apache.hadoop.hbase.client.RegionInfoBuilder; 072import org.apache.hadoop.hbase.client.RegionLocator; 073import org.apache.hadoop.hbase.client.Result; 074import org.apache.hadoop.hbase.client.ResultScanner; 075import org.apache.hadoop.hbase.client.RowMutations; 076import org.apache.hadoop.hbase.client.Scan; 077import org.apache.hadoop.hbase.client.Table; 078import org.apache.hadoop.hbase.client.TableDescriptor; 079import 
org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterAllFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ByteArrayHashKey;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.hadoop.hbase.util.Hash;
import org.apache.hadoop.hbase.util.MurmurHash;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RandomDistribution;
import org.apache.hadoop.hbase.util.YammerHistogramUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.gson.Gson;

/**
 * Script used to evaluate HBase performance and scalability. Runs an HBase client that steps
 * through one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random
 * writes test, etc.). Pass on the command-line which test to run and how many clients are
 * participating in this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
 * <p>
 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance
 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper,
 * pages 8-10.
 * <p>
 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as
 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client does
 * about 1GB of data, unless specified otherwise.
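 * <p>
 * For example, invocations look like the following (a sketch; the option values and row counts
 * here are illustrative, not recommendations, and the trailing number is the client count):
 *
 * <pre>
 * # multithreaded in a single JVM
 * hbase pe --nomapred --rows=100000 sequentialWrite 4
 * # as a mapreduce job
 * hbase pe --rows=100000 randomRead 10
 * </pre>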
132 */ 133@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 134public class PerformanceEvaluation extends Configured implements Tool { 135 static final String RANDOM_SEEK_SCAN = "randomSeekScan"; 136 static final String RANDOM_READ = "randomRead"; 137 static final String PE_COMMAND_SHORTNAME = "pe"; 138 private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName()); 139 private static final Gson GSON = GsonUtil.createGson().create(); 140 141 public static final String TABLE_NAME = "TestTable"; 142 public static final String FAMILY_NAME_BASE = "info"; 143 public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0"); 144 public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0); 145 public static final int DEFAULT_VALUE_LENGTH = 1000; 146 public static final int ROW_LENGTH = 26; 147 148 private static final int ONE_GB = 1024 * 1024 * 1000; 149 private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH; 150 // TODO : should we make this configurable 151 private static final int TAG_LENGTH = 256; 152 private static final DecimalFormat FMT = new DecimalFormat("0.##"); 153 private static final MathContext CXT = MathContext.DECIMAL64; 154 private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000); 155 private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024); 156 private static final TestOptions DEFAULT_OPTS = new TestOptions(); 157 158 private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>(); 159 private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); 160 161 static { 162 addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead", 163 "Run async random read test"); 164 addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite", 165 "Run async random write test"); 166 addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead", 167 "Run async sequential read test"); 168 addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite", 169 "Run async sequential write test"); 170 addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)"); 171 addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test"); 172 addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test"); 173 addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN, 174 "Run random seek and scan 100 test"); 175 addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", 176 "Run random seek scan with both start and stop row (max 10 rows)"); 177 addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100", 178 "Run random seek scan with both start and stop row (max 100 rows)"); 179 addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000", 180 "Run random seek scan with both start and stop row (max 1000 rows)"); 181 addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000", 182 "Run random seek scan with both start and stop row (max 10000 rows)"); 183 addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test"); 184 addCommandDescriptor(RandomDeleteTest.class, "randomDelete", "Run random delete test"); 185 addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test"); 186 addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test"); 187 addCommandDescriptor(SequentialDeleteTest.class, "sequentialDelete", 188 "Run sequential delete 
test"); 189 addCommandDescriptor(MetaWriteTest.class, "metaWrite", 190 "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta"); 191 addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)"); 192 addCommandDescriptor(ReverseScanTest.class, "reverseScan", 193 "Run reverse scan test (read every row)"); 194 addCommandDescriptor(FilteredScanTest.class, "filterScan", 195 "Run scan test using a filter to find a specific row based on it's value " 196 + "(make sure to use --rows=20)"); 197 addCommandDescriptor(IncrementTest.class, "increment", 198 "Increment on each row; clients overlap on keyspace so some concurrent operations"); 199 addCommandDescriptor(AppendTest.class, "append", 200 "Append on each row; clients overlap on keyspace so some concurrent operations"); 201 addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate", 202 "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations"); 203 addCommandDescriptor(CheckAndPutTest.class, "checkAndPut", 204 "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations"); 205 addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete", 206 "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations"); 207 addCommandDescriptor(CleanMetaTest.class, "cleanMeta", 208 "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread"); 209 } 210 211 /** 212 * Enum for map metrics. Keep it out here rather than inside in the Map inner-class so we can find 213 * associated properties. 214 */ 215 protected static enum Counter { 216 /** elapsed time */ 217 ELAPSED_TIME, 218 /** number of rows */ 219 ROWS 220 } 221 222 protected static class RunResult implements Comparable<RunResult> { 223 public RunResult(long duration, Histogram hist) { 224 this.duration = duration; 225 this.hist = hist; 226 numbOfReplyOverThreshold = 0; 227 numOfReplyFromReplica = 0; 228 } 229 230 public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica, 231 Histogram hist) { 232 this.duration = duration; 233 this.hist = hist; 234 this.numbOfReplyOverThreshold = numbOfReplyOverThreshold; 235 this.numOfReplyFromReplica = numOfReplyFromReplica; 236 } 237 238 public final long duration; 239 public final Histogram hist; 240 public final long numbOfReplyOverThreshold; 241 public final long numOfReplyFromReplica; 242 243 @Override 244 public String toString() { 245 return Long.toString(duration); 246 } 247 248 @Override 249 public int compareTo(RunResult o) { 250 return Long.compare(this.duration, o.duration); 251 } 252 253 @Override 254 public boolean equals(Object obj) { 255 if (this == obj) { 256 return true; 257 } 258 if (obj == null || getClass() != obj.getClass()) { 259 return false; 260 } 261 return this.compareTo((RunResult) obj) == 0; 262 } 263 264 @Override 265 public int hashCode() { 266 return Long.hashCode(duration); 267 } 268 } 269 270 /** 271 * Constructor 272 * @param conf Configuration object 273 */ 274 public PerformanceEvaluation(final Configuration conf) { 275 super(conf); 276 } 277 278 protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name, 279 String description) { 280 CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description); 281 COMMANDS.put(name, cmdDescriptor); 282 } 283 284 /** 285 * Implementations can have their status set. 
286 */ 287 interface Status { 288 /** 289 * Sets status 290 * @param msg status message 291 */ 292 void setStatus(final String msg) throws IOException; 293 } 294 295 /** 296 * MapReduce job that runs a performance evaluation client in each map task. 297 */ 298 public static class EvaluationMapTask 299 extends Mapper<LongWritable, Text, LongWritable, LongWritable> { 300 301 /** configuration parameter name that contains the command */ 302 public final static String CMD_KEY = "EvaluationMapTask.command"; 303 /** configuration parameter name that contains the PE impl */ 304 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl"; 305 306 private Class<? extends Test> cmd; 307 308 @Override 309 protected void setup(Context context) throws IOException, InterruptedException { 310 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class); 311 312 // this is required so that extensions of PE are instantiated within the 313 // map reduce task... 314 Class<? extends PerformanceEvaluation> peClass = 315 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class); 316 try { 317 peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration()); 318 } catch (Exception e) { 319 throw new IllegalStateException("Could not instantiate PE instance", e); 320 } 321 } 322 323 private <Type> Class<? extends Type> forName(String className, Class<Type> type) { 324 try { 325 return Class.forName(className).asSubclass(type); 326 } catch (ClassNotFoundException e) { 327 throw new IllegalStateException("Could not find class for name: " + className, e); 328 } 329 } 330 331 @Override 332 protected void map(LongWritable key, Text value, final Context context) 333 throws IOException, InterruptedException { 334 335 Status status = new Status() { 336 @Override 337 public void setStatus(String msg) { 338 context.setStatus(msg); 339 } 340 }; 341 342 TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class); 343 Configuration conf = HBaseConfiguration.create(context.getConfiguration()); 344 final Connection con = ConnectionFactory.createConnection(conf); 345 AsyncConnection asyncCon = null; 346 try { 347 asyncCon = ConnectionFactory.createAsyncConnection(conf).get(); 348 } catch (ExecutionException e) { 349 throw new IOException(e); 350 } 351 352 // Evaluation task 353 RunResult result = 354 PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status); 355 // Collect how much time the thing took. Report as map output and 356 // to the ELAPSED_TIME counter. 357 context.getCounter(Counter.ELAPSED_TIME).increment(result.duration); 358 context.getCounter(Counter.ROWS).increment(opts.perClientRunRows); 359 context.write(new LongWritable(opts.startRow), new LongWritable(result.duration)); 360 context.progress(); 361 } 362 } 363 364 /* 365 * If table does not already exist, create. Also create a table when {@code opts.presplitRegions} 366 * is specified or when the existing table's region replica count doesn't match {@code 367 * opts.replicas}. 
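   * For example (numbers are illustrative): running a write command with --presplit=10 against an
   * existing table that has 5 regions disables, deletes and recreates the table with 10 regions,
   * whereas a read or delete command with the same mismatch only logs a warning and proceeds
   * against the existing table.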
368 */ 369 static boolean checkTable(Admin admin, TestOptions opts) throws IOException { 370 final TableName tableName = TableName.valueOf(opts.tableName); 371 final boolean exists = admin.tableExists(tableName); 372 final boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read") 373 || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan"); 374 final boolean isDeleteCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("delete"); 375 final boolean needsData = isReadCmd || isDeleteCmd; 376 if (!exists && needsData) { 377 throw new IllegalStateException( 378 "Must specify an existing table for read/delete commands. Run a write command first."); 379 } 380 TableDescriptor desc = exists ? admin.getDescriptor(TableName.valueOf(opts.tableName)) : null; 381 final byte[][] splits = getSplits(opts); 382 383 // recreate the table when user has requested presplit or when existing 384 // {RegionSplitPolicy,replica count} does not match requested, or when the 385 // number of column families does not match requested. 386 final boolean regionCountChanged = 387 exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions 388 && opts.presplitRegions != admin.getRegions(tableName).size(); 389 final boolean splitPolicyChanged = 390 exists && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy); 391 final boolean regionReplicationChanged = exists && desc.getRegionReplication() != opts.replicas; 392 final boolean columnFamilyCountChanged = exists && desc.getColumnFamilyCount() != opts.families; 393 394 boolean needsDelete = regionCountChanged || splitPolicyChanged || regionReplicationChanged 395 || columnFamilyCountChanged; 396 397 if (needsDelete) { 398 final List<String> errors = new ArrayList<>(); 399 if (columnFamilyCountChanged) { 400 final String error = String.format("--families=%d, but found %d column families", 401 opts.families, desc.getColumnFamilyCount()); 402 if (needsData) { 403 // We can't proceed the test in this case 404 throw new IllegalStateException( 405 "Cannot proceed the test. Run a write command first: " + error); 406 } 407 errors.add(error); 408 } 409 if (regionCountChanged) { 410 errors.add(String.format("--presplit=%d, but found %d regions", opts.presplitRegions, 411 admin.getRegions(tableName).size())); 412 } 413 if (splitPolicyChanged) { 414 errors.add(String.format("--splitPolicy=%s, but current policy is %s", opts.splitPolicy, 415 desc.getRegionSplitPolicyClassName())); 416 } 417 if (regionReplicationChanged) { 418 errors.add(String.format("--replicas=%d, but found %d replicas", opts.replicas, 419 desc.getRegionReplication())); 420 } 421 final String reason = 422 errors.stream().map(s -> '[' + s + ']').collect(Collectors.joining(", ")); 423 424 if (needsData) { 425 LOG.warn("Unexpected or incorrect options provided for {}. " 426 + "Please verify whether the detected inconsistencies are expected or ignorable: {}. 
" 427 + "The test will proceed, but the results may not be reliable.", opts.cmdName, reason); 428 needsDelete = false; 429 } else { 430 LOG.info("Table will be recreated: " + reason); 431 } 432 } 433 434 // remove an existing table 435 if (needsDelete) { 436 if (admin.isTableEnabled(tableName)) { 437 admin.disableTable(tableName); 438 } 439 admin.deleteTable(tableName); 440 } 441 442 // table creation is necessary 443 if (!exists || needsDelete) { 444 desc = getTableDescriptor(opts); 445 if (splits != null) { 446 if (LOG.isDebugEnabled()) { 447 for (int i = 0; i < splits.length; i++) { 448 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); 449 } 450 } 451 } 452 if (splits != null) { 453 admin.createTable(desc, splits); 454 } else { 455 admin.createTable(desc); 456 } 457 LOG.info("Table " + desc + " created"); 458 } 459 return admin.tableExists(tableName); 460 } 461 462 /** 463 * Create an HTableDescriptor from provided TestOptions. 464 */ 465 protected static TableDescriptor getTableDescriptor(TestOptions opts) { 466 TableDescriptorBuilder builder = 467 TableDescriptorBuilder.newBuilder(TableName.valueOf(opts.tableName)); 468 469 for (int family = 0; family < opts.families; family++) { 470 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 471 ColumnFamilyDescriptorBuilder cfBuilder = 472 ColumnFamilyDescriptorBuilder.newBuilder(familyName); 473 cfBuilder.setDataBlockEncoding(opts.blockEncoding); 474 cfBuilder.setCompressionType(opts.compression); 475 cfBuilder.setEncryptionType(opts.encryption); 476 cfBuilder.setBloomFilterType(opts.bloomType); 477 cfBuilder.setBlocksize(opts.blockSize); 478 if (opts.inMemoryCF) { 479 cfBuilder.setInMemory(true); 480 } 481 cfBuilder.setInMemoryCompaction(opts.inMemoryCompaction); 482 builder.setColumnFamily(cfBuilder.build()); 483 } 484 if (opts.replicas != DEFAULT_OPTS.replicas) { 485 builder.setRegionReplication(opts.replicas); 486 } 487 if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) { 488 builder.setRegionSplitPolicyClassName(opts.splitPolicy); 489 } 490 return builder.build(); 491 } 492 493 /** 494 * generates splits based on total number of rows and specified split regions 495 */ 496 protected static byte[][] getSplits(TestOptions opts) { 497 if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null; 498 499 int numSplitPoints = opts.presplitRegions - 1; 500 byte[][] splits = new byte[numSplitPoints][]; 501 long jump = opts.totalRows / opts.presplitRegions; 502 for (int i = 0; i < numSplitPoints; i++) { 503 long rowkey = jump * (1 + i); 504 splits[i] = format(rowkey); 505 } 506 return splits; 507 } 508 509 static void setupConnectionCount(final TestOptions opts) { 510 if (opts.oneCon) { 511 opts.connCount = 1; 512 } else { 513 if (opts.connCount == -1) { 514 // set to thread number if connCount is not set 515 opts.connCount = opts.numClientThreads; 516 } 517 } 518 } 519 520 /* 521 * Run all clients in this vm each to its own thread. 522 */ 523 static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf) 524 throws IOException, InterruptedException, ExecutionException { 525 final Class<? 
extends TestBase> cmd = determineCommandClass(opts.cmdName); 526 assert cmd != null; 527 @SuppressWarnings("unchecked") 528 Future<RunResult>[] threads = new Future[opts.numClientThreads]; 529 RunResult[] results = new RunResult[opts.numClientThreads]; 530 ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, 531 new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build()); 532 setupConnectionCount(opts); 533 final Connection[] cons = new Connection[opts.connCount]; 534 final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount]; 535 for (int i = 0; i < opts.connCount; i++) { 536 cons[i] = ConnectionFactory.createConnection(conf); 537 asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get(); 538 } 539 LOG 540 .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads"); 541 for (int i = 0; i < threads.length; i++) { 542 final int index = i; 543 threads[i] = pool.submit(new Callable<RunResult>() { 544 @Override 545 public RunResult call() throws Exception { 546 TestOptions threadOpts = new TestOptions(opts); 547 final Connection con = cons[index % cons.length]; 548 final AsyncConnection asyncCon = asyncCons[index % asyncCons.length]; 549 if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows; 550 RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() { 551 @Override 552 public void setStatus(final String msg) throws IOException { 553 LOG.info(msg); 554 } 555 }); 556 LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration 557 + "ms over " + threadOpts.perClientRunRows + " rows"); 558 if (opts.latencyThreshold > 0) { 559 LOG.info("Number of replies over latency threshold " + opts.latencyThreshold 560 + "(ms) is " + run.numbOfReplyOverThreshold); 561 } 562 return run; 563 } 564 }); 565 } 566 pool.shutdown(); 567 568 for (int i = 0; i < threads.length; i++) { 569 try { 570 results[i] = threads[i].get(); 571 } catch (ExecutionException e) { 572 throw new IOException(e.getCause()); 573 } 574 } 575 final String test = cmd.getSimpleName(); 576 LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results)); 577 Arrays.sort(results); 578 long total = 0; 579 float avgLatency = 0; 580 float avgTPS = 0; 581 long replicaWins = 0; 582 for (RunResult result : results) { 583 total += result.duration; 584 avgLatency += result.hist.getSnapshot().getMean(); 585 avgTPS += opts.perClientRunRows * 1.0f / result.duration; 586 replicaWins += result.numOfReplyFromReplica; 587 } 588 avgTPS *= 1000; // ms to second 589 avgLatency = avgLatency / results.length; 590 LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: " 591 + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms"); 592 LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency)); 593 LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second"); 594 if (opts.replicas > 1) { 595 LOG.info("[results from replica regions] " + replicaWins); 596 } 597 598 for (int i = 0; i < opts.connCount; i++) { 599 cons[i].close(); 600 asyncCons[i].close(); 601 } 602 603 return results; 604 } 605 606 /* 607 * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write 608 * out an input file with instruction per client regards which row they are to start on. 609 * @param cmd Command to run. 
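   * <p>
   * For example (illustrative numbers): with opts.totalRows = 1,000,000 and 10 client threads,
   * writeInputFile emits ten JSON-serialized TestOptions lines, client j getting
   * startRow = j * 100,000 and perClientRunRows = 100,000; NLineInputFormat then hands exactly one
   * line to each map task.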
610 */ 611 static Job doMapReduce(TestOptions opts, final Configuration conf) 612 throws IOException, InterruptedException, ClassNotFoundException { 613 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName); 614 assert cmd != null; 615 Path inputDir = writeInputFile(conf, opts); 616 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); 617 conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName()); 618 Job job = Job.getInstance(conf); 619 job.setJarByClass(PerformanceEvaluation.class); 620 job.setJobName("HBase Performance Evaluation - " + opts.cmdName); 621 622 job.setInputFormatClass(NLineInputFormat.class); 623 NLineInputFormat.setInputPaths(job, inputDir); 624 // this is default, but be explicit about it just in case. 625 NLineInputFormat.setNumLinesPerSplit(job, 1); 626 627 job.setOutputKeyClass(LongWritable.class); 628 job.setOutputValueClass(LongWritable.class); 629 630 job.setMapperClass(EvaluationMapTask.class); 631 job.setReducerClass(LongSumReducer.class); 632 633 job.setNumReduceTasks(1); 634 635 job.setOutputFormatClass(TextOutputFormat.class); 636 TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs")); 637 638 TableMapReduceUtil.addDependencyJars(job); 639 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Histogram.class, // yammer 640 // metrics 641 Gson.class, // gson 642 FilterAllFilter.class // hbase-server tests jar 643 ); 644 645 TableMapReduceUtil.initCredentials(job); 646 647 job.waitForCompletion(true); 648 return job; 649 } 650 651 /** 652 * Each client has one mapper to do the work, and client do the resulting count in a map task. 653 */ 654 655 static String JOB_INPUT_FILENAME = "input.txt"; 656 657 /* 658 * Write input file of offsets-per-client for the mapreduce job. 659 * @param c Configuration 660 * @return Directory that contains file written whose name is JOB_INPUT_FILENAME 661 */ 662 static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException { 663 return writeInputFile(c, opts, new Path(".")); 664 } 665 666 static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir) 667 throws IOException { 668 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); 669 Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date())); 670 Path inputDir = new Path(jobdir, "inputs"); 671 672 FileSystem fs = FileSystem.get(c); 673 fs.mkdirs(inputDir); 674 675 Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME); 676 PrintStream out = new PrintStream(fs.create(inputFile)); 677 // Make input random. 678 Map<Integer, String> m = new TreeMap<>(); 679 Hash h = MurmurHash.getInstance(); 680 long perClientRows = (opts.totalRows / opts.numClientThreads); 681 try { 682 for (int j = 0; j < opts.numClientThreads; j++) { 683 TestOptions next = new TestOptions(opts); 684 next.startRow = j * perClientRows; 685 next.perClientRunRows = perClientRows; 686 String s = GSON.toJson(next); 687 LOG.info("Client=" + j + ", input=" + s); 688 byte[] b = Bytes.toBytes(s); 689 int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1); 690 m.put(hash, s); 691 } 692 for (Map.Entry<Integer, String> e : m.entrySet()) { 693 out.println(e.getValue()); 694 } 695 } finally { 696 out.close(); 697 } 698 return inputDir; 699 } 700 701 /** 702 * Describes a command. 703 */ 704 static class CmdDescriptor { 705 private Class<? 
extends TestBase> cmdClass;
    private String name;
    private String description;

    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
      this.cmdClass = cmdClass;
      this.name = name;
      this.description = description;
    }

    public Class<? extends TestBase> getCmdClass() {
      return cmdClass;
    }

    public String getName() {
      return name;
    }

    public String getDescription() {
      return description;
    }
  }

  /**
   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes
   * tracking all these arguments a little easier. NOTE: when adding an option, you need to add a
   * data member, a getter/setter (to make JSON serialization of this TestOptions class behave),
   * and a line in the clone constructor below copying your new option from the 'that' to the
   * 'this'. Look for 'clone' below.
   */
  static class TestOptions {
    String cmdName = null;
    boolean nomapred = false;
    boolean filterAll = false;
    long startRow = 0;
    float size = 1.0f;
    long perClientRunRows = DEFAULT_ROWS_PER_GB;
    int numClientThreads = 1;
    long totalRows = DEFAULT_ROWS_PER_GB;
    int measureAfter = 0;
    float sampleRate = 1.0f;
    /**
     * @deprecated Useless after switching to OpenTelemetry
     */
    @Deprecated
    double traceRate = 0.0;
    String tableName = TABLE_NAME;
    boolean flushCommits = true;
    boolean writeToWAL = true;
    boolean autoFlush = false;
    boolean oneCon = false;
    int connCount = -1; // will decide the actual number later
    boolean useTags = false;
    int noOfTags = 1;
    boolean reportLatency = false;
    int multiGet = 0;
    int multiPut = 0;
    int randomSleep = 0;
    boolean inMemoryCF = false;
    int presplitRegions = 0;
    int replicas = TableDescriptorBuilder.DEFAULT_REGION_REPLICATION;
    String splitPolicy = null;
    Compression.Algorithm compression = Compression.Algorithm.NONE;
    String encryption = null;
    BloomType bloomType = BloomType.ROW;
    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
    boolean valueRandom = false;
    boolean valueZipf = false;
    int valueSize = DEFAULT_VALUE_LENGTH;
    long period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10;
    int cycles = 1;
    int columns = 1;
    int families = 1;
    int caching = 30;
    int latencyThreshold = 0; // in milliseconds
    boolean addColumns = true;
    MemoryCompactionPolicy inMemoryCompaction =
      MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
    boolean asyncPrefetch = false;
    boolean cacheBlocks = true;
    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
    long bufferSize = 2L * 1024L * 1024L;
    Properties commandProperties;

    public TestOptions() {
    }

    /**
     * Clone constructor.
     * @param that Object to copy from.
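     *   (For a hypothetical new {@code foo} option, a {@code this.foo = that.foo;} line would be
     *   added here, alongside a {@code foo} field and {@code getFoo()}/{@code setFoo()} accessors,
     *   per the note on {@link TestOptions} above.)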
796 */ 797 public TestOptions(TestOptions that) { 798 this.cmdName = that.cmdName; 799 this.cycles = that.cycles; 800 this.nomapred = that.nomapred; 801 this.startRow = that.startRow; 802 this.size = that.size; 803 this.perClientRunRows = that.perClientRunRows; 804 this.numClientThreads = that.numClientThreads; 805 this.totalRows = that.totalRows; 806 this.sampleRate = that.sampleRate; 807 this.traceRate = that.traceRate; 808 this.tableName = that.tableName; 809 this.flushCommits = that.flushCommits; 810 this.writeToWAL = that.writeToWAL; 811 this.autoFlush = that.autoFlush; 812 this.oneCon = that.oneCon; 813 this.connCount = that.connCount; 814 this.useTags = that.useTags; 815 this.noOfTags = that.noOfTags; 816 this.reportLatency = that.reportLatency; 817 this.latencyThreshold = that.latencyThreshold; 818 this.multiGet = that.multiGet; 819 this.multiPut = that.multiPut; 820 this.inMemoryCF = that.inMemoryCF; 821 this.presplitRegions = that.presplitRegions; 822 this.replicas = that.replicas; 823 this.splitPolicy = that.splitPolicy; 824 this.compression = that.compression; 825 this.encryption = that.encryption; 826 this.blockEncoding = that.blockEncoding; 827 this.filterAll = that.filterAll; 828 this.bloomType = that.bloomType; 829 this.blockSize = that.blockSize; 830 this.valueRandom = that.valueRandom; 831 this.valueZipf = that.valueZipf; 832 this.valueSize = that.valueSize; 833 this.period = that.period; 834 this.randomSleep = that.randomSleep; 835 this.measureAfter = that.measureAfter; 836 this.addColumns = that.addColumns; 837 this.columns = that.columns; 838 this.families = that.families; 839 this.caching = that.caching; 840 this.inMemoryCompaction = that.inMemoryCompaction; 841 this.asyncPrefetch = that.asyncPrefetch; 842 this.cacheBlocks = that.cacheBlocks; 843 this.scanReadType = that.scanReadType; 844 this.bufferSize = that.bufferSize; 845 this.commandProperties = that.commandProperties; 846 } 847 848 public Properties getCommandProperties() { 849 return commandProperties; 850 } 851 852 public int getCaching() { 853 return this.caching; 854 } 855 856 public void setCaching(final int caching) { 857 this.caching = caching; 858 } 859 860 public int getColumns() { 861 return this.columns; 862 } 863 864 public void setColumns(final int columns) { 865 this.columns = columns; 866 } 867 868 public int getFamilies() { 869 return this.families; 870 } 871 872 public void setFamilies(final int families) { 873 this.families = families; 874 } 875 876 public int getCycles() { 877 return this.cycles; 878 } 879 880 public void setCycles(final int cycles) { 881 this.cycles = cycles; 882 } 883 884 public boolean isValueZipf() { 885 return valueZipf; 886 } 887 888 public void setValueZipf(boolean valueZipf) { 889 this.valueZipf = valueZipf; 890 } 891 892 public String getCmdName() { 893 return cmdName; 894 } 895 896 public void setCmdName(String cmdName) { 897 this.cmdName = cmdName; 898 } 899 900 public int getRandomSleep() { 901 return randomSleep; 902 } 903 904 public void setRandomSleep(int randomSleep) { 905 this.randomSleep = randomSleep; 906 } 907 908 public int getReplicas() { 909 return replicas; 910 } 911 912 public void setReplicas(int replicas) { 913 this.replicas = replicas; 914 } 915 916 public String getSplitPolicy() { 917 return splitPolicy; 918 } 919 920 public void setSplitPolicy(String splitPolicy) { 921 this.splitPolicy = splitPolicy; 922 } 923 924 public void setNomapred(boolean nomapred) { 925 this.nomapred = nomapred; 926 } 927 928 public void setFilterAll(boolean filterAll) { 
929 this.filterAll = filterAll; 930 } 931 932 public void setStartRow(long startRow) { 933 this.startRow = startRow; 934 } 935 936 public void setSize(float size) { 937 this.size = size; 938 } 939 940 public void setPerClientRunRows(int perClientRunRows) { 941 this.perClientRunRows = perClientRunRows; 942 } 943 944 public void setNumClientThreads(int numClientThreads) { 945 this.numClientThreads = numClientThreads; 946 } 947 948 public void setTotalRows(long totalRows) { 949 this.totalRows = totalRows; 950 } 951 952 public void setSampleRate(float sampleRate) { 953 this.sampleRate = sampleRate; 954 } 955 956 public void setTraceRate(double traceRate) { 957 this.traceRate = traceRate; 958 } 959 960 public void setTableName(String tableName) { 961 this.tableName = tableName; 962 } 963 964 public void setFlushCommits(boolean flushCommits) { 965 this.flushCommits = flushCommits; 966 } 967 968 public void setWriteToWAL(boolean writeToWAL) { 969 this.writeToWAL = writeToWAL; 970 } 971 972 public void setAutoFlush(boolean autoFlush) { 973 this.autoFlush = autoFlush; 974 } 975 976 public void setOneCon(boolean oneCon) { 977 this.oneCon = oneCon; 978 } 979 980 public int getConnCount() { 981 return connCount; 982 } 983 984 public void setConnCount(int connCount) { 985 this.connCount = connCount; 986 } 987 988 public void setUseTags(boolean useTags) { 989 this.useTags = useTags; 990 } 991 992 public void setNoOfTags(int noOfTags) { 993 this.noOfTags = noOfTags; 994 } 995 996 public void setReportLatency(boolean reportLatency) { 997 this.reportLatency = reportLatency; 998 } 999 1000 public void setMultiGet(int multiGet) { 1001 this.multiGet = multiGet; 1002 } 1003 1004 public void setMultiPut(int multiPut) { 1005 this.multiPut = multiPut; 1006 } 1007 1008 public void setInMemoryCF(boolean inMemoryCF) { 1009 this.inMemoryCF = inMemoryCF; 1010 } 1011 1012 public void setPresplitRegions(int presplitRegions) { 1013 this.presplitRegions = presplitRegions; 1014 } 1015 1016 public void setCompression(Compression.Algorithm compression) { 1017 this.compression = compression; 1018 } 1019 1020 public void setEncryption(String encryption) { 1021 this.encryption = encryption; 1022 } 1023 1024 public void setBloomType(BloomType bloomType) { 1025 this.bloomType = bloomType; 1026 } 1027 1028 public void setBlockSize(int blockSize) { 1029 this.blockSize = blockSize; 1030 } 1031 1032 public void setBlockEncoding(DataBlockEncoding blockEncoding) { 1033 this.blockEncoding = blockEncoding; 1034 } 1035 1036 public void setValueRandom(boolean valueRandom) { 1037 this.valueRandom = valueRandom; 1038 } 1039 1040 public void setValueSize(int valueSize) { 1041 this.valueSize = valueSize; 1042 } 1043 1044 public void setBufferSize(long bufferSize) { 1045 this.bufferSize = bufferSize; 1046 } 1047 1048 public void setPeriod(int period) { 1049 this.period = period; 1050 } 1051 1052 public boolean isNomapred() { 1053 return nomapred; 1054 } 1055 1056 public boolean isFilterAll() { 1057 return filterAll; 1058 } 1059 1060 public long getStartRow() { 1061 return startRow; 1062 } 1063 1064 public float getSize() { 1065 return size; 1066 } 1067 1068 public long getPerClientRunRows() { 1069 return perClientRunRows; 1070 } 1071 1072 public int getNumClientThreads() { 1073 return numClientThreads; 1074 } 1075 1076 public long getTotalRows() { 1077 return totalRows; 1078 } 1079 1080 public float getSampleRate() { 1081 return sampleRate; 1082 } 1083 1084 public double getTraceRate() { 1085 return traceRate; 1086 } 1087 1088 public String 
getTableName() { 1089 return tableName; 1090 } 1091 1092 public boolean isFlushCommits() { 1093 return flushCommits; 1094 } 1095 1096 public boolean isWriteToWAL() { 1097 return writeToWAL; 1098 } 1099 1100 public boolean isAutoFlush() { 1101 return autoFlush; 1102 } 1103 1104 public boolean isUseTags() { 1105 return useTags; 1106 } 1107 1108 public int getNoOfTags() { 1109 return noOfTags; 1110 } 1111 1112 public boolean isReportLatency() { 1113 return reportLatency; 1114 } 1115 1116 public int getMultiGet() { 1117 return multiGet; 1118 } 1119 1120 public int getMultiPut() { 1121 return multiPut; 1122 } 1123 1124 public boolean isInMemoryCF() { 1125 return inMemoryCF; 1126 } 1127 1128 public int getPresplitRegions() { 1129 return presplitRegions; 1130 } 1131 1132 public Compression.Algorithm getCompression() { 1133 return compression; 1134 } 1135 1136 public String getEncryption() { 1137 return encryption; 1138 } 1139 1140 public DataBlockEncoding getBlockEncoding() { 1141 return blockEncoding; 1142 } 1143 1144 public boolean isValueRandom() { 1145 return valueRandom; 1146 } 1147 1148 public int getValueSize() { 1149 return valueSize; 1150 } 1151 1152 public long getPeriod() { 1153 return period; 1154 } 1155 1156 public BloomType getBloomType() { 1157 return bloomType; 1158 } 1159 1160 public int getBlockSize() { 1161 return blockSize; 1162 } 1163 1164 public boolean isOneCon() { 1165 return oneCon; 1166 } 1167 1168 public int getMeasureAfter() { 1169 return measureAfter; 1170 } 1171 1172 public void setMeasureAfter(int measureAfter) { 1173 this.measureAfter = measureAfter; 1174 } 1175 1176 public boolean getAddColumns() { 1177 return addColumns; 1178 } 1179 1180 public void setAddColumns(boolean addColumns) { 1181 this.addColumns = addColumns; 1182 } 1183 1184 public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) { 1185 this.inMemoryCompaction = inMemoryCompaction; 1186 } 1187 1188 public MemoryCompactionPolicy getInMemoryCompaction() { 1189 return this.inMemoryCompaction; 1190 } 1191 1192 public long getBufferSize() { 1193 return this.bufferSize; 1194 } 1195 } 1196 1197 /* 1198 * A test. Subclass to particularize what happens per row. 1199 */ 1200 static abstract class TestBase { 1201 private final long everyN; 1202 1203 protected final Configuration conf; 1204 protected final TestOptions opts; 1205 1206 protected final Status status; 1207 1208 private String testName; 1209 protected Histogram latencyHistogram; 1210 private Histogram replicaLatencyHistogram; 1211 private Histogram valueSizeHistogram; 1212 private Histogram rpcCallsHistogram; 1213 private Histogram remoteRpcCallsHistogram; 1214 private Histogram millisBetweenNextHistogram; 1215 private Histogram regionsScannedHistogram; 1216 private Histogram bytesInResultsHistogram; 1217 private Histogram bytesInRemoteResultsHistogram; 1218 private RandomDistribution.Zipf zipf; 1219 private long numOfReplyOverLatencyThreshold = 0; 1220 private long numOfReplyFromReplica = 0; 1221 1222 /** 1223 * Note that all subclasses of this class must provide a public constructor that has the exact 1224 * same list of arguments. 
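     * <p>
     * For example, a subclass constructor would be expected to look like
     * {@code MyReadTest(Connection con, TestOptions options, Status status)}, mirroring the
     * {@code Test} and {@code AsyncTest} constructors below.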
1225 */ 1226 TestBase(final Configuration conf, final TestOptions options, final Status status) { 1227 this.conf = conf; 1228 this.opts = options; 1229 this.status = status; 1230 this.testName = this.getClass().getSimpleName(); 1231 everyN = (long) (1 / opts.sampleRate); 1232 if (options.isValueZipf()) { 1233 this.zipf = 1234 new RandomDistribution.Zipf(ThreadLocalRandom.current(), 1, options.getValueSize(), 1.2); 1235 } 1236 LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows."); 1237 } 1238 1239 int getValueLength() { 1240 if (this.opts.isValueRandom()) { 1241 return ThreadLocalRandom.current().nextInt(opts.valueSize); 1242 } else if (this.opts.isValueZipf()) { 1243 return Math.abs(this.zipf.nextInt()); 1244 } else { 1245 return opts.valueSize; 1246 } 1247 } 1248 1249 void updateValueSize(final Result[] rs) throws IOException { 1250 updateValueSize(rs, 0); 1251 } 1252 1253 void updateValueSize(final Result[] rs, final long latency) throws IOException { 1254 if (rs == null || (latency == 0)) return; 1255 for (Result r : rs) 1256 updateValueSize(r, latency); 1257 } 1258 1259 void updateValueSize(final Result r) throws IOException { 1260 updateValueSize(r, 0); 1261 } 1262 1263 void updateValueSize(final Result r, final long latency) throws IOException { 1264 if (r == null || (latency == 0)) return; 1265 int size = 0; 1266 // update replicaHistogram 1267 if (r.isStale()) { 1268 replicaLatencyHistogram.update(latency / 1000); 1269 numOfReplyFromReplica++; 1270 } 1271 if (!isRandomValueSize()) return; 1272 1273 for (CellScanner scanner = r.cellScanner(); scanner.advance();) { 1274 size += scanner.current().getValueLength(); 1275 } 1276 updateValueSize(size); 1277 } 1278 1279 void updateValueSize(final int valueSize) { 1280 if (!isRandomValueSize()) return; 1281 this.valueSizeHistogram.update(valueSize); 1282 } 1283 1284 void updateScanMetrics(final ScanMetrics metrics) { 1285 if (metrics == null) return; 1286 Map<String, Long> metricsMap = metrics.getMetricsMap(); 1287 Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); 1288 if (rpcCalls != null) { 1289 this.rpcCallsHistogram.update(rpcCalls.longValue()); 1290 } 1291 Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME); 1292 if (remoteRpcCalls != null) { 1293 this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue()); 1294 } 1295 Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME); 1296 if (millisBetweenNext != null) { 1297 this.millisBetweenNextHistogram.update(millisBetweenNext.longValue()); 1298 } 1299 Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); 1300 if (regionsScanned != null) { 1301 this.regionsScannedHistogram.update(regionsScanned.longValue()); 1302 } 1303 Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME); 1304 if (bytesInResults != null && bytesInResults.longValue() > 0) { 1305 this.bytesInResultsHistogram.update(bytesInResults.longValue()); 1306 } 1307 Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME); 1308 if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) { 1309 this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue()); 1310 } 1311 } 1312 1313 String generateStatus(final long sr, final long i, final long lr) { 1314 return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency [" 1315 + getShortLatencyReport() + "]" 1316 + (!isRandomValueSize() ? 
"" : ", value size [" + getShortValueSizeReport() + "]"); 1317 } 1318 1319 boolean isRandomValueSize() { 1320 return opts.valueRandom; 1321 } 1322 1323 protected long getReportingPeriod() { 1324 return opts.period; 1325 } 1326 1327 /** 1328 * Populated by testTakedown. Only implemented by RandomReadTest at the moment. 1329 */ 1330 public Histogram getLatencyHistogram() { 1331 return latencyHistogram; 1332 } 1333 1334 void testSetup() throws IOException { 1335 // test metrics 1336 latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1337 // If it is a replica test, set up histogram for replica. 1338 if (opts.replicas > 1) { 1339 replicaLatencyHistogram = 1340 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1341 } 1342 valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1343 // scan metrics 1344 rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1345 remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1346 millisBetweenNextHistogram = 1347 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1348 regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1349 bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1350 bytesInRemoteResultsHistogram = 1351 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1352 1353 onStartup(); 1354 } 1355 1356 abstract void onStartup() throws IOException; 1357 1358 void testTakedown() throws IOException { 1359 onTakedown(); 1360 // Print all stats for this thread continuously. 1361 // Synchronize on Test.class so different threads don't intermingle the 1362 // output. We can't use 'this' here because each thread has its own instance of Test class. 
1363 synchronized (Test.class) { 1364 status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName()); 1365 status 1366 .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram)); 1367 if (opts.replicas > 1) { 1368 status.setStatus("Latency (us) from Replica Regions: " 1369 + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram)); 1370 } 1371 status.setStatus("Num measures (latency) : " + latencyHistogram.getCount()); 1372 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram)); 1373 if (valueSizeHistogram.getCount() > 0) { 1374 status.setStatus( 1375 "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram)); 1376 status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount()); 1377 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram)); 1378 } else { 1379 status.setStatus("No valueSize statistics available"); 1380 } 1381 if (rpcCallsHistogram.getCount() > 0) { 1382 status.setStatus( 1383 "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram)); 1384 } 1385 if (remoteRpcCallsHistogram.getCount() > 0) { 1386 status.setStatus("remoteRpcCalls (count): " 1387 + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram)); 1388 } 1389 if (millisBetweenNextHistogram.getCount() > 0) { 1390 status.setStatus("millisBetweenNext (latency): " 1391 + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram)); 1392 } 1393 if (regionsScannedHistogram.getCount() > 0) { 1394 status.setStatus("regionsScanned (count): " 1395 + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram)); 1396 } 1397 if (bytesInResultsHistogram.getCount() > 0) { 1398 status.setStatus("bytesInResults (size): " 1399 + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram)); 1400 } 1401 if (bytesInRemoteResultsHistogram.getCount() > 0) { 1402 status.setStatus("bytesInRemoteResults (size): " 1403 + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram)); 1404 } 1405 } 1406 } 1407 1408 abstract void onTakedown() throws IOException; 1409 1410 /* 1411 * Run test 1412 * @return Elapsed time. 1413 */ 1414 long test() throws IOException, InterruptedException { 1415 testSetup(); 1416 LOG.info("Timed test starting in thread " + Thread.currentThread().getName()); 1417 final long startTime = System.nanoTime(); 1418 try { 1419 testTimed(); 1420 } finally { 1421 testTakedown(); 1422 } 1423 return (System.nanoTime() - startTime) / 1000000; 1424 } 1425 1426 long getStartRow() { 1427 return opts.startRow; 1428 } 1429 1430 long getLastRow() { 1431 return getStartRow() + opts.perClientRunRows; 1432 } 1433 1434 /** 1435 * Provides an extension point for tests that don't want a per row invocation. 1436 */ 1437 void testTimed() throws IOException, InterruptedException { 1438 long startRow = getStartRow(); 1439 long lastRow = getLastRow(); 1440 // Report on completion of 1/10th of total. 
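      // Only rows whose index is a multiple of everyN are actually issued: with
      // opts.sampleRate = 0.1 (illustrative), everyN = 10, so roughly one row in ten is sent and
      // the rest are skipped. Latencies are recorded in microseconds, and only once
      // (i - startRow) exceeds opts.measureAfter, so an initial warm-up window can be excluded.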
1441 for (int ii = 0; ii < opts.cycles; ii++) { 1442 if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles); 1443 for (long i = startRow; i < lastRow; i++) { 1444 if (i % everyN != 0) continue; 1445 long startTime = System.nanoTime(); 1446 boolean requestSent = false; 1447 Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan(); 1448 try (Scope scope = span.makeCurrent()) { 1449 requestSent = testRow(i, startTime); 1450 } finally { 1451 span.end(); 1452 } 1453 if ((i - startRow) > opts.measureAfter) { 1454 // If multiget or multiput is enabled, say set to 10, testRow() returns immediately 1455 // first 9 times and sends the actual get request in the 10th iteration. 1456 // We should only set latency when actual request is sent because otherwise 1457 // it turns out to be 0. 1458 if (requestSent) { 1459 long latency = (System.nanoTime() - startTime) / 1000; 1460 latencyHistogram.update(latency); 1461 if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) { 1462 numOfReplyOverLatencyThreshold++; 1463 } 1464 } 1465 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { 1466 status.setStatus(generateStatus(startRow, i, lastRow)); 1467 } 1468 } 1469 } 1470 } 1471 } 1472 1473 /** Returns Subset of the histograms' calculation. */ 1474 public String getShortLatencyReport() { 1475 return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram); 1476 } 1477 1478 /** Returns Subset of the histograms' calculation. */ 1479 public String getShortValueSizeReport() { 1480 return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram); 1481 } 1482 1483 /** 1484 * Test for individual row. 1485 * @param i Row index. 1486 * @return true if the row was sent to server and need to record metrics. False if not, multiGet 1487 * and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered. 1488 */ 1489 abstract boolean testRow(final long i, final long startTime) 1490 throws IOException, InterruptedException; 1491 } 1492 1493 static abstract class Test extends TestBase { 1494 protected Connection connection; 1495 1496 Test(final Connection con, final TestOptions options, final Status status) { 1497 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); 1498 this.connection = con; 1499 } 1500 } 1501 1502 static abstract class AsyncTest extends TestBase { 1503 protected AsyncConnection connection; 1504 1505 AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) { 1506 super(con == null ? 
HBaseConfiguration.create() : con.getConfiguration(), options, status); 1507 this.connection = con; 1508 } 1509 } 1510 1511 static abstract class TableTest extends Test { 1512 protected Table table; 1513 1514 TableTest(Connection con, TestOptions options, Status status) { 1515 super(con, options, status); 1516 } 1517 1518 @Override 1519 void onStartup() throws IOException { 1520 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1521 } 1522 1523 @Override 1524 void onTakedown() throws IOException { 1525 table.close(); 1526 } 1527 } 1528 1529 /* 1530 * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest 1531 */ 1532 static abstract class MetaTest extends TableTest { 1533 protected int keyLength; 1534 1535 MetaTest(Connection con, TestOptions options, Status status) { 1536 super(con, options, status); 1537 keyLength = Long.toString(opts.perClientRunRows).length(); 1538 } 1539 1540 @Override 1541 void onTakedown() throws IOException { 1542 // No clean up 1543 } 1544 1545 /* 1546 * Generates Lexicographically ascending strings 1547 */ 1548 protected byte[] getSplitKey(final long i) { 1549 return Bytes.toBytes(String.format("%0" + keyLength + "d", i)); 1550 } 1551 1552 } 1553 1554 static abstract class AsyncTableTest extends AsyncTest { 1555 protected AsyncTable<?> table; 1556 1557 AsyncTableTest(AsyncConnection con, TestOptions options, Status status) { 1558 super(con, options, status); 1559 } 1560 1561 @Override 1562 void onStartup() throws IOException { 1563 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1564 } 1565 1566 @Override 1567 void onTakedown() throws IOException { 1568 } 1569 } 1570 1571 static class AsyncRandomReadTest extends AsyncTableTest { 1572 private final Consistency consistency; 1573 private ArrayList<Get> gets; 1574 1575 AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) { 1576 super(con, options, status); 1577 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1578 if (opts.multiGet > 0) { 1579 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1580 this.gets = new ArrayList<>(opts.multiGet); 1581 } 1582 } 1583 1584 @Override 1585 boolean testRow(final long i, final long startTime) throws IOException, InterruptedException { 1586 if (opts.randomSleep > 0) { 1587 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 1588 } 1589 Get get = new Get(getRandomRow(opts.totalRows)); 1590 for (int family = 0; family < opts.families; family++) { 1591 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1592 if (opts.addColumns) { 1593 for (int column = 0; column < opts.columns; column++) { 1594 byte[] qualifier = column == 0 ? 
COLUMN_ZERO : Bytes.toBytes("" + column); 1595 get.addColumn(familyName, qualifier); 1596 } 1597 } else { 1598 get.addFamily(familyName); 1599 } 1600 } 1601 if (opts.filterAll) { 1602 get.setFilter(new FilterAllFilter()); 1603 } 1604 get.setConsistency(consistency); 1605 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1606 try { 1607 if (opts.multiGet > 0) { 1608 this.gets.add(get); 1609 if (this.gets.size() == opts.multiGet) { 1610 Result[] rs = 1611 this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new); 1612 updateValueSize(rs); 1613 this.gets.clear(); 1614 } else { 1615 return false; 1616 } 1617 } else { 1618 updateValueSize(this.table.get(get).get()); 1619 } 1620 } catch (ExecutionException e) { 1621 throw new IOException(e); 1622 } 1623 return true; 1624 } 1625 1626 public static RuntimeException runtime(Throwable e) { 1627 if (e instanceof RuntimeException) { 1628 return (RuntimeException) e; 1629 } 1630 return new RuntimeException(e); 1631 } 1632 1633 public static <V> V propagate(Callable<V> callable) { 1634 try { 1635 return callable.call(); 1636 } catch (Exception e) { 1637 throw runtime(e); 1638 } 1639 } 1640 1641 @Override 1642 protected long getReportingPeriod() { 1643 long period = opts.perClientRunRows / 10; 1644 return period == 0 ? opts.perClientRunRows : period; 1645 } 1646 1647 @Override 1648 protected void testTakedown() throws IOException { 1649 if (this.gets != null && this.gets.size() > 0) { 1650 this.table.get(gets); 1651 this.gets.clear(); 1652 } 1653 super.testTakedown(); 1654 } 1655 } 1656 1657 static class AsyncRandomWriteTest extends AsyncSequentialWriteTest { 1658 1659 AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) { 1660 super(con, options, status); 1661 } 1662 1663 @Override 1664 protected byte[] generateRow(final long i) { 1665 return getRandomRow(opts.totalRows); 1666 } 1667 } 1668 1669 static class AsyncScanTest extends AsyncTableTest { 1670 private ResultScanner testScanner; 1671 private AsyncTable<?> asyncTable; 1672 1673 AsyncScanTest(AsyncConnection con, TestOptions options, Status status) { 1674 super(con, options, status); 1675 } 1676 1677 @Override 1678 void onStartup() throws IOException { 1679 this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName), 1680 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); 1681 } 1682 1683 @Override 1684 void testTakedown() throws IOException { 1685 if (this.testScanner != null) { 1686 updateScanMetrics(this.testScanner.getScanMetrics()); 1687 this.testScanner.close(); 1688 } 1689 super.testTakedown(); 1690 } 1691 1692 @Override 1693 boolean testRow(final long i, final long startTime) throws IOException { 1694 if (this.testScanner == null) { 1695 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 1696 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1697 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1698 for (int family = 0; family < opts.families; family++) { 1699 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1700 if (opts.addColumns) { 1701 for (int column = 0; column < opts.columns; column++) { 1702 byte[] qualifier = column == 0 ? 
COLUMN_ZERO : Bytes.toBytes("" + column); 1703 scan.addColumn(familyName, qualifier); 1704 } 1705 } else { 1706 scan.addFamily(familyName); 1707 } 1708 } 1709 if (opts.filterAll) { 1710 scan.setFilter(new FilterAllFilter()); 1711 } 1712 this.testScanner = asyncTable.getScanner(scan); 1713 } 1714 Result r = testScanner.next(); 1715 updateValueSize(r); 1716 return true; 1717 } 1718 } 1719 1720 static class AsyncSequentialReadTest extends AsyncTableTest { 1721 AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) { 1722 super(con, options, status); 1723 } 1724 1725 @Override 1726 boolean testRow(final long i, final long startTime) throws IOException, InterruptedException { 1727 Get get = new Get(format(i)); 1728 for (int family = 0; family < opts.families; family++) { 1729 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1730 if (opts.addColumns) { 1731 for (int column = 0; column < opts.columns; column++) { 1732 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1733 get.addColumn(familyName, qualifier); 1734 } 1735 } else { 1736 get.addFamily(familyName); 1737 } 1738 } 1739 if (opts.filterAll) { 1740 get.setFilter(new FilterAllFilter()); 1741 } 1742 try { 1743 updateValueSize(table.get(get).get()); 1744 } catch (ExecutionException e) { 1745 throw new IOException(e); 1746 } 1747 return true; 1748 } 1749 } 1750 1751 static class AsyncSequentialWriteTest extends AsyncTableTest { 1752 private ArrayList<Put> puts; 1753 1754 AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) { 1755 super(con, options, status); 1756 if (opts.multiPut > 0) { 1757 LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + "."); 1758 this.puts = new ArrayList<>(opts.multiPut); 1759 } 1760 } 1761 1762 protected byte[] generateRow(final long i) { 1763 return format(i); 1764 } 1765 1766 @Override 1767 @SuppressWarnings("ReturnValueIgnored") 1768 boolean testRow(final long i, final long startTime) throws IOException, InterruptedException { 1769 byte[] row = generateRow(i); 1770 Put put = new Put(row); 1771 for (int family = 0; family < opts.families; family++) { 1772 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1773 for (int column = 0; column < opts.columns; column++) { 1774 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1775 byte[] value = generateData(getValueLength()); 1776 if (opts.useTags) { 1777 byte[] tag = generateData(TAG_LENGTH); 1778 Tag[] tags = new Tag[opts.noOfTags]; 1779 for (int n = 0; n < opts.noOfTags; n++) { 1780 Tag t = new ArrayBackedTag((byte) n, tag); 1781 tags[n] = t; 1782 } 1783 KeyValue kv = 1784 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 1785 put.add(kv); 1786 updateValueSize(kv.getValueLength()); 1787 } else { 1788 put.addColumn(familyName, qualifier, value); 1789 updateValueSize(value.length); 1790 } 1791 } 1792 } 1793 put.setDurability(opts.writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL); 1794 try { 1795 // Either batch the put (multiPut > 0) or write and await it individually below. 1796 if (opts.multiPut > 0) { 1797 this.puts.add(put); 1798 if (this.puts.size() == opts.multiPut) { 1799 this.table.put(puts).forEach(f -> AsyncRandomReadTest.propagate(f::get)); 1800 this.puts.clear(); 1801 } else { 1802 return false; 1803 } 1804 } else { 1805 table.put(put).get(); 1806 } 1807 } catch (ExecutionException e) { 1808 throw new IOException(e); 1809 } 1810 return true; 1811 } 1812 } 1813 1814 static abstract class BufferedMutatorTest extends Test { 1815 protected BufferedMutator mutator; 1816 protected Table table; 1817 1818 BufferedMutatorTest(Connection con, TestOptions options, Status status) { 1819 super(con, options, status); 1820 } 1821 1822 @Override 1823 void onStartup() throws IOException { 1824 BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName)); 1825 p.writeBufferSize(opts.bufferSize); 1826 this.mutator = connection.getBufferedMutator(p); 1827 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1828 } 1829 1830 @Override 1831 void onTakedown() throws IOException { 1832 mutator.close(); 1833 table.close(); 1834 } 1835 } 1836 1837 static class RandomSeekScanTest extends TableTest { 1838 RandomSeekScanTest(Connection con, TestOptions options, Status status) { 1839 super(con, options, status); 1840 } 1841 1842 @Override 1843 boolean testRow(final long i, final long startTime) throws IOException { 1844 Scan scan = new Scan().withStartRow(getRandomRow(opts.totalRows)).setCaching(opts.caching) 1845 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1846 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1847 FilterList list = new FilterList(); 1848 for (int family = 0; family < opts.families; family++) { 1849 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1850 if (opts.addColumns) { 1851 for (int column = 0; column < opts.columns; column++) { 1852 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1853 scan.addColumn(familyName, qualifier); 1854 } 1855 } else { 1856 scan.addFamily(familyName); 1857 } 1858 } 1859 if (opts.filterAll) { 1860 list.addFilter(new FilterAllFilter()); 1861 } 1862 list.addFilter(new WhileMatchFilter(new PageFilter(120))); 1863 scan.setFilter(list); 1864 ResultScanner s = this.table.getScanner(scan); 1865 try { 1866 for (Result rr; (rr = s.next()) != null;) { 1867 updateValueSize(rr); 1868 } 1869 } finally { 1870 updateScanMetrics(s.getScanMetrics()); 1871 s.close(); 1872 } 1873 return true; 1874 } 1875 1876 @Override 1877 protected long getReportingPeriod() { 1878 long period = opts.perClientRunRows / 100; 1879 return period == 0 ?
opts.perClientRunRows : period; 1880 } 1881 1882 } 1883 1884 static abstract class RandomScanWithRangeTest extends TableTest { 1885 RandomScanWithRangeTest(Connection con, TestOptions options, Status status) { 1886 super(con, options, status); 1887 } 1888 1889 @Override 1890 boolean testRow(final long i, final long startTime) throws IOException { 1891 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); 1892 Scan scan = new Scan().withStartRow(startAndStopRow.getFirst()) 1893 .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching) 1894 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1895 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1896 for (int family = 0; family < opts.families; family++) { 1897 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1898 if (opts.addColumns) { 1899 for (int column = 0; column < opts.columns; column++) { 1900 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1901 scan.addColumn(familyName, qualifier); 1902 } 1903 } else { 1904 scan.addFamily(familyName); 1905 } 1906 } 1907 if (opts.filterAll) { 1908 scan.setFilter(new FilterAllFilter()); 1909 } 1910 Result r = null; 1911 int count = 0; 1912 ResultScanner s = this.table.getScanner(scan); 1913 try { 1914 for (; (r = s.next()) != null;) { 1915 updateValueSize(r); 1916 count++; 1917 } 1918 if (i % 100 == 0) { 1919 LOG.info(String.format("Scan for key range %s - %s returned %s rows", 1920 Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()), 1921 count)); 1922 } 1923 } finally { 1924 updateScanMetrics(s.getScanMetrics()); 1925 s.close(); 1926 } 1927 return true; 1928 } 1929 1930 protected abstract Pair<byte[], byte[]> getStartAndStopRow(); 1931 1932 protected Pair<byte[], byte[]> generateStartAndStopRows(long maxRange) { 1933 long start = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE) % opts.totalRows; 1934 long stop = start + maxRange; 1935 return new Pair<>(format(start), format(stop)); 1936 } 1937 1938 @Override 1939 protected long getReportingPeriod() { 1940 long period = opts.perClientRunRows / 100; 1941 return period == 0 ? 
opts.perClientRunRows : period; 1942 } 1943 } 1944 1945 static class RandomScanWithRange10Test extends RandomScanWithRangeTest { 1946 RandomScanWithRange10Test(Connection con, TestOptions options, Status status) { 1947 super(con, options, status); 1948 } 1949 1950 @Override 1951 protected Pair<byte[], byte[]> getStartAndStopRow() { 1952 return generateStartAndStopRows(10); 1953 } 1954 } 1955 1956 static class RandomScanWithRange100Test extends RandomScanWithRangeTest { 1957 RandomScanWithRange100Test(Connection con, TestOptions options, Status status) { 1958 super(con, options, status); 1959 } 1960 1961 @Override 1962 protected Pair<byte[], byte[]> getStartAndStopRow() { 1963 return generateStartAndStopRows(100); 1964 } 1965 } 1966 1967 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest { 1968 RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) { 1969 super(con, options, status); 1970 } 1971 1972 @Override 1973 protected Pair<byte[], byte[]> getStartAndStopRow() { 1974 return generateStartAndStopRows(1000); 1975 } 1976 } 1977 1978 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest { 1979 RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) { 1980 super(con, options, status); 1981 } 1982 1983 @Override 1984 protected Pair<byte[], byte[]> getStartAndStopRow() { 1985 return generateStartAndStopRows(10000); 1986 } 1987 } 1988 1989 static class RandomReadTest extends TableTest { 1990 private final Consistency consistency; 1991 private ArrayList<Get> gets; 1992 1993 RandomReadTest(Connection con, TestOptions options, Status status) { 1994 super(con, options, status); 1995 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1996 if (opts.multiGet > 0) { 1997 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1998 this.gets = new ArrayList<>(opts.multiGet); 1999 } 2000 } 2001 2002 @Override 2003 boolean testRow(final long i, final long startTime) throws IOException, InterruptedException { 2004 if (opts.randomSleep > 0) { 2005 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 2006 } 2007 Get get = new Get(getRandomRow(opts.totalRows)); 2008 for (int family = 0; family < opts.families; family++) { 2009 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2010 if (opts.addColumns) { 2011 for (int column = 0; column < opts.columns; column++) { 2012 byte[] qualifier = column == 0 ? 
COLUMN_ZERO : Bytes.toBytes("" + column); 2013 get.addColumn(familyName, qualifier); 2014 } 2015 } else { 2016 get.addFamily(familyName); 2017 } 2018 } 2019 if (opts.filterAll) { 2020 get.setFilter(new FilterAllFilter()); 2021 } 2022 get.setConsistency(consistency); 2023 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 2024 if (opts.multiGet > 0) { 2025 this.gets.add(get); 2026 if (this.gets.size() == opts.multiGet) { 2027 Result[] rs = this.table.get(this.gets); 2028 if (opts.replicas > 1) { 2029 long latency = System.nanoTime() - startTime; 2030 updateValueSize(rs, latency); 2031 } else { 2032 updateValueSize(rs); 2033 } 2034 this.gets.clear(); 2035 } else { 2036 return false; 2037 } 2038 } else { 2039 if (opts.replicas > 1) { 2040 Result r = this.table.get(get); 2041 long latency = System.nanoTime() - startTime; 2042 updateValueSize(r, latency); 2043 } else { 2044 updateValueSize(this.table.get(get)); 2045 } 2046 } 2047 return true; 2048 } 2049 2050 @Override 2051 protected long getReportingPeriod() { 2052 long period = opts.perClientRunRows / 10; 2053 return period == 0 ? opts.perClientRunRows : period; 2054 } 2055 2056 @Override 2057 protected void testTakedown() throws IOException { 2058 if (this.gets != null && this.gets.size() > 0) { 2059 this.table.get(gets); 2060 this.gets.clear(); 2061 } 2062 super.testTakedown(); 2063 } 2064 } 2065 2066 /* 2067 * Send random reads against fake regions inserted by MetaWriteTest 2068 */ 2069 static class MetaRandomReadTest extends MetaTest { 2070 private RegionLocator regionLocator; 2071 2072 MetaRandomReadTest(Connection con, TestOptions options, Status status) { 2073 super(con, options, status); 2074 LOG.info("call getRegionLocation"); 2075 } 2076 2077 @Override 2078 void onStartup() throws IOException { 2079 super.onStartup(); 2080 this.regionLocator = connection.getRegionLocator(table.getName()); 2081 } 2082 2083 @Override 2084 boolean testRow(final long i, final long startTime) throws IOException, InterruptedException { 2085 if (opts.randomSleep > 0) { 2086 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 2087 } 2088 HRegionLocation hRegionLocation = regionLocator.getRegionLocation( 2089 getSplitKey(ThreadLocalRandom.current().nextLong(opts.perClientRunRows)), true); 2090 LOG.debug("get location for region: " + hRegionLocation); 2091 return true; 2092 } 2093 2094 @Override 2095 protected long getReportingPeriod() { 2096 long period = opts.perClientRunRows / 10; 2097 return period == 0 ? 
opts.perClientRunRows : period; 2098 } 2099 2100 @Override 2101 protected void testTakedown() throws IOException { 2102 super.testTakedown(); 2103 } 2104 } 2105 2106 static class RandomWriteTest extends SequentialWriteTest { 2107 RandomWriteTest(Connection con, TestOptions options, Status status) { 2108 super(con, options, status); 2109 } 2110 2111 @Override 2112 protected byte[] generateRow(final long i) { 2113 return getRandomRow(opts.totalRows); 2114 } 2115 2116 } 2117 2118 static class RandomDeleteTest extends SequentialDeleteTest { 2119 RandomDeleteTest(Connection con, TestOptions options, Status status) { 2120 super(con, options, status); 2121 } 2122 2123 @Override 2124 protected byte[] generateRow(final long i) { 2125 return getRandomRow(opts.totalRows); 2126 } 2127 2128 } 2129 2130 static class ScanTest extends TableTest { 2131 private ResultScanner testScanner; 2132 2133 ScanTest(Connection con, TestOptions options, Status status) { 2134 super(con, options, status); 2135 } 2136 2137 @Override 2138 void testTakedown() throws IOException { 2139 if (this.testScanner != null) { 2140 this.testScanner.close(); 2141 } 2142 super.testTakedown(); 2143 } 2144 2145 @Override 2146 boolean testRow(final long i, final long startTime) throws IOException { 2147 if (this.testScanner == null) { 2148 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 2149 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 2150 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 2151 for (int family = 0; family < opts.families; family++) { 2152 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2153 if (opts.addColumns) { 2154 for (int column = 0; column < opts.columns; column++) { 2155 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2156 scan.addColumn(familyName, qualifier); 2157 } 2158 } else { 2159 scan.addFamily(familyName); 2160 } 2161 } 2162 if (opts.filterAll) { 2163 scan.setFilter(new FilterAllFilter()); 2164 } 2165 this.testScanner = table.getScanner(scan); 2166 } 2167 Result r = testScanner.next(); 2168 updateValueSize(r); 2169 return true; 2170 } 2171 } 2172 2173 static class ReverseScanTest extends TableTest { 2174 private ResultScanner testScanner; 2175 2176 ReverseScanTest(Connection con, TestOptions options, Status status) { 2177 super(con, options, status); 2178 } 2179 2180 @Override 2181 void testTakedown() throws IOException { 2182 if (this.testScanner != null) { 2183 this.testScanner.close(); 2184 } 2185 super.testTakedown(); 2186 } 2187 2188 @Override 2189 boolean testRow(final long i, final long startTime) throws IOException { 2190 if (this.testScanner == null) { 2191 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 2192 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 2193 .setScanMetricsEnabled(true).setReversed(true); 2194 for (int family = 0; family < opts.families; family++) { 2195 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2196 if (opts.addColumns) { 2197 for (int column = 0; column < opts.columns; column++) { 2198 byte[] qualifier = column == 0 ? 
COLUMN_ZERO : Bytes.toBytes("" + column); 2199 scan.addColumn(familyName, qualifier); 2200 } 2201 } else { 2202 scan.addFamily(familyName); 2203 } 2204 } 2205 if (opts.filterAll) { 2206 scan.setFilter(new FilterAllFilter()); 2207 } 2208 this.testScanner = table.getScanner(scan); 2209 } 2210 Result r = testScanner.next(); 2211 updateValueSize(r); 2212 return true; 2213 } 2214 } 2215 2216 /** 2217 * Base class for operations that are CAS-like; that read a value and then set it based off what 2218 * they read. In this category is increment, append, checkAndPut, etc. 2219 * <p> 2220 * These operations also want some concurrency going on. Usually when these tests run, they 2221 * operate in their own part of the key range. In CASTest, we will have them all overlap on the 2222 * same key space. We do this with our getStartRow and getLastRow overrides. 2223 */ 2224 static abstract class CASTableTest extends TableTest { 2225 private final byte[] qualifier; 2226 2227 CASTableTest(Connection con, TestOptions options, Status status) { 2228 super(con, options, status); 2229 qualifier = Bytes.toBytes(this.getClass().getSimpleName()); 2230 } 2231 2232 byte[] getQualifier() { 2233 return this.qualifier; 2234 } 2235 2236 @Override 2237 long getStartRow() { 2238 return 0; 2239 } 2240 2241 @Override 2242 long getLastRow() { 2243 return opts.perClientRunRows; 2244 } 2245 } 2246 2247 static class IncrementTest extends CASTableTest { 2248 IncrementTest(Connection con, TestOptions options, Status status) { 2249 super(con, options, status); 2250 } 2251 2252 @Override 2253 boolean testRow(final long i, final long startTime) throws IOException { 2254 Increment increment = new Increment(format(i)); 2255 // unlike checkAndXXX tests, which make most sense to do on a single value, 2256 // if multiple families are specified for an increment test we assume it is 2257 // meant to raise the work factor 2258 for (int family = 0; family < opts.families; family++) { 2259 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2260 increment.addColumn(familyName, getQualifier(), 1l); 2261 } 2262 updateValueSize(this.table.increment(increment)); 2263 return true; 2264 } 2265 } 2266 2267 static class AppendTest extends CASTableTest { 2268 AppendTest(Connection con, TestOptions options, Status status) { 2269 super(con, options, status); 2270 } 2271 2272 @Override 2273 boolean testRow(final long i, final long startTime) throws IOException { 2274 byte[] bytes = format(i); 2275 Append append = new Append(bytes); 2276 // unlike checkAndXXX tests, which make most sense to do on a single value, 2277 // if multiple families are specified for an append test we assume it is 2278 // meant to raise the work factor 2279 for (int family = 0; family < opts.families; family++) { 2280 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2281 append.addColumn(familyName, getQualifier(), bytes); 2282 } 2283 updateValueSize(this.table.append(append)); 2284 return true; 2285 } 2286 } 2287 2288 static class CheckAndMutateTest extends CASTableTest { 2289 CheckAndMutateTest(Connection con, TestOptions options, Status status) { 2290 super(con, options, status); 2291 } 2292 2293 @Override 2294 boolean testRow(final long i, final long startTime) throws IOException { 2295 final byte[] bytes = format(i); 2296 // checkAndXXX tests operate on only a single value 2297 // Put a known value so when we go to check it, it is there. 
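// The checkAndMutate below then only applies the RowMutations if that value is still in place.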
2298 Put put = new Put(bytes); 2299 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2300 this.table.put(put); 2301 RowMutations mutations = new RowMutations(bytes); 2302 mutations.add(put); 2303 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2304 .thenMutate(mutations); 2305 return true; 2306 } 2307 } 2308 2309 static class CheckAndPutTest extends CASTableTest { 2310 CheckAndPutTest(Connection con, TestOptions options, Status status) { 2311 super(con, options, status); 2312 } 2313 2314 @Override 2315 boolean testRow(final long i, final long startTime) throws IOException { 2316 final byte[] bytes = format(i); 2317 // checkAndXXX tests operate on only a single value 2318 // Put a known value so when we go to check it, it is there. 2319 Put put = new Put(bytes); 2320 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2321 this.table.put(put); 2322 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2323 .thenPut(put); 2324 return true; 2325 } 2326 } 2327 2328 static class CheckAndDeleteTest extends CASTableTest { 2329 CheckAndDeleteTest(Connection con, TestOptions options, Status status) { 2330 super(con, options, status); 2331 } 2332 2333 @Override 2334 boolean testRow(final long i, final long startTime) throws IOException { 2335 final byte[] bytes = format(i); 2336 // checkAndXXX tests operate on only a single value 2337 // Put a known value so when we go to check it, it is there. 2338 Put put = new Put(bytes); 2339 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2340 this.table.put(put); 2341 Delete delete = new Delete(put.getRow()); 2342 delete.addColumn(FAMILY_ZERO, getQualifier()); 2343 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2344 .thenDelete(delete); 2345 return true; 2346 } 2347 } 2348 2349 /* 2350 * Delete all fake regions inserted to meta table by MetaWriteTest. 2351 */ 2352 static class CleanMetaTest extends MetaTest { 2353 CleanMetaTest(Connection con, TestOptions options, Status status) { 2354 super(con, options, status); 2355 } 2356 2357 @Override 2358 boolean testRow(final long i, final long startTime) throws IOException { 2359 try { 2360 RegionInfo regionInfo = connection.getRegionLocator(table.getName()) 2361 .getRegionLocation(getSplitKey(i), false).getRegion(); 2362 LOG.debug("deleting region from meta: " + regionInfo); 2363 2364 Delete delete = 2365 MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP); 2366 try (Table t = MetaTableAccessor.getMetaHTable(connection)) { 2367 t.delete(delete); 2368 } 2369 } catch (IOException ie) { 2370 // Log and continue 2371 LOG.error("cannot find region with start key: " + i); 2372 } 2373 return true; 2374 } 2375 } 2376 2377 static class SequentialReadTest extends TableTest { 2378 SequentialReadTest(Connection con, TestOptions options, Status status) { 2379 super(con, options, status); 2380 } 2381 2382 @Override 2383 boolean testRow(final long i, final long startTime) throws IOException { 2384 Get get = new Get(format(i)); 2385 for (int family = 0; family < opts.families; family++) { 2386 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2387 if (opts.addColumns) { 2388 for (int column = 0; column < opts.columns; column++) { 2389 byte[] qualifier = column == 0 ? 
COLUMN_ZERO : Bytes.toBytes("" + column); 2390 get.addColumn(familyName, qualifier); 2391 } 2392 } else { 2393 get.addFamily(familyName); 2394 } 2395 } 2396 if (opts.filterAll) { 2397 get.setFilter(new FilterAllFilter()); 2398 } 2399 updateValueSize(table.get(get)); 2400 return true; 2401 } 2402 } 2403 2404 static class SequentialWriteTest extends BufferedMutatorTest { 2405 private ArrayList<Put> puts; 2406 2407 SequentialWriteTest(Connection con, TestOptions options, Status status) { 2408 super(con, options, status); 2409 if (opts.multiPut > 0) { 2410 LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + "."); 2411 this.puts = new ArrayList<>(opts.multiPut); 2412 } 2413 } 2414 2415 protected byte[] generateRow(final long i) { 2416 return format(i); 2417 } 2418 2419 @Override 2420 boolean testRow(final long i, final long startTime) throws IOException { 2421 byte[] row = generateRow(i); 2422 Put put = new Put(row); 2423 for (int family = 0; family < opts.families; family++) { 2424 byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family); 2425 for (int column = 0; column < opts.columns; column++) { 2426 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2427 byte[] value = generateData(getValueLength()); 2428 if (opts.useTags) { 2429 byte[] tag = generateData(TAG_LENGTH); 2430 Tag[] tags = new Tag[opts.noOfTags]; 2431 for (int n = 0; n < opts.noOfTags; n++) { 2432 Tag t = new ArrayBackedTag((byte) n, tag); 2433 tags[n] = t; 2434 } 2435 KeyValue kv = 2436 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 2437 put.add(kv); 2438 updateValueSize(kv.getValueLength()); 2439 } else { 2440 put.addColumn(familyName, qualifier, value); 2441 updateValueSize(value.length); 2442 } 2443 } 2444 } 2445 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 2446 if (opts.autoFlush) { 2447 if (opts.multiPut > 0) { 2448 this.puts.add(put); 2449 if (this.puts.size() == opts.multiPut) { 2450 table.put(this.puts); 2451 this.puts.clear(); 2452 } else { 2453 return false; 2454 } 2455 } else { 2456 table.put(put); 2457 } 2458 } else { 2459 mutator.mutate(put); 2460 } 2461 return true; 2462 } 2463 } 2464 2465 static class SequentialDeleteTest extends BufferedMutatorTest { 2466 2467 SequentialDeleteTest(Connection con, TestOptions options, Status status) { 2468 super(con, options, status); 2469 } 2470 2471 protected byte[] generateRow(final long i) { 2472 return format(i); 2473 } 2474 2475 @Override 2476 boolean testRow(final long i, final long startTime) throws IOException { 2477 byte[] row = generateRow(i); 2478 Delete delete = new Delete(row); 2479 for (int family = 0; family < opts.families; family++) { 2480 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2481 delete.addFamily(familyName); 2482 } 2483 delete.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 2484 if (opts.autoFlush) { 2485 table.delete(delete); 2486 } else { 2487 mutator.mutate(delete); 2488 } 2489 return true; 2490 } 2491 } 2492 2493 /* 2494 * Insert fake regions into meta table with contiguous split keys. 
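* These fake regions are read back by MetaRandomReadTest and cleaned up by CleanMetaTest.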
2495 */ 2496 static class MetaWriteTest extends MetaTest { 2497 2498 MetaWriteTest(Connection con, TestOptions options, Status status) { 2499 super(con, options, status); 2500 } 2501 2502 @Override 2503 boolean testRow(final long i, final long startTime) throws IOException { 2504 List<RegionInfo> regionInfos = new ArrayList<RegionInfo>(); 2505 RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME)) 2506 .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build()); 2507 regionInfos.add(regionInfo); 2508 MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1); 2509 2510 // write the serverName columns 2511 MetaTableAccessor.updateRegionLocation(connection, regionInfo, 2512 ServerName.valueOf("localhost", 60010, ThreadLocalRandom.current().nextLong()), i, 2513 EnvironmentEdgeManager.currentTime()); 2514 return true; 2515 } 2516 } 2517 2518 static class FilteredScanTest extends TableTest { 2519 protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName()); 2520 2521 FilteredScanTest(Connection con, TestOptions options, Status status) { 2522 super(con, options, status); 2523 if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) { 2524 LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB 2525 + ". This could take a very long time."); 2526 } 2527 } 2528 2529 @Override 2530 boolean testRow(long i, final long startTime) throws IOException { 2531 byte[] value = generateData(getValueLength()); 2532 Scan scan = constructScan(value); 2533 ResultScanner scanner = null; 2534 try { 2535 scanner = this.table.getScanner(scan); 2536 for (Result r = null; (r = scanner.next()) != null;) { 2537 updateValueSize(r); 2538 } 2539 } finally { 2540 if (scanner != null) { 2541 updateScanMetrics(scanner.getScanMetrics()); 2542 scanner.close(); 2543 } 2544 } 2545 return true; 2546 } 2547 2548 protected Scan constructScan(byte[] valuePrefix) throws IOException { 2549 FilterList list = new FilterList(); 2550 Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL, 2551 new BinaryComparator(valuePrefix)); 2552 list.addFilter(filter); 2553 if (opts.filterAll) { 2554 list.addFilter(new FilterAllFilter()); 2555 } 2556 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 2557 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 2558 .setScanMetricsEnabled(true); 2559 if (opts.addColumns) { 2560 for (int column = 0; column < opts.columns; column++) { 2561 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2562 scan.addColumn(FAMILY_ZERO, qualifier); 2563 } 2564 } else { 2565 scan.addFamily(FAMILY_ZERO); 2566 } 2567 scan.setFilter(list); 2568 return scan; 2569 } 2570 } 2571 2572 /** 2573 * Compute a throughput rate in MB/s. 2574 * @param rows Number of records consumed. 2575 * @param timeMs Time taken in milliseconds. 2576 * @return String value with label, ie '123.76 MB/s' 2577 */ 2578 private static String calculateMbps(long rows, long timeMs, final int valueSize, int families, 2579 int columns) { 2580 BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH 2581 + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families); 2582 BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT) 2583 .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT); 2584 return FMT.format(mbps) + " MB/s"; 2585 } 2586 2587 /* 2588 * Format passed integer. 
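* For example, if ROW_LENGTH were 10, format(123) would return the ASCII bytes of "0000000123".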
2589 * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed number (Does 2590 * absolute in case number is negative). 2591 */ 2592 public static byte[] format(final long number) { 2593 byte[] b = new byte[ROW_LENGTH]; 2594 long d = Math.abs(number); 2595 for (int i = b.length - 1; i >= 0; i--) { 2596 b[i] = (byte) ((d % 10) + '0'); 2597 d /= 10; 2598 } 2599 return b; 2600 } 2601 2602 /* 2603 * This method takes some time and is done inline uploading data. For example, doing the mapfile 2604 * test, generation of the key and value consumes about 30% of CPU time. 2605 * @return Generated random value to insert into a table cell. 2606 */ 2607 public static byte[] generateData(int length) { 2608 byte[] b = new byte[length]; 2609 int i; 2610 2611 Random r = ThreadLocalRandom.current(); 2612 for (i = 0; i < (length - 8); i += 8) { 2613 b[i] = (byte) (65 + r.nextInt(26)); 2614 b[i + 1] = b[i]; 2615 b[i + 2] = b[i]; 2616 b[i + 3] = b[i]; 2617 b[i + 4] = b[i]; 2618 b[i + 5] = b[i]; 2619 b[i + 6] = b[i]; 2620 b[i + 7] = b[i]; 2621 } 2622 2623 byte a = (byte) (65 + r.nextInt(26)); 2624 for (; i < length; i++) { 2625 b[i] = a; 2626 } 2627 return b; 2628 } 2629 2630 static byte[] getRandomRow(final long totalRows) { 2631 return format(generateRandomRow(totalRows)); 2632 } 2633 2634 static long generateRandomRow(final long totalRows) { 2635 return ThreadLocalRandom.current().nextLong(Long.MAX_VALUE) % totalRows; 2636 } 2637 2638 static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf, 2639 Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status) 2640 throws IOException, InterruptedException { 2641 status.setStatus( 2642 "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows"); 2643 long totalElapsedTime; 2644 2645 final TestBase t; 2646 try { 2647 if (AsyncTest.class.isAssignableFrom(cmd)) { 2648 Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd; 2649 Constructor<? extends AsyncTest> constructor = 2650 newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class); 2651 t = constructor.newInstance(asyncCon, opts, status); 2652 } else { 2653 Class<? extends Test> newCmd = (Class<? extends Test>) cmd; 2654 Constructor<? extends Test> constructor = 2655 newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class); 2656 t = constructor.newInstance(con, opts, status); 2657 } 2658 } catch (NoSuchMethodException e) { 2659 throw new IllegalArgumentException("Invalid command class: " + cmd.getName() 2660 + ". It does not provide a constructor as described by " 2661 + "the javadoc comment. Available constructors are: " 2662 + Arrays.toString(cmd.getConstructors())); 2663 } catch (Exception e) { 2664 throw new IllegalStateException("Failed to construct command class", e); 2665 } 2666 totalElapsedTime = t.test(); 2667 2668 status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow 2669 + " for " + opts.perClientRunRows + " rows" + " (" 2670 + calculateMbps((long) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime, 2671 getAverageValueLength(opts), opts.families, opts.columns) 2672 + ")"); 2673 2674 return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold, 2675 t.numOfReplyFromReplica, t.getLatencyHistogram()); 2676 } 2677 2678 private static int getAverageValueLength(final TestOptions opts) { 2679 return opts.valueRandom ? 
opts.valueSize / 2 : opts.valueSize; 2680 } 2681 2682 private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) 2683 throws IOException, InterruptedException, ClassNotFoundException, ExecutionException { 2684 // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do 2685 // the TestOptions introspection for us and dump the output in a readable format. 2686 LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts)); 2687 Admin admin = null; 2688 Connection connection = null; 2689 try { 2690 connection = ConnectionFactory.createConnection(getConf()); 2691 admin = connection.getAdmin(); 2692 checkTable(admin, opts); 2693 } finally { 2694 if (admin != null) admin.close(); 2695 if (connection != null) connection.close(); 2696 } 2697 if (opts.nomapred) { 2698 doLocalClients(opts, getConf()); 2699 } else { 2700 doMapReduce(opts, getConf()); 2701 } 2702 } 2703 2704 protected void printUsage() { 2705 printUsage(PE_COMMAND_SHORTNAME, null); 2706 } 2707 2708 protected static void printUsage(final String message) { 2709 printUsage(PE_COMMAND_SHORTNAME, message); 2710 } 2711 2712 protected static void printUsageAndExit(final String message, final int exitCode) { 2713 printUsage(message); 2714 System.exit(exitCode); 2715 } 2716 2717 protected static void printUsage(final String shortName, final String message) { 2718 if (message != null && message.length() > 0) { 2719 System.err.println(message); 2720 } 2721 System.err.print("Usage: hbase " + shortName); 2722 System.err.println(" <OPTIONS> [-D<property=value>]* <command|class> <nclients>"); 2723 System.err.println(); 2724 System.err.println("General Options:"); 2725 System.err.println( 2726 " nomapred Run multiple clients using threads " + "(rather than use mapreduce)"); 2727 System.err 2728 .println(" oneCon all the threads share the same connection. Default: False"); 2729 System.err.println(" connCount connections all threads share. " 2730 + "For example, if set to 2, then all threads share 2 connections. " 2731 + "Default: depends on the oneCon parameter. If oneCon is set to true, then connCount=1; " 2732 + "if not, connCount equals the number of threads"); 2733 2734 System.err.println(" sampleRate Execute test on a sample of total rows. Default: 1.0"); 2735 System.err.println(" period Report every 'period' rows: " 2736 + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10); 2737 System.err.println(" cycles How many times to cycle the test. Defaults: 1."); 2738 System.err.println( 2739 " traceRate Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0"); 2740 System.err.println(" latency Set to report operation latencies. Default: False"); 2741 System.err.println(" latencyThreshold Set to report the number of operations with latency " 2742 + "over latencyThreshold, in milliseconds. Default: 0"); 2743 System.err.println(" measureAfter Start to measure the latency once 'measureAfter'" 2744 + " rows have been processed. Default: 0"); 2745 System.err 2746 .println(" valueSize Pass value size to use: Default: " + DEFAULT_OPTS.getValueSize()); 2747 System.err.println(" valueRandom Set if we should vary value size between 0 and " 2748 + "'valueSize'; set on read for stats on size: Default: Not set."); 2749 System.err.println(" blockEncoding Block encoding to use. Value should be one of " 2750 + Arrays.toString(DataBlockEncoding.values()) + ".
Default: NONE"); 2751 System.err.println(); 2752 System.err.println("Table Creation / Write Tests:"); 2753 System.err.println(" table Alternate table name. Default: 'TestTable'"); 2754 System.err.println( 2755 " rows Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows() 2756 + ". In case of randomReads and randomSeekScans this could" 2757 + " be specified along with --size to specify the number of rows to be scanned within" 2758 + " the total range specified by the size."); 2759 System.err.println( 2760 " size Total size in GiB. Mutually exclusive with --rows for writes and scans" 2761 + ". But for randomReads and randomSeekScans when you use size with --rows you could" 2762 + " use size to specify the end range and --rows" 2763 + " specifies the number of rows within that range. " + "Default: 1.0."); 2764 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); 2765 System.err.println(" encryption Encryption type to use (AES, ...). Default: 'NONE'"); 2766 System.err.println( 2767 " flushCommits Used to determine if the test should flush the table. " + "Default: false"); 2768 System.err.println(" valueZipf Set if we should vary value size between 0 and " 2769 + "'valueSize' in zipf form: Default: Not set."); 2770 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); 2771 System.err.println(" autoFlush Set autoFlush on htable. Default: False"); 2772 System.err.println(" multiPut Batch puts together into groups of N. Only supported " 2773 + "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0"); 2774 System.err.println(" presplit Create presplit table. If a table with same name exists," 2775 + " it'll be deleted and recreated (instead of verifying count of its existing regions). " 2776 + "Recommended for accurate perf analysis (see guide). Default: disabled"); 2777 System.err.println( 2778 " usetags Writes tags along with KVs. Use with HFile V3. " + "Default: false"); 2779 System.err.println(" numoftags Specify the no of tags that would be needed. " 2780 + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags); 2781 System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table."); 2782 System.err.println(" columns Columns to write per row. Default: 1"); 2783 System.err 2784 .println(" families Specify number of column families for the table. Default: 1"); 2785 System.err.println(); 2786 System.err.println("Read Tests:"); 2787 System.err.println(" filterAll Helps to filter out all the rows on the server side" 2788 + " there by not returning any thing back to the client. Helps to check the server side" 2789 + " performance. Uses FilterAllFilter internally. "); 2790 System.err.println(" multiGet Batch gets together into groups of N. Only supported " 2791 + "by randomRead. Default: disabled"); 2792 System.err.println(" inmemory Tries to keep the HFiles of the CF " 2793 + "inmemory as far as possible. Not guaranteed that reads are always served " 2794 + "from memory. Default: false"); 2795 System.err 2796 .println(" bloomFilter Bloom filter type, one of " + Arrays.toString(BloomType.values())); 2797 System.err.println(" blockSize Blocksize to use when writing out hfiles. "); 2798 System.err 2799 .println(" inmemoryCompaction Makes the column family to do inmemory flushes/compactions. " 2800 + "Uses the CompactingMemstore"); 2801 System.err.println(" addColumns Adds columns to scans/gets explicitly. 
Default: true"); 2802 System.err.println(" replicas Enable region replica testing. Defaults: 1."); 2803 System.err.println( 2804 " randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0"); 2805 System.err.println(" caching Scan caching to use. Default: 30"); 2806 System.err.println(" asyncPrefetch Enable asyncPrefetch for scan"); 2807 System.err.println(" cacheBlocks Set the cacheBlocks option for scan. Default: true"); 2808 System.err.println( 2809 " scanReadType Set the readType option for scan, stream/pread/default. Default: default"); 2810 System.err.println(" bufferSize Set the value of client side buffering. Default: 2MB"); 2811 System.err.println(); 2812 System.err.println(" Note: -D properties will be applied to the conf used. "); 2813 System.err.println(" For example: "); 2814 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true"); 2815 System.err.println(" -Dmapreduce.task.timeout=60000"); 2816 System.err.println(); 2817 System.err.println("Command:"); 2818 for (CmdDescriptor command : COMMANDS.values()) { 2819 System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription())); 2820 } 2821 System.err.println(); 2822 System.err.println("Class:"); 2823 System.err.println("To run any custom implementation of PerformanceEvaluation.Test, " 2824 + "provide the classname of the implementaion class in place of " 2825 + "command name and it will be loaded at runtime from classpath.:"); 2826 System.err.println("Please consider to contribute back " 2827 + "this custom test impl into a builtin PE command for the benefit of the community"); 2828 System.err.println(); 2829 System.err.println("Args:"); 2830 System.err.println(" nclients Integer. Required. Total number of clients " 2831 + "(and HRegionServers) running. 1 <= value <= 500"); 2832 System.err.println("Examples:"); 2833 System.err.println(" To run a single client doing the default 1M sequentialWrites:"); 2834 System.err.println(" $ hbase " + shortName + " sequentialWrite 1"); 2835 System.err.println(" To run 10 clients doing increments over ten rows:"); 2836 System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10"); 2837 } 2838 2839 /** 2840 * Parse options passed in via an arguments array. Assumes that array has been split on 2841 * white-space and placed into a {@code Queue}. Any unknown arguments will remain in the queue at 2842 * the conclusion of this method call. It's up to the caller to deal with these unrecognized 2843 * arguments. 
2844 */ 2845 static TestOptions parseOpts(Queue<String> args) { 2846 TestOptions opts = new TestOptions(); 2847 2848 String cmd = null; 2849 while ((cmd = args.poll()) != null) { 2850 if (cmd.equals("-h") || cmd.startsWith("--h")) { 2851 // place item back onto queue so that caller knows parsing was incomplete 2852 args.add(cmd); 2853 break; 2854 } 2855 2856 final String nmr = "--nomapred"; 2857 if (cmd.startsWith(nmr)) { 2858 opts.nomapred = true; 2859 continue; 2860 } 2861 2862 final String rows = "--rows="; 2863 if (cmd.startsWith(rows)) { 2864 opts.perClientRunRows = Long.parseLong(cmd.substring(rows.length())); 2865 continue; 2866 } 2867 2868 final String cycles = "--cycles="; 2869 if (cmd.startsWith(cycles)) { 2870 opts.cycles = Integer.parseInt(cmd.substring(cycles.length())); 2871 continue; 2872 } 2873 2874 final String sampleRate = "--sampleRate="; 2875 if (cmd.startsWith(sampleRate)) { 2876 opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); 2877 continue; 2878 } 2879 2880 final String table = "--table="; 2881 if (cmd.startsWith(table)) { 2882 opts.tableName = cmd.substring(table.length()); 2883 continue; 2884 } 2885 2886 final String startRow = "--startRow="; 2887 if (cmd.startsWith(startRow)) { 2888 opts.startRow = Long.parseLong(cmd.substring(startRow.length())); 2889 continue; 2890 } 2891 2892 final String compress = "--compress="; 2893 if (cmd.startsWith(compress)) { 2894 opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); 2895 continue; 2896 } 2897 2898 final String encryption = "--encryption="; 2899 if (cmd.startsWith(encryption)) { 2900 opts.encryption = cmd.substring(encryption.length()); 2901 continue; 2902 } 2903 2904 final String traceRate = "--traceRate="; 2905 if (cmd.startsWith(traceRate)) { 2906 opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length())); 2907 continue; 2908 } 2909 2910 final String blockEncoding = "--blockEncoding="; 2911 if (cmd.startsWith(blockEncoding)) { 2912 opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); 2913 continue; 2914 } 2915 2916 final String flushCommits = "--flushCommits="; 2917 if (cmd.startsWith(flushCommits)) { 2918 opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); 2919 continue; 2920 } 2921 2922 final String writeToWAL = "--writeToWAL="; 2923 if (cmd.startsWith(writeToWAL)) { 2924 opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); 2925 continue; 2926 } 2927 2928 final String presplit = "--presplit="; 2929 if (cmd.startsWith(presplit)) { 2930 opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); 2931 continue; 2932 } 2933 2934 final String inMemory = "--inmemory="; 2935 if (cmd.startsWith(inMemory)) { 2936 opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); 2937 continue; 2938 } 2939 2940 final String autoFlush = "--autoFlush="; 2941 if (cmd.startsWith(autoFlush)) { 2942 opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length())); 2943 continue; 2944 } 2945 2946 final String onceCon = "--oneCon="; 2947 if (cmd.startsWith(onceCon)) { 2948 opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length())); 2949 continue; 2950 } 2951 2952 final String connCount = "--connCount="; 2953 if (cmd.startsWith(connCount)) { 2954 opts.connCount = Integer.parseInt(cmd.substring(connCount.length())); 2955 continue; 2956 } 2957 2958 final String latencyThreshold = "--latencyThreshold="; 2959 if (cmd.startsWith(latencyThreshold)) { 
2960 opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length())); 2961 continue; 2962 } 2963 2964 final String latency = "--latency"; 2965 if (cmd.startsWith(latency)) { 2966 opts.reportLatency = true; 2967 continue; 2968 } 2969 2970 final String multiGet = "--multiGet="; 2971 if (cmd.startsWith(multiGet)) { 2972 opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); 2973 continue; 2974 } 2975 2976 final String multiPut = "--multiPut="; 2977 if (cmd.startsWith(multiPut)) { 2978 opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length())); 2979 continue; 2980 } 2981 2982 final String useTags = "--usetags="; 2983 if (cmd.startsWith(useTags)) { 2984 opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); 2985 continue; 2986 } 2987 2988 final String noOfTags = "--numoftags="; 2989 if (cmd.startsWith(noOfTags)) { 2990 opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); 2991 continue; 2992 } 2993 2994 final String replicas = "--replicas="; 2995 if (cmd.startsWith(replicas)) { 2996 opts.replicas = Integer.parseInt(cmd.substring(replicas.length())); 2997 continue; 2998 } 2999 3000 final String filterOutAll = "--filterAll"; 3001 if (cmd.startsWith(filterOutAll)) { 3002 opts.filterAll = true; 3003 continue; 3004 } 3005 3006 final String size = "--size="; 3007 if (cmd.startsWith(size)) { 3008 opts.size = Float.parseFloat(cmd.substring(size.length())); 3009 if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB"); 3010 continue; 3011 } 3012 3013 final String splitPolicy = "--splitPolicy="; 3014 if (cmd.startsWith(splitPolicy)) { 3015 opts.splitPolicy = cmd.substring(splitPolicy.length()); 3016 continue; 3017 } 3018 3019 final String randomSleep = "--randomSleep="; 3020 if (cmd.startsWith(randomSleep)) { 3021 opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length())); 3022 continue; 3023 } 3024 3025 final String measureAfter = "--measureAfter="; 3026 if (cmd.startsWith(measureAfter)) { 3027 opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length())); 3028 continue; 3029 } 3030 3031 final String bloomFilter = "--bloomFilter="; 3032 if (cmd.startsWith(bloomFilter)) { 3033 opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length())); 3034 continue; 3035 } 3036 3037 final String blockSize = "--blockSize="; 3038 if (cmd.startsWith(blockSize)) { 3039 opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length())); 3040 continue; 3041 } 3042 3043 final String valueSize = "--valueSize="; 3044 if (cmd.startsWith(valueSize)) { 3045 opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length())); 3046 continue; 3047 } 3048 3049 final String valueRandom = "--valueRandom"; 3050 if (cmd.startsWith(valueRandom)) { 3051 opts.valueRandom = true; 3052 continue; 3053 } 3054 3055 final String valueZipf = "--valueZipf"; 3056 if (cmd.startsWith(valueZipf)) { 3057 opts.valueZipf = true; 3058 continue; 3059 } 3060 3061 final String period = "--period="; 3062 if (cmd.startsWith(period)) { 3063 opts.period = Integer.parseInt(cmd.substring(period.length())); 3064 continue; 3065 } 3066 3067 final String addColumns = "--addColumns="; 3068 if (cmd.startsWith(addColumns)) { 3069 opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length())); 3070 continue; 3071 } 3072 3073 final String inMemoryCompaction = "--inmemoryCompaction="; 3074 if (cmd.startsWith(inMemoryCompaction)) { 3075 opts.inMemoryCompaction = 3076 
MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length())); 3077 continue; 3078 } 3079 3080 final String columns = "--columns="; 3081 if (cmd.startsWith(columns)) { 3082 opts.columns = Integer.parseInt(cmd.substring(columns.length())); 3083 continue; 3084 } 3085 3086 final String families = "--families="; 3087 if (cmd.startsWith(families)) { 3088 opts.families = Integer.parseInt(cmd.substring(families.length())); 3089 continue; 3090 } 3091 3092 final String caching = "--caching="; 3093 if (cmd.startsWith(caching)) { 3094 opts.caching = Integer.parseInt(cmd.substring(caching.length())); 3095 continue; 3096 } 3097 3098 final String asyncPrefetch = "--asyncPrefetch"; 3099 if (cmd.startsWith(asyncPrefetch)) { 3100 opts.asyncPrefetch = true; 3101 continue; 3102 } 3103 3104 final String cacheBlocks = "--cacheBlocks="; 3105 if (cmd.startsWith(cacheBlocks)) { 3106 opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length())); 3107 continue; 3108 } 3109 3110 final String scanReadType = "--scanReadType="; 3111 if (cmd.startsWith(scanReadType)) { 3112 opts.scanReadType = 3113 Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase()); 3114 continue; 3115 } 3116 3117 final String bufferSize = "--bufferSize="; 3118 if (cmd.startsWith(bufferSize)) { 3119 opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length())); 3120 continue; 3121 } 3122 3123 final String commandPropertiesFile = "--commandPropertiesFile="; 3124 if (cmd.startsWith(commandPropertiesFile)) { 3125 String fileName = String.valueOf(cmd.substring(commandPropertiesFile.length())); 3126 Properties properties = new Properties(); 3127 try { 3128 properties 3129 .load(PerformanceEvaluation.class.getClassLoader().getResourceAsStream(fileName)); 3130 opts.commandProperties = properties; 3131 } catch (IOException e) { 3132 LOG.error("Failed to load metricIds from properties file", e); 3133 } 3134 continue; 3135 } 3136 3137 validateParsedOpts(opts); 3138 3139 if (isCommandClass(cmd)) { 3140 opts.cmdName = cmd; 3141 try { 3142 opts.numClientThreads = Integer.parseInt(args.remove()); 3143 } catch (NoSuchElementException | NumberFormatException e) { 3144 throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e); 3145 } 3146 opts = calculateRowsAndSize(opts); 3147 break; 3148 } else { 3149 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); 3150 } 3151 3152 // Not matching any option or command. 
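// (Normally unreachable: printUsageAndExit above terminates the JVM via System.exit.)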
3153 System.err.println("Error: Wrong option or command: " + cmd); 3154 args.add(cmd); 3155 break; 3156 } 3157 return opts; 3158 } 3159 3160 /** 3161 * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts 3162 */ 3163 private static void validateParsedOpts(TestOptions opts) { 3164 3165 if (!opts.autoFlush && opts.multiPut > 0) { 3166 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0"); 3167 } 3168 3169 if (opts.oneCon && opts.connCount > 1) { 3170 throw new IllegalArgumentException( 3171 "oneCon is set to true, " + "connCount should not bigger than 1"); 3172 } 3173 3174 if (opts.valueZipf && opts.valueRandom) { 3175 throw new IllegalStateException("Either valueZipf or valueRandom but not both"); 3176 } 3177 } 3178 3179 static TestOptions calculateRowsAndSize(final TestOptions opts) { 3180 int rowsPerGB = getRowsPerGB(opts); 3181 if ( 3182 (opts.getCmdName() != null 3183 && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN))) 3184 && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows 3185 ) { 3186 opts.totalRows = (long) (opts.size * rowsPerGB); 3187 } else if (opts.size != DEFAULT_OPTS.size) { 3188 // total size in GB specified 3189 opts.totalRows = (long) (opts.size * rowsPerGB); 3190 opts.perClientRunRows = opts.totalRows / opts.numClientThreads; 3191 } else { 3192 opts.totalRows = opts.perClientRunRows * opts.numClientThreads; 3193 // Cast to float to ensure floating-point division 3194 opts.size = (float) opts.totalRows / rowsPerGB; 3195 } 3196 return opts; 3197 } 3198 3199 static int getRowsPerGB(final TestOptions opts) { 3200 return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies() 3201 * opts.getColumns()); 3202 } 3203 3204 @Override 3205 public int run(String[] args) throws Exception { 3206 // Process command-line args. TODO: Better cmd-line processing 3207 // (but hopefully something not as painful as cli options). 3208 int errCode = -1; 3209 if (args.length < 1) { 3210 printUsage(); 3211 return errCode; 3212 } 3213 3214 try { 3215 LinkedList<String> argv = new LinkedList<>(); 3216 argv.addAll(Arrays.asList(args)); 3217 TestOptions opts = parseOpts(argv); 3218 3219 // args remaining, print help and exit 3220 if (!argv.isEmpty()) { 3221 errCode = 0; 3222 printUsage(); 3223 return errCode; 3224 } 3225 3226 // must run at least 1 client 3227 if (opts.numClientThreads <= 0) { 3228 throw new IllegalArgumentException("Number of clients must be > 0"); 3229 } 3230 3231 // cmdName should not be null, print help and exit 3232 if (opts.cmdName == null) { 3233 printUsage(); 3234 return errCode; 3235 } 3236 3237 Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName); 3238 if (cmdClass != null) { 3239 runTest(cmdClass, opts); 3240 errCode = 0; 3241 } 3242 3243 } catch (Exception e) { 3244 e.printStackTrace(); 3245 } 3246 3247 return errCode; 3248 } 3249 3250 private static boolean isCommandClass(String cmd) { 3251 return COMMANDS.containsKey(cmd) || isCustomTestClass(cmd); 3252 } 3253 3254 private static boolean isCustomTestClass(String cmd) { 3255 Class<? extends Test> cmdClass; 3256 try { 3257 cmdClass = 3258 (Class<? 
extends Test>) PerformanceEvaluation.class.getClassLoader().loadClass(cmd); 3259 addCommandDescriptor(cmdClass, cmd, "custom command"); 3260 return true; 3261 } catch (Throwable th) { 3262 LOG.info("No class found for command: " + cmd, th); 3263 return false; 3264 } 3265 } 3266 3267 private static Class<? extends TestBase> determineCommandClass(String cmd) { 3268 CmdDescriptor descriptor = COMMANDS.get(cmd); 3269 return descriptor != null ? descriptor.getCmdClass() : null; 3270 } 3271 3272 public static void main(final String[] args) throws Exception { 3273 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); 3274 System.exit(res); 3275 } 3276}