001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase; 020 021import static java.lang.String.format; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertTrue; 025 026import com.codahale.metrics.Histogram; 027 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.LinkedList; 031import java.util.List; 032import java.util.Queue; 033import java.util.Set; 034import java.util.concurrent.Callable; 035 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.chaos.actions.MoveRandomRegionOfTableAction; 038import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction; 039import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey; 040import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy; 041import org.apache.hadoop.hbase.chaos.policies.Policy; 042import org.apache.hadoop.hbase.client.Admin; 043import org.apache.hadoop.hbase.ipc.RpcClient; 044import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; 045import org.apache.hadoop.hbase.testclassification.IntegrationTests; 046import org.apache.hadoop.hbase.util.YammerHistogramUtils; 047import org.apache.hadoop.mapreduce.Counters; 048import org.apache.hadoop.mapreduce.Job; 049import org.apache.hadoop.util.ToolRunner; 050import org.junit.experimental.categories.Category; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 055import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 056import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 057 058/** 059 * Test for comparing the performance impact of region replicas. Uses 060 * components of {@link PerformanceEvaluation}. Does not run from 061 * {@code IntegrationTestsDriver} because IntegrationTestBase is incompatible 062 * with the JUnit runner. Hence no @Test annotations either. See {@code -help} 063 * for full list of options. 064 */ 065@Category(IntegrationTests.class) 066public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase { 067 068 private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestRegionReplicaPerf.class); 069 070 private static final String SLEEP_TIME_KEY = "sleeptime"; 071 // short default interval because tests don't run very long. 072 private static final String SLEEP_TIME_DEFAULT = "" + (10 * 1000l); 073 private static final String TABLE_NAME_KEY = "tableName"; 074 private static final String TABLE_NAME_DEFAULT = "IntegrationTestRegionReplicaPerf"; 075 private static final String REPLICA_COUNT_KEY = "replicas"; 076 private static final String REPLICA_COUNT_DEFAULT = "" + 3; 077 private static final String PRIMARY_TIMEOUT_KEY = "timeout"; 078 private static final String PRIMARY_TIMEOUT_DEFAULT = "" + 10 * 1000; // 10 ms 079 private static final String NUM_RS_KEY = "numRs"; 080 private static final String NUM_RS_DEFAULT = "" + 3; 081 public static final String FAMILY_NAME = "info"; 082 083 /** Extract a descriptive statistic from a {@link com.codahale.metrics.Histogram}. */ 084 private enum Stat { 085 STDEV { 086 @Override 087 double apply(Histogram hist) { 088 return hist.getSnapshot().getStdDev(); 089 } 090 }, 091 FOUR_9S { 092 @Override 093 double apply(Histogram hist) { 094 return hist.getSnapshot().getValue(0.9999); 095 } 096 }; 097 098 abstract double apply(Histogram hist); 099 } 100 101 private TableName tableName; 102 private long sleepTime; 103 private int replicaCount; 104 private int primaryTimeout; 105 private int clusterSize; 106 107 /** 108 * Wraps the invocation of {@link PerformanceEvaluation} in a {@code Callable}. 109 */ 110 static class PerfEvalCallable implements Callable<TimingResult> { 111 private final Queue<String> argv = new LinkedList<>(); 112 private final Admin admin; 113 114 public PerfEvalCallable(Admin admin, String argv) { 115 // TODO: this API is awkward, should take Connection, not Admin 116 this.admin = admin; 117 this.argv.addAll(Arrays.asList(argv.split(" "))); 118 LOG.debug("Created PerformanceEvaluationCallable with args: " + argv); 119 } 120 121 @Override 122 public TimingResult call() throws Exception { 123 PerformanceEvaluation.TestOptions opts = PerformanceEvaluation.parseOpts(argv); 124 PerformanceEvaluation.checkTable(admin, opts); 125 PerformanceEvaluation.RunResult results[] = null; 126 long numRows = opts.totalRows; 127 long elapsedTime = 0; 128 if (opts.nomapred) { 129 results = PerformanceEvaluation.doLocalClients(opts, admin.getConfiguration()); 130 for (PerformanceEvaluation.RunResult r : results) { 131 elapsedTime = Math.max(elapsedTime, r.duration); 132 } 133 } else { 134 Job job = PerformanceEvaluation.doMapReduce(opts, admin.getConfiguration()); 135 Counters counters = job.getCounters(); 136 numRows = counters.findCounter(PerformanceEvaluation.Counter.ROWS).getValue(); 137 elapsedTime = counters.findCounter(PerformanceEvaluation.Counter.ELAPSED_TIME).getValue(); 138 } 139 return new TimingResult(numRows, elapsedTime, results); 140 } 141 } 142 143 /** 144 * Record the results from a single {@link PerformanceEvaluation} job run. 145 */ 146 static class TimingResult { 147 public final long numRows; 148 public final long elapsedTime; 149 public final PerformanceEvaluation.RunResult results[]; 150 151 public TimingResult(long numRows, long elapsedTime, PerformanceEvaluation.RunResult results[]) { 152 this.numRows = numRows; 153 this.elapsedTime = elapsedTime; 154 this.results = results; 155 } 156 157 @Override 158 public String toString() { 159 return MoreObjects.toStringHelper(this) 160 .add("numRows", numRows) 161 .add("elapsedTime", elapsedTime) 162 .toString(); 163 } 164 } 165 166 @Override 167 public void setUp() throws Exception { 168 super.setUp(); 169 Configuration conf = util.getConfiguration(); 170 171 // sanity check cluster 172 // TODO: this should reach out to master and verify online state instead 173 assertEquals("Master must be configured with StochasticLoadBalancer", 174 "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer", 175 conf.get("hbase.master.loadbalancer.class")); 176 // TODO: this should reach out to master and verify online state instead 177 assertTrue("hbase.regionserver.storefile.refresh.period must be greater than zero.", 178 conf.getLong("hbase.regionserver.storefile.refresh.period", 0) > 0); 179 180 // enable client-side settings 181 conf.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, true); 182 // TODO: expose these settings to CLI override 183 conf.setLong("hbase.client.primaryCallTimeout.get", primaryTimeout); 184 conf.setLong("hbase.client.primaryCallTimeout.multiget", primaryTimeout); 185 } 186 187 @Override 188 public void setUpCluster() throws Exception { 189 util = getTestingUtil(getConf()); 190 util.initializeCluster(clusterSize); 191 } 192 193 @Override 194 public void setUpMonkey() throws Exception { 195 Policy p = new PeriodicRandomActionPolicy(sleepTime, 196 new RestartRandomRsExceptMetaAction(sleepTime), 197 new MoveRandomRegionOfTableAction(tableName)); 198 this.monkey = new PolicyBasedChaosMonkey(util, p); 199 // don't start monkey right away 200 } 201 202 @Override 203 protected void addOptions() { 204 addOptWithArg(TABLE_NAME_KEY, "Alternate table name. Default: '" 205 + TABLE_NAME_DEFAULT + "'"); 206 addOptWithArg(SLEEP_TIME_KEY, "How long the monkey sleeps between actions. Default: " 207 + SLEEP_TIME_DEFAULT); 208 addOptWithArg(REPLICA_COUNT_KEY, "Number of region replicas. Default: " 209 + REPLICA_COUNT_DEFAULT); 210 addOptWithArg(PRIMARY_TIMEOUT_KEY, "Overrides hbase.client.primaryCallTimeout. Default: " 211 + PRIMARY_TIMEOUT_DEFAULT + " (10ms)"); 212 addOptWithArg(NUM_RS_KEY, "Specify the number of RegionServers to use. Default: " 213 + NUM_RS_DEFAULT); 214 } 215 216 @Override 217 protected void processOptions(CommandLine cmd) { 218 tableName = TableName.valueOf(cmd.getOptionValue(TABLE_NAME_KEY, TABLE_NAME_DEFAULT)); 219 sleepTime = Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT)); 220 replicaCount = Integer.parseInt(cmd.getOptionValue(REPLICA_COUNT_KEY, REPLICA_COUNT_DEFAULT)); 221 primaryTimeout = 222 Integer.parseInt(cmd.getOptionValue(PRIMARY_TIMEOUT_KEY, PRIMARY_TIMEOUT_DEFAULT)); 223 clusterSize = Integer.parseInt(cmd.getOptionValue(NUM_RS_KEY, NUM_RS_DEFAULT)); 224 LOG.debug(MoreObjects.toStringHelper("Parsed Options") 225 .add(TABLE_NAME_KEY, tableName) 226 .add(SLEEP_TIME_KEY, sleepTime) 227 .add(REPLICA_COUNT_KEY, replicaCount) 228 .add(PRIMARY_TIMEOUT_KEY, primaryTimeout) 229 .add(NUM_RS_KEY, clusterSize) 230 .toString()); 231 } 232 233 @Override 234 public int runTestFromCommandLine() throws Exception { 235 test(); 236 return 0; 237 } 238 239 @Override 240 public TableName getTablename() { 241 return tableName; 242 } 243 244 @Override 245 protected Set<String> getColumnFamilies() { 246 return Sets.newHashSet(FAMILY_NAME); 247 } 248 249 /** Compute the mean of the given {@code stat} from a timing results. */ 250 private static double calcMean(String desc, Stat stat, List<TimingResult> results) { 251 double sum = 0; 252 int count = 0; 253 254 for (TimingResult tr : results) { 255 for (PerformanceEvaluation.RunResult r : tr.results) { 256 assertNotNull("One of the run results is missing detailed run data.", r.hist); 257 sum += stat.apply(r.hist); 258 count += 1; 259 LOG.debug(desc + "{" + YammerHistogramUtils.getHistogramReport(r.hist) + "}"); 260 } 261 } 262 return sum / count; 263 } 264 265 public void test() throws Exception { 266 int maxIters = 3; 267 String replicas = "--replicas=" + replicaCount; 268 // TODO: splits disabled until "phase 2" is complete. 269 String splitPolicy = "--splitPolicy=" + DisabledRegionSplitPolicy.class.getName(); 270 String writeOpts = format("%s --nomapred --table=%s --presplit=16 sequentialWrite 4", 271 splitPolicy, tableName); 272 String readOpts = 273 format("--nomapred --table=%s --latency --sampleRate=0.1 randomRead 4", tableName); 274 String replicaReadOpts = format("%s %s", replicas, readOpts); 275 276 ArrayList<TimingResult> resultsWithoutReplicas = new ArrayList<>(maxIters); 277 ArrayList<TimingResult> resultsWithReplicas = new ArrayList<>(maxIters); 278 279 // create/populate the table, replicas disabled 280 LOG.debug("Populating table."); 281 new PerfEvalCallable(util.getAdmin(), writeOpts).call(); 282 283 // one last sanity check, then send in the clowns! 284 assertEquals("Table must be created with DisabledRegionSplitPolicy. Broken test.", 285 DisabledRegionSplitPolicy.class.getName(), 286 util.getAdmin().getTableDescriptor(tableName).getRegionSplitPolicyClassName()); 287 startMonkey(); 288 289 // collect a baseline without region replicas. 290 for (int i = 0; i < maxIters; i++) { 291 LOG.debug("Launching non-replica job " + (i + 1) + "/" + maxIters); 292 resultsWithoutReplicas.add(new PerfEvalCallable(util.getAdmin(), readOpts).call()); 293 // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary? 294 Thread.sleep(5000l); 295 } 296 297 // disable monkey, enable region replicas, enable monkey 298 cleanUpMonkey("Altering table."); 299 LOG.debug("Altering " + tableName + " replica count to " + replicaCount); 300 IntegrationTestingUtility.setReplicas(util.getAdmin(), tableName, replicaCount); 301 setUpMonkey(); 302 startMonkey(); 303 304 // run test with region replicas. 305 for (int i = 0; i < maxIters; i++) { 306 LOG.debug("Launching replica job " + (i + 1) + "/" + maxIters); 307 resultsWithReplicas.add(new PerfEvalCallable(util.getAdmin(), replicaReadOpts).call()); 308 // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary? 309 Thread.sleep(5000l); 310 } 311 312 // compare the average of the stdev and 99.99pct across runs to determine if region replicas 313 // are having an overall improvement on response variance experienced by clients. 314 double withoutReplicasStdevMean = 315 calcMean("withoutReplicas", Stat.STDEV, resultsWithoutReplicas); 316 double withoutReplicas9999Mean = 317 calcMean("withoutReplicas", Stat.FOUR_9S, resultsWithoutReplicas); 318 double withReplicasStdevMean = 319 calcMean("withReplicas", Stat.STDEV, resultsWithReplicas); 320 double withReplicas9999Mean = 321 calcMean("withReplicas", Stat.FOUR_9S, resultsWithReplicas); 322 323 LOG.info(MoreObjects.toStringHelper(this) 324 .add("withoutReplicas", resultsWithoutReplicas) 325 .add("withReplicas", resultsWithReplicas) 326 .add("withoutReplicasStdevMean", withoutReplicasStdevMean) 327 .add("withoutReplicas99.99Mean", withoutReplicas9999Mean) 328 .add("withReplicasStdevMean", withReplicasStdevMean) 329 .add("withReplicas99.99Mean", withReplicas9999Mean) 330 .toString()); 331 332 assertTrue( 333 "Running with region replicas under chaos should have less request variance than without. " 334 + "withReplicas.stdev.mean: " + withReplicasStdevMean + "ms " 335 + "withoutReplicas.stdev.mean: " + withoutReplicasStdevMean + "ms.", 336 withReplicasStdevMean <= withoutReplicasStdevMean); 337 assertTrue( 338 "Running with region replicas under chaos should improve 99.99pct latency. " 339 + "withReplicas.99.99.mean: " + withReplicas9999Mean + "ms " 340 + "withoutReplicas.99.99.mean: " + withoutReplicas9999Mean + "ms.", 341 withReplicas9999Mean <= withoutReplicas9999Mean); 342 } 343 344 public static void main(String[] args) throws Exception { 345 Configuration conf = HBaseConfiguration.create(); 346 IntegrationTestingUtility.setUseDistributedCluster(conf); 347 int status = ToolRunner.run(conf, new IntegrationTestRegionReplicaPerf(), args); 348 System.exit(status); 349 } 350}