View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.Closeable;
21  import java.io.IOException;
22  import java.util.List;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
28  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
29  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
30  import org.apache.zookeeper.KeeperException;
31  
32  /**
33   * This is a shared ZooKeeper-based znode management utils for distributed procedure.  All znode
34   * operations should go through the provided methods in coordinators and members.
35   *
36   * Layout of nodes in ZK is
37   * /hbase/[op name]/acquired/
38   *                    [op instance] - op data/
39   *                        /[nodes that have acquired]
40   *                 /reached/
41   *                    [op instance]/
42   *                        /[nodes that have completed]
43   *                 /abort/
44   *                    [op instance] - failure data
45   *
46   * NOTE: while acquired and completed are znode dirs, abort is actually just a znode.
47   *
48   * Assumption here that procedure names are unique
49   */
50  @InterfaceAudience.Private
51  public abstract class ZKProcedureUtil
52      extends ZooKeeperListener implements Closeable {
53  
54    private static final Log LOG = LogFactory.getLog(ZKProcedureUtil.class);
55  
56    public static final String ACQUIRED_BARRIER_ZNODE_DEFAULT = "acquired";
57    public static final String REACHED_BARRIER_ZNODE_DEFAULT = "reached";
58    public static final String ABORT_ZNODE_DEFAULT = "abort";
59  
60    public final String baseZNode;
61    protected final String acquiredZnode;
62    protected final String reachedZnode;
63    protected final String abortZnode;
64  
65    /**
66     * Top-level watcher/controller for procedures across the cluster.
67     * <p>
68     * On instantiation, this ensures the procedure znodes exist.  This however requires the passed in
69     *  watcher has been started.
70     * @param watcher watcher for the cluster ZK. Owned by <tt>this</tt> and closed via
71     *          {@link #close()}
72     * @param procDescription name of the znode describing the procedure to run
73     * @throws KeeperException when the procedure znodes cannot be created
74     */
75    public ZKProcedureUtil(ZooKeeperWatcher watcher, String procDescription)
76        throws KeeperException {
77      super(watcher);
78      // make sure we are listening for events
79      watcher.registerListener(this);
80      // setup paths for the zknodes used in procedures
81      this.baseZNode = ZKUtil.joinZNode(watcher.baseZNode, procDescription);
82      acquiredZnode = ZKUtil.joinZNode(baseZNode, ACQUIRED_BARRIER_ZNODE_DEFAULT);
83      reachedZnode = ZKUtil.joinZNode(baseZNode, REACHED_BARRIER_ZNODE_DEFAULT);
84      abortZnode = ZKUtil.joinZNode(baseZNode, ABORT_ZNODE_DEFAULT);
85  
86      // first make sure all the ZK nodes exist
87      // make sure all the parents exist (sometimes not the case in tests)
88      ZKUtil.createWithParents(watcher, acquiredZnode);
89      // regular create because all the parents exist
90      ZKUtil.createAndFailSilent(watcher, reachedZnode);
91      ZKUtil.createAndFailSilent(watcher, abortZnode);
92    }
93  
94    @Override
95    public void close() throws IOException {
96      // the watcher is passed from either Master or Region Server
97      // watcher.close() will be called by the owner so no need to call close() here
98    }
99  
100   public String getAcquiredBarrierNode(String opInstanceName) {
101     return ZKProcedureUtil.getAcquireBarrierNode(this, opInstanceName);
102   }
103 
104   public String getReachedBarrierNode(String opInstanceName) {
105     return ZKProcedureUtil.getReachedBarrierNode(this, opInstanceName);
106   }
107 
108   public String getAbortZNode(String opInstanceName) {
109     return ZKProcedureUtil.getAbortNode(this, opInstanceName);
110   }
111 
112   public String getAbortZnode() {
113     return abortZnode;
114   }
115 
116   public String getBaseZnode() {
117     return baseZNode;
118   }
119 
120   public String getAcquiredBarrier() {
121     return acquiredZnode;
122   }
123 
124   /**
125    * Get the full znode path for the node used by the coordinator to trigger a global barrier
126    * acquire on each subprocedure.
127    * @param controller controller running the procedure
128    * @param opInstanceName name of the running procedure instance (not the procedure description).
129    * @return full znode path to the prepare barrier/start node
130    */
131   public static String getAcquireBarrierNode(ZKProcedureUtil controller,
132       String opInstanceName) {
133     return ZKUtil.joinZNode(controller.acquiredZnode, opInstanceName);
134   }
135 
136   /**
137    * Get the full znode path for the node used by the coordinator to trigger a global barrier
138    * execution and release on each subprocedure.
139    * @param controller controller running the procedure
140    * @param opInstanceName name of the running procedure instance (not the procedure description).
141    * @return full znode path to the commit barrier
142    */
143   public static String getReachedBarrierNode(ZKProcedureUtil controller,
144       String opInstanceName) {
145     return ZKUtil.joinZNode(controller.reachedZnode, opInstanceName);
146   }
147 
148   /**
149    * Get the full znode path for the node used by the coordinator or member to trigger an abort
150    * of the global barrier acquisition or execution in subprocedures.
151    * @param controller controller running the procedure
152    * @param opInstanceName name of the running procedure instance (not the procedure description).
153    * @return full znode path to the abort znode
154    */
155   public static String getAbortNode(ZKProcedureUtil controller, String opInstanceName) {
156     return ZKUtil.joinZNode(controller.abortZnode, opInstanceName);
157   }
158 
159   public ZooKeeperWatcher getWatcher() {
160     return watcher;
161   }
162 
163   /**
164    * Is this a procedure related znode path?
165    *
166    * TODO: this is not strict, can return true if had name just starts with same prefix but is
167    * different zdir.
168    *
169    * @return true if starts with baseZnode
170    */
171   boolean isInProcedurePath(String path) {
172     return path.startsWith(baseZNode);
173   }
174 
175   /**
176    * Is this the exact procedure barrier acquired znode
177    */
178   boolean isAcquiredNode(String path) {
179     return path.equals(acquiredZnode);
180   }
181 
182 
183   /**
184    * Is this in the procedure barrier acquired znode path
185    */
186   boolean isAcquiredPathNode(String path) {
187     return path.startsWith(this.acquiredZnode) && !path.equals(acquiredZnode) &&
188       isMemberNode(path, acquiredZnode);
189   }
190 
191   /**
192    * Is this the exact procedure barrier reached znode
193    */
194   boolean isReachedNode(String path) {
195     return path.equals(reachedZnode);
196   }
197 
198   /**
199    * Is this in the procedure barrier reached znode path
200    */
201   boolean isReachedPathNode(String path) {
202     return path.startsWith(this.reachedZnode) && !path.equals(reachedZnode) &&
203       isMemberNode(path, reachedZnode);
204   }
205 
206   /*
207    * Returns true if the specified path is a member of the "statePath"
208    *      /hbase/<ProcName>/<state>/<instance>/member
209    *      |------ state path -----|
210    *      |------------------ path ------------------|
211    */
212   private boolean isMemberNode(final String path, final String statePath) {
213     int count = 0;
214     for (int i = statePath.length(); i < path.length(); ++i) {
215       count += (path.charAt(i) == ZKUtil.ZNODE_PATH_SEPARATOR) ? 1 : 0;
216     }
217     return count == 2;
218   }
219 
220   /**
221    * Is this in the procedure barrier abort znode path
222    */
223   boolean isAbortNode(String path) {
224     return path.equals(abortZnode);
225   }
226 
227   /**
228    * Is this in the procedure barrier abort znode path
229    */
230   public boolean isAbortPathNode(String path) {
231     return path.startsWith(this.abortZnode) && !path.equals(abortZnode);
232   }
233 
234   // --------------------------------------------------------------------------
235   // internal debugging methods
236   // --------------------------------------------------------------------------
237   /**
238    * Recursively print the current state of ZK (non-transactional)
239    * @param root name of the root directory in zk to print
240    * @throws KeeperException
241    */
242   void logZKTree(String root) {
243     if (!LOG.isDebugEnabled()) return;
244     LOG.debug("Current zk system:");
245     String prefix = "|-";
246     LOG.debug(prefix + root);
247     try {
248       logZKTree(root, prefix);
249     } catch (KeeperException e) {
250       throw new RuntimeException(e);
251     }
252   }
253 
254   /**
255    * Helper method to print the current state of the ZK tree.
256    * @see #logZKTree(String)
257    * @throws KeeperException if an unexpected exception occurs
258    */
259   protected void logZKTree(String root, String prefix) throws KeeperException {
260     List<String> children = ZKUtil.listChildrenNoWatch(watcher, root);
261     if (children == null) return;
262     for (String child : children) {
263       LOG.debug(prefix + child);
264       String node = ZKUtil.joinZNode(root.equals("/") ? "" : root, child);
265       logZKTree(node, prefix + "---");
266     }
267   }
268 
269   public void clearChildZNodes() throws KeeperException {
270     // TODO This is potentially racy since not atomic. update when we support zk that has multi
271     LOG.info("Clearing all procedure znodes: " + acquiredZnode + " " + reachedZnode + " "
272         + abortZnode);
273 
274     // If the coordinator was shutdown mid-procedure, then we are going to lose
275     // an procedure that was previously started by cleaning out all the previous state. Its much
276     // harder to figure out how to keep an procedure going and the subject of HBASE-5487.
277     ZKUtil.deleteChildrenRecursively(watcher, acquiredZnode);
278     ZKUtil.deleteChildrenRecursively(watcher, reachedZnode);
279     ZKUtil.deleteChildrenRecursively(watcher, abortZnode);
280   }
281 
282   public void clearZNodes(String procedureName) throws KeeperException {
283     // TODO This is potentially racy since not atomic. update when we support zk that has multi
284     LOG.info("Clearing all znodes for procedure " + procedureName + "including nodes "
285         + acquiredZnode + " " + reachedZnode + " " + abortZnode);
286     ZKUtil.deleteNodeRecursively(watcher, getAcquiredBarrierNode(procedureName));
287     ZKUtil.deleteNodeRecursively(watcher, getReachedBarrierNode(procedureName));
288     ZKUtil.deleteNodeRecursively(watcher, getAbortZNode(procedureName));
289   }
290 }