001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.chaos.monkies; 020 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.List; 024 025import org.apache.commons.lang3.RandomUtils; 026import org.apache.hadoop.hbase.IntegrationTestingUtility; 027import org.apache.hadoop.hbase.chaos.policies.Policy; 028import org.apache.hadoop.hbase.util.Pair; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032/** 033 * Chaos monkey that given multiple policies will run actions against the cluster. 034 */ 035public class PolicyBasedChaosMonkey extends ChaosMonkey { 036 037 private static final Logger LOG = LoggerFactory.getLogger(PolicyBasedChaosMonkey.class); 038 private static final long ONE_SEC = 1000; 039 private static final long FIVE_SEC = 5 * ONE_SEC; 040 private static final long ONE_MIN = 60 * ONE_SEC; 041 042 public static final long TIMEOUT = ONE_MIN; 043 044 final IntegrationTestingUtility util; 045 046 /** 047 * Construct a new ChaosMonkey 048 * @param util the HBaseIntegrationTestingUtility already configured 049 * @param policies custom policies to use 050 */ 051 public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Policy... policies) { 052 this.util = util; 053 this.policies = policies; 054 } 055 056 public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Collection<Policy> policies) { 057 this.util = util; 058 this.policies = policies.toArray(new Policy[policies.size()]); 059 } 060 061 062 /** Selects a random item from the given items */ 063 public static <T> T selectRandomItem(T[] items) { 064 return items[RandomUtils.nextInt(0, items.length)]; 065 } 066 067 /** Selects a random item from the given items with weights*/ 068 public static <T> T selectWeightedRandomItem(List<Pair<T, Integer>> items) { 069 int totalWeight = 0; 070 for (Pair<T, Integer> pair : items) { 071 totalWeight += pair.getSecond(); 072 } 073 074 int cutoff = RandomUtils.nextInt(0, totalWeight); 075 int cummulative = 0; 076 T item = null; 077 078 //warn: O(n) 079 for (int i=0; i<items.size(); i++) { 080 int curWeight = items.get(i).getSecond(); 081 if ( cutoff < cummulative + curWeight) { 082 item = items.get(i).getFirst(); 083 break; 084 } 085 cummulative += curWeight; 086 } 087 088 return item; 089 } 090 091 /** Selects and returns ceil(ratio * items.length) random items from the given array */ 092 public static <T> List<T> selectRandomItems(T[] items, float ratio) { 093 int remaining = (int)Math.ceil(items.length * ratio); 094 095 List<T> selectedItems = new ArrayList<>(remaining); 096 097 for (int i=0; i<items.length && remaining > 0; i++) { 098 if (RandomUtils.nextFloat() < ((float)remaining/(items.length-i))) { 099 selectedItems.add(items[i]); 100 remaining--; 101 } 102 } 103 104 return selectedItems; 105 } 106 107 private Policy[] policies; 108 private Thread[] monkeyThreads; 109 110 @Override 111 public void start() throws Exception { 112 monkeyThreads = new Thread[policies.length]; 113 114 for (int i=0; i<policies.length; i++) { 115 policies[i].init(new Policy.PolicyContext(this.util)); 116 Thread monkeyThread = new Thread(policies[i], "ChaosMonkey"); 117 monkeyThread.start(); 118 monkeyThreads[i] = monkeyThread; 119 } 120 } 121 122 @Override 123 public void stop(String why) { 124 if (policies == null) { 125 return; 126 } 127 128 for (Policy policy : policies) { 129 policy.stop(why); 130 } 131 } 132 133 @Override 134 public boolean isStopped() { 135 return policies[0].isStopped(); 136 } 137 138 /** 139 * Wait for ChaosMonkey to stop. 140 * @throws InterruptedException 141 */ 142 @Override 143 public void waitForStop() throws InterruptedException { 144 if (monkeyThreads == null) { 145 return; 146 } 147 for (Thread monkeyThread : monkeyThreads) { 148 // TODO: bound the wait time per policy 149 monkeyThread.join(); 150 } 151 } 152 153 @Override 154 public boolean isDestructive() { 155 // TODO: we can look at the actions, and decide to do the restore cluster or not based on them. 156 return true; 157 } 158}