View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.Closeable;
21  import java.io.IOException;
22  import java.util.List;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.classification.InterfaceAudience;
27  import org.apache.hadoop.classification.InterfaceStability;
28  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
29  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
30  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
31  import org.apache.zookeeper.KeeperException;
32  
33  /**
34   * This is a shared ZooKeeper-based znode management utils for distributed procedure.  All znode
35   * operations should go through the provided methods in coordinators and members.
36   *
37   * Layout of nodes in ZK is
38   * /hbase/[op name]/acquired/
39   *                    [op instance] - op data/
40   *                        /[nodes that have acquired]
41   *                 /reached/
42   *                    [op instance]/
43   *                        /[nodes that have completed]
44   *                 /abort/
45   *                    [op instance] - failure data
46   *
47   * NOTE: while acquired and completed are znode dirs, abort is actually just a znode.
48   *
49   * Assumption here that procedure names are unique
50   */
51  @InterfaceAudience.Public
52  @InterfaceStability.Evolving
53  public abstract class ZKProcedureUtil
54      extends ZooKeeperListener implements Closeable {
55  
56    private static final Log LOG = LogFactory.getLog(ZKProcedureUtil.class);
57  
58    public static final String ACQUIRED_BARRIER_ZNODE_DEFAULT = "acquired";
59    public static final String REACHED_BARRIER_ZNODE_DEFAULT = "reached";
60    public static final String ABORT_ZNODE_DEFAULT = "abort";
61  
62    public final String baseZNode;
63    protected final String acquiredZnode;
64    protected final String reachedZnode;
65    protected final String abortZnode;
66  
67    /**
68     * Top-level watcher/controller for procedures across the cluster.
69     * <p>
70     * On instantiation, this ensures the procedure znodes exist.  This however requires the passed in
71     *  watcher has been started.
72     * @param watcher watcher for the cluster ZK. Owned by <tt>this</tt> and closed via
73     *          {@link #close()}
74     * @param procDescription name of the znode describing the procedure to run
75     * @throws KeeperException when the procedure znodes cannot be created
76     */
77    public ZKProcedureUtil(ZooKeeperWatcher watcher, String procDescription)
78        throws KeeperException {
79      super(watcher);
80      // make sure we are listening for events
81      watcher.registerListener(this);
82      // setup paths for the zknodes used in procedures
83      this.baseZNode = ZKUtil.joinZNode(watcher.baseZNode, procDescription);
84      acquiredZnode = ZKUtil.joinZNode(baseZNode, ACQUIRED_BARRIER_ZNODE_DEFAULT);
85      reachedZnode = ZKUtil.joinZNode(baseZNode, REACHED_BARRIER_ZNODE_DEFAULT);
86      abortZnode = ZKUtil.joinZNode(baseZNode, ABORT_ZNODE_DEFAULT);
87  
88      // first make sure all the ZK nodes exist
89      // make sure all the parents exist (sometimes not the case in tests)
90      ZKUtil.createWithParents(watcher, acquiredZnode);
91      // regular create because all the parents exist
92      ZKUtil.createAndFailSilent(watcher, reachedZnode);
93      ZKUtil.createAndFailSilent(watcher, abortZnode);
94    }
95  
96    @Override
97    public void close() throws IOException {
98      // the watcher is passed from either Master or Region Server
99      // watcher.close() will be called by the owner so no need to call close() here
100   }
101 
102   public String getAcquiredBarrierNode(String opInstanceName) {
103     return ZKProcedureUtil.getAcquireBarrierNode(this, opInstanceName);
104   }
105 
106   public String getReachedBarrierNode(String opInstanceName) {
107     return ZKProcedureUtil.getReachedBarrierNode(this, opInstanceName);
108   }
109 
110   public String getAbortZNode(String opInstanceName) {
111     return ZKProcedureUtil.getAbortNode(this, opInstanceName);
112   }
113 
114   public String getAbortZnode() {
115     return abortZnode;
116   }
117 
118   public String getBaseZnode() {
119     return baseZNode;
120   }
121 
122   public String getAcquiredBarrier() {
123     return acquiredZnode;
124   }
125 
126   /**
127    * Get the full znode path for the node used by the coordinator to trigger a global barrier
128    * acquire on each subprocedure.
129    * @param controller controller running the procedure
130    * @param opInstanceName name of the running procedure instance (not the procedure description).
131    * @return full znode path to the prepare barrier/start node
132    */
133   public static String getAcquireBarrierNode(ZKProcedureUtil controller,
134       String opInstanceName) {
135     return ZKUtil.joinZNode(controller.acquiredZnode, opInstanceName);
136   }
137 
138   /**
139    * Get the full znode path for the node used by the coordinator to trigger a global barrier
140    * execution and release on each subprocedure.
141    * @param controller controller running the procedure
142    * @param opInstanceName name of the running procedure instance (not the procedure description).
143    * @return full znode path to the commit barrier
144    */
145   public static String getReachedBarrierNode(ZKProcedureUtil controller,
146       String opInstanceName) {
147     return ZKUtil.joinZNode(controller.reachedZnode, opInstanceName);
148   }
149 
150   /**
151    * Get the full znode path for the node used by the coordinator or member to trigger an abort
152    * of the global barrier acquisition or execution in subprocedures.
153    * @param controller controller running the procedure
154    * @param opInstanceName name of the running procedure instance (not the procedure description).
155    * @return full znode path to the abort znode
156    */
157   public static String getAbortNode(ZKProcedureUtil controller, String opInstanceName) {
158     return ZKUtil.joinZNode(controller.abortZnode, opInstanceName);
159   }
160 
161   public ZooKeeperWatcher getWatcher() {
162     return watcher;
163   }
164 
165   /**
166    * Is this a procedure related znode path?
167    *
168    * TODO: this is not strict, can return true if had name just starts with same prefix but is
169    * different zdir.
170    *
171    * @return true if starts with baseZnode
172    */
173   boolean isInProcedurePath(String path) {
174     return path.startsWith(baseZNode);
175   }
176 
177   /**
178    * Is this the exact procedure barrier acquired znode
179    */
180   boolean isAcquiredNode(String path) {
181     return path.equals(acquiredZnode);
182   }
183 
184 
185   /**
186    * Is this in the procedure barrier acquired znode path
187    */
188   boolean isAcquiredPathNode(String path) {
189     return path.startsWith(this.acquiredZnode) && !path.equals(acquiredZnode) &&
190       isMemberNode(path, acquiredZnode);
191   }
192 
193   /**
194    * Is this the exact procedure barrier reached znode
195    */
196   boolean isReachedNode(String path) {
197     return path.equals(reachedZnode);
198   }
199 
200   /**
201    * Is this in the procedure barrier reached znode path
202    */
203   boolean isReachedPathNode(String path) {
204     return path.startsWith(this.reachedZnode) && !path.equals(reachedZnode) &&
205       isMemberNode(path, reachedZnode);
206   }
207 
208   /*
209    * Returns true if the specified path is a member of the "statePath"
210    *      /hbase/<ProcName>/<state>/<instance>/member
211    *      |------ state path -----|
212    *      |------------------ path ------------------|
213    */
214   private boolean isMemberNode(final String path, final String statePath) {
215     int count = 0;
216     for (int i = statePath.length(); i < path.length(); ++i) {
217       count += (path.charAt(i) == ZKUtil.ZNODE_PATH_SEPARATOR) ? 1 : 0;
218     }
219     return count == 2;
220   }
221 
222   /**
223    * Is this in the procedure barrier abort znode path
224    */
225   boolean isAbortNode(String path) {
226     return path.equals(abortZnode);
227   }
228 
229   /**
230    * Is this in the procedure barrier abort znode path
231    */
232   public boolean isAbortPathNode(String path) {
233     return path.startsWith(this.abortZnode) && !path.equals(abortZnode);
234   }
235 
236   // --------------------------------------------------------------------------
237   // internal debugging methods
238   // --------------------------------------------------------------------------
239   /**
240    * Recursively print the current state of ZK (non-transactional)
241    * @param root name of the root directory in zk to print
242    * @throws KeeperException
243    */
244   void logZKTree(String root) {
245     if (!LOG.isDebugEnabled()) return;
246     LOG.debug("Current zk system:");
247     String prefix = "|-";
248     LOG.debug(prefix + root);
249     try {
250       logZKTree(root, prefix);
251     } catch (KeeperException e) {
252       throw new RuntimeException(e);
253     }
254   }
255 
256   /**
257    * Helper method to print the current state of the ZK tree.
258    * @see #logZKTree(String)
259    * @throws KeeperException if an unexpected exception occurs
260    */
261   protected void logZKTree(String root, String prefix) throws KeeperException {
262     List<String> children = ZKUtil.listChildrenNoWatch(watcher, root);
263     if (children == null) return;
264     for (String child : children) {
265       LOG.debug(prefix + child);
266       String node = ZKUtil.joinZNode(root.equals("/") ? "" : root, child);
267       logZKTree(node, prefix + "---");
268     }
269   }
270 
271   public void clearChildZNodes() throws KeeperException {
272     // TODO This is potentially racy since not atomic. update when we support zk that has multi
273     LOG.info("Clearing all procedure znodes: " + acquiredZnode + " " + reachedZnode + " "
274         + abortZnode);
275 
276     // If the coordinator was shutdown mid-procedure, then we are going to lose
277     // an procedure that was previously started by cleaning out all the previous state. Its much
278     // harder to figure out how to keep an procedure going and the subject of HBASE-5487.
279     ZKUtil.deleteChildrenRecursively(watcher, acquiredZnode);
280     ZKUtil.deleteChildrenRecursively(watcher, reachedZnode);
281     ZKUtil.deleteChildrenRecursively(watcher, abortZnode);
282   }
283 
284   public void clearZNodes(String procedureName) throws KeeperException {
285     // TODO This is potentially racy since not atomic. update when we support zk that has multi
286     LOG.info("Clearing all znodes for procedure " + procedureName + "including nodes "
287         + acquiredZnode + " " + reachedZnode + " " + abortZnode);
288     ZKUtil.deleteNodeRecursively(watcher, getAcquiredBarrierNode(procedureName));
289     ZKUtil.deleteNodeRecursively(watcher, getReachedBarrierNode(procedureName));
290     ZKUtil.deleteNodeRecursively(watcher, getAbortZNode(procedureName));
291   }
292 }