001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.chaos;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.nio.charset.StandardCharsets;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.concurrent.atomic.AtomicBoolean;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.util.Pair;
028import org.apache.hadoop.hbase.util.RetryCounter;
029import org.apache.hadoop.hbase.util.RetryCounterFactory;
030import org.apache.hadoop.util.Shell;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.apache.zookeeper.AsyncCallback;
033import org.apache.zookeeper.CreateMode;
034import org.apache.zookeeper.KeeperException;
035import org.apache.zookeeper.WatchedEvent;
036import org.apache.zookeeper.Watcher;
037import org.apache.zookeeper.ZooDefs;
038import org.apache.zookeeper.ZooKeeper;
039import org.apache.zookeeper.data.Stat;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/***
044 * An agent for executing destructive actions for ChaosMonkey. Uses ZooKeeper Watchers and
045 * LocalShell, to do the killing and getting status of service on targeted host without SSH. uses
046 * given ZNode Structure: /perfChaosTest (root) | | /chaosAgents (Used for registration has hostname
047 * ephemeral nodes as children) | | /chaosAgentTaskStatus (Used for task Execution, has hostname
048 * persistent nodes as child with tasks as their children) | | /hostname | | /task0000001 (command
049 * as data) (has two types of command : 1: starts with "exec" for executing a destructive action. 2:
050 * starts with "bool" for getting only status of service.
051 */
052@InterfaceAudience.Private
053public class ChaosAgent implements Watcher, Closeable, Runnable {
054
055  private static final Logger LOG = LoggerFactory.getLogger(ChaosAgent.class);
056  static AtomicBoolean stopChaosAgent = new AtomicBoolean();
057  private ZooKeeper zk;
058  private String quorum;
059  private String agentName;
060  private Configuration conf;
061  private RetryCounterFactory retryCounterFactory;
062  private volatile boolean connected = false;
063
064  public ChaosAgent(Configuration conf, String quorum, String agentName) {
065    initChaosAgent(conf, quorum, agentName);
066  }
067
068  /***
069   * sets global params and initiates connection with ZooKeeper then does registration.
070   * @param conf      initial configuration to use
071   * @param quorum    ZK Quorum
072   * @param agentName AgentName to use
073   */
074  private void initChaosAgent(Configuration conf, String quorum, String agentName) {
075    this.conf = conf;
076    this.quorum = quorum;
077    this.agentName = agentName;
078    this.retryCounterFactory = new RetryCounterFactory(new RetryCounter.RetryConfig()
079      .setMaxAttempts(
080        conf.getInt(ChaosConstants.RETRY_ATTEMPTS_KEY, ChaosConstants.DEFAULT_RETRY_ATTEMPTS))
081      .setSleepInterval(conf.getLong(ChaosConstants.RETRY_SLEEP_INTERVAL_KEY,
082        ChaosConstants.DEFAULT_RETRY_SLEEP_INTERVAL)));
083    try {
084      this.createZKConnection(null);
085      this.register();
086    } catch (IOException e) {
087      LOG.error("Error Creating Connection: " + e);
088    }
089  }
090
091  /***
092   * Creates Connection with ZooKeeper.
093   * @throws IOException if something goes wrong
094   */
095  private void createZKConnection(Watcher watcher) throws IOException {
096    if (watcher == null) {
097      zk = new ZooKeeper(quorum, ChaosConstants.SESSION_TIMEOUT_ZK, this);
098    } else {
099      zk = new ZooKeeper(quorum, ChaosConstants.SESSION_TIMEOUT_ZK, watcher);
100    }
101    LOG.info("ZooKeeper Connection created for ChaosAgent: " + agentName);
102  }
103
104  // WATCHERS: Below are the Watches used by ChaosAgent
105
106  /***
107   * Watcher for notifying if any task is assigned to agent or not, by seeking if any Node is being
108   * added to agent as Child.
109   */
110  Watcher newTaskCreatedWatcher = new Watcher() {
111    @Override
112    public void process(WatchedEvent watchedEvent) {
113      if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
114        if (
115          !(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE + ChaosConstants.ZNODE_PATH_SEPARATOR
116            + agentName).equals(watchedEvent.getPath())
117        ) {
118          throw new RuntimeException(
119            KeeperException.create(KeeperException.Code.DATAINCONSISTENCY));
120        }
121
122        LOG.info("Change in Tasks Node, checking for Tasks again.");
123        getTasks();
124      }
125
126    }
127  };
128
129  // CALLBACKS: Below are the Callbacks used by Chaos Agent
130
131  /**
132   * Callback used while setting status of a given task, Logs given status.
133   */
134  AsyncCallback.StatCallback setStatusOfTaskZNodeCallback = (rc, path, ctx, stat) -> {
135    switch (KeeperException.Code.get(rc)) {
136      case CONNECTIONLOSS:
137        // Connection to the server was lost while setting status setting again.
138        try {
139          recreateZKConnection();
140        } catch (Exception e) {
141          break;
142        }
143        setStatusOfTaskZNode(path, (String) ctx);
144        break;
145
146      case OK:
147        LOG.info("Status of Task has been set");
148        break;
149
150      case NONODE:
151        LOG.error("Chaos Agent status node does not exists: "
152          + "check for ZNode directory structure again.");
153        break;
154
155      default:
156        LOG.error("Error while setting status of task ZNode: " + path,
157          KeeperException.create(KeeperException.Code.get(rc), path));
158    }
159  };
160
161  /**
162   * Callback used while creating a Persistent ZNode tries to create ZNode again if Connection was
163   * lost in previous try.
164   */
165  AsyncCallback.StringCallback createZNodeCallback = (rc, path, ctx, name) -> {
166    switch (KeeperException.Code.get(rc)) {
167      case CONNECTIONLOSS:
168        try {
169          recreateZKConnection();
170        } catch (Exception e) {
171          break;
172        }
173        createZNode(path, (byte[]) ctx);
174        break;
175      case OK:
176        LOG.info("ZNode created : " + path);
177        break;
178      case NODEEXISTS:
179        LOG.warn("ZNode already registered: " + path);
180        break;
181      default:
182        LOG.error("Error occurred while creating Persistent ZNode: " + path,
183          KeeperException.create(KeeperException.Code.get(rc), path));
184    }
185  };
186
187  /**
188   * Callback used while creating a Ephemeral ZNode tries to create ZNode again if Connection was
189   * lost in previous try.
190   */
191  AsyncCallback.StringCallback createEphemeralZNodeCallback = (rc, path, ctx, name) -> {
192    switch (KeeperException.Code.get(rc)) {
193      case CONNECTIONLOSS:
194        try {
195          recreateZKConnection();
196        } catch (Exception e) {
197          break;
198        }
199        createEphemeralZNode(path, (byte[]) ctx);
200        break;
201      case OK:
202        LOG.info("ZNode created : " + path);
203        break;
204      case NODEEXISTS:
205        LOG.warn("ZNode already registered: " + path);
206        break;
207      default:
208        LOG.error("Error occurred while creating Ephemeral ZNode: ",
209          KeeperException.create(KeeperException.Code.get(rc), path));
210    }
211  };
212
213  /**
214   * Callback used by getTasksForAgentCallback while getting command, after getting command
215   * successfully, it executes command and set its status with respect to the command type.
216   */
217  AsyncCallback.DataCallback getTaskForExecutionCallback = new AsyncCallback.DataCallback() {
218    @Override
219    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
220      switch (KeeperException.Code.get(rc)) {
221        case CONNECTIONLOSS:
222          // Connection to the server has been lost while getting task, getting data again.
223          try {
224            recreateZKConnection();
225          } catch (Exception e) {
226            break;
227          }
228          zk.getData(path, false, getTaskForExecutionCallback,
229            new String(data, StandardCharsets.UTF_8));
230          break;
231        case OK:
232          String cmd = new String(data, StandardCharsets.UTF_8);
233          LOG.info("Executing command : " + cmd);
234          String status = ChaosConstants.TASK_COMPLETION_STRING;
235          try {
236            String user =
237              conf.get(ChaosConstants.CHAOSAGENT_SHELL_USER, ChaosConstants.DEFAULT_SHELL_USER);
238            switch (cmd.substring(0, 4)) {
239              case "bool":
240                String ret = execWithRetries(user, cmd.substring(4)).getSecond();
241                status = Boolean.toString(ret.length() > 0);
242                break;
243
244              case "exec":
245                execWithRetries(user, cmd.substring(4));
246                break;
247
248              default:
249                LOG.error("Unknown Command Type");
250                status = ChaosConstants.TASK_ERROR_STRING;
251            }
252          } catch (IOException e) {
253            LOG.error("Got error while executing command : " + cmd + " On agent : " + agentName
254              + " Error : " + e);
255            status = ChaosConstants.TASK_ERROR_STRING;
256          }
257
258          try {
259            setStatusOfTaskZNode(path, status);
260            Thread.sleep(ChaosConstants.SET_STATUS_SLEEP_TIME);
261          } catch (InterruptedException e) {
262            LOG.error("Error occured after setting status: " + e);
263          }
264
265        default:
266          LOG.error("Error occurred while getting data",
267            KeeperException.create(KeeperException.Code.get(rc), path));
268      }
269    }
270  };
271
272  /***
273   * Callback used while getting Tasks for agent if call executed without Exception, It creates a
274   * separate thread for each children to execute given Tasks parallely.
275   */
276  AsyncCallback.ChildrenCallback getTasksForAgentCallback = new AsyncCallback.ChildrenCallback() {
277    @Override
278    public void processResult(int rc, String path, Object ctx, List<String> children) {
279      switch (KeeperException.Code.get(rc)) {
280        case CONNECTIONLOSS: {
281          // Connection to the server has been lost, getting tasks again.
282          try {
283            recreateZKConnection();
284          } catch (Exception e) {
285            break;
286          }
287          getTasks();
288          break;
289        }
290
291        case OK: {
292          if (children != null) {
293            try {
294
295              LOG.info("Executing each task as a separate thread");
296              List<Thread> tasksList = new ArrayList<>();
297              for (String task : children) {
298                String threadName = agentName + "_" + task;
299                Thread t = new Thread(() -> {
300
301                  LOG.info("Executing task : " + task + " of agent : " + agentName);
302                  zk.getData(
303                    ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE
304                      + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName
305                      + ChaosConstants.ZNODE_PATH_SEPARATOR + task,
306                    false, getTaskForExecutionCallback, task);
307
308                });
309                t.setName(threadName);
310                t.start();
311                tasksList.add(t);
312
313                for (Thread thread : tasksList) {
314                  thread.join();
315                }
316              }
317            } catch (InterruptedException e) {
318              LOG.error(
319                "Error scheduling next task : " + " for agent : " + agentName + " Error : " + e);
320            }
321          }
322          break;
323        }
324
325        default:
326          LOG.error("Error occurred while getting task",
327            KeeperException.create(KeeperException.Code.get(rc), path));
328      }
329    }
330  };
331
332  /***
333   * Function to create PERSISTENT ZNODE with given path and data given as params
334   * @param path Path at which ZNode to create
335   * @param data Data to put under ZNode
336   */
337  public void createZNode(String path, byte[] data) {
338    zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createZNodeCallback,
339      data);
340  }
341
342  /***
343   * Function to create EPHEMERAL ZNODE with given path and data as params.
344   * @param path Path at which Ephemeral ZNode to create
345   * @param data Data to put under ZNode
346   */
347  public void createEphemeralZNode(String path, byte[] data) {
348    zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
349      createEphemeralZNodeCallback, data);
350  }
351
352  /**
353   * Checks if given ZNode exists, if not creates a PERSISTENT ZNODE for same.
354   * @param path Path to check for ZNode
355   */
356  private void createIfZNodeNotExists(String path) {
357    try {
358      if (zk.exists(path, false) == null) {
359        createZNode(path, new byte[0]);
360      }
361    } catch (KeeperException | InterruptedException e) {
362      LOG.error("Error checking given node : " + path + " " + e);
363    }
364  }
365
366  /**
367   * sets given Status for Task Znode
368   * @param taskZNode ZNode to set status
369   * @param status    Status value
370   */
371  public void setStatusOfTaskZNode(String taskZNode, String status) {
372    LOG.info("Setting status of Task ZNode: " + taskZNode + " status : " + status);
373    zk.setData(taskZNode, status.getBytes(StandardCharsets.UTF_8), -1, setStatusOfTaskZNodeCallback,
374      null);
375  }
376
377  /**
378   * registration of ChaosAgent by checking and creating necessary ZNodes.
379   */
380  private void register() {
381    createIfZNodeNotExists(ChaosConstants.CHAOS_TEST_ROOT_ZNODE);
382    createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE);
383    createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE);
384    createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE
385      + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName);
386
387    createEphemeralZNode(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE
388      + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName, new byte[0]);
389  }
390
391  /***
392   * Gets tasks for execution, basically sets Watch on it's respective host's Znode and waits for
393   * tasks to be assigned, also has a getTasksForAgentCallback which handles execution of task.
394   */
395  private void getTasks() {
396    LOG.info("Getting Tasks for Agent: " + agentName + "and setting watch for new Tasks");
397    zk.getChildren(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE
398      + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName, newTaskCreatedWatcher,
399      getTasksForAgentCallback, null);
400  }
401
402  /**
403   * Below function executes command with retries with given user. Uses LocalShell to execute a
404   * command.
405   * @param user user name, default none
406   * @param cmd  Command to execute
407   * @return A pair of Exit Code and Shell output
408   * @throws IOException Exception while executing shell command
409   */
410  private Pair<Integer, String> execWithRetries(String user, String cmd) throws IOException {
411    RetryCounter retryCounter = retryCounterFactory.create();
412    while (true) {
413      try {
414        return exec(user, cmd);
415      } catch (IOException e) {
416        retryOrThrow(retryCounter, e, user, cmd);
417      }
418      try {
419        retryCounter.sleepUntilNextRetry();
420      } catch (InterruptedException e) {
421        LOG.warn("Sleep Interrupted: " + e);
422      }
423    }
424  }
425
426  private Pair<Integer, String> exec(String user, String cmd) throws IOException {
427    LOG.info("Executing Shell command: " + cmd + " , user: " + user);
428
429    LocalShell shell = new LocalShell(user, cmd);
430    try {
431      shell.execute();
432    } catch (Shell.ExitCodeException e) {
433      String output = shell.getOutput();
434      throw new Shell.ExitCodeException(e.getExitCode(),
435        "stderr: " + e.getMessage() + ", stdout: " + output);
436    }
437    LOG.info("Executed Shell command, exit code: {}, output n{}", shell.getExitCode(),
438      shell.getOutput());
439
440    return new Pair<>(shell.getExitCode(), shell.getOutput());
441  }
442
443  private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex, String user,
444    String cmd) throws E {
445    if (retryCounter.shouldRetry()) {
446      LOG.warn(
447        "Local command: {}, user: {}, failed at attempt {}. Retrying until maxAttempts: {}."
448          + "Exception {}",
449        cmd, user, retryCounter.getAttemptTimes(), retryCounter.getMaxAttempts(), ex.getMessage());
450      return;
451    }
452    throw ex;
453  }
454
455  private boolean isConnected() {
456    return connected;
457  }
458
459  @Override
460  public void close() throws IOException {
461    LOG.info("Closing ZooKeeper Connection for Chaos Agent : " + agentName);
462    try {
463      zk.close();
464    } catch (InterruptedException e) {
465      LOG.error("Error while closing ZooKeeper Connection.");
466    }
467  }
468
469  @Override
470  public void run() {
471    try {
472      LOG.info("Running Chaos Agent on : " + agentName);
473      while (!this.isConnected()) {
474        Thread.sleep(100);
475      }
476      this.getTasks();
477      while (!stopChaosAgent.get()) {
478        Thread.sleep(500);
479      }
480    } catch (InterruptedException e) {
481      LOG.error("Error while running Chaos Agent", e);
482    }
483
484  }
485
486  @Override
487  public void process(WatchedEvent watchedEvent) {
488    LOG.info("Processing event: " + watchedEvent.toString());
489    if (watchedEvent.getType() == Event.EventType.None) {
490      switch (watchedEvent.getState()) {
491        case SyncConnected:
492          connected = true;
493          break;
494        case Disconnected:
495          connected = false;
496          break;
497        case Expired:
498          connected = false;
499          LOG.error("Session expired creating again");
500          try {
501            createZKConnection(null);
502          } catch (IOException e) {
503            LOG.error("Error creating Zookeeper connection", e);
504          }
505        default:
506          LOG.error("Unknown State");
507          break;
508      }
509    }
510  }
511
512  private void recreateZKConnection() throws Exception {
513    try {
514      zk.close();
515      createZKConnection(newTaskCreatedWatcher);
516      createEphemeralZNode(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE
517        + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName, new byte[0]);
518    } catch (IOException e) {
519      LOG.error("Error creating new ZK COnnection for agent: {}", agentName + e);
520      throw e;
521    }
522  }
523
524  /**
525   * Executes Command locally.
526   */
527  protected static class LocalShell extends Shell.ShellCommandExecutor {
528
529    private String user;
530    private String execCommand;
531
532    public LocalShell(String user, String execCommand) {
533      super(new String[] { execCommand });
534      this.user = user;
535      this.execCommand = execCommand;
536    }
537
538    @Override
539    public String[] getExecString() {
540      // TODO: Considering Agent is running with same user.
541      if (!user.equals(ChaosConstants.DEFAULT_SHELL_USER)) {
542        execCommand = String.format("su -u %1$s %2$s", user, execCommand);
543      }
544      return new String[] { "/usr/bin/env", "bash", "-c", execCommand };
545    }
546
547    @Override
548    public void execute() throws IOException {
549      super.execute();
550    }
551  }
552}