/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.chaos.actions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseCluster;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A (possibly mischievous) action that the ChaosMonkey can perform.
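 * <p>
 * Subclasses typically override {@link #perform()} and reuse the protected helpers below. A
 * minimal sketch (the {@code RestartActiveMasterSketch} name is illustrative, not an existing
 * action):
 * <pre>
 * public class RestartActiveMasterSketch extends Action {
 *   public void perform() throws Exception {
 *     // Restart the active master using the inherited helpers.
 *     ServerName master = cluster.getClusterMetrics().getMasterName();
 *     killMaster(master);
 *     startMaster(master);
 *   }
 * }
 * </pre>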
051 */ 052public class Action { 053 054 public static final String KILL_MASTER_TIMEOUT_KEY = 055 "hbase.chaosmonkey.action.killmastertimeout"; 056 public static final String START_MASTER_TIMEOUT_KEY = 057 "hbase.chaosmonkey.action.startmastertimeout"; 058 public static final String KILL_RS_TIMEOUT_KEY = "hbase.chaosmonkey.action.killrstimeout"; 059 public static final String START_RS_TIMEOUT_KEY = "hbase.chaosmonkey.action.startrstimeout"; 060 public static final String KILL_ZK_NODE_TIMEOUT_KEY = 061 "hbase.chaosmonkey.action.killzknodetimeout"; 062 public static final String START_ZK_NODE_TIMEOUT_KEY = 063 "hbase.chaosmonkey.action.startzknodetimeout"; 064 public static final String KILL_DATANODE_TIMEOUT_KEY = 065 "hbase.chaosmonkey.action.killdatanodetimeout"; 066 public static final String START_DATANODE_TIMEOUT_KEY = 067 "hbase.chaosmonkey.action.startdatanodetimeout"; 068 069 protected static final Logger LOG = LoggerFactory.getLogger(Action.class); 070 071 protected static final long KILL_MASTER_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT; 072 protected static final long START_MASTER_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT; 073 protected static final long KILL_RS_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT; 074 protected static final long START_RS_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT; 075 protected static final long KILL_ZK_NODE_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT; 076 protected static final long START_ZK_NODE_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT; 077 protected static final long KILL_DATANODE_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT; 078 protected static final long START_DATANODE_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT; 079 080 protected ActionContext context; 081 protected HBaseCluster cluster; 082 protected ClusterMetrics initialStatus; 083 protected ServerName[] initialServers; 084 085 protected long killMasterTimeout; 086 protected long startMasterTimeout; 087 protected long killRsTimeout; 088 protected long startRsTimeout; 089 protected long killZkNodeTimeout; 090 protected long startZkNodeTimeout; 091 protected long killDataNodeTimeout; 092 protected long startDataNodeTimeout; 093 094 public void init(ActionContext context) throws IOException { 095 this.context = context; 096 cluster = context.getHBaseCluster(); 097 initialStatus = cluster.getInitialClusterMetrics(); 098 Collection<ServerName> regionServers = initialStatus.getLiveServerMetrics().keySet(); 099 initialServers = regionServers.toArray(new ServerName[regionServers.size()]); 100 101 killMasterTimeout = cluster.getConf().getLong(KILL_MASTER_TIMEOUT_KEY, 102 KILL_MASTER_TIMEOUT_DEFAULT); 103 startMasterTimeout = cluster.getConf().getLong(START_MASTER_TIMEOUT_KEY, 104 START_MASTER_TIMEOUT_DEFAULT); 105 killRsTimeout = cluster.getConf().getLong(KILL_RS_TIMEOUT_KEY, KILL_RS_TIMEOUT_DEFAULT); 106 startRsTimeout = cluster.getConf().getLong(START_RS_TIMEOUT_KEY, START_RS_TIMEOUT_DEFAULT); 107 killZkNodeTimeout = cluster.getConf().getLong(KILL_ZK_NODE_TIMEOUT_KEY, 108 KILL_ZK_NODE_TIMEOUT_DEFAULT); 109 startZkNodeTimeout = cluster.getConf().getLong(START_ZK_NODE_TIMEOUT_KEY, 110 START_ZK_NODE_TIMEOUT_DEFAULT); 111 killDataNodeTimeout = cluster.getConf().getLong(KILL_DATANODE_TIMEOUT_KEY, 112 KILL_DATANODE_TIMEOUT_DEFAULT); 113 startDataNodeTimeout = cluster.getConf().getLong(START_DATANODE_TIMEOUT_KEY, 114 START_DATANODE_TIMEOUT_DEFAULT); 115 } 116 117 public void perform() throws Exception { } 118 119 /** Returns current region servers - active master 
  public void perform() throws Exception { }

  /**
   * Returns the current live region servers minus the active master, so that an action picking
   * a random "region server" from this list cannot accidentally take down the master when it is
   * co-located with a region server.
   */
  protected ServerName[] getCurrentServers() throws IOException {
    ClusterMetrics clusterStatus = cluster.getClusterMetrics();
    Collection<ServerName> regionServers = clusterStatus.getLiveServerMetrics().keySet();
    int count = regionServers == null ? 0 : regionServers.size();
    if (count <= 0) {
      return new ServerName[] {};
    }
    ServerName master = clusterStatus.getMasterName();
    if (master == null || !regionServers.contains(master)) {
      return regionServers.toArray(new ServerName[count]);
    }
    if (count == 1) {
      return new ServerName[] {};
    }
    ArrayList<ServerName> tmp = new ArrayList<>(count);
    tmp.addAll(regionServers);
    tmp.remove(master);
    return tmp.toArray(new ServerName[count - 1]);
  }

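  // The kill*/start* helpers below stop or start a single cluster process and block until the
  // underlying cluster manager reports the change, bounded by the matching configured timeout.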
  protected void killMaster(ServerName server) throws IOException {
    LOG.info("Killing master " + server);
    cluster.killMaster(server);
    cluster.waitForMasterToStop(server, killMasterTimeout);
    LOG.info("Killed master " + server);
  }

  protected void startMaster(ServerName server) throws IOException {
    LOG.info("Starting master " + server.getHostname());
    cluster.startMaster(server.getHostname(), server.getPort());
    cluster.waitForActiveAndReadyMaster(startMasterTimeout);
    LOG.info("Started master " + server.getHostname());
  }

  protected void killRs(ServerName server) throws IOException {
    LOG.info("Killing regionserver " + server);
    cluster.killRegionServer(server);
    cluster.waitForRegionServerToStop(server, killRsTimeout);
    LOG.info("Killed regionserver " + server + ". Reported num of rs:"
      + cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void startRs(ServerName server) throws IOException {
    LOG.info("Starting regionserver " + server.getAddress());
    cluster.startRegionServer(server.getHostname(), server.getPort());
    cluster.waitForRegionServerToStart(server.getHostname(), server.getPort(), startRsTimeout);
    LOG.info("Started regionserver " + server.getAddress() + ". Reported num of rs:"
      + cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void killZKNode(ServerName server) throws IOException {
    LOG.info("Killing zookeeper node " + server);
    cluster.killZkNode(server);
    cluster.waitForZkNodeToStop(server, killZkNodeTimeout);
    LOG.info("Killed zookeeper node " + server + ". Reported num of rs:"
      + cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void startZKNode(ServerName server) throws IOException {
    LOG.info("Starting zookeeper node " + server.getHostname());
    cluster.startZkNode(server.getHostname(), server.getPort());
    cluster.waitForZkNodeToStart(server, startZkNodeTimeout);
    LOG.info("Started zookeeper node " + server);
  }

  protected void killDataNode(ServerName server) throws IOException {
    LOG.info("Killing datanode " + server);
    cluster.killDataNode(server);
    cluster.waitForDataNodeToStop(server, killDataNodeTimeout);
    LOG.info("Killed datanode " + server + ". Reported num of rs:"
      + cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void startDataNode(ServerName server) throws IOException {
    LOG.info("Starting datanode " + server.getHostname());
    cluster.startDataNode(server);
    cluster.waitForDataNodeToStart(server, startDataNodeTimeout);
    LOG.info("Started datanode " + server);
  }

  protected void unbalanceRegions(ClusterMetrics clusterStatus,
      List<ServerName> fromServers, List<ServerName> toServers,
      double fractionOfRegions) throws Exception {
    List<byte[]> victimRegions = new LinkedList<>();
    for (Map.Entry<ServerName, ServerMetrics> entry
        : clusterStatus.getLiveServerMetrics().entrySet()) {
      ServerName sn = entry.getKey();
      ServerMetrics serverLoad = entry.getValue();
      // Copy the region names into a mutable list so victims can be removed as they are picked.
      List<byte[]> regions = new LinkedList<>(serverLoad.getRegionMetrics().keySet());
      int victimRegionCount = (int) Math.ceil(fractionOfRegions * regions.size());
      LOG.debug("Removing " + victimRegionCount + " regions from " + sn);
      for (int i = 0; i < victimRegionCount; ++i) {
        int victimIx = RandomUtils.nextInt(0, regions.size());
        String regionId = HRegionInfo.encodeRegionName(regions.remove(victimIx));
        victimRegions.add(Bytes.toBytes(regionId));
      }
    }

    LOG.info("Moving " + victimRegions.size() + " regions from " + fromServers.size()
      + " servers to " + toServers.size() + " different servers");
    Admin admin = this.context.getHBaseIntegrationTestingUtility().getAdmin();
    for (byte[] victimRegion : victimRegions) {
      // Don't keep moving regions if we're trying to stop the monkey.
      if (context.isStopping()) {
        break;
      }
      int targetIx = RandomUtils.nextInt(0, toServers.size());
      admin.move(victimRegion, Bytes.toBytes(toServers.get(targetIx).getServerName()));
    }
  }

  protected void forceBalancer() throws Exception {
    Admin admin = this.context.getHBaseIntegrationTestingUtility().getAdmin();
    boolean result = false;
    try {
      result = admin.balancer();
    } catch (Exception e) {
      LOG.warn("Got exception while doing balance ", e);
    }
    if (!result) {
      LOG.error("Balancer didn't succeed");
    }
  }

  public Configuration getConf() {
    return cluster.getConf();
  }

  /**
   * Apply a transform to all column families of a given table. If the table has no column
   * families, or if the context is stopping, this does nothing.
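   * <p>
   * For example, to change the compression of every family (a sketch, not an existing caller;
   * any {@code Compression.Algorithm} value works the same way):
   * <pre>
   * modifyAllTableColumns(tableName,
   *   (name, builder) -> builder.setCompressionType(Compression.Algorithm.GZ));
   * </pre>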
   * @param tableName the table to modify
   * @param transform the modification to perform; callers get the column family name as a
   *          string and a column family descriptor builder for that family
   */
  protected void modifyAllTableColumns(TableName tableName,
      BiConsumer<String, ColumnFamilyDescriptorBuilder> transform) throws IOException {
    HBaseTestingUtility util = this.context.getHBaseIntegrationTestingUtility();
    Admin admin = util.getAdmin();

    TableDescriptor tableDescriptor = admin.getDescriptor(tableName);
    ColumnFamilyDescriptor[] columnDescriptors = tableDescriptor.getColumnFamilies();

    if (columnDescriptors == null || columnDescriptors.length == 0) {
      return;
    }

    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableDescriptor);
    for (ColumnFamilyDescriptor descriptor : columnDescriptors) {
      ColumnFamilyDescriptorBuilder cfd = ColumnFamilyDescriptorBuilder.newBuilder(descriptor);
      transform.accept(descriptor.getNameAsString(), cfd);
      builder.modifyColumnFamily(cfd.build());
    }

    // Don't try the modify if we're stopping
    if (this.context.isStopping()) {
      return;
    }
    admin.modifyTable(builder.build());
  }

  /**
   * Apply a transform to all column families of a given table. If the table has no column
   * families, or if the context is stopping, this does nothing.
   * @param tableName the table to modify
   * @param transform the modification to perform on each column family descriptor builder
   */
  protected void modifyAllTableColumns(TableName tableName,
      Consumer<ColumnFamilyDescriptorBuilder> transform) throws IOException {
    modifyAllTableColumns(tableName, (name, cfd) -> transform.accept(cfd));
  }

  /**
   * Context for Actions.
   */
  public static class ActionContext {
    private IntegrationTestingUtility util;

    public ActionContext(IntegrationTestingUtility util) {
      this.util = util;
    }

    public IntegrationTestingUtility getHBaseIntegrationTestingUtility() {
      return util;
    }

    public HBaseCluster getHBaseCluster() {
      return util.getHBaseClusterInterface();
    }

    public boolean isStopping() {
      return false;
    }
  }
}