/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * ZooKeeper based controller for a procedure member.
 * <p>
 * There can only be one {@link ZKProcedureMemberRpcs} per procedure type per member, since each
 * procedure type is bound to a single set of znodes. You can have multiple
 * {@link ZKProcedureMemberRpcs} on the same server, each serving a different member name, but each
 * individual rpcs is still bound to a single member name (and since they are used to determine
 * global progress, it's important to not get this wrong).
 * <p>
 * To make this slightly more confusing, you can run multiple, concurrent procedures at the same
 * time (as long as they have different types), from the same controller, but the same node name
 * must be used for each procedure (though there is no conflict between the two procedures as long
 * as they have distinct names).
 * <p>
 * There is no real error recovery with this mechanism currently -- if the coordinator fails, its
 * re-initialization will delete the znodes and require all in progress subprocedures to start
 * anew.
 */
@InterfaceAudience.Private
public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
  private static final Logger LOG = LoggerFactory.getLogger(ZKProcedureMemberRpcs.class);

  private final ZKProcedureUtil zkController;

  // Both fields are only assigned in start(String, ProcedureMember).
  protected ProcedureMember member;
  private String memberName;

  /**
   * Must call {@link #start(String, ProcedureMember)} before this can be used.
   * @param watcher  {@link ZKWatcher} to be owned by {@code this}. Closed via {@link #close()}.
   * @param procType name of the znode describing the procedure type
   * @throws KeeperException if we can't reach zookeeper
   */
  public ZKProcedureMemberRpcs(final ZKWatcher watcher, final String procType)
    throws KeeperException {
    this.zkController = new ZKProcedureUtil(watcher, procType) {
      @Override
      public void nodeCreated(String path) {
        if (!isInProcedurePath(path)) {
          return;
        }

        LOG.info("Received created event:{}", path);
        // if it is a simple start/end/abort then we just rewatch the node
        if (isAcquiredNode(path)) {
          waitForNewProcedures();
          return;
        }
        if (isAbortNode(path)) {
          watchForAbortedProcedures();
          return;
        }

        // otherwise dispatch on which barrier znode the created node lives under
        String parent = ZKUtil.getParent(path);
        if (isReachedNode(parent)) {
          // if its the end barrier, the procedure can be completed
          receivedReachedGlobalBarrier(path);
        } else if (isAbortNode(parent)) {
          abort(path);
        } else if (isAcquiredNode(parent)) {
          startNewSubprocedure(path);
        } else {
          LOG.debug("Ignoring created notification for node:{}", path);
        }
      }

      @Override
      public void nodeChildrenChanged(String path) {
        if (path.equals(this.acquiredZnode)) {
          LOG.info("Received procedure start children changed event: {}", path);
          waitForNewProcedures();
        } else if (path.equals(this.abortZnode)) {
          LOG.info("Received procedure abort children changed event: {}", path);
          watchForAbortedProcedures();
        }
      }
    };
  }

  /**
   * @return the {@link ZKProcedureUtil} created by the constructor and used for all znode access
   */
  public ZKProcedureUtil getZkController() {
    return zkController;
  }

  /**
   * @return the member name passed to {@link #start(String, ProcedureMember)}, or {@code null} if
   *         {@code start} has not been called yet
   */
  @Override
  public String getMemberName() {
    return memberName;
  }

  /**
   * Pass along the procedure global barrier notification to any listeners
   * @param path full znode path that caused the notification
   */
  private void receivedReachedGlobalBarrier(String path) {
    LOG.debug("Received reached global barrier:{}", path);
    String procName = ZKUtil.getNodeName(path);
    this.member.receivedReachedGlobalBarrier(procName);
  }

  /**
   * Lists the children of the abort znode (re-setting the watch for new children) and passes each
   * child's full path to {@link #abort(String)}. A ZooKeeper failure is reported to the member via
   * {@code controllerConnectionFailure}.
   */
  private void watchForAbortedProcedures() {
    LOG.debug("Checking for aborted procedures on node: '{}'", zkController.getAbortZnode());
    try {
      // this is the list of the currently aborted procedures
      List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
        zkController.getAbortZnode());
      if (children == null || children.isEmpty()) {
        return;
      }
      for (String node : children) {
        String abortNode = ZNodePaths.joinZNode(zkController.getAbortZnode(), node);
        abort(abortNode);
      }
    } catch (KeeperException e) {
      member.controllerConnectionFailure(
        "Failed to list children for abort node:" + zkController.getAbortZnode(), e, null);
    }
  }

  /**
   * Lists the children of the acquired-barrier znode (re-setting the watch for new children) and
   * calls {@link #startNewSubprocedure(String)} for each one. A ZooKeeper failure is reported to
   * the member via {@code controllerConnectionFailure} and nothing is started.
   */
  private void waitForNewProcedures() {
    // watch for new procedures that we need to start subprocedures for
    LOG.debug("Looking for new procedures under znode:'{}'", zkController.getAcquiredBarrier());
    List<String> runningProcedures;
    try {
      runningProcedures = ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
        zkController.getAcquiredBarrier());
    } catch (KeeperException e) {
      member.controllerConnectionFailure("General failure when watching for new procedures", e,
        null);
      // The listing failed, so there is nothing to iterate over; return explicitly rather than
      // falling through and reporting "no running procedures".
      return;
    }
    if (runningProcedures == null) {
      LOG.debug("No running procedures.");
      return;
    }
    for (String procName : runningProcedures) {
      // then read in the procedure information
      String path = ZNodePaths.joinZNode(zkController.getAcquiredBarrier(), procName);
      startNewSubprocedure(path);
    }
  }

  /**
   * Kick off a new sub-procedure on the listener with the data stored in the passed znode.
   * <p>
   * Will attempt to create the same procedure multiple times if a procedure znode with the same
   * name is created. It is left up to the coordinator to ensure this doesn't occur.
   * @param path full path to the znode for the procedure to start
   */
  private synchronized void startNewSubprocedure(String path) {
    LOG.debug("Found procedure znode: {}", path);
    String opName = ZKUtil.getNodeName(path);
    // start watching for an abort notification for the procedure
    String abortZNode = zkController.getAbortZNode(opName);
    try {
      if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), abortZNode)) {
        LOG.debug("Not starting:{} because we already have an abort notification.", opName);
        return;
      }
    } catch (KeeperException e) {
      member.controllerConnectionFailure(
        "Failed to get the abort znode (" + abortZNode + ") for procedure :" + opName, e, opName);
      return;
    }

    // get the data for the procedure
    // NOTE(review): if a failure happens before subproc is assigned below, the catch blocks pass
    // null to sendMemberAborted, which only logs and writes no abort znode -- confirm that the
    // coordinator is expected to notice such a member by other means (e.g. a timeout).
    Subprocedure subproc = null;
    try {
      byte[] data = ZKUtil.getData(zkController.getWatcher(), path);
      if (!ProtobufUtil.isPBMagicPrefix(data)) {
        String msg =
          "Data in for starting procedure " + opName + " is illegally formatted (no pb magic). "
            + "Killing the procedure: " + Bytes.toString(data);
        LOG.error(msg);
        throw new IllegalArgumentException(msg);
      }
      LOG.debug("start proc data length is {}", data.length);
      // strip the pb magic prefix before handing the payload to the member
      data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
      LOG.debug("Found data for znode:{}", path);
      subproc = member.createSubprocedure(opName, data);
      member.submitSubprocedure(subproc);
    } catch (IllegalArgumentException iae) {
      LOG.error("Illegal argument exception", iae);
      sendMemberAborted(subproc, new ForeignException(getMemberName(), iae));
    } catch (IllegalStateException ise) {
      LOG.error("Illegal state exception ", ise);
      sendMemberAborted(subproc, new ForeignException(getMemberName(), ise));
    } catch (KeeperException e) {
      member.controllerConnectionFailure("Failed to get data for new procedure:" + opName, e,
        opName);
    } catch (InterruptedException e) {
      member.controllerConnectionFailure("Failed to get data for new procedure:" + opName, e,
        opName);
      Thread.currentThread().interrupt();
    }
  }

  /**
   * This attempts to create an acquired state znode for the procedure (snapshot name). It then
   * looks for the reached znode to trigger in-barrier execution. If not present we have a watcher,
   * if present then trigger the in-barrier action.
   * @param sub the subprocedure whose name identifies the procedure being joined
   */
  @Override
  public void sendMemberAcquired(Subprocedure sub) throws IOException {
    String procName = sub.getName();
    try {
      LOG.debug("Member: '{}' joining acquired barrier for procedure ({}) in zk", memberName,
        procName);
      String acquiredZNode = ZNodePaths
        .joinZNode(ZKProcedureUtil.getAcquireBarrierNode(zkController, procName), memberName);
      ZKUtil.createAndFailSilent(zkController.getWatcher(), acquiredZNode);

      // watch for the complete node for this snapshot
      String reachedBarrier = zkController.getReachedBarrierNode(procName);
      LOG.debug("Watch for global barrier reached:{}", reachedBarrier);
      if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), reachedBarrier)) {
        receivedReachedGlobalBarrier(reachedBarrier);
      }
    } catch (KeeperException e) {
      member.controllerConnectionFailure(
        "Failed to acquire barrier for procedure: " + procName + " and member: " + memberName, e,
        procName);
    }
  }

  /**
   * This acts as the ack for a completed procedure
   * @param sub  the subprocedure whose name identifies the completed procedure
   * @param data payload stored (with the pb magic prefix) in this member's znode under the reached
   *             barrier; {@code null} is treated as an empty payload
   */
  @Override
  public void sendMemberCompleted(Subprocedure sub, byte[] data) throws IOException {
    String procName = sub.getName();
    LOG.debug("Marking procedure '{}' completed for member '{}' in zk", procName, memberName);
    String joinPath =
      ZNodePaths.joinZNode(zkController.getReachedBarrierNode(procName), memberName);
    // ProtobufUtil.prependPBMagic does not take care of null
    if (data == null) {
      data = new byte[0];
    }
    try {
      ZKUtil.createAndFailSilent(zkController.getWatcher(), joinPath,
        ProtobufUtil.prependPBMagic(data));
    } catch (KeeperException e) {
      member.controllerConnectionFailure(
        "Failed to post zk node:" + joinPath + " to join procedure barrier.", e, procName);
    }
  }

  /**
   * This should be called by the member and should write a serialized root cause exception to the
   * abort znode.
   * @param sub the subprocedure being aborted; if {@code null} the error is only logged and no
   *            znode is written
   * @param ee  the cause of the abort; when it carries no source, this member's name is used
   */
  @Override
  public void sendMemberAborted(Subprocedure sub, ForeignException ee) {
    if (sub == null) {
      LOG.error("Failed due to null subprocedure", ee);
      return;
    }
    String procName = sub.getName();
    LOG.debug("Aborting procedure ({}) in zk", procName);
    String procAbortZNode = zkController.getAbortZNode(procName);
    try {
      String source = (ee.getSource() == null) ? memberName : ee.getSource();
      byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
      ZKUtil.createAndFailSilent(zkController.getWatcher(), procAbortZNode, errorInfo);
      LOG.debug("Finished creating abort znode:{}", procAbortZNode);
    } catch (KeeperException e) {
      // possible that we get this error for the procedure if we already reset the zk state, but in
      // that case we should still get an error for that procedure anyways
      zkController.logZKTree(zkController.getBaseZnode());
      member.controllerConnectionFailure(
        "Failed to post zk node:" + procAbortZNode + " to abort procedure", e, procName);
    }
  }

  /**
   * Pass along the found abort notification to the listener
   * @param abortZNode full znode path to the failed procedure information
   */
  protected void abort(String abortZNode) {
    LOG.debug("Aborting procedure member for znode {}", abortZNode);
    String opName = ZKUtil.getNodeName(abortZNode);
    try {
      byte[] data = ZKUtil.getData(zkController.getWatcher(), abortZNode);

      // figure out the data we need to pass
      ForeignException ee;
      try {
        if (data == null || data.length == 0) {
          // ignore
          return;
        } else if (!ProtobufUtil.isPBMagicPrefix(data)) {
          String msg = "Illegally formatted data in abort node for proc " + opName
            + ".  Killing the procedure.";
          LOG.error(msg);
          // we got a remote exception, but we can't describe it so just return exn from here
          ee = new ForeignException(getMemberName(), new IllegalArgumentException(msg));
        } else {
          data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
          ee = ForeignException.deserialize(data);
        }
      } catch (IOException e) {
        LOG.warn("Got an error notification for op:{}"
          + " but we can't read the information. Killing the procedure.", opName);
        // we got a remote exception, but we can't describe it so just return exn from here
        ee = new ForeignException(getMemberName(), e);
      }

      this.member.receiveAbortProcedure(opName, ee);
    } catch (KeeperException e) {
      // Keep the node path and the parent abort path apart so the message stays readable.
      member.controllerConnectionFailure("Failed to get data for abort znode:" + abortZNode
        + " under " + zkController.getAbortZnode(), e, opName);
    } catch (InterruptedException e) {
      LOG.warn("abort already in progress", e);
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Binds this controller to a member and immediately checks for already-aborted procedures and
   * then for already-running procedures, setting watches for new ones.
   * @param memberName name under which this member posts its znodes
   * @param listener   the member that receives procedure notifications
   */
  @Override
  public void start(final String memberName, final ProcedureMember listener) {
    LOG.debug("Starting procedure member '{}'", memberName);
    this.member = listener;
    this.memberName = memberName;
    watchForAbortedProcedures();
    waitForNewProcedures();
  }

  /**
   * Closes the underlying {@link ZKProcedureUtil}.
   */
  @Override
  public void close() throws IOException {
    zkController.close();
  }

}