001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.util.List;
023
024import org.apache.hadoop.hbase.zookeeper.ZKListener;
025import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.apache.hadoop.hbase.zookeeper.ZKUtil;
028import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
029import org.apache.zookeeper.KeeperException;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * This is a shared ZooKeeper-based znode management utils for distributed procedure.  All znode
035 * operations should go through the provided methods in coordinators and members.
036 *
037 * Layout of nodes in ZK is
038 * /hbase/[op name]/acquired/
039 *                    [op instance] - op data/
040 *                        /[nodes that have acquired]
041 *                 /reached/
042 *                    [op instance]/
043 *                        /[nodes that have completed]
044 *                 /abort/
045 *                    [op instance] - failure data
046 *
047 * NOTE: while acquired and completed are znode dirs, abort is actually just a znode.
048 *
049 * Assumption here that procedure names are unique
050 */
051@InterfaceAudience.Private
052public abstract class ZKProcedureUtil
053    extends ZKListener implements Closeable {
054
055  private static final Logger LOG = LoggerFactory.getLogger(ZKProcedureUtil.class);
056
057  public static final String ACQUIRED_BARRIER_ZNODE_DEFAULT = "acquired";
058  public static final String REACHED_BARRIER_ZNODE_DEFAULT = "reached";
059  public static final String ABORT_ZNODE_DEFAULT = "abort";
060
061  public final String baseZNode;
062  protected final String acquiredZnode;
063  protected final String reachedZnode;
064  protected final String abortZnode;
065
066  /**
067   * Top-level watcher/controller for procedures across the cluster.
068   * <p>
069   * On instantiation, this ensures the procedure znodes exist.  This however requires the passed in
070   *  watcher has been started.
071   * @param watcher watcher for the cluster ZK. Owned by <tt>this</tt> and closed via
072   *          {@link #close()}
073   * @param procDescription name of the znode describing the procedure to run
074   * @throws KeeperException when the procedure znodes cannot be created
075   */
076  public ZKProcedureUtil(ZKWatcher watcher, String procDescription)
077      throws KeeperException {
078    super(watcher);
079    // make sure we are listening for events
080    watcher.registerListener(this);
081    // setup paths for the zknodes used in procedures
082    this.baseZNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, procDescription);
083    acquiredZnode = ZNodePaths.joinZNode(baseZNode, ACQUIRED_BARRIER_ZNODE_DEFAULT);
084    reachedZnode = ZNodePaths.joinZNode(baseZNode, REACHED_BARRIER_ZNODE_DEFAULT);
085    abortZnode = ZNodePaths.joinZNode(baseZNode, ABORT_ZNODE_DEFAULT);
086
087    // first make sure all the ZK nodes exist
088    // make sure all the parents exist (sometimes not the case in tests)
089    ZKUtil.createWithParents(watcher, acquiredZnode);
090    // regular create because all the parents exist
091    ZKUtil.createAndFailSilent(watcher, reachedZnode);
092    ZKUtil.createAndFailSilent(watcher, abortZnode);
093  }
094
095  @Override
096  public void close() throws IOException {
097    // the watcher is passed from either Master or Region Server
098    // watcher.close() will be called by the owner so no need to call close() here
099  }
100
101  public String getAcquiredBarrierNode(String opInstanceName) {
102    return ZKProcedureUtil.getAcquireBarrierNode(this, opInstanceName);
103  }
104
105  public String getReachedBarrierNode(String opInstanceName) {
106    return ZKProcedureUtil.getReachedBarrierNode(this, opInstanceName);
107  }
108
109  public String getAbortZNode(String opInstanceName) {
110    return ZKProcedureUtil.getAbortNode(this, opInstanceName);
111  }
112
113  public String getAbortZnode() {
114    return abortZnode;
115  }
116
117  public String getBaseZnode() {
118    return baseZNode;
119  }
120
121  public String getAcquiredBarrier() {
122    return acquiredZnode;
123  }
124
125  /**
126   * Get the full znode path for the node used by the coordinator to trigger a global barrier
127   * acquire on each subprocedure.
128   * @param controller controller running the procedure
129   * @param opInstanceName name of the running procedure instance (not the procedure description).
130   * @return full znode path to the prepare barrier/start node
131   */
132  public static String getAcquireBarrierNode(ZKProcedureUtil controller,
133      String opInstanceName) {
134    return ZNodePaths.joinZNode(controller.acquiredZnode, opInstanceName);
135  }
136
137  /**
138   * Get the full znode path for the node used by the coordinator to trigger a global barrier
139   * execution and release on each subprocedure.
140   * @param controller controller running the procedure
141   * @param opInstanceName name of the running procedure instance (not the procedure description).
142   * @return full znode path to the commit barrier
143   */
144  public static String getReachedBarrierNode(ZKProcedureUtil controller,
145      String opInstanceName) {
146    return ZNodePaths.joinZNode(controller.reachedZnode, opInstanceName);
147  }
148
149  /**
150   * Get the full znode path for the node used by the coordinator or member to trigger an abort
151   * of the global barrier acquisition or execution in subprocedures.
152   * @param controller controller running the procedure
153   * @param opInstanceName name of the running procedure instance (not the procedure description).
154   * @return full znode path to the abort znode
155   */
156  public static String getAbortNode(ZKProcedureUtil controller, String opInstanceName) {
157    return ZNodePaths.joinZNode(controller.abortZnode, opInstanceName);
158  }
159
160  @Override
161  public ZKWatcher getWatcher() {
162    return watcher;
163  }
164
165  /**
166   * Is this a procedure related znode path?
167   *
168   * TODO: this is not strict, can return true if had name just starts with same prefix but is
169   * different zdir.
170   *
171   * @return true if starts with baseZnode
172   */
173  boolean isInProcedurePath(String path) {
174    return path.startsWith(baseZNode);
175  }
176
177  /**
178   * Is this the exact procedure barrier acquired znode
179   */
180  boolean isAcquiredNode(String path) {
181    return path.equals(acquiredZnode);
182  }
183
184
185  /**
186   * Is this in the procedure barrier acquired znode path
187   */
188  boolean isAcquiredPathNode(String path) {
189    return path.startsWith(this.acquiredZnode) && !path.equals(acquiredZnode) &&
190      isMemberNode(path, acquiredZnode);
191  }
192
193  /**
194   * Is this the exact procedure barrier reached znode
195   */
196  boolean isReachedNode(String path) {
197    return path.equals(reachedZnode);
198  }
199
200  /**
201   * Is this in the procedure barrier reached znode path
202   */
203  boolean isReachedPathNode(String path) {
204    return path.startsWith(this.reachedZnode) && !path.equals(reachedZnode) &&
205      isMemberNode(path, reachedZnode);
206  }
207
208  /*
209   * Returns true if the specified path is a member of the "statePath"
210   *      /hbase/<ProcName>/<state>/<instance>/member
211   *      |------ state path -----|
212   *      |------------------ path ------------------|
213   */
214  private boolean isMemberNode(final String path, final String statePath) {
215    int count = 0;
216    for (int i = statePath.length(); i < path.length(); ++i) {
217      count += (path.charAt(i) == ZNodePaths.ZNODE_PATH_SEPARATOR) ? 1 : 0;
218    }
219    return count == 2;
220  }
221
222  /**
223   * Is this in the procedure barrier abort znode path
224   */
225  boolean isAbortNode(String path) {
226    return path.equals(abortZnode);
227  }
228
229  /**
230   * Is this in the procedure barrier abort znode path
231   */
232  public boolean isAbortPathNode(String path) {
233    return path.startsWith(this.abortZnode) && !path.equals(abortZnode);
234  }
235
236  // --------------------------------------------------------------------------
237  // internal debugging methods
238  // --------------------------------------------------------------------------
239  /**
240   * Recursively print the current state of ZK (non-transactional)
241   * @param root name of the root directory in zk to print
242   * @throws KeeperException
243   */
244  void logZKTree(String root) {
245    if (!LOG.isDebugEnabled()) return;
246    LOG.debug("Current zk system:");
247    String prefix = "|-";
248    LOG.debug(prefix + root);
249    try {
250      logZKTree(root, prefix);
251    } catch (KeeperException e) {
252      throw new RuntimeException(e);
253    }
254  }
255
256  /**
257   * Helper method to print the current state of the ZK tree.
258   * @see #logZKTree(String)
259   * @throws KeeperException if an unexpected exception occurs
260   */
261  protected void logZKTree(String root, String prefix) throws KeeperException {
262    List<String> children = ZKUtil.listChildrenNoWatch(watcher, root);
263    if (children == null) return;
264    for (String child : children) {
265      LOG.debug(prefix + child);
266      String node = ZNodePaths.joinZNode(root.equals("/") ? "" : root, child);
267      logZKTree(node, prefix + "---");
268    }
269  }
270
271  public void clearChildZNodes() throws KeeperException {
272    LOG.debug("Clearing all znodes {}, {}, {}", acquiredZnode, reachedZnode, abortZnode);
273
274    // If the coordinator was shutdown mid-procedure, then we are going to lose
275    // an procedure that was previously started by cleaning out all the previous state. Its much
276    // harder to figure out how to keep an procedure going and the subject of HBASE-5487.
277    ZKUtil.deleteChildrenRecursivelyMultiOrSequential(watcher, true, acquiredZnode, reachedZnode,
278      abortZnode);
279
280    if (LOG.isTraceEnabled()) {
281      logZKTree(this.baseZNode);
282    }
283  }
284
285  public void clearZNodes(String procedureName) throws KeeperException {
286    LOG.info("Clearing all znodes for procedure " + procedureName + "including nodes "
287        + acquiredZnode + " " + reachedZnode + " " + abortZnode);
288
289    // Make sure we trigger the watches on these nodes by creating them. (HBASE-13885)
290    String acquiredBarrierNode = getAcquiredBarrierNode(procedureName);
291    String reachedBarrierNode = getReachedBarrierNode(procedureName);
292    String abortZNode = getAbortZNode(procedureName);
293
294    ZKUtil.createAndFailSilent(watcher, acquiredBarrierNode);
295    ZKUtil.createAndFailSilent(watcher, abortZNode);
296
297    ZKUtil.deleteNodeRecursivelyMultiOrSequential(watcher, true, acquiredBarrierNode,
298      reachedBarrierNode, abortZNode);
299
300    if (LOG.isTraceEnabled()) {
301      logZKTree(this.baseZNode);
302    }
303  }
304}