001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import static java.lang.String.format;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertTrue;
025
026import com.codahale.metrics.Histogram;
027
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.LinkedList;
031import java.util.List;
032import java.util.Queue;
033import java.util.Set;
034import java.util.concurrent.Callable;
035
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.chaos.actions.MoveRandomRegionOfTableAction;
038import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
039import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
040import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
041import org.apache.hadoop.hbase.chaos.policies.Policy;
042import org.apache.hadoop.hbase.client.Admin;
043import org.apache.hadoop.hbase.ipc.RpcClient;
044import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
045import org.apache.hadoop.hbase.testclassification.IntegrationTests;
046import org.apache.hadoop.hbase.util.YammerHistogramUtils;
047import org.apache.hadoop.mapreduce.Counters;
048import org.apache.hadoop.mapreduce.Job;
049import org.apache.hadoop.util.ToolRunner;
050
051import org.junit.experimental.categories.Category;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
056import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
057import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
058
059/**
060 * Test for comparing the performance impact of region replicas. Uses
061 * components of {@link PerformanceEvaluation}. Does not run from
062 * {@code IntegrationTestsDriver} because IntegrationTestBase is incompatible
063 * with the JUnit runner. Hence no @Test annotations either. See {@code -help}
064 * for full list of options.
065 */
066@Category(IntegrationTests.class)
067public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
068
069  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestRegionReplicaPerf.class);
070
071  private static final String SLEEP_TIME_KEY = "sleeptime";
072  // short default interval because tests don't run very long.
073  private static final String SLEEP_TIME_DEFAULT = "" + (10 * 1000l);
074  private static final String TABLE_NAME_KEY = "tableName";
075  private static final String TABLE_NAME_DEFAULT = "IntegrationTestRegionReplicaPerf";
076  private static final String REPLICA_COUNT_KEY = "replicas";
077  private static final String REPLICA_COUNT_DEFAULT = "" + 3;
078  private static final String PRIMARY_TIMEOUT_KEY = "timeout";
079  private static final String PRIMARY_TIMEOUT_DEFAULT = "" + 10 * 1000; // 10 ms
080  private static final String NUM_RS_KEY = "numRs";
081  private static final String NUM_RS_DEFAULT = "" + 3;
082  public static final String FAMILY_NAME = "info";
083
084  /** Extract a descriptive statistic from a {@link com.codahale.metrics.Histogram}. */
085  private enum Stat {
086    STDEV {
087      @Override
088      double apply(Histogram hist) {
089        return hist.getSnapshot().getStdDev();
090      }
091    },
092    FOUR_9S {
093      @Override
094      double apply(Histogram hist) {
095        return hist.getSnapshot().getValue(0.9999);
096      }
097    };
098
099    abstract double apply(Histogram hist);
100  }
101
  // Settings below are populated from the command line by processOptions().

  /** Table the benchmark populates and reads; also the target of the region-move chaos action. */
  private TableName tableName;
  /** Period passed to the chaos policy and to the RegionServer restart action. */
  private long sleepTime;
  /** Replica count applied to the table before the second batch of read jobs. */
  private int replicaCount;
  /** Value written to the client's primaryCallTimeout get/multiget settings in setUp(). */
  private int primaryTimeout;
  /** Number of servers requested when the cluster is initialized in setUpCluster(). */
  private int clusterSize;
107
108  /**
109   * Wraps the invocation of {@link PerformanceEvaluation} in a {@code Callable}.
110   */
111  static class PerfEvalCallable implements Callable<TimingResult> {
112    private final Queue<String> argv = new LinkedList<>();
113    private final Admin admin;
114
115    public PerfEvalCallable(Admin admin, String argv) {
116      // TODO: this API is awkward, should take Connection, not Admin
117      this.admin = admin;
118      this.argv.addAll(Arrays.asList(argv.split(" ")));
119      LOG.debug("Created PerformanceEvaluationCallable with args: " + argv);
120    }
121
122    @Override
123    public TimingResult call() throws Exception {
124      PerformanceEvaluation.TestOptions opts = PerformanceEvaluation.parseOpts(argv);
125      PerformanceEvaluation.checkTable(admin, opts);
126      PerformanceEvaluation.RunResult results[] = null;
127      long numRows = opts.totalRows;
128      long elapsedTime = 0;
129      if (opts.nomapred) {
130        results = PerformanceEvaluation.doLocalClients(opts, admin.getConfiguration());
131        for (PerformanceEvaluation.RunResult r : results) {
132          elapsedTime = Math.max(elapsedTime, r.duration);
133        }
134      } else {
135        Job job = PerformanceEvaluation.doMapReduce(opts, admin.getConfiguration());
136        Counters counters = job.getCounters();
137        numRows = counters.findCounter(PerformanceEvaluation.Counter.ROWS).getValue();
138        elapsedTime = counters.findCounter(PerformanceEvaluation.Counter.ELAPSED_TIME).getValue();
139      }
140      return new TimingResult(numRows, elapsedTime, results);
141    }
142  }
143
144  /**
145   * Record the results from a single {@link PerformanceEvaluation} job run.
146   */
147  static class TimingResult {
148    public final long numRows;
149    public final long elapsedTime;
150    public final PerformanceEvaluation.RunResult results[];
151
152    public TimingResult(long numRows, long elapsedTime, PerformanceEvaluation.RunResult results[]) {
153      this.numRows = numRows;
154      this.elapsedTime = elapsedTime;
155      this.results = results;
156    }
157
158    @Override
159    public String toString() {
160      return MoreObjects.toStringHelper(this)
161        .add("numRows", numRows)
162        .add("elapsedTime", elapsedTime)
163        .toString();
164    }
165  }
166
167  @Override
168  public void setUp() throws Exception {
169    super.setUp();
170    Configuration conf = util.getConfiguration();
171
172    // sanity check cluster
173    // TODO: this should reach out to master and verify online state instead
174    assertEquals("Master must be configured with StochasticLoadBalancer",
175      "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer",
176      conf.get("hbase.master.loadbalancer.class"));
177    // TODO: this should reach out to master and verify online state instead
178    assertTrue("hbase.regionserver.storefile.refresh.period must be greater than zero.",
179      conf.getLong("hbase.regionserver.storefile.refresh.period", 0) > 0);
180
181    // enable client-side settings
182    conf.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, true);
183    // TODO: expose these settings to CLI override
184    conf.setLong("hbase.client.primaryCallTimeout.get", primaryTimeout);
185    conf.setLong("hbase.client.primaryCallTimeout.multiget", primaryTimeout);
186  }
187
188  @Override
189  public void setUpCluster() throws Exception {
190    util = getTestingUtil(getConf());
191    util.initializeCluster(clusterSize);
192  }
193
194  @Override
195  public void setUpMonkey() throws Exception {
196    Policy p = new PeriodicRandomActionPolicy(sleepTime,
197      new RestartRandomRsExceptMetaAction(sleepTime),
198      new MoveRandomRegionOfTableAction(tableName));
199    this.monkey = new PolicyBasedChaosMonkey(util, p);
200    // don't start monkey right away
201  }
202
203  @Override
204  protected void addOptions() {
205    addOptWithArg(TABLE_NAME_KEY, "Alternate table name. Default: '"
206      + TABLE_NAME_DEFAULT + "'");
207    addOptWithArg(SLEEP_TIME_KEY, "How long the monkey sleeps between actions. Default: "
208      + SLEEP_TIME_DEFAULT);
209    addOptWithArg(REPLICA_COUNT_KEY, "Number of region replicas. Default: "
210      + REPLICA_COUNT_DEFAULT);
211    addOptWithArg(PRIMARY_TIMEOUT_KEY, "Overrides hbase.client.primaryCallTimeout. Default: "
212      + PRIMARY_TIMEOUT_DEFAULT + " (10ms)");
213    addOptWithArg(NUM_RS_KEY, "Specify the number of RegionServers to use. Default: "
214        + NUM_RS_DEFAULT);
215  }
216
217  @Override
218  protected void processOptions(CommandLine cmd) {
219    tableName = TableName.valueOf(cmd.getOptionValue(TABLE_NAME_KEY, TABLE_NAME_DEFAULT));
220    sleepTime = Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT));
221    replicaCount = Integer.parseInt(cmd.getOptionValue(REPLICA_COUNT_KEY, REPLICA_COUNT_DEFAULT));
222    primaryTimeout =
223      Integer.parseInt(cmd.getOptionValue(PRIMARY_TIMEOUT_KEY, PRIMARY_TIMEOUT_DEFAULT));
224    clusterSize = Integer.parseInt(cmd.getOptionValue(NUM_RS_KEY, NUM_RS_DEFAULT));
225    LOG.debug(MoreObjects.toStringHelper("Parsed Options")
226      .add(TABLE_NAME_KEY, tableName)
227      .add(SLEEP_TIME_KEY, sleepTime)
228      .add(REPLICA_COUNT_KEY, replicaCount)
229      .add(PRIMARY_TIMEOUT_KEY, primaryTimeout)
230      .add(NUM_RS_KEY, clusterSize)
231      .toString());
232  }
233
234  @Override
235  public int runTestFromCommandLine() throws Exception {
236    test();
237    return 0;
238  }
239
240  @Override
241  public TableName getTablename() {
242    return tableName;
243  }
244
245  @Override
246  protected Set<String> getColumnFamilies() {
247    return Sets.newHashSet(FAMILY_NAME);
248  }
249
250  /** Compute the mean of the given {@code stat} from a timing results. */
251  private static double calcMean(String desc, Stat stat, List<TimingResult> results) {
252    double sum = 0;
253    int count = 0;
254
255    for (TimingResult tr : results) {
256      for (PerformanceEvaluation.RunResult r : tr.results) {
257        assertNotNull("One of the run results is missing detailed run data.", r.hist);
258        sum += stat.apply(r.hist);
259        count += 1;
260        LOG.debug(desc + "{" + YammerHistogramUtils.getHistogramReport(r.hist) + "}");
261      }
262    }
263    return sum / count;
264  }
265
266  public void test() throws Exception {
267    int maxIters = 3;
268    String replicas = "--replicas=" + replicaCount;
269    // TODO: splits disabled until "phase 2" is complete.
270    String splitPolicy = "--splitPolicy=" + DisabledRegionSplitPolicy.class.getName();
271    String writeOpts = format("%s --nomapred --table=%s --presplit=16 sequentialWrite 4",
272      splitPolicy, tableName);
273    String readOpts =
274      format("--nomapred --table=%s --latency --sampleRate=0.1 randomRead 4", tableName);
275    String replicaReadOpts = format("%s %s", replicas, readOpts);
276
277    ArrayList<TimingResult> resultsWithoutReplicas = new ArrayList<>(maxIters);
278    ArrayList<TimingResult> resultsWithReplicas = new ArrayList<>(maxIters);
279
280    // create/populate the table, replicas disabled
281    LOG.debug("Populating table.");
282    new PerfEvalCallable(util.getAdmin(), writeOpts).call();
283
284    // one last sanity check, then send in the clowns!
285    assertEquals("Table must be created with DisabledRegionSplitPolicy. Broken test.",
286        DisabledRegionSplitPolicy.class.getName(),
287        util.getAdmin().getTableDescriptor(tableName).getRegionSplitPolicyClassName());
288    startMonkey();
289
290    // collect a baseline without region replicas.
291    for (int i = 0; i < maxIters; i++) {
292      LOG.debug("Launching non-replica job " + (i + 1) + "/" + maxIters);
293      resultsWithoutReplicas.add(new PerfEvalCallable(util.getAdmin(), readOpts).call());
294      // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
295      Thread.sleep(5000l);
296    }
297
298    // disable monkey, enable region replicas, enable monkey
299    cleanUpMonkey("Altering table.");
300    LOG.debug("Altering " + tableName + " replica count to " + replicaCount);
301    IntegrationTestingUtility.setReplicas(util.getAdmin(), tableName, replicaCount);
302    setUpMonkey();
303    startMonkey();
304
305    // run test with region replicas.
306    for (int i = 0; i < maxIters; i++) {
307      LOG.debug("Launching replica job " + (i + 1) + "/" + maxIters);
308      resultsWithReplicas.add(new PerfEvalCallable(util.getAdmin(), replicaReadOpts).call());
309      // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
310      Thread.sleep(5000l);
311    }
312
313    // compare the average of the stdev and 99.99pct across runs to determine if region replicas
314    // are having an overall improvement on response variance experienced by clients.
315    double withoutReplicasStdevMean =
316        calcMean("withoutReplicas", Stat.STDEV, resultsWithoutReplicas);
317    double withoutReplicas9999Mean =
318        calcMean("withoutReplicas", Stat.FOUR_9S, resultsWithoutReplicas);
319    double withReplicasStdevMean =
320        calcMean("withReplicas", Stat.STDEV, resultsWithReplicas);
321    double withReplicas9999Mean =
322        calcMean("withReplicas", Stat.FOUR_9S, resultsWithReplicas);
323
324    LOG.info(MoreObjects.toStringHelper(this)
325      .add("withoutReplicas", resultsWithoutReplicas)
326      .add("withReplicas", resultsWithReplicas)
327      .add("withoutReplicasStdevMean", withoutReplicasStdevMean)
328      .add("withoutReplicas99.99Mean", withoutReplicas9999Mean)
329      .add("withReplicasStdevMean", withReplicasStdevMean)
330      .add("withReplicas99.99Mean", withReplicas9999Mean)
331      .toString());
332
333    assertTrue(
334      "Running with region replicas under chaos should have less request variance than without. "
335      + "withReplicas.stdev.mean: " + withReplicasStdevMean + "ms "
336      + "withoutReplicas.stdev.mean: " + withoutReplicasStdevMean + "ms.",
337      withReplicasStdevMean <= withoutReplicasStdevMean);
338    assertTrue(
339        "Running with region replicas under chaos should improve 99.99pct latency. "
340            + "withReplicas.99.99.mean: " + withReplicas9999Mean + "ms "
341            + "withoutReplicas.99.99.mean: " + withoutReplicas9999Mean + "ms.",
342        withReplicas9999Mean <= withoutReplicas9999Mean);
343  }
344
345  public static void main(String[] args) throws Exception {
346    Configuration conf = HBaseConfiguration.create();
347    IntegrationTestingUtility.setUseDistributedCluster(conf);
348    int status = ToolRunner.run(conf, new IntegrationTestRegionReplicaPerf(), args);
349    System.exit(status);
350  }
351}