001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static java.lang.String.format; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024 025import com.codahale.metrics.Histogram; 026import java.util.ArrayDeque; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.List; 030import java.util.Queue; 031import java.util.Set; 032import java.util.concurrent.Callable; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.chaos.actions.MoveRandomRegionOfTableAction; 035import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction; 036import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey; 037import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy; 038import org.apache.hadoop.hbase.chaos.policies.Policy; 039import org.apache.hadoop.hbase.client.Admin; 040import org.apache.hadoop.hbase.ipc.RpcClient; 041import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; 042import org.apache.hadoop.hbase.testclassification.IntegrationTests; 043import org.apache.hadoop.hbase.util.YammerHistogramUtils; 044import org.apache.hadoop.mapreduce.Counters; 045import org.apache.hadoop.mapreduce.Job; 046import org.apache.hadoop.util.ToolRunner; 047import org.junit.experimental.categories.Category; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 052import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 053import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 054 055/** 056 * Test for comparing the performance impact of region replicas. Uses components of 057 * {@link PerformanceEvaluation}. Does not run from {@code IntegrationTestsDriver} because 058 * IntegrationTestBase is incompatible with the JUnit runner. Hence no @Test annotations either. See 059 * {@code -help} for full list of options. 060 */ 061@Category(IntegrationTests.class) 062public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase { 063 064 private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestRegionReplicaPerf.class); 065 066 private static final String SLEEP_TIME_KEY = "sleeptime"; 067 // short default interval because tests don't run very long. 068 private static final String SLEEP_TIME_DEFAULT = "" + (10 * 1000l); 069 private static final String TABLE_NAME_KEY = "tableName"; 070 private static final String TABLE_NAME_DEFAULT = "IntegrationTestRegionReplicaPerf"; 071 private static final String REPLICA_COUNT_KEY = "replicas"; 072 private static final String REPLICA_COUNT_DEFAULT = "" + 3; 073 private static final String PRIMARY_TIMEOUT_KEY = "timeout"; 074 private static final String PRIMARY_TIMEOUT_DEFAULT = "" + 10 * 1000; // 10 ms 075 private static final String NUM_RS_KEY = "numRs"; 076 private static final String NUM_RS_DEFAULT = "" + 3; 077 public static final String FAMILY_NAME = "info"; 078 079 /** Extract a descriptive statistic from a {@link com.codahale.metrics.Histogram}. */ 080 private enum Stat { 081 STDEV { 082 @Override 083 double apply(Histogram hist) { 084 return hist.getSnapshot().getStdDev(); 085 } 086 }, 087 FOUR_9S { 088 @Override 089 double apply(Histogram hist) { 090 return hist.getSnapshot().getValue(0.9999); 091 } 092 }; 093 094 abstract double apply(Histogram hist); 095 } 096 097 private TableName tableName; 098 private long sleepTime; 099 private int replicaCount; 100 private int primaryTimeout; 101 private int clusterSize; 102 103 /** 104 * Wraps the invocation of {@link PerformanceEvaluation} in a {@code Callable}. 105 */ 106 static class PerfEvalCallable implements Callable<TimingResult> { 107 private final Queue<String> argv = new ArrayDeque<>(); 108 private final Admin admin; 109 110 public PerfEvalCallable(Admin admin, String argv) { 111 // TODO: this API is awkward, should take Connection, not Admin 112 this.admin = admin; 113 this.argv.addAll(Arrays.asList(argv.split(" "))); 114 LOG.debug("Created PerformanceEvaluationCallable with args: " + argv); 115 } 116 117 @Override 118 public TimingResult call() throws Exception { 119 PerformanceEvaluation.TestOptions opts = PerformanceEvaluation.parseOpts(argv); 120 PerformanceEvaluation.checkTable(admin, opts); 121 PerformanceEvaluation.RunResult results[] = null; 122 long numRows = opts.totalRows; 123 long elapsedTime = 0; 124 if (opts.nomapred) { 125 results = PerformanceEvaluation.doLocalClients(opts, admin.getConfiguration()); 126 for (PerformanceEvaluation.RunResult r : results) { 127 elapsedTime = Math.max(elapsedTime, r.duration); 128 } 129 } else { 130 Job job = PerformanceEvaluation.doMapReduce(opts, admin.getConfiguration()); 131 Counters counters = job.getCounters(); 132 numRows = counters.findCounter(PerformanceEvaluation.Counter.ROWS).getValue(); 133 elapsedTime = counters.findCounter(PerformanceEvaluation.Counter.ELAPSED_TIME).getValue(); 134 } 135 return new TimingResult(numRows, elapsedTime, results); 136 } 137 } 138 139 /** 140 * Record the results from a single {@link PerformanceEvaluation} job run. 141 */ 142 static class TimingResult { 143 public final long numRows; 144 public final long elapsedTime; 145 public final PerformanceEvaluation.RunResult results[]; 146 147 public TimingResult(long numRows, long elapsedTime, PerformanceEvaluation.RunResult results[]) { 148 this.numRows = numRows; 149 this.elapsedTime = elapsedTime; 150 this.results = results; 151 } 152 153 @Override 154 public String toString() { 155 return MoreObjects.toStringHelper(this).add("numRows", numRows) 156 .add("elapsedTime", elapsedTime).toString(); 157 } 158 } 159 160 @Override 161 public void setUp() throws Exception { 162 super.setUp(); 163 Configuration conf = util.getConfiguration(); 164 165 // sanity check cluster 166 // TODO: this should reach out to master and verify online state instead 167 assertEquals("Master must be configured with StochasticLoadBalancer", 168 "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer", 169 conf.get("hbase.master.loadbalancer.class")); 170 // TODO: this should reach out to master and verify online state instead 171 assertTrue("hbase.regionserver.storefile.refresh.period must be greater than zero.", 172 conf.getLong("hbase.regionserver.storefile.refresh.period", 0) > 0); 173 174 // enable client-side settings 175 conf.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, true); 176 // TODO: expose these settings to CLI override 177 conf.setLong("hbase.client.primaryCallTimeout.get", primaryTimeout); 178 conf.setLong("hbase.client.primaryCallTimeout.multiget", primaryTimeout); 179 } 180 181 @Override 182 public void setUpCluster() throws Exception { 183 util = getTestingUtil(getConf()); 184 util.initializeCluster(clusterSize); 185 } 186 187 @Override 188 public void setUpMonkey() throws Exception { 189 Policy p = new PeriodicRandomActionPolicy(sleepTime, 190 new RestartRandomRsExceptMetaAction(sleepTime), new MoveRandomRegionOfTableAction(tableName)); 191 this.monkey = new PolicyBasedChaosMonkey(util, p); 192 // don't start monkey right away 193 } 194 195 @Override 196 protected void addOptions() { 197 addOptWithArg(TABLE_NAME_KEY, "Alternate table name. Default: '" + TABLE_NAME_DEFAULT + "'"); 198 addOptWithArg(SLEEP_TIME_KEY, 199 "How long the monkey sleeps between actions. Default: " + SLEEP_TIME_DEFAULT); 200 addOptWithArg(REPLICA_COUNT_KEY, 201 "Number of region replicas. Default: " + REPLICA_COUNT_DEFAULT); 202 addOptWithArg(PRIMARY_TIMEOUT_KEY, 203 "Overrides hbase.client.primaryCallTimeout. Default: " + PRIMARY_TIMEOUT_DEFAULT + " (10ms)"); 204 addOptWithArg(NUM_RS_KEY, 205 "Specify the number of RegionServers to use. Default: " + NUM_RS_DEFAULT); 206 } 207 208 @Override 209 protected void processOptions(CommandLine cmd) { 210 tableName = TableName.valueOf(cmd.getOptionValue(TABLE_NAME_KEY, TABLE_NAME_DEFAULT)); 211 sleepTime = Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT)); 212 replicaCount = Integer.parseInt(cmd.getOptionValue(REPLICA_COUNT_KEY, REPLICA_COUNT_DEFAULT)); 213 primaryTimeout = 214 Integer.parseInt(cmd.getOptionValue(PRIMARY_TIMEOUT_KEY, PRIMARY_TIMEOUT_DEFAULT)); 215 clusterSize = Integer.parseInt(cmd.getOptionValue(NUM_RS_KEY, NUM_RS_DEFAULT)); 216 LOG.debug(MoreObjects.toStringHelper("Parsed Options").add(TABLE_NAME_KEY, tableName) 217 .add(SLEEP_TIME_KEY, sleepTime).add(REPLICA_COUNT_KEY, replicaCount) 218 .add(PRIMARY_TIMEOUT_KEY, primaryTimeout).add(NUM_RS_KEY, clusterSize).toString()); 219 } 220 221 @Override 222 public int runTestFromCommandLine() throws Exception { 223 test(); 224 return 0; 225 } 226 227 @Override 228 public TableName getTablename() { 229 return tableName; 230 } 231 232 @Override 233 protected Set<String> getColumnFamilies() { 234 return Sets.newHashSet(FAMILY_NAME); 235 } 236 237 /** Compute the mean of the given {@code stat} from a timing results. */ 238 private static double calcMean(String desc, Stat stat, List<TimingResult> results) { 239 double sum = 0; 240 int count = 0; 241 242 for (TimingResult tr : results) { 243 for (PerformanceEvaluation.RunResult r : tr.results) { 244 assertNotNull("One of the run results is missing detailed run data.", r.hist); 245 sum += stat.apply(r.hist); 246 count += 1; 247 LOG.debug(desc + "{" + YammerHistogramUtils.getHistogramReport(r.hist) + "}"); 248 } 249 } 250 return sum / count; 251 } 252 253 public void test() throws Exception { 254 int maxIters = 3; 255 String replicas = "--replicas=" + replicaCount; 256 // TODO: splits disabled until "phase 2" is complete. 257 String splitPolicy = "--splitPolicy=" + DisabledRegionSplitPolicy.class.getName(); 258 String writeOpts = 259 format("%s --nomapred --table=%s --presplit=16 sequentialWrite 4", splitPolicy, tableName); 260 String readOpts = 261 format("--nomapred --table=%s --latency --sampleRate=0.1 randomRead 4", tableName); 262 String replicaReadOpts = format("%s %s", replicas, readOpts); 263 264 ArrayList<TimingResult> resultsWithoutReplicas = new ArrayList<>(maxIters); 265 ArrayList<TimingResult> resultsWithReplicas = new ArrayList<>(maxIters); 266 267 // create/populate the table, replicas disabled 268 LOG.debug("Populating table."); 269 new PerfEvalCallable(util.getAdmin(), writeOpts).call(); 270 271 // one last sanity check, then send in the clowns! 272 assertEquals("Table must be created with DisabledRegionSplitPolicy. Broken test.", 273 DisabledRegionSplitPolicy.class.getName(), 274 util.getAdmin().getTableDescriptor(tableName).getRegionSplitPolicyClassName()); 275 startMonkey(); 276 277 // collect a baseline without region replicas. 278 for (int i = 0; i < maxIters; i++) { 279 LOG.debug("Launching non-replica job " + (i + 1) + "/" + maxIters); 280 resultsWithoutReplicas.add(new PerfEvalCallable(util.getAdmin(), readOpts).call()); 281 // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary? 282 Thread.sleep(5000l); 283 } 284 285 // disable monkey, enable region replicas, enable monkey 286 cleanUpMonkey("Altering table."); 287 LOG.debug("Altering " + tableName + " replica count to " + replicaCount); 288 IntegrationTestingUtility.setReplicas(util.getAdmin(), tableName, replicaCount); 289 setUpMonkey(); 290 startMonkey(); 291 292 // run test with region replicas. 293 for (int i = 0; i < maxIters; i++) { 294 LOG.debug("Launching replica job " + (i + 1) + "/" + maxIters); 295 resultsWithReplicas.add(new PerfEvalCallable(util.getAdmin(), replicaReadOpts).call()); 296 // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary? 297 Thread.sleep(5000l); 298 } 299 300 // compare the average of the stdev and 99.99pct across runs to determine if region replicas 301 // are having an overall improvement on response variance experienced by clients. 302 double withoutReplicasStdevMean = 303 calcMean("withoutReplicas", Stat.STDEV, resultsWithoutReplicas); 304 double withoutReplicas9999Mean = 305 calcMean("withoutReplicas", Stat.FOUR_9S, resultsWithoutReplicas); 306 double withReplicasStdevMean = calcMean("withReplicas", Stat.STDEV, resultsWithReplicas); 307 double withReplicas9999Mean = calcMean("withReplicas", Stat.FOUR_9S, resultsWithReplicas); 308 309 LOG.info(MoreObjects.toStringHelper(this).add("withoutReplicas", resultsWithoutReplicas) 310 .add("withReplicas", resultsWithReplicas) 311 .add("withoutReplicasStdevMean", withoutReplicasStdevMean) 312 .add("withoutReplicas99.99Mean", withoutReplicas9999Mean) 313 .add("withReplicasStdevMean", withReplicasStdevMean) 314 .add("withReplicas99.99Mean", withReplicas9999Mean).toString()); 315 316 assertTrue( 317 "Running with region replicas under chaos should have less request variance than without. " 318 + "withReplicas.stdev.mean: " + withReplicasStdevMean + "ms " 319 + "withoutReplicas.stdev.mean: " + withoutReplicasStdevMean + "ms.", 320 withReplicasStdevMean <= withoutReplicasStdevMean); 321 assertTrue( 322 "Running with region replicas under chaos should improve 99.99pct latency. " 323 + "withReplicas.99.99.mean: " + withReplicas9999Mean + "ms " 324 + "withoutReplicas.99.99.mean: " + withoutReplicas9999Mean + "ms.", 325 withReplicas9999Mean <= withoutReplicas9999Mean); 326 } 327 328 public static void main(String[] args) throws Exception { 329 Configuration conf = HBaseConfiguration.create(); 330 IntegrationTestingUtility.setUseDistributedCluster(conf); 331 int status = ToolRunner.run(conf, new IntegrationTestRegionReplicaPerf(), args); 332 System.exit(status); 333 } 334}