001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.List; 023 024import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 025import org.apache.yetus.audience.InterfaceAudience; 026import org.apache.hadoop.hbase.errorhandling.ForeignException; 027import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 028import org.apache.hadoop.hbase.util.Bytes; 029import org.apache.hadoop.hbase.zookeeper.ZKUtil; 030import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 031import org.apache.zookeeper.KeeperException; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * ZooKeeper based controller for a procedure member. 037 * <p> 038 * There can only be one {@link ZKProcedureMemberRpcs} per procedure type per member, 039 * since each procedure type is bound to a single set of znodes. You can have multiple 040 * {@link ZKProcedureMemberRpcs} on the same server, each serving a different member 041 * name, but each individual rpcs is still bound to a single member name (and since they are 042 * used to determine global progress, its important to not get this wrong). 043 * <p> 044 * To make this slightly more confusing, you can run multiple, concurrent procedures at the same 045 * time (as long as they have different types), from the same controller, but the same node name 046 * must be used for each procedure (though there is no conflict between the two procedure as long 047 * as they have distinct names). 048 * <p> 049 * There is no real error recovery with this mechanism currently -- if any the coordinator fails, 050 * its re-initialization will delete the znodes and require all in progress subprocedures to start 051 * anew. 052 */ 053@InterfaceAudience.Private 054public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs { 055 private static final Logger LOG = LoggerFactory.getLogger(ZKProcedureMemberRpcs.class); 056 057 private final ZKProcedureUtil zkController; 058 059 protected ProcedureMember member; 060 private String memberName; 061 062 /** 063 * Must call {@link #start(String, ProcedureMember)} before this can be used. 064 * @param watcher {@link ZKWatcher} to be owned by <tt>this</tt>. Closed via 065 * {@link #close()}. 066 * @param procType name of the znode describing the procedure type 067 * @throws KeeperException if we can't reach zookeeper 068 */ 069 public ZKProcedureMemberRpcs(final ZKWatcher watcher, final String procType) 070 throws KeeperException { 071 this.zkController = new ZKProcedureUtil(watcher, procType) { 072 @Override 073 public void nodeCreated(String path) { 074 if (!isInProcedurePath(path)) { 075 return; 076 } 077 078 LOG.info("Received created event:" + path); 079 // if it is a simple start/end/abort then we just rewatch the node 080 if (isAcquiredNode(path)) { 081 waitForNewProcedures(); 082 return; 083 } else if (isAbortNode(path)) { 084 watchForAbortedProcedures(); 085 return; 086 } 087 String parent = ZKUtil.getParent(path); 088 // if its the end barrier, the procedure can be completed 089 if (isReachedNode(parent)) { 090 receivedReachedGlobalBarrier(path); 091 return; 092 } else if (isAbortNode(parent)) { 093 abort(path); 094 return; 095 } else if (isAcquiredNode(parent)) { 096 startNewSubprocedure(path); 097 } else { 098 LOG.debug("Ignoring created notification for node:" + path); 099 } 100 } 101 102 @Override 103 public void nodeChildrenChanged(String path) { 104 if (path.equals(this.acquiredZnode)) { 105 LOG.info("Received procedure start children changed event: " + path); 106 waitForNewProcedures(); 107 } else if (path.equals(this.abortZnode)) { 108 LOG.info("Received procedure abort children changed event: " + path); 109 watchForAbortedProcedures(); 110 } 111 } 112 }; 113 } 114 115 public ZKProcedureUtil getZkController() { 116 return zkController; 117 } 118 119 @Override 120 public String getMemberName() { 121 return memberName; 122 } 123 124 /** 125 * Pass along the procedure global barrier notification to any listeners 126 * @param path full znode path that cause the notification 127 */ 128 private void receivedReachedGlobalBarrier(String path) { 129 LOG.debug("Received reached global barrier:" + path); 130 String procName = ZKUtil.getNodeName(path); 131 this.member.receivedReachedGlobalBarrier(procName); 132 } 133 134 private void watchForAbortedProcedures() { 135 LOG.debug("Checking for aborted procedures on node: '" + zkController.getAbortZnode() + "'"); 136 try { 137 // this is the list of the currently aborted procedues 138 List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(), 139 zkController.getAbortZnode()); 140 if (children == null || children.isEmpty()) { 141 return; 142 } 143 for (String node : children) { 144 String abortNode = ZNodePaths.joinZNode(zkController.getAbortZnode(), node); 145 abort(abortNode); 146 } 147 } catch (KeeperException e) { 148 member.controllerConnectionFailure("Failed to list children for abort node:" 149 + zkController.getAbortZnode(), e, null); 150 } 151 } 152 153 private void waitForNewProcedures() { 154 // watch for new procedues that we need to start subprocedures for 155 LOG.debug("Looking for new procedures under znode:'" + zkController.getAcquiredBarrier() + "'"); 156 List<String> runningProcedures = null; 157 try { 158 runningProcedures = ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(), 159 zkController.getAcquiredBarrier()); 160 if (runningProcedures == null) { 161 LOG.debug("No running procedures."); 162 return; 163 } 164 } catch (KeeperException e) { 165 member.controllerConnectionFailure("General failure when watching for new procedures", 166 e, null); 167 } 168 if (runningProcedures == null) { 169 LOG.debug("No running procedures."); 170 return; 171 } 172 for (String procName : runningProcedures) { 173 // then read in the procedure information 174 String path = ZNodePaths.joinZNode(zkController.getAcquiredBarrier(), procName); 175 startNewSubprocedure(path); 176 } 177 } 178 179 /** 180 * Kick off a new sub-procedure on the listener with the data stored in the passed znode. 181 * <p> 182 * Will attempt to create the same procedure multiple times if an procedure znode with the same 183 * name is created. It is left up the coordinator to ensure this doesn't occur. 184 * @param path full path to the znode for the procedure to start 185 */ 186 private synchronized void startNewSubprocedure(String path) { 187 LOG.debug("Found procedure znode: " + path); 188 String opName = ZKUtil.getNodeName(path); 189 // start watching for an abort notification for the procedure 190 String abortZNode = zkController.getAbortZNode(opName); 191 try { 192 if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), abortZNode)) { 193 LOG.debug("Not starting:" + opName + " because we already have an abort notification."); 194 return; 195 } 196 } catch (KeeperException e) { 197 member.controllerConnectionFailure("Failed to get the abort znode (" + abortZNode 198 + ") for procedure :" + opName, e, opName); 199 return; 200 } 201 202 // get the data for the procedure 203 Subprocedure subproc = null; 204 try { 205 byte[] data = ZKUtil.getData(zkController.getWatcher(), path); 206 if (!ProtobufUtil.isPBMagicPrefix(data)) { 207 String msg = "Data in for starting procedure " + opName + 208 " is illegally formatted (no pb magic). " + 209 "Killing the procedure: " + Bytes.toString(data); 210 LOG.error(msg); 211 throw new IllegalArgumentException(msg); 212 } 213 LOG.debug("start proc data length is " + data.length); 214 data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length); 215 LOG.debug("Found data for znode:" + path); 216 subproc = member.createSubprocedure(opName, data); 217 member.submitSubprocedure(subproc); 218 } catch (IllegalArgumentException iae ) { 219 LOG.error("Illegal argument exception", iae); 220 sendMemberAborted(subproc, new ForeignException(getMemberName(), iae)); 221 } catch (IllegalStateException ise) { 222 LOG.error("Illegal state exception ", ise); 223 sendMemberAborted(subproc, new ForeignException(getMemberName(), ise)); 224 } catch (KeeperException e) { 225 member.controllerConnectionFailure("Failed to get data for new procedure:" + opName, 226 e, opName); 227 } catch (InterruptedException e) { 228 member.controllerConnectionFailure("Failed to get data for new procedure:" + opName, 229 e, opName); 230 Thread.currentThread().interrupt(); 231 } 232 } 233 234 /** 235 * This attempts to create an acquired state znode for the procedure (snapshot name). 236 * 237 * It then looks for the reached znode to trigger in-barrier execution. If not present we 238 * have a watcher, if present then trigger the in-barrier action. 239 */ 240 @Override 241 public void sendMemberAcquired(Subprocedure sub) throws IOException { 242 String procName = sub.getName(); 243 try { 244 LOG.debug("Member: '" + memberName + "' joining acquired barrier for procedure (" + procName 245 + ") in zk"); 246 String acquiredZNode = ZNodePaths.joinZNode(ZKProcedureUtil.getAcquireBarrierNode( 247 zkController, procName), memberName); 248 ZKUtil.createAndFailSilent(zkController.getWatcher(), acquiredZNode); 249 250 // watch for the complete node for this snapshot 251 String reachedBarrier = zkController.getReachedBarrierNode(procName); 252 LOG.debug("Watch for global barrier reached:" + reachedBarrier); 253 if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), reachedBarrier)) { 254 receivedReachedGlobalBarrier(reachedBarrier); 255 } 256 } catch (KeeperException e) { 257 member.controllerConnectionFailure("Failed to acquire barrier for procedure: " 258 + procName + " and member: " + memberName, e, procName); 259 } 260 } 261 262 /** 263 * This acts as the ack for a completed procedure 264 */ 265 @Override 266 public void sendMemberCompleted(Subprocedure sub, byte[] data) throws IOException { 267 String procName = sub.getName(); 268 LOG.debug("Marking procedure '" + procName + "' completed for member '" + memberName 269 + "' in zk"); 270 String joinPath = 271 ZNodePaths.joinZNode(zkController.getReachedBarrierNode(procName), memberName); 272 // ProtobufUtil.prependPBMagic does not take care of null 273 if (data == null) { 274 data = new byte[0]; 275 } 276 try { 277 ZKUtil.createAndFailSilent(zkController.getWatcher(), joinPath, 278 ProtobufUtil.prependPBMagic(data)); 279 } catch (KeeperException e) { 280 member.controllerConnectionFailure("Failed to post zk node:" + joinPath 281 + " to join procedure barrier.", e, procName); 282 } 283 } 284 285 /** 286 * This should be called by the member and should write a serialized root cause exception as 287 * to the abort znode. 288 */ 289 @Override 290 public void sendMemberAborted(Subprocedure sub, ForeignException ee) { 291 if (sub == null) { 292 LOG.error("Failed due to null subprocedure", ee); 293 return; 294 } 295 String procName = sub.getName(); 296 LOG.debug("Aborting procedure (" + procName + ") in zk"); 297 String procAbortZNode = zkController.getAbortZNode(procName); 298 try { 299 String source = (ee.getSource() == null) ? memberName: ee.getSource(); 300 byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee)); 301 ZKUtil.createAndFailSilent(zkController.getWatcher(), procAbortZNode, errorInfo); 302 LOG.debug("Finished creating abort znode:" + procAbortZNode); 303 } catch (KeeperException e) { 304 // possible that we get this error for the procedure if we already reset the zk state, but in 305 // that case we should still get an error for that procedure anyways 306 zkController.logZKTree(zkController.getBaseZnode()); 307 member.controllerConnectionFailure("Failed to post zk node:" + procAbortZNode 308 + " to abort procedure", e, procName); 309 } 310 } 311 312 /** 313 * Pass along the found abort notification to the listener 314 * @param abortZNode full znode path to the failed procedure information 315 */ 316 protected void abort(String abortZNode) { 317 LOG.debug("Aborting procedure member for znode " + abortZNode); 318 String opName = ZKUtil.getNodeName(abortZNode); 319 try { 320 byte[] data = ZKUtil.getData(zkController.getWatcher(), abortZNode); 321 322 // figure out the data we need to pass 323 ForeignException ee; 324 try { 325 if (data == null || data.length == 0) { 326 // ignore 327 return; 328 } else if (!ProtobufUtil.isPBMagicPrefix(data)) { 329 String msg = "Illegally formatted data in abort node for proc " + opName 330 + ". Killing the procedure."; 331 LOG.error(msg); 332 // we got a remote exception, but we can't describe it so just return exn from here 333 ee = new ForeignException(getMemberName(), new IllegalArgumentException(msg)); 334 } else { 335 data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length); 336 ee = ForeignException.deserialize(data); 337 } 338 } catch (IOException e) { 339 LOG.warn("Got an error notification for op:" + opName 340 + " but we can't read the information. Killing the procedure."); 341 // we got a remote exception, but we can't describe it so just return exn from here 342 ee = new ForeignException(getMemberName(), e); 343 } 344 345 this.member.receiveAbortProcedure(opName, ee); 346 } catch (KeeperException e) { 347 member.controllerConnectionFailure("Failed to get data for abort znode:" + abortZNode 348 + zkController.getAbortZnode(), e, opName); 349 } catch (InterruptedException e) { 350 LOG.warn("abort already in progress", e); 351 Thread.currentThread().interrupt(); 352 } 353 } 354 355 @Override 356 public void start(final String memberName, final ProcedureMember listener) { 357 LOG.debug("Starting procedure member '" + memberName + "'"); 358 this.member = listener; 359 this.memberName = memberName; 360 watchForAbortedProcedures(); 361 waitForNewProcedures(); 362 } 363 364 @Override 365 public void close() throws IOException { 366 zkController.close(); 367 } 368 369}