001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store;
019
020import java.io.IOException;
021import java.util.concurrent.Callable;
022import java.util.concurrent.ExecutorService;
023import java.util.concurrent.Executors;
024import java.util.concurrent.Future;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicLong;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
031import org.apache.hadoop.hbase.procedure2.util.StringUtils;
032import org.apache.hadoop.hbase.util.AbstractHBaseTool;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
035
036import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
037import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
038
039/**
040 * Base class for testing procedure store performance.
041 */
042public abstract class ProcedureStorePerformanceEvaluation<T extends ProcedureStore>
043  extends AbstractHBaseTool {
044
045  // Command line options and defaults.
046  public static String DEFAULT_OUTPUT_PATH = "proc-store";
047
048  public static Option OUTPUT_PATH_OPTION =
049    new Option("output", true, "The output path. Default: " + DEFAULT_OUTPUT_PATH);
050
051  public static int DEFAULT_NUM_THREADS = 20;
052
053  public static Option NUM_THREADS_OPTION = new Option("threads", true,
054    "Number of parallel threads which will write insert/updates/deletes to store. Default: "
055      + DEFAULT_NUM_THREADS);
056
057  public static int DEFAULT_NUM_PROCS = 1000000; // 1M
058
059  public static Option NUM_PROCS_OPTION = new Option("procs", true,
060    "Total number of procedures. Each procedure writes one insert and one update. Default: "
061      + DEFAULT_NUM_PROCS);
062
063  public static int DEFAULT_STATE_SIZE = 1024; // 1KB
064
065  public static Option STATE_SIZE_OPTION =
066    new Option("state_size", true, "Size of serialized state in bytes to write on update. Default: "
067      + DEFAULT_STATE_SIZE + "bytes");
068
069  public static Option SYNC_OPTION = new Option("sync", true,
070    "Type of sync to use when writing WAL contents to file system. Accepted values: hflush, "
071      + "hsync, nosync. Default: hflush");
072
073  public static String DEFAULT_SYNC_OPTION = "hflush";
074
075  protected String outputPath;
076  protected int numThreads;
077  protected long numProcs;
078  protected String syncType;
079  protected int stateSize;
080  protected static byte[] SERIALIZED_STATE;
081
082  protected T store;
083
084  /** Used by {@link Worker}. */
085  private AtomicLong procIds = new AtomicLong(0);
086  private AtomicBoolean workersFailed = new AtomicBoolean(false);
087
088  // Timeout for worker threads.
089  private static final int WORKER_THREADS_TIMEOUT_SEC = 600; // in seconds
090
091  @Override
092  protected void addOptions() {
093    addOption(OUTPUT_PATH_OPTION);
094    addOption(NUM_THREADS_OPTION);
095    addOption(NUM_PROCS_OPTION);
096    addOption(SYNC_OPTION);
097    addOption(STATE_SIZE_OPTION);
098  }
099
100  @Override
101  protected void processOptions(CommandLine cmd) {
102    outputPath = cmd.getOptionValue(OUTPUT_PATH_OPTION.getOpt(), DEFAULT_OUTPUT_PATH);
103    numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
104    numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
105    syncType = cmd.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION);
106    assert "hsync".equals(syncType) || "hflush".equals(syncType) || "nosync".equals(syncType)
107      : "sync argument can only accept one of these three values: hsync, hflush, nosync";
108    stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
109    SERIALIZED_STATE = new byte[stateSize];
110    Bytes.random(SERIALIZED_STATE);
111  }
112
113  private void setUpProcedureStore() throws IOException {
114    FileSystem fs = FileSystem.get(conf);
115    Path storeDir = fs.makeQualified(new Path(outputPath));
116    System.out.println("Procedure store directory : " + storeDir.toString());
117    fs.delete(storeDir, true);
118    store = createProcedureStore(storeDir);
119    store.start(numThreads);
120    store.recoverLease();
121    store.load(new ProcedureTestingUtility.LoadCounter());
122    System.out.println("Starting new procedure store: " + store.getClass().getSimpleName());
123  }
124
125  protected abstract T createProcedureStore(Path storeDir) throws IOException;
126
127  protected void postStop(T store) throws IOException {
128  }
129
130  private void tearDownProcedureStore() {
131    Path storeDir = null;
132    try {
133      if (store != null) {
134        store.stop(false);
135        postStop(store);
136      }
137      FileSystem fs = FileSystem.get(conf);
138      storeDir = fs.makeQualified(new Path(outputPath));
139      fs.delete(storeDir, true);
140    } catch (IOException e) {
141      System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up "
142        + "disk space. Location: " + storeDir);
143      e.printStackTrace();
144    }
145  }
146
147  protected abstract void printRawFormatResult(long timeTakenNs);
148
149  @Override
150  protected int doWork() throws Exception {
151    try {
152      setUpProcedureStore();
153      ExecutorService executor = Executors.newFixedThreadPool(numThreads);
154      Future<?>[] futures = new Future<?>[numThreads];
155      // Start worker threads.
156      long start = System.nanoTime();
157      for (int i = 0; i < numThreads; i++) {
158        futures[i] = executor.submit(new Worker(start));
159      }
160      boolean failure = false;
161      try {
162        for (Future<?> future : futures) {
163          long timeout =
164            start + WORKER_THREADS_TIMEOUT_SEC * 1000 - EnvironmentEdgeManager.currentTime();
165          failure |= (future.get(timeout, TimeUnit.MILLISECONDS).equals(EXIT_FAILURE));
166        }
167      } catch (Exception e) {
168        System.err.println("Exception in worker thread.");
169        e.printStackTrace();
170        return EXIT_FAILURE;
171      }
172      executor.shutdown();
173      if (failure) {
174        return EXIT_FAILURE;
175      }
176      long timeTakenNs = System.nanoTime() - start;
177      System.out.println("******************************************");
178      System.out.println("Num threads    : " + numThreads);
179      System.out.println("Num procedures : " + numProcs);
180      System.out.println("Sync type      : " + syncType);
181      System.out.println("Time taken     : " + TimeUnit.NANOSECONDS.toSeconds(timeTakenNs) + "sec");
182      System.out.println("******************************************");
183      System.out.println("Raw format for scripts");
184      printRawFormatResult(timeTakenNs);
185      return EXIT_SUCCESS;
186    } catch (IOException e) {
187      e.printStackTrace();
188      return EXIT_FAILURE;
189    } finally {
190      tearDownProcedureStore();
191    }
192  }
193
194  ///////////////////////////////
195  // HELPER CLASSES
196  ///////////////////////////////
197
198  /**
199   * Callable to generate load for wal by inserting/deleting/updating procedures. If procedure store
200   * fails to roll log file (throws IOException), all threads quit, and at least one returns value
201   * of {@link AbstractHBaseTool#EXIT_FAILURE}.
202   */
203  private final class Worker implements Callable<Integer> {
204    private final long start;
205
206    public Worker(long start) {
207      this.start = start;
208    }
209
210    // TODO: Can also collect #procs, time taken by each thread to measure fairness.
211    @Override
212    public Integer call() throws IOException {
213      while (true) {
214        if (workersFailed.get()) {
215          return EXIT_FAILURE;
216        }
217        long procId = procIds.getAndIncrement();
218        if (procId >= numProcs) {
219          break;
220        }
221        if (procId != 0 && procId % 10000 == 0) {
222          long ns = System.nanoTime() - start;
223          System.out.println("Wrote " + procId + " procedures in "
224            + StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(ns)));
225        }
226        try {
227          preWrite(procId);
228        } catch (IOException ioe) {
229          // Ask other threads to quit too.
230          workersFailed.set(true);
231          System.err.println("Exception when rolling log file. Current procId = " + procId);
232          ioe.printStackTrace();
233          return EXIT_FAILURE;
234        }
235        ProcedureTestingUtility.TestProcedure proc =
236          new ProcedureTestingUtility.TestProcedure(procId);
237        proc.setData(SERIALIZED_STATE);
238        store.insert(proc, null);
239        store.update(proc);
240      }
241      return EXIT_SUCCESS;
242    }
243  }
244
245  protected abstract void preWrite(long procId) throws IOException;
246}