/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * ZooKeeper-based client that submits a task for a host by creating a sequential ZNode under
 * {@code /hbase/chaosAgentTaskStatus/<hostname>/}, watches that ZNode for a data change, reads the
 * changed data back as the task status and finally deletes the ZNode.
 * <p>
 * The status is presumably written by the ChaosAgent running on the target host; that side is not
 * part of this class.
 * <p>
 * All ZooKeeper work after the initial "is the agent running" check is asynchronous: the
 * callbacks and the watcher declared below run on ZooKeeper's callback thread, while
 * {@link #submitTask(TaskObject)} polls {@link #taskStatus} on the caller's thread.
 */
@InterfaceAudience.Private
public class ChaosZKClient {

  private static final Logger LOG = LoggerFactory.getLogger(ChaosZKClient.class);

  /** Parent ZNode whose children (one per hostname) are checked to see if an agent is running. */
  private static final String CHAOS_AGENT_PARENT_ZNODE = "/hbase/chaosAgents";
  /** Parent ZNode under which per-host task ZNodes are created. */
  private static final String CHAOS_AGENT_STATUS_ZNODE = "/hbase/chaosAgentTaskStatus";
  private static final String ZNODE_PATH_SEPARATOR = "/";
  /** Name prefix of task ZNodes; ZooKeeper appends the sequence number. */
  private static final String TASK_PREFIX = "task_";
  private static final String TASK_ERROR_STRING = "error";
  private static final String TASK_COMPLETION_STRING = "done";
  private static final String TASK_BOOLEAN_TRUE = "true";
  private static final String TASK_BOOLEAN_FALSE = "false";
  /** ZooKeeper session timeout in milliseconds (10 minutes). */
  private static final int SESSION_TIMEOUT_ZK = 10 * 60 * 1000;
  /** Maximum time in milliseconds (5 minutes) that submitTask waits for a status. */
  private static final int TASK_EXECUTION_TIMEOUT = 5 * 60 * 1000;

  /** Last status read from a task ZNode; written by the data callback, polled by submitTask. */
  private volatile String taskStatus = null;

  private final String quorum;
  private ZooKeeper zk;

  /**
   * Creates the client and tries to open a ZooKeeper connection. A failure to create the
   * connection is logged, not thrown; in that case no connection is available afterwards.
   * @param quorum ZooKeeper connect string passed to the {@link ZooKeeper} constructor
   */
  public ChaosZKClient(String quorum) {
    this.quorum = quorum;
    try {
      this.createNewZKConnection();
    } catch (IOException e) {
      LOG.error("Error creating ZooKeeper Connection: ", e);
    }
  }

  /**
   * Creates connection with ZooKeeper
   * @throws IOException when not able to create connection properly
   */
  public void createNewZKConnection() throws IOException {
    // Default watcher of the session: it only logs, for every event it receives.
    Watcher watcher = new Watcher() {
      @Override
      public void process(WatchedEvent watchedEvent) {
        LOG.info("Created ZooKeeper Connection For executing task");
      }
    };

    this.zk = new ZooKeeper(quorum, SESSION_TIMEOUT_ZK, watcher);
  }

  /**
   * Checks if ChaosAgent is running or not on target host by checking its ZNode.
   * <p>
   * On a connection loss the ZooKeeper connection is recreated and the check is retried once.
   * Any other failure is logged and reported as "not running". If the thread is interrupted the
   * interrupt flag is restored before returning.
   * @param hostname hostname to check for chaosagent
   * @return true/false whether agent is running or not
   */
  private boolean isChaosAgentRunning(String hostname) {
    if (zk == null) {
      // The constructor logs and continues when the connection cannot be created.
      LOG.error("No ZooKeeper connection available to check for hostname: {}", hostname);
      return false;
    }
    final String agentZNode = CHAOS_AGENT_PARENT_ZNODE + ZNODE_PATH_SEPARATOR + hostname;
    try {
      return zk.exists(agentZNode, false) != null;
    } catch (KeeperException e) {
      if (e.code() != KeeperException.Code.CONNECTIONLOSS) {
        // Previously swallowed silently; make the reason for the "false" answer visible.
        LOG.error("Error checking for given hostname: {} ERROR: ", hostname, e);
        return false;
      }
      recreateZKConnection();
      try {
        return zk.exists(agentZNode, false) != null;
      } catch (KeeperException ke) {
        LOG.error("ERROR ", ke);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        LOG.error("ERROR ", ie);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      LOG.error("Error checking for given hostname: {} ERROR: ", hostname, e);
    }
    return false;
  }

  /**
   * Creates tasks for target hosts by creating ZNodes. Waits for a limited amount of time to
   * complete task to execute.
   * <p>
   * The ZNode is created asynchronously; this method then polls every 500 ms, for at most
   * {@link #TASK_EXECUTION_TIMEOUT} milliseconds, until a status has been recorded.
   * @param taskObject Object data represents command
   * @return the status read from the task ZNode, or {@code "error"} when the agent ZNode for the
   *         host does not exist or no status arrived before the timeout
   */
  public String submitTask(final TaskObject taskObject) {
    if (isChaosAgentRunning(taskObject.getTaskHostname())) {
      // Drop any status left over from an earlier task so it is not returned for this one.
      taskStatus = null;
      LOG.info("Creating task node");
      zk.create(
        CHAOS_AGENT_STATUS_ZNODE + ZNODE_PATH_SEPARATOR + taskObject.getTaskHostname()
          + ZNODE_PATH_SEPARATOR + TASK_PREFIX,
        taskObject.getCommand().getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE,
        CreateMode.EPHEMERAL_SEQUENTIAL, submitTaskCallback, taskObject);
      long start = EnvironmentEdgeManager.currentTime();

      while ((EnvironmentEdgeManager.currentTime() - start) < TASK_EXECUTION_TIMEOUT) {
        if (taskStatus != null) {
          return taskStatus;
        }
        Threads.sleep(500);
      }
    } else {
      LOG.info("EHHHHH! ChaosAgent Not running");
    }
    return TASK_ERROR_STRING;
  }

  /**
   * To get status of task submitted
   * @param path path at which to get status
   * @param ctx  path context; the data callback only records a status when this is non-null
   */
  private void getStatus(String path, Object ctx) {
    LOG.info("Getting Status of task: " + path);
    zk.getData(path, false, getStatusCallback, ctx);
  }

  /**
   * Set a watch on task submitted
   * @param name       ZNode name to set a watch
   * @param taskObject context for ZNode name
   */
  private void setStatusWatch(String name, TaskObject taskObject) {
    LOG.info("Checking for ZNode and Setting watch for task : " + name);
    zk.exists(name, setStatusWatcher, setStatusWatchCallback, taskObject);
  }

  /**
   * Delete task after getting its status
   * @param path path to delete ZNode
   */
  private void deleteTask(String path) {
    LOG.info("Deleting task: " + path);
    // Version -1 matches any version of the ZNode.
    zk.delete(path, -1, taskDeleteCallback, null);
  }

  // WATCHERS:

  /**
   * Watcher to get notification whenever status of task changes. On a data change of a task
   * ZNode it reads the data with the path as (non-null) context, so the status gets recorded.
   */
  Watcher setStatusWatcher = new Watcher() {
    @Override
    public void process(WatchedEvent watchedEvent) {
      LOG.info("Setting status watch for task: " + watchedEvent.getPath());
      if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
        if (!watchedEvent.getPath().contains(TASK_PREFIX)) {
          throw new RuntimeException(
            KeeperException.create(KeeperException.Code.DATAINCONSISTENCY));
        }
        getStatus(watchedEvent.getPath(), (Object) watchedEvent.getPath());

      }
    }
  };

  // CALLBACKS

  /**
   * Handles the result of reading a task ZNode. With a non-null context the data is stored as the
   * task status and the task ZNode is deleted; with a null context a successful read is ignored.
   */
  AsyncCallback.DataCallback getStatusCallback = (rc, path, ctx, data, stat) -> {
    switch (KeeperException.Code.get(rc)) {
      case CONNECTIONLOSS:
        // Connectionloss while getting status of task, getting again
        recreateZKConnection();
        getStatus(path, ctx);
        break;

      case OK:
        if (ctx != null) {

          String status = new String(data, StandardCharsets.UTF_8);
          taskStatus = status;
          switch (status) {
            case TASK_COMPLETION_STRING:
            case TASK_BOOLEAN_TRUE:
            case TASK_BOOLEAN_FALSE:
              LOG.info("Task executed completely : Status --> " + status);
              break;

            case TASK_ERROR_STRING:
              LOG.info("There was error while executing task : Status --> " + status);
              break;

            default:
              LOG.warn("Status of task is undefined!! : Status --> " + status);
          }

          deleteTask(path);
        }
        break;

      default:
        LOG.error("ERROR while getting status of task: " + path + " ERROR: "
          + KeeperException.create(KeeperException.Code.get(rc)));
    }
  };

  /**
   * Handles the result of the exists-with-watch call on a task ZNode. When the ZNode exists its
   * data is read once with a null context, which the data callback does not record as a status.
   */
  AsyncCallback.StatCallback setStatusWatchCallback = (rc, path, ctx, stat) -> {
    switch (KeeperException.Code.get(rc)) {
      case CONNECTIONLOSS:
        // ConnectionLoss while setting watch on status ZNode, setting again.
        recreateZKConnection();
        setStatusWatch(path, (TaskObject) ctx);
        break;

      case OK:
        if (stat != null) {
          getStatus(path, null);
        }
        break;

      default:
        LOG.error("ERROR while setting watch on task ZNode: " + path + " ERROR: "
          + KeeperException.create(KeeperException.Code.get(rc)));
    }
  };

  /**
   * Handles the result of creating the task ZNode; on success a watch is set on the created
   * ZNode (using the actual name returned by ZooKeeper).
   */
  AsyncCallback.StringCallback submitTaskCallback = (rc, path, ctx, name) -> {
    switch (KeeperException.Code.get(rc)) {
      case CONNECTIONLOSS:
        // Connection to server was lost while submitting task, submitting again.
        // NOTE(review): this re-enters submitTask, which polls for the status on the thread
        // running this callback, and the value it returns is not used here.
        recreateZKConnection();
        submitTask((TaskObject) ctx);
        break;

      case OK:
        LOG.info("Task created : " + name);
        setStatusWatch(name, (TaskObject) ctx);
        break;

      default:
        LOG.error("Error submitting task: " + name + " ERROR:"
          + KeeperException.create(KeeperException.Code.get(rc)));
    }
  };

  /**
   * Handles the result of deleting the task ZNode; on success the ZooKeeper connection is closed.
   */
  AsyncCallback.VoidCallback taskDeleteCallback = new AsyncCallback.VoidCallback() {
    @Override
    public void processResult(int rc, String path, Object ctx) {
      switch (KeeperException.Code.get(rc)) {
        case CONNECTIONLOSS:
          // Connectionloss while deleting task, deleting again
          recreateZKConnection();
          deleteTask(path);
          break;

        case OK:
          LOG.info("Task Deleted successfully!");
          LOG.info("Closing ZooKeeper Connection");
          try {
            zk.close();
          } catch (InterruptedException e) {
            // Keep the interrupt visible to the owner of this thread and keep the cause in the log.
            Thread.currentThread().interrupt();
            LOG.error("Error while closing ZooKeeper Connection.", e);
          }
          break;

        default:
          LOG.error("ERROR while deleting task: " + path + " ERROR: "
            + KeeperException.create(KeeperException.Code.get(rc)));
      }
    }
  };

  /**
   * Closes the current ZooKeeper connection (if there is one) and always tries to create a new
   * one afterwards. Failures are logged; an interrupt during close is restored on the thread.
   */
  private void recreateZKConnection() {
    try {
      if (zk != null) {
        zk.close();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      LOG.error("Error closing ZK connection : ", e);
    } finally {
      try {
        createNewZKConnection();
      } catch (IOException e) {
        LOG.error("Error creating new ZK COnnection for agent: ", e);
      }
    }
  }

  /**
   * Immutable pair of the command to run and the hostname it is meant for.
   */
  static class TaskObject {
    private final String command;
    private final String taskHostname;

    /**
     * @param command      command whose UTF-8 bytes become the data of the task ZNode
     * @param taskHostname hostname used to build the agent and task ZNode paths
     */
    public TaskObject(String command, String taskHostname) {
      this.command = command;
      this.taskHostname = taskHostname;
    }

    public String getCommand() {
      return this.command;
    }

    public String getTaskHostname() {
      return taskHostname;
    }
  }

}