001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Set;
026import java.util.concurrent.ThreadLocalRandom;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
030import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
031import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
032import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
033import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
034import org.apache.hadoop.hbase.procedure2.util.StringUtils;
035import org.apache.hadoop.hbase.util.AbstractHBaseTool;
036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
037
038import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
039import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
040
041public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool {
042  protected static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
043
044  // Command line options and defaults.
045  public static int DEFAULT_NUM_PROCS = 1000000; // 1M
046  public static Option NUM_PROCS_OPTION =
047    new Option("procs", true, "Total number of procedures. Default: " + DEFAULT_NUM_PROCS);
048  public static int DEFAULT_NUM_WALS = 0;
049  public static Option NUM_WALS_OPTION = new Option("wals", true,
050    "Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY
051      + " conf to roll the logs. Default: " + DEFAULT_NUM_WALS);
052  public static int DEFAULT_STATE_SIZE = 1024; // 1KB
053  public static Option STATE_SIZE_OPTION =
054    new Option("state_size", true, "Size of serialized state in bytes to write on update. Default: "
055      + DEFAULT_STATE_SIZE + " bytes");
056  public static int DEFAULT_UPDATES_PER_PROC = 5;
057  public static Option UPDATES_PER_PROC_OPTION = new Option("updates_per_proc", true,
058    "Number of update states to write for each proc. Default: " + DEFAULT_UPDATES_PER_PROC);
059  public static double DEFAULT_DELETE_PROCS_FRACTION = 0.50;
060  public static Option DELETE_PROCS_FRACTION_OPTION = new Option("delete_procs_fraction", true,
061    "Fraction of procs for which to write delete state. Distribution of procs chosen for "
062      + "delete is uniform across all procs. Default: " + DEFAULT_DELETE_PROCS_FRACTION);
063
064  public int numProcs;
065  public int updatesPerProc;
066  public double deleteProcsFraction;
067  public int numWals;
068  private WALProcedureStore store;
069  static byte[] serializedState;
070
071  private static class LoadCounter implements ProcedureStore.ProcedureLoader {
072    public LoadCounter() {
073    }
074
075    @Override
076    public void setMaxProcId(long maxProcId) {
077    }
078
079    @Override
080    public void load(ProcedureIterator procIter) throws IOException {
081      while (procIter.hasNext()) {
082        procIter.next();
083      }
084    }
085
086    @Override
087    public void handleCorrupted(ProcedureIterator procIter) throws IOException {
088      while (procIter.hasNext()) {
089        procIter.next();
090      }
091    }
092  }
093
094  @Override
095  protected void addOptions() {
096    addOption(NUM_PROCS_OPTION);
097    addOption(UPDATES_PER_PROC_OPTION);
098    addOption(DELETE_PROCS_FRACTION_OPTION);
099    addOption(NUM_WALS_OPTION);
100    addOption(STATE_SIZE_OPTION);
101  }
102
103  @Override
104  protected void processOptions(CommandLine cmd) {
105    numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
106    numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS);
107    int stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
108    serializedState = new byte[stateSize];
109    updatesPerProc =
110      getOptionAsInt(cmd, UPDATES_PER_PROC_OPTION.getOpt(), DEFAULT_UPDATES_PER_PROC);
111    deleteProcsFraction =
112      getOptionAsDouble(cmd, DELETE_PROCS_FRACTION_OPTION.getOpt(), DEFAULT_DELETE_PROCS_FRACTION);
113    setupConf();
114  }
115
116  private void setupConf() {
117    if (numWals > 0) {
118      conf.setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, Long.MAX_VALUE);
119    }
120  }
121
122  public void setUpProcedureStore() throws IOException {
123    Path testDir = UTIL.getDataTestDir();
124    FileSystem fs = testDir.getFileSystem(conf);
125    Path logDir = new Path(testDir, "proc-logs");
126    System.out.println("\n\nLogs directory : " + logDir.toString() + "\n\n");
127    fs.delete(logDir, true);
128    store = ProcedureTestingUtility.createWalStore(conf, logDir);
129    store.start(1);
130    store.recoverLease();
131    store.load(new LoadCounter());
132  }
133
134  /**
135   * @return a list of shuffled integers which represent state of proc id. First occurrence of a
136   *         number denotes insert state, consecutive occurrences denote update states, and -ve
137   *         value denotes delete state.
138   */
139  private List<Integer> shuffleProcWriteSequence() {
140    List<Integer> procStatesSequence = new ArrayList<>();
141    Set<Integer> toBeDeletedProcs = new HashSet<>();
142    // Add n + 1 entries of the proc id for insert + updates. If proc is chosen for delete, add
143    // extra entry which is marked -ve in the loop after shuffle.
144    for (int procId = 1; procId <= numProcs; ++procId) {
145      procStatesSequence.addAll(Collections.nCopies(updatesPerProc + 1, procId));
146      if (ThreadLocalRandom.current().nextFloat() < deleteProcsFraction) {
147        procStatesSequence.add(procId);
148        toBeDeletedProcs.add(procId);
149      }
150    }
151    Collections.shuffle(procStatesSequence);
152    // Mark last occurrences of proc ids in toBeDeletedProcs with -ve to denote it's a delete state.
153    for (int i = procStatesSequence.size() - 1; i >= 0; --i) {
154      int procId = procStatesSequence.get(i);
155      if (toBeDeletedProcs.contains(procId)) {
156        procStatesSequence.set(i, -1 * procId);
157        toBeDeletedProcs.remove(procId);
158      }
159    }
160    return procStatesSequence;
161  }
162
163  private void writeWals() throws IOException {
164    List<Integer> procStates = shuffleProcWriteSequence();
165    TestProcedure[] procs = new TestProcedure[numProcs + 1]; // 0 is not used.
166    int numProcsPerWal = numWals > 0 ? procStates.size() / numWals : Integer.MAX_VALUE;
167    long startTime = EnvironmentEdgeManager.currentTime();
168    long lastTime = startTime;
169    for (int i = 0; i < procStates.size(); ++i) {
170      int procId = procStates.get(i);
171      if (procId < 0) {
172        store.delete(procs[-procId].getProcId());
173        procs[-procId] = null;
174      } else if (procs[procId] == null) {
175        procs[procId] = new TestProcedure(procId, 0);
176        procs[procId].setData(serializedState);
177        store.insert(procs[procId], null);
178      } else {
179        store.update(procs[procId]);
180      }
181      if (i > 0 && i % numProcsPerWal == 0) {
182        long currentTime = EnvironmentEdgeManager.currentTime();
183        System.out.println("Forcing wall roll. Time taken on last WAL: "
184          + (currentTime - lastTime) / 1000.0f + " sec");
185        store.rollWriterForTesting();
186        lastTime = currentTime;
187      }
188    }
189    long timeTaken = EnvironmentEdgeManager.currentTime() - startTime;
190    System.out.println("\n\nDone writing WALs.\nNum procs : " + numProcs + "\nTotal time taken : "
191      + StringUtils.humanTimeDiff(timeTaken) + "\n\n");
192  }
193
194  private void storeRestart(ProcedureStore.ProcedureLoader loader) throws IOException {
195    System.out.println("Restarting procedure store to read back the WALs");
196    store.stop(false);
197    store.start(1);
198    store.recoverLease();
199
200    long startTime = EnvironmentEdgeManager.currentTime();
201    store.load(loader);
202    long timeTaken = EnvironmentEdgeManager.currentTime() - startTime;
203    System.out.println("******************************************");
204    System.out.println("Load time : " + (timeTaken / 1000.0f) + "sec");
205    System.out.println("******************************************");
206    System.out.println("Raw format for scripts");
207    System.out
208      .println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, " + "total_time_ms=%s]",
209        NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), serializedState.length,
210        UPDATES_PER_PROC_OPTION.getOpt(), updatesPerProc, DELETE_PROCS_FRACTION_OPTION.getOpt(),
211        deleteProcsFraction, NUM_WALS_OPTION.getOpt(), numWals, timeTaken));
212  }
213
214  public void tearDownProcedureStore() {
215    store.stop(false);
216    try {
217      store.getFileSystem().delete(store.getWALDir(), true);
218    } catch (IOException e) {
219      System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up "
220        + "disk space. Location: " + store.getWALDir().toString());
221      System.err.println(e.toString());
222    }
223  }
224
225  @Override
226  protected int doWork() {
227    try {
228      setUpProcedureStore();
229      writeWals();
230      storeRestart(new LoadCounter());
231      return EXIT_SUCCESS;
232    } catch (IOException e) {
233      e.printStackTrace();
234      return EXIT_FAILURE;
235    } finally {
236      tearDownProcedureStore();
237    }
238  }
239
240  public static void main(String[] args) throws IOException {
241    ProcedureWALLoaderPerformanceEvaluation tool = new ProcedureWALLoaderPerformanceEvaluation();
242    tool.setConf(UTIL.getConfiguration());
243    tool.run(args);
244  }
245}