001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import static java.lang.String.format;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertTrue;
025
026import com.codahale.metrics.Histogram;
027
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.LinkedList;
031import java.util.List;
032import java.util.Queue;
033import java.util.Set;
034import java.util.concurrent.Callable;
035
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.chaos.actions.MoveRandomRegionOfTableAction;
038import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
039import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
040import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
041import org.apache.hadoop.hbase.chaos.policies.Policy;
042import org.apache.hadoop.hbase.client.Admin;
043import org.apache.hadoop.hbase.ipc.RpcClient;
044import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
045import org.apache.hadoop.hbase.testclassification.IntegrationTests;
046import org.apache.hadoop.hbase.util.YammerHistogramUtils;
047import org.apache.hadoop.mapreduce.Counters;
048import org.apache.hadoop.mapreduce.Job;
049import org.apache.hadoop.util.ToolRunner;
050import org.junit.experimental.categories.Category;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
055import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
056import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
057
058/**
059 * Test for comparing the performance impact of region replicas. Uses
060 * components of {@link PerformanceEvaluation}. Does not run from
061 * {@code IntegrationTestsDriver} because IntegrationTestBase is incompatible
062 * with the JUnit runner. Hence no @Test annotations either. See {@code -help}
063 * for full list of options.
064 */
065@Category(IntegrationTests.class)
066public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
067
068  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestRegionReplicaPerf.class);
069
070  private static final String SLEEP_TIME_KEY = "sleeptime";
071  // short default interval because tests don't run very long.
072  private static final String SLEEP_TIME_DEFAULT = "" + (10 * 1000l);
073  private static final String TABLE_NAME_KEY = "tableName";
074  private static final String TABLE_NAME_DEFAULT = "IntegrationTestRegionReplicaPerf";
075  private static final String REPLICA_COUNT_KEY = "replicas";
076  private static final String REPLICA_COUNT_DEFAULT = "" + 3;
077  private static final String PRIMARY_TIMEOUT_KEY = "timeout";
078  private static final String PRIMARY_TIMEOUT_DEFAULT = "" + 10 * 1000; // 10 ms
079  private static final String NUM_RS_KEY = "numRs";
080  private static final String NUM_RS_DEFAULT = "" + 3;
081  public static final String FAMILY_NAME = "info";
082
083  /** Extract a descriptive statistic from a {@link com.codahale.metrics.Histogram}. */
084  private enum Stat {
085    STDEV {
086      @Override
087      double apply(Histogram hist) {
088        return hist.getSnapshot().getStdDev();
089      }
090    },
091    FOUR_9S {
092      @Override
093      double apply(Histogram hist) {
094        return hist.getSnapshot().getValue(0.9999);
095      }
096    };
097
098    abstract double apply(Histogram hist);
099  }
100
101  private TableName tableName;
102  private long sleepTime;
103  private int replicaCount;
104  private int primaryTimeout;
105  private int clusterSize;
106
107  /**
108   * Wraps the invocation of {@link PerformanceEvaluation} in a {@code Callable}.
109   */
110  static class PerfEvalCallable implements Callable<TimingResult> {
111    private final Queue<String> argv = new LinkedList<>();
112    private final Admin admin;
113
114    public PerfEvalCallable(Admin admin, String argv) {
115      // TODO: this API is awkward, should take Connection, not Admin
116      this.admin = admin;
117      this.argv.addAll(Arrays.asList(argv.split(" ")));
118      LOG.debug("Created PerformanceEvaluationCallable with args: " + argv);
119    }
120
121    @Override
122    public TimingResult call() throws Exception {
123      PerformanceEvaluation.TestOptions opts = PerformanceEvaluation.parseOpts(argv);
124      PerformanceEvaluation.checkTable(admin, opts);
125      PerformanceEvaluation.RunResult results[] = null;
126      long numRows = opts.totalRows;
127      long elapsedTime = 0;
128      if (opts.nomapred) {
129        results = PerformanceEvaluation.doLocalClients(opts, admin.getConfiguration());
130        for (PerformanceEvaluation.RunResult r : results) {
131          elapsedTime = Math.max(elapsedTime, r.duration);
132        }
133      } else {
134        Job job = PerformanceEvaluation.doMapReduce(opts, admin.getConfiguration());
135        Counters counters = job.getCounters();
136        numRows = counters.findCounter(PerformanceEvaluation.Counter.ROWS).getValue();
137        elapsedTime = counters.findCounter(PerformanceEvaluation.Counter.ELAPSED_TIME).getValue();
138      }
139      return new TimingResult(numRows, elapsedTime, results);
140    }
141  }
142
143  /**
144   * Record the results from a single {@link PerformanceEvaluation} job run.
145   */
146  static class TimingResult {
147    public final long numRows;
148    public final long elapsedTime;
149    public final PerformanceEvaluation.RunResult results[];
150
151    public TimingResult(long numRows, long elapsedTime, PerformanceEvaluation.RunResult results[]) {
152      this.numRows = numRows;
153      this.elapsedTime = elapsedTime;
154      this.results = results;
155    }
156
157    @Override
158    public String toString() {
159      return MoreObjects.toStringHelper(this)
160        .add("numRows", numRows)
161        .add("elapsedTime", elapsedTime)
162        .toString();
163    }
164  }
165
166  @Override
167  public void setUp() throws Exception {
168    super.setUp();
169    Configuration conf = util.getConfiguration();
170
171    // sanity check cluster
172    // TODO: this should reach out to master and verify online state instead
173    assertEquals("Master must be configured with StochasticLoadBalancer",
174      "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer",
175      conf.get("hbase.master.loadbalancer.class"));
176    // TODO: this should reach out to master and verify online state instead
177    assertTrue("hbase.regionserver.storefile.refresh.period must be greater than zero.",
178      conf.getLong("hbase.regionserver.storefile.refresh.period", 0) > 0);
179
180    // enable client-side settings
181    conf.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, true);
182    // TODO: expose these settings to CLI override
183    conf.setLong("hbase.client.primaryCallTimeout.get", primaryTimeout);
184    conf.setLong("hbase.client.primaryCallTimeout.multiget", primaryTimeout);
185  }
186
187  @Override
188  public void setUpCluster() throws Exception {
189    util = getTestingUtil(getConf());
190    util.initializeCluster(clusterSize);
191  }
192
193  @Override
194  public void setUpMonkey() throws Exception {
195    Policy p = new PeriodicRandomActionPolicy(sleepTime,
196      new RestartRandomRsExceptMetaAction(sleepTime),
197      new MoveRandomRegionOfTableAction(tableName));
198    this.monkey = new PolicyBasedChaosMonkey(util, p);
199    // don't start monkey right away
200  }
201
202  @Override
203  protected void addOptions() {
204    addOptWithArg(TABLE_NAME_KEY, "Alternate table name. Default: '"
205      + TABLE_NAME_DEFAULT + "'");
206    addOptWithArg(SLEEP_TIME_KEY, "How long the monkey sleeps between actions. Default: "
207      + SLEEP_TIME_DEFAULT);
208    addOptWithArg(REPLICA_COUNT_KEY, "Number of region replicas. Default: "
209      + REPLICA_COUNT_DEFAULT);
210    addOptWithArg(PRIMARY_TIMEOUT_KEY, "Overrides hbase.client.primaryCallTimeout. Default: "
211      + PRIMARY_TIMEOUT_DEFAULT + " (10ms)");
212    addOptWithArg(NUM_RS_KEY, "Specify the number of RegionServers to use. Default: "
213        + NUM_RS_DEFAULT);
214  }
215
216  @Override
217  protected void processOptions(CommandLine cmd) {
218    tableName = TableName.valueOf(cmd.getOptionValue(TABLE_NAME_KEY, TABLE_NAME_DEFAULT));
219    sleepTime = Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT));
220    replicaCount = Integer.parseInt(cmd.getOptionValue(REPLICA_COUNT_KEY, REPLICA_COUNT_DEFAULT));
221    primaryTimeout =
222      Integer.parseInt(cmd.getOptionValue(PRIMARY_TIMEOUT_KEY, PRIMARY_TIMEOUT_DEFAULT));
223    clusterSize = Integer.parseInt(cmd.getOptionValue(NUM_RS_KEY, NUM_RS_DEFAULT));
224    LOG.debug(MoreObjects.toStringHelper("Parsed Options")
225      .add(TABLE_NAME_KEY, tableName)
226      .add(SLEEP_TIME_KEY, sleepTime)
227      .add(REPLICA_COUNT_KEY, replicaCount)
228      .add(PRIMARY_TIMEOUT_KEY, primaryTimeout)
229      .add(NUM_RS_KEY, clusterSize)
230      .toString());
231  }
232
233  @Override
234  public int runTestFromCommandLine() throws Exception {
235    test();
236    return 0;
237  }
238
239  @Override
240  public TableName getTablename() {
241    return tableName;
242  }
243
244  @Override
245  protected Set<String> getColumnFamilies() {
246    return Sets.newHashSet(FAMILY_NAME);
247  }
248
249  /** Compute the mean of the given {@code stat} from a timing results. */
250  private static double calcMean(String desc, Stat stat, List<TimingResult> results) {
251    double sum = 0;
252    int count = 0;
253
254    for (TimingResult tr : results) {
255      for (PerformanceEvaluation.RunResult r : tr.results) {
256        assertNotNull("One of the run results is missing detailed run data.", r.hist);
257        sum += stat.apply(r.hist);
258        count += 1;
259        LOG.debug(desc + "{" + YammerHistogramUtils.getHistogramReport(r.hist) + "}");
260      }
261    }
262    return sum / count;
263  }
264
265  public void test() throws Exception {
266    int maxIters = 3;
267    String replicas = "--replicas=" + replicaCount;
268    // TODO: splits disabled until "phase 2" is complete.
269    String splitPolicy = "--splitPolicy=" + DisabledRegionSplitPolicy.class.getName();
270    String writeOpts = format("%s --nomapred --table=%s --presplit=16 sequentialWrite 4",
271      splitPolicy, tableName);
272    String readOpts =
273      format("--nomapred --table=%s --latency --sampleRate=0.1 randomRead 4", tableName);
274    String replicaReadOpts = format("%s %s", replicas, readOpts);
275
276    ArrayList<TimingResult> resultsWithoutReplicas = new ArrayList<>(maxIters);
277    ArrayList<TimingResult> resultsWithReplicas = new ArrayList<>(maxIters);
278
279    // create/populate the table, replicas disabled
280    LOG.debug("Populating table.");
281    new PerfEvalCallable(util.getAdmin(), writeOpts).call();
282
283    // one last sanity check, then send in the clowns!
284    assertEquals("Table must be created with DisabledRegionSplitPolicy. Broken test.",
285        DisabledRegionSplitPolicy.class.getName(),
286        util.getAdmin().getTableDescriptor(tableName).getRegionSplitPolicyClassName());
287    startMonkey();
288
289    // collect a baseline without region replicas.
290    for (int i = 0; i < maxIters; i++) {
291      LOG.debug("Launching non-replica job " + (i + 1) + "/" + maxIters);
292      resultsWithoutReplicas.add(new PerfEvalCallable(util.getAdmin(), readOpts).call());
293      // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
294      Thread.sleep(5000l);
295    }
296
297    // disable monkey, enable region replicas, enable monkey
298    cleanUpMonkey("Altering table.");
299    LOG.debug("Altering " + tableName + " replica count to " + replicaCount);
300    IntegrationTestingUtility.setReplicas(util.getAdmin(), tableName, replicaCount);
301    setUpMonkey();
302    startMonkey();
303
304    // run test with region replicas.
305    for (int i = 0; i < maxIters; i++) {
306      LOG.debug("Launching replica job " + (i + 1) + "/" + maxIters);
307      resultsWithReplicas.add(new PerfEvalCallable(util.getAdmin(), replicaReadOpts).call());
308      // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
309      Thread.sleep(5000l);
310    }
311
312    // compare the average of the stdev and 99.99pct across runs to determine if region replicas
313    // are having an overall improvement on response variance experienced by clients.
314    double withoutReplicasStdevMean =
315        calcMean("withoutReplicas", Stat.STDEV, resultsWithoutReplicas);
316    double withoutReplicas9999Mean =
317        calcMean("withoutReplicas", Stat.FOUR_9S, resultsWithoutReplicas);
318    double withReplicasStdevMean =
319        calcMean("withReplicas", Stat.STDEV, resultsWithReplicas);
320    double withReplicas9999Mean =
321        calcMean("withReplicas", Stat.FOUR_9S, resultsWithReplicas);
322
323    LOG.info(MoreObjects.toStringHelper(this)
324      .add("withoutReplicas", resultsWithoutReplicas)
325      .add("withReplicas", resultsWithReplicas)
326      .add("withoutReplicasStdevMean", withoutReplicasStdevMean)
327      .add("withoutReplicas99.99Mean", withoutReplicas9999Mean)
328      .add("withReplicasStdevMean", withReplicasStdevMean)
329      .add("withReplicas99.99Mean", withReplicas9999Mean)
330      .toString());
331
332    assertTrue(
333      "Running with region replicas under chaos should have less request variance than without. "
334      + "withReplicas.stdev.mean: " + withReplicasStdevMean + "ms "
335      + "withoutReplicas.stdev.mean: " + withoutReplicasStdevMean + "ms.",
336      withReplicasStdevMean <= withoutReplicasStdevMean);
337    assertTrue(
338        "Running with region replicas under chaos should improve 99.99pct latency. "
339            + "withReplicas.99.99.mean: " + withReplicas9999Mean + "ms "
340            + "withoutReplicas.99.99.mean: " + withoutReplicas9999Mean + "ms.",
341        withReplicas9999Mean <= withoutReplicas9999Mean);
342  }
343
344  public static void main(String[] args) throws Exception {
345    Configuration conf = HBaseConfiguration.create();
346    IntegrationTestingUtility.setUseDistributedCluster(conf);
347    int status = ToolRunner.run(conf, new IntegrationTestRegionReplicaPerf(), args);
348    System.exit(status);
349  }
350}