001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static com.codahale.metrics.MetricRegistry.name; 021 022import com.codahale.metrics.ConsoleReporter; 023import com.codahale.metrics.Histogram; 024import com.codahale.metrics.Meter; 025import com.codahale.metrics.MetricFilter; 026import com.codahale.metrics.MetricRegistry; 027import io.opentelemetry.api.trace.Span; 028import io.opentelemetry.context.Scope; 029import java.io.IOException; 030import java.util.HashMap; 031import java.util.HashSet; 032import java.util.Map; 033import java.util.NavigableMap; 034import java.util.Random; 035import java.util.Set; 036import java.util.TreeMap; 037import java.util.concurrent.ThreadLocalRandom; 038import java.util.concurrent.TimeUnit; 039import java.util.stream.IntStream; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.conf.Configured; 042import org.apache.hadoop.fs.FileStatus; 043import org.apache.hadoop.fs.FileSystem; 044import org.apache.hadoop.fs.Path; 045import org.apache.hadoop.hbase.HBaseConfiguration; 046import org.apache.hadoop.hbase.HBaseInterfaceAudience; 047import org.apache.hadoop.hbase.HConstants; 048import org.apache.hadoop.hbase.MockRegionServerServices; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.client.ClientInternalHelper; 051import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 052import org.apache.hadoop.hbase.client.Put; 053import org.apache.hadoop.hbase.client.RegionInfo; 054import org.apache.hadoop.hbase.client.RegionInfoBuilder; 055import org.apache.hadoop.hbase.client.TableDescriptor; 056import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 057import org.apache.hadoop.hbase.io.crypto.MockAesKeyProvider; 058import org.apache.hadoop.hbase.regionserver.HRegion; 059import org.apache.hadoop.hbase.regionserver.LogRoller; 060import org.apache.hadoop.hbase.regionserver.MemStoreLAB; 061import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 062import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 063import org.apache.hadoop.hbase.trace.TraceUtil; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.CommonFSUtils; 066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 067import org.apache.hadoop.hbase.util.Threads; 068import org.apache.hadoop.hbase.util.WALPerformanceEvaluationUtil; 069import org.apache.hadoop.util.Tool; 070import org.apache.hadoop.util.ToolRunner; 071import org.apache.yetus.audience.InterfaceAudience; 072import org.slf4j.Logger; 073import org.slf4j.LoggerFactory; 074 075// imports for things that haven't moved from regionserver.wal yet. 076 077/** 078 * This class runs performance benchmarks for {@link WAL}. See usage for this tool by running: 079 * <code>$ hbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation -h</code> 080 */ 081@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 082public final class WALPerformanceEvaluation extends Configured implements Tool { 083 private static final Logger LOG = LoggerFactory.getLogger(WALPerformanceEvaluation.class); 084 085 private final MetricRegistry metrics = new MetricRegistry(); 086 private final Meter syncMeter = 087 metrics.meter(name(WALPerformanceEvaluation.class, "syncMeter", "syncs")); 088 089 private final Histogram syncHistogram = 090 metrics.histogram(name(WALPerformanceEvaluation.class, "syncHistogram", "nanos-between-syncs")); 091 private final Histogram syncCountHistogram = 092 metrics.histogram(name(WALPerformanceEvaluation.class, "syncCountHistogram", "countPerSync")); 093 private final Meter appendMeter = 094 metrics.meter(name(WALPerformanceEvaluation.class, "appendMeter", "bytes")); 095 private final Histogram latencyHistogram = 096 metrics.histogram(name(WALPerformanceEvaluation.class, "latencyHistogram", "nanos")); 097 098 private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 099 100 static final String TABLE_NAME = "WALPerformanceEvaluation"; 101 static final String QUALIFIER_PREFIX = "q"; 102 static final String FAMILY_PREFIX = "cf"; 103 104 private int numQualifiers = 1; 105 private int valueSize = 512; 106 private int keySize = 16; 107 108 @Override 109 public void setConf(Configuration conf) { 110 super.setConf(conf); 111 } 112 113 /** 114 * Perform WAL.append() of Put object, for the number of iterations requested. Keys and Vaues are 115 * generated randomly, the number of column families, qualifiers and key/value size is tunable by 116 * the user. 117 */ 118 class WALPutBenchmark implements Runnable { 119 private final long numIterations; 120 private final int numFamilies; 121 private final boolean noSync; 122 private final HRegion region; 123 private final int syncInterval; 124 private final NavigableMap<byte[], Integer> scopes; 125 126 WALPutBenchmark(final HRegion region, final TableDescriptor htd, final long numIterations, 127 final boolean noSync, final int syncInterval) { 128 this.numIterations = numIterations; 129 this.noSync = noSync; 130 this.syncInterval = syncInterval; 131 this.numFamilies = htd.getColumnFamilyCount(); 132 this.region = region; 133 scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 134 for (byte[] fam : htd.getColumnFamilyNames()) { 135 scopes.put(fam, 0); 136 } 137 } 138 139 @Override 140 public void run() { 141 byte[] key = new byte[keySize]; 142 byte[] value = new byte[valueSize]; 143 WAL wal = region.getWAL(); 144 Span threadSpan = TraceUtil.getGlobalTracer() 145 .spanBuilder("WALPerfEval." + Thread.currentThread().getName()).startSpan(); 146 try (Scope threadScope = threadSpan.makeCurrent()) { 147 int lastSync = 0; 148 for (int i = 0; i < numIterations; ++i) { 149 assert Span.current() == threadSpan : "Span leak detected."; 150 Span loopSpan = TraceUtil.getGlobalTracer().spanBuilder("runLoopIter" + i).startSpan(); 151 try (Scope loopScope = loopSpan.makeCurrent()) { 152 long now = System.nanoTime(); 153 Put put = setupPut(ThreadLocalRandom.current(), key, value, numFamilies); 154 WALEdit walEdit = new WALEdit(); 155 walEdit.addMap(ClientInternalHelper.getExtendedFamilyCellMap(put)); 156 RegionInfo hri = region.getRegionInfo(); 157 final WALKeyImpl logkey = 158 new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes); 159 wal.appendData(hri, logkey, walEdit); 160 if (!this.noSync) { 161 if (++lastSync >= this.syncInterval) { 162 wal.sync(); 163 lastSync = 0; 164 } 165 } 166 latencyHistogram.update(System.nanoTime() - now); 167 } finally { 168 loopSpan.end(); 169 } 170 } 171 } catch (IOException e) { 172 LOG.error(getClass().getSimpleName() + " Thread failed", e); 173 } finally { 174 threadSpan.end(); 175 } 176 } 177 } 178 179 @Override 180 public int run(String[] args) throws Exception { 181 Path rootRegionDir = null; 182 int numThreads = 1; 183 long numIterations = 1000000; 184 int numFamilies = 1; 185 int syncInterval = 0; 186 boolean noSync = false; 187 boolean verify = false; 188 boolean verbose = false; 189 boolean cleanup = true; 190 boolean noclosefs = false; 191 long roll = Long.MAX_VALUE; 192 boolean compress = false; 193 String cipher = null; 194 int numRegions = 1; 195 // Process command line args 196 for (int i = 0; i < args.length; i++) { 197 String cmd = args[i]; 198 try { 199 if (cmd.equals("-threads")) { 200 numThreads = Integer.parseInt(args[++i]); 201 } else if (cmd.equals("-iterations")) { 202 numIterations = Long.parseLong(args[++i]); 203 } else if (cmd.equals("-path")) { 204 rootRegionDir = new Path(args[++i]); 205 } else if (cmd.equals("-families")) { 206 numFamilies = Integer.parseInt(args[++i]); 207 } else if (cmd.equals("-qualifiers")) { 208 numQualifiers = Integer.parseInt(args[++i]); 209 } else if (cmd.equals("-keySize")) { 210 keySize = Integer.parseInt(args[++i]); 211 } else if (cmd.equals("-valueSize")) { 212 valueSize = Integer.parseInt(args[++i]); 213 } else if (cmd.equals("-syncInterval")) { 214 syncInterval = Integer.parseInt(args[++i]); 215 } else if (cmd.equals("-nosync")) { 216 noSync = true; 217 } else if (cmd.equals("-verify")) { 218 verify = true; 219 } else if (cmd.equals("-verbose")) { 220 verbose = true; 221 } else if (cmd.equals("-nocleanup")) { 222 cleanup = false; 223 } else if (cmd.equals("-noclosefs")) { 224 noclosefs = true; 225 } else if (cmd.equals("-roll")) { 226 roll = Long.parseLong(args[++i]); 227 } else if (cmd.equals("-compress")) { 228 compress = true; 229 } else if (cmd.equals("-encryption")) { 230 cipher = args[++i]; 231 } else if (cmd.equals("-regions")) { 232 numRegions = Integer.parseInt(args[++i]); 233 } else if (cmd.equals("-traceFreq")) { 234 // keep it here for compatible 235 System.err.println("-traceFreq is not supported any more"); 236 } else if (cmd.equals("-h")) { 237 printUsageAndExit(); 238 } else if (cmd.equals("--help")) { 239 printUsageAndExit(); 240 } else { 241 System.err.println("UNEXPECTED: " + cmd); 242 printUsageAndExit(); 243 } 244 } catch (NumberFormatException e) { 245 printUsageAndExit(); 246 } 247 } 248 249 if (compress) { 250 Configuration conf = getConf(); 251 conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); 252 } 253 254 if (cipher != null) { 255 // Set up WAL for encryption 256 Configuration conf = getConf(); 257 conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, MockAesKeyProvider.class.getName()); 258 conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase"); 259 conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true); 260 conf.set(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, cipher); 261 } 262 263 if (numThreads < numRegions) { 264 LOG.warn("Number of threads is less than the number of regions; some regions will sit idle."); 265 } 266 267 // Internal config. goes off number of threads; if more threads than handlers, stuff breaks. 268 // In regionserver, number of handlers == number of threads. 269 getConf().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, numThreads); 270 // We do not need memstore here, so disable memstore lab, otherwise we need to initialize 271 // ChunkCreator 272 getConf().setBoolean(MemStoreLAB.USEMSLAB_KEY, false); 273 274 if (rootRegionDir == null) { 275 rootRegionDir = new WALPerformanceEvaluationUtil(getConf()) 276 .getDataTestDirOnTestFS("WALPerformanceEvaluation"); 277 } 278 // Run WAL Performance Evaluation 279 // First set the fs from configs. In case we are on hadoop1 280 CommonFSUtils.setFsDefault(getConf(), CommonFSUtils.getRootDir(getConf())); 281 FileSystem fs = FileSystem.get(getConf()); 282 LOG.info("FileSystem={}, rootDir={}", fs, rootRegionDir); 283 Span span = TraceUtil.getGlobalTracer().spanBuilder("WALPerfEval").startSpan(); 284 try (Scope scope = span.makeCurrent()) { 285 rootRegionDir = rootRegionDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 286 cleanRegionRootDir(fs, rootRegionDir); 287 CommonFSUtils.setRootDir(getConf(), rootRegionDir); 288 final WALFactory wals = new WALFactory(getConf(), "wals"); 289 final HRegion[] regions = new HRegion[numRegions]; 290 final Runnable[] benchmarks = new Runnable[numRegions]; 291 final MockRegionServerServices mockServices = new MockRegionServerServices(getConf()); 292 final LogRoller roller = new LogRoller(mockServices); 293 Threads.setDaemonThreadRunning(roller, "WALPerfEval.logRoller"); 294 295 try { 296 for (int i = 0; i < numRegions; i++) { 297 // Initialize Table Descriptor 298 // a table per desired region means we can avoid carving up the key space 299 final TableDescriptor htd = createHTableDescriptor(i, numFamilies); 300 regions[i] = openRegion(fs, rootRegionDir, htd, wals, roll, roller); 301 benchmarks[i] = new WALPutBenchmark(regions[i], htd, numIterations, noSync, syncInterval); 302 } 303 ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).outputTo(System.out) 304 .convertRatesTo(TimeUnit.SECONDS).filter(MetricFilter.ALL).build(); 305 reporter.start(30, TimeUnit.SECONDS); 306 307 long putTime = runBenchmark(benchmarks, numThreads); 308 logBenchmarkResult("Summary: threads=" + numThreads + ", iterations=" + numIterations 309 + ", syncInterval=" + syncInterval, numIterations * numThreads, putTime); 310 311 for (int i = 0; i < numRegions; i++) { 312 if (regions[i] != null) { 313 closeRegion(regions[i]); 314 regions[i] = null; 315 } 316 } 317 if (verify) { 318 LOG.info("verifying written log entries."); 319 Path dir = new Path(CommonFSUtils.getRootDir(getConf()), 320 AbstractFSWALProvider.getWALDirectoryName("wals")); 321 long editCount = 0; 322 FileStatus[] fsss = fs.listStatus(dir); 323 if (fsss.length == 0) throw new IllegalStateException("No WAL found"); 324 for (FileStatus fss : fsss) { 325 Path p = fss.getPath(); 326 if (!fs.exists(p)) throw new IllegalStateException(p.toString()); 327 editCount += verify(wals, p, verbose); 328 } 329 long expected = numIterations * numThreads; 330 if (editCount != expected) { 331 throw new IllegalStateException("Counted=" + editCount + ", expected=" + expected); 332 } 333 } 334 } finally { 335 mockServices.stop("test clean up."); 336 for (int i = 0; i < numRegions; i++) { 337 if (regions[i] != null) { 338 closeRegion(regions[i]); 339 } 340 } 341 if (null != roller) { 342 LOG.info("shutting down log roller."); 343 roller.close(); 344 } 345 wals.shutdown(); 346 // Remove the root dir for this test region 347 if (cleanup) cleanRegionRootDir(fs, rootRegionDir); 348 } 349 } finally { 350 span.end(); 351 // We may be called inside a test that wants to keep on using the fs. 352 if (!noclosefs) { 353 fs.close(); 354 } 355 } 356 357 return 0; 358 } 359 360 private static TableDescriptor createHTableDescriptor(final int regionNum, 361 final int numFamilies) { 362 TableDescriptorBuilder builder = 363 TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME + ":" + regionNum)); 364 IntStream.range(0, numFamilies) 365 .mapToObj(i -> ColumnFamilyDescriptorBuilder.of(FAMILY_PREFIX + i)) 366 .forEachOrdered(builder::setColumnFamily); 367 return builder.build(); 368 } 369 370 /** 371 * Verify the content of the WAL file. Verify that the file has expected number of edits. 372 * @param wals may not be null 373 * @return Count of edits. 374 */ 375 private long verify(final WALFactory wals, final Path wal, final boolean verbose) 376 throws IOException { 377 WALStreamReader reader = wals.createStreamReader(wal.getFileSystem(getConf()), wal); 378 long count = 0; 379 Map<String, Long> sequenceIds = new HashMap<>(); 380 try { 381 while (true) { 382 WAL.Entry e = reader.next(); 383 if (e == null) { 384 LOG.debug("Read count=" + count + " from " + wal); 385 break; 386 } 387 count++; 388 long seqid = e.getKey().getSequenceId(); 389 if (sequenceIds.containsKey(Bytes.toString(e.getKey().getEncodedRegionName()))) { 390 // sequenceIds should be increasing for every regions 391 if (sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName())) >= seqid) { 392 throw new IllegalStateException("wal = " + wal.getName() + ", " + "previous seqid = " 393 + sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName())) 394 + ", current seqid = " + seqid); 395 } 396 } 397 // update the sequence Id. 398 sequenceIds.put(Bytes.toString(e.getKey().getEncodedRegionName()), seqid); 399 if (verbose) LOG.info("seqid=" + seqid); 400 } 401 } finally { 402 reader.close(); 403 } 404 return count; 405 } 406 407 private static void logBenchmarkResult(String testName, long numTests, long totalTime) { 408 float tsec = totalTime / 1000.0f; 409 LOG.info(String.format("%s took %.3fs %.3fops/s", testName, tsec, numTests / tsec)); 410 411 } 412 413 private void printUsageAndExit() { 414 System.err.printf("Usage: hbase %s [options]%n", getClass().getName()); 415 System.err.println(" where [options] are:"); 416 System.err.println(" -h|-help Show this help and exit."); 417 System.err.println(" -threads <N> Number of threads writing on the WAL."); 418 System.err.println(" -regions <N> Number of regions to open in the WAL. Default: 1"); 419 System.err.println(" -iterations <N> Number of iterations per thread."); 420 System.err.println(" -path <PATH> Path where region's root directory is created."); 421 System.err.println(" -families <N> Number of column families to write."); 422 System.err.println(" -qualifiers <N> Number of qualifiers to write."); 423 System.err.println(" -keySize <N> Row key size in byte."); 424 System.err.println(" -valueSize <N> Row/Col value size in byte."); 425 System.err.println(" -nocleanup Do NOT remove test data when done."); 426 System.err.println(" -noclosefs Do NOT close the filesystem when done."); 427 System.err.println(" -nosync Append without syncing"); 428 System.err.println( 429 " -syncInterval <N> Append N edits and then sync. " + "Default=0, i.e. sync every edit."); 430 System.err.println(" -verify Verify edits written in sequence"); 431 System.err 432 .println(" -verbose Output extra info; " + "e.g. all edit seq ids when verifying"); 433 System.err.println(" -roll <N> Roll the way every N appends"); 434 System.err.println(" -encryption <A> Encrypt the WAL with algorithm A, e.g. AES"); 435 System.err.println(" -traceFreq <N> Rate of trace sampling. Default: 1.0, " 436 + "only respected when tracing is enabled, ie -Dhbase.trace.spanreceiver.classes=..."); 437 System.err.println(""); 438 System.err.println("Examples:"); 439 System.err.println(""); 440 System.err.println(" To run 100 threads on hdfs with log rolling every 10k edits and " 441 + "verification afterward do:"); 442 System.err.println(" $ hbase org.apache.hadoop.hbase.wal." + "WALPerformanceEvaluation \\"); 443 System.err.println(" -conf ./core-site.xml -path hdfs://example.org:7000/tmp " 444 + "-threads 100 -roll 10000 -verify"); 445 System.exit(1); 446 } 447 448 private final Set<WAL> walsListenedTo = new HashSet<>(); 449 450 private HRegion openRegion(final FileSystem fs, final Path dir, final TableDescriptor htd, 451 final WALFactory wals, final long whenToRoll, final LogRoller roller) throws IOException { 452 // Initialize HRegion 453 RegionInfo regionInfo = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 454 // Initialize WAL 455 final WAL wal = wals.getWAL(regionInfo); 456 // If we haven't already, attach a listener to this wal to handle rolls and metrics. 457 if (walsListenedTo.add(wal)) { 458 roller.addWAL(wal); 459 wal.registerWALActionsListener(new WALActionsListener() { 460 private int appends = 0; 461 462 @Override 463 public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) { 464 this.appends++; 465 if (this.appends % whenToRoll == 0) { 466 LOG.info("Rolling after " + appends + " edits"); 467 // We used to do explicit call to rollWriter but changed it to a request 468 // to avoid dead lock (there are less threads going on in this class than 469 // in the regionserver -- regionserver does not have the issue). 470 AbstractFSWALProvider.requestLogRoll(wal); 471 } 472 } 473 474 @Override 475 public void postSync(final long timeInNanos, final int handlerSyncs) { 476 syncMeter.mark(); 477 syncHistogram.update(timeInNanos); 478 syncCountHistogram.update(handlerSyncs); 479 } 480 481 @Override 482 public void postAppend(final long size, final long elapsedTime, final WALKey logkey, 483 final WALEdit logEdit) { 484 appendMeter.mark(size); 485 } 486 }); 487 } 488 489 return HRegion.createHRegion(regionInfo, dir, getConf(), htd, wal); 490 } 491 492 private void closeRegion(final HRegion region) throws IOException { 493 if (region != null) { 494 region.close(); 495 WAL wal = region.getWAL(); 496 if (wal != null) { 497 wal.shutdown(); 498 } 499 } 500 } 501 502 private void cleanRegionRootDir(final FileSystem fs, final Path dir) throws IOException { 503 if (fs.exists(dir)) { 504 fs.delete(dir, true); 505 } 506 } 507 508 private Put setupPut(Random rand, byte[] key, byte[] value, final int numFamilies) { 509 rand.nextBytes(key); 510 Put put = new Put(key); 511 for (int cf = 0; cf < numFamilies; ++cf) { 512 for (int q = 0; q < numQualifiers; ++q) { 513 rand.nextBytes(value); 514 put.addColumn(Bytes.toBytes(FAMILY_PREFIX + cf), Bytes.toBytes(QUALIFIER_PREFIX + q), 515 value); 516 } 517 } 518 return put; 519 } 520 521 private long runBenchmark(Runnable[] runnable, final int numThreads) throws InterruptedException { 522 Thread[] threads = new Thread[numThreads]; 523 long startTime = EnvironmentEdgeManager.currentTime(); 524 for (int i = 0; i < numThreads; ++i) { 525 threads[i] = 526 new Thread(runnable[i % runnable.length], "t" + i + ",r" + (i % runnable.length)); 527 threads[i].start(); 528 } 529 for (Thread t : threads) 530 t.join(); 531 long endTime = EnvironmentEdgeManager.currentTime(); 532 return (endTime - startTime); 533 } 534 535 /** 536 * The guts of the {@link #main} method. Call this method to avoid the {@link #main(String[])} 537 * System.exit. 538 */ 539 static int innerMain(final Configuration c, final String[] args) throws Exception { 540 return ToolRunner.run(c, new WALPerformanceEvaluation(), args); 541 } 542 543 public static void main(String[] args) throws Exception { 544 System.exit(innerMain(HBaseConfiguration.create(), args)); 545 } 546}