001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store;
019
020import java.io.IOException;
021import java.util.Random;
022import java.util.concurrent.Callable;
023import java.util.concurrent.ExecutorService;
024import java.util.concurrent.Executors;
025import java.util.concurrent.Future;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicBoolean;
028import java.util.concurrent.atomic.AtomicLong;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
032import org.apache.hadoop.hbase.procedure2.util.StringUtils;
033import org.apache.hadoop.hbase.util.AbstractHBaseTool;
034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
035import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
036import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
037
038/**
039 * Base class for testing procedure store performance.
040 */
041public abstract class ProcedureStorePerformanceEvaluation<T extends ProcedureStore>
042  extends AbstractHBaseTool {
043
044  // Command line options and defaults.
045  public static String DEFAULT_OUTPUT_PATH = "proc-store";
046
047  public static Option OUTPUT_PATH_OPTION =
048    new Option("output", true, "The output path. Default: " + DEFAULT_OUTPUT_PATH);
049
050  public static int DEFAULT_NUM_THREADS = 20;
051
052  public static Option NUM_THREADS_OPTION = new Option("threads", true,
053    "Number of parallel threads which will write insert/updates/deletes to store. Default: " +
054      DEFAULT_NUM_THREADS);
055
056  public static int DEFAULT_NUM_PROCS = 1000000; // 1M
057
058  public static Option NUM_PROCS_OPTION = new Option("procs", true,
059    "Total number of procedures. Each procedure writes one insert and one update. Default: " +
060      DEFAULT_NUM_PROCS);
061
062  public static int DEFAULT_STATE_SIZE = 1024; // 1KB
063
064  public static Option STATE_SIZE_OPTION = new Option("state_size", true,
065    "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE +
066      "bytes");
067
068  public static Option SYNC_OPTION = new Option("sync", true,
069    "Type of sync to use when writing WAL contents to file system. Accepted values: hflush, " +
070      "hsync, nosync. Default: hflush");
071
072  public static String DEFAULT_SYNC_OPTION = "hflush";
073
074  protected String outputPath;
075  protected int numThreads;
076  protected long numProcs;
077  protected String syncType;
078  protected int stateSize;
079  protected static byte[] SERIALIZED_STATE;
080
081  protected T store;
082
083  /** Used by {@link Worker}. */
084  private AtomicLong procIds = new AtomicLong(0);
085  private AtomicBoolean workersFailed = new AtomicBoolean(false);
086
087  // Timeout for worker threads.
088  private static final int WORKER_THREADS_TIMEOUT_SEC = 600; // in seconds
089
090  @Override
091  protected void addOptions() {
092    addOption(OUTPUT_PATH_OPTION);
093    addOption(NUM_THREADS_OPTION);
094    addOption(NUM_PROCS_OPTION);
095    addOption(SYNC_OPTION);
096    addOption(STATE_SIZE_OPTION);
097  }
098
099  @Override
100  protected void processOptions(CommandLine cmd) {
101    outputPath = cmd.getOptionValue(OUTPUT_PATH_OPTION.getOpt(), DEFAULT_OUTPUT_PATH);
102    numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
103    numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
104    syncType = cmd.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION);
105    assert "hsync".equals(syncType) || "hflush".equals(syncType) || "nosync".equals(
106      syncType) : "sync argument can only accept one of these three values: hsync, hflush, nosync";
107    stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
108    SERIALIZED_STATE = new byte[stateSize];
109    new Random(12345).nextBytes(SERIALIZED_STATE);
110  }
111
112  private void setUpProcedureStore() throws IOException {
113    FileSystem fs = FileSystem.get(conf);
114    Path storeDir = fs.makeQualified(new Path(outputPath));
115    System.out.println("Procedure store directory : " + storeDir.toString());
116    fs.delete(storeDir, true);
117    store = createProcedureStore(storeDir);
118    store.start(numThreads);
119    store.recoverLease();
120    store.load(new ProcedureTestingUtility.LoadCounter());
121    System.out.println("Starting new procedure store: " + store.getClass().getSimpleName());
122  }
123
124  protected abstract T createProcedureStore(Path storeDir) throws IOException;
125
126  protected void postStop(T store) throws IOException {
127  }
128
129  private void tearDownProcedureStore() {
130    Path storeDir = null;
131    try {
132      if (store != null) {
133        store.stop(false);
134        postStop(store);
135      }
136      FileSystem fs = FileSystem.get(conf);
137      storeDir = fs.makeQualified(new Path(outputPath));
138      fs.delete(storeDir, true);
139    } catch (IOException e) {
140      System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up " +
141        "disk space. Location: " + storeDir);
142      e.printStackTrace();
143    }
144  }
145
146  protected abstract void printRawFormatResult(long timeTakenNs);
147
148  @Override
149  protected int doWork() throws Exception {
150    try {
151      setUpProcedureStore();
152      ExecutorService executor = Executors.newFixedThreadPool(numThreads);
153      Future<?>[] futures = new Future<?>[numThreads];
154      // Start worker threads.
155      long start = System.nanoTime();
156      for (int i = 0; i < numThreads; i++) {
157        futures[i] = executor.submit(new Worker(start));
158      }
159      boolean failure = false;
160      try {
161        for (Future<?> future : futures) {
162          long timeout = start + WORKER_THREADS_TIMEOUT_SEC * 1000 -
163            EnvironmentEdgeManager.currentTime();
164          failure |= (future.get(timeout, TimeUnit.MILLISECONDS).equals(EXIT_FAILURE));
165        }
166      } catch (Exception e) {
167        System.err.println("Exception in worker thread.");
168        e.printStackTrace();
169        return EXIT_FAILURE;
170      }
171      executor.shutdown();
172      if (failure) {
173        return EXIT_FAILURE;
174      }
175      long timeTakenNs = System.nanoTime() - start;
176      System.out.println("******************************************");
177      System.out.println("Num threads    : " + numThreads);
178      System.out.println("Num procedures : " + numProcs);
179      System.out.println("Sync type      : " + syncType);
180      System.out.println("Time taken     : " + TimeUnit.NANOSECONDS.toSeconds(timeTakenNs) + "sec");
181      System.out.println("******************************************");
182      System.out.println("Raw format for scripts");
183      printRawFormatResult(timeTakenNs);
184      return EXIT_SUCCESS;
185    } catch (IOException e) {
186      e.printStackTrace();
187      return EXIT_FAILURE;
188    } finally {
189      tearDownProcedureStore();
190    }
191  }
192
193  ///////////////////////////////
194  // HELPER CLASSES
195  ///////////////////////////////
196
197  /**
198   * Callable to generate load for wal by inserting/deleting/updating procedures. If procedure store
199   * fails to roll log file (throws IOException), all threads quit, and at least one returns value
200   * of {@link AbstractHBaseTool#EXIT_FAILURE}.
201   */
202  private final class Worker implements Callable<Integer> {
203    private final long start;
204
205    public Worker(long start) {
206      this.start = start;
207    }
208
209    // TODO: Can also collect #procs, time taken by each thread to measure fairness.
210    @Override
211    public Integer call() throws IOException {
212      while (true) {
213        if (workersFailed.get()) {
214          return EXIT_FAILURE;
215        }
216        long procId = procIds.getAndIncrement();
217        if (procId >= numProcs) {
218          break;
219        }
220        if (procId != 0 && procId % 10000 == 0) {
221          long ns = System.nanoTime() - start;
222          System.out.println("Wrote " + procId + " procedures in " +
223            StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(ns)));
224        }
225        try {
226          preWrite(procId);
227        } catch (IOException ioe) {
228          // Ask other threads to quit too.
229          workersFailed.set(true);
230          System.err.println("Exception when rolling log file. Current procId = " + procId);
231          ioe.printStackTrace();
232          return EXIT_FAILURE;
233        }
234        ProcedureTestingUtility.TestProcedure proc =
235          new ProcedureTestingUtility.TestProcedure(procId);
236        proc.setData(SERIALIZED_STATE);
237        store.insert(proc, null);
238        store.update(proc);
239      }
240      return EXIT_SUCCESS;
241    }
242  }
243
244  protected abstract void preWrite(long procId) throws IOException;
245}