001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.procedure2.store.wal;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Random;
027import java.util.Set;
028
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
032import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
033import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
034import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
035import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
036import org.apache.hadoop.hbase.procedure2.util.StringUtils;
037import org.apache.hadoop.hbase.util.AbstractHBaseTool;
038
039import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
040import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
041
042import static java.lang.System.currentTimeMillis;
043
044public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool {
045  protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
046
047  // Command line options and defaults.
048  public static int DEFAULT_NUM_PROCS = 1000000;  // 1M
049  public static Option NUM_PROCS_OPTION = new Option("procs", true,
050      "Total number of procedures. Default: " + DEFAULT_NUM_PROCS);
051  public static int DEFAULT_NUM_WALS = 0;
052  public static Option NUM_WALS_OPTION = new Option("wals", true,
053      "Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY +
054          " conf to roll the logs. Default: " + DEFAULT_NUM_WALS);
055  public static int DEFAULT_STATE_SIZE = 1024;  // 1KB
056  public static Option STATE_SIZE_OPTION = new Option("state_size", true,
057      "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE
058          + " bytes");
059  public static int DEFAULT_UPDATES_PER_PROC = 5;
060  public static Option UPDATES_PER_PROC_OPTION = new Option("updates_per_proc", true,
061      "Number of update states to write for each proc. Default: " + DEFAULT_UPDATES_PER_PROC);
062  public static double DEFAULT_DELETE_PROCS_FRACTION = 0.50;
063  public static Option DELETE_PROCS_FRACTION_OPTION = new Option("delete_procs_fraction", true,
064      "Fraction of procs for which to write delete state. Distribution of procs chosen for "
065          + "delete is uniform across all procs. Default: " + DEFAULT_DELETE_PROCS_FRACTION);
066
067  public int numProcs;
068  public int updatesPerProc;
069  public double deleteProcsFraction;
070  public int numWals;
071  private WALProcedureStore store;
072  static byte[] serializedState;
073
074  private static class LoadCounter implements ProcedureStore.ProcedureLoader {
075    public LoadCounter() {}
076
077    @Override
078    public void setMaxProcId(long maxProcId) {
079    }
080
081    @Override
082    public void load(ProcedureIterator procIter) throws IOException {
083      while (procIter.hasNext()) {
084        procIter.next();
085      }
086    }
087
088    @Override
089    public void handleCorrupted(ProcedureIterator procIter) throws IOException {
090      while (procIter.hasNext()) {
091        procIter.next();
092      }
093    }
094  }
095
096  @Override
097  protected void addOptions() {
098    addOption(NUM_PROCS_OPTION);
099    addOption(UPDATES_PER_PROC_OPTION);
100    addOption(DELETE_PROCS_FRACTION_OPTION);
101    addOption(NUM_WALS_OPTION);
102    addOption(STATE_SIZE_OPTION);
103  }
104
105  @Override
106  protected void processOptions(CommandLine cmd) {
107    numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
108    numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS);
109    int stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
110    serializedState = new byte[stateSize];
111    updatesPerProc = getOptionAsInt(cmd, UPDATES_PER_PROC_OPTION.getOpt(),
112        DEFAULT_UPDATES_PER_PROC);
113    deleteProcsFraction = getOptionAsDouble(cmd, DELETE_PROCS_FRACTION_OPTION.getOpt(),
114        DEFAULT_DELETE_PROCS_FRACTION);
115    setupConf();
116  }
117
118  private void setupConf() {
119    if (numWals > 0) {
120      conf.setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, Long.MAX_VALUE);
121    }
122  }
123
124  public void setUpProcedureStore() throws IOException {
125    Path testDir = UTIL.getDataTestDir();
126    FileSystem fs = testDir.getFileSystem(conf);
127    Path logDir = new Path(testDir, "proc-logs");
128    System.out.println("\n\nLogs directory : " + logDir.toString() + "\n\n");
129    fs.delete(logDir, true);
130    store = ProcedureTestingUtility.createWalStore(conf, logDir);
131    store.start(1);
132    store.recoverLease();
133    store.load(new LoadCounter());
134  }
135
136  /**
137   * @return a list of shuffled integers which represent state of proc id. First occurrence of a
138   * number denotes insert state, consecutive occurrences denote update states, and -ve value
139   * denotes delete state.
140   */
141  private List<Integer> shuffleProcWriteSequence() {
142    Random rand = new Random();
143    List<Integer> procStatesSequence = new ArrayList<>();
144    Set<Integer> toBeDeletedProcs = new HashSet<>();
145    // Add n + 1 entries of the proc id for insert + updates. If proc is chosen for delete, add
146    // extra entry which is marked -ve in the loop after shuffle.
147    for (int procId  = 1; procId <= numProcs; ++procId) {
148      procStatesSequence.addAll(Collections.nCopies(updatesPerProc + 1, procId));
149      if (rand.nextFloat() < deleteProcsFraction) {
150        procStatesSequence.add(procId);
151        toBeDeletedProcs.add(procId);
152      }
153    }
154    Collections.shuffle(procStatesSequence);
155    // Mark last occurrences of proc ids in toBeDeletedProcs with -ve to denote it's a delete state.
156    for (int i = procStatesSequence.size() - 1; i >= 0; --i) {
157      int procId = procStatesSequence.get(i);
158      if (toBeDeletedProcs.contains(procId)) {
159        procStatesSequence.set(i, -1 * procId);
160        toBeDeletedProcs.remove(procId);
161      }
162    }
163    return procStatesSequence;
164  }
165
166  private void writeWals() throws IOException {
167    List<Integer> procStates = shuffleProcWriteSequence();
168    TestProcedure[] procs = new TestProcedure[numProcs + 1];  // 0 is not used.
169    int numProcsPerWal = numWals > 0 ? procStates.size() / numWals : Integer.MAX_VALUE;
170    long startTime = currentTimeMillis();
171    long lastTime = startTime;
172    for (int i = 0; i < procStates.size(); ++i) {
173      int procId = procStates.get(i);
174      if (procId < 0) {
175        store.delete(procs[-procId].getProcId());
176        procs[-procId] = null;
177      } else if (procs[procId] == null) {
178        procs[procId] = new TestProcedure(procId, 0);
179        procs[procId].setData(serializedState);
180        store.insert(procs[procId], null);
181      } else {
182        store.update(procs[procId]);
183      }
184      if (i > 0 && i % numProcsPerWal == 0) {
185        long currentTime = currentTimeMillis();
186        System.out.println("Forcing wall roll. Time taken on last WAL: " +
187            (currentTime - lastTime) / 1000.0f + " sec");
188        store.rollWriterForTesting();
189        lastTime = currentTime;
190      }
191    }
192    long timeTaken = currentTimeMillis() - startTime;
193    System.out.println("\n\nDone writing WALs.\nNum procs : " + numProcs + "\nTotal time taken : "
194        + StringUtils.humanTimeDiff(timeTaken) + "\n\n");
195  }
196
197  private void storeRestart(ProcedureStore.ProcedureLoader loader) throws IOException {
198    System.out.println("Restarting procedure store to read back the WALs");
199    store.stop(false);
200    store.start(1);
201    store.recoverLease();
202
203    long startTime = currentTimeMillis();
204    store.load(loader);
205    long timeTaken = System.currentTimeMillis() - startTime;
206    System.out.println("******************************************");
207    System.out.println("Load time : " + (timeTaken / 1000.0f) + "sec");
208    System.out.println("******************************************");
209    System.out.println("Raw format for scripts");
210        System.out.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, "
211                + "total_time_ms=%s]",
212        NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), serializedState.length,
213        UPDATES_PER_PROC_OPTION.getOpt(), updatesPerProc, DELETE_PROCS_FRACTION_OPTION.getOpt(),
214        deleteProcsFraction, NUM_WALS_OPTION.getOpt(), numWals, timeTaken));
215  }
216
217  public void tearDownProcedureStore() {
218    store.stop(false);
219    try {
220      store.getFileSystem().delete(store.getWALDir(), true);
221    } catch (IOException e) {
222      System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up "
223          + "disk space. Location: " + store.getWALDir().toString());
224      System.err.println(e.toString());
225    }
226  }
227
228  @Override
229  protected int doWork() {
230    try {
231      setUpProcedureStore();
232      writeWals();
233      storeRestart(new LoadCounter());
234      return EXIT_SUCCESS;
235    } catch (IOException e) {
236      e.printStackTrace();
237      return EXIT_FAILURE;
238    } finally {
239      tearDownProcedureStore();
240    }
241  }
242
243  public static void main(String[] args) throws IOException {
244    ProcedureWALLoaderPerformanceEvaluation tool = new ProcedureWALLoaderPerformanceEvaluation();
245    tool.setConf(UTIL.getConfiguration());
246    tool.run(args);
247  }
248}