001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.chaos; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.concurrent.atomic.AtomicBoolean; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.util.Pair; 027import org.apache.hadoop.hbase.util.RetryCounter; 028import org.apache.hadoop.hbase.util.RetryCounterFactory; 029import org.apache.hadoop.util.Shell; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.apache.zookeeper.AsyncCallback; 032import org.apache.zookeeper.CreateMode; 033import org.apache.zookeeper.KeeperException; 034import org.apache.zookeeper.WatchedEvent; 035import org.apache.zookeeper.Watcher; 036import org.apache.zookeeper.ZooDefs; 037import org.apache.zookeeper.ZooKeeper; 038import org.apache.zookeeper.data.Stat; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/*** 043 * An agent for executing destructive actions for ChaosMonkey. Uses ZooKeeper Watchers and 044 * LocalShell, to do the killing and getting status of service on targeted host without SSH. uses 045 * given ZNode Structure: /perfChaosTest (root) | | /chaosAgents (Used for registration has hostname 046 * ephemeral nodes as children) | | /chaosAgentTaskStatus (Used for task Execution, has hostname 047 * persistent nodes as child with tasks as their children) | | /hostname | | /task0000001 (command 048 * as data) (has two types of command : 1: starts with "exec" for executing a destructive action. 2: 049 * starts with "bool" for getting only status of service. 050 */ 051@InterfaceAudience.Private 052public class ChaosAgent implements Watcher, Closeable, Runnable { 053 054 private static final Logger LOG = LoggerFactory.getLogger(ChaosAgent.class); 055 static AtomicBoolean stopChaosAgent = new AtomicBoolean(); 056 private ZooKeeper zk; 057 private String quorum; 058 private String agentName; 059 private Configuration conf; 060 private RetryCounterFactory retryCounterFactory; 061 private volatile boolean connected = false; 062 063 public ChaosAgent(Configuration conf, String quorum, String agentName) { 064 initChaosAgent(conf, quorum, agentName); 065 } 066 067 /*** 068 * sets global params and initiates connection with ZooKeeper then does registration. 069 * @param conf initial configuration to use 070 * @param quorum ZK Quorum 071 * @param agentName AgentName to use 072 */ 073 private void initChaosAgent(Configuration conf, String quorum, String agentName) { 074 this.conf = conf; 075 this.quorum = quorum; 076 this.agentName = agentName; 077 this.retryCounterFactory = new RetryCounterFactory(new RetryCounter.RetryConfig() 078 .setMaxAttempts( 079 conf.getInt(ChaosConstants.RETRY_ATTEMPTS_KEY, ChaosConstants.DEFAULT_RETRY_ATTEMPTS)) 080 .setSleepInterval(conf.getLong(ChaosConstants.RETRY_SLEEP_INTERVAL_KEY, 081 ChaosConstants.DEFAULT_RETRY_SLEEP_INTERVAL))); 082 try { 083 this.createZKConnection(null); 084 this.register(); 085 } catch (IOException e) { 086 LOG.error("Error Creating Connection: " + e); 087 } 088 } 089 090 /*** 091 * Creates Connection with ZooKeeper. 092 * @throws IOException if something goes wrong 093 */ 094 private void createZKConnection(Watcher watcher) throws IOException { 095 if (watcher == null) { 096 zk = new ZooKeeper(quorum, ChaosConstants.SESSION_TIMEOUT_ZK, this); 097 } else { 098 zk = new ZooKeeper(quorum, ChaosConstants.SESSION_TIMEOUT_ZK, watcher); 099 } 100 LOG.info("ZooKeeper Connection created for ChaosAgent: " + agentName); 101 } 102 103 // WATCHERS: Below are the Watches used by ChaosAgent 104 105 /*** 106 * Watcher for notifying if any task is assigned to agent or not, by seeking if any Node is being 107 * added to agent as Child. 108 */ 109 Watcher newTaskCreatedWatcher = new Watcher() { 110 @Override 111 public void process(WatchedEvent watchedEvent) { 112 if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) { 113 if ( 114 !(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE + ChaosConstants.ZNODE_PATH_SEPARATOR 115 + agentName).equals(watchedEvent.getPath()) 116 ) { 117 throw new RuntimeException( 118 KeeperException.create(KeeperException.Code.DATAINCONSISTENCY)); 119 } 120 121 LOG.info("Change in Tasks Node, checking for Tasks again."); 122 getTasks(); 123 } 124 125 } 126 }; 127 128 // CALLBACKS: Below are the Callbacks used by Chaos Agent 129 130 /** 131 * Callback used while setting status of a given task, Logs given status. 132 */ 133 AsyncCallback.StatCallback setStatusOfTaskZNodeCallback = (rc, path, ctx, stat) -> { 134 switch (KeeperException.Code.get(rc)) { 135 case CONNECTIONLOSS: 136 // Connection to the server was lost while setting status setting again. 137 try { 138 recreateZKConnection(); 139 } catch (Exception e) { 140 break; 141 } 142 setStatusOfTaskZNode(path, (String) ctx); 143 break; 144 145 case OK: 146 LOG.info("Status of Task has been set"); 147 break; 148 149 case NONODE: 150 LOG.error("Chaos Agent status node does not exists: " 151 + "check for ZNode directory structure again."); 152 break; 153 154 default: 155 LOG.error("Error while setting status of task ZNode: " + path, 156 KeeperException.create(KeeperException.Code.get(rc), path)); 157 } 158 }; 159 160 /** 161 * Callback used while creating a Persistent ZNode tries to create ZNode again if Connection was 162 * lost in previous try. 163 */ 164 AsyncCallback.StringCallback createZNodeCallback = (rc, path, ctx, name) -> { 165 switch (KeeperException.Code.get(rc)) { 166 case CONNECTIONLOSS: 167 try { 168 recreateZKConnection(); 169 } catch (Exception e) { 170 break; 171 } 172 createZNode(path, (byte[]) ctx); 173 break; 174 case OK: 175 LOG.info("ZNode created : " + path); 176 break; 177 case NODEEXISTS: 178 LOG.warn("ZNode already registered: " + path); 179 break; 180 default: 181 LOG.error("Error occurred while creating Persistent ZNode: " + path, 182 KeeperException.create(KeeperException.Code.get(rc), path)); 183 } 184 }; 185 186 /** 187 * Callback used while creating a Ephemeral ZNode tries to create ZNode again if Connection was 188 * lost in previous try. 189 */ 190 AsyncCallback.StringCallback createEphemeralZNodeCallback = (rc, path, ctx, name) -> { 191 switch (KeeperException.Code.get(rc)) { 192 case CONNECTIONLOSS: 193 try { 194 recreateZKConnection(); 195 } catch (Exception e) { 196 break; 197 } 198 createEphemeralZNode(path, (byte[]) ctx); 199 break; 200 case OK: 201 LOG.info("ZNode created : " + path); 202 break; 203 case NODEEXISTS: 204 LOG.warn("ZNode already registered: " + path); 205 break; 206 default: 207 LOG.error("Error occurred while creating Ephemeral ZNode: ", 208 KeeperException.create(KeeperException.Code.get(rc), path)); 209 } 210 }; 211 212 /** 213 * Callback used by getTasksForAgentCallback while getting command, after getting command 214 * successfully, it executes command and set its status with respect to the command type. 215 */ 216 AsyncCallback.DataCallback getTaskForExecutionCallback = new AsyncCallback.DataCallback() { 217 @Override 218 public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { 219 switch (KeeperException.Code.get(rc)) { 220 case CONNECTIONLOSS: 221 // Connection to the server has been lost while getting task, getting data again. 222 try { 223 recreateZKConnection(); 224 } catch (Exception e) { 225 break; 226 } 227 zk.getData(path, false, getTaskForExecutionCallback, new String(data)); 228 break; 229 case OK: 230 String cmd = new String(data); 231 LOG.info("Executing command : " + cmd); 232 String status = ChaosConstants.TASK_COMPLETION_STRING; 233 try { 234 String user = 235 conf.get(ChaosConstants.CHAOSAGENT_SHELL_USER, ChaosConstants.DEFAULT_SHELL_USER); 236 switch (cmd.substring(0, 4)) { 237 case "bool": 238 String ret = execWithRetries(user, cmd.substring(4)).getSecond(); 239 status = Boolean.toString(ret.length() > 0); 240 break; 241 242 case "exec": 243 execWithRetries(user, cmd.substring(4)); 244 break; 245 246 default: 247 LOG.error("Unknown Command Type"); 248 status = ChaosConstants.TASK_ERROR_STRING; 249 } 250 } catch (IOException e) { 251 LOG.error("Got error while executing command : " + cmd + " On agent : " + agentName 252 + " Error : " + e); 253 status = ChaosConstants.TASK_ERROR_STRING; 254 } 255 256 try { 257 setStatusOfTaskZNode(path, status); 258 Thread.sleep(ChaosConstants.SET_STATUS_SLEEP_TIME); 259 } catch (InterruptedException e) { 260 LOG.error("Error occured after setting status: " + e); 261 } 262 263 default: 264 LOG.error("Error occurred while getting data", 265 KeeperException.create(KeeperException.Code.get(rc), path)); 266 } 267 } 268 }; 269 270 /*** 271 * Callback used while getting Tasks for agent if call executed without Exception, It creates a 272 * separate thread for each children to execute given Tasks parallely. 273 */ 274 AsyncCallback.ChildrenCallback getTasksForAgentCallback = new AsyncCallback.ChildrenCallback() { 275 @Override 276 public void processResult(int rc, String path, Object ctx, List<String> children) { 277 switch (KeeperException.Code.get(rc)) { 278 case CONNECTIONLOSS: { 279 // Connection to the server has been lost, getting tasks again. 280 try { 281 recreateZKConnection(); 282 } catch (Exception e) { 283 break; 284 } 285 getTasks(); 286 break; 287 } 288 289 case OK: { 290 if (children != null) { 291 try { 292 293 LOG.info("Executing each task as a separate thread"); 294 List<Thread> tasksList = new ArrayList<>(); 295 for (String task : children) { 296 String threadName = agentName + "_" + task; 297 Thread t = new Thread(() -> { 298 299 LOG.info("Executing task : " + task + " of agent : " + agentName); 300 zk.getData( 301 ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE 302 + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName 303 + ChaosConstants.ZNODE_PATH_SEPARATOR + task, 304 false, getTaskForExecutionCallback, task); 305 306 }); 307 t.setName(threadName); 308 t.start(); 309 tasksList.add(t); 310 311 for (Thread thread : tasksList) { 312 thread.join(); 313 } 314 } 315 } catch (InterruptedException e) { 316 LOG.error( 317 "Error scheduling next task : " + " for agent : " + agentName + " Error : " + e); 318 } 319 } 320 break; 321 } 322 323 default: 324 LOG.error("Error occurred while getting task", 325 KeeperException.create(KeeperException.Code.get(rc), path)); 326 } 327 } 328 }; 329 330 /*** 331 * Function to create PERSISTENT ZNODE with given path and data given as params 332 * @param path Path at which ZNode to create 333 * @param data Data to put under ZNode 334 */ 335 public void createZNode(String path, byte[] data) { 336 zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createZNodeCallback, 337 data); 338 } 339 340 /*** 341 * Function to create EPHEMERAL ZNODE with given path and data as params. 342 * @param path Path at which Ephemeral ZNode to create 343 * @param data Data to put under ZNode 344 */ 345 public void createEphemeralZNode(String path, byte[] data) { 346 zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, 347 createEphemeralZNodeCallback, data); 348 } 349 350 /** 351 * Checks if given ZNode exists, if not creates a PERSISTENT ZNODE for same. 352 * @param path Path to check for ZNode 353 */ 354 private void createIfZNodeNotExists(String path) { 355 try { 356 if (zk.exists(path, false) == null) { 357 createZNode(path, new byte[0]); 358 } 359 } catch (KeeperException | InterruptedException e) { 360 LOG.error("Error checking given node : " + path + " " + e); 361 } 362 } 363 364 /** 365 * sets given Status for Task Znode 366 * @param taskZNode ZNode to set status 367 * @param status Status value 368 */ 369 public void setStatusOfTaskZNode(String taskZNode, String status) { 370 LOG.info("Setting status of Task ZNode: " + taskZNode + " status : " + status); 371 zk.setData(taskZNode, status.getBytes(), -1, setStatusOfTaskZNodeCallback, null); 372 } 373 374 /** 375 * registration of ChaosAgent by checking and creating necessary ZNodes. 376 */ 377 private void register() { 378 createIfZNodeNotExists(ChaosConstants.CHAOS_TEST_ROOT_ZNODE); 379 createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE); 380 createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE); 381 createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE 382 + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName); 383 384 createEphemeralZNode(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE 385 + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName, new byte[0]); 386 } 387 388 /*** 389 * Gets tasks for execution, basically sets Watch on it's respective host's Znode and waits for 390 * tasks to be assigned, also has a getTasksForAgentCallback which handles execution of task. 391 */ 392 private void getTasks() { 393 LOG.info("Getting Tasks for Agent: " + agentName + "and setting watch for new Tasks"); 394 zk.getChildren(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE 395 + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName, newTaskCreatedWatcher, 396 getTasksForAgentCallback, null); 397 } 398 399 /** 400 * Below function executes command with retries with given user. Uses LocalShell to execute a 401 * command. 402 * @param user user name, default none 403 * @param cmd Command to execute 404 * @return A pair of Exit Code and Shell output 405 * @throws IOException Exception while executing shell command 406 */ 407 private Pair<Integer, String> execWithRetries(String user, String cmd) throws IOException { 408 RetryCounter retryCounter = retryCounterFactory.create(); 409 while (true) { 410 try { 411 return exec(user, cmd); 412 } catch (IOException e) { 413 retryOrThrow(retryCounter, e, user, cmd); 414 } 415 try { 416 retryCounter.sleepUntilNextRetry(); 417 } catch (InterruptedException e) { 418 LOG.warn("Sleep Interrupted: " + e); 419 } 420 } 421 } 422 423 private Pair<Integer, String> exec(String user, String cmd) throws IOException { 424 LOG.info("Executing Shell command: " + cmd + " , user: " + user); 425 426 LocalShell shell = new LocalShell(user, cmd); 427 try { 428 shell.execute(); 429 } catch (Shell.ExitCodeException e) { 430 String output = shell.getOutput(); 431 throw new Shell.ExitCodeException(e.getExitCode(), 432 "stderr: " + e.getMessage() + ", stdout: " + output); 433 } 434 LOG.info("Executed Shell command, exit code: {}, output n{}", shell.getExitCode(), 435 shell.getOutput()); 436 437 return new Pair<>(shell.getExitCode(), shell.getOutput()); 438 } 439 440 private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex, String user, 441 String cmd) throws E { 442 if (retryCounter.shouldRetry()) { 443 LOG.warn( 444 "Local command: {}, user: {}, failed at attempt {}. Retrying until maxAttempts: {}." 445 + "Exception {}", 446 cmd, user, retryCounter.getAttemptTimes(), retryCounter.getMaxAttempts(), ex.getMessage()); 447 return; 448 } 449 throw ex; 450 } 451 452 private boolean isConnected() { 453 return connected; 454 } 455 456 @Override 457 public void close() throws IOException { 458 LOG.info("Closing ZooKeeper Connection for Chaos Agent : " + agentName); 459 try { 460 zk.close(); 461 } catch (InterruptedException e) { 462 LOG.error("Error while closing ZooKeeper Connection."); 463 } 464 } 465 466 @Override 467 public void run() { 468 try { 469 LOG.info("Running Chaos Agent on : " + agentName); 470 while (!this.isConnected()) { 471 Thread.sleep(100); 472 } 473 this.getTasks(); 474 while (!stopChaosAgent.get()) { 475 Thread.sleep(500); 476 } 477 } catch (InterruptedException e) { 478 LOG.error("Error while running Chaos Agent", e); 479 } 480 481 } 482 483 @Override 484 public void process(WatchedEvent watchedEvent) { 485 LOG.info("Processing event: " + watchedEvent.toString()); 486 if (watchedEvent.getType() == Event.EventType.None) { 487 switch (watchedEvent.getState()) { 488 case SyncConnected: 489 connected = true; 490 break; 491 case Disconnected: 492 connected = false; 493 break; 494 case Expired: 495 connected = false; 496 LOG.error("Session expired creating again"); 497 try { 498 createZKConnection(null); 499 } catch (IOException e) { 500 LOG.error("Error creating Zookeeper connection", e); 501 } 502 default: 503 LOG.error("Unknown State"); 504 break; 505 } 506 } 507 } 508 509 private void recreateZKConnection() throws Exception { 510 try { 511 zk.close(); 512 createZKConnection(newTaskCreatedWatcher); 513 createEphemeralZNode(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE 514 + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName, new byte[0]); 515 } catch (IOException e) { 516 LOG.error("Error creating new ZK COnnection for agent: {}", agentName + e); 517 throw e; 518 } 519 } 520 521 /** 522 * Executes Command locally. 523 */ 524 protected static class LocalShell extends Shell.ShellCommandExecutor { 525 526 private String user; 527 private String execCommand; 528 529 public LocalShell(String user, String execCommand) { 530 super(new String[] { execCommand }); 531 this.user = user; 532 this.execCommand = execCommand; 533 } 534 535 @Override 536 public String[] getExecString() { 537 // TODO: Considering Agent is running with same user. 538 if (!user.equals(ChaosConstants.DEFAULT_SHELL_USER)) { 539 execCommand = String.format("su -u %1$s %2$s", user, execCommand); 540 } 541 return new String[] { "/usr/bin/env", "bash", "-c", execCommand }; 542 } 543 544 @Override 545 public void execute() throws IOException { 546 super.execute(); 547 } 548 } 549}