/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.UniformReservoir;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.reflect.Constructor;
import java.math.BigDecimal;
import java.math.MathContext;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterAllFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ByteArrayHashKey;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.hadoop.hbase.util.Hash;
import org.apache.hadoop.hbase.util.MurmurHash;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RandomDistribution;
import org.apache.hadoop.hbase.util.YammerHistogramUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.gson.Gson;

/**
 * Script used to evaluate HBase performance and scalability. Runs an HBase client that steps
 * through one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random
 * writes test, etc.). Pass on the command-line which test to run and how many clients are
 * participating in this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
 * <p>
 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance
 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper,
 * pages 8-10.
 * <p>
 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as
 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client reads
 * or writes about 1GB of data, unless specified otherwise.
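 * <p>
 * For example, an illustrative standalone run (the option values here are arbitrary) is
 * {@code hbase pe --nomapred --rows=10000 sequentialWrite 1}.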
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class PerformanceEvaluation extends Configured implements Tool {
  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
  static final String RANDOM_READ = "randomRead";
  static final String PE_COMMAND_SHORTNAME = "pe";
  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
  private static final Gson GSON = GsonUtil.createGson().create();

  public static final String TABLE_NAME = "TestTable";
  public static final String FAMILY_NAME_BASE = "info";
  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
  public static final int DEFAULT_VALUE_LENGTH = 1000;
  public static final int ROW_LENGTH = 26;

  private static final int ONE_GB = 1024 * 1024 * 1000;
  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
  // TODO: should we make this configurable?
  private static final int TAG_LENGTH = 256;
  private static final DecimalFormat FMT = new DecimalFormat("0.##");
  private static final MathContext CXT = MathContext.DECIMAL64;
  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
  private static final TestOptions DEFAULT_OPTS = new TestOptions();

  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");

  static {
    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
      "Run async random read test");
    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
      "Run async random write test");
    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
      "Run async sequential read test");
    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
      "Run async sequential write test");
    addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)");
    addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test");
    addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test");
    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
      "Run random seek and scan 100 test");
    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
      "Run random seek scan with both start and stop row (max 10 rows)");
    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
      "Run random seek scan with both start and stop row (max 100 rows)");
    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
      "Run random seek scan with both start and stop row (max 1000 rows)");
    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
      "Run random seek scan with both start and stop row (max 10000 rows)");
    addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
    addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
    addCommandDescriptor(MetaWriteTest.class, "metaWrite",
      "Populate meta table; used with 1 thread; to be cleaned up by cleanMeta");
    addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
    addCommandDescriptor(FilteredScanTest.class, "filterScan",
      "Run scan test using a filter to find a specific row based on its value "
        + "(make sure to use --rows=20)");
    addCommandDescriptor(IncrementTest.class, "increment",
      "Increment on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(AppendTest.class, "append",
      "Append on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CleanMetaTest.class, "cleanMeta",
      "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread");
  }

  /**
   * Enum for map metrics. Kept out here rather than inside the Map inner-class so we can find
   * associated properties.
   */
  protected static enum Counter {
    /** elapsed time */
    ELAPSED_TIME,
    /** number of rows */
    ROWS
  }

  protected static class RunResult implements Comparable<RunResult> {
    public RunResult(long duration, Histogram hist) {
      this.duration = duration;
      this.hist = hist;
      numbOfReplyOverThreshold = 0;
      numOfReplyFromReplica = 0;
    }

    public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica,
      Histogram hist) {
      this.duration = duration;
      this.hist = hist;
      this.numbOfReplyOverThreshold = numbOfReplyOverThreshold;
      this.numOfReplyFromReplica = numOfReplyFromReplica;
    }

    public final long duration;
    public final Histogram hist;
    public final long numbOfReplyOverThreshold;
    public final long numOfReplyFromReplica;

    @Override
    public String toString() {
      return Long.toString(duration);
    }

    @Override
    public int compareTo(RunResult o) {
      return Long.compare(this.duration, o.duration);
    }
  }

  /**
   * Constructor
   * @param conf Configuration object
   */
  public PerformanceEvaluation(final Configuration conf) {
    super(conf);
  }

  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name,
    String description) {
    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
    COMMANDS.put(name, cmdDescriptor);
  }

  /**
   * Implementations can have their status set.
   */
  interface Status {
    /**
     * Sets status
     * @param msg status message
     */
    void setStatus(final String msg) throws IOException;
  }

  /**
   * MapReduce job that runs a performance evaluation client in each map task.
   */
  public static class EvaluationMapTask
    extends Mapper<LongWritable, Text, LongWritable, LongWritable> {

    /** configuration parameter name that contains the command */
    public final static String CMD_KEY = "EvaluationMapTask.command";
    /** configuration parameter name that contains the PE impl */
    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";

    private Class<?
extends Test> cmd; 285 286 @Override 287 protected void setup(Context context) throws IOException, InterruptedException { 288 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class); 289 290 // this is required so that extensions of PE are instantiated within the 291 // map reduce task... 292 Class<? extends PerformanceEvaluation> peClass = 293 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class); 294 try { 295 peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration()); 296 } catch (Exception e) { 297 throw new IllegalStateException("Could not instantiate PE instance", e); 298 } 299 } 300 301 private <Type> Class<? extends Type> forName(String className, Class<Type> type) { 302 try { 303 return Class.forName(className).asSubclass(type); 304 } catch (ClassNotFoundException e) { 305 throw new IllegalStateException("Could not find class for name: " + className, e); 306 } 307 } 308 309 @Override 310 protected void map(LongWritable key, Text value, final Context context) 311 throws IOException, InterruptedException { 312 313 Status status = new Status() { 314 @Override 315 public void setStatus(String msg) { 316 context.setStatus(msg); 317 } 318 }; 319 320 TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class); 321 Configuration conf = HBaseConfiguration.create(context.getConfiguration()); 322 final Connection con = ConnectionFactory.createConnection(conf); 323 AsyncConnection asyncCon = null; 324 try { 325 asyncCon = ConnectionFactory.createAsyncConnection(conf).get(); 326 } catch (ExecutionException e) { 327 throw new IOException(e); 328 } 329 330 // Evaluation task 331 RunResult result = 332 PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status); 333 // Collect how much time the thing took. Report as map output and 334 // to the ELAPSED_TIME counter. 335 context.getCounter(Counter.ELAPSED_TIME).increment(result.duration); 336 context.getCounter(Counter.ROWS).increment(opts.perClientRunRows); 337 context.write(new LongWritable(opts.startRow), new LongWritable(result.duration)); 338 context.progress(); 339 } 340 } 341 342 /* 343 * If table does not already exist, create. Also create a table when {@code opts.presplitRegions} 344 * is specified or when the existing table's region replica count doesn't match {@code 345 * opts.replicas}. 346 */ 347 static boolean checkTable(Admin admin, TestOptions opts) throws IOException { 348 TableName tableName = TableName.valueOf(opts.tableName); 349 boolean needsDelete = false, exists = admin.tableExists(tableName); 350 boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read") 351 || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan"); 352 if (!exists && isReadCmd) { 353 throw new IllegalStateException( 354 "Must specify an existing table for read commands. Run a write command first."); 355 } 356 TableDescriptor desc = exists ? admin.getDescriptor(TableName.valueOf(opts.tableName)) : null; 357 byte[][] splits = getSplits(opts); 358 359 // recreate the table when user has requested presplit or when existing 360 // {RegionSplitPolicy,replica count} does not match requested, or when the 361 // number of column families does not match requested. 
362 if ( 363 (exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions) 364 || (!isReadCmd && desc != null 365 && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy)) 366 || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas) 367 || (desc != null && desc.getColumnFamilyCount() != opts.families) 368 ) { 369 needsDelete = true; 370 // wait, why did it delete my table?!? 371 LOG.debug(MoreObjects.toStringHelper("needsDelete").add("needsDelete", needsDelete) 372 .add("isReadCmd", isReadCmd).add("exists", exists).add("desc", desc) 373 .add("presplit", opts.presplitRegions).add("splitPolicy", opts.splitPolicy) 374 .add("replicas", opts.replicas).add("families", opts.families).toString()); 375 } 376 377 // remove an existing table 378 if (needsDelete) { 379 if (admin.isTableEnabled(tableName)) { 380 admin.disableTable(tableName); 381 } 382 admin.deleteTable(tableName); 383 } 384 385 // table creation is necessary 386 if (!exists || needsDelete) { 387 desc = getTableDescriptor(opts); 388 if (splits != null) { 389 if (LOG.isDebugEnabled()) { 390 for (int i = 0; i < splits.length; i++) { 391 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); 392 } 393 } 394 } 395 if (splits != null) { 396 admin.createTable(desc, splits); 397 } else { 398 admin.createTable(desc); 399 } 400 LOG.info("Table " + desc + " created"); 401 } 402 return admin.tableExists(tableName); 403 } 404 405 /** 406 * Create an HTableDescriptor from provided TestOptions. 407 */ 408 protected static TableDescriptor getTableDescriptor(TestOptions opts) { 409 TableDescriptorBuilder builder = 410 TableDescriptorBuilder.newBuilder(TableName.valueOf(opts.tableName)); 411 412 for (int family = 0; family < opts.families; family++) { 413 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 414 ColumnFamilyDescriptorBuilder cfBuilder = 415 ColumnFamilyDescriptorBuilder.newBuilder(familyName); 416 cfBuilder.setDataBlockEncoding(opts.blockEncoding); 417 cfBuilder.setCompressionType(opts.compression); 418 cfBuilder.setEncryptionType(opts.encryption); 419 cfBuilder.setBloomFilterType(opts.bloomType); 420 cfBuilder.setBlocksize(opts.blockSize); 421 if (opts.inMemoryCF) { 422 cfBuilder.setInMemory(true); 423 } 424 cfBuilder.setInMemoryCompaction(opts.inMemoryCompaction); 425 builder.setColumnFamily(cfBuilder.build()); 426 } 427 if (opts.replicas != DEFAULT_OPTS.replicas) { 428 builder.setRegionReplication(opts.replicas); 429 } 430 if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) { 431 builder.setRegionSplitPolicyClassName(opts.splitPolicy); 432 } 433 return builder.build(); 434 } 435 436 /** 437 * generates splits based on total number of rows and specified split regions 438 */ 439 protected static byte[][] getSplits(TestOptions opts) { 440 if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null; 441 442 int numSplitPoints = opts.presplitRegions - 1; 443 byte[][] splits = new byte[numSplitPoints][]; 444 int jump = opts.totalRows / opts.presplitRegions; 445 for (int i = 0; i < numSplitPoints; i++) { 446 int rowkey = jump * (1 + i); 447 splits[i] = format(rowkey); 448 } 449 return splits; 450 } 451 452 static void setupConnectionCount(final TestOptions opts) { 453 if (opts.oneCon) { 454 opts.connCount = 1; 455 } else { 456 if (opts.connCount == -1) { 457 // set to thread number if connCount is not set 458 opts.connCount = opts.numClientThreads; 459 } 460 } 461 } 462 463 /* 464 * Run all clients in this vm each to its own 
thread. 465 */ 466 static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf) 467 throws IOException, InterruptedException, ExecutionException { 468 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName); 469 assert cmd != null; 470 @SuppressWarnings("unchecked") 471 Future<RunResult>[] threads = new Future[opts.numClientThreads]; 472 RunResult[] results = new RunResult[opts.numClientThreads]; 473 ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, 474 new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build()); 475 setupConnectionCount(opts); 476 final Connection[] cons = new Connection[opts.connCount]; 477 final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount]; 478 for (int i = 0; i < opts.connCount; i++) { 479 cons[i] = ConnectionFactory.createConnection(conf); 480 asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get(); 481 } 482 LOG 483 .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads"); 484 for (int i = 0; i < threads.length; i++) { 485 final int index = i; 486 threads[i] = pool.submit(new Callable<RunResult>() { 487 @Override 488 public RunResult call() throws Exception { 489 TestOptions threadOpts = new TestOptions(opts); 490 final Connection con = cons[index % cons.length]; 491 final AsyncConnection asyncCon = asyncCons[index % asyncCons.length]; 492 if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows; 493 RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() { 494 @Override 495 public void setStatus(final String msg) throws IOException { 496 LOG.info(msg); 497 } 498 }); 499 LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration 500 + "ms over " + threadOpts.perClientRunRows + " rows"); 501 if (opts.latencyThreshold > 0) { 502 LOG.info("Number of replies over latency threshold " + opts.latencyThreshold 503 + "(ms) is " + run.numbOfReplyOverThreshold); 504 } 505 return run; 506 } 507 }); 508 } 509 pool.shutdown(); 510 511 for (int i = 0; i < threads.length; i++) { 512 try { 513 results[i] = threads[i].get(); 514 } catch (ExecutionException e) { 515 throw new IOException(e.getCause()); 516 } 517 } 518 final String test = cmd.getSimpleName(); 519 LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results)); 520 Arrays.sort(results); 521 long total = 0; 522 float avgLatency = 0; 523 float avgTPS = 0; 524 long replicaWins = 0; 525 for (RunResult result : results) { 526 total += result.duration; 527 avgLatency += result.hist.getSnapshot().getMean(); 528 avgTPS += opts.perClientRunRows * 1.0f / result.duration; 529 replicaWins += result.numOfReplyFromReplica; 530 } 531 avgTPS *= 1000; // ms to second 532 avgLatency = avgLatency / results.length; 533 LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: " 534 + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms"); 535 LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency)); 536 LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second"); 537 if (opts.replicas > 1) { 538 LOG.info("[results from replica regions] " + replicaWins); 539 } 540 541 for (int i = 0; i < opts.connCount; i++) { 542 cons[i].close(); 543 asyncCons[i].close(); 544 } 545 546 return results; 547 } 548 549 /* 550 * Run a mapreduce job. Run as many maps as asked-for clients. 
Before we start up the job, write
   * out an input file with an instruction per client regarding which row they are to start on.
   * @param cmd Command to run.
   */
  static Job doMapReduce(TestOptions opts, final Configuration conf)
    throws IOException, InterruptedException, ClassNotFoundException {
    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
    assert cmd != null;
    Path inputDir = writeInputFile(conf, opts);
    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
    Job job = Job.getInstance(conf);
    job.setJarByClass(PerformanceEvaluation.class);
    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);

    job.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.setInputPaths(job, inputDir);
    // this is default, but be explicit about it just in case.
    NLineInputFormat.setNumLinesPerSplit(job, 1);

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(LongWritable.class);

    job.setMapperClass(EvaluationMapTask.class);
    job.setReducerClass(LongSumReducer.class);

    job.setNumReduceTasks(1);

    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Histogram.class, // yammer metrics
      Gson.class, // gson
      FilterAllFilter.class // hbase-server tests jar
    );

    TableMapReduceUtil.initCredentials(job);

    job.waitForCompletion(true);
    return job;
  }

  /**
   * Each client has one mapper to do the work; clients report their row counts from within the
   * map task.
   */

  static String JOB_INPUT_FILENAME = "input.txt";

  /*
   * Write input file of offsets-per-client for the mapreduce job.
   * @param c Configuration
   * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
   */
  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
    return writeInputFile(c, opts, new Path("."));
  }

  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
    throws IOException {
    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
    Path inputDir = new Path(jobdir, "inputs");

    FileSystem fs = FileSystem.get(c);
    fs.mkdirs(inputDir);

    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
    PrintStream out = new PrintStream(fs.create(inputFile));
    // Make input random.
    Map<Integer, String> m = new TreeMap<>();
    Hash h = MurmurHash.getInstance();
    int perClientRows = (opts.totalRows / opts.numClientThreads);
    try {
      for (int j = 0; j < opts.numClientThreads; j++) {
        TestOptions next = new TestOptions(opts);
        next.startRow = j * perClientRows;
        next.perClientRunRows = perClientRows;
        String s = GSON.toJson(next);
        LOG.info("Client=" + j + ", input=" + s);
        byte[] b = Bytes.toBytes(s);
        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
        m.put(hash, s);
      }
      for (Map.Entry<Integer, String> e : m.entrySet()) {
        out.println(e.getValue());
      }
    } finally {
      out.close();
    }
    return inputDir;
  }

  /**
   * Describes a command.
   */
  static class CmdDescriptor {
    private Class<? extends TestBase> cmdClass;
    private String name;
    private String description;

    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
      this.cmdClass = cmdClass;
      this.name = name;
      this.description = description;
    }

    public Class<? extends TestBase> getCmdClass() {
      return cmdClass;
    }

    public String getName() {
      return name;
    }

    public String getDescription() {
      return description;
    }
  }

  /**
   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes
   * tracking all these arguments a little easier. NOTE: When adding an option, you need to add a
   * data member, a getter/setter (to make JSON serialization of this TestOptions class behave),
   * and you need to add to the clone constructor below, copying your new option from the 'that'
   * to the 'this'. Look for 'clone' below.
   */
  static class TestOptions {
    String cmdName = null;
    boolean nomapred = false;
    boolean filterAll = false;
    int startRow = 0;
    float size = 1.0f;
    int perClientRunRows = DEFAULT_ROWS_PER_GB;
    int numClientThreads = 1;
    int totalRows = DEFAULT_ROWS_PER_GB;
    int measureAfter = 0;
    float sampleRate = 1.0f;
    /**
     * @deprecated Useless after switching to OpenTelemetry
     */
    @Deprecated
    double traceRate = 0.0;
    String tableName = TABLE_NAME;
    boolean flushCommits = true;
    boolean writeToWAL = true;
    boolean autoFlush = false;
    boolean oneCon = false;
    int connCount = -1; // will decide the actual number later
    boolean useTags = false;
    int noOfTags = 1;
    boolean reportLatency = false;
    int multiGet = 0;
    int multiPut = 0;
    int randomSleep = 0;
    boolean inMemoryCF = false;
    int presplitRegions = 0;
    int replicas = TableDescriptorBuilder.DEFAULT_REGION_REPLICATION;
    String splitPolicy = null;
    Compression.Algorithm compression = Compression.Algorithm.NONE;
    String encryption = null;
    BloomType bloomType = BloomType.ROW;
    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
    boolean valueRandom = false;
    boolean valueZipf = false;
    int valueSize = DEFAULT_VALUE_LENGTH;
    int period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10;
    int cycles = 1;
    int columns = 1;
    int families = 1;
    int caching = 30;
    int latencyThreshold = 0; // in milliseconds
    boolean addColumns = true;
    MemoryCompactionPolicy inMemoryCompaction =
      MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
    boolean asyncPrefetch = false;
    boolean cacheBlocks = true;
    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
    long bufferSize = 2l * 1024l * 1024l;

    public TestOptions() {
    }

    /**
     * Clone constructor.
     * @param that Object to copy from.
738 */ 739 public TestOptions(TestOptions that) { 740 this.cmdName = that.cmdName; 741 this.cycles = that.cycles; 742 this.nomapred = that.nomapred; 743 this.startRow = that.startRow; 744 this.size = that.size; 745 this.perClientRunRows = that.perClientRunRows; 746 this.numClientThreads = that.numClientThreads; 747 this.totalRows = that.totalRows; 748 this.sampleRate = that.sampleRate; 749 this.traceRate = that.traceRate; 750 this.tableName = that.tableName; 751 this.flushCommits = that.flushCommits; 752 this.writeToWAL = that.writeToWAL; 753 this.autoFlush = that.autoFlush; 754 this.oneCon = that.oneCon; 755 this.connCount = that.connCount; 756 this.useTags = that.useTags; 757 this.noOfTags = that.noOfTags; 758 this.reportLatency = that.reportLatency; 759 this.latencyThreshold = that.latencyThreshold; 760 this.multiGet = that.multiGet; 761 this.multiPut = that.multiPut; 762 this.inMemoryCF = that.inMemoryCF; 763 this.presplitRegions = that.presplitRegions; 764 this.replicas = that.replicas; 765 this.splitPolicy = that.splitPolicy; 766 this.compression = that.compression; 767 this.encryption = that.encryption; 768 this.blockEncoding = that.blockEncoding; 769 this.filterAll = that.filterAll; 770 this.bloomType = that.bloomType; 771 this.blockSize = that.blockSize; 772 this.valueRandom = that.valueRandom; 773 this.valueZipf = that.valueZipf; 774 this.valueSize = that.valueSize; 775 this.period = that.period; 776 this.randomSleep = that.randomSleep; 777 this.measureAfter = that.measureAfter; 778 this.addColumns = that.addColumns; 779 this.columns = that.columns; 780 this.families = that.families; 781 this.caching = that.caching; 782 this.inMemoryCompaction = that.inMemoryCompaction; 783 this.asyncPrefetch = that.asyncPrefetch; 784 this.cacheBlocks = that.cacheBlocks; 785 this.scanReadType = that.scanReadType; 786 this.bufferSize = that.bufferSize; 787 } 788 789 public int getCaching() { 790 return this.caching; 791 } 792 793 public void setCaching(final int caching) { 794 this.caching = caching; 795 } 796 797 public int getColumns() { 798 return this.columns; 799 } 800 801 public void setColumns(final int columns) { 802 this.columns = columns; 803 } 804 805 public int getFamilies() { 806 return this.families; 807 } 808 809 public void setFamilies(final int families) { 810 this.families = families; 811 } 812 813 public int getCycles() { 814 return this.cycles; 815 } 816 817 public void setCycles(final int cycles) { 818 this.cycles = cycles; 819 } 820 821 public boolean isValueZipf() { 822 return valueZipf; 823 } 824 825 public void setValueZipf(boolean valueZipf) { 826 this.valueZipf = valueZipf; 827 } 828 829 public String getCmdName() { 830 return cmdName; 831 } 832 833 public void setCmdName(String cmdName) { 834 this.cmdName = cmdName; 835 } 836 837 public int getRandomSleep() { 838 return randomSleep; 839 } 840 841 public void setRandomSleep(int randomSleep) { 842 this.randomSleep = randomSleep; 843 } 844 845 public int getReplicas() { 846 return replicas; 847 } 848 849 public void setReplicas(int replicas) { 850 this.replicas = replicas; 851 } 852 853 public String getSplitPolicy() { 854 return splitPolicy; 855 } 856 857 public void setSplitPolicy(String splitPolicy) { 858 this.splitPolicy = splitPolicy; 859 } 860 861 public void setNomapred(boolean nomapred) { 862 this.nomapred = nomapred; 863 } 864 865 public void setFilterAll(boolean filterAll) { 866 this.filterAll = filterAll; 867 } 868 869 public void setStartRow(int startRow) { 870 this.startRow = startRow; 871 } 872 873 public 
void setSize(float size) { 874 this.size = size; 875 } 876 877 public void setPerClientRunRows(int perClientRunRows) { 878 this.perClientRunRows = perClientRunRows; 879 } 880 881 public void setNumClientThreads(int numClientThreads) { 882 this.numClientThreads = numClientThreads; 883 } 884 885 public void setTotalRows(int totalRows) { 886 this.totalRows = totalRows; 887 } 888 889 public void setSampleRate(float sampleRate) { 890 this.sampleRate = sampleRate; 891 } 892 893 public void setTraceRate(double traceRate) { 894 this.traceRate = traceRate; 895 } 896 897 public void setTableName(String tableName) { 898 this.tableName = tableName; 899 } 900 901 public void setFlushCommits(boolean flushCommits) { 902 this.flushCommits = flushCommits; 903 } 904 905 public void setWriteToWAL(boolean writeToWAL) { 906 this.writeToWAL = writeToWAL; 907 } 908 909 public void setAutoFlush(boolean autoFlush) { 910 this.autoFlush = autoFlush; 911 } 912 913 public void setOneCon(boolean oneCon) { 914 this.oneCon = oneCon; 915 } 916 917 public int getConnCount() { 918 return connCount; 919 } 920 921 public void setConnCount(int connCount) { 922 this.connCount = connCount; 923 } 924 925 public void setUseTags(boolean useTags) { 926 this.useTags = useTags; 927 } 928 929 public void setNoOfTags(int noOfTags) { 930 this.noOfTags = noOfTags; 931 } 932 933 public void setReportLatency(boolean reportLatency) { 934 this.reportLatency = reportLatency; 935 } 936 937 public void setMultiGet(int multiGet) { 938 this.multiGet = multiGet; 939 } 940 941 public void setMultiPut(int multiPut) { 942 this.multiPut = multiPut; 943 } 944 945 public void setInMemoryCF(boolean inMemoryCF) { 946 this.inMemoryCF = inMemoryCF; 947 } 948 949 public void setPresplitRegions(int presplitRegions) { 950 this.presplitRegions = presplitRegions; 951 } 952 953 public void setCompression(Compression.Algorithm compression) { 954 this.compression = compression; 955 } 956 957 public void setEncryption(String encryption) { 958 this.encryption = encryption; 959 } 960 961 public void setBloomType(BloomType bloomType) { 962 this.bloomType = bloomType; 963 } 964 965 public void setBlockSize(int blockSize) { 966 this.blockSize = blockSize; 967 } 968 969 public void setBlockEncoding(DataBlockEncoding blockEncoding) { 970 this.blockEncoding = blockEncoding; 971 } 972 973 public void setValueRandom(boolean valueRandom) { 974 this.valueRandom = valueRandom; 975 } 976 977 public void setValueSize(int valueSize) { 978 this.valueSize = valueSize; 979 } 980 981 public void setBufferSize(long bufferSize) { 982 this.bufferSize = bufferSize; 983 } 984 985 public void setPeriod(int period) { 986 this.period = period; 987 } 988 989 public boolean isNomapred() { 990 return nomapred; 991 } 992 993 public boolean isFilterAll() { 994 return filterAll; 995 } 996 997 public int getStartRow() { 998 return startRow; 999 } 1000 1001 public float getSize() { 1002 return size; 1003 } 1004 1005 public int getPerClientRunRows() { 1006 return perClientRunRows; 1007 } 1008 1009 public int getNumClientThreads() { 1010 return numClientThreads; 1011 } 1012 1013 public int getTotalRows() { 1014 return totalRows; 1015 } 1016 1017 public float getSampleRate() { 1018 return sampleRate; 1019 } 1020 1021 public double getTraceRate() { 1022 return traceRate; 1023 } 1024 1025 public String getTableName() { 1026 return tableName; 1027 } 1028 1029 public boolean isFlushCommits() { 1030 return flushCommits; 1031 } 1032 1033 public boolean isWriteToWAL() { 1034 return writeToWAL; 1035 } 1036 1037 
public boolean isAutoFlush() { 1038 return autoFlush; 1039 } 1040 1041 public boolean isUseTags() { 1042 return useTags; 1043 } 1044 1045 public int getNoOfTags() { 1046 return noOfTags; 1047 } 1048 1049 public boolean isReportLatency() { 1050 return reportLatency; 1051 } 1052 1053 public int getMultiGet() { 1054 return multiGet; 1055 } 1056 1057 public int getMultiPut() { 1058 return multiPut; 1059 } 1060 1061 public boolean isInMemoryCF() { 1062 return inMemoryCF; 1063 } 1064 1065 public int getPresplitRegions() { 1066 return presplitRegions; 1067 } 1068 1069 public Compression.Algorithm getCompression() { 1070 return compression; 1071 } 1072 1073 public String getEncryption() { 1074 return encryption; 1075 } 1076 1077 public DataBlockEncoding getBlockEncoding() { 1078 return blockEncoding; 1079 } 1080 1081 public boolean isValueRandom() { 1082 return valueRandom; 1083 } 1084 1085 public int getValueSize() { 1086 return valueSize; 1087 } 1088 1089 public int getPeriod() { 1090 return period; 1091 } 1092 1093 public BloomType getBloomType() { 1094 return bloomType; 1095 } 1096 1097 public int getBlockSize() { 1098 return blockSize; 1099 } 1100 1101 public boolean isOneCon() { 1102 return oneCon; 1103 } 1104 1105 public int getMeasureAfter() { 1106 return measureAfter; 1107 } 1108 1109 public void setMeasureAfter(int measureAfter) { 1110 this.measureAfter = measureAfter; 1111 } 1112 1113 public boolean getAddColumns() { 1114 return addColumns; 1115 } 1116 1117 public void setAddColumns(boolean addColumns) { 1118 this.addColumns = addColumns; 1119 } 1120 1121 public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) { 1122 this.inMemoryCompaction = inMemoryCompaction; 1123 } 1124 1125 public MemoryCompactionPolicy getInMemoryCompaction() { 1126 return this.inMemoryCompaction; 1127 } 1128 1129 public long getBufferSize() { 1130 return this.bufferSize; 1131 } 1132 } 1133 1134 /* 1135 * A test. Subclass to particularize what happens per row. 1136 */ 1137 static abstract class TestBase { 1138 // Below is make it so when Tests are all running in the one 1139 // jvm, that they each have a differently seeded Random. 1140 private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime()); 1141 1142 private static long nextRandomSeed() { 1143 return randomSeed.nextLong(); 1144 } 1145 1146 private final int everyN; 1147 1148 protected final Random rand = new Random(nextRandomSeed()); 1149 protected final Configuration conf; 1150 protected final TestOptions opts; 1151 1152 private final Status status; 1153 1154 private String testName; 1155 private Histogram latencyHistogram; 1156 private Histogram replicaLatencyHistogram; 1157 private Histogram valueSizeHistogram; 1158 private Histogram rpcCallsHistogram; 1159 private Histogram remoteRpcCallsHistogram; 1160 private Histogram millisBetweenNextHistogram; 1161 private Histogram regionsScannedHistogram; 1162 private Histogram bytesInResultsHistogram; 1163 private Histogram bytesInRemoteResultsHistogram; 1164 private RandomDistribution.Zipf zipf; 1165 private long numOfReplyOverLatencyThreshold = 0; 1166 private long numOfReplyFromReplica = 0; 1167 1168 /** 1169 * Note that all subclasses of this class must provide a public constructor that has the exact 1170 * same list of arguments. 
1171 */ 1172 TestBase(final Configuration conf, final TestOptions options, final Status status) { 1173 this.conf = conf; 1174 this.opts = options; 1175 this.status = status; 1176 this.testName = this.getClass().getSimpleName(); 1177 everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate)); 1178 if (options.isValueZipf()) { 1179 this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2); 1180 } 1181 LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows."); 1182 } 1183 1184 int getValueLength(final Random r) { 1185 if (this.opts.isValueRandom()) { 1186 return r.nextInt(opts.valueSize); 1187 } else if (this.opts.isValueZipf()) { 1188 return Math.abs(this.zipf.nextInt()); 1189 } else { 1190 return opts.valueSize; 1191 } 1192 } 1193 1194 void updateValueSize(final Result[] rs) throws IOException { 1195 updateValueSize(rs, 0); 1196 } 1197 1198 void updateValueSize(final Result[] rs, final long latency) throws IOException { 1199 if (rs == null || (latency == 0)) return; 1200 for (Result r : rs) 1201 updateValueSize(r, latency); 1202 } 1203 1204 void updateValueSize(final Result r) throws IOException { 1205 updateValueSize(r, 0); 1206 } 1207 1208 void updateValueSize(final Result r, final long latency) throws IOException { 1209 if (r == null || (latency == 0)) return; 1210 int size = 0; 1211 // update replicaHistogram 1212 if (r.isStale()) { 1213 replicaLatencyHistogram.update(latency / 1000); 1214 numOfReplyFromReplica++; 1215 } 1216 if (!isRandomValueSize()) return; 1217 1218 for (CellScanner scanner = r.cellScanner(); scanner.advance();) { 1219 size += scanner.current().getValueLength(); 1220 } 1221 updateValueSize(size); 1222 } 1223 1224 void updateValueSize(final int valueSize) { 1225 if (!isRandomValueSize()) return; 1226 this.valueSizeHistogram.update(valueSize); 1227 } 1228 1229 void updateScanMetrics(final ScanMetrics metrics) { 1230 if (metrics == null) return; 1231 Map<String, Long> metricsMap = metrics.getMetricsMap(); 1232 Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); 1233 if (rpcCalls != null) { 1234 this.rpcCallsHistogram.update(rpcCalls.longValue()); 1235 } 1236 Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME); 1237 if (remoteRpcCalls != null) { 1238 this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue()); 1239 } 1240 Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME); 1241 if (millisBetweenNext != null) { 1242 this.millisBetweenNextHistogram.update(millisBetweenNext.longValue()); 1243 } 1244 Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); 1245 if (regionsScanned != null) { 1246 this.regionsScannedHistogram.update(regionsScanned.longValue()); 1247 } 1248 Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME); 1249 if (bytesInResults != null && bytesInResults.longValue() > 0) { 1250 this.bytesInResultsHistogram.update(bytesInResults.longValue()); 1251 } 1252 Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME); 1253 if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) { 1254 this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue()); 1255 } 1256 } 1257 1258 String generateStatus(final int sr, final int i, final int lr) { 1259 return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency [" 1260 + getShortLatencyReport() + "]" 1261 + (!isRandomValueSize() ? 
"" : ", value size [" + getShortValueSizeReport() + "]"); 1262 } 1263 1264 boolean isRandomValueSize() { 1265 return opts.valueRandom; 1266 } 1267 1268 protected int getReportingPeriod() { 1269 return opts.period; 1270 } 1271 1272 /** 1273 * Populated by testTakedown. Only implemented by RandomReadTest at the moment. 1274 */ 1275 public Histogram getLatencyHistogram() { 1276 return latencyHistogram; 1277 } 1278 1279 void testSetup() throws IOException { 1280 // test metrics 1281 latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1282 // If it is a replica test, set up histogram for replica. 1283 if (opts.replicas > 1) { 1284 replicaLatencyHistogram = 1285 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1286 } 1287 valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1288 // scan metrics 1289 rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1290 remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1291 millisBetweenNextHistogram = 1292 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1293 regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1294 bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1295 bytesInRemoteResultsHistogram = 1296 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1297 1298 onStartup(); 1299 } 1300 1301 abstract void onStartup() throws IOException; 1302 1303 void testTakedown() throws IOException { 1304 onTakedown(); 1305 // Print all stats for this thread continuously. 1306 // Synchronize on Test.class so different threads don't intermingle the 1307 // output. We can't use 'this' here because each thread has its own instance of Test class. 
      synchronized (Test.class) {
        status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
        status
          .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram));
        if (opts.replicas > 1) {
          status.setStatus("Latency (us) from Replica Regions: "
            + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram));
        }
        status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
        if (valueSizeHistogram.getCount() > 0) {
          status.setStatus(
            "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
          status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
          status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
        } else {
          status.setStatus("No valueSize statistics available");
        }
        if (rpcCallsHistogram.getCount() > 0) {
          status.setStatus(
            "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
        }
        if (remoteRpcCallsHistogram.getCount() > 0) {
          status.setStatus("remoteRpcCalls (count): "
            + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
        }
        if (millisBetweenNextHistogram.getCount() > 0) {
          status.setStatus("millisBetweenNext (latency): "
            + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
        }
        if (regionsScannedHistogram.getCount() > 0) {
          status.setStatus("regionsScanned (count): "
            + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
        }
        if (bytesInResultsHistogram.getCount() > 0) {
          status.setStatus("bytesInResults (size): "
            + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
        }
        if (bytesInRemoteResultsHistogram.getCount() > 0) {
          status.setStatus("bytesInRemoteResults (size): "
            + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
        }
      }
    }

    abstract void onTakedown() throws IOException;

    /*
     * Run test
     * @return Elapsed time.
     */
    long test() throws IOException, InterruptedException {
      testSetup();
      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
      final long startTime = System.nanoTime();
      try {
        testTimed();
      } finally {
        testTakedown();
      }
      return (System.nanoTime() - startTime) / 1000000;
    }

    int getStartRow() {
      return opts.startRow;
    }

    int getLastRow() {
      return getStartRow() + opts.perClientRunRows;
    }

    /**
     * Provides an extension point for tests that don't want a per row invocation.
     */
    void testTimed() throws IOException, InterruptedException {
      int startRow = getStartRow();
      int lastRow = getLastRow();
      // Report on completion of 1/10th of total.
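      // Iterate opts.cycles times. Within each cycle, walk rows [startRow, lastRow) and issue a
      // request only for every everyN-th row (per --sampleRate); latency is recorded only after
      // opts.measureAfter rows, and only when a request was actually sent (see multiGet/multiPut).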
1386 for (int ii = 0; ii < opts.cycles; ii++) { 1387 if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles); 1388 for (int i = startRow; i < lastRow; i++) { 1389 if (i % everyN != 0) continue; 1390 long startTime = System.nanoTime(); 1391 boolean requestSent = false; 1392 Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan(); 1393 try (Scope scope = span.makeCurrent()) { 1394 requestSent = testRow(i, startTime); 1395 } finally { 1396 span.end(); 1397 } 1398 if ((i - startRow) > opts.measureAfter) { 1399 // If multiget or multiput is enabled, say set to 10, testRow() returns immediately 1400 // first 9 times and sends the actual get request in the 10th iteration. 1401 // We should only set latency when actual request is sent because otherwise 1402 // it turns out to be 0. 1403 if (requestSent) { 1404 long latency = (System.nanoTime() - startTime) / 1000; 1405 latencyHistogram.update(latency); 1406 if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) { 1407 numOfReplyOverLatencyThreshold++; 1408 } 1409 } 1410 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { 1411 status.setStatus(generateStatus(startRow, i, lastRow)); 1412 } 1413 } 1414 } 1415 } 1416 } 1417 1418 /** 1419 * @return Subset of the histograms' calculation. 1420 */ 1421 public String getShortLatencyReport() { 1422 return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram); 1423 } 1424 1425 /** 1426 * @return Subset of the histograms' calculation. 1427 */ 1428 public String getShortValueSizeReport() { 1429 return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram); 1430 } 1431 1432 /** 1433 * Test for individual row. 1434 * @param i Row index. 1435 * @return true if the row was sent to server and need to record metrics. False if not, multiGet 1436 * and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered. 1437 */ 1438 abstract boolean testRow(final int i, final long startTime) 1439 throws IOException, InterruptedException; 1440 } 1441 1442 static abstract class Test extends TestBase { 1443 protected Connection connection; 1444 1445 Test(final Connection con, final TestOptions options, final Status status) { 1446 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); 1447 this.connection = con; 1448 } 1449 } 1450 1451 static abstract class AsyncTest extends TestBase { 1452 protected AsyncConnection connection; 1453 1454 AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) { 1455 super(con == null ? 
HBaseConfiguration.create() : con.getConfiguration(), options, status); 1456 this.connection = con; 1457 } 1458 } 1459 1460 static abstract class TableTest extends Test { 1461 protected Table table; 1462 1463 TableTest(Connection con, TestOptions options, Status status) { 1464 super(con, options, status); 1465 } 1466 1467 @Override 1468 void onStartup() throws IOException { 1469 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1470 } 1471 1472 @Override 1473 void onTakedown() throws IOException { 1474 table.close(); 1475 } 1476 } 1477 1478 /* 1479 * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest 1480 */ 1481 static abstract class MetaTest extends TableTest { 1482 protected int keyLength; 1483 1484 MetaTest(Connection con, TestOptions options, Status status) { 1485 super(con, options, status); 1486 keyLength = Integer.toString(opts.perClientRunRows).length(); 1487 } 1488 1489 @Override 1490 void onTakedown() throws IOException { 1491 // No clean up 1492 } 1493 1494 /* 1495 * Generates Lexicographically ascending strings 1496 */ 1497 protected byte[] getSplitKey(final int i) { 1498 return Bytes.toBytes(String.format("%0" + keyLength + "d", i)); 1499 } 1500 1501 } 1502 1503 static abstract class AsyncTableTest extends AsyncTest { 1504 protected AsyncTable<?> table; 1505 1506 AsyncTableTest(AsyncConnection con, TestOptions options, Status status) { 1507 super(con, options, status); 1508 } 1509 1510 @Override 1511 void onStartup() throws IOException { 1512 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1513 } 1514 1515 @Override 1516 void onTakedown() throws IOException { 1517 } 1518 } 1519 1520 static class AsyncRandomReadTest extends AsyncTableTest { 1521 private final Consistency consistency; 1522 private ArrayList<Get> gets; 1523 1524 AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) { 1525 super(con, options, status); 1526 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1527 if (opts.multiGet > 0) { 1528 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1529 this.gets = new ArrayList<>(opts.multiGet); 1530 } 1531 } 1532 1533 @Override 1534 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1535 if (opts.randomSleep > 0) { 1536 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 1537 } 1538 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1539 for (int family = 0; family < opts.families; family++) { 1540 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1541 if (opts.addColumns) { 1542 for (int column = 0; column < opts.columns; column++) { 1543 byte[] qualifier = column == 0 ? 
COLUMN_ZERO : Bytes.toBytes("" + column); 1544 get.addColumn(familyName, qualifier); 1545 } 1546 } else { 1547 get.addFamily(familyName); 1548 } 1549 } 1550 if (opts.filterAll) { 1551 get.setFilter(new FilterAllFilter()); 1552 } 1553 get.setConsistency(consistency); 1554 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1555 try { 1556 if (opts.multiGet > 0) { 1557 this.gets.add(get); 1558 if (this.gets.size() == opts.multiGet) { 1559 Result[] rs = 1560 this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new); 1561 updateValueSize(rs); 1562 this.gets.clear(); 1563 } else { 1564 return false; 1565 } 1566 } else { 1567 updateValueSize(this.table.get(get).get()); 1568 } 1569 } catch (ExecutionException e) { 1570 throw new IOException(e); 1571 } 1572 return true; 1573 } 1574 1575 public static RuntimeException runtime(Throwable e) { 1576 if (e instanceof RuntimeException) { 1577 return (RuntimeException) e; 1578 } 1579 return new RuntimeException(e); 1580 } 1581 1582 public static <V> V propagate(Callable<V> callable) { 1583 try { 1584 return callable.call(); 1585 } catch (Exception e) { 1586 throw runtime(e); 1587 } 1588 } 1589 1590 @Override 1591 protected int getReportingPeriod() { 1592 int period = opts.perClientRunRows / 10; 1593 return period == 0 ? opts.perClientRunRows : period; 1594 } 1595 1596 @Override 1597 protected void testTakedown() throws IOException { 1598 if (this.gets != null && this.gets.size() > 0) { 1599 this.table.get(gets); 1600 this.gets.clear(); 1601 } 1602 super.testTakedown(); 1603 } 1604 } 1605 1606 static class AsyncRandomWriteTest extends AsyncSequentialWriteTest { 1607 1608 AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) { 1609 super(con, options, status); 1610 } 1611 1612 @Override 1613 protected byte[] generateRow(final int i) { 1614 return getRandomRow(this.rand, opts.totalRows); 1615 } 1616 } 1617 1618 static class AsyncScanTest extends AsyncTableTest { 1619 private ResultScanner testScanner; 1620 private AsyncTable<?> asyncTable; 1621 1622 AsyncScanTest(AsyncConnection con, TestOptions options, Status status) { 1623 super(con, options, status); 1624 } 1625 1626 @Override 1627 void onStartup() throws IOException { 1628 this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName), 1629 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); 1630 } 1631 1632 @Override 1633 void testTakedown() throws IOException { 1634 if (this.testScanner != null) { 1635 updateScanMetrics(this.testScanner.getScanMetrics()); 1636 this.testScanner.close(); 1637 } 1638 super.testTakedown(); 1639 } 1640 1641 @Override 1642 boolean testRow(final int i, final long startTime) throws IOException { 1643 if (this.testScanner == null) { 1644 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 1645 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1646 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1647 for (int family = 0; family < opts.families; family++) { 1648 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1649 if (opts.addColumns) { 1650 for (int column = 0; column < opts.columns; column++) { 1651 byte[] qualifier = column == 0 ? 
COLUMN_ZERO : Bytes.toBytes("" + column); 1652 scan.addColumn(familyName, qualifier); 1653 } 1654 } else { 1655 scan.addFamily(familyName); 1656 } 1657 } 1658 if (opts.filterAll) { 1659 scan.setFilter(new FilterAllFilter()); 1660 } 1661 this.testScanner = asyncTable.getScanner(scan); 1662 } 1663 Result r = testScanner.next(); 1664 updateValueSize(r); 1665 return true; 1666 } 1667 } 1668 1669 static class AsyncSequentialReadTest extends AsyncTableTest { 1670 AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) { 1671 super(con, options, status); 1672 } 1673 1674 @Override 1675 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1676 Get get = new Get(format(i)); 1677 for (int family = 0; family < opts.families; family++) { 1678 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1679 if (opts.addColumns) { 1680 for (int column = 0; column < opts.columns; column++) { 1681 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1682 get.addColumn(familyName, qualifier); 1683 } 1684 } else { 1685 get.addFamily(familyName); 1686 } 1687 } 1688 if (opts.filterAll) { 1689 get.setFilter(new FilterAllFilter()); 1690 } 1691 try { 1692 updateValueSize(table.get(get).get()); 1693 } catch (ExecutionException e) { 1694 throw new IOException(e); 1695 } 1696 return true; 1697 } 1698 } 1699 1700 static class AsyncSequentialWriteTest extends AsyncTableTest { 1701 private ArrayList<Put> puts; 1702 1703 AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) { 1704 super(con, options, status); 1705 if (opts.multiPut > 0) { 1706 LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + "."); 1707 this.puts = new ArrayList<>(opts.multiPut); 1708 } 1709 } 1710 1711 protected byte[] generateRow(final int i) { 1712 return format(i); 1713 } 1714 1715 @Override 1716 @SuppressWarnings("ReturnValueIgnored") 1717 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1718 byte[] row = generateRow(i); 1719 Put put = new Put(row); 1720 for (int family = 0; family < opts.families; family++) { 1721 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1722 for (int column = 0; column < opts.columns; column++) { 1723 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1724 byte[] value = generateData(this.rand, getValueLength(this.rand)); 1725 if (opts.useTags) { 1726 byte[] tag = generateData(this.rand, TAG_LENGTH); 1727 Tag[] tags = new Tag[opts.noOfTags]; 1728 for (int n = 0; n < opts.noOfTags; n++) { 1729 Tag t = new ArrayBackedTag((byte) n, tag); 1730 tags[n] = t; 1731 } 1732 KeyValue kv = 1733 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 1734 put.add(kv); 1735 updateValueSize(kv.getValueLength()); 1736 } else { 1737 put.addColumn(familyName, qualifier, value); 1738 updateValueSize(value.length); 1739 } 1740 } 1741 } 1742 put.setDurability(opts.writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL); 1743 try { 1744 // Batch the put when multiPut is enabled; otherwise write it synchronously below. 1745 if (opts.multiPut > 0) { 1746 this.puts.add(put); 1747 if (this.puts.size() == opts.multiPut) { 1748 this.table.put(puts).forEach(f -> AsyncRandomReadTest.propagate(f::get)); 1749 this.puts.clear(); 1750 } else { 1751 return false; 1752 } 1753 } else { 1754 table.put(put).get(); 1755 } 1756 } catch (ExecutionException e) { 1757 throw new IOException(e); 1758 } 1759 return true; 1760 } 1761 } 1762 1763 static abstract class BufferedMutatorTest extends Test { 1764 protected BufferedMutator mutator; 1765 protected Table table; 1766 1767 BufferedMutatorTest(Connection con, TestOptions options, Status status) { 1768 super(con, options, status); 1769 } 1770 1771 @Override 1772 void onStartup() throws IOException { 1773 BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName)); 1774 p.writeBufferSize(opts.bufferSize); 1775 this.mutator = connection.getBufferedMutator(p); 1776 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1777 } 1778 1779 @Override 1780 void onTakedown() throws IOException { 1781 mutator.close(); 1782 table.close(); 1783 } 1784 } 1785 1786 static class RandomSeekScanTest extends TableTest { 1787 RandomSeekScanTest(Connection con, TestOptions options, Status status) { 1788 super(con, options, status); 1789 } 1790 1791 @Override 1792 boolean testRow(final int i, final long startTime) throws IOException { 1793 Scan scan = 1794 new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)).setCaching(opts.caching) 1795 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1796 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1797 FilterList list = new FilterList(); 1798 for (int family = 0; family < opts.families; family++) { 1799 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1800 if (opts.addColumns) { 1801 for (int column = 0; column < opts.columns; column++) { 1802 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1803 scan.addColumn(familyName, qualifier); 1804 } 1805 } else { 1806 scan.addFamily(familyName); 1807 } 1808 } 1809 if (opts.filterAll) { 1810 list.addFilter(new FilterAllFilter()); 1811 } 1812 list.addFilter(new WhileMatchFilter(new PageFilter(120))); 1813 scan.setFilter(list); 1814 ResultScanner s = this.table.getScanner(scan); 1815 try { 1816 for (Result rr; (rr = s.next()) != null;) { 1817 updateValueSize(rr); 1818 } 1819 } finally { 1820 updateScanMetrics(s.getScanMetrics()); 1821 s.close(); 1822 } 1823 return true; 1824 } 1825 1826 @Override 1827 protected int getReportingPeriod() { 1828 int period = opts.perClientRunRows / 100; 1829 return period == 0 ?
opts.perClientRunRows : period; 1830 } 1831 1832 } 1833 1834 static abstract class RandomScanWithRangeTest extends TableTest { 1835 RandomScanWithRangeTest(Connection con, TestOptions options, Status status) { 1836 super(con, options, status); 1837 } 1838 1839 @Override 1840 boolean testRow(final int i, final long startTime) throws IOException { 1841 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); 1842 Scan scan = new Scan().withStartRow(startAndStopRow.getFirst()) 1843 .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching) 1844 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1845 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1846 for (int family = 0; family < opts.families; family++) { 1847 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1848 if (opts.addColumns) { 1849 for (int column = 0; column < opts.columns; column++) { 1850 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1851 scan.addColumn(familyName, qualifier); 1852 } 1853 } else { 1854 scan.addFamily(familyName); 1855 } 1856 } 1857 if (opts.filterAll) { 1858 scan.setFilter(new FilterAllFilter()); 1859 } 1860 Result r = null; 1861 int count = 0; 1862 ResultScanner s = this.table.getScanner(scan); 1863 try { 1864 for (; (r = s.next()) != null;) { 1865 updateValueSize(r); 1866 count++; 1867 } 1868 if (i % 100 == 0) { 1869 LOG.info(String.format("Scan for key range %s - %s returned %s rows", 1870 Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()), 1871 count)); 1872 } 1873 } finally { 1874 updateScanMetrics(s.getScanMetrics()); 1875 s.close(); 1876 } 1877 return true; 1878 } 1879 1880 protected abstract Pair<byte[], byte[]> getStartAndStopRow(); 1881 1882 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) { 1883 int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows; 1884 int stop = start + maxRange; 1885 return new Pair<>(format(start), format(stop)); 1886 } 1887 1888 @Override 1889 protected int getReportingPeriod() { 1890 int period = opts.perClientRunRows / 100; 1891 return period == 0 ? 
opts.perClientRunRows : period; 1892 } 1893 } 1894 1895 static class RandomScanWithRange10Test extends RandomScanWithRangeTest { 1896 RandomScanWithRange10Test(Connection con, TestOptions options, Status status) { 1897 super(con, options, status); 1898 } 1899 1900 @Override 1901 protected Pair<byte[], byte[]> getStartAndStopRow() { 1902 return generateStartAndStopRows(10); 1903 } 1904 } 1905 1906 static class RandomScanWithRange100Test extends RandomScanWithRangeTest { 1907 RandomScanWithRange100Test(Connection con, TestOptions options, Status status) { 1908 super(con, options, status); 1909 } 1910 1911 @Override 1912 protected Pair<byte[], byte[]> getStartAndStopRow() { 1913 return generateStartAndStopRows(100); 1914 } 1915 } 1916 1917 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest { 1918 RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) { 1919 super(con, options, status); 1920 } 1921 1922 @Override 1923 protected Pair<byte[], byte[]> getStartAndStopRow() { 1924 return generateStartAndStopRows(1000); 1925 } 1926 } 1927 1928 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest { 1929 RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) { 1930 super(con, options, status); 1931 } 1932 1933 @Override 1934 protected Pair<byte[], byte[]> getStartAndStopRow() { 1935 return generateStartAndStopRows(10000); 1936 } 1937 } 1938 1939 static class RandomReadTest extends TableTest { 1940 private final Consistency consistency; 1941 private ArrayList<Get> gets; 1942 1943 RandomReadTest(Connection con, TestOptions options, Status status) { 1944 super(con, options, status); 1945 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1946 if (opts.multiGet > 0) { 1947 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1948 this.gets = new ArrayList<>(opts.multiGet); 1949 } 1950 } 1951 1952 @Override 1953 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1954 if (opts.randomSleep > 0) { 1955 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 1956 } 1957 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1958 for (int family = 0; family < opts.families; family++) { 1959 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1960 if (opts.addColumns) { 1961 for (int column = 0; column < opts.columns; column++) { 1962 byte[] qualifier = column == 0 ? 
COLUMN_ZERO : Bytes.toBytes("" + column); 1963 get.addColumn(familyName, qualifier); 1964 } 1965 } else { 1966 get.addFamily(familyName); 1967 } 1968 } 1969 if (opts.filterAll) { 1970 get.setFilter(new FilterAllFilter()); 1971 } 1972 get.setConsistency(consistency); 1973 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1974 if (opts.multiGet > 0) { 1975 this.gets.add(get); 1976 if (this.gets.size() == opts.multiGet) { 1977 Result[] rs = this.table.get(this.gets); 1978 if (opts.replicas > 1) { 1979 long latency = System.nanoTime() - startTime; 1980 updateValueSize(rs, latency); 1981 } else { 1982 updateValueSize(rs); 1983 } 1984 this.gets.clear(); 1985 } else { 1986 return false; 1987 } 1988 } else { 1989 if (opts.replicas > 1) { 1990 Result r = this.table.get(get); 1991 long latency = System.nanoTime() - startTime; 1992 updateValueSize(r, latency); 1993 } else { 1994 updateValueSize(this.table.get(get)); 1995 } 1996 } 1997 return true; 1998 } 1999 2000 @Override 2001 protected int getReportingPeriod() { 2002 int period = opts.perClientRunRows / 10; 2003 return period == 0 ? opts.perClientRunRows : period; 2004 } 2005 2006 @Override 2007 protected void testTakedown() throws IOException { 2008 if (this.gets != null && this.gets.size() > 0) { 2009 this.table.get(gets); 2010 this.gets.clear(); 2011 } 2012 super.testTakedown(); 2013 } 2014 } 2015 2016 /* 2017 * Send random reads against fake regions inserted by MetaWriteTest 2018 */ 2019 static class MetaRandomReadTest extends MetaTest { 2020 private RegionLocator regionLocator; 2021 2022 MetaRandomReadTest(Connection con, TestOptions options, Status status) { 2023 super(con, options, status); 2024 LOG.info("call getRegionLocation"); 2025 } 2026 2027 @Override 2028 void onStartup() throws IOException { 2029 super.onStartup(); 2030 this.regionLocator = connection.getRegionLocator(table.getName()); 2031 } 2032 2033 @Override 2034 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 2035 if (opts.randomSleep > 0) { 2036 Thread.sleep(rand.nextInt(opts.randomSleep)); 2037 } 2038 HRegionLocation hRegionLocation = 2039 regionLocator.getRegionLocation(getSplitKey(rand.nextInt(opts.perClientRunRows)), true); 2040 LOG.debug("get location for region: " + hRegionLocation); 2041 return true; 2042 } 2043 2044 @Override 2045 protected int getReportingPeriod() { 2046 int period = opts.perClientRunRows / 10; 2047 return period == 0 ? 
opts.perClientRunRows : period; 2048 } 2049 2050 @Override 2051 protected void testTakedown() throws IOException { 2052 super.testTakedown(); 2053 } 2054 } 2055 2056 static class RandomWriteTest extends SequentialWriteTest { 2057 RandomWriteTest(Connection con, TestOptions options, Status status) { 2058 super(con, options, status); 2059 } 2060 2061 @Override 2062 protected byte[] generateRow(final int i) { 2063 return getRandomRow(this.rand, opts.totalRows); 2064 } 2065 2066 } 2067 2068 static class ScanTest extends TableTest { 2069 private ResultScanner testScanner; 2070 2071 ScanTest(Connection con, TestOptions options, Status status) { 2072 super(con, options, status); 2073 } 2074 2075 @Override 2076 void testTakedown() throws IOException { 2077 if (this.testScanner != null) { 2078 this.testScanner.close(); 2079 } 2080 super.testTakedown(); 2081 } 2082 2083 @Override 2084 boolean testRow(final int i, final long startTime) throws IOException { 2085 if (this.testScanner == null) { 2086 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 2087 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 2088 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 2089 for (int family = 0; family < opts.families; family++) { 2090 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2091 if (opts.addColumns) { 2092 for (int column = 0; column < opts.columns; column++) { 2093 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2094 scan.addColumn(familyName, qualifier); 2095 } 2096 } else { 2097 scan.addFamily(familyName); 2098 } 2099 } 2100 if (opts.filterAll) { 2101 scan.setFilter(new FilterAllFilter()); 2102 } 2103 this.testScanner = table.getScanner(scan); 2104 } 2105 Result r = testScanner.next(); 2106 updateValueSize(r); 2107 return true; 2108 } 2109 } 2110 2111 /** 2112 * Base class for operations that are CAS-like; that read a value and then set it based off what 2113 * they read. In this category is increment, append, checkAndPut, etc. 2114 * <p> 2115 * These operations also want some concurrency going on. Usually when these tests run, they 2116 * operate in their own part of the key range. In CASTest, we will have them all overlap on the 2117 * same key space. We do this with our getStartRow and getLastRow overrides. 
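 * <p>
 * Concretely, {@code getStartRow()} below returns 0 and {@code getLastRow()} returns
 * {@code opts.perClientRunRows}, so (for example) ten CAS clients each running 100 rows all
 * contend on rows 0..99 instead of each working a disjoint 100-row slice.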
2118 */ 2119 static abstract class CASTableTest extends TableTest { 2120 private final byte[] qualifier; 2121 2122 CASTableTest(Connection con, TestOptions options, Status status) { 2123 super(con, options, status); 2124 qualifier = Bytes.toBytes(this.getClass().getSimpleName()); 2125 } 2126 2127 byte[] getQualifier() { 2128 return this.qualifier; 2129 } 2130 2131 @Override 2132 int getStartRow() { 2133 return 0; 2134 } 2135 2136 @Override 2137 int getLastRow() { 2138 return opts.perClientRunRows; 2139 } 2140 } 2141 2142 static class IncrementTest extends CASTableTest { 2143 IncrementTest(Connection con, TestOptions options, Status status) { 2144 super(con, options, status); 2145 } 2146 2147 @Override 2148 boolean testRow(final int i, final long startTime) throws IOException { 2149 Increment increment = new Increment(format(i)); 2150 // unlike checkAndXXX tests, which make most sense to do on a single value, 2151 // if multiple families are specified for an increment test we assume it is 2152 // meant to raise the work factor 2153 for (int family = 0; family < opts.families; family++) { 2154 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2155 increment.addColumn(familyName, getQualifier(), 1l); 2156 } 2157 updateValueSize(this.table.increment(increment)); 2158 return true; 2159 } 2160 } 2161 2162 static class AppendTest extends CASTableTest { 2163 AppendTest(Connection con, TestOptions options, Status status) { 2164 super(con, options, status); 2165 } 2166 2167 @Override 2168 boolean testRow(final int i, final long startTime) throws IOException { 2169 byte[] bytes = format(i); 2170 Append append = new Append(bytes); 2171 // unlike checkAndXXX tests, which make most sense to do on a single value, 2172 // if multiple families are specified for an append test we assume it is 2173 // meant to raise the work factor 2174 for (int family = 0; family < opts.families; family++) { 2175 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2176 append.addColumn(familyName, getQualifier(), bytes); 2177 } 2178 updateValueSize(this.table.append(append)); 2179 return true; 2180 } 2181 } 2182 2183 static class CheckAndMutateTest extends CASTableTest { 2184 CheckAndMutateTest(Connection con, TestOptions options, Status status) { 2185 super(con, options, status); 2186 } 2187 2188 @Override 2189 boolean testRow(final int i, final long startTime) throws IOException { 2190 final byte[] bytes = format(i); 2191 // checkAndXXX tests operate on only a single value 2192 // Put a known value so when we go to check it, it is there. 2193 Put put = new Put(bytes); 2194 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2195 this.table.put(put); 2196 RowMutations mutations = new RowMutations(bytes); 2197 mutations.add(put); 2198 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2199 .thenMutate(mutations); 2200 return true; 2201 } 2202 } 2203 2204 static class CheckAndPutTest extends CASTableTest { 2205 CheckAndPutTest(Connection con, TestOptions options, Status status) { 2206 super(con, options, status); 2207 } 2208 2209 @Override 2210 boolean testRow(final int i, final long startTime) throws IOException { 2211 final byte[] bytes = format(i); 2212 // checkAndXXX tests operate on only a single value 2213 // Put a known value so when we go to check it, it is there. 
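 // Since the value was just written above, the checkAndMutate below is expected to succeed;
 // this measures the round-trip cost of check-and-put rather than failed CAS attempts.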
2214 Put put = new Put(bytes); 2215 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2216 this.table.put(put); 2217 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2218 .thenPut(put); 2219 return true; 2220 } 2221 } 2222 2223 static class CheckAndDeleteTest extends CASTableTest { 2224 CheckAndDeleteTest(Connection con, TestOptions options, Status status) { 2225 super(con, options, status); 2226 } 2227 2228 @Override 2229 boolean testRow(final int i, final long startTime) throws IOException { 2230 final byte[] bytes = format(i); 2231 // checkAndXXX tests operate on only a single value 2232 // Put a known value so when we go to check it, it is there. 2233 Put put = new Put(bytes); 2234 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2235 this.table.put(put); 2236 Delete delete = new Delete(put.getRow()); 2237 delete.addColumn(FAMILY_ZERO, getQualifier()); 2238 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2239 .thenDelete(delete); 2240 return true; 2241 } 2242 } 2243 2244 /* 2245 * Delete all fake regions inserted to meta table by MetaWriteTest. 2246 */ 2247 static class CleanMetaTest extends MetaTest { 2248 CleanMetaTest(Connection con, TestOptions options, Status status) { 2249 super(con, options, status); 2250 } 2251 2252 @Override 2253 boolean testRow(final int i, final long startTime) throws IOException { 2254 try { 2255 RegionInfo regionInfo = connection.getRegionLocator(table.getName()) 2256 .getRegionLocation(getSplitKey(i), false).getRegion(); 2257 LOG.debug("deleting region from meta: " + regionInfo); 2258 2259 Delete delete = 2260 MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP); 2261 try (Table t = MetaTableAccessor.getMetaHTable(connection)) { 2262 t.delete(delete); 2263 } 2264 } catch (IOException ie) { 2265 // Log and continue 2266 LOG.error("cannot find region with start key: " + i); 2267 } 2268 return true; 2269 } 2270 } 2271 2272 static class SequentialReadTest extends TableTest { 2273 SequentialReadTest(Connection con, TestOptions options, Status status) { 2274 super(con, options, status); 2275 } 2276 2277 @Override 2278 boolean testRow(final int i, final long startTime) throws IOException { 2279 Get get = new Get(format(i)); 2280 for (int family = 0; family < opts.families; family++) { 2281 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2282 if (opts.addColumns) { 2283 for (int column = 0; column < opts.columns; column++) { 2284 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2285 get.addColumn(familyName, qualifier); 2286 } 2287 } else { 2288 get.addFamily(familyName); 2289 } 2290 } 2291 if (opts.filterAll) { 2292 get.setFilter(new FilterAllFilter()); 2293 } 2294 updateValueSize(table.get(get)); 2295 return true; 2296 } 2297 } 2298 2299 static class SequentialWriteTest extends BufferedMutatorTest { 2300 private ArrayList<Put> puts; 2301 2302 SequentialWriteTest(Connection con, TestOptions options, Status status) { 2303 super(con, options, status); 2304 if (opts.multiPut > 0) { 2305 LOG.info("MultiPut enabled. 
Sending PUTs in batches of " + opts.multiPut + "."); 2306 this.puts = new ArrayList<>(opts.multiPut); 2307 } 2308 } 2309 2310 protected byte[] generateRow(final int i) { 2311 return format(i); 2312 } 2313 2314 @Override 2315 boolean testRow(final int i, final long startTime) throws IOException { 2316 byte[] row = generateRow(i); 2317 Put put = new Put(row); 2318 for (int family = 0; family < opts.families; family++) { 2319 byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family); 2320 for (int column = 0; column < opts.columns; column++) { 2321 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2322 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2323 if (opts.useTags) { 2324 byte[] tag = generateData(this.rand, TAG_LENGTH); 2325 Tag[] tags = new Tag[opts.noOfTags]; 2326 for (int n = 0; n < opts.noOfTags; n++) { 2327 Tag t = new ArrayBackedTag((byte) n, tag); 2328 tags[n] = t; 2329 } 2330 KeyValue kv = 2331 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 2332 put.add(kv); 2333 updateValueSize(kv.getValueLength()); 2334 } else { 2335 put.addColumn(familyName, qualifier, value); 2336 updateValueSize(value.length); 2337 } 2338 } 2339 } 2340 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 2341 if (opts.autoFlush) { 2342 if (opts.multiPut > 0) { 2343 this.puts.add(put); 2344 if (this.puts.size() == opts.multiPut) { 2345 table.put(this.puts); 2346 this.puts.clear(); 2347 } else { 2348 return false; 2349 } 2350 } else { 2351 table.put(put); 2352 } 2353 } else { 2354 mutator.mutate(put); 2355 } 2356 return true; 2357 } 2358 } 2359 2360 /* 2361 * Insert fake regions into meta table with contiguous split keys. 2362 */ 2363 static class MetaWriteTest extends MetaTest { 2364 2365 MetaWriteTest(Connection con, TestOptions options, Status status) { 2366 super(con, options, status); 2367 } 2368 2369 @Override 2370 boolean testRow(final int i, final long startTime) throws IOException { 2371 List<RegionInfo> regionInfos = new ArrayList<RegionInfo>(); 2372 RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME)) 2373 .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build()); 2374 regionInfos.add(regionInfo); 2375 MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1); 2376 2377 // write the serverName columns 2378 MetaTableAccessor.updateRegionLocation(connection, regionInfo, 2379 ServerName.valueOf("localhost", 60010, rand.nextLong()), i, 2380 EnvironmentEdgeManager.currentTime()); 2381 return true; 2382 } 2383 } 2384 2385 static class FilteredScanTest extends TableTest { 2386 protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName()); 2387 2388 FilteredScanTest(Connection con, TestOptions options, Status status) { 2389 super(con, options, status); 2390 if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) { 2391 LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB 2392 + ". 
This could take a very long time."); 2393 } 2394 } 2395 2396 @Override 2397 boolean testRow(int i, final long startTime) throws IOException { 2398 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2399 Scan scan = constructScan(value); 2400 ResultScanner scanner = null; 2401 try { 2402 scanner = this.table.getScanner(scan); 2403 for (Result r = null; (r = scanner.next()) != null;) { 2404 updateValueSize(r); 2405 } 2406 } finally { 2407 if (scanner != null) { 2408 updateScanMetrics(scanner.getScanMetrics()); 2409 scanner.close(); 2410 } 2411 } 2412 return true; 2413 } 2414 2415 protected Scan constructScan(byte[] valuePrefix) throws IOException { 2416 FilterList list = new FilterList(); 2417 Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL, 2418 new BinaryComparator(valuePrefix)); 2419 list.addFilter(filter); 2420 if (opts.filterAll) { 2421 list.addFilter(new FilterAllFilter()); 2422 } 2423 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 2424 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 2425 .setScanMetricsEnabled(true); 2426 if (opts.addColumns) { 2427 for (int column = 0; column < opts.columns; column++) { 2428 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2429 scan.addColumn(FAMILY_ZERO, qualifier); 2430 } 2431 } else { 2432 scan.addFamily(FAMILY_ZERO); 2433 } 2434 scan.setFilter(list); 2435 return scan; 2436 } 2437 } 2438 2439 /** 2440 * Compute a throughput rate in MB/s. 2441 * @param rows Number of records consumed. 2442 * @param timeMs Time taken in milliseconds. 2443 * @return String value with label, i.e. '123.76 MB/s' 2444 */ 2445 private static String calculateMbps(int rows, long timeMs, final int valueSize, int families, 2446 int columns) { 2447 BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH 2448 + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families); 2449 BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT) 2450 .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT); 2451 return FMT.format(mbps) + " MB/s"; 2452 } 2453 2454 /* 2455 * Format passed integer. @param number the integer to format. @return Returns zero-prefixed 2456 * ROW_LENGTH-byte wide decimal version of the passed number (takes the absolute value in case the number is negative). 2457 */ 2458 public static byte[] format(final int number) { 2459 byte[] b = new byte[ROW_LENGTH]; 2460 int d = Math.abs(number); 2461 for (int i = b.length - 1; i >= 0; i--) { 2462 b[i] = (byte) ((d % 10) + '0'); 2463 d /= 10; 2464 } 2465 return b; 2466 } 2467 2468 /* 2469 * This method takes some time and is run inline while uploading data. For example, doing the 2470 * mapfile test, generation of the key and value consumes about 30% of CPU time. 2471 * @return Generated random value to insert into a table cell.
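 * The value is built from 8-byte runs, each filled with one random uppercase letter (A-Z), plus a
 * final run for the remaining bytes; this keeps generation cheap and the data compressible.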
2472 */ 2473 public static byte[] generateData(final Random r, int length) { 2474 byte[] b = new byte[length]; 2475 int i; 2476 2477 for (i = 0; i < (length - 8); i += 8) { 2478 b[i] = (byte) (65 + r.nextInt(26)); 2479 b[i + 1] = b[i]; 2480 b[i + 2] = b[i]; 2481 b[i + 3] = b[i]; 2482 b[i + 4] = b[i]; 2483 b[i + 5] = b[i]; 2484 b[i + 6] = b[i]; 2485 b[i + 7] = b[i]; 2486 } 2487 2488 byte a = (byte) (65 + r.nextInt(26)); 2489 for (; i < length; i++) { 2490 b[i] = a; 2491 } 2492 return b; 2493 } 2494 2495 static byte[] getRandomRow(final Random random, final int totalRows) { 2496 return format(generateRandomRow(random, totalRows)); 2497 } 2498 2499 static int generateRandomRow(final Random random, final int totalRows) { 2500 return random.nextInt(Integer.MAX_VALUE) % totalRows; 2501 } 2502 2503 static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf, 2504 Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status) 2505 throws IOException, InterruptedException { 2506 status.setStatus( 2507 "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows"); 2508 long totalElapsedTime; 2509 2510 final TestBase t; 2511 try { 2512 if (AsyncTest.class.isAssignableFrom(cmd)) { 2513 Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd; 2514 Constructor<? extends AsyncTest> constructor = 2515 newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class); 2516 t = constructor.newInstance(asyncCon, opts, status); 2517 } else { 2518 Class<? extends Test> newCmd = (Class<? extends Test>) cmd; 2519 Constructor<? extends Test> constructor = 2520 newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class); 2521 t = constructor.newInstance(con, opts, status); 2522 } 2523 } catch (NoSuchMethodException e) { 2524 throw new IllegalArgumentException("Invalid command class: " + cmd.getName() 2525 + ". It does not provide a constructor as described by " 2526 + "the javadoc comment. Available constructors are: " 2527 + Arrays.toString(cmd.getConstructors())); 2528 } catch (Exception e) { 2529 throw new IllegalStateException("Failed to construct command class", e); 2530 } 2531 totalElapsedTime = t.test(); 2532 2533 status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow 2534 + " for " + opts.perClientRunRows + " rows" + " (" 2535 + calculateMbps((int) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime, 2536 getAverageValueLength(opts), opts.families, opts.columns) 2537 + ")"); 2538 2539 return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold, 2540 t.numOfReplyFromReplica, t.getLatencyHistogram()); 2541 } 2542 2543 private static int getAverageValueLength(final TestOptions opts) { 2544 return opts.valueRandom ? opts.valueSize / 2 : opts.valueSize; 2545 } 2546 2547 private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) 2548 throws IOException, InterruptedException, ClassNotFoundException, ExecutionException { 2549 // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do 2550 // the TestOptions introspection for us and dump the output in a readable format. 
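 // A short-lived Connection/Admin pair is opened below just to run checkTable() against the
 // target table before launching either the local-client (--nomapred) or the MapReduce variant.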
2551 LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts)); 2552 Admin admin = null; 2553 Connection connection = null; 2554 try { 2555 connection = ConnectionFactory.createConnection(getConf()); 2556 admin = connection.getAdmin(); 2557 checkTable(admin, opts); 2558 } finally { 2559 if (admin != null) admin.close(); 2560 if (connection != null) connection.close(); 2561 } 2562 if (opts.nomapred) { 2563 doLocalClients(opts, getConf()); 2564 } else { 2565 doMapReduce(opts, getConf()); 2566 } 2567 } 2568 2569 protected void printUsage() { 2570 printUsage(PE_COMMAND_SHORTNAME, null); 2571 } 2572 2573 protected static void printUsage(final String message) { 2574 printUsage(PE_COMMAND_SHORTNAME, message); 2575 } 2576 2577 protected static void printUsageAndExit(final String message, final int exitCode) { 2578 printUsage(message); 2579 System.exit(exitCode); 2580 } 2581 2582 protected static void printUsage(final String shortName, final String message) { 2583 if (message != null && message.length() > 0) { 2584 System.err.println(message); 2585 } 2586 System.err.print("Usage: hbase " + shortName); 2587 System.err.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>"); 2588 System.err.println(); 2589 System.err.println("General Options:"); 2590 System.err.println( 2591 " nomapred Run multiple clients using threads " + "(rather than using mapreduce)"); 2592 System.err 2593 .println(" oneCon All the threads share the same connection. Default: False"); 2594 System.err.println(" connCount Number of connections all threads share. " 2595 + "For example, if set to 2, then all threads share 2 connections. " 2596 + "Default: depends on the oneCon parameter. If oneCon is set to true, then connCount=1; " 2597 + "if not, connCount=number of threads"); 2598 2599 System.err.println(" sampleRate Execute test on a sample of total " 2600 + "rows. Only supported by randomRead. Default: 1.0"); 2601 System.err.println(" period Report every 'period' rows: " 2602 + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10); 2603 System.err.println(" cycles How many times to cycle the test. Default: 1."); 2604 System.err.println( 2605 " traceRate Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0"); 2606 System.err.println(" latency Set to report operation latencies. Default: False"); 2607 System.err.println(" latencyThreshold Set to report number of operations with latency " 2608 + "over latencyThreshold, in milliseconds. Default: 0"); 2609 System.err.println(" measureAfter Start to measure the latency once 'measureAfter'" 2610 + " rows have been processed. Default: 0"); 2611 System.err 2612 .println(" valueSize Pass value size to use: Default: " + DEFAULT_OPTS.getValueSize()); 2613 System.err.println(" valueRandom Set if we should vary value size between 0 and " 2614 + "'valueSize'; set on read for stats on size: Default: Not set."); 2615 System.err.println(" blockEncoding Block encoding to use. Value should be one of " 2616 + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE"); 2617 System.err.println(); 2618 System.err.println("Table Creation / Write Tests:"); 2619 System.err.println(" table Alternate table name. Default: 'TestTable'"); 2620 System.err.println( 2621 " rows Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows() 2622 + ". 
In case of randomReads and randomSeekScans this could" 2623 + " be specified along with --size to specify the number of rows to be scanned within" 2624 + " the total range specified by the size."); 2625 System.err.println( 2626 " size Total size in GiB. Mutually exclusive with --rows for writes and scans" 2627 + ". But for randomReads and randomSeekScans, when you use size together with --rows," 2628 + " size specifies the end of the range and --rows" 2629 + " specifies the number of rows within that range. " + "Default: 1.0."); 2630 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); 2631 System.err.println(" encryption Encryption type to use (AES, ...). Default: 'NONE'"); 2632 System.err.println( 2633 " flushCommits Used to determine if the test should flush the table. " + "Default: false"); 2634 System.err.println(" valueZipf Set if we should vary value size between 0 and " 2635 + "'valueSize' in zipf form: Default: Not set."); 2636 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); 2637 System.err.println(" autoFlush Set autoFlush on htable. Default: False"); 2638 System.err.println(" multiPut Batch puts together into groups of N. Only supported " 2639 + "by write. If multiPut is bigger than 0, autoFlush must be set to true. Default: 0"); 2640 System.err.println(" presplit Create presplit table. If a table with the same name exists," 2641 + " it'll be deleted and recreated (instead of verifying count of its existing regions). " 2642 + "Recommended for accurate perf analysis (see guide). Default: disabled"); 2643 System.err.println( 2644 " usetags Writes tags along with KVs. Use with HFile V3. " + "Default: false"); 2645 System.err.println(" numoftags Specify the number of tags to be written per cell. " 2646 + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags); 2647 System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table."); 2648 System.err.println(" columns Columns to write per row. Default: 1"); 2649 System.err 2650 .println(" families Specify number of column families for the table. Default: 1"); 2651 System.err.println(); 2652 System.err.println("Read Tests:"); 2653 System.err.println(" filterAll Helps to filter out all the rows on the server side," 2654 + " thereby not returning anything back to the client. Helps to check the server-side" 2655 + " performance. Uses FilterAllFilter internally. "); 2656 System.err.println(" multiGet Batch gets together into groups of N. Only supported " 2657 + "by randomRead. Default: disabled"); 2658 System.err.println(" inmemory Tries to keep the HFiles of the CF " 2659 + "in memory as far as possible. It is not guaranteed that reads are always served " 2660 + "from memory. Default: false"); 2661 System.err 2662 .println(" bloomFilter Bloom filter type, one of " + Arrays.toString(BloomType.values())); 2663 System.err.println(" blockSize Blocksize to use when writing out hfiles. "); 2664 System.err 2665 .println(" inmemoryCompaction Makes the column family do in-memory flushes/compactions. " 2666 + "Uses the CompactingMemStore"); 2667 System.err.println(" addColumns Adds columns to scans/gets explicitly. Default: true"); 2668 System.err.println(" replicas Enable region replica testing. Default: 1."); 2669 System.err.println( 2670 " randomSleep Do a random sleep before each get, between 0 and the entered value. Default: 0"); 2671 System.err.println(" caching Scan caching to use. 
Default: 30"); 2672 System.err.println(" asyncPrefetch Enable asyncPrefetch for scan"); 2673 System.err.println(" cacheBlocks Set the cacheBlocks option for scan. Default: true"); 2674 System.err.println( 2675 " scanReadType Set the readType option for scan, stream/pread/default. Default: default"); 2676 System.err.println(" bufferSize Set the value of client side buffering. Default: 2MB"); 2677 System.err.println(); 2678 System.err.println(" Note: -D properties will be applied to the conf used. "); 2679 System.err.println(" For example: "); 2680 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true"); 2681 System.err.println(" -Dmapreduce.task.timeout=60000"); 2682 System.err.println(); 2683 System.err.println("Command:"); 2684 for (CmdDescriptor command : COMMANDS.values()) { 2685 System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription())); 2686 } 2687 System.err.println(); 2688 System.err.println("Args:"); 2689 System.err.println(" nclients Integer. Required. Total number of clients " 2690 + "(and HRegionServers) running. 1 <= value <= 500"); 2691 System.err.println("Examples:"); 2692 System.err.println(" To run a single client doing the default 1M sequentialWrites:"); 2693 System.err.println(" $ hbase " + shortName + " sequentialWrite 1"); 2694 System.err.println(" To run 10 clients doing increments over ten rows:"); 2695 System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10"); 2696 } 2697 2698 /** 2699 * Parse options passed in via an arguments array. Assumes that array has been split on 2700 * white-space and placed into a {@code Queue}. Any unknown arguments will remain in the queue at 2701 * the conclusion of this method call. It's up to the caller to deal with these unrecognized 2702 * arguments. 
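 * <p>
 * For example, {@code --nomapred --rows=100 sequentialWrite 1} is consumed completely, while a
 * leading {@code -h} (or anything starting with {@code --h}) is pushed back onto the queue so the
 * caller can see that parsing stopped early and print usage.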
2703 */ 2704 static TestOptions parseOpts(Queue<String> args) { 2705 TestOptions opts = new TestOptions(); 2706 2707 String cmd = null; 2708 while ((cmd = args.poll()) != null) { 2709 if (cmd.equals("-h") || cmd.startsWith("--h")) { 2710 // place item back onto queue so that caller knows parsing was incomplete 2711 args.add(cmd); 2712 break; 2713 } 2714 2715 final String nmr = "--nomapred"; 2716 if (cmd.startsWith(nmr)) { 2717 opts.nomapred = true; 2718 continue; 2719 } 2720 2721 final String rows = "--rows="; 2722 if (cmd.startsWith(rows)) { 2723 opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length())); 2724 continue; 2725 } 2726 2727 final String cycles = "--cycles="; 2728 if (cmd.startsWith(cycles)) { 2729 opts.cycles = Integer.parseInt(cmd.substring(cycles.length())); 2730 continue; 2731 } 2732 2733 final String sampleRate = "--sampleRate="; 2734 if (cmd.startsWith(sampleRate)) { 2735 opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); 2736 continue; 2737 } 2738 2739 final String table = "--table="; 2740 if (cmd.startsWith(table)) { 2741 opts.tableName = cmd.substring(table.length()); 2742 continue; 2743 } 2744 2745 final String startRow = "--startRow="; 2746 if (cmd.startsWith(startRow)) { 2747 opts.startRow = Integer.parseInt(cmd.substring(startRow.length())); 2748 continue; 2749 } 2750 2751 final String compress = "--compress="; 2752 if (cmd.startsWith(compress)) { 2753 opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); 2754 continue; 2755 } 2756 2757 final String encryption = "--encryption="; 2758 if (cmd.startsWith(encryption)) { 2759 opts.encryption = cmd.substring(encryption.length()); 2760 continue; 2761 } 2762 2763 final String traceRate = "--traceRate="; 2764 if (cmd.startsWith(traceRate)) { 2765 opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length())); 2766 continue; 2767 } 2768 2769 final String blockEncoding = "--blockEncoding="; 2770 if (cmd.startsWith(blockEncoding)) { 2771 opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); 2772 continue; 2773 } 2774 2775 final String flushCommits = "--flushCommits="; 2776 if (cmd.startsWith(flushCommits)) { 2777 opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); 2778 continue; 2779 } 2780 2781 final String writeToWAL = "--writeToWAL="; 2782 if (cmd.startsWith(writeToWAL)) { 2783 opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); 2784 continue; 2785 } 2786 2787 final String presplit = "--presplit="; 2788 if (cmd.startsWith(presplit)) { 2789 opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); 2790 continue; 2791 } 2792 2793 final String inMemory = "--inmemory="; 2794 if (cmd.startsWith(inMemory)) { 2795 opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); 2796 continue; 2797 } 2798 2799 final String autoFlush = "--autoFlush="; 2800 if (cmd.startsWith(autoFlush)) { 2801 opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length())); 2802 continue; 2803 } 2804 2805 final String onceCon = "--oneCon="; 2806 if (cmd.startsWith(onceCon)) { 2807 opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length())); 2808 continue; 2809 } 2810 2811 final String connCount = "--connCount="; 2812 if (cmd.startsWith(connCount)) { 2813 opts.connCount = Integer.parseInt(cmd.substring(connCount.length())); 2814 continue; 2815 } 2816 2817 final String latencyThreshold = "--latencyThreshold="; 2818 if (cmd.startsWith(latencyThreshold)) { 
2819 opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length())); 2820 continue; 2821 } 2822 2823 final String latency = "--latency"; 2824 if (cmd.startsWith(latency)) { 2825 opts.reportLatency = true; 2826 continue; 2827 } 2828 2829 final String multiGet = "--multiGet="; 2830 if (cmd.startsWith(multiGet)) { 2831 opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); 2832 continue; 2833 } 2834 2835 final String multiPut = "--multiPut="; 2836 if (cmd.startsWith(multiPut)) { 2837 opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length())); 2838 continue; 2839 } 2840 2841 final String useTags = "--usetags="; 2842 if (cmd.startsWith(useTags)) { 2843 opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); 2844 continue; 2845 } 2846 2847 final String noOfTags = "--numoftags="; 2848 if (cmd.startsWith(noOfTags)) { 2849 opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); 2850 continue; 2851 } 2852 2853 final String replicas = "--replicas="; 2854 if (cmd.startsWith(replicas)) { 2855 opts.replicas = Integer.parseInt(cmd.substring(replicas.length())); 2856 continue; 2857 } 2858 2859 final String filterOutAll = "--filterAll"; 2860 if (cmd.startsWith(filterOutAll)) { 2861 opts.filterAll = true; 2862 continue; 2863 } 2864 2865 final String size = "--size="; 2866 if (cmd.startsWith(size)) { 2867 opts.size = Float.parseFloat(cmd.substring(size.length())); 2868 if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB"); 2869 continue; 2870 } 2871 2872 final String splitPolicy = "--splitPolicy="; 2873 if (cmd.startsWith(splitPolicy)) { 2874 opts.splitPolicy = cmd.substring(splitPolicy.length()); 2875 continue; 2876 } 2877 2878 final String randomSleep = "--randomSleep="; 2879 if (cmd.startsWith(randomSleep)) { 2880 opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length())); 2881 continue; 2882 } 2883 2884 final String measureAfter = "--measureAfter="; 2885 if (cmd.startsWith(measureAfter)) { 2886 opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length())); 2887 continue; 2888 } 2889 2890 final String bloomFilter = "--bloomFilter="; 2891 if (cmd.startsWith(bloomFilter)) { 2892 opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length())); 2893 continue; 2894 } 2895 2896 final String blockSize = "--blockSize="; 2897 if (cmd.startsWith(blockSize)) { 2898 opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length())); 2899 continue; 2900 } 2901 2902 final String valueSize = "--valueSize="; 2903 if (cmd.startsWith(valueSize)) { 2904 opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length())); 2905 continue; 2906 } 2907 2908 final String valueRandom = "--valueRandom"; 2909 if (cmd.startsWith(valueRandom)) { 2910 opts.valueRandom = true; 2911 continue; 2912 } 2913 2914 final String valueZipf = "--valueZipf"; 2915 if (cmd.startsWith(valueZipf)) { 2916 opts.valueZipf = true; 2917 continue; 2918 } 2919 2920 final String period = "--period="; 2921 if (cmd.startsWith(period)) { 2922 opts.period = Integer.parseInt(cmd.substring(period.length())); 2923 continue; 2924 } 2925 2926 final String addColumns = "--addColumns="; 2927 if (cmd.startsWith(addColumns)) { 2928 opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length())); 2929 continue; 2930 } 2931 2932 final String inMemoryCompaction = "--inmemoryCompaction="; 2933 if (cmd.startsWith(inMemoryCompaction)) { 2934 opts.inMemoryCompaction = 2935 
MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length())); 2936 continue; 2937 } 2938 2939 final String columns = "--columns="; 2940 if (cmd.startsWith(columns)) { 2941 opts.columns = Integer.parseInt(cmd.substring(columns.length())); 2942 continue; 2943 } 2944 2945 final String families = "--families="; 2946 if (cmd.startsWith(families)) { 2947 opts.families = Integer.parseInt(cmd.substring(families.length())); 2948 continue; 2949 } 2950 2951 final String caching = "--caching="; 2952 if (cmd.startsWith(caching)) { 2953 opts.caching = Integer.parseInt(cmd.substring(caching.length())); 2954 continue; 2955 } 2956 2957 final String asyncPrefetch = "--asyncPrefetch"; 2958 if (cmd.startsWith(asyncPrefetch)) { 2959 opts.asyncPrefetch = true; 2960 continue; 2961 } 2962 2963 final String cacheBlocks = "--cacheBlocks="; 2964 if (cmd.startsWith(cacheBlocks)) { 2965 opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length())); 2966 continue; 2967 } 2968 2969 final String scanReadType = "--scanReadType="; 2970 if (cmd.startsWith(scanReadType)) { 2971 opts.scanReadType = 2972 Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase()); 2973 continue; 2974 } 2975 2976 final String bufferSize = "--bufferSize="; 2977 if (cmd.startsWith(bufferSize)) { 2978 opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length())); 2979 continue; 2980 } 2981 2982 validateParsedOpts(opts); 2983 2984 if (isCommandClass(cmd)) { 2985 opts.cmdName = cmd; 2986 try { 2987 opts.numClientThreads = Integer.parseInt(args.remove()); 2988 } catch (NoSuchElementException | NumberFormatException e) { 2989 throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e); 2990 } 2991 opts = calculateRowsAndSize(opts); 2992 break; 2993 } else { 2994 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); 2995 } 2996 2997 // Not matching any option or command. 
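 // Unreachable in practice: the command branch above breaks out of the loop and
 // printUsageAndExit() terminates the JVM via System.exit().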
2998 System.err.println("Error: Wrong option or command: " + cmd); 2999 args.add(cmd); 3000 break; 3001 } 3002 return opts; 3003 } 3004 3005 /** 3006 * Validates opts after all the opts are parsed, so that the caller need not maintain the order of opts 3007 */ 3008 private static void validateParsedOpts(TestOptions opts) { 3009 3010 if (!opts.autoFlush && opts.multiPut > 0) { 3011 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0"); 3012 } 3013 3014 if (opts.oneCon && opts.connCount > 1) { 3015 throw new IllegalArgumentException( 3016 "oneCon is set to true, " + "connCount should not be bigger than 1"); 3017 } 3018 3019 if (opts.valueZipf && opts.valueRandom) { 3020 throw new IllegalStateException("Either valueZipf or valueRandom but not both"); 3021 } 3022 } 3023 3024 static TestOptions calculateRowsAndSize(final TestOptions opts) { 3025 int rowsPerGB = getRowsPerGB(opts); 3026 if ( 3027 (opts.getCmdName() != null 3028 && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN))) 3029 && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows 3030 ) { 3031 opts.totalRows = (int) opts.size * rowsPerGB; 3032 } else if (opts.size != DEFAULT_OPTS.size) { 3033 // total size in GB specified 3034 opts.totalRows = (int) opts.size * rowsPerGB; 3035 opts.perClientRunRows = opts.totalRows / opts.numClientThreads; 3036 } else { 3037 opts.totalRows = opts.perClientRunRows * opts.numClientThreads; 3038 opts.size = opts.totalRows / rowsPerGB; 3039 } 3040 return opts; 3041 } 3042 3043 static int getRowsPerGB(final TestOptions opts) { 3044 return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies() 3045 * opts.getColumns()); 3046 } 3047 3048 @Override 3049 public int run(String[] args) throws Exception { 3050 // Process command-line args. TODO: Better cmd-line processing 3051 // (but hopefully something not as painful as cli options). 3052 int errCode = -1; 3053 if (args.length < 1) { 3054 printUsage(); 3055 return errCode; 3056 } 3057 3058 try { 3059 LinkedList<String> argv = new LinkedList<>(); 3060 argv.addAll(Arrays.asList(args)); 3061 TestOptions opts = parseOpts(argv); 3062 3063 // args remaining, print help and exit 3064 if (!argv.isEmpty()) { 3065 errCode = 0; 3066 printUsage(); 3067 return errCode; 3068 } 3069 3070 // must run at least 1 client 3071 if (opts.numClientThreads <= 0) { 3072 throw new IllegalArgumentException("Number of clients must be > 0"); 3073 } 3074 3075 // cmdName should not be null, print help and exit 3076 if (opts.cmdName == null) { 3077 printUsage(); 3078 return errCode; 3079 } 3080 3081 Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName); 3082 if (cmdClass != null) { 3083 runTest(cmdClass, opts); 3084 errCode = 0; 3085 } 3086 3087 } catch (Exception e) { 3088 e.printStackTrace(); 3089 } 3090 3091 return errCode; 3092 } 3093 3094 private static boolean isCommandClass(String cmd) { 3095 return COMMANDS.containsKey(cmd); 3096 } 3097 3098 private static Class<? extends TestBase> determineCommandClass(String cmd) { 3099 CmdDescriptor descriptor = COMMANDS.get(cmd); 3100 return descriptor != null ? descriptor.getCmdClass() : null; 3101 } 3102 3103 public static void main(final String[] args) throws Exception { 3104 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); 3105 System.exit(res); 3106 } 3107}