/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.chaos.monkies;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.chaos.policies.Policy;
import org.apache.hadoop.hbase.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Chaos monkey that given multiple policies will run actions against the cluster.
 * <p>
 * Each policy is initialised with a shared {@link Policy.PolicyContext} and then run on its own
 * thread by {@link #start()}.
 */
public class PolicyBasedChaosMonkey extends ChaosMonkey {

  private static final Logger LOG = LoggerFactory.getLogger(PolicyBasedChaosMonkey.class);
  private static final long ONE_SEC = 1000;
  private static final long FIVE_SEC = 5 * ONE_SEC;
  private static final long ONE_MIN = 60 * ONE_SEC;

  public static final long TIMEOUT = ONE_MIN;

  final IntegrationTestingUtility util;
  final Properties monkeyProps;

  /** Policies driven by this monkey; may be null if a null array was passed in. */
  private final Policy[] policies;
  /** One thread per policy; null until {@link #start()} has been called. */
  private Thread[] monkeyThreads;

  /**
   * Construct a new ChaosMonkey
   * @param util the HBaseIntegrationTestingUtility already configured
   * @param policies custom policies to use
   */
  public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Policy... policies) {
    this(null, util, policies);
  }

  /**
   * Construct a new ChaosMonkey without monkey properties.
   * @param util the HBaseIntegrationTestingUtility already configured
   * @param policies custom policies to use
   */
  public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Collection<Policy> policies) {
    this(null, util, policies);
  }

  /**
   * Construct a new ChaosMonkey
   * @param monkeyProps properties handed to every policy through the policy context; may be null
   * @param util the HBaseIntegrationTestingUtility already configured
   * @param policies custom policies to use
   */
  public PolicyBasedChaosMonkey(Properties monkeyProps, IntegrationTestingUtility util,
      Policy... policies) {
    this.monkeyProps = monkeyProps;
    this.util = util;
    this.policies = policies;
  }

  /**
   * Construct a new ChaosMonkey
   * @param monkeyProps properties handed to every policy through the policy context; may be null
   * @param util the HBaseIntegrationTestingUtility already configured
   * @param policies custom policies to use; copied into an internal array
   */
  public PolicyBasedChaosMonkey(Properties monkeyProps, IntegrationTestingUtility util,
      Collection<Policy> policies) {
    this.monkeyProps = monkeyProps;
    this.util = util;
    this.policies = policies.toArray(new Policy[policies.size()]);
  }

  /**
   * Selects a random item from the given items.
   * @param items candidates to choose from; expected to be non-empty
   * @return one element of {@code items}
   */
  public static <T> T selectRandomItem(T[] items) {
    return items[RandomUtils.nextInt(0, items.length)];
  }

  /**
   * Selects a random item from the given items with weights.
   * @param items pairs of (item, weight)
   * @return the chosen item, or null when no item is selected (for instance when the list is
   *         empty)
   */
  public static <T> T selectWeightedRandomItem(List<Pair<T, Integer>> items) {
    int totalWeight = 0;
    for (Pair<T, Integer> pair : items) {
      totalWeight += pair.getSecond();
    }

    int cutoff = RandomUtils.nextInt(0, totalWeight);
    int cumulative = 0;

    // warn: O(n)
    // Walk the weights until the running total passes the random cutoff.
    for (Pair<T, Integer> pair : items) {
      int curWeight = pair.getSecond();
      if (cutoff < cumulative + curWeight) {
        return pair.getFirst();
      }
      cumulative += curWeight;
    }

    return null;
  }

  /**
   * Selects and returns ceil(ratio * items.length) random items from the given array.
   * <p>
   * The given array is left untouched: shuffling happens on a private copy, and the returned list
   * is independent of the array.
   * @param items candidates to choose from
   * @param ratio fraction of the items to select
   * @return a new list holding the selected items
   * @throws IllegalArgumentException if the ratio selects fewer than zero or more than
   *         {@code items.length} items
   */
  public static <T> List<T> selectRandomItems(T[] items, float ratio) {
    int selectedNumber = (int) Math.ceil(items.length * ratio);
    if (selectedNumber < 0 || selectedNumber > items.length) {
      throw new IllegalArgumentException("ratio " + ratio + " selects " + selectedNumber
          + " items out of " + items.length);
    }

    // Arrays.asList is a view that writes through to the array, so shuffle a copy instead of
    // reordering the caller's data.
    List<T> shuffled = new ArrayList<>(Arrays.asList(items));
    Collections.shuffle(shuffled);

    int startIndex = RandomUtils.nextInt(0, items.length - selectedNumber);
    return new ArrayList<>(shuffled.subList(startIndex, startIndex + selectedNumber));
  }

  /**
   * Initialises every policy with a shared context and starts one thread per policy.
   */
  @Override
  public void start() throws Exception {
    monkeyThreads = new Thread[policies.length];
    Policy.PolicyContext context = new Policy.PolicyContext(monkeyProps, this.util);
    for (int i = 0; i < policies.length; i++) {
      policies[i].init(context);
      Thread monkeyThread = new Thread(policies[i], "ChaosMonkey");
      monkeyThread.start();
      monkeyThreads[i] = monkeyThread;
    }
  }

  /**
   * Asks every policy to stop; does nothing when there are no policies.
   * @param why reason passed on to each policy
   */
  @Override
  public void stop(String why) {
    if (policies == null) {
      return;
    }

    for (Policy policy : policies) {
      policy.stop(why);
    }
  }

  /**
   * Reports whether the monkey is stopped, judged by the first policy only.
   * @return true when there are no policies at all, otherwise the stopped state of the first
   *         policy
   */
  @Override
  public boolean isStopped() {
    // Without any policy there is nothing that could be running.
    if (policies == null || policies.length == 0) {
      return true;
    }
    return policies[0].isStopped();
  }

  /**
   * Wait for ChaosMonkey to stop. Returns immediately if the monkey was never started.
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  @Override
  public void waitForStop() throws InterruptedException {
    if (monkeyThreads == null) {
      return;
    }
    for (Thread monkeyThread : monkeyThreads) {
      // TODO: bound the wait time per policy
      monkeyThread.join();
    }
  }

  /**
   * @return always true
   */
  @Override
  public boolean isDestructive() {
    // TODO: we can look at the actions, and decide to do the restore cluster or not based on them.
    return true;
  }
}