001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.procedure2.store.wal; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.HashSet; 025import java.util.List; 026import java.util.Random; 027import java.util.Set; 028 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 032import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 033import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; 034import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 035import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; 036import org.apache.hadoop.hbase.procedure2.util.StringUtils; 037import org.apache.hadoop.hbase.util.AbstractHBaseTool; 038 039import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 040import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; 041 042import static java.lang.System.currentTimeMillis; 043 044public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool { 045 protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); 046 047 // Command line options and defaults. 048 public static int DEFAULT_NUM_PROCS = 1000000; // 1M 049 public static Option NUM_PROCS_OPTION = new Option("procs", true, 050 "Total number of procedures. Default: " + DEFAULT_NUM_PROCS); 051 public static int DEFAULT_NUM_WALS = 0; 052 public static Option NUM_WALS_OPTION = new Option("wals", true, 053 "Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY + 054 " conf to roll the logs. Default: " + DEFAULT_NUM_WALS); 055 public static int DEFAULT_STATE_SIZE = 1024; // 1KB 056 public static Option STATE_SIZE_OPTION = new Option("state_size", true, 057 "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE 058 + " bytes"); 059 public static int DEFAULT_UPDATES_PER_PROC = 5; 060 public static Option UPDATES_PER_PROC_OPTION = new Option("updates_per_proc", true, 061 "Number of update states to write for each proc. Default: " + DEFAULT_UPDATES_PER_PROC); 062 public static double DEFAULT_DELETE_PROCS_FRACTION = 0.50; 063 public static Option DELETE_PROCS_FRACTION_OPTION = new Option("delete_procs_fraction", true, 064 "Fraction of procs for which to write delete state. Distribution of procs chosen for " 065 + "delete is uniform across all procs. Default: " + DEFAULT_DELETE_PROCS_FRACTION); 066 067 public int numProcs; 068 public int updatesPerProc; 069 public double deleteProcsFraction; 070 public int numWals; 071 private WALProcedureStore store; 072 static byte[] serializedState; 073 074 private static class LoadCounter implements ProcedureStore.ProcedureLoader { 075 public LoadCounter() {} 076 077 @Override 078 public void setMaxProcId(long maxProcId) { 079 } 080 081 @Override 082 public void load(ProcedureIterator procIter) throws IOException { 083 while (procIter.hasNext()) { 084 procIter.next(); 085 } 086 } 087 088 @Override 089 public void handleCorrupted(ProcedureIterator procIter) throws IOException { 090 while (procIter.hasNext()) { 091 procIter.next(); 092 } 093 } 094 } 095 096 @Override 097 protected void addOptions() { 098 addOption(NUM_PROCS_OPTION); 099 addOption(UPDATES_PER_PROC_OPTION); 100 addOption(DELETE_PROCS_FRACTION_OPTION); 101 addOption(NUM_WALS_OPTION); 102 addOption(STATE_SIZE_OPTION); 103 } 104 105 @Override 106 protected void processOptions(CommandLine cmd) { 107 numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS); 108 numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS); 109 int stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE); 110 serializedState = new byte[stateSize]; 111 updatesPerProc = getOptionAsInt(cmd, UPDATES_PER_PROC_OPTION.getOpt(), 112 DEFAULT_UPDATES_PER_PROC); 113 deleteProcsFraction = getOptionAsDouble(cmd, DELETE_PROCS_FRACTION_OPTION.getOpt(), 114 DEFAULT_DELETE_PROCS_FRACTION); 115 setupConf(); 116 } 117 118 private void setupConf() { 119 if (numWals > 0) { 120 conf.setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, Long.MAX_VALUE); 121 } 122 } 123 124 public void setUpProcedureStore() throws IOException { 125 Path testDir = UTIL.getDataTestDir(); 126 FileSystem fs = testDir.getFileSystem(conf); 127 Path logDir = new Path(testDir, "proc-logs"); 128 System.out.println("\n\nLogs directory : " + logDir.toString() + "\n\n"); 129 fs.delete(logDir, true); 130 store = ProcedureTestingUtility.createWalStore(conf, logDir); 131 store.start(1); 132 store.recoverLease(); 133 store.load(new LoadCounter()); 134 } 135 136 /** 137 * @return a list of shuffled integers which represent state of proc id. First occurrence of a 138 * number denotes insert state, consecutive occurrences denote update states, and -ve value 139 * denotes delete state. 140 */ 141 private List<Integer> shuffleProcWriteSequence() { 142 Random rand = new Random(); 143 List<Integer> procStatesSequence = new ArrayList<>(); 144 Set<Integer> toBeDeletedProcs = new HashSet<>(); 145 // Add n + 1 entries of the proc id for insert + updates. If proc is chosen for delete, add 146 // extra entry which is marked -ve in the loop after shuffle. 147 for (int procId = 1; procId <= numProcs; ++procId) { 148 procStatesSequence.addAll(Collections.nCopies(updatesPerProc + 1, procId)); 149 if (rand.nextFloat() < deleteProcsFraction) { 150 procStatesSequence.add(procId); 151 toBeDeletedProcs.add(procId); 152 } 153 } 154 Collections.shuffle(procStatesSequence); 155 // Mark last occurrences of proc ids in toBeDeletedProcs with -ve to denote it's a delete state. 156 for (int i = procStatesSequence.size() - 1; i >= 0; --i) { 157 int procId = procStatesSequence.get(i); 158 if (toBeDeletedProcs.contains(procId)) { 159 procStatesSequence.set(i, -1 * procId); 160 toBeDeletedProcs.remove(procId); 161 } 162 } 163 return procStatesSequence; 164 } 165 166 private void writeWals() throws IOException { 167 List<Integer> procStates = shuffleProcWriteSequence(); 168 TestProcedure[] procs = new TestProcedure[numProcs + 1]; // 0 is not used. 169 int numProcsPerWal = numWals > 0 ? procStates.size() / numWals : Integer.MAX_VALUE; 170 long startTime = currentTimeMillis(); 171 long lastTime = startTime; 172 for (int i = 0; i < procStates.size(); ++i) { 173 int procId = procStates.get(i); 174 if (procId < 0) { 175 store.delete(procs[-procId].getProcId()); 176 procs[-procId] = null; 177 } else if (procs[procId] == null) { 178 procs[procId] = new TestProcedure(procId, 0); 179 procs[procId].setData(serializedState); 180 store.insert(procs[procId], null); 181 } else { 182 store.update(procs[procId]); 183 } 184 if (i > 0 && i % numProcsPerWal == 0) { 185 long currentTime = currentTimeMillis(); 186 System.out.println("Forcing wall roll. Time taken on last WAL: " + 187 (currentTime - lastTime) / 1000.0f + " sec"); 188 store.rollWriterForTesting(); 189 lastTime = currentTime; 190 } 191 } 192 long timeTaken = currentTimeMillis() - startTime; 193 System.out.println("\n\nDone writing WALs.\nNum procs : " + numProcs + "\nTotal time taken : " 194 + StringUtils.humanTimeDiff(timeTaken) + "\n\n"); 195 } 196 197 private void storeRestart(ProcedureStore.ProcedureLoader loader) throws IOException { 198 System.out.println("Restarting procedure store to read back the WALs"); 199 store.stop(false); 200 store.start(1); 201 store.recoverLease(); 202 203 long startTime = currentTimeMillis(); 204 store.load(loader); 205 long timeTaken = System.currentTimeMillis() - startTime; 206 System.out.println("******************************************"); 207 System.out.println("Load time : " + (timeTaken / 1000.0f) + "sec"); 208 System.out.println("******************************************"); 209 System.out.println("Raw format for scripts"); 210 System.out.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, " 211 + "total_time_ms=%s]", 212 NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), serializedState.length, 213 UPDATES_PER_PROC_OPTION.getOpt(), updatesPerProc, DELETE_PROCS_FRACTION_OPTION.getOpt(), 214 deleteProcsFraction, NUM_WALS_OPTION.getOpt(), numWals, timeTaken)); 215 } 216 217 public void tearDownProcedureStore() { 218 store.stop(false); 219 try { 220 store.getFileSystem().delete(store.getWALDir(), true); 221 } catch (IOException e) { 222 System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up " 223 + "disk space. Location: " + store.getWALDir().toString()); 224 System.err.println(e.toString()); 225 } 226 } 227 228 @Override 229 protected int doWork() { 230 try { 231 setUpProcedureStore(); 232 writeWals(); 233 storeRestart(new LoadCounter()); 234 return EXIT_SUCCESS; 235 } catch (IOException e) { 236 e.printStackTrace(); 237 return EXIT_FAILURE; 238 } finally { 239 tearDownProcedureStore(); 240 } 241 } 242 243 public static void main(String[] args) throws IOException { 244 ProcedureWALLoaderPerformanceEvaluation tool = new ProcedureWALLoaderPerformanceEvaluation(); 245 tool.setConf(UTIL.getConfiguration()); 246 tool.run(args); 247 } 248}