001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import com.codahale.metrics.Histogram; 021import com.codahale.metrics.UniformReservoir; 022import io.opentelemetry.api.trace.Span; 023import io.opentelemetry.context.Scope; 024import java.io.IOException; 025import java.io.PrintStream; 026import java.lang.reflect.Constructor; 027import java.math.BigDecimal; 028import java.math.MathContext; 029import java.text.DecimalFormat; 030import java.text.SimpleDateFormat; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.Date; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Locale; 037import java.util.Map; 038import java.util.NoSuchElementException; 039import java.util.Queue; 040import java.util.Random; 041import java.util.TreeMap; 042import java.util.concurrent.Callable; 043import java.util.concurrent.ExecutionException; 044import java.util.concurrent.ExecutorService; 045import java.util.concurrent.Executors; 046import java.util.concurrent.Future; 047import java.util.concurrent.ThreadLocalRandom; 048import org.apache.commons.lang3.StringUtils; 049import org.apache.hadoop.conf.Configuration; 
050import org.apache.hadoop.conf.Configured; 051import org.apache.hadoop.fs.FileSystem; 052import org.apache.hadoop.fs.Path; 053import org.apache.hadoop.hbase.client.Admin; 054import org.apache.hadoop.hbase.client.Append; 055import org.apache.hadoop.hbase.client.AsyncConnection; 056import org.apache.hadoop.hbase.client.AsyncTable; 057import org.apache.hadoop.hbase.client.BufferedMutator; 058import org.apache.hadoop.hbase.client.BufferedMutatorParams; 059import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 060import org.apache.hadoop.hbase.client.Connection; 061import org.apache.hadoop.hbase.client.ConnectionFactory; 062import org.apache.hadoop.hbase.client.Consistency; 063import org.apache.hadoop.hbase.client.Delete; 064import org.apache.hadoop.hbase.client.Durability; 065import org.apache.hadoop.hbase.client.Get; 066import org.apache.hadoop.hbase.client.Increment; 067import org.apache.hadoop.hbase.client.Put; 068import org.apache.hadoop.hbase.client.RegionInfo; 069import org.apache.hadoop.hbase.client.RegionInfoBuilder; 070import org.apache.hadoop.hbase.client.RegionLocator; 071import org.apache.hadoop.hbase.client.Result; 072import org.apache.hadoop.hbase.client.ResultScanner; 073import org.apache.hadoop.hbase.client.RowMutations; 074import org.apache.hadoop.hbase.client.Scan; 075import org.apache.hadoop.hbase.client.Table; 076import org.apache.hadoop.hbase.client.TableDescriptor; 077import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 078import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 079import org.apache.hadoop.hbase.filter.BinaryComparator; 080import org.apache.hadoop.hbase.filter.Filter; 081import org.apache.hadoop.hbase.filter.FilterAllFilter; 082import org.apache.hadoop.hbase.filter.FilterList; 083import org.apache.hadoop.hbase.filter.PageFilter; 084import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 085import org.apache.hadoop.hbase.filter.WhileMatchFilter; 086import 
org.apache.hadoop.hbase.io.compress.Compression; 087import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 088import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 089import org.apache.hadoop.hbase.regionserver.BloomType; 090import org.apache.hadoop.hbase.regionserver.CompactingMemStore; 091import org.apache.hadoop.hbase.trace.TraceUtil; 092import org.apache.hadoop.hbase.util.ByteArrayHashKey; 093import org.apache.hadoop.hbase.util.Bytes; 094import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 095import org.apache.hadoop.hbase.util.GsonUtil; 096import org.apache.hadoop.hbase.util.Hash; 097import org.apache.hadoop.hbase.util.MurmurHash; 098import org.apache.hadoop.hbase.util.Pair; 099import org.apache.hadoop.hbase.util.RandomDistribution; 100import org.apache.hadoop.hbase.util.YammerHistogramUtils; 101import org.apache.hadoop.io.LongWritable; 102import org.apache.hadoop.io.Text; 103import org.apache.hadoop.mapreduce.Job; 104import org.apache.hadoop.mapreduce.Mapper; 105import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; 106import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 107import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; 108import org.apache.hadoop.util.Tool; 109import org.apache.hadoop.util.ToolRunner; 110import org.apache.yetus.audience.InterfaceAudience; 111import org.slf4j.Logger; 112import org.slf4j.LoggerFactory; 113 114import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 115import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 116import org.apache.hbase.thirdparty.com.google.gson.Gson; 117 118/** 119 * Script used evaluating HBase performance and scalability. Runs a HBase client that steps through 120 * one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random writes test, 121 * etc.). Pass on the command-line which test to run and how many clients are participating in this 122 * experiment. 
 * Run {@code PerformanceEvaluation --help} to obtain usage.
 * <p>
 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance
 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper,
 * pages 8-10.
 * <p>
 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as
 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client does
 * about 1GB of data, unless specified otherwise.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class PerformanceEvaluation extends Configured implements Tool {
  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
  static final String RANDOM_READ = "randomRead";
  static final String PE_COMMAND_SHORTNAME = "pe";
  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
  // Serializes/deserializes TestOptions, e.g. the per-client lines of the MapReduce input file.
  private static final Gson GSON = GsonUtil.createGson().create();

  public static final String TABLE_NAME = "TestTable";
  // Column families are named FAMILY_NAME_BASE + index ("info0", "info1", ...).
  public static final String FAMILY_NAME_BASE = "info";
  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
  public static final int DEFAULT_VALUE_LENGTH = 1000;
  public static final int ROW_LENGTH = 26;

  // Note: 1024 * 1024 * 1000 bytes, i.e. slightly less than 2^30; only used to size row counts.
  private static final int ONE_GB = 1024 * 1024 * 1000;
  // Number of DEFAULT_VALUE_LENGTH-byte values in ONE_GB; the default per-client and total rows.
  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
  // TODO : should we make this configurable
  private static final int TAG_LENGTH = 256;
  // NOTE(review): the formatting/unit constants below are used outside this view — presumably
  // when reporting throughput and latency figures.
  private static final DecimalFormat FMT = new DecimalFormat("0.##");
  private static final MathContext CXT = MathContext.DECIMAL64;
  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
  // Untouched default options; user options are compared against it to detect explicitly set
  // values (e.g. presplitRegions, replicas, splitPolicy).
  private static final TestOptions DEFAULT_OPTS = new TestOptions();

  // Registered commands keyed by command name. A TreeMap, so iteration is in name order.
  // Must be initialized before the static block below populates it.
  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
  // Relative directory under which each MapReduce run writes its timestamped input directory.
  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");

  // Registers the built-in test commands in COMMANDS, keyed by command name.
  static {
    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
      "Run async random read test");
    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
      "Run async random write test");
    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
      "Run async sequential read test");
    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
      "Run async sequential write test");
    addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)");
    addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test");
    addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test");
    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
      "Run random seek and scan 100 test");
    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
      "Run random seek scan with both start and stop row (max 10 rows)");
    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
      "Run random seek scan with both start and stop row (max 100 rows)");
    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
      "Run random seek scan with both start and stop row (max 1000 rows)");
    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
      "Run random seek scan with both start and stop row (max 10000 rows)");
    addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
    addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
    addCommandDescriptor(MetaWriteTest.class, "metaWrite",
      "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta");
    addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
    addCommandDescriptor(ReverseScanTest.class, "reverseScan",
      "Run reverse scan test (read every row)");
    addCommandDescriptor(FilteredScanTest.class, "filterScan",
      "Run scan test using a filter to find a specific row based on it's value "
        + "(make sure to use --rows=20)");
    addCommandDescriptor(IncrementTest.class, "increment",
      "Increment on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(AppendTest.class, "append",
      "Append on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CleanMetaTest.class, "cleanMeta",
      "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread");
  }

  /**
   * Enum for map metrics. Keep it out here rather than inside in the Map inner-class so we can find
   * associated properties.
210 */ 211 protected static enum Counter { 212 /** elapsed time */ 213 ELAPSED_TIME, 214 /** number of rows */ 215 ROWS 216 } 217 218 protected static class RunResult implements Comparable<RunResult> { 219 public RunResult(long duration, Histogram hist) { 220 this.duration = duration; 221 this.hist = hist; 222 numbOfReplyOverThreshold = 0; 223 numOfReplyFromReplica = 0; 224 } 225 226 public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica, 227 Histogram hist) { 228 this.duration = duration; 229 this.hist = hist; 230 this.numbOfReplyOverThreshold = numbOfReplyOverThreshold; 231 this.numOfReplyFromReplica = numOfReplyFromReplica; 232 } 233 234 public final long duration; 235 public final Histogram hist; 236 public final long numbOfReplyOverThreshold; 237 public final long numOfReplyFromReplica; 238 239 @Override 240 public String toString() { 241 return Long.toString(duration); 242 } 243 244 @Override 245 public int compareTo(RunResult o) { 246 return Long.compare(this.duration, o.duration); 247 } 248 } 249 250 /** 251 * Constructor 252 * @param conf Configuration object 253 */ 254 public PerformanceEvaluation(final Configuration conf) { 255 super(conf); 256 } 257 258 protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name, 259 String description) { 260 CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description); 261 COMMANDS.put(name, cmdDescriptor); 262 } 263 264 /** 265 * Implementations can have their status set. 266 */ 267 interface Status { 268 /** 269 * Sets status 270 * @param msg status message 271 */ 272 void setStatus(final String msg) throws IOException; 273 } 274 275 /** 276 * MapReduce job that runs a performance evaluation client in each map task. 
277 */ 278 public static class EvaluationMapTask 279 extends Mapper<LongWritable, Text, LongWritable, LongWritable> { 280 281 /** configuration parameter name that contains the command */ 282 public final static String CMD_KEY = "EvaluationMapTask.command"; 283 /** configuration parameter name that contains the PE impl */ 284 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl"; 285 286 private Class<? extends Test> cmd; 287 288 @Override 289 protected void setup(Context context) throws IOException, InterruptedException { 290 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class); 291 292 // this is required so that extensions of PE are instantiated within the 293 // map reduce task... 294 Class<? extends PerformanceEvaluation> peClass = 295 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class); 296 try { 297 peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration()); 298 } catch (Exception e) { 299 throw new IllegalStateException("Could not instantiate PE instance", e); 300 } 301 } 302 303 private <Type> Class<? 
extends Type> forName(String className, Class<Type> type) { 304 try { 305 return Class.forName(className).asSubclass(type); 306 } catch (ClassNotFoundException e) { 307 throw new IllegalStateException("Could not find class for name: " + className, e); 308 } 309 } 310 311 @Override 312 protected void map(LongWritable key, Text value, final Context context) 313 throws IOException, InterruptedException { 314 315 Status status = new Status() { 316 @Override 317 public void setStatus(String msg) { 318 context.setStatus(msg); 319 } 320 }; 321 322 TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class); 323 Configuration conf = HBaseConfiguration.create(context.getConfiguration()); 324 final Connection con = ConnectionFactory.createConnection(conf); 325 AsyncConnection asyncCon = null; 326 try { 327 asyncCon = ConnectionFactory.createAsyncConnection(conf).get(); 328 } catch (ExecutionException e) { 329 throw new IOException(e); 330 } 331 332 // Evaluation task 333 RunResult result = 334 PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status); 335 // Collect how much time the thing took. Report as map output and 336 // to the ELAPSED_TIME counter. 337 context.getCounter(Counter.ELAPSED_TIME).increment(result.duration); 338 context.getCounter(Counter.ROWS).increment(opts.perClientRunRows); 339 context.write(new LongWritable(opts.startRow), new LongWritable(result.duration)); 340 context.progress(); 341 } 342 } 343 344 /* 345 * If table does not already exist, create. Also create a table when {@code opts.presplitRegions} 346 * is specified or when the existing table's region replica count doesn't match {@code 347 * opts.replicas}. 
348 */ 349 static boolean checkTable(Admin admin, TestOptions opts) throws IOException { 350 TableName tableName = TableName.valueOf(opts.tableName); 351 boolean needsDelete = false, exists = admin.tableExists(tableName); 352 boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read") 353 || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan"); 354 if (!exists && isReadCmd) { 355 throw new IllegalStateException( 356 "Must specify an existing table for read commands. Run a write command first."); 357 } 358 TableDescriptor desc = exists ? admin.getDescriptor(TableName.valueOf(opts.tableName)) : null; 359 byte[][] splits = getSplits(opts); 360 361 // recreate the table when user has requested presplit or when existing 362 // {RegionSplitPolicy,replica count} does not match requested, or when the 363 // number of column families does not match requested. 364 if ( 365 (exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions) 366 || (!isReadCmd && desc != null 367 && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy)) 368 || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas) 369 || (desc != null && desc.getColumnFamilyCount() != opts.families) 370 ) { 371 needsDelete = true; 372 // wait, why did it delete my table?!? 
373 LOG.debug(MoreObjects.toStringHelper("needsDelete").add("needsDelete", needsDelete) 374 .add("isReadCmd", isReadCmd).add("exists", exists).add("desc", desc) 375 .add("presplit", opts.presplitRegions).add("splitPolicy", opts.splitPolicy) 376 .add("replicas", opts.replicas).add("families", opts.families).toString()); 377 } 378 379 // remove an existing table 380 if (needsDelete) { 381 if (admin.isTableEnabled(tableName)) { 382 admin.disableTable(tableName); 383 } 384 admin.deleteTable(tableName); 385 } 386 387 // table creation is necessary 388 if (!exists || needsDelete) { 389 desc = getTableDescriptor(opts); 390 if (splits != null) { 391 if (LOG.isDebugEnabled()) { 392 for (int i = 0; i < splits.length; i++) { 393 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); 394 } 395 } 396 } 397 if (splits != null) { 398 admin.createTable(desc, splits); 399 } else { 400 admin.createTable(desc); 401 } 402 LOG.info("Table " + desc + " created"); 403 } 404 return admin.tableExists(tableName); 405 } 406 407 /** 408 * Create an HTableDescriptor from provided TestOptions. 
409 */ 410 protected static TableDescriptor getTableDescriptor(TestOptions opts) { 411 TableDescriptorBuilder builder = 412 TableDescriptorBuilder.newBuilder(TableName.valueOf(opts.tableName)); 413 414 for (int family = 0; family < opts.families; family++) { 415 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 416 ColumnFamilyDescriptorBuilder cfBuilder = 417 ColumnFamilyDescriptorBuilder.newBuilder(familyName); 418 cfBuilder.setDataBlockEncoding(opts.blockEncoding); 419 cfBuilder.setCompressionType(opts.compression); 420 cfBuilder.setEncryptionType(opts.encryption); 421 cfBuilder.setBloomFilterType(opts.bloomType); 422 cfBuilder.setBlocksize(opts.blockSize); 423 if (opts.inMemoryCF) { 424 cfBuilder.setInMemory(true); 425 } 426 cfBuilder.setInMemoryCompaction(opts.inMemoryCompaction); 427 builder.setColumnFamily(cfBuilder.build()); 428 } 429 if (opts.replicas != DEFAULT_OPTS.replicas) { 430 builder.setRegionReplication(opts.replicas); 431 } 432 if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) { 433 builder.setRegionSplitPolicyClassName(opts.splitPolicy); 434 } 435 return builder.build(); 436 } 437 438 /** 439 * generates splits based on total number of rows and specified split regions 440 */ 441 protected static byte[][] getSplits(TestOptions opts) { 442 if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null; 443 444 int numSplitPoints = opts.presplitRegions - 1; 445 byte[][] splits = new byte[numSplitPoints][]; 446 int jump = opts.totalRows / opts.presplitRegions; 447 for (int i = 0; i < numSplitPoints; i++) { 448 int rowkey = jump * (1 + i); 449 splits[i] = format(rowkey); 450 } 451 return splits; 452 } 453 454 static void setupConnectionCount(final TestOptions opts) { 455 if (opts.oneCon) { 456 opts.connCount = 1; 457 } else { 458 if (opts.connCount == -1) { 459 // set to thread number if connCount is not set 460 opts.connCount = opts.numClientThreads; 461 } 462 } 463 } 464 465 /* 466 * Run all 
clients in this vm each to its own thread. 467 */ 468 static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf) 469 throws IOException, InterruptedException, ExecutionException { 470 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName); 471 assert cmd != null; 472 @SuppressWarnings("unchecked") 473 Future<RunResult>[] threads = new Future[opts.numClientThreads]; 474 RunResult[] results = new RunResult[opts.numClientThreads]; 475 ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, 476 new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build()); 477 setupConnectionCount(opts); 478 final Connection[] cons = new Connection[opts.connCount]; 479 final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount]; 480 for (int i = 0; i < opts.connCount; i++) { 481 cons[i] = ConnectionFactory.createConnection(conf); 482 asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get(); 483 } 484 LOG 485 .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads"); 486 for (int i = 0; i < threads.length; i++) { 487 final int index = i; 488 threads[i] = pool.submit(new Callable<RunResult>() { 489 @Override 490 public RunResult call() throws Exception { 491 TestOptions threadOpts = new TestOptions(opts); 492 final Connection con = cons[index % cons.length]; 493 final AsyncConnection asyncCon = asyncCons[index % asyncCons.length]; 494 if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows; 495 RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() { 496 @Override 497 public void setStatus(final String msg) throws IOException { 498 LOG.info(msg); 499 } 500 }); 501 LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration 502 + "ms over " + threadOpts.perClientRunRows + " rows"); 503 if (opts.latencyThreshold > 0) { 504 LOG.info("Number of replies over latency threshold " + 
opts.latencyThreshold 505 + "(ms) is " + run.numbOfReplyOverThreshold); 506 } 507 return run; 508 } 509 }); 510 } 511 pool.shutdown(); 512 513 for (int i = 0; i < threads.length; i++) { 514 try { 515 results[i] = threads[i].get(); 516 } catch (ExecutionException e) { 517 throw new IOException(e.getCause()); 518 } 519 } 520 final String test = cmd.getSimpleName(); 521 LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results)); 522 Arrays.sort(results); 523 long total = 0; 524 float avgLatency = 0; 525 float avgTPS = 0; 526 long replicaWins = 0; 527 for (RunResult result : results) { 528 total += result.duration; 529 avgLatency += result.hist.getSnapshot().getMean(); 530 avgTPS += opts.perClientRunRows * 1.0f / result.duration; 531 replicaWins += result.numOfReplyFromReplica; 532 } 533 avgTPS *= 1000; // ms to second 534 avgLatency = avgLatency / results.length; 535 LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: " 536 + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms"); 537 LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency)); 538 LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second"); 539 if (opts.replicas > 1) { 540 LOG.info("[results from replica regions] " + replicaWins); 541 } 542 543 for (int i = 0; i < opts.connCount; i++) { 544 cons[i].close(); 545 asyncCons[i].close(); 546 } 547 548 return results; 549 } 550 551 /* 552 * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write 553 * out an input file with instruction per client regards which row they are to start on. 554 * @param cmd Command to run. 555 */ 556 static Job doMapReduce(TestOptions opts, final Configuration conf) 557 throws IOException, InterruptedException, ClassNotFoundException { 558 final Class<? 
extends TestBase> cmd = determineCommandClass(opts.cmdName); 559 assert cmd != null; 560 Path inputDir = writeInputFile(conf, opts); 561 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); 562 conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName()); 563 Job job = Job.getInstance(conf); 564 job.setJarByClass(PerformanceEvaluation.class); 565 job.setJobName("HBase Performance Evaluation - " + opts.cmdName); 566 567 job.setInputFormatClass(NLineInputFormat.class); 568 NLineInputFormat.setInputPaths(job, inputDir); 569 // this is default, but be explicit about it just in case. 570 NLineInputFormat.setNumLinesPerSplit(job, 1); 571 572 job.setOutputKeyClass(LongWritable.class); 573 job.setOutputValueClass(LongWritable.class); 574 575 job.setMapperClass(EvaluationMapTask.class); 576 job.setReducerClass(LongSumReducer.class); 577 578 job.setNumReduceTasks(1); 579 580 job.setOutputFormatClass(TextOutputFormat.class); 581 TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs")); 582 583 TableMapReduceUtil.addDependencyJars(job); 584 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Histogram.class, // yammer 585 // metrics 586 Gson.class, // gson 587 FilterAllFilter.class // hbase-server tests jar 588 ); 589 590 TableMapReduceUtil.initCredentials(job); 591 592 job.waitForCompletion(true); 593 return job; 594 } 595 596 /** 597 * Each client has one mapper to do the work, and client do the resulting count in a map task. 598 */ 599 600 static String JOB_INPUT_FILENAME = "input.txt"; 601 602 /* 603 * Write input file of offsets-per-client for the mapreduce job. 
604 * @param c Configuration 605 * @return Directory that contains file written whose name is JOB_INPUT_FILENAME 606 */ 607 static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException { 608 return writeInputFile(c, opts, new Path(".")); 609 } 610 611 static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir) 612 throws IOException { 613 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); 614 Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date())); 615 Path inputDir = new Path(jobdir, "inputs"); 616 617 FileSystem fs = FileSystem.get(c); 618 fs.mkdirs(inputDir); 619 620 Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME); 621 PrintStream out = new PrintStream(fs.create(inputFile)); 622 // Make input random. 623 Map<Integer, String> m = new TreeMap<>(); 624 Hash h = MurmurHash.getInstance(); 625 int perClientRows = (opts.totalRows / opts.numClientThreads); 626 try { 627 for (int j = 0; j < opts.numClientThreads; j++) { 628 TestOptions next = new TestOptions(opts); 629 next.startRow = j * perClientRows; 630 next.perClientRunRows = perClientRows; 631 String s = GSON.toJson(next); 632 LOG.info("Client=" + j + ", input=" + s); 633 byte[] b = Bytes.toBytes(s); 634 int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1); 635 m.put(hash, s); 636 } 637 for (Map.Entry<Integer, String> e : m.entrySet()) { 638 out.println(e.getValue()); 639 } 640 } finally { 641 out.close(); 642 } 643 return inputDir; 644 } 645 646 /** 647 * Describes a command. 648 */ 649 static class CmdDescriptor { 650 private Class<? extends TestBase> cmdClass; 651 private String name; 652 private String description; 653 654 CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) { 655 this.cmdClass = cmdClass; 656 this.name = name; 657 this.description = description; 658 } 659 660 public Class<? 
extends TestBase> getCmdClass() { 661 return cmdClass; 662 } 663 664 public String getName() { 665 return name; 666 } 667 668 public String getDescription() { 669 return description; 670 } 671 } 672 673 /** 674 * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes 675 * tracking all these arguments a little easier. NOTE: ADDING AN OPTION, you need to add a data 676 * member, a getter/setter (to make JSON serialization of this TestOptions class behave), and you 677 * need to add to the clone constructor below copying your new option from the 'that' to the 678 * 'this'. Look for 'clone' below. 679 */ 680 static class TestOptions { 681 String cmdName = null; 682 boolean nomapred = false; 683 boolean filterAll = false; 684 int startRow = 0; 685 float size = 1.0f; 686 int perClientRunRows = DEFAULT_ROWS_PER_GB; 687 int numClientThreads = 1; 688 int totalRows = DEFAULT_ROWS_PER_GB; 689 int measureAfter = 0; 690 float sampleRate = 1.0f; 691 /** 692 * @deprecated Useless after switching to OpenTelemetry 693 */ 694 @Deprecated 695 double traceRate = 0.0; 696 String tableName = TABLE_NAME; 697 boolean flushCommits = true; 698 boolean writeToWAL = true; 699 boolean autoFlush = false; 700 boolean oneCon = false; 701 int connCount = -1; // wil decide the actual num later 702 boolean useTags = false; 703 int noOfTags = 1; 704 boolean reportLatency = false; 705 int multiGet = 0; 706 int multiPut = 0; 707 int randomSleep = 0; 708 boolean inMemoryCF = false; 709 int presplitRegions = 0; 710 int replicas = TableDescriptorBuilder.DEFAULT_REGION_REPLICATION; 711 String splitPolicy = null; 712 Compression.Algorithm compression = Compression.Algorithm.NONE; 713 String encryption = null; 714 BloomType bloomType = BloomType.ROW; 715 int blockSize = HConstants.DEFAULT_BLOCKSIZE; 716 DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; 717 boolean valueRandom = false; 718 boolean valueZipf = false; 719 int valueSize = DEFAULT_VALUE_LENGTH; 720 
int period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10; 721 int cycles = 1; 722 int columns = 1; 723 int families = 1; 724 int caching = 30; 725 int latencyThreshold = 0; // in millsecond 726 boolean addColumns = true; 727 MemoryCompactionPolicy inMemoryCompaction = 728 MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT); 729 boolean asyncPrefetch = false; 730 boolean cacheBlocks = true; 731 Scan.ReadType scanReadType = Scan.ReadType.DEFAULT; 732 long bufferSize = 2l * 1024l * 1024l; 733 734 public TestOptions() { 735 } 736 737 /** 738 * Clone constructor. 739 * @param that Object to copy from. 740 */ 741 public TestOptions(TestOptions that) { 742 this.cmdName = that.cmdName; 743 this.cycles = that.cycles; 744 this.nomapred = that.nomapred; 745 this.startRow = that.startRow; 746 this.size = that.size; 747 this.perClientRunRows = that.perClientRunRows; 748 this.numClientThreads = that.numClientThreads; 749 this.totalRows = that.totalRows; 750 this.sampleRate = that.sampleRate; 751 this.traceRate = that.traceRate; 752 this.tableName = that.tableName; 753 this.flushCommits = that.flushCommits; 754 this.writeToWAL = that.writeToWAL; 755 this.autoFlush = that.autoFlush; 756 this.oneCon = that.oneCon; 757 this.connCount = that.connCount; 758 this.useTags = that.useTags; 759 this.noOfTags = that.noOfTags; 760 this.reportLatency = that.reportLatency; 761 this.latencyThreshold = that.latencyThreshold; 762 this.multiGet = that.multiGet; 763 this.multiPut = that.multiPut; 764 this.inMemoryCF = that.inMemoryCF; 765 this.presplitRegions = that.presplitRegions; 766 this.replicas = that.replicas; 767 this.splitPolicy = that.splitPolicy; 768 this.compression = that.compression; 769 this.encryption = that.encryption; 770 this.blockEncoding = that.blockEncoding; 771 this.filterAll = that.filterAll; 772 this.bloomType = that.bloomType; 773 this.blockSize = that.blockSize; 774 this.valueRandom = that.valueRandom; 775 
this.valueZipf = that.valueZipf; 776 this.valueSize = that.valueSize; 777 this.period = that.period; 778 this.randomSleep = that.randomSleep; 779 this.measureAfter = that.measureAfter; 780 this.addColumns = that.addColumns; 781 this.columns = that.columns; 782 this.families = that.families; 783 this.caching = that.caching; 784 this.inMemoryCompaction = that.inMemoryCompaction; 785 this.asyncPrefetch = that.asyncPrefetch; 786 this.cacheBlocks = that.cacheBlocks; 787 this.scanReadType = that.scanReadType; 788 this.bufferSize = that.bufferSize; 789 } 790 791 public int getCaching() { 792 return this.caching; 793 } 794 795 public void setCaching(final int caching) { 796 this.caching = caching; 797 } 798 799 public int getColumns() { 800 return this.columns; 801 } 802 803 public void setColumns(final int columns) { 804 this.columns = columns; 805 } 806 807 public int getFamilies() { 808 return this.families; 809 } 810 811 public void setFamilies(final int families) { 812 this.families = families; 813 } 814 815 public int getCycles() { 816 return this.cycles; 817 } 818 819 public void setCycles(final int cycles) { 820 this.cycles = cycles; 821 } 822 823 public boolean isValueZipf() { 824 return valueZipf; 825 } 826 827 public void setValueZipf(boolean valueZipf) { 828 this.valueZipf = valueZipf; 829 } 830 831 public String getCmdName() { 832 return cmdName; 833 } 834 835 public void setCmdName(String cmdName) { 836 this.cmdName = cmdName; 837 } 838 839 public int getRandomSleep() { 840 return randomSleep; 841 } 842 843 public void setRandomSleep(int randomSleep) { 844 this.randomSleep = randomSleep; 845 } 846 847 public int getReplicas() { 848 return replicas; 849 } 850 851 public void setReplicas(int replicas) { 852 this.replicas = replicas; 853 } 854 855 public String getSplitPolicy() { 856 return splitPolicy; 857 } 858 859 public void setSplitPolicy(String splitPolicy) { 860 this.splitPolicy = splitPolicy; 861 } 862 863 public void setNomapred(boolean nomapred) { 864 
this.nomapred = nomapred; 865 } 866 867 public void setFilterAll(boolean filterAll) { 868 this.filterAll = filterAll; 869 } 870 871 public void setStartRow(int startRow) { 872 this.startRow = startRow; 873 } 874 875 public void setSize(float size) { 876 this.size = size; 877 } 878 879 public void setPerClientRunRows(int perClientRunRows) { 880 this.perClientRunRows = perClientRunRows; 881 } 882 883 public void setNumClientThreads(int numClientThreads) { 884 this.numClientThreads = numClientThreads; 885 } 886 887 public void setTotalRows(int totalRows) { 888 this.totalRows = totalRows; 889 } 890 891 public void setSampleRate(float sampleRate) { 892 this.sampleRate = sampleRate; 893 } 894 895 public void setTraceRate(double traceRate) { 896 this.traceRate = traceRate; 897 } 898 899 public void setTableName(String tableName) { 900 this.tableName = tableName; 901 } 902 903 public void setFlushCommits(boolean flushCommits) { 904 this.flushCommits = flushCommits; 905 } 906 907 public void setWriteToWAL(boolean writeToWAL) { 908 this.writeToWAL = writeToWAL; 909 } 910 911 public void setAutoFlush(boolean autoFlush) { 912 this.autoFlush = autoFlush; 913 } 914 915 public void setOneCon(boolean oneCon) { 916 this.oneCon = oneCon; 917 } 918 919 public int getConnCount() { 920 return connCount; 921 } 922 923 public void setConnCount(int connCount) { 924 this.connCount = connCount; 925 } 926 927 public void setUseTags(boolean useTags) { 928 this.useTags = useTags; 929 } 930 931 public void setNoOfTags(int noOfTags) { 932 this.noOfTags = noOfTags; 933 } 934 935 public void setReportLatency(boolean reportLatency) { 936 this.reportLatency = reportLatency; 937 } 938 939 public void setMultiGet(int multiGet) { 940 this.multiGet = multiGet; 941 } 942 943 public void setMultiPut(int multiPut) { 944 this.multiPut = multiPut; 945 } 946 947 public void setInMemoryCF(boolean inMemoryCF) { 948 this.inMemoryCF = inMemoryCF; 949 } 950 951 public void setPresplitRegions(int presplitRegions) { 
952 this.presplitRegions = presplitRegions; 953 } 954 955 public void setCompression(Compression.Algorithm compression) { 956 this.compression = compression; 957 } 958 959 public void setEncryption(String encryption) { 960 this.encryption = encryption; 961 } 962 963 public void setBloomType(BloomType bloomType) { 964 this.bloomType = bloomType; 965 } 966 967 public void setBlockSize(int blockSize) { 968 this.blockSize = blockSize; 969 } 970 971 public void setBlockEncoding(DataBlockEncoding blockEncoding) { 972 this.blockEncoding = blockEncoding; 973 } 974 975 public void setValueRandom(boolean valueRandom) { 976 this.valueRandom = valueRandom; 977 } 978 979 public void setValueSize(int valueSize) { 980 this.valueSize = valueSize; 981 } 982 983 public void setBufferSize(long bufferSize) { 984 this.bufferSize = bufferSize; 985 } 986 987 public void setPeriod(int period) { 988 this.period = period; 989 } 990 991 public boolean isNomapred() { 992 return nomapred; 993 } 994 995 public boolean isFilterAll() { 996 return filterAll; 997 } 998 999 public int getStartRow() { 1000 return startRow; 1001 } 1002 1003 public float getSize() { 1004 return size; 1005 } 1006 1007 public int getPerClientRunRows() { 1008 return perClientRunRows; 1009 } 1010 1011 public int getNumClientThreads() { 1012 return numClientThreads; 1013 } 1014 1015 public int getTotalRows() { 1016 return totalRows; 1017 } 1018 1019 public float getSampleRate() { 1020 return sampleRate; 1021 } 1022 1023 public double getTraceRate() { 1024 return traceRate; 1025 } 1026 1027 public String getTableName() { 1028 return tableName; 1029 } 1030 1031 public boolean isFlushCommits() { 1032 return flushCommits; 1033 } 1034 1035 public boolean isWriteToWAL() { 1036 return writeToWAL; 1037 } 1038 1039 public boolean isAutoFlush() { 1040 return autoFlush; 1041 } 1042 1043 public boolean isUseTags() { 1044 return useTags; 1045 } 1046 1047 public int getNoOfTags() { 1048 return noOfTags; 1049 } 1050 1051 public boolean 
isReportLatency() { 1052 return reportLatency; 1053 } 1054 1055 public int getMultiGet() { 1056 return multiGet; 1057 } 1058 1059 public int getMultiPut() { 1060 return multiPut; 1061 } 1062 1063 public boolean isInMemoryCF() { 1064 return inMemoryCF; 1065 } 1066 1067 public int getPresplitRegions() { 1068 return presplitRegions; 1069 } 1070 1071 public Compression.Algorithm getCompression() { 1072 return compression; 1073 } 1074 1075 public String getEncryption() { 1076 return encryption; 1077 } 1078 1079 public DataBlockEncoding getBlockEncoding() { 1080 return blockEncoding; 1081 } 1082 1083 public boolean isValueRandom() { 1084 return valueRandom; 1085 } 1086 1087 public int getValueSize() { 1088 return valueSize; 1089 } 1090 1091 public int getPeriod() { 1092 return period; 1093 } 1094 1095 public BloomType getBloomType() { 1096 return bloomType; 1097 } 1098 1099 public int getBlockSize() { 1100 return blockSize; 1101 } 1102 1103 public boolean isOneCon() { 1104 return oneCon; 1105 } 1106 1107 public int getMeasureAfter() { 1108 return measureAfter; 1109 } 1110 1111 public void setMeasureAfter(int measureAfter) { 1112 this.measureAfter = measureAfter; 1113 } 1114 1115 public boolean getAddColumns() { 1116 return addColumns; 1117 } 1118 1119 public void setAddColumns(boolean addColumns) { 1120 this.addColumns = addColumns; 1121 } 1122 1123 public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) { 1124 this.inMemoryCompaction = inMemoryCompaction; 1125 } 1126 1127 public MemoryCompactionPolicy getInMemoryCompaction() { 1128 return this.inMemoryCompaction; 1129 } 1130 1131 public long getBufferSize() { 1132 return this.bufferSize; 1133 } 1134 } 1135 1136 /* 1137 * A test. Subclass to particularize what happens per row. 1138 */ 1139 static abstract class TestBase { 1140 // Below is make it so when Tests are all running in the one 1141 // jvm, that they each have a differently seeded Random. 
1142 private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime()); 1143 1144 private static long nextRandomSeed() { 1145 return randomSeed.nextLong(); 1146 } 1147 1148 private final int everyN; 1149 1150 protected final Random rand = new Random(nextRandomSeed()); 1151 protected final Configuration conf; 1152 protected final TestOptions opts; 1153 1154 private final Status status; 1155 1156 private String testName; 1157 private Histogram latencyHistogram; 1158 private Histogram replicaLatencyHistogram; 1159 private Histogram valueSizeHistogram; 1160 private Histogram rpcCallsHistogram; 1161 private Histogram remoteRpcCallsHistogram; 1162 private Histogram millisBetweenNextHistogram; 1163 private Histogram regionsScannedHistogram; 1164 private Histogram bytesInResultsHistogram; 1165 private Histogram bytesInRemoteResultsHistogram; 1166 private RandomDistribution.Zipf zipf; 1167 private long numOfReplyOverLatencyThreshold = 0; 1168 private long numOfReplyFromReplica = 0; 1169 1170 /** 1171 * Note that all subclasses of this class must provide a public constructor that has the exact 1172 * same list of arguments. 
1173 */ 1174 TestBase(final Configuration conf, final TestOptions options, final Status status) { 1175 this.conf = conf; 1176 this.opts = options; 1177 this.status = status; 1178 this.testName = this.getClass().getSimpleName(); 1179 everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate)); 1180 if (options.isValueZipf()) { 1181 this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2); 1182 } 1183 LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows."); 1184 } 1185 1186 int getValueLength(final Random r) { 1187 if (this.opts.isValueRandom()) { 1188 return r.nextInt(opts.valueSize); 1189 } else if (this.opts.isValueZipf()) { 1190 return Math.abs(this.zipf.nextInt()); 1191 } else { 1192 return opts.valueSize; 1193 } 1194 } 1195 1196 void updateValueSize(final Result[] rs) throws IOException { 1197 updateValueSize(rs, 0); 1198 } 1199 1200 void updateValueSize(final Result[] rs, final long latency) throws IOException { 1201 if (rs == null || (latency == 0)) return; 1202 for (Result r : rs) 1203 updateValueSize(r, latency); 1204 } 1205 1206 void updateValueSize(final Result r) throws IOException { 1207 updateValueSize(r, 0); 1208 } 1209 1210 void updateValueSize(final Result r, final long latency) throws IOException { 1211 if (r == null || (latency == 0)) return; 1212 int size = 0; 1213 // update replicaHistogram 1214 if (r.isStale()) { 1215 replicaLatencyHistogram.update(latency / 1000); 1216 numOfReplyFromReplica++; 1217 } 1218 if (!isRandomValueSize()) return; 1219 1220 for (CellScanner scanner = r.cellScanner(); scanner.advance();) { 1221 size += scanner.current().getValueLength(); 1222 } 1223 updateValueSize(size); 1224 } 1225 1226 void updateValueSize(final int valueSize) { 1227 if (!isRandomValueSize()) return; 1228 this.valueSizeHistogram.update(valueSize); 1229 } 1230 1231 void updateScanMetrics(final ScanMetrics metrics) { 1232 if (metrics == null) return; 1233 Map<String, Long> 
metricsMap = metrics.getMetricsMap(); 1234 Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); 1235 if (rpcCalls != null) { 1236 this.rpcCallsHistogram.update(rpcCalls.longValue()); 1237 } 1238 Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME); 1239 if (remoteRpcCalls != null) { 1240 this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue()); 1241 } 1242 Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME); 1243 if (millisBetweenNext != null) { 1244 this.millisBetweenNextHistogram.update(millisBetweenNext.longValue()); 1245 } 1246 Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); 1247 if (regionsScanned != null) { 1248 this.regionsScannedHistogram.update(regionsScanned.longValue()); 1249 } 1250 Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME); 1251 if (bytesInResults != null && bytesInResults.longValue() > 0) { 1252 this.bytesInResultsHistogram.update(bytesInResults.longValue()); 1253 } 1254 Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME); 1255 if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) { 1256 this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue()); 1257 } 1258 } 1259 1260 String generateStatus(final int sr, final int i, final int lr) { 1261 return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency [" 1262 + getShortLatencyReport() + "]" 1263 + (!isRandomValueSize() ? "" : ", value size [" + getShortValueSizeReport() + "]"); 1264 } 1265 1266 boolean isRandomValueSize() { 1267 return opts.valueRandom; 1268 } 1269 1270 protected int getReportingPeriod() { 1271 return opts.period; 1272 } 1273 1274 /** 1275 * Populated by testTakedown. Only implemented by RandomReadTest at the moment. 
1276 */ 1277 public Histogram getLatencyHistogram() { 1278 return latencyHistogram; 1279 } 1280 1281 void testSetup() throws IOException { 1282 // test metrics 1283 latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1284 // If it is a replica test, set up histogram for replica. 1285 if (opts.replicas > 1) { 1286 replicaLatencyHistogram = 1287 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1288 } 1289 valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1290 // scan metrics 1291 rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1292 remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1293 millisBetweenNextHistogram = 1294 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1295 regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1296 bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1297 bytesInRemoteResultsHistogram = 1298 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1299 1300 onStartup(); 1301 } 1302 1303 abstract void onStartup() throws IOException; 1304 1305 void testTakedown() throws IOException { 1306 onTakedown(); 1307 // Print all stats for this thread continuously. 1308 // Synchronize on Test.class so different threads don't intermingle the 1309 // output. We can't use 'this' here because each thread has its own instance of Test class. 
1310 synchronized (Test.class) { 1311 status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName()); 1312 status 1313 .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram)); 1314 if (opts.replicas > 1) { 1315 status.setStatus("Latency (us) from Replica Regions: " 1316 + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram)); 1317 } 1318 status.setStatus("Num measures (latency) : " + latencyHistogram.getCount()); 1319 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram)); 1320 if (valueSizeHistogram.getCount() > 0) { 1321 status.setStatus( 1322 "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram)); 1323 status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount()); 1324 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram)); 1325 } else { 1326 status.setStatus("No valueSize statistics available"); 1327 } 1328 if (rpcCallsHistogram.getCount() > 0) { 1329 status.setStatus( 1330 "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram)); 1331 } 1332 if (remoteRpcCallsHistogram.getCount() > 0) { 1333 status.setStatus("remoteRpcCalls (count): " 1334 + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram)); 1335 } 1336 if (millisBetweenNextHistogram.getCount() > 0) { 1337 status.setStatus("millisBetweenNext (latency): " 1338 + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram)); 1339 } 1340 if (regionsScannedHistogram.getCount() > 0) { 1341 status.setStatus("regionsScanned (count): " 1342 + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram)); 1343 } 1344 if (bytesInResultsHistogram.getCount() > 0) { 1345 status.setStatus("bytesInResults (size): " 1346 + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram)); 1347 } 1348 if (bytesInRemoteResultsHistogram.getCount() > 0) { 1349 status.setStatus("bytesInRemoteResults 
(size): " 1350 + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram)); 1351 } 1352 } 1353 } 1354 1355 abstract void onTakedown() throws IOException; 1356 1357 /* 1358 * Run test 1359 * @return Elapsed time. 1360 */ 1361 long test() throws IOException, InterruptedException { 1362 testSetup(); 1363 LOG.info("Timed test starting in thread " + Thread.currentThread().getName()); 1364 final long startTime = System.nanoTime(); 1365 try { 1366 testTimed(); 1367 } finally { 1368 testTakedown(); 1369 } 1370 return (System.nanoTime() - startTime) / 1000000; 1371 } 1372 1373 int getStartRow() { 1374 return opts.startRow; 1375 } 1376 1377 int getLastRow() { 1378 return getStartRow() + opts.perClientRunRows; 1379 } 1380 1381 /** 1382 * Provides an extension point for tests that don't want a per row invocation. 1383 */ 1384 void testTimed() throws IOException, InterruptedException { 1385 int startRow = getStartRow(); 1386 int lastRow = getLastRow(); 1387 // Report on completion of 1/10th of total. 1388 for (int ii = 0; ii < opts.cycles; ii++) { 1389 if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles); 1390 for (int i = startRow; i < lastRow; i++) { 1391 if (i % everyN != 0) continue; 1392 long startTime = System.nanoTime(); 1393 boolean requestSent = false; 1394 Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan(); 1395 try (Scope scope = span.makeCurrent()) { 1396 requestSent = testRow(i, startTime); 1397 } finally { 1398 span.end(); 1399 } 1400 if ((i - startRow) > opts.measureAfter) { 1401 // If multiget or multiput is enabled, say set to 10, testRow() returns immediately 1402 // first 9 times and sends the actual get request in the 10th iteration. 1403 // We should only set latency when actual request is sent because otherwise 1404 // it turns out to be 0. 
1405 if (requestSent) { 1406 long latency = (System.nanoTime() - startTime) / 1000; 1407 latencyHistogram.update(latency); 1408 if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) { 1409 numOfReplyOverLatencyThreshold++; 1410 } 1411 } 1412 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { 1413 status.setStatus(generateStatus(startRow, i, lastRow)); 1414 } 1415 } 1416 } 1417 } 1418 } 1419 1420 /** Returns Subset of the histograms' calculation. */ 1421 public String getShortLatencyReport() { 1422 return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram); 1423 } 1424 1425 /** Returns Subset of the histograms' calculation. */ 1426 public String getShortValueSizeReport() { 1427 return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram); 1428 } 1429 1430 /** 1431 * Test for individual row. 1432 * @param i Row index. 1433 * @return true if the row was sent to server and need to record metrics. False if not, multiGet 1434 * and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered. 1435 */ 1436 abstract boolean testRow(final int i, final long startTime) 1437 throws IOException, InterruptedException; 1438 } 1439 1440 static abstract class Test extends TestBase { 1441 protected Connection connection; 1442 1443 Test(final Connection con, final TestOptions options, final Status status) { 1444 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); 1445 this.connection = con; 1446 } 1447 } 1448 1449 static abstract class AsyncTest extends TestBase { 1450 protected AsyncConnection connection; 1451 1452 AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) { 1453 super(con == null ? 
HBaseConfiguration.create() : con.getConfiguration(), options, status); 1454 this.connection = con; 1455 } 1456 } 1457 1458 static abstract class TableTest extends Test { 1459 protected Table table; 1460 1461 TableTest(Connection con, TestOptions options, Status status) { 1462 super(con, options, status); 1463 } 1464 1465 @Override 1466 void onStartup() throws IOException { 1467 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1468 } 1469 1470 @Override 1471 void onTakedown() throws IOException { 1472 table.close(); 1473 } 1474 } 1475 1476 /* 1477 * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest 1478 */ 1479 static abstract class MetaTest extends TableTest { 1480 protected int keyLength; 1481 1482 MetaTest(Connection con, TestOptions options, Status status) { 1483 super(con, options, status); 1484 keyLength = Integer.toString(opts.perClientRunRows).length(); 1485 } 1486 1487 @Override 1488 void onTakedown() throws IOException { 1489 // No clean up 1490 } 1491 1492 /* 1493 * Generates Lexicographically ascending strings 1494 */ 1495 protected byte[] getSplitKey(final int i) { 1496 return Bytes.toBytes(String.format("%0" + keyLength + "d", i)); 1497 } 1498 1499 } 1500 1501 static abstract class AsyncTableTest extends AsyncTest { 1502 protected AsyncTable<?> table; 1503 1504 AsyncTableTest(AsyncConnection con, TestOptions options, Status status) { 1505 super(con, options, status); 1506 } 1507 1508 @Override 1509 void onStartup() throws IOException { 1510 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1511 } 1512 1513 @Override 1514 void onTakedown() throws IOException { 1515 } 1516 } 1517 1518 static class AsyncRandomReadTest extends AsyncTableTest { 1519 private final Consistency consistency; 1520 private ArrayList<Get> gets; 1521 1522 AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) { 1523 super(con, options, status); 1524 consistency = options.replicas == 
DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1525 if (opts.multiGet > 0) { 1526 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1527 this.gets = new ArrayList<>(opts.multiGet); 1528 } 1529 } 1530 1531 @Override 1532 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1533 if (opts.randomSleep > 0) { 1534 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 1535 } 1536 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1537 for (int family = 0; family < opts.families; family++) { 1538 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1539 if (opts.addColumns) { 1540 for (int column = 0; column < opts.columns; column++) { 1541 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1542 get.addColumn(familyName, qualifier); 1543 } 1544 } else { 1545 get.addFamily(familyName); 1546 } 1547 } 1548 if (opts.filterAll) { 1549 get.setFilter(new FilterAllFilter()); 1550 } 1551 get.setConsistency(consistency); 1552 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1553 try { 1554 if (opts.multiGet > 0) { 1555 this.gets.add(get); 1556 if (this.gets.size() == opts.multiGet) { 1557 Result[] rs = 1558 this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new); 1559 updateValueSize(rs); 1560 this.gets.clear(); 1561 } else { 1562 return false; 1563 } 1564 } else { 1565 updateValueSize(this.table.get(get).get()); 1566 } 1567 } catch (ExecutionException e) { 1568 throw new IOException(e); 1569 } 1570 return true; 1571 } 1572 1573 public static RuntimeException runtime(Throwable e) { 1574 if (e instanceof RuntimeException) { 1575 return (RuntimeException) e; 1576 } 1577 return new RuntimeException(e); 1578 } 1579 1580 public static <V> V propagate(Callable<V> callable) { 1581 try { 1582 return callable.call(); 1583 } catch (Exception e) { 1584 throw runtime(e); 1585 } 1586 } 1587 1588 @Override 1589 protected int 
getReportingPeriod() { 1590 int period = opts.perClientRunRows / 10; 1591 return period == 0 ? opts.perClientRunRows : period; 1592 } 1593 1594 @Override 1595 protected void testTakedown() throws IOException { 1596 if (this.gets != null && this.gets.size() > 0) { 1597 this.table.get(gets); 1598 this.gets.clear(); 1599 } 1600 super.testTakedown(); 1601 } 1602 } 1603 1604 static class AsyncRandomWriteTest extends AsyncSequentialWriteTest { 1605 1606 AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) { 1607 super(con, options, status); 1608 } 1609 1610 @Override 1611 protected byte[] generateRow(final int i) { 1612 return getRandomRow(this.rand, opts.totalRows); 1613 } 1614 } 1615 1616 static class AsyncScanTest extends AsyncTableTest { 1617 private ResultScanner testScanner; 1618 private AsyncTable<?> asyncTable; 1619 1620 AsyncScanTest(AsyncConnection con, TestOptions options, Status status) { 1621 super(con, options, status); 1622 } 1623 1624 @Override 1625 void onStartup() throws IOException { 1626 this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName), 1627 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); 1628 } 1629 1630 @Override 1631 void testTakedown() throws IOException { 1632 if (this.testScanner != null) { 1633 updateScanMetrics(this.testScanner.getScanMetrics()); 1634 this.testScanner.close(); 1635 } 1636 super.testTakedown(); 1637 } 1638 1639 @Override 1640 boolean testRow(final int i, final long startTime) throws IOException { 1641 if (this.testScanner == null) { 1642 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 1643 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1644 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1645 for (int family = 0; family < opts.families; family++) { 1646 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1647 if (opts.addColumns) { 1648 for (int column = 0; column < 
opts.columns; column++) { 1649 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1650 scan.addColumn(familyName, qualifier); 1651 } 1652 } else { 1653 scan.addFamily(familyName); 1654 } 1655 } 1656 if (opts.filterAll) { 1657 scan.setFilter(new FilterAllFilter()); 1658 } 1659 this.testScanner = asyncTable.getScanner(scan); 1660 } 1661 Result r = testScanner.next(); 1662 updateValueSize(r); 1663 return true; 1664 } 1665 } 1666 1667 static class AsyncSequentialReadTest extends AsyncTableTest { 1668 AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) { 1669 super(con, options, status); 1670 } 1671 1672 @Override 1673 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1674 Get get = new Get(format(i)); 1675 for (int family = 0; family < opts.families; family++) { 1676 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1677 if (opts.addColumns) { 1678 for (int column = 0; column < opts.columns; column++) { 1679 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1680 get.addColumn(familyName, qualifier); 1681 } 1682 } else { 1683 get.addFamily(familyName); 1684 } 1685 } 1686 if (opts.filterAll) { 1687 get.setFilter(new FilterAllFilter()); 1688 } 1689 try { 1690 updateValueSize(table.get(get).get()); 1691 } catch (ExecutionException e) { 1692 throw new IOException(e); 1693 } 1694 return true; 1695 } 1696 } 1697 1698 static class AsyncSequentialWriteTest extends AsyncTableTest { 1699 private ArrayList<Put> puts; 1700 1701 AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) { 1702 super(con, options, status); 1703 if (opts.multiPut > 0) { 1704 LOG.info("MultiPut enabled. 
Sending PUTs in batches of " + opts.multiPut + "."); 1705 this.puts = new ArrayList<>(opts.multiPut); 1706 } 1707 } 1708 1709 protected byte[] generateRow(final int i) { 1710 return format(i); 1711 } 1712 1713 @Override 1714 @SuppressWarnings("ReturnValueIgnored") 1715 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1716 byte[] row = generateRow(i); 1717 Put put = new Put(row); 1718 for (int family = 0; family < opts.families; family++) { 1719 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1720 for (int column = 0; column < opts.columns; column++) { 1721 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1722 byte[] value = generateData(this.rand, getValueLength(this.rand)); 1723 if (opts.useTags) { 1724 byte[] tag = generateData(this.rand, TAG_LENGTH); 1725 Tag[] tags = new Tag[opts.noOfTags]; 1726 for (int n = 0; n < opts.noOfTags; n++) { 1727 Tag t = new ArrayBackedTag((byte) n, tag); 1728 tags[n] = t; 1729 } 1730 KeyValue kv = 1731 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 1732 put.add(kv); 1733 updateValueSize(kv.getValueLength()); 1734 } else { 1735 put.addColumn(familyName, qualifier, value); 1736 updateValueSize(value.length); 1737 } 1738 } 1739 } 1740 put.setDurability(opts.writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL); 1741 try { 1742 table.put(put).get(); 1743 if (opts.multiPut > 0) { 1744 this.puts.add(put); 1745 if (this.puts.size() == opts.multiPut) { 1746 this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get)); 1747 this.puts.clear(); 1748 } else { 1749 return false; 1750 } 1751 } else { 1752 table.put(put).get(); 1753 } 1754 } catch (ExecutionException e) { 1755 throw new IOException(e); 1756 } 1757 return true; 1758 } 1759 } 1760 1761 static abstract class BufferedMutatorTest extends Test { 1762 protected BufferedMutator mutator; 1763 protected Table table; 1764 1765 BufferedMutatorTest(Connection con, TestOptions options, Status status) { 1766 super(con, options, status); 1767 } 1768 1769 @Override 1770 void onStartup() throws IOException { 1771 BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName)); 1772 p.writeBufferSize(opts.bufferSize); 1773 this.mutator = connection.getBufferedMutator(p); 1774 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1775 } 1776 1777 @Override 1778 void onTakedown() throws IOException { 1779 mutator.close(); 1780 table.close(); 1781 } 1782 } 1783 1784 static class RandomSeekScanTest extends TableTest { 1785 RandomSeekScanTest(Connection con, TestOptions options, Status status) { 1786 super(con, options, status); 1787 } 1788 1789 @Override 1790 boolean testRow(final int i, final long startTime) throws IOException { 1791 Scan scan = 1792 new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)).setCaching(opts.caching) 1793 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1794 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1795 FilterList list = new FilterList(); 1796 for (int family = 0; family < opts.families; family++) { 1797 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1798 if (opts.addColumns) { 1799 for (int column = 0; column < opts.columns; column++) { 1800 
byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1801 scan.addColumn(familyName, qualifier); 1802 } 1803 } else { 1804 scan.addFamily(familyName); 1805 } 1806 } 1807 if (opts.filterAll) { 1808 list.addFilter(new FilterAllFilter()); 1809 } 1810 list.addFilter(new WhileMatchFilter(new PageFilter(120))); 1811 scan.setFilter(list); 1812 ResultScanner s = this.table.getScanner(scan); 1813 try { 1814 for (Result rr; (rr = s.next()) != null;) { 1815 updateValueSize(rr); 1816 } 1817 } finally { 1818 updateScanMetrics(s.getScanMetrics()); 1819 s.close(); 1820 } 1821 return true; 1822 } 1823 1824 @Override 1825 protected int getReportingPeriod() { 1826 int period = opts.perClientRunRows / 100; 1827 return period == 0 ? opts.perClientRunRows : period; 1828 } 1829 1830 } 1831 1832 static abstract class RandomScanWithRangeTest extends TableTest { 1833 RandomScanWithRangeTest(Connection con, TestOptions options, Status status) { 1834 super(con, options, status); 1835 } 1836 1837 @Override 1838 boolean testRow(final int i, final long startTime) throws IOException { 1839 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); 1840 Scan scan = new Scan().withStartRow(startAndStopRow.getFirst()) 1841 .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching) 1842 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1843 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1844 for (int family = 0; family < opts.families; family++) { 1845 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1846 if (opts.addColumns) { 1847 for (int column = 0; column < opts.columns; column++) { 1848 byte[] qualifier = column == 0 ? 
COLUMN_ZERO : Bytes.toBytes("" + column); 1849 scan.addColumn(familyName, qualifier); 1850 } 1851 } else { 1852 scan.addFamily(familyName); 1853 } 1854 } 1855 if (opts.filterAll) { 1856 scan.setFilter(new FilterAllFilter()); 1857 } 1858 Result r = null; 1859 int count = 0; 1860 ResultScanner s = this.table.getScanner(scan); 1861 try { 1862 for (; (r = s.next()) != null;) { 1863 updateValueSize(r); 1864 count++; 1865 } 1866 if (i % 100 == 0) { 1867 LOG.info(String.format("Scan for key range %s - %s returned %s rows", 1868 Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()), 1869 count)); 1870 } 1871 } finally { 1872 updateScanMetrics(s.getScanMetrics()); 1873 s.close(); 1874 } 1875 return true; 1876 } 1877 1878 protected abstract Pair<byte[], byte[]> getStartAndStopRow(); 1879 1880 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) { 1881 int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows; 1882 int stop = start + maxRange; 1883 return new Pair<>(format(start), format(stop)); 1884 } 1885 1886 @Override 1887 protected int getReportingPeriod() { 1888 int period = opts.perClientRunRows / 100; 1889 return period == 0 ? 
opts.perClientRunRows : period; 1890 } 1891 } 1892 1893 static class RandomScanWithRange10Test extends RandomScanWithRangeTest { 1894 RandomScanWithRange10Test(Connection con, TestOptions options, Status status) { 1895 super(con, options, status); 1896 } 1897 1898 @Override 1899 protected Pair<byte[], byte[]> getStartAndStopRow() { 1900 return generateStartAndStopRows(10); 1901 } 1902 } 1903 1904 static class RandomScanWithRange100Test extends RandomScanWithRangeTest { 1905 RandomScanWithRange100Test(Connection con, TestOptions options, Status status) { 1906 super(con, options, status); 1907 } 1908 1909 @Override 1910 protected Pair<byte[], byte[]> getStartAndStopRow() { 1911 return generateStartAndStopRows(100); 1912 } 1913 } 1914 1915 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest { 1916 RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) { 1917 super(con, options, status); 1918 } 1919 1920 @Override 1921 protected Pair<byte[], byte[]> getStartAndStopRow() { 1922 return generateStartAndStopRows(1000); 1923 } 1924 } 1925 1926 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest { 1927 RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) { 1928 super(con, options, status); 1929 } 1930 1931 @Override 1932 protected Pair<byte[], byte[]> getStartAndStopRow() { 1933 return generateStartAndStopRows(10000); 1934 } 1935 } 1936 1937 static class RandomReadTest extends TableTest { 1938 private final Consistency consistency; 1939 private ArrayList<Get> gets; 1940 1941 RandomReadTest(Connection con, TestOptions options, Status status) { 1942 super(con, options, status); 1943 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1944 if (opts.multiGet > 0) { 1945 LOG.info("MultiGet enabled. 
Sending GETs in batches of " + opts.multiGet + "."); 1946 this.gets = new ArrayList<>(opts.multiGet); 1947 } 1948 } 1949 1950 @Override 1951 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1952 if (opts.randomSleep > 0) { 1953 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 1954 } 1955 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1956 for (int family = 0; family < opts.families; family++) { 1957 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1958 if (opts.addColumns) { 1959 for (int column = 0; column < opts.columns; column++) { 1960 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1961 get.addColumn(familyName, qualifier); 1962 } 1963 } else { 1964 get.addFamily(familyName); 1965 } 1966 } 1967 if (opts.filterAll) { 1968 get.setFilter(new FilterAllFilter()); 1969 } 1970 get.setConsistency(consistency); 1971 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1972 if (opts.multiGet > 0) { 1973 this.gets.add(get); 1974 if (this.gets.size() == opts.multiGet) { 1975 Result[] rs = this.table.get(this.gets); 1976 if (opts.replicas > 1) { 1977 long latency = System.nanoTime() - startTime; 1978 updateValueSize(rs, latency); 1979 } else { 1980 updateValueSize(rs); 1981 } 1982 this.gets.clear(); 1983 } else { 1984 return false; 1985 } 1986 } else { 1987 if (opts.replicas > 1) { 1988 Result r = this.table.get(get); 1989 long latency = System.nanoTime() - startTime; 1990 updateValueSize(r, latency); 1991 } else { 1992 updateValueSize(this.table.get(get)); 1993 } 1994 } 1995 return true; 1996 } 1997 1998 @Override 1999 protected int getReportingPeriod() { 2000 int period = opts.perClientRunRows / 10; 2001 return period == 0 ? 
opts.perClientRunRows : period; 2002 } 2003 2004 @Override 2005 protected void testTakedown() throws IOException { 2006 if (this.gets != null && this.gets.size() > 0) { 2007 this.table.get(gets); 2008 this.gets.clear(); 2009 } 2010 super.testTakedown(); 2011 } 2012 } 2013 2014 /* 2015 * Send random reads against fake regions inserted by MetaWriteTest 2016 */ 2017 static class MetaRandomReadTest extends MetaTest { 2018 private RegionLocator regionLocator; 2019 2020 MetaRandomReadTest(Connection con, TestOptions options, Status status) { 2021 super(con, options, status); 2022 LOG.info("call getRegionLocation"); 2023 } 2024 2025 @Override 2026 void onStartup() throws IOException { 2027 super.onStartup(); 2028 this.regionLocator = connection.getRegionLocator(table.getName()); 2029 } 2030 2031 @Override 2032 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 2033 if (opts.randomSleep > 0) { 2034 Thread.sleep(rand.nextInt(opts.randomSleep)); 2035 } 2036 HRegionLocation hRegionLocation = 2037 regionLocator.getRegionLocation(getSplitKey(rand.nextInt(opts.perClientRunRows)), true); 2038 LOG.debug("get location for region: " + hRegionLocation); 2039 return true; 2040 } 2041 2042 @Override 2043 protected int getReportingPeriod() { 2044 int period = opts.perClientRunRows / 10; 2045 return period == 0 ? 
opts.perClientRunRows : period; 2046 } 2047 2048 @Override 2049 protected void testTakedown() throws IOException { 2050 super.testTakedown(); 2051 } 2052 } 2053 2054 static class RandomWriteTest extends SequentialWriteTest { 2055 RandomWriteTest(Connection con, TestOptions options, Status status) { 2056 super(con, options, status); 2057 } 2058 2059 @Override 2060 protected byte[] generateRow(final int i) { 2061 return getRandomRow(this.rand, opts.totalRows); 2062 } 2063 2064 } 2065 2066 static class ScanTest extends TableTest { 2067 private ResultScanner testScanner; 2068 2069 ScanTest(Connection con, TestOptions options, Status status) { 2070 super(con, options, status); 2071 } 2072 2073 @Override 2074 void testTakedown() throws IOException { 2075 if (this.testScanner != null) { 2076 this.testScanner.close(); 2077 } 2078 super.testTakedown(); 2079 } 2080 2081 @Override 2082 boolean testRow(final int i, final long startTime) throws IOException { 2083 if (this.testScanner == null) { 2084 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 2085 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 2086 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 2087 for (int family = 0; family < opts.families; family++) { 2088 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2089 if (opts.addColumns) { 2090 for (int column = 0; column < opts.columns; column++) { 2091 byte[] qualifier = column == 0 ? 
COLUMN_ZERO : Bytes.toBytes("" + column); 2092 scan.addColumn(familyName, qualifier); 2093 } 2094 } else { 2095 scan.addFamily(familyName); 2096 } 2097 } 2098 if (opts.filterAll) { 2099 scan.setFilter(new FilterAllFilter()); 2100 } 2101 this.testScanner = table.getScanner(scan); 2102 } 2103 Result r = testScanner.next(); 2104 updateValueSize(r); 2105 return true; 2106 } 2107 } 2108 2109 static class ReverseScanTest extends TableTest { 2110 private ResultScanner testScanner; 2111 2112 ReverseScanTest(Connection con, TestOptions options, Status status) { 2113 super(con, options, status); 2114 } 2115 2116 @Override 2117 void testTakedown() throws IOException { 2118 if (this.testScanner != null) { 2119 this.testScanner.close(); 2120 } 2121 super.testTakedown(); 2122 } 2123 2124 @Override 2125 boolean testRow(final int i, final long startTime) throws IOException { 2126 if (this.testScanner == null) { 2127 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 2128 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 2129 .setScanMetricsEnabled(true).setReversed(true); 2130 for (int family = 0; family < opts.families; family++) { 2131 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2132 if (opts.addColumns) { 2133 for (int column = 0; column < opts.columns; column++) { 2134 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2135 scan.addColumn(familyName, qualifier); 2136 } 2137 } else { 2138 scan.addFamily(familyName); 2139 } 2140 } 2141 if (opts.filterAll) { 2142 scan.setFilter(new FilterAllFilter()); 2143 } 2144 this.testScanner = table.getScanner(scan); 2145 } 2146 Result r = testScanner.next(); 2147 updateValueSize(r); 2148 return true; 2149 } 2150 } 2151 2152 /** 2153 * Base class for operations that are CAS-like; that read a value and then set it based off what 2154 * they read. In this category is increment, append, checkAndPut, etc. 
2155 * <p> 2156 * These operations also want some concurrency going on. Usually when these tests run, they 2157 * operate in their own part of the key range. In CASTest, we will have them all overlap on the 2158 * same key space. We do this with our getStartRow and getLastRow overrides. 2159 */ 2160 static abstract class CASTableTest extends TableTest { 2161 private final byte[] qualifier; 2162 2163 CASTableTest(Connection con, TestOptions options, Status status) { 2164 super(con, options, status); 2165 qualifier = Bytes.toBytes(this.getClass().getSimpleName()); 2166 } 2167 2168 byte[] getQualifier() { 2169 return this.qualifier; 2170 } 2171 2172 @Override 2173 int getStartRow() { 2174 return 0; 2175 } 2176 2177 @Override 2178 int getLastRow() { 2179 return opts.perClientRunRows; 2180 } 2181 } 2182 2183 static class IncrementTest extends CASTableTest { 2184 IncrementTest(Connection con, TestOptions options, Status status) { 2185 super(con, options, status); 2186 } 2187 2188 @Override 2189 boolean testRow(final int i, final long startTime) throws IOException { 2190 Increment increment = new Increment(format(i)); 2191 // unlike checkAndXXX tests, which make most sense to do on a single value, 2192 // if multiple families are specified for an increment test we assume it is 2193 // meant to raise the work factor 2194 for (int family = 0; family < opts.families; family++) { 2195 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2196 increment.addColumn(familyName, getQualifier(), 1l); 2197 } 2198 updateValueSize(this.table.increment(increment)); 2199 return true; 2200 } 2201 } 2202 2203 static class AppendTest extends CASTableTest { 2204 AppendTest(Connection con, TestOptions options, Status status) { 2205 super(con, options, status); 2206 } 2207 2208 @Override 2209 boolean testRow(final int i, final long startTime) throws IOException { 2210 byte[] bytes = format(i); 2211 Append append = new Append(bytes); 2212 // unlike checkAndXXX tests, which make most 
sense to do on a single value, 2213 // if multiple families are specified for an append test we assume it is 2214 // meant to raise the work factor 2215 for (int family = 0; family < opts.families; family++) { 2216 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2217 append.addColumn(familyName, getQualifier(), bytes); 2218 } 2219 updateValueSize(this.table.append(append)); 2220 return true; 2221 } 2222 } 2223 2224 static class CheckAndMutateTest extends CASTableTest { 2225 CheckAndMutateTest(Connection con, TestOptions options, Status status) { 2226 super(con, options, status); 2227 } 2228 2229 @Override 2230 boolean testRow(final int i, final long startTime) throws IOException { 2231 final byte[] bytes = format(i); 2232 // checkAndXXX tests operate on only a single value 2233 // Put a known value so when we go to check it, it is there. 2234 Put put = new Put(bytes); 2235 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2236 this.table.put(put); 2237 RowMutations mutations = new RowMutations(bytes); 2238 mutations.add(put); 2239 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2240 .thenMutate(mutations); 2241 return true; 2242 } 2243 } 2244 2245 static class CheckAndPutTest extends CASTableTest { 2246 CheckAndPutTest(Connection con, TestOptions options, Status status) { 2247 super(con, options, status); 2248 } 2249 2250 @Override 2251 boolean testRow(final int i, final long startTime) throws IOException { 2252 final byte[] bytes = format(i); 2253 // checkAndXXX tests operate on only a single value 2254 // Put a known value so when we go to check it, it is there. 
2255 Put put = new Put(bytes); 2256 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2257 this.table.put(put); 2258 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2259 .thenPut(put); 2260 return true; 2261 } 2262 } 2263 2264 static class CheckAndDeleteTest extends CASTableTest { 2265 CheckAndDeleteTest(Connection con, TestOptions options, Status status) { 2266 super(con, options, status); 2267 } 2268 2269 @Override 2270 boolean testRow(final int i, final long startTime) throws IOException { 2271 final byte[] bytes = format(i); 2272 // checkAndXXX tests operate on only a single value 2273 // Put a known value so when we go to check it, it is there. 2274 Put put = new Put(bytes); 2275 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2276 this.table.put(put); 2277 Delete delete = new Delete(put.getRow()); 2278 delete.addColumn(FAMILY_ZERO, getQualifier()); 2279 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2280 .thenDelete(delete); 2281 return true; 2282 } 2283 } 2284 2285 /* 2286 * Delete all fake regions inserted to meta table by MetaWriteTest. 
2287 */ 2288 static class CleanMetaTest extends MetaTest { 2289 CleanMetaTest(Connection con, TestOptions options, Status status) { 2290 super(con, options, status); 2291 } 2292 2293 @Override 2294 boolean testRow(final int i, final long startTime) throws IOException { 2295 try { 2296 RegionInfo regionInfo = connection.getRegionLocator(table.getName()) 2297 .getRegionLocation(getSplitKey(i), false).getRegion(); 2298 LOG.debug("deleting region from meta: " + regionInfo); 2299 2300 Delete delete = 2301 MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP); 2302 try (Table t = MetaTableAccessor.getMetaHTable(connection)) { 2303 t.delete(delete); 2304 } 2305 } catch (IOException ie) { 2306 // Log and continue 2307 LOG.error("cannot find region with start key: " + i); 2308 } 2309 return true; 2310 } 2311 } 2312 2313 static class SequentialReadTest extends TableTest { 2314 SequentialReadTest(Connection con, TestOptions options, Status status) { 2315 super(con, options, status); 2316 } 2317 2318 @Override 2319 boolean testRow(final int i, final long startTime) throws IOException { 2320 Get get = new Get(format(i)); 2321 for (int family = 0; family < opts.families; family++) { 2322 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2323 if (opts.addColumns) { 2324 for (int column = 0; column < opts.columns; column++) { 2325 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2326 get.addColumn(familyName, qualifier); 2327 } 2328 } else { 2329 get.addFamily(familyName); 2330 } 2331 } 2332 if (opts.filterAll) { 2333 get.setFilter(new FilterAllFilter()); 2334 } 2335 updateValueSize(table.get(get)); 2336 return true; 2337 } 2338 } 2339 2340 static class SequentialWriteTest extends BufferedMutatorTest { 2341 private ArrayList<Put> puts; 2342 2343 SequentialWriteTest(Connection con, TestOptions options, Status status) { 2344 super(con, options, status); 2345 if (opts.multiPut > 0) { 2346 LOG.info("MultiPut enabled. 
Sending PUTs in batches of " + opts.multiPut + "."); 2347 this.puts = new ArrayList<>(opts.multiPut); 2348 } 2349 } 2350 2351 protected byte[] generateRow(final int i) { 2352 return format(i); 2353 } 2354 2355 @Override 2356 boolean testRow(final int i, final long startTime) throws IOException { 2357 byte[] row = generateRow(i); 2358 Put put = new Put(row); 2359 for (int family = 0; family < opts.families; family++) { 2360 byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family); 2361 for (int column = 0; column < opts.columns; column++) { 2362 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2363 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2364 if (opts.useTags) { 2365 byte[] tag = generateData(this.rand, TAG_LENGTH); 2366 Tag[] tags = new Tag[opts.noOfTags]; 2367 for (int n = 0; n < opts.noOfTags; n++) { 2368 Tag t = new ArrayBackedTag((byte) n, tag); 2369 tags[n] = t; 2370 } 2371 KeyValue kv = 2372 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 2373 put.add(kv); 2374 updateValueSize(kv.getValueLength()); 2375 } else { 2376 put.addColumn(familyName, qualifier, value); 2377 updateValueSize(value.length); 2378 } 2379 } 2380 } 2381 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 2382 if (opts.autoFlush) { 2383 if (opts.multiPut > 0) { 2384 this.puts.add(put); 2385 if (this.puts.size() == opts.multiPut) { 2386 table.put(this.puts); 2387 this.puts.clear(); 2388 } else { 2389 return false; 2390 } 2391 } else { 2392 table.put(put); 2393 } 2394 } else { 2395 mutator.mutate(put); 2396 } 2397 return true; 2398 } 2399 } 2400 2401 /* 2402 * Insert fake regions into meta table with contiguous split keys. 
2403 */ 2404 static class MetaWriteTest extends MetaTest { 2405 2406 MetaWriteTest(Connection con, TestOptions options, Status status) { 2407 super(con, options, status); 2408 } 2409 2410 @Override 2411 boolean testRow(final int i, final long startTime) throws IOException { 2412 List<RegionInfo> regionInfos = new ArrayList<RegionInfo>(); 2413 RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME)) 2414 .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build()); 2415 regionInfos.add(regionInfo); 2416 MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1); 2417 2418 // write the serverName columns 2419 MetaTableAccessor.updateRegionLocation(connection, regionInfo, 2420 ServerName.valueOf("localhost", 60010, rand.nextLong()), i, 2421 EnvironmentEdgeManager.currentTime()); 2422 return true; 2423 } 2424 } 2425 2426 static class FilteredScanTest extends TableTest { 2427 protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName()); 2428 2429 FilteredScanTest(Connection con, TestOptions options, Status status) { 2430 super(con, options, status); 2431 if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) { 2432 LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB 2433 + ". 
This could take a very long time."); 2434 } 2435 } 2436 2437 @Override 2438 boolean testRow(int i, final long startTime) throws IOException { 2439 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2440 Scan scan = constructScan(value); 2441 ResultScanner scanner = null; 2442 try { 2443 scanner = this.table.getScanner(scan); 2444 for (Result r = null; (r = scanner.next()) != null;) { 2445 updateValueSize(r); 2446 } 2447 } finally { 2448 if (scanner != null) { 2449 updateScanMetrics(scanner.getScanMetrics()); 2450 scanner.close(); 2451 } 2452 } 2453 return true; 2454 } 2455 2456 protected Scan constructScan(byte[] valuePrefix) throws IOException { 2457 FilterList list = new FilterList(); 2458 Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL, 2459 new BinaryComparator(valuePrefix)); 2460 list.addFilter(filter); 2461 if (opts.filterAll) { 2462 list.addFilter(new FilterAllFilter()); 2463 } 2464 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 2465 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 2466 .setScanMetricsEnabled(true); 2467 if (opts.addColumns) { 2468 for (int column = 0; column < opts.columns; column++) { 2469 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2470 scan.addColumn(FAMILY_ZERO, qualifier); 2471 } 2472 } else { 2473 scan.addFamily(FAMILY_ZERO); 2474 } 2475 scan.setFilter(list); 2476 return scan; 2477 } 2478 } 2479 2480 /** 2481 * Compute a throughput rate in MB/s. 2482 * @param rows Number of records consumed. 2483 * @param timeMs Time taken in milliseconds. 
2484 * @return String value with label, ie '123.76 MB/s' 2485 */ 2486 private static String calculateMbps(int rows, long timeMs, final int valueSize, int families, 2487 int columns) { 2488 BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH 2489 + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families); 2490 BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT) 2491 .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT); 2492 return FMT.format(mbps) + " MB/s"; 2493 } 2494 2495 /* 2496 * Format passed integer. 2497 * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed number (Does 2498 * absolute in case number is negative). 2499 */ 2500 public static byte[] format(final int number) { 2501 byte[] b = new byte[ROW_LENGTH]; 2502 int d = Math.abs(number); 2503 for (int i = b.length - 1; i >= 0; i--) { 2504 b[i] = (byte) ((d % 10) + '0'); 2505 d /= 10; 2506 } 2507 return b; 2508 } 2509 2510 /* 2511 * This method takes some time and is done inline uploading data. For example, doing the mapfile 2512 * test, generation of the key and value consumes about 30% of CPU time. 2513 * @return Generated random value to insert into a table cell. 
2514 */ 2515 public static byte[] generateData(final Random r, int length) { 2516 byte[] b = new byte[length]; 2517 int i; 2518 2519 for (i = 0; i < (length - 8); i += 8) { 2520 b[i] = (byte) (65 + r.nextInt(26)); 2521 b[i + 1] = b[i]; 2522 b[i + 2] = b[i]; 2523 b[i + 3] = b[i]; 2524 b[i + 4] = b[i]; 2525 b[i + 5] = b[i]; 2526 b[i + 6] = b[i]; 2527 b[i + 7] = b[i]; 2528 } 2529 2530 byte a = (byte) (65 + r.nextInt(26)); 2531 for (; i < length; i++) { 2532 b[i] = a; 2533 } 2534 return b; 2535 } 2536 2537 static byte[] getRandomRow(final Random random, final int totalRows) { 2538 return format(generateRandomRow(random, totalRows)); 2539 } 2540 2541 static int generateRandomRow(final Random random, final int totalRows) { 2542 return random.nextInt(Integer.MAX_VALUE) % totalRows; 2543 } 2544 2545 static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf, 2546 Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status) 2547 throws IOException, InterruptedException { 2548 status.setStatus( 2549 "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows"); 2550 long totalElapsedTime; 2551 2552 final TestBase t; 2553 try { 2554 if (AsyncTest.class.isAssignableFrom(cmd)) { 2555 Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd; 2556 Constructor<? extends AsyncTest> constructor = 2557 newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class); 2558 t = constructor.newInstance(asyncCon, opts, status); 2559 } else { 2560 Class<? extends Test> newCmd = (Class<? extends Test>) cmd; 2561 Constructor<? extends Test> constructor = 2562 newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class); 2563 t = constructor.newInstance(con, opts, status); 2564 } 2565 } catch (NoSuchMethodException e) { 2566 throw new IllegalArgumentException("Invalid command class: " + cmd.getName() 2567 + ". 
It does not provide a constructor as described by " 2568 + "the javadoc comment. Available constructors are: " 2569 + Arrays.toString(cmd.getConstructors())); 2570 } catch (Exception e) { 2571 throw new IllegalStateException("Failed to construct command class", e); 2572 } 2573 totalElapsedTime = t.test(); 2574 2575 status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow 2576 + " for " + opts.perClientRunRows + " rows" + " (" 2577 + calculateMbps((int) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime, 2578 getAverageValueLength(opts), opts.families, opts.columns) 2579 + ")"); 2580 2581 return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold, 2582 t.numOfReplyFromReplica, t.getLatencyHistogram()); 2583 } 2584 2585 private static int getAverageValueLength(final TestOptions opts) { 2586 return opts.valueRandom ? opts.valueSize / 2 : opts.valueSize; 2587 } 2588 2589 private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) 2590 throws IOException, InterruptedException, ClassNotFoundException, ExecutionException { 2591 // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do 2592 // the TestOptions introspection for us and dump the output in a readable format. 
2593 LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts)); 2594 Admin admin = null; 2595 Connection connection = null; 2596 try { 2597 connection = ConnectionFactory.createConnection(getConf()); 2598 admin = connection.getAdmin(); 2599 checkTable(admin, opts); 2600 } finally { 2601 if (admin != null) admin.close(); 2602 if (connection != null) connection.close(); 2603 } 2604 if (opts.nomapred) { 2605 doLocalClients(opts, getConf()); 2606 } else { 2607 doMapReduce(opts, getConf()); 2608 } 2609 } 2610 2611 protected void printUsage() { 2612 printUsage(PE_COMMAND_SHORTNAME, null); 2613 } 2614 2615 protected static void printUsage(final String message) { 2616 printUsage(PE_COMMAND_SHORTNAME, message); 2617 } 2618 2619 protected static void printUsageAndExit(final String message, final int exitCode) { 2620 printUsage(message); 2621 System.exit(exitCode); 2622 } 2623 2624 protected static void printUsage(final String shortName, final String message) { 2625 if (message != null && message.length() > 0) { 2626 System.err.println(message); 2627 } 2628 System.err.print("Usage: hbase " + shortName); 2629 System.err.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>"); 2630 System.err.println(); 2631 System.err.println("General Options:"); 2632 System.err.println( 2633 " nomapred Run multiple clients using threads " + "(rather than use mapreduce)"); 2634 System.err 2635 .println(" oneCon all the threads share the same connection. Default: False"); 2636 System.err.println(" connCount connections all threads share. " 2637 + "For example, if set to 2, then all thread share 2 connection. " 2638 + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, " 2639 + "if not, connCount=thread number"); 2640 2641 System.err.println(" sampleRate Execute test on a sample of total " 2642 + "rows. Only supported by randomRead. 
Default: 1.0"); 2643 System.err.println(" period Report every 'period' rows: " 2644 + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10); 2645 System.err.println(" cycles How many times to cycle the test. Defaults: 1."); 2646 System.err.println( 2647 " traceRate Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0"); 2648 System.err.println(" latency Set to report operation latencies. Default: False"); 2649 System.err.println(" latencyThreshold Set to report number of operations with latency " 2650 + "over lantencyThreshold, unit in millisecond, default 0"); 2651 System.err.println(" measureAfter Start to measure the latency once 'measureAfter'" 2652 + " rows have been treated. Default: 0"); 2653 System.err 2654 .println(" valueSize Pass value size to use: Default: " + DEFAULT_OPTS.getValueSize()); 2655 System.err.println(" valueRandom Set if we should vary value size between 0 and " 2656 + "'valueSize'; set on read for stats on size: Default: Not set."); 2657 System.err.println(" blockEncoding Block encoding to use. Value should be one of " 2658 + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE"); 2659 System.err.println(); 2660 System.err.println("Table Creation / Write Tests:"); 2661 System.err.println(" table Alternate table name. Default: 'TestTable'"); 2662 System.err.println( 2663 " rows Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows() 2664 + ". In case of randomReads and randomSeekScans this could" 2665 + " be specified along with --size to specify the number of rows to be scanned within" 2666 + " the total range specified by the size."); 2667 System.err.println( 2668 " size Total size in GiB. Mutually exclusive with --rows for writes and scans" 2669 + ". But for randomReads and randomSeekScans when you use size with --rows you could" 2670 + " use size to specify the end range and --rows" 2671 + " specifies the number of rows within that range. 
" + "Default: 1.0."); 2672 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); 2673 System.err.println(" encryption Encryption type to use (AES, ...). Default: 'NONE'"); 2674 System.err.println( 2675 " flushCommits Used to determine if the test should flush the table. " + "Default: false"); 2676 System.err.println(" valueZipf Set if we should vary value size between 0 and " 2677 + "'valueSize' in zipf form: Default: Not set."); 2678 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); 2679 System.err.println(" autoFlush Set autoFlush on htable. Default: False"); 2680 System.err.println(" multiPut Batch puts together into groups of N. Only supported " 2681 + "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0"); 2682 System.err.println(" presplit Create presplit table. If a table with same name exists," 2683 + " it'll be deleted and recreated (instead of verifying count of its existing regions). " 2684 + "Recommended for accurate perf analysis (see guide). Default: disabled"); 2685 System.err.println( 2686 " usetags Writes tags along with KVs. Use with HFile V3. " + "Default: false"); 2687 System.err.println(" numoftags Specify the no of tags that would be needed. " 2688 + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags); 2689 System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table."); 2690 System.err.println(" columns Columns to write per row. Default: 1"); 2691 System.err 2692 .println(" families Specify number of column families for the table. Default: 1"); 2693 System.err.println(); 2694 System.err.println("Read Tests:"); 2695 System.err.println(" filterAll Helps to filter out all the rows on the server side" 2696 + " there by not returning any thing back to the client. Helps to check the server side" 2697 + " performance. Uses FilterAllFilter internally. 
"); 2698 System.err.println(" multiGet Batch gets together into groups of N. Only supported " 2699 + "by randomRead. Default: disabled"); 2700 System.err.println(" inmemory Tries to keep the HFiles of the CF " 2701 + "inmemory as far as possible. Not guaranteed that reads are always served " 2702 + "from memory. Default: false"); 2703 System.err 2704 .println(" bloomFilter Bloom filter type, one of " + Arrays.toString(BloomType.values())); 2705 System.err.println(" blockSize Blocksize to use when writing out hfiles. "); 2706 System.err 2707 .println(" inmemoryCompaction Makes the column family to do inmemory flushes/compactions. " 2708 + "Uses the CompactingMemstore"); 2709 System.err.println(" addColumns Adds columns to scans/gets explicitly. Default: true"); 2710 System.err.println(" replicas Enable region replica testing. Defaults: 1."); 2711 System.err.println( 2712 " randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0"); 2713 System.err.println(" caching Scan caching to use. Default: 30"); 2714 System.err.println(" asyncPrefetch Enable asyncPrefetch for scan"); 2715 System.err.println(" cacheBlocks Set the cacheBlocks option for scan. Default: true"); 2716 System.err.println( 2717 " scanReadType Set the readType option for scan, stream/pread/default. Default: default"); 2718 System.err.println(" bufferSize Set the value of client side buffering. Default: 2MB"); 2719 System.err.println(); 2720 System.err.println(" Note: -D properties will be applied to the conf used. 
"); 2721 System.err.println(" For example: "); 2722 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true"); 2723 System.err.println(" -Dmapreduce.task.timeout=60000"); 2724 System.err.println(); 2725 System.err.println("Command:"); 2726 for (CmdDescriptor command : COMMANDS.values()) { 2727 System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription())); 2728 } 2729 System.err.println(); 2730 System.err.println("Args:"); 2731 System.err.println(" nclients Integer. Required. Total number of clients " 2732 + "(and HRegionServers) running. 1 <= value <= 500"); 2733 System.err.println("Examples:"); 2734 System.err.println(" To run a single client doing the default 1M sequentialWrites:"); 2735 System.err.println(" $ hbase " + shortName + " sequentialWrite 1"); 2736 System.err.println(" To run 10 clients doing increments over ten rows:"); 2737 System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10"); 2738 } 2739 2740 /** 2741 * Parse options passed in via an arguments array. Assumes that array has been split on 2742 * white-space and placed into a {@code Queue}. Any unknown arguments will remain in the queue at 2743 * the conclusion of this method call. It's up to the caller to deal with these unrecognized 2744 * arguments. 
2745 */ 2746 static TestOptions parseOpts(Queue<String> args) { 2747 TestOptions opts = new TestOptions(); 2748 2749 String cmd = null; 2750 while ((cmd = args.poll()) != null) { 2751 if (cmd.equals("-h") || cmd.startsWith("--h")) { 2752 // place item back onto queue so that caller knows parsing was incomplete 2753 args.add(cmd); 2754 break; 2755 } 2756 2757 final String nmr = "--nomapred"; 2758 if (cmd.startsWith(nmr)) { 2759 opts.nomapred = true; 2760 continue; 2761 } 2762 2763 final String rows = "--rows="; 2764 if (cmd.startsWith(rows)) { 2765 opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length())); 2766 continue; 2767 } 2768 2769 final String cycles = "--cycles="; 2770 if (cmd.startsWith(cycles)) { 2771 opts.cycles = Integer.parseInt(cmd.substring(cycles.length())); 2772 continue; 2773 } 2774 2775 final String sampleRate = "--sampleRate="; 2776 if (cmd.startsWith(sampleRate)) { 2777 opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); 2778 continue; 2779 } 2780 2781 final String table = "--table="; 2782 if (cmd.startsWith(table)) { 2783 opts.tableName = cmd.substring(table.length()); 2784 continue; 2785 } 2786 2787 final String startRow = "--startRow="; 2788 if (cmd.startsWith(startRow)) { 2789 opts.startRow = Integer.parseInt(cmd.substring(startRow.length())); 2790 continue; 2791 } 2792 2793 final String compress = "--compress="; 2794 if (cmd.startsWith(compress)) { 2795 opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); 2796 continue; 2797 } 2798 2799 final String encryption = "--encryption="; 2800 if (cmd.startsWith(encryption)) { 2801 opts.encryption = cmd.substring(encryption.length()); 2802 continue; 2803 } 2804 2805 final String traceRate = "--traceRate="; 2806 if (cmd.startsWith(traceRate)) { 2807 opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length())); 2808 continue; 2809 } 2810 2811 final String blockEncoding = "--blockEncoding="; 2812 if (cmd.startsWith(blockEncoding)) 
{ 2813 opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); 2814 continue; 2815 } 2816 2817 final String flushCommits = "--flushCommits="; 2818 if (cmd.startsWith(flushCommits)) { 2819 opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); 2820 continue; 2821 } 2822 2823 final String writeToWAL = "--writeToWAL="; 2824 if (cmd.startsWith(writeToWAL)) { 2825 opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); 2826 continue; 2827 } 2828 2829 final String presplit = "--presplit="; 2830 if (cmd.startsWith(presplit)) { 2831 opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); 2832 continue; 2833 } 2834 2835 final String inMemory = "--inmemory="; 2836 if (cmd.startsWith(inMemory)) { 2837 opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); 2838 continue; 2839 } 2840 2841 final String autoFlush = "--autoFlush="; 2842 if (cmd.startsWith(autoFlush)) { 2843 opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length())); 2844 continue; 2845 } 2846 2847 final String onceCon = "--oneCon="; 2848 if (cmd.startsWith(onceCon)) { 2849 opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length())); 2850 continue; 2851 } 2852 2853 final String connCount = "--connCount="; 2854 if (cmd.startsWith(connCount)) { 2855 opts.connCount = Integer.parseInt(cmd.substring(connCount.length())); 2856 continue; 2857 } 2858 2859 final String latencyThreshold = "--latencyThreshold="; 2860 if (cmd.startsWith(latencyThreshold)) { 2861 opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length())); 2862 continue; 2863 } 2864 2865 final String latency = "--latency"; 2866 if (cmd.startsWith(latency)) { 2867 opts.reportLatency = true; 2868 continue; 2869 } 2870 2871 final String multiGet = "--multiGet="; 2872 if (cmd.startsWith(multiGet)) { 2873 opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); 2874 continue; 2875 } 2876 2877 final 
String multiPut = "--multiPut="; 2878 if (cmd.startsWith(multiPut)) { 2879 opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length())); 2880 continue; 2881 } 2882 2883 final String useTags = "--usetags="; 2884 if (cmd.startsWith(useTags)) { 2885 opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); 2886 continue; 2887 } 2888 2889 final String noOfTags = "--numoftags="; 2890 if (cmd.startsWith(noOfTags)) { 2891 opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); 2892 continue; 2893 } 2894 2895 final String replicas = "--replicas="; 2896 if (cmd.startsWith(replicas)) { 2897 opts.replicas = Integer.parseInt(cmd.substring(replicas.length())); 2898 continue; 2899 } 2900 2901 final String filterOutAll = "--filterAll"; 2902 if (cmd.startsWith(filterOutAll)) { 2903 opts.filterAll = true; 2904 continue; 2905 } 2906 2907 final String size = "--size="; 2908 if (cmd.startsWith(size)) { 2909 opts.size = Float.parseFloat(cmd.substring(size.length())); 2910 if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 
1GB"); 2911 continue; 2912 } 2913 2914 final String splitPolicy = "--splitPolicy="; 2915 if (cmd.startsWith(splitPolicy)) { 2916 opts.splitPolicy = cmd.substring(splitPolicy.length()); 2917 continue; 2918 } 2919 2920 final String randomSleep = "--randomSleep="; 2921 if (cmd.startsWith(randomSleep)) { 2922 opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length())); 2923 continue; 2924 } 2925 2926 final String measureAfter = "--measureAfter="; 2927 if (cmd.startsWith(measureAfter)) { 2928 opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length())); 2929 continue; 2930 } 2931 2932 final String bloomFilter = "--bloomFilter="; 2933 if (cmd.startsWith(bloomFilter)) { 2934 opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length())); 2935 continue; 2936 } 2937 2938 final String blockSize = "--blockSize="; 2939 if (cmd.startsWith(blockSize)) { 2940 opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length())); 2941 continue; 2942 } 2943 2944 final String valueSize = "--valueSize="; 2945 if (cmd.startsWith(valueSize)) { 2946 opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length())); 2947 continue; 2948 } 2949 2950 final String valueRandom = "--valueRandom"; 2951 if (cmd.startsWith(valueRandom)) { 2952 opts.valueRandom = true; 2953 continue; 2954 } 2955 2956 final String valueZipf = "--valueZipf"; 2957 if (cmd.startsWith(valueZipf)) { 2958 opts.valueZipf = true; 2959 continue; 2960 } 2961 2962 final String period = "--period="; 2963 if (cmd.startsWith(period)) { 2964 opts.period = Integer.parseInt(cmd.substring(period.length())); 2965 continue; 2966 } 2967 2968 final String addColumns = "--addColumns="; 2969 if (cmd.startsWith(addColumns)) { 2970 opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length())); 2971 continue; 2972 } 2973 2974 final String inMemoryCompaction = "--inmemoryCompaction="; 2975 if (cmd.startsWith(inMemoryCompaction)) { 2976 opts.inMemoryCompaction = 2977 
MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length())); 2978 continue; 2979 } 2980 2981 final String columns = "--columns="; 2982 if (cmd.startsWith(columns)) { 2983 opts.columns = Integer.parseInt(cmd.substring(columns.length())); 2984 continue; 2985 } 2986 2987 final String families = "--families="; 2988 if (cmd.startsWith(families)) { 2989 opts.families = Integer.parseInt(cmd.substring(families.length())); 2990 continue; 2991 } 2992 2993 final String caching = "--caching="; 2994 if (cmd.startsWith(caching)) { 2995 opts.caching = Integer.parseInt(cmd.substring(caching.length())); 2996 continue; 2997 } 2998 2999 final String asyncPrefetch = "--asyncPrefetch"; 3000 if (cmd.startsWith(asyncPrefetch)) { 3001 opts.asyncPrefetch = true; 3002 continue; 3003 } 3004 3005 final String cacheBlocks = "--cacheBlocks="; 3006 if (cmd.startsWith(cacheBlocks)) { 3007 opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length())); 3008 continue; 3009 } 3010 3011 final String scanReadType = "--scanReadType="; 3012 if (cmd.startsWith(scanReadType)) { 3013 opts.scanReadType = 3014 Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase()); 3015 continue; 3016 } 3017 3018 final String bufferSize = "--bufferSize="; 3019 if (cmd.startsWith(bufferSize)) { 3020 opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length())); 3021 continue; 3022 } 3023 3024 validateParsedOpts(opts); 3025 3026 if (isCommandClass(cmd)) { 3027 opts.cmdName = cmd; 3028 try { 3029 opts.numClientThreads = Integer.parseInt(args.remove()); 3030 } catch (NoSuchElementException | NumberFormatException e) { 3031 throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e); 3032 } 3033 opts = calculateRowsAndSize(opts); 3034 break; 3035 } else { 3036 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); 3037 } 3038 3039 // Not matching any option or command. 
3040 System.err.println("Error: Wrong option or command: " + cmd); 3041 args.add(cmd); 3042 break; 3043 } 3044 return opts; 3045 } 3046 3047 /** 3048 * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts 3049 */ 3050 private static void validateParsedOpts(TestOptions opts) { 3051 3052 if (!opts.autoFlush && opts.multiPut > 0) { 3053 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0"); 3054 } 3055 3056 if (opts.oneCon && opts.connCount > 1) { 3057 throw new IllegalArgumentException( 3058 "oneCon is set to true, " + "connCount should not bigger than 1"); 3059 } 3060 3061 if (opts.valueZipf && opts.valueRandom) { 3062 throw new IllegalStateException("Either valueZipf or valueRandom but not both"); 3063 } 3064 } 3065 3066 static TestOptions calculateRowsAndSize(final TestOptions opts) { 3067 int rowsPerGB = getRowsPerGB(opts); 3068 if ( 3069 (opts.getCmdName() != null 3070 && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN))) 3071 && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows 3072 ) { 3073 opts.totalRows = (int) opts.size * rowsPerGB; 3074 } else if (opts.size != DEFAULT_OPTS.size) { 3075 // total size in GB specified 3076 opts.totalRows = (int) opts.size * rowsPerGB; 3077 opts.perClientRunRows = opts.totalRows / opts.numClientThreads; 3078 } else { 3079 opts.totalRows = opts.perClientRunRows * opts.numClientThreads; 3080 opts.size = opts.totalRows / rowsPerGB; 3081 } 3082 return opts; 3083 } 3084 3085 static int getRowsPerGB(final TestOptions opts) { 3086 return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies() 3087 * opts.getColumns()); 3088 } 3089 3090 @Override 3091 public int run(String[] args) throws Exception { 3092 // Process command-line args. TODO: Better cmd-line processing 3093 // (but hopefully something not as painful as cli options). 
3094 int errCode = -1; 3095 if (args.length < 1) { 3096 printUsage(); 3097 return errCode; 3098 } 3099 3100 try { 3101 LinkedList<String> argv = new LinkedList<>(); 3102 argv.addAll(Arrays.asList(args)); 3103 TestOptions opts = parseOpts(argv); 3104 3105 // args remaining, print help and exit 3106 if (!argv.isEmpty()) { 3107 errCode = 0; 3108 printUsage(); 3109 return errCode; 3110 } 3111 3112 // must run at least 1 client 3113 if (opts.numClientThreads <= 0) { 3114 throw new IllegalArgumentException("Number of clients must be > 0"); 3115 } 3116 3117 // cmdName should not be null, print help and exit 3118 if (opts.cmdName == null) { 3119 printUsage(); 3120 return errCode; 3121 } 3122 3123 Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName); 3124 if (cmdClass != null) { 3125 runTest(cmdClass, opts); 3126 errCode = 0; 3127 } 3128 3129 } catch (Exception e) { 3130 e.printStackTrace(); 3131 } 3132 3133 return errCode; 3134 } 3135 3136 private static boolean isCommandClass(String cmd) { 3137 return COMMANDS.containsKey(cmd); 3138 } 3139 3140 private static Class<? extends TestBase> determineCommandClass(String cmd) { 3141 CmdDescriptor descriptor = COMMANDS.get(cmd); 3142 return descriptor != null ? descriptor.getCmdClass() : null; 3143 } 3144 3145 public static void main(final String[] args) throws Exception { 3146 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); 3147 System.exit(res); 3148 } 3149}