001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static java.lang.String.format;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertTrue;
024
025import com.codahale.metrics.Histogram;
026import java.util.ArrayDeque;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.List;
030import java.util.Queue;
031import java.util.Set;
032import java.util.concurrent.Callable;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.chaos.actions.MoveRandomRegionOfTableAction;
035import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
036import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
037import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
038import org.apache.hadoop.hbase.chaos.policies.Policy;
039import org.apache.hadoop.hbase.client.Admin;
040import org.apache.hadoop.hbase.ipc.RpcClient;
041import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
042import org.apache.hadoop.hbase.testclassification.IntegrationTests;
043import org.apache.hadoop.hbase.util.YammerHistogramUtils;
044import org.apache.hadoop.mapreduce.Counters;
045import org.apache.hadoop.mapreduce.Job;
046import org.apache.hadoop.util.ToolRunner;
047import org.junit.experimental.categories.Category;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
052import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
053import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
054
055/**
056 * Test for comparing the performance impact of region replicas. Uses components of
057 * {@link PerformanceEvaluation}. Does not run from {@code IntegrationTestsDriver} because
058 * IntegrationTestBase is incompatible with the JUnit runner. Hence no @Test annotations either. See
059 * {@code -help} for full list of options.
060 */
061@Category(IntegrationTests.class)
062public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
063
064  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestRegionReplicaPerf.class);
065
066  private static final String SLEEP_TIME_KEY = "sleeptime";
067  // short default interval because tests don't run very long.
068  private static final String SLEEP_TIME_DEFAULT = "" + (10 * 1000l);
069  private static final String TABLE_NAME_KEY = "tableName";
070  private static final String TABLE_NAME_DEFAULT = "IntegrationTestRegionReplicaPerf";
071  private static final String REPLICA_COUNT_KEY = "replicas";
072  private static final String REPLICA_COUNT_DEFAULT = "" + 3;
073  private static final String PRIMARY_TIMEOUT_KEY = "timeout";
074  private static final String PRIMARY_TIMEOUT_DEFAULT = "" + 10 * 1000; // 10 ms
075  private static final String NUM_RS_KEY = "numRs";
076  private static final String NUM_RS_DEFAULT = "" + 3;
077  public static final String FAMILY_NAME = "info";
078
079  /** Extract a descriptive statistic from a {@link com.codahale.metrics.Histogram}. */
080  private enum Stat {
081    STDEV {
082      @Override
083      double apply(Histogram hist) {
084        return hist.getSnapshot().getStdDev();
085      }
086    },
087    FOUR_9S {
088      @Override
089      double apply(Histogram hist) {
090        return hist.getSnapshot().getValue(0.9999);
091      }
092    };
093
094    abstract double apply(Histogram hist);
095  }
096
  // Run parameters. All of them are assigned from the command line in processOptions().

  // Table the PerformanceEvaluation jobs write to and read from.
  private TableName tableName;
  // Handed to the chaos policy and to the restart action in setUpMonkey().
  private long sleepTime;
  // Replica count applied to the table, and passed as --replicas, for the second measurement.
  private int replicaCount;
  // Value written to the hbase.client.primaryCallTimeout.* settings in setUp().
  private int primaryTimeout;
  // Passed to initializeCluster() in setUpCluster().
  private int clusterSize;
102
103  /**
104   * Wraps the invocation of {@link PerformanceEvaluation} in a {@code Callable}.
105   */
106  static class PerfEvalCallable implements Callable<TimingResult> {
107    private final Queue<String> argv = new ArrayDeque<>();
108    private final Admin admin;
109
110    public PerfEvalCallable(Admin admin, String argv) {
111      // TODO: this API is awkward, should take Connection, not Admin
112      this.admin = admin;
113      this.argv.addAll(Arrays.asList(argv.split(" ")));
114      LOG.debug("Created PerformanceEvaluationCallable with args: " + argv);
115    }
116
117    @Override
118    public TimingResult call() throws Exception {
119      PerformanceEvaluation.TestOptions opts = PerformanceEvaluation.parseOpts(argv);
120      PerformanceEvaluation.checkTable(admin, opts);
121      PerformanceEvaluation.RunResult results[] = null;
122      long numRows = opts.totalRows;
123      long elapsedTime = 0;
124      if (opts.nomapred) {
125        results = PerformanceEvaluation.doLocalClients(opts, admin.getConfiguration());
126        for (PerformanceEvaluation.RunResult r : results) {
127          elapsedTime = Math.max(elapsedTime, r.duration);
128        }
129      } else {
130        Job job = PerformanceEvaluation.doMapReduce(opts, admin.getConfiguration());
131        Counters counters = job.getCounters();
132        numRows = counters.findCounter(PerformanceEvaluation.Counter.ROWS).getValue();
133        elapsedTime = counters.findCounter(PerformanceEvaluation.Counter.ELAPSED_TIME).getValue();
134      }
135      return new TimingResult(numRows, elapsedTime, results);
136    }
137  }
138
139  /**
140   * Record the results from a single {@link PerformanceEvaluation} job run.
141   */
142  static class TimingResult {
143    public final long numRows;
144    public final long elapsedTime;
145    public final PerformanceEvaluation.RunResult results[];
146
147    public TimingResult(long numRows, long elapsedTime, PerformanceEvaluation.RunResult results[]) {
148      this.numRows = numRows;
149      this.elapsedTime = elapsedTime;
150      this.results = results;
151    }
152
153    @Override
154    public String toString() {
155      return MoreObjects.toStringHelper(this).add("numRows", numRows)
156        .add("elapsedTime", elapsedTime).toString();
157    }
158  }
159
160  @Override
161  public void setUp() throws Exception {
162    super.setUp();
163    Configuration conf = util.getConfiguration();
164
165    // sanity check cluster
166    // TODO: this should reach out to master and verify online state instead
167    assertEquals("Master must be configured with StochasticLoadBalancer",
168      "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer",
169      conf.get("hbase.master.loadbalancer.class"));
170    // TODO: this should reach out to master and verify online state instead
171    assertTrue("hbase.regionserver.storefile.refresh.period must be greater than zero.",
172      conf.getLong("hbase.regionserver.storefile.refresh.period", 0) > 0);
173
174    // enable client-side settings
175    conf.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, true);
176    // TODO: expose these settings to CLI override
177    conf.setLong("hbase.client.primaryCallTimeout.get", primaryTimeout);
178    conf.setLong("hbase.client.primaryCallTimeout.multiget", primaryTimeout);
179  }
180
181  @Override
182  public void setUpCluster() throws Exception {
183    util = getTestingUtil(getConf());
184    util.initializeCluster(clusterSize);
185  }
186
187  @Override
188  public void setUpMonkey() throws Exception {
189    Policy p = new PeriodicRandomActionPolicy(sleepTime,
190      new RestartRandomRsExceptMetaAction(sleepTime), new MoveRandomRegionOfTableAction(tableName));
191    this.monkey = new PolicyBasedChaosMonkey(util, p);
192    // don't start monkey right away
193  }
194
195  @Override
196  protected void addOptions() {
197    addOptWithArg(TABLE_NAME_KEY, "Alternate table name. Default: '" + TABLE_NAME_DEFAULT + "'");
198    addOptWithArg(SLEEP_TIME_KEY,
199      "How long the monkey sleeps between actions. Default: " + SLEEP_TIME_DEFAULT);
200    addOptWithArg(REPLICA_COUNT_KEY,
201      "Number of region replicas. Default: " + REPLICA_COUNT_DEFAULT);
202    addOptWithArg(PRIMARY_TIMEOUT_KEY,
203      "Overrides hbase.client.primaryCallTimeout. Default: " + PRIMARY_TIMEOUT_DEFAULT + " (10ms)");
204    addOptWithArg(NUM_RS_KEY,
205      "Specify the number of RegionServers to use. Default: " + NUM_RS_DEFAULT);
206  }
207
208  @Override
209  protected void processOptions(CommandLine cmd) {
210    tableName = TableName.valueOf(cmd.getOptionValue(TABLE_NAME_KEY, TABLE_NAME_DEFAULT));
211    sleepTime = Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT));
212    replicaCount = Integer.parseInt(cmd.getOptionValue(REPLICA_COUNT_KEY, REPLICA_COUNT_DEFAULT));
213    primaryTimeout =
214      Integer.parseInt(cmd.getOptionValue(PRIMARY_TIMEOUT_KEY, PRIMARY_TIMEOUT_DEFAULT));
215    clusterSize = Integer.parseInt(cmd.getOptionValue(NUM_RS_KEY, NUM_RS_DEFAULT));
216    LOG.debug(MoreObjects.toStringHelper("Parsed Options").add(TABLE_NAME_KEY, tableName)
217      .add(SLEEP_TIME_KEY, sleepTime).add(REPLICA_COUNT_KEY, replicaCount)
218      .add(PRIMARY_TIMEOUT_KEY, primaryTimeout).add(NUM_RS_KEY, clusterSize).toString());
219  }
220
221  @Override
222  public int runTestFromCommandLine() throws Exception {
223    test();
224    return 0;
225  }
226
227  @Override
228  public TableName getTablename() {
229    return tableName;
230  }
231
232  @Override
233  protected Set<String> getColumnFamilies() {
234    return Sets.newHashSet(FAMILY_NAME);
235  }
236
237  /** Compute the mean of the given {@code stat} from a timing results. */
238  private static double calcMean(String desc, Stat stat, List<TimingResult> results) {
239    double sum = 0;
240    int count = 0;
241
242    for (TimingResult tr : results) {
243      for (PerformanceEvaluation.RunResult r : tr.results) {
244        assertNotNull("One of the run results is missing detailed run data.", r.hist);
245        sum += stat.apply(r.hist);
246        count += 1;
247        LOG.debug(desc + "{" + YammerHistogramUtils.getHistogramReport(r.hist) + "}");
248      }
249    }
250    return sum / count;
251  }
252
253  public void test() throws Exception {
254    int maxIters = 3;
255    String replicas = "--replicas=" + replicaCount;
256    // TODO: splits disabled until "phase 2" is complete.
257    String splitPolicy = "--splitPolicy=" + DisabledRegionSplitPolicy.class.getName();
258    String writeOpts =
259      format("%s --nomapred --table=%s --presplit=16 sequentialWrite 4", splitPolicy, tableName);
260    String readOpts =
261      format("--nomapred --table=%s --latency --sampleRate=0.1 randomRead 4", tableName);
262    String replicaReadOpts = format("%s %s", replicas, readOpts);
263
264    ArrayList<TimingResult> resultsWithoutReplicas = new ArrayList<>(maxIters);
265    ArrayList<TimingResult> resultsWithReplicas = new ArrayList<>(maxIters);
266
267    // create/populate the table, replicas disabled
268    LOG.debug("Populating table.");
269    new PerfEvalCallable(util.getAdmin(), writeOpts).call();
270
271    // one last sanity check, then send in the clowns!
272    assertEquals("Table must be created with DisabledRegionSplitPolicy. Broken test.",
273      DisabledRegionSplitPolicy.class.getName(),
274      util.getAdmin().getTableDescriptor(tableName).getRegionSplitPolicyClassName());
275    startMonkey();
276
277    // collect a baseline without region replicas.
278    for (int i = 0; i < maxIters; i++) {
279      LOG.debug("Launching non-replica job " + (i + 1) + "/" + maxIters);
280      resultsWithoutReplicas.add(new PerfEvalCallable(util.getAdmin(), readOpts).call());
281      // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
282      Thread.sleep(5000l);
283    }
284
285    // disable monkey, enable region replicas, enable monkey
286    cleanUpMonkey("Altering table.");
287    LOG.debug("Altering " + tableName + " replica count to " + replicaCount);
288    IntegrationTestingUtility.setReplicas(util.getAdmin(), tableName, replicaCount);
289    setUpMonkey();
290    startMonkey();
291
292    // run test with region replicas.
293    for (int i = 0; i < maxIters; i++) {
294      LOG.debug("Launching replica job " + (i + 1) + "/" + maxIters);
295      resultsWithReplicas.add(new PerfEvalCallable(util.getAdmin(), replicaReadOpts).call());
296      // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
297      Thread.sleep(5000l);
298    }
299
300    // compare the average of the stdev and 99.99pct across runs to determine if region replicas
301    // are having an overall improvement on response variance experienced by clients.
302    double withoutReplicasStdevMean =
303      calcMean("withoutReplicas", Stat.STDEV, resultsWithoutReplicas);
304    double withoutReplicas9999Mean =
305      calcMean("withoutReplicas", Stat.FOUR_9S, resultsWithoutReplicas);
306    double withReplicasStdevMean = calcMean("withReplicas", Stat.STDEV, resultsWithReplicas);
307    double withReplicas9999Mean = calcMean("withReplicas", Stat.FOUR_9S, resultsWithReplicas);
308
309    LOG.info(MoreObjects.toStringHelper(this).add("withoutReplicas", resultsWithoutReplicas)
310      .add("withReplicas", resultsWithReplicas)
311      .add("withoutReplicasStdevMean", withoutReplicasStdevMean)
312      .add("withoutReplicas99.99Mean", withoutReplicas9999Mean)
313      .add("withReplicasStdevMean", withReplicasStdevMean)
314      .add("withReplicas99.99Mean", withReplicas9999Mean).toString());
315
316    assertTrue(
317      "Running with region replicas under chaos should have less request variance than without. "
318        + "withReplicas.stdev.mean: " + withReplicasStdevMean + "ms "
319        + "withoutReplicas.stdev.mean: " + withoutReplicasStdevMean + "ms.",
320      withReplicasStdevMean <= withoutReplicasStdevMean);
321    assertTrue(
322      "Running with region replicas under chaos should improve 99.99pct latency. "
323        + "withReplicas.99.99.mean: " + withReplicas9999Mean + "ms "
324        + "withoutReplicas.99.99.mean: " + withoutReplicas9999Mean + "ms.",
325      withReplicas9999Mean <= withoutReplicas9999Mean);
326  }
327
328  public static void main(String[] args) throws Exception {
329    Configuration conf = HBaseConfiguration.create();
330    IntegrationTestingUtility.setUseDistributedCluster(conf);
331    int status = ToolRunner.run(conf, new IntegrationTestRegionReplicaPerf(), args);
332    System.exit(status);
333  }
334}