001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.zookeeper; 020 021import java.io.IOException; 022import java.lang.management.ManagementFactory; 023import java.util.ArrayList; 024import java.util.LinkedList; 025import java.util.List; 026 027import org.apache.hadoop.hbase.trace.TraceUtil; 028import org.apache.hadoop.hbase.util.Bytes; 029import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 030import org.apache.hadoop.hbase.util.RetryCounter; 031import org.apache.hadoop.hbase.util.RetryCounterFactory; 032import org.apache.htrace.core.TraceScope; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.apache.zookeeper.AsyncCallback; 035import org.apache.zookeeper.CreateMode; 036import org.apache.zookeeper.KeeperException; 037import org.apache.zookeeper.Op; 038import org.apache.zookeeper.OpResult; 039import org.apache.zookeeper.Watcher; 040import org.apache.zookeeper.ZooDefs; 041import org.apache.zookeeper.ZooKeeper; 042import org.apache.zookeeper.ZooKeeper.States; 043import org.apache.zookeeper.data.ACL; 044import org.apache.zookeeper.data.Stat; 045import org.apache.zookeeper.proto.CreateRequest; 046import org.apache.zookeeper.proto.SetDataRequest; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050/** 051 * A zookeeper that can handle 'recoverable' errors. 052 * To handle recoverable errors, developers need to realize that there are two 053 * classes of requests: idempotent and non-idempotent requests. Read requests 054 * and unconditional sets and deletes are examples of idempotent requests, they 055 * can be reissued with the same results. 056 * (Although, the delete may throw a NoNodeException on reissue its effect on 057 * the ZooKeeper state is the same.) Non-idempotent requests need special 058 * handling, application and library writers need to keep in mind that they may 059 * need to encode information in the data or name of znodes to detect 060 * retries. A simple example is a create that uses a sequence flag. 061 * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection 062 * loss exception, that process will reissue another 063 * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a 064 * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be 065 * that x-109 was the result of the previous create, so the process actually 066 * owns both x-109 and x-111. An easy way around this is to use "x-process id-" 067 * when doing the create. If the process is using an id of 352, before reissuing 068 * the create it will do a getChildren("/") and see "x-222-1", "x-542-30", 069 * "x-352-109", x-333-110". The process will know that the original create 070 * succeeded an the znode it created is "x-352-109". 071 * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling" 072 */ 073@InterfaceAudience.Private 074public class RecoverableZooKeeper { 075 private static final Logger LOG = LoggerFactory.getLogger(RecoverableZooKeeper.class); 076 // the actual ZooKeeper client instance 077 private ZooKeeper zk; 078 private final RetryCounterFactory retryCounterFactory; 079 // An identifier of this process in the cluster 080 private final String identifier; 081 private final byte[] id; 082 private final Watcher watcher; 083 private final int sessionTimeout; 084 private final String quorumServers; 085 private final int maxMultiSize; 086 087 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE", 088 justification="None. Its always been this way.") 089 public RecoverableZooKeeper(String quorumServers, int sessionTimeout, 090 Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier, 091 int maxMultiSize) 092 throws IOException { 093 // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should. 094 this.retryCounterFactory = 095 new RetryCounterFactory(maxRetries+1, retryIntervalMillis, maxSleepTime); 096 097 if (identifier == null || identifier.length() == 0) { 098 // the identifier = processID@hostName 099 identifier = ManagementFactory.getRuntimeMXBean().getName(); 100 } 101 LOG.info("Process identifier={} connecting to ZooKeeper ensemble={}", identifier, 102 quorumServers); 103 this.identifier = identifier; 104 this.id = Bytes.toBytes(identifier); 105 106 this.watcher = watcher; 107 this.sessionTimeout = sessionTimeout; 108 this.quorumServers = quorumServers; 109 this.maxMultiSize = maxMultiSize; 110 111 try { 112 checkZk(); 113 } catch (Exception x) { 114 /* ignore */ 115 } 116 } 117 118 /** 119 * Returns the maximum size (in bytes) that should be included in any single multi() call. 120 * 121 * NB: This is an approximation, so there may be variance in the msg actually sent over the 122 * wire. Please be sure to set this approximately, with respect to your ZK server configuration 123 * for jute.maxbuffer. 124 */ 125 public int getMaxMultiSizeLimit() { 126 return maxMultiSize; 127 } 128 129 /** 130 * Try to create a ZooKeeper connection. Turns any exception encountered into a 131 * KeeperException.OperationTimeoutException so it can retried. 132 * @return The created ZooKeeper connection object 133 * @throws KeeperException if a ZooKeeper operation fails 134 */ 135 protected synchronized ZooKeeper checkZk() throws KeeperException { 136 if (this.zk == null) { 137 try { 138 this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher); 139 } catch (IOException ex) { 140 LOG.warn("Unable to create ZooKeeper Connection", ex); 141 throw new KeeperException.OperationTimeoutException(); 142 } 143 } 144 return zk; 145 } 146 147 public synchronized void reconnectAfterExpiration() 148 throws IOException, KeeperException, InterruptedException { 149 if (zk != null) { 150 LOG.info("Closing dead ZooKeeper connection, session" + 151 " was: 0x"+Long.toHexString(zk.getSessionId())); 152 zk.close(); 153 // reset the ZooKeeper connection 154 zk = null; 155 } 156 checkZk(); 157 LOG.info("Recreated a ZooKeeper, session" + 158 " is: 0x"+Long.toHexString(zk.getSessionId())); 159 } 160 161 /** 162 * delete is an idempotent operation. Retry before throwing exception. 163 * This function will not throw NoNodeException if the path does not 164 * exist. 165 */ 166 public void delete(String path, int version) throws InterruptedException, KeeperException { 167 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.delete")) { 168 RetryCounter retryCounter = retryCounterFactory.create(); 169 boolean isRetry = false; // False for first attempt, true for all retries. 170 while (true) { 171 try { 172 long startTime = EnvironmentEdgeManager.currentTime(); 173 checkZk().delete(path, version); 174 return; 175 } catch (KeeperException e) { 176 switch (e.code()) { 177 case NONODE: 178 if (isRetry) { 179 LOG.debug("Node " + path + " already deleted. Assuming a " + 180 "previous attempt succeeded."); 181 return; 182 } 183 LOG.debug("Node {} already deleted, retry={}", path, isRetry); 184 throw e; 185 186 case CONNECTIONLOSS: 187 retryOrThrow(retryCounter, e, "delete"); 188 break; 189 case OPERATIONTIMEOUT: 190 retryOrThrow(retryCounter, e, "delete"); 191 break; 192 193 default: 194 throw e; 195 } 196 } 197 retryCounter.sleepUntilNextRetry(); 198 isRetry = true; 199 } 200 } 201 } 202 203 /** 204 * exists is an idempotent operation. Retry before throwing exception 205 * @return A Stat instance 206 */ 207 public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException { 208 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) { 209 RetryCounter retryCounter = retryCounterFactory.create(); 210 while (true) { 211 try { 212 long startTime = EnvironmentEdgeManager.currentTime(); 213 Stat nodeStat = checkZk().exists(path, watcher); 214 return nodeStat; 215 } catch (KeeperException e) { 216 switch (e.code()) { 217 case CONNECTIONLOSS: 218 retryOrThrow(retryCounter, e, "exists"); 219 break; 220 case OPERATIONTIMEOUT: 221 retryOrThrow(retryCounter, e, "exists"); 222 break; 223 224 default: 225 throw e; 226 } 227 } 228 retryCounter.sleepUntilNextRetry(); 229 } 230 } 231 } 232 233 /** 234 * exists is an idempotent operation. Retry before throwing exception 235 * @return A Stat instance 236 */ 237 public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException { 238 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) { 239 RetryCounter retryCounter = retryCounterFactory.create(); 240 while (true) { 241 try { 242 long startTime = EnvironmentEdgeManager.currentTime(); 243 Stat nodeStat = checkZk().exists(path, watch); 244 return nodeStat; 245 } catch (KeeperException e) { 246 switch (e.code()) { 247 case CONNECTIONLOSS: 248 retryOrThrow(retryCounter, e, "exists"); 249 break; 250 case OPERATIONTIMEOUT: 251 retryOrThrow(retryCounter, e, "exists"); 252 break; 253 254 default: 255 throw e; 256 } 257 } 258 retryCounter.sleepUntilNextRetry(); 259 } 260 } 261 } 262 263 private void retryOrThrow(RetryCounter retryCounter, KeeperException e, 264 String opName) throws KeeperException { 265 if (!retryCounter.shouldRetry()) { 266 LOG.error("ZooKeeper {} failed after {} attempts", opName, retryCounter.getMaxAttempts()); 267 throw e; 268 } 269 LOG.debug("Retry, connectivity issue (JVM Pause?); quorum={},exception{}=", quorumServers, e); 270 } 271 272 /** 273 * getChildren is an idempotent operation. Retry before throwing exception 274 * @return List of children znodes 275 */ 276 public List<String> getChildren(String path, Watcher watcher) 277 throws KeeperException, InterruptedException { 278 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) { 279 RetryCounter retryCounter = retryCounterFactory.create(); 280 while (true) { 281 try { 282 long startTime = EnvironmentEdgeManager.currentTime(); 283 List<String> children = checkZk().getChildren(path, watcher); 284 return children; 285 } catch (KeeperException e) { 286 switch (e.code()) { 287 case CONNECTIONLOSS: 288 retryOrThrow(retryCounter, e, "getChildren"); 289 break; 290 case OPERATIONTIMEOUT: 291 retryOrThrow(retryCounter, e, "getChildren"); 292 break; 293 294 default: 295 throw e; 296 } 297 } 298 retryCounter.sleepUntilNextRetry(); 299 } 300 } 301 } 302 303 /** 304 * getChildren is an idempotent operation. Retry before throwing exception 305 * @return List of children znodes 306 */ 307 public List<String> getChildren(String path, boolean watch) 308 throws KeeperException, InterruptedException { 309 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) { 310 RetryCounter retryCounter = retryCounterFactory.create(); 311 while (true) { 312 try { 313 long startTime = EnvironmentEdgeManager.currentTime(); 314 List<String> children = checkZk().getChildren(path, watch); 315 return children; 316 } catch (KeeperException e) { 317 switch (e.code()) { 318 case CONNECTIONLOSS: 319 retryOrThrow(retryCounter, e, "getChildren"); 320 break; 321 case OPERATIONTIMEOUT: 322 retryOrThrow(retryCounter, e, "getChildren"); 323 break; 324 325 default: 326 throw e; 327 } 328 } 329 retryCounter.sleepUntilNextRetry(); 330 } 331 } 332 } 333 334 /** 335 * getData is an idempotent operation. Retry before throwing exception 336 * @return Data 337 */ 338 public byte[] getData(String path, Watcher watcher, Stat stat) 339 throws KeeperException, InterruptedException { 340 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) { 341 RetryCounter retryCounter = retryCounterFactory.create(); 342 while (true) { 343 try { 344 long startTime = EnvironmentEdgeManager.currentTime(); 345 byte[] revData = checkZk().getData(path, watcher, stat); 346 return ZKMetadata.removeMetaData(revData); 347 } catch (KeeperException e) { 348 switch (e.code()) { 349 case CONNECTIONLOSS: 350 retryOrThrow(retryCounter, e, "getData"); 351 break; 352 case OPERATIONTIMEOUT: 353 retryOrThrow(retryCounter, e, "getData"); 354 break; 355 356 default: 357 throw e; 358 } 359 } 360 retryCounter.sleepUntilNextRetry(); 361 } 362 } 363 } 364 365 /** 366 * getData is an idempotent operation. Retry before throwing exception 367 * @return Data 368 */ 369 public byte[] getData(String path, boolean watch, Stat stat) 370 throws KeeperException, InterruptedException { 371 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) { 372 RetryCounter retryCounter = retryCounterFactory.create(); 373 while (true) { 374 try { 375 long startTime = EnvironmentEdgeManager.currentTime(); 376 byte[] revData = checkZk().getData(path, watch, stat); 377 return ZKMetadata.removeMetaData(revData); 378 } catch (KeeperException e) { 379 switch (e.code()) { 380 case CONNECTIONLOSS: 381 retryOrThrow(retryCounter, e, "getData"); 382 break; 383 case OPERATIONTIMEOUT: 384 retryOrThrow(retryCounter, e, "getData"); 385 break; 386 387 default: 388 throw e; 389 } 390 } 391 retryCounter.sleepUntilNextRetry(); 392 } 393 } 394 } 395 396 /** 397 * setData is NOT an idempotent operation. Retry may cause BadVersion Exception 398 * Adding an identifier field into the data to check whether 399 * badversion is caused by the result of previous correctly setData 400 * @return Stat instance 401 */ 402 public Stat setData(String path, byte[] data, int version) 403 throws KeeperException, InterruptedException { 404 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setData")) { 405 RetryCounter retryCounter = retryCounterFactory.create(); 406 byte[] newData = ZKMetadata.appendMetaData(id, data); 407 boolean isRetry = false; 408 long startTime; 409 while (true) { 410 try { 411 startTime = EnvironmentEdgeManager.currentTime(); 412 Stat nodeStat = checkZk().setData(path, newData, version); 413 return nodeStat; 414 } catch (KeeperException e) { 415 switch (e.code()) { 416 case CONNECTIONLOSS: 417 retryOrThrow(retryCounter, e, "setData"); 418 break; 419 case OPERATIONTIMEOUT: 420 retryOrThrow(retryCounter, e, "setData"); 421 break; 422 case BADVERSION: 423 if (isRetry) { 424 // try to verify whether the previous setData success or not 425 try{ 426 Stat stat = new Stat(); 427 byte[] revData = checkZk().getData(path, false, stat); 428 if(Bytes.compareTo(revData, newData) == 0) { 429 // the bad version is caused by previous successful setData 430 return stat; 431 } 432 } catch(KeeperException keeperException){ 433 // the ZK is not reliable at this moment. just throwing exception 434 throw keeperException; 435 } 436 } 437 // throw other exceptions and verified bad version exceptions 438 default: 439 throw e; 440 } 441 } 442 retryCounter.sleepUntilNextRetry(); 443 isRetry = true; 444 } 445 } 446 } 447 448 /** 449 * getAcl is an idempotent operation. Retry before throwing exception 450 * @return list of ACLs 451 */ 452 public List<ACL> getAcl(String path, Stat stat) 453 throws KeeperException, InterruptedException { 454 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getAcl")) { 455 RetryCounter retryCounter = retryCounterFactory.create(); 456 while (true) { 457 try { 458 long startTime = EnvironmentEdgeManager.currentTime(); 459 List<ACL> nodeACL = checkZk().getACL(path, stat); 460 return nodeACL; 461 } catch (KeeperException e) { 462 switch (e.code()) { 463 case CONNECTIONLOSS: 464 retryOrThrow(retryCounter, e, "getAcl"); 465 break; 466 case OPERATIONTIMEOUT: 467 retryOrThrow(retryCounter, e, "getAcl"); 468 break; 469 470 default: 471 throw e; 472 } 473 } 474 retryCounter.sleepUntilNextRetry(); 475 } 476 } 477 } 478 479 /** 480 * setAcl is an idempotent operation. Retry before throwing exception 481 * @return list of ACLs 482 */ 483 public Stat setAcl(String path, List<ACL> acls, int version) 484 throws KeeperException, InterruptedException { 485 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setAcl")) { 486 RetryCounter retryCounter = retryCounterFactory.create(); 487 while (true) { 488 try { 489 long startTime = EnvironmentEdgeManager.currentTime(); 490 Stat nodeStat = checkZk().setACL(path, acls, version); 491 return nodeStat; 492 } catch (KeeperException e) { 493 switch (e.code()) { 494 case CONNECTIONLOSS: 495 retryOrThrow(retryCounter, e, "setAcl"); 496 break; 497 case OPERATIONTIMEOUT: 498 retryOrThrow(retryCounter, e, "setAcl"); 499 break; 500 501 default: 502 throw e; 503 } 504 } 505 retryCounter.sleepUntilNextRetry(); 506 } 507 } 508 } 509 510 /** 511 * <p> 512 * NONSEQUENTIAL create is idempotent operation. 513 * Retry before throwing exceptions. 514 * But this function will not throw the NodeExist exception back to the 515 * application. 516 * </p> 517 * <p> 518 * But SEQUENTIAL is NOT idempotent operation. It is necessary to add 519 * identifier to the path to verify, whether the previous one is successful 520 * or not. 521 * </p> 522 * 523 * @return Path 524 */ 525 public String create(String path, byte[] data, List<ACL> acl, 526 CreateMode createMode) 527 throws KeeperException, InterruptedException { 528 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.create")) { 529 byte[] newData = ZKMetadata.appendMetaData(id, data); 530 switch (createMode) { 531 case EPHEMERAL: 532 case PERSISTENT: 533 return createNonSequential(path, newData, acl, createMode); 534 535 case EPHEMERAL_SEQUENTIAL: 536 case PERSISTENT_SEQUENTIAL: 537 return createSequential(path, newData, acl, createMode); 538 539 default: 540 throw new IllegalArgumentException("Unrecognized CreateMode: " + 541 createMode); 542 } 543 } 544 } 545 546 private String createNonSequential(String path, byte[] data, List<ACL> acl, 547 CreateMode createMode) throws KeeperException, InterruptedException { 548 RetryCounter retryCounter = retryCounterFactory.create(); 549 boolean isRetry = false; // False for first attempt, true for all retries. 550 long startTime; 551 while (true) { 552 try { 553 startTime = EnvironmentEdgeManager.currentTime(); 554 String nodePath = checkZk().create(path, data, acl, createMode); 555 return nodePath; 556 } catch (KeeperException e) { 557 switch (e.code()) { 558 case NODEEXISTS: 559 if (isRetry) { 560 // If the connection was lost, there is still a possibility that 561 // we have successfully created the node at our previous attempt, 562 // so we read the node and compare. 563 byte[] currentData = checkZk().getData(path, false, null); 564 if (currentData != null && 565 Bytes.compareTo(currentData, data) == 0) { 566 // We successfully created a non-sequential node 567 return path; 568 } 569 LOG.error("Node " + path + " already exists with " + 570 Bytes.toStringBinary(currentData) + ", could not write " + 571 Bytes.toStringBinary(data)); 572 throw e; 573 } 574 LOG.trace("Node {} already exists", path); 575 throw e; 576 577 case CONNECTIONLOSS: 578 retryOrThrow(retryCounter, e, "create"); 579 break; 580 case OPERATIONTIMEOUT: 581 retryOrThrow(retryCounter, e, "create"); 582 break; 583 584 default: 585 throw e; 586 } 587 } 588 retryCounter.sleepUntilNextRetry(); 589 isRetry = true; 590 } 591 } 592 593 private String createSequential(String path, byte[] data, 594 List<ACL> acl, CreateMode createMode) 595 throws KeeperException, InterruptedException { 596 RetryCounter retryCounter = retryCounterFactory.create(); 597 boolean first = true; 598 String newPath = path+this.identifier; 599 while (true) { 600 try { 601 if (!first) { 602 // Check if we succeeded on a previous attempt 603 String previousResult = findPreviousSequentialNode(newPath); 604 if (previousResult != null) { 605 return previousResult; 606 } 607 } 608 first = false; 609 long startTime = EnvironmentEdgeManager.currentTime(); 610 String nodePath = checkZk().create(newPath, data, acl, createMode); 611 return nodePath; 612 } catch (KeeperException e) { 613 switch (e.code()) { 614 case CONNECTIONLOSS: 615 retryOrThrow(retryCounter, e, "create"); 616 break; 617 case OPERATIONTIMEOUT: 618 retryOrThrow(retryCounter, e, "create"); 619 break; 620 621 default: 622 throw e; 623 } 624 } 625 retryCounter.sleepUntilNextRetry(); 626 } 627 } 628 /** 629 * Convert Iterable of {@link org.apache.zookeeper.Op} we got into the ZooKeeper.Op 630 * instances to actually pass to multi (need to do this in order to appendMetaData). 631 */ 632 private Iterable<Op> prepareZKMulti(Iterable<Op> ops) throws UnsupportedOperationException { 633 if(ops == null) { 634 return null; 635 } 636 637 List<Op> preparedOps = new LinkedList<>(); 638 for (Op op : ops) { 639 if (op.getType() == ZooDefs.OpCode.create) { 640 CreateRequest create = (CreateRequest)op.toRequestRecord(); 641 preparedOps.add(Op.create(create.getPath(), ZKMetadata.appendMetaData(id, create.getData()), 642 create.getAcl(), create.getFlags())); 643 } else if (op.getType() == ZooDefs.OpCode.delete) { 644 // no need to appendMetaData for delete 645 preparedOps.add(op); 646 } else if (op.getType() == ZooDefs.OpCode.setData) { 647 SetDataRequest setData = (SetDataRequest)op.toRequestRecord(); 648 preparedOps.add(Op.setData(setData.getPath(), 649 ZKMetadata.appendMetaData(id, setData.getData()), setData.getVersion())); 650 } else { 651 throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName()); 652 } 653 } 654 return preparedOps; 655 } 656 657 /** 658 * Run multiple operations in a transactional manner. Retry before throwing exception 659 */ 660 public List<OpResult> multi(Iterable<Op> ops) 661 throws KeeperException, InterruptedException { 662 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.multi")) { 663 RetryCounter retryCounter = retryCounterFactory.create(); 664 Iterable<Op> multiOps = prepareZKMulti(ops); 665 while (true) { 666 try { 667 long startTime = EnvironmentEdgeManager.currentTime(); 668 List<OpResult> opResults = checkZk().multi(multiOps); 669 return opResults; 670 } catch (KeeperException e) { 671 switch (e.code()) { 672 case CONNECTIONLOSS: 673 retryOrThrow(retryCounter, e, "multi"); 674 break; 675 case OPERATIONTIMEOUT: 676 retryOrThrow(retryCounter, e, "multi"); 677 break; 678 679 default: 680 throw e; 681 } 682 } 683 retryCounter.sleepUntilNextRetry(); 684 } 685 } 686 } 687 688 private String findPreviousSequentialNode(String path) 689 throws KeeperException, InterruptedException { 690 int lastSlashIdx = path.lastIndexOf('/'); 691 assert(lastSlashIdx != -1); 692 String parent = path.substring(0, lastSlashIdx); 693 String nodePrefix = path.substring(lastSlashIdx+1); 694 long startTime = EnvironmentEdgeManager.currentTime(); 695 List<String> nodes = checkZk().getChildren(parent, false); 696 List<String> matching = filterByPrefix(nodes, nodePrefix); 697 for (String node : matching) { 698 String nodePath = parent + "/" + node; 699 startTime = EnvironmentEdgeManager.currentTime(); 700 Stat stat = checkZk().exists(nodePath, false); 701 if (stat != null) { 702 return nodePath; 703 } 704 } 705 return null; 706 } 707 708 public synchronized long getSessionId() { 709 return zk == null ? -1 : zk.getSessionId(); 710 } 711 712 public synchronized void close() throws InterruptedException { 713 if (zk != null) { 714 zk.close(); 715 } 716 } 717 718 public synchronized States getState() { 719 return zk == null ? null : zk.getState(); 720 } 721 722 public synchronized ZooKeeper getZooKeeper() { 723 return zk; 724 } 725 726 public synchronized byte[] getSessionPasswd() { 727 return zk == null ? null : zk.getSessionPasswd(); 728 } 729 730 public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException { 731 checkZk().sync(path, cb, ctx); 732 } 733 734 /** 735 * Filters the given node list by the given prefixes. 736 * This method is all-inclusive--if any element in the node list starts 737 * with any of the given prefixes, then it is included in the result. 738 * 739 * @param nodes the nodes to filter 740 * @param prefixes the prefixes to include in the result 741 * @return list of every element that starts with one of the prefixes 742 */ 743 private static List<String> filterByPrefix(List<String> nodes, 744 String... prefixes) { 745 List<String> lockChildren = new ArrayList<>(); 746 for (String child : nodes){ 747 for (String prefix : prefixes){ 748 if (child.startsWith(prefix)){ 749 lockChildren.add(child); 750 break; 751 } 752 } 753 } 754 return lockChildren; 755 } 756 757 public String getIdentifier() { 758 return identifier; 759 } 760}