001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.chaos.monkies; 020 021import java.util.Arrays; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.List; 025import java.util.Objects; 026import java.util.Properties; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.Executors; 029import java.util.concurrent.TimeUnit; 030import org.apache.commons.lang3.RandomUtils; 031import org.apache.hadoop.hbase.IntegrationTestingUtility; 032import org.apache.hadoop.hbase.chaos.policies.Policy; 033import org.apache.hadoop.hbase.util.Pair; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 037 038/** 039 * Chaos monkey that given multiple policies will run actions against the cluster. 040 */ 041public class PolicyBasedChaosMonkey extends ChaosMonkey { 042 043 private static final Logger LOG = LoggerFactory.getLogger(PolicyBasedChaosMonkey.class); 044 private static final long ONE_SEC = 1000; 045 private static final long ONE_MIN = 60 * ONE_SEC; 046 047 public static final long TIMEOUT = ONE_MIN; 048 049 final IntegrationTestingUtility util; 050 final Properties monkeyProps; 051 052 private final Policy[] policies; 053 private final ExecutorService monkeyThreadPool; 054 055 /** 056 * Construct a new ChaosMonkey 057 * @param util the HBaseIntegrationTestingUtility already configured 058 * @param policies custom policies to use 059 */ 060 public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Policy... policies) { 061 this(null, util, policies); 062 } 063 064 public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Collection<Policy> policies) { 065 this(null, util, policies); 066 } 067 068 public PolicyBasedChaosMonkey(Properties monkeyProps, IntegrationTestingUtility util, 069 Collection<Policy> policies) { 070 this(monkeyProps, util, policies.toArray(new Policy[0])); 071 } 072 073 public PolicyBasedChaosMonkey(Properties monkeyProps, IntegrationTestingUtility util, 074 Policy... policies) { 075 this.monkeyProps = monkeyProps; 076 this.util = Objects.requireNonNull(util); 077 this.policies = Objects.requireNonNull(policies); 078 if (policies.length == 0) { 079 throw new IllegalArgumentException("policies may not be empty"); 080 } 081 this.monkeyThreadPool = buildMonkeyThreadPool(policies.length); 082 } 083 084 private static ExecutorService buildMonkeyThreadPool(final int size) { 085 return Executors.newFixedThreadPool(size, new ThreadFactoryBuilder() 086 .setDaemon(false) 087 .setNameFormat("ChaosMonkey-%d") 088 .setUncaughtExceptionHandler((t, e) -> { 089 throw new RuntimeException(e); 090 }) 091 .build()); 092 } 093 094 /** Selects a random item from the given items */ 095 public static <T> T selectRandomItem(T[] items) { 096 return items[RandomUtils.nextInt(0, items.length)]; 097 } 098 099 /** Selects a random item from the given items with weights*/ 100 public static <T> T selectWeightedRandomItem(List<Pair<T, Integer>> items) { 101 int totalWeight = 0; 102 for (Pair<T, Integer> pair : items) { 103 totalWeight += pair.getSecond(); 104 } 105 106 int cutoff = RandomUtils.nextInt(0, totalWeight); 107 int cummulative = 0; 108 T item = null; 109 110 //warn: O(n) 111 for (int i=0; i<items.size(); i++) { 112 int curWeight = items.get(i).getSecond(); 113 if ( cutoff < cummulative + curWeight) { 114 item = items.get(i).getFirst(); 115 break; 116 } 117 cummulative += curWeight; 118 } 119 120 return item; 121 } 122 123 /** Selects and returns ceil(ratio * items.length) random items from the given array */ 124 public static <T> List<T> selectRandomItems(T[] items, float ratio) { 125 int selectedNumber = (int)Math.ceil(items.length * ratio); 126 127 List<T> originalItems = Arrays.asList(items); 128 Collections.shuffle(originalItems); 129 130 int startIndex = RandomUtils.nextInt(0, items.length - selectedNumber); 131 return originalItems.subList(startIndex, startIndex + selectedNumber); 132 } 133 134 @Override 135 public void start() throws Exception { 136 final Policy.PolicyContext context = new Policy.PolicyContext(monkeyProps, util); 137 for (final Policy policy : policies) { 138 policy.init(context); 139 monkeyThreadPool.execute(policy); 140 } 141 } 142 143 @Override 144 public void stop(String why) { 145 // stop accepting new work (shouldn't be any with a fixed-size pool) 146 monkeyThreadPool.shutdown(); 147 // notify all executing policies that it's time to halt. 148 for (Policy policy : policies) { 149 policy.stop(why); 150 } 151 } 152 153 @Override 154 public boolean isStopped() { 155 return monkeyThreadPool.isTerminated(); 156 } 157 158 @Override 159 public void waitForStop() throws InterruptedException { 160 monkeyThreadPool.awaitTermination(1, TimeUnit.MINUTES); 161 } 162 163 @Override 164 public boolean isDestructive() { 165 // TODO: we can look at the actions, and decide to do the restore cluster or not based on them. 166 return true; 167 } 168}