001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.Collections; 024import java.util.Deque; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.stream.Collectors; 029import org.apache.commons.lang3.StringUtils; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.exceptions.DeserializationException; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.hadoop.hbase.util.Threads; 035import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.CreateAndFailSilent; 036import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.DeleteNodeFailSilent; 037import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.SetData; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.apache.zookeeper.AsyncCallback; 040import org.apache.zookeeper.CreateMode; 041import org.apache.zookeeper.KeeperException; 042import org.apache.zookeeper.KeeperException.NoNodeException; 043import 
org.apache.zookeeper.Op; 044import org.apache.zookeeper.ZooKeeper; 045import org.apache.zookeeper.data.Stat; 046import org.apache.zookeeper.proto.CreateRequest; 047import org.apache.zookeeper.proto.DeleteRequest; 048import org.apache.zookeeper.proto.SetDataRequest; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; 054 055/** 056 * Internal HBase utility class for ZooKeeper. 057 * <p> 058 * Contains only static methods and constants. 059 * <p> 060 * Methods all throw {@link KeeperException} if there is an unexpected zookeeper exception, so 061 * callers of these methods must handle appropriately. If ZK is required for the operation, the 062 * server will need to be aborted. 063 */ 064@InterfaceAudience.Private 065public final class ZKUtil { 066 private static final Logger LOG = LoggerFactory.getLogger(ZKUtil.class); 067 068 private ZKUtil() { 069 } 070 071 // 072 // Helper methods 073 // 074 /** 075 * Returns the full path of the immediate parent of the specified node. 076 * @param node path to get parent of 077 * @return parent of path, null if passed the root node or an invalid node 078 */ 079 public static String getParent(String node) { 080 int idx = node.lastIndexOf(ZNodePaths.ZNODE_PATH_SEPARATOR); 081 return idx <= 0 ? null : node.substring(0, idx); 082 } 083 084 /** 085 * Get the name of the current node from the specified fully-qualified path. 086 * @param path fully-qualified path 087 * @return name of the current node 088 */ 089 public static String getNodeName(String path) { 090 return path.substring(path.lastIndexOf("/") + 1); 091 } 092 093 // 094 // Existence checks and watches 095 // 096 097 /** 098 * Watch the specified znode for delete/create/change events. The watcher is set whether or not 099 * the node exists. If the node already exists, the method returns true. 
If the node does not 100 * exist, the method returns false. 101 * @param zkw zk reference 102 * @param znode path of node to watch 103 * @return true if znode exists, false if does not exist or error 104 * @throws KeeperException if unexpected zookeeper exception 105 */ 106 public static boolean watchAndCheckExists(ZKWatcher zkw, String znode) throws KeeperException { 107 try { 108 Stat s = zkw.getRecoverableZooKeeper().exists(znode, zkw); 109 boolean exists = s != null; 110 if (exists) { 111 LOG.debug(zkw.prefix("Set watcher on existing znode=" + znode)); 112 } else { 113 LOG.debug(zkw.prefix("Set watcher on znode that does not yet exist, " + znode)); 114 } 115 return exists; 116 } catch (KeeperException e) { 117 LOG.warn(zkw.prefix("Unable to set watcher on znode " + znode), e); 118 zkw.keeperException(e); 119 return false; 120 } catch (InterruptedException e) { 121 LOG.warn(zkw.prefix("Unable to set watcher on znode " + znode), e); 122 zkw.interruptedException(e); 123 return false; 124 } 125 } 126 127 /** 128 * Watch the specified znode, but only if exists. Useful when watching for deletions. Uses 129 * .getData() (and handles NoNodeException) instead of .exists() to accomplish this, as .getData() 130 * will only set a watch if the znode exists. 131 * @param zkw zk reference 132 * @param znode path of node to watch 133 * @return true if the watch is set, false if node does not exists 134 * @throws KeeperException if unexpected zookeeper exception 135 */ 136 public static boolean setWatchIfNodeExists(ZKWatcher zkw, String znode) throws KeeperException { 137 try { 138 zkw.getRecoverableZooKeeper().getData(znode, true, null); 139 return true; 140 } catch (NoNodeException e) { 141 return false; 142 } catch (InterruptedException e) { 143 LOG.warn(zkw.prefix("Unable to set watcher on znode " + znode), e); 144 zkw.interruptedException(e); 145 return false; 146 } 147 } 148 149 /** 150 * Check if the specified node exists. Sets no watches. 
151 * @param zkw zk reference 152 * @param znode path of node to watch 153 * @return version of the node if it exists, -1 if does not exist 154 * @throws KeeperException if unexpected zookeeper exception 155 */ 156 public static int checkExists(ZKWatcher zkw, String znode) throws KeeperException { 157 try { 158 Stat s = zkw.getRecoverableZooKeeper().exists(znode, null); 159 return s != null ? s.getVersion() : -1; 160 } catch (KeeperException e) { 161 LOG.warn(zkw.prefix("Unable to set watcher on znode (" + znode + ")"), e); 162 zkw.keeperException(e); 163 return -1; 164 } catch (InterruptedException e) { 165 LOG.warn(zkw.prefix("Unable to set watcher on znode (" + znode + ")"), e); 166 zkw.interruptedException(e); 167 return -1; 168 } 169 } 170 171 // 172 // Znode listings 173 // 174 175 /** 176 * Lists the children znodes of the specified znode. Also sets a watch on the specified znode 177 * which will capture a NodeDeleted event on the specified znode as well as NodeChildrenChanged if 178 * any children of the specified znode are created or deleted. Returns null if the specified node 179 * does not exist. Otherwise returns a list of children of the specified node. If the node exists 180 * but it has no children, an empty list will be returned. 
181 * @param zkw zk reference 182 * @param znode path of node to list and watch children of 183 * @return list of children of the specified node, an empty list if the node exists but has no 184 * children, and null if the node does not exist 185 * @throws KeeperException if unexpected zookeeper exception 186 */ 187 public static List<String> listChildrenAndWatchForNewChildren(ZKWatcher zkw, String znode) 188 throws KeeperException { 189 try { 190 return zkw.getRecoverableZooKeeper().getChildren(znode, zkw); 191 } catch (KeeperException.NoNodeException ke) { 192 LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " 193 + "because node does not exist (not an error)")); 194 } catch (KeeperException e) { 195 LOG.warn(zkw.prefix("Unable to list children of znode " + znode + " "), e); 196 zkw.keeperException(e); 197 } catch (InterruptedException e) { 198 LOG.warn(zkw.prefix("Unable to list children of znode " + znode + " "), e); 199 zkw.interruptedException(e); 200 } 201 202 return null; 203 } 204 205 /** 206 * List all the children of the specified znode, setting a watch for children changes and also 207 * setting a watch on every individual child in order to get the NodeCreated and NodeDeleted 208 * events. 209 * @param zkw zookeeper reference 210 * @param znode node to get children of and watch 211 * @return list of znode names, null if the node doesn't exist 212 * @throws KeeperException if a ZooKeeper operation fails 213 */ 214 public static List<String> listChildrenAndWatchThem(ZKWatcher zkw, String znode) 215 throws KeeperException { 216 List<String> children = listChildrenAndWatchForNewChildren(zkw, znode); 217 if (children == null) { 218 return null; 219 } 220 for (String child : children) { 221 watchAndCheckExists(zkw, ZNodePaths.joinZNode(znode, child)); 222 } 223 return children; 224 } 225 226 /** 227 * Lists the children of the specified znode without setting any watches. Sets no watches at all, 228 * this method is best effort. 
Returns an empty list if the node has no children. Returns null if 229 * the parent node itself does not exist. 230 * @param zkw zookeeper reference 231 * @param znode node to get children 232 * @return list of data of children of specified znode, empty if no children, null if parent does 233 * not exist 234 * @throws KeeperException if unexpected zookeeper exception 235 */ 236 public static List<String> listChildrenNoWatch(ZKWatcher zkw, String znode) 237 throws KeeperException { 238 List<String> children = null; 239 try { 240 // List the children without watching 241 children = zkw.getRecoverableZooKeeper().getChildren(znode, null); 242 } catch (KeeperException.NoNodeException nne) { 243 return null; 244 } catch (InterruptedException ie) { 245 zkw.interruptedException(ie); 246 } 247 return children; 248 } 249 250 /** 251 * Simple class to hold a node path and node data. 252 * @deprecated Unused 253 */ 254 @Deprecated 255 public static class NodeAndData { 256 private String node; 257 private byte[] data; 258 259 public NodeAndData(String node, byte[] data) { 260 this.node = node; 261 this.data = data; 262 } 263 264 public String getNode() { 265 return node; 266 } 267 268 public byte[] getData() { 269 return data; 270 } 271 272 @Override 273 public String toString() { 274 return node; 275 } 276 277 public boolean isEmpty() { 278 return (data == null || data.length == 0); 279 } 280 } 281 282 /** 283 * Checks if the specified znode has any children. Sets no watches. Returns true if the node 284 * exists and has children. Returns false if the node does not exist or if the node does not have 285 * any children. Used during master initialization to determine if the master is a failed-over-to 286 * master or the first master during initial cluster startup. If the directory for regionserver 287 * ephemeral nodes is empty then this is a cluster startup, if not then it is not cluster startup. 
288 * @param zkw zk reference 289 * @param znode path of node to check for children of 290 * @return true if node has children, false if not or node does not exist 291 * @throws KeeperException if unexpected zookeeper exception 292 */ 293 public static boolean nodeHasChildren(ZKWatcher zkw, String znode) throws KeeperException { 294 try { 295 return !zkw.getRecoverableZooKeeper().getChildren(znode, null).isEmpty(); 296 } catch (KeeperException.NoNodeException ke) { 297 LOG.debug(zkw.prefix("Unable to list children of znode " + znode 298 + " because node does not exist (not an error)")); 299 return false; 300 } catch (KeeperException e) { 301 LOG.warn(zkw.prefix("Unable to list children of znode " + znode), e); 302 zkw.keeperException(e); 303 return false; 304 } catch (InterruptedException e) { 305 LOG.warn(zkw.prefix("Unable to list children of znode " + znode), e); 306 zkw.interruptedException(e); 307 return false; 308 } 309 } 310 311 /** 312 * Get the number of children of the specified node. If the node does not exist or has no 313 * children, returns 0. Sets no watches at all. 314 * @param zkw zk reference 315 * @param znode path of node to count children of 316 * @return number of children of specified node, 0 if none or parent does not exist 317 * @throws KeeperException if unexpected zookeeper exception 318 */ 319 public static int getNumberOfChildren(ZKWatcher zkw, String znode) throws KeeperException { 320 try { 321 Stat stat = zkw.getRecoverableZooKeeper().exists(znode, null); 322 return stat == null ? 0 : stat.getNumChildren(); 323 } catch (KeeperException e) { 324 LOG.warn(zkw.prefix("Unable to get children of node " + znode)); 325 zkw.keeperException(e); 326 } catch (InterruptedException e) { 327 zkw.interruptedException(e); 328 } 329 return 0; 330 } 331 332 // 333 // Data retrieval 334 // 335 336 /** 337 * Get znode data. Does not set a watcher. 338 * @return ZNode data, null if the node does not exist or if there is an error. 
339 */ 340 public static byte[] getData(ZKWatcher zkw, String znode) 341 throws KeeperException, InterruptedException { 342 try { 343 byte[] data = zkw.getRecoverableZooKeeper().getData(znode, null, null); 344 logRetrievedMsg(zkw, znode, data, false); 345 return data; 346 } catch (KeeperException.NoNodeException e) { 347 LOG.debug(zkw.prefix("Unable to get data of znode " + znode + " " 348 + "because node does not exist (not an error)")); 349 return null; 350 } catch (KeeperException e) { 351 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e); 352 zkw.keeperException(e); 353 return null; 354 } 355 } 356 357 /** 358 * Get the data at the specified znode and set a watch. Returns the data and sets a watch if the 359 * node exists. Returns null and no watch is set if the node does not exist or there is an 360 * exception. 361 * @param zkw zk reference 362 * @param znode path of node 363 * @return data of the specified znode, or null 364 * @throws KeeperException if unexpected zookeeper exception 365 */ 366 public static byte[] getDataAndWatch(ZKWatcher zkw, String znode) throws KeeperException { 367 return getDataInternal(zkw, znode, null, true, true); 368 } 369 370 /** 371 * Get the data at the specified znode and set a watch. Returns the data and sets a watch if the 372 * node exists. Returns null and no watch is set if the node does not exist or there is an 373 * exception. 374 * @param zkw zk reference 375 * @param znode path of node 376 * @param throwOnInterrupt if false then just interrupt the thread, do not throw exception 377 * @return data of the specified znode, or null 378 * @throws KeeperException if unexpected zookeeper exception 379 */ 380 public static byte[] getDataAndWatch(ZKWatcher zkw, String znode, boolean throwOnInterrupt) 381 throws KeeperException { 382 return getDataInternal(zkw, znode, null, true, throwOnInterrupt); 383 } 384 385 /** 386 * Get the data at the specified znode and set a watch. 
Returns the data and sets a watch if the 387 * node exists. Returns null and no watch is set if the node does not exist or there is an 388 * exception. 389 * @param zkw zk reference 390 * @param znode path of node 391 * @param stat object to populate the version of the znode 392 * @return data of the specified znode, or null 393 * @throws KeeperException if unexpected zookeeper exception 394 */ 395 public static byte[] getDataAndWatch(ZKWatcher zkw, String znode, Stat stat) 396 throws KeeperException { 397 return getDataInternal(zkw, znode, stat, true, true); 398 } 399 400 private static byte[] getDataInternal(ZKWatcher zkw, String znode, Stat stat, boolean watcherSet, 401 boolean throwOnInterrupt) throws KeeperException { 402 try { 403 byte[] data = zkw.getRecoverableZooKeeper().getData(znode, zkw, stat); 404 logRetrievedMsg(zkw, znode, data, watcherSet); 405 return data; 406 } catch (KeeperException.NoNodeException e) { 407 // This log can get pretty annoying when we cycle on 100ms waits. 408 // Enable trace if you really want to see it. 409 LOG.trace(zkw.prefix("Unable to get data of znode " + znode + " " 410 + "because node does not exist (not an error)")); 411 return null; 412 } catch (KeeperException e) { 413 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e); 414 zkw.keeperException(e); 415 return null; 416 } catch (InterruptedException e) { 417 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e); 418 if (throwOnInterrupt) { 419 zkw.interruptedException(e); 420 } else { 421 zkw.interruptedExceptionNoThrow(e, true); 422 } 423 return null; 424 } 425 } 426 427 /** 428 * Get the data at the specified znode without setting a watch. Returns the data if the node 429 * exists. Returns null if the node does not exist. Sets the stats of the node in the passed Stat 430 * object. Pass a null stat if not interested. 
431 * @param zkw zk reference 432 * @param znode path of node 433 * @param stat node status to get if node exists 434 * @return data of the specified znode, or null if node does not exist 435 * @throws KeeperException if unexpected zookeeper exception 436 */ 437 public static byte[] getDataNoWatch(ZKWatcher zkw, String znode, Stat stat) 438 throws KeeperException { 439 try { 440 byte[] data = zkw.getRecoverableZooKeeper().getData(znode, null, stat); 441 logRetrievedMsg(zkw, znode, data, false); 442 return data; 443 } catch (KeeperException.NoNodeException e) { 444 LOG.debug(zkw.prefix("Unable to get data of znode " + znode + " " 445 + "because node does not exist (not necessarily an error)")); 446 return null; 447 } catch (KeeperException e) { 448 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e); 449 zkw.keeperException(e); 450 return null; 451 } catch (InterruptedException e) { 452 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e); 453 zkw.interruptedException(e); 454 return null; 455 } 456 } 457 458 /** 459 * Returns the date of child znodes of the specified znode. Also sets a watch on the specified 460 * znode which will capture a NodeDeleted event on the specified znode as well as 461 * NodeChildrenChanged if any children of the specified znode are created or deleted. Returns null 462 * if the specified node does not exist. Otherwise returns a list of children of the specified 463 * node. If the node exists but it has no children, an empty list will be returned. 
464 * @param zkw zk reference 465 * @param baseNode path of node to list and watch children of 466 * @return list of data of children of the specified node, an empty list if the node exists but 467 * has no children, and null if the node does not exist 468 * @throws KeeperException if unexpected zookeeper exception 469 * @deprecated Unused 470 */ 471 @Deprecated 472 public static List<NodeAndData> getChildDataAndWatchForNewChildren(ZKWatcher zkw, String baseNode) 473 throws KeeperException { 474 return getChildDataAndWatchForNewChildren(zkw, baseNode, true); 475 } 476 477 /** 478 * Returns the date of child znodes of the specified znode. Also sets a watch on the specified 479 * znode which will capture a NodeDeleted event on the specified znode as well as 480 * NodeChildrenChanged if any children of the specified znode are created or deleted. Returns null 481 * if the specified node does not exist. Otherwise returns a list of children of the specified 482 * node. If the node exists but it has no children, an empty list will be returned. 483 * @param zkw zk reference 484 * @param baseNode path of node to list and watch children of 485 * @param throwOnInterrupt if true then just interrupt the thread, do not throw exception 486 * @return list of data of children of the specified node, an empty list if the node exists but 487 * has no children, and null if the node does not exist 488 * @throws KeeperException if unexpected zookeeper exception 489 * @deprecated Unused 490 */ 491 @Deprecated 492 public static List<NodeAndData> getChildDataAndWatchForNewChildren(ZKWatcher zkw, String baseNode, 493 boolean throwOnInterrupt) throws KeeperException { 494 List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(zkw, baseNode); 495 if (nodes != null) { 496 List<NodeAndData> newNodes = new ArrayList<>(); 497 for (String node : nodes) { 498 if (Thread.interrupted()) { 499 // Partial data should not be processed. Cancel processing by sending empty list. 
500 return Collections.emptyList(); 501 } 502 String nodePath = ZNodePaths.joinZNode(baseNode, node); 503 byte[] data = ZKUtil.getDataAndWatch(zkw, nodePath, throwOnInterrupt); 504 newNodes.add(new NodeAndData(nodePath, data)); 505 } 506 return newNodes; 507 } 508 return null; 509 } 510 511 /** 512 * Update the data of an existing node with the expected version to have the specified data. 513 * Throws an exception if there is a version mismatch or some other problem. Sets no watches under 514 * any conditions. 515 * @param zkw zk reference 516 * @param znode the path to the ZNode 517 * @param data the data to store in ZooKeeper 518 * @param expectedVersion the expected version 519 * @throws KeeperException if unexpected zookeeper exception 520 * @throws KeeperException.BadVersionException if version mismatch 521 * @deprecated Unused 522 */ 523 @Deprecated 524 public static void updateExistingNodeData(ZKWatcher zkw, String znode, byte[] data, 525 int expectedVersion) throws KeeperException { 526 try { 527 zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion); 528 } catch (InterruptedException ie) { 529 zkw.interruptedException(ie); 530 } 531 } 532 533 // 534 // Data setting 535 // 536 537 /** 538 * Sets the data of the existing znode to be the specified data. Ensures that the current data has 539 * the specified expected version. 540 * <p> 541 * If the node does not exist, a {@link NoNodeException} will be thrown. 542 * <p> 543 * If their is a version mismatch, method returns null. 544 * <p> 545 * No watches are set but setting data will trigger other watchers of this node. 546 * <p> 547 * If there is another problem, a KeeperException will be thrown. 
548 * @param zkw zk reference 549 * @param znode path of node 550 * @param data data to set for node 551 * @param expectedVersion version expected when setting data 552 * @return true if data set, false if version mismatch 553 * @throws KeeperException if unexpected zookeeper exception 554 */ 555 public static boolean setData(ZKWatcher zkw, String znode, byte[] data, int expectedVersion) 556 throws KeeperException, KeeperException.NoNodeException { 557 try { 558 return zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion) != null; 559 } catch (InterruptedException e) { 560 zkw.interruptedException(e); 561 return false; 562 } 563 } 564 565 /** 566 * Set data into node creating node if it doesn't yet exist. Does not set watch. 567 * @param zkw zk reference 568 * @param znode path of node 569 * @param data data to set for node 570 * @throws KeeperException if a ZooKeeper operation fails 571 */ 572 public static void createSetData(final ZKWatcher zkw, final String znode, final byte[] data) 573 throws KeeperException { 574 if (checkExists(zkw, znode) == -1) { 575 ZKUtil.createWithParents(zkw, znode, data); 576 } else { 577 ZKUtil.setData(zkw, znode, data); 578 } 579 } 580 581 /** 582 * Sets the data of the existing znode to be the specified data. The node must exist but no checks 583 * are done on the existing data or version. 584 * <p> 585 * If the node does not exist, a {@link NoNodeException} will be thrown. 586 * <p> 587 * No watches are set but setting data will trigger other watchers of this node. 588 * <p> 589 * If there is another problem, a KeeperException will be thrown. 
590 * @param zkw zk reference 591 * @param znode path of node 592 * @param data data to set for node 593 * @throws KeeperException if unexpected zookeeper exception 594 */ 595 public static void setData(ZKWatcher zkw, String znode, byte[] data) 596 throws KeeperException, KeeperException.NoNodeException { 597 setData(zkw, (SetData) ZKUtilOp.setData(znode, data)); 598 } 599 600 private static void setData(ZKWatcher zkw, SetData setData) 601 throws KeeperException, KeeperException.NoNodeException { 602 SetDataRequest sd = (SetDataRequest) toZooKeeperOp(zkw, setData).toRequestRecord(); 603 setData(zkw, sd.getPath(), sd.getData(), sd.getVersion()); 604 } 605 606 // 607 // Node creation 608 // 609 610 /** 611 * Set the specified znode to be an ephemeral node carrying the specified data. If the node is 612 * created successfully, a watcher is also set on the node. If the node is not created 613 * successfully because it already exists, this method will also set a watcher on the node. If 614 * there is another problem, a KeeperException will be thrown. 
615 * @param zkw zk reference 616 * @param znode path of node 617 * @param data data of node 618 * @return true if node created, false if not, watch set in both cases 619 * @throws KeeperException if unexpected zookeeper exception 620 */ 621 public static boolean createEphemeralNodeAndWatch(ZKWatcher zkw, String znode, byte[] data) 622 throws KeeperException { 623 boolean ret = true; 624 try { 625 zkw.getRecoverableZooKeeper().create(znode, data, zkw.createACL(znode), CreateMode.EPHEMERAL); 626 } catch (KeeperException.NodeExistsException nee) { 627 ret = false; 628 } catch (InterruptedException e) { 629 LOG.info("Interrupted", e); 630 Thread.currentThread().interrupt(); 631 } 632 if (!watchAndCheckExists(zkw, znode)) { 633 // It did exist but now it doesn't, try again 634 return createEphemeralNodeAndWatch(zkw, znode, data); 635 } 636 return ret; 637 } 638 639 /** 640 * Creates the specified znode to be a persistent node carrying the specified data. Returns true 641 * if the node was successfully created, false if the node already existed. If the node is created 642 * successfully, a watcher is also set on the node. If the node is not created successfully 643 * because it already exists, this method will also set a watcher on the node but return false. If 644 * there is another problem, a KeeperException will be thrown. 
645 * @param zkw zk reference 646 * @param znode path of node 647 * @param data data of node 648 * @return true if node created, false if not, watch set in both cases 649 * @throws KeeperException if unexpected zookeeper exception 650 */ 651 public static boolean createNodeIfNotExistsAndWatch(ZKWatcher zkw, String znode, byte[] data) 652 throws KeeperException { 653 boolean ret = true; 654 try { 655 zkw.getRecoverableZooKeeper().create(znode, data, zkw.createACL(znode), 656 CreateMode.PERSISTENT); 657 } catch (KeeperException.NodeExistsException nee) { 658 ret = false; 659 } catch (InterruptedException e) { 660 zkw.interruptedException(e); 661 return false; 662 } 663 try { 664 zkw.getRecoverableZooKeeper().exists(znode, zkw); 665 } catch (InterruptedException e) { 666 zkw.interruptedException(e); 667 return false; 668 } 669 return ret; 670 } 671 672 /** 673 * Creates the specified znode with the specified data but does not watch it. Returns the znode of 674 * the newly created node If there is another problem, a KeeperException will be thrown. 675 * @param zkw zk reference 676 * @param znode path of node 677 * @param data data of node 678 * @param createMode specifying whether the node to be created is ephemeral and/or sequential 679 * @return true name of the newly created znode or null 680 * @throws KeeperException if unexpected zookeeper exception 681 */ 682 public static String createNodeIfNotExistsNoWatch(ZKWatcher zkw, String znode, byte[] data, 683 CreateMode createMode) throws KeeperException { 684 try { 685 return zkw.getRecoverableZooKeeper().create(znode, data, zkw.createACL(znode), createMode); 686 } catch (KeeperException.NodeExistsException nee) { 687 return znode; 688 } catch (InterruptedException e) { 689 zkw.interruptedException(e); 690 return null; 691 } 692 } 693 694 /** 695 * Creates the specified node with the specified data and watches it. 696 * <p> 697 * Throws an exception if the node already exists. 
698 * <p> 699 * The node created is persistent and open access. 700 * <p> 701 * Returns the version number of the created node if successful. 702 * @param zkw zk reference 703 * @param znode path of node to create 704 * @param data data of node to create 705 * @return version of node created 706 * @throws KeeperException if unexpected zookeeper exception 707 * @throws KeeperException.NodeExistsException if node already exists 708 */ 709 public static int createAndWatch(ZKWatcher zkw, String znode, byte[] data) 710 throws KeeperException, KeeperException.NodeExistsException { 711 try { 712 zkw.getRecoverableZooKeeper().create(znode, data, zkw.createACL(znode), 713 CreateMode.PERSISTENT); 714 Stat stat = zkw.getRecoverableZooKeeper().exists(znode, zkw); 715 if (stat == null) { 716 // Likely a race condition. Someone deleted the znode. 717 throw KeeperException.create(KeeperException.Code.SYSTEMERROR, 718 "ZK.exists returned null (i.e.: znode does not exist) for znode=" + znode); 719 } 720 721 return stat.getVersion(); 722 } catch (InterruptedException e) { 723 zkw.interruptedException(e); 724 return -1; 725 } 726 } 727 728 /** 729 * Async creates the specified node with the specified data. 730 * <p> 731 * Throws an exception if the node already exists. 732 * <p> 733 * The node created is persistent and open access. 734 * @param zkw zk reference 735 * @param znode path of node to create 736 * @param data data of node to create 737 * @param cb the callback to use for the creation 738 * @param ctx the context to use for the creation 739 */ 740 public static void asyncCreate(ZKWatcher zkw, String znode, byte[] data, 741 final AsyncCallback.StringCallback cb, final Object ctx) { 742 zkw.getRecoverableZooKeeper().getZooKeeper().create(znode, data, zkw.createACL(znode), 743 CreateMode.PERSISTENT, cb, ctx); 744 } 745 746 /** 747 * Creates the specified node, iff the node does not exist. Does not set a watch and fails 748 * silently if the node already exists. 
The node created is persistent and open access. 749 * @param zkw zk reference 750 * @param znode path of node 751 * @throws KeeperException if unexpected zookeeper exception 752 */ 753 public static void createAndFailSilent(ZKWatcher zkw, String znode) throws KeeperException { 754 createAndFailSilent(zkw, znode, new byte[0]); 755 } 756 757 /** 758 * Creates the specified node containing specified data, iff the node does not exist. Does not set 759 * a watch and fails silently if the node already exists. The node created is persistent and open 760 * access. 761 * @param zkw zk reference 762 * @param znode path of node 763 * @param data a byte array data to store in the znode 764 * @throws KeeperException if unexpected zookeeper exception 765 */ 766 public static void createAndFailSilent(ZKWatcher zkw, String znode, byte[] data) 767 throws KeeperException { 768 createAndFailSilent(zkw, (CreateAndFailSilent) ZKUtilOp.createAndFailSilent(znode, data)); 769 } 770 771 private static void createAndFailSilent(ZKWatcher zkw, CreateAndFailSilent cafs) 772 throws KeeperException { 773 CreateRequest create = (CreateRequest) toZooKeeperOp(zkw, cafs).toRequestRecord(); 774 String znode = create.getPath(); 775 RecoverableZooKeeper zk = zkw.getRecoverableZooKeeper(); 776 try { 777 if (zk.exists(znode, false) == null) { 778 zk.create(znode, create.getData(), create.getAcl(), CreateMode.fromFlag(create.getFlags())); 779 } 780 } catch (KeeperException.NodeExistsException nee) { 781 // pass 782 } catch (KeeperException.NoAuthException nee) { 783 try { 784 if (zk.exists(znode, false) == null) { 785 // If we failed to create the file and it does not already exist. 786 throw nee; 787 } 788 } catch (InterruptedException ie) { 789 zkw.interruptedException(ie); 790 } 791 } catch (InterruptedException ie) { 792 zkw.interruptedException(ie); 793 } 794 } 795 796 /** 797 * Creates the specified node and all parent nodes required for it to exist. 
No watches are set 798 * and no errors are thrown if the node already exists. The nodes created are persistent and open 799 * access. 800 * @param zkw zk reference 801 * @param znode path of node 802 * @throws KeeperException if unexpected zookeeper exception 803 */ 804 public static void createWithParents(ZKWatcher zkw, String znode) throws KeeperException { 805 createWithParents(zkw, znode, new byte[0]); 806 } 807 808 /** 809 * Creates the specified node and all parent nodes required for it to exist. The creation of 810 * parent znodes is not atomic with the leafe znode creation but the data is written atomically 811 * when the leaf node is created. No watches are set and no errors are thrown if the node already 812 * exists. The nodes created are persistent and open access. 813 * @param zkw zk reference 814 * @param znode path of node 815 * @throws KeeperException if unexpected zookeeper exception 816 */ 817 public static void createWithParents(ZKWatcher zkw, String znode, byte[] data) 818 throws KeeperException { 819 try { 820 if (znode == null) { 821 return; 822 } 823 zkw.getRecoverableZooKeeper().create(znode, data, zkw.createACL(znode), 824 CreateMode.PERSISTENT); 825 } catch (KeeperException.NodeExistsException nee) { 826 return; 827 } catch (KeeperException.NoNodeException nne) { 828 createWithParents(zkw, getParent(znode)); 829 createWithParents(zkw, znode, data); 830 } catch (InterruptedException ie) { 831 zkw.interruptedException(ie); 832 } 833 } 834 835 // 836 // Deletes 837 // 838 839 /** 840 * Delete the specified node. Sets no watches. Throws all exceptions. 841 */ 842 public static void deleteNode(ZKWatcher zkw, String node) throws KeeperException { 843 deleteNode(zkw, node, -1); 844 } 845 846 /** 847 * Delete the specified node with the specified version. Sets no watches. Throws all exceptions. 
848 */ 849 public static boolean deleteNode(ZKWatcher zkw, String node, int version) throws KeeperException { 850 try { 851 zkw.getRecoverableZooKeeper().delete(node, version); 852 return true; 853 } catch (KeeperException.BadVersionException bve) { 854 return false; 855 } catch (InterruptedException ie) { 856 zkw.interruptedException(ie); 857 return false; 858 } 859 } 860 861 /** 862 * Deletes the specified node. Fails silent if the node does not exist. 863 * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation 864 * @param node the node to delete 865 * @throws KeeperException if a ZooKeeper operation fails 866 */ 867 public static void deleteNodeFailSilent(ZKWatcher zkw, String node) throws KeeperException { 868 deleteNodeFailSilent(zkw, (DeleteNodeFailSilent) ZKUtilOp.deleteNodeFailSilent(node)); 869 } 870 871 private static void deleteNodeFailSilent(ZKWatcher zkw, DeleteNodeFailSilent dnfs) 872 throws KeeperException { 873 DeleteRequest delete = (DeleteRequest) toZooKeeperOp(zkw, dnfs).toRequestRecord(); 874 try { 875 zkw.getRecoverableZooKeeper().delete(delete.getPath(), delete.getVersion()); 876 } catch (KeeperException.NoNodeException nne) { 877 } catch (InterruptedException ie) { 878 zkw.interruptedException(ie); 879 } 880 } 881 882 /** 883 * Delete the specified node and all of it's children. 884 * <p> 885 * If the node does not exist, just returns. 886 * <p> 887 * Sets no watches. Throws all exceptions besides dealing with deletion of children. 888 */ 889 public static void deleteNodeRecursively(ZKWatcher zkw, String node) throws KeeperException { 890 deleteNodeRecursivelyMultiOrSequential(zkw, true, node); 891 } 892 893 /** 894 * Delete all the children of the specified node but not the node itself. Sets no watches. Throws 895 * all exceptions besides dealing with deletion of children. 
896 * @throws KeeperException if a ZooKeeper operation fails 897 */ 898 public static void deleteChildrenRecursively(ZKWatcher zkw, String node) throws KeeperException { 899 deleteChildrenRecursivelyMultiOrSequential(zkw, true, node); 900 } 901 902 /** 903 * Delete all the children of the specified node but not the node itself. This will first traverse 904 * the znode tree for listing the children and then delete these znodes using multi-update api or 905 * sequential based on the specified configurations. 906 * <p> 907 * Sets no watches. Throws all exceptions besides dealing with deletion of children. 908 * <p> 909 * If the following is true: 910 * <ul> 911 * <li>runSequentialOnMultiFailure is true 912 * </ul> 913 * on calling multi, we get a ZooKeeper exception that can be handled by a sequential call(*), we 914 * retry the operations one-by-one (sequentially). - zk reference - if true when we get a 915 * ZooKeeper exception that could retry the operations one-by-one (sequentially) - path of the 916 * parent node(s) 917 * @throws KeeperException.NotEmptyException if node has children while deleting if unexpected 918 * ZooKeeper exception if an invalid path is specified 919 */ 920 public static void deleteChildrenRecursivelyMultiOrSequential(ZKWatcher zkw, 921 boolean runSequentialOnMultiFailure, String... pathRoots) throws KeeperException { 922 if (pathRoots == null || pathRoots.length <= 0) { 923 LOG.warn("Given path is not valid!"); 924 return; 925 } 926 List<ZKUtilOp> ops = new ArrayList<>(); 927 for (String eachRoot : pathRoots) { 928 List<String> children = listChildrenBFSNoWatch(zkw, eachRoot); 929 // Delete the leaves first and eventually get rid of the root 930 for (int i = children.size() - 1; i >= 0; --i) { 931 ops.add(ZKUtilOp.deleteNodeFailSilent(children.get(i))); 932 } 933 } 934 submitBatchedMultiOrSequential(zkw, runSequentialOnMultiFailure, ops); 935 } 936 937 /** 938 * Delete the specified node and its children. 
This traverse the znode tree for listing the 939 * children and then delete these znodes including the parent using multi-update api or sequential 940 * based on the specified configurations. 941 * <p> 942 * Sets no watches. Throws all exceptions besides dealing with deletion of children. 943 * <p> 944 * If the following is true: 945 * <ul> 946 * <li>runSequentialOnMultiFailure is true 947 * </ul> 948 * on calling multi, we get a ZooKeeper exception that can be handled by a sequential call(*), we 949 * retry the operations one-by-one (sequentially). - zk reference - if true when we get a 950 * ZooKeeper exception that could retry the operations one-by-one (sequentially) - path of the 951 * parent node(s) 952 * @throws KeeperException.NotEmptyException if node has children while deleting if unexpected 953 * ZooKeeper exception if an invalid path is specified 954 */ 955 public static void deleteNodeRecursivelyMultiOrSequential(ZKWatcher zkw, 956 boolean runSequentialOnMultiFailure, String... 
pathRoots) throws KeeperException { 957 if (pathRoots == null || pathRoots.length <= 0) { 958 LOG.warn("Given path is not valid!"); 959 return; 960 } 961 List<ZKUtilOp> ops = new ArrayList<>(); 962 for (String eachRoot : pathRoots) { 963 // ZooKeeper Watches are one time triggers; When children of parent nodes are deleted 964 // recursively, must set another watch, get notified of delete node 965 List<String> children = listChildrenBFSAndWatchThem(zkw, eachRoot); 966 // Delete the leaves first and eventually get rid of the root 967 for (int i = children.size() - 1; i >= 0; --i) { 968 ops.add(ZKUtilOp.deleteNodeFailSilent(children.get(i))); 969 } 970 try { 971 if (zkw.getRecoverableZooKeeper().exists(eachRoot, zkw) != null) { 972 ops.add(ZKUtilOp.deleteNodeFailSilent(eachRoot)); 973 } 974 } catch (InterruptedException e) { 975 zkw.interruptedException(e); 976 } 977 } 978 submitBatchedMultiOrSequential(zkw, runSequentialOnMultiFailure, ops); 979 } 980 981 /** 982 * Chunks the provided {@code ops} when their approximate size exceeds the the configured limit. 983 * Take caution that this can ONLY be used for operations where atomicity is not important, e.g. 984 * deletions. It must not be used when atomicity of the operations is critical. 
985 * @param zkw reference to the {@link ZKWatcher} which contains 986 * configuration and constants 987 * @param runSequentialOnMultiFailure if true when we get a ZooKeeper exception that could retry 988 * the operations one-by-one (sequentially) 989 * @param ops list of ZKUtilOp {@link ZKUtilOp} to partition while 990 * submitting batched multi or sequential 991 * @throws KeeperException unexpected ZooKeeper Exception / Zookeeper unreachable 992 */ 993 private static void submitBatchedMultiOrSequential(ZKWatcher zkw, 994 boolean runSequentialOnMultiFailure, List<ZKUtilOp> ops) throws KeeperException { 995 // at least one element should exist 996 if (ops.isEmpty()) { 997 return; 998 } 999 final int maxMultiSize = zkw.getRecoverableZooKeeper().getMaxMultiSizeLimit(); 1000 // Batch up the items to over smashing through jute.maxbuffer with too many Ops. 1001 final List<List<ZKUtilOp>> batchedOps = partitionOps(ops, maxMultiSize); 1002 // Would use forEach() but have to handle KeeperException 1003 for (List<ZKUtilOp> batch : batchedOps) { 1004 multiOrSequential(zkw, batch, runSequentialOnMultiFailure); 1005 } 1006 } 1007 1008 /** 1009 * Partition the list of {@code ops} by size (using {@link #estimateSize(ZKUtilOp)}). 1010 */ 1011 static List<List<ZKUtilOp>> partitionOps(List<ZKUtilOp> ops, int maxPartitionSize) { 1012 List<List<ZKUtilOp>> partitionedOps = new ArrayList<>(); 1013 List<ZKUtilOp> currentPartition = new ArrayList<>(); 1014 int currentPartitionSize = 0; 1015 partitionedOps.add(currentPartition); 1016 Iterator<ZKUtilOp> iter = ops.iterator(); 1017 while (iter.hasNext()) { 1018 ZKUtilOp currentOp = iter.next(); 1019 int currentOpSize = estimateSize(currentOp); 1020 1021 // Roll a new partition if necessary 1022 // If the current partition is empty, put the element in there anyways. 
1023 // We can roll a new partition if we get another element 1024 if (!currentPartition.isEmpty() && currentOpSize + currentPartitionSize > maxPartitionSize) { 1025 currentPartition = new ArrayList<>(); 1026 partitionedOps.add(currentPartition); 1027 currentPartitionSize = 0; 1028 } 1029 1030 // Add the current op to the partition 1031 currentPartition.add(currentOp); 1032 // And record its size 1033 currentPartitionSize += currentOpSize; 1034 } 1035 return partitionedOps; 1036 } 1037 1038 static int estimateSize(ZKUtilOp op) { 1039 return Bytes.toBytes(op.getPath()).length; 1040 } 1041 1042 /** 1043 * BFS Traversal of all the children under path, with the entries in the list, in the same order 1044 * as that of the traversal. Lists all the children without setting any watches. - zk reference - 1045 * path of node 1046 * @return list of children znodes under the path if unexpected ZooKeeper exception 1047 */ 1048 private static List<String> listChildrenBFSNoWatch(ZKWatcher zkw, final String znode) 1049 throws KeeperException { 1050 Deque<String> queue = new LinkedList<>(); 1051 List<String> tree = new ArrayList<>(); 1052 queue.add(znode); 1053 while (true) { 1054 String node = queue.pollFirst(); 1055 if (node == null) { 1056 break; 1057 } 1058 List<String> children = listChildrenNoWatch(zkw, node); 1059 if (children == null) { 1060 continue; 1061 } 1062 for (final String child : children) { 1063 final String childPath = node + "/" + child; 1064 queue.add(childPath); 1065 tree.add(childPath); 1066 } 1067 } 1068 return tree; 1069 } 1070 1071 /** 1072 * BFS Traversal of all the children under path, with the entries in the list, in the same order 1073 * as that of the traversal. Lists all the children and set watches on to them. 
- zk reference - 1074 * path of node 1075 * @return list of children znodes under the path if unexpected ZooKeeper exception 1076 */ 1077 private static List<String> listChildrenBFSAndWatchThem(ZKWatcher zkw, final String znode) 1078 throws KeeperException { 1079 Deque<String> queue = new LinkedList<>(); 1080 List<String> tree = new ArrayList<>(); 1081 queue.add(znode); 1082 while (true) { 1083 String node = queue.pollFirst(); 1084 if (node == null) { 1085 break; 1086 } 1087 List<String> children = listChildrenAndWatchThem(zkw, node); 1088 if (children == null) { 1089 continue; 1090 } 1091 for (final String child : children) { 1092 final String childPath = node + "/" + child; 1093 queue.add(childPath); 1094 tree.add(childPath); 1095 } 1096 } 1097 return tree; 1098 } 1099 1100 /** 1101 * Represents an action taken by ZKUtil, e.g. createAndFailSilent. These actions are higher-level 1102 * than ZKOp actions, which represent individual actions in the ZooKeeper API, like create. 1103 */ 1104 public abstract static class ZKUtilOp { 1105 private String path; 1106 1107 @Override 1108 public String toString() { 1109 return this.getClass().getSimpleName() + ", path=" + this.path; 1110 } 1111 1112 private ZKUtilOp(String path) { 1113 this.path = path; 1114 } 1115 1116 /** Returns a createAndFailSilent ZKUtilOp */ 1117 public static ZKUtilOp createAndFailSilent(String path, byte[] data) { 1118 return new CreateAndFailSilent(path, data); 1119 } 1120 1121 /** Returns a deleteNodeFailSilent ZKUtilOP */ 1122 public static ZKUtilOp deleteNodeFailSilent(String path) { 1123 return new DeleteNodeFailSilent(path); 1124 } 1125 1126 /** Returns a setData ZKUtilOp */ 1127 public static ZKUtilOp setData(String path, byte[] data) { 1128 return new SetData(path, data); 1129 } 1130 1131 /** Returns a setData ZKUtilOp */ 1132 public static ZKUtilOp setData(String path, byte[] data, int version) { 1133 return new SetData(path, data, version); 1134 } 1135 1136 /** Returns path to znode where the 
ZKOp will occur */ 1137 public String getPath() { 1138 return path; 1139 } 1140 1141 /** 1142 * ZKUtilOp representing createAndFailSilent in ZooKeeper (attempt to create node, ignore error 1143 * if already exists) 1144 */ 1145 public static final class CreateAndFailSilent extends ZKUtilOp { 1146 private byte[] data; 1147 1148 private CreateAndFailSilent(String path, byte[] data) { 1149 super(path); 1150 this.data = data; 1151 } 1152 1153 public byte[] getData() { 1154 return data; 1155 } 1156 1157 @Override 1158 public boolean equals(Object o) { 1159 if (this == o) { 1160 return true; 1161 } 1162 if (!(o instanceof CreateAndFailSilent)) { 1163 return false; 1164 } 1165 1166 CreateAndFailSilent op = (CreateAndFailSilent) o; 1167 return getPath().equals(op.getPath()) && Arrays.equals(data, op.data); 1168 } 1169 1170 @Override 1171 public int hashCode() { 1172 int ret = 17 + getPath().hashCode() * 31; 1173 return ret * 31 + Bytes.hashCode(data); 1174 } 1175 } 1176 1177 /** 1178 * ZKUtilOp representing deleteNodeFailSilent in ZooKeeper (attempt to delete node, ignore error 1179 * if node doesn't exist) 1180 */ 1181 public static final class DeleteNodeFailSilent extends ZKUtilOp { 1182 private DeleteNodeFailSilent(String path) { 1183 super(path); 1184 } 1185 1186 @Override 1187 public boolean equals(Object o) { 1188 if (this == o) { 1189 return true; 1190 } 1191 if (!(o instanceof DeleteNodeFailSilent)) { 1192 return false; 1193 } 1194 1195 return super.equals(o); 1196 } 1197 1198 @Override 1199 public int hashCode() { 1200 return getPath().hashCode(); 1201 } 1202 } 1203 1204 /** 1205 * ZKUtilOp representing setData in ZooKeeper 1206 */ 1207 public static final class SetData extends ZKUtilOp { 1208 private byte[] data; 1209 private int version = -1; 1210 1211 private SetData(String path, byte[] data) { 1212 super(path); 1213 this.data = data; 1214 } 1215 1216 private SetData(String path, byte[] data, int version) { 1217 super(path); 1218 this.data = data; 1219 
this.version = version; 1220 } 1221 1222 public byte[] getData() { 1223 return data; 1224 } 1225 1226 public int getVersion() { 1227 return version; 1228 } 1229 1230 @Override 1231 public boolean equals(Object o) { 1232 if (this == o) { 1233 return true; 1234 } 1235 if (!(o instanceof SetData)) { 1236 return false; 1237 } 1238 1239 SetData op = (SetData) o; 1240 return getPath().equals(op.getPath()) && Arrays.equals(data, op.data) 1241 && getVersion() == op.getVersion(); 1242 } 1243 1244 @Override 1245 public int hashCode() { 1246 int ret = getPath().hashCode(); 1247 ret = ret * 31 + Bytes.hashCode(data); 1248 return ret * 31 + Integer.hashCode(version); 1249 } 1250 } 1251 } 1252 1253 /** 1254 * Convert from ZKUtilOp to ZKOp 1255 */ 1256 private static Op toZooKeeperOp(ZKWatcher zkw, ZKUtilOp op) throws UnsupportedOperationException { 1257 if (op == null) { 1258 return null; 1259 } 1260 1261 if (op instanceof CreateAndFailSilent) { 1262 CreateAndFailSilent cafs = (CreateAndFailSilent) op; 1263 return Op.create(cafs.getPath(), cafs.getData(), zkw.createACL(cafs.getPath()), 1264 CreateMode.PERSISTENT); 1265 } else if (op instanceof DeleteNodeFailSilent) { 1266 DeleteNodeFailSilent dnfs = (DeleteNodeFailSilent) op; 1267 return Op.delete(dnfs.getPath(), -1); 1268 } else if (op instanceof SetData) { 1269 SetData sd = (SetData) op; 1270 return Op.setData(sd.getPath(), sd.getData(), sd.getVersion()); 1271 } else { 1272 throw new UnsupportedOperationException( 1273 "Unexpected ZKUtilOp type: " + op.getClass().getName()); 1274 } 1275 } 1276 1277 // Static boolean for warning about useMulti because ideally there will be one warning per 1278 // process instance. It is fine if two threads end up racing on this for a bit. We do not 1279 // need to use an atomic type for this, that is overkill. The goal of reducing the number of 1280 // warnings from many to one or a few at startup is still achieved. 
1281 private static boolean useMultiWarn = true; 1282 1283 /** 1284 * Use ZooKeeper's multi-update functionality. If all of the following are true: - 1285 * runSequentialOnMultiFailure is true - on calling multi, we get a ZooKeeper exception that can 1286 * be handled by a sequential call(*) Then: - we retry the operations one-by-one (sequentially) 1287 * Note *: an example is receiving a NodeExistsException from a "create" call. Without multi, a 1288 * user could call "createAndFailSilent" to ensure that a node exists if they don't care who 1289 * actually created the node (i.e. the NodeExistsException from ZooKeeper is caught). This will 1290 * cause all operations in the multi to fail, however, because the NodeExistsException that 1291 * zk.create throws will fail the multi transaction. In this case, if the previous conditions 1292 * hold, the commands are run sequentially, which should result in the correct final state, but 1293 * means that the operations will not run atomically. 1294 * @throws KeeperException if a ZooKeeper operation fails 1295 */ 1296 public static void multiOrSequential(ZKWatcher zkw, List<ZKUtilOp> ops, 1297 boolean runSequentialOnMultiFailure) throws KeeperException { 1298 if (ops == null) { 1299 return; 1300 } 1301 if (useMultiWarn) { // Only check and warn at first use 1302 if (zkw.getConfiguration().get("hbase.zookeeper.useMulti") != null) { 1303 LOG.warn("hbase.zookeeper.useMulti is deprecated. 
Default to true always."); 1304 } 1305 useMultiWarn = false; 1306 } 1307 List<Op> zkOps = new LinkedList<>(); 1308 for (ZKUtilOp op : ops) { 1309 zkOps.add(toZooKeeperOp(zkw, op)); 1310 } 1311 try { 1312 zkw.getRecoverableZooKeeper().multi(zkOps); 1313 } catch (KeeperException ke) { 1314 switch (ke.code()) { 1315 case NODEEXISTS: 1316 case NONODE: 1317 case BADVERSION: 1318 case NOAUTH: 1319 case NOTEMPTY: 1320 // if we get an exception that could be solved by running sequentially 1321 // (and the client asked us to), then break out and run sequentially 1322 if (runSequentialOnMultiFailure) { 1323 LOG.info( 1324 "multi exception: {}; running operations sequentially " 1325 + "(runSequentialOnMultiFailure=true); {}", 1326 ke.toString(), ops.stream().map(o -> o.toString()).collect(Collectors.joining(","))); 1327 processSequentially(zkw, ops); 1328 break; 1329 } 1330 default: 1331 throw ke; 1332 } 1333 } catch (InterruptedException ie) { 1334 zkw.interruptedException(ie); 1335 } 1336 } 1337 1338 private static void processSequentially(ZKWatcher zkw, List<ZKUtilOp> ops) 1339 throws KeeperException, NoNodeException { 1340 for (ZKUtilOp op : ops) { 1341 if (op instanceof CreateAndFailSilent) { 1342 createAndFailSilent(zkw, (CreateAndFailSilent) op); 1343 } else if (op instanceof DeleteNodeFailSilent) { 1344 deleteNodeFailSilent(zkw, (DeleteNodeFailSilent) op); 1345 } else if (op instanceof SetData) { 1346 setData(zkw, (SetData) op); 1347 } else { 1348 throw new UnsupportedOperationException( 1349 "Unexpected ZKUtilOp type: " + op.getClass().getName()); 1350 } 1351 } 1352 } 1353 1354 // 1355 // ZooKeeper cluster information 1356 // 1357 1358 private static void logRetrievedMsg(final ZKWatcher zkw, final String znode, final byte[] data, 1359 final boolean watcherSet) { 1360 if (!LOG.isTraceEnabled()) { 1361 return; 1362 } 1363 1364 LOG.trace(zkw.prefix("Retrieved " + ((data == null) ? 0 : data.length) 1365 + " byte(s) of data from znode " + znode + (watcherSet ? 
" and set watcher; " : "; data=") 1366 + (data == null ? "null" 1367 : data.length == 0 ? "empty" 1368 : (zkw.getZNodePaths().isMetaZNodePath(znode) ? getServerNameOrEmptyString(data) 1369 : znode.startsWith(zkw.getZNodePaths().backupMasterAddressesZNode) 1370 ? getServerNameOrEmptyString(data) 1371 : StringUtils.abbreviate(Bytes.toStringBinary(data), 32))))); 1372 } 1373 1374 private static String getServerNameOrEmptyString(final byte[] data) { 1375 try { 1376 return ProtobufUtil.parseServerNameFrom(data).toString(); 1377 } catch (DeserializationException e) { 1378 return ""; 1379 } 1380 } 1381 1382 /** 1383 * Waits for HBase installation's base (parent) znode to become available. 1384 * @throws IOException on ZK errors 1385 */ 1386 public static void waitForBaseZNode(Configuration conf) throws IOException { 1387 LOG.info("Waiting until the base znode is available"); 1388 String parentZNode = 1389 conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); 1390 ZooKeeper zk = new ZooKeeper(ZKConfig.getZKQuorumServersString(conf), 1391 conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT), 1392 EmptyWatcher.instance); 1393 1394 final int maxTimeMs = 10000; 1395 final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS; 1396 1397 KeeperException keeperEx = null; 1398 try { 1399 try { 1400 for (int attempt = 0; attempt < maxNumAttempts; ++attempt) { 1401 try { 1402 if (zk.exists(parentZNode, false) != null) { 1403 LOG.info("Parent znode exists: {}", parentZNode); 1404 keeperEx = null; 1405 break; 1406 } 1407 } catch (KeeperException e) { 1408 keeperEx = e; 1409 } 1410 Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS); 1411 } 1412 } finally { 1413 zk.close(); 1414 } 1415 } catch (InterruptedException ex) { 1416 Thread.currentThread().interrupt(); 1417 } 1418 1419 if (keeperEx != null) { 1420 throw new IOException(keeperEx); 1421 } 1422 } 1423 1424 /** 1425 * Convert a {@link 
DeserializationException} to a more palatable {@link KeeperException}. Used 1426 * when can't let a {@link DeserializationException} out w/o changing public API. 1427 * @param e Exception to convert 1428 * @return Converted exception 1429 */ 1430 public static KeeperException convert(final DeserializationException e) { 1431 KeeperException ke = new KeeperException.DataInconsistencyException(); 1432 ke.initCause(e); 1433 return ke; 1434 } 1435 1436 /** 1437 * Recursively print the current state of ZK (non-transactional) 1438 * @param root name of the root directory in zk to print 1439 */ 1440 public static void logZKTree(ZKWatcher zkw, String root) { 1441 if (!LOG.isDebugEnabled()) { 1442 return; 1443 } 1444 1445 LOG.debug("Current zk system:"); 1446 String prefix = "|-"; 1447 LOG.debug(prefix + root); 1448 try { 1449 logZKTree(zkw, root, prefix); 1450 } catch (KeeperException e) { 1451 throw new RuntimeException(e); 1452 } 1453 } 1454 1455 /** 1456 * Helper method to print the current state of the ZK tree. 1457 * @see #logZKTree(ZKWatcher, String) 1458 * @throws KeeperException if an unexpected exception occurs 1459 */ 1460 private static void logZKTree(ZKWatcher zkw, String root, String prefix) throws KeeperException { 1461 List<String> children = ZKUtil.listChildrenNoWatch(zkw, root); 1462 1463 if (children == null) { 1464 return; 1465 } 1466 1467 for (String child : children) { 1468 LOG.debug(prefix + child); 1469 String node = ZNodePaths.joinZNode(root.equals("/") ? "" : root, child); 1470 logZKTree(zkw, node, prefix + "---"); 1471 } 1472 } 1473 1474 /** 1475 * @param position the position to serialize 1476 * @return Serialized protobuf of <code>position</code> with pb magic prefix prepended suitable 1477 * for use as content of an wal position in a replication queue. 
*/
  public static byte[] positionToByteArray(final long position) {
    ReplicationProtos.ReplicationHLogPosition message =
      ReplicationProtos.ReplicationHLogPosition.newBuilder().setPosition(position).build();
    return ProtobufUtil.prependPBMagic(message.toByteArray());
  }

  /**
   * Parses the content of a WAL position znode. Data carrying the pb magic prefix is parsed as a
   * protobuf message; anything else is read as a raw long, with empty data meaning position 0.
   * @param bytes - Content of a WAL position znode.
   * @return long - The current WAL position.
   * @throws DeserializationException if the WAL position cannot be parsed
   */
  public static long parseWALPositionFrom(final byte[] bytes) throws DeserializationException {
    if (bytes == null) {
      throw new DeserializationException("Unable to parse null WAL position.");
    }
    if (!ProtobufUtil.isPBMagicPrefix(bytes)) {
      // No pb magic prefix: raw long, or 0 when there is no data at all.
      return bytes.length > 0 ? Bytes.toLong(bytes) : 0;
    }
    final int pblen = ProtobufUtil.lengthOfPBMagic();
    final ReplicationProtos.ReplicationHLogPosition.Builder builder =
      ReplicationProtos.ReplicationHLogPosition.newBuilder();
    try {
      // Skip the magic prefix and parse the remainder.
      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
      return builder.build().getPosition();
    } catch (IOException e) {
      throw new DeserializationException(e);
    }
  }
}