001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.nio.charset.StandardCharsets; 023import java.util.Arrays; 024import java.util.List; 025import org.apache.hadoop.hbase.errorhandling.ForeignException; 026import org.apache.hadoop.hbase.zookeeper.ZKUtil; 027import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 028import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.apache.zookeeper.KeeperException; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 035 036/** 037 * ZooKeeper based {@link ProcedureCoordinatorRpcs} for a {@link ProcedureCoordinator} 038 */ 039@InterfaceAudience.Private 040public class ZKProcedureCoordinator implements ProcedureCoordinatorRpcs { 041 private static final Logger LOG = LoggerFactory.getLogger(ZKProcedureCoordinator.class); 042 private ZKProcedureUtil zkProc = null; 043 protected ProcedureCoordinator coordinator = null; // if started this should be non-null 044 045 ZKWatcher watcher; 046 String procedureType; 047 String coordName; 048 049 /** 050 * @param watcher zookeeper watcher. Owned by <tt>this</tt> and closed via {@link #close()} 051 * @param procedureClass procedure type name is a category for when there are multiple kinds of 052 * procedures.-- this becomes a znode so be aware of the naming restrictions 053 * @param coordName name of the node running the coordinator 054 * @throws KeeperException if an unexpected zk error occurs 055 */ 056 public ZKProcedureCoordinator(ZKWatcher watcher, String procedureClass, String coordName) { 057 this.watcher = watcher; 058 this.procedureType = procedureClass; 059 this.coordName = coordName; 060 } 061 062 /** 063 * The "acquire" phase. The coordinator creates a new procType/acquired/ znode dir. If znodes 064 * appear, first acquire to relevant listener or sets watch waiting for notification of the 065 * acquire node 066 * @param proc the Procedure 067 * @param info data to be stored in the acquire node 068 * @param nodeNames children of the acquire phase 069 * @throws IOException if any failure occurs. 070 */ 071 @Override 072 final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames) 073 throws IOException, IllegalArgumentException { 074 String procName = proc.getName(); 075 // start watching for the abort node 076 String abortNode = zkProc.getAbortZNode(procName); 077 try { 078 // check to see if the abort node already exists 079 if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) { 080 abort(abortNode); 081 } 082 // If we get an abort node watch triggered here, we'll go complete creating the acquired 083 // znode but then handle the acquire znode and bail out 084 } catch (KeeperException e) { 085 String msg = "Failed while watching abort node:" + abortNode; 086 LOG.error(msg, e); 087 throw new IOException(msg, e); 088 } 089 090 // create the acquire barrier 091 String acquire = zkProc.getAcquiredBarrierNode(procName); 092 LOG.debug("Creating acquire znode:" + acquire); 093 try { 094 // notify all the procedure listeners to look for the acquire node 095 byte[] data = ProtobufUtil.prependPBMagic(info); 096 ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data); 097 // loop through all the children of the acquire phase and watch for them 098 for (String node : nodeNames) { 099 String znode = ZNodePaths.joinZNode(acquire, node); 100 LOG.debug("Watching for acquire node:" + znode); 101 if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) { 102 coordinator.memberAcquiredBarrier(procName, node); 103 } 104 } 105 } catch (KeeperException e) { 106 String msg = "Failed while creating acquire node:" + acquire; 107 LOG.error(msg, e); 108 throw new IOException(msg, e); 109 } 110 } 111 112 @Override 113 public void sendGlobalBarrierReached(Procedure proc, List<String> nodeNames) throws IOException { 114 String procName = proc.getName(); 115 String reachedNode = zkProc.getReachedBarrierNode(procName); 116 LOG.debug("Creating reached barrier zk node:" + reachedNode); 117 try { 118 // create the reached znode and watch for the reached znodes 119 ZKUtil.createWithParents(zkProc.getWatcher(), reachedNode); 120 // loop through all the children of the acquire phase and watch for them 121 for (String node : nodeNames) { 122 String znode = ZNodePaths.joinZNode(reachedNode, node); 123 if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) { 124 byte[] dataFromMember = ZKUtil.getData(zkProc.getWatcher(), znode); 125 // ProtobufUtil.isPBMagicPrefix will check null 126 if (dataFromMember != null && dataFromMember.length > 0) { 127 if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) { 128 String msg = 129 "Failed to get data from finished node or data is illegally formatted: " + znode; 130 LOG.error(msg); 131 throw new IOException(msg); 132 } else { 133 dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(), 134 dataFromMember.length); 135 coordinator.memberFinishedBarrier(procName, node, dataFromMember); 136 } 137 } else { 138 coordinator.memberFinishedBarrier(procName, node, dataFromMember); 139 } 140 } 141 } 142 } catch (KeeperException e) { 143 String msg = "Failed while creating reached node:" + reachedNode; 144 LOG.error(msg, e); 145 throw new IOException(msg, e); 146 } catch (InterruptedException e) { 147 String msg = "Interrupted while creating reached node:" + reachedNode; 148 LOG.error(msg, e); 149 throw new InterruptedIOException(msg); 150 } 151 } 152 153 /** 154 * Delete znodes that are no longer in use. 155 */ 156 @Override 157 final public void resetMembers(Procedure proc) throws IOException { 158 String procName = proc.getName(); 159 boolean stillGettingNotifications = false; 160 do { 161 try { 162 LOG.debug("Attempting to clean out zk node for op:" + procName); 163 zkProc.clearZNodes(procName); 164 stillGettingNotifications = false; 165 } catch (KeeperException.NotEmptyException e) { 166 // recursive delete isn't transactional (yet) so we need to deal with cases where we get 167 // children trickling in 168 stillGettingNotifications = true; 169 } catch (KeeperException e) { 170 String msg = "Failed to complete reset procedure " + procName; 171 LOG.error(msg, e); 172 throw new IOException(msg, e); 173 } 174 } while (stillGettingNotifications); 175 } 176 177 /** 178 * Start monitoring znodes in ZK - subclass hook to start monitoring znodes they are about. 179 * @return true if succeed, false if encountered initialization errors. 180 */ 181 @Override 182 final public boolean start(final ProcedureCoordinator coordinator) { 183 if (this.coordinator != null) { 184 throw new IllegalStateException( 185 "ZKProcedureCoordinator already started and already has listener installed"); 186 } 187 this.coordinator = coordinator; 188 189 try { 190 this.zkProc = new ZKProcedureUtil(watcher, procedureType) { 191 @Override 192 public void nodeCreated(String path) { 193 if (!isInProcedurePath(path)) return; 194 LOG.debug("Node created: " + path); 195 logZKTree(this.baseZNode); 196 if (isAcquiredPathNode(path)) { 197 // node wasn't present when we created the watch so zk event triggers acquire 198 coordinator.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)), 199 ZKUtil.getNodeName(path)); 200 } else if (isReachedPathNode(path)) { 201 // node was absent when we created the watch so zk event triggers the finished barrier. 202 203 // TODO Nothing enforces that acquire and reached znodes from showing up in wrong order. 204 String procName = ZKUtil.getNodeName(ZKUtil.getParent(path)); 205 String member = ZKUtil.getNodeName(path); 206 // get the data from the procedure member 207 try { 208 byte[] dataFromMember = ZKUtil.getData(watcher, path); 209 // ProtobufUtil.isPBMagicPrefix will check null 210 if (dataFromMember != null && dataFromMember.length > 0) { 211 if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) { 212 ForeignException ee = new ForeignException(coordName, 213 "Failed to get data from finished node or data is illegally formatted:" + path); 214 coordinator.abortProcedure(procName, ee); 215 } else { 216 dataFromMember = Arrays.copyOfRange(dataFromMember, 217 ProtobufUtil.lengthOfPBMagic(), dataFromMember.length); 218 LOG.debug("Finished data from procedure '{}' member '{}': {}", procName, member, 219 new String(dataFromMember, StandardCharsets.UTF_8)); 220 coordinator.memberFinishedBarrier(procName, member, dataFromMember); 221 } 222 } else { 223 coordinator.memberFinishedBarrier(procName, member, dataFromMember); 224 } 225 } catch (KeeperException e) { 226 ForeignException ee = new ForeignException(coordName, e); 227 coordinator.abortProcedure(procName, ee); 228 } catch (InterruptedException e) { 229 ForeignException ee = new ForeignException(coordName, e); 230 coordinator.abortProcedure(procName, ee); 231 } 232 } else if (isAbortPathNode(path)) { 233 abort(path); 234 } else { 235 LOG.debug("Ignoring created notification for node:" + path); 236 } 237 } 238 }; 239 zkProc.clearChildZNodes(); 240 } catch (KeeperException e) { 241 LOG.error("Unable to start the ZK-based Procedure Coordinator rpcs.", e); 242 return false; 243 } 244 245 LOG.debug("Starting controller for procedure member=" + coordName); 246 return true; 247 } 248 249 /** 250 * This is the abort message being sent by the coordinator to member TODO this code isn't actually 251 * used but can be used to issue a cancellation from the coordinator. 252 */ 253 @Override 254 final public void sendAbortToMembers(Procedure proc, ForeignException ee) { 255 String procName = proc.getName(); 256 LOG.debug("Aborting procedure '" + procName + "' in zk"); 257 String procAbortNode = zkProc.getAbortZNode(procName); 258 try { 259 LOG.debug("Creating abort znode:" + procAbortNode); 260 String source = (ee.getSource() == null) ? coordName : ee.getSource(); 261 byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee)); 262 // first create the znode for the procedure 263 ZKUtil.createAndFailSilent(zkProc.getWatcher(), procAbortNode, errorInfo); 264 LOG.debug("Finished creating abort node:" + procAbortNode); 265 } catch (KeeperException e) { 266 // possible that we get this error for the procedure if we already reset the zk state, but in 267 // that case we should still get an error for that procedure anyways 268 zkProc.logZKTree(zkProc.baseZNode); 269 coordinator.rpcConnectionFailure( 270 "Failed to post zk node:" + procAbortNode + " to abort procedure '" + procName + "'", 271 new IOException(e)); 272 } 273 } 274 275 /** 276 * Receive a notification and propagate it to the local coordinator 277 * @param abortNode full znode path to the failed procedure information 278 */ 279 protected void abort(String abortNode) { 280 String procName = ZKUtil.getNodeName(abortNode); 281 ForeignException ee = null; 282 try { 283 byte[] data = ZKUtil.getData(zkProc.getWatcher(), abortNode); 284 if (data == null || data.length == 0) { 285 // ignore 286 return; 287 } else if (!ProtobufUtil.isPBMagicPrefix(data)) { 288 LOG.warn("Got an error notification for op:" + abortNode 289 + " but we can't read the information. Killing the procedure."); 290 // we got a remote exception, but we can't describe it 291 ee = new ForeignException(coordName, 292 "Data in abort node is illegally formatted. ignoring content."); 293 } else { 294 295 data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length); 296 ee = ForeignException.deserialize(data); 297 } 298 } catch (IOException e) { 299 LOG.warn("Got an error notification for op:" + abortNode 300 + " but we can't read the information. Killing the procedure."); 301 // we got a remote exception, but we can't describe it 302 ee = new ForeignException(coordName, e); 303 } catch (KeeperException e) { 304 coordinator.rpcConnectionFailure( 305 "Failed to get data for abort node:" + abortNode + zkProc.getAbortZnode(), 306 new IOException(e)); 307 } catch (InterruptedException e) { 308 coordinator.rpcConnectionFailure( 309 "Failed to get data for abort node:" + abortNode + zkProc.getAbortZnode(), 310 new IOException(e)); 311 Thread.currentThread().interrupt(); 312 } 313 coordinator.abortProcedure(procName, ee); 314 } 315 316 @Override 317 final public void close() throws IOException { 318 zkProc.close(); 319 } 320 321 /** 322 * Used in testing 323 */ 324 final ZKProcedureUtil getZkProcedureUtil() { 325 return zkProc; 326 } 327}