001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.chaos; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.nio.charset.StandardCharsets; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.concurrent.atomic.AtomicBoolean; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.util.Pair; 028import org.apache.hadoop.hbase.util.RetryCounter; 029import org.apache.hadoop.hbase.util.RetryCounterFactory; 030import org.apache.hadoop.util.Shell; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.apache.zookeeper.AsyncCallback; 033import org.apache.zookeeper.CreateMode; 034import org.apache.zookeeper.KeeperException; 035import org.apache.zookeeper.WatchedEvent; 036import org.apache.zookeeper.Watcher; 037import org.apache.zookeeper.ZooDefs; 038import org.apache.zookeeper.ZooKeeper; 039import org.apache.zookeeper.data.Stat; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043/*** 044 * An agent for executing destructive actions for ChaosMonkey. Uses ZooKeeper Watchers and 045 * LocalShell, to do the killing and getting status of service on targeted host without SSH. uses 046 * given ZNode Structure: /perfChaosTest (root) | | /chaosAgents (Used for registration has hostname 047 * ephemeral nodes as children) | | /chaosAgentTaskStatus (Used for task Execution, has hostname 048 * persistent nodes as child with tasks as their children) | | /hostname | | /task0000001 (command 049 * as data) (has two types of command : 1: starts with "exec" for executing a destructive action. 2: 050 * starts with "bool" for getting only status of service. 051 */ 052@InterfaceAudience.Private 053public class ChaosAgent implements Watcher, Closeable, Runnable { 054 055 private static final Logger LOG = LoggerFactory.getLogger(ChaosAgent.class); 056 static AtomicBoolean stopChaosAgent = new AtomicBoolean(); 057 private ZooKeeper zk; 058 private String quorum; 059 private String agentName; 060 private Configuration conf; 061 private RetryCounterFactory retryCounterFactory; 062 private volatile boolean connected = false; 063 064 public ChaosAgent(Configuration conf, String quorum, String agentName) { 065 initChaosAgent(conf, quorum, agentName); 066 } 067 068 /*** 069 * sets global params and initiates connection with ZooKeeper then does registration. 070 * @param conf initial configuration to use 071 * @param quorum ZK Quorum 072 * @param agentName AgentName to use 073 */ 074 private void initChaosAgent(Configuration conf, String quorum, String agentName) { 075 this.conf = conf; 076 this.quorum = quorum; 077 this.agentName = agentName; 078 this.retryCounterFactory = new RetryCounterFactory(new RetryCounter.RetryConfig() 079 .setMaxAttempts( 080 conf.getInt(ChaosConstants.RETRY_ATTEMPTS_KEY, ChaosConstants.DEFAULT_RETRY_ATTEMPTS)) 081 .setSleepInterval(conf.getLong(ChaosConstants.RETRY_SLEEP_INTERVAL_KEY, 082 ChaosConstants.DEFAULT_RETRY_SLEEP_INTERVAL))); 083 try { 084 this.createZKConnection(null); 085 this.register(); 086 } catch (IOException e) { 087 LOG.error("Error Creating Connection: " + e); 088 } 089 } 090 091 /*** 092 * Creates Connection with ZooKeeper. 093 * @throws IOException if something goes wrong 094 */ 095 private void createZKConnection(Watcher watcher) throws IOException { 096 if (watcher == null) { 097 zk = new ZooKeeper(quorum, ChaosConstants.SESSION_TIMEOUT_ZK, this); 098 } else { 099 zk = new ZooKeeper(quorum, ChaosConstants.SESSION_TIMEOUT_ZK, watcher); 100 } 101 LOG.info("ZooKeeper Connection created for ChaosAgent: " + agentName); 102 } 103 104 // WATCHERS: Below are the Watches used by ChaosAgent 105 106 /*** 107 * Watcher for notifying if any task is assigned to agent or not, by seeking if any Node is being 108 * added to agent as Child. 109 */ 110 Watcher newTaskCreatedWatcher = new Watcher() { 111 @Override 112 public void process(WatchedEvent watchedEvent) { 113 if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) { 114 if ( 115 !(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE + ChaosConstants.ZNODE_PATH_SEPARATOR 116 + agentName).equals(watchedEvent.getPath()) 117 ) { 118 throw new RuntimeException( 119 KeeperException.create(KeeperException.Code.DATAINCONSISTENCY)); 120 } 121 122 LOG.info("Change in Tasks Node, checking for Tasks again."); 123 getTasks(); 124 } 125 126 } 127 }; 128 129 // CALLBACKS: Below are the Callbacks used by Chaos Agent 130 131 /** 132 * Callback used while setting status of a given task, Logs given status. 133 */ 134 AsyncCallback.StatCallback setStatusOfTaskZNodeCallback = (rc, path, ctx, stat) -> { 135 switch (KeeperException.Code.get(rc)) { 136 case CONNECTIONLOSS: 137 // Connection to the server was lost while setting status setting again. 138 try { 139 recreateZKConnection(); 140 } catch (Exception e) { 141 break; 142 } 143 setStatusOfTaskZNode(path, (String) ctx); 144 break; 145 146 case OK: 147 LOG.info("Status of Task has been set"); 148 break; 149 150 case NONODE: 151 LOG.error("Chaos Agent status node does not exists: " 152 + "check for ZNode directory structure again."); 153 break; 154 155 default: 156 LOG.error("Error while setting status of task ZNode: " + path, 157 KeeperException.create(KeeperException.Code.get(rc), path)); 158 } 159 }; 160 161 /** 162 * Callback used while creating a Persistent ZNode tries to create ZNode again if Connection was 163 * lost in previous try. 164 */ 165 AsyncCallback.StringCallback createZNodeCallback = (rc, path, ctx, name) -> { 166 switch (KeeperException.Code.get(rc)) { 167 case CONNECTIONLOSS: 168 try { 169 recreateZKConnection(); 170 } catch (Exception e) { 171 break; 172 } 173 createZNode(path, (byte[]) ctx); 174 break; 175 case OK: 176 LOG.info("ZNode created : " + path); 177 break; 178 case NODEEXISTS: 179 LOG.warn("ZNode already registered: " + path); 180 break; 181 default: 182 LOG.error("Error occurred while creating Persistent ZNode: " + path, 183 KeeperException.create(KeeperException.Code.get(rc), path)); 184 } 185 }; 186 187 /** 188 * Callback used while creating a Ephemeral ZNode tries to create ZNode again if Connection was 189 * lost in previous try. 190 */ 191 AsyncCallback.StringCallback createEphemeralZNodeCallback = (rc, path, ctx, name) -> { 192 switch (KeeperException.Code.get(rc)) { 193 case CONNECTIONLOSS: 194 try { 195 recreateZKConnection(); 196 } catch (Exception e) { 197 break; 198 } 199 createEphemeralZNode(path, (byte[]) ctx); 200 break; 201 case OK: 202 LOG.info("ZNode created : " + path); 203 break; 204 case NODEEXISTS: 205 LOG.warn("ZNode already registered: " + path); 206 break; 207 default: 208 LOG.error("Error occurred while creating Ephemeral ZNode: ", 209 KeeperException.create(KeeperException.Code.get(rc), path)); 210 } 211 }; 212 213 /** 214 * Callback used by getTasksForAgentCallback while getting command, after getting command 215 * successfully, it executes command and set its status with respect to the command type. 216 */ 217 AsyncCallback.DataCallback getTaskForExecutionCallback = new AsyncCallback.DataCallback() { 218 @Override 219 public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { 220 switch (KeeperException.Code.get(rc)) { 221 case CONNECTIONLOSS: 222 // Connection to the server has been lost while getting task, getting data again. 223 try { 224 recreateZKConnection(); 225 } catch (Exception e) { 226 break; 227 } 228 zk.getData(path, false, getTaskForExecutionCallback, 229 new String(data, StandardCharsets.UTF_8)); 230 break; 231 case OK: 232 String cmd = new String(data, StandardCharsets.UTF_8); 233 LOG.info("Executing command : " + cmd); 234 String status = ChaosConstants.TASK_COMPLETION_STRING; 235 try { 236 String user = 237 conf.get(ChaosConstants.CHAOSAGENT_SHELL_USER, ChaosConstants.DEFAULT_SHELL_USER); 238 switch (cmd.substring(0, 4)) { 239 case "bool": 240 String ret = execWithRetries(user, cmd.substring(4)).getSecond(); 241 status = Boolean.toString(ret.length() > 0); 242 break; 243 244 case "exec": 245 execWithRetries(user, cmd.substring(4)); 246 break; 247 248 default: 249 LOG.error("Unknown Command Type"); 250 status = ChaosConstants.TASK_ERROR_STRING; 251 } 252 } catch (IOException e) { 253 LOG.error("Got error while executing command : " + cmd + " On agent : " + agentName 254 + " Error : " + e); 255 status = ChaosConstants.TASK_ERROR_STRING; 256 } 257 258 try { 259 setStatusOfTaskZNode(path, status); 260 Thread.sleep(ChaosConstants.SET_STATUS_SLEEP_TIME); 261 } catch (InterruptedException e) { 262 LOG.error("Error occured after setting status: " + e); 263 } 264 265 default: 266 LOG.error("Error occurred while getting data", 267 KeeperException.create(KeeperException.Code.get(rc), path)); 268 } 269 } 270 }; 271 272 /*** 273 * Callback used while getting Tasks for agent if call executed without Exception, It creates a 274 * separate thread for each children to execute given Tasks parallely. 275 */ 276 AsyncCallback.ChildrenCallback getTasksForAgentCallback = new AsyncCallback.ChildrenCallback() { 277 @Override 278 public void processResult(int rc, String path, Object ctx, List<String> children) { 279 switch (KeeperException.Code.get(rc)) { 280 case CONNECTIONLOSS: { 281 // Connection to the server has been lost, getting tasks again. 282 try { 283 recreateZKConnection(); 284 } catch (Exception e) { 285 break; 286 } 287 getTasks(); 288 break; 289 } 290 291 case OK: { 292 if (children != null) { 293 try { 294 295 LOG.info("Executing each task as a separate thread"); 296 List<Thread> tasksList = new ArrayList<>(); 297 for (String task : children) { 298 String threadName = agentName + "_" + task; 299 Thread t = new Thread(() -> { 300 301 LOG.info("Executing task : " + task + " of agent : " + agentName); 302 zk.getData( 303 ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE 304 + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName 305 + ChaosConstants.ZNODE_PATH_SEPARATOR + task, 306 false, getTaskForExecutionCallback, task); 307 308 }); 309 t.setName(threadName); 310 t.start(); 311 tasksList.add(t); 312 313 for (Thread thread : tasksList) { 314 thread.join(); 315 } 316 } 317 } catch (InterruptedException e) { 318 LOG.error( 319 "Error scheduling next task : " + " for agent : " + agentName + " Error : " + e); 320 } 321 } 322 break; 323 } 324 325 default: 326 LOG.error("Error occurred while getting task", 327 KeeperException.create(KeeperException.Code.get(rc), path)); 328 } 329 } 330 }; 331 332 /*** 333 * Function to create PERSISTENT ZNODE with given path and data given as params 334 * @param path Path at which ZNode to create 335 * @param data Data to put under ZNode 336 */ 337 public void createZNode(String path, byte[] data) { 338 zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createZNodeCallback, 339 data); 340 } 341 342 /*** 343 * Function to create EPHEMERAL ZNODE with given path and data as params. 344 * @param path Path at which Ephemeral ZNode to create 345 * @param data Data to put under ZNode 346 */ 347 public void createEphemeralZNode(String path, byte[] data) { 348 zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, 349 createEphemeralZNodeCallback, data); 350 } 351 352 /** 353 * Checks if given ZNode exists, if not creates a PERSISTENT ZNODE for same. 354 * @param path Path to check for ZNode 355 */ 356 private void createIfZNodeNotExists(String path) { 357 try { 358 if (zk.exists(path, false) == null) { 359 createZNode(path, new byte[0]); 360 } 361 } catch (KeeperException | InterruptedException e) { 362 LOG.error("Error checking given node : " + path + " " + e); 363 } 364 } 365 366 /** 367 * sets given Status for Task Znode 368 * @param taskZNode ZNode to set status 369 * @param status Status value 370 */ 371 public void setStatusOfTaskZNode(String taskZNode, String status) { 372 LOG.info("Setting status of Task ZNode: " + taskZNode + " status : " + status); 373 zk.setData(taskZNode, status.getBytes(StandardCharsets.UTF_8), -1, setStatusOfTaskZNodeCallback, 374 null); 375 } 376 377 /** 378 * registration of ChaosAgent by checking and creating necessary ZNodes. 379 */ 380 private void register() { 381 createIfZNodeNotExists(ChaosConstants.CHAOS_TEST_ROOT_ZNODE); 382 createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE); 383 createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE); 384 createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE 385 + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName); 386 387 createEphemeralZNode(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE 388 + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName, new byte[0]); 389 } 390 391 /*** 392 * Gets tasks for execution, basically sets Watch on it's respective host's Znode and waits for 393 * tasks to be assigned, also has a getTasksForAgentCallback which handles execution of task. 394 */ 395 private void getTasks() { 396 LOG.info("Getting Tasks for Agent: " + agentName + "and setting watch for new Tasks"); 397 zk.getChildren(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE 398 + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName, newTaskCreatedWatcher, 399 getTasksForAgentCallback, null); 400 } 401 402 /** 403 * Below function executes command with retries with given user. Uses LocalShell to execute a 404 * command. 405 * @param user user name, default none 406 * @param cmd Command to execute 407 * @return A pair of Exit Code and Shell output 408 * @throws IOException Exception while executing shell command 409 */ 410 private Pair<Integer, String> execWithRetries(String user, String cmd) throws IOException { 411 RetryCounter retryCounter = retryCounterFactory.create(); 412 while (true) { 413 try { 414 return exec(user, cmd); 415 } catch (IOException e) { 416 retryOrThrow(retryCounter, e, user, cmd); 417 } 418 try { 419 retryCounter.sleepUntilNextRetry(); 420 } catch (InterruptedException e) { 421 LOG.warn("Sleep Interrupted: " + e); 422 } 423 } 424 } 425 426 private Pair<Integer, String> exec(String user, String cmd) throws IOException { 427 LOG.info("Executing Shell command: " + cmd + " , user: " + user); 428 429 LocalShell shell = new LocalShell(user, cmd); 430 try { 431 shell.execute(); 432 } catch (Shell.ExitCodeException e) { 433 String output = shell.getOutput(); 434 throw new Shell.ExitCodeException(e.getExitCode(), 435 "stderr: " + e.getMessage() + ", stdout: " + output); 436 } 437 LOG.info("Executed Shell command, exit code: {}, output n{}", shell.getExitCode(), 438 shell.getOutput()); 439 440 return new Pair<>(shell.getExitCode(), shell.getOutput()); 441 } 442 443 private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex, String user, 444 String cmd) throws E { 445 if (retryCounter.shouldRetry()) { 446 LOG.warn( 447 "Local command: {}, user: {}, failed at attempt {}. Retrying until maxAttempts: {}." 448 + "Exception {}", 449 cmd, user, retryCounter.getAttemptTimes(), retryCounter.getMaxAttempts(), ex.getMessage()); 450 return; 451 } 452 throw ex; 453 } 454 455 private boolean isConnected() { 456 return connected; 457 } 458 459 @Override 460 public void close() throws IOException { 461 LOG.info("Closing ZooKeeper Connection for Chaos Agent : " + agentName); 462 try { 463 zk.close(); 464 } catch (InterruptedException e) { 465 LOG.error("Error while closing ZooKeeper Connection."); 466 } 467 } 468 469 @Override 470 public void run() { 471 try { 472 LOG.info("Running Chaos Agent on : " + agentName); 473 while (!this.isConnected()) { 474 Thread.sleep(100); 475 } 476 this.getTasks(); 477 while (!stopChaosAgent.get()) { 478 Thread.sleep(500); 479 } 480 } catch (InterruptedException e) { 481 LOG.error("Error while running Chaos Agent", e); 482 } 483 484 } 485 486 @Override 487 public void process(WatchedEvent watchedEvent) { 488 LOG.info("Processing event: " + watchedEvent.toString()); 489 if (watchedEvent.getType() == Event.EventType.None) { 490 switch (watchedEvent.getState()) { 491 case SyncConnected: 492 connected = true; 493 break; 494 case Disconnected: 495 connected = false; 496 break; 497 case Expired: 498 connected = false; 499 LOG.error("Session expired creating again"); 500 try { 501 createZKConnection(null); 502 } catch (IOException e) { 503 LOG.error("Error creating Zookeeper connection", e); 504 } 505 default: 506 LOG.error("Unknown State"); 507 break; 508 } 509 } 510 } 511 512 private void recreateZKConnection() throws Exception { 513 try { 514 zk.close(); 515 createZKConnection(newTaskCreatedWatcher); 516 createEphemeralZNode(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE 517 + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName, new byte[0]); 518 } catch (IOException e) { 519 LOG.error("Error creating new ZK COnnection for agent: {}", agentName + e); 520 throw e; 521 } 522 } 523 524 /** 525 * Executes Command locally. 526 */ 527 protected static class LocalShell extends Shell.ShellCommandExecutor { 528 529 private String user; 530 private String execCommand; 531 532 public LocalShell(String user, String execCommand) { 533 super(new String[] { execCommand }); 534 this.user = user; 535 this.execCommand = execCommand; 536 } 537 538 @Override 539 public String[] getExecString() { 540 // TODO: Considering Agent is running with same user. 541 if (!user.equals(ChaosConstants.DEFAULT_SHELL_USER)) { 542 execCommand = String.format("su -u %1$s %2$s", user, execCommand); 543 } 544 return new String[] { "/usr/bin/env", "bash", "-c", execCommand }; 545 } 546 547 @Override 548 public void execute() throws IOException { 549 super.execute(); 550 } 551 } 552}