001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 022import org.apache.hadoop.hbase.util.Threads; 023import org.apache.yetus.audience.InterfaceAudience; 024import org.apache.zookeeper.AsyncCallback; 025import org.apache.zookeeper.CreateMode; 026import org.apache.zookeeper.KeeperException; 027import org.apache.zookeeper.WatchedEvent; 028import org.apache.zookeeper.Watcher; 029import org.apache.zookeeper.ZooDefs; 030import org.apache.zookeeper.ZooKeeper; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034@InterfaceAudience.Private 035public class ChaosZKClient { 036 037 private static final Logger LOG = LoggerFactory.getLogger(ChaosZKClient.class.getName()); 038 private static final String CHAOS_AGENT_PARENT_ZNODE = "/hbase/chaosAgents"; 039 private static final String CHAOS_AGENT_STATUS_ZNODE = "/hbase/chaosAgentTaskStatus"; 040 private static final String ZNODE_PATH_SEPARATOR = "/"; 041 private static final String TASK_PREFIX = "task_"; 042 private static final String TASK_ERROR_STRING = "error"; 043 private static final String TASK_COMPLETION_STRING = "done"; 044 private static final String TASK_BOOLEAN_TRUE = "true"; 045 private static final String TASK_BOOLEAN_FALSE = "false"; 046 private static final String CONNECTION_LOSS = "ConnectionLoss"; 047 private static final int SESSION_TIMEOUT_ZK = 10 * 60 * 1000; 048 private static final int TASK_EXECUTION_TIMEOUT = 5 * 60 * 1000; 049 private volatile String taskStatus = null; 050 051 private final String quorum; 052 private ZooKeeper zk; 053 054 public ChaosZKClient(String quorum) { 055 this.quorum = quorum; 056 try { 057 this.createNewZKConnection(); 058 } catch (IOException e) { 059 LOG.error("Error creating ZooKeeper Connection: ", e); 060 } 061 } 062 063 /** 064 * Creates connection with ZooKeeper 065 * @throws IOException when not able to create connection properly 066 */ 067 public void createNewZKConnection() throws IOException { 068 Watcher watcher = new Watcher() { 069 @Override 070 public void process(WatchedEvent watchedEvent) { 071 LOG.info("Created ZooKeeper Connection For executing task"); 072 } 073 }; 074 075 this.zk = new ZooKeeper(quorum, SESSION_TIMEOUT_ZK, watcher); 076 } 077 078 /** 079 * Checks if ChaosAgent is running or not on target host by checking its ZNode. 080 * @param hostname hostname to check for chaosagent 081 * @return true/false whether agent is running or not 082 */ 083 private boolean isChaosAgentRunning(String hostname) { 084 try { 085 return zk.exists(CHAOS_AGENT_PARENT_ZNODE + ZNODE_PATH_SEPARATOR + hostname, false) != null; 086 } catch (KeeperException e) { 087 if (e.toString().contains(CONNECTION_LOSS)) { 088 recreateZKConnection(); 089 try { 090 return zk.exists(CHAOS_AGENT_PARENT_ZNODE + ZNODE_PATH_SEPARATOR + hostname, false) 091 != null; 092 } catch (KeeperException | InterruptedException ie) { 093 LOG.error("ERROR ", ie); 094 } 095 } 096 } catch (InterruptedException e) { 097 LOG.error("Error checking for given hostname: {} ERROR: ", hostname, e); 098 } 099 return false; 100 } 101 102 /** 103 * Creates tasks for target hosts by creating ZNodes. Waits for a limited amount of time to 104 * complete task to execute. 105 * @param taskObject Object data represents command 106 * @return returns status 107 */ 108 public String submitTask(final TaskObject taskObject) { 109 if (isChaosAgentRunning(taskObject.getTaskHostname())) { 110 LOG.info("Creating task node"); 111 zk.create( 112 CHAOS_AGENT_STATUS_ZNODE + ZNODE_PATH_SEPARATOR + taskObject.getTaskHostname() 113 + ZNODE_PATH_SEPARATOR + TASK_PREFIX, 114 taskObject.getCommand().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, 115 CreateMode.EPHEMERAL_SEQUENTIAL, submitTaskCallback, taskObject); 116 long start = EnvironmentEdgeManager.currentTime(); 117 118 while ((EnvironmentEdgeManager.currentTime() - start) < TASK_EXECUTION_TIMEOUT) { 119 if (taskStatus != null) { 120 return taskStatus; 121 } 122 Threads.sleep(500); 123 } 124 } else { 125 LOG.info("EHHHHH! ChaosAgent Not running"); 126 } 127 return TASK_ERROR_STRING; 128 } 129 130 /** 131 * To get status of task submitted 132 * @param path path at which to get status 133 * @param ctx path context 134 */ 135 private void getStatus(String path, Object ctx) { 136 LOG.info("Getting Status of task: " + path); 137 zk.getData(path, false, getStatusCallback, ctx); 138 } 139 140 /** 141 * Set a watch on task submitted 142 * @param name ZNode name to set a watch 143 * @param taskObject context for ZNode name 144 */ 145 private void setStatusWatch(String name, TaskObject taskObject) { 146 LOG.info("Checking for ZNode and Setting watch for task : " + name); 147 zk.exists(name, setStatusWatcher, setStatusWatchCallback, taskObject); 148 } 149 150 /** 151 * Delete task after getting its status 152 * @param path path to delete ZNode 153 */ 154 private void deleteTask(String path) { 155 LOG.info("Deleting task: " + path); 156 zk.delete(path, -1, taskDeleteCallback, null); 157 } 158 159 // WATCHERS: 160 161 /** 162 * Watcher to get notification whenever status of task changes. 163 */ 164 Watcher setStatusWatcher = new Watcher() { 165 @Override 166 public void process(WatchedEvent watchedEvent) { 167 LOG.info("Setting status watch for task: " + watchedEvent.getPath()); 168 if (watchedEvent.getType() == Event.EventType.NodeDataChanged) { 169 if (!watchedEvent.getPath().contains(TASK_PREFIX)) { 170 throw new RuntimeException( 171 KeeperException.create(KeeperException.Code.DATAINCONSISTENCY)); 172 } 173 getStatus(watchedEvent.getPath(), (Object) watchedEvent.getPath()); 174 175 } 176 } 177 }; 178 179 // CALLBACKS 180 181 AsyncCallback.DataCallback getStatusCallback = (rc, path, ctx, data, stat) -> { 182 switch (KeeperException.Code.get(rc)) { 183 case CONNECTIONLOSS: 184 // Connectionloss while getting status of task, getting again 185 recreateZKConnection(); 186 getStatus(path, ctx); 187 break; 188 189 case OK: 190 if (ctx != null) { 191 192 String status = new String(data); 193 taskStatus = status; 194 switch (status) { 195 case TASK_COMPLETION_STRING: 196 case TASK_BOOLEAN_TRUE: 197 case TASK_BOOLEAN_FALSE: 198 LOG.info("Task executed completely : Status --> " + status); 199 break; 200 201 case TASK_ERROR_STRING: 202 LOG.info("There was error while executing task : Status --> " + status); 203 break; 204 205 default: 206 LOG.warn("Status of task is undefined!! : Status --> " + status); 207 } 208 209 deleteTask(path); 210 } 211 break; 212 213 default: 214 LOG.error("ERROR while getting status of task: " + path + " ERROR: " 215 + KeeperException.create(KeeperException.Code.get(rc))); 216 } 217 }; 218 219 AsyncCallback.StatCallback setStatusWatchCallback = (rc, path, ctx, stat) -> { 220 switch (KeeperException.Code.get(rc)) { 221 case CONNECTIONLOSS: 222 // ConnectionLoss while setting watch on status ZNode, setting again. 223 recreateZKConnection(); 224 setStatusWatch(path, (TaskObject) ctx); 225 break; 226 227 case OK: 228 if (stat != null) { 229 getStatus(path, null); 230 } 231 break; 232 233 default: 234 LOG.error("ERROR while setting watch on task ZNode: " + path + " ERROR: " 235 + KeeperException.create(KeeperException.Code.get(rc))); 236 } 237 }; 238 239 AsyncCallback.StringCallback submitTaskCallback = (rc, path, ctx, name) -> { 240 switch (KeeperException.Code.get(rc)) { 241 case CONNECTIONLOSS: 242 // Connection to server was lost while submitting task, submitting again. 243 recreateZKConnection(); 244 submitTask((TaskObject) ctx); 245 break; 246 247 case OK: 248 LOG.info("Task created : " + name); 249 setStatusWatch(name, (TaskObject) ctx); 250 break; 251 252 default: 253 LOG.error("Error submitting task: " + name + " ERROR:" 254 + KeeperException.create(KeeperException.Code.get(rc))); 255 } 256 }; 257 258 AsyncCallback.VoidCallback taskDeleteCallback = new AsyncCallback.VoidCallback() { 259 @Override 260 public void processResult(int rc, String path, Object ctx) { 261 switch (KeeperException.Code.get(rc)) { 262 case CONNECTIONLOSS: 263 // Connectionloss while deleting task, deleting again 264 recreateZKConnection(); 265 deleteTask(path); 266 break; 267 268 case OK: 269 LOG.info("Task Deleted successfully!"); 270 LOG.info("Closing ZooKeeper Connection"); 271 try { 272 zk.close(); 273 } catch (InterruptedException e) { 274 LOG.error("Error while closing ZooKeeper Connection."); 275 } 276 break; 277 278 default: 279 LOG.error("ERROR while deleting task: " + path + " ERROR: " 280 + KeeperException.create(KeeperException.Code.get(rc))); 281 } 282 } 283 }; 284 285 private void recreateZKConnection() { 286 try { 287 zk.close(); 288 } catch (InterruptedException e) { 289 LOG.error("Error closing ZK connection : ", e); 290 } finally { 291 try { 292 createNewZKConnection(); 293 } catch (IOException e) { 294 LOG.error("Error creating new ZK COnnection for agent: ", e); 295 } 296 } 297 } 298 299 static class TaskObject { 300 private final String command; 301 private final String taskHostname; 302 303 public TaskObject(String command, String taskHostname) { 304 this.command = command; 305 this.taskHostname = taskHostname; 306 } 307 308 public String getCommand() { 309 return this.command; 310 } 311 312 public String getTaskHostname() { 313 return taskHostname; 314 } 315 } 316 317}