/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2.store;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;

/**
 * Base class for testing procedure store performance.
 * <p>
 * The tool creates a fresh procedure store under the configured output path, runs a fixed number
 * of worker threads that each insert and then update procedures until {@code numProcs} procedure
 * ids have been handed out, prints the elapsed time, and finally stops the store and deletes the
 * output directory. Subclasses supply the concrete store via {@link #createProcedureStore(Path)},
 * a per-procedure hook via {@link #preWrite(long)} and a script-friendly result line via
 * {@link #printRawFormatResult(long)}.
 * @param <T> the concrete {@link ProcedureStore} type under test
 */
public abstract class ProcedureStorePerformanceEvaluation<T extends ProcedureStore>
  extends AbstractHBaseTool {

  // Command line options and defaults.
  public static String DEFAULT_OUTPUT_PATH = "proc-store";

  public static Option OUTPUT_PATH_OPTION =
    new Option("output", true, "The output path. Default: " + DEFAULT_OUTPUT_PATH);

  public static int DEFAULT_NUM_THREADS = 20;

  public static Option NUM_THREADS_OPTION = new Option("threads", true,
    "Number of parallel threads which will write insert/updates/deletes to store. Default: "
      + DEFAULT_NUM_THREADS);

  public static int DEFAULT_NUM_PROCS = 1000000; // 1M

  public static Option NUM_PROCS_OPTION = new Option("procs", true,
    "Total number of procedures. Each procedure writes one insert and one update. Default: "
      + DEFAULT_NUM_PROCS);

  public static int DEFAULT_STATE_SIZE = 1024; // 1KB

  public static Option STATE_SIZE_OPTION =
    new Option("state_size", true, "Size of serialized state in bytes to write on update. Default: "
      + DEFAULT_STATE_SIZE + "bytes");

  public static Option SYNC_OPTION = new Option("sync", true,
    "Type of sync to use when writing WAL contents to file system. Accepted values: hflush, "
      + "hsync, nosync. Default: hflush");

  public static String DEFAULT_SYNC_OPTION = "hflush";

  /** Directory (relative or absolute) in which the store under test keeps its files. */
  protected String outputPath;
  /** Number of worker threads; also passed to {@code store.start(int)}. */
  protected int numThreads;
  /** Total number of procedure ids the workers hand out between them. */
  protected long numProcs;
  /** One of {@code hsync}, {@code hflush} or {@code nosync}. */
  protected String syncType;
  /** Length in bytes of {@link #SERIALIZED_STATE}. */
  protected int stateSize;
  /** Random payload attached to every test procedure; filled in by {@link #processOptions}. */
  protected static byte[] SERIALIZED_STATE;

  protected T store;

  /** Used by {@link Worker}. */
  private AtomicLong procIds = new AtomicLong(0);
  /** Set when any worker fails (or the run is aborted) so the remaining workers quit early. */
  private AtomicBoolean workersFailed = new AtomicBoolean(false);

  // Timeout for worker threads.
  private static final int WORKER_THREADS_TIMEOUT_SEC = 600; // in seconds

  @Override
  protected void addOptions() {
    addOption(OUTPUT_PATH_OPTION);
    addOption(NUM_THREADS_OPTION);
    addOption(NUM_PROCS_OPTION);
    addOption(SYNC_OPTION);
    addOption(STATE_SIZE_OPTION);
  }

  /**
   * Reads the tool's options from the parsed command line, falling back to the documented
   * defaults, and generates the random state payload written by the workers.
   * @param cmd the parsed command line
   * @throws IllegalArgumentException if the sync option is not one of hsync, hflush or nosync
   */
  @Override
  protected void processOptions(CommandLine cmd) {
    outputPath = cmd.getOptionValue(OUTPUT_PATH_OPTION.getOpt(), DEFAULT_OUTPUT_PATH);
    numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
    numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
    syncType = cmd.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION);
    // User input must be validated unconditionally; an assert is a no-op unless the JVM is
    // started with -ea, which would let an unsupported sync type slip through silently.
    if (!"hsync".equals(syncType) && !"hflush".equals(syncType) && !"nosync".equals(syncType)) {
      throw new IllegalArgumentException(
        "sync argument can only accept one of these three values: hsync, hflush, nosync");
    }
    stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
    SERIALIZED_STATE = new byte[stateSize];
    Bytes.random(SERIALIZED_STATE);
  }

  /**
   * Deletes any previous content of the output directory, then creates, starts and loads a new
   * store in it.
   * @throws IOException if the file system or the store reports an error
   */
  private void setUpProcedureStore() throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path storeDir = fs.makeQualified(new Path(outputPath));
    System.out.println("Procedure store directory : " + storeDir.toString());
    // Start from a clean directory so leftovers of an earlier run do not affect this one.
    fs.delete(storeDir, true);
    store = createProcedureStore(storeDir);
    store.start(numThreads);
    store.recoverLease();
    store.load(new ProcedureTestingUtility.LoadCounter());
    System.out.println("Starting new procedure store: " + store.getClass().getSimpleName());
  }

  /**
   * Creates the store implementation to be measured.
   * @param storeDir fully qualified directory the store should use
   * @return a new, not yet started store
   * @throws IOException if the store cannot be created
   */
  protected abstract T createProcedureStore(Path storeDir) throws IOException;

  /**
   * Hook invoked after the store has been stopped and before the output directory is deleted.
   * The default implementation does nothing.
   * @param store the store that was just stopped
   * @throws IOException if the subclass cleanup fails
   */
  protected void postStop(T store) throws IOException {
  }

  /**
   * Stops the store (if one was created) and deletes the output directory. Failures are reported
   * on stderr rather than thrown, because this runs from a {@code finally} block.
   */
  private void tearDownProcedureStore() {
    Path storeDir = null;
    try {
      if (store != null) {
        store.stop(false);
        postStop(store);
      }
      FileSystem fs = FileSystem.get(conf);
      storeDir = fs.makeQualified(new Path(outputPath));
      fs.delete(storeDir, true);
    } catch (IOException e) {
      System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up "
        + "disk space. Location: " + storeDir);
      e.printStackTrace();
    }
  }

  /**
   * Prints the result in a machine-readable form for consumption by scripts.
   * @param timeTakenNs wall-clock duration of the write phase, in nanoseconds
   */
  protected abstract void printRawFormatResult(long timeTakenNs);

  /**
   * Runs the benchmark: sets up the store, runs {@code numThreads} {@link Worker}s, waits for
   * them for at most {@link #WORKER_THREADS_TIMEOUT_SEC} seconds in total, and prints the result.
   * The worker pool is always shut down and the store is always torn down before returning.
   * @return {@code EXIT_SUCCESS} if every worker finished successfully, otherwise
   *         {@code EXIT_FAILURE}
   */
  @Override
  protected int doWork() throws Exception {
    ExecutorService executor = null;
    try {
      setUpProcedureStore();
      executor = Executors.newFixedThreadPool(numThreads);
      Future<?>[] futures = new Future<?>[numThreads];
      // Start worker threads.
      long start = System.nanoTime();
      for (int i = 0; i < numThreads; i++) {
        futures[i] = executor.submit(new Worker(start));
      }
      // 'start' is a System.nanoTime() reading, so the deadline has to be computed and compared
      // on that same clock; mixing it with wall-clock milliseconds gives a meaningless timeout.
      long deadlineNs = start + TimeUnit.SECONDS.toNanos(WORKER_THREADS_TIMEOUT_SEC);
      boolean failure = false;
      try {
        for (Future<?> future : futures) {
          // All workers share one overall deadline; never pass a negative wait.
          long remainingNs = Math.max(0L, deadlineNs - System.nanoTime());
          failure |= future.get(remainingNs, TimeUnit.NANOSECONDS).equals(EXIT_FAILURE);
        }
      } catch (Exception e) {
        if (e instanceof InterruptedException) {
          // Preserve the interrupt for whoever is running this tool.
          Thread.currentThread().interrupt();
        }
        // Ask the workers that are still running to quit at their next iteration.
        workersFailed.set(true);
        System.err.println("Exception in worker thread.");
        e.printStackTrace();
        return EXIT_FAILURE;
      }
      executor.shutdown();
      if (failure) {
        return EXIT_FAILURE;
      }
      long timeTakenNs = System.nanoTime() - start;
      System.out.println("******************************************");
      System.out.println("Num threads : " + numThreads);
      System.out.println("Num procedures : " + numProcs);
      System.out.println("Sync type : " + syncType);
      System.out.println("Time taken : " + TimeUnit.NANOSECONDS.toSeconds(timeTakenNs) + "sec");
      System.out.println("******************************************");
      System.out.println("Raw format for scripts");
      printRawFormatResult(timeTakenNs);
      return EXIT_SUCCESS;
    } catch (IOException e) {
      e.printStackTrace();
      return EXIT_FAILURE;
    } finally {
      if (executor != null) {
        // Covers the early-return paths too: without this the pool's threads are leaked and may
        // keep writing to a store that is about to be stopped and deleted.
        executor.shutdownNow();
      }
      tearDownProcedureStore();
    }
  }

  ///////////////////////////////
  // HELPER CLASSES
  ///////////////////////////////

  /**
   * Callable to generate load for wal by inserting/deleting/updating procedures. If procedure store
   * fails to roll log file (throws IOException), all threads quit, and at least one returns value
   * of {@link AbstractHBaseTool#EXIT_FAILURE}.
   */
  private final class Worker implements Callable<Integer> {
    /** {@link System#nanoTime()} reading taken when the workers were started. */
    private final long start;

    public Worker(long start) {
      this.start = start;
    }

    // TODO: Can also collect #procs, time taken by each thread to measure fairness.
    /**
     * Repeatedly claims the next procedure id and writes one insert followed by one update for
     * it, until all {@code numProcs} ids are taken or some worker has failed.
     * @return {@code EXIT_SUCCESS} when the ids ran out, {@code EXIT_FAILURE} if this or another
     *         worker failed
     */
    @Override
    public Integer call() throws IOException {
      while (true) {
        if (workersFailed.get()) {
          return EXIT_FAILURE;
        }
        // Ids are handed out from a counter shared by all workers.
        long procId = procIds.getAndIncrement();
        if (procId >= numProcs) {
          break;
        }
        // Progress report every 10000 procedures.
        if (procId != 0 && procId % 10000 == 0) {
          long ns = System.nanoTime() - start;
          System.out.println("Wrote " + procId + " procedures in "
            + StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(ns)));
        }
        try {
          preWrite(procId);
        } catch (IOException ioe) {
          // Ask other threads to quit too.
          workersFailed.set(true);
          System.err.println("Exception when rolling log file. Current procId = " + procId);
          ioe.printStackTrace();
          return EXIT_FAILURE;
        }
        ProcedureTestingUtility.TestProcedure proc =
          new ProcedureTestingUtility.TestProcedure(procId);
        proc.setData(SERIALIZED_STATE);
        store.insert(proc, null);
        store.update(proc);
      }
      return EXIT_SUCCESS;
    }
  }

  /**
   * Hook invoked by each worker before it writes the procedure with the given id.
   * @param procId id of the procedure about to be written
   * @throws IOException to abort the run; all workers then quit and the tool reports failure
   */
  protected abstract void preWrite(long procId) throws IOException;
}