001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static com.codahale.metrics.MetricRegistry.name;
021
022import com.codahale.metrics.ConsoleReporter;
023import com.codahale.metrics.Histogram;
024import com.codahale.metrics.Meter;
025import com.codahale.metrics.MetricFilter;
026import com.codahale.metrics.MetricRegistry;
027import io.opentelemetry.api.trace.Span;
028import io.opentelemetry.context.Scope;
029import java.io.IOException;
030import java.util.HashMap;
031import java.util.HashSet;
032import java.util.Map;
033import java.util.NavigableMap;
034import java.util.Random;
035import java.util.Set;
036import java.util.TreeMap;
037import java.util.concurrent.ThreadLocalRandom;
038import java.util.concurrent.TimeUnit;
039import java.util.stream.IntStream;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.conf.Configured;
042import org.apache.hadoop.fs.FileStatus;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.hbase.HBaseConfiguration;
046import org.apache.hadoop.hbase.HBaseTestingUtility;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.MockRegionServerServices;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
051import org.apache.hadoop.hbase.client.Put;
052import org.apache.hadoop.hbase.client.RegionInfo;
053import org.apache.hadoop.hbase.client.RegionInfoBuilder;
054import org.apache.hadoop.hbase.client.TableDescriptor;
055import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
056import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
057import org.apache.hadoop.hbase.regionserver.HRegion;
058import org.apache.hadoop.hbase.regionserver.LogRoller;
059import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
060import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
061import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
062import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
063import org.apache.hadoop.hbase.trace.TraceUtil;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.CommonFSUtils;
066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
067import org.apache.hadoop.hbase.util.Threads;
068import org.apache.hadoop.hbase.wal.WALProvider.Writer;
069import org.apache.hadoop.util.Tool;
070import org.apache.hadoop.util.ToolRunner;
071import org.apache.yetus.audience.InterfaceAudience;
072import org.slf4j.Logger;
073import org.slf4j.LoggerFactory;
074
075// imports for things that haven't moved from regionserver.wal yet.
076
077/**
078 * This class runs performance benchmarks for {@link WAL}. See usage for this tool by running:
079 * <code>$ hbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation -h</code>
080 */
081@InterfaceAudience.Private
082public final class WALPerformanceEvaluation extends Configured implements Tool {
083  private static final Logger LOG = LoggerFactory.getLogger(WALPerformanceEvaluation.class);
084
085  private final MetricRegistry metrics = new MetricRegistry();
086  private final Meter syncMeter =
087    metrics.meter(name(WALPerformanceEvaluation.class, "syncMeter", "syncs"));
088
089  private final Histogram syncHistogram =
090    metrics.histogram(name(WALPerformanceEvaluation.class, "syncHistogram", "nanos-between-syncs"));
091  private final Histogram syncCountHistogram =
092    metrics.histogram(name(WALPerformanceEvaluation.class, "syncCountHistogram", "countPerSync"));
093  private final Meter appendMeter =
094    metrics.meter(name(WALPerformanceEvaluation.class, "appendMeter", "bytes"));
095  private final Histogram latencyHistogram =
096    metrics.histogram(name(WALPerformanceEvaluation.class, "latencyHistogram", "nanos"));
097
098  private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
099
100  private HBaseTestingUtility TEST_UTIL;
101
102  static final String TABLE_NAME = "WALPerformanceEvaluation";
103  static final String QUALIFIER_PREFIX = "q";
104  static final String FAMILY_PREFIX = "cf";
105
106  private int numQualifiers = 1;
107  private int valueSize = 512;
108  private int keySize = 16;
109
110  @Override
111  public void setConf(Configuration conf) {
112    super.setConf(conf);
113  }
114
115  /**
116   * Perform WAL.append() of Put object, for the number of iterations requested. Keys and Vaues are
117   * generated randomly, the number of column families, qualifiers and key/value size is tunable by
118   * the user.
119   */
120  class WALPutBenchmark implements Runnable {
121    private final long numIterations;
122    private final int numFamilies;
123    private final boolean noSync;
124    private final HRegion region;
125    private final int syncInterval;
126    private final NavigableMap<byte[], Integer> scopes;
127
128    WALPutBenchmark(final HRegion region, final TableDescriptor htd, final long numIterations,
129      final boolean noSync, final int syncInterval) {
130      this.numIterations = numIterations;
131      this.noSync = noSync;
132      this.syncInterval = syncInterval;
133      this.numFamilies = htd.getColumnFamilyCount();
134      this.region = region;
135      scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
136      for (byte[] fam : htd.getColumnFamilyNames()) {
137        scopes.put(fam, 0);
138      }
139    }
140
141    @Override
142    public void run() {
143      byte[] key = new byte[keySize];
144      byte[] value = new byte[valueSize];
145      WAL wal = region.getWAL();
146      Span threadSpan = TraceUtil.getGlobalTracer()
147        .spanBuilder("WALPerfEval." + Thread.currentThread().getName()).startSpan();
148      try (Scope threadScope = threadSpan.makeCurrent()) {
149        int lastSync = 0;
150        for (int i = 0; i < numIterations; ++i) {
151          assert Span.current() == threadSpan : "Span leak detected.";
152          Span loopSpan = TraceUtil.getGlobalTracer().spanBuilder("runLoopIter" + i).startSpan();
153          try (Scope loopScope = loopSpan.makeCurrent()) {
154            long now = System.nanoTime();
155            Put put = setupPut(ThreadLocalRandom.current(), key, value, numFamilies);
156            WALEdit walEdit = new WALEdit();
157            walEdit.add(put.getFamilyCellMap());
158            RegionInfo hri = region.getRegionInfo();
159            final WALKeyImpl logkey =
160              new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes);
161            wal.appendData(hri, logkey, walEdit);
162            if (!this.noSync) {
163              if (++lastSync >= this.syncInterval) {
164                wal.sync();
165                lastSync = 0;
166              }
167            }
168            latencyHistogram.update(System.nanoTime() - now);
169          } finally {
170            loopSpan.end();
171          }
172        }
173      } catch (Exception e) {
174        LOG.error(getClass().getSimpleName() + " Thread failed", e);
175      } finally {
176        threadSpan.end();
177      }
178    }
179  }
180
181  @Override
182  public int run(String[] args) throws Exception {
183    Path rootRegionDir = null;
184    int numThreads = 1;
185    long numIterations = 1000000;
186    int numFamilies = 1;
187    int syncInterval = 0;
188    boolean noSync = false;
189    boolean verify = false;
190    boolean verbose = false;
191    boolean cleanup = true;
192    boolean noclosefs = false;
193    long roll = Long.MAX_VALUE;
194    boolean compress = false;
195    String cipher = null;
196    int numRegions = 1;
197    // Process command line args
198    for (int i = 0; i < args.length; i++) {
199      String cmd = args[i];
200      try {
201        if (cmd.equals("-threads")) {
202          numThreads = Integer.parseInt(args[++i]);
203        } else if (cmd.equals("-iterations")) {
204          numIterations = Long.parseLong(args[++i]);
205        } else if (cmd.equals("-path")) {
206          rootRegionDir = new Path(args[++i]);
207        } else if (cmd.equals("-families")) {
208          numFamilies = Integer.parseInt(args[++i]);
209        } else if (cmd.equals("-qualifiers")) {
210          numQualifiers = Integer.parseInt(args[++i]);
211        } else if (cmd.equals("-keySize")) {
212          keySize = Integer.parseInt(args[++i]);
213        } else if (cmd.equals("-valueSize")) {
214          valueSize = Integer.parseInt(args[++i]);
215        } else if (cmd.equals("-syncInterval")) {
216          syncInterval = Integer.parseInt(args[++i]);
217        } else if (cmd.equals("-nosync")) {
218          noSync = true;
219        } else if (cmd.equals("-verify")) {
220          verify = true;
221        } else if (cmd.equals("-verbose")) {
222          verbose = true;
223        } else if (cmd.equals("-nocleanup")) {
224          cleanup = false;
225        } else if (cmd.equals("-noclosefs")) {
226          noclosefs = true;
227        } else if (cmd.equals("-roll")) {
228          roll = Long.parseLong(args[++i]);
229        } else if (cmd.equals("-compress")) {
230          compress = true;
231        } else if (cmd.equals("-encryption")) {
232          cipher = args[++i];
233        } else if (cmd.equals("-regions")) {
234          numRegions = Integer.parseInt(args[++i]);
235        } else if (cmd.equals("-traceFreq")) {
236          // keep it here for compatible
237          System.err.println("-traceFreq is not supported any more");
238        } else if (cmd.equals("-h")) {
239          printUsageAndExit();
240        } else if (cmd.equals("--help")) {
241          printUsageAndExit();
242        } else {
243          System.err.println("UNEXPECTED: " + cmd);
244          printUsageAndExit();
245        }
246      } catch (Exception e) {
247        printUsageAndExit();
248      }
249    }
250
251    if (compress) {
252      Configuration conf = getConf();
253      conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
254    }
255
256    if (cipher != null) {
257      // Set up WAL for encryption
258      Configuration conf = getConf();
259      conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
260      conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
261      conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
262        WAL.Reader.class);
263      conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
264        Writer.class);
265      conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
266      conf.set(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, cipher);
267    }
268
269    if (numThreads < numRegions) {
270      LOG.warn("Number of threads is less than the number of regions; some regions will sit idle.");
271    }
272
273    // Internal config. goes off number of threads; if more threads than handlers, stuff breaks.
274    // In regionserver, number of handlers == number of threads.
275    getConf().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, numThreads);
276
277    if (rootRegionDir == null) {
278      TEST_UTIL = new HBaseTestingUtility(getConf());
279      rootRegionDir = TEST_UTIL.getDataTestDirOnTestFS("WALPerformanceEvaluation");
280    }
281    // Run WAL Performance Evaluation
282    // First set the fs from configs. In case we are on hadoop1
283    CommonFSUtils.setFsDefault(getConf(), CommonFSUtils.getRootDir(getConf()));
284    FileSystem fs = FileSystem.get(getConf());
285    LOG.info("FileSystem={}, rootDir={}", fs, rootRegionDir);
286    Span span = TraceUtil.getGlobalTracer().spanBuilder("WALPerfEval").startSpan();
287    try (Scope scope = span.makeCurrent()) {
288      rootRegionDir = rootRegionDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
289      cleanRegionRootDir(fs, rootRegionDir);
290      CommonFSUtils.setRootDir(getConf(), rootRegionDir);
291      final WALFactory wals = new WALFactory(getConf(), "wals");
292      final HRegion[] regions = new HRegion[numRegions];
293      final Runnable[] benchmarks = new Runnable[numRegions];
294      final MockRegionServerServices mockServices = new MockRegionServerServices(getConf());
295      final LogRoller roller = new LogRoller(mockServices);
296      Threads.setDaemonThreadRunning(roller, "WALPerfEval.logRoller");
297
298      try {
299        for (int i = 0; i < numRegions; i++) {
300          // Initialize Table Descriptor
301          // a table per desired region means we can avoid carving up the key space
302          final TableDescriptor htd = createHTableDescriptor(i, numFamilies);
303          regions[i] = openRegion(fs, rootRegionDir, htd, wals, roll, roller);
304          benchmarks[i] = new WALPutBenchmark(regions[i], htd, numIterations, noSync, syncInterval);
305        }
306        ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).outputTo(System.out)
307          .convertRatesTo(TimeUnit.SECONDS).filter(MetricFilter.ALL).build();
308        reporter.start(30, TimeUnit.SECONDS);
309
310        long putTime = runBenchmark(benchmarks, numThreads);
311        logBenchmarkResult("Summary: threads=" + numThreads + ", iterations=" + numIterations
312          + ", syncInterval=" + syncInterval, numIterations * numThreads, putTime);
313
314        for (int i = 0; i < numRegions; i++) {
315          if (regions[i] != null) {
316            closeRegion(regions[i]);
317            regions[i] = null;
318          }
319        }
320        if (verify) {
321          LOG.info("verifying written log entries.");
322          Path dir = new Path(CommonFSUtils.getRootDir(getConf()),
323            AbstractFSWALProvider.getWALDirectoryName("wals"));
324          long editCount = 0;
325          FileStatus[] fsss = fs.listStatus(dir);
326          if (fsss.length == 0) throw new IllegalStateException("No WAL found");
327          for (FileStatus fss : fsss) {
328            Path p = fss.getPath();
329            if (!fs.exists(p)) throw new IllegalStateException(p.toString());
330            editCount += verify(wals, p, verbose);
331          }
332          long expected = numIterations * numThreads;
333          if (editCount != expected) {
334            throw new IllegalStateException("Counted=" + editCount + ", expected=" + expected);
335          }
336        }
337      } finally {
338        mockServices.stop("test clean up.");
339        for (int i = 0; i < numRegions; i++) {
340          if (regions[i] != null) {
341            closeRegion(regions[i]);
342          }
343        }
344        if (null != roller) {
345          LOG.info("shutting down log roller.");
346          roller.close();
347        }
348        wals.shutdown();
349        // Remove the root dir for this test region
350        if (cleanup) cleanRegionRootDir(fs, rootRegionDir);
351      }
352    } finally {
353      span.end();
354      // We may be called inside a test that wants to keep on using the fs.
355      if (!noclosefs) {
356        fs.close();
357      }
358    }
359
360    return 0;
361  }
362
363  private static TableDescriptor createHTableDescriptor(final int regionNum,
364    final int numFamilies) {
365    TableDescriptorBuilder builder =
366      TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME + ":" + regionNum));
367    IntStream.range(0, numFamilies)
368      .mapToObj(i -> ColumnFamilyDescriptorBuilder.of(FAMILY_PREFIX + i))
369      .forEachOrdered(builder::setColumnFamily);
370    return builder.build();
371  }
372
373  /**
374   * Verify the content of the WAL file. Verify that the file has expected number of edits.
375   * @param wals may not be null n * @return Count of edits. n
376   */
377  private long verify(final WALFactory wals, final Path wal, final boolean verbose)
378    throws IOException {
379    WAL.Reader reader = wals.createReader(wal.getFileSystem(getConf()), wal);
380    long count = 0;
381    Map<String, Long> sequenceIds = new HashMap<>();
382    try {
383      while (true) {
384        WAL.Entry e = reader.next();
385        if (e == null) {
386          LOG.debug("Read count=" + count + " from " + wal);
387          break;
388        }
389        count++;
390        long seqid = e.getKey().getSequenceId();
391        if (sequenceIds.containsKey(Bytes.toString(e.getKey().getEncodedRegionName()))) {
392          // sequenceIds should be increasing for every regions
393          if (sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName())) >= seqid) {
394            throw new IllegalStateException("wal = " + wal.getName() + ", " + "previous seqid = "
395              + sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName()))
396              + ", current seqid = " + seqid);
397          }
398        }
399        // update the sequence Id.
400        sequenceIds.put(Bytes.toString(e.getKey().getEncodedRegionName()), seqid);
401        if (verbose) LOG.info("seqid=" + seqid);
402      }
403    } finally {
404      reader.close();
405    }
406    return count;
407  }
408
409  private static void logBenchmarkResult(String testName, long numTests, long totalTime) {
410    float tsec = totalTime / 1000.0f;
411    LOG.info(String.format("%s took %.3fs %.3fops/s", testName, tsec, numTests / tsec));
412
413  }
414
415  private void printUsageAndExit() {
416    System.err.printf("Usage: hbase %s [options]\n", getClass().getName());
417    System.err.println(" where [options] are:");
418    System.err.println("  -h|-help         Show this help and exit.");
419    System.err.println("  -threads <N>     Number of threads writing on the WAL.");
420    System.err.println("  -regions <N>     Number of regions to open in the WAL. Default: 1");
421    System.err.println("  -iterations <N>  Number of iterations per thread.");
422    System.err.println("  -path <PATH>     Path where region's root directory is created.");
423    System.err.println("  -families <N>    Number of column families to write.");
424    System.err.println("  -qualifiers <N>  Number of qualifiers to write.");
425    System.err.println("  -keySize <N>     Row key size in byte.");
426    System.err.println("  -valueSize <N>   Row/Col value size in byte.");
427    System.err.println("  -nocleanup       Do NOT remove test data when done.");
428    System.err.println("  -noclosefs       Do NOT close the filesystem when done.");
429    System.err.println("  -nosync          Append without syncing");
430    System.err.println(
431      "  -syncInterval <N> Append N edits and then sync. " + "Default=0, i.e. sync every edit.");
432    System.err.println("  -verify          Verify edits written in sequence");
433    System.err
434      .println("  -verbose         Output extra info; " + "e.g. all edit seq ids when verifying");
435    System.err.println("  -roll <N>        Roll the way every N appends");
436    System.err.println("  -encryption <A>  Encrypt the WAL with algorithm A, e.g. AES");
437    System.err.println("  -traceFreq <N>   Rate of trace sampling. Default: 1.0, "
438      + "only respected when tracing is enabled, ie -Dhbase.trace.spanreceiver.classes=...");
439    System.err.println("");
440    System.err.println("Examples:");
441    System.err.println("");
442    System.err.println(" To run 100 threads on hdfs with log rolling every 10k edits and "
443      + "verification afterward do:");
444    System.err.println(" $ hbase org.apache.hadoop.hbase.wal." + "WALPerformanceEvaluation \\");
445    System.err.println("    -conf ./core-site.xml -path hdfs://example.org:7000/tmp "
446      + "-threads 100 -roll 10000 -verify");
447    System.exit(1);
448  }
449
450  private final Set<WAL> walsListenedTo = new HashSet<>();
451
452  private HRegion openRegion(final FileSystem fs, final Path dir, final TableDescriptor htd,
453    final WALFactory wals, final long whenToRoll, final LogRoller roller) throws IOException {
454    // Initialize HRegion
455    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
456    // Initialize WAL
457    final WAL wal = wals.getWAL(regionInfo);
458    // If we haven't already, attach a listener to this wal to handle rolls and metrics.
459    if (walsListenedTo.add(wal)) {
460      roller.addWAL(wal);
461      wal.registerWALActionsListener(new WALActionsListener() {
462        private int appends = 0;
463
464        @Override
465        public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
466          this.appends++;
467          if (this.appends % whenToRoll == 0) {
468            LOG.info("Rolling after " + appends + " edits");
469            // We used to do explicit call to rollWriter but changed it to a request
470            // to avoid dead lock (there are less threads going on in this class than
471            // in the regionserver -- regionserver does not have the issue).
472            AbstractFSWALProvider.requestLogRoll(wal);
473          }
474        }
475
476        @Override
477        public void postSync(final long timeInNanos, final int handlerSyncs) {
478          syncMeter.mark();
479          syncHistogram.update(timeInNanos);
480          syncCountHistogram.update(handlerSyncs);
481        }
482
483        @Override
484        public void postAppend(final long size, final long elapsedTime, final WALKey logkey,
485          final WALEdit logEdit) {
486          appendMeter.mark(size);
487        }
488      });
489    }
490
491    return HRegion.createHRegion(regionInfo, dir, getConf(), htd, wal);
492  }
493
494  private void closeRegion(final HRegion region) throws IOException {
495    if (region != null) {
496      region.close();
497      WAL wal = region.getWAL();
498      if (wal != null) {
499        wal.shutdown();
500      }
501    }
502  }
503
504  private void cleanRegionRootDir(final FileSystem fs, final Path dir) throws IOException {
505    if (fs.exists(dir)) {
506      fs.delete(dir, true);
507    }
508  }
509
510  private Put setupPut(Random rand, byte[] key, byte[] value, final int numFamilies) {
511    rand.nextBytes(key);
512    Put put = new Put(key);
513    for (int cf = 0; cf < numFamilies; ++cf) {
514      for (int q = 0; q < numQualifiers; ++q) {
515        rand.nextBytes(value);
516        put.addColumn(Bytes.toBytes(FAMILY_PREFIX + cf), Bytes.toBytes(QUALIFIER_PREFIX + q),
517          value);
518      }
519    }
520    return put;
521  }
522
523  private long runBenchmark(Runnable[] runnable, final int numThreads) throws InterruptedException {
524    Thread[] threads = new Thread[numThreads];
525    long startTime = EnvironmentEdgeManager.currentTime();
526    for (int i = 0; i < numThreads; ++i) {
527      threads[i] =
528        new Thread(runnable[i % runnable.length], "t" + i + ",r" + (i % runnable.length));
529      threads[i].start();
530    }
531    for (Thread t : threads)
532      t.join();
533    long endTime = EnvironmentEdgeManager.currentTime();
534    return (endTime - startTime);
535  }
536
537  /**
538   * The guts of the {@link #main} method. Call this method to avoid the {@link #main(String[])}
539   * System.exit. nnn
540   */
541  static int innerMain(final Configuration c, final String[] args) throws Exception {
542    return ToolRunner.run(c, new WALPerformanceEvaluation(), args);
543  }
544
545  public static void main(String[] args) throws Exception {
546    System.exit(innerMain(HBaseConfiguration.create(), args));
547  }
548}