001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.wal; 019 020import java.io.IOException; 021import java.util.concurrent.Callable; 022import java.util.concurrent.ExecutorService; 023import java.util.concurrent.Executors; 024import java.util.concurrent.Future; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.atomic.AtomicLong; 028 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FSDataOutputStream; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 034import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 035import org.apache.hadoop.hbase.procedure2.util.StringUtils; 036import org.apache.hadoop.hbase.util.AbstractHBaseTool; 037 038import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 039import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; 040 041public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool { 042 protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); 043 044 // Command line options and defaults. 045 public static int DEFAULT_NUM_THREADS = 20; 046 public static Option NUM_THREADS_OPTION = new Option("threads", true, 047 "Number of parallel threads which will write insert/updates/deletes to WAL. Default: " 048 + DEFAULT_NUM_THREADS); 049 public static int DEFAULT_NUM_PROCS = 1000000; // 1M 050 public static Option NUM_PROCS_OPTION = new Option("procs", true, 051 "Total number of procedures. Each procedure writes one insert and one update. Default: " 052 + DEFAULT_NUM_PROCS); 053 public static int DEFAULT_NUM_WALS = 0; 054 public static Option NUM_WALS_OPTION = new Option("wals", true, 055 "Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY + 056 " conf to roll the logs. Default: " + DEFAULT_NUM_WALS); 057 public static int DEFAULT_STATE_SIZE = 1024; // 1KB 058 public static Option STATE_SIZE_OPTION = new Option("state_size", true, 059 "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE 060 + "bytes"); 061 public static Option SYNC_OPTION = new Option("sync", true, 062 "Type of sync to use when writing WAL contents to file system. Accepted values: hflush, " 063 + "hsync, nosync. Default: hflush"); 064 public static String DEFAULT_SYNC_OPTION = "hflush"; 065 066 public int numThreads; 067 public long numProcs; 068 public long numProcsPerWal = Long.MAX_VALUE; // never roll wall based on this value. 069 public int numWals; 070 public String syncType; 071 public int stateSize; 072 static byte[] serializedState; 073 private WALProcedureStore store; 074 075 /** Used by {@link Worker}. */ 076 private AtomicLong procIds = new AtomicLong(0); 077 private AtomicBoolean workersFailed = new AtomicBoolean(false); 078 // Timeout for worker threads. 079 private static final int WORKER_THREADS_TIMEOUT_SEC = 600; // in seconds 080 081 // Non-default configurations. 082 private void setupConf() { 083 conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, "hsync".equals(syncType)); 084 if (numWals > 0) { 085 conf.setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, Long.MAX_VALUE); 086 numProcsPerWal = numProcs / numWals; 087 } 088 } 089 090 private void setupProcedureStore() throws IOException { 091 Path testDir = UTIL.getDataTestDir(); 092 FileSystem fs = testDir.getFileSystem(conf); 093 Path logDir = new Path(testDir, "proc-logs"); 094 System.out.println("Logs directory : " + logDir.toString()); 095 fs.delete(logDir, true); 096 if ("nosync".equals(syncType)) { 097 store = new NoSyncWalProcedureStore(conf, logDir); 098 } else { 099 store = ProcedureTestingUtility.createWalStore(conf, logDir); 100 } 101 store.start(numThreads); 102 store.recoverLease(); 103 store.load(new ProcedureTestingUtility.LoadCounter()); 104 System.out.println("Starting new log : " 105 + store.getActiveLogs().get(store.getActiveLogs().size() - 1)); 106 } 107 108 private void tearDownProcedureStore() { 109 store.stop(false); 110 try { 111 store.getFileSystem().delete(store.getWALDir(), true); 112 } catch (IOException e) { 113 System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up " 114 + "disk space. Location: " + store.getWALDir().toString()); 115 e.printStackTrace(); 116 } 117 } 118 119 /** 120 * Processes and validates command line options. 121 */ 122 @Override 123 public void processOptions(CommandLine cmd) { 124 numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS); 125 numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS); 126 numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS); 127 syncType = cmd.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION); 128 assert "hsync".equals(syncType) || "hflush".equals(syncType) || "nosync".equals(syncType): 129 "sync argument can only accept one of these three values: hsync, hflush, nosync"; 130 stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE); 131 serializedState = new byte[stateSize]; 132 setupConf(); 133 } 134 135 @Override 136 public void addOptions() { 137 addOption(NUM_THREADS_OPTION); 138 addOption(NUM_PROCS_OPTION); 139 addOption(NUM_WALS_OPTION); 140 addOption(SYNC_OPTION); 141 addOption(STATE_SIZE_OPTION); 142 } 143 144 @Override 145 public int doWork() { 146 try { 147 setupProcedureStore(); 148 ExecutorService executor = Executors.newFixedThreadPool(numThreads); 149 Future<?>[] futures = new Future<?>[numThreads]; 150 // Start worker threads. 151 long start = System.currentTimeMillis(); 152 for (int i = 0; i < numThreads; i++) { 153 futures[i] = executor.submit(new Worker(start)); 154 } 155 boolean failure = false; 156 try { 157 for (Future<?> future : futures) { 158 long timeout = start + WORKER_THREADS_TIMEOUT_SEC * 1000 - System.currentTimeMillis(); 159 failure |= (future.get(timeout, TimeUnit.MILLISECONDS).equals(EXIT_FAILURE)); 160 } 161 } catch (Exception e) { 162 System.err.println("Exception in worker thread."); 163 e.printStackTrace(); 164 return EXIT_FAILURE; 165 } 166 executor.shutdown(); 167 if (failure) { 168 return EXIT_FAILURE; 169 } 170 long timeTaken = System.currentTimeMillis() - start; 171 System.out.println("******************************************"); 172 System.out.println("Num threads : " + numThreads); 173 System.out.println("Num procedures : " + numProcs); 174 System.out.println("Sync type : " + syncType); 175 System.out.println("Time taken : " + (timeTaken / 1000.0f) + "sec"); 176 System.out.println("******************************************"); 177 System.out.println("Raw format for scripts"); 178 System.out.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, " 179 + "total_time_ms=%s]", 180 NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), stateSize, 181 SYNC_OPTION.getOpt(), syncType, NUM_THREADS_OPTION.getOpt(), numThreads, 182 NUM_WALS_OPTION.getOpt(), numWals, timeTaken)); 183 return EXIT_SUCCESS; 184 } catch (IOException e) { 185 e.printStackTrace(); 186 return EXIT_FAILURE; 187 } finally { 188 tearDownProcedureStore(); 189 } 190 } 191 192 /////////////////////////////// 193 // HELPER CLASSES 194 /////////////////////////////// 195 196 /** 197 * Callable to generate load for wal by inserting/deleting/updating procedures. 198 * If procedure store fails to roll log file (throws IOException), all threads quit, and at 199 * least one returns value of {@link AbstractHBaseTool#EXIT_FAILURE}. 200 */ 201 private final class Worker implements Callable<Integer> { 202 private final long start; 203 204 public Worker(long start) { 205 this.start = start; 206 } 207 208 // TODO: Can also collect #procs, time taken by each thread to measure fairness. 209 @Override 210 public Integer call() throws IOException { 211 while (true) { 212 if (workersFailed.get()) { 213 return EXIT_FAILURE; 214 } 215 long procId = procIds.getAndIncrement(); 216 if (procId >= numProcs) { 217 break; 218 } 219 if (procId != 0 && procId % 10000 == 0) { 220 long ms = System.currentTimeMillis() - start; 221 System.out.println("Wrote " + procId + " procedures in " 222 + StringUtils.humanTimeDiff(ms)); 223 } 224 try{ 225 if (procId > 0 && procId % numProcsPerWal == 0) { 226 store.rollWriterForTesting(); 227 System.out.println("Starting new log : " 228 + store.getActiveLogs().get(store.getActiveLogs().size() - 1)); 229 } 230 } catch (IOException ioe) { 231 // Ask other threads to quit too. 232 workersFailed.set(true); 233 System.err.println("Exception when rolling log file. Current procId = " + procId); 234 ioe.printStackTrace(); 235 return EXIT_FAILURE; 236 } 237 ProcedureTestingUtility.TestProcedure proc = 238 new ProcedureTestingUtility.TestProcedure(procId); 239 proc.setData(serializedState); 240 store.insert(proc, null); 241 store.update(proc); 242 } 243 return EXIT_SUCCESS; 244 } 245 } 246 247 private static class NoSyncWalProcedureStore extends WALProcedureStore { 248 public NoSyncWalProcedureStore(final Configuration conf, final Path logDir) throws IOException { 249 super(conf, logDir, null, new WALProcedureStore.LeaseRecovery() { 250 @Override 251 public void recoverFileLease(FileSystem fs, Path path) throws IOException { 252 // no-op 253 } 254 }); 255 } 256 257 @Override 258 protected void syncStream(FSDataOutputStream stream) { 259 // no-op 260 } 261 } 262 263 public static void main(String[] args) throws IOException { 264 ProcedureWALPerformanceEvaluation tool = new ProcedureWALPerformanceEvaluation(); 265 tool.setConf(UTIL.getConfiguration()); 266 tool.run(args); 267 } 268}