001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.procedure2.store.wal; 020 021import java.io.IOException; 022import java.util.concurrent.Callable; 023import java.util.concurrent.ExecutorService; 024import java.util.concurrent.Executors; 025import java.util.concurrent.Future; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.atomic.AtomicBoolean; 028import java.util.concurrent.atomic.AtomicLong; 029 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FSDataOutputStream; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 035import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 036 037import org.apache.hadoop.hbase.procedure2.util.StringUtils; 038import org.apache.hadoop.hbase.util.AbstractHBaseTool; 039 040import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 041import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; 042 043public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool { 044 protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); 045 046 // Command line options and defaults. 047 public static int DEFAULT_NUM_THREADS = 20; 048 public static Option NUM_THREADS_OPTION = new Option("threads", true, 049 "Number of parallel threads which will write insert/updates/deletes to WAL. Default: " 050 + DEFAULT_NUM_THREADS); 051 public static int DEFAULT_NUM_PROCS = 1000000; // 1M 052 public static Option NUM_PROCS_OPTION = new Option("procs", true, 053 "Total number of procedures. Each procedure writes one insert and one update. Default: " 054 + DEFAULT_NUM_PROCS); 055 public static int DEFAULT_NUM_WALS = 0; 056 public static Option NUM_WALS_OPTION = new Option("wals", true, 057 "Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY + 058 " conf to roll the logs. Default: " + DEFAULT_NUM_WALS); 059 public static int DEFAULT_STATE_SIZE = 1024; // 1KB 060 public static Option STATE_SIZE_OPTION = new Option("state_size", true, 061 "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE 062 + "bytes"); 063 public static Option SYNC_OPTION = new Option("sync", true, 064 "Type of sync to use when writing WAL contents to file system. Accepted values: hflush, " 065 + "hsync, nosync. Default: hflush"); 066 public static String DEFAULT_SYNC_OPTION = "hflush"; 067 068 public int numThreads; 069 public long numProcs; 070 public long numProcsPerWal = Long.MAX_VALUE; // never roll wall based on this value. 071 public int numWals; 072 public String syncType; 073 public int stateSize; 074 static byte[] serializedState; 075 private WALProcedureStore store; 076 077 /** Used by {@link Worker}. */ 078 private AtomicLong procIds = new AtomicLong(0); 079 private AtomicBoolean workersFailed = new AtomicBoolean(false); 080 // Timeout for worker threads. 081 private static final int WORKER_THREADS_TIMEOUT_SEC = 600; // in seconds 082 083 // Non-default configurations. 084 private void setupConf() { 085 conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, "hsync".equals(syncType)); 086 if (numWals > 0) { 087 conf.setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, Long.MAX_VALUE); 088 numProcsPerWal = numProcs / numWals; 089 } 090 } 091 092 private void setupProcedureStore() throws IOException { 093 Path testDir = UTIL.getDataTestDir(); 094 FileSystem fs = testDir.getFileSystem(conf); 095 Path logDir = new Path(testDir, "proc-logs"); 096 System.out.println("Logs directory : " + logDir.toString()); 097 fs.delete(logDir, true); 098 if ("nosync".equals(syncType)) { 099 store = new NoSyncWalProcedureStore(conf, logDir); 100 } else { 101 store = ProcedureTestingUtility.createWalStore(conf, logDir); 102 } 103 store.start(numThreads); 104 store.recoverLease(); 105 store.load(new ProcedureTestingUtility.LoadCounter()); 106 System.out.println("Starting new log : " 107 + store.getActiveLogs().get(store.getActiveLogs().size() - 1)); 108 } 109 110 private void tearDownProcedureStore() { 111 store.stop(false); 112 try { 113 store.getFileSystem().delete(store.getWALDir(), true); 114 } catch (IOException e) { 115 System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up " 116 + "disk space. Location: " + store.getWALDir().toString()); 117 e.printStackTrace(); 118 } 119 } 120 121 /** 122 * Processes and validates command line options. 123 */ 124 @Override 125 public void processOptions(CommandLine cmd) { 126 numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS); 127 numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS); 128 numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS); 129 syncType = cmd.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION); 130 assert "hsync".equals(syncType) || "hflush".equals(syncType) || "nosync".equals(syncType): 131 "sync argument can only accept one of these three values: hsync, hflush, nosync"; 132 stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE); 133 serializedState = new byte[stateSize]; 134 setupConf(); 135 } 136 137 @Override 138 public void addOptions() { 139 addOption(NUM_THREADS_OPTION); 140 addOption(NUM_PROCS_OPTION); 141 addOption(NUM_WALS_OPTION); 142 addOption(SYNC_OPTION); 143 addOption(STATE_SIZE_OPTION); 144 } 145 146 @Override 147 public int doWork() { 148 try { 149 setupProcedureStore(); 150 ExecutorService executor = Executors.newFixedThreadPool(numThreads); 151 Future<?>[] futures = new Future<?>[numThreads]; 152 // Start worker threads. 153 long start = System.currentTimeMillis(); 154 for (int i = 0; i < numThreads; i++) { 155 futures[i] = executor.submit(new Worker(start)); 156 } 157 boolean failure = false; 158 try { 159 for (Future<?> future : futures) { 160 long timeout = start + WORKER_THREADS_TIMEOUT_SEC * 1000 - System.currentTimeMillis(); 161 failure |= (future.get(timeout, TimeUnit.MILLISECONDS).equals(EXIT_FAILURE)); 162 } 163 } catch (Exception e) { 164 System.err.println("Exception in worker thread."); 165 e.printStackTrace(); 166 return EXIT_FAILURE; 167 } 168 executor.shutdown(); 169 if (failure) { 170 return EXIT_FAILURE; 171 } 172 long timeTaken = System.currentTimeMillis() - start; 173 System.out.println("******************************************"); 174 System.out.println("Num threads : " + numThreads); 175 System.out.println("Num procedures : " + numProcs); 176 System.out.println("Sync type : " + syncType); 177 System.out.println("Time taken : " + (timeTaken / 1000.0f) + "sec"); 178 System.out.println("******************************************"); 179 System.out.println("Raw format for scripts"); 180 System.out.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, " 181 + "total_time_ms=%s]", 182 NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), stateSize, 183 SYNC_OPTION.getOpt(), syncType, NUM_THREADS_OPTION.getOpt(), numThreads, 184 NUM_WALS_OPTION.getOpt(), numWals, timeTaken)); 185 return EXIT_SUCCESS; 186 } catch (IOException e) { 187 e.printStackTrace(); 188 return EXIT_FAILURE; 189 } finally { 190 tearDownProcedureStore(); 191 } 192 } 193 194 /////////////////////////////// 195 // HELPER CLASSES 196 /////////////////////////////// 197 198 /** 199 * Callable to generate load for wal by inserting/deleting/updating procedures. 200 * If procedure store fails to roll log file (throws IOException), all threads quit, and at 201 * least one returns value of {@link AbstractHBaseTool#EXIT_FAILURE}. 202 */ 203 private final class Worker implements Callable<Integer> { 204 private final long start; 205 206 public Worker(long start) { 207 this.start = start; 208 } 209 210 // TODO: Can also collect #procs, time taken by each thread to measure fairness. 211 @Override 212 public Integer call() throws IOException { 213 while (true) { 214 if (workersFailed.get()) { 215 return EXIT_FAILURE; 216 } 217 long procId = procIds.getAndIncrement(); 218 if (procId >= numProcs) { 219 break; 220 } 221 if (procId != 0 && procId % 10000 == 0) { 222 long ms = System.currentTimeMillis() - start; 223 System.out.println("Wrote " + procId + " procedures in " 224 + StringUtils.humanTimeDiff(ms)); 225 } 226 try{ 227 if (procId > 0 && procId % numProcsPerWal == 0) { 228 store.rollWriterForTesting(); 229 System.out.println("Starting new log : " 230 + store.getActiveLogs().get(store.getActiveLogs().size() - 1)); 231 } 232 } catch (IOException ioe) { 233 // Ask other threads to quit too. 234 workersFailed.set(true); 235 System.err.println("Exception when rolling log file. Current procId = " + procId); 236 ioe.printStackTrace(); 237 return EXIT_FAILURE; 238 } 239 ProcedureTestingUtility.TestProcedure proc = 240 new ProcedureTestingUtility.TestProcedure(procId); 241 proc.setData(serializedState); 242 store.insert(proc, null); 243 store.update(proc); 244 } 245 return EXIT_SUCCESS; 246 } 247 } 248 249 private static class NoSyncWalProcedureStore extends WALProcedureStore { 250 public NoSyncWalProcedureStore(final Configuration conf, final Path logDir) throws IOException { 251 super(conf, logDir, null, new WALProcedureStore.LeaseRecovery() { 252 @Override 253 public void recoverFileLease(FileSystem fs, Path path) throws IOException { 254 // no-op 255 } 256 }); 257 } 258 259 @Override 260 protected void syncStream(FSDataOutputStream stream) { 261 // no-op 262 } 263 } 264 265 public static void main(String[] args) throws IOException { 266 ProcedureWALPerformanceEvaluation tool = new ProcedureWALPerformanceEvaluation(); 267 tool.setConf(UTIL.getConfiguration()); 268 tool.run(args); 269 } 270}