001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.chaos.monkies;
020
021import java.util.Arrays;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.List;
025import java.util.Properties;
026
027import org.apache.commons.lang3.RandomUtils;
028import org.apache.hadoop.hbase.IntegrationTestingUtility;
029import org.apache.hadoop.hbase.chaos.policies.Policy;
030import org.apache.hadoop.hbase.util.Pair;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * Chaos monkey that given multiple policies will run actions against the cluster.
036 */
037public class PolicyBasedChaosMonkey extends ChaosMonkey {
038
039  private static final Logger LOG = LoggerFactory.getLogger(PolicyBasedChaosMonkey.class);
040  private static final long ONE_SEC = 1000;
041  private static final long FIVE_SEC = 5 * ONE_SEC;
042  private static final long ONE_MIN = 60 * ONE_SEC;
043
044  public static final long TIMEOUT = ONE_MIN;
045
046  final IntegrationTestingUtility util;
047  final Properties monkeyProps;
048
049  /**
050   * Construct a new ChaosMonkey
051   * @param util the HBaseIntegrationTestingUtility already configured
052   * @param policies custom policies to use
053   */
054  public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Policy... policies) {
055    this(null, util, policies);
056  }
057
058  public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Collection<Policy> policies) {
059    this(null, util, policies);
060  }
061
062  public PolicyBasedChaosMonkey(Properties monkeyProps, IntegrationTestingUtility util,
063    Policy... policies) {
064    this.monkeyProps = monkeyProps;
065    this.util = util;
066    this.policies = policies;
067  }
068
069  public PolicyBasedChaosMonkey(Properties monkeyProps, IntegrationTestingUtility util,
070    Collection<Policy> policies) {
071    this.monkeyProps = monkeyProps;
072    this.util = util;
073    this.policies = policies.toArray(new Policy[policies.size()]);
074  }
075
076
077  /** Selects a random item from the given items */
078  public static <T> T selectRandomItem(T[] items) {
079    return items[RandomUtils.nextInt(0, items.length)];
080  }
081
082  /** Selects a random item from the given items with weights*/
083  public static <T> T selectWeightedRandomItem(List<Pair<T, Integer>> items) {
084    int totalWeight = 0;
085    for (Pair<T, Integer> pair : items) {
086      totalWeight += pair.getSecond();
087    }
088
089    int cutoff = RandomUtils.nextInt(0, totalWeight);
090    int cummulative = 0;
091    T item = null;
092
093    //warn: O(n)
094    for (int i=0; i<items.size(); i++) {
095      int curWeight = items.get(i).getSecond();
096      if ( cutoff < cummulative + curWeight) {
097        item = items.get(i).getFirst();
098        break;
099      }
100      cummulative += curWeight;
101    }
102
103    return item;
104  }
105
106  /** Selects and returns ceil(ratio * items.length) random items from the given array */
107  public static <T> List<T> selectRandomItems(T[] items, float ratio) {
108    int selectedNumber = (int)Math.ceil(items.length * ratio);
109
110    List<T> originalItems = Arrays.asList(items);
111    Collections.shuffle(originalItems);
112
113    int startIndex = RandomUtils.nextInt(0, items.length - selectedNumber);
114    return originalItems.subList(startIndex, startIndex + selectedNumber);
115  }
116
117  private Policy[] policies;
118  private Thread[] monkeyThreads;
119
120  @Override
121  public void start() throws Exception {
122    monkeyThreads = new Thread[policies.length];
123    Policy.PolicyContext context = new Policy.PolicyContext(monkeyProps, this.util);
124    for (int i=0; i<policies.length; i++) {
125      policies[i].init(context);
126      Thread monkeyThread = new Thread(policies[i], "ChaosMonkey");
127      monkeyThread.start();
128      monkeyThreads[i] = monkeyThread;
129    }
130  }
131
132  @Override
133  public void stop(String why) {
134    if (policies == null) {
135      return;
136    }
137
138    for (Policy policy : policies) {
139      policy.stop(why);
140    }
141  }
142
143  @Override
144  public boolean isStopped() {
145    return policies[0].isStopped();
146  }
147
148  /**
149   * Wait for ChaosMonkey to stop.
150   * @throws InterruptedException
151   */
152  @Override
153  public void waitForStop() throws InterruptedException {
154    if (monkeyThreads == null) {
155      return;
156    }
157    for (Thread monkeyThread : monkeyThreads) {
158      // TODO: bound the wait time per policy
159      monkeyThread.join();
160    }
161  }
162
163  @Override
164  public boolean isDestructive() {
165    // TODO: we can look at the actions, and decide to do the restore cluster or not based on them.
166    return true;
167  }
168}