001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.wal; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Set; 026import java.util.concurrent.ThreadLocalRandom; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 030import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 031import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; 032import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 033import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; 034import org.apache.hadoop.hbase.procedure2.util.StringUtils; 035import org.apache.hadoop.hbase.util.AbstractHBaseTool; 036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 037 038import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 039import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; 040 041public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool { 042 protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); 043 044 // Command line options and defaults. 045 public static int DEFAULT_NUM_PROCS = 1000000; // 1M 046 public static Option NUM_PROCS_OPTION = 047 new Option("procs", true, "Total number of procedures. Default: " + DEFAULT_NUM_PROCS); 048 public static int DEFAULT_NUM_WALS = 0; 049 public static Option NUM_WALS_OPTION = new Option("wals", true, 050 "Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY 051 + " conf to roll the logs. Default: " + DEFAULT_NUM_WALS); 052 public static int DEFAULT_STATE_SIZE = 1024; // 1KB 053 public static Option STATE_SIZE_OPTION = 054 new Option("state_size", true, "Size of serialized state in bytes to write on update. Default: " 055 + DEFAULT_STATE_SIZE + " bytes"); 056 public static int DEFAULT_UPDATES_PER_PROC = 5; 057 public static Option UPDATES_PER_PROC_OPTION = new Option("updates_per_proc", true, 058 "Number of update states to write for each proc. Default: " + DEFAULT_UPDATES_PER_PROC); 059 public static double DEFAULT_DELETE_PROCS_FRACTION = 0.50; 060 public static Option DELETE_PROCS_FRACTION_OPTION = new Option("delete_procs_fraction", true, 061 "Fraction of procs for which to write delete state. Distribution of procs chosen for " 062 + "delete is uniform across all procs. Default: " + DEFAULT_DELETE_PROCS_FRACTION); 063 064 public int numProcs; 065 public int updatesPerProc; 066 public double deleteProcsFraction; 067 public int numWals; 068 private WALProcedureStore store; 069 static byte[] serializedState; 070 071 private static class LoadCounter implements ProcedureStore.ProcedureLoader { 072 public LoadCounter() { 073 } 074 075 @Override 076 public void setMaxProcId(long maxProcId) { 077 } 078 079 @Override 080 public void load(ProcedureIterator procIter) throws IOException { 081 while (procIter.hasNext()) { 082 procIter.next(); 083 } 084 } 085 086 @Override 087 public void handleCorrupted(ProcedureIterator procIter) throws IOException { 088 while (procIter.hasNext()) { 089 procIter.next(); 090 } 091 } 092 } 093 094 @Override 095 protected void addOptions() { 096 addOption(NUM_PROCS_OPTION); 097 addOption(UPDATES_PER_PROC_OPTION); 098 addOption(DELETE_PROCS_FRACTION_OPTION); 099 addOption(NUM_WALS_OPTION); 100 addOption(STATE_SIZE_OPTION); 101 } 102 103 @Override 104 protected void processOptions(CommandLine cmd) { 105 numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS); 106 numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS); 107 int stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE); 108 serializedState = new byte[stateSize]; 109 updatesPerProc = 110 getOptionAsInt(cmd, UPDATES_PER_PROC_OPTION.getOpt(), DEFAULT_UPDATES_PER_PROC); 111 deleteProcsFraction = 112 getOptionAsDouble(cmd, DELETE_PROCS_FRACTION_OPTION.getOpt(), DEFAULT_DELETE_PROCS_FRACTION); 113 setupConf(); 114 } 115 116 private void setupConf() { 117 if (numWals > 0) { 118 conf.setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, Long.MAX_VALUE); 119 } 120 } 121 122 public void setUpProcedureStore() throws IOException { 123 Path testDir = UTIL.getDataTestDir(); 124 FileSystem fs = testDir.getFileSystem(conf); 125 Path logDir = new Path(testDir, "proc-logs"); 126 System.out.println("\n\nLogs directory : " + logDir.toString() + "\n\n"); 127 fs.delete(logDir, true); 128 store = ProcedureTestingUtility.createWalStore(conf, logDir); 129 store.start(1); 130 store.recoverLease(); 131 store.load(new LoadCounter()); 132 } 133 134 /** 135 * @return a list of shuffled integers which represent state of proc id. First occurrence of a 136 * number denotes insert state, consecutive occurrences denote update states, and -ve 137 * value denotes delete state. 138 */ 139 private List<Integer> shuffleProcWriteSequence() { 140 List<Integer> procStatesSequence = new ArrayList<>(); 141 Set<Integer> toBeDeletedProcs = new HashSet<>(); 142 // Add n + 1 entries of the proc id for insert + updates. If proc is chosen for delete, add 143 // extra entry which is marked -ve in the loop after shuffle. 144 for (int procId = 1; procId <= numProcs; ++procId) { 145 procStatesSequence.addAll(Collections.nCopies(updatesPerProc + 1, procId)); 146 if (ThreadLocalRandom.current().nextFloat() < deleteProcsFraction) { 147 procStatesSequence.add(procId); 148 toBeDeletedProcs.add(procId); 149 } 150 } 151 Collections.shuffle(procStatesSequence); 152 // Mark last occurrences of proc ids in toBeDeletedProcs with -ve to denote it's a delete state. 153 for (int i = procStatesSequence.size() - 1; i >= 0; --i) { 154 int procId = procStatesSequence.get(i); 155 if (toBeDeletedProcs.contains(procId)) { 156 procStatesSequence.set(i, -1 * procId); 157 toBeDeletedProcs.remove(procId); 158 } 159 } 160 return procStatesSequence; 161 } 162 163 private void writeWals() throws IOException { 164 List<Integer> procStates = shuffleProcWriteSequence(); 165 TestProcedure[] procs = new TestProcedure[numProcs + 1]; // 0 is not used. 166 int numProcsPerWal = numWals > 0 ? procStates.size() / numWals : Integer.MAX_VALUE; 167 long startTime = EnvironmentEdgeManager.currentTime(); 168 long lastTime = startTime; 169 for (int i = 0; i < procStates.size(); ++i) { 170 int procId = procStates.get(i); 171 if (procId < 0) { 172 store.delete(procs[-procId].getProcId()); 173 procs[-procId] = null; 174 } else if (procs[procId] == null) { 175 procs[procId] = new TestProcedure(procId, 0); 176 procs[procId].setData(serializedState); 177 store.insert(procs[procId], null); 178 } else { 179 store.update(procs[procId]); 180 } 181 if (i > 0 && i % numProcsPerWal == 0) { 182 long currentTime = EnvironmentEdgeManager.currentTime(); 183 System.out.println("Forcing wall roll. Time taken on last WAL: " 184 + (currentTime - lastTime) / 1000.0f + " sec"); 185 store.rollWriterForTesting(); 186 lastTime = currentTime; 187 } 188 } 189 long timeTaken = EnvironmentEdgeManager.currentTime() - startTime; 190 System.out.println("\n\nDone writing WALs.\nNum procs : " + numProcs + "\nTotal time taken : " 191 + StringUtils.humanTimeDiff(timeTaken) + "\n\n"); 192 } 193 194 private void storeRestart(ProcedureStore.ProcedureLoader loader) throws IOException { 195 System.out.println("Restarting procedure store to read back the WALs"); 196 store.stop(false); 197 store.start(1); 198 store.recoverLease(); 199 200 long startTime = EnvironmentEdgeManager.currentTime(); 201 store.load(loader); 202 long timeTaken = EnvironmentEdgeManager.currentTime() - startTime; 203 System.out.println("******************************************"); 204 System.out.println("Load time : " + (timeTaken / 1000.0f) + "sec"); 205 System.out.println("******************************************"); 206 System.out.println("Raw format for scripts"); 207 System.out 208 .println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, " + "total_time_ms=%s]", 209 NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), serializedState.length, 210 UPDATES_PER_PROC_OPTION.getOpt(), updatesPerProc, DELETE_PROCS_FRACTION_OPTION.getOpt(), 211 deleteProcsFraction, NUM_WALS_OPTION.getOpt(), numWals, timeTaken)); 212 } 213 214 public void tearDownProcedureStore() { 215 store.stop(false); 216 try { 217 store.getFileSystem().delete(store.getWALDir(), true); 218 } catch (IOException e) { 219 System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up " 220 + "disk space. Location: " + store.getWALDir().toString()); 221 System.err.println(e.toString()); 222 } 223 } 224 225 @Override 226 protected int doWork() { 227 try { 228 setUpProcedureStore(); 229 writeWals(); 230 storeRestart(new LoadCounter()); 231 return EXIT_SUCCESS; 232 } catch (IOException e) { 233 e.printStackTrace(); 234 return EXIT_FAILURE; 235 } finally { 236 tearDownProcedureStore(); 237 } 238 } 239 240 public static void main(String[] args) throws IOException { 241 ProcedureWALLoaderPerformanceEvaluation tool = new ProcedureWALLoaderPerformanceEvaluation(); 242 tool.setConf(UTIL.getConfiguration()); 243 tool.run(args); 244 } 245}