001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.nio.charset.StandardCharsets;
022import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
023import org.apache.hadoop.hbase.util.Threads;
024import org.apache.yetus.audience.InterfaceAudience;
025import org.apache.zookeeper.AsyncCallback;
026import org.apache.zookeeper.CreateMode;
027import org.apache.zookeeper.KeeperException;
028import org.apache.zookeeper.WatchedEvent;
029import org.apache.zookeeper.Watcher;
030import org.apache.zookeeper.ZooDefs;
031import org.apache.zookeeper.ZooKeeper;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035@InterfaceAudience.Private
036public class ChaosZKClient {
037
038  private static final Logger LOG = LoggerFactory.getLogger(ChaosZKClient.class.getName());
039  private static final String CHAOS_AGENT_PARENT_ZNODE = "/hbase/chaosAgents";
040  private static final String CHAOS_AGENT_STATUS_ZNODE = "/hbase/chaosAgentTaskStatus";
041  private static final String ZNODE_PATH_SEPARATOR = "/";
042  private static final String TASK_PREFIX = "task_";
043  private static final String TASK_ERROR_STRING = "error";
044  private static final String TASK_COMPLETION_STRING = "done";
045  private static final String TASK_BOOLEAN_TRUE = "true";
046  private static final String TASK_BOOLEAN_FALSE = "false";
047  private static final String CONNECTION_LOSS = "ConnectionLoss";
048  private static final int SESSION_TIMEOUT_ZK = 10 * 60 * 1000;
049  private static final int TASK_EXECUTION_TIMEOUT = 5 * 60 * 1000;
050  private volatile String taskStatus = null;
051
052  private final String quorum;
053  private ZooKeeper zk;
054
055  public ChaosZKClient(String quorum) {
056    this.quorum = quorum;
057    try {
058      this.createNewZKConnection();
059    } catch (IOException e) {
060      LOG.error("Error creating ZooKeeper Connection: ", e);
061    }
062  }
063
064  /**
065   * Creates connection with ZooKeeper
066   * @throws IOException when not able to create connection properly
067   */
068  public void createNewZKConnection() throws IOException {
069    Watcher watcher = new Watcher() {
070      @Override
071      public void process(WatchedEvent watchedEvent) {
072        LOG.info("Created ZooKeeper Connection For executing task");
073      }
074    };
075
076    this.zk = new ZooKeeper(quorum, SESSION_TIMEOUT_ZK, watcher);
077  }
078
079  /**
080   * Checks if ChaosAgent is running or not on target host by checking its ZNode.
081   * @param hostname hostname to check for chaosagent
082   * @return true/false whether agent is running or not
083   */
084  private boolean isChaosAgentRunning(String hostname) {
085    try {
086      return zk.exists(CHAOS_AGENT_PARENT_ZNODE + ZNODE_PATH_SEPARATOR + hostname, false) != null;
087    } catch (KeeperException e) {
088      if (e.toString().contains(CONNECTION_LOSS)) {
089        recreateZKConnection();
090        try {
091          return zk.exists(CHAOS_AGENT_PARENT_ZNODE + ZNODE_PATH_SEPARATOR + hostname, false)
092              != null;
093        } catch (KeeperException | InterruptedException ie) {
094          LOG.error("ERROR ", ie);
095        }
096      }
097    } catch (InterruptedException e) {
098      LOG.error("Error checking for given hostname: {} ERROR: ", hostname, e);
099    }
100    return false;
101  }
102
103  /**
104   * Creates tasks for target hosts by creating ZNodes. Waits for a limited amount of time to
105   * complete task to execute.
106   * @param taskObject Object data represents command
107   * @return returns status
108   */
109  public String submitTask(final TaskObject taskObject) {
110    if (isChaosAgentRunning(taskObject.getTaskHostname())) {
111      LOG.info("Creating task node");
112      zk.create(
113        CHAOS_AGENT_STATUS_ZNODE + ZNODE_PATH_SEPARATOR + taskObject.getTaskHostname()
114          + ZNODE_PATH_SEPARATOR + TASK_PREFIX,
115        taskObject.getCommand().getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE,
116        CreateMode.EPHEMERAL_SEQUENTIAL, submitTaskCallback, taskObject);
117      long start = EnvironmentEdgeManager.currentTime();
118
119      while ((EnvironmentEdgeManager.currentTime() - start) < TASK_EXECUTION_TIMEOUT) {
120        if (taskStatus != null) {
121          return taskStatus;
122        }
123        Threads.sleep(500);
124      }
125    } else {
126      LOG.info("EHHHHH!  ChaosAgent Not running");
127    }
128    return TASK_ERROR_STRING;
129  }
130
131  /**
132   * To get status of task submitted
133   * @param path path at which to get status
134   * @param ctx  path context
135   */
136  private void getStatus(String path, Object ctx) {
137    LOG.info("Getting Status of task: " + path);
138    zk.getData(path, false, getStatusCallback, ctx);
139  }
140
141  /**
142   * Set a watch on task submitted
143   * @param name       ZNode name to set a watch
144   * @param taskObject context for ZNode name
145   */
146  private void setStatusWatch(String name, TaskObject taskObject) {
147    LOG.info("Checking for ZNode and Setting watch for task : " + name);
148    zk.exists(name, setStatusWatcher, setStatusWatchCallback, taskObject);
149  }
150
151  /**
152   * Delete task after getting its status
153   * @param path path to delete ZNode
154   */
155  private void deleteTask(String path) {
156    LOG.info("Deleting task: " + path);
157    zk.delete(path, -1, taskDeleteCallback, null);
158  }
159
160  // WATCHERS:
161
162  /**
163   * Watcher to get notification whenever status of task changes.
164   */
165  Watcher setStatusWatcher = new Watcher() {
166    @Override
167    public void process(WatchedEvent watchedEvent) {
168      LOG.info("Setting status watch for task: " + watchedEvent.getPath());
169      if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
170        if (!watchedEvent.getPath().contains(TASK_PREFIX)) {
171          throw new RuntimeException(
172            KeeperException.create(KeeperException.Code.DATAINCONSISTENCY));
173        }
174        getStatus(watchedEvent.getPath(), (Object) watchedEvent.getPath());
175
176      }
177    }
178  };
179
180  // CALLBACKS
181
182  AsyncCallback.DataCallback getStatusCallback = (rc, path, ctx, data, stat) -> {
183    switch (KeeperException.Code.get(rc)) {
184      case CONNECTIONLOSS:
185        // Connectionloss while getting status of task, getting again
186        recreateZKConnection();
187        getStatus(path, ctx);
188        break;
189
190      case OK:
191        if (ctx != null) {
192
193          String status = new String(data, StandardCharsets.UTF_8);
194          taskStatus = status;
195          switch (status) {
196            case TASK_COMPLETION_STRING:
197            case TASK_BOOLEAN_TRUE:
198            case TASK_BOOLEAN_FALSE:
199              LOG.info("Task executed completely : Status --> " + status);
200              break;
201
202            case TASK_ERROR_STRING:
203              LOG.info("There was error while executing task : Status --> " + status);
204              break;
205
206            default:
207              LOG.warn("Status of task is undefined!! : Status --> " + status);
208          }
209
210          deleteTask(path);
211        }
212        break;
213
214      default:
215        LOG.error("ERROR while getting status of task: " + path + " ERROR: "
216          + KeeperException.create(KeeperException.Code.get(rc)));
217    }
218  };
219
220  AsyncCallback.StatCallback setStatusWatchCallback = (rc, path, ctx, stat) -> {
221    switch (KeeperException.Code.get(rc)) {
222      case CONNECTIONLOSS:
223        // ConnectionLoss while setting watch on status ZNode, setting again.
224        recreateZKConnection();
225        setStatusWatch(path, (TaskObject) ctx);
226        break;
227
228      case OK:
229        if (stat != null) {
230          getStatus(path, null);
231        }
232        break;
233
234      default:
235        LOG.error("ERROR while setting watch on task ZNode: " + path + " ERROR: "
236          + KeeperException.create(KeeperException.Code.get(rc)));
237    }
238  };
239
240  AsyncCallback.StringCallback submitTaskCallback = (rc, path, ctx, name) -> {
241    switch (KeeperException.Code.get(rc)) {
242      case CONNECTIONLOSS:
243        // Connection to server was lost while submitting task, submitting again.
244        recreateZKConnection();
245        submitTask((TaskObject) ctx);
246        break;
247
248      case OK:
249        LOG.info("Task created : " + name);
250        setStatusWatch(name, (TaskObject) ctx);
251        break;
252
253      default:
254        LOG.error("Error submitting task: " + name + " ERROR:"
255          + KeeperException.create(KeeperException.Code.get(rc)));
256    }
257  };
258
259  AsyncCallback.VoidCallback taskDeleteCallback = new AsyncCallback.VoidCallback() {
260    @Override
261    public void processResult(int rc, String path, Object ctx) {
262      switch (KeeperException.Code.get(rc)) {
263        case CONNECTIONLOSS:
264          // Connectionloss while deleting task, deleting again
265          recreateZKConnection();
266          deleteTask(path);
267          break;
268
269        case OK:
270          LOG.info("Task Deleted successfully!");
271          LOG.info("Closing ZooKeeper Connection");
272          try {
273            zk.close();
274          } catch (InterruptedException e) {
275            LOG.error("Error while closing ZooKeeper Connection.");
276          }
277          break;
278
279        default:
280          LOG.error("ERROR while deleting task: " + path + " ERROR: "
281            + KeeperException.create(KeeperException.Code.get(rc)));
282      }
283    }
284  };
285
286  private void recreateZKConnection() {
287    try {
288      zk.close();
289    } catch (InterruptedException e) {
290      LOG.error("Error closing ZK connection : ", e);
291    } finally {
292      try {
293        createNewZKConnection();
294      } catch (IOException e) {
295        LOG.error("Error creating new ZK COnnection for agent: ", e);
296      }
297    }
298  }
299
300  static class TaskObject {
301    private final String command;
302    private final String taskHostname;
303
304    public TaskObject(String command, String taskHostname) {
305      this.command = command;
306      this.taskHostname = taskHostname;
307    }
308
309    public String getCommand() {
310      return this.command;
311    }
312
313    public String getTaskHostname() {
314      return taskHostname;
315    }
316  }
317
318}