001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.wal;
020
021import static com.codahale.metrics.MetricRegistry.name;
022
023import com.codahale.metrics.ConsoleReporter;
024import com.codahale.metrics.Histogram;
025import com.codahale.metrics.Meter;
026import com.codahale.metrics.MetricFilter;
027import com.codahale.metrics.MetricRegistry;
028import java.io.IOException;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.Map;
032import java.util.NavigableMap;
033import java.util.Random;
034import java.util.Set;
035import java.util.TreeMap;
036import java.util.concurrent.TimeUnit;
037import java.util.stream.IntStream;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.conf.Configured;
040import org.apache.hadoop.fs.FileStatus;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.HBaseTestingUtility;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.MockRegionServerServices;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
049import org.apache.hadoop.hbase.client.Put;
050import org.apache.hadoop.hbase.client.RegionInfo;
051import org.apache.hadoop.hbase.client.RegionInfoBuilder;
052import org.apache.hadoop.hbase.client.TableDescriptor;
053import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
054import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
055import org.apache.hadoop.hbase.regionserver.HRegion;
056import org.apache.hadoop.hbase.regionserver.LogRoller;
057import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
058import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
059import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
060import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
061import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
062import org.apache.hadoop.hbase.trace.SpanReceiverHost;
063import org.apache.hadoop.hbase.trace.TraceUtil;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.FSUtils;
066import org.apache.hadoop.hbase.util.Threads;
067import org.apache.hadoop.hbase.wal.WALProvider.Writer;
068import org.apache.hadoop.util.Tool;
069import org.apache.hadoop.util.ToolRunner;
070import org.apache.htrace.core.ProbabilitySampler;
071import org.apache.htrace.core.Sampler;
072import org.apache.htrace.core.TraceScope;
073import org.apache.htrace.core.Tracer;
074import org.apache.yetus.audience.InterfaceAudience;
075import org.slf4j.Logger;
076import org.slf4j.LoggerFactory;
077
078// imports for things that haven't moved from regionserver.wal yet.
079
080/**
081 * This class runs performance benchmarks for {@link WAL}.
082 * See usage for this tool by running:
083 * <code>$ hbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation -h</code>
084 */
085@InterfaceAudience.Private
086public final class WALPerformanceEvaluation extends Configured implements Tool {
087  private static final Logger LOG =
088      LoggerFactory.getLogger(WALPerformanceEvaluation.class);
089
090  private final MetricRegistry metrics = new MetricRegistry();
091  private final Meter syncMeter =
092    metrics.meter(name(WALPerformanceEvaluation.class, "syncMeter", "syncs"));
093
094  private final Histogram syncHistogram = metrics.histogram(
095    name(WALPerformanceEvaluation.class, "syncHistogram", "nanos-between-syncs"));
096  private final Histogram syncCountHistogram = metrics.histogram(
097    name(WALPerformanceEvaluation.class, "syncCountHistogram", "countPerSync"));
098  private final Meter appendMeter = metrics.meter(
099    name(WALPerformanceEvaluation.class, "appendMeter", "bytes"));
100  private final Histogram latencyHistogram =
101    metrics.histogram(name(WALPerformanceEvaluation.class, "latencyHistogram", "nanos"));
102
103  private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
104
105  private HBaseTestingUtility TEST_UTIL;
106
107  static final String TABLE_NAME = "WALPerformanceEvaluation";
108  static final String QUALIFIER_PREFIX = "q";
109  static final String FAMILY_PREFIX = "cf";
110
111  private int numQualifiers = 1;
112  private int valueSize = 512;
113  private int keySize = 16;
114
115  @Override
116  public void setConf(Configuration conf) {
117    super.setConf(conf);
118  }
119
120  /**
121   * Perform WAL.append() of Put object, for the number of iterations requested.
122   * Keys and Vaues are generated randomly, the number of column families,
123   * qualifiers and key/value size is tunable by the user.
124   */
125  class WALPutBenchmark implements Runnable {
126    private final long numIterations;
127    private final int numFamilies;
128    private final boolean noSync;
129    private final HRegion region;
130    private final int syncInterval;
131    private final Sampler loopSampler;
132    private final NavigableMap<byte[], Integer> scopes;
133
134    WALPutBenchmark(final HRegion region, final TableDescriptor htd,
135        final long numIterations, final boolean noSync, final int syncInterval,
136        final double traceFreq) {
137      this.numIterations = numIterations;
138      this.noSync = noSync;
139      this.syncInterval = syncInterval;
140      this.numFamilies = htd.getColumnFamilyCount();
141      this.region = region;
142      scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
143      for(byte[] fam : htd.getColumnFamilyNames()) {
144        scopes.put(fam, 0);
145      }
146      String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
147      if (spanReceivers == null || spanReceivers.isEmpty()) {
148        loopSampler = Sampler.NEVER;
149      } else {
150        if (traceFreq <= 0.0) {
151          LOG.warn("Tracing enabled but traceFreq=0.");
152          loopSampler = Sampler.NEVER;
153        } else if (traceFreq >= 1.0) {
154          loopSampler = Sampler.ALWAYS;
155          if (numIterations > 1000) {
156            LOG.warn("Full tracing of all iterations will produce a lot of data. Be sure your"
157              + " SpanReceiver can keep up.");
158          }
159        } else {
160          getConf().setDouble("hbase.sampler.fraction", traceFreq);
161          loopSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(getConf()));
162        }
163      }
164    }
165
166    @Override
167    public void run() {
168      byte[] key = new byte[keySize];
169      byte[] value = new byte[valueSize];
170      Random rand = new Random(Thread.currentThread().getId());
171      WAL wal = region.getWAL();
172
173      try (TraceScope threadScope = TraceUtil.createTrace("WALPerfEval." + Thread.currentThread().getName())) {
174        long startTime = System.currentTimeMillis();
175        int lastSync = 0;
176        TraceUtil.addSampler(loopSampler);
177        for (int i = 0; i < numIterations; ++i) {
178          assert Tracer.getCurrentSpan() == threadScope.getSpan() : "Span leak detected.";
179          try (TraceScope loopScope = TraceUtil.createTrace("runLoopIter" + i)) {
180            long now = System.nanoTime();
181            Put put = setupPut(rand, key, value, numFamilies);
182            WALEdit walEdit = new WALEdit();
183            walEdit.add(put.getFamilyCellMap());
184            RegionInfo hri = region.getRegionInfo();
185            final WALKeyImpl logkey =
186                new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes);
187            wal.appendData(hri, logkey, walEdit);
188            if (!this.noSync) {
189              if (++lastSync >= this.syncInterval) {
190                wal.sync();
191                lastSync = 0;
192              }
193            }
194            latencyHistogram.update(System.nanoTime() - now);
195          }
196        }
197      } catch (Exception e) {
198        LOG.error(getClass().getSimpleName() + " Thread failed", e);
199      }
200    }
201  }
202
203  @Override
204  public int run(String[] args) throws Exception {
205    Path rootRegionDir = null;
206    int numThreads = 1;
207    long numIterations = 1000000;
208    int numFamilies = 1;
209    int syncInterval = 0;
210    boolean noSync = false;
211    boolean verify = false;
212    boolean verbose = false;
213    boolean cleanup = true;
214    boolean noclosefs = false;
215    long roll = Long.MAX_VALUE;
216    boolean compress = false;
217    String cipher = null;
218    int numRegions = 1;
219    String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
220    boolean trace = spanReceivers != null && !spanReceivers.isEmpty();
221    double traceFreq = 1.0;
222    // Process command line args
223    for (int i = 0; i < args.length; i++) {
224      String cmd = args[i];
225      try {
226        if (cmd.equals("-threads")) {
227          numThreads = Integer.parseInt(args[++i]);
228        } else if (cmd.equals("-iterations")) {
229          numIterations = Long.parseLong(args[++i]);
230        } else if (cmd.equals("-path")) {
231          rootRegionDir = new Path(args[++i]);
232        } else if (cmd.equals("-families")) {
233          numFamilies = Integer.parseInt(args[++i]);
234        } else if (cmd.equals("-qualifiers")) {
235          numQualifiers = Integer.parseInt(args[++i]);
236        } else if (cmd.equals("-keySize")) {
237          keySize = Integer.parseInt(args[++i]);
238        } else if (cmd.equals("-valueSize")) {
239          valueSize = Integer.parseInt(args[++i]);
240        } else if (cmd.equals("-syncInterval")) {
241          syncInterval = Integer.parseInt(args[++i]);
242        } else if (cmd.equals("-nosync")) {
243          noSync = true;
244        } else if (cmd.equals("-verify")) {
245          verify = true;
246        } else if (cmd.equals("-verbose")) {
247          verbose = true;
248        } else if (cmd.equals("-nocleanup")) {
249          cleanup = false;
250        } else if (cmd.equals("-noclosefs")) {
251          noclosefs = true;
252        } else if (cmd.equals("-roll")) {
253          roll = Long.parseLong(args[++i]);
254        } else if (cmd.equals("-compress")) {
255          compress = true;
256        } else if (cmd.equals("-encryption")) {
257          cipher = args[++i];
258        } else if (cmd.equals("-regions")) {
259          numRegions = Integer.parseInt(args[++i]);
260        } else if (cmd.equals("-traceFreq")) {
261          traceFreq = Double.parseDouble(args[++i]);
262        } else if (cmd.equals("-h")) {
263          printUsageAndExit();
264        } else if (cmd.equals("--help")) {
265          printUsageAndExit();
266        } else {
267          System.err.println("UNEXPECTED: " + cmd);
268          printUsageAndExit();
269        }
270      } catch (Exception e) {
271        printUsageAndExit();
272      }
273    }
274
275    if (compress) {
276      Configuration conf = getConf();
277      conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
278    }
279
280    if (cipher != null) {
281      // Set up WAL for encryption
282      Configuration conf = getConf();
283      conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
284      conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
285      conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
286        WAL.Reader.class);
287      conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
288        Writer.class);
289      conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
290      conf.set(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, cipher);
291    }
292
293    if (numThreads < numRegions) {
294      LOG.warn("Number of threads is less than the number of regions; some regions will sit idle.");
295    }
296
297    // Internal config. goes off number of threads; if more threads than handlers, stuff breaks.
298    // In regionserver, number of handlers == number of threads.
299    getConf().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, numThreads);
300
301    if (rootRegionDir == null) {
302      TEST_UTIL = new HBaseTestingUtility(getConf());
303      rootRegionDir = TEST_UTIL.getDataTestDirOnTestFS("WALPerformanceEvaluation");
304    }
305    // Run WAL Performance Evaluation
306    // First set the fs from configs.  In case we are on hadoop1
307    FSUtils.setFsDefault(getConf(), FSUtils.getRootDir(getConf()));
308    FileSystem fs = FileSystem.get(getConf());
309    LOG.info("FileSystem={}, rootDir={}", fs, rootRegionDir);
310
311    SpanReceiverHost receiverHost = trace ? SpanReceiverHost.getInstance(getConf()) : null;
312    final Sampler sampler = trace ? Sampler.ALWAYS : Sampler.NEVER;
313    TraceUtil.addSampler(sampler);
314    TraceScope scope = TraceUtil.createTrace("WALPerfEval");
315
316    try {
317      rootRegionDir = rootRegionDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
318      cleanRegionRootDir(fs, rootRegionDir);
319      FSUtils.setRootDir(getConf(), rootRegionDir);
320      final WALFactory wals = new WALFactory(getConf(), "wals");
321      final HRegion[] regions = new HRegion[numRegions];
322      final Runnable[] benchmarks = new Runnable[numRegions];
323      final MockRegionServerServices mockServices = new MockRegionServerServices(getConf());
324      final LogRoller roller = new LogRoller(mockServices);
325      Threads.setDaemonThreadRunning(roller.getThread(), "WALPerfEval.logRoller");
326
327      try {
328        for(int i = 0; i < numRegions; i++) {
329          // Initialize Table Descriptor
330          // a table per desired region means we can avoid carving up the key space
331          final TableDescriptor htd = createHTableDescriptor(i, numFamilies);
332          regions[i] = openRegion(fs, rootRegionDir, htd, wals, roll, roller);
333          benchmarks[i] = TraceUtil.wrap(new WALPutBenchmark(regions[i], htd, numIterations, noSync,
334              syncInterval, traceFreq), "");
335        }
336        ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).
337          outputTo(System.out).convertRatesTo(TimeUnit.SECONDS).filter(MetricFilter.ALL).build();
338        reporter.start(30, TimeUnit.SECONDS);
339
340        long putTime = runBenchmark(benchmarks, numThreads);
341        logBenchmarkResult("Summary: threads=" + numThreads + ", iterations=" + numIterations +
342          ", syncInterval=" + syncInterval, numIterations * numThreads, putTime);
343
344        for (int i = 0; i < numRegions; i++) {
345          if (regions[i] != null) {
346            closeRegion(regions[i]);
347            regions[i] = null;
348          }
349        }
350        if (verify) {
351          LOG.info("verifying written log entries.");
352          Path dir = new Path(FSUtils.getRootDir(getConf()),
353            AbstractFSWALProvider.getWALDirectoryName("wals"));
354          long editCount = 0;
355          FileStatus [] fsss = fs.listStatus(dir);
356          if (fsss.length == 0) throw new IllegalStateException("No WAL found");
357          for (FileStatus fss: fsss) {
358            Path p = fss.getPath();
359            if (!fs.exists(p)) throw new IllegalStateException(p.toString());
360            editCount += verify(wals, p, verbose);
361          }
362          long expected = numIterations * numThreads;
363          if (editCount != expected) {
364            throw new IllegalStateException("Counted=" + editCount + ", expected=" + expected);
365          }
366        }
367      } finally {
368        mockServices.stop("test clean up.");
369        for (int i = 0; i < numRegions; i++) {
370          if (regions[i] != null) {
371            closeRegion(regions[i]);
372          }
373        }
374        if (null != roller) {
375          LOG.info("shutting down log roller.");
376          roller.close();
377        }
378        wals.shutdown();
379        // Remove the root dir for this test region
380        if (cleanup) cleanRegionRootDir(fs, rootRegionDir);
381      }
382    } finally {
383      // We may be called inside a test that wants to keep on using the fs.
384      if (!noclosefs) {
385        fs.close();
386      }
387      if (scope != null) {
388        scope.close();
389      }
390      if (receiverHost != null) {
391        receiverHost.closeReceivers();
392      }
393    }
394
395    return(0);
396  }
397
398  private static TableDescriptor createHTableDescriptor(final int regionNum,
399      final int numFamilies) {
400    TableDescriptorBuilder builder =
401        TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME + ":" + regionNum));
402    IntStream.range(0, numFamilies)
403        .mapToObj(i -> ColumnFamilyDescriptorBuilder.of(FAMILY_PREFIX + i))
404        .forEachOrdered(builder::setColumnFamily);
405    return builder.build();
406  }
407
408  /**
409   * Verify the content of the WAL file.
410   * Verify that the file has expected number of edits.
411   * @param wals may not be null
412   * @param wal
413   * @return Count of edits.
414   * @throws IOException
415   */
416  private long verify(final WALFactory wals, final Path wal, final boolean verbose)
417      throws IOException {
418    WAL.Reader reader = wals.createReader(wal.getFileSystem(getConf()), wal);
419    long count = 0;
420    Map<String, Long> sequenceIds = new HashMap<>();
421    try {
422      while (true) {
423        WAL.Entry e = reader.next();
424        if (e == null) {
425          LOG.debug("Read count=" + count + " from " + wal);
426          break;
427        }
428        count++;
429        long seqid = e.getKey().getSequenceId();
430        if (sequenceIds.containsKey(Bytes.toString(e.getKey().getEncodedRegionName()))) {
431          // sequenceIds should be increasing for every regions
432          if (sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName())) >= seqid) {
433            throw new IllegalStateException("wal = " + wal.getName() + ", " + "previous seqid = "
434                + sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName()))
435                + ", current seqid = " + seqid);
436          }
437        }
438        // update the sequence Id.
439        sequenceIds.put(Bytes.toString(e.getKey().getEncodedRegionName()), seqid);
440        if (verbose) LOG.info("seqid=" + seqid);
441      }
442    } finally {
443      reader.close();
444    }
445    return count;
446  }
447
448  private static void logBenchmarkResult(String testName, long numTests, long totalTime) {
449    float tsec = totalTime / 1000.0f;
450    LOG.info(String.format("%s took %.3fs %.3fops/s", testName, tsec, numTests / tsec));
451
452  }
453
454  private void printUsageAndExit() {
455    System.err.printf("Usage: hbase %s [options]\n", getClass().getName());
456    System.err.println(" where [options] are:");
457    System.err.println("  -h|-help         Show this help and exit.");
458    System.err.println("  -threads <N>     Number of threads writing on the WAL.");
459    System.err.println("  -regions <N>     Number of regions to open in the WAL. Default: 1");
460    System.err.println("  -iterations <N>  Number of iterations per thread.");
461    System.err.println("  -path <PATH>     Path where region's root directory is created.");
462    System.err.println("  -families <N>    Number of column families to write.");
463    System.err.println("  -qualifiers <N>  Number of qualifiers to write.");
464    System.err.println("  -keySize <N>     Row key size in byte.");
465    System.err.println("  -valueSize <N>   Row/Col value size in byte.");
466    System.err.println("  -nocleanup       Do NOT remove test data when done.");
467    System.err.println("  -noclosefs       Do NOT close the filesystem when done.");
468    System.err.println("  -nosync          Append without syncing");
469    System.err.println("  -syncInterval <N> Append N edits and then sync. " +
470      "Default=0, i.e. sync every edit.");
471    System.err.println("  -verify          Verify edits written in sequence");
472    System.err.println("  -verbose         Output extra info; " +
473      "e.g. all edit seq ids when verifying");
474    System.err.println("  -roll <N>        Roll the way every N appends");
475    System.err.println("  -encryption <A>  Encrypt the WAL with algorithm A, e.g. AES");
476    System.err.println("  -traceFreq <N>   Rate of trace sampling. Default: 1.0, " +
477      "only respected when tracing is enabled, ie -Dhbase.trace.spanreceiver.classes=...");
478    System.err.println("");
479    System.err.println("Examples:");
480    System.err.println("");
481    System.err.println(" To run 100 threads on hdfs with log rolling every 10k edits and " +
482      "verification afterward do:");
483    System.err.println(" $ hbase org.apache.hadoop.hbase.wal." +
484      "WALPerformanceEvaluation \\");
485    System.err.println("    -conf ./core-site.xml -path hdfs://example.org:7000/tmp " +
486      "-threads 100 -roll 10000 -verify");
487    System.exit(1);
488  }
489
490  private final Set<WAL> walsListenedTo = new HashSet<>();
491
492  private HRegion openRegion(final FileSystem fs, final Path dir, final TableDescriptor htd,
493      final WALFactory wals, final long whenToRoll, final LogRoller roller) throws IOException {
494    // Initialize HRegion
495    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
496    // Initialize WAL
497    final WAL wal = wals.getWAL(regionInfo);
498    // If we haven't already, attach a listener to this wal to handle rolls and metrics.
499    if (walsListenedTo.add(wal)) {
500      roller.addWAL(wal);
501      wal.registerWALActionsListener(new WALActionsListener() {
502        private int appends = 0;
503
504        @Override
505        public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
506          this.appends++;
507          if (this.appends % whenToRoll == 0) {
508            LOG.info("Rolling after " + appends + " edits");
509            // We used to do explicit call to rollWriter but changed it to a request
510            // to avoid dead lock (there are less threads going on in this class than
511            // in the regionserver -- regionserver does not have the issue).
512            AbstractFSWALProvider.requestLogRoll(wal);
513          }
514        }
515
516        @Override
517        public void postSync(final long timeInNanos, final int handlerSyncs) {
518          syncMeter.mark();
519          syncHistogram.update(timeInNanos);
520          syncCountHistogram.update(handlerSyncs);
521        }
522
523        @Override
524        public void postAppend(final long size, final long elapsedTime, final WALKey logkey,
525            final WALEdit logEdit) {
526          appendMeter.mark(size);
527        }
528      });
529    }
530
531    return HRegion.createHRegion(regionInfo, dir, getConf(), htd, wal);
532  }
533
534  private void closeRegion(final HRegion region) throws IOException {
535    if (region != null) {
536      region.close();
537      WAL wal = region.getWAL();
538      if (wal != null) {
539        wal.shutdown();
540      }
541    }
542  }
543
544  private void cleanRegionRootDir(final FileSystem fs, final Path dir) throws IOException {
545    if (fs.exists(dir)) {
546      fs.delete(dir, true);
547    }
548  }
549
550  private Put setupPut(Random rand, byte[] key, byte[] value, final int numFamilies) {
551    rand.nextBytes(key);
552    Put put = new Put(key);
553    for (int cf = 0; cf < numFamilies; ++cf) {
554      for (int q = 0; q < numQualifiers; ++q) {
555        rand.nextBytes(value);
556        put.addColumn(Bytes.toBytes(FAMILY_PREFIX + cf),
557            Bytes.toBytes(QUALIFIER_PREFIX + q), value);
558      }
559    }
560    return put;
561  }
562
563  private long runBenchmark(Runnable[] runnable, final int numThreads) throws InterruptedException {
564    Thread[] threads = new Thread[numThreads];
565    long startTime = System.currentTimeMillis();
566    for (int i = 0; i < numThreads; ++i) {
567      threads[i] = new Thread(runnable[i%runnable.length], "t" + i + ",r" + (i%runnable.length));
568      threads[i].start();
569    }
570    for (Thread t : threads) t.join();
571    long endTime = System.currentTimeMillis();
572    return(endTime - startTime);
573  }
574
575  /**
576   * The guts of the {@link #main} method.
577   * Call this method to avoid the {@link #main(String[])} System.exit.
578   * @param args
579   * @return errCode
580   * @throws Exception
581   */
582  static int innerMain(final Configuration c, final String [] args) throws Exception {
583    return ToolRunner.run(c, new WALPerformanceEvaluation(), args);
584  }
585
586  public static void main(String[] args) throws Exception {
587     System.exit(innerMain(HBaseConfiguration.create(), args));
588  }
589}