001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import java.io.IOException;
021import java.util.concurrent.Callable;
022import java.util.concurrent.ExecutorService;
023import java.util.concurrent.Executors;
024import java.util.concurrent.Future;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicLong;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FSDataOutputStream;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
034import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
035import org.apache.hadoop.hbase.procedure2.util.StringUtils;
036import org.apache.hadoop.hbase.util.AbstractHBaseTool;
037
038import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
039import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
040
041public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
042  protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
043
044  // Command line options and defaults.
045  public static int DEFAULT_NUM_THREADS = 20;
046  public static Option NUM_THREADS_OPTION = new Option("threads", true,
047      "Number of parallel threads which will write insert/updates/deletes to WAL. Default: "
048      + DEFAULT_NUM_THREADS);
049  public static int DEFAULT_NUM_PROCS = 1000000;  // 1M
050  public static Option NUM_PROCS_OPTION = new Option("procs", true,
051      "Total number of procedures. Each procedure writes one insert and one update. Default: "
052      + DEFAULT_NUM_PROCS);
053  public static int DEFAULT_NUM_WALS = 0;
054  public static Option NUM_WALS_OPTION = new Option("wals", true,
055      "Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY +
056          " conf to roll the logs. Default: " + DEFAULT_NUM_WALS);
057  public static int DEFAULT_STATE_SIZE = 1024;  // 1KB
058  public static Option STATE_SIZE_OPTION = new Option("state_size", true,
059      "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE
060          + "bytes");
061  public static Option SYNC_OPTION = new Option("sync", true,
062      "Type of sync to use when writing WAL contents to file system. Accepted values: hflush, "
063          + "hsync, nosync. Default: hflush");
064  public static String DEFAULT_SYNC_OPTION = "hflush";
065
066  public int numThreads;
067  public long numProcs;
068  public long numProcsPerWal = Long.MAX_VALUE;  // never roll wall based on this value.
069  public int numWals;
070  public String syncType;
071  public int stateSize;
072  static byte[] serializedState;
073  private WALProcedureStore store;
074
075  /** Used by {@link Worker}. */
076  private AtomicLong procIds = new AtomicLong(0);
077  private AtomicBoolean workersFailed = new AtomicBoolean(false);
078  // Timeout for worker threads.
079  private static final int WORKER_THREADS_TIMEOUT_SEC = 600;  // in seconds
080
081  // Non-default configurations.
082  private void setupConf() {
083    conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, "hsync".equals(syncType));
084    if (numWals > 0) {
085      conf.setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, Long.MAX_VALUE);
086      numProcsPerWal = numProcs / numWals;
087    }
088  }
089
090  private void setupProcedureStore() throws IOException {
091    Path testDir = UTIL.getDataTestDir();
092    FileSystem fs = testDir.getFileSystem(conf);
093    Path logDir = new Path(testDir, "proc-logs");
094    System.out.println("Logs directory : " + logDir.toString());
095    fs.delete(logDir, true);
096    if ("nosync".equals(syncType)) {
097      store = new NoSyncWalProcedureStore(conf, logDir);
098    } else {
099      store = ProcedureTestingUtility.createWalStore(conf, logDir);
100    }
101    store.start(numThreads);
102    store.recoverLease();
103    store.load(new ProcedureTestingUtility.LoadCounter());
104    System.out.println("Starting new log : "
105        + store.getActiveLogs().get(store.getActiveLogs().size() - 1));
106  }
107
108  private void tearDownProcedureStore() {
109    store.stop(false);
110    try {
111      store.getFileSystem().delete(store.getWALDir(), true);
112    } catch (IOException e) {
113      System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up "
114          + "disk space. Location: " + store.getWALDir().toString());
115      e.printStackTrace();
116    }
117  }
118
119  /**
120   * Processes and validates command line options.
121   */
122  @Override
123  public void processOptions(CommandLine cmd) {
124    numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
125    numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
126    numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS);
127    syncType = cmd.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION);
128    assert "hsync".equals(syncType) || "hflush".equals(syncType) || "nosync".equals(syncType):
129        "sync argument can only accept one of these three values: hsync, hflush, nosync";
130    stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
131    serializedState = new byte[stateSize];
132    setupConf();
133  }
134
135  @Override
136  public void addOptions() {
137    addOption(NUM_THREADS_OPTION);
138    addOption(NUM_PROCS_OPTION);
139    addOption(NUM_WALS_OPTION);
140    addOption(SYNC_OPTION);
141    addOption(STATE_SIZE_OPTION);
142  }
143
144  @Override
145  public int doWork() {
146    try {
147      setupProcedureStore();
148      ExecutorService executor = Executors.newFixedThreadPool(numThreads);
149      Future<?>[] futures = new Future<?>[numThreads];
150      // Start worker threads.
151      long start = System.currentTimeMillis();
152      for (int i = 0; i < numThreads; i++) {
153        futures[i] = executor.submit(new Worker(start));
154      }
155      boolean failure = false;
156      try {
157        for (Future<?> future : futures) {
158          long timeout = start + WORKER_THREADS_TIMEOUT_SEC * 1000 - System.currentTimeMillis();
159          failure |= (future.get(timeout, TimeUnit.MILLISECONDS).equals(EXIT_FAILURE));
160        }
161      } catch (Exception e) {
162        System.err.println("Exception in worker thread.");
163        e.printStackTrace();
164        return EXIT_FAILURE;
165      }
166      executor.shutdown();
167      if (failure) {
168        return EXIT_FAILURE;
169      }
170      long timeTaken = System.currentTimeMillis() - start;
171      System.out.println("******************************************");
172      System.out.println("Num threads    : " + numThreads);
173      System.out.println("Num procedures : " + numProcs);
174      System.out.println("Sync type      : " + syncType);
175      System.out.println("Time taken     : " + (timeTaken / 1000.0f) + "sec");
176      System.out.println("******************************************");
177      System.out.println("Raw format for scripts");
178      System.out.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, "
179              + "total_time_ms=%s]",
180          NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), stateSize,
181          SYNC_OPTION.getOpt(), syncType, NUM_THREADS_OPTION.getOpt(), numThreads,
182          NUM_WALS_OPTION.getOpt(), numWals, timeTaken));
183      return EXIT_SUCCESS;
184    } catch (IOException e) {
185      e.printStackTrace();
186      return EXIT_FAILURE;
187    } finally {
188      tearDownProcedureStore();
189    }
190  }
191
192  ///////////////////////////////
193  // HELPER CLASSES
194  ///////////////////////////////
195
196  /**
197   * Callable to generate load for wal by inserting/deleting/updating procedures.
198   * If procedure store fails to roll log file (throws IOException), all threads quit, and at
199   * least one returns value of {@link AbstractHBaseTool#EXIT_FAILURE}.
200   */
201  private final class Worker implements Callable<Integer> {
202    private final long start;
203
204    public Worker(long start) {
205      this.start = start;
206    }
207
208    // TODO: Can also collect #procs, time taken by each thread to measure fairness.
209    @Override
210    public Integer call() throws IOException {
211      while (true) {
212        if (workersFailed.get()) {
213          return EXIT_FAILURE;
214        }
215        long procId = procIds.getAndIncrement();
216        if (procId >= numProcs) {
217          break;
218        }
219        if (procId != 0 && procId % 10000 == 0) {
220          long ms = System.currentTimeMillis() - start;
221          System.out.println("Wrote " + procId + " procedures in "
222              + StringUtils.humanTimeDiff(ms));
223        }
224        try{
225          if (procId > 0 && procId % numProcsPerWal == 0) {
226            store.rollWriterForTesting();
227            System.out.println("Starting new log : "
228                + store.getActiveLogs().get(store.getActiveLogs().size() - 1));
229          }
230        } catch (IOException ioe) {
231          // Ask other threads to quit too.
232          workersFailed.set(true);
233          System.err.println("Exception when rolling log file. Current procId = " + procId);
234          ioe.printStackTrace();
235          return EXIT_FAILURE;
236        }
237        ProcedureTestingUtility.TestProcedure proc =
238            new ProcedureTestingUtility.TestProcedure(procId);
239        proc.setData(serializedState);
240        store.insert(proc, null);
241        store.update(proc);
242      }
243      return EXIT_SUCCESS;
244    }
245  }
246
247  private static class NoSyncWalProcedureStore extends WALProcedureStore {
248    public NoSyncWalProcedureStore(final Configuration conf, final Path logDir) throws IOException {
249      super(conf, logDir, null, new WALProcedureStore.LeaseRecovery() {
250        @Override
251        public void recoverFileLease(FileSystem fs, Path path) throws IOException {
252          // no-op
253        }
254      });
255    }
256
257    @Override
258    protected void syncStream(FSDataOutputStream stream) {
259      // no-op
260    }
261  }
262
263  public static void main(String[] args) throws IOException {
264    ProcedureWALPerformanceEvaluation tool = new ProcedureWALPerformanceEvaluation();
265    tool.setConf(UTIL.getConfiguration());
266    tool.run(args);
267  }
268}