001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import com.codahale.metrics.Histogram; 021import com.codahale.metrics.UniformReservoir; 022import io.opentelemetry.api.trace.Span; 023import io.opentelemetry.context.Scope; 024import java.io.IOException; 025import java.io.PrintStream; 026import java.lang.reflect.Constructor; 027import java.math.BigDecimal; 028import java.math.MathContext; 029import java.text.DecimalFormat; 030import java.text.SimpleDateFormat; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.Date; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Locale; 037import java.util.Map; 038import java.util.NoSuchElementException; 039import java.util.Properties; 040import java.util.Queue; 041import java.util.Random; 042import java.util.TreeMap; 043import java.util.concurrent.Callable; 044import java.util.concurrent.ExecutionException; 045import java.util.concurrent.ExecutorService; 046import java.util.concurrent.Executors; 047import java.util.concurrent.Future; 048import java.util.concurrent.ThreadLocalRandom; 049import org.apache.commons.lang3.StringUtils; 050import org.apache.hadoop.conf.Configuration; 051import org.apache.hadoop.conf.Configured; 052import org.apache.hadoop.fs.FileSystem; 053import org.apache.hadoop.fs.Path; 054import org.apache.hadoop.hbase.client.Admin; 055import org.apache.hadoop.hbase.client.Append; 056import org.apache.hadoop.hbase.client.AsyncConnection; 057import org.apache.hadoop.hbase.client.AsyncTable; 058import org.apache.hadoop.hbase.client.BufferedMutator; 059import org.apache.hadoop.hbase.client.BufferedMutatorParams; 060import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 061import org.apache.hadoop.hbase.client.Connection; 062import org.apache.hadoop.hbase.client.ConnectionFactory; 063import org.apache.hadoop.hbase.client.Consistency; 064import org.apache.hadoop.hbase.client.Delete; 065import org.apache.hadoop.hbase.client.Durability; 066import org.apache.hadoop.hbase.client.Get; 067import org.apache.hadoop.hbase.client.Increment; 068import org.apache.hadoop.hbase.client.Put; 069import org.apache.hadoop.hbase.client.RegionInfo; 070import org.apache.hadoop.hbase.client.RegionInfoBuilder; 071import org.apache.hadoop.hbase.client.RegionLocator; 072import org.apache.hadoop.hbase.client.Result; 073import org.apache.hadoop.hbase.client.ResultScanner; 074import org.apache.hadoop.hbase.client.RowMutations; 075import org.apache.hadoop.hbase.client.Scan; 076import org.apache.hadoop.hbase.client.Table; 077import org.apache.hadoop.hbase.client.TableDescriptor; 078import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 079import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 080import org.apache.hadoop.hbase.filter.BinaryComparator; 081import org.apache.hadoop.hbase.filter.Filter; 082import org.apache.hadoop.hbase.filter.FilterAllFilter; 083import org.apache.hadoop.hbase.filter.FilterList; 084import org.apache.hadoop.hbase.filter.PageFilter; 085import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 086import org.apache.hadoop.hbase.filter.WhileMatchFilter; 087import org.apache.hadoop.hbase.io.compress.Compression; 088import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 089import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 090import org.apache.hadoop.hbase.regionserver.BloomType; 091import org.apache.hadoop.hbase.regionserver.CompactingMemStore; 092import org.apache.hadoop.hbase.trace.TraceUtil; 093import org.apache.hadoop.hbase.util.ByteArrayHashKey; 094import org.apache.hadoop.hbase.util.Bytes; 095import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 096import org.apache.hadoop.hbase.util.GsonUtil; 097import org.apache.hadoop.hbase.util.Hash; 098import org.apache.hadoop.hbase.util.MurmurHash; 099import org.apache.hadoop.hbase.util.Pair; 100import org.apache.hadoop.hbase.util.RandomDistribution; 101import org.apache.hadoop.hbase.util.YammerHistogramUtils; 102import org.apache.hadoop.io.LongWritable; 103import org.apache.hadoop.io.Text; 104import org.apache.hadoop.mapreduce.Job; 105import org.apache.hadoop.mapreduce.Mapper; 106import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; 107import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 108import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; 109import org.apache.hadoop.util.Tool; 110import org.apache.hadoop.util.ToolRunner; 111import org.apache.yetus.audience.InterfaceAudience; 112import org.slf4j.Logger; 113import org.slf4j.LoggerFactory; 114 115import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 116import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 117import org.apache.hbase.thirdparty.com.google.gson.Gson; 118 119/** 120 * Script used evaluating HBase performance and scalability. Runs a HBase client that steps through 121 * one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random writes test, 122 * etc.). Pass on the command-line which test to run and how many clients are participating in this 123 * experiment. Run {@code PerformanceEvaluation --help} to obtain usage. 124 * <p> 125 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance 126 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper, 127 * pages 8-10. 128 * <p> 129 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as 130 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client does 131 * about 1GB of data, unless specified otherwise. 132 */ 133@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 134public class PerformanceEvaluation extends Configured implements Tool { 135 static final String RANDOM_SEEK_SCAN = "randomSeekScan"; 136 static final String RANDOM_READ = "randomRead"; 137 static final String PE_COMMAND_SHORTNAME = "pe"; 138 private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName()); 139 private static final Gson GSON = GsonUtil.createGson().create(); 140 141 public static final String TABLE_NAME = "TestTable"; 142 public static final String FAMILY_NAME_BASE = "info"; 143 public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0"); 144 public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0); 145 public static final int DEFAULT_VALUE_LENGTH = 1000; 146 public static final int ROW_LENGTH = 26; 147 148 private static final int ONE_GB = 1024 * 1024 * 1000; 149 private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH; 150 // TODO : should we make this configurable 151 private static final int TAG_LENGTH = 256; 152 private static final DecimalFormat FMT = new DecimalFormat("0.##"); 153 private static final MathContext CXT = MathContext.DECIMAL64; 154 private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000); 155 private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024); 156 private static final TestOptions DEFAULT_OPTS = new TestOptions(); 157 158 private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>(); 159 private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); 160 161 static { 162 addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead", 163 "Run async random read test"); 164 addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite", 165 "Run async random write test"); 166 addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead", 167 "Run async sequential read test"); 168 addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite", 169 "Run async sequential write test"); 170 addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)"); 171 addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test"); 172 addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test"); 173 addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN, 174 "Run random seek and scan 100 test"); 175 addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", 176 "Run random seek scan with both start and stop row (max 10 rows)"); 177 addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100", 178 "Run random seek scan with both start and stop row (max 100 rows)"); 179 addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000", 180 "Run random seek scan with both start and stop row (max 1000 rows)"); 181 addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000", 182 "Run random seek scan with both start and stop row (max 10000 rows)"); 183 addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test"); 184 addCommandDescriptor(RandomDeleteTest.class, "randomDelete", "Run random delete test"); 185 addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test"); 186 addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test"); 187 addCommandDescriptor(SequentialDeleteTest.class, "sequentialDelete", 188 "Run sequential delete test"); 189 addCommandDescriptor(MetaWriteTest.class, "metaWrite", 190 "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta"); 191 addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)"); 192 addCommandDescriptor(ReverseScanTest.class, "reverseScan", 193 "Run reverse scan test (read every row)"); 194 addCommandDescriptor(FilteredScanTest.class, "filterScan", 195 "Run scan test using a filter to find a specific row based on it's value " 196 + "(make sure to use --rows=20)"); 197 addCommandDescriptor(IncrementTest.class, "increment", 198 "Increment on each row; clients overlap on keyspace so some concurrent operations"); 199 addCommandDescriptor(AppendTest.class, "append", 200 "Append on each row; clients overlap on keyspace so some concurrent operations"); 201 addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate", 202 "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations"); 203 addCommandDescriptor(CheckAndPutTest.class, "checkAndPut", 204 "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations"); 205 addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete", 206 "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations"); 207 addCommandDescriptor(CleanMetaTest.class, "cleanMeta", 208 "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread"); 209 } 210 211 /** 212 * Enum for map metrics. Keep it out here rather than inside in the Map inner-class so we can find 213 * associated properties. 214 */ 215 protected static enum Counter { 216 /** elapsed time */ 217 ELAPSED_TIME, 218 /** number of rows */ 219 ROWS 220 } 221 222 protected static class RunResult implements Comparable<RunResult> { 223 public RunResult(long duration, Histogram hist) { 224 this.duration = duration; 225 this.hist = hist; 226 numbOfReplyOverThreshold = 0; 227 numOfReplyFromReplica = 0; 228 } 229 230 public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica, 231 Histogram hist) { 232 this.duration = duration; 233 this.hist = hist; 234 this.numbOfReplyOverThreshold = numbOfReplyOverThreshold; 235 this.numOfReplyFromReplica = numOfReplyFromReplica; 236 } 237 238 public final long duration; 239 public final Histogram hist; 240 public final long numbOfReplyOverThreshold; 241 public final long numOfReplyFromReplica; 242 243 @Override 244 public String toString() { 245 return Long.toString(duration); 246 } 247 248 @Override 249 public int compareTo(RunResult o) { 250 return Long.compare(this.duration, o.duration); 251 } 252 } 253 254 /** 255 * Constructor 256 * @param conf Configuration object 257 */ 258 public PerformanceEvaluation(final Configuration conf) { 259 super(conf); 260 } 261 262 protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name, 263 String description) { 264 CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description); 265 COMMANDS.put(name, cmdDescriptor); 266 } 267 268 /** 269 * Implementations can have their status set. 270 */ 271 interface Status { 272 /** 273 * Sets status 274 * @param msg status message 275 */ 276 void setStatus(final String msg) throws IOException; 277 } 278 279 /** 280 * MapReduce job that runs a performance evaluation client in each map task. 281 */ 282 public static class EvaluationMapTask 283 extends Mapper<LongWritable, Text, LongWritable, LongWritable> { 284 285 /** configuration parameter name that contains the command */ 286 public final static String CMD_KEY = "EvaluationMapTask.command"; 287 /** configuration parameter name that contains the PE impl */ 288 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl"; 289 290 private Class<? extends Test> cmd; 291 292 @Override 293 protected void setup(Context context) throws IOException, InterruptedException { 294 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class); 295 296 // this is required so that extensions of PE are instantiated within the 297 // map reduce task... 298 Class<? extends PerformanceEvaluation> peClass = 299 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class); 300 try { 301 peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration()); 302 } catch (Exception e) { 303 throw new IllegalStateException("Could not instantiate PE instance", e); 304 } 305 } 306 307 private <Type> Class<? extends Type> forName(String className, Class<Type> type) { 308 try { 309 return Class.forName(className).asSubclass(type); 310 } catch (ClassNotFoundException e) { 311 throw new IllegalStateException("Could not find class for name: " + className, e); 312 } 313 } 314 315 @Override 316 protected void map(LongWritable key, Text value, final Context context) 317 throws IOException, InterruptedException { 318 319 Status status = new Status() { 320 @Override 321 public void setStatus(String msg) { 322 context.setStatus(msg); 323 } 324 }; 325 326 TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class); 327 Configuration conf = HBaseConfiguration.create(context.getConfiguration()); 328 final Connection con = ConnectionFactory.createConnection(conf); 329 AsyncConnection asyncCon = null; 330 try { 331 asyncCon = ConnectionFactory.createAsyncConnection(conf).get(); 332 } catch (ExecutionException e) { 333 throw new IOException(e); 334 } 335 336 // Evaluation task 337 RunResult result = 338 PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status); 339 // Collect how much time the thing took. Report as map output and 340 // to the ELAPSED_TIME counter. 341 context.getCounter(Counter.ELAPSED_TIME).increment(result.duration); 342 context.getCounter(Counter.ROWS).increment(opts.perClientRunRows); 343 context.write(new LongWritable(opts.startRow), new LongWritable(result.duration)); 344 context.progress(); 345 } 346 } 347 348 /* 349 * If table does not already exist, create. Also create a table when {@code opts.presplitRegions} 350 * is specified or when the existing table's region replica count doesn't match {@code 351 * opts.replicas}. 352 */ 353 static boolean checkTable(Admin admin, TestOptions opts) throws IOException { 354 TableName tableName = TableName.valueOf(opts.tableName); 355 boolean needsDelete = false, exists = admin.tableExists(tableName); 356 boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read") 357 || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan"); 358 boolean isDeleteCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("delete"); 359 if (!exists && (isReadCmd || isDeleteCmd)) { 360 throw new IllegalStateException( 361 "Must specify an existing table for read commands. Run a write command first."); 362 } 363 TableDescriptor desc = exists ? admin.getDescriptor(TableName.valueOf(opts.tableName)) : null; 364 byte[][] splits = getSplits(opts); 365 366 // recreate the table when user has requested presplit or when existing 367 // {RegionSplitPolicy,replica count} does not match requested, or when the 368 // number of column families does not match requested. 369 if ( 370 (exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions 371 && opts.presplitRegions != admin.getRegions(tableName).size()) 372 || (!isReadCmd && desc != null 373 && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy)) 374 || (!(isReadCmd || isDeleteCmd) && desc != null 375 && desc.getRegionReplication() != opts.replicas) 376 || (desc != null && desc.getColumnFamilyCount() != opts.families) 377 ) { 378 needsDelete = true; 379 // wait, why did it delete my table?!? 380 LOG.debug(MoreObjects.toStringHelper("needsDelete").add("needsDelete", needsDelete) 381 .add("isReadCmd", isReadCmd).add("exists", exists).add("desc", desc) 382 .add("presplit", opts.presplitRegions).add("splitPolicy", opts.splitPolicy) 383 .add("replicas", opts.replicas).add("families", opts.families).toString()); 384 } 385 386 // remove an existing table 387 if (needsDelete) { 388 if (admin.isTableEnabled(tableName)) { 389 admin.disableTable(tableName); 390 } 391 admin.deleteTable(tableName); 392 } 393 394 // table creation is necessary 395 if (!exists || needsDelete) { 396 desc = getTableDescriptor(opts); 397 if (splits != null) { 398 if (LOG.isDebugEnabled()) { 399 for (int i = 0; i < splits.length; i++) { 400 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); 401 } 402 } 403 } 404 if (splits != null) { 405 admin.createTable(desc, splits); 406 } else { 407 admin.createTable(desc); 408 } 409 LOG.info("Table " + desc + " created"); 410 } 411 return admin.tableExists(tableName); 412 } 413 414 /** 415 * Create an HTableDescriptor from provided TestOptions. 416 */ 417 protected static TableDescriptor getTableDescriptor(TestOptions opts) { 418 TableDescriptorBuilder builder = 419 TableDescriptorBuilder.newBuilder(TableName.valueOf(opts.tableName)); 420 421 for (int family = 0; family < opts.families; family++) { 422 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 423 ColumnFamilyDescriptorBuilder cfBuilder = 424 ColumnFamilyDescriptorBuilder.newBuilder(familyName); 425 cfBuilder.setDataBlockEncoding(opts.blockEncoding); 426 cfBuilder.setCompressionType(opts.compression); 427 cfBuilder.setEncryptionType(opts.encryption); 428 cfBuilder.setBloomFilterType(opts.bloomType); 429 cfBuilder.setBlocksize(opts.blockSize); 430 if (opts.inMemoryCF) { 431 cfBuilder.setInMemory(true); 432 } 433 cfBuilder.setInMemoryCompaction(opts.inMemoryCompaction); 434 builder.setColumnFamily(cfBuilder.build()); 435 } 436 if (opts.replicas != DEFAULT_OPTS.replicas) { 437 builder.setRegionReplication(opts.replicas); 438 } 439 if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) { 440 builder.setRegionSplitPolicyClassName(opts.splitPolicy); 441 } 442 return builder.build(); 443 } 444 445 /** 446 * generates splits based on total number of rows and specified split regions 447 */ 448 protected static byte[][] getSplits(TestOptions opts) { 449 if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null; 450 451 int numSplitPoints = opts.presplitRegions - 1; 452 byte[][] splits = new byte[numSplitPoints][]; 453 int jump = opts.totalRows / opts.presplitRegions; 454 for (int i = 0; i < numSplitPoints; i++) { 455 int rowkey = jump * (1 + i); 456 splits[i] = format(rowkey); 457 } 458 return splits; 459 } 460 461 static void setupConnectionCount(final TestOptions opts) { 462 if (opts.oneCon) { 463 opts.connCount = 1; 464 } else { 465 if (opts.connCount == -1) { 466 // set to thread number if connCount is not set 467 opts.connCount = opts.numClientThreads; 468 } 469 } 470 } 471 472 /* 473 * Run all clients in this vm each to its own thread. 474 */ 475 static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf) 476 throws IOException, InterruptedException, ExecutionException { 477 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName); 478 assert cmd != null; 479 @SuppressWarnings("unchecked") 480 Future<RunResult>[] threads = new Future[opts.numClientThreads]; 481 RunResult[] results = new RunResult[opts.numClientThreads]; 482 ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, 483 new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build()); 484 setupConnectionCount(opts); 485 final Connection[] cons = new Connection[opts.connCount]; 486 final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount]; 487 for (int i = 0; i < opts.connCount; i++) { 488 cons[i] = ConnectionFactory.createConnection(conf); 489 asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get(); 490 } 491 LOG 492 .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads"); 493 for (int i = 0; i < threads.length; i++) { 494 final int index = i; 495 threads[i] = pool.submit(new Callable<RunResult>() { 496 @Override 497 public RunResult call() throws Exception { 498 TestOptions threadOpts = new TestOptions(opts); 499 final Connection con = cons[index % cons.length]; 500 final AsyncConnection asyncCon = asyncCons[index % asyncCons.length]; 501 if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows; 502 RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() { 503 @Override 504 public void setStatus(final String msg) throws IOException { 505 LOG.info(msg); 506 } 507 }); 508 LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration 509 + "ms over " + threadOpts.perClientRunRows + " rows"); 510 if (opts.latencyThreshold > 0) { 511 LOG.info("Number of replies over latency threshold " + opts.latencyThreshold 512 + "(ms) is " + run.numbOfReplyOverThreshold); 513 } 514 return run; 515 } 516 }); 517 } 518 pool.shutdown(); 519 520 for (int i = 0; i < threads.length; i++) { 521 try { 522 results[i] = threads[i].get(); 523 } catch (ExecutionException e) { 524 throw new IOException(e.getCause()); 525 } 526 } 527 final String test = cmd.getSimpleName(); 528 LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results)); 529 Arrays.sort(results); 530 long total = 0; 531 float avgLatency = 0; 532 float avgTPS = 0; 533 long replicaWins = 0; 534 for (RunResult result : results) { 535 total += result.duration; 536 avgLatency += result.hist.getSnapshot().getMean(); 537 avgTPS += opts.perClientRunRows * 1.0f / result.duration; 538 replicaWins += result.numOfReplyFromReplica; 539 } 540 avgTPS *= 1000; // ms to second 541 avgLatency = avgLatency / results.length; 542 LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: " 543 + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms"); 544 LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency)); 545 LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second"); 546 if (opts.replicas > 1) { 547 LOG.info("[results from replica regions] " + replicaWins); 548 } 549 550 for (int i = 0; i < opts.connCount; i++) { 551 cons[i].close(); 552 asyncCons[i].close(); 553 } 554 555 return results; 556 } 557 558 /* 559 * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write 560 * out an input file with instruction per client regards which row they are to start on. 561 * @param cmd Command to run. 562 */ 563 static Job doMapReduce(TestOptions opts, final Configuration conf) 564 throws IOException, InterruptedException, ClassNotFoundException { 565 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName); 566 assert cmd != null; 567 Path inputDir = writeInputFile(conf, opts); 568 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); 569 conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName()); 570 Job job = Job.getInstance(conf); 571 job.setJarByClass(PerformanceEvaluation.class); 572 job.setJobName("HBase Performance Evaluation - " + opts.cmdName); 573 574 job.setInputFormatClass(NLineInputFormat.class); 575 NLineInputFormat.setInputPaths(job, inputDir); 576 // this is default, but be explicit about it just in case. 577 NLineInputFormat.setNumLinesPerSplit(job, 1); 578 579 job.setOutputKeyClass(LongWritable.class); 580 job.setOutputValueClass(LongWritable.class); 581 582 job.setMapperClass(EvaluationMapTask.class); 583 job.setReducerClass(LongSumReducer.class); 584 585 job.setNumReduceTasks(1); 586 587 job.setOutputFormatClass(TextOutputFormat.class); 588 TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs")); 589 590 TableMapReduceUtil.addDependencyJars(job); 591 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Histogram.class, // yammer 592 // metrics 593 Gson.class, // gson 594 FilterAllFilter.class // hbase-server tests jar 595 ); 596 597 TableMapReduceUtil.initCredentials(job); 598 599 job.waitForCompletion(true); 600 return job; 601 } 602 603 /** 604 * Each client has one mapper to do the work, and client do the resulting count in a map task. 605 */ 606 607 static String JOB_INPUT_FILENAME = "input.txt"; 608 609 /* 610 * Write input file of offsets-per-client for the mapreduce job. 611 * @param c Configuration 612 * @return Directory that contains file written whose name is JOB_INPUT_FILENAME 613 */ 614 static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException { 615 return writeInputFile(c, opts, new Path(".")); 616 } 617 618 static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir) 619 throws IOException { 620 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); 621 Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date())); 622 Path inputDir = new Path(jobdir, "inputs"); 623 624 FileSystem fs = FileSystem.get(c); 625 fs.mkdirs(inputDir); 626 627 Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME); 628 PrintStream out = new PrintStream(fs.create(inputFile)); 629 // Make input random. 630 Map<Integer, String> m = new TreeMap<>(); 631 Hash h = MurmurHash.getInstance(); 632 int perClientRows = (opts.totalRows / opts.numClientThreads); 633 try { 634 for (int j = 0; j < opts.numClientThreads; j++) { 635 TestOptions next = new TestOptions(opts); 636 next.startRow = j * perClientRows; 637 next.perClientRunRows = perClientRows; 638 String s = GSON.toJson(next); 639 LOG.info("Client=" + j + ", input=" + s); 640 byte[] b = Bytes.toBytes(s); 641 int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1); 642 m.put(hash, s); 643 } 644 for (Map.Entry<Integer, String> e : m.entrySet()) { 645 out.println(e.getValue()); 646 } 647 } finally { 648 out.close(); 649 } 650 return inputDir; 651 } 652 653 /** 654 * Describes a command. 655 */ 656 static class CmdDescriptor { 657 private Class<? extends TestBase> cmdClass; 658 private String name; 659 private String description; 660 661 CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) { 662 this.cmdClass = cmdClass; 663 this.name = name; 664 this.description = description; 665 } 666 667 public Class<? extends TestBase> getCmdClass() { 668 return cmdClass; 669 } 670 671 public String getName() { 672 return name; 673 } 674 675 public String getDescription() { 676 return description; 677 } 678 } 679 680 /** 681 * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes 682 * tracking all these arguments a little easier. NOTE: ADDING AN OPTION, you need to add a data 683 * member, a getter/setter (to make JSON serialization of this TestOptions class behave), and you 684 * need to add to the clone constructor below copying your new option from the 'that' to the 685 * 'this'. Look for 'clone' below. 686 */ 687 static class TestOptions { 688 String cmdName = null; 689 boolean nomapred = false; 690 boolean filterAll = false; 691 int startRow = 0; 692 float size = 1.0f; 693 int perClientRunRows = DEFAULT_ROWS_PER_GB; 694 int numClientThreads = 1; 695 int totalRows = DEFAULT_ROWS_PER_GB; 696 int measureAfter = 0; 697 float sampleRate = 1.0f; 698 /** 699 * @deprecated Useless after switching to OpenTelemetry 700 */ 701 @Deprecated 702 double traceRate = 0.0; 703 String tableName = TABLE_NAME; 704 boolean flushCommits = true; 705 boolean writeToWAL = true; 706 boolean autoFlush = false; 707 boolean oneCon = false; 708 int connCount = -1; // wil decide the actual num later 709 boolean useTags = false; 710 int noOfTags = 1; 711 boolean reportLatency = false; 712 int multiGet = 0; 713 int multiPut = 0; 714 int randomSleep = 0; 715 boolean inMemoryCF = false; 716 int presplitRegions = 0; 717 int replicas = TableDescriptorBuilder.DEFAULT_REGION_REPLICATION; 718 String splitPolicy = null; 719 Compression.Algorithm compression = Compression.Algorithm.NONE; 720 String encryption = null; 721 BloomType bloomType = BloomType.ROW; 722 int blockSize = HConstants.DEFAULT_BLOCKSIZE; 723 DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; 724 boolean valueRandom = false; 725 boolean valueZipf = false; 726 int valueSize = DEFAULT_VALUE_LENGTH; 727 int period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10; 728 int cycles = 1; 729 int columns = 1; 730 int families = 1; 731 int caching = 30; 732 int latencyThreshold = 0; // in millsecond 733 boolean addColumns = true; 734 MemoryCompactionPolicy inMemoryCompaction = 735 MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT); 736 boolean asyncPrefetch = false; 737 boolean cacheBlocks = true; 738 Scan.ReadType scanReadType = Scan.ReadType.DEFAULT; 739 long bufferSize = 2l * 1024l * 1024l; 740 Properties commandProperties; 741 742 public TestOptions() { 743 } 744 745 /** 746 * Clone constructor. 747 * @param that Object to copy from. 748 */ 749 public TestOptions(TestOptions that) { 750 this.cmdName = that.cmdName; 751 this.cycles = that.cycles; 752 this.nomapred = that.nomapred; 753 this.startRow = that.startRow; 754 this.size = that.size; 755 this.perClientRunRows = that.perClientRunRows; 756 this.numClientThreads = that.numClientThreads; 757 this.totalRows = that.totalRows; 758 this.sampleRate = that.sampleRate; 759 this.traceRate = that.traceRate; 760 this.tableName = that.tableName; 761 this.flushCommits = that.flushCommits; 762 this.writeToWAL = that.writeToWAL; 763 this.autoFlush = that.autoFlush; 764 this.oneCon = that.oneCon; 765 this.connCount = that.connCount; 766 this.useTags = that.useTags; 767 this.noOfTags = that.noOfTags; 768 this.reportLatency = that.reportLatency; 769 this.latencyThreshold = that.latencyThreshold; 770 this.multiGet = that.multiGet; 771 this.multiPut = that.multiPut; 772 this.inMemoryCF = that.inMemoryCF; 773 this.presplitRegions = that.presplitRegions; 774 this.replicas = that.replicas; 775 this.splitPolicy = that.splitPolicy; 776 this.compression = that.compression; 777 this.encryption = that.encryption; 778 this.blockEncoding = that.blockEncoding; 779 this.filterAll = that.filterAll; 780 this.bloomType = that.bloomType; 781 this.blockSize = that.blockSize; 782 this.valueRandom = that.valueRandom; 783 this.valueZipf = that.valueZipf; 784 this.valueSize = that.valueSize; 785 this.period = that.period; 786 this.randomSleep = that.randomSleep; 787 this.measureAfter = that.measureAfter; 788 this.addColumns = that.addColumns; 789 this.columns = that.columns; 790 this.families = that.families; 791 this.caching = that.caching; 792 this.inMemoryCompaction = that.inMemoryCompaction; 793 this.asyncPrefetch = that.asyncPrefetch; 794 this.cacheBlocks = that.cacheBlocks; 795 this.scanReadType = that.scanReadType; 796 this.bufferSize = that.bufferSize; 797 this.commandProperties = that.commandProperties; 798 } 799 800 public Properties getCommandProperties() { 801 return commandProperties; 802 } 803 804 public int getCaching() { 805 return this.caching; 806 } 807 808 public void setCaching(final int caching) { 809 this.caching = caching; 810 } 811 812 public int getColumns() { 813 return this.columns; 814 } 815 816 public void setColumns(final int columns) { 817 this.columns = columns; 818 } 819 820 public int getFamilies() { 821 return this.families; 822 } 823 824 public void setFamilies(final int families) { 825 this.families = families; 826 } 827 828 public int getCycles() { 829 return this.cycles; 830 } 831 832 public void setCycles(final int cycles) { 833 this.cycles = cycles; 834 } 835 836 public boolean isValueZipf() { 837 return valueZipf; 838 } 839 840 public void setValueZipf(boolean valueZipf) { 841 this.valueZipf = valueZipf; 842 } 843 844 public String getCmdName() { 845 return cmdName; 846 } 847 848 public void setCmdName(String cmdName) { 849 this.cmdName = cmdName; 850 } 851 852 public int getRandomSleep() { 853 return randomSleep; 854 } 855 856 public void setRandomSleep(int randomSleep) { 857 this.randomSleep = randomSleep; 858 } 859 860 public int getReplicas() { 861 return replicas; 862 } 863 864 public void setReplicas(int replicas) { 865 this.replicas = replicas; 866 } 867 868 public String getSplitPolicy() { 869 return splitPolicy; 870 } 871 872 public void setSplitPolicy(String splitPolicy) { 873 this.splitPolicy = splitPolicy; 874 } 875 876 public void setNomapred(boolean nomapred) { 877 this.nomapred = nomapred; 878 } 879 880 public void setFilterAll(boolean filterAll) { 881 this.filterAll = filterAll; 882 } 883 884 public void setStartRow(int startRow) { 885 this.startRow = startRow; 886 } 887 888 public void setSize(float size) { 889 this.size = size; 890 } 891 892 public void setPerClientRunRows(int perClientRunRows) { 893 this.perClientRunRows = perClientRunRows; 894 } 895 896 public void setNumClientThreads(int numClientThreads) { 897 this.numClientThreads = numClientThreads; 898 } 899 900 public void setTotalRows(int totalRows) { 901 this.totalRows = totalRows; 902 } 903 904 public void setSampleRate(float sampleRate) { 905 this.sampleRate = sampleRate; 906 } 907 908 public void setTraceRate(double traceRate) { 909 this.traceRate = traceRate; 910 } 911 912 public void setTableName(String tableName) { 913 this.tableName = tableName; 914 } 915 916 public void setFlushCommits(boolean flushCommits) { 917 this.flushCommits = flushCommits; 918 } 919 920 public void setWriteToWAL(boolean writeToWAL) { 921 this.writeToWAL = writeToWAL; 922 } 923 924 public void setAutoFlush(boolean autoFlush) { 925 this.autoFlush = autoFlush; 926 } 927 928 public void setOneCon(boolean oneCon) { 929 this.oneCon = oneCon; 930 } 931 932 public int getConnCount() { 933 return connCount; 934 } 935 936 public void setConnCount(int connCount) { 937 this.connCount = connCount; 938 } 939 940 public void setUseTags(boolean useTags) { 941 this.useTags = useTags; 942 } 943 944 public void setNoOfTags(int noOfTags) { 945 this.noOfTags = noOfTags; 946 } 947 948 public void setReportLatency(boolean reportLatency) { 949 this.reportLatency = reportLatency; 950 } 951 952 public void setMultiGet(int multiGet) { 953 this.multiGet = multiGet; 954 } 955 956 public void setMultiPut(int multiPut) { 957 this.multiPut = multiPut; 958 } 959 960 public void setInMemoryCF(boolean inMemoryCF) { 961 this.inMemoryCF = inMemoryCF; 962 } 963 964 public void setPresplitRegions(int presplitRegions) { 965 this.presplitRegions = presplitRegions; 966 } 967 968 public void setCompression(Compression.Algorithm compression) { 969 this.compression = compression; 970 } 971 972 public void setEncryption(String encryption) { 973 this.encryption = encryption; 974 } 975 976 public void setBloomType(BloomType bloomType) { 977 this.bloomType = bloomType; 978 } 979 980 public void setBlockSize(int blockSize) { 981 this.blockSize = blockSize; 982 } 983 984 public void setBlockEncoding(DataBlockEncoding blockEncoding) { 985 this.blockEncoding = blockEncoding; 986 } 987 988 public void setValueRandom(boolean valueRandom) { 989 this.valueRandom = valueRandom; 990 } 991 992 public void setValueSize(int valueSize) { 993 this.valueSize = valueSize; 994 } 995 996 public void setBufferSize(long bufferSize) { 997 this.bufferSize = bufferSize; 998 } 999 1000 public void setPeriod(int period) { 1001 this.period = period; 1002 } 1003 1004 public boolean isNomapred() { 1005 return nomapred; 1006 } 1007 1008 public boolean isFilterAll() { 1009 return filterAll; 1010 } 1011 1012 public int getStartRow() { 1013 return startRow; 1014 } 1015 1016 public float getSize() { 1017 return size; 1018 } 1019 1020 public int getPerClientRunRows() { 1021 return perClientRunRows; 1022 } 1023 1024 public int getNumClientThreads() { 1025 return numClientThreads; 1026 } 1027 1028 public int getTotalRows() { 1029 return totalRows; 1030 } 1031 1032 public float getSampleRate() { 1033 return sampleRate; 1034 } 1035 1036 public double getTraceRate() { 1037 return traceRate; 1038 } 1039 1040 public String getTableName() { 1041 return tableName; 1042 } 1043 1044 public boolean isFlushCommits() { 1045 return flushCommits; 1046 } 1047 1048 public boolean isWriteToWAL() { 1049 return writeToWAL; 1050 } 1051 1052 public boolean isAutoFlush() { 1053 return autoFlush; 1054 } 1055 1056 public boolean isUseTags() { 1057 return useTags; 1058 } 1059 1060 public int getNoOfTags() { 1061 return noOfTags; 1062 } 1063 1064 public boolean isReportLatency() { 1065 return reportLatency; 1066 } 1067 1068 public int getMultiGet() { 1069 return multiGet; 1070 } 1071 1072 public int getMultiPut() { 1073 return multiPut; 1074 } 1075 1076 public boolean isInMemoryCF() { 1077 return inMemoryCF; 1078 } 1079 1080 public int getPresplitRegions() { 1081 return presplitRegions; 1082 } 1083 1084 public Compression.Algorithm getCompression() { 1085 return compression; 1086 } 1087 1088 public String getEncryption() { 1089 return encryption; 1090 } 1091 1092 public DataBlockEncoding getBlockEncoding() { 1093 return blockEncoding; 1094 } 1095 1096 public boolean isValueRandom() { 1097 return valueRandom; 1098 } 1099 1100 public int getValueSize() { 1101 return valueSize; 1102 } 1103 1104 public int getPeriod() { 1105 return period; 1106 } 1107 1108 public BloomType getBloomType() { 1109 return bloomType; 1110 } 1111 1112 public int getBlockSize() { 1113 return blockSize; 1114 } 1115 1116 public boolean isOneCon() { 1117 return oneCon; 1118 } 1119 1120 public int getMeasureAfter() { 1121 return measureAfter; 1122 } 1123 1124 public void setMeasureAfter(int measureAfter) { 1125 this.measureAfter = measureAfter; 1126 } 1127 1128 public boolean getAddColumns() { 1129 return addColumns; 1130 } 1131 1132 public void setAddColumns(boolean addColumns) { 1133 this.addColumns = addColumns; 1134 } 1135 1136 public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) { 1137 this.inMemoryCompaction = inMemoryCompaction; 1138 } 1139 1140 public MemoryCompactionPolicy getInMemoryCompaction() { 1141 return this.inMemoryCompaction; 1142 } 1143 1144 public long getBufferSize() { 1145 return this.bufferSize; 1146 } 1147 } 1148 1149 /* 1150 * A test. Subclass to particularize what happens per row. 1151 */ 1152 static abstract class TestBase { 1153 // Below is make it so when Tests are all running in the one 1154 // jvm, that they each have a differently seeded Random. 1155 private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime()); 1156 1157 private static long nextRandomSeed() { 1158 return randomSeed.nextLong(); 1159 } 1160 1161 private final int everyN; 1162 1163 protected final Random rand = new Random(nextRandomSeed()); 1164 protected final Configuration conf; 1165 protected final TestOptions opts; 1166 1167 protected final Status status; 1168 1169 private String testName; 1170 protected Histogram latencyHistogram; 1171 private Histogram replicaLatencyHistogram; 1172 private Histogram valueSizeHistogram; 1173 private Histogram rpcCallsHistogram; 1174 private Histogram remoteRpcCallsHistogram; 1175 private Histogram millisBetweenNextHistogram; 1176 private Histogram regionsScannedHistogram; 1177 private Histogram bytesInResultsHistogram; 1178 private Histogram bytesInRemoteResultsHistogram; 1179 private RandomDistribution.Zipf zipf; 1180 private long numOfReplyOverLatencyThreshold = 0; 1181 private long numOfReplyFromReplica = 0; 1182 1183 /** 1184 * Note that all subclasses of this class must provide a public constructor that has the exact 1185 * same list of arguments. 1186 */ 1187 TestBase(final Configuration conf, final TestOptions options, final Status status) { 1188 this.conf = conf; 1189 this.opts = options; 1190 this.status = status; 1191 this.testName = this.getClass().getSimpleName(); 1192 everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate)); 1193 if (options.isValueZipf()) { 1194 this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2); 1195 } 1196 LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows."); 1197 } 1198 1199 int getValueLength(final Random r) { 1200 if (this.opts.isValueRandom()) { 1201 return r.nextInt(opts.valueSize); 1202 } else if (this.opts.isValueZipf()) { 1203 return Math.abs(this.zipf.nextInt()); 1204 } else { 1205 return opts.valueSize; 1206 } 1207 } 1208 1209 void updateValueSize(final Result[] rs) throws IOException { 1210 updateValueSize(rs, 0); 1211 } 1212 1213 void updateValueSize(final Result[] rs, final long latency) throws IOException { 1214 if (rs == null || (latency == 0)) return; 1215 for (Result r : rs) 1216 updateValueSize(r, latency); 1217 } 1218 1219 void updateValueSize(final Result r) throws IOException { 1220 updateValueSize(r, 0); 1221 } 1222 1223 void updateValueSize(final Result r, final long latency) throws IOException { 1224 if (r == null || (latency == 0)) return; 1225 int size = 0; 1226 // update replicaHistogram 1227 if (r.isStale()) { 1228 replicaLatencyHistogram.update(latency / 1000); 1229 numOfReplyFromReplica++; 1230 } 1231 if (!isRandomValueSize()) return; 1232 1233 for (CellScanner scanner = r.cellScanner(); scanner.advance();) { 1234 size += scanner.current().getValueLength(); 1235 } 1236 updateValueSize(size); 1237 } 1238 1239 void updateValueSize(final int valueSize) { 1240 if (!isRandomValueSize()) return; 1241 this.valueSizeHistogram.update(valueSize); 1242 } 1243 1244 void updateScanMetrics(final ScanMetrics metrics) { 1245 if (metrics == null) return; 1246 Map<String, Long> metricsMap = metrics.getMetricsMap(); 1247 Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); 1248 if (rpcCalls != null) { 1249 this.rpcCallsHistogram.update(rpcCalls.longValue()); 1250 } 1251 Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME); 1252 if (remoteRpcCalls != null) { 1253 this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue()); 1254 } 1255 Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME); 1256 if (millisBetweenNext != null) { 1257 this.millisBetweenNextHistogram.update(millisBetweenNext.longValue()); 1258 } 1259 Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); 1260 if (regionsScanned != null) { 1261 this.regionsScannedHistogram.update(regionsScanned.longValue()); 1262 } 1263 Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME); 1264 if (bytesInResults != null && bytesInResults.longValue() > 0) { 1265 this.bytesInResultsHistogram.update(bytesInResults.longValue()); 1266 } 1267 Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME); 1268 if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) { 1269 this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue()); 1270 } 1271 } 1272 1273 String generateStatus(final int sr, final int i, final int lr) { 1274 return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency [" 1275 + getShortLatencyReport() + "]" 1276 + (!isRandomValueSize() ? "" : ", value size [" + getShortValueSizeReport() + "]"); 1277 } 1278 1279 boolean isRandomValueSize() { 1280 return opts.valueRandom; 1281 } 1282 1283 protected int getReportingPeriod() { 1284 return opts.period; 1285 } 1286 1287 /** 1288 * Populated by testTakedown. Only implemented by RandomReadTest at the moment. 1289 */ 1290 public Histogram getLatencyHistogram() { 1291 return latencyHistogram; 1292 } 1293 1294 void testSetup() throws IOException { 1295 // test metrics 1296 latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1297 // If it is a replica test, set up histogram for replica. 1298 if (opts.replicas > 1) { 1299 replicaLatencyHistogram = 1300 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1301 } 1302 valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1303 // scan metrics 1304 rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1305 remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1306 millisBetweenNextHistogram = 1307 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1308 regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1309 bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1310 bytesInRemoteResultsHistogram = 1311 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1312 1313 onStartup(); 1314 } 1315 1316 abstract void onStartup() throws IOException; 1317 1318 void testTakedown() throws IOException { 1319 onTakedown(); 1320 // Print all stats for this thread continuously. 1321 // Synchronize on Test.class so different threads don't intermingle the 1322 // output. We can't use 'this' here because each thread has its own instance of Test class. 1323 synchronized (Test.class) { 1324 status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName()); 1325 status 1326 .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram)); 1327 if (opts.replicas > 1) { 1328 status.setStatus("Latency (us) from Replica Regions: " 1329 + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram)); 1330 } 1331 status.setStatus("Num measures (latency) : " + latencyHistogram.getCount()); 1332 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram)); 1333 if (valueSizeHistogram.getCount() > 0) { 1334 status.setStatus( 1335 "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram)); 1336 status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount()); 1337 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram)); 1338 } else { 1339 status.setStatus("No valueSize statistics available"); 1340 } 1341 if (rpcCallsHistogram.getCount() > 0) { 1342 status.setStatus( 1343 "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram)); 1344 } 1345 if (remoteRpcCallsHistogram.getCount() > 0) { 1346 status.setStatus("remoteRpcCalls (count): " 1347 + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram)); 1348 } 1349 if (millisBetweenNextHistogram.getCount() > 0) { 1350 status.setStatus("millisBetweenNext (latency): " 1351 + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram)); 1352 } 1353 if (regionsScannedHistogram.getCount() > 0) { 1354 status.setStatus("regionsScanned (count): " 1355 + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram)); 1356 } 1357 if (bytesInResultsHistogram.getCount() > 0) { 1358 status.setStatus("bytesInResults (size): " 1359 + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram)); 1360 } 1361 if (bytesInRemoteResultsHistogram.getCount() > 0) { 1362 status.setStatus("bytesInRemoteResults (size): " 1363 + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram)); 1364 } 1365 } 1366 } 1367 1368 abstract void onTakedown() throws IOException; 1369 1370 /* 1371 * Run test 1372 * @return Elapsed time. 1373 */ 1374 long test() throws IOException, InterruptedException { 1375 testSetup(); 1376 LOG.info("Timed test starting in thread " + Thread.currentThread().getName()); 1377 final long startTime = System.nanoTime(); 1378 try { 1379 testTimed(); 1380 } finally { 1381 testTakedown(); 1382 } 1383 return (System.nanoTime() - startTime) / 1000000; 1384 } 1385 1386 int getStartRow() { 1387 return opts.startRow; 1388 } 1389 1390 int getLastRow() { 1391 return getStartRow() + opts.perClientRunRows; 1392 } 1393 1394 /** 1395 * Provides an extension point for tests that don't want a per row invocation. 1396 */ 1397 void testTimed() throws IOException, InterruptedException { 1398 int startRow = getStartRow(); 1399 int lastRow = getLastRow(); 1400 // Report on completion of 1/10th of total. 1401 for (int ii = 0; ii < opts.cycles; ii++) { 1402 if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles); 1403 for (int i = startRow; i < lastRow; i++) { 1404 if (i % everyN != 0) continue; 1405 long startTime = System.nanoTime(); 1406 boolean requestSent = false; 1407 Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan(); 1408 try (Scope scope = span.makeCurrent()) { 1409 requestSent = testRow(i, startTime); 1410 } finally { 1411 span.end(); 1412 } 1413 if ((i - startRow) > opts.measureAfter) { 1414 // If multiget or multiput is enabled, say set to 10, testRow() returns immediately 1415 // first 9 times and sends the actual get request in the 10th iteration. 1416 // We should only set latency when actual request is sent because otherwise 1417 // it turns out to be 0. 1418 if (requestSent) { 1419 long latency = (System.nanoTime() - startTime) / 1000; 1420 latencyHistogram.update(latency); 1421 if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) { 1422 numOfReplyOverLatencyThreshold++; 1423 } 1424 } 1425 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { 1426 status.setStatus(generateStatus(startRow, i, lastRow)); 1427 } 1428 } 1429 } 1430 } 1431 } 1432 1433 /** Returns Subset of the histograms' calculation. */ 1434 public String getShortLatencyReport() { 1435 return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram); 1436 } 1437 1438 /** Returns Subset of the histograms' calculation. */ 1439 public String getShortValueSizeReport() { 1440 return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram); 1441 } 1442 1443 /** 1444 * Test for individual row. 1445 * @param i Row index. 1446 * @return true if the row was sent to server and need to record metrics. False if not, multiGet 1447 * and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered. 1448 */ 1449 abstract boolean testRow(final int i, final long startTime) 1450 throws IOException, InterruptedException; 1451 } 1452 1453 static abstract class Test extends TestBase { 1454 protected Connection connection; 1455 1456 Test(final Connection con, final TestOptions options, final Status status) { 1457 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); 1458 this.connection = con; 1459 } 1460 } 1461 1462 static abstract class AsyncTest extends TestBase { 1463 protected AsyncConnection connection; 1464 1465 AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) { 1466 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); 1467 this.connection = con; 1468 } 1469 } 1470 1471 static abstract class TableTest extends Test { 1472 protected Table table; 1473 1474 TableTest(Connection con, TestOptions options, Status status) { 1475 super(con, options, status); 1476 } 1477 1478 @Override 1479 void onStartup() throws IOException { 1480 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1481 } 1482 1483 @Override 1484 void onTakedown() throws IOException { 1485 table.close(); 1486 } 1487 } 1488 1489 /* 1490 * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest 1491 */ 1492 static abstract class MetaTest extends TableTest { 1493 protected int keyLength; 1494 1495 MetaTest(Connection con, TestOptions options, Status status) { 1496 super(con, options, status); 1497 keyLength = Integer.toString(opts.perClientRunRows).length(); 1498 } 1499 1500 @Override 1501 void onTakedown() throws IOException { 1502 // No clean up 1503 } 1504 1505 /* 1506 * Generates Lexicographically ascending strings 1507 */ 1508 protected byte[] getSplitKey(final int i) { 1509 return Bytes.toBytes(String.format("%0" + keyLength + "d", i)); 1510 } 1511 1512 } 1513 1514 static abstract class AsyncTableTest extends AsyncTest { 1515 protected AsyncTable<?> table; 1516 1517 AsyncTableTest(AsyncConnection con, TestOptions options, Status status) { 1518 super(con, options, status); 1519 } 1520 1521 @Override 1522 void onStartup() throws IOException { 1523 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1524 } 1525 1526 @Override 1527 void onTakedown() throws IOException { 1528 } 1529 } 1530 1531 static class AsyncRandomReadTest extends AsyncTableTest { 1532 private final Consistency consistency; 1533 private ArrayList<Get> gets; 1534 1535 AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) { 1536 super(con, options, status); 1537 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1538 if (opts.multiGet > 0) { 1539 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1540 this.gets = new ArrayList<>(opts.multiGet); 1541 } 1542 } 1543 1544 @Override 1545 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1546 if (opts.randomSleep > 0) { 1547 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 1548 } 1549 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1550 for (int family = 0; family < opts.families; family++) { 1551 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1552 if (opts.addColumns) { 1553 for (int column = 0; column < opts.columns; column++) { 1554 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1555 get.addColumn(familyName, qualifier); 1556 } 1557 } else { 1558 get.addFamily(familyName); 1559 } 1560 } 1561 if (opts.filterAll) { 1562 get.setFilter(new FilterAllFilter()); 1563 } 1564 get.setConsistency(consistency); 1565 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1566 try { 1567 if (opts.multiGet > 0) { 1568 this.gets.add(get); 1569 if (this.gets.size() == opts.multiGet) { 1570 Result[] rs = 1571 this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new); 1572 updateValueSize(rs); 1573 this.gets.clear(); 1574 } else { 1575 return false; 1576 } 1577 } else { 1578 updateValueSize(this.table.get(get).get()); 1579 } 1580 } catch (ExecutionException e) { 1581 throw new IOException(e); 1582 } 1583 return true; 1584 } 1585 1586 public static RuntimeException runtime(Throwable e) { 1587 if (e instanceof RuntimeException) { 1588 return (RuntimeException) e; 1589 } 1590 return new RuntimeException(e); 1591 } 1592 1593 public static <V> V propagate(Callable<V> callable) { 1594 try { 1595 return callable.call(); 1596 } catch (Exception e) { 1597 throw runtime(e); 1598 } 1599 } 1600 1601 @Override 1602 protected int getReportingPeriod() { 1603 int period = opts.perClientRunRows / 10; 1604 return period == 0 ? opts.perClientRunRows : period; 1605 } 1606 1607 @Override 1608 protected void testTakedown() throws IOException { 1609 if (this.gets != null && this.gets.size() > 0) { 1610 this.table.get(gets); 1611 this.gets.clear(); 1612 } 1613 super.testTakedown(); 1614 } 1615 } 1616 1617 static class AsyncRandomWriteTest extends AsyncSequentialWriteTest { 1618 1619 AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) { 1620 super(con, options, status); 1621 } 1622 1623 @Override 1624 protected byte[] generateRow(final int i) { 1625 return getRandomRow(this.rand, opts.totalRows); 1626 } 1627 } 1628 1629 static class AsyncScanTest extends AsyncTableTest { 1630 private ResultScanner testScanner; 1631 private AsyncTable<?> asyncTable; 1632 1633 AsyncScanTest(AsyncConnection con, TestOptions options, Status status) { 1634 super(con, options, status); 1635 } 1636 1637 @Override 1638 void onStartup() throws IOException { 1639 this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName), 1640 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); 1641 } 1642 1643 @Override 1644 void testTakedown() throws IOException { 1645 if (this.testScanner != null) { 1646 updateScanMetrics(this.testScanner.getScanMetrics()); 1647 this.testScanner.close(); 1648 } 1649 super.testTakedown(); 1650 } 1651 1652 @Override 1653 boolean testRow(final int i, final long startTime) throws IOException { 1654 if (this.testScanner == null) { 1655 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 1656 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1657 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1658 for (int family = 0; family < opts.families; family++) { 1659 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1660 if (opts.addColumns) { 1661 for (int column = 0; column < opts.columns; column++) { 1662 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1663 scan.addColumn(familyName, qualifier); 1664 } 1665 } else { 1666 scan.addFamily(familyName); 1667 } 1668 } 1669 if (opts.filterAll) { 1670 scan.setFilter(new FilterAllFilter()); 1671 } 1672 this.testScanner = asyncTable.getScanner(scan); 1673 } 1674 Result r = testScanner.next(); 1675 updateValueSize(r); 1676 return true; 1677 } 1678 } 1679 1680 static class AsyncSequentialReadTest extends AsyncTableTest { 1681 AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) { 1682 super(con, options, status); 1683 } 1684 1685 @Override 1686 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1687 Get get = new Get(format(i)); 1688 for (int family = 0; family < opts.families; family++) { 1689 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1690 if (opts.addColumns) { 1691 for (int column = 0; column < opts.columns; column++) { 1692 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1693 get.addColumn(familyName, qualifier); 1694 } 1695 } else { 1696 get.addFamily(familyName); 1697 } 1698 } 1699 if (opts.filterAll) { 1700 get.setFilter(new FilterAllFilter()); 1701 } 1702 try { 1703 updateValueSize(table.get(get).get()); 1704 } catch (ExecutionException e) { 1705 throw new IOException(e); 1706 } 1707 return true; 1708 } 1709 } 1710 1711 static class AsyncSequentialWriteTest extends AsyncTableTest { 1712 private ArrayList<Put> puts; 1713 1714 AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) { 1715 super(con, options, status); 1716 if (opts.multiPut > 0) { 1717 LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + "."); 1718 this.puts = new ArrayList<>(opts.multiPut); 1719 } 1720 } 1721 1722 protected byte[] generateRow(final int i) { 1723 return format(i); 1724 } 1725 1726 @Override 1727 @SuppressWarnings("ReturnValueIgnored") 1728 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1729 byte[] row = generateRow(i); 1730 Put put = new Put(row); 1731 for (int family = 0; family < opts.families; family++) { 1732 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1733 for (int column = 0; column < opts.columns; column++) { 1734 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1735 byte[] value = generateData(this.rand, getValueLength(this.rand)); 1736 if (opts.useTags) { 1737 byte[] tag = generateData(this.rand, TAG_LENGTH); 1738 Tag[] tags = new Tag[opts.noOfTags]; 1739 for (int n = 0; n < opts.noOfTags; n++) { 1740 Tag t = new ArrayBackedTag((byte) n, tag); 1741 tags[n] = t; 1742 } 1743 KeyValue kv = 1744 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 1745 put.add(kv); 1746 updateValueSize(kv.getValueLength()); 1747 } else { 1748 put.addColumn(familyName, qualifier, value); 1749 updateValueSize(value.length); 1750 } 1751 } 1752 } 1753 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1754 try { 1755 table.put(put).get(); 1756 if (opts.multiPut > 0) { 1757 this.puts.add(put); 1758 if (this.puts.size() == opts.multiPut) { 1759 this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get)); 1760 this.puts.clear(); 1761 } else { 1762 return false; 1763 } 1764 } else { 1765 table.put(put).get(); 1766 } 1767 } catch (ExecutionException e) { 1768 throw new IOException(e); 1769 } 1770 return true; 1771 } 1772 } 1773 1774 static abstract class BufferedMutatorTest extends Test { 1775 protected BufferedMutator mutator; 1776 protected Table table; 1777 1778 BufferedMutatorTest(Connection con, TestOptions options, Status status) { 1779 super(con, options, status); 1780 } 1781 1782 @Override 1783 void onStartup() throws IOException { 1784 BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName)); 1785 p.writeBufferSize(opts.bufferSize); 1786 this.mutator = connection.getBufferedMutator(p); 1787 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1788 } 1789 1790 @Override 1791 void onTakedown() throws IOException { 1792 mutator.close(); 1793 table.close(); 1794 } 1795 } 1796 1797 static class RandomSeekScanTest extends TableTest { 1798 RandomSeekScanTest(Connection con, TestOptions options, Status status) { 1799 super(con, options, status); 1800 } 1801 1802 @Override 1803 boolean testRow(final int i, final long startTime) throws IOException { 1804 Scan scan = 1805 new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)).setCaching(opts.caching) 1806 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1807 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1808 FilterList list = new FilterList(); 1809 for (int family = 0; family < opts.families; family++) { 1810 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1811 if (opts.addColumns) { 1812 for (int column = 0; column < opts.columns; column++) { 1813 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1814 scan.addColumn(familyName, qualifier); 1815 } 1816 } else { 1817 scan.addFamily(familyName); 1818 } 1819 } 1820 if (opts.filterAll) { 1821 list.addFilter(new FilterAllFilter()); 1822 } 1823 list.addFilter(new WhileMatchFilter(new PageFilter(120))); 1824 scan.setFilter(list); 1825 ResultScanner s = this.table.getScanner(scan); 1826 try { 1827 for (Result rr; (rr = s.next()) != null;) { 1828 updateValueSize(rr); 1829 } 1830 } finally { 1831 updateScanMetrics(s.getScanMetrics()); 1832 s.close(); 1833 } 1834 return true; 1835 } 1836 1837 @Override 1838 protected int getReportingPeriod() { 1839 int period = opts.perClientRunRows / 100; 1840 return period == 0 ? opts.perClientRunRows : period; 1841 } 1842 1843 } 1844 1845 static abstract class RandomScanWithRangeTest extends TableTest { 1846 RandomScanWithRangeTest(Connection con, TestOptions options, Status status) { 1847 super(con, options, status); 1848 } 1849 1850 @Override 1851 boolean testRow(final int i, final long startTime) throws IOException { 1852 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); 1853 Scan scan = new Scan().withStartRow(startAndStopRow.getFirst()) 1854 .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching) 1855 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1856 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1857 for (int family = 0; family < opts.families; family++) { 1858 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1859 if (opts.addColumns) { 1860 for (int column = 0; column < opts.columns; column++) { 1861 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1862 scan.addColumn(familyName, qualifier); 1863 } 1864 } else { 1865 scan.addFamily(familyName); 1866 } 1867 } 1868 if (opts.filterAll) { 1869 scan.setFilter(new FilterAllFilter()); 1870 } 1871 Result r = null; 1872 int count = 0; 1873 ResultScanner s = this.table.getScanner(scan); 1874 try { 1875 for (; (r = s.next()) != null;) { 1876 updateValueSize(r); 1877 count++; 1878 } 1879 if (i % 100 == 0) { 1880 LOG.info(String.format("Scan for key range %s - %s returned %s rows", 1881 Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()), 1882 count)); 1883 } 1884 } finally { 1885 updateScanMetrics(s.getScanMetrics()); 1886 s.close(); 1887 } 1888 return true; 1889 } 1890 1891 protected abstract Pair<byte[], byte[]> getStartAndStopRow(); 1892 1893 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) { 1894 int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows; 1895 int stop = start + maxRange; 1896 return new Pair<>(format(start), format(stop)); 1897 } 1898 1899 @Override 1900 protected int getReportingPeriod() { 1901 int period = opts.perClientRunRows / 100; 1902 return period == 0 ? opts.perClientRunRows : period; 1903 } 1904 } 1905 1906 static class RandomScanWithRange10Test extends RandomScanWithRangeTest { 1907 RandomScanWithRange10Test(Connection con, TestOptions options, Status status) { 1908 super(con, options, status); 1909 } 1910 1911 @Override 1912 protected Pair<byte[], byte[]> getStartAndStopRow() { 1913 return generateStartAndStopRows(10); 1914 } 1915 } 1916 1917 static class RandomScanWithRange100Test extends RandomScanWithRangeTest { 1918 RandomScanWithRange100Test(Connection con, TestOptions options, Status status) { 1919 super(con, options, status); 1920 } 1921 1922 @Override 1923 protected Pair<byte[], byte[]> getStartAndStopRow() { 1924 return generateStartAndStopRows(100); 1925 } 1926 } 1927 1928 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest { 1929 RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) { 1930 super(con, options, status); 1931 } 1932 1933 @Override 1934 protected Pair<byte[], byte[]> getStartAndStopRow() { 1935 return generateStartAndStopRows(1000); 1936 } 1937 } 1938 1939 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest { 1940 RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) { 1941 super(con, options, status); 1942 } 1943 1944 @Override 1945 protected Pair<byte[], byte[]> getStartAndStopRow() { 1946 return generateStartAndStopRows(10000); 1947 } 1948 } 1949 1950 static class RandomReadTest extends TableTest { 1951 private final Consistency consistency; 1952 private ArrayList<Get> gets; 1953 1954 RandomReadTest(Connection con, TestOptions options, Status status) { 1955 super(con, options, status); 1956 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1957 if (opts.multiGet > 0) { 1958 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1959 this.gets = new ArrayList<>(opts.multiGet); 1960 } 1961 } 1962 1963 @Override 1964 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1965 if (opts.randomSleep > 0) { 1966 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 1967 } 1968 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1969 for (int family = 0; family < opts.families; family++) { 1970 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1971 if (opts.addColumns) { 1972 for (int column = 0; column < opts.columns; column++) { 1973 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1974 get.addColumn(familyName, qualifier); 1975 } 1976 } else { 1977 get.addFamily(familyName); 1978 } 1979 } 1980 if (opts.filterAll) { 1981 get.setFilter(new FilterAllFilter()); 1982 } 1983 get.setConsistency(consistency); 1984 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1985 if (opts.multiGet > 0) { 1986 this.gets.add(get); 1987 if (this.gets.size() == opts.multiGet) { 1988 Result[] rs = this.table.get(this.gets); 1989 if (opts.replicas > 1) { 1990 long latency = System.nanoTime() - startTime; 1991 updateValueSize(rs, latency); 1992 } else { 1993 updateValueSize(rs); 1994 } 1995 this.gets.clear(); 1996 } else { 1997 return false; 1998 } 1999 } else { 2000 if (opts.replicas > 1) { 2001 Result r = this.table.get(get); 2002 long latency = System.nanoTime() - startTime; 2003 updateValueSize(r, latency); 2004 } else { 2005 updateValueSize(this.table.get(get)); 2006 } 2007 } 2008 return true; 2009 } 2010 2011 @Override 2012 protected int getReportingPeriod() { 2013 int period = opts.perClientRunRows / 10; 2014 return period == 0 ? opts.perClientRunRows : period; 2015 } 2016 2017 @Override 2018 protected void testTakedown() throws IOException { 2019 if (this.gets != null && this.gets.size() > 0) { 2020 this.table.get(gets); 2021 this.gets.clear(); 2022 } 2023 super.testTakedown(); 2024 } 2025 } 2026 2027 /* 2028 * Send random reads against fake regions inserted by MetaWriteTest 2029 */ 2030 static class MetaRandomReadTest extends MetaTest { 2031 private RegionLocator regionLocator; 2032 2033 MetaRandomReadTest(Connection con, TestOptions options, Status status) { 2034 super(con, options, status); 2035 LOG.info("call getRegionLocation"); 2036 } 2037 2038 @Override 2039 void onStartup() throws IOException { 2040 super.onStartup(); 2041 this.regionLocator = connection.getRegionLocator(table.getName()); 2042 } 2043 2044 @Override 2045 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 2046 if (opts.randomSleep > 0) { 2047 Thread.sleep(rand.nextInt(opts.randomSleep)); 2048 } 2049 HRegionLocation hRegionLocation = 2050 regionLocator.getRegionLocation(getSplitKey(rand.nextInt(opts.perClientRunRows)), true); 2051 LOG.debug("get location for region: " + hRegionLocation); 2052 return true; 2053 } 2054 2055 @Override 2056 protected int getReportingPeriod() { 2057 int period = opts.perClientRunRows / 10; 2058 return period == 0 ? opts.perClientRunRows : period; 2059 } 2060 2061 @Override 2062 protected void testTakedown() throws IOException { 2063 super.testTakedown(); 2064 } 2065 } 2066 2067 static class RandomWriteTest extends SequentialWriteTest { 2068 RandomWriteTest(Connection con, TestOptions options, Status status) { 2069 super(con, options, status); 2070 } 2071 2072 @Override 2073 protected byte[] generateRow(final int i) { 2074 return getRandomRow(this.rand, opts.totalRows); 2075 } 2076 2077 } 2078 2079 static class RandomDeleteTest extends SequentialDeleteTest { 2080 RandomDeleteTest(Connection con, TestOptions options, Status status) { 2081 super(con, options, status); 2082 } 2083 2084 @Override 2085 protected byte[] generateRow(final int i) { 2086 return getRandomRow(this.rand, opts.totalRows); 2087 } 2088 2089 } 2090 2091 static class ScanTest extends TableTest { 2092 private ResultScanner testScanner; 2093 2094 ScanTest(Connection con, TestOptions options, Status status) { 2095 super(con, options, status); 2096 } 2097 2098 @Override 2099 void testTakedown() throws IOException { 2100 if (this.testScanner != null) { 2101 this.testScanner.close(); 2102 } 2103 super.testTakedown(); 2104 } 2105 2106 @Override 2107 boolean testRow(final int i, final long startTime) throws IOException { 2108 if (this.testScanner == null) { 2109 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 2110 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 2111 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 2112 for (int family = 0; family < opts.families; family++) { 2113 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2114 if (opts.addColumns) { 2115 for (int column = 0; column < opts.columns; column++) { 2116 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2117 scan.addColumn(familyName, qualifier); 2118 } 2119 } else { 2120 scan.addFamily(familyName); 2121 } 2122 } 2123 if (opts.filterAll) { 2124 scan.setFilter(new FilterAllFilter()); 2125 } 2126 this.testScanner = table.getScanner(scan); 2127 } 2128 Result r = testScanner.next(); 2129 updateValueSize(r); 2130 return true; 2131 } 2132 } 2133 2134 static class ReverseScanTest extends TableTest { 2135 private ResultScanner testScanner; 2136 2137 ReverseScanTest(Connection con, TestOptions options, Status status) { 2138 super(con, options, status); 2139 } 2140 2141 @Override 2142 void testTakedown() throws IOException { 2143 if (this.testScanner != null) { 2144 this.testScanner.close(); 2145 } 2146 super.testTakedown(); 2147 } 2148 2149 @Override 2150 boolean testRow(final int i, final long startTime) throws IOException { 2151 if (this.testScanner == null) { 2152 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 2153 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 2154 .setScanMetricsEnabled(true).setReversed(true); 2155 for (int family = 0; family < opts.families; family++) { 2156 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2157 if (opts.addColumns) { 2158 for (int column = 0; column < opts.columns; column++) { 2159 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2160 scan.addColumn(familyName, qualifier); 2161 } 2162 } else { 2163 scan.addFamily(familyName); 2164 } 2165 } 2166 if (opts.filterAll) { 2167 scan.setFilter(new FilterAllFilter()); 2168 } 2169 this.testScanner = table.getScanner(scan); 2170 } 2171 Result r = testScanner.next(); 2172 updateValueSize(r); 2173 return true; 2174 } 2175 } 2176 2177 /** 2178 * Base class for operations that are CAS-like; that read a value and then set it based off what 2179 * they read. In this category is increment, append, checkAndPut, etc. 2180 * <p> 2181 * These operations also want some concurrency going on. Usually when these tests run, they 2182 * operate in their own part of the key range. In CASTest, we will have them all overlap on the 2183 * same key space. We do this with our getStartRow and getLastRow overrides. 2184 */ 2185 static abstract class CASTableTest extends TableTest { 2186 private final byte[] qualifier; 2187 2188 CASTableTest(Connection con, TestOptions options, Status status) { 2189 super(con, options, status); 2190 qualifier = Bytes.toBytes(this.getClass().getSimpleName()); 2191 } 2192 2193 byte[] getQualifier() { 2194 return this.qualifier; 2195 } 2196 2197 @Override 2198 int getStartRow() { 2199 return 0; 2200 } 2201 2202 @Override 2203 int getLastRow() { 2204 return opts.perClientRunRows; 2205 } 2206 } 2207 2208 static class IncrementTest extends CASTableTest { 2209 IncrementTest(Connection con, TestOptions options, Status status) { 2210 super(con, options, status); 2211 } 2212 2213 @Override 2214 boolean testRow(final int i, final long startTime) throws IOException { 2215 Increment increment = new Increment(format(i)); 2216 // unlike checkAndXXX tests, which make most sense to do on a single value, 2217 // if multiple families are specified for an increment test we assume it is 2218 // meant to raise the work factor 2219 for (int family = 0; family < opts.families; family++) { 2220 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2221 increment.addColumn(familyName, getQualifier(), 1l); 2222 } 2223 updateValueSize(this.table.increment(increment)); 2224 return true; 2225 } 2226 } 2227 2228 static class AppendTest extends CASTableTest { 2229 AppendTest(Connection con, TestOptions options, Status status) { 2230 super(con, options, status); 2231 } 2232 2233 @Override 2234 boolean testRow(final int i, final long startTime) throws IOException { 2235 byte[] bytes = format(i); 2236 Append append = new Append(bytes); 2237 // unlike checkAndXXX tests, which make most sense to do on a single value, 2238 // if multiple families are specified for an append test we assume it is 2239 // meant to raise the work factor 2240 for (int family = 0; family < opts.families; family++) { 2241 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2242 append.addColumn(familyName, getQualifier(), bytes); 2243 } 2244 updateValueSize(this.table.append(append)); 2245 return true; 2246 } 2247 } 2248 2249 static class CheckAndMutateTest extends CASTableTest { 2250 CheckAndMutateTest(Connection con, TestOptions options, Status status) { 2251 super(con, options, status); 2252 } 2253 2254 @Override 2255 boolean testRow(final int i, final long startTime) throws IOException { 2256 final byte[] bytes = format(i); 2257 // checkAndXXX tests operate on only a single value 2258 // Put a known value so when we go to check it, it is there. 2259 Put put = new Put(bytes); 2260 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2261 this.table.put(put); 2262 RowMutations mutations = new RowMutations(bytes); 2263 mutations.add(put); 2264 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2265 .thenMutate(mutations); 2266 return true; 2267 } 2268 } 2269 2270 static class CheckAndPutTest extends CASTableTest { 2271 CheckAndPutTest(Connection con, TestOptions options, Status status) { 2272 super(con, options, status); 2273 } 2274 2275 @Override 2276 boolean testRow(final int i, final long startTime) throws IOException { 2277 final byte[] bytes = format(i); 2278 // checkAndXXX tests operate on only a single value 2279 // Put a known value so when we go to check it, it is there. 2280 Put put = new Put(bytes); 2281 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2282 this.table.put(put); 2283 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2284 .thenPut(put); 2285 return true; 2286 } 2287 } 2288 2289 static class CheckAndDeleteTest extends CASTableTest { 2290 CheckAndDeleteTest(Connection con, TestOptions options, Status status) { 2291 super(con, options, status); 2292 } 2293 2294 @Override 2295 boolean testRow(final int i, final long startTime) throws IOException { 2296 final byte[] bytes = format(i); 2297 // checkAndXXX tests operate on only a single value 2298 // Put a known value so when we go to check it, it is there. 2299 Put put = new Put(bytes); 2300 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2301 this.table.put(put); 2302 Delete delete = new Delete(put.getRow()); 2303 delete.addColumn(FAMILY_ZERO, getQualifier()); 2304 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2305 .thenDelete(delete); 2306 return true; 2307 } 2308 } 2309 2310 /* 2311 * Delete all fake regions inserted to meta table by MetaWriteTest. 2312 */ 2313 static class CleanMetaTest extends MetaTest { 2314 CleanMetaTest(Connection con, TestOptions options, Status status) { 2315 super(con, options, status); 2316 } 2317 2318 @Override 2319 boolean testRow(final int i, final long startTime) throws IOException { 2320 try { 2321 RegionInfo regionInfo = connection.getRegionLocator(table.getName()) 2322 .getRegionLocation(getSplitKey(i), false).getRegion(); 2323 LOG.debug("deleting region from meta: " + regionInfo); 2324 2325 Delete delete = 2326 MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP); 2327 try (Table t = MetaTableAccessor.getMetaHTable(connection)) { 2328 t.delete(delete); 2329 } 2330 } catch (IOException ie) { 2331 // Log and continue 2332 LOG.error("cannot find region with start key: " + i); 2333 } 2334 return true; 2335 } 2336 } 2337 2338 static class SequentialReadTest extends TableTest { 2339 SequentialReadTest(Connection con, TestOptions options, Status status) { 2340 super(con, options, status); 2341 } 2342 2343 @Override 2344 boolean testRow(final int i, final long startTime) throws IOException { 2345 Get get = new Get(format(i)); 2346 for (int family = 0; family < opts.families; family++) { 2347 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2348 if (opts.addColumns) { 2349 for (int column = 0; column < opts.columns; column++) { 2350 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2351 get.addColumn(familyName, qualifier); 2352 } 2353 } else { 2354 get.addFamily(familyName); 2355 } 2356 } 2357 if (opts.filterAll) { 2358 get.setFilter(new FilterAllFilter()); 2359 } 2360 updateValueSize(table.get(get)); 2361 return true; 2362 } 2363 } 2364 2365 static class SequentialWriteTest extends BufferedMutatorTest { 2366 private ArrayList<Put> puts; 2367 2368 SequentialWriteTest(Connection con, TestOptions options, Status status) { 2369 super(con, options, status); 2370 if (opts.multiPut > 0) { 2371 LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + "."); 2372 this.puts = new ArrayList<>(opts.multiPut); 2373 } 2374 } 2375 2376 protected byte[] generateRow(final int i) { 2377 return format(i); 2378 } 2379 2380 @Override 2381 boolean testRow(final int i, final long startTime) throws IOException { 2382 byte[] row = generateRow(i); 2383 Put put = new Put(row); 2384 for (int family = 0; family < opts.families; family++) { 2385 byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family); 2386 for (int column = 0; column < opts.columns; column++) { 2387 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2388 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2389 if (opts.useTags) { 2390 byte[] tag = generateData(this.rand, TAG_LENGTH); 2391 Tag[] tags = new Tag[opts.noOfTags]; 2392 for (int n = 0; n < opts.noOfTags; n++) { 2393 Tag t = new ArrayBackedTag((byte) n, tag); 2394 tags[n] = t; 2395 } 2396 KeyValue kv = 2397 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 2398 put.add(kv); 2399 updateValueSize(kv.getValueLength()); 2400 } else { 2401 put.addColumn(familyName, qualifier, value); 2402 updateValueSize(value.length); 2403 } 2404 } 2405 } 2406 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 2407 if (opts.autoFlush) { 2408 if (opts.multiPut > 0) { 2409 this.puts.add(put); 2410 if (this.puts.size() == opts.multiPut) { 2411 table.put(this.puts); 2412 this.puts.clear(); 2413 } else { 2414 return false; 2415 } 2416 } else { 2417 table.put(put); 2418 } 2419 } else { 2420 mutator.mutate(put); 2421 } 2422 return true; 2423 } 2424 } 2425 2426 static class SequentialDeleteTest extends BufferedMutatorTest { 2427 2428 SequentialDeleteTest(Connection con, TestOptions options, Status status) { 2429 super(con, options, status); 2430 } 2431 2432 protected byte[] generateRow(final int i) { 2433 return format(i); 2434 } 2435 2436 @Override 2437 boolean testRow(final int i, final long startTime) throws IOException { 2438 byte[] row = generateRow(i); 2439 Delete delete = new Delete(row); 2440 for (int family = 0; family < opts.families; family++) { 2441 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2442 delete.addFamily(familyName); 2443 } 2444 delete.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 2445 if (opts.autoFlush) { 2446 table.delete(delete); 2447 } else { 2448 mutator.mutate(delete); 2449 } 2450 return true; 2451 } 2452 } 2453 2454 /* 2455 * Insert fake regions into meta table with contiguous split keys. 2456 */ 2457 static class MetaWriteTest extends MetaTest { 2458 2459 MetaWriteTest(Connection con, TestOptions options, Status status) { 2460 super(con, options, status); 2461 } 2462 2463 @Override 2464 boolean testRow(final int i, final long startTime) throws IOException { 2465 List<RegionInfo> regionInfos = new ArrayList<RegionInfo>(); 2466 RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME)) 2467 .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build()); 2468 regionInfos.add(regionInfo); 2469 MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1); 2470 2471 // write the serverName columns 2472 MetaTableAccessor.updateRegionLocation(connection, regionInfo, 2473 ServerName.valueOf("localhost", 60010, rand.nextLong()), i, 2474 EnvironmentEdgeManager.currentTime()); 2475 return true; 2476 } 2477 } 2478 2479 static class FilteredScanTest extends TableTest { 2480 protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName()); 2481 2482 FilteredScanTest(Connection con, TestOptions options, Status status) { 2483 super(con, options, status); 2484 if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) { 2485 LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB 2486 + ". This could take a very long time."); 2487 } 2488 } 2489 2490 @Override 2491 boolean testRow(int i, final long startTime) throws IOException { 2492 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2493 Scan scan = constructScan(value); 2494 ResultScanner scanner = null; 2495 try { 2496 scanner = this.table.getScanner(scan); 2497 for (Result r = null; (r = scanner.next()) != null;) { 2498 updateValueSize(r); 2499 } 2500 } finally { 2501 if (scanner != null) { 2502 updateScanMetrics(scanner.getScanMetrics()); 2503 scanner.close(); 2504 } 2505 } 2506 return true; 2507 } 2508 2509 protected Scan constructScan(byte[] valuePrefix) throws IOException { 2510 FilterList list = new FilterList(); 2511 Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL, 2512 new BinaryComparator(valuePrefix)); 2513 list.addFilter(filter); 2514 if (opts.filterAll) { 2515 list.addFilter(new FilterAllFilter()); 2516 } 2517 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 2518 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 2519 .setScanMetricsEnabled(true); 2520 if (opts.addColumns) { 2521 for (int column = 0; column < opts.columns; column++) { 2522 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2523 scan.addColumn(FAMILY_ZERO, qualifier); 2524 } 2525 } else { 2526 scan.addFamily(FAMILY_ZERO); 2527 } 2528 scan.setFilter(list); 2529 return scan; 2530 } 2531 } 2532 2533 /** 2534 * Compute a throughput rate in MB/s. 2535 * @param rows Number of records consumed. 2536 * @param timeMs Time taken in milliseconds. 2537 * @return String value with label, ie '123.76 MB/s' 2538 */ 2539 private static String calculateMbps(int rows, long timeMs, final int valueSize, int families, 2540 int columns) { 2541 BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH 2542 + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families); 2543 BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT) 2544 .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT); 2545 return FMT.format(mbps) + " MB/s"; 2546 } 2547 2548 /* 2549 * Format passed integer. 2550 * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed number (Does 2551 * absolute in case number is negative). 2552 */ 2553 public static byte[] format(final int number) { 2554 byte[] b = new byte[ROW_LENGTH]; 2555 int d = Math.abs(number); 2556 for (int i = b.length - 1; i >= 0; i--) { 2557 b[i] = (byte) ((d % 10) + '0'); 2558 d /= 10; 2559 } 2560 return b; 2561 } 2562 2563 /* 2564 * This method takes some time and is done inline uploading data. For example, doing the mapfile 2565 * test, generation of the key and value consumes about 30% of CPU time. 2566 * @return Generated random value to insert into a table cell. 2567 */ 2568 public static byte[] generateData(final Random r, int length) { 2569 byte[] b = new byte[length]; 2570 int i; 2571 2572 for (i = 0; i < (length - 8); i += 8) { 2573 b[i] = (byte) (65 + r.nextInt(26)); 2574 b[i + 1] = b[i]; 2575 b[i + 2] = b[i]; 2576 b[i + 3] = b[i]; 2577 b[i + 4] = b[i]; 2578 b[i + 5] = b[i]; 2579 b[i + 6] = b[i]; 2580 b[i + 7] = b[i]; 2581 } 2582 2583 byte a = (byte) (65 + r.nextInt(26)); 2584 for (; i < length; i++) { 2585 b[i] = a; 2586 } 2587 return b; 2588 } 2589 2590 static byte[] getRandomRow(final Random random, final int totalRows) { 2591 return format(generateRandomRow(random, totalRows)); 2592 } 2593 2594 static int generateRandomRow(final Random random, final int totalRows) { 2595 return random.nextInt(Integer.MAX_VALUE) % totalRows; 2596 } 2597 2598 static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf, 2599 Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status) 2600 throws IOException, InterruptedException { 2601 status.setStatus( 2602 "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows"); 2603 long totalElapsedTime; 2604 2605 final TestBase t; 2606 try { 2607 if (AsyncTest.class.isAssignableFrom(cmd)) { 2608 Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd; 2609 Constructor<? extends AsyncTest> constructor = 2610 newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class); 2611 t = constructor.newInstance(asyncCon, opts, status); 2612 } else { 2613 Class<? extends Test> newCmd = (Class<? extends Test>) cmd; 2614 Constructor<? extends Test> constructor = 2615 newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class); 2616 t = constructor.newInstance(con, opts, status); 2617 } 2618 } catch (NoSuchMethodException e) { 2619 throw new IllegalArgumentException("Invalid command class: " + cmd.getName() 2620 + ". It does not provide a constructor as described by " 2621 + "the javadoc comment. Available constructors are: " 2622 + Arrays.toString(cmd.getConstructors())); 2623 } catch (Exception e) { 2624 throw new IllegalStateException("Failed to construct command class", e); 2625 } 2626 totalElapsedTime = t.test(); 2627 2628 status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow 2629 + " for " + opts.perClientRunRows + " rows" + " (" 2630 + calculateMbps((int) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime, 2631 getAverageValueLength(opts), opts.families, opts.columns) 2632 + ")"); 2633 2634 return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold, 2635 t.numOfReplyFromReplica, t.getLatencyHistogram()); 2636 } 2637 2638 private static int getAverageValueLength(final TestOptions opts) { 2639 return opts.valueRandom ? opts.valueSize / 2 : opts.valueSize; 2640 } 2641 2642 private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) 2643 throws IOException, InterruptedException, ClassNotFoundException, ExecutionException { 2644 // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do 2645 // the TestOptions introspection for us and dump the output in a readable format. 2646 LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts)); 2647 Admin admin = null; 2648 Connection connection = null; 2649 try { 2650 connection = ConnectionFactory.createConnection(getConf()); 2651 admin = connection.getAdmin(); 2652 checkTable(admin, opts); 2653 } finally { 2654 if (admin != null) admin.close(); 2655 if (connection != null) connection.close(); 2656 } 2657 if (opts.nomapred) { 2658 doLocalClients(opts, getConf()); 2659 } else { 2660 doMapReduce(opts, getConf()); 2661 } 2662 } 2663 2664 protected void printUsage() { 2665 printUsage(PE_COMMAND_SHORTNAME, null); 2666 } 2667 2668 protected static void printUsage(final String message) { 2669 printUsage(PE_COMMAND_SHORTNAME, message); 2670 } 2671 2672 protected static void printUsageAndExit(final String message, final int exitCode) { 2673 printUsage(message); 2674 System.exit(exitCode); 2675 } 2676 2677 protected static void printUsage(final String shortName, final String message) { 2678 if (message != null && message.length() > 0) { 2679 System.err.println(message); 2680 } 2681 System.err.print("Usage: hbase " + shortName); 2682 System.err.println(" <OPTIONS> [-D<property=value>]* <command|class> <nclients>"); 2683 System.err.println(); 2684 System.err.println("General Options:"); 2685 System.err.println( 2686 " nomapred Run multiple clients using threads " + "(rather than use mapreduce)"); 2687 System.err 2688 .println(" oneCon all the threads share the same connection. Default: False"); 2689 System.err.println(" connCount connections all threads share. " 2690 + "For example, if set to 2, then all thread share 2 connection. " 2691 + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, " 2692 + "if not, connCount=thread number"); 2693 2694 System.err.println(" sampleRate Execute test on a sample of total " 2695 + "rows. Only supported by randomRead. Default: 1.0"); 2696 System.err.println(" period Report every 'period' rows: " 2697 + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10); 2698 System.err.println(" cycles How many times to cycle the test. Defaults: 1."); 2699 System.err.println( 2700 " traceRate Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0"); 2701 System.err.println(" latency Set to report operation latencies. Default: False"); 2702 System.err.println(" latencyThreshold Set to report number of operations with latency " 2703 + "over lantencyThreshold, unit in millisecond, default 0"); 2704 System.err.println(" measureAfter Start to measure the latency once 'measureAfter'" 2705 + " rows have been treated. Default: 0"); 2706 System.err 2707 .println(" valueSize Pass value size to use: Default: " + DEFAULT_OPTS.getValueSize()); 2708 System.err.println(" valueRandom Set if we should vary value size between 0 and " 2709 + "'valueSize'; set on read for stats on size: Default: Not set."); 2710 System.err.println(" blockEncoding Block encoding to use. Value should be one of " 2711 + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE"); 2712 System.err.println(); 2713 System.err.println("Table Creation / Write Tests:"); 2714 System.err.println(" table Alternate table name. Default: 'TestTable'"); 2715 System.err.println( 2716 " rows Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows() 2717 + ". In case of randomReads and randomSeekScans this could" 2718 + " be specified along with --size to specify the number of rows to be scanned within" 2719 + " the total range specified by the size."); 2720 System.err.println( 2721 " size Total size in GiB. Mutually exclusive with --rows for writes and scans" 2722 + ". But for randomReads and randomSeekScans when you use size with --rows you could" 2723 + " use size to specify the end range and --rows" 2724 + " specifies the number of rows within that range. " + "Default: 1.0."); 2725 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); 2726 System.err.println(" encryption Encryption type to use (AES, ...). Default: 'NONE'"); 2727 System.err.println( 2728 " flushCommits Used to determine if the test should flush the table. " + "Default: false"); 2729 System.err.println(" valueZipf Set if we should vary value size between 0 and " 2730 + "'valueSize' in zipf form: Default: Not set."); 2731 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); 2732 System.err.println(" autoFlush Set autoFlush on htable. Default: False"); 2733 System.err.println(" multiPut Batch puts together into groups of N. Only supported " 2734 + "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0"); 2735 System.err.println(" presplit Create presplit table. If a table with same name exists," 2736 + " it'll be deleted and recreated (instead of verifying count of its existing regions). " 2737 + "Recommended for accurate perf analysis (see guide). Default: disabled"); 2738 System.err.println( 2739 " usetags Writes tags along with KVs. Use with HFile V3. " + "Default: false"); 2740 System.err.println(" numoftags Specify the no of tags that would be needed. " 2741 + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags); 2742 System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table."); 2743 System.err.println(" columns Columns to write per row. Default: 1"); 2744 System.err 2745 .println(" families Specify number of column families for the table. Default: 1"); 2746 System.err.println(); 2747 System.err.println("Read Tests:"); 2748 System.err.println(" filterAll Helps to filter out all the rows on the server side" 2749 + " there by not returning any thing back to the client. Helps to check the server side" 2750 + " performance. Uses FilterAllFilter internally. "); 2751 System.err.println(" multiGet Batch gets together into groups of N. Only supported " 2752 + "by randomRead. Default: disabled"); 2753 System.err.println(" inmemory Tries to keep the HFiles of the CF " 2754 + "inmemory as far as possible. Not guaranteed that reads are always served " 2755 + "from memory. Default: false"); 2756 System.err 2757 .println(" bloomFilter Bloom filter type, one of " + Arrays.toString(BloomType.values())); 2758 System.err.println(" blockSize Blocksize to use when writing out hfiles. "); 2759 System.err 2760 .println(" inmemoryCompaction Makes the column family to do inmemory flushes/compactions. " 2761 + "Uses the CompactingMemstore"); 2762 System.err.println(" addColumns Adds columns to scans/gets explicitly. Default: true"); 2763 System.err.println(" replicas Enable region replica testing. Defaults: 1."); 2764 System.err.println( 2765 " randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0"); 2766 System.err.println(" caching Scan caching to use. Default: 30"); 2767 System.err.println(" asyncPrefetch Enable asyncPrefetch for scan"); 2768 System.err.println(" cacheBlocks Set the cacheBlocks option for scan. Default: true"); 2769 System.err.println( 2770 " scanReadType Set the readType option for scan, stream/pread/default. Default: default"); 2771 System.err.println(" bufferSize Set the value of client side buffering. Default: 2MB"); 2772 System.err.println(); 2773 System.err.println(" Note: -D properties will be applied to the conf used. "); 2774 System.err.println(" For example: "); 2775 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true"); 2776 System.err.println(" -Dmapreduce.task.timeout=60000"); 2777 System.err.println(); 2778 System.err.println("Command:"); 2779 for (CmdDescriptor command : COMMANDS.values()) { 2780 System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription())); 2781 } 2782 System.err.println(); 2783 System.err.println("Class:"); 2784 System.err.println("To run any custom implementation of PerformanceEvaluation.Test, " 2785 + "provide the classname of the implementaion class in place of " 2786 + "command name and it will be loaded at runtime from classpath.:"); 2787 System.err.println("Please consider to contribute back " 2788 + "this custom test impl into a builtin PE command for the benefit of the community"); 2789 System.err.println(); 2790 System.err.println("Args:"); 2791 System.err.println(" nclients Integer. Required. Total number of clients " 2792 + "(and HRegionServers) running. 1 <= value <= 500"); 2793 System.err.println("Examples:"); 2794 System.err.println(" To run a single client doing the default 1M sequentialWrites:"); 2795 System.err.println(" $ hbase " + shortName + " sequentialWrite 1"); 2796 System.err.println(" To run 10 clients doing increments over ten rows:"); 2797 System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10"); 2798 } 2799 2800 /** 2801 * Parse options passed in via an arguments array. Assumes that array has been split on 2802 * white-space and placed into a {@code Queue}. Any unknown arguments will remain in the queue at 2803 * the conclusion of this method call. It's up to the caller to deal with these unrecognized 2804 * arguments. 2805 */ 2806 static TestOptions parseOpts(Queue<String> args) { 2807 TestOptions opts = new TestOptions(); 2808 2809 String cmd = null; 2810 while ((cmd = args.poll()) != null) { 2811 if (cmd.equals("-h") || cmd.startsWith("--h")) { 2812 // place item back onto queue so that caller knows parsing was incomplete 2813 args.add(cmd); 2814 break; 2815 } 2816 2817 final String nmr = "--nomapred"; 2818 if (cmd.startsWith(nmr)) { 2819 opts.nomapred = true; 2820 continue; 2821 } 2822 2823 final String rows = "--rows="; 2824 if (cmd.startsWith(rows)) { 2825 opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length())); 2826 continue; 2827 } 2828 2829 final String cycles = "--cycles="; 2830 if (cmd.startsWith(cycles)) { 2831 opts.cycles = Integer.parseInt(cmd.substring(cycles.length())); 2832 continue; 2833 } 2834 2835 final String sampleRate = "--sampleRate="; 2836 if (cmd.startsWith(sampleRate)) { 2837 opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); 2838 continue; 2839 } 2840 2841 final String table = "--table="; 2842 if (cmd.startsWith(table)) { 2843 opts.tableName = cmd.substring(table.length()); 2844 continue; 2845 } 2846 2847 final String startRow = "--startRow="; 2848 if (cmd.startsWith(startRow)) { 2849 opts.startRow = Integer.parseInt(cmd.substring(startRow.length())); 2850 continue; 2851 } 2852 2853 final String compress = "--compress="; 2854 if (cmd.startsWith(compress)) { 2855 opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); 2856 continue; 2857 } 2858 2859 final String encryption = "--encryption="; 2860 if (cmd.startsWith(encryption)) { 2861 opts.encryption = cmd.substring(encryption.length()); 2862 continue; 2863 } 2864 2865 final String traceRate = "--traceRate="; 2866 if (cmd.startsWith(traceRate)) { 2867 opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length())); 2868 continue; 2869 } 2870 2871 final String blockEncoding = "--blockEncoding="; 2872 if (cmd.startsWith(blockEncoding)) { 2873 opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); 2874 continue; 2875 } 2876 2877 final String flushCommits = "--flushCommits="; 2878 if (cmd.startsWith(flushCommits)) { 2879 opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); 2880 continue; 2881 } 2882 2883 final String writeToWAL = "--writeToWAL="; 2884 if (cmd.startsWith(writeToWAL)) { 2885 opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); 2886 continue; 2887 } 2888 2889 final String presplit = "--presplit="; 2890 if (cmd.startsWith(presplit)) { 2891 opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); 2892 continue; 2893 } 2894 2895 final String inMemory = "--inmemory="; 2896 if (cmd.startsWith(inMemory)) { 2897 opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); 2898 continue; 2899 } 2900 2901 final String autoFlush = "--autoFlush="; 2902 if (cmd.startsWith(autoFlush)) { 2903 opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length())); 2904 continue; 2905 } 2906 2907 final String onceCon = "--oneCon="; 2908 if (cmd.startsWith(onceCon)) { 2909 opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length())); 2910 continue; 2911 } 2912 2913 final String connCount = "--connCount="; 2914 if (cmd.startsWith(connCount)) { 2915 opts.connCount = Integer.parseInt(cmd.substring(connCount.length())); 2916 continue; 2917 } 2918 2919 final String latencyThreshold = "--latencyThreshold="; 2920 if (cmd.startsWith(latencyThreshold)) { 2921 opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length())); 2922 continue; 2923 } 2924 2925 final String latency = "--latency"; 2926 if (cmd.startsWith(latency)) { 2927 opts.reportLatency = true; 2928 continue; 2929 } 2930 2931 final String multiGet = "--multiGet="; 2932 if (cmd.startsWith(multiGet)) { 2933 opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); 2934 continue; 2935 } 2936 2937 final String multiPut = "--multiPut="; 2938 if (cmd.startsWith(multiPut)) { 2939 opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length())); 2940 continue; 2941 } 2942 2943 final String useTags = "--usetags="; 2944 if (cmd.startsWith(useTags)) { 2945 opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); 2946 continue; 2947 } 2948 2949 final String noOfTags = "--numoftags="; 2950 if (cmd.startsWith(noOfTags)) { 2951 opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); 2952 continue; 2953 } 2954 2955 final String replicas = "--replicas="; 2956 if (cmd.startsWith(replicas)) { 2957 opts.replicas = Integer.parseInt(cmd.substring(replicas.length())); 2958 continue; 2959 } 2960 2961 final String filterOutAll = "--filterAll"; 2962 if (cmd.startsWith(filterOutAll)) { 2963 opts.filterAll = true; 2964 continue; 2965 } 2966 2967 final String size = "--size="; 2968 if (cmd.startsWith(size)) { 2969 opts.size = Float.parseFloat(cmd.substring(size.length())); 2970 if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB"); 2971 continue; 2972 } 2973 2974 final String splitPolicy = "--splitPolicy="; 2975 if (cmd.startsWith(splitPolicy)) { 2976 opts.splitPolicy = cmd.substring(splitPolicy.length()); 2977 continue; 2978 } 2979 2980 final String randomSleep = "--randomSleep="; 2981 if (cmd.startsWith(randomSleep)) { 2982 opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length())); 2983 continue; 2984 } 2985 2986 final String measureAfter = "--measureAfter="; 2987 if (cmd.startsWith(measureAfter)) { 2988 opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length())); 2989 continue; 2990 } 2991 2992 final String bloomFilter = "--bloomFilter="; 2993 if (cmd.startsWith(bloomFilter)) { 2994 opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length())); 2995 continue; 2996 } 2997 2998 final String blockSize = "--blockSize="; 2999 if (cmd.startsWith(blockSize)) { 3000 opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length())); 3001 continue; 3002 } 3003 3004 final String valueSize = "--valueSize="; 3005 if (cmd.startsWith(valueSize)) { 3006 opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length())); 3007 continue; 3008 } 3009 3010 final String valueRandom = "--valueRandom"; 3011 if (cmd.startsWith(valueRandom)) { 3012 opts.valueRandom = true; 3013 continue; 3014 } 3015 3016 final String valueZipf = "--valueZipf"; 3017 if (cmd.startsWith(valueZipf)) { 3018 opts.valueZipf = true; 3019 continue; 3020 } 3021 3022 final String period = "--period="; 3023 if (cmd.startsWith(period)) { 3024 opts.period = Integer.parseInt(cmd.substring(period.length())); 3025 continue; 3026 } 3027 3028 final String addColumns = "--addColumns="; 3029 if (cmd.startsWith(addColumns)) { 3030 opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length())); 3031 continue; 3032 } 3033 3034 final String inMemoryCompaction = "--inmemoryCompaction="; 3035 if (cmd.startsWith(inMemoryCompaction)) { 3036 opts.inMemoryCompaction = 3037 MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length())); 3038 continue; 3039 } 3040 3041 final String columns = "--columns="; 3042 if (cmd.startsWith(columns)) { 3043 opts.columns = Integer.parseInt(cmd.substring(columns.length())); 3044 continue; 3045 } 3046 3047 final String families = "--families="; 3048 if (cmd.startsWith(families)) { 3049 opts.families = Integer.parseInt(cmd.substring(families.length())); 3050 continue; 3051 } 3052 3053 final String caching = "--caching="; 3054 if (cmd.startsWith(caching)) { 3055 opts.caching = Integer.parseInt(cmd.substring(caching.length())); 3056 continue; 3057 } 3058 3059 final String asyncPrefetch = "--asyncPrefetch"; 3060 if (cmd.startsWith(asyncPrefetch)) { 3061 opts.asyncPrefetch = true; 3062 continue; 3063 } 3064 3065 final String cacheBlocks = "--cacheBlocks="; 3066 if (cmd.startsWith(cacheBlocks)) { 3067 opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length())); 3068 continue; 3069 } 3070 3071 final String scanReadType = "--scanReadType="; 3072 if (cmd.startsWith(scanReadType)) { 3073 opts.scanReadType = 3074 Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase()); 3075 continue; 3076 } 3077 3078 final String bufferSize = "--bufferSize="; 3079 if (cmd.startsWith(bufferSize)) { 3080 opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length())); 3081 continue; 3082 } 3083 3084 final String commandPropertiesFile = "--commandPropertiesFile="; 3085 if (cmd.startsWith(commandPropertiesFile)) { 3086 String fileName = String.valueOf(cmd.substring(commandPropertiesFile.length())); 3087 Properties properties = new Properties(); 3088 try { 3089 properties 3090 .load(PerformanceEvaluation.class.getClassLoader().getResourceAsStream(fileName)); 3091 opts.commandProperties = properties; 3092 } catch (IOException e) { 3093 LOG.error("Failed to load metricIds from properties file", e); 3094 } 3095 continue; 3096 } 3097 3098 validateParsedOpts(opts); 3099 3100 if (isCommandClass(cmd)) { 3101 opts.cmdName = cmd; 3102 try { 3103 opts.numClientThreads = Integer.parseInt(args.remove()); 3104 } catch (NoSuchElementException | NumberFormatException e) { 3105 throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e); 3106 } 3107 opts = calculateRowsAndSize(opts); 3108 break; 3109 } else { 3110 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); 3111 } 3112 3113 // Not matching any option or command. 3114 System.err.println("Error: Wrong option or command: " + cmd); 3115 args.add(cmd); 3116 break; 3117 } 3118 return opts; 3119 } 3120 3121 /** 3122 * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts 3123 */ 3124 private static void validateParsedOpts(TestOptions opts) { 3125 3126 if (!opts.autoFlush && opts.multiPut > 0) { 3127 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0"); 3128 } 3129 3130 if (opts.oneCon && opts.connCount > 1) { 3131 throw new IllegalArgumentException( 3132 "oneCon is set to true, " + "connCount should not bigger than 1"); 3133 } 3134 3135 if (opts.valueZipf && opts.valueRandom) { 3136 throw new IllegalStateException("Either valueZipf or valueRandom but not both"); 3137 } 3138 } 3139 3140 static TestOptions calculateRowsAndSize(final TestOptions opts) { 3141 int rowsPerGB = getRowsPerGB(opts); 3142 if ( 3143 (opts.getCmdName() != null 3144 && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN))) 3145 && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows 3146 ) { 3147 opts.totalRows = (int) opts.size * rowsPerGB; 3148 } else if (opts.size != DEFAULT_OPTS.size) { 3149 // total size in GB specified 3150 opts.totalRows = (int) opts.size * rowsPerGB; 3151 opts.perClientRunRows = opts.totalRows / opts.numClientThreads; 3152 } else { 3153 opts.totalRows = opts.perClientRunRows * opts.numClientThreads; 3154 opts.size = opts.totalRows / rowsPerGB; 3155 } 3156 return opts; 3157 } 3158 3159 static int getRowsPerGB(final TestOptions opts) { 3160 return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies() 3161 * opts.getColumns()); 3162 } 3163 3164 @Override 3165 public int run(String[] args) throws Exception { 3166 // Process command-line args. TODO: Better cmd-line processing 3167 // (but hopefully something not as painful as cli options). 3168 int errCode = -1; 3169 if (args.length < 1) { 3170 printUsage(); 3171 return errCode; 3172 } 3173 3174 try { 3175 LinkedList<String> argv = new LinkedList<>(); 3176 argv.addAll(Arrays.asList(args)); 3177 TestOptions opts = parseOpts(argv); 3178 3179 // args remaining, print help and exit 3180 if (!argv.isEmpty()) { 3181 errCode = 0; 3182 printUsage(); 3183 return errCode; 3184 } 3185 3186 // must run at least 1 client 3187 if (opts.numClientThreads <= 0) { 3188 throw new IllegalArgumentException("Number of clients must be > 0"); 3189 } 3190 3191 // cmdName should not be null, print help and exit 3192 if (opts.cmdName == null) { 3193 printUsage(); 3194 return errCode; 3195 } 3196 3197 Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName); 3198 if (cmdClass != null) { 3199 runTest(cmdClass, opts); 3200 errCode = 0; 3201 } 3202 3203 } catch (Exception e) { 3204 e.printStackTrace(); 3205 } 3206 3207 return errCode; 3208 } 3209 3210 private static boolean isCommandClass(String cmd) { 3211 return COMMANDS.containsKey(cmd) || isCustomTestClass(cmd); 3212 } 3213 3214 private static boolean isCustomTestClass(String cmd) { 3215 Class<? extends Test> cmdClass; 3216 try { 3217 cmdClass = 3218 (Class<? extends Test>) PerformanceEvaluation.class.getClassLoader().loadClass(cmd); 3219 addCommandDescriptor(cmdClass, cmd, "custom command"); 3220 return true; 3221 } catch (Throwable th) { 3222 LOG.info("No class found for command: " + cmd, th); 3223 return false; 3224 } 3225 } 3226 3227 private static Class<? extends TestBase> determineCommandClass(String cmd) { 3228 CmdDescriptor descriptor = COMMANDS.get(cmd); 3229 return descriptor != null ? descriptor.getCmdClass() : null; 3230 } 3231 3232 public static void main(final String[] args) throws Exception { 3233 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); 3234 System.exit(res); 3235 } 3236}