/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * ZooKeeper based controller for a procedure member.
 * <p>
 * There can only be one {@link ZKProcedureMemberRpcs} per procedure type per member,
 * since each procedure type is bound to a single set of znodes. You can have multiple
 * {@link ZKProcedureMemberRpcs} on the same server, each serving a different member
 * name, but each individual rpcs is still bound to a single member name (and since they are
 * used to determine global progress, it's important to not get this wrong).
 * <p>
 * To make this slightly more confusing, you can run multiple, concurrent procedures at the same
 * time (as long as they have different types), from the same controller, but the same node name
 * must be used for each procedure (though there is no conflict between the two procedures as long
 * as they have distinct names).
 * <p>
 * There is no real error recovery with this mechanism currently -- if the coordinator fails,
 * its re-initialization will delete the znodes and require all in progress subprocedures to start
 * anew.
 */
@InterfaceAudience.Private
public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
  private static final Logger LOG = LoggerFactory.getLogger(ZKProcedureMemberRpcs.class);

  private final ZKProcedureUtil zkController;

  // Both fields are only assigned in start(String, ProcedureMember).
  protected ProcedureMember member;
  private String memberName;

  /**
   * Must call {@link #start(String, ProcedureMember)} before this can be used.
   * @param watcher {@link ZKWatcher} to be owned by {@code this}. Closed via
   *          {@link #close()}.
   * @param procType name of the znode describing the procedure type
   * @throws KeeperException if we can't reach zookeeper
   */
  public ZKProcedureMemberRpcs(final ZKWatcher watcher, final String procType)
      throws KeeperException {
    this.zkController = new ZKProcedureUtil(watcher, procType) {
      @Override
      public void nodeCreated(String path) {
        if (!isInProcedurePath(path)) {
          return;
        }

        LOG.info("Received created event:{}", path);
        // if it is a simple start/end/abort then we just rewatch the node
        if (isAcquiredNode(path)) {
          waitForNewProcedures();
          return;
        } else if (isAbortNode(path)) {
          watchForAbortedProcedures();
          return;
        }
        // otherwise dispatch on the kind of the parent znode
        String parent = ZKUtil.getParent(path);
        // if it's the end barrier, the procedure can be completed
        if (isReachedNode(parent)) {
          receivedReachedGlobalBarrier(path);
          return;
        } else if (isAbortNode(parent)) {
          abort(path);
          return;
        } else if (isAcquiredNode(parent)) {
          startNewSubprocedure(path);
        } else {
          LOG.debug("Ignoring created notification for node:{}", path);
        }
      }

      @Override
      public void nodeChildrenChanged(String path) {
        if (path.equals(this.acquiredZnode)) {
          LOG.info("Received procedure start children changed event: {}", path);
          waitForNewProcedures();
        } else if (path.equals(this.abortZnode)) {
          LOG.info("Received procedure abort children changed event: {}", path);
          watchForAbortedProcedures();
        }
      }
    };
  }

  /**
   * @return the {@link ZKProcedureUtil} created by the constructor and used for all znode access
   */
  public ZKProcedureUtil getZkController() {
    return zkController;
  }

  /**
   * @return the member name passed to {@link #start(String, ProcedureMember)}, or {@code null}
   *         if {@code start} has not been called yet
   */
  @Override
  public String getMemberName() {
    return memberName;
  }

  /**
   * Pass along the procedure global barrier notification to any listeners
   * @param path full znode path that caused the notification
   */
  private void receivedReachedGlobalBarrier(String path) {
    LOG.debug("Received reached global barrier:{}", path);
    String procName = ZKUtil.getNodeName(path);
    this.member.receivedReachedGlobalBarrier(procName);
  }

  /**
   * Lists (and re-watches) the children of the abort znode and passes every child found to
   * {@link #abort(String)}. A {@link KeeperException} is reported to the member as a controller
   * connection failure.
   */
  private void watchForAbortedProcedures() {
    LOG.debug("Checking for aborted procedures on node: '{}'", zkController.getAbortZnode());
    try {
      // this is the list of the currently aborted procedures
      List<String> abortedProcedures = ZKUtil.listChildrenAndWatchForNewChildren(
        zkController.getWatcher(), zkController.getAbortZnode());
      if (abortedProcedures == null) {
        // waitForNewProcedures() guards the same call against a null result; do the same here
        // instead of failing with a NullPointerException in the loop below.
        LOG.debug("No aborted procedures.");
        return;
      }
      for (String node : abortedProcedures) {
        String abortNode = ZNodePaths.joinZNode(zkController.getAbortZnode(), node);
        abort(abortNode);
      }
    } catch (KeeperException e) {
      member.controllerConnectionFailure("Failed to list children for abort node:"
          + zkController.getAbortZnode(), e, null);
    }
  }

  /**
   * Lists (and re-watches) the children of the acquired-barrier znode and attempts to start a
   * subprocedure for every child found. A {@link KeeperException} is reported to the member as
   * a controller connection failure and nothing is started.
   */
  private void waitForNewProcedures() {
    // watch for new procedures that we need to start subprocedures for
    LOG.debug("Looking for new procedures under znode:'{}'", zkController.getAcquiredBarrier());
    List<String> runningProcedures;
    try {
      runningProcedures = ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
        zkController.getAcquiredBarrier());
    } catch (KeeperException e) {
      member.controllerConnectionFailure("General failure when watching for new procedures",
        e, null);
      // the failure has been reported; there is nothing to iterate over
      return;
    }
    if (runningProcedures == null) {
      LOG.debug("No running procedures.");
      return;
    }
    for (String procName : runningProcedures) {
      // then read in the procedure information
      String path = ZNodePaths.joinZNode(zkController.getAcquiredBarrier(), procName);
      startNewSubprocedure(path);
    }
  }

  /**
   * Kick off a new sub-procedure on the listener with the data stored in the passed znode.
   * <p>
   * Will attempt to create the same procedure multiple times if a procedure znode with the same
   * name is created. It is left up to the coordinator to ensure this doesn't occur.
   * <p>
   * Nothing is started when the abort znode for the procedure already exists. Note that when the
   * failure happens before the subprocedure object has been created, the
   * {@link #sendMemberAborted(Subprocedure, ForeignException)} call below receives {@code null}
   * and therefore only logs the error.
   * @param path full path to the znode for the procedure to start
   */
  private synchronized void startNewSubprocedure(String path) {
    LOG.debug("Found procedure znode: {}", path);
    String opName = ZKUtil.getNodeName(path);
    // start watching for an abort notification for the procedure
    String abortZNode = zkController.getAbortZNode(opName);
    try {
      if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), abortZNode)) {
        LOG.debug("Not starting:{} because we already have an abort notification.", opName);
        return;
      }
    } catch (KeeperException e) {
      member.controllerConnectionFailure("Failed to get the abort znode (" + abortZNode
          + ") for procedure :" + opName, e, opName);
      return;
    }

    // get the data for the procedure
    Subprocedure subproc = null;
    try {
      byte[] data = ZKUtil.getData(zkController.getWatcher(), path);
      if (!ProtobufUtil.isPBMagicPrefix(data)) {
        String msg = "Data in for starting procedure " + opName +
          " is illegally formatted (no pb magic). " +
          "Killing the procedure: " + Bytes.toString(data);
        LOG.error(msg);
        throw new IllegalArgumentException(msg);
      }
      LOG.debug("start proc data length is {}", data.length);
      // strip the pb magic prefix before handing the payload to the member
      data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
      LOG.debug("Found data for znode:{}", path);
      subproc = member.createSubprocedure(opName, data);
      member.submitSubprocedure(subproc);
    } catch (IllegalArgumentException iae) {
      LOG.error("Illegal argument exception", iae);
      sendMemberAborted(subproc, new ForeignException(getMemberName(), iae));
    } catch (IllegalStateException ise) {
      LOG.error("Illegal state exception ", ise);
      sendMemberAborted(subproc, new ForeignException(getMemberName(), ise));
    } catch (KeeperException e) {
      member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
        e, opName);
    } catch (InterruptedException e) {
      member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
        e, opName);
      // restore the interrupt status for callers further up the stack
      Thread.currentThread().interrupt();
    }
  }

  /**
   * This attempts to create an acquired state znode for the procedure (snapshot name).
   * <p>
   * It then looks for the reached znode to trigger in-barrier execution. If not present we
   * have a watcher, if present then trigger the in-barrier action.
   * @param sub the subprocedure whose name identifies the procedure being joined
   */
  @Override
  public void sendMemberAcquired(Subprocedure sub) throws IOException {
    String procName = sub.getName();
    try {
      LOG.debug("Member: '{}' joining acquired barrier for procedure ({}) in zk", memberName,
        procName);
      String acquiredZNode = ZNodePaths.joinZNode(ZKProcedureUtil.getAcquireBarrierNode(
        zkController, procName), memberName);
      ZKUtil.createAndFailSilent(zkController.getWatcher(), acquiredZNode);

      // watch for the complete node for this snapshot
      String reachedBarrier = zkController.getReachedBarrierNode(procName);
      LOG.debug("Watch for global barrier reached:{}", reachedBarrier);
      if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), reachedBarrier)) {
        receivedReachedGlobalBarrier(reachedBarrier);
      }
    } catch (KeeperException e) {
      member.controllerConnectionFailure("Failed to acquire barrier for procedure: "
          + procName + " and member: " + memberName, e, procName);
    }
  }

  /**
   * This acts as the ack for a completed procedure
   * @param sub the subprocedure that completed
   * @param data payload to store in this member's znode under the reached barrier; {@code null}
   *          is treated as an empty payload
   */
  @Override
  public void sendMemberCompleted(Subprocedure sub, byte[] data) throws IOException {
    String procName = sub.getName();
    LOG.debug("Marking procedure '{}' completed for member '{}' in zk", procName, memberName);
    String joinPath =
      ZNodePaths.joinZNode(zkController.getReachedBarrierNode(procName), memberName);
    // ProtobufUtil.prependPBMagic does not take care of null
    if (data == null) {
      data = new byte[0];
    }
    try {
      ZKUtil.createAndFailSilent(zkController.getWatcher(), joinPath,
        ProtobufUtil.prependPBMagic(data));
    } catch (KeeperException e) {
      member.controllerConnectionFailure("Failed to post zk node:" + joinPath
          + " to join procedure barrier.", e, procName);
    }
  }

  /**
   * This should be called by the member and should write a serialized root cause exception
   * to the abort znode.
   * @param sub the subprocedure being aborted; when {@code null} the error is only logged
   * @param ee the cause of the abort; its source is replaced by this member's name when unset
   */
  @Override
  public void sendMemberAborted(Subprocedure sub, ForeignException ee) {
    if (sub == null) {
      LOG.error("Failed due to null subprocedure", ee);
      return;
    }
    String procName = sub.getName();
    LOG.debug("Aborting procedure ({}) in zk", procName);
    String procAbortZNode = zkController.getAbortZNode(procName);
    try {
      String source = (ee.getSource() == null) ? memberName : ee.getSource();
      byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
      ZKUtil.createAndFailSilent(zkController.getWatcher(), procAbortZNode, errorInfo);
      LOG.debug("Finished creating abort znode:{}", procAbortZNode);
    } catch (KeeperException e) {
      // possible that we get this error for the procedure if we already reset the zk state, but in
      // that case we should still get an error for that procedure anyways
      zkController.logZKTree(zkController.getBaseZnode());
      member.controllerConnectionFailure("Failed to post zk node:" + procAbortZNode
          + " to abort procedure", e, procName);
    }
  }

  /**
   * Pass along the found abort notification to the listener
   * <p>
   * Abort znodes without data are ignored. Data that is not pb-magic prefixed, or that cannot be
   * deserialized, still aborts the procedure, using a locally created {@link ForeignException}.
   * @param abortZNode full znode path to the failed procedure information
   */
  protected void abort(String abortZNode) {
    LOG.debug("Aborting procedure member for znode {}", abortZNode);
    String opName = ZKUtil.getNodeName(abortZNode);
    try {
      byte[] data = ZKUtil.getData(zkController.getWatcher(), abortZNode);

      // figure out the data we need to pass
      ForeignException ee;
      try {
        if (data == null || data.length == 0) {
          // ignore
          return;
        } else if (!ProtobufUtil.isPBMagicPrefix(data)) {
          String msg = "Illegally formatted data in abort node for proc " + opName
              + ".  Killing the procedure.";
          LOG.error(msg);
          // we got a remote exception, but we can't describe it so just return exn from here
          ee = new ForeignException(getMemberName(), new IllegalArgumentException(msg));
        } else {
          data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
          ee = ForeignException.deserialize(data);
        }
      } catch (IOException e) {
        // keep the cause in the log; it is also wrapped into the exception handed to the member
        LOG.warn("Got an error notification for op:{} but we can't read the information. "
            + "Killing the procedure.", opName, e);
        // we got a remote exception, but we can't describe it so just return exn from here
        ee = new ForeignException(getMemberName(), e);
      }

      this.member.receiveAbortProcedure(opName, ee);
    } catch (KeeperException e) {
      // abortZNode is already the full path, so the abort parent path is not appended to it
      member.controllerConnectionFailure("Failed to get data for abort znode:" + abortZNode,
        e, opName);
    } catch (InterruptedException e) {
      LOG.warn("abort already in progress", e);
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Binds this controller to a member and immediately checks (and sets watches) for already
   * aborted procedures and for procedures waiting to be started.
   * @param memberName name under which this member registers itself in the procedure znodes
   * @param listener the {@link ProcedureMember} that receives all notifications
   */
  @Override
  public void start(final String memberName, final ProcedureMember listener) {
    LOG.debug("Starting procedure member '{}'", memberName);
    this.member = listener;
    this.memberName = memberName;
    watchForAbortedProcedures();
    waitForNewProcedures();
  }

  /**
   * Closes the underlying {@link ZKProcedureUtil}.
   */
  @Override
  public void close() throws IOException {
    zkController.close();
  }

}