/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Shared ZooKeeper-based znode management utilities for distributed procedures. All znode
 * operations should go through the provided methods in coordinators and members.
 * <p>
 * Layout of nodes in ZK is
 * <pre>
 * /hbase/[op name]/acquired/
 *                    [op instance] - op data/
 *                        /[nodes that have acquired]
 *                 /reached/
 *                    [op instance]/
 *                        /[nodes that have completed]
 *                 /abort/
 *                    [op instance] - failure data
 * </pre>
 * NOTE: while acquired and completed are znode dirs, abort is actually just a znode.
 * <p>
 * Assumption here that procedure names are unique.
 */
@InterfaceAudience.Private
public abstract class ZKProcedureUtil
    extends ZKListener implements Closeable {

  private static final Logger LOG = LoggerFactory.getLogger(ZKProcedureUtil.class);

  public static final String ACQUIRED_BARRIER_ZNODE_DEFAULT = "acquired";
  public static final String REACHED_BARRIER_ZNODE_DEFAULT = "reached";
  public static final String ABORT_ZNODE_DEFAULT = "abort";

  public final String baseZNode;
  protected final String acquiredZnode;
  protected final String reachedZnode;
  protected final String abortZnode;

  /**
   * Top-level watcher/controller for procedures across the cluster.
   * <p>
   * On instantiation, this ensures the procedure znodes exist. This however requires the passed in
   * watcher has been started.
   * @param watcher watcher for the cluster ZK. It is <em>not</em> closed by {@link #close()};
   *          closing it is left to whoever created it.
   * @param procDescription name of the znode describing the procedure to run
   * @throws KeeperException when the procedure znodes cannot be created
   */
  public ZKProcedureUtil(ZKWatcher watcher, String procDescription)
      throws KeeperException {
    super(watcher);
    // setup paths for the zknodes used in procedures
    this.baseZNode = ZNodePaths.joinZNode(watcher.znodePaths.baseZNode, procDescription);
    this.acquiredZnode = ZNodePaths.joinZNode(baseZNode, ACQUIRED_BARRIER_ZNODE_DEFAULT);
    this.reachedZnode = ZNodePaths.joinZNode(baseZNode, REACHED_BARRIER_ZNODE_DEFAULT);
    this.abortZnode = ZNodePaths.joinZNode(baseZNode, ABORT_ZNODE_DEFAULT);

    // Make sure we are listening for events. Registration happens only after the path fields
    // above are assigned, so a callback fired right after registration never sees null paths,
    // and before any znode is created below, so no creation event is missed.
    watcher.registerListener(this);

    // first make sure all the ZK nodes exist
    // make sure all the parents exist (sometimes not the case in tests)
    ZKUtil.createWithParents(watcher, acquiredZnode);
    // regular create because all the parents exist
    ZKUtil.createAndFailSilent(watcher, reachedZnode);
    ZKUtil.createAndFailSilent(watcher, abortZnode);
  }

  /**
   * Intentionally a no-op: the watcher is passed in from either the Master or a Region Server,
   * and {@code watcher.close()} will be called by that owner.
   */
  @Override
  public void close() throws IOException {
    // the watcher is passed from either Master or Region Server
    // watcher.close() will be called by the owner so no need to call close() here
  }

  /**
   * @param opInstanceName name of the running procedure instance
   * @return full znode path of the instance's node under the acquired barrier znode
   */
  public String getAcquiredBarrierNode(String opInstanceName) {
    return ZKProcedureUtil.getAcquireBarrierNode(this, opInstanceName);
  }

  /**
   * @param opInstanceName name of the running procedure instance
   * @return full znode path of the instance's node under the reached barrier znode
   */
  public String getReachedBarrierNode(String opInstanceName) {
    return ZKProcedureUtil.getReachedBarrierNode(this, opInstanceName);
  }

  /**
   * @param opInstanceName name of the running procedure instance
   * @return full znode path of the instance's node under the abort znode
   */
  public String getAbortZNode(String opInstanceName) {
    return ZKProcedureUtil.getAbortNode(this, opInstanceName);
  }

  /** @return full path of the abort znode for this procedure type */
  public String getAbortZnode() {
    return abortZnode;
  }

  /** @return full path of the base znode for this procedure type */
  public String getBaseZnode() {
    return baseZNode;
  }

  /** @return full path of the acquired barrier znode for this procedure type */
  public String getAcquiredBarrier() {
    return acquiredZnode;
  }

  /**
   * Get the full znode path for the node used by the coordinator to trigger a global barrier
   * acquire on each subprocedure.
   * @param controller controller running the procedure
   * @param opInstanceName name of the running procedure instance (not the procedure description).
   * @return full znode path to the prepare barrier/start node
   */
  public static String getAcquireBarrierNode(ZKProcedureUtil controller,
      String opInstanceName) {
    return ZNodePaths.joinZNode(controller.acquiredZnode, opInstanceName);
  }

  /**
   * Get the full znode path for the node used by the coordinator to trigger a global barrier
   * execution and release on each subprocedure.
   * @param controller controller running the procedure
   * @param opInstanceName name of the running procedure instance (not the procedure description).
   * @return full znode path to the commit barrier
   */
  public static String getReachedBarrierNode(ZKProcedureUtil controller,
      String opInstanceName) {
    return ZNodePaths.joinZNode(controller.reachedZnode, opInstanceName);
  }

  /**
   * Get the full znode path for the node used by the coordinator or member to trigger an abort
   * of the global barrier acquisition or execution in subprocedures.
   * @param controller controller running the procedure
   * @param opInstanceName name of the running procedure instance (not the procedure description).
   * @return full znode path to the abort znode
   */
  public static String getAbortNode(ZKProcedureUtil controller, String opInstanceName) {
    return ZNodePaths.joinZNode(controller.abortZnode, opInstanceName);
  }

  @Override
  public ZKWatcher getWatcher() {
    return watcher;
  }

  /**
   * Is this a procedure related znode path?
   * <p>
   * The match is strict: the path must be the base znode itself or lie below it. A sibling znode
   * whose name merely starts with the same characters (e.g. {@code /hbase/proc2} versus
   * {@code /hbase/proc}) is not considered part of this procedure.
   * @param path znode path to test
   * @return true if {@code path} is the base znode or one of its descendants
   */
  boolean isInProcedurePath(String path) {
    return path.equals(baseZNode) || isDescendantPath(path, baseZNode);
  }

  /**
   * Is this the exact procedure barrier acquired znode
   */
  boolean isAcquiredNode(String path) {
    return path.equals(acquiredZnode);
  }

  /**
   * Is this a member node in the procedure barrier acquired znode path, i.e.
   * {@code <acquired>/<instance>/<member>}
   */
  boolean isAcquiredPathNode(String path) {
    return isDescendantPath(path, acquiredZnode) && isMemberNode(path, acquiredZnode);
  }

  /**
   * Is this the exact procedure barrier reached znode
   */
  boolean isReachedNode(String path) {
    return path.equals(reachedZnode);
  }

  /**
   * Is this a member node in the procedure barrier reached znode path, i.e.
   * {@code <reached>/<instance>/<member>}
   */
  boolean isReachedPathNode(String path) {
    return isDescendantPath(path, reachedZnode) && isMemberNode(path, reachedZnode);
  }

  /**
   * Returns true if {@code path} lies strictly below {@code ancestor}: it starts with
   * {@code ancestor} and the very next character is the znode path separator. The ancestor itself
   * and siblings that only share a name prefix are rejected.
   */
  private static boolean isDescendantPath(String path, String ancestor) {
    return path.length() > ancestor.length()
        && path.startsWith(ancestor)
        && path.charAt(ancestor.length()) == ZNodePaths.ZNODE_PATH_SEPARATOR;
  }

  /*
   * Returns true if the specified path is a member of the "statePath"
   *      /hbase/<ProcName>/<state>/<instance>/member
   *      |------ state path -----|
   *      |------------------ path ------------------|
   * i.e. exactly two path separators follow the state path.
   */
  private boolean isMemberNode(final String path, final String statePath) {
    int count = 0;
    for (int i = statePath.length(); i < path.length(); ++i) {
      if (path.charAt(i) == ZNodePaths.ZNODE_PATH_SEPARATOR) {
        count++;
      }
    }
    return count == 2;
  }

  /**
   * Is this the exact procedure barrier abort znode
   */
  boolean isAbortNode(String path) {
    return path.equals(abortZnode);
  }

  /**
   * Is this in the procedure barrier abort znode path (strictly below the abort znode)
   */
  public boolean isAbortPathNode(String path) {
    return isDescendantPath(path, abortZnode);
  }

  // --------------------------------------------------------------------------
  // internal debugging methods
  // --------------------------------------------------------------------------
  /**
   * Recursively print the current state of ZK (non-transactional). Does nothing unless debug
   * logging is enabled.
   * @param root name of the root directory in zk to print
   * @throws RuntimeException wrapping the {@link KeeperException} if listing the tree fails
   */
  void logZKTree(String root) {
    if (!LOG.isDebugEnabled()) {
      return;
    }
    LOG.debug("Current zk system:");
    String prefix = "|-";
    LOG.debug("{}{}", prefix, root);
    try {
      logZKTree(root, prefix);
    } catch (KeeperException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Helper method to print the current state of the ZK tree.
   * @param root znode whose children are listed (without setting watches) and printed
   * @param prefix text printed in front of each child; grows by "---" per level
   * @see #logZKTree(String)
   * @throws KeeperException if an unexpected exception occurs
   */
  protected void logZKTree(String root, String prefix) throws KeeperException {
    List<String> children = ZKUtil.listChildrenNoWatch(watcher, root);
    if (children == null) {
      return;
    }
    for (String child : children) {
      LOG.debug("{}{}", prefix, child);
      // joining onto "/" would otherwise yield a double separator
      String node = ZNodePaths.joinZNode(root.equals("/") ? "" : root, child);
      logZKTree(node, prefix + "---");
    }
  }

  /**
   * Delete everything below the acquired, reached and abort znodes of this procedure type.
   * @throws KeeperException if the deletion fails
   */
  public void clearChildZNodes() throws KeeperException {
    LOG.debug("Clearing all znodes {}, {}, {}", acquiredZnode, reachedZnode, abortZnode);

    // If the coordinator was shutdown mid-procedure, then we are going to lose
    // a procedure that was previously started by cleaning out all the previous state. It's much
    // harder to figure out how to keep a procedure going and the subject of HBASE-5487.
    ZKUtil.deleteChildrenRecursivelyMultiOrSequential(watcher, true, acquiredZnode, reachedZnode,
      abortZnode);

    if (LOG.isTraceEnabled()) {
      logZKTree(this.baseZNode);
    }
  }

  /**
   * Delete the acquired, reached and abort znodes belonging to one procedure instance.
   * @param procedureName name of the procedure instance whose znodes are removed
   * @throws KeeperException if creating or deleting the znodes fails
   */
  public void clearZNodes(String procedureName) throws KeeperException {
    LOG.info("Clearing all znodes for procedure {} including nodes {} {} {}", procedureName,
      acquiredZnode, reachedZnode, abortZnode);

    // Make sure we trigger the watches on these nodes by creating them. (HBASE-13885)
    String acquiredBarrierNode = getAcquiredBarrierNode(procedureName);
    String reachedBarrierNode = getReachedBarrierNode(procedureName);
    String abortZNode = getAbortZNode(procedureName);

    // Only the acquired and abort nodes are (re)created here; the reached node is just deleted.
    ZKUtil.createAndFailSilent(watcher, acquiredBarrierNode);
    ZKUtil.createAndFailSilent(watcher, abortZNode);

    ZKUtil.deleteNodeRecursivelyMultiOrSequential(watcher, true, acquiredBarrierNode,
      reachedBarrierNode, abortZNode);

    if (LOG.isTraceEnabled()) {
      logZKTree(this.baseZNode);
    }
  }
}