001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.chaos.monkies;
019
020import java.util.Arrays;
021import java.util.Collection;
022import java.util.Collections;
023import java.util.List;
024import java.util.Objects;
025import java.util.Properties;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.ThreadLocalRandom;
029import java.util.concurrent.TimeUnit;
030import org.apache.hadoop.hbase.IntegrationTestingUtility;
031import org.apache.hadoop.hbase.chaos.policies.Policy;
032import org.apache.hadoop.hbase.util.Pair;
033
034import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
035
036/**
037 * Chaos monkey that given multiple policies will run actions against the cluster.
038 */
039public class PolicyBasedChaosMonkey extends ChaosMonkey {
040
041  private static final long ONE_SEC = 1000;
042  private static final long ONE_MIN = 60 * ONE_SEC;
043
044  public static final long TIMEOUT = ONE_MIN;
045
046  final IntegrationTestingUtility util;
047  final Properties monkeyProps;
048
049  private final Policy[] policies;
050  private final ExecutorService monkeyThreadPool;
051
052  /**
053   * Construct a new ChaosMonkey
054   * @param util     the HBaseIntegrationTestingUtility already configured
055   * @param policies custom policies to use
056   */
057  public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Policy... policies) {
058    this(null, util, policies);
059  }
060
061  public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Collection<Policy> policies) {
062    this(null, util, policies);
063  }
064
065  public PolicyBasedChaosMonkey(Properties monkeyProps, IntegrationTestingUtility util,
066    Collection<Policy> policies) {
067    this(monkeyProps, util, policies.toArray(new Policy[0]));
068  }
069
070  public PolicyBasedChaosMonkey(Properties monkeyProps, IntegrationTestingUtility util,
071    Policy... policies) {
072    this.monkeyProps = monkeyProps;
073    this.util = Objects.requireNonNull(util);
074    this.policies = Objects.requireNonNull(policies);
075    if (policies.length == 0) {
076      throw new IllegalArgumentException("policies may not be empty");
077    }
078    this.monkeyThreadPool = buildMonkeyThreadPool(policies.length);
079  }
080
081  private static ExecutorService buildMonkeyThreadPool(final int size) {
082    return Executors.newFixedThreadPool(size, new ThreadFactoryBuilder().setDaemon(false)
083      .setNameFormat("ChaosMonkey-%d").setUncaughtExceptionHandler((t, e) -> {
084        throw new RuntimeException(e);
085      }).build());
086  }
087
088  /** Selects a random item from the given items */
089  public static <T> T selectRandomItem(T[] items) {
090    return items[ThreadLocalRandom.current().nextInt(items.length)];
091  }
092
093  /** Selects a random item from the given items with weights */
094  public static <T> T selectWeightedRandomItem(List<Pair<T, Integer>> items) {
095    int totalWeight = 0;
096    for (Pair<T, Integer> pair : items) {
097      totalWeight += pair.getSecond();
098    }
099
100    int cutoff = ThreadLocalRandom.current().nextInt(totalWeight);
101    int cummulative = 0;
102    T item = null;
103
104    // warn: O(n)
105    for (int i = 0; i < items.size(); i++) {
106      int curWeight = items.get(i).getSecond();
107      if (cutoff < cummulative + curWeight) {
108        item = items.get(i).getFirst();
109        break;
110      }
111      cummulative += curWeight;
112    }
113
114    return item;
115  }
116
117  /** Selects and returns ceil(ratio * items.length) random items from the given array */
118  public static <T> List<T> selectRandomItems(T[] items, float ratio) {
119    int selectedNumber = (int) Math.ceil(items.length * ratio);
120
121    List<T> originalItems = Arrays.asList(items);
122    Collections.shuffle(originalItems);
123
124    int startIndex = ThreadLocalRandom.current().nextInt(items.length - selectedNumber);
125    return originalItems.subList(startIndex, startIndex + selectedNumber);
126  }
127
128  @Override
129  public void start() throws Exception {
130    final Policy.PolicyContext context = new Policy.PolicyContext(monkeyProps, util);
131    for (final Policy policy : policies) {
132      policy.init(context);
133      monkeyThreadPool.execute(policy);
134    }
135  }
136
137  @Override
138  public void stop(String why) {
139    // stop accepting new work (shouldn't be any with a fixed-size pool)
140    monkeyThreadPool.shutdown();
141    // notify all executing policies that it's time to halt.
142    for (Policy policy : policies) {
143      policy.stop(why);
144    }
145  }
146
147  @Override
148  public boolean isStopped() {
149    return monkeyThreadPool.isTerminated();
150  }
151
152  @Override
153  public void waitForStop() throws InterruptedException {
154    monkeyThreadPool.awaitTermination(1, TimeUnit.MINUTES);
155  }
156
157  @Override
158  public boolean isDestructive() {
159    // TODO: we can look at the actions, and decide to do the restore cluster or not based on them.
160    return true;
161  }
162}