001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collections;
024import java.util.Deque;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.stream.Collectors;
029import org.apache.commons.lang3.StringUtils;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.exceptions.DeserializationException;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.hbase.util.Threads;
035import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.CreateAndFailSilent;
036import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.DeleteNodeFailSilent;
037import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.SetData;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.apache.zookeeper.AsyncCallback;
040import org.apache.zookeeper.CreateMode;
041import org.apache.zookeeper.KeeperException;
042import org.apache.zookeeper.KeeperException.NoNodeException;
043import org.apache.zookeeper.Op;
044import org.apache.zookeeper.ZooKeeper;
045import org.apache.zookeeper.data.Stat;
046import org.apache.zookeeper.proto.CreateRequest;
047import org.apache.zookeeper.proto.DeleteRequest;
048import org.apache.zookeeper.proto.SetDataRequest;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
054
055/**
056 * Internal HBase utility class for ZooKeeper.
057 * <p>
058 * Contains only static methods and constants.
059 * <p>
060 * Methods all throw {@link KeeperException} if there is an unexpected zookeeper exception, so
061 * callers of these methods must handle appropriately. If ZK is required for the operation, the
062 * server will need to be aborted.
063 */
064@InterfaceAudience.Private
065public final class ZKUtil {
066  private static final Logger LOG = LoggerFactory.getLogger(ZKUtil.class);
067
068  private ZKUtil() {
069  }
070
071  //
072  // Helper methods
073  //
074  /**
075   * Returns the full path of the immediate parent of the specified node.
076   * @param node path to get parent of
077   * @return parent of path, null if passed the root node or an invalid node
078   */
079  public static String getParent(String node) {
080    int idx = node.lastIndexOf(ZNodePaths.ZNODE_PATH_SEPARATOR);
081    return idx <= 0 ? null : node.substring(0, idx);
082  }
083
084  /**
085   * Get the name of the current node from the specified fully-qualified path.
086   * @param path fully-qualified path
087   * @return name of the current node
088   */
089  public static String getNodeName(String path) {
090    return path.substring(path.lastIndexOf("/") + 1);
091  }
092
093  //
094  // Existence checks and watches
095  //
096
097  /**
098   * Watch the specified znode for delete/create/change events. The watcher is set whether or not
099   * the node exists. If the node already exists, the method returns true. If the node does not
100   * exist, the method returns false.
101   * @param zkw   zk reference
102   * @param znode path of node to watch
103   * @return true if znode exists, false if does not exist or error
104   * @throws KeeperException if unexpected zookeeper exception
105   */
106  public static boolean watchAndCheckExists(ZKWatcher zkw, String znode) throws KeeperException {
107    try {
108      Stat s = zkw.getRecoverableZooKeeper().exists(znode, zkw);
109      boolean exists = s != null;
110      if (exists) {
111        LOG.debug(zkw.prefix("Set watcher on existing znode=" + znode));
112      } else {
113        LOG.debug(zkw.prefix("Set watcher on znode that does not yet exist, " + znode));
114      }
115      return exists;
116    } catch (KeeperException e) {
117      LOG.warn(zkw.prefix("Unable to set watcher on znode " + znode), e);
118      zkw.keeperException(e);
119      return false;
120    } catch (InterruptedException e) {
121      LOG.warn(zkw.prefix("Unable to set watcher on znode " + znode), e);
122      zkw.interruptedException(e);
123      return false;
124    }
125  }
126
127  /**
128   * Watch the specified znode, but only if exists. Useful when watching for deletions. Uses
129   * .getData() (and handles NoNodeException) instead of .exists() to accomplish this, as .getData()
130   * will only set a watch if the znode exists.
131   * @param zkw   zk reference
132   * @param znode path of node to watch
133   * @return true if the watch is set, false if node does not exists
134   * @throws KeeperException if unexpected zookeeper exception
135   */
136  public static boolean setWatchIfNodeExists(ZKWatcher zkw, String znode) throws KeeperException {
137    try {
138      zkw.getRecoverableZooKeeper().getData(znode, true, null);
139      return true;
140    } catch (NoNodeException e) {
141      return false;
142    } catch (InterruptedException e) {
143      LOG.warn(zkw.prefix("Unable to set watcher on znode " + znode), e);
144      zkw.interruptedException(e);
145      return false;
146    }
147  }
148
149  /**
150   * Check if the specified node exists. Sets no watches.
151   * @param zkw   zk reference
152   * @param znode path of node to watch
153   * @return version of the node if it exists, -1 if does not exist
154   * @throws KeeperException if unexpected zookeeper exception
155   */
156  public static int checkExists(ZKWatcher zkw, String znode) throws KeeperException {
157    try {
158      Stat s = zkw.getRecoverableZooKeeper().exists(znode, null);
159      return s != null ? s.getVersion() : -1;
160    } catch (KeeperException e) {
161      LOG.warn(zkw.prefix("Unable to set watcher on znode (" + znode + ")"), e);
162      zkw.keeperException(e);
163      return -1;
164    } catch (InterruptedException e) {
165      LOG.warn(zkw.prefix("Unable to set watcher on znode (" + znode + ")"), e);
166      zkw.interruptedException(e);
167      return -1;
168    }
169  }
170
171  //
172  // Znode listings
173  //
174
175  /**
176   * Lists the children znodes of the specified znode. Also sets a watch on the specified znode
177   * which will capture a NodeDeleted event on the specified znode as well as NodeChildrenChanged if
178   * any children of the specified znode are created or deleted. Returns null if the specified node
179   * does not exist. Otherwise returns a list of children of the specified node. If the node exists
180   * but it has no children, an empty list will be returned.
181   * @param zkw   zk reference
182   * @param znode path of node to list and watch children of
183   * @return list of children of the specified node, an empty list if the node exists but has no
184   *         children, and null if the node does not exist
185   * @throws KeeperException if unexpected zookeeper exception
186   */
187  public static List<String> listChildrenAndWatchForNewChildren(ZKWatcher zkw, String znode)
188    throws KeeperException {
189    try {
190      return zkw.getRecoverableZooKeeper().getChildren(znode, zkw);
191    } catch (KeeperException.NoNodeException ke) {
192      LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " "
193        + "because node does not exist (not an error)"));
194    } catch (KeeperException e) {
195      LOG.warn(zkw.prefix("Unable to list children of znode " + znode + " "), e);
196      zkw.keeperException(e);
197    } catch (InterruptedException e) {
198      LOG.warn(zkw.prefix("Unable to list children of znode " + znode + " "), e);
199      zkw.interruptedException(e);
200    }
201
202    return null;
203  }
204
205  /**
206   * List all the children of the specified znode, setting a watch for children changes and also
207   * setting a watch on every individual child in order to get the NodeCreated and NodeDeleted
208   * events.
209   * @param zkw   zookeeper reference
210   * @param znode node to get children of and watch
211   * @return list of znode names, null if the node doesn't exist
212   * @throws KeeperException if a ZooKeeper operation fails
213   */
214  public static List<String> listChildrenAndWatchThem(ZKWatcher zkw, String znode)
215    throws KeeperException {
216    List<String> children = listChildrenAndWatchForNewChildren(zkw, znode);
217    if (children == null) {
218      return null;
219    }
220    for (String child : children) {
221      watchAndCheckExists(zkw, ZNodePaths.joinZNode(znode, child));
222    }
223    return children;
224  }
225
226  /**
227   * Lists the children of the specified znode without setting any watches. Sets no watches at all,
228   * this method is best effort. Returns an empty list if the node has no children. Returns null if
229   * the parent node itself does not exist.
230   * @param zkw   zookeeper reference
231   * @param znode node to get children
232   * @return list of data of children of specified znode, empty if no children, null if parent does
233   *         not exist
234   * @throws KeeperException if unexpected zookeeper exception
235   */
236  public static List<String> listChildrenNoWatch(ZKWatcher zkw, String znode)
237    throws KeeperException {
238    List<String> children = null;
239    try {
240      // List the children without watching
241      children = zkw.getRecoverableZooKeeper().getChildren(znode, null);
242    } catch (KeeperException.NoNodeException nne) {
243      return null;
244    } catch (InterruptedException ie) {
245      zkw.interruptedException(ie);
246    }
247    return children;
248  }
249
250  /**
251   * Simple class to hold a node path and node data.
252   * @deprecated Unused
253   */
254  @Deprecated
255  public static class NodeAndData {
256    private String node;
257    private byte[] data;
258
259    public NodeAndData(String node, byte[] data) {
260      this.node = node;
261      this.data = data;
262    }
263
264    public String getNode() {
265      return node;
266    }
267
268    public byte[] getData() {
269      return data;
270    }
271
272    @Override
273    public String toString() {
274      return node;
275    }
276
277    public boolean isEmpty() {
278      return (data == null || data.length == 0);
279    }
280  }
281
282  /**
283   * Checks if the specified znode has any children. Sets no watches. Returns true if the node
284   * exists and has children. Returns false if the node does not exist or if the node does not have
285   * any children. Used during master initialization to determine if the master is a failed-over-to
286   * master or the first master during initial cluster startup. If the directory for regionserver
287   * ephemeral nodes is empty then this is a cluster startup, if not then it is not cluster startup.
288   * @param zkw   zk reference
289   * @param znode path of node to check for children of
290   * @return true if node has children, false if not or node does not exist
291   * @throws KeeperException if unexpected zookeeper exception
292   */
293  public static boolean nodeHasChildren(ZKWatcher zkw, String znode) throws KeeperException {
294    try {
295      return !zkw.getRecoverableZooKeeper().getChildren(znode, null).isEmpty();
296    } catch (KeeperException.NoNodeException ke) {
297      LOG.debug(zkw.prefix("Unable to list children of znode " + znode
298        + " because node does not exist (not an error)"));
299      return false;
300    } catch (KeeperException e) {
301      LOG.warn(zkw.prefix("Unable to list children of znode " + znode), e);
302      zkw.keeperException(e);
303      return false;
304    } catch (InterruptedException e) {
305      LOG.warn(zkw.prefix("Unable to list children of znode " + znode), e);
306      zkw.interruptedException(e);
307      return false;
308    }
309  }
310
311  /**
312   * Get the number of children of the specified node. If the node does not exist or has no
313   * children, returns 0. Sets no watches at all.
314   * @param zkw   zk reference
315   * @param znode path of node to count children of
316   * @return number of children of specified node, 0 if none or parent does not exist
317   * @throws KeeperException if unexpected zookeeper exception
318   */
319  public static int getNumberOfChildren(ZKWatcher zkw, String znode) throws KeeperException {
320    try {
321      Stat stat = zkw.getRecoverableZooKeeper().exists(znode, null);
322      return stat == null ? 0 : stat.getNumChildren();
323    } catch (KeeperException e) {
324      LOG.warn(zkw.prefix("Unable to get children of node " + znode));
325      zkw.keeperException(e);
326    } catch (InterruptedException e) {
327      zkw.interruptedException(e);
328    }
329    return 0;
330  }
331
332  //
333  // Data retrieval
334  //
335
336  /**
337   * Get znode data. Does not set a watcher.
338   * @return ZNode data, null if the node does not exist or if there is an error.
339   */
340  public static byte[] getData(ZKWatcher zkw, String znode)
341    throws KeeperException, InterruptedException {
342    try {
343      byte[] data = zkw.getRecoverableZooKeeper().getData(znode, null, null);
344      logRetrievedMsg(zkw, znode, data, false);
345      return data;
346    } catch (KeeperException.NoNodeException e) {
347      LOG.debug(zkw.prefix("Unable to get data of znode " + znode + " "
348        + "because node does not exist (not an error)"));
349      return null;
350    } catch (KeeperException e) {
351      LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e);
352      zkw.keeperException(e);
353      return null;
354    }
355  }
356
357  /**
358   * Get the data at the specified znode and set a watch. Returns the data and sets a watch if the
359   * node exists. Returns null and no watch is set if the node does not exist or there is an
360   * exception.
361   * @param zkw   zk reference
362   * @param znode path of node
363   * @return data of the specified znode, or null
364   * @throws KeeperException if unexpected zookeeper exception
365   */
366  public static byte[] getDataAndWatch(ZKWatcher zkw, String znode) throws KeeperException {
367    return getDataInternal(zkw, znode, null, true, true);
368  }
369
370  /**
371   * Get the data at the specified znode and set a watch. Returns the data and sets a watch if the
372   * node exists. Returns null and no watch is set if the node does not exist or there is an
373   * exception.
374   * @param zkw              zk reference
375   * @param znode            path of node
376   * @param throwOnInterrupt if false then just interrupt the thread, do not throw exception
377   * @return data of the specified znode, or null
378   * @throws KeeperException if unexpected zookeeper exception
379   */
380  public static byte[] getDataAndWatch(ZKWatcher zkw, String znode, boolean throwOnInterrupt)
381    throws KeeperException {
382    return getDataInternal(zkw, znode, null, true, throwOnInterrupt);
383  }
384
385  /**
386   * Get the data at the specified znode and set a watch. Returns the data and sets a watch if the
387   * node exists. Returns null and no watch is set if the node does not exist or there is an
388   * exception.
389   * @param zkw   zk reference
390   * @param znode path of node
391   * @param stat  object to populate the version of the znode
392   * @return data of the specified znode, or null
393   * @throws KeeperException if unexpected zookeeper exception
394   */
395  public static byte[] getDataAndWatch(ZKWatcher zkw, String znode, Stat stat)
396    throws KeeperException {
397    return getDataInternal(zkw, znode, stat, true, true);
398  }
399
400  private static byte[] getDataInternal(ZKWatcher zkw, String znode, Stat stat, boolean watcherSet,
401    boolean throwOnInterrupt) throws KeeperException {
402    try {
403      byte[] data = zkw.getRecoverableZooKeeper().getData(znode, zkw, stat);
404      logRetrievedMsg(zkw, znode, data, watcherSet);
405      return data;
406    } catch (KeeperException.NoNodeException e) {
407      // This log can get pretty annoying when we cycle on 100ms waits.
408      // Enable trace if you really want to see it.
409      LOG.trace(zkw.prefix("Unable to get data of znode " + znode + " "
410        + "because node does not exist (not an error)"));
411      return null;
412    } catch (KeeperException e) {
413      LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e);
414      zkw.keeperException(e);
415      return null;
416    } catch (InterruptedException e) {
417      LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e);
418      if (throwOnInterrupt) {
419        zkw.interruptedException(e);
420      } else {
421        zkw.interruptedExceptionNoThrow(e, true);
422      }
423      return null;
424    }
425  }
426
427  /**
428   * Get the data at the specified znode without setting a watch. Returns the data if the node
429   * exists. Returns null if the node does not exist. Sets the stats of the node in the passed Stat
430   * object. Pass a null stat if not interested.
431   * @param zkw   zk reference
432   * @param znode path of node
433   * @param stat  node status to get if node exists
434   * @return data of the specified znode, or null if node does not exist
435   * @throws KeeperException if unexpected zookeeper exception
436   */
437  public static byte[] getDataNoWatch(ZKWatcher zkw, String znode, Stat stat)
438    throws KeeperException {
439    try {
440      byte[] data = zkw.getRecoverableZooKeeper().getData(znode, null, stat);
441      logRetrievedMsg(zkw, znode, data, false);
442      return data;
443    } catch (KeeperException.NoNodeException e) {
444      LOG.debug(zkw.prefix("Unable to get data of znode " + znode + " "
445        + "because node does not exist (not necessarily an error)"));
446      return null;
447    } catch (KeeperException e) {
448      LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e);
449      zkw.keeperException(e);
450      return null;
451    } catch (InterruptedException e) {
452      LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e);
453      zkw.interruptedException(e);
454      return null;
455    }
456  }
457
458  /**
459   * Returns the date of child znodes of the specified znode. Also sets a watch on the specified
460   * znode which will capture a NodeDeleted event on the specified znode as well as
461   * NodeChildrenChanged if any children of the specified znode are created or deleted. Returns null
462   * if the specified node does not exist. Otherwise returns a list of children of the specified
463   * node. If the node exists but it has no children, an empty list will be returned.
464   * @param zkw      zk reference
465   * @param baseNode path of node to list and watch children of
466   * @return list of data of children of the specified node, an empty list if the node exists but
467   *         has no children, and null if the node does not exist
468   * @throws KeeperException if unexpected zookeeper exception
469   * @deprecated Unused
470   */
471  @Deprecated
472  public static List<NodeAndData> getChildDataAndWatchForNewChildren(ZKWatcher zkw, String baseNode)
473    throws KeeperException {
474    return getChildDataAndWatchForNewChildren(zkw, baseNode, true);
475  }
476
477  /**
478   * Returns the date of child znodes of the specified znode. Also sets a watch on the specified
479   * znode which will capture a NodeDeleted event on the specified znode as well as
480   * NodeChildrenChanged if any children of the specified znode are created or deleted. Returns null
481   * if the specified node does not exist. Otherwise returns a list of children of the specified
482   * node. If the node exists but it has no children, an empty list will be returned.
483   * @param zkw              zk reference
484   * @param baseNode         path of node to list and watch children of
485   * @param throwOnInterrupt if true then just interrupt the thread, do not throw exception
486   * @return list of data of children of the specified node, an empty list if the node exists but
487   *         has no children, and null if the node does not exist
488   * @throws KeeperException if unexpected zookeeper exception
489   * @deprecated Unused
490   */
491  @Deprecated
492  public static List<NodeAndData> getChildDataAndWatchForNewChildren(ZKWatcher zkw, String baseNode,
493    boolean throwOnInterrupt) throws KeeperException {
494    List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(zkw, baseNode);
495    if (nodes != null) {
496      List<NodeAndData> newNodes = new ArrayList<>();
497      for (String node : nodes) {
498        if (Thread.interrupted()) {
499          // Partial data should not be processed. Cancel processing by sending empty list.
500          return Collections.emptyList();
501        }
502        String nodePath = ZNodePaths.joinZNode(baseNode, node);
503        byte[] data = ZKUtil.getDataAndWatch(zkw, nodePath, throwOnInterrupt);
504        newNodes.add(new NodeAndData(nodePath, data));
505      }
506      return newNodes;
507    }
508    return null;
509  }
510
511  /**
512   * Update the data of an existing node with the expected version to have the specified data.
513   * Throws an exception if there is a version mismatch or some other problem. Sets no watches under
514   * any conditions.
515   * @param zkw             zk reference
516   * @param znode           the path to the ZNode
517   * @param data            the data to store in ZooKeeper
518   * @param expectedVersion the expected version
519   * @throws KeeperException                     if unexpected zookeeper exception
520   * @throws KeeperException.BadVersionException if version mismatch
521   * @deprecated Unused
522   */
523  @Deprecated
524  public static void updateExistingNodeData(ZKWatcher zkw, String znode, byte[] data,
525    int expectedVersion) throws KeeperException {
526    try {
527      zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion);
528    } catch (InterruptedException ie) {
529      zkw.interruptedException(ie);
530    }
531  }
532
533  //
534  // Data setting
535  //
536
537  /**
538   * Sets the data of the existing znode to be the specified data. Ensures that the current data has
539   * the specified expected version.
540   * <p>
541   * If the node does not exist, a {@link NoNodeException} will be thrown.
542   * <p>
543   * If their is a version mismatch, method returns null.
544   * <p>
545   * No watches are set but setting data will trigger other watchers of this node.
546   * <p>
547   * If there is another problem, a KeeperException will be thrown.
548   * @param zkw             zk reference
549   * @param znode           path of node
550   * @param data            data to set for node
551   * @param expectedVersion version expected when setting data
552   * @return true if data set, false if version mismatch
553   * @throws KeeperException if unexpected zookeeper exception
554   */
555  public static boolean setData(ZKWatcher zkw, String znode, byte[] data, int expectedVersion)
556    throws KeeperException, KeeperException.NoNodeException {
557    try {
558      return zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion) != null;
559    } catch (InterruptedException e) {
560      zkw.interruptedException(e);
561      return false;
562    }
563  }
564
565  /**
566   * Set data into node creating node if it doesn't yet exist. Does not set watch.
567   * @param zkw   zk reference
568   * @param znode path of node
569   * @param data  data to set for node
570   * @throws KeeperException if a ZooKeeper operation fails
571   */
572  public static void createSetData(final ZKWatcher zkw, final String znode, final byte[] data)
573    throws KeeperException {
574    if (checkExists(zkw, znode) == -1) {
575      ZKUtil.createWithParents(zkw, znode, data);
576    } else {
577      ZKUtil.setData(zkw, znode, data);
578    }
579  }
580
581  /**
582   * Sets the data of the existing znode to be the specified data. The node must exist but no checks
583   * are done on the existing data or version.
584   * <p>
585   * If the node does not exist, a {@link NoNodeException} will be thrown.
586   * <p>
587   * No watches are set but setting data will trigger other watchers of this node.
588   * <p>
589   * If there is another problem, a KeeperException will be thrown.
590   * @param zkw   zk reference
591   * @param znode path of node
592   * @param data  data to set for node
593   * @throws KeeperException if unexpected zookeeper exception
594   */
595  public static void setData(ZKWatcher zkw, String znode, byte[] data)
596    throws KeeperException, KeeperException.NoNodeException {
597    setData(zkw, (SetData) ZKUtilOp.setData(znode, data));
598  }
599
600  private static void setData(ZKWatcher zkw, SetData setData)
601    throws KeeperException, KeeperException.NoNodeException {
602    SetDataRequest sd = (SetDataRequest) toZooKeeperOp(zkw, setData).toRequestRecord();
603    setData(zkw, sd.getPath(), sd.getData(), sd.getVersion());
604  }
605
606  //
607  // Node creation
608  //
609
610  /**
611   * Set the specified znode to be an ephemeral node carrying the specified data. If the node is
612   * created successfully, a watcher is also set on the node. If the node is not created
613   * successfully because it already exists, this method will also set a watcher on the node. If
614   * there is another problem, a KeeperException will be thrown.
615   * @param zkw   zk reference
616   * @param znode path of node
617   * @param data  data of node
618   * @return true if node created, false if not, watch set in both cases
619   * @throws KeeperException if unexpected zookeeper exception
620   */
621  public static boolean createEphemeralNodeAndWatch(ZKWatcher zkw, String znode, byte[] data)
622    throws KeeperException {
623    boolean ret = true;
624    try {
625      zkw.getRecoverableZooKeeper().create(znode, data, zkw.createACL(znode), CreateMode.EPHEMERAL);
626    } catch (KeeperException.NodeExistsException nee) {
627      ret = false;
628    } catch (InterruptedException e) {
629      LOG.info("Interrupted", e);
630      Thread.currentThread().interrupt();
631    }
632    if (!watchAndCheckExists(zkw, znode)) {
633      // It did exist but now it doesn't, try again
634      return createEphemeralNodeAndWatch(zkw, znode, data);
635    }
636    return ret;
637  }
638
639  /**
640   * Creates the specified znode to be a persistent node carrying the specified data. Returns true
641   * if the node was successfully created, false if the node already existed. If the node is created
642   * successfully, a watcher is also set on the node. If the node is not created successfully
643   * because it already exists, this method will also set a watcher on the node but return false. If
644   * there is another problem, a KeeperException will be thrown.
645   * @param zkw   zk reference
646   * @param znode path of node
647   * @param data  data of node
648   * @return true if node created, false if not, watch set in both cases
649   * @throws KeeperException if unexpected zookeeper exception
650   */
651  public static boolean createNodeIfNotExistsAndWatch(ZKWatcher zkw, String znode, byte[] data)
652    throws KeeperException {
653    boolean ret = true;
654    try {
655      zkw.getRecoverableZooKeeper().create(znode, data, zkw.createACL(znode),
656        CreateMode.PERSISTENT);
657    } catch (KeeperException.NodeExistsException nee) {
658      ret = false;
659    } catch (InterruptedException e) {
660      zkw.interruptedException(e);
661      return false;
662    }
663    try {
664      zkw.getRecoverableZooKeeper().exists(znode, zkw);
665    } catch (InterruptedException e) {
666      zkw.interruptedException(e);
667      return false;
668    }
669    return ret;
670  }
671
672  /**
673   * Creates the specified znode with the specified data but does not watch it. Returns the znode of
674   * the newly created node If there is another problem, a KeeperException will be thrown.
675   * @param zkw        zk reference
676   * @param znode      path of node
677   * @param data       data of node
678   * @param createMode specifying whether the node to be created is ephemeral and/or sequential
679   * @return true name of the newly created znode or null
680   * @throws KeeperException if unexpected zookeeper exception
681   */
682  public static String createNodeIfNotExistsNoWatch(ZKWatcher zkw, String znode, byte[] data,
683    CreateMode createMode) throws KeeperException {
684    try {
685      return zkw.getRecoverableZooKeeper().create(znode, data, zkw.createACL(znode), createMode);
686    } catch (KeeperException.NodeExistsException nee) {
687      return znode;
688    } catch (InterruptedException e) {
689      zkw.interruptedException(e);
690      return null;
691    }
692  }
693
694  /**
695   * Creates the specified node with the specified data and watches it.
696   * <p>
697   * Throws an exception if the node already exists.
698   * <p>
699   * The node created is persistent and open access.
700   * <p>
701   * Returns the version number of the created node if successful.
702   * @param zkw   zk reference
703   * @param znode path of node to create
704   * @param data  data of node to create
705   * @return version of node created
706   * @throws KeeperException                     if unexpected zookeeper exception
707   * @throws KeeperException.NodeExistsException if node already exists
708   */
709  public static int createAndWatch(ZKWatcher zkw, String znode, byte[] data)
710    throws KeeperException, KeeperException.NodeExistsException {
711    try {
712      zkw.getRecoverableZooKeeper().create(znode, data, zkw.createACL(znode),
713        CreateMode.PERSISTENT);
714      Stat stat = zkw.getRecoverableZooKeeper().exists(znode, zkw);
715      if (stat == null) {
716        // Likely a race condition. Someone deleted the znode.
717        throw KeeperException.create(KeeperException.Code.SYSTEMERROR,
718          "ZK.exists returned null (i.e.: znode does not exist) for znode=" + znode);
719      }
720
721      return stat.getVersion();
722    } catch (InterruptedException e) {
723      zkw.interruptedException(e);
724      return -1;
725    }
726  }
727
728  /**
729   * Async creates the specified node with the specified data.
730   * <p>
731   * Throws an exception if the node already exists.
732   * <p>
733   * The node created is persistent and open access.
734   * @param zkw   zk reference
735   * @param znode path of node to create
736   * @param data  data of node to create
737   * @param cb    the callback to use for the creation
738   * @param ctx   the context to use for the creation
739   */
740  public static void asyncCreate(ZKWatcher zkw, String znode, byte[] data,
741    final AsyncCallback.StringCallback cb, final Object ctx) {
742    zkw.getRecoverableZooKeeper().getZooKeeper().create(znode, data, zkw.createACL(znode),
743      CreateMode.PERSISTENT, cb, ctx);
744  }
745
746  /**
747   * Creates the specified node, iff the node does not exist. Does not set a watch and fails
748   * silently if the node already exists. The node created is persistent and open access.
749   * @param zkw   zk reference
750   * @param znode path of node
751   * @throws KeeperException if unexpected zookeeper exception
752   */
753  public static void createAndFailSilent(ZKWatcher zkw, String znode) throws KeeperException {
754    createAndFailSilent(zkw, znode, new byte[0]);
755  }
756
757  /**
758   * Creates the specified node containing specified data, iff the node does not exist. Does not set
759   * a watch and fails silently if the node already exists. The node created is persistent and open
760   * access.
761   * @param zkw   zk reference
762   * @param znode path of node
763   * @param data  a byte array data to store in the znode
764   * @throws KeeperException if unexpected zookeeper exception
765   */
766  public static void createAndFailSilent(ZKWatcher zkw, String znode, byte[] data)
767    throws KeeperException {
768    createAndFailSilent(zkw, (CreateAndFailSilent) ZKUtilOp.createAndFailSilent(znode, data));
769  }
770
771  private static void createAndFailSilent(ZKWatcher zkw, CreateAndFailSilent cafs)
772    throws KeeperException {
773    CreateRequest create = (CreateRequest) toZooKeeperOp(zkw, cafs).toRequestRecord();
774    String znode = create.getPath();
775    try {
776      RecoverableZooKeeper zk = zkw.getRecoverableZooKeeper();
777      if (zk.exists(znode, false) == null) {
778        zk.create(znode, create.getData(), create.getAcl(), CreateMode.fromFlag(create.getFlags()));
779      }
780    } catch (KeeperException.NodeExistsException nee) {
781      // pass
782    } catch (KeeperException.NoAuthException nee) {
783      try {
784        if (null == zkw.getRecoverableZooKeeper().exists(znode, false)) {
785          // If we failed to create the file and it does not already exist.
786          throw (nee);
787        }
788      } catch (InterruptedException ie) {
789        zkw.interruptedException(ie);
790      }
791    } catch (InterruptedException ie) {
792      zkw.interruptedException(ie);
793    }
794  }
795
796  /**
797   * Creates the specified node and all parent nodes required for it to exist. No watches are set
798   * and no errors are thrown if the node already exists. The nodes created are persistent and open
799   * access.
800   * @param zkw   zk reference
801   * @param znode path of node
802   * @throws KeeperException if unexpected zookeeper exception
803   */
804  public static void createWithParents(ZKWatcher zkw, String znode) throws KeeperException {
805    createWithParents(zkw, znode, new byte[0]);
806  }
807
808  /**
809   * Creates the specified node and all parent nodes required for it to exist. The creation of
810   * parent znodes is not atomic with the leafe znode creation but the data is written atomically
811   * when the leaf node is created. No watches are set and no errors are thrown if the node already
812   * exists. The nodes created are persistent and open access.
813   * @param zkw   zk reference
814   * @param znode path of node
815   * @throws KeeperException if unexpected zookeeper exception
816   */
817  public static void createWithParents(ZKWatcher zkw, String znode, byte[] data)
818    throws KeeperException {
819    try {
820      if (znode == null) {
821        return;
822      }
823      zkw.getRecoverableZooKeeper().create(znode, data, zkw.createACL(znode),
824        CreateMode.PERSISTENT);
825    } catch (KeeperException.NodeExistsException nee) {
826      return;
827    } catch (KeeperException.NoNodeException nne) {
828      createWithParents(zkw, getParent(znode));
829      createWithParents(zkw, znode, data);
830    } catch (InterruptedException ie) {
831      zkw.interruptedException(ie);
832    }
833  }
834
835  //
836  // Deletes
837  //
838
839  /**
840   * Delete the specified node. Sets no watches. Throws all exceptions.
841   */
842  public static void deleteNode(ZKWatcher zkw, String node) throws KeeperException {
843    deleteNode(zkw, node, -1);
844  }
845
846  /**
847   * Delete the specified node with the specified version. Sets no watches. Throws all exceptions.
848   */
849  public static boolean deleteNode(ZKWatcher zkw, String node, int version) throws KeeperException {
850    try {
851      zkw.getRecoverableZooKeeper().delete(node, version);
852      return true;
853    } catch (KeeperException.BadVersionException bve) {
854      return false;
855    } catch (InterruptedException ie) {
856      zkw.interruptedException(ie);
857      return false;
858    }
859  }
860
861  /**
862   * Deletes the specified node. Fails silent if the node does not exist.
863   * @param zkw  reference to the {@link ZKWatcher} which also contains configuration and operation
864   * @param node the node to delete
865   * @throws KeeperException if a ZooKeeper operation fails
866   */
867  public static void deleteNodeFailSilent(ZKWatcher zkw, String node) throws KeeperException {
868    deleteNodeFailSilent(zkw, (DeleteNodeFailSilent) ZKUtilOp.deleteNodeFailSilent(node));
869  }
870
871  private static void deleteNodeFailSilent(ZKWatcher zkw, DeleteNodeFailSilent dnfs)
872    throws KeeperException {
873    DeleteRequest delete = (DeleteRequest) toZooKeeperOp(zkw, dnfs).toRequestRecord();
874    try {
875      zkw.getRecoverableZooKeeper().delete(delete.getPath(), delete.getVersion());
876    } catch (KeeperException.NoNodeException nne) {
877    } catch (InterruptedException ie) {
878      zkw.interruptedException(ie);
879    }
880  }
881
882  /**
883   * Delete the specified node and all of it's children.
884   * <p>
885   * If the node does not exist, just returns.
886   * <p>
887   * Sets no watches. Throws all exceptions besides dealing with deletion of children.
888   */
889  public static void deleteNodeRecursively(ZKWatcher zkw, String node) throws KeeperException {
890    deleteNodeRecursivelyMultiOrSequential(zkw, true, node);
891  }
892
893  /**
894   * Delete all the children of the specified node but not the node itself. Sets no watches. Throws
895   * all exceptions besides dealing with deletion of children.
896   * @throws KeeperException if a ZooKeeper operation fails
897   */
898  public static void deleteChildrenRecursively(ZKWatcher zkw, String node) throws KeeperException {
899    deleteChildrenRecursivelyMultiOrSequential(zkw, true, node);
900  }
901
902  /**
903   * Delete all the children of the specified node but not the node itself. This will first traverse
904   * the znode tree for listing the children and then delete these znodes using multi-update api or
905   * sequential based on the specified configurations.
906   * <p>
907   * Sets no watches. Throws all exceptions besides dealing with deletion of children.
908   * <p>
909   * If the following is true:
910   * <ul>
911   * <li>runSequentialOnMultiFailure is true
912   * </ul>
913   * on calling multi, we get a ZooKeeper exception that can be handled by a sequential call(*), we
914   * retry the operations one-by-one (sequentially). - zk reference - if true when we get a
915   * ZooKeeper exception that could retry the operations one-by-one (sequentially) - path of the
916   * parent node(s)
917   * @throws KeeperException.NotEmptyException if node has children while deleting if unexpected
918   *                                           ZooKeeper exception if an invalid path is specified
919   */
920  public static void deleteChildrenRecursivelyMultiOrSequential(ZKWatcher zkw,
921    boolean runSequentialOnMultiFailure, String... pathRoots) throws KeeperException {
922    if (pathRoots == null || pathRoots.length <= 0) {
923      LOG.warn("Given path is not valid!");
924      return;
925    }
926    List<ZKUtilOp> ops = new ArrayList<>();
927    for (String eachRoot : pathRoots) {
928      List<String> children = listChildrenBFSNoWatch(zkw, eachRoot);
929      // Delete the leaves first and eventually get rid of the root
930      for (int i = children.size() - 1; i >= 0; --i) {
931        ops.add(ZKUtilOp.deleteNodeFailSilent(children.get(i)));
932      }
933    }
934    submitBatchedMultiOrSequential(zkw, runSequentialOnMultiFailure, ops);
935  }
936
937  /**
938   * Delete the specified node and its children. This traverse the znode tree for listing the
939   * children and then delete these znodes including the parent using multi-update api or sequential
940   * based on the specified configurations.
941   * <p>
942   * Sets no watches. Throws all exceptions besides dealing with deletion of children.
943   * <p>
944   * If the following is true:
945   * <ul>
946   * <li>runSequentialOnMultiFailure is true
947   * </ul>
948   * on calling multi, we get a ZooKeeper exception that can be handled by a sequential call(*), we
949   * retry the operations one-by-one (sequentially). - zk reference - if true when we get a
950   * ZooKeeper exception that could retry the operations one-by-one (sequentially) - path of the
951   * parent node(s)
952   * @throws KeeperException.NotEmptyException if node has children while deleting if unexpected
953   *                                           ZooKeeper exception if an invalid path is specified
954   */
955  public static void deleteNodeRecursivelyMultiOrSequential(ZKWatcher zkw,
956    boolean runSequentialOnMultiFailure, String... pathRoots) throws KeeperException {
957    if (pathRoots == null || pathRoots.length <= 0) {
958      LOG.warn("Given path is not valid!");
959      return;
960    }
961    List<ZKUtilOp> ops = new ArrayList<>();
962    for (String eachRoot : pathRoots) {
963      // ZooKeeper Watches are one time triggers; When children of parent nodes are deleted
964      // recursively, must set another watch, get notified of delete node
965      List<String> children = listChildrenBFSAndWatchThem(zkw, eachRoot);
966      // Delete the leaves first and eventually get rid of the root
967      for (int i = children.size() - 1; i >= 0; --i) {
968        ops.add(ZKUtilOp.deleteNodeFailSilent(children.get(i)));
969      }
970      try {
971        if (zkw.getRecoverableZooKeeper().exists(eachRoot, zkw) != null) {
972          ops.add(ZKUtilOp.deleteNodeFailSilent(eachRoot));
973        }
974      } catch (InterruptedException e) {
975        zkw.interruptedException(e);
976      }
977    }
978    submitBatchedMultiOrSequential(zkw, runSequentialOnMultiFailure, ops);
979  }
980
981  /**
982   * Chunks the provided {@code ops} when their approximate size exceeds the the configured limit.
983   * Take caution that this can ONLY be used for operations where atomicity is not important, e.g.
984   * deletions. It must not be used when atomicity of the operations is critical.
985   * @param zkw                         reference to the {@link ZKWatcher} which contains
986   *                                    configuration and constants
987   * @param runSequentialOnMultiFailure if true when we get a ZooKeeper exception that could retry
988   *                                    the operations one-by-one (sequentially)
989   * @param ops                         list of ZKUtilOp {@link ZKUtilOp} to partition while
990   *                                    submitting batched multi or sequential
991   * @throws KeeperException unexpected ZooKeeper Exception / Zookeeper unreachable
992   */
993  private static void submitBatchedMultiOrSequential(ZKWatcher zkw,
994    boolean runSequentialOnMultiFailure, List<ZKUtilOp> ops) throws KeeperException {
995    // at least one element should exist
996    if (ops.isEmpty()) {
997      return;
998    }
999    final int maxMultiSize = zkw.getRecoverableZooKeeper().getMaxMultiSizeLimit();
1000    // Batch up the items to over smashing through jute.maxbuffer with too many Ops.
1001    final List<List<ZKUtilOp>> batchedOps = partitionOps(ops, maxMultiSize);
1002    // Would use forEach() but have to handle KeeperException
1003    for (List<ZKUtilOp> batch : batchedOps) {
1004      multiOrSequential(zkw, batch, runSequentialOnMultiFailure);
1005    }
1006  }
1007
1008  /**
1009   * Partition the list of {@code ops} by size (using {@link #estimateSize(ZKUtilOp)}).
1010   */
1011  static List<List<ZKUtilOp>> partitionOps(List<ZKUtilOp> ops, int maxPartitionSize) {
1012    List<List<ZKUtilOp>> partitionedOps = new ArrayList<>();
1013    List<ZKUtilOp> currentPartition = new ArrayList<>();
1014    int currentPartitionSize = 0;
1015    partitionedOps.add(currentPartition);
1016    Iterator<ZKUtilOp> iter = ops.iterator();
1017    while (iter.hasNext()) {
1018      ZKUtilOp currentOp = iter.next();
1019      int currentOpSize = estimateSize(currentOp);
1020
1021      // Roll a new partition if necessary
1022      // If the current partition is empty, put the element in there anyways.
1023      // We can roll a new partition if we get another element
1024      if (!currentPartition.isEmpty() && currentOpSize + currentPartitionSize > maxPartitionSize) {
1025        currentPartition = new ArrayList<>();
1026        partitionedOps.add(currentPartition);
1027        currentPartitionSize = 0;
1028      }
1029
1030      // Add the current op to the partition
1031      currentPartition.add(currentOp);
1032      // And record its size
1033      currentPartitionSize += currentOpSize;
1034    }
1035    return partitionedOps;
1036  }
1037
1038  static int estimateSize(ZKUtilOp op) {
1039    return Bytes.toBytes(op.getPath()).length;
1040  }
1041
1042  /**
1043   * BFS Traversal of all the children under path, with the entries in the list, in the same order
1044   * as that of the traversal. Lists all the children without setting any watches. - zk reference -
1045   * path of node
1046   * @return list of children znodes under the path if unexpected ZooKeeper exception
1047   */
1048  private static List<String> listChildrenBFSNoWatch(ZKWatcher zkw, final String znode)
1049    throws KeeperException {
1050    Deque<String> queue = new LinkedList<>();
1051    List<String> tree = new ArrayList<>();
1052    queue.add(znode);
1053    while (true) {
1054      String node = queue.pollFirst();
1055      if (node == null) {
1056        break;
1057      }
1058      List<String> children = listChildrenNoWatch(zkw, node);
1059      if (children == null) {
1060        continue;
1061      }
1062      for (final String child : children) {
1063        final String childPath = node + "/" + child;
1064        queue.add(childPath);
1065        tree.add(childPath);
1066      }
1067    }
1068    return tree;
1069  }
1070
1071  /**
1072   * BFS Traversal of all the children under path, with the entries in the list, in the same order
1073   * as that of the traversal. Lists all the children and set watches on to them. - zk reference -
1074   * path of node
1075   * @return list of children znodes under the path if unexpected ZooKeeper exception
1076   */
1077  private static List<String> listChildrenBFSAndWatchThem(ZKWatcher zkw, final String znode)
1078    throws KeeperException {
1079    Deque<String> queue = new LinkedList<>();
1080    List<String> tree = new ArrayList<>();
1081    queue.add(znode);
1082    while (true) {
1083      String node = queue.pollFirst();
1084      if (node == null) {
1085        break;
1086      }
1087      List<String> children = listChildrenAndWatchThem(zkw, node);
1088      if (children == null) {
1089        continue;
1090      }
1091      for (final String child : children) {
1092        final String childPath = node + "/" + child;
1093        queue.add(childPath);
1094        tree.add(childPath);
1095      }
1096    }
1097    return tree;
1098  }
1099
1100  /**
1101   * Represents an action taken by ZKUtil, e.g. createAndFailSilent. These actions are higher-level
1102   * than ZKOp actions, which represent individual actions in the ZooKeeper API, like create.
1103   */
1104  public abstract static class ZKUtilOp {
1105    private String path;
1106
1107    @Override
1108    public String toString() {
1109      return this.getClass().getSimpleName() + ", path=" + this.path;
1110    }
1111
1112    private ZKUtilOp(String path) {
1113      this.path = path;
1114    }
1115
1116    /** Returns a createAndFailSilent ZKUtilOp */
1117    public static ZKUtilOp createAndFailSilent(String path, byte[] data) {
1118      return new CreateAndFailSilent(path, data);
1119    }
1120
1121    /** Returns a deleteNodeFailSilent ZKUtilOP */
1122    public static ZKUtilOp deleteNodeFailSilent(String path) {
1123      return new DeleteNodeFailSilent(path);
1124    }
1125
1126    /** Returns a setData ZKUtilOp */
1127    public static ZKUtilOp setData(String path, byte[] data) {
1128      return new SetData(path, data);
1129    }
1130
1131    /** Returns a setData ZKUtilOp */
1132    public static ZKUtilOp setData(String path, byte[] data, int version) {
1133      return new SetData(path, data, version);
1134    }
1135
1136    /** Returns path to znode where the ZKOp will occur */
1137    public String getPath() {
1138      return path;
1139    }
1140
1141    /**
1142     * ZKUtilOp representing createAndFailSilent in ZooKeeper (attempt to create node, ignore error
1143     * if already exists)
1144     */
1145    public static final class CreateAndFailSilent extends ZKUtilOp {
1146      private byte[] data;
1147
1148      private CreateAndFailSilent(String path, byte[] data) {
1149        super(path);
1150        this.data = data;
1151      }
1152
1153      public byte[] getData() {
1154        return data;
1155      }
1156
1157      @Override
1158      public boolean equals(Object o) {
1159        if (this == o) {
1160          return true;
1161        }
1162        if (!(o instanceof CreateAndFailSilent)) {
1163          return false;
1164        }
1165
1166        CreateAndFailSilent op = (CreateAndFailSilent) o;
1167        return getPath().equals(op.getPath()) && Arrays.equals(data, op.data);
1168      }
1169
1170      @Override
1171      public int hashCode() {
1172        int ret = 17 + getPath().hashCode() * 31;
1173        return ret * 31 + Bytes.hashCode(data);
1174      }
1175    }
1176
1177    /**
1178     * ZKUtilOp representing deleteNodeFailSilent in ZooKeeper (attempt to delete node, ignore error
1179     * if node doesn't exist)
1180     */
1181    public static final class DeleteNodeFailSilent extends ZKUtilOp {
1182      private DeleteNodeFailSilent(String path) {
1183        super(path);
1184      }
1185
1186      @Override
1187      public boolean equals(Object o) {
1188        if (this == o) {
1189          return true;
1190        }
1191        if (!(o instanceof DeleteNodeFailSilent)) {
1192          return false;
1193        }
1194
1195        return super.equals(o);
1196      }
1197
1198      @Override
1199      public int hashCode() {
1200        return getPath().hashCode();
1201      }
1202    }
1203
1204    /**
1205     * ZKUtilOp representing setData in ZooKeeper
1206     */
1207    public static final class SetData extends ZKUtilOp {
1208      private byte[] data;
1209      private int version = -1;
1210
1211      private SetData(String path, byte[] data) {
1212        super(path);
1213        this.data = data;
1214      }
1215
1216      private SetData(String path, byte[] data, int version) {
1217        super(path);
1218        this.data = data;
1219        this.version = version;
1220      }
1221
1222      public byte[] getData() {
1223        return data;
1224      }
1225
1226      public int getVersion() {
1227        return version;
1228      }
1229
1230      @Override
1231      public boolean equals(Object o) {
1232        if (this == o) {
1233          return true;
1234        }
1235        if (!(o instanceof SetData)) {
1236          return false;
1237        }
1238
1239        SetData op = (SetData) o;
1240        return getPath().equals(op.getPath()) && Arrays.equals(data, op.data)
1241          && getVersion() == op.getVersion();
1242      }
1243
1244      @Override
1245      public int hashCode() {
1246        int ret = getPath().hashCode();
1247        ret = ret * 31 + Bytes.hashCode(data);
1248        return ret * 31 + Integer.hashCode(version);
1249      }
1250    }
1251  }
1252
1253  /**
1254   * Convert from ZKUtilOp to ZKOp
1255   */
1256  private static Op toZooKeeperOp(ZKWatcher zkw, ZKUtilOp op) throws UnsupportedOperationException {
1257    if (op == null) {
1258      return null;
1259    }
1260
1261    if (op instanceof CreateAndFailSilent) {
1262      CreateAndFailSilent cafs = (CreateAndFailSilent) op;
1263      return Op.create(cafs.getPath(), cafs.getData(), zkw.createACL(cafs.getPath()),
1264        CreateMode.PERSISTENT);
1265    } else if (op instanceof DeleteNodeFailSilent) {
1266      DeleteNodeFailSilent dnfs = (DeleteNodeFailSilent) op;
1267      return Op.delete(dnfs.getPath(), -1);
1268    } else if (op instanceof SetData) {
1269      SetData sd = (SetData) op;
1270      return Op.setData(sd.getPath(), sd.getData(), sd.getVersion());
1271    } else {
1272      throw new UnsupportedOperationException(
1273        "Unexpected ZKUtilOp type: " + op.getClass().getName());
1274    }
1275  }
1276
1277  // Static boolean for warning about useMulti because ideally there will be one warning per
1278  // process instance. It is fine if two threads end up racing on this for a bit. We do not
1279  // need to use an atomic type for this, that is overkill. The goal of reducing the number of
1280  // warnings from many to one or a few at startup is still achieved.
1281  private static boolean useMultiWarn = true;
1282
1283  /**
1284   * Use ZooKeeper's multi-update functionality. If all of the following are true: -
1285   * runSequentialOnMultiFailure is true - on calling multi, we get a ZooKeeper exception that can
1286   * be handled by a sequential call(*) Then: - we retry the operations one-by-one (sequentially)
1287   * Note *: an example is receiving a NodeExistsException from a "create" call. Without multi, a
1288   * user could call "createAndFailSilent" to ensure that a node exists if they don't care who
1289   * actually created the node (i.e. the NodeExistsException from ZooKeeper is caught). This will
1290   * cause all operations in the multi to fail, however, because the NodeExistsException that
1291   * zk.create throws will fail the multi transaction. In this case, if the previous conditions
1292   * hold, the commands are run sequentially, which should result in the correct final state, but
1293   * means that the operations will not run atomically.
1294   * @throws KeeperException if a ZooKeeper operation fails
1295   */
1296  public static void multiOrSequential(ZKWatcher zkw, List<ZKUtilOp> ops,
1297    boolean runSequentialOnMultiFailure) throws KeeperException {
1298    if (ops == null) {
1299      return;
1300    }
1301    if (useMultiWarn) { // Only check and warn at first use
1302      if (zkw.getConfiguration().get("hbase.zookeeper.useMulti") != null) {
1303        LOG.warn("hbase.zookeeper.useMulti is deprecated. Default to true always.");
1304      }
1305      useMultiWarn = false;
1306    }
1307    List<Op> zkOps = new LinkedList<>();
1308    for (ZKUtilOp op : ops) {
1309      zkOps.add(toZooKeeperOp(zkw, op));
1310    }
1311    try {
1312      zkw.getRecoverableZooKeeper().multi(zkOps);
1313    } catch (KeeperException ke) {
1314      switch (ke.code()) {
1315        case NODEEXISTS:
1316        case NONODE:
1317        case BADVERSION:
1318        case NOAUTH:
1319        case NOTEMPTY:
1320          // if we get an exception that could be solved by running sequentially
1321          // (and the client asked us to), then break out and run sequentially
1322          if (runSequentialOnMultiFailure) {
1323            LOG.info(
1324              "multi exception: {}; running operations sequentially "
1325                + "(runSequentialOnMultiFailure=true); {}",
1326              ke.toString(), ops.stream().map(o -> o.toString()).collect(Collectors.joining(",")));
1327            processSequentially(zkw, ops);
1328            break;
1329          }
1330        default:
1331          throw ke;
1332      }
1333    } catch (InterruptedException ie) {
1334      zkw.interruptedException(ie);
1335    }
1336  }
1337
1338  private static void processSequentially(ZKWatcher zkw, List<ZKUtilOp> ops)
1339    throws KeeperException, NoNodeException {
1340    for (ZKUtilOp op : ops) {
1341      if (op instanceof CreateAndFailSilent) {
1342        createAndFailSilent(zkw, (CreateAndFailSilent) op);
1343      } else if (op instanceof DeleteNodeFailSilent) {
1344        deleteNodeFailSilent(zkw, (DeleteNodeFailSilent) op);
1345      } else if (op instanceof SetData) {
1346        setData(zkw, (SetData) op);
1347      } else {
1348        throw new UnsupportedOperationException(
1349          "Unexpected ZKUtilOp type: " + op.getClass().getName());
1350      }
1351    }
1352  }
1353
1354  //
1355  // ZooKeeper cluster information
1356  //
1357
1358  private static void logRetrievedMsg(final ZKWatcher zkw, final String znode, final byte[] data,
1359    final boolean watcherSet) {
1360    if (!LOG.isTraceEnabled()) {
1361      return;
1362    }
1363
1364    LOG.trace(zkw.prefix("Retrieved " + ((data == null) ? 0 : data.length)
1365      + " byte(s) of data from znode " + znode + (watcherSet ? " and set watcher; " : "; data=")
1366      + (data == null ? "null"
1367        : data.length == 0 ? "empty"
1368        : (zkw.getZNodePaths().isMetaZNodePath(znode) ? getServerNameOrEmptyString(data)
1369          : znode.startsWith(zkw.getZNodePaths().backupMasterAddressesZNode)
1370            ? getServerNameOrEmptyString(data)
1371          : StringUtils.abbreviate(Bytes.toStringBinary(data), 32)))));
1372  }
1373
1374  private static String getServerNameOrEmptyString(final byte[] data) {
1375    try {
1376      return ProtobufUtil.parseServerNameFrom(data).toString();
1377    } catch (DeserializationException e) {
1378      return "";
1379    }
1380  }
1381
1382  /**
1383   * Waits for HBase installation's base (parent) znode to become available.
1384   * @throws IOException on ZK errors
1385   */
1386  public static void waitForBaseZNode(Configuration conf) throws IOException {
1387    LOG.info("Waiting until the base znode is available");
1388    String parentZNode =
1389      conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
1390    ZooKeeper zk = new ZooKeeper(ZKConfig.getZKQuorumServersString(conf),
1391      conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT),
1392      EmptyWatcher.instance);
1393
1394    final int maxTimeMs = 10000;
1395    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
1396
1397    KeeperException keeperEx = null;
1398    try {
1399      try {
1400        for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
1401          try {
1402            if (zk.exists(parentZNode, false) != null) {
1403              LOG.info("Parent znode exists: {}", parentZNode);
1404              keeperEx = null;
1405              break;
1406            }
1407          } catch (KeeperException e) {
1408            keeperEx = e;
1409          }
1410          Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
1411        }
1412      } finally {
1413        zk.close();
1414      }
1415    } catch (InterruptedException ex) {
1416      Thread.currentThread().interrupt();
1417    }
1418
1419    if (keeperEx != null) {
1420      throw new IOException(keeperEx);
1421    }
1422  }
1423
1424  /**
1425   * Convert a {@link DeserializationException} to a more palatable {@link KeeperException}. Used
1426   * when can't let a {@link DeserializationException} out w/o changing public API.
1427   * @param e Exception to convert
1428   * @return Converted exception
1429   */
1430  public static KeeperException convert(final DeserializationException e) {
1431    KeeperException ke = new KeeperException.DataInconsistencyException();
1432    ke.initCause(e);
1433    return ke;
1434  }
1435
1436  /**
1437   * Recursively print the current state of ZK (non-transactional)
1438   * @param root name of the root directory in zk to print
1439   */
1440  public static void logZKTree(ZKWatcher zkw, String root) {
1441    if (!LOG.isDebugEnabled()) {
1442      return;
1443    }
1444
1445    LOG.debug("Current zk system:");
1446    String prefix = "|-";
1447    LOG.debug(prefix + root);
1448    try {
1449      logZKTree(zkw, root, prefix);
1450    } catch (KeeperException e) {
1451      throw new RuntimeException(e);
1452    }
1453  }
1454
1455  /**
1456   * Helper method to print the current state of the ZK tree.
1457   * @see #logZKTree(ZKWatcher, String)
1458   * @throws KeeperException if an unexpected exception occurs
1459   */
1460  private static void logZKTree(ZKWatcher zkw, String root, String prefix) throws KeeperException {
1461    List<String> children = ZKUtil.listChildrenNoWatch(zkw, root);
1462
1463    if (children == null) {
1464      return;
1465    }
1466
1467    for (String child : children) {
1468      LOG.debug(prefix + child);
1469      String node = ZNodePaths.joinZNode(root.equals("/") ? "" : root, child);
1470      logZKTree(zkw, node, prefix + "---");
1471    }
1472  }
1473
1474  /**
1475   * @param position the position to serialize
1476   * @return Serialized protobuf of <code>position</code> with pb magic prefix prepended suitable
1477   *         for use as content of an wal position in a replication queue.
1478   */
1479  public static byte[] positionToByteArray(final long position) {
1480    byte[] bytes = ReplicationProtos.ReplicationHLogPosition.newBuilder().setPosition(position)
1481      .build().toByteArray();
1482    return ProtobufUtil.prependPBMagic(bytes);
1483  }
1484
1485  /**
1486   * @param bytes - Content of a WAL position znode.
1487   * @return long - The current WAL position.
1488   * @throws DeserializationException if the WAL position cannot be parsed
1489   */
1490  public static long parseWALPositionFrom(final byte[] bytes) throws DeserializationException {
1491    if (bytes == null) {
1492      throw new DeserializationException("Unable to parse null WAL position.");
1493    }
1494    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
1495      int pblen = ProtobufUtil.lengthOfPBMagic();
1496      ReplicationProtos.ReplicationHLogPosition.Builder builder =
1497        ReplicationProtos.ReplicationHLogPosition.newBuilder();
1498      ReplicationProtos.ReplicationHLogPosition position;
1499      try {
1500        ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
1501        position = builder.build();
1502      } catch (IOException e) {
1503        throw new DeserializationException(e);
1504      }
1505      return position.getPosition();
1506    } else {
1507      if (bytes.length > 0) {
1508        return Bytes.toLong(bytes);
1509      }
1510      return 0;
1511    }
1512  }
1513}