001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.wal;
020
021import static com.codahale.metrics.MetricRegistry.name;
022
023import com.codahale.metrics.ConsoleReporter;
024import com.codahale.metrics.Histogram;
025import com.codahale.metrics.Meter;
026import com.codahale.metrics.MetricFilter;
027import com.codahale.metrics.MetricRegistry;
028import io.opentelemetry.api.trace.Span;
029import io.opentelemetry.context.Scope;
030import java.io.IOException;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.Map;
034import java.util.NavigableMap;
035import java.util.Random;
036import java.util.Set;
037import java.util.TreeMap;
038import java.util.concurrent.TimeUnit;
039import java.util.stream.IntStream;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.conf.Configured;
042import org.apache.hadoop.fs.FileStatus;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.hbase.HBaseConfiguration;
046import org.apache.hadoop.hbase.HBaseTestingUtil;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.MockRegionServerServices;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
051import org.apache.hadoop.hbase.client.Put;
052import org.apache.hadoop.hbase.client.RegionInfo;
053import org.apache.hadoop.hbase.client.RegionInfoBuilder;
054import org.apache.hadoop.hbase.client.TableDescriptor;
055import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
056import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
057import org.apache.hadoop.hbase.regionserver.HRegion;
058import org.apache.hadoop.hbase.regionserver.LogRoller;
059import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
060import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
061import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
062import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
063import org.apache.hadoop.hbase.trace.TraceUtil;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.CommonFSUtils;
066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
067import org.apache.hadoop.hbase.util.Threads;
068import org.apache.hadoop.hbase.wal.WALProvider.Writer;
069import org.apache.hadoop.util.Tool;
070import org.apache.hadoop.util.ToolRunner;
071import org.apache.yetus.audience.InterfaceAudience;
072import org.slf4j.Logger;
073import org.slf4j.LoggerFactory;
074
075// imports for things that haven't moved from regionserver.wal yet.
076
077/**
078 * This class runs performance benchmarks for {@link WAL}.
079 * See usage for this tool by running:
080 * <code>$ hbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation -h</code>
081 */
082@InterfaceAudience.Private
083public final class WALPerformanceEvaluation extends Configured implements Tool {
084  private static final Logger LOG =
085      LoggerFactory.getLogger(WALPerformanceEvaluation.class);
086
087  private final MetricRegistry metrics = new MetricRegistry();
088  private final Meter syncMeter =
089    metrics.meter(name(WALPerformanceEvaluation.class, "syncMeter", "syncs"));
090
091  private final Histogram syncHistogram = metrics.histogram(
092    name(WALPerformanceEvaluation.class, "syncHistogram", "nanos-between-syncs"));
093  private final Histogram syncCountHistogram = metrics.histogram(
094    name(WALPerformanceEvaluation.class, "syncCountHistogram", "countPerSync"));
095  private final Meter appendMeter = metrics.meter(
096    name(WALPerformanceEvaluation.class, "appendMeter", "bytes"));
097  private final Histogram latencyHistogram =
098    metrics.histogram(name(WALPerformanceEvaluation.class, "latencyHistogram", "nanos"));
099
100  private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
101
102  private HBaseTestingUtil TEST_UTIL;
103
104  static final String TABLE_NAME = "WALPerformanceEvaluation";
105  static final String QUALIFIER_PREFIX = "q";
106  static final String FAMILY_PREFIX = "cf";
107
108  private int numQualifiers = 1;
109  private int valueSize = 512;
110  private int keySize = 16;
111
112  @Override
113  public void setConf(Configuration conf) {
114    super.setConf(conf);
115  }
116
117  /**
118   * Perform WAL.append() of Put object, for the number of iterations requested.
119   * Keys and Vaues are generated randomly, the number of column families,
120   * qualifiers and key/value size is tunable by the user.
121   */
122  class WALPutBenchmark implements Runnable {
123    private final long numIterations;
124    private final int numFamilies;
125    private final boolean noSync;
126    private final HRegion region;
127    private final int syncInterval;
128    private final NavigableMap<byte[], Integer> scopes;
129
130    WALPutBenchmark(final HRegion region, final TableDescriptor htd,
131        final long numIterations, final boolean noSync, final int syncInterval) {
132      this.numIterations = numIterations;
133      this.noSync = noSync;
134      this.syncInterval = syncInterval;
135      this.numFamilies = htd.getColumnFamilyCount();
136      this.region = region;
137      scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
138      for(byte[] fam : htd.getColumnFamilyNames()) {
139        scopes.put(fam, 0);
140      }
141    }
142
143    @Override
144    public void run() {
145      byte[] key = new byte[keySize];
146      byte[] value = new byte[valueSize];
147      Random rand = new Random(Thread.currentThread().getId());
148      WAL wal = region.getWAL();
149      Span threadSpan = TraceUtil.getGlobalTracer()
150        .spanBuilder("WALPerfEval." + Thread.currentThread().getName()).startSpan();
151      try (Scope threadScope = threadSpan.makeCurrent()) {
152        int lastSync = 0;
153        for (int i = 0; i < numIterations; ++i) {
154          assert Span.current() == threadSpan : "Span leak detected.";
155          Span loopSpan = TraceUtil.getGlobalTracer().spanBuilder("runLoopIter" + i).startSpan();
156          try (Scope loopScope = loopSpan.makeCurrent()) {
157            long now = System.nanoTime();
158            Put put = setupPut(rand, key, value, numFamilies);
159            WALEdit walEdit = new WALEdit();
160            walEdit.add(put.getFamilyCellMap());
161            RegionInfo hri = region.getRegionInfo();
162            final WALKeyImpl logkey =
163                new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes);
164            wal.appendData(hri, logkey, walEdit);
165            if (!this.noSync) {
166              if (++lastSync >= this.syncInterval) {
167                wal.sync();
168                lastSync = 0;
169              }
170            }
171            latencyHistogram.update(System.nanoTime() - now);
172          } finally {
173            loopSpan.end();
174          }
175        }
176      } catch (Exception e) {
177        LOG.error(getClass().getSimpleName() + " Thread failed", e);
178      } finally {
179        threadSpan.end();
180      }
181    }
182  }
183
184  @Override
185  public int run(String[] args) throws Exception {
186    Path rootRegionDir = null;
187    int numThreads = 1;
188    long numIterations = 1000000;
189    int numFamilies = 1;
190    int syncInterval = 0;
191    boolean noSync = false;
192    boolean verify = false;
193    boolean verbose = false;
194    boolean cleanup = true;
195    boolean noclosefs = false;
196    long roll = Long.MAX_VALUE;
197    boolean compress = false;
198    String cipher = null;
199    int numRegions = 1;
200    // Process command line args
201    for (int i = 0; i < args.length; i++) {
202      String cmd = args[i];
203      try {
204        if (cmd.equals("-threads")) {
205          numThreads = Integer.parseInt(args[++i]);
206        } else if (cmd.equals("-iterations")) {
207          numIterations = Long.parseLong(args[++i]);
208        } else if (cmd.equals("-path")) {
209          rootRegionDir = new Path(args[++i]);
210        } else if (cmd.equals("-families")) {
211          numFamilies = Integer.parseInt(args[++i]);
212        } else if (cmd.equals("-qualifiers")) {
213          numQualifiers = Integer.parseInt(args[++i]);
214        } else if (cmd.equals("-keySize")) {
215          keySize = Integer.parseInt(args[++i]);
216        } else if (cmd.equals("-valueSize")) {
217          valueSize = Integer.parseInt(args[++i]);
218        } else if (cmd.equals("-syncInterval")) {
219          syncInterval = Integer.parseInt(args[++i]);
220        } else if (cmd.equals("-nosync")) {
221          noSync = true;
222        } else if (cmd.equals("-verify")) {
223          verify = true;
224        } else if (cmd.equals("-verbose")) {
225          verbose = true;
226        } else if (cmd.equals("-nocleanup")) {
227          cleanup = false;
228        } else if (cmd.equals("-noclosefs")) {
229          noclosefs = true;
230        } else if (cmd.equals("-roll")) {
231          roll = Long.parseLong(args[++i]);
232        } else if (cmd.equals("-compress")) {
233          compress = true;
234        } else if (cmd.equals("-encryption")) {
235          cipher = args[++i];
236        } else if (cmd.equals("-regions")) {
237          numRegions = Integer.parseInt(args[++i]);
238        } else if (cmd.equals("-traceFreq")) {
239          // keep it here for compatible
240          System.err.println("-traceFreq is not supported any more");
241        } else if (cmd.equals("-h")) {
242          printUsageAndExit();
243        } else if (cmd.equals("--help")) {
244          printUsageAndExit();
245        } else {
246          System.err.println("UNEXPECTED: " + cmd);
247          printUsageAndExit();
248        }
249      } catch (Exception e) {
250        printUsageAndExit();
251      }
252    }
253
254    if (compress) {
255      Configuration conf = getConf();
256      conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
257    }
258
259    if (cipher != null) {
260      // Set up WAL for encryption
261      Configuration conf = getConf();
262      conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
263      conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
264      conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
265        WAL.Reader.class);
266      conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
267        Writer.class);
268      conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
269      conf.set(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, cipher);
270    }
271
272    if (numThreads < numRegions) {
273      LOG.warn("Number of threads is less than the number of regions; some regions will sit idle.");
274    }
275
276    // Internal config. goes off number of threads; if more threads than handlers, stuff breaks.
277    // In regionserver, number of handlers == number of threads.
278    getConf().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, numThreads);
279
280    if (rootRegionDir == null) {
281      TEST_UTIL = new HBaseTestingUtil(getConf());
282      rootRegionDir = TEST_UTIL.getDataTestDirOnTestFS("WALPerformanceEvaluation");
283    }
284    // Run WAL Performance Evaluation
285    // First set the fs from configs.  In case we are on hadoop1
286    CommonFSUtils.setFsDefault(getConf(), CommonFSUtils.getRootDir(getConf()));
287    FileSystem fs = FileSystem.get(getConf());
288    LOG.info("FileSystem={}, rootDir={}", fs, rootRegionDir);
289    Span span = TraceUtil.getGlobalTracer().spanBuilder("WALPerfEval").startSpan();
290    try (Scope scope = span.makeCurrent()){
291      rootRegionDir = rootRegionDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
292      cleanRegionRootDir(fs, rootRegionDir);
293      CommonFSUtils.setRootDir(getConf(), rootRegionDir);
294      final WALFactory wals = new WALFactory(getConf(), "wals");
295      final HRegion[] regions = new HRegion[numRegions];
296      final Runnable[] benchmarks = new Runnable[numRegions];
297      final MockRegionServerServices mockServices = new MockRegionServerServices(getConf());
298      final LogRoller roller = new LogRoller(mockServices);
299      Threads.setDaemonThreadRunning(roller, "WALPerfEval.logRoller");
300
301      try {
302        for(int i = 0; i < numRegions; i++) {
303          // Initialize Table Descriptor
304          // a table per desired region means we can avoid carving up the key space
305          final TableDescriptor htd = createHTableDescriptor(i, numFamilies);
306          regions[i] = openRegion(fs, rootRegionDir, htd, wals, roll, roller);
307          benchmarks[i] =
308            new WALPutBenchmark(regions[i], htd, numIterations, noSync, syncInterval);
309        }
310        ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).
311          outputTo(System.out).convertRatesTo(TimeUnit.SECONDS).filter(MetricFilter.ALL).build();
312        reporter.start(30, TimeUnit.SECONDS);
313
314        long putTime = runBenchmark(benchmarks, numThreads);
315        logBenchmarkResult("Summary: threads=" + numThreads + ", iterations=" + numIterations +
316          ", syncInterval=" + syncInterval, numIterations * numThreads, putTime);
317
318        for (int i = 0; i < numRegions; i++) {
319          if (regions[i] != null) {
320            closeRegion(regions[i]);
321            regions[i] = null;
322          }
323        }
324        if (verify) {
325          LOG.info("verifying written log entries.");
326          Path dir = new Path(CommonFSUtils.getRootDir(getConf()),
327            AbstractFSWALProvider.getWALDirectoryName("wals"));
328          long editCount = 0;
329          FileStatus [] fsss = fs.listStatus(dir);
330          if (fsss.length == 0) throw new IllegalStateException("No WAL found");
331          for (FileStatus fss: fsss) {
332            Path p = fss.getPath();
333            if (!fs.exists(p)) throw new IllegalStateException(p.toString());
334            editCount += verify(wals, p, verbose);
335          }
336          long expected = numIterations * numThreads;
337          if (editCount != expected) {
338            throw new IllegalStateException("Counted=" + editCount + ", expected=" + expected);
339          }
340        }
341      } finally {
342        mockServices.stop("test clean up.");
343        for (int i = 0; i < numRegions; i++) {
344          if (regions[i] != null) {
345            closeRegion(regions[i]);
346          }
347        }
348        if (null != roller) {
349          LOG.info("shutting down log roller.");
350          roller.close();
351        }
352        wals.shutdown();
353        // Remove the root dir for this test region
354        if (cleanup) cleanRegionRootDir(fs, rootRegionDir);
355      }
356    } finally {
357      span.end();
358      // We may be called inside a test that wants to keep on using the fs.
359      if (!noclosefs) {
360        fs.close();
361      }
362    }
363
364    return 0;
365  }
366
367  private static TableDescriptor createHTableDescriptor(final int regionNum,
368      final int numFamilies) {
369    TableDescriptorBuilder builder =
370        TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME + ":" + regionNum));
371    IntStream.range(0, numFamilies)
372        .mapToObj(i -> ColumnFamilyDescriptorBuilder.of(FAMILY_PREFIX + i))
373        .forEachOrdered(builder::setColumnFamily);
374    return builder.build();
375  }
376
377  /**
378   * Verify the content of the WAL file.
379   * Verify that the file has expected number of edits.
380   * @param wals may not be null
381   * @param wal
382   * @return Count of edits.
383   * @throws IOException
384   */
385  private long verify(final WALFactory wals, final Path wal, final boolean verbose)
386      throws IOException {
387    WAL.Reader reader = wals.createReader(wal.getFileSystem(getConf()), wal);
388    long count = 0;
389    Map<String, Long> sequenceIds = new HashMap<>();
390    try {
391      while (true) {
392        WAL.Entry e = reader.next();
393        if (e == null) {
394          LOG.debug("Read count=" + count + " from " + wal);
395          break;
396        }
397        count++;
398        long seqid = e.getKey().getSequenceId();
399        if (sequenceIds.containsKey(Bytes.toString(e.getKey().getEncodedRegionName()))) {
400          // sequenceIds should be increasing for every regions
401          if (sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName())) >= seqid) {
402            throw new IllegalStateException("wal = " + wal.getName() + ", " + "previous seqid = "
403                + sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName()))
404                + ", current seqid = " + seqid);
405          }
406        }
407        // update the sequence Id.
408        sequenceIds.put(Bytes.toString(e.getKey().getEncodedRegionName()), seqid);
409        if (verbose) LOG.info("seqid=" + seqid);
410      }
411    } finally {
412      reader.close();
413    }
414    return count;
415  }
416
417  private static void logBenchmarkResult(String testName, long numTests, long totalTime) {
418    float tsec = totalTime / 1000.0f;
419    LOG.info(String.format("%s took %.3fs %.3fops/s", testName, tsec, numTests / tsec));
420
421  }
422
423  private void printUsageAndExit() {
424    System.err.printf("Usage: hbase %s [options]\n", getClass().getName());
425    System.err.println(" where [options] are:");
426    System.err.println("  -h|-help         Show this help and exit.");
427    System.err.println("  -threads <N>     Number of threads writing on the WAL.");
428    System.err.println("  -regions <N>     Number of regions to open in the WAL. Default: 1");
429    System.err.println("  -iterations <N>  Number of iterations per thread.");
430    System.err.println("  -path <PATH>     Path where region's root directory is created.");
431    System.err.println("  -families <N>    Number of column families to write.");
432    System.err.println("  -qualifiers <N>  Number of qualifiers to write.");
433    System.err.println("  -keySize <N>     Row key size in byte.");
434    System.err.println("  -valueSize <N>   Row/Col value size in byte.");
435    System.err.println("  -nocleanup       Do NOT remove test data when done.");
436    System.err.println("  -noclosefs       Do NOT close the filesystem when done.");
437    System.err.println("  -nosync          Append without syncing");
438    System.err.println("  -syncInterval <N> Append N edits and then sync. " +
439      "Default=0, i.e. sync every edit.");
440    System.err.println("  -verify          Verify edits written in sequence");
441    System.err.println("  -verbose         Output extra info; " +
442      "e.g. all edit seq ids when verifying");
443    System.err.println("  -roll <N>        Roll the way every N appends");
444    System.err.println("  -encryption <A>  Encrypt the WAL with algorithm A, e.g. AES");
445    System.err.println("  -traceFreq <N>   Rate of trace sampling. Default: 1.0, " +
446      "only respected when tracing is enabled, ie -Dhbase.trace.spanreceiver.classes=...");
447    System.err.println("");
448    System.err.println("Examples:");
449    System.err.println("");
450    System.err.println(" To run 100 threads on hdfs with log rolling every 10k edits and " +
451      "verification afterward do:");
452    System.err.println(" $ hbase org.apache.hadoop.hbase.wal." +
453      "WALPerformanceEvaluation \\");
454    System.err.println("    -conf ./core-site.xml -path hdfs://example.org:7000/tmp " +
455      "-threads 100 -roll 10000 -verify");
456    System.exit(1);
457  }
458
459  private final Set<WAL> walsListenedTo = new HashSet<>();
460
461  private HRegion openRegion(final FileSystem fs, final Path dir, final TableDescriptor htd,
462      final WALFactory wals, final long whenToRoll, final LogRoller roller) throws IOException {
463    // Initialize HRegion
464    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
465    // Initialize WAL
466    final WAL wal = wals.getWAL(regionInfo);
467    // If we haven't already, attach a listener to this wal to handle rolls and metrics.
468    if (walsListenedTo.add(wal)) {
469      roller.addWAL(wal);
470      wal.registerWALActionsListener(new WALActionsListener() {
471        private int appends = 0;
472
473        @Override
474        public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
475          this.appends++;
476          if (this.appends % whenToRoll == 0) {
477            LOG.info("Rolling after " + appends + " edits");
478            // We used to do explicit call to rollWriter but changed it to a request
479            // to avoid dead lock (there are less threads going on in this class than
480            // in the regionserver -- regionserver does not have the issue).
481            AbstractFSWALProvider.requestLogRoll(wal);
482          }
483        }
484
485        @Override
486        public void postSync(final long timeInNanos, final int handlerSyncs) {
487          syncMeter.mark();
488          syncHistogram.update(timeInNanos);
489          syncCountHistogram.update(handlerSyncs);
490        }
491
492        @Override
493        public void postAppend(final long size, final long elapsedTime, final WALKey logkey,
494            final WALEdit logEdit) {
495          appendMeter.mark(size);
496        }
497      });
498    }
499
500    return HRegion.createHRegion(regionInfo, dir, getConf(), htd, wal);
501  }
502
503  private void closeRegion(final HRegion region) throws IOException {
504    if (region != null) {
505      region.close();
506      WAL wal = region.getWAL();
507      if (wal != null) {
508        wal.shutdown();
509      }
510    }
511  }
512
513  private void cleanRegionRootDir(final FileSystem fs, final Path dir) throws IOException {
514    if (fs.exists(dir)) {
515      fs.delete(dir, true);
516    }
517  }
518
519  private Put setupPut(Random rand, byte[] key, byte[] value, final int numFamilies) {
520    rand.nextBytes(key);
521    Put put = new Put(key);
522    for (int cf = 0; cf < numFamilies; ++cf) {
523      for (int q = 0; q < numQualifiers; ++q) {
524        rand.nextBytes(value);
525        put.addColumn(Bytes.toBytes(FAMILY_PREFIX + cf),
526            Bytes.toBytes(QUALIFIER_PREFIX + q), value);
527      }
528    }
529    return put;
530  }
531
532  private long runBenchmark(Runnable[] runnable, final int numThreads) throws InterruptedException {
533    Thread[] threads = new Thread[numThreads];
534    long startTime = EnvironmentEdgeManager.currentTime();
535    for (int i = 0; i < numThreads; ++i) {
536      threads[i] = new Thread(runnable[i%runnable.length], "t" + i + ",r" + (i%runnable.length));
537      threads[i].start();
538    }
539    for (Thread t : threads) t.join();
540    long endTime = EnvironmentEdgeManager.currentTime();
541    return(endTime - startTime);
542  }
543
544  /**
545   * The guts of the {@link #main} method.
546   * Call this method to avoid the {@link #main(String[])} System.exit.
547   * @param args
548   * @return errCode
549   * @throws Exception
550   */
551  static int innerMain(final Configuration c, final String [] args) throws Exception {
552    return ToolRunner.run(c, new WALPerformanceEvaluation(), args);
553  }
554
555  public static void main(String[] args) throws Exception {
556     System.exit(innerMain(HBaseConfiguration.create(), args));
557  }
558}