001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.chaos.monkies;
020
021import java.util.Arrays;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.List;
025import java.util.Objects;
026import java.util.Properties;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.Executors;
029import java.util.concurrent.TimeUnit;
030import org.apache.commons.lang3.RandomUtils;
031import org.apache.hadoop.hbase.IntegrationTestingUtility;
032import org.apache.hadoop.hbase.chaos.policies.Policy;
033import org.apache.hadoop.hbase.util.Pair;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
037
038/**
039 * Chaos monkey that given multiple policies will run actions against the cluster.
040 */
041public class PolicyBasedChaosMonkey extends ChaosMonkey {
042
043  private static final Logger LOG = LoggerFactory.getLogger(PolicyBasedChaosMonkey.class);
044  private static final long ONE_SEC = 1000;
045  private static final long ONE_MIN = 60 * ONE_SEC;
046
047  public static final long TIMEOUT = ONE_MIN;
048
049  final IntegrationTestingUtility util;
050  final Properties monkeyProps;
051
052  private final Policy[] policies;
053  private final ExecutorService monkeyThreadPool;
054
055  /**
056   * Construct a new ChaosMonkey
057   * @param util the HBaseIntegrationTestingUtility already configured
058   * @param policies custom policies to use
059   */
060  public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Policy... policies) {
061    this(null, util, policies);
062  }
063
064  public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Collection<Policy> policies) {
065    this(null, util, policies);
066  }
067
068  public PolicyBasedChaosMonkey(Properties monkeyProps, IntegrationTestingUtility util,
069    Collection<Policy> policies) {
070    this(monkeyProps, util, policies.toArray(new Policy[0]));
071  }
072
073  public PolicyBasedChaosMonkey(Properties monkeyProps, IntegrationTestingUtility util,
074    Policy... policies) {
075    this.monkeyProps = monkeyProps;
076    this.util = Objects.requireNonNull(util);
077    this.policies = Objects.requireNonNull(policies);
078    if (policies.length == 0) {
079      throw new IllegalArgumentException("policies may not be empty");
080    }
081    this.monkeyThreadPool = buildMonkeyThreadPool(policies.length);
082  }
083
084  private static ExecutorService buildMonkeyThreadPool(final int size) {
085    return Executors.newFixedThreadPool(size, new ThreadFactoryBuilder()
086      .setDaemon(false)
087      .setNameFormat("ChaosMonkey-%d")
088      .setUncaughtExceptionHandler((t, e) -> {
089        throw new RuntimeException(e);
090      })
091      .build());
092  }
093
094  /** Selects a random item from the given items */
095  public static <T> T selectRandomItem(T[] items) {
096    return items[RandomUtils.nextInt(0, items.length)];
097  }
098
099  /** Selects a random item from the given items with weights*/
100  public static <T> T selectWeightedRandomItem(List<Pair<T, Integer>> items) {
101    int totalWeight = 0;
102    for (Pair<T, Integer> pair : items) {
103      totalWeight += pair.getSecond();
104    }
105
106    int cutoff = RandomUtils.nextInt(0, totalWeight);
107    int cummulative = 0;
108    T item = null;
109
110    //warn: O(n)
111    for (int i=0; i<items.size(); i++) {
112      int curWeight = items.get(i).getSecond();
113      if ( cutoff < cummulative + curWeight) {
114        item = items.get(i).getFirst();
115        break;
116      }
117      cummulative += curWeight;
118    }
119
120    return item;
121  }
122
123  /** Selects and returns ceil(ratio * items.length) random items from the given array */
124  public static <T> List<T> selectRandomItems(T[] items, float ratio) {
125    int selectedNumber = (int)Math.ceil(items.length * ratio);
126
127    List<T> originalItems = Arrays.asList(items);
128    Collections.shuffle(originalItems);
129
130    int startIndex = RandomUtils.nextInt(0, items.length - selectedNumber);
131    return originalItems.subList(startIndex, startIndex + selectedNumber);
132  }
133
134  @Override
135  public void start() throws Exception {
136    final Policy.PolicyContext context = new Policy.PolicyContext(monkeyProps, util);
137    for (final Policy policy : policies) {
138      policy.init(context);
139      monkeyThreadPool.execute(policy);
140    }
141  }
142
143  @Override
144  public void stop(String why) {
145    // stop accepting new work (shouldn't be any with a fixed-size pool)
146    monkeyThreadPool.shutdown();
147    // notify all executing policies that it's time to halt.
148    for (Policy policy : policies) {
149      policy.stop(why);
150    }
151  }
152
153  @Override
154  public boolean isStopped() {
155    return monkeyThreadPool.isTerminated();
156  }
157
158  @Override
159  public void waitForStop() throws InterruptedException {
160    monkeyThreadPool.awaitTermination(1, TimeUnit.MINUTES);
161  }
162
163  @Override
164  public boolean isDestructive() {
165    // TODO: we can look at the actions, and decide to do the restore cluster or not based on them.
166    return true;
167  }
168}