/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static com.codahale.metrics.MetricRegistry.name;

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MockRegionServerServices;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.LogRoller;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// imports for things that haven't moved from regionserver.wal yet.

/**
 * This class runs performance benchmarks for {@link WAL}. See the usage for this tool by running:
 * <code>$ hbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation -h</code>
 */
@InterfaceAudience.Private
public final class WALPerformanceEvaluation extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(WALPerformanceEvaluation.class);

  private final MetricRegistry metrics = new MetricRegistry();
  private final Meter syncMeter =
    metrics.meter(name(WALPerformanceEvaluation.class, "syncMeter", "syncs"));

  private final Histogram syncHistogram = metrics
    .histogram(name(WALPerformanceEvaluation.class, "syncHistogram", "nanos-between-syncs"));
  private final Histogram syncCountHistogram =
    metrics.histogram(name(WALPerformanceEvaluation.class, "syncCountHistogram", "countPerSync"));
  private final Meter appendMeter =
    metrics.meter(name(WALPerformanceEvaluation.class, "appendMeter", "bytes"));
  private final Histogram latencyHistogram =
    metrics.histogram(name(WALPerformanceEvaluation.class, "latencyHistogram", "nanos"));

  private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();

  private HBaseTestingUtility TEST_UTIL;

  static final String TABLE_NAME = "WALPerformanceEvaluation";
  static final String QUALIFIER_PREFIX = "q";
  static final String FAMILY_PREFIX = "cf";

  private int numQualifiers = 1;
  private int valueSize = 512;
  private int keySize = 16;

  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
  }

  /**
   * Performs WAL.append() of a Put object for the requested number of iterations. Keys and values
   * are generated randomly; the number of column families, the number of qualifiers, and the
   * key/value sizes are tunable by the user.
   */
  class WALPutBenchmark implements Runnable {
    private final long numIterations;
    private final int numFamilies;
    private final boolean noSync;
    private final HRegion region;
    private final int syncInterval;
    private final NavigableMap<byte[], Integer> scopes;

    WALPutBenchmark(final HRegion region, final TableDescriptor htd, final long numIterations,
      final boolean noSync, final int syncInterval) {
      this.numIterations = numIterations;
      this.noSync = noSync;
      this.syncInterval = syncInterval;
      this.numFamilies = htd.getColumnFamilyCount();
      this.region = region;
      scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (byte[] fam : htd.getColumnFamilyNames()) {
        scopes.put(fam, 0);
      }
    }

    @Override
    public void run() {
      byte[] key = new byte[keySize];
      byte[] value = new byte[valueSize];
      WAL wal = region.getWAL();
      Span threadSpan = TraceUtil.getGlobalTracer()
        .spanBuilder("WALPerfEval." + Thread.currentThread().getName()).startSpan();
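      // Each worker thread does all of its work inside the thread-level span started above;
      // every iteration of the loop below opens a short-lived child span so individual
      // append/sync rounds show up in the trace.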
      try (Scope threadScope = threadSpan.makeCurrent()) {
        int lastSync = 0;
        for (int i = 0; i < numIterations; ++i) {
          assert Span.current() == threadSpan : "Span leak detected.";
          Span loopSpan = TraceUtil.getGlobalTracer().spanBuilder("runLoopIter" + i).startSpan();
          try (Scope loopScope = loopSpan.makeCurrent()) {
            long now = System.nanoTime();
            Put put = setupPut(ThreadLocalRandom.current(), key, value, numFamilies);
            WALEdit walEdit = new WALEdit();
            walEdit.add(put.getFamilyCellMap());
            RegionInfo hri = region.getRegionInfo();
            final WALKeyImpl logkey =
              new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes);
            wal.appendData(hri, logkey, walEdit);
            if (!this.noSync) {
              if (++lastSync >= this.syncInterval) {
                wal.sync();
                lastSync = 0;
              }
            }
            latencyHistogram.update(System.nanoTime() - now);
          } finally {
            loopSpan.end();
          }
        }
      } catch (Exception e) {
        LOG.error(getClass().getSimpleName() + " Thread failed", e);
      } finally {
        threadSpan.end();
      }
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    Path rootRegionDir = null;
    int numThreads = 1;
    long numIterations = 1000000;
    int numFamilies = 1;
    int syncInterval = 0;
    boolean noSync = false;
    boolean verify = false;
    boolean verbose = false;
    boolean cleanup = true;
    boolean noclosefs = false;
    long roll = Long.MAX_VALUE;
    boolean compress = false;
    String cipher = null;
    int numRegions = 1;
    // Process command line args
    for (int i = 0; i < args.length; i++) {
      String cmd = args[i];
      try {
        if (cmd.equals("-threads")) {
          numThreads = Integer.parseInt(args[++i]);
        } else if (cmd.equals("-iterations")) {
          numIterations = Long.parseLong(args[++i]);
        } else if (cmd.equals("-path")) {
          rootRegionDir = new Path(args[++i]);
        } else if (cmd.equals("-families")) {
          numFamilies = Integer.parseInt(args[++i]);
        } else if (cmd.equals("-qualifiers")) {
          numQualifiers = Integer.parseInt(args[++i]);
        } else if (cmd.equals("-keySize")) {
          keySize = Integer.parseInt(args[++i]);
        } else if (cmd.equals("-valueSize")) {
          valueSize = Integer.parseInt(args[++i]);
        } else if (cmd.equals("-syncInterval")) {
          syncInterval = Integer.parseInt(args[++i]);
        } else if (cmd.equals("-nosync")) {
          noSync = true;
        } else if (cmd.equals("-verify")) {
          verify = true;
        } else if (cmd.equals("-verbose")) {
          verbose = true;
        } else if (cmd.equals("-nocleanup")) {
          cleanup = false;
        } else if (cmd.equals("-noclosefs")) {
          noclosefs = true;
        } else if (cmd.equals("-roll")) {
          roll = Long.parseLong(args[++i]);
        } else if (cmd.equals("-compress")) {
          compress = true;
        } else if (cmd.equals("-encryption")) {
          cipher = args[++i];
        } else if (cmd.equals("-regions")) {
          numRegions = Integer.parseInt(args[++i]);
        } else if (cmd.equals("-traceFreq")) {
          // kept only for backwards compatibility of the command line
          System.err.println("-traceFreq is not supported any more");
        } else if (cmd.equals("-h")) {
          printUsageAndExit();
        } else if (cmd.equals("--help")) {
          printUsageAndExit();
        } else {
          System.err.println("UNEXPECTED: " + cmd);
          printUsageAndExit();
        }
      } catch (Exception e) {
        printUsageAndExit();
      }
    }

    if (compress) {
      Configuration conf = getConf();
      conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
    }

    if (cipher != null) {
      // Set up WAL for encryption
      Configuration conf = getConf();
      conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
      conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
      conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
        WAL.Reader.class);
      conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
        Writer.class);
      conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
      conf.set(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, cipher);
    }

    if (numThreads < numRegions) {
      LOG.warn(
        "Number of threads is less than the number of regions; some regions will sit idle.");
    }

    // Internal config: it goes off the number of threads; if there are more threads than
    // handlers, stuff breaks. In the regionserver, number of handlers == number of threads.
    getConf().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, numThreads);

    if (rootRegionDir == null) {
      TEST_UTIL = new HBaseTestingUtility(getConf());
      rootRegionDir = TEST_UTIL.getDataTestDirOnTestFS("WALPerformanceEvaluation");
    }
    // Run WAL Performance Evaluation
    // First set the fs from configs, in case we are on hadoop1
    CommonFSUtils.setFsDefault(getConf(), CommonFSUtils.getRootDir(getConf()));
    FileSystem fs = FileSystem.get(getConf());
    LOG.info("FileSystem={}, rootDir={}", fs, rootRegionDir);
    Span span = TraceUtil.getGlobalTracer().spanBuilder("WALPerfEval").startSpan();
    try (Scope scope = span.makeCurrent()) {
      rootRegionDir = rootRegionDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
      cleanRegionRootDir(fs, rootRegionDir);
      CommonFSUtils.setRootDir(getConf(), rootRegionDir);
      final WALFactory wals = new WALFactory(getConf(), "wals");
      final HRegion[] regions = new HRegion[numRegions];
      final Runnable[] benchmarks = new Runnable[numRegions];
      final MockRegionServerServices mockServices = new MockRegionServerServices(getConf());
      final LogRoller roller = new LogRoller(mockServices);
      Threads.setDaemonThreadRunning(roller, "WALPerfEval.logRoller");

      try {
        for (int i = 0; i < numRegions; i++) {
          // Initialize Table Descriptor
          // a table per desired region means we can avoid carving up the key space
          final TableDescriptor htd = createHTableDescriptor(i, numFamilies);
          regions[i] = openRegion(fs, rootRegionDir, htd, wals, roll, roller);
          benchmarks[i] = new WALPutBenchmark(regions[i], htd, numIterations, noSync, syncInterval);
        }
        ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).outputTo(System.out)
          .convertRatesTo(TimeUnit.SECONDS).filter(MetricFilter.ALL).build();
        reporter.start(30, TimeUnit.SECONDS);

        long putTime = runBenchmark(benchmarks, numThreads);
        logBenchmarkResult("Summary: threads=" + numThreads + ", iterations=" + numIterations
          + ", syncInterval=" + syncInterval, numIterations * numThreads, putTime);

        for (int i = 0; i < numRegions; i++) {
          if (regions[i] != null) {
            closeRegion(regions[i]);
            regions[i] = null;
          }
        }
        if (verify) {
          LOG.info("verifying written log entries.");
          Path dir = new Path(CommonFSUtils.getRootDir(getConf()),
            AbstractFSWALProvider.getWALDirectoryName("wals"));
          long editCount = 0;
          FileStatus[] fsss = fs.listStatus(dir);
          if (fsss.length == 0) throw new IllegalStateException("No WAL found");
          for (FileStatus fss : fsss) {
            Path p = fss.getPath();
            if (!fs.exists(p)) throw new IllegalStateException(p.toString());
            editCount += verify(wals, p, verbose);
          }
          long expected = numIterations * numThreads;
          if (editCount != expected) {
            throw new IllegalStateException("Counted=" + editCount + ", expected=" + expected);
          }
        }
      } finally {
        mockServices.stop("test clean up.");
        for (int i = 0; i < numRegions; i++) {
          if (regions[i] != null) {
            closeRegion(regions[i]);
          }
        }
        if (null != roller) {
          LOG.info("shutting down log roller.");
          roller.close();
        }
        wals.shutdown();
        // Remove the root dir for this test region
        if (cleanup) cleanRegionRootDir(fs, rootRegionDir);
      }
    } finally {
      span.end();
      // We may be called inside a test that wants to keep on using the fs.
      if (!noclosefs) {
        fs.close();
      }
    }

    return 0;
  }

  private static TableDescriptor createHTableDescriptor(final int regionNum,
    final int numFamilies) {
    TableDescriptorBuilder builder =
      TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME + ":" + regionNum));
    IntStream.range(0, numFamilies)
      .mapToObj(i -> ColumnFamilyDescriptorBuilder.of(FAMILY_PREFIX + i))
      .forEachOrdered(builder::setColumnFamily);
    return builder.build();
  }

  /**
   * Verify the content of the WAL file, i.e. that it contains the expected number of edits.
   * @param wals    may not be null
   * @param wal     the WAL file to verify
   * @param verbose whether to log each edit's sequence id
   * @return count of edits read
   */
  private long verify(final WALFactory wals, final Path wal, final boolean verbose)
    throws IOException {
    WAL.Reader reader = wals.createReader(wal.getFileSystem(getConf()), wal);
    long count = 0;
    Map<String, Long> sequenceIds = new HashMap<>();
    try {
      while (true) {
        WAL.Entry e = reader.next();
        if (e == null) {
          LOG.debug("Read count=" + count + " from " + wal);
          break;
        }
        count++;
        long seqid = e.getKey().getSequenceId();
        if (sequenceIds.containsKey(Bytes.toString(e.getKey().getEncodedRegionName()))) {
          // sequence ids should be strictly increasing for every region
          if (sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName())) >= seqid) {
            throw new IllegalStateException("wal = " + wal.getName() + ", " + "previous seqid = "
              + sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName()))
              + ", current seqid = " + seqid);
          }
        }
        // update the sequence id.
        sequenceIds.put(Bytes.toString(e.getKey().getEncodedRegionName()), seqid);
        if (verbose) LOG.info("seqid=" + seqid);
      }
    } finally {
      reader.close();
    }
    return count;
  }

  private static void logBenchmarkResult(String testName, long numTests, long totalTime) {
    float tsec = totalTime / 1000.0f;
    LOG.info(String.format("%s took %.3fs %.3fops/s", testName, tsec, numTests / tsec));
  }

  private void printUsageAndExit() {
    System.err.printf("Usage: hbase %s [options]\n", getClass().getName());
    System.err.println(" where [options] are:");
    System.err.println("  -h|-help          Show this help and exit.");
    System.err.println("  -threads <N>      Number of threads writing on the WAL.");
    System.err.println("  -regions <N>      Number of regions to open in the WAL. Default: 1");
    System.err.println("  -iterations <N>   Number of iterations per thread.");
    System.err.println("  -path <PATH>      Path where the region's root directory is created.");
    System.err.println("  -families <N>     Number of column families to write.");
    System.err.println("  -qualifiers <N>   Number of qualifiers to write.");
    System.err.println("  -keySize <N>      Row key size in bytes.");
    System.err.println("  -valueSize <N>    Row/Col value size in bytes.");
    System.err.println("  -nocleanup        Do NOT remove test data when done.");
    System.err.println("  -noclosefs        Do NOT close the filesystem when done.");
    System.err.println("  -nosync           Append without syncing.");
    System.err.println(
      "  -syncInterval <N> Append N edits and then sync. Default=0, i.e. sync every edit.");
    System.err.println("  -verify           Verify edits written in sequence.");
    System.err.println("  -verbose          Output extra info; e.g. all edit seq ids when "
      + "verifying.");
    System.err.println("  -roll <N>         Roll the WAL after every N appends.");
    System.err.println("  -encryption <A>   Encrypt the WAL with algorithm A, e.g. AES.");
    System.err.println(
      "  -traceFreq <N>    No longer supported; accepted only for backwards compatibility.");
    System.err.println("");
    System.err.println("Examples:");
    System.err.println("");
    System.err.println(" To run 100 threads on hdfs with log rolling every 10k edits and "
      + "verification afterward do:");
    System.err.println(" $ hbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation \\");
    System.err.println("    -conf ./core-site.xml -path hdfs://example.org:7000/tmp "
      + "-threads 100 -roll 10000 -verify");
    System.exit(1);
  }

  private final Set<WAL> walsListenedTo = new HashSet<>();

  private HRegion openRegion(final FileSystem fs, final Path dir, final TableDescriptor htd,
    final WALFactory wals, final long whenToRoll, final LogRoller roller) throws IOException {
    // Initialize HRegion
    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
    // Initialize WAL
    final WAL wal = wals.getWAL(regionInfo);
    // If we haven't already, attach a listener to this wal to handle rolls and metrics.
    if (walsListenedTo.add(wal)) {
      roller.addWAL(wal);
      wal.registerWALActionsListener(new WALActionsListener() {
        private int appends = 0;

        @Override
        public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
          this.appends++;
          if (this.appends % whenToRoll == 0) {
            LOG.info("Rolling after " + appends + " edits");
            // We used to make an explicit call to rollWriter but changed it to a request
            // to avoid a deadlock (there are fewer threads going on in this class than
            // in the regionserver -- the regionserver does not have the issue).
            AbstractFSWALProvider.requestLogRoll(wal);
          }
        }

        @Override
        public void postSync(final long timeInNanos, final int handlerSyncs) {
          syncMeter.mark();
          syncHistogram.update(timeInNanos);
          syncCountHistogram.update(handlerSyncs);
        }

        @Override
        public void postAppend(final long size, final long elapsedTime, final WALKey logkey,
          final WALEdit logEdit) {
          appendMeter.mark(size);
        }
      });
    }

    return HRegion.createHRegion(regionInfo, dir, getConf(), htd, wal);
  }

  private void closeRegion(final HRegion region) throws IOException {
    if (region != null) {
      region.close();
      WAL wal = region.getWAL();
      if (wal != null) {
        wal.shutdown();
      }
    }
  }

  private void cleanRegionRootDir(final FileSystem fs, final Path dir) throws IOException {
    if (fs.exists(dir)) {
      fs.delete(dir, true);
    }
  }

  /**
   * Builds a Put with a random key and random values for the given number of families and the
   * configured number of qualifiers.
   */
  private Put setupPut(Random rand, byte[] key, byte[] value, final int numFamilies) {
    rand.nextBytes(key);
    Put put = new Put(key);
    for (int cf = 0; cf < numFamilies; ++cf) {
      for (int q = 0; q < numQualifiers; ++q) {
        rand.nextBytes(value);
        put.addColumn(Bytes.toBytes(FAMILY_PREFIX + cf), Bytes.toBytes(QUALIFIER_PREFIX + q),
          value);
      }
    }
    return put;
  }

  /**
   * Runs the given benchmarks on the requested number of threads and returns the elapsed
   * wall-clock time in milliseconds.
   */
  private long runBenchmark(Runnable[] runnable, final int numThreads) throws InterruptedException {
    Thread[] threads = new Thread[numThreads];
    long startTime = EnvironmentEdgeManager.currentTime();
    for (int i = 0; i < numThreads; ++i) {
      threads[i] =
        new Thread(runnable[i % runnable.length], "t" + i + ",r" + (i % runnable.length));
      threads[i].start();
    }
    for (Thread t : threads) {
      t.join();
    }
    long endTime = EnvironmentEdgeManager.currentTime();
    return (endTime - startTime);
  }

  /**
   * The guts of the {@link #main} method. Call this method to avoid the {@link #main(String[])}
   * System.exit.
   */
  static int innerMain(final Configuration c, final String[] args) throws Exception {
    return ToolRunner.run(c, new WALPerformanceEvaluation(), args);
  }

  public static void main(String[] args) throws Exception {
    System.exit(innerMain(HBaseConfiguration.create(), args));
  }
}