001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.Collections; 024import java.util.Deque; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.stream.Collectors; 029import org.apache.commons.lang3.StringUtils; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.exceptions.DeserializationException; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.hadoop.hbase.util.Threads; 035import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.CreateAndFailSilent; 036import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.DeleteNodeFailSilent; 037import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.SetData; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.apache.zookeeper.AsyncCallback; 040import org.apache.zookeeper.CreateMode; 041import org.apache.zookeeper.KeeperException; 042import org.apache.zookeeper.KeeperException.NoNodeException; 043import 
org.apache.zookeeper.Op; 044import org.apache.zookeeper.ZooKeeper; 045import org.apache.zookeeper.data.Stat; 046import org.apache.zookeeper.proto.CreateRequest; 047import org.apache.zookeeper.proto.DeleteRequest; 048import org.apache.zookeeper.proto.SetDataRequest; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; 054 055/** 056 * Internal HBase utility class for ZooKeeper. 057 * <p> 058 * Contains only static methods and constants. 059 * <p> 060 * Methods all throw {@link KeeperException} if there is an unexpected zookeeper exception, so 061 * callers of these methods must handle appropriately. If ZK is required for the operation, the 062 * server will need to be aborted. 063 */ 064@InterfaceAudience.Private 065public final class ZKUtil { 066 private static final Logger LOG = LoggerFactory.getLogger(ZKUtil.class); 067 068 private ZKUtil() { 069 } 070 071 // 072 // Helper methods 073 // 074 /** 075 * Returns the full path of the immediate parent of the specified node. 076 * @param node path to get parent of 077 * @return parent of path, null if passed the root node or an invalid node 078 */ 079 public static String getParent(String node) { 080 int idx = node.lastIndexOf(ZNodePaths.ZNODE_PATH_SEPARATOR); 081 return idx <= 0 ? null : node.substring(0, idx); 082 } 083 084 /** 085 * Get the name of the current node from the specified fully-qualified path. 086 * @param path fully-qualified path 087 * @return name of the current node 088 */ 089 public static String getNodeName(String path) { 090 return path.substring(path.lastIndexOf("/") + 1); 091 } 092 093 // 094 // Existence checks and watches 095 // 096 097 /** 098 * Watch the specified znode for delete/create/change events. The watcher is set whether or not 099 * the node exists. If the node already exists, the method returns true. 
If the node does not 100 * exist, the method returns false. 101 * @param zkw zk reference 102 * @param znode path of node to watch 103 * @return true if znode exists, false if does not exist or error 104 * @throws KeeperException if unexpected zookeeper exception 105 */ 106 public static boolean watchAndCheckExists(ZKWatcher zkw, String znode) throws KeeperException { 107 try { 108 Stat s = zkw.getRecoverableZooKeeper().exists(znode, zkw); 109 boolean exists = s != null; 110 if (exists) { 111 LOG.debug(zkw.prefix("Set watcher on existing znode=" + znode)); 112 } else { 113 LOG.debug(zkw.prefix("Set watcher on znode that does not yet exist, " + znode)); 114 } 115 return exists; 116 } catch (KeeperException e) { 117 LOG.warn(zkw.prefix("Unable to set watcher on znode " + znode), e); 118 zkw.keeperException(e); 119 return false; 120 } catch (InterruptedException e) { 121 LOG.warn(zkw.prefix("Unable to set watcher on znode " + znode), e); 122 zkw.interruptedException(e); 123 return false; 124 } 125 } 126 127 /** 128 * Watch the specified znode, but only if exists. Useful when watching for deletions. Uses 129 * .getData() (and handles NoNodeException) instead of .exists() to accomplish this, as .getData() 130 * will only set a watch if the znode exists. 131 * @param zkw zk reference 132 * @param znode path of node to watch 133 * @return true if the watch is set, false if node does not exists 134 * @throws KeeperException if unexpected zookeeper exception 135 */ 136 public static boolean setWatchIfNodeExists(ZKWatcher zkw, String znode) throws KeeperException { 137 try { 138 zkw.getRecoverableZooKeeper().getData(znode, true, null); 139 return true; 140 } catch (NoNodeException e) { 141 return false; 142 } catch (InterruptedException e) { 143 LOG.warn(zkw.prefix("Unable to set watcher on znode " + znode), e); 144 zkw.interruptedException(e); 145 return false; 146 } 147 } 148 149 /** 150 * Check if the specified node exists. Sets no watches. 
151 * @param zkw zk reference 152 * @param znode path of node to watch 153 * @return version of the node if it exists, -1 if does not exist 154 * @throws KeeperException if unexpected zookeeper exception 155 */ 156 public static int checkExists(ZKWatcher zkw, String znode) throws KeeperException { 157 try { 158 Stat s = zkw.getRecoverableZooKeeper().exists(znode, null); 159 return s != null ? s.getVersion() : -1; 160 } catch (KeeperException e) { 161 LOG.warn(zkw.prefix("Unable to set watcher on znode (" + znode + ")"), e); 162 zkw.keeperException(e); 163 return -1; 164 } catch (InterruptedException e) { 165 LOG.warn(zkw.prefix("Unable to set watcher on znode (" + znode + ")"), e); 166 zkw.interruptedException(e); 167 return -1; 168 } 169 } 170 171 // 172 // Znode listings 173 // 174 175 /** 176 * Lists the children znodes of the specified znode. Also sets a watch on the specified znode 177 * which will capture a NodeDeleted event on the specified znode as well as NodeChildrenChanged if 178 * any children of the specified znode are created or deleted. Returns null if the specified node 179 * does not exist. Otherwise returns a list of children of the specified node. If the node exists 180 * but it has no children, an empty list will be returned. 
181 * @param zkw zk reference 182 * @param znode path of node to list and watch children of 183 * @return list of children of the specified node, an empty list if the node exists but has no 184 * children, and null if the node does not exist 185 * @throws KeeperException if unexpected zookeeper exception 186 */ 187 public static List<String> listChildrenAndWatchForNewChildren(ZKWatcher zkw, String znode) 188 throws KeeperException { 189 try { 190 return zkw.getRecoverableZooKeeper().getChildren(znode, zkw); 191 } catch (KeeperException.NoNodeException ke) { 192 LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " 193 + "because node does not exist (not an error)")); 194 } catch (KeeperException e) { 195 LOG.warn(zkw.prefix("Unable to list children of znode " + znode + " "), e); 196 zkw.keeperException(e); 197 } catch (InterruptedException e) { 198 LOG.warn(zkw.prefix("Unable to list children of znode " + znode + " "), e); 199 zkw.interruptedException(e); 200 } 201 202 return null; 203 } 204 205 /** 206 * List all the children of the specified znode, setting a watch for children changes and also 207 * setting a watch on every individual child in order to get the NodeCreated and NodeDeleted 208 * events. 209 * @param zkw zookeeper reference 210 * @param znode node to get children of and watch 211 * @return list of znode names, null if the node doesn't exist 212 * @throws KeeperException if a ZooKeeper operation fails 213 */ 214 public static List<String> listChildrenAndWatchThem(ZKWatcher zkw, String znode) 215 throws KeeperException { 216 List<String> children = listChildrenAndWatchForNewChildren(zkw, znode); 217 if (children == null) { 218 return null; 219 } 220 for (String child : children) { 221 watchAndCheckExists(zkw, ZNodePaths.joinZNode(znode, child)); 222 } 223 return children; 224 } 225 226 /** 227 * Lists the children of the specified znode without setting any watches. Sets no watches at all, 228 * this method is best effort. 
Returns an empty list if the node has no children. Returns null if 229 * the parent node itself does not exist. 230 * @param zkw zookeeper reference 231 * @param znode node to get children 232 * @return list of data of children of specified znode, empty if no children, null if parent does 233 * not exist 234 * @throws KeeperException if unexpected zookeeper exception 235 */ 236 public static List<String> listChildrenNoWatch(ZKWatcher zkw, String znode) 237 throws KeeperException { 238 List<String> children = null; 239 try { 240 // List the children without watching 241 children = zkw.getRecoverableZooKeeper().getChildren(znode, null); 242 } catch (KeeperException.NoNodeException nne) { 243 return null; 244 } catch (InterruptedException ie) { 245 zkw.interruptedException(ie); 246 } 247 return children; 248 } 249 250 /** 251 * Simple class to hold a node path and node data. 252 * @deprecated Unused 253 */ 254 @Deprecated 255 public static class NodeAndData { 256 private String node; 257 private byte[] data; 258 259 public NodeAndData(String node, byte[] data) { 260 this.node = node; 261 this.data = data; 262 } 263 264 public String getNode() { 265 return node; 266 } 267 268 public byte[] getData() { 269 return data; 270 } 271 272 @Override 273 public String toString() { 274 return node; 275 } 276 277 public boolean isEmpty() { 278 return (data == null || data.length == 0); 279 } 280 } 281 282 /** 283 * Checks if the specified znode has any children. Sets no watches. Returns true if the node 284 * exists and has children. Returns false if the node does not exist or if the node does not have 285 * any children. Used during master initialization to determine if the master is a failed-over-to 286 * master or the first master during initial cluster startup. If the directory for regionserver 287 * ephemeral nodes is empty then this is a cluster startup, if not then it is not cluster startup. 
288 * @param zkw zk reference 289 * @param znode path of node to check for children of 290 * @return true if node has children, false if not or node does not exist 291 * @throws KeeperException if unexpected zookeeper exception 292 */ 293 public static boolean nodeHasChildren(ZKWatcher zkw, String znode) throws KeeperException { 294 try { 295 return !zkw.getRecoverableZooKeeper().getChildren(znode, null).isEmpty(); 296 } catch (KeeperException.NoNodeException ke) { 297 LOG.debug(zkw.prefix("Unable to list children of znode " + znode 298 + " because node does not exist (not an error)")); 299 return false; 300 } catch (KeeperException e) { 301 LOG.warn(zkw.prefix("Unable to list children of znode " + znode), e); 302 zkw.keeperException(e); 303 return false; 304 } catch (InterruptedException e) { 305 LOG.warn(zkw.prefix("Unable to list children of znode " + znode), e); 306 zkw.interruptedException(e); 307 return false; 308 } 309 } 310 311 /** 312 * Get the number of children of the specified node. If the node does not exist or has no 313 * children, returns 0. Sets no watches at all. 314 * @param zkw zk reference 315 * @param znode path of node to count children of 316 * @return number of children of specified node, 0 if none or parent does not exist 317 * @throws KeeperException if unexpected zookeeper exception 318 */ 319 public static int getNumberOfChildren(ZKWatcher zkw, String znode) throws KeeperException { 320 try { 321 Stat stat = zkw.getRecoverableZooKeeper().exists(znode, null); 322 return stat == null ? 0 : stat.getNumChildren(); 323 } catch (KeeperException e) { 324 LOG.warn(zkw.prefix("Unable to get children of node " + znode)); 325 zkw.keeperException(e); 326 } catch (InterruptedException e) { 327 zkw.interruptedException(e); 328 } 329 return 0; 330 } 331 332 // 333 // Data retrieval 334 // 335 336 /** 337 * Get znode data. Does not set a watcher. 338 * @return ZNode data, null if the node does not exist or if there is an error. 
339 */ 340 public static byte[] getData(ZKWatcher zkw, String znode) 341 throws KeeperException, InterruptedException { 342 try { 343 byte[] data = zkw.getRecoverableZooKeeper().getData(znode, null, null); 344 logRetrievedMsg(zkw, znode, data, false); 345 return data; 346 } catch (KeeperException.NoNodeException e) { 347 LOG.debug(zkw.prefix("Unable to get data of znode " + znode + " " 348 + "because node does not exist (not an error)")); 349 return null; 350 } catch (KeeperException e) { 351 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e); 352 zkw.keeperException(e); 353 return null; 354 } 355 } 356 357 /** 358 * Get the data at the specified znode and set a watch. Returns the data and sets a watch if the 359 * node exists. Returns null and no watch is set if the node does not exist or there is an 360 * exception. 361 * @param zkw zk reference 362 * @param znode path of node 363 * @return data of the specified znode, or null 364 * @throws KeeperException if unexpected zookeeper exception 365 */ 366 public static byte[] getDataAndWatch(ZKWatcher zkw, String znode) throws KeeperException { 367 return getDataInternal(zkw, znode, null, true, true); 368 } 369 370 /** 371 * Get the data at the specified znode and set a watch. Returns the data and sets a watch if the 372 * node exists. Returns null and no watch is set if the node does not exist or there is an 373 * exception. 374 * @param zkw zk reference 375 * @param znode path of node 376 * @param throwOnInterrupt if false then just interrupt the thread, do not throw exception 377 * @return data of the specified znode, or null 378 * @throws KeeperException if unexpected zookeeper exception 379 */ 380 public static byte[] getDataAndWatch(ZKWatcher zkw, String znode, boolean throwOnInterrupt) 381 throws KeeperException { 382 return getDataInternal(zkw, znode, null, true, throwOnInterrupt); 383 } 384 385 /** 386 * Get the data at the specified znode and set a watch. 
Returns the data and sets a watch if the 387 * node exists. Returns null and no watch is set if the node does not exist or there is an 388 * exception. 389 * @param zkw zk reference 390 * @param znode path of node 391 * @param stat object to populate the version of the znode 392 * @return data of the specified znode, or null 393 * @throws KeeperException if unexpected zookeeper exception 394 */ 395 public static byte[] getDataAndWatch(ZKWatcher zkw, String znode, Stat stat) 396 throws KeeperException { 397 return getDataInternal(zkw, znode, stat, true, true); 398 } 399 400 private static byte[] getDataInternal(ZKWatcher zkw, String znode, Stat stat, boolean watcherSet, 401 boolean throwOnInterrupt) throws KeeperException { 402 try { 403 byte[] data = zkw.getRecoverableZooKeeper().getData(znode, zkw, stat); 404 logRetrievedMsg(zkw, znode, data, watcherSet); 405 return data; 406 } catch (KeeperException.NoNodeException e) { 407 // This log can get pretty annoying when we cycle on 100ms waits. 408 // Enable trace if you really want to see it. 409 LOG.trace(zkw.prefix("Unable to get data of znode " + znode + " " 410 + "because node does not exist (not an error)")); 411 return null; 412 } catch (KeeperException e) { 413 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e); 414 zkw.keeperException(e); 415 return null; 416 } catch (InterruptedException e) { 417 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e); 418 if (throwOnInterrupt) { 419 zkw.interruptedException(e); 420 } else { 421 zkw.interruptedExceptionNoThrow(e, true); 422 } 423 return null; 424 } 425 } 426 427 /** 428 * Get the data at the specified znode without setting a watch. Returns the data if the node 429 * exists. Returns null if the node does not exist. Sets the stats of the node in the passed Stat 430 * object. Pass a null stat if not interested. 
431 * @param zkw zk reference 432 * @param znode path of node 433 * @param stat node status to get if node exists 434 * @return data of the specified znode, or null if node does not exist 435 * @throws KeeperException if unexpected zookeeper exception 436 */ 437 public static byte[] getDataNoWatch(ZKWatcher zkw, String znode, Stat stat) 438 throws KeeperException { 439 try { 440 byte[] data = zkw.getRecoverableZooKeeper().getData(znode, null, stat); 441 logRetrievedMsg(zkw, znode, data, false); 442 return data; 443 } catch (KeeperException.NoNodeException e) { 444 LOG.debug(zkw.prefix("Unable to get data of znode " + znode + " " 445 + "because node does not exist (not necessarily an error)")); 446 return null; 447 } catch (KeeperException e) { 448 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e); 449 zkw.keeperException(e); 450 return null; 451 } catch (InterruptedException e) { 452 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e); 453 zkw.interruptedException(e); 454 return null; 455 } 456 } 457 458 /** 459 * Returns the date of child znodes of the specified znode. Also sets a watch on the specified 460 * znode which will capture a NodeDeleted event on the specified znode as well as 461 * NodeChildrenChanged if any children of the specified znode are created or deleted. Returns null 462 * if the specified node does not exist. Otherwise returns a list of children of the specified 463 * node. If the node exists but it has no children, an empty list will be returned. 
464 * @param zkw zk reference 465 * @param baseNode path of node to list and watch children of 466 * @return list of data of children of the specified node, an empty list if the node exists but 467 * has no children, and null if the node does not exist 468 * @throws KeeperException if unexpected zookeeper exception 469 * @deprecated Unused 470 */ 471 @Deprecated 472 public static List<NodeAndData> getChildDataAndWatchForNewChildren(ZKWatcher zkw, String baseNode) 473 throws KeeperException { 474 return getChildDataAndWatchForNewChildren(zkw, baseNode, true); 475 } 476 477 /** 478 * Returns the date of child znodes of the specified znode. Also sets a watch on the specified 479 * znode which will capture a NodeDeleted event on the specified znode as well as 480 * NodeChildrenChanged if any children of the specified znode are created or deleted. Returns null 481 * if the specified node does not exist. Otherwise returns a list of children of the specified 482 * node. If the node exists but it has no children, an empty list will be returned. 483 * @param zkw zk reference 484 * @param baseNode path of node to list and watch children of 485 * @param throwOnInterrupt if true then just interrupt the thread, do not throw exception 486 * @return list of data of children of the specified node, an empty list if the node exists but 487 * has no children, and null if the node does not exist 488 * @throws KeeperException if unexpected zookeeper exception 489 * @deprecated Unused 490 */ 491 @Deprecated 492 public static List<NodeAndData> getChildDataAndWatchForNewChildren(ZKWatcher zkw, String baseNode, 493 boolean throwOnInterrupt) throws KeeperException { 494 List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(zkw, baseNode); 495 if (nodes != null) { 496 List<NodeAndData> newNodes = new ArrayList<>(); 497 for (String node : nodes) { 498 if (Thread.interrupted()) { 499 // Partial data should not be processed. Cancel processing by sending empty list. 
500 return Collections.emptyList(); 501 } 502 String nodePath = ZNodePaths.joinZNode(baseNode, node); 503 byte[] data = ZKUtil.getDataAndWatch(zkw, nodePath, throwOnInterrupt); 504 newNodes.add(new NodeAndData(nodePath, data)); 505 } 506 return newNodes; 507 } 508 return null; 509 } 510 511 /** 512 * Update the data of an existing node with the expected version to have the specified data. 513 * Throws an exception if there is a version mismatch or some other problem. Sets no watches under 514 * any conditions. 515 * @param zkw zk reference 516 * @param znode the path to the ZNode 517 * @param data the data to store in ZooKeeper 518 * @param expectedVersion the expected version 519 * @throws KeeperException if unexpected zookeeper exception 520 * @throws KeeperException.BadVersionException if version mismatch 521 * @deprecated Unused 522 */ 523 @Deprecated 524 public static void updateExistingNodeData(ZKWatcher zkw, String znode, byte[] data, 525 int expectedVersion) throws KeeperException { 526 try { 527 zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion); 528 } catch (InterruptedException ie) { 529 zkw.interruptedException(ie); 530 } 531 } 532 533 // 534 // Data setting 535 // 536 537 /** 538 * Sets the data of the existing znode to be the specified data. Ensures that the current data has 539 * the specified expected version. 540 * <p> 541 * If the node does not exist, a {@link NoNodeException} will be thrown. 542 * <p> 543 * If their is a version mismatch, method returns null. 544 * <p> 545 * No watches are set but setting data will trigger other watchers of this node. 546 * <p> 547 * If there is another problem, a KeeperException will be thrown. 
548 * @param zkw zk reference 549 * @param znode path of node 550 * @param data data to set for node 551 * @param expectedVersion version expected when setting data 552 * @return true if data set, false if version mismatch 553 * @throws KeeperException if unexpected zookeeper exception 554 */ 555 public static boolean setData(ZKWatcher zkw, String znode, byte[] data, int expectedVersion) 556 throws KeeperException, KeeperException.NoNodeException { 557 try { 558 return zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion) != null; 559 } catch (InterruptedException e) { 560 zkw.interruptedException(e); 561 return false; 562 } 563 } 564 565 /** 566 * Set data into node creating node if it doesn't yet exist. Does not set watch. 567 * @param zkw zk reference 568 * @param znode path of node 569 * @param data data to set for node 570 * @throws KeeperException if a ZooKeeper operation fails 571 */ 572 public static void createSetData(final ZKWatcher zkw, final String znode, final byte[] data) 573 throws KeeperException { 574 if (checkExists(zkw, znode) == -1) { 575 ZKUtil.createWithParents(zkw, znode, data); 576 } else { 577 ZKUtil.setData(zkw, znode, data); 578 } 579 } 580 581 /** 582 * Sets the data of the existing znode to be the specified data. The node must exist but no checks 583 * are done on the existing data or version. 584 * <p> 585 * If the node does not exist, a {@link NoNodeException} will be thrown. 586 * <p> 587 * No watches are set but setting data will trigger other watchers of this node. 588 * <p> 589 * If there is another problem, a KeeperException will be thrown. 
590 * @param zkw zk reference 591 * @param znode path of node 592 * @param data data to set for node 593 * @throws KeeperException if unexpected zookeeper exception 594 */ 595 public static void setData(ZKWatcher zkw, String znode, byte[] data) 596 throws KeeperException, KeeperException.NoNodeException { 597 setData(zkw, (SetData) ZKUtilOp.setData(znode, data)); 598 } 599 600 private static void setData(ZKWatcher zkw, SetData setData) 601 throws KeeperException, KeeperException.NoNodeException { 602 SetDataRequest sd = (SetDataRequest) toZooKeeperOp(zkw, setData).toRequestRecord(); 603 setData(zkw, sd.getPath(), sd.getData(), sd.getVersion()); 604 } 605 606 // 607 // Node creation 608 // 609 610 /** 611 * Set the specified znode to be an ephemeral node carrying the specified data. If the node is 612 * created successfully, a watcher is also set on the node. If the node is not created 613 * successfully because it already exists, this method will also set a watcher on the node. If 614 * there is another problem, a KeeperException will be thrown. 
615 * @param zkw zk reference 616 * @param znode path of node 617 * @param data data of node 618 * @return true if node created, false if not, watch set in both cases 619 * @throws KeeperException if unexpected zookeeper exception 620 */ 621 public static boolean createEphemeralNodeAndWatch(ZKWatcher zkw, String znode, byte[] data) 622 throws KeeperException { 623 boolean ret = true; 624 try { 625 zkw.getRecoverableZooKeeper().create(znode, data, zkw.createACL(znode), CreateMode.EPHEMERAL); 626 } catch (KeeperException.NodeExistsException nee) { 627 ret = false; 628 } catch (InterruptedException e) { 629 LOG.info("Interrupted", e); 630 Thread.currentThread().interrupt(); 631 } 632 if (!watchAndCheckExists(zkw, znode)) { 633 // It did exist but now it doesn't, try again 634 return createEphemeralNodeAndWatch(zkw, znode, data); 635 } 636 return ret; 637 } 638 639 /** 640 * Creates the specified znode to be a persistent node carrying the specified data. Returns true 641 * if the node was successfully created, false if the node already existed. If the node is created 642 * successfully, a watcher is also set on the node. If the node is not created successfully 643 * because it already exists, this method will also set a watcher on the node but return false. If 644 * there is another problem, a KeeperException will be thrown. 
645 * @param zkw zk reference 646 * @param znode path of node 647 * @param data data of node 648 * @return true if node created, false if not, watch set in both cases 649 * @throws KeeperException if unexpected zookeeper exception 650 */ 651 public static boolean createNodeIfNotExistsAndWatch(ZKWatcher zkw, String znode, byte[] data) 652 throws KeeperException { 653 boolean ret = true; 654 try { 655 zkw.getRecoverableZooKeeper().create(znode, data, zkw.createACL(znode), 656 CreateMode.PERSISTENT); 657 } catch (KeeperException.NodeExistsException nee) { 658 ret = false; 659 } catch (InterruptedException e) { 660 zkw.interruptedException(e); 661 return false; 662 } 663 try { 664 zkw.getRecoverableZooKeeper().exists(znode, zkw); 665 } catch (InterruptedException e) { 666 zkw.interruptedException(e); 667 return false; 668 } 669 return ret; 670 } 671 672 /** 673 * Creates the specified znode with the specified data but does not watch it. Returns the znode of 674 * the newly created node If there is another problem, a KeeperException will be thrown. 675 * @param zkw zk reference 676 * @param znode path of node 677 * @param data data of node 678 * @param createMode specifying whether the node to be created is ephemeral and/or sequential 679 * @return true name of the newly created znode or null 680 * @throws KeeperException if unexpected zookeeper exception 681 */ 682 public static String createNodeIfNotExistsNoWatch(ZKWatcher zkw, String znode, byte[] data, 683 CreateMode createMode) throws KeeperException { 684 try { 685 return zkw.getRecoverableZooKeeper().create(znode, data, zkw.createACL(znode), createMode); 686 } catch (KeeperException.NodeExistsException nee) { 687 return znode; 688 } catch (InterruptedException e) { 689 zkw.interruptedException(e); 690 return null; 691 } 692 } 693 694 /** 695 * Creates the specified node with the specified data and watches it. 696 * <p> 697 * Throws an exception if the node already exists. 
698 * <p> 699 * The node created is persistent and open access. 700 * <p> 701 * Returns the version number of the created node if successful. 702 * @param zkw zk reference 703 * @param znode path of node to create 704 * @param data data of node to create 705 * @return version of node created 706 * @throws KeeperException if unexpected zookeeper exception 707 * @throws KeeperException.NodeExistsException if node already exists 708 */ 709 public static int createAndWatch(ZKWatcher zkw, String znode, byte[] data) 710 throws KeeperException, KeeperException.NodeExistsException { 711 try { 712 zkw.getRecoverableZooKeeper().create(znode, data, zkw.createACL(znode), 713 CreateMode.PERSISTENT); 714 Stat stat = zkw.getRecoverableZooKeeper().exists(znode, zkw); 715 if (stat == null) { 716 // Likely a race condition. Someone deleted the znode. 717 throw KeeperException.create(KeeperException.Code.SYSTEMERROR, 718 "ZK.exists returned null (i.e.: znode does not exist) for znode=" + znode); 719 } 720 721 return stat.getVersion(); 722 } catch (InterruptedException e) { 723 zkw.interruptedException(e); 724 return -1; 725 } 726 } 727 728 /** 729 * Async creates the specified node with the specified data. 730 * <p> 731 * Throws an exception if the node already exists. 732 * <p> 733 * The node created is persistent and open access. 734 * @param zkw zk reference 735 * @param znode path of node to create 736 * @param data data of node to create 737 * @param cb the callback to use for the creation 738 * @param ctx the context to use for the creation 739 */ 740 public static void asyncCreate(ZKWatcher zkw, String znode, byte[] data, 741 final AsyncCallback.StringCallback cb, final Object ctx) { 742 zkw.getRecoverableZooKeeper().getZooKeeper().create(znode, data, zkw.createACL(znode), 743 CreateMode.PERSISTENT, cb, ctx); 744 } 745 746 /** 747 * Creates the specified node, iff the node does not exist. Does not set a watch and fails 748 * silently if the node already exists. 
The node created is persistent and open access. 749 * @param zkw zk reference 750 * @param znode path of node 751 * @throws KeeperException if unexpected zookeeper exception 752 */ 753 public static void createAndFailSilent(ZKWatcher zkw, String znode) throws KeeperException { 754 createAndFailSilent(zkw, znode, new byte[0]); 755 } 756 757 /** 758 * Creates the specified node containing specified data, iff the node does not exist. Does not set 759 * a watch and fails silently if the node already exists. The node created is persistent and open 760 * access. 761 * @param zkw zk reference 762 * @param znode path of node 763 * @param data a byte array data to store in the znode 764 * @throws KeeperException if unexpected zookeeper exception 765 */ 766 public static void createAndFailSilent(ZKWatcher zkw, String znode, byte[] data) 767 throws KeeperException { 768 createAndFailSilent(zkw, (CreateAndFailSilent) ZKUtilOp.createAndFailSilent(znode, data)); 769 } 770 771 private static void createAndFailSilent(ZKWatcher zkw, CreateAndFailSilent cafs) 772 throws KeeperException { 773 CreateRequest create = (CreateRequest) toZooKeeperOp(zkw, cafs).toRequestRecord(); 774 String znode = create.getPath(); 775 try { 776 RecoverableZooKeeper zk = zkw.getRecoverableZooKeeper(); 777 if (zk.exists(znode, false) == null) { 778 zk.create(znode, create.getData(), create.getAcl(), CreateMode.fromFlag(create.getFlags())); 779 } 780 } catch (KeeperException.NodeExistsException nee) { 781 // pass 782 } catch (KeeperException.NoAuthException nee) { 783 try { 784 if (null == zkw.getRecoverableZooKeeper().exists(znode, false)) { 785 // If we failed to create the file and it does not already exist. 786 throw (nee); 787 } 788 } catch (InterruptedException ie) { 789 zkw.interruptedException(ie); 790 } 791 } catch (InterruptedException ie) { 792 zkw.interruptedException(ie); 793 } 794 } 795 796 /** 797 * Creates the specified node and all parent nodes required for it to exist. 
No watches are set 798 * and no errors are thrown if the node already exists. The nodes created are persistent and open 799 * access. 800 * @param zkw zk reference 801 * @param znode path of node 802 * @throws KeeperException if unexpected zookeeper exception 803 */ 804 public static void createWithParents(ZKWatcher zkw, String znode) throws KeeperException { 805 createWithParents(zkw, znode, new byte[0]); 806 } 807 808 /** 809 * Creates the specified node and all parent nodes required for it to exist. The creation of 810 * parent znodes is not atomic with the leafe znode creation but the data is written atomically 811 * when the leaf node is created. No watches are set and no errors are thrown if the node already 812 * exists. The nodes created are persistent and open access. 813 * @param zkw zk reference 814 * @param znode path of node 815 * @throws KeeperException if unexpected zookeeper exception 816 */ 817 public static void createWithParents(ZKWatcher zkw, String znode, byte[] data) 818 throws KeeperException { 819 try { 820 if (znode == null) { 821 return; 822 } 823 zkw.getRecoverableZooKeeper().create(znode, data, zkw.createACL(znode), 824 CreateMode.PERSISTENT); 825 } catch (KeeperException.NodeExistsException nee) { 826 return; 827 } catch (KeeperException.NoNodeException nne) { 828 createWithParents(zkw, getParent(znode)); 829 createWithParents(zkw, znode, data); 830 } catch (InterruptedException ie) { 831 zkw.interruptedException(ie); 832 } 833 } 834 835 // 836 // Deletes 837 // 838 839 /** 840 * Delete the specified node. Sets no watches. Throws all exceptions. 841 */ 842 public static void deleteNode(ZKWatcher zkw, String node) throws KeeperException { 843 deleteNode(zkw, node, -1); 844 } 845 846 /** 847 * Delete the specified node with the specified version. Sets no watches. Throws all exceptions. 
848 */ 849 public static boolean deleteNode(ZKWatcher zkw, String node, int version) throws KeeperException { 850 try { 851 zkw.getRecoverableZooKeeper().delete(node, version); 852 return true; 853 } catch (KeeperException.BadVersionException bve) { 854 return false; 855 } catch (InterruptedException ie) { 856 zkw.interruptedException(ie); 857 return false; 858 } 859 } 860 861 /** 862 * Deletes the specified node. Fails silent if the node does not exist. 863 * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation 864 * @param node the node to delete 865 * @throws KeeperException if a ZooKeeper operation fails 866 */ 867 public static void deleteNodeFailSilent(ZKWatcher zkw, String node) throws KeeperException { 868 deleteNodeFailSilent(zkw, (DeleteNodeFailSilent) ZKUtilOp.deleteNodeFailSilent(node)); 869 } 870 871 private static void deleteNodeFailSilent(ZKWatcher zkw, DeleteNodeFailSilent dnfs) 872 throws KeeperException { 873 DeleteRequest delete = (DeleteRequest) toZooKeeperOp(zkw, dnfs).toRequestRecord(); 874 try { 875 zkw.getRecoverableZooKeeper().delete(delete.getPath(), delete.getVersion()); 876 } catch (KeeperException.NoNodeException nne) { 877 } catch (InterruptedException ie) { 878 zkw.interruptedException(ie); 879 } 880 } 881 882 /** 883 * Delete the specified node and all of it's children. 884 * <p> 885 * If the node does not exist, just returns. 886 * <p> 887 * Sets no watches. Throws all exceptions besides dealing with deletion of children. 888 */ 889 public static void deleteNodeRecursively(ZKWatcher zkw, String node) throws KeeperException { 890 deleteNodeRecursivelyMultiOrSequential(zkw, true, node); 891 } 892 893 /** 894 * Delete all the children of the specified node but not the node itself. Sets no watches. Throws 895 * all exceptions besides dealing with deletion of children. 
896 * @throws KeeperException if a ZooKeeper operation fails 897 */ 898 public static void deleteChildrenRecursively(ZKWatcher zkw, String node) throws KeeperException { 899 deleteChildrenRecursivelyMultiOrSequential(zkw, true, node); 900 } 901 902 /** 903 * Delete all the children of the specified node but not the node itself. This will first traverse 904 * the znode tree for listing the children and then delete these znodes using multi-update api or 905 * sequential based on the specified configurations. 906 * <p> 907 * Sets no watches. Throws all exceptions besides dealing with deletion of children. 908 * <p> 909 * If the following is true: 910 * <ul> 911 * <li>runSequentialOnMultiFailure is true 912 * </ul> 913 * on calling multi, we get a ZooKeeper exception that can be handled by a sequential call(*), we 914 * retry the operations one-by-one (sequentially). n * - zk reference n * - if true when we get a 915 * ZooKeeper exception that could retry the operations one-by-one (sequentially) n * - path of the 916 * parent node(s) 917 * @throws KeeperException.NotEmptyException if node has children while deleting n * if unexpected 918 * ZooKeeper exception n * if an invalid path is 919 * specified 920 */ 921 public static void deleteChildrenRecursivelyMultiOrSequential(ZKWatcher zkw, 922 boolean runSequentialOnMultiFailure, String... pathRoots) throws KeeperException { 923 if (pathRoots == null || pathRoots.length <= 0) { 924 LOG.warn("Given path is not valid!"); 925 return; 926 } 927 List<ZKUtilOp> ops = new ArrayList<>(); 928 for (String eachRoot : pathRoots) { 929 List<String> children = listChildrenBFSNoWatch(zkw, eachRoot); 930 // Delete the leaves first and eventually get rid of the root 931 for (int i = children.size() - 1; i >= 0; --i) { 932 ops.add(ZKUtilOp.deleteNodeFailSilent(children.get(i))); 933 } 934 } 935 submitBatchedMultiOrSequential(zkw, runSequentialOnMultiFailure, ops); 936 } 937 938 /** 939 * Delete the specified node and its children. 
This traverse the znode tree for listing the 940 * children and then delete these znodes including the parent using multi-update api or sequential 941 * based on the specified configurations. 942 * <p> 943 * Sets no watches. Throws all exceptions besides dealing with deletion of children. 944 * <p> 945 * If the following is true: 946 * <ul> 947 * <li>runSequentialOnMultiFailure is true 948 * </ul> 949 * on calling multi, we get a ZooKeeper exception that can be handled by a sequential call(*), we 950 * retry the operations one-by-one (sequentially). n * - zk reference n * - if true when we get a 951 * ZooKeeper exception that could retry the operations one-by-one (sequentially) n * - path of the 952 * parent node(s) 953 * @throws KeeperException.NotEmptyException if node has children while deleting n * if unexpected 954 * ZooKeeper exception n * if an invalid path is 955 * specified 956 */ 957 public static void deleteNodeRecursivelyMultiOrSequential(ZKWatcher zkw, 958 boolean runSequentialOnMultiFailure, String... 
pathRoots) throws KeeperException { 959 if (pathRoots == null || pathRoots.length <= 0) { 960 LOG.warn("Given path is not valid!"); 961 return; 962 } 963 List<ZKUtilOp> ops = new ArrayList<>(); 964 for (String eachRoot : pathRoots) { 965 // ZooKeeper Watches are one time triggers; When children of parent nodes are deleted 966 // recursively, must set another watch, get notified of delete node 967 List<String> children = listChildrenBFSAndWatchThem(zkw, eachRoot); 968 // Delete the leaves first and eventually get rid of the root 969 for (int i = children.size() - 1; i >= 0; --i) { 970 ops.add(ZKUtilOp.deleteNodeFailSilent(children.get(i))); 971 } 972 try { 973 if (zkw.getRecoverableZooKeeper().exists(eachRoot, zkw) != null) { 974 ops.add(ZKUtilOp.deleteNodeFailSilent(eachRoot)); 975 } 976 } catch (InterruptedException e) { 977 zkw.interruptedException(e); 978 } 979 } 980 submitBatchedMultiOrSequential(zkw, runSequentialOnMultiFailure, ops); 981 } 982 983 /** 984 * Chunks the provided {@code ops} when their approximate size exceeds the the configured limit. 985 * Take caution that this can ONLY be used for operations where atomicity is not important, e.g. 986 * deletions. It must not be used when atomicity of the operations is critical. 
987 * @param zkw reference to the {@link ZKWatcher} which contains 988 * configuration and constants 989 * @param runSequentialOnMultiFailure if true when we get a ZooKeeper exception that could retry 990 * the operations one-by-one (sequentially) 991 * @param ops list of ZKUtilOp {@link ZKUtilOp} to partition while 992 * submitting batched multi or sequential 993 * @throws KeeperException unexpected ZooKeeper Exception / Zookeeper unreachable 994 */ 995 private static void submitBatchedMultiOrSequential(ZKWatcher zkw, 996 boolean runSequentialOnMultiFailure, List<ZKUtilOp> ops) throws KeeperException { 997 // at least one element should exist 998 if (ops.isEmpty()) { 999 return; 1000 } 1001 final int maxMultiSize = zkw.getRecoverableZooKeeper().getMaxMultiSizeLimit(); 1002 // Batch up the items to over smashing through jute.maxbuffer with too many Ops. 1003 final List<List<ZKUtilOp>> batchedOps = partitionOps(ops, maxMultiSize); 1004 // Would use forEach() but have to handle KeeperException 1005 for (List<ZKUtilOp> batch : batchedOps) { 1006 multiOrSequential(zkw, batch, runSequentialOnMultiFailure); 1007 } 1008 } 1009 1010 /** 1011 * Partition the list of {@code ops} by size (using {@link #estimateSize(ZKUtilOp)}). 1012 */ 1013 static List<List<ZKUtilOp>> partitionOps(List<ZKUtilOp> ops, int maxPartitionSize) { 1014 List<List<ZKUtilOp>> partitionedOps = new ArrayList<>(); 1015 List<ZKUtilOp> currentPartition = new ArrayList<>(); 1016 int currentPartitionSize = 0; 1017 partitionedOps.add(currentPartition); 1018 Iterator<ZKUtilOp> iter = ops.iterator(); 1019 while (iter.hasNext()) { 1020 ZKUtilOp currentOp = iter.next(); 1021 int currentOpSize = estimateSize(currentOp); 1022 1023 // Roll a new partition if necessary 1024 // If the current partition is empty, put the element in there anyways. 
1025 // We can roll a new partition if we get another element 1026 if (!currentPartition.isEmpty() && currentOpSize + currentPartitionSize > maxPartitionSize) { 1027 currentPartition = new ArrayList<>(); 1028 partitionedOps.add(currentPartition); 1029 currentPartitionSize = 0; 1030 } 1031 1032 // Add the current op to the partition 1033 currentPartition.add(currentOp); 1034 // And record its size 1035 currentPartitionSize += currentOpSize; 1036 } 1037 return partitionedOps; 1038 } 1039 1040 static int estimateSize(ZKUtilOp op) { 1041 return Bytes.toBytes(op.getPath()).length; 1042 } 1043 1044 /** 1045 * BFS Traversal of all the children under path, with the entries in the list, in the same order 1046 * as that of the traversal. Lists all the children without setting any watches. n * - zk 1047 * reference n * - path of node 1048 * @return list of children znodes under the path n * if unexpected ZooKeeper exception 1049 */ 1050 private static List<String> listChildrenBFSNoWatch(ZKWatcher zkw, final String znode) 1051 throws KeeperException { 1052 Deque<String> queue = new LinkedList<>(); 1053 List<String> tree = new ArrayList<>(); 1054 queue.add(znode); 1055 while (true) { 1056 String node = queue.pollFirst(); 1057 if (node == null) { 1058 break; 1059 } 1060 List<String> children = listChildrenNoWatch(zkw, node); 1061 if (children == null) { 1062 continue; 1063 } 1064 for (final String child : children) { 1065 final String childPath = node + "/" + child; 1066 queue.add(childPath); 1067 tree.add(childPath); 1068 } 1069 } 1070 return tree; 1071 } 1072 1073 /** 1074 * BFS Traversal of all the children under path, with the entries in the list, in the same order 1075 * as that of the traversal. Lists all the children and set watches on to them. 
n * - zk reference 1076 * n * - path of node 1077 * @return list of children znodes under the path n * if unexpected ZooKeeper exception 1078 */ 1079 private static List<String> listChildrenBFSAndWatchThem(ZKWatcher zkw, final String znode) 1080 throws KeeperException { 1081 Deque<String> queue = new LinkedList<>(); 1082 List<String> tree = new ArrayList<>(); 1083 queue.add(znode); 1084 while (true) { 1085 String node = queue.pollFirst(); 1086 if (node == null) { 1087 break; 1088 } 1089 List<String> children = listChildrenAndWatchThem(zkw, node); 1090 if (children == null) { 1091 continue; 1092 } 1093 for (final String child : children) { 1094 final String childPath = node + "/" + child; 1095 queue.add(childPath); 1096 tree.add(childPath); 1097 } 1098 } 1099 return tree; 1100 } 1101 1102 /** 1103 * Represents an action taken by ZKUtil, e.g. createAndFailSilent. These actions are higher-level 1104 * than ZKOp actions, which represent individual actions in the ZooKeeper API, like create. 1105 */ 1106 public abstract static class ZKUtilOp { 1107 private String path; 1108 1109 @Override 1110 public String toString() { 1111 return this.getClass().getSimpleName() + ", path=" + this.path; 1112 } 1113 1114 private ZKUtilOp(String path) { 1115 this.path = path; 1116 } 1117 1118 /** Returns a createAndFailSilent ZKUtilOp */ 1119 public static ZKUtilOp createAndFailSilent(String path, byte[] data) { 1120 return new CreateAndFailSilent(path, data); 1121 } 1122 1123 /** Returns a deleteNodeFailSilent ZKUtilOP */ 1124 public static ZKUtilOp deleteNodeFailSilent(String path) { 1125 return new DeleteNodeFailSilent(path); 1126 } 1127 1128 /** Returns a setData ZKUtilOp */ 1129 public static ZKUtilOp setData(String path, byte[] data) { 1130 return new SetData(path, data); 1131 } 1132 1133 /** Returns a setData ZKUtilOp */ 1134 public static ZKUtilOp setData(String path, byte[] data, int version) { 1135 return new SetData(path, data, version); 1136 } 1137 1138 /** Returns path to 
znode where the ZKOp will occur */ 1139 public String getPath() { 1140 return path; 1141 } 1142 1143 /** 1144 * ZKUtilOp representing createAndFailSilent in ZooKeeper (attempt to create node, ignore error 1145 * if already exists) 1146 */ 1147 public static final class CreateAndFailSilent extends ZKUtilOp { 1148 private byte[] data; 1149 1150 private CreateAndFailSilent(String path, byte[] data) { 1151 super(path); 1152 this.data = data; 1153 } 1154 1155 public byte[] getData() { 1156 return data; 1157 } 1158 1159 @Override 1160 public boolean equals(Object o) { 1161 if (this == o) { 1162 return true; 1163 } 1164 if (!(o instanceof CreateAndFailSilent)) { 1165 return false; 1166 } 1167 1168 CreateAndFailSilent op = (CreateAndFailSilent) o; 1169 return getPath().equals(op.getPath()) && Arrays.equals(data, op.data); 1170 } 1171 1172 @Override 1173 public int hashCode() { 1174 int ret = 17 + getPath().hashCode() * 31; 1175 return ret * 31 + Bytes.hashCode(data); 1176 } 1177 } 1178 1179 /** 1180 * ZKUtilOp representing deleteNodeFailSilent in ZooKeeper (attempt to delete node, ignore error 1181 * if node doesn't exist) 1182 */ 1183 public static final class DeleteNodeFailSilent extends ZKUtilOp { 1184 private DeleteNodeFailSilent(String path) { 1185 super(path); 1186 } 1187 1188 @Override 1189 public boolean equals(Object o) { 1190 if (this == o) { 1191 return true; 1192 } 1193 if (!(o instanceof DeleteNodeFailSilent)) { 1194 return false; 1195 } 1196 1197 return super.equals(o); 1198 } 1199 1200 @Override 1201 public int hashCode() { 1202 return getPath().hashCode(); 1203 } 1204 } 1205 1206 /** 1207 * ZKUtilOp representing setData in ZooKeeper 1208 */ 1209 public static final class SetData extends ZKUtilOp { 1210 private byte[] data; 1211 private int version = -1; 1212 1213 private SetData(String path, byte[] data) { 1214 super(path); 1215 this.data = data; 1216 } 1217 1218 private SetData(String path, byte[] data, int version) { 1219 super(path); 1220 this.data = 
data; 1221 this.version = version; 1222 } 1223 1224 public byte[] getData() { 1225 return data; 1226 } 1227 1228 public int getVersion() { 1229 return version; 1230 } 1231 1232 @Override 1233 public boolean equals(Object o) { 1234 if (this == o) { 1235 return true; 1236 } 1237 if (!(o instanceof SetData)) { 1238 return false; 1239 } 1240 1241 SetData op = (SetData) o; 1242 return getPath().equals(op.getPath()) && Arrays.equals(data, op.data) 1243 && getVersion() == op.getVersion(); 1244 } 1245 1246 @Override 1247 public int hashCode() { 1248 int ret = getPath().hashCode(); 1249 ret = ret * 31 + Bytes.hashCode(data); 1250 return ret * 31 + Integer.hashCode(version); 1251 } 1252 } 1253 } 1254 1255 /** 1256 * Convert from ZKUtilOp to ZKOp 1257 */ 1258 private static Op toZooKeeperOp(ZKWatcher zkw, ZKUtilOp op) throws UnsupportedOperationException { 1259 if (op == null) { 1260 return null; 1261 } 1262 1263 if (op instanceof CreateAndFailSilent) { 1264 CreateAndFailSilent cafs = (CreateAndFailSilent) op; 1265 return Op.create(cafs.getPath(), cafs.getData(), zkw.createACL(cafs.getPath()), 1266 CreateMode.PERSISTENT); 1267 } else if (op instanceof DeleteNodeFailSilent) { 1268 DeleteNodeFailSilent dnfs = (DeleteNodeFailSilent) op; 1269 return Op.delete(dnfs.getPath(), -1); 1270 } else if (op instanceof SetData) { 1271 SetData sd = (SetData) op; 1272 return Op.setData(sd.getPath(), sd.getData(), sd.getVersion()); 1273 } else { 1274 throw new UnsupportedOperationException( 1275 "Unexpected ZKUtilOp type: " + op.getClass().getName()); 1276 } 1277 } 1278 1279 // Static boolean for warning about useMulti because ideally there will be one warning per 1280 // process instance. It is fine if two threads end up racing on this for a bit. We do not 1281 // need to use an atomic type for this, that is overkill. The goal of reducing the number of 1282 // warnings from many to one or a few at startup is still achieved. 
1283 private static boolean useMultiWarn = true; 1284 1285 /** 1286 * Use ZooKeeper's multi-update functionality. If all of the following are true: - 1287 * runSequentialOnMultiFailure is true - on calling multi, we get a ZooKeeper exception that can 1288 * be handled by a sequential call(*) Then: - we retry the operations one-by-one (sequentially) 1289 * Note *: an example is receiving a NodeExistsException from a "create" call. Without multi, a 1290 * user could call "createAndFailSilent" to ensure that a node exists if they don't care who 1291 * actually created the node (i.e. the NodeExistsException from ZooKeeper is caught). This will 1292 * cause all operations in the multi to fail, however, because the NodeExistsException that 1293 * zk.create throws will fail the multi transaction. In this case, if the previous conditions 1294 * hold, the commands are run sequentially, which should result in the correct final state, but 1295 * means that the operations will not run atomically. 1296 * @throws KeeperException if a ZooKeeper operation fails 1297 */ 1298 public static void multiOrSequential(ZKWatcher zkw, List<ZKUtilOp> ops, 1299 boolean runSequentialOnMultiFailure) throws KeeperException { 1300 if (ops == null) { 1301 return; 1302 } 1303 if (useMultiWarn) { // Only check and warn at first use 1304 if (zkw.getConfiguration().get("hbase.zookeeper.useMulti") != null) { 1305 LOG.warn("hbase.zookeeper.useMulti is deprecated. 
Default to true always."); 1306 } 1307 useMultiWarn = false; 1308 } 1309 List<Op> zkOps = new LinkedList<>(); 1310 for (ZKUtilOp op : ops) { 1311 zkOps.add(toZooKeeperOp(zkw, op)); 1312 } 1313 try { 1314 zkw.getRecoverableZooKeeper().multi(zkOps); 1315 } catch (KeeperException ke) { 1316 switch (ke.code()) { 1317 case NODEEXISTS: 1318 case NONODE: 1319 case BADVERSION: 1320 case NOAUTH: 1321 case NOTEMPTY: 1322 // if we get an exception that could be solved by running sequentially 1323 // (and the client asked us to), then break out and run sequentially 1324 if (runSequentialOnMultiFailure) { 1325 LOG.info( 1326 "multi exception: {}; running operations sequentially " 1327 + "(runSequentialOnMultiFailure=true); {}", 1328 ke.toString(), ops.stream().map(o -> o.toString()).collect(Collectors.joining(","))); 1329 processSequentially(zkw, ops); 1330 break; 1331 } 1332 default: 1333 throw ke; 1334 } 1335 } catch (InterruptedException ie) { 1336 zkw.interruptedException(ie); 1337 } 1338 } 1339 1340 private static void processSequentially(ZKWatcher zkw, List<ZKUtilOp> ops) 1341 throws KeeperException, NoNodeException { 1342 for (ZKUtilOp op : ops) { 1343 if (op instanceof CreateAndFailSilent) { 1344 createAndFailSilent(zkw, (CreateAndFailSilent) op); 1345 } else if (op instanceof DeleteNodeFailSilent) { 1346 deleteNodeFailSilent(zkw, (DeleteNodeFailSilent) op); 1347 } else if (op instanceof SetData) { 1348 setData(zkw, (SetData) op); 1349 } else { 1350 throw new UnsupportedOperationException( 1351 "Unexpected ZKUtilOp type: " + op.getClass().getName()); 1352 } 1353 } 1354 } 1355 1356 // 1357 // ZooKeeper cluster information 1358 // 1359 1360 private static void logRetrievedMsg(final ZKWatcher zkw, final String znode, final byte[] data, 1361 final boolean watcherSet) { 1362 if (!LOG.isTraceEnabled()) { 1363 return; 1364 } 1365 1366 LOG.trace(zkw.prefix("Retrieved " + ((data == null) ? 0 : data.length) 1367 + " byte(s) of data from znode " + znode + (watcherSet ? 
" and set watcher; " : "; data=") 1368 + (data == null ? "null" 1369 : data.length == 0 ? "empty" 1370 : (zkw.getZNodePaths().isMetaZNodePath(znode) ? getServerNameOrEmptyString(data) 1371 : znode.startsWith(zkw.getZNodePaths().backupMasterAddressesZNode) 1372 ? getServerNameOrEmptyString(data) 1373 : StringUtils.abbreviate(Bytes.toStringBinary(data), 32))))); 1374 } 1375 1376 private static String getServerNameOrEmptyString(final byte[] data) { 1377 try { 1378 return ProtobufUtil.parseServerNameFrom(data).toString(); 1379 } catch (DeserializationException e) { 1380 return ""; 1381 } 1382 } 1383 1384 /** 1385 * Waits for HBase installation's base (parent) znode to become available. 1386 * @throws IOException on ZK errors 1387 */ 1388 public static void waitForBaseZNode(Configuration conf) throws IOException { 1389 LOG.info("Waiting until the base znode is available"); 1390 String parentZNode = 1391 conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); 1392 ZooKeeper zk = new ZooKeeper(ZKConfig.getZKQuorumServersString(conf), 1393 conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT), 1394 EmptyWatcher.instance); 1395 1396 final int maxTimeMs = 10000; 1397 final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS; 1398 1399 KeeperException keeperEx = null; 1400 try { 1401 try { 1402 for (int attempt = 0; attempt < maxNumAttempts; ++attempt) { 1403 try { 1404 if (zk.exists(parentZNode, false) != null) { 1405 LOG.info("Parent znode exists: {}", parentZNode); 1406 keeperEx = null; 1407 break; 1408 } 1409 } catch (KeeperException e) { 1410 keeperEx = e; 1411 } 1412 Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS); 1413 } 1414 } finally { 1415 zk.close(); 1416 } 1417 } catch (InterruptedException ex) { 1418 Thread.currentThread().interrupt(); 1419 } 1420 1421 if (keeperEx != null) { 1422 throw new IOException(keeperEx); 1423 } 1424 } 1425 1426 /** 1427 * Convert a {@link 
DeserializationException} to a more palatable {@link KeeperException}. Used 1428 * when can't let a {@link DeserializationException} out w/o changing public API. 1429 * @param e Exception to convert 1430 * @return Converted exception 1431 */ 1432 public static KeeperException convert(final DeserializationException e) { 1433 KeeperException ke = new KeeperException.DataInconsistencyException(); 1434 ke.initCause(e); 1435 return ke; 1436 } 1437 1438 /** 1439 * Recursively print the current state of ZK (non-transactional) 1440 * @param root name of the root directory in zk to print 1441 */ 1442 public static void logZKTree(ZKWatcher zkw, String root) { 1443 if (!LOG.isDebugEnabled()) { 1444 return; 1445 } 1446 1447 LOG.debug("Current zk system:"); 1448 String prefix = "|-"; 1449 LOG.debug(prefix + root); 1450 try { 1451 logZKTree(zkw, root, prefix); 1452 } catch (KeeperException e) { 1453 throw new RuntimeException(e); 1454 } 1455 } 1456 1457 /** 1458 * Helper method to print the current state of the ZK tree. 1459 * @see #logZKTree(ZKWatcher, String) 1460 * @throws KeeperException if an unexpected exception occurs 1461 */ 1462 private static void logZKTree(ZKWatcher zkw, String root, String prefix) throws KeeperException { 1463 List<String> children = ZKUtil.listChildrenNoWatch(zkw, root); 1464 1465 if (children == null) { 1466 return; 1467 } 1468 1469 for (String child : children) { 1470 LOG.debug(prefix + child); 1471 String node = ZNodePaths.joinZNode(root.equals("/") ? "" : root, child); 1472 logZKTree(zkw, node, prefix + "---"); 1473 } 1474 } 1475 1476 /** 1477 * @param position the position to serialize 1478 * @return Serialized protobuf of <code>position</code> with pb magic prefix prepended suitable 1479 * for use as content of an wal position in a replication queue. 
*/
  public static byte[] positionToByteArray(final long position) {
    ReplicationProtos.ReplicationHLogPosition message =
      ReplicationProtos.ReplicationHLogPosition.newBuilder().setPosition(position).build();
    return ProtobufUtil.prependPBMagic(message.toByteArray());
  }

  /**
   * Parses the content of a WAL position znode. Data carrying the pb magic prefix is decoded as a
   * {@code ReplicationHLogPosition} protobuf; anything else is read as a raw long, with empty data
   * meaning position 0.
   * @param bytes - Content of a WAL position znode.
   * @return long - The current WAL position.
   * @throws DeserializationException if the WAL position cannot be parsed
   */
  public static long parseWALPositionFrom(final byte[] bytes) throws DeserializationException {
    if (bytes == null) {
      throw new DeserializationException("Unable to parse null WAL position.");
    }
    if (!ProtobufUtil.isPBMagicPrefix(bytes)) {
      // No pb magic prefix: raw long, or nothing at all.
      return bytes.length > 0 ? Bytes.toLong(bytes) : 0;
    }

    final int magicLen = ProtobufUtil.lengthOfPBMagic();
    ReplicationProtos.ReplicationHLogPosition.Builder builder =
      ReplicationProtos.ReplicationHLogPosition.newBuilder();
    final ReplicationProtos.ReplicationHLogPosition parsed;
    try {
      // Skip the magic prefix and merge the remainder into the builder.
      ProtobufUtil.mergeFrom(builder, bytes, magicLen, bytes.length - magicLen);
      parsed = builder.build();
    } catch (IOException e) {
      throw new DeserializationException(e);
    }
    return parsed.getPosition();
  }
}