001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.chaos;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.concurrent.atomic.AtomicBoolean;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.util.Pair;
027import org.apache.hadoop.hbase.util.RetryCounter;
028import org.apache.hadoop.hbase.util.RetryCounterFactory;
029import org.apache.hadoop.util.Shell;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.apache.zookeeper.AsyncCallback;
032import org.apache.zookeeper.CreateMode;
033import org.apache.zookeeper.KeeperException;
034import org.apache.zookeeper.WatchedEvent;
035import org.apache.zookeeper.Watcher;
036import org.apache.zookeeper.ZooDefs;
037import org.apache.zookeeper.ZooKeeper;
038import org.apache.zookeeper.data.Stat;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
/**
 * An agent for executing destructive actions for ChaosMonkey. Uses ZooKeeper Watchers and a
 * LocalShell to kill services and to get the status of a service on the targeted host without
 * SSH. Uses the following ZNode structure:
 *
 * <pre>
 * /perfChaosTest (root)
 *   /chaosAgents           (used for registration, has hostname ephemeral nodes as children)
 *   /chaosAgentTaskStatus  (used for task execution, has hostname persistent nodes as children,
 *                           with tasks as their children)
 *     /hostname
 *       /task0000001       (command as data)
 * </pre>
 *
 * There are two types of command: 1. commands starting with "exec" execute a destructive action;
 * 2. commands starting with "bool" only get the status of a service.
 */
051@InterfaceAudience.Private
052public class ChaosAgent implements Watcher, Closeable, Runnable {
053
054  private static final Logger LOG = LoggerFactory.getLogger(ChaosAgent.class);
055  static AtomicBoolean stopChaosAgent = new AtomicBoolean();
056  private ZooKeeper zk;
057  private String quorum;
058  private String agentName;
059  private Configuration conf;
060  private RetryCounterFactory retryCounterFactory;
061  private volatile boolean connected = false;
062
063  public ChaosAgent(Configuration conf, String quorum, String agentName) {
064    initChaosAgent(conf, quorum, agentName);
065  }
066
067  /***
068   * sets global params and initiates connection with ZooKeeper then does registration.
069   * @param conf      initial configuration to use
070   * @param quorum    ZK Quorum
071   * @param agentName AgentName to use
072   */
073  private void initChaosAgent(Configuration conf, String quorum, String agentName) {
074    this.conf = conf;
075    this.quorum = quorum;
076    this.agentName = agentName;
077    this.retryCounterFactory = new RetryCounterFactory(new RetryCounter.RetryConfig()
078      .setMaxAttempts(
079        conf.getInt(ChaosConstants.RETRY_ATTEMPTS_KEY, ChaosConstants.DEFAULT_RETRY_ATTEMPTS))
080      .setSleepInterval(conf.getLong(ChaosConstants.RETRY_SLEEP_INTERVAL_KEY,
081        ChaosConstants.DEFAULT_RETRY_SLEEP_INTERVAL)));
082    try {
083      this.createZKConnection(null);
084      this.register();
085    } catch (IOException e) {
086      LOG.error("Error Creating Connection: " + e);
087    }
088  }
089
090  /***
091   * Creates Connection with ZooKeeper.
092   * @throws IOException if something goes wrong
093   */
094  private void createZKConnection(Watcher watcher) throws IOException {
095    if (watcher == null) {
096      zk = new ZooKeeper(quorum, ChaosConstants.SESSION_TIMEOUT_ZK, this);
097    } else {
098      zk = new ZooKeeper(quorum, ChaosConstants.SESSION_TIMEOUT_ZK, watcher);
099    }
100    LOG.info("ZooKeeper Connection created for ChaosAgent: " + agentName);
101  }
102
103  // WATCHERS: Below are the Watches used by ChaosAgent
104
105  /***
106   * Watcher for notifying if any task is assigned to agent or not, by seeking if any Node is being
107   * added to agent as Child.
108   */
109  Watcher newTaskCreatedWatcher = new Watcher() {
110    @Override
111    public void process(WatchedEvent watchedEvent) {
112      if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
113        if (
114          !(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE + ChaosConstants.ZNODE_PATH_SEPARATOR
115            + agentName).equals(watchedEvent.getPath())
116        ) {
117          throw new RuntimeException(
118            KeeperException.create(KeeperException.Code.DATAINCONSISTENCY));
119        }
120
121        LOG.info("Change in Tasks Node, checking for Tasks again.");
122        getTasks();
123      }
124
125    }
126  };
127
128  // CALLBACKS: Below are the Callbacks used by Chaos Agent
129
130  /**
131   * Callback used while setting status of a given task, Logs given status.
132   */
133  AsyncCallback.StatCallback setStatusOfTaskZNodeCallback = (rc, path, ctx, stat) -> {
134    switch (KeeperException.Code.get(rc)) {
135      case CONNECTIONLOSS:
136        // Connection to the server was lost while setting status setting again.
137        try {
138          recreateZKConnection();
139        } catch (Exception e) {
140          break;
141        }
142        setStatusOfTaskZNode(path, (String) ctx);
143        break;
144
145      case OK:
146        LOG.info("Status of Task has been set");
147        break;
148
149      case NONODE:
150        LOG.error("Chaos Agent status node does not exists: "
151          + "check for ZNode directory structure again.");
152        break;
153
154      default:
155        LOG.error("Error while setting status of task ZNode: " + path,
156          KeeperException.create(KeeperException.Code.get(rc), path));
157    }
158  };
159
160  /**
161   * Callback used while creating a Persistent ZNode tries to create ZNode again if Connection was
162   * lost in previous try.
163   */
164  AsyncCallback.StringCallback createZNodeCallback = (rc, path, ctx, name) -> {
165    switch (KeeperException.Code.get(rc)) {
166      case CONNECTIONLOSS:
167        try {
168          recreateZKConnection();
169        } catch (Exception e) {
170          break;
171        }
172        createZNode(path, (byte[]) ctx);
173        break;
174      case OK:
175        LOG.info("ZNode created : " + path);
176        break;
177      case NODEEXISTS:
178        LOG.warn("ZNode already registered: " + path);
179        break;
180      default:
181        LOG.error("Error occurred while creating Persistent ZNode: " + path,
182          KeeperException.create(KeeperException.Code.get(rc), path));
183    }
184  };
185
186  /**
187   * Callback used while creating a Ephemeral ZNode tries to create ZNode again if Connection was
188   * lost in previous try.
189   */
190  AsyncCallback.StringCallback createEphemeralZNodeCallback = (rc, path, ctx, name) -> {
191    switch (KeeperException.Code.get(rc)) {
192      case CONNECTIONLOSS:
193        try {
194          recreateZKConnection();
195        } catch (Exception e) {
196          break;
197        }
198        createEphemeralZNode(path, (byte[]) ctx);
199        break;
200      case OK:
201        LOG.info("ZNode created : " + path);
202        break;
203      case NODEEXISTS:
204        LOG.warn("ZNode already registered: " + path);
205        break;
206      default:
207        LOG.error("Error occurred while creating Ephemeral ZNode: ",
208          KeeperException.create(KeeperException.Code.get(rc), path));
209    }
210  };
211
212  /**
213   * Callback used by getTasksForAgentCallback while getting command, after getting command
214   * successfully, it executes command and set its status with respect to the command type.
215   */
216  AsyncCallback.DataCallback getTaskForExecutionCallback = new AsyncCallback.DataCallback() {
217    @Override
218    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
219      switch (KeeperException.Code.get(rc)) {
220        case CONNECTIONLOSS:
221          // Connection to the server has been lost while getting task, getting data again.
222          try {
223            recreateZKConnection();
224          } catch (Exception e) {
225            break;
226          }
227          zk.getData(path, false, getTaskForExecutionCallback, new String(data));
228          break;
229        case OK:
230          String cmd = new String(data);
231          LOG.info("Executing command : " + cmd);
232          String status = ChaosConstants.TASK_COMPLETION_STRING;
233          try {
234            String user =
235              conf.get(ChaosConstants.CHAOSAGENT_SHELL_USER, ChaosConstants.DEFAULT_SHELL_USER);
236            switch (cmd.substring(0, 4)) {
237              case "bool":
238                String ret = execWithRetries(user, cmd.substring(4)).getSecond();
239                status = Boolean.toString(ret.length() > 0);
240                break;
241
242              case "exec":
243                execWithRetries(user, cmd.substring(4));
244                break;
245
246              default:
247                LOG.error("Unknown Command Type");
248                status = ChaosConstants.TASK_ERROR_STRING;
249            }
250          } catch (IOException e) {
251            LOG.error("Got error while executing command : " + cmd + " On agent : " + agentName
252              + " Error : " + e);
253            status = ChaosConstants.TASK_ERROR_STRING;
254          }
255
256          try {
257            setStatusOfTaskZNode(path, status);
258            Thread.sleep(ChaosConstants.SET_STATUS_SLEEP_TIME);
259          } catch (InterruptedException e) {
260            LOG.error("Error occured after setting status: " + e);
261          }
262
263        default:
264          LOG.error("Error occurred while getting data",
265            KeeperException.create(KeeperException.Code.get(rc), path));
266      }
267    }
268  };
269
270  /***
271   * Callback used while getting Tasks for agent if call executed without Exception, It creates a
272   * separate thread for each children to execute given Tasks parallely.
273   */
274  AsyncCallback.ChildrenCallback getTasksForAgentCallback = new AsyncCallback.ChildrenCallback() {
275    @Override
276    public void processResult(int rc, String path, Object ctx, List<String> children) {
277      switch (KeeperException.Code.get(rc)) {
278        case CONNECTIONLOSS: {
279          // Connection to the server has been lost, getting tasks again.
280          try {
281            recreateZKConnection();
282          } catch (Exception e) {
283            break;
284          }
285          getTasks();
286          break;
287        }
288
289        case OK: {
290          if (children != null) {
291            try {
292
293              LOG.info("Executing each task as a separate thread");
294              List<Thread> tasksList = new ArrayList<>();
295              for (String task : children) {
296                String threadName = agentName + "_" + task;
297                Thread t = new Thread(() -> {
298
299                  LOG.info("Executing task : " + task + " of agent : " + agentName);
300                  zk.getData(
301                    ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE
302                      + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName
303                      + ChaosConstants.ZNODE_PATH_SEPARATOR + task,
304                    false, getTaskForExecutionCallback, task);
305
306                });
307                t.setName(threadName);
308                t.start();
309                tasksList.add(t);
310
311                for (Thread thread : tasksList) {
312                  thread.join();
313                }
314              }
315            } catch (InterruptedException e) {
316              LOG.error(
317                "Error scheduling next task : " + " for agent : " + agentName + " Error : " + e);
318            }
319          }
320          break;
321        }
322
323        default:
324          LOG.error("Error occurred while getting task",
325            KeeperException.create(KeeperException.Code.get(rc), path));
326      }
327    }
328  };
329
330  /***
331   * Function to create PERSISTENT ZNODE with given path and data given as params
332   * @param path Path at which ZNode to create
333   * @param data Data to put under ZNode
334   */
335  public void createZNode(String path, byte[] data) {
336    zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createZNodeCallback,
337      data);
338  }
339
340  /***
341   * Function to create EPHEMERAL ZNODE with given path and data as params.
342   * @param path Path at which Ephemeral ZNode to create
343   * @param data Data to put under ZNode
344   */
345  public void createEphemeralZNode(String path, byte[] data) {
346    zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
347      createEphemeralZNodeCallback, data);
348  }
349
350  /**
351   * Checks if given ZNode exists, if not creates a PERSISTENT ZNODE for same.
352   * @param path Path to check for ZNode
353   */
354  private void createIfZNodeNotExists(String path) {
355    try {
356      if (zk.exists(path, false) == null) {
357        createZNode(path, new byte[0]);
358      }
359    } catch (KeeperException | InterruptedException e) {
360      LOG.error("Error checking given node : " + path + " " + e);
361    }
362  }
363
364  /**
365   * sets given Status for Task Znode
366   * @param taskZNode ZNode to set status
367   * @param status    Status value
368   */
369  public void setStatusOfTaskZNode(String taskZNode, String status) {
370    LOG.info("Setting status of Task ZNode: " + taskZNode + " status : " + status);
371    zk.setData(taskZNode, status.getBytes(), -1, setStatusOfTaskZNodeCallback, null);
372  }
373
374  /**
375   * registration of ChaosAgent by checking and creating necessary ZNodes.
376   */
377  private void register() {
378    createIfZNodeNotExists(ChaosConstants.CHAOS_TEST_ROOT_ZNODE);
379    createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE);
380    createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE);
381    createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE
382      + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName);
383
384    createEphemeralZNode(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE
385      + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName, new byte[0]);
386  }
387
388  /***
389   * Gets tasks for execution, basically sets Watch on it's respective host's Znode and waits for
390   * tasks to be assigned, also has a getTasksForAgentCallback which handles execution of task.
391   */
392  private void getTasks() {
393    LOG.info("Getting Tasks for Agent: " + agentName + "and setting watch for new Tasks");
394    zk.getChildren(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE
395      + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName, newTaskCreatedWatcher,
396      getTasksForAgentCallback, null);
397  }
398
399  /**
400   * Below function executes command with retries with given user. Uses LocalShell to execute a
401   * command.
402   * @param user user name, default none
403   * @param cmd  Command to execute
404   * @return A pair of Exit Code and Shell output
405   * @throws IOException Exception while executing shell command
406   */
407  private Pair<Integer, String> execWithRetries(String user, String cmd) throws IOException {
408    RetryCounter retryCounter = retryCounterFactory.create();
409    while (true) {
410      try {
411        return exec(user, cmd);
412      } catch (IOException e) {
413        retryOrThrow(retryCounter, e, user, cmd);
414      }
415      try {
416        retryCounter.sleepUntilNextRetry();
417      } catch (InterruptedException e) {
418        LOG.warn("Sleep Interrupted: " + e);
419      }
420    }
421  }
422
423  private Pair<Integer, String> exec(String user, String cmd) throws IOException {
424    LOG.info("Executing Shell command: " + cmd + " , user: " + user);
425
426    LocalShell shell = new LocalShell(user, cmd);
427    try {
428      shell.execute();
429    } catch (Shell.ExitCodeException e) {
430      String output = shell.getOutput();
431      throw new Shell.ExitCodeException(e.getExitCode(),
432        "stderr: " + e.getMessage() + ", stdout: " + output);
433    }
434    LOG.info("Executed Shell command, exit code: {}, output n{}", shell.getExitCode(),
435      shell.getOutput());
436
437    return new Pair<>(shell.getExitCode(), shell.getOutput());
438  }
439
440  private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex, String user,
441    String cmd) throws E {
442    if (retryCounter.shouldRetry()) {
443      LOG.warn(
444        "Local command: {}, user: {}, failed at attempt {}. Retrying until maxAttempts: {}."
445          + "Exception {}",
446        cmd, user, retryCounter.getAttemptTimes(), retryCounter.getMaxAttempts(), ex.getMessage());
447      return;
448    }
449    throw ex;
450  }
451
452  private boolean isConnected() {
453    return connected;
454  }
455
456  @Override
457  public void close() throws IOException {
458    LOG.info("Closing ZooKeeper Connection for Chaos Agent : " + agentName);
459    try {
460      zk.close();
461    } catch (InterruptedException e) {
462      LOG.error("Error while closing ZooKeeper Connection.");
463    }
464  }
465
466  @Override
467  public void run() {
468    try {
469      LOG.info("Running Chaos Agent on : " + agentName);
470      while (!this.isConnected()) {
471        Thread.sleep(100);
472      }
473      this.getTasks();
474      while (!stopChaosAgent.get()) {
475        Thread.sleep(500);
476      }
477    } catch (InterruptedException e) {
478      LOG.error("Error while running Chaos Agent", e);
479    }
480
481  }
482
483  @Override
484  public void process(WatchedEvent watchedEvent) {
485    LOG.info("Processing event: " + watchedEvent.toString());
486    if (watchedEvent.getType() == Event.EventType.None) {
487      switch (watchedEvent.getState()) {
488        case SyncConnected:
489          connected = true;
490          break;
491        case Disconnected:
492          connected = false;
493          break;
494        case Expired:
495          connected = false;
496          LOG.error("Session expired creating again");
497          try {
498            createZKConnection(null);
499          } catch (IOException e) {
500            LOG.error("Error creating Zookeeper connection", e);
501          }
502        default:
503          LOG.error("Unknown State");
504          break;
505      }
506    }
507  }
508
509  private void recreateZKConnection() throws Exception {
510    try {
511      zk.close();
512      createZKConnection(newTaskCreatedWatcher);
513      createEphemeralZNode(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE
514        + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName, new byte[0]);
515    } catch (IOException e) {
516      LOG.error("Error creating new ZK COnnection for agent: {}", agentName + e);
517      throw e;
518    }
519  }
520
521  /**
522   * Executes Command locally.
523   */
524  protected static class LocalShell extends Shell.ShellCommandExecutor {
525
526    private String user;
527    private String execCommand;
528
529    public LocalShell(String user, String execCommand) {
530      super(new String[] { execCommand });
531      this.user = user;
532      this.execCommand = execCommand;
533    }
534
535    @Override
536    public String[] getExecString() {
537      // TODO: Considering Agent is running with same user.
538      if (!user.equals(ChaosConstants.DEFAULT_SHELL_USER)) {
539        execCommand = String.format("su -u %1$s %2$s", user, execCommand);
540      }
541      return new String[] { "/usr/bin/env", "bash", "-c", execCommand };
542    }
543
544    @Override
545    public void execute() throws IOException {
546      super.execute();
547    }
548  }
549}