/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2.store;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;

import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;

/**
 * Base class for testing procedure store performance.
 * <p>
 * The tool creates a procedure store under the configured output path, starts a fixed pool of
 * worker threads that each insert and then update procedures until the requested number of
 * procedures has been written, prints the elapsed time, and finally stops the store and deletes
 * the output directory.
 * <p>
 * Subclasses supply the concrete store via {@link #createProcedureStore(Path)}, a per-procedure
 * hook via {@link #preWrite(long)}, and a script-friendly result line via
 * {@link #printRawFormatResult(long)}.
 */
public abstract class ProcedureStorePerformanceEvaluation<T extends ProcedureStore>
  extends AbstractHBaseTool {

  // Command line options and defaults.
  public static String DEFAULT_OUTPUT_PATH = "proc-store";

  public static Option OUTPUT_PATH_OPTION =
    new Option("output", true, "The output path. Default: " + DEFAULT_OUTPUT_PATH);

  public static int DEFAULT_NUM_THREADS = 20;

  public static Option NUM_THREADS_OPTION = new Option("threads", true,
    "Number of parallel threads which will write insert/updates/deletes to store. Default: " +
      DEFAULT_NUM_THREADS);

  public static int DEFAULT_NUM_PROCS = 1000000; // 1M

  public static Option NUM_PROCS_OPTION = new Option("procs", true,
    "Total number of procedures. Each procedure writes one insert and one update. Default: " +
      DEFAULT_NUM_PROCS);

  public static int DEFAULT_STATE_SIZE = 1024; // 1KB

  public static Option STATE_SIZE_OPTION = new Option("state_size", true,
    "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE +
      "bytes");

  public static Option SYNC_OPTION = new Option("sync", true,
    "Type of sync to use when writing WAL contents to file system. Accepted values: hflush, " +
      "hsync, nosync. Default: hflush");

  public static String DEFAULT_SYNC_OPTION = "hflush";

  /** Directory (relative or absolute) in which the procedure store is created. */
  protected String outputPath;
  /** Number of worker threads, also passed to {@link ProcedureStore#start(int)}. */
  protected int numThreads;
  /** Total number of procedures to write across all workers. */
  protected long numProcs;
  /** One of {@code hsync}, {@code hflush} or {@code nosync}. */
  protected String syncType;
  /** Size in bytes of the state payload attached to every procedure. */
  protected int stateSize;
  /** Pseudo-random payload of {@link #stateSize} bytes, filled in {@link #processOptions}. */
  protected static byte[] SERIALIZED_STATE;

  protected T store;

  /** Used by {@link Worker}. Source of the next procedure id to write. */
  private final AtomicLong procIds = new AtomicLong(0);
  /** Used by {@link Worker}. Set once any worker fails so that the others stop early. */
  private final AtomicBoolean workersFailed = new AtomicBoolean(false);

  // Timeout for worker threads.
  private static final int WORKER_THREADS_TIMEOUT_SEC = 600; // in seconds

  @Override
  protected void addOptions() {
    addOption(OUTPUT_PATH_OPTION);
    addOption(NUM_THREADS_OPTION);
    addOption(NUM_PROCS_OPTION);
    addOption(SYNC_OPTION);
    addOption(STATE_SIZE_OPTION);
  }

  /**
   * Reads the command line options into the fields of this tool and generates the state payload.
   *
   * @param cmd the parsed command line
   * @throws IllegalArgumentException if the sync option is not one of hsync, hflush, nosync
   */
  @Override
  protected void processOptions(CommandLine cmd) {
    outputPath = cmd.getOptionValue(OUTPUT_PATH_OPTION.getOpt(), DEFAULT_OUTPUT_PATH);
    numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
    numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
    syncType = cmd.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION);
    // Validate explicitly: an assert would be skipped unless the JVM runs with -ea, letting an
    // invalid user-supplied value through unnoticed.
    boolean validSyncType =
      "hsync".equals(syncType) || "hflush".equals(syncType) || "nosync".equals(syncType);
    if (!validSyncType) {
      throw new IllegalArgumentException(
        "sync argument can only accept one of these three values: hsync, hflush, nosync"
          + ", but got: " + syncType);
    }
    stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
    SERIALIZED_STATE = new byte[stateSize];
    // Fixed seed so that every run writes the same payload.
    new Random(12345).nextBytes(SERIALIZED_STATE);
  }

  /**
   * Deletes any previous content of the output directory, then creates, starts and loads a fresh
   * procedure store in it.
   */
  private void setUpProcedureStore() throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path storeDir = fs.makeQualified(new Path(outputPath));
    System.out.println("Procedure store directory : " + storeDir.toString());
    fs.delete(storeDir, true);
    store = createProcedureStore(storeDir);
    store.start(numThreads);
    store.recoverLease();
    store.load(new ProcedureTestingUtility.LoadCounter());
    System.out.println("Starting new procedure store: " + store.getClass().getSimpleName());
  }

  /**
   * Creates the store under test.
   *
   * @param storeDir fully qualified directory the store should use
   * @return a new, not yet started, procedure store
   */
  protected abstract T createProcedureStore(Path storeDir) throws IOException;

  /**
   * Hook invoked right after the store has been stopped and before the output directory is
   * deleted. The default implementation does nothing.
   *
   * @param store the store that was just stopped
   */
  protected void postStop(T store) throws IOException {
  }

  /**
   * Stops the store (if one was created) and deletes the output directory. An IOException is
   * reported on stderr rather than propagated.
   */
  private void tearDownProcedureStore() {
    Path storeDir = null;
    try {
      if (store != null) {
        store.stop(false);
        postStop(store);
      }
      FileSystem fs = FileSystem.get(conf);
      storeDir = fs.makeQualified(new Path(outputPath));
      fs.delete(storeDir, true);
    } catch (IOException e) {
      System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up " +
        "disk space. Location: " + storeDir);
      e.printStackTrace();
    }
  }

  /**
   * Prints the result in a raw, script-friendly format.
   *
   * @param timeTakenNs elapsed time of the run in nanoseconds
   */
  protected abstract void printRawFormatResult(long timeTakenNs);

  /**
   * Runs the benchmark: sets up the store, runs {@link #numThreads} workers until all procedures
   * are written (or a worker fails, or the overall timeout of
   * {@link #WORKER_THREADS_TIMEOUT_SEC} seconds elapses), prints the results and tears the store
   * down.
   *
   * @return {@link AbstractHBaseTool#EXIT_SUCCESS} if every worker succeeded, otherwise
   *         {@link AbstractHBaseTool#EXIT_FAILURE}
   */
  @Override
  protected int doWork() throws Exception {
    try {
      setUpProcedureStore();
      ExecutorService executor = Executors.newFixedThreadPool(numThreads);
      try {
        Future<?>[] futures = new Future<?>[numThreads];
        // Start worker threads.
        long start = System.nanoTime();
        for (int i = 0; i < numThreads; i++) {
          futures[i] = executor.submit(new Worker(start));
        }
        // The timeout is a single budget shared by all workers. It is measured with the same
        // monotonic clock as 'start'; System.nanoTime() values must never be combined with
        // System.currentTimeMillis().
        long timeoutNs = TimeUnit.SECONDS.toNanos(WORKER_THREADS_TIMEOUT_SEC);
        boolean failure = false;
        try {
          for (Future<?> future : futures) {
            long remainingNs = timeoutNs - (System.nanoTime() - start);
            failure |= future.get(remainingNs, TimeUnit.NANOSECONDS).equals(EXIT_FAILURE);
          }
        } catch (Exception e) {
          // Ask the workers that are still running to quit at their next iteration.
          workersFailed.set(true);
          System.err.println("Exception in worker thread.");
          e.printStackTrace();
          return EXIT_FAILURE;
        }
        if (failure) {
          return EXIT_FAILURE;
        }
        long timeTakenNs = System.nanoTime() - start;
        System.out.println("******************************************");
        System.out.println("Num threads    : " + numThreads);
        System.out.println("Num procedures : " + numProcs);
        System.out.println("Sync type      : " + syncType);
        System.out.println("Time taken     : " + TimeUnit.NANOSECONDS.toSeconds(timeTakenNs) + "sec");
        System.out.println("******************************************");
        System.out.println("Raw format for scripts");
        printRawFormatResult(timeTakenNs);
        return EXIT_SUCCESS;
      } finally {
        // Always release the pool, including on the failure and timeout paths, so that worker
        // threads are not left behind.
        executor.shutdownNow();
      }
    } catch (IOException e) {
      e.printStackTrace();
      return EXIT_FAILURE;
    } finally {
      tearDownProcedureStore();
    }
  }

  ///////////////////////////////
  // HELPER CLASSES
  ///////////////////////////////

  /**
   * Callable to generate load for the store by inserting and updating procedures. If
   * {@link #preWrite(long)} throws an IOException, all threads quit, and at least one returns
   * value of {@link AbstractHBaseTool#EXIT_FAILURE}.
   */
  private final class Worker implements Callable<Integer> {
    /** {@link System#nanoTime()} reading taken when the workers were started. */
    private final long start;

    public Worker(long start) {
      this.start = start;
    }

    // TODO: Can also collect #procs, time taken by each thread to measure fairness.
    @Override
    public Integer call() throws IOException {
      while (true) {
        if (workersFailed.get()) {
          return EXIT_FAILURE;
        }
        long procId = procIds.getAndIncrement();
        if (procId >= numProcs) {
          break;
        }
        // Progress report every 10000 procedures.
        if (procId != 0 && procId % 10000 == 0) {
          long ns = System.nanoTime() - start;
          System.out.println("Wrote " + procId + " procedures in " +
            StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(ns)));
        }
        try {
          preWrite(procId);
        } catch (IOException ioe) {
          // Ask other threads to quit too.
          workersFailed.set(true);
          System.err.println("Exception when rolling log file. Current procId = " + procId);
          ioe.printStackTrace();
          return EXIT_FAILURE;
        }
        ProcedureTestingUtility.TestProcedure proc =
          new ProcedureTestingUtility.TestProcedure(procId);
        proc.setData(SERIALIZED_STATE);
        store.insert(proc, null);
        store.update(proc);
      }
      return EXIT_SUCCESS;
    }
  }

  /**
   * Hook invoked by each worker before it writes the procedure with the given id.
   *
   * @param procId id of the procedure about to be inserted
   * @throws IOException to abort the run; all workers then stop
   */
  protected abstract void preWrite(long procId) throws IOException;
}