001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import io.opentelemetry.api.trace.Span; 021import io.opentelemetry.api.trace.StatusCode; 022import io.opentelemetry.context.Scope; 023import java.io.IOException; 024import java.lang.management.ManagementFactory; 025import java.util.ArrayList; 026import java.util.LinkedList; 027import java.util.List; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.trace.TraceUtil; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 033import org.apache.hadoop.hbase.util.RetryCounter; 034import org.apache.hadoop.hbase.util.RetryCounterFactory; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.apache.zookeeper.AsyncCallback; 037import org.apache.zookeeper.CreateMode; 038import org.apache.zookeeper.KeeperException; 039import org.apache.zookeeper.Op; 040import org.apache.zookeeper.OpResult; 041import org.apache.zookeeper.Watcher; 042import org.apache.zookeeper.ZooDefs; 043import org.apache.zookeeper.ZooKeeper; 044import org.apache.zookeeper.ZooKeeper.States; 045import org.apache.zookeeper.data.ACL; 046import org.apache.zookeeper.data.Stat; 047import org.apache.zookeeper.proto.CreateRequest; 048import org.apache.zookeeper.proto.SetDataRequest; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052/** 053 * A zookeeper that can handle 'recoverable' errors. To handle recoverable errors, developers need 054 * to realize that there are two classes of requests: idempotent and non-idempotent requests. Read 055 * requests and unconditional sets and deletes are examples of idempotent requests, they can be 056 * reissued with the same results. (Although, the delete may throw a NoNodeException on reissue its 057 * effect on the ZooKeeper state is the same.) Non-idempotent requests need special handling, 058 * application and library writers need to keep in mind that they may need to encode information in 059 * the data or name of znodes to detect retries. A simple example is a create that uses a sequence 060 * flag. If a process issues a create("/x-", ..., SEQUENCE) and gets a connection loss exception, 061 * that process will reissue another create("/x-", ..., SEQUENCE) and get back x-111. When the 062 * process does a getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be that x-109 063 * was the result of the previous create, so the process actually owns both x-109 and x-111. An easy 064 * way around this is to use "x-process id-" when doing the create. If the process is using an id of 065 * 352, before reissuing the create it will do a getChildren("/") and see "x-222-1", "x-542-30", 066 * "x-352-109", x-333-110". The process will know that the original create succeeded an the znode it 067 * created is "x-352-109". 068 * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling" 069 */ 070@InterfaceAudience.Private 071public class RecoverableZooKeeper { 072 private static final Logger LOG = LoggerFactory.getLogger(RecoverableZooKeeper.class); 073 // the actual ZooKeeper client instance 074 private ZooKeeper zk; 075 private final RetryCounterFactory retryCounterFactory; 076 // An identifier of this process in the cluster 077 private final String identifier; 078 private final byte[] id; 079 private final Watcher watcher; 080 private final int sessionTimeout; 081 private final String quorumServers; 082 private final int maxMultiSize; 083 084 /** 085 * See {@link #connect(Configuration, String, Watcher, String)} 086 */ 087 public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher) 088 throws IOException { 089 String ensemble = ZKConfig.getZKQuorumServersString(conf); 090 return connect(conf, ensemble, watcher); 091 } 092 093 /** 094 * See {@link #connect(Configuration, String, Watcher, String)} 095 */ 096 public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher) 097 throws IOException { 098 return connect(conf, ensemble, watcher, null); 099 } 100 101 /** 102 * Creates a new connection to ZooKeeper, pulling settings and ensemble config from the specified 103 * configuration object using methods from {@link ZKConfig}. Sets the connection status monitoring 104 * watcher to the specified watcher. 105 * @param conf configuration to pull ensemble and other settings from 106 * @param watcher watcher to monitor connection changes 107 * @param ensemble ZooKeeper servers quorum string 108 * @param identifier value used to identify this client instance. 109 * @return connection to zookeeper 110 * @throws IOException if unable to connect to zk or config problem 111 */ 112 public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher, 113 final String identifier) throws IOException { 114 if (ensemble == null) { 115 throw new IOException("Unable to determine ZooKeeper ensemble"); 116 } 117 int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); 118 if (LOG.isTraceEnabled()) { 119 LOG.trace("{} opening connection to ZooKeeper ensemble={}", identifier, ensemble); 120 } 121 int retry = conf.getInt("zookeeper.recovery.retry", 3); 122 int retryIntervalMillis = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000); 123 int maxSleepTime = conf.getInt("zookeeper.recovery.retry.maxsleeptime", 60000); 124 int multiMaxSize = conf.getInt("zookeeper.multi.max.size", 1024 * 1024); 125 return new RecoverableZooKeeper(ensemble, timeout, watcher, retry, retryIntervalMillis, 126 maxSleepTime, identifier, multiMaxSize); 127 } 128 129 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE", 130 justification = "None. Its always been this way.") 131 public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher, 132 int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier, int maxMultiSize) 133 throws IOException { 134 // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should. 135 this.retryCounterFactory = 136 new RetryCounterFactory(maxRetries + 1, retryIntervalMillis, maxSleepTime); 137 138 if (identifier == null || identifier.length() == 0) { 139 // the identifier = processID@hostName 140 identifier = ManagementFactory.getRuntimeMXBean().getName(); 141 } 142 LOG.info("Process identifier={} connecting to ZooKeeper ensemble={}", identifier, 143 quorumServers); 144 this.identifier = identifier; 145 this.id = Bytes.toBytes(identifier); 146 147 this.watcher = watcher; 148 this.sessionTimeout = sessionTimeout; 149 this.quorumServers = quorumServers; 150 this.maxMultiSize = maxMultiSize; 151 152 try { 153 checkZk(); 154 } catch (Exception x) { 155 /* ignore */ 156 } 157 } 158 159 /** 160 * Returns the maximum size (in bytes) that should be included in any single multi() call. NB: 161 * This is an approximation, so there may be variance in the msg actually sent over the wire. 162 * Please be sure to set this approximately, with respect to your ZK server configuration for 163 * jute.maxbuffer. 164 */ 165 public int getMaxMultiSizeLimit() { 166 return maxMultiSize; 167 } 168 169 /** 170 * Try to create a ZooKeeper connection. Turns any exception encountered into a 171 * KeeperException.OperationTimeoutException so it can retried. 172 * @return The created ZooKeeper connection object 173 * @throws KeeperException if a ZooKeeper operation fails 174 */ 175 protected synchronized ZooKeeper checkZk() throws KeeperException { 176 if (this.zk == null) { 177 try { 178 this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher); 179 } catch (IOException ex) { 180 LOG.warn("Unable to create ZooKeeper Connection", ex); 181 throw new KeeperException.OperationTimeoutException(); 182 } 183 } 184 return zk; 185 } 186 187 public synchronized void reconnectAfterExpiration() 188 throws IOException, KeeperException, InterruptedException { 189 if (zk != null) { 190 LOG.info("Closing dead ZooKeeper connection, session" + " was: 0x" 191 + Long.toHexString(zk.getSessionId())); 192 zk.close(); 193 // reset the ZooKeeper connection 194 zk = null; 195 } 196 checkZk(); 197 LOG.info("Recreated a ZooKeeper, session" + " is: 0x" + Long.toHexString(zk.getSessionId())); 198 } 199 200 /** 201 * delete is an idempotent operation. Retry before throwing exception. This function will not 202 * throw NoNodeException if the path does not exist. 203 */ 204 public void delete(String path, int version) throws InterruptedException, KeeperException { 205 final Span span = TraceUtil.createSpan("RecoverableZookeeper.delete"); 206 try (Scope ignored = span.makeCurrent()) { 207 RetryCounter retryCounter = retryCounterFactory.create(); 208 boolean isRetry = false; // False for first attempt, true for all retries. 209 while (true) { 210 try { 211 long startTime = EnvironmentEdgeManager.currentTime(); 212 checkZk().delete(path, version); 213 span.setStatus(StatusCode.OK); 214 return; 215 } catch (KeeperException e) { 216 switch (e.code()) { 217 case NONODE: 218 if (isRetry) { 219 LOG.debug( 220 "Node " + path + " already deleted. Assuming a " + "previous attempt succeeded."); 221 span.setStatus(StatusCode.OK); 222 return; 223 } 224 LOG.debug("Node {} already deleted, retry={}", path, isRetry); 225 TraceUtil.setError(span, e); 226 throw e; 227 228 case CONNECTIONLOSS: 229 case OPERATIONTIMEOUT: 230 case REQUESTTIMEOUT: 231 TraceUtil.setError(span, e); 232 retryOrThrow(retryCounter, e, "delete"); 233 break; 234 235 default: 236 TraceUtil.setError(span, e); 237 throw e; 238 } 239 } 240 retryCounter.sleepUntilNextRetry(); 241 isRetry = true; 242 } 243 } finally { 244 span.end(); 245 } 246 } 247 248 /** 249 * exists is an idempotent operation. Retry before throwing exception 250 * @return A Stat instance 251 */ 252 public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException { 253 final Span span = TraceUtil.createSpan("RecoverableZookeeper.exists"); 254 try (Scope ignored = span.makeCurrent()) { 255 RetryCounter retryCounter = retryCounterFactory.create(); 256 while (true) { 257 try { 258 long startTime = EnvironmentEdgeManager.currentTime(); 259 Stat nodeStat = checkZk().exists(path, watcher); 260 span.setStatus(StatusCode.OK); 261 return nodeStat; 262 } catch (KeeperException e) { 263 switch (e.code()) { 264 case CONNECTIONLOSS: 265 case OPERATIONTIMEOUT: 266 case REQUESTTIMEOUT: 267 TraceUtil.setError(span, e); 268 retryOrThrow(retryCounter, e, "exists"); 269 break; 270 271 default: 272 TraceUtil.setError(span, e); 273 throw e; 274 } 275 } 276 retryCounter.sleepUntilNextRetry(); 277 } 278 } finally { 279 span.end(); 280 } 281 } 282 283 /** 284 * exists is an idempotent operation. Retry before throwing exception 285 * @return A Stat instance 286 */ 287 public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException { 288 Span span = TraceUtil.createSpan("RecoverableZookeeper.exists"); 289 try (Scope ignored = span.makeCurrent()) { 290 RetryCounter retryCounter = retryCounterFactory.create(); 291 while (true) { 292 try { 293 long startTime = EnvironmentEdgeManager.currentTime(); 294 Stat nodeStat = checkZk().exists(path, watch); 295 span.setStatus(StatusCode.OK); 296 return nodeStat; 297 } catch (KeeperException e) { 298 switch (e.code()) { 299 case CONNECTIONLOSS: 300 TraceUtil.setError(span, e); 301 retryOrThrow(retryCounter, e, "exists"); 302 break; 303 case OPERATIONTIMEOUT: 304 TraceUtil.setError(span, e); 305 retryOrThrow(retryCounter, e, "exists"); 306 break; 307 308 default: 309 TraceUtil.setError(span, e); 310 throw e; 311 } 312 } 313 retryCounter.sleepUntilNextRetry(); 314 } 315 } finally { 316 span.end(); 317 } 318 } 319 320 private void retryOrThrow(RetryCounter retryCounter, KeeperException e, String opName) 321 throws KeeperException { 322 if (!retryCounter.shouldRetry()) { 323 LOG.error("ZooKeeper {} failed after {} attempts", opName, retryCounter.getMaxAttempts()); 324 throw e; 325 } 326 LOG.debug("Retry, connectivity issue (JVM Pause?); quorum={},exception{}=", quorumServers, e); 327 } 328 329 /** 330 * getChildren is an idempotent operation. Retry before throwing exception 331 * @return List of children znodes 332 */ 333 public List<String> getChildren(String path, Watcher watcher) 334 throws KeeperException, InterruptedException { 335 final Span span = TraceUtil.createSpan("RecoverableZookeeper.getChildren"); 336 try (Scope ignored = span.makeCurrent()) { 337 RetryCounter retryCounter = retryCounterFactory.create(); 338 while (true) { 339 try { 340 long startTime = EnvironmentEdgeManager.currentTime(); 341 List<String> children = checkZk().getChildren(path, watcher); 342 span.setStatus(StatusCode.OK); 343 return children; 344 } catch (KeeperException e) { 345 switch (e.code()) { 346 case CONNECTIONLOSS: 347 case OPERATIONTIMEOUT: 348 case REQUESTTIMEOUT: 349 TraceUtil.setError(span, e); 350 retryOrThrow(retryCounter, e, "getChildren"); 351 break; 352 353 default: 354 TraceUtil.setError(span, e); 355 throw e; 356 } 357 } 358 retryCounter.sleepUntilNextRetry(); 359 } 360 } finally { 361 span.end(); 362 } 363 } 364 365 /** 366 * getChildren is an idempotent operation. Retry before throwing exception 367 * @return List of children znodes 368 */ 369 public List<String> getChildren(String path, boolean watch) 370 throws KeeperException, InterruptedException { 371 Span span = TraceUtil.createSpan("RecoverableZookeeper.getChildren"); 372 try (Scope ignored = span.makeCurrent()) { 373 RetryCounter retryCounter = retryCounterFactory.create(); 374 while (true) { 375 try { 376 long startTime = EnvironmentEdgeManager.currentTime(); 377 List<String> children = checkZk().getChildren(path, watch); 378 span.setStatus(StatusCode.OK); 379 return children; 380 } catch (KeeperException e) { 381 switch (e.code()) { 382 case CONNECTIONLOSS: 383 TraceUtil.setError(span, e); 384 retryOrThrow(retryCounter, e, "getChildren"); 385 break; 386 case OPERATIONTIMEOUT: 387 TraceUtil.setError(span, e); 388 retryOrThrow(retryCounter, e, "getChildren"); 389 break; 390 391 default: 392 TraceUtil.setError(span, e); 393 throw e; 394 } 395 } 396 retryCounter.sleepUntilNextRetry(); 397 } 398 } finally { 399 span.end(); 400 } 401 } 402 403 /** 404 * getData is an idempotent operation. Retry before throwing exception n 405 */ 406 public byte[] getData(String path, Watcher watcher, Stat stat) 407 throws KeeperException, InterruptedException { 408 final Span span = TraceUtil.createSpan("RecoverableZookeeper.getData"); 409 try (Scope ignored = span.makeCurrent()) { 410 RetryCounter retryCounter = retryCounterFactory.create(); 411 while (true) { 412 try { 413 long startTime = EnvironmentEdgeManager.currentTime(); 414 byte[] revData = checkZk().getData(path, watcher, stat); 415 span.setStatus(StatusCode.OK); 416 return ZKMetadata.removeMetaData(revData); 417 } catch (KeeperException e) { 418 switch (e.code()) { 419 case CONNECTIONLOSS: 420 case OPERATIONTIMEOUT: 421 case REQUESTTIMEOUT: 422 TraceUtil.setError(span, e); 423 retryOrThrow(retryCounter, e, "getData"); 424 break; 425 426 default: 427 TraceUtil.setError(span, e); 428 throw e; 429 } 430 } 431 retryCounter.sleepUntilNextRetry(); 432 } 433 } finally { 434 span.end(); 435 } 436 } 437 438 /** 439 * getData is an idempotent operation. Retry before throwing exception n 440 */ 441 public byte[] getData(String path, boolean watch, Stat stat) 442 throws KeeperException, InterruptedException { 443 Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getData").startSpan(); 444 try (Scope scope = span.makeCurrent()) { 445 RetryCounter retryCounter = retryCounterFactory.create(); 446 while (true) { 447 try { 448 long startTime = EnvironmentEdgeManager.currentTime(); 449 byte[] revData = checkZk().getData(path, watch, stat); 450 span.setStatus(StatusCode.OK); 451 return ZKMetadata.removeMetaData(revData); 452 } catch (KeeperException e) { 453 switch (e.code()) { 454 case CONNECTIONLOSS: 455 TraceUtil.setError(span, e); 456 retryOrThrow(retryCounter, e, "getData"); 457 break; 458 case OPERATIONTIMEOUT: 459 TraceUtil.setError(span, e); 460 retryOrThrow(retryCounter, e, "getData"); 461 break; 462 463 default: 464 TraceUtil.setError(span, e); 465 throw e; 466 } 467 } 468 retryCounter.sleepUntilNextRetry(); 469 } 470 } finally { 471 span.end(); 472 } 473 } 474 475 /** 476 * setData is NOT an idempotent operation. Retry may cause BadVersion Exception Adding an 477 * identifier field into the data to check whether badversion is caused by the result of previous 478 * correctly setData 479 * @return Stat instance 480 */ 481 public Stat setData(String path, byte[] data, int version) 482 throws KeeperException, InterruptedException { 483 final Span span = TraceUtil.createSpan("RecoverableZookeeper.setData"); 484 try (Scope ignored = span.makeCurrent()) { 485 RetryCounter retryCounter = retryCounterFactory.create(); 486 byte[] newData = ZKMetadata.appendMetaData(id, data); 487 boolean isRetry = false; 488 long startTime; 489 while (true) { 490 try { 491 startTime = EnvironmentEdgeManager.currentTime(); 492 Stat nodeStat = checkZk().setData(path, newData, version); 493 span.setStatus(StatusCode.OK); 494 return nodeStat; 495 } catch (KeeperException e) { 496 switch (e.code()) { 497 case CONNECTIONLOSS: 498 case OPERATIONTIMEOUT: 499 case REQUESTTIMEOUT: 500 TraceUtil.setError(span, e); 501 retryOrThrow(retryCounter, e, "setData"); 502 break; 503 case BADVERSION: 504 if (isRetry) { 505 // try to verify whether the previous setData success or not 506 try { 507 Stat stat = new Stat(); 508 byte[] revData = checkZk().getData(path, false, stat); 509 if (Bytes.compareTo(revData, newData) == 0) { 510 // the bad version is caused by previous successful setData 511 span.setStatus(StatusCode.OK); 512 return stat; 513 } 514 } catch (KeeperException keeperException) { 515 // the ZK is not reliable at this moment. just throwing exception 516 TraceUtil.setError(span, keeperException); 517 throw keeperException; 518 } 519 } 520 // throw other exceptions and verified bad version exceptions 521 default: 522 TraceUtil.setError(span, e); 523 throw e; 524 } 525 } 526 retryCounter.sleepUntilNextRetry(); 527 isRetry = true; 528 } 529 } finally { 530 span.end(); 531 } 532 } 533 534 /** 535 * getAcl is an idempotent operation. Retry before throwing exception 536 * @return list of ACLs 537 */ 538 public List<ACL> getAcl(String path, Stat stat) throws KeeperException, InterruptedException { 539 final Span span = TraceUtil.createSpan("RecoverableZookeeper.getAcl"); 540 try (Scope ignored = span.makeCurrent()) { 541 RetryCounter retryCounter = retryCounterFactory.create(); 542 while (true) { 543 try { 544 long startTime = EnvironmentEdgeManager.currentTime(); 545 List<ACL> nodeACL = checkZk().getACL(path, stat); 546 span.setStatus(StatusCode.OK); 547 return nodeACL; 548 } catch (KeeperException e) { 549 switch (e.code()) { 550 case CONNECTIONLOSS: 551 case OPERATIONTIMEOUT: 552 case REQUESTTIMEOUT: 553 TraceUtil.setError(span, e); 554 retryOrThrow(retryCounter, e, "getAcl"); 555 break; 556 557 default: 558 TraceUtil.setError(span, e); 559 throw e; 560 } 561 } 562 retryCounter.sleepUntilNextRetry(); 563 } 564 } finally { 565 span.end(); 566 } 567 } 568 569 /** 570 * setAcl is an idempotent operation. Retry before throwing exception 571 * @return list of ACLs 572 */ 573 public Stat setAcl(String path, List<ACL> acls, int version) 574 throws KeeperException, InterruptedException { 575 final Span span = TraceUtil.createSpan("RecoverableZookeeper.setAcl"); 576 try (Scope ignored = span.makeCurrent()) { 577 RetryCounter retryCounter = retryCounterFactory.create(); 578 while (true) { 579 try { 580 long startTime = EnvironmentEdgeManager.currentTime(); 581 Stat nodeStat = checkZk().setACL(path, acls, version); 582 span.setStatus(StatusCode.OK); 583 return nodeStat; 584 } catch (KeeperException e) { 585 switch (e.code()) { 586 case CONNECTIONLOSS: 587 case OPERATIONTIMEOUT: 588 TraceUtil.setError(span, e); 589 retryOrThrow(retryCounter, e, "setAcl"); 590 break; 591 592 default: 593 TraceUtil.setError(span, e); 594 throw e; 595 } 596 } 597 retryCounter.sleepUntilNextRetry(); 598 } 599 } finally { 600 span.end(); 601 } 602 } 603 604 /** 605 * <p> 606 * NONSEQUENTIAL create is idempotent operation. Retry before throwing exceptions. But this 607 * function will not throw the NodeExist exception back to the application. 608 * </p> 609 * <p> 610 * But SEQUENTIAL is NOT idempotent operation. It is necessary to add identifier to the path to 611 * verify, whether the previous one is successful or not. 612 * </p> 613 * n 614 */ 615 public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) 616 throws KeeperException, InterruptedException { 617 final Span span = TraceUtil.createSpan("RecoverableZookeeper.create"); 618 try (Scope ignored = span.makeCurrent()) { 619 byte[] newData = ZKMetadata.appendMetaData(id, data); 620 switch (createMode) { 621 case EPHEMERAL: 622 case PERSISTENT: 623 span.setStatus(StatusCode.OK); 624 return createNonSequential(path, newData, acl, createMode); 625 626 case EPHEMERAL_SEQUENTIAL: 627 case PERSISTENT_SEQUENTIAL: 628 span.setStatus(StatusCode.OK); 629 return createSequential(path, newData, acl, createMode); 630 631 default: 632 final IllegalArgumentException e = 633 new IllegalArgumentException("Unrecognized CreateMode: " + createMode); 634 TraceUtil.setError(span, e); 635 throw e; 636 } 637 } finally { 638 span.end(); 639 } 640 } 641 642 private String createNonSequential(String path, byte[] data, List<ACL> acl, CreateMode createMode) 643 throws KeeperException, InterruptedException { 644 RetryCounter retryCounter = retryCounterFactory.create(); 645 boolean isRetry = false; // False for first attempt, true for all retries. 646 long startTime; 647 while (true) { 648 try { 649 startTime = EnvironmentEdgeManager.currentTime(); 650 String nodePath = checkZk().create(path, data, acl, createMode); 651 return nodePath; 652 } catch (KeeperException e) { 653 switch (e.code()) { 654 case NODEEXISTS: 655 if (isRetry) { 656 // If the connection was lost, there is still a possibility that 657 // we have successfully created the node at our previous attempt, 658 // so we read the node and compare. 659 byte[] currentData = checkZk().getData(path, false, null); 660 if (currentData != null && Bytes.compareTo(currentData, data) == 0) { 661 // We successfully created a non-sequential node 662 return path; 663 } 664 LOG.error("Node " + path + " already exists with " + Bytes.toStringBinary(currentData) 665 + ", could not write " + Bytes.toStringBinary(data)); 666 throw e; 667 } 668 LOG.trace("Node {} already exists", path); 669 throw e; 670 671 case CONNECTIONLOSS: 672 case OPERATIONTIMEOUT: 673 case REQUESTTIMEOUT: 674 retryOrThrow(retryCounter, e, "create"); 675 break; 676 677 default: 678 throw e; 679 } 680 } 681 retryCounter.sleepUntilNextRetry(); 682 isRetry = true; 683 } 684 } 685 686 private String createSequential(String path, byte[] data, List<ACL> acl, CreateMode createMode) 687 throws KeeperException, InterruptedException { 688 RetryCounter retryCounter = retryCounterFactory.create(); 689 boolean first = true; 690 String newPath = path + this.identifier; 691 while (true) { 692 try { 693 if (!first) { 694 // Check if we succeeded on a previous attempt 695 String previousResult = findPreviousSequentialNode(newPath); 696 if (previousResult != null) { 697 return previousResult; 698 } 699 } 700 first = false; 701 long startTime = EnvironmentEdgeManager.currentTime(); 702 String nodePath = checkZk().create(newPath, data, acl, createMode); 703 return nodePath; 704 } catch (KeeperException e) { 705 switch (e.code()) { 706 case CONNECTIONLOSS: 707 case OPERATIONTIMEOUT: 708 case REQUESTTIMEOUT: 709 retryOrThrow(retryCounter, e, "create"); 710 break; 711 712 default: 713 throw e; 714 } 715 } 716 retryCounter.sleepUntilNextRetry(); 717 } 718 } 719 720 /** 721 * Convert Iterable of {@link org.apache.zookeeper.Op} we got into the ZooKeeper.Op instances to 722 * actually pass to multi (need to do this in order to appendMetaData). 723 */ 724 private Iterable<Op> prepareZKMulti(Iterable<Op> ops) throws UnsupportedOperationException { 725 if (ops == null) { 726 return null; 727 } 728 729 List<Op> preparedOps = new LinkedList<>(); 730 for (Op op : ops) { 731 if (op.getType() == ZooDefs.OpCode.create) { 732 CreateRequest create = (CreateRequest) op.toRequestRecord(); 733 preparedOps.add(Op.create(create.getPath(), ZKMetadata.appendMetaData(id, create.getData()), 734 create.getAcl(), create.getFlags())); 735 } else if (op.getType() == ZooDefs.OpCode.delete) { 736 // no need to appendMetaData for delete 737 preparedOps.add(op); 738 } else if (op.getType() == ZooDefs.OpCode.setData) { 739 SetDataRequest setData = (SetDataRequest) op.toRequestRecord(); 740 preparedOps.add(Op.setData(setData.getPath(), 741 ZKMetadata.appendMetaData(id, setData.getData()), setData.getVersion())); 742 } else { 743 throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName()); 744 } 745 } 746 return preparedOps; 747 } 748 749 /** 750 * Run multiple operations in a transactional manner. Retry before throwing exception 751 */ 752 public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException { 753 final Span span = TraceUtil.createSpan("RecoverableZookeeper.multi"); 754 try (Scope ignored = span.makeCurrent()) { 755 RetryCounter retryCounter = retryCounterFactory.create(); 756 Iterable<Op> multiOps = prepareZKMulti(ops); 757 while (true) { 758 try { 759 long startTime = EnvironmentEdgeManager.currentTime(); 760 List<OpResult> opResults = checkZk().multi(multiOps); 761 span.setStatus(StatusCode.OK); 762 return opResults; 763 } catch (KeeperException e) { 764 switch (e.code()) { 765 case CONNECTIONLOSS: 766 case OPERATIONTIMEOUT: 767 case REQUESTTIMEOUT: 768 TraceUtil.setError(span, e); 769 retryOrThrow(retryCounter, e, "multi"); 770 break; 771 772 default: 773 TraceUtil.setError(span, e); 774 throw e; 775 } 776 } 777 retryCounter.sleepUntilNextRetry(); 778 } 779 } finally { 780 span.end(); 781 } 782 } 783 784 private String findPreviousSequentialNode(String path) 785 throws KeeperException, InterruptedException { 786 int lastSlashIdx = path.lastIndexOf('/'); 787 assert (lastSlashIdx != -1); 788 String parent = path.substring(0, lastSlashIdx); 789 String nodePrefix = path.substring(lastSlashIdx + 1); 790 long startTime = EnvironmentEdgeManager.currentTime(); 791 List<String> nodes = checkZk().getChildren(parent, false); 792 List<String> matching = filterByPrefix(nodes, nodePrefix); 793 for (String node : matching) { 794 String nodePath = parent + "/" + node; 795 startTime = EnvironmentEdgeManager.currentTime(); 796 Stat stat = checkZk().exists(nodePath, false); 797 if (stat != null) { 798 return nodePath; 799 } 800 } 801 return null; 802 } 803 804 public synchronized long getSessionId() { 805 return zk == null ? -1 : zk.getSessionId(); 806 } 807 808 public synchronized void close() throws InterruptedException { 809 if (zk != null) { 810 zk.close(); 811 } 812 } 813 814 public synchronized States getState() { 815 return zk == null ? null : zk.getState(); 816 } 817 818 public synchronized ZooKeeper getZooKeeper() { 819 return zk; 820 } 821 822 public synchronized byte[] getSessionPasswd() { 823 return zk == null ? null : zk.getSessionPasswd(); 824 } 825 826 public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException { 827 checkZk().sync(path, cb, ctx); 828 } 829 830 /** 831 * Filters the given node list by the given prefixes. This method is all-inclusive--if any element 832 * in the node list starts with any of the given prefixes, then it is included in the result. 833 * @param nodes the nodes to filter 834 * @param prefixes the prefixes to include in the result 835 * @return list of every element that starts with one of the prefixes 836 */ 837 private static List<String> filterByPrefix(List<String> nodes, String... prefixes) { 838 List<String> lockChildren = new ArrayList<>(); 839 for (String child : nodes) { 840 for (String prefix : prefixes) { 841 if (child.startsWith(prefix)) { 842 lockChildren.add(child); 843 break; 844 } 845 } 846 } 847 return lockChildren; 848 } 849 850 public String getIdentifier() { 851 return identifier; 852 } 853}