001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static com.codahale.metrics.MetricRegistry.name;
021
022import com.codahale.metrics.ConsoleReporter;
023import com.codahale.metrics.Histogram;
024import com.codahale.metrics.Meter;
025import com.codahale.metrics.MetricFilter;
026import com.codahale.metrics.MetricRegistry;
027import io.opentelemetry.api.trace.Span;
028import io.opentelemetry.context.Scope;
029import java.io.IOException;
030import java.util.HashMap;
031import java.util.HashSet;
032import java.util.Map;
033import java.util.NavigableMap;
034import java.util.Random;
035import java.util.Set;
036import java.util.TreeMap;
037import java.util.concurrent.ThreadLocalRandom;
038import java.util.concurrent.TimeUnit;
039import java.util.stream.IntStream;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.conf.Configured;
042import org.apache.hadoop.fs.FileStatus;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.hbase.HBaseConfiguration;
046import org.apache.hadoop.hbase.HBaseTestingUtil;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.MockRegionServerServices;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
051import org.apache.hadoop.hbase.client.Put;
052import org.apache.hadoop.hbase.client.RegionInfo;
053import org.apache.hadoop.hbase.client.RegionInfoBuilder;
054import org.apache.hadoop.hbase.client.TableDescriptor;
055import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
056import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
057import org.apache.hadoop.hbase.regionserver.HRegion;
058import org.apache.hadoop.hbase.regionserver.LogRoller;
059import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
060import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
061import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
062import org.apache.hadoop.hbase.trace.TraceUtil;
063import org.apache.hadoop.hbase.util.Bytes;
064import org.apache.hadoop.hbase.util.CommonFSUtils;
065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
066import org.apache.hadoop.hbase.util.Threads;
067import org.apache.hadoop.util.Tool;
068import org.apache.hadoop.util.ToolRunner;
069import org.apache.yetus.audience.InterfaceAudience;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073// imports for things that haven't moved from regionserver.wal yet.
074
075/**
076 * This class runs performance benchmarks for {@link WAL}. See usage for this tool by running:
077 * <code>$ hbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation -h</code>
078 */
079@InterfaceAudience.Private
080public final class WALPerformanceEvaluation extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(WALPerformanceEvaluation.class);

  // Registry that owns every metric below; run() attaches a ConsoleReporter to it.
  private final MetricRegistry metrics = new MetricRegistry();
  // Marked once for every postSync callback received from a listened-to WAL.
  private final Meter syncMeter =
    metrics.meter(name(WALPerformanceEvaluation.class, "syncMeter", "syncs"));

  // Updated with the timeInNanos value handed to each postSync callback.
  private final Histogram syncHistogram =
    metrics.histogram(name(WALPerformanceEvaluation.class, "syncHistogram", "nanos-between-syncs"));
  // Updated with the handlerSyncs value handed to each postSync callback.
  private final Histogram syncCountHistogram =
    metrics.histogram(name(WALPerformanceEvaluation.class, "syncCountHistogram", "countPerSync"));
  // Marked with the size handed to each postAppend callback.
  private final Meter appendMeter =
    metrics.meter(name(WALPerformanceEvaluation.class, "appendMeter", "bytes"));
  // Updated by the benchmark threads with the System.nanoTime() delta of one loop iteration
  // (building the Put, appending it, and syncing when a sync is due).
  private final Histogram latencyHistogram =
    metrics.histogram(name(WALPerformanceEvaluation.class, "latencyHistogram", "nanos"));

  // Single instance passed to every WALKeyImpl the benchmark threads create.
  private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();

  // Only instantiated by run() when no -path option was given; stays null otherwise.
  private HBaseTestingUtil TEST_UTIL;

  // Table names are TABLE_NAME + ":" + region index; family and qualifier names are the prefix
  // followed by a zero-based index.
  static final String TABLE_NAME = "WALPerformanceEvaluation";
  static final String QUALIFIER_PREFIX = "q";
  static final String FAMILY_PREFIX = "cf";

  // Defaults for the -qualifiers, -valueSize and -keySize options (sizes are byte-array lengths).
  private int numQualifiers = 1;
  private int valueSize = 512;
  private int keySize = 16;
107
  /**
   * Stores the configuration by delegating to the superclass implementation; nothing else is done
   * here.
   * @param conf configuration handed to {@code super.setConf}
   */
  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
  }
112
113  /**
114   * Perform WAL.append() of Put object, for the number of iterations requested. Keys and Vaues are
115   * generated randomly, the number of column families, qualifiers and key/value size is tunable by
116   * the user.
117   */
118  class WALPutBenchmark implements Runnable {
119    private final long numIterations;
120    private final int numFamilies;
121    private final boolean noSync;
122    private final HRegion region;
123    private final int syncInterval;
124    private final NavigableMap<byte[], Integer> scopes;
125
126    WALPutBenchmark(final HRegion region, final TableDescriptor htd, final long numIterations,
127      final boolean noSync, final int syncInterval) {
128      this.numIterations = numIterations;
129      this.noSync = noSync;
130      this.syncInterval = syncInterval;
131      this.numFamilies = htd.getColumnFamilyCount();
132      this.region = region;
133      scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
134      for (byte[] fam : htd.getColumnFamilyNames()) {
135        scopes.put(fam, 0);
136      }
137    }
138
139    @Override
140    public void run() {
141      byte[] key = new byte[keySize];
142      byte[] value = new byte[valueSize];
143      WAL wal = region.getWAL();
144      Span threadSpan = TraceUtil.getGlobalTracer()
145        .spanBuilder("WALPerfEval." + Thread.currentThread().getName()).startSpan();
146      try (Scope threadScope = threadSpan.makeCurrent()) {
147        int lastSync = 0;
148        for (int i = 0; i < numIterations; ++i) {
149          assert Span.current() == threadSpan : "Span leak detected.";
150          Span loopSpan = TraceUtil.getGlobalTracer().spanBuilder("runLoopIter" + i).startSpan();
151          try (Scope loopScope = loopSpan.makeCurrent()) {
152            long now = System.nanoTime();
153            Put put = setupPut(ThreadLocalRandom.current(), key, value, numFamilies);
154            WALEdit walEdit = new WALEdit();
155            walEdit.add(put.getFamilyCellMap());
156            RegionInfo hri = region.getRegionInfo();
157            final WALKeyImpl logkey =
158              new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes);
159            wal.appendData(hri, logkey, walEdit);
160            if (!this.noSync) {
161              if (++lastSync >= this.syncInterval) {
162                wal.sync();
163                lastSync = 0;
164              }
165            }
166            latencyHistogram.update(System.nanoTime() - now);
167          } finally {
168            loopSpan.end();
169          }
170        }
171      } catch (Exception e) {
172        LOG.error(getClass().getSimpleName() + " Thread failed", e);
173      } finally {
174        threadSpan.end();
175      }
176    }
177  }
178
179  @Override
180  public int run(String[] args) throws Exception {
181    Path rootRegionDir = null;
182    int numThreads = 1;
183    long numIterations = 1000000;
184    int numFamilies = 1;
185    int syncInterval = 0;
186    boolean noSync = false;
187    boolean verify = false;
188    boolean verbose = false;
189    boolean cleanup = true;
190    boolean noclosefs = false;
191    long roll = Long.MAX_VALUE;
192    boolean compress = false;
193    String cipher = null;
194    int numRegions = 1;
195    // Process command line args
196    for (int i = 0; i < args.length; i++) {
197      String cmd = args[i];
198      try {
199        if (cmd.equals("-threads")) {
200          numThreads = Integer.parseInt(args[++i]);
201        } else if (cmd.equals("-iterations")) {
202          numIterations = Long.parseLong(args[++i]);
203        } else if (cmd.equals("-path")) {
204          rootRegionDir = new Path(args[++i]);
205        } else if (cmd.equals("-families")) {
206          numFamilies = Integer.parseInt(args[++i]);
207        } else if (cmd.equals("-qualifiers")) {
208          numQualifiers = Integer.parseInt(args[++i]);
209        } else if (cmd.equals("-keySize")) {
210          keySize = Integer.parseInt(args[++i]);
211        } else if (cmd.equals("-valueSize")) {
212          valueSize = Integer.parseInt(args[++i]);
213        } else if (cmd.equals("-syncInterval")) {
214          syncInterval = Integer.parseInt(args[++i]);
215        } else if (cmd.equals("-nosync")) {
216          noSync = true;
217        } else if (cmd.equals("-verify")) {
218          verify = true;
219        } else if (cmd.equals("-verbose")) {
220          verbose = true;
221        } else if (cmd.equals("-nocleanup")) {
222          cleanup = false;
223        } else if (cmd.equals("-noclosefs")) {
224          noclosefs = true;
225        } else if (cmd.equals("-roll")) {
226          roll = Long.parseLong(args[++i]);
227        } else if (cmd.equals("-compress")) {
228          compress = true;
229        } else if (cmd.equals("-encryption")) {
230          cipher = args[++i];
231        } else if (cmd.equals("-regions")) {
232          numRegions = Integer.parseInt(args[++i]);
233        } else if (cmd.equals("-traceFreq")) {
234          // keep it here for compatible
235          System.err.println("-traceFreq is not supported any more");
236        } else if (cmd.equals("-h")) {
237          printUsageAndExit();
238        } else if (cmd.equals("--help")) {
239          printUsageAndExit();
240        } else {
241          System.err.println("UNEXPECTED: " + cmd);
242          printUsageAndExit();
243        }
244      } catch (Exception e) {
245        printUsageAndExit();
246      }
247    }
248
249    if (compress) {
250      Configuration conf = getConf();
251      conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
252    }
253
254    if (cipher != null) {
255      // Set up WAL for encryption
256      Configuration conf = getConf();
257      conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
258      conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
259      conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
260      conf.set(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, cipher);
261    }
262
263    if (numThreads < numRegions) {
264      LOG.warn("Number of threads is less than the number of regions; some regions will sit idle.");
265    }
266
267    // Internal config. goes off number of threads; if more threads than handlers, stuff breaks.
268    // In regionserver, number of handlers == number of threads.
269    getConf().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, numThreads);
270    // We do not need memstore here, so disable memstore lab, otherwise we need to initialize
271    // ChunkCreator
272    getConf().setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
273
274    if (rootRegionDir == null) {
275      TEST_UTIL = new HBaseTestingUtil(getConf());
276      rootRegionDir = TEST_UTIL.getDataTestDirOnTestFS("WALPerformanceEvaluation");
277    }
278    // Run WAL Performance Evaluation
279    // First set the fs from configs. In case we are on hadoop1
280    CommonFSUtils.setFsDefault(getConf(), CommonFSUtils.getRootDir(getConf()));
281    FileSystem fs = FileSystem.get(getConf());
282    LOG.info("FileSystem={}, rootDir={}", fs, rootRegionDir);
283    Span span = TraceUtil.getGlobalTracer().spanBuilder("WALPerfEval").startSpan();
284    try (Scope scope = span.makeCurrent()) {
285      rootRegionDir = rootRegionDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
286      cleanRegionRootDir(fs, rootRegionDir);
287      CommonFSUtils.setRootDir(getConf(), rootRegionDir);
288      final WALFactory wals = new WALFactory(getConf(), "wals");
289      final HRegion[] regions = new HRegion[numRegions];
290      final Runnable[] benchmarks = new Runnable[numRegions];
291      final MockRegionServerServices mockServices = new MockRegionServerServices(getConf());
292      final LogRoller roller = new LogRoller(mockServices);
293      Threads.setDaemonThreadRunning(roller, "WALPerfEval.logRoller");
294
295      try {
296        for (int i = 0; i < numRegions; i++) {
297          // Initialize Table Descriptor
298          // a table per desired region means we can avoid carving up the key space
299          final TableDescriptor htd = createHTableDescriptor(i, numFamilies);
300          regions[i] = openRegion(fs, rootRegionDir, htd, wals, roll, roller);
301          benchmarks[i] = new WALPutBenchmark(regions[i], htd, numIterations, noSync, syncInterval);
302        }
303        ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).outputTo(System.out)
304          .convertRatesTo(TimeUnit.SECONDS).filter(MetricFilter.ALL).build();
305        reporter.start(30, TimeUnit.SECONDS);
306
307        long putTime = runBenchmark(benchmarks, numThreads);
308        logBenchmarkResult("Summary: threads=" + numThreads + ", iterations=" + numIterations
309          + ", syncInterval=" + syncInterval, numIterations * numThreads, putTime);
310
311        for (int i = 0; i < numRegions; i++) {
312          if (regions[i] != null) {
313            closeRegion(regions[i]);
314            regions[i] = null;
315          }
316        }
317        if (verify) {
318          LOG.info("verifying written log entries.");
319          Path dir = new Path(CommonFSUtils.getRootDir(getConf()),
320            AbstractFSWALProvider.getWALDirectoryName("wals"));
321          long editCount = 0;
322          FileStatus[] fsss = fs.listStatus(dir);
323          if (fsss.length == 0) throw new IllegalStateException("No WAL found");
324          for (FileStatus fss : fsss) {
325            Path p = fss.getPath();
326            if (!fs.exists(p)) throw new IllegalStateException(p.toString());
327            editCount += verify(wals, p, verbose);
328          }
329          long expected = numIterations * numThreads;
330          if (editCount != expected) {
331            throw new IllegalStateException("Counted=" + editCount + ", expected=" + expected);
332          }
333        }
334      } finally {
335        mockServices.stop("test clean up.");
336        for (int i = 0; i < numRegions; i++) {
337          if (regions[i] != null) {
338            closeRegion(regions[i]);
339          }
340        }
341        if (null != roller) {
342          LOG.info("shutting down log roller.");
343          roller.close();
344        }
345        wals.shutdown();
346        // Remove the root dir for this test region
347        if (cleanup) cleanRegionRootDir(fs, rootRegionDir);
348      }
349    } finally {
350      span.end();
351      // We may be called inside a test that wants to keep on using the fs.
352      if (!noclosefs) {
353        fs.close();
354      }
355    }
356
357    return 0;
358  }
359
360  private static TableDescriptor createHTableDescriptor(final int regionNum,
361    final int numFamilies) {
362    TableDescriptorBuilder builder =
363      TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME + ":" + regionNum));
364    IntStream.range(0, numFamilies)
365      .mapToObj(i -> ColumnFamilyDescriptorBuilder.of(FAMILY_PREFIX + i))
366      .forEachOrdered(builder::setColumnFamily);
367    return builder.build();
368  }
369
370  /**
371   * Verify the content of the WAL file. Verify that the file has expected number of edits.
372   * @param wals may not be null
373   * @return Count of edits.
374   */
375  private long verify(final WALFactory wals, final Path wal, final boolean verbose)
376    throws IOException {
377    WALStreamReader reader = wals.createStreamReader(wal.getFileSystem(getConf()), wal);
378    long count = 0;
379    Map<String, Long> sequenceIds = new HashMap<>();
380    try {
381      while (true) {
382        WAL.Entry e = reader.next();
383        if (e == null) {
384          LOG.debug("Read count=" + count + " from " + wal);
385          break;
386        }
387        count++;
388        long seqid = e.getKey().getSequenceId();
389        if (sequenceIds.containsKey(Bytes.toString(e.getKey().getEncodedRegionName()))) {
390          // sequenceIds should be increasing for every regions
391          if (sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName())) >= seqid) {
392            throw new IllegalStateException("wal = " + wal.getName() + ", " + "previous seqid = "
393              + sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName()))
394              + ", current seqid = " + seqid);
395          }
396        }
397        // update the sequence Id.
398        sequenceIds.put(Bytes.toString(e.getKey().getEncodedRegionName()), seqid);
399        if (verbose) LOG.info("seqid=" + seqid);
400      }
401    } finally {
402      reader.close();
403    }
404    return count;
405  }
406
407  private static void logBenchmarkResult(String testName, long numTests, long totalTime) {
408    float tsec = totalTime / 1000.0f;
409    LOG.info(String.format("%s took %.3fs %.3fops/s", testName, tsec, numTests / tsec));
410
411  }
412
413  private void printUsageAndExit() {
414    System.err.printf("Usage: hbase %s [options]\n", getClass().getName());
415    System.err.println(" where [options] are:");
416    System.err.println("  -h|-help         Show this help and exit.");
417    System.err.println("  -threads <N>     Number of threads writing on the WAL.");
418    System.err.println("  -regions <N>     Number of regions to open in the WAL. Default: 1");
419    System.err.println("  -iterations <N>  Number of iterations per thread.");
420    System.err.println("  -path <PATH>     Path where region's root directory is created.");
421    System.err.println("  -families <N>    Number of column families to write.");
422    System.err.println("  -qualifiers <N>  Number of qualifiers to write.");
423    System.err.println("  -keySize <N>     Row key size in byte.");
424    System.err.println("  -valueSize <N>   Row/Col value size in byte.");
425    System.err.println("  -nocleanup       Do NOT remove test data when done.");
426    System.err.println("  -noclosefs       Do NOT close the filesystem when done.");
427    System.err.println("  -nosync          Append without syncing");
428    System.err.println(
429      "  -syncInterval <N> Append N edits and then sync. " + "Default=0, i.e. sync every edit.");
430    System.err.println("  -verify          Verify edits written in sequence");
431    System.err
432      .println("  -verbose         Output extra info; " + "e.g. all edit seq ids when verifying");
433    System.err.println("  -roll <N>        Roll the way every N appends");
434    System.err.println("  -encryption <A>  Encrypt the WAL with algorithm A, e.g. AES");
435    System.err.println("  -traceFreq <N>   Rate of trace sampling. Default: 1.0, "
436      + "only respected when tracing is enabled, ie -Dhbase.trace.spanreceiver.classes=...");
437    System.err.println("");
438    System.err.println("Examples:");
439    System.err.println("");
440    System.err.println(" To run 100 threads on hdfs with log rolling every 10k edits and "
441      + "verification afterward do:");
442    System.err.println(" $ hbase org.apache.hadoop.hbase.wal." + "WALPerformanceEvaluation \\");
443    System.err.println("    -conf ./core-site.xml -path hdfs://example.org:7000/tmp "
444      + "-threads 100 -roll 10000 -verify");
445    System.exit(1);
446  }
447
  // WALs that already have the roll/metrics listener attached; openRegion adds to this set and
  // registers the listener only when the WAL was not present yet.
  private final Set<WAL> walsListenedTo = new HashSet<>();
449
450  private HRegion openRegion(final FileSystem fs, final Path dir, final TableDescriptor htd,
451    final WALFactory wals, final long whenToRoll, final LogRoller roller) throws IOException {
452    // Initialize HRegion
453    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
454    // Initialize WAL
455    final WAL wal = wals.getWAL(regionInfo);
456    // If we haven't already, attach a listener to this wal to handle rolls and metrics.
457    if (walsListenedTo.add(wal)) {
458      roller.addWAL(wal);
459      wal.registerWALActionsListener(new WALActionsListener() {
460        private int appends = 0;
461
462        @Override
463        public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
464          this.appends++;
465          if (this.appends % whenToRoll == 0) {
466            LOG.info("Rolling after " + appends + " edits");
467            // We used to do explicit call to rollWriter but changed it to a request
468            // to avoid dead lock (there are less threads going on in this class than
469            // in the regionserver -- regionserver does not have the issue).
470            AbstractFSWALProvider.requestLogRoll(wal);
471          }
472        }
473
474        @Override
475        public void postSync(final long timeInNanos, final int handlerSyncs) {
476          syncMeter.mark();
477          syncHistogram.update(timeInNanos);
478          syncCountHistogram.update(handlerSyncs);
479        }
480
481        @Override
482        public void postAppend(final long size, final long elapsedTime, final WALKey logkey,
483          final WALEdit logEdit) {
484          appendMeter.mark(size);
485        }
486      });
487    }
488
489    return HRegion.createHRegion(regionInfo, dir, getConf(), htd, wal);
490  }
491
492  private void closeRegion(final HRegion region) throws IOException {
493    if (region != null) {
494      region.close();
495      WAL wal = region.getWAL();
496      if (wal != null) {
497        wal.shutdown();
498      }
499    }
500  }
501
502  private void cleanRegionRootDir(final FileSystem fs, final Path dir) throws IOException {
503    if (fs.exists(dir)) {
504      fs.delete(dir, true);
505    }
506  }
507
508  private Put setupPut(Random rand, byte[] key, byte[] value, final int numFamilies) {
509    rand.nextBytes(key);
510    Put put = new Put(key);
511    for (int cf = 0; cf < numFamilies; ++cf) {
512      for (int q = 0; q < numQualifiers; ++q) {
513        rand.nextBytes(value);
514        put.addColumn(Bytes.toBytes(FAMILY_PREFIX + cf), Bytes.toBytes(QUALIFIER_PREFIX + q),
515          value);
516      }
517    }
518    return put;
519  }
520
521  private long runBenchmark(Runnable[] runnable, final int numThreads) throws InterruptedException {
522    Thread[] threads = new Thread[numThreads];
523    long startTime = EnvironmentEdgeManager.currentTime();
524    for (int i = 0; i < numThreads; ++i) {
525      threads[i] =
526        new Thread(runnable[i % runnable.length], "t" + i + ",r" + (i % runnable.length));
527      threads[i].start();
528    }
529    for (Thread t : threads)
530      t.join();
531    long endTime = EnvironmentEdgeManager.currentTime();
532    return (endTime - startTime);
533  }
534
535  /**
536   * The guts of the {@link #main} method. Call this method to avoid the {@link #main(String[])}
537   * System.exit.
538   */
539  static int innerMain(final Configuration c, final String[] args) throws Exception {
540    return ToolRunner.run(c, new WALPerformanceEvaluation(), args);
541  }
542
543  public static void main(String[] args) throws Exception {
544    System.exit(innerMain(HBaseConfiguration.create(), args));
545  }
546}