001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase; 020 021import static java.lang.String.format; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertTrue; 025 026import com.codahale.metrics.Histogram; 027 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.LinkedList; 031import java.util.List; 032import java.util.Queue; 033import java.util.Set; 034import java.util.concurrent.Callable; 035 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.chaos.actions.MoveRandomRegionOfTableAction; 038import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction; 039import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey; 040import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy; 041import org.apache.hadoop.hbase.chaos.policies.Policy; 042import org.apache.hadoop.hbase.client.Admin; 043import org.apache.hadoop.hbase.ipc.RpcClient; 044import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; 045import org.apache.hadoop.hbase.testclassification.IntegrationTests; 046import org.apache.hadoop.hbase.util.YammerHistogramUtils; 047import org.apache.hadoop.mapreduce.Counters; 048import org.apache.hadoop.mapreduce.Job; 049import org.apache.hadoop.util.ToolRunner; 050 051import org.junit.experimental.categories.Category; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 056import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 057import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 058 059/** 060 * Test for comparing the performance impact of region replicas. Uses 061 * components of {@link PerformanceEvaluation}. Does not run from 062 * {@code IntegrationTestsDriver} because IntegrationTestBase is incompatible 063 * with the JUnit runner. Hence no @Test annotations either. See {@code -help} 064 * for full list of options. 065 */ 066@Category(IntegrationTests.class) 067public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase { 068 069 private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestRegionReplicaPerf.class); 070 071 private static final String SLEEP_TIME_KEY = "sleeptime"; 072 // short default interval because tests don't run very long. 073 private static final String SLEEP_TIME_DEFAULT = "" + (10 * 1000l); 074 private static final String TABLE_NAME_KEY = "tableName"; 075 private static final String TABLE_NAME_DEFAULT = "IntegrationTestRegionReplicaPerf"; 076 private static final String REPLICA_COUNT_KEY = "replicas"; 077 private static final String REPLICA_COUNT_DEFAULT = "" + 3; 078 private static final String PRIMARY_TIMEOUT_KEY = "timeout"; 079 private static final String PRIMARY_TIMEOUT_DEFAULT = "" + 10 * 1000; // 10 ms 080 private static final String NUM_RS_KEY = "numRs"; 081 private static final String NUM_RS_DEFAULT = "" + 3; 082 public static final String FAMILY_NAME = "info"; 083 084 /** Extract a descriptive statistic from a {@link com.codahale.metrics.Histogram}. */ 085 private enum Stat { 086 STDEV { 087 @Override 088 double apply(Histogram hist) { 089 return hist.getSnapshot().getStdDev(); 090 } 091 }, 092 FOUR_9S { 093 @Override 094 double apply(Histogram hist) { 095 return hist.getSnapshot().getValue(0.9999); 096 } 097 }; 098 099 abstract double apply(Histogram hist); 100 } 101 102 private TableName tableName; 103 private long sleepTime; 104 private int replicaCount; 105 private int primaryTimeout; 106 private int clusterSize; 107 108 /** 109 * Wraps the invocation of {@link PerformanceEvaluation} in a {@code Callable}. 110 */ 111 static class PerfEvalCallable implements Callable<TimingResult> { 112 private final Queue<String> argv = new LinkedList<>(); 113 private final Admin admin; 114 115 public PerfEvalCallable(Admin admin, String argv) { 116 // TODO: this API is awkward, should take Connection, not Admin 117 this.admin = admin; 118 this.argv.addAll(Arrays.asList(argv.split(" "))); 119 LOG.debug("Created PerformanceEvaluationCallable with args: " + argv); 120 } 121 122 @Override 123 public TimingResult call() throws Exception { 124 PerformanceEvaluation.TestOptions opts = PerformanceEvaluation.parseOpts(argv); 125 PerformanceEvaluation.checkTable(admin, opts); 126 PerformanceEvaluation.RunResult results[] = null; 127 long numRows = opts.totalRows; 128 long elapsedTime = 0; 129 if (opts.nomapred) { 130 results = PerformanceEvaluation.doLocalClients(opts, admin.getConfiguration()); 131 for (PerformanceEvaluation.RunResult r : results) { 132 elapsedTime = Math.max(elapsedTime, r.duration); 133 } 134 } else { 135 Job job = PerformanceEvaluation.doMapReduce(opts, admin.getConfiguration()); 136 Counters counters = job.getCounters(); 137 numRows = counters.findCounter(PerformanceEvaluation.Counter.ROWS).getValue(); 138 elapsedTime = counters.findCounter(PerformanceEvaluation.Counter.ELAPSED_TIME).getValue(); 139 } 140 return new TimingResult(numRows, elapsedTime, results); 141 } 142 } 143 144 /** 145 * Record the results from a single {@link PerformanceEvaluation} job run. 146 */ 147 static class TimingResult { 148 public final long numRows; 149 public final long elapsedTime; 150 public final PerformanceEvaluation.RunResult results[]; 151 152 public TimingResult(long numRows, long elapsedTime, PerformanceEvaluation.RunResult results[]) { 153 this.numRows = numRows; 154 this.elapsedTime = elapsedTime; 155 this.results = results; 156 } 157 158 @Override 159 public String toString() { 160 return MoreObjects.toStringHelper(this) 161 .add("numRows", numRows) 162 .add("elapsedTime", elapsedTime) 163 .toString(); 164 } 165 } 166 167 @Override 168 public void setUp() throws Exception { 169 super.setUp(); 170 Configuration conf = util.getConfiguration(); 171 172 // sanity check cluster 173 // TODO: this should reach out to master and verify online state instead 174 assertEquals("Master must be configured with StochasticLoadBalancer", 175 "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer", 176 conf.get("hbase.master.loadbalancer.class")); 177 // TODO: this should reach out to master and verify online state instead 178 assertTrue("hbase.regionserver.storefile.refresh.period must be greater than zero.", 179 conf.getLong("hbase.regionserver.storefile.refresh.period", 0) > 0); 180 181 // enable client-side settings 182 conf.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, true); 183 // TODO: expose these settings to CLI override 184 conf.setLong("hbase.client.primaryCallTimeout.get", primaryTimeout); 185 conf.setLong("hbase.client.primaryCallTimeout.multiget", primaryTimeout); 186 } 187 188 @Override 189 public void setUpCluster() throws Exception { 190 util = getTestingUtil(getConf()); 191 util.initializeCluster(clusterSize); 192 } 193 194 @Override 195 public void setUpMonkey() throws Exception { 196 Policy p = new PeriodicRandomActionPolicy(sleepTime, 197 new RestartRandomRsExceptMetaAction(sleepTime), 198 new MoveRandomRegionOfTableAction(tableName)); 199 this.monkey = new PolicyBasedChaosMonkey(util, p); 200 // don't start monkey right away 201 } 202 203 @Override 204 protected void addOptions() { 205 addOptWithArg(TABLE_NAME_KEY, "Alternate table name. Default: '" 206 + TABLE_NAME_DEFAULT + "'"); 207 addOptWithArg(SLEEP_TIME_KEY, "How long the monkey sleeps between actions. Default: " 208 + SLEEP_TIME_DEFAULT); 209 addOptWithArg(REPLICA_COUNT_KEY, "Number of region replicas. Default: " 210 + REPLICA_COUNT_DEFAULT); 211 addOptWithArg(PRIMARY_TIMEOUT_KEY, "Overrides hbase.client.primaryCallTimeout. Default: " 212 + PRIMARY_TIMEOUT_DEFAULT + " (10ms)"); 213 addOptWithArg(NUM_RS_KEY, "Specify the number of RegionServers to use. Default: " 214 + NUM_RS_DEFAULT); 215 } 216 217 @Override 218 protected void processOptions(CommandLine cmd) { 219 tableName = TableName.valueOf(cmd.getOptionValue(TABLE_NAME_KEY, TABLE_NAME_DEFAULT)); 220 sleepTime = Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT)); 221 replicaCount = Integer.parseInt(cmd.getOptionValue(REPLICA_COUNT_KEY, REPLICA_COUNT_DEFAULT)); 222 primaryTimeout = 223 Integer.parseInt(cmd.getOptionValue(PRIMARY_TIMEOUT_KEY, PRIMARY_TIMEOUT_DEFAULT)); 224 clusterSize = Integer.parseInt(cmd.getOptionValue(NUM_RS_KEY, NUM_RS_DEFAULT)); 225 LOG.debug(MoreObjects.toStringHelper("Parsed Options") 226 .add(TABLE_NAME_KEY, tableName) 227 .add(SLEEP_TIME_KEY, sleepTime) 228 .add(REPLICA_COUNT_KEY, replicaCount) 229 .add(PRIMARY_TIMEOUT_KEY, primaryTimeout) 230 .add(NUM_RS_KEY, clusterSize) 231 .toString()); 232 } 233 234 @Override 235 public int runTestFromCommandLine() throws Exception { 236 test(); 237 return 0; 238 } 239 240 @Override 241 public TableName getTablename() { 242 return tableName; 243 } 244 245 @Override 246 protected Set<String> getColumnFamilies() { 247 return Sets.newHashSet(FAMILY_NAME); 248 } 249 250 /** Compute the mean of the given {@code stat} from a timing results. */ 251 private static double calcMean(String desc, Stat stat, List<TimingResult> results) { 252 double sum = 0; 253 int count = 0; 254 255 for (TimingResult tr : results) { 256 for (PerformanceEvaluation.RunResult r : tr.results) { 257 assertNotNull("One of the run results is missing detailed run data.", r.hist); 258 sum += stat.apply(r.hist); 259 count += 1; 260 LOG.debug(desc + "{" + YammerHistogramUtils.getHistogramReport(r.hist) + "}"); 261 } 262 } 263 return sum / count; 264 } 265 266 public void test() throws Exception { 267 int maxIters = 3; 268 String replicas = "--replicas=" + replicaCount; 269 // TODO: splits disabled until "phase 2" is complete. 270 String splitPolicy = "--splitPolicy=" + DisabledRegionSplitPolicy.class.getName(); 271 String writeOpts = format("%s --nomapred --table=%s --presplit=16 sequentialWrite 4", 272 splitPolicy, tableName); 273 String readOpts = 274 format("--nomapred --table=%s --latency --sampleRate=0.1 randomRead 4", tableName); 275 String replicaReadOpts = format("%s %s", replicas, readOpts); 276 277 ArrayList<TimingResult> resultsWithoutReplicas = new ArrayList<>(maxIters); 278 ArrayList<TimingResult> resultsWithReplicas = new ArrayList<>(maxIters); 279 280 // create/populate the table, replicas disabled 281 LOG.debug("Populating table."); 282 new PerfEvalCallable(util.getAdmin(), writeOpts).call(); 283 284 // one last sanity check, then send in the clowns! 285 assertEquals("Table must be created with DisabledRegionSplitPolicy. Broken test.", 286 DisabledRegionSplitPolicy.class.getName(), 287 util.getAdmin().getTableDescriptor(tableName).getRegionSplitPolicyClassName()); 288 startMonkey(); 289 290 // collect a baseline without region replicas. 291 for (int i = 0; i < maxIters; i++) { 292 LOG.debug("Launching non-replica job " + (i + 1) + "/" + maxIters); 293 resultsWithoutReplicas.add(new PerfEvalCallable(util.getAdmin(), readOpts).call()); 294 // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary? 295 Thread.sleep(5000l); 296 } 297 298 // disable monkey, enable region replicas, enable monkey 299 cleanUpMonkey("Altering table."); 300 LOG.debug("Altering " + tableName + " replica count to " + replicaCount); 301 IntegrationTestingUtility.setReplicas(util.getAdmin(), tableName, replicaCount); 302 setUpMonkey(); 303 startMonkey(); 304 305 // run test with region replicas. 306 for (int i = 0; i < maxIters; i++) { 307 LOG.debug("Launching replica job " + (i + 1) + "/" + maxIters); 308 resultsWithReplicas.add(new PerfEvalCallable(util.getAdmin(), replicaReadOpts).call()); 309 // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary? 310 Thread.sleep(5000l); 311 } 312 313 // compare the average of the stdev and 99.99pct across runs to determine if region replicas 314 // are having an overall improvement on response variance experienced by clients. 315 double withoutReplicasStdevMean = 316 calcMean("withoutReplicas", Stat.STDEV, resultsWithoutReplicas); 317 double withoutReplicas9999Mean = 318 calcMean("withoutReplicas", Stat.FOUR_9S, resultsWithoutReplicas); 319 double withReplicasStdevMean = 320 calcMean("withReplicas", Stat.STDEV, resultsWithReplicas); 321 double withReplicas9999Mean = 322 calcMean("withReplicas", Stat.FOUR_9S, resultsWithReplicas); 323 324 LOG.info(MoreObjects.toStringHelper(this) 325 .add("withoutReplicas", resultsWithoutReplicas) 326 .add("withReplicas", resultsWithReplicas) 327 .add("withoutReplicasStdevMean", withoutReplicasStdevMean) 328 .add("withoutReplicas99.99Mean", withoutReplicas9999Mean) 329 .add("withReplicasStdevMean", withReplicasStdevMean) 330 .add("withReplicas99.99Mean", withReplicas9999Mean) 331 .toString()); 332 333 assertTrue( 334 "Running with region replicas under chaos should have less request variance than without. " 335 + "withReplicas.stdev.mean: " + withReplicasStdevMean + "ms " 336 + "withoutReplicas.stdev.mean: " + withoutReplicasStdevMean + "ms.", 337 withReplicasStdevMean <= withoutReplicasStdevMean); 338 assertTrue( 339 "Running with region replicas under chaos should improve 99.99pct latency. " 340 + "withReplicas.99.99.mean: " + withReplicas9999Mean + "ms " 341 + "withoutReplicas.99.99.mean: " + withoutReplicas9999Mean + "ms.", 342 withReplicas9999Mean <= withoutReplicas9999Mean); 343 } 344 345 public static void main(String[] args) throws Exception { 346 Configuration conf = HBaseConfiguration.create(); 347 IntegrationTestingUtility.setUseDistributedCluster(conf); 348 int status = ToolRunner.run(conf, new IntegrationTestRegionReplicaPerf(), args); 349 System.exit(status); 350 } 351}