001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.zookeeper; 020 021import java.io.IOException; 022import java.lang.management.ManagementFactory; 023import java.util.ArrayList; 024import java.util.LinkedList; 025import java.util.List; 026 027import org.apache.hadoop.hbase.trace.TraceUtil; 028import org.apache.hadoop.hbase.util.Bytes; 029import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 030import org.apache.hadoop.hbase.util.RetryCounter; 031import org.apache.hadoop.hbase.util.RetryCounterFactory; 032import org.apache.htrace.core.TraceScope; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.apache.zookeeper.AsyncCallback; 035import org.apache.zookeeper.CreateMode; 036import org.apache.zookeeper.KeeperException; 037import org.apache.zookeeper.Op; 038import org.apache.zookeeper.OpResult; 039import org.apache.zookeeper.Watcher; 040import org.apache.zookeeper.ZooDefs; 041import org.apache.zookeeper.ZooKeeper; 042import org.apache.zookeeper.ZooKeeper.States; 043import org.apache.zookeeper.data.ACL; 044import org.apache.zookeeper.data.Stat; 045import org.apache.zookeeper.proto.CreateRequest; 046import org.apache.zookeeper.proto.SetDataRequest; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050/** 051 * A zookeeper that can handle 'recoverable' errors. 052 * To handle recoverable errors, developers need to realize that there are two 053 * classes of requests: idempotent and non-idempotent requests. Read requests 054 * and unconditional sets and deletes are examples of idempotent requests, they 055 * can be reissued with the same results. 056 * (Although, the delete may throw a NoNodeException on reissue its effect on 057 * the ZooKeeper state is the same.) Non-idempotent requests need special 058 * handling, application and library writers need to keep in mind that they may 059 * need to encode information in the data or name of znodes to detect 060 * retries. A simple example is a create that uses a sequence flag. 061 * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection 062 * loss exception, that process will reissue another 063 * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a 064 * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be 065 * that x-109 was the result of the previous create, so the process actually 066 * owns both x-109 and x-111. An easy way around this is to use "x-process id-" 067 * when doing the create. If the process is using an id of 352, before reissuing 068 * the create it will do a getChildren("/") and see "x-222-1", "x-542-30", 069 * "x-352-109", x-333-110". The process will know that the original create 070 * succeeded an the znode it created is "x-352-109". 071 * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling" 072 */ 073@InterfaceAudience.Private 074public class RecoverableZooKeeper { 075 private static final Logger LOG = LoggerFactory.getLogger(RecoverableZooKeeper.class); 076 // the actual ZooKeeper client instance 077 private ZooKeeper zk; 078 private final RetryCounterFactory retryCounterFactory; 079 // An identifier of this process in the cluster 080 private final String identifier; 081 private final byte[] id; 082 private final Watcher watcher; 083 private final int sessionTimeout; 084 private final String quorumServers; 085 086 public RecoverableZooKeeper(String quorumServers, int sessionTimeout, 087 Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime) 088 throws IOException { 089 this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis, maxSleepTime, 090 null); 091 } 092 093 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE", 094 justification="None. Its always been this way.") 095 public RecoverableZooKeeper(String quorumServers, int sessionTimeout, 096 Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier) 097 throws IOException { 098 // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should. 099 this.retryCounterFactory = 100 new RetryCounterFactory(maxRetries+1, retryIntervalMillis, maxSleepTime); 101 102 if (identifier == null || identifier.length() == 0) { 103 // the identifier = processID@hostName 104 identifier = ManagementFactory.getRuntimeMXBean().getName(); 105 } 106 LOG.info("Process identifier=" + identifier + 107 " connecting to ZooKeeper ensemble=" + quorumServers); 108 this.identifier = identifier; 109 this.id = Bytes.toBytes(identifier); 110 111 this.watcher = watcher; 112 this.sessionTimeout = sessionTimeout; 113 this.quorumServers = quorumServers; 114 115 try { 116 checkZk(); 117 } catch (Exception x) { 118 /* ignore */ 119 } 120 } 121 122 /** 123 * Try to create a ZooKeeper connection. Turns any exception encountered into a 124 * KeeperException.OperationTimeoutException so it can retried. 125 * @return The created ZooKeeper connection object 126 * @throws KeeperException if a ZooKeeper operation fails 127 */ 128 protected synchronized ZooKeeper checkZk() throws KeeperException { 129 if (this.zk == null) { 130 try { 131 this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher); 132 } catch (IOException ex) { 133 LOG.warn("Unable to create ZooKeeper Connection", ex); 134 throw new KeeperException.OperationTimeoutException(); 135 } 136 } 137 return zk; 138 } 139 140 public synchronized void reconnectAfterExpiration() 141 throws IOException, KeeperException, InterruptedException { 142 if (zk != null) { 143 LOG.info("Closing dead ZooKeeper connection, session" + 144 " was: 0x"+Long.toHexString(zk.getSessionId())); 145 zk.close(); 146 // reset the ZooKeeper connection 147 zk = null; 148 } 149 checkZk(); 150 LOG.info("Recreated a ZooKeeper, session" + 151 " is: 0x"+Long.toHexString(zk.getSessionId())); 152 } 153 154 /** 155 * delete is an idempotent operation. Retry before throwing exception. 156 * This function will not throw NoNodeException if the path does not 157 * exist. 158 */ 159 public void delete(String path, int version) throws InterruptedException, KeeperException { 160 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.delete")) { 161 RetryCounter retryCounter = retryCounterFactory.create(); 162 boolean isRetry = false; // False for first attempt, true for all retries. 163 while (true) { 164 try { 165 long startTime = EnvironmentEdgeManager.currentTime(); 166 checkZk().delete(path, version); 167 return; 168 } catch (KeeperException e) { 169 switch (e.code()) { 170 case NONODE: 171 if (isRetry) { 172 LOG.debug("Node " + path + " already deleted. Assuming a " + 173 "previous attempt succeeded."); 174 return; 175 } 176 LOG.debug("Node " + path + " already deleted, retry=" + isRetry); 177 throw e; 178 179 case CONNECTIONLOSS: 180 retryOrThrow(retryCounter, e, "delete"); 181 break; 182 case OPERATIONTIMEOUT: 183 retryOrThrow(retryCounter, e, "delete"); 184 break; 185 186 default: 187 throw e; 188 } 189 } 190 retryCounter.sleepUntilNextRetry(); 191 isRetry = true; 192 } 193 } 194 } 195 196 /** 197 * exists is an idempotent operation. Retry before throwing exception 198 * @return A Stat instance 199 */ 200 public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException { 201 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) { 202 RetryCounter retryCounter = retryCounterFactory.create(); 203 while (true) { 204 try { 205 long startTime = EnvironmentEdgeManager.currentTime(); 206 Stat nodeStat = checkZk().exists(path, watcher); 207 return nodeStat; 208 } catch (KeeperException e) { 209 switch (e.code()) { 210 case CONNECTIONLOSS: 211 retryOrThrow(retryCounter, e, "exists"); 212 break; 213 case OPERATIONTIMEOUT: 214 retryOrThrow(retryCounter, e, "exists"); 215 break; 216 217 default: 218 throw e; 219 } 220 } 221 retryCounter.sleepUntilNextRetry(); 222 } 223 } 224 } 225 226 /** 227 * exists is an idempotent operation. Retry before throwing exception 228 * @return A Stat instance 229 */ 230 public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException { 231 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) { 232 RetryCounter retryCounter = retryCounterFactory.create(); 233 while (true) { 234 try { 235 long startTime = EnvironmentEdgeManager.currentTime(); 236 Stat nodeStat = checkZk().exists(path, watch); 237 return nodeStat; 238 } catch (KeeperException e) { 239 switch (e.code()) { 240 case CONNECTIONLOSS: 241 retryOrThrow(retryCounter, e, "exists"); 242 break; 243 case OPERATIONTIMEOUT: 244 retryOrThrow(retryCounter, e, "exists"); 245 break; 246 247 default: 248 throw e; 249 } 250 } 251 retryCounter.sleepUntilNextRetry(); 252 } 253 } 254 } 255 256 private void retryOrThrow(RetryCounter retryCounter, KeeperException e, 257 String opName) throws KeeperException { 258 if (!retryCounter.shouldRetry()) { 259 LOG.error("ZooKeeper " + opName + " failed after " 260 + retryCounter.getMaxAttempts() + " attempts"); 261 throw e; 262 } 263 LOG.debug("Retry, connectivity issue (JVM Pause?); quorum=" + quorumServers + "," + 264 "exception=" + e); 265 } 266 267 /** 268 * getChildren is an idempotent operation. Retry before throwing exception 269 * @return List of children znodes 270 */ 271 public List<String> getChildren(String path, Watcher watcher) 272 throws KeeperException, InterruptedException { 273 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) { 274 RetryCounter retryCounter = retryCounterFactory.create(); 275 while (true) { 276 try { 277 long startTime = EnvironmentEdgeManager.currentTime(); 278 List<String> children = checkZk().getChildren(path, watcher); 279 return children; 280 } catch (KeeperException e) { 281 switch (e.code()) { 282 case CONNECTIONLOSS: 283 retryOrThrow(retryCounter, e, "getChildren"); 284 break; 285 case OPERATIONTIMEOUT: 286 retryOrThrow(retryCounter, e, "getChildren"); 287 break; 288 289 default: 290 throw e; 291 } 292 } 293 retryCounter.sleepUntilNextRetry(); 294 } 295 } 296 } 297 298 /** 299 * getChildren is an idempotent operation. Retry before throwing exception 300 * @return List of children znodes 301 */ 302 public List<String> getChildren(String path, boolean watch) 303 throws KeeperException, InterruptedException { 304 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) { 305 RetryCounter retryCounter = retryCounterFactory.create(); 306 while (true) { 307 try { 308 long startTime = EnvironmentEdgeManager.currentTime(); 309 List<String> children = checkZk().getChildren(path, watch); 310 return children; 311 } catch (KeeperException e) { 312 switch (e.code()) { 313 case CONNECTIONLOSS: 314 retryOrThrow(retryCounter, e, "getChildren"); 315 break; 316 case OPERATIONTIMEOUT: 317 retryOrThrow(retryCounter, e, "getChildren"); 318 break; 319 320 default: 321 throw e; 322 } 323 } 324 retryCounter.sleepUntilNextRetry(); 325 } 326 } 327 } 328 329 /** 330 * getData is an idempotent operation. Retry before throwing exception 331 * @return Data 332 */ 333 public byte[] getData(String path, Watcher watcher, Stat stat) 334 throws KeeperException, InterruptedException { 335 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) { 336 RetryCounter retryCounter = retryCounterFactory.create(); 337 while (true) { 338 try { 339 long startTime = EnvironmentEdgeManager.currentTime(); 340 byte[] revData = checkZk().getData(path, watcher, stat); 341 return ZKMetadata.removeMetaData(revData); 342 } catch (KeeperException e) { 343 switch (e.code()) { 344 case CONNECTIONLOSS: 345 retryOrThrow(retryCounter, e, "getData"); 346 break; 347 case OPERATIONTIMEOUT: 348 retryOrThrow(retryCounter, e, "getData"); 349 break; 350 351 default: 352 throw e; 353 } 354 } 355 retryCounter.sleepUntilNextRetry(); 356 } 357 } 358 } 359 360 /** 361 * getData is an idempotent operation. Retry before throwing exception 362 * @return Data 363 */ 364 public byte[] getData(String path, boolean watch, Stat stat) 365 throws KeeperException, InterruptedException { 366 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) { 367 RetryCounter retryCounter = retryCounterFactory.create(); 368 while (true) { 369 try { 370 long startTime = EnvironmentEdgeManager.currentTime(); 371 byte[] revData = checkZk().getData(path, watch, stat); 372 return ZKMetadata.removeMetaData(revData); 373 } catch (KeeperException e) { 374 switch (e.code()) { 375 case CONNECTIONLOSS: 376 retryOrThrow(retryCounter, e, "getData"); 377 break; 378 case OPERATIONTIMEOUT: 379 retryOrThrow(retryCounter, e, "getData"); 380 break; 381 382 default: 383 throw e; 384 } 385 } 386 retryCounter.sleepUntilNextRetry(); 387 } 388 } 389 } 390 391 /** 392 * setData is NOT an idempotent operation. Retry may cause BadVersion Exception 393 * Adding an identifier field into the data to check whether 394 * badversion is caused by the result of previous correctly setData 395 * @return Stat instance 396 */ 397 public Stat setData(String path, byte[] data, int version) 398 throws KeeperException, InterruptedException { 399 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setData")) { 400 RetryCounter retryCounter = retryCounterFactory.create(); 401 byte[] newData = ZKMetadata.appendMetaData(id, data); 402 boolean isRetry = false; 403 long startTime; 404 while (true) { 405 try { 406 startTime = EnvironmentEdgeManager.currentTime(); 407 Stat nodeStat = checkZk().setData(path, newData, version); 408 return nodeStat; 409 } catch (KeeperException e) { 410 switch (e.code()) { 411 case CONNECTIONLOSS: 412 retryOrThrow(retryCounter, e, "setData"); 413 break; 414 case OPERATIONTIMEOUT: 415 retryOrThrow(retryCounter, e, "setData"); 416 break; 417 case BADVERSION: 418 if (isRetry) { 419 // try to verify whether the previous setData success or not 420 try{ 421 Stat stat = new Stat(); 422 byte[] revData = checkZk().getData(path, false, stat); 423 if(Bytes.compareTo(revData, newData) == 0) { 424 // the bad version is caused by previous successful setData 425 return stat; 426 } 427 } catch(KeeperException keeperException){ 428 // the ZK is not reliable at this moment. just throwing exception 429 throw keeperException; 430 } 431 } 432 // throw other exceptions and verified bad version exceptions 433 default: 434 throw e; 435 } 436 } 437 retryCounter.sleepUntilNextRetry(); 438 isRetry = true; 439 } 440 } 441 } 442 443 /** 444 * getAcl is an idempotent operation. Retry before throwing exception 445 * @return list of ACLs 446 */ 447 public List<ACL> getAcl(String path, Stat stat) 448 throws KeeperException, InterruptedException { 449 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getAcl")) { 450 RetryCounter retryCounter = retryCounterFactory.create(); 451 while (true) { 452 try { 453 long startTime = EnvironmentEdgeManager.currentTime(); 454 List<ACL> nodeACL = checkZk().getACL(path, stat); 455 return nodeACL; 456 } catch (KeeperException e) { 457 switch (e.code()) { 458 case CONNECTIONLOSS: 459 retryOrThrow(retryCounter, e, "getAcl"); 460 break; 461 case OPERATIONTIMEOUT: 462 retryOrThrow(retryCounter, e, "getAcl"); 463 break; 464 465 default: 466 throw e; 467 } 468 } 469 retryCounter.sleepUntilNextRetry(); 470 } 471 } 472 } 473 474 /** 475 * setAcl is an idempotent operation. Retry before throwing exception 476 * @return list of ACLs 477 */ 478 public Stat setAcl(String path, List<ACL> acls, int version) 479 throws KeeperException, InterruptedException { 480 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setAcl")) { 481 RetryCounter retryCounter = retryCounterFactory.create(); 482 while (true) { 483 try { 484 long startTime = EnvironmentEdgeManager.currentTime(); 485 Stat nodeStat = checkZk().setACL(path, acls, version); 486 return nodeStat; 487 } catch (KeeperException e) { 488 switch (e.code()) { 489 case CONNECTIONLOSS: 490 retryOrThrow(retryCounter, e, "setAcl"); 491 break; 492 case OPERATIONTIMEOUT: 493 retryOrThrow(retryCounter, e, "setAcl"); 494 break; 495 496 default: 497 throw e; 498 } 499 } 500 retryCounter.sleepUntilNextRetry(); 501 } 502 } 503 } 504 505 /** 506 * <p> 507 * NONSEQUENTIAL create is idempotent operation. 508 * Retry before throwing exceptions. 509 * But this function will not throw the NodeExist exception back to the 510 * application. 511 * </p> 512 * <p> 513 * But SEQUENTIAL is NOT idempotent operation. It is necessary to add 514 * identifier to the path to verify, whether the previous one is successful 515 * or not. 516 * </p> 517 * 518 * @return Path 519 */ 520 public String create(String path, byte[] data, List<ACL> acl, 521 CreateMode createMode) 522 throws KeeperException, InterruptedException { 523 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.create")) { 524 byte[] newData = ZKMetadata.appendMetaData(id, data); 525 switch (createMode) { 526 case EPHEMERAL: 527 case PERSISTENT: 528 return createNonSequential(path, newData, acl, createMode); 529 530 case EPHEMERAL_SEQUENTIAL: 531 case PERSISTENT_SEQUENTIAL: 532 return createSequential(path, newData, acl, createMode); 533 534 default: 535 throw new IllegalArgumentException("Unrecognized CreateMode: " + 536 createMode); 537 } 538 } 539 } 540 541 private String createNonSequential(String path, byte[] data, List<ACL> acl, 542 CreateMode createMode) throws KeeperException, InterruptedException { 543 RetryCounter retryCounter = retryCounterFactory.create(); 544 boolean isRetry = false; // False for first attempt, true for all retries. 545 long startTime; 546 while (true) { 547 try { 548 startTime = EnvironmentEdgeManager.currentTime(); 549 String nodePath = checkZk().create(path, data, acl, createMode); 550 return nodePath; 551 } catch (KeeperException e) { 552 switch (e.code()) { 553 case NODEEXISTS: 554 if (isRetry) { 555 // If the connection was lost, there is still a possibility that 556 // we have successfully created the node at our previous attempt, 557 // so we read the node and compare. 558 byte[] currentData = checkZk().getData(path, false, null); 559 if (currentData != null && 560 Bytes.compareTo(currentData, data) == 0) { 561 // We successfully created a non-sequential node 562 return path; 563 } 564 LOG.error("Node " + path + " already exists with " + 565 Bytes.toStringBinary(currentData) + ", could not write " + 566 Bytes.toStringBinary(data)); 567 throw e; 568 } 569 LOG.trace("Node {} already exists", path); 570 throw e; 571 572 case CONNECTIONLOSS: 573 retryOrThrow(retryCounter, e, "create"); 574 break; 575 case OPERATIONTIMEOUT: 576 retryOrThrow(retryCounter, e, "create"); 577 break; 578 579 default: 580 throw e; 581 } 582 } 583 retryCounter.sleepUntilNextRetry(); 584 isRetry = true; 585 } 586 } 587 588 private String createSequential(String path, byte[] data, 589 List<ACL> acl, CreateMode createMode) 590 throws KeeperException, InterruptedException { 591 RetryCounter retryCounter = retryCounterFactory.create(); 592 boolean first = true; 593 String newPath = path+this.identifier; 594 while (true) { 595 try { 596 if (!first) { 597 // Check if we succeeded on a previous attempt 598 String previousResult = findPreviousSequentialNode(newPath); 599 if (previousResult != null) { 600 return previousResult; 601 } 602 } 603 first = false; 604 long startTime = EnvironmentEdgeManager.currentTime(); 605 String nodePath = checkZk().create(newPath, data, acl, createMode); 606 return nodePath; 607 } catch (KeeperException e) { 608 switch (e.code()) { 609 case CONNECTIONLOSS: 610 retryOrThrow(retryCounter, e, "create"); 611 break; 612 case OPERATIONTIMEOUT: 613 retryOrThrow(retryCounter, e, "create"); 614 break; 615 616 default: 617 throw e; 618 } 619 } 620 retryCounter.sleepUntilNextRetry(); 621 } 622 } 623 /** 624 * Convert Iterable of {@link org.apache.zookeeper.Op} we got into the ZooKeeper.Op 625 * instances to actually pass to multi (need to do this in order to appendMetaData). 626 */ 627 private Iterable<Op> prepareZKMulti(Iterable<Op> ops) throws UnsupportedOperationException { 628 if(ops == null) { 629 return null; 630 } 631 632 List<Op> preparedOps = new LinkedList<>(); 633 for (Op op : ops) { 634 if (op.getType() == ZooDefs.OpCode.create) { 635 CreateRequest create = (CreateRequest)op.toRequestRecord(); 636 preparedOps.add(Op.create(create.getPath(), ZKMetadata.appendMetaData(id, create.getData()), 637 create.getAcl(), create.getFlags())); 638 } else if (op.getType() == ZooDefs.OpCode.delete) { 639 // no need to appendMetaData for delete 640 preparedOps.add(op); 641 } else if (op.getType() == ZooDefs.OpCode.setData) { 642 SetDataRequest setData = (SetDataRequest)op.toRequestRecord(); 643 preparedOps.add(Op.setData(setData.getPath(), 644 ZKMetadata.appendMetaData(id, setData.getData()), setData.getVersion())); 645 } else { 646 throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName()); 647 } 648 } 649 return preparedOps; 650 } 651 652 /** 653 * Run multiple operations in a transactional manner. Retry before throwing exception 654 */ 655 public List<OpResult> multi(Iterable<Op> ops) 656 throws KeeperException, InterruptedException { 657 try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.multi")) { 658 RetryCounter retryCounter = retryCounterFactory.create(); 659 Iterable<Op> multiOps = prepareZKMulti(ops); 660 while (true) { 661 try { 662 long startTime = EnvironmentEdgeManager.currentTime(); 663 List<OpResult> opResults = checkZk().multi(multiOps); 664 return opResults; 665 } catch (KeeperException e) { 666 switch (e.code()) { 667 case CONNECTIONLOSS: 668 retryOrThrow(retryCounter, e, "multi"); 669 break; 670 case OPERATIONTIMEOUT: 671 retryOrThrow(retryCounter, e, "multi"); 672 break; 673 674 default: 675 throw e; 676 } 677 } 678 retryCounter.sleepUntilNextRetry(); 679 } 680 } 681 } 682 683 private String findPreviousSequentialNode(String path) 684 throws KeeperException, InterruptedException { 685 int lastSlashIdx = path.lastIndexOf('/'); 686 assert(lastSlashIdx != -1); 687 String parent = path.substring(0, lastSlashIdx); 688 String nodePrefix = path.substring(lastSlashIdx+1); 689 long startTime = EnvironmentEdgeManager.currentTime(); 690 List<String> nodes = checkZk().getChildren(parent, false); 691 List<String> matching = filterByPrefix(nodes, nodePrefix); 692 for (String node : matching) { 693 String nodePath = parent + "/" + node; 694 startTime = EnvironmentEdgeManager.currentTime(); 695 Stat stat = checkZk().exists(nodePath, false); 696 if (stat != null) { 697 return nodePath; 698 } 699 } 700 return null; 701 } 702 703 public synchronized long getSessionId() { 704 return zk == null ? -1 : zk.getSessionId(); 705 } 706 707 public synchronized void close() throws InterruptedException { 708 if (zk != null) { 709 zk.close(); 710 } 711 } 712 713 public synchronized States getState() { 714 return zk == null ? null : zk.getState(); 715 } 716 717 public synchronized ZooKeeper getZooKeeper() { 718 return zk; 719 } 720 721 public synchronized byte[] getSessionPasswd() { 722 return zk == null ? null : zk.getSessionPasswd(); 723 } 724 725 public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException { 726 checkZk().sync(path, cb, null); 727 } 728 729 /** 730 * Filters the given node list by the given prefixes. 731 * This method is all-inclusive--if any element in the node list starts 732 * with any of the given prefixes, then it is included in the result. 733 * 734 * @param nodes the nodes to filter 735 * @param prefixes the prefixes to include in the result 736 * @return list of every element that starts with one of the prefixes 737 */ 738 private static List<String> filterByPrefix(List<String> nodes, 739 String... prefixes) { 740 List<String> lockChildren = new ArrayList<>(); 741 for (String child : nodes){ 742 for (String prefix : prefixes){ 743 if (child.startsWith(prefix)){ 744 lockChildren.add(child); 745 break; 746 } 747 } 748 } 749 return lockChildren; 750 } 751 752 public String getIdentifier() { 753 return identifier; 754 } 755}