001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
033
034@InterfaceAudience.Private
035public class ChaosZKClient {
036
037  private static final Logger LOG = LoggerFactory.getLogger(ChaosZKClient.class.getName());
038  private static final String CHAOS_AGENT_PARENT_ZNODE = "/hbase/chaosAgents";
039  private static final String CHAOS_AGENT_STATUS_ZNODE = "/hbase/chaosAgentTaskStatus";
040  private static final String ZNODE_PATH_SEPARATOR = "/";
041  private static final String TASK_PREFIX = "task_";
042  private static final String TASK_ERROR_STRING = "error";
043  private static final String TASK_COMPLETION_STRING = "done";
044  private static final String TASK_BOOLEAN_TRUE = "true";
045  private static final String TASK_BOOLEAN_FALSE = "false";
046  private static final String CONNECTION_LOSS = "ConnectionLoss";
047  private static final int SESSION_TIMEOUT_ZK = 10 * 60 * 1000;
048  private static final int TASK_EXECUTION_TIMEOUT = 5 * 60 * 1000;
049  private volatile String taskStatus = null;
050
051  private final String quorum;
052  private ZooKeeper zk;
053
054  public ChaosZKClient(String quorum) {
055    this.quorum = quorum;
056    try {
057      this.createNewZKConnection();
058    } catch (IOException e) {
059      LOG.error("Error creating ZooKeeper Connection: ", e);
060    }
061  }
062
063  /**
064   * Creates connection with ZooKeeper
065   * @throws IOException when not able to create connection properly
066   */
067  public void createNewZKConnection() throws IOException {
068    Watcher watcher = new Watcher() {
069      @Override
070      public void process(WatchedEvent watchedEvent) {
071        LOG.info("Created ZooKeeper Connection For executing task");
072      }
073    };
074
075    this.zk = new ZooKeeper(quorum, SESSION_TIMEOUT_ZK, watcher);
076  }
077
078  /**
079   * Checks if ChaosAgent is running or not on target host by checking its ZNode.
080   * @param hostname hostname to check for chaosagent
081   * @return true/false whether agent is running or not
082   */
083  private boolean isChaosAgentRunning(String hostname) {
084    try {
085      return zk.exists(CHAOS_AGENT_PARENT_ZNODE + ZNODE_PATH_SEPARATOR + hostname, false) != null;
086    } catch (KeeperException e) {
087      if (e.toString().contains(CONNECTION_LOSS)) {
088        recreateZKConnection();
089        try {
090          return zk.exists(CHAOS_AGENT_PARENT_ZNODE + ZNODE_PATH_SEPARATOR + hostname, false)
091              != null;
092        } catch (KeeperException | InterruptedException ie) {
093          LOG.error("ERROR ", ie);
094        }
095      }
096    } catch (InterruptedException e) {
097      LOG.error("Error checking for given hostname: {} ERROR: ", hostname, e);
098    }
099    return false;
100  }
101
102  /**
103   * Creates tasks for target hosts by creating ZNodes. Waits for a limited amount of time to
104   * complete task to execute.
105   * @param taskObject Object data represents command
106   * @return returns status
107   */
108  public String submitTask(final TaskObject taskObject) {
109    if (isChaosAgentRunning(taskObject.getTaskHostname())) {
110      LOG.info("Creating task node");
111      zk.create(
112        CHAOS_AGENT_STATUS_ZNODE + ZNODE_PATH_SEPARATOR + taskObject.getTaskHostname()
113          + ZNODE_PATH_SEPARATOR + TASK_PREFIX,
114        taskObject.getCommand().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
115        CreateMode.EPHEMERAL_SEQUENTIAL, submitTaskCallback, taskObject);
116      long start = EnvironmentEdgeManager.currentTime();
117
118      while ((EnvironmentEdgeManager.currentTime() - start) < TASK_EXECUTION_TIMEOUT) {
119        if (taskStatus != null) {
120          return taskStatus;
121        }
122        Threads.sleep(500);
123      }
124    } else {
125      LOG.info("EHHHHH!  ChaosAgent Not running");
126    }
127    return TASK_ERROR_STRING;
128  }
129
130  /**
131   * To get status of task submitted
132   * @param path path at which to get status
133   * @param ctx  path context
134   */
135  private void getStatus(String path, Object ctx) {
136    LOG.info("Getting Status of task: " + path);
137    zk.getData(path, false, getStatusCallback, ctx);
138  }
139
140  /**
141   * Set a watch on task submitted
142   * @param name       ZNode name to set a watch
143   * @param taskObject context for ZNode name
144   */
145  private void setStatusWatch(String name, TaskObject taskObject) {
146    LOG.info("Checking for ZNode and Setting watch for task : " + name);
147    zk.exists(name, setStatusWatcher, setStatusWatchCallback, taskObject);
148  }
149
150  /**
151   * Delete task after getting its status
152   * @param path path to delete ZNode
153   */
154  private void deleteTask(String path) {
155    LOG.info("Deleting task: " + path);
156    zk.delete(path, -1, taskDeleteCallback, null);
157  }
158
159  // WATCHERS:
160
161  /**
162   * Watcher to get notification whenever status of task changes.
163   */
164  Watcher setStatusWatcher = new Watcher() {
165    @Override
166    public void process(WatchedEvent watchedEvent) {
167      LOG.info("Setting status watch for task: " + watchedEvent.getPath());
168      if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
169        if (!watchedEvent.getPath().contains(TASK_PREFIX)) {
170          throw new RuntimeException(
171            KeeperException.create(KeeperException.Code.DATAINCONSISTENCY));
172        }
173        getStatus(watchedEvent.getPath(), (Object) watchedEvent.getPath());
174
175      }
176    }
177  };
178
179  // CALLBACKS
180
181  AsyncCallback.DataCallback getStatusCallback = (rc, path, ctx, data, stat) -> {
182    switch (KeeperException.Code.get(rc)) {
183      case CONNECTIONLOSS:
184        // Connectionloss while getting status of task, getting again
185        recreateZKConnection();
186        getStatus(path, ctx);
187        break;
188
189      case OK:
190        if (ctx != null) {
191
192          String status = new String(data);
193          taskStatus = status;
194          switch (status) {
195            case TASK_COMPLETION_STRING:
196            case TASK_BOOLEAN_TRUE:
197            case TASK_BOOLEAN_FALSE:
198              LOG.info("Task executed completely : Status --> " + status);
199              break;
200
201            case TASK_ERROR_STRING:
202              LOG.info("There was error while executing task : Status --> " + status);
203              break;
204
205            default:
206              LOG.warn("Status of task is undefined!! : Status --> " + status);
207          }
208
209          deleteTask(path);
210        }
211        break;
212
213      default:
214        LOG.error("ERROR while getting status of task: " + path + " ERROR: "
215          + KeeperException.create(KeeperException.Code.get(rc)));
216    }
217  };
218
219  AsyncCallback.StatCallback setStatusWatchCallback = (rc, path, ctx, stat) -> {
220    switch (KeeperException.Code.get(rc)) {
221      case CONNECTIONLOSS:
222        // ConnectionLoss while setting watch on status ZNode, setting again.
223        recreateZKConnection();
224        setStatusWatch(path, (TaskObject) ctx);
225        break;
226
227      case OK:
228        if (stat != null) {
229          getStatus(path, null);
230        }
231        break;
232
233      default:
234        LOG.error("ERROR while setting watch on task ZNode: " + path + " ERROR: "
235          + KeeperException.create(KeeperException.Code.get(rc)));
236    }
237  };
238
239  AsyncCallback.StringCallback submitTaskCallback = (rc, path, ctx, name) -> {
240    switch (KeeperException.Code.get(rc)) {
241      case CONNECTIONLOSS:
242        // Connection to server was lost while submitting task, submitting again.
243        recreateZKConnection();
244        submitTask((TaskObject) ctx);
245        break;
246
247      case OK:
248        LOG.info("Task created : " + name);
249        setStatusWatch(name, (TaskObject) ctx);
250        break;
251
252      default:
253        LOG.error("Error submitting task: " + name + " ERROR:"
254          + KeeperException.create(KeeperException.Code.get(rc)));
255    }
256  };
257
258  AsyncCallback.VoidCallback taskDeleteCallback = new AsyncCallback.VoidCallback() {
259    @Override
260    public void processResult(int rc, String path, Object ctx) {
261      switch (KeeperException.Code.get(rc)) {
262        case CONNECTIONLOSS:
263          // Connectionloss while deleting task, deleting again
264          recreateZKConnection();
265          deleteTask(path);
266          break;
267
268        case OK:
269          LOG.info("Task Deleted successfully!");
270          LOG.info("Closing ZooKeeper Connection");
271          try {
272            zk.close();
273          } catch (InterruptedException e) {
274            LOG.error("Error while closing ZooKeeper Connection.");
275          }
276          break;
277
278        default:
279          LOG.error("ERROR while deleting task: " + path + " ERROR: "
280            + KeeperException.create(KeeperException.Code.get(rc)));
281      }
282    }
283  };
284
285  private void recreateZKConnection() {
286    try {
287      zk.close();
288    } catch (InterruptedException e) {
289      LOG.error("Error closing ZK connection : ", e);
290    } finally {
291      try {
292        createNewZKConnection();
293      } catch (IOException e) {
294        LOG.error("Error creating new ZK COnnection for agent: ", e);
295      }
296    }
297  }
298
299  static class TaskObject {
300    private final String command;
301    private final String taskHostname;
302
303    public TaskObject(String command, String taskHostname) {
304      this.command = command;
305      this.taskHostname = taskHostname;
306    }
307
308    public String getCommand() {
309      return this.command;
310    }
311
312    public String getTaskHostname() {
313      return taskHostname;
314    }
315  }
316
317}