/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This is a shared ZooKeeper-based znode management utils for distributed procedure. All znode
 * operations should go through the provided methods in coordinators and members.
 * <p>
 * Layout of nodes in ZK is
 *
 * <pre>
 * /hbase/[op name]/acquired/
 *                    [op instance] - op data/
 *                        /[nodes that have acquired]
 *                 /reached/
 *                    [op instance]/
 *                        /[nodes that have completed]
 *                 /abort/
 *                    [op instance] - failure data
 * </pre>
 *
 * NOTE: while acquired and completed are znode dirs, abort is actually just a znode.
 * <p>
 * Assumption here that procedure names are unique.
 */
@InterfaceAudience.Private
public abstract class ZKProcedureUtil extends ZKListener implements Closeable {

  private static final Logger LOG = LoggerFactory.getLogger(ZKProcedureUtil.class);

  public static final String ACQUIRED_BARRIER_ZNODE_DEFAULT = "acquired";
  public static final String REACHED_BARRIER_ZNODE_DEFAULT = "reached";
  public static final String ABORT_ZNODE_DEFAULT = "abort";

  /** Root znode for this procedure type: the cluster base znode joined with the description. */
  public final String baseZNode;
  /** Znode under {@link #baseZNode} that holds the "acquired" barrier of each instance. */
  protected final String acquiredZnode;
  /** Znode under {@link #baseZNode} that holds the "reached" barrier of each instance. */
  protected final String reachedZnode;
  /** Znode under {@link #baseZNode} that holds the abort marker of each instance. */
  protected final String abortZnode;

  /**
   * Top-level watcher/controller for procedures across the cluster.
   * <p>
   * On instantiation, this registers itself as a listener on the watcher and ensures the procedure
   * znodes exist. This however requires the passed in watcher has been started.
   * @param watcher         watcher for the cluster ZK. It is <em>not</em> closed by
   *                        {@link #close()}; closing it is left to whoever passed it in.
   * @param procDescription name of the znode describing the procedure to run
   * @throws KeeperException when the procedure znodes cannot be created
   */
  public ZKProcedureUtil(ZKWatcher watcher, String procDescription) throws KeeperException {
    super(watcher);
    // make sure we are listening for events
    watcher.registerListener(this);
    // setup paths for the zknodes used in procedures
    this.baseZNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, procDescription);
    this.acquiredZnode = ZNodePaths.joinZNode(baseZNode, ACQUIRED_BARRIER_ZNODE_DEFAULT);
    this.reachedZnode = ZNodePaths.joinZNode(baseZNode, REACHED_BARRIER_ZNODE_DEFAULT);
    this.abortZnode = ZNodePaths.joinZNode(baseZNode, ABORT_ZNODE_DEFAULT);

    // first make sure all the ZK nodes exist
    // make sure all the parents exist (sometimes not the case in tests)
    ZKUtil.createWithParents(watcher, acquiredZnode);
    // regular create because all the parents exist
    ZKUtil.createAndFailSilent(watcher, reachedZnode);
    ZKUtil.createAndFailSilent(watcher, abortZnode);
  }

  /**
   * Intentionally a no-op: this class does not close the watcher it was given.
   */
  @Override
  public void close() throws IOException {
    // the watcher is passed from either Master or Region Server
    // watcher.close() will be called by the owner so no need to call close() here
  }

  /**
   * Instance shortcut for {@link #getAcquireBarrierNode(ZKProcedureUtil, String)}.
   * @param opInstanceName name of the running procedure instance
   * @return full znode path of the acquired barrier for that instance
   */
  public String getAcquiredBarrierNode(String opInstanceName) {
    return ZKProcedureUtil.getAcquireBarrierNode(this, opInstanceName);
  }

  /**
   * Instance shortcut for {@link #getReachedBarrierNode(ZKProcedureUtil, String)}.
   * @param opInstanceName name of the running procedure instance
   * @return full znode path of the reached barrier for that instance
   */
  public String getReachedBarrierNode(String opInstanceName) {
    return ZKProcedureUtil.getReachedBarrierNode(this, opInstanceName);
  }

  /**
   * Instance shortcut for {@link #getAbortNode(ZKProcedureUtil, String)}.
   * @param opInstanceName name of the running procedure instance
   * @return full znode path of the abort znode for that instance
   */
  public String getAbortZNode(String opInstanceName) {
    return ZKProcedureUtil.getAbortNode(this, opInstanceName);
  }

  /** Returns the parent abort znode shared by all instances of this procedure type. */
  public String getAbortZnode() {
    return abortZnode;
  }

  /** Returns the root znode of this procedure type. */
  public String getBaseZnode() {
    return baseZNode;
  }

  /** Returns the parent acquired-barrier znode shared by all instances of this procedure type. */
  public String getAcquiredBarrier() {
    return acquiredZnode;
  }

  /**
   * Get the full znode path for the node used by the coordinator to trigger a global barrier
   * acquire on each subprocedure.
   * @param controller     controller running the procedure
   * @param opInstanceName name of the running procedure instance (not the procedure description).
   * @return full znode path to the prepare barrier/start node
   */
  public static String getAcquireBarrierNode(ZKProcedureUtil controller, String opInstanceName) {
    return ZNodePaths.joinZNode(controller.acquiredZnode, opInstanceName);
  }

  /**
   * Get the full znode path for the node used by the coordinator to trigger a global barrier
   * execution and release on each subprocedure.
   * @param controller     controller running the procedure
   * @param opInstanceName name of the running procedure instance (not the procedure description).
   * @return full znode path to the commit barrier
   */
  public static String getReachedBarrierNode(ZKProcedureUtil controller, String opInstanceName) {
    return ZNodePaths.joinZNode(controller.reachedZnode, opInstanceName);
  }

  /**
   * Get the full znode path for the node used by the coordinator or member to trigger an abort of
   * the global barrier acquisition or execution in subprocedures.
   * @param controller     controller running the procedure
   * @param opInstanceName name of the running procedure instance (not the procedure description).
   * @return full znode path to the abort znode
   */
  public static String getAbortNode(ZKProcedureUtil controller, String opInstanceName) {
    return ZNodePaths.joinZNode(controller.abortZnode, opInstanceName);
  }

  @Override
  public ZKWatcher getWatcher() {
    return watcher;
  }

  /**
   * Is this a procedure related znode path?
   * <p>
   * The check is strict: the path must either be the base znode itself or continue with a path
   * separator right after it, so a different znode whose name merely starts with the same
   * characters is not treated as part of this procedure.
   * @param path znode path to test
   * @return true if {@code path} is {@link #baseZNode} or lies beneath it
   */
  boolean isInProcedurePath(String path) {
    if (!path.startsWith(baseZNode)) {
      return false;
    }
    // A bare prefix match would also accept siblings such as "<base>-other"; require that the
    // match ends exactly at the base znode or at a path separator.
    return path.length() == baseZNode.length()
      || path.charAt(baseZNode.length()) == ZNodePaths.ZNODE_PATH_SEPARATOR;
  }

  /**
   * Is this the exact procedure barrier acquired znode
   */
  boolean isAcquiredNode(String path) {
    return path.equals(acquiredZnode);
  }

  /**
   * Is this in the procedure barrier acquired znode path, i.e. a member node two levels below the
   * acquired znode
   */
  boolean isAcquiredPathNode(String path) {
    return path.startsWith(this.acquiredZnode) && !path.equals(acquiredZnode)
      && isMemberNode(path, acquiredZnode);
  }

  /**
   * Is this the exact procedure barrier reached znode
   */
  boolean isReachedNode(String path) {
    return path.equals(reachedZnode);
  }

  /**
   * Is this in the procedure barrier reached znode path, i.e. a member node two levels below the
   * reached znode
   */
  boolean isReachedPathNode(String path) {
    return path.startsWith(this.reachedZnode) && !path.equals(reachedZnode)
      && isMemberNode(path, reachedZnode);
  }

  /*
   * Returns true if the specified path is a member of the "statePath", which is decided by
   * counting exactly two path separators in the part of "path" that follows "statePath":
   *
   *   /hbase/<ProcName>/<state>/<instance>/member
   *   |------ state path -----|
   *   |------------------ path ------------------|
   */
  private boolean isMemberNode(final String path, final String statePath) {
    int count = 0;
    for (int i = statePath.length(); i < path.length(); ++i) {
      if (path.charAt(i) == ZNodePaths.ZNODE_PATH_SEPARATOR) {
        count++;
      }
    }
    return count == 2;
  }

  /**
   * Is this the exact procedure barrier abort znode
   */
  boolean isAbortNode(String path) {
    return path.equals(abortZnode);
  }

  /**
   * Is this in the procedure barrier abort znode path (but not the abort znode itself)
   */
  public boolean isAbortPathNode(String path) {
    return path.startsWith(this.abortZnode) && !path.equals(abortZnode);
  }

  // --------------------------------------------------------------------------
  // internal debugging methods
  // --------------------------------------------------------------------------
  /**
   * Recursively print the current state of ZK (non-transactional). Does nothing unless debug
   * logging is enabled.
   * @param root name of the root directory in zk to print
   * @throws RuntimeException wrapping any {@link KeeperException} hit while listing children
   */
  void logZKTree(String root) {
    if (!LOG.isDebugEnabled()) {
      return;
    }
    LOG.debug("Current zk system:");
    String prefix = "|-";
    LOG.debug("{}{}", prefix, root);
    try {
      logZKTree(root, prefix);
    } catch (KeeperException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Helper method to print the current state of the ZK tree.
   * @param root   znode whose children are listed (without setting a watch) and printed
   * @param prefix text prepended to each child name; extended by "---" per nesting level
   * @see #logZKTree(String)
   * @throws KeeperException if an unexpected exception occurs
   */
  protected void logZKTree(String root, String prefix) throws KeeperException {
    List<String> children = ZKUtil.listChildrenNoWatch(watcher, root);
    if (children == null) {
      return;
    }
    for (String child : children) {
      LOG.debug("{}{}", prefix, child);
      // joining onto "/" directly would produce a doubled separator
      String node = ZNodePaths.joinZNode(root.equals("/") ? "" : root, child);
      logZKTree(node, prefix + "---");
    }
  }

  /**
   * Deletes every child of the acquired, reached and abort znodes, leaving those three znodes
   * themselves in place.
   * @throws KeeperException if the deletion fails
   */
  public void clearChildZNodes() throws KeeperException {
    LOG.debug("Clearing all znodes {}, {}, {}", acquiredZnode, reachedZnode, abortZnode);

    // If the coordinator was shutdown mid-procedure, then we are going to lose
    // a procedure that was previously started by cleaning out all the previous state. It's much
    // harder to figure out how to keep a procedure going and the subject of HBASE-5487.
    ZKUtil.deleteChildrenRecursivelyMultiOrSequential(watcher, true, acquiredZnode, reachedZnode,
      abortZnode);

    if (LOG.isTraceEnabled()) {
      logZKTree(this.baseZNode);
    }
  }

  /**
   * Removes the acquired, reached and abort znodes of a single procedure instance.
   * @param procedureName name of the procedure instance whose znodes are removed
   * @throws KeeperException if creating or deleting the znodes fails
   */
  public void clearZNodes(String procedureName) throws KeeperException {
    LOG.info("Clearing all znodes for procedure {} including nodes {} {} {}", procedureName,
      acquiredZnode, reachedZnode, abortZnode);

    // Make sure we trigger the watches on these nodes by creating them. (HBASE-13885)
    String acquiredBarrierNode = getAcquiredBarrierNode(procedureName);
    String reachedBarrierNode = getReachedBarrierNode(procedureName);
    String abortZNode = getAbortZNode(procedureName);

    ZKUtil.createAndFailSilent(watcher, acquiredBarrierNode);
    ZKUtil.createAndFailSilent(watcher, abortZNode);

    ZKUtil.deleteNodeRecursivelyMultiOrSequential(watcher, true, acquiredBarrierNode,
      reachedBarrierNode, abortZNode);

    if (LOG.isTraceEnabled()) {
      logZKTree(this.baseZNode);
    }
  }
}