001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.nio.charset.StandardCharsets; 023import java.util.Arrays; 024import java.util.List; 025 026import org.apache.hadoop.hbase.errorhandling.ForeignException; 027import org.apache.hadoop.hbase.zookeeper.ZKUtil; 028import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 029import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.apache.zookeeper.KeeperException; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 036 037/** 038 * ZooKeeper based {@link ProcedureCoordinatorRpcs} for a {@link ProcedureCoordinator} 039 */ 040@InterfaceAudience.Private 041public class ZKProcedureCoordinator implements ProcedureCoordinatorRpcs { 042 private static final Logger LOG = LoggerFactory.getLogger(ZKProcedureCoordinator.class); 043 private ZKProcedureUtil zkProc = null; 044 protected ProcedureCoordinator coordinator = null; // if started this should be non-null 045 046 ZKWatcher watcher; 047 String procedureType; 048 String coordName; 049 050 /** 051 * @param watcher zookeeper watcher. Owned by <tt>this</tt> and closed via {@link #close()} 052 * @param procedureClass procedure type name is a category for when there are multiple kinds of 053 * procedures.-- this becomes a znode so be aware of the naming restrictions 054 * @param coordName name of the node running the coordinator 055 * @throws KeeperException if an unexpected zk error occurs 056 */ 057 public ZKProcedureCoordinator(ZKWatcher watcher, 058 String procedureClass, String coordName) { 059 this.watcher = watcher; 060 this.procedureType = procedureClass; 061 this.coordName = coordName; 062 } 063 064 /** 065 * The "acquire" phase. The coordinator creates a new procType/acquired/ znode dir. If znodes 066 * appear, first acquire to relevant listener or sets watch waiting for notification of 067 * the acquire node 068 * 069 * @param proc the Procedure 070 * @param info data to be stored in the acquire node 071 * @param nodeNames children of the acquire phase 072 * @throws IOException if any failure occurs. 073 */ 074 @Override 075 final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames) 076 throws IOException, IllegalArgumentException { 077 String procName = proc.getName(); 078 // start watching for the abort node 079 String abortNode = zkProc.getAbortZNode(procName); 080 try { 081 // check to see if the abort node already exists 082 if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) { 083 abort(abortNode); 084 } 085 // If we get an abort node watch triggered here, we'll go complete creating the acquired 086 // znode but then handle the acquire znode and bail out 087 } catch (KeeperException e) { 088 String msg = "Failed while watching abort node:" + abortNode; 089 LOG.error(msg, e); 090 throw new IOException(msg, e); 091 } 092 093 // create the acquire barrier 094 String acquire = zkProc.getAcquiredBarrierNode(procName); 095 LOG.debug("Creating acquire znode:" + acquire); 096 try { 097 // notify all the procedure listeners to look for the acquire node 098 byte[] data = ProtobufUtil.prependPBMagic(info); 099 ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data); 100 // loop through all the children of the acquire phase and watch for them 101 for (String node : nodeNames) { 102 String znode = ZNodePaths.joinZNode(acquire, node); 103 LOG.debug("Watching for acquire node:" + znode); 104 if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) { 105 coordinator.memberAcquiredBarrier(procName, node); 106 } 107 } 108 } catch (KeeperException e) { 109 String msg = "Failed while creating acquire node:" + acquire; 110 LOG.error(msg, e); 111 throw new IOException(msg, e); 112 } 113 } 114 115 @Override 116 public void sendGlobalBarrierReached(Procedure proc, List<String> nodeNames) throws IOException { 117 String procName = proc.getName(); 118 String reachedNode = zkProc.getReachedBarrierNode(procName); 119 LOG.debug("Creating reached barrier zk node:" + reachedNode); 120 try { 121 // create the reached znode and watch for the reached znodes 122 ZKUtil.createWithParents(zkProc.getWatcher(), reachedNode); 123 // loop through all the children of the acquire phase and watch for them 124 for (String node : nodeNames) { 125 String znode = ZNodePaths.joinZNode(reachedNode, node); 126 if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) { 127 byte[] dataFromMember = ZKUtil.getData(zkProc.getWatcher(), znode); 128 // ProtobufUtil.isPBMagicPrefix will check null 129 if (dataFromMember != null && dataFromMember.length > 0) { 130 if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) { 131 String msg = 132 "Failed to get data from finished node or data is illegally formatted: " + znode; 133 LOG.error(msg); 134 throw new IOException(msg); 135 } else { 136 dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(), 137 dataFromMember.length); 138 coordinator.memberFinishedBarrier(procName, node, dataFromMember); 139 } 140 } else { 141 coordinator.memberFinishedBarrier(procName, node, dataFromMember); 142 } 143 } 144 } 145 } catch (KeeperException e) { 146 String msg = "Failed while creating reached node:" + reachedNode; 147 LOG.error(msg, e); 148 throw new IOException(msg, e); 149 } catch (InterruptedException e) { 150 String msg = "Interrupted while creating reached node:" + reachedNode; 151 LOG.error(msg, e); 152 throw new InterruptedIOException(msg); 153 } 154 } 155 156 157 /** 158 * Delete znodes that are no longer in use. 159 */ 160 @Override 161 final public void resetMembers(Procedure proc) throws IOException { 162 String procName = proc.getName(); 163 boolean stillGettingNotifications = false; 164 do { 165 try { 166 LOG.debug("Attempting to clean out zk node for op:" + procName); 167 zkProc.clearZNodes(procName); 168 stillGettingNotifications = false; 169 } catch (KeeperException.NotEmptyException e) { 170 // recursive delete isn't transactional (yet) so we need to deal with cases where we get 171 // children trickling in 172 stillGettingNotifications = true; 173 } catch (KeeperException e) { 174 String msg = "Failed to complete reset procedure " + procName; 175 LOG.error(msg, e); 176 throw new IOException(msg, e); 177 } 178 } while (stillGettingNotifications); 179 } 180 181 /** 182 * Start monitoring znodes in ZK - subclass hook to start monitoring znodes they are about. 183 * @return true if succeed, false if encountered initialization errors. 184 */ 185 @Override 186 final public boolean start(final ProcedureCoordinator coordinator) { 187 if (this.coordinator != null) { 188 throw new IllegalStateException( 189 "ZKProcedureCoordinator already started and already has listener installed"); 190 } 191 this.coordinator = coordinator; 192 193 try { 194 this.zkProc = new ZKProcedureUtil(watcher, procedureType) { 195 @Override 196 public void nodeCreated(String path) { 197 if (!isInProcedurePath(path)) return; 198 LOG.debug("Node created: " + path); 199 logZKTree(this.baseZNode); 200 if (isAcquiredPathNode(path)) { 201 // node wasn't present when we created the watch so zk event triggers acquire 202 coordinator.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)), 203 ZKUtil.getNodeName(path)); 204 } else if (isReachedPathNode(path)) { 205 // node was absent when we created the watch so zk event triggers the finished barrier. 206 207 // TODO Nothing enforces that acquire and reached znodes from showing up in wrong order. 208 String procName = ZKUtil.getNodeName(ZKUtil.getParent(path)); 209 String member = ZKUtil.getNodeName(path); 210 // get the data from the procedure member 211 try { 212 byte[] dataFromMember = ZKUtil.getData(watcher, path); 213 // ProtobufUtil.isPBMagicPrefix will check null 214 if (dataFromMember != null && dataFromMember.length > 0) { 215 if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) { 216 ForeignException ee = new ForeignException(coordName, 217 "Failed to get data from finished node or data is illegally formatted:" 218 + path); 219 coordinator.abortProcedure(procName, ee); 220 } else { 221 dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(), 222 dataFromMember.length); 223 LOG.debug("Finished data from procedure '{}' member '{}': {}", procName, member, 224 new String(dataFromMember, StandardCharsets.UTF_8)); 225 coordinator.memberFinishedBarrier(procName, member, dataFromMember); 226 } 227 } else { 228 coordinator.memberFinishedBarrier(procName, member, dataFromMember); 229 } 230 } catch (KeeperException e) { 231 ForeignException ee = new ForeignException(coordName, e); 232 coordinator.abortProcedure(procName, ee); 233 } catch (InterruptedException e) { 234 ForeignException ee = new ForeignException(coordName, e); 235 coordinator.abortProcedure(procName, ee); 236 } 237 } else if (isAbortPathNode(path)) { 238 abort(path); 239 } else { 240 LOG.debug("Ignoring created notification for node:" + path); 241 } 242 } 243 }; 244 zkProc.clearChildZNodes(); 245 } catch (KeeperException e) { 246 LOG.error("Unable to start the ZK-based Procedure Coordinator rpcs.", e); 247 return false; 248 } 249 250 LOG.debug("Starting controller for procedure member=" + coordName); 251 return true; 252 } 253 254 /** 255 * This is the abort message being sent by the coordinator to member 256 * 257 * TODO this code isn't actually used but can be used to issue a cancellation from the 258 * coordinator. 259 */ 260 @Override 261 final public void sendAbortToMembers(Procedure proc, ForeignException ee) { 262 String procName = proc.getName(); 263 LOG.debug("Aborting procedure '" + procName + "' in zk"); 264 String procAbortNode = zkProc.getAbortZNode(procName); 265 try { 266 LOG.debug("Creating abort znode:" + procAbortNode); 267 String source = (ee.getSource() == null) ? coordName : ee.getSource(); 268 byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee)); 269 // first create the znode for the procedure 270 ZKUtil.createAndFailSilent(zkProc.getWatcher(), procAbortNode, errorInfo); 271 LOG.debug("Finished creating abort node:" + procAbortNode); 272 } catch (KeeperException e) { 273 // possible that we get this error for the procedure if we already reset the zk state, but in 274 // that case we should still get an error for that procedure anyways 275 zkProc.logZKTree(zkProc.baseZNode); 276 coordinator.rpcConnectionFailure("Failed to post zk node:" + procAbortNode 277 + " to abort procedure '" + procName + "'", new IOException(e)); 278 } 279 } 280 281 /** 282 * Receive a notification and propagate it to the local coordinator 283 * @param abortNode full znode path to the failed procedure information 284 */ 285 protected void abort(String abortNode) { 286 String procName = ZKUtil.getNodeName(abortNode); 287 ForeignException ee = null; 288 try { 289 byte[] data = ZKUtil.getData(zkProc.getWatcher(), abortNode); 290 if (data == null || data.length == 0) { 291 // ignore 292 return; 293 } else if (!ProtobufUtil.isPBMagicPrefix(data)) { 294 LOG.warn("Got an error notification for op:" + abortNode 295 + " but we can't read the information. Killing the procedure."); 296 // we got a remote exception, but we can't describe it 297 ee = new ForeignException(coordName, 298 "Data in abort node is illegally formatted. ignoring content."); 299 } else { 300 301 data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length); 302 ee = ForeignException.deserialize(data); 303 } 304 } catch (IOException e) { 305 LOG.warn("Got an error notification for op:" + abortNode 306 + " but we can't read the information. Killing the procedure."); 307 // we got a remote exception, but we can't describe it 308 ee = new ForeignException(coordName, e); 309 } catch (KeeperException e) { 310 coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode 311 + zkProc.getAbortZnode(), new IOException(e)); 312 } catch (InterruptedException e) { 313 coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode 314 + zkProc.getAbortZnode(), new IOException(e)); 315 Thread.currentThread().interrupt(); 316 } 317 coordinator.abortProcedure(procName, ee); 318 } 319 320 @Override 321 final public void close() throws IOException { 322 zkProc.close(); 323 } 324 325 /** 326 * Used in testing 327 */ 328 final ZKProcedureUtil getZkProcedureUtil() { 329 return zkProc; 330 } 331}