001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.wal;
020
021import static com.codahale.metrics.MetricRegistry.name;
022
023import com.codahale.metrics.ConsoleReporter;
024import com.codahale.metrics.Histogram;
025import com.codahale.metrics.Meter;
026import com.codahale.metrics.MetricFilter;
027import com.codahale.metrics.MetricRegistry;
028import java.io.IOException;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.Map;
032import java.util.NavigableMap;
033import java.util.Random;
034import java.util.Set;
035import java.util.TreeMap;
036import java.util.concurrent.TimeUnit;
037import java.util.stream.IntStream;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.conf.Configured;
040import org.apache.hadoop.fs.FileStatus;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.HBaseTestingUtility;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.MockRegionServerServices;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
049import org.apache.hadoop.hbase.client.Put;
050import org.apache.hadoop.hbase.client.RegionInfo;
051import org.apache.hadoop.hbase.client.RegionInfoBuilder;
052import org.apache.hadoop.hbase.client.TableDescriptor;
053import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
054import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
055import org.apache.hadoop.hbase.regionserver.HRegion;
056import org.apache.hadoop.hbase.regionserver.LogRoller;
057import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
058import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
059import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
060import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
061import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
062import org.apache.hadoop.hbase.trace.SpanReceiverHost;
063import org.apache.hadoop.hbase.trace.TraceUtil;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.CommonFSUtils;
066import org.apache.hadoop.hbase.util.Threads;
067import org.apache.hadoop.hbase.wal.WALProvider.Writer;
068import org.apache.hadoop.util.Tool;
069import org.apache.hadoop.util.ToolRunner;
070import org.apache.htrace.core.ProbabilitySampler;
071import org.apache.htrace.core.Sampler;
072import org.apache.htrace.core.TraceScope;
073import org.apache.htrace.core.Tracer;
074import org.apache.yetus.audience.InterfaceAudience;
075import org.slf4j.Logger;
076import org.slf4j.LoggerFactory;
077
078// imports for things that haven't moved from regionserver.wal yet.
079
080/**
081 * This class runs performance benchmarks for {@link WAL}.
082 * See usage for this tool by running:
083 * <code>$ hbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation -h</code>
084 */
085@InterfaceAudience.Private
086public final class WALPerformanceEvaluation extends Configured implements Tool {
087  private static final Logger LOG =
088      LoggerFactory.getLogger(WALPerformanceEvaluation.class);
089
090  private final MetricRegistry metrics = new MetricRegistry();
091  private final Meter syncMeter =
092    metrics.meter(name(WALPerformanceEvaluation.class, "syncMeter", "syncs"));
093
094  private final Histogram syncHistogram = metrics.histogram(
095    name(WALPerformanceEvaluation.class, "syncHistogram", "nanos-between-syncs"));
096  private final Histogram syncCountHistogram = metrics.histogram(
097    name(WALPerformanceEvaluation.class, "syncCountHistogram", "countPerSync"));
098  private final Meter appendMeter = metrics.meter(
099    name(WALPerformanceEvaluation.class, "appendMeter", "bytes"));
100  private final Histogram latencyHistogram =
101    metrics.histogram(name(WALPerformanceEvaluation.class, "latencyHistogram", "nanos"));
102
103  private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
104
105  private HBaseTestingUtility TEST_UTIL;
106
107  static final String TABLE_NAME = "WALPerformanceEvaluation";
108  static final String QUALIFIER_PREFIX = "q";
109  static final String FAMILY_PREFIX = "cf";
110
111  private int numQualifiers = 1;
112  private int valueSize = 512;
113  private int keySize = 16;
114
115  @Override
116  public void setConf(Configuration conf) {
117    super.setConf(conf);
118  }
119
120  /**
121   * Perform WAL.append() of Put object, for the number of iterations requested.
122   * Keys and Vaues are generated randomly, the number of column families,
123   * qualifiers and key/value size is tunable by the user.
124   */
125  class WALPutBenchmark implements Runnable {
126    private final long numIterations;
127    private final int numFamilies;
128    private final boolean noSync;
129    private final HRegion region;
130    private final int syncInterval;
131    private final Sampler loopSampler;
132    private final NavigableMap<byte[], Integer> scopes;
133
134    WALPutBenchmark(final HRegion region, final TableDescriptor htd,
135        final long numIterations, final boolean noSync, final int syncInterval,
136        final double traceFreq) {
137      this.numIterations = numIterations;
138      this.noSync = noSync;
139      this.syncInterval = syncInterval;
140      this.numFamilies = htd.getColumnFamilyCount();
141      this.region = region;
142      scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
143      for(byte[] fam : htd.getColumnFamilyNames()) {
144        scopes.put(fam, 0);
145      }
146      String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
147      if (spanReceivers == null || spanReceivers.isEmpty()) {
148        loopSampler = Sampler.NEVER;
149      } else {
150        if (traceFreq <= 0.0) {
151          LOG.warn("Tracing enabled but traceFreq=0.");
152          loopSampler = Sampler.NEVER;
153        } else if (traceFreq >= 1.0) {
154          loopSampler = Sampler.ALWAYS;
155          if (numIterations > 1000) {
156            LOG.warn("Full tracing of all iterations will produce a lot of data. Be sure your"
157              + " SpanReceiver can keep up.");
158          }
159        } else {
160          getConf().setDouble("hbase.sampler.fraction", traceFreq);
161          loopSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(getConf()));
162        }
163      }
164    }
165
166    @Override
167    public void run() {
168      byte[] key = new byte[keySize];
169      byte[] value = new byte[valueSize];
170      Random rand = new Random(Thread.currentThread().getId());
171      WAL wal = region.getWAL();
172
173      try (TraceScope threadScope = TraceUtil.createTrace("WALPerfEval." + Thread.currentThread().getName())) {
174        int lastSync = 0;
175        TraceUtil.addSampler(loopSampler);
176        for (int i = 0; i < numIterations; ++i) {
177          assert Tracer.getCurrentSpan() == threadScope.getSpan() : "Span leak detected.";
178          try (TraceScope loopScope = TraceUtil.createTrace("runLoopIter" + i)) {
179            long now = System.nanoTime();
180            Put put = setupPut(rand, key, value, numFamilies);
181            WALEdit walEdit = new WALEdit();
182            walEdit.add(put.getFamilyCellMap());
183            RegionInfo hri = region.getRegionInfo();
184            final WALKeyImpl logkey =
185                new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes);
186            wal.appendData(hri, logkey, walEdit);
187            if (!this.noSync) {
188              if (++lastSync >= this.syncInterval) {
189                wal.sync();
190                lastSync = 0;
191              }
192            }
193            latencyHistogram.update(System.nanoTime() - now);
194          }
195        }
196      } catch (Exception e) {
197        LOG.error(getClass().getSimpleName() + " Thread failed", e);
198      }
199    }
200  }
201
202  @Override
203  public int run(String[] args) throws Exception {
204    Path rootRegionDir = null;
205    int numThreads = 1;
206    long numIterations = 1000000;
207    int numFamilies = 1;
208    int syncInterval = 0;
209    boolean noSync = false;
210    boolean verify = false;
211    boolean verbose = false;
212    boolean cleanup = true;
213    boolean noclosefs = false;
214    long roll = Long.MAX_VALUE;
215    boolean compress = false;
216    String cipher = null;
217    int numRegions = 1;
218    String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
219    boolean trace = spanReceivers != null && !spanReceivers.isEmpty();
220    double traceFreq = 1.0;
221    // Process command line args
222    for (int i = 0; i < args.length; i++) {
223      String cmd = args[i];
224      try {
225        if (cmd.equals("-threads")) {
226          numThreads = Integer.parseInt(args[++i]);
227        } else if (cmd.equals("-iterations")) {
228          numIterations = Long.parseLong(args[++i]);
229        } else if (cmd.equals("-path")) {
230          rootRegionDir = new Path(args[++i]);
231        } else if (cmd.equals("-families")) {
232          numFamilies = Integer.parseInt(args[++i]);
233        } else if (cmd.equals("-qualifiers")) {
234          numQualifiers = Integer.parseInt(args[++i]);
235        } else if (cmd.equals("-keySize")) {
236          keySize = Integer.parseInt(args[++i]);
237        } else if (cmd.equals("-valueSize")) {
238          valueSize = Integer.parseInt(args[++i]);
239        } else if (cmd.equals("-syncInterval")) {
240          syncInterval = Integer.parseInt(args[++i]);
241        } else if (cmd.equals("-nosync")) {
242          noSync = true;
243        } else if (cmd.equals("-verify")) {
244          verify = true;
245        } else if (cmd.equals("-verbose")) {
246          verbose = true;
247        } else if (cmd.equals("-nocleanup")) {
248          cleanup = false;
249        } else if (cmd.equals("-noclosefs")) {
250          noclosefs = true;
251        } else if (cmd.equals("-roll")) {
252          roll = Long.parseLong(args[++i]);
253        } else if (cmd.equals("-compress")) {
254          compress = true;
255        } else if (cmd.equals("-encryption")) {
256          cipher = args[++i];
257        } else if (cmd.equals("-regions")) {
258          numRegions = Integer.parseInt(args[++i]);
259        } else if (cmd.equals("-traceFreq")) {
260          traceFreq = Double.parseDouble(args[++i]);
261        } else if (cmd.equals("-h")) {
262          printUsageAndExit();
263        } else if (cmd.equals("--help")) {
264          printUsageAndExit();
265        } else {
266          System.err.println("UNEXPECTED: " + cmd);
267          printUsageAndExit();
268        }
269      } catch (Exception e) {
270        printUsageAndExit();
271      }
272    }
273
274    if (compress) {
275      Configuration conf = getConf();
276      conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
277    }
278
279    if (cipher != null) {
280      // Set up WAL for encryption
281      Configuration conf = getConf();
282      conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
283      conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
284      conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
285        WAL.Reader.class);
286      conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
287        Writer.class);
288      conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
289      conf.set(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, cipher);
290    }
291
292    if (numThreads < numRegions) {
293      LOG.warn("Number of threads is less than the number of regions; some regions will sit idle.");
294    }
295
296    // Internal config. goes off number of threads; if more threads than handlers, stuff breaks.
297    // In regionserver, number of handlers == number of threads.
298    getConf().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, numThreads);
299
300    if (rootRegionDir == null) {
301      TEST_UTIL = new HBaseTestingUtility(getConf());
302      rootRegionDir = TEST_UTIL.getDataTestDirOnTestFS("WALPerformanceEvaluation");
303    }
304    // Run WAL Performance Evaluation
305    // First set the fs from configs.  In case we are on hadoop1
306    CommonFSUtils.setFsDefault(getConf(), CommonFSUtils.getRootDir(getConf()));
307    FileSystem fs = FileSystem.get(getConf());
308    LOG.info("FileSystem={}, rootDir={}", fs, rootRegionDir);
309
310    SpanReceiverHost receiverHost = trace ? SpanReceiverHost.getInstance(getConf()) : null;
311    final Sampler sampler = trace ? Sampler.ALWAYS : Sampler.NEVER;
312    TraceUtil.addSampler(sampler);
313    TraceScope scope = TraceUtil.createTrace("WALPerfEval");
314
315    try {
316      rootRegionDir = rootRegionDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
317      cleanRegionRootDir(fs, rootRegionDir);
318      CommonFSUtils.setRootDir(getConf(), rootRegionDir);
319      final WALFactory wals = new WALFactory(getConf(), "wals");
320      final HRegion[] regions = new HRegion[numRegions];
321      final Runnable[] benchmarks = new Runnable[numRegions];
322      final MockRegionServerServices mockServices = new MockRegionServerServices(getConf());
323      final LogRoller roller = new LogRoller(mockServices);
324      Threads.setDaemonThreadRunning(roller, "WALPerfEval.logRoller");
325
326      try {
327        for(int i = 0; i < numRegions; i++) {
328          // Initialize Table Descriptor
329          // a table per desired region means we can avoid carving up the key space
330          final TableDescriptor htd = createHTableDescriptor(i, numFamilies);
331          regions[i] = openRegion(fs, rootRegionDir, htd, wals, roll, roller);
332          benchmarks[i] = TraceUtil.wrap(new WALPutBenchmark(regions[i], htd, numIterations, noSync,
333              syncInterval, traceFreq), "");
334        }
335        ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).
336          outputTo(System.out).convertRatesTo(TimeUnit.SECONDS).filter(MetricFilter.ALL).build();
337        reporter.start(30, TimeUnit.SECONDS);
338
339        long putTime = runBenchmark(benchmarks, numThreads);
340        logBenchmarkResult("Summary: threads=" + numThreads + ", iterations=" + numIterations +
341          ", syncInterval=" + syncInterval, numIterations * numThreads, putTime);
342
343        for (int i = 0; i < numRegions; i++) {
344          if (regions[i] != null) {
345            closeRegion(regions[i]);
346            regions[i] = null;
347          }
348        }
349        if (verify) {
350          LOG.info("verifying written log entries.");
351          Path dir = new Path(CommonFSUtils.getRootDir(getConf()),
352            AbstractFSWALProvider.getWALDirectoryName("wals"));
353          long editCount = 0;
354          FileStatus [] fsss = fs.listStatus(dir);
355          if (fsss.length == 0) throw new IllegalStateException("No WAL found");
356          for (FileStatus fss: fsss) {
357            Path p = fss.getPath();
358            if (!fs.exists(p)) throw new IllegalStateException(p.toString());
359            editCount += verify(wals, p, verbose);
360          }
361          long expected = numIterations * numThreads;
362          if (editCount != expected) {
363            throw new IllegalStateException("Counted=" + editCount + ", expected=" + expected);
364          }
365        }
366      } finally {
367        mockServices.stop("test clean up.");
368        for (int i = 0; i < numRegions; i++) {
369          if (regions[i] != null) {
370            closeRegion(regions[i]);
371          }
372        }
373        if (null != roller) {
374          LOG.info("shutting down log roller.");
375          roller.close();
376        }
377        wals.shutdown();
378        // Remove the root dir for this test region
379        if (cleanup) cleanRegionRootDir(fs, rootRegionDir);
380      }
381    } finally {
382      // We may be called inside a test that wants to keep on using the fs.
383      if (!noclosefs) {
384        fs.close();
385      }
386      if (scope != null) {
387        scope.close();
388      }
389      if (receiverHost != null) {
390        receiverHost.closeReceivers();
391      }
392    }
393
394    return(0);
395  }
396
397  private static TableDescriptor createHTableDescriptor(final int regionNum,
398      final int numFamilies) {
399    TableDescriptorBuilder builder =
400        TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME + ":" + regionNum));
401    IntStream.range(0, numFamilies)
402        .mapToObj(i -> ColumnFamilyDescriptorBuilder.of(FAMILY_PREFIX + i))
403        .forEachOrdered(builder::setColumnFamily);
404    return builder.build();
405  }
406
407  /**
408   * Verify the content of the WAL file.
409   * Verify that the file has expected number of edits.
410   * @param wals may not be null
411   * @param wal
412   * @return Count of edits.
413   * @throws IOException
414   */
415  private long verify(final WALFactory wals, final Path wal, final boolean verbose)
416      throws IOException {
417    WAL.Reader reader = wals.createReader(wal.getFileSystem(getConf()), wal);
418    long count = 0;
419    Map<String, Long> sequenceIds = new HashMap<>();
420    try {
421      while (true) {
422        WAL.Entry e = reader.next();
423        if (e == null) {
424          LOG.debug("Read count=" + count + " from " + wal);
425          break;
426        }
427        count++;
428        long seqid = e.getKey().getSequenceId();
429        if (sequenceIds.containsKey(Bytes.toString(e.getKey().getEncodedRegionName()))) {
430          // sequenceIds should be increasing for every regions
431          if (sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName())) >= seqid) {
432            throw new IllegalStateException("wal = " + wal.getName() + ", " + "previous seqid = "
433                + sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName()))
434                + ", current seqid = " + seqid);
435          }
436        }
437        // update the sequence Id.
438        sequenceIds.put(Bytes.toString(e.getKey().getEncodedRegionName()), seqid);
439        if (verbose) LOG.info("seqid=" + seqid);
440      }
441    } finally {
442      reader.close();
443    }
444    return count;
445  }
446
447  private static void logBenchmarkResult(String testName, long numTests, long totalTime) {
448    float tsec = totalTime / 1000.0f;
449    LOG.info(String.format("%s took %.3fs %.3fops/s", testName, tsec, numTests / tsec));
450
451  }
452
453  private void printUsageAndExit() {
454    System.err.printf("Usage: hbase %s [options]\n", getClass().getName());
455    System.err.println(" where [options] are:");
456    System.err.println("  -h|-help         Show this help and exit.");
457    System.err.println("  -threads <N>     Number of threads writing on the WAL.");
458    System.err.println("  -regions <N>     Number of regions to open in the WAL. Default: 1");
459    System.err.println("  -iterations <N>  Number of iterations per thread.");
460    System.err.println("  -path <PATH>     Path where region's root directory is created.");
461    System.err.println("  -families <N>    Number of column families to write.");
462    System.err.println("  -qualifiers <N>  Number of qualifiers to write.");
463    System.err.println("  -keySize <N>     Row key size in byte.");
464    System.err.println("  -valueSize <N>   Row/Col value size in byte.");
465    System.err.println("  -nocleanup       Do NOT remove test data when done.");
466    System.err.println("  -noclosefs       Do NOT close the filesystem when done.");
467    System.err.println("  -nosync          Append without syncing");
468    System.err.println("  -syncInterval <N> Append N edits and then sync. " +
469      "Default=0, i.e. sync every edit.");
470    System.err.println("  -verify          Verify edits written in sequence");
471    System.err.println("  -verbose         Output extra info; " +
472      "e.g. all edit seq ids when verifying");
473    System.err.println("  -roll <N>        Roll the way every N appends");
474    System.err.println("  -encryption <A>  Encrypt the WAL with algorithm A, e.g. AES");
475    System.err.println("  -traceFreq <N>   Rate of trace sampling. Default: 1.0, " +
476      "only respected when tracing is enabled, ie -Dhbase.trace.spanreceiver.classes=...");
477    System.err.println("");
478    System.err.println("Examples:");
479    System.err.println("");
480    System.err.println(" To run 100 threads on hdfs with log rolling every 10k edits and " +
481      "verification afterward do:");
482    System.err.println(" $ hbase org.apache.hadoop.hbase.wal." +
483      "WALPerformanceEvaluation \\");
484    System.err.println("    -conf ./core-site.xml -path hdfs://example.org:7000/tmp " +
485      "-threads 100 -roll 10000 -verify");
486    System.exit(1);
487  }
488
489  private final Set<WAL> walsListenedTo = new HashSet<>();
490
491  private HRegion openRegion(final FileSystem fs, final Path dir, final TableDescriptor htd,
492      final WALFactory wals, final long whenToRoll, final LogRoller roller) throws IOException {
493    // Initialize HRegion
494    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
495    // Initialize WAL
496    final WAL wal = wals.getWAL(regionInfo);
497    // If we haven't already, attach a listener to this wal to handle rolls and metrics.
498    if (walsListenedTo.add(wal)) {
499      roller.addWAL(wal);
500      wal.registerWALActionsListener(new WALActionsListener() {
501        private int appends = 0;
502
503        @Override
504        public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
505          this.appends++;
506          if (this.appends % whenToRoll == 0) {
507            LOG.info("Rolling after " + appends + " edits");
508            // We used to do explicit call to rollWriter but changed it to a request
509            // to avoid dead lock (there are less threads going on in this class than
510            // in the regionserver -- regionserver does not have the issue).
511            AbstractFSWALProvider.requestLogRoll(wal);
512          }
513        }
514
515        @Override
516        public void postSync(final long timeInNanos, final int handlerSyncs) {
517          syncMeter.mark();
518          syncHistogram.update(timeInNanos);
519          syncCountHistogram.update(handlerSyncs);
520        }
521
522        @Override
523        public void postAppend(final long size, final long elapsedTime, final WALKey logkey,
524            final WALEdit logEdit) {
525          appendMeter.mark(size);
526        }
527      });
528    }
529
530    return HRegion.createHRegion(regionInfo, dir, getConf(), htd, wal);
531  }
532
533  private void closeRegion(final HRegion region) throws IOException {
534    if (region != null) {
535      region.close();
536      WAL wal = region.getWAL();
537      if (wal != null) {
538        wal.shutdown();
539      }
540    }
541  }
542
543  private void cleanRegionRootDir(final FileSystem fs, final Path dir) throws IOException {
544    if (fs.exists(dir)) {
545      fs.delete(dir, true);
546    }
547  }
548
549  private Put setupPut(Random rand, byte[] key, byte[] value, final int numFamilies) {
550    rand.nextBytes(key);
551    Put put = new Put(key);
552    for (int cf = 0; cf < numFamilies; ++cf) {
553      for (int q = 0; q < numQualifiers; ++q) {
554        rand.nextBytes(value);
555        put.addColumn(Bytes.toBytes(FAMILY_PREFIX + cf),
556            Bytes.toBytes(QUALIFIER_PREFIX + q), value);
557      }
558    }
559    return put;
560  }
561
562  private long runBenchmark(Runnable[] runnable, final int numThreads) throws InterruptedException {
563    Thread[] threads = new Thread[numThreads];
564    long startTime = System.currentTimeMillis();
565    for (int i = 0; i < numThreads; ++i) {
566      threads[i] = new Thread(runnable[i%runnable.length], "t" + i + ",r" + (i%runnable.length));
567      threads[i].start();
568    }
569    for (Thread t : threads) t.join();
570    long endTime = System.currentTimeMillis();
571    return(endTime - startTime);
572  }
573
574  /**
575   * The guts of the {@link #main} method.
576   * Call this method to avoid the {@link #main(String[])} System.exit.
577   * @param args
578   * @return errCode
579   * @throws Exception
580   */
581  static int innerMain(final Configuration c, final String [] args) throws Exception {
582    return ToolRunner.run(c, new WALPerformanceEvaluation(), args);
583  }
584
585  public static void main(String[] args) throws Exception {
586     System.exit(innerMain(HBaseConfiguration.create(), args));
587  }
588}