001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.chaos.monkies; 020 021import java.util.Arrays; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.List; 025 026import org.apache.commons.lang3.RandomUtils; 027import org.apache.hadoop.hbase.IntegrationTestingUtility; 028import org.apache.hadoop.hbase.chaos.policies.Policy; 029import org.apache.hadoop.hbase.util.Pair; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * Chaos monkey that given multiple policies will run actions against the cluster. 035 */ 036public class PolicyBasedChaosMonkey extends ChaosMonkey { 037 038 private static final Logger LOG = LoggerFactory.getLogger(PolicyBasedChaosMonkey.class); 039 private static final long ONE_SEC = 1000; 040 private static final long FIVE_SEC = 5 * ONE_SEC; 041 private static final long ONE_MIN = 60 * ONE_SEC; 042 043 public static final long TIMEOUT = ONE_MIN; 044 045 final IntegrationTestingUtility util; 046 047 /** 048 * Construct a new ChaosMonkey 049 * @param util the HBaseIntegrationTestingUtility already configured 050 * @param policies custom policies to use 051 */ 052 public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Policy... policies) { 053 this.util = util; 054 this.policies = policies; 055 } 056 057 public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Collection<Policy> policies) { 058 this.util = util; 059 this.policies = policies.toArray(new Policy[policies.size()]); 060 } 061 062 063 /** Selects a random item from the given items */ 064 public static <T> T selectRandomItem(T[] items) { 065 return items[RandomUtils.nextInt(0, items.length)]; 066 } 067 068 /** Selects a random item from the given items with weights*/ 069 public static <T> T selectWeightedRandomItem(List<Pair<T, Integer>> items) { 070 int totalWeight = 0; 071 for (Pair<T, Integer> pair : items) { 072 totalWeight += pair.getSecond(); 073 } 074 075 int cutoff = RandomUtils.nextInt(0, totalWeight); 076 int cummulative = 0; 077 T item = null; 078 079 //warn: O(n) 080 for (int i=0; i<items.size(); i++) { 081 int curWeight = items.get(i).getSecond(); 082 if ( cutoff < cummulative + curWeight) { 083 item = items.get(i).getFirst(); 084 break; 085 } 086 cummulative += curWeight; 087 } 088 089 return item; 090 } 091 092 /** Selects and returns ceil(ratio * items.length) random items from the given array */ 093 public static <T> List<T> selectRandomItems(T[] items, float ratio) { 094 int selectedNumber = (int)Math.ceil(items.length * ratio); 095 096 List<T> originalItems = Arrays.asList(items); 097 Collections.shuffle(originalItems); 098 099 int startIndex = RandomUtils.nextInt(0, items.length - selectedNumber); 100 return originalItems.subList(startIndex, startIndex + selectedNumber); 101 } 102 103 private Policy[] policies; 104 private Thread[] monkeyThreads; 105 106 @Override 107 public void start() throws Exception { 108 monkeyThreads = new Thread[policies.length]; 109 110 for (int i=0; i<policies.length; i++) { 111 policies[i].init(new Policy.PolicyContext(this.util)); 112 Thread monkeyThread = new Thread(policies[i], "ChaosMonkey"); 113 monkeyThread.start(); 114 monkeyThreads[i] = monkeyThread; 115 } 116 } 117 118 @Override 119 public void stop(String why) { 120 if (policies == null) { 121 return; 122 } 123 124 for (Policy policy : policies) { 125 policy.stop(why); 126 } 127 } 128 129 @Override 130 public boolean isStopped() { 131 return policies[0].isStopped(); 132 } 133 134 /** 135 * Wait for ChaosMonkey to stop. 136 * @throws InterruptedException 137 */ 138 @Override 139 public void waitForStop() throws InterruptedException { 140 if (monkeyThreads == null) { 141 return; 142 } 143 for (Thread monkeyThread : monkeyThreads) { 144 // TODO: bound the wait time per policy 145 monkeyThread.join(); 146 } 147 } 148 149 @Override 150 public boolean isDestructive() { 151 // TODO: we can look at the actions, and decide to do the restore cluster or not based on them. 152 return true; 153 } 154}