/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018
019package org.apache.hadoop.hbase.procedure2.store.wal;
020
021import java.io.IOException;
022import java.util.concurrent.Callable;
023import java.util.concurrent.ExecutorService;
024import java.util.concurrent.Executors;
025import java.util.concurrent.Future;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicBoolean;
028import java.util.concurrent.atomic.AtomicLong;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FSDataOutputStream;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
035import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
036
037import org.apache.hadoop.hbase.procedure2.util.StringUtils;
038import org.apache.hadoop.hbase.util.AbstractHBaseTool;
039
040import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
041import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
042
043public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
044  protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
045
046  // Command line options and defaults.
047  public static int DEFAULT_NUM_THREADS = 20;
048  public static Option NUM_THREADS_OPTION = new Option("threads", true,
049      "Number of parallel threads which will write insert/updates/deletes to WAL. Default: "
050      + DEFAULT_NUM_THREADS);
051  public static int DEFAULT_NUM_PROCS = 1000000;  // 1M
052  public static Option NUM_PROCS_OPTION = new Option("procs", true,
053      "Total number of procedures. Each procedure writes one insert and one update. Default: "
054      + DEFAULT_NUM_PROCS);
055  public static int DEFAULT_NUM_WALS = 0;
056  public static Option NUM_WALS_OPTION = new Option("wals", true,
057      "Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY +
058          " conf to roll the logs. Default: " + DEFAULT_NUM_WALS);
059  public static int DEFAULT_STATE_SIZE = 1024;  // 1KB
060  public static Option STATE_SIZE_OPTION = new Option("state_size", true,
061      "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE
062          + "bytes");
063  public static Option SYNC_OPTION = new Option("sync", true,
064      "Type of sync to use when writing WAL contents to file system. Accepted values: hflush, "
065          + "hsync, nosync. Default: hflush");
066  public static String DEFAULT_SYNC_OPTION = "hflush";
067
068  public int numThreads;
069  public long numProcs;
070  public long numProcsPerWal = Long.MAX_VALUE;  // never roll wall based on this value.
071  public int numWals;
072  public String syncType;
073  public int stateSize;
074  static byte[] serializedState;
075  private WALProcedureStore store;
076
077  /** Used by {@link Worker}. */
078  private AtomicLong procIds = new AtomicLong(0);
079  private AtomicBoolean workersFailed = new AtomicBoolean(false);
080  // Timeout for worker threads.
081  private static final int WORKER_THREADS_TIMEOUT_SEC = 600;  // in seconds
082
083  // Non-default configurations.
084  private void setupConf() {
085    conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, "hsync".equals(syncType));
086    if (numWals > 0) {
087      conf.setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, Long.MAX_VALUE);
088      numProcsPerWal = numProcs / numWals;
089    }
090  }
091
092  private void setupProcedureStore() throws IOException {
093    Path testDir = UTIL.getDataTestDir();
094    FileSystem fs = testDir.getFileSystem(conf);
095    Path logDir = new Path(testDir, "proc-logs");
096    System.out.println("Logs directory : " + logDir.toString());
097    fs.delete(logDir, true);
098    if ("nosync".equals(syncType)) {
099      store = new NoSyncWalProcedureStore(conf, logDir);
100    } else {
101      store = ProcedureTestingUtility.createWalStore(conf, logDir);
102    }
103    store.start(numThreads);
104    store.recoverLease();
105    store.load(new ProcedureTestingUtility.LoadCounter());
106    System.out.println("Starting new log : "
107        + store.getActiveLogs().get(store.getActiveLogs().size() - 1));
108  }
109
110  private void tearDownProcedureStore() {
111    store.stop(false);
112    try {
113      store.getFileSystem().delete(store.getWALDir(), true);
114    } catch (IOException e) {
115      System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up "
116          + "disk space. Location: " + store.getWALDir().toString());
117      e.printStackTrace();
118    }
119  }
120
121  /**
122   * Processes and validates command line options.
123   */
124  @Override
125  public void processOptions(CommandLine cmd) {
126    numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
127    numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
128    numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS);
129    syncType = cmd.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION);
130    assert "hsync".equals(syncType) || "hflush".equals(syncType) || "nosync".equals(syncType):
131        "sync argument can only accept one of these three values: hsync, hflush, nosync";
132    stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
133    serializedState = new byte[stateSize];
134    setupConf();
135  }
136
137  @Override
138  public void addOptions() {
139    addOption(NUM_THREADS_OPTION);
140    addOption(NUM_PROCS_OPTION);
141    addOption(NUM_WALS_OPTION);
142    addOption(SYNC_OPTION);
143    addOption(STATE_SIZE_OPTION);
144  }
145
146  @Override
147  public int doWork() {
148    try {
149      setupProcedureStore();
150      ExecutorService executor = Executors.newFixedThreadPool(numThreads);
151      Future<?>[] futures = new Future<?>[numThreads];
152      // Start worker threads.
153      long start = System.currentTimeMillis();
154      for (int i = 0; i < numThreads; i++) {
155        futures[i] = executor.submit(new Worker(start));
156      }
157      boolean failure = false;
158      try {
159        for (Future<?> future : futures) {
160          long timeout = start + WORKER_THREADS_TIMEOUT_SEC * 1000 - System.currentTimeMillis();
161          failure |= (future.get(timeout, TimeUnit.MILLISECONDS).equals(EXIT_FAILURE));
162        }
163      } catch (Exception e) {
164        System.err.println("Exception in worker thread.");
165        e.printStackTrace();
166        return EXIT_FAILURE;
167      }
168      executor.shutdown();
169      if (failure) {
170        return EXIT_FAILURE;
171      }
172      long timeTaken = System.currentTimeMillis() - start;
173      System.out.println("******************************************");
174      System.out.println("Num threads    : " + numThreads);
175      System.out.println("Num procedures : " + numProcs);
176      System.out.println("Sync type      : " + syncType);
177      System.out.println("Time taken     : " + (timeTaken / 1000.0f) + "sec");
178      System.out.println("******************************************");
179      System.out.println("Raw format for scripts");
180      System.out.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, "
181              + "total_time_ms=%s]",
182          NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), stateSize,
183          SYNC_OPTION.getOpt(), syncType, NUM_THREADS_OPTION.getOpt(), numThreads,
184          NUM_WALS_OPTION.getOpt(), numWals, timeTaken));
185      return EXIT_SUCCESS;
186    } catch (IOException e) {
187      e.printStackTrace();
188      return EXIT_FAILURE;
189    } finally {
190      tearDownProcedureStore();
191    }
192  }
193
194  ///////////////////////////////
195  // HELPER CLASSES
196  ///////////////////////////////
197
198  /**
199   * Callable to generate load for wal by inserting/deleting/updating procedures.
200   * If procedure store fails to roll log file (throws IOException), all threads quit, and at
201   * least one returns value of {@link AbstractHBaseTool#EXIT_FAILURE}.
202   */
203  private final class Worker implements Callable<Integer> {
204    private final long start;
205
206    public Worker(long start) {
207      this.start = start;
208    }
209
210    // TODO: Can also collect #procs, time taken by each thread to measure fairness.
211    @Override
212    public Integer call() throws IOException {
213      while (true) {
214        if (workersFailed.get()) {
215          return EXIT_FAILURE;
216        }
217        long procId = procIds.getAndIncrement();
218        if (procId >= numProcs) {
219          break;
220        }
221        if (procId != 0 && procId % 10000 == 0) {
222          long ms = System.currentTimeMillis() - start;
223          System.out.println("Wrote " + procId + " procedures in "
224              + StringUtils.humanTimeDiff(ms));
225        }
226        try{
227          if (procId > 0 && procId % numProcsPerWal == 0) {
228            store.rollWriterForTesting();
229            System.out.println("Starting new log : "
230                + store.getActiveLogs().get(store.getActiveLogs().size() - 1));
231          }
232        } catch (IOException ioe) {
233          // Ask other threads to quit too.
234          workersFailed.set(true);
235          System.err.println("Exception when rolling log file. Current procId = " + procId);
236          ioe.printStackTrace();
237          return EXIT_FAILURE;
238        }
239        ProcedureTestingUtility.TestProcedure proc =
240            new ProcedureTestingUtility.TestProcedure(procId);
241        proc.setData(serializedState);
242        store.insert(proc, null);
243        store.update(proc);
244      }
245      return EXIT_SUCCESS;
246    }
247  }
248
249  private static class NoSyncWalProcedureStore extends WALProcedureStore {
250    public NoSyncWalProcedureStore(final Configuration conf, final Path logDir) throws IOException {
251      super(conf, logDir, null, new WALProcedureStore.LeaseRecovery() {
252        @Override
253        public void recoverFileLease(FileSystem fs, Path path) throws IOException {
254          // no-op
255        }
256      });
257    }
258
259    @Override
260    protected void syncStream(FSDataOutputStream stream) {
261      // no-op
262    }
263  }
264
265  public static void main(String[] args) throws IOException {
266    ProcedureWALPerformanceEvaluation tool = new ProcedureWALPerformanceEvaluation();
267    tool.setConf(UTIL.getConfiguration());
268    tool.run(args);
269  }
270}