/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2.store.wal;

import static java.lang.System.currentTimeMillis;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;

import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;

/**
 * Command line tool that writes a shuffled stream of procedure insert/update/delete operations
 * into a {@link WALProcedureStore}, restarts the store, and reports how long the subsequent
 * {@code load} call takes.
 * <p>
 * The final report includes a single {@code RESULT [...]} line intended for consumption by
 * scripts.
 */
public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool {
  protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();

  // Command line options and defaults.
  public static int DEFAULT_NUM_PROCS = 1000000;  // 1M
  public static Option NUM_PROCS_OPTION = new Option("procs", true,
      "Total number of procedures. Default: " + DEFAULT_NUM_PROCS);
  public static int DEFAULT_NUM_WALS = 0;
  public static Option NUM_WALS_OPTION = new Option("wals", true,
      "Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY +
          " conf to roll the logs. Default: " + DEFAULT_NUM_WALS);
  public static int DEFAULT_STATE_SIZE = 1024;  // 1KB
  public static Option STATE_SIZE_OPTION = new Option("state_size", true,
      "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE
          + " bytes");
  public static int DEFAULT_UPDATES_PER_PROC = 5;
  public static Option UPDATES_PER_PROC_OPTION = new Option("updates_per_proc", true,
      "Number of update states to write for each proc. Default: " + DEFAULT_UPDATES_PER_PROC);
  public static double DEFAULT_DELETE_PROCS_FRACTION = 0.50;
  public static Option DELETE_PROCS_FRACTION_OPTION = new Option("delete_procs_fraction", true,
      "Fraction of procs for which to write delete state. Distribution of procs chosen for "
          + "delete is uniform across all procs. Default: " + DEFAULT_DELETE_PROCS_FRACTION);

  /** Number of procedures to insert; proc ids run from 1 to this value inclusive. */
  public int numProcs;
  /** Number of update operations queued for each procedure, in addition to its insert. */
  public int updatesPerProc;
  /** Probability with which each procedure additionally gets a delete operation. */
  public double deleteProcsFraction;
  /** Requested number of WALs; values &lt;= 0 disable the forced rolls in {@link #writeWals()}. */
  public int numWals;
  /** Store under test; stays {@code null} until {@link #setUpProcedureStore()} has created it. */
  private WALProcedureStore store;
  /** Payload attached to every inserted procedure; sized in {@link #processOptions}. */
  static byte[] serializedState;

  /**
   * Loader that just walks every iterator it is handed to the end. Despite its name it keeps no
   * tally of what it has seen.
   */
  private static class LoadCounter implements ProcedureStore.ProcedureLoader {
    public LoadCounter() {}

    @Override
    public void setMaxProcId(long maxProcId) {
    }

    @Override
    public void load(ProcedureIterator procIter) throws IOException {
      while (procIter.hasNext()) {
        procIter.next();
      }
    }

    @Override
    public void handleCorrupted(ProcedureIterator procIter) throws IOException {
      while (procIter.hasNext()) {
        procIter.next();
      }
    }
  }

  /** Registers the command line options understood by this tool. */
  @Override
  protected void addOptions() {
    addOption(NUM_PROCS_OPTION);
    addOption(UPDATES_PER_PROC_OPTION);
    addOption(DELETE_PROCS_FRACTION_OPTION);
    addOption(NUM_WALS_OPTION);
    addOption(STATE_SIZE_OPTION);
  }

  /**
   * Reads the option values (falling back to the {@code DEFAULT_*} constants), allocates the
   * shared state payload, and adjusts the configuration via {@link #setupConf()}.
   *
   * @param cmd the parsed command line
   */
  @Override
  protected void processOptions(CommandLine cmd) {
    numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
    numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS);
    int stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
    serializedState = new byte[stateSize];
    updatesPerProc = getOptionAsInt(cmd, UPDATES_PER_PROC_OPTION.getOpt(),
        DEFAULT_UPDATES_PER_PROC);
    deleteProcsFraction = getOptionAsDouble(cmd, DELETE_PROCS_FRACTION_OPTION.getOpt(),
        DEFAULT_DELETE_PROCS_FRACTION);
    setupConf();
  }

  /**
   * When an explicit WAL count was requested, raises the roll threshold to {@code Long.MAX_VALUE}.
   * NOTE(review): presumably this keeps the store from rolling on its own so that only the forced
   * rolls in {@link #writeWals()} create new files -- confirm against WALProcedureStore.
   */
  private void setupConf() {
    if (numWals > 0) {
      conf.setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, Long.MAX_VALUE);
    }
  }

  /**
   * Creates and starts a WAL procedure store on a freshly deleted {@code proc-logs} directory
   * under the testing utility's data test dir, then performs an initial load.
   *
   * @throws IOException if a file system or store operation fails
   */
  public void setUpProcedureStore() throws IOException {
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = testDir.getFileSystem(conf);
    Path logDir = new Path(testDir, "proc-logs");
    System.out.println("\n\nLogs directory : " + logDir.toString() + "\n\n");
    fs.delete(logDir, true);
    store = ProcedureTestingUtility.createWalStore(conf, logDir);
    store.start(1);
    store.recoverLease();
    store.load(new LoadCounter());
  }

  /**
   * @return a list of shuffled integers which represent state of proc id. First occurrence of a
   *         number denotes insert state, consecutive occurrences denote update states, and -ve
   *         value denotes delete state.
   */
  private List<Integer> shuffleProcWriteSequence() {
    Random rand = new Random();
    List<Integer> procStatesSequence = new ArrayList<>();
    Set<Integer> toBeDeletedProcs = new HashSet<>();
    // Add n + 1 entries of the proc id for insert + updates. If proc is chosen for delete, add
    // extra entry which is marked -ve in the loop after shuffle.
    for (int procId = 1; procId <= numProcs; ++procId) {
      procStatesSequence.addAll(Collections.nCopies(updatesPerProc + 1, procId));
      if (rand.nextFloat() < deleteProcsFraction) {
        procStatesSequence.add(procId);
        toBeDeletedProcs.add(procId);
      }
    }
    Collections.shuffle(procStatesSequence);
    // Mark last occurrences of proc ids in toBeDeletedProcs with -ve to denote it's a delete state.
    // Walking backwards and removing the id from the set once handled guarantees that only the
    // final occurrence is negated.
    for (int i = procStatesSequence.size() - 1; i >= 0; --i) {
      int procId = procStatesSequence.get(i);
      if (toBeDeletedProcs.contains(procId)) {
        procStatesSequence.set(i, -1 * procId);
        toBeDeletedProcs.remove(procId);
      }
    }
    return procStatesSequence;
  }

  /**
   * Replays the shuffled operation sequence against the store, forcing a WAL roll every
   * {@code procStates.size() / numWals} operations when {@code numWals} is positive.
   *
   * @throws IOException if rolling the writer fails
   */
  private void writeWals() throws IOException {
    List<Integer> procStates = shuffleProcWriteSequence();
    TestProcedure[] procs = new TestProcedure[numProcs + 1];  // 0 is not used.
    // Clamp to at least 1: when more WALs are requested than there are operations the integer
    // division yields 0, and the modulo below would then fail with a divide-by-zero
    // ArithmeticException.
    int numProcsPerWal =
        numWals > 0 ? Math.max(1, procStates.size() / numWals) : Integer.MAX_VALUE;
    long startTime = currentTimeMillis();
    long lastTime = startTime;
    for (int i = 0; i < procStates.size(); ++i) {
      int procId = procStates.get(i);
      if (procId < 0) {
        // Negative entry: delete. The slot is cleared as no further entries follow for this id.
        store.delete(procs[-procId].getProcId());
        procs[-procId] = null;
      } else if (procs[procId] == null) {
        // First occurrence of the id: insert.
        procs[procId] = new TestProcedure(procId, 0);
        procs[procId].setData(serializedState);
        store.insert(procs[procId], null);
      } else {
        store.update(procs[procId]);
      }
      if (i > 0 && i % numProcsPerWal == 0) {
        long currentTime = currentTimeMillis();
        System.out.println("Forcing wall roll. Time taken on last WAL: " +
            (currentTime - lastTime) / 1000.0f + " sec");
        store.rollWriterForTesting();
        lastTime = currentTime;
      }
    }
    long timeTaken = currentTimeMillis() - startTime;
    System.out.println("\n\nDone writing WALs.\nNum procs : " + numProcs + "\nTotal time taken : "
        + StringUtils.humanTimeDiff(timeTaken) + "\n\n");
  }

  /**
   * Stops and restarts the store, then times a single {@code load} call and prints the result,
   * including a {@code RESULT [...]} line for scripts. Only the {@code load} call is timed; the
   * restart and lease recovery are excluded.
   *
   * @param loader the loader handed to the store's {@code load}
   * @throws IOException if restarting or loading the store fails
   */
  private void storeRestart(ProcedureStore.ProcedureLoader loader) throws IOException {
    System.out.println("Restarting procedure store to read back the WALs");
    store.stop(false);
    store.start(1);
    store.recoverLease();

    long startTime = currentTimeMillis();
    store.load(loader);
    long timeTaken = currentTimeMillis() - startTime;
    System.out.println("******************************************");
    System.out.println("Load time : " + (timeTaken / 1000.0f) + "sec");
    System.out.println("******************************************");
    System.out.println("Raw format for scripts");
    System.out.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, "
            + "total_time_ms=%s]",
        NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), serializedState.length,
        UPDATES_PER_PROC_OPTION.getOpt(), updatesPerProc, DELETE_PROCS_FRACTION_OPTION.getOpt(),
        deleteProcsFraction, NUM_WALS_OPTION.getOpt(), numWals, timeTaken));
  }

  /**
   * Stops the store and deletes its WAL directory. A failed delete is reported on stderr but not
   * rethrown. Does nothing when the store was never created.
   */
  public void tearDownProcedureStore() {
    if (store == null) {
      // Set-up failed before the store existed. This method runs from doWork()'s finally block,
      // so dereferencing null here would replace the original failure with an NPE.
      return;
    }
    store.stop(false);
    try {
      store.getFileSystem().delete(store.getWALDir(), true);
    } catch (IOException e) {
      System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up "
          + "disk space. Location: " + store.getWALDir().toString());
      System.err.println(e.toString());
    }
  }

  /**
   * Runs the evaluation: set up the store, write the WALs, restart and time the load. The store
   * is always torn down afterwards.
   *
   * @return {@code EXIT_SUCCESS} on success, {@code EXIT_FAILURE} if an {@link IOException}
   *         occurred
   */
  @Override
  protected int doWork() {
    try {
      setUpProcedureStore();
      writeWals();
      storeRestart(new LoadCounter());
      return EXIT_SUCCESS;
    } catch (IOException e) {
      e.printStackTrace();
      return EXIT_FAILURE;
    } finally {
      tearDownProcedureStore();
    }
  }

  /**
   * Entry point. Runs the tool with the testing utility's configuration and, if the run fails,
   * terminates the JVM with the tool's exit code so that calling scripts can detect the failure.
   *
   * @param args command line arguments
   * @throws IOException if the tool run throws it
   */
  public static void main(String[] args) throws IOException {
    ProcedureWALLoaderPerformanceEvaluation tool = new ProcedureWALLoaderPerformanceEvaluation();
    tool.setConf(UTIL.getConfiguration());
    int exitCode = tool.run(args);
    if (exitCode != EXIT_SUCCESS) {
      System.exit(exitCode);
    }
  }
}