001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import io.opentelemetry.api.trace.Span; 021import io.opentelemetry.api.trace.StatusCode; 022import io.opentelemetry.context.Scope; 023import java.io.IOException; 024import java.lang.management.ManagementFactory; 025import java.util.ArrayList; 026import java.util.LinkedList; 027import java.util.List; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.trace.TraceUtil; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.apache.hadoop.hbase.util.RetryCounter; 033import org.apache.hadoop.hbase.util.RetryCounterFactory; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.apache.zookeeper.AsyncCallback; 036import org.apache.zookeeper.CreateMode; 037import org.apache.zookeeper.KeeperException; 038import org.apache.zookeeper.Op; 039import org.apache.zookeeper.OpResult; 040import org.apache.zookeeper.Watcher; 041import org.apache.zookeeper.ZooDefs; 042import org.apache.zookeeper.ZooKeeper; 043import org.apache.zookeeper.ZooKeeper.States; 044import org.apache.zookeeper.data.ACL; 045import org.apache.zookeeper.data.Stat; 046import org.apache.zookeeper.proto.CreateRequest; 047import org.apache.zookeeper.proto.SetDataRequest; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051/** 052 * A zookeeper that can handle 'recoverable' errors. To handle recoverable errors, developers need 053 * to realize that there are two classes of requests: idempotent and non-idempotent requests. Read 054 * requests and unconditional sets and deletes are examples of idempotent requests, they can be 055 * reissued with the same results. (Although, the delete may throw a NoNodeException on reissue its 056 * effect on the ZooKeeper state is the same.) Non-idempotent requests need special handling, 057 * application and library writers need to keep in mind that they may need to encode information in 058 * the data or name of znodes to detect retries. A simple example is a create that uses a sequence 059 * flag. If a process issues a create("/x-", ..., SEQUENCE) and gets a connection loss exception, 060 * that process will reissue another create("/x-", ..., SEQUENCE) and get back x-111. When the 061 * process does a getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be that x-109 062 * was the result of the previous create, so the process actually owns both x-109 and x-111. An easy 063 * way around this is to use "x-process id-" when doing the create. If the process is using an id of 064 * 352, before reissuing the create it will do a getChildren("/") and see "x-222-1", "x-542-30", 065 * "x-352-109", x-333-110". The process will know that the original create succeeded an the znode it 066 * created is "x-352-109". 067 * @see "https://cwiki.apache.org/confluence/display/HADOOP2/ZooKeeper+ErrorHandling" 068 */ 069@InterfaceAudience.Private 070public class RecoverableZooKeeper { 071 private static final Logger LOG = LoggerFactory.getLogger(RecoverableZooKeeper.class); 072 // the actual ZooKeeper client instance 073 private ZooKeeper zk; 074 private final RetryCounterFactory retryCounterFactory; 075 // An identifier of this process in the cluster 076 private final String identifier; 077 private final byte[] id; 078 private final Watcher watcher; 079 private final int sessionTimeout; 080 private final String quorumServers; 081 private final int maxMultiSize; 082 083 /** 084 * See {@link #connect(Configuration, String, Watcher, String)} 085 */ 086 public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher) 087 throws IOException { 088 String ensemble = ZKConfig.getZKQuorumServersString(conf); 089 return connect(conf, ensemble, watcher); 090 } 091 092 /** 093 * See {@link #connect(Configuration, String, Watcher, String)} 094 */ 095 public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher) 096 throws IOException { 097 return connect(conf, ensemble, watcher, null); 098 } 099 100 /** 101 * Creates a new connection to ZooKeeper, pulling settings and ensemble config from the specified 102 * configuration object using methods from {@link ZKConfig}. Sets the connection status monitoring 103 * watcher to the specified watcher. 104 * @param conf configuration to pull ensemble and other settings from 105 * @param watcher watcher to monitor connection changes 106 * @param ensemble ZooKeeper servers quorum string 107 * @param identifier value used to identify this client instance. 108 * @return connection to zookeeper 109 * @throws IOException if unable to connect to zk or config problem 110 */ 111 public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher, 112 final String identifier) throws IOException { 113 if (ensemble == null) { 114 throw new IOException("Unable to determine ZooKeeper ensemble"); 115 } 116 int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); 117 if (LOG.isTraceEnabled()) { 118 LOG.trace("{} opening connection to ZooKeeper ensemble={}", identifier, ensemble); 119 } 120 int retry = conf.getInt("zookeeper.recovery.retry", 3); 121 int retryIntervalMillis = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000); 122 int maxSleepTime = conf.getInt("zookeeper.recovery.retry.maxsleeptime", 60000); 123 int multiMaxSize = conf.getInt("zookeeper.multi.max.size", 1024 * 1024); 124 return new RecoverableZooKeeper(ensemble, timeout, watcher, retry, retryIntervalMillis, 125 maxSleepTime, identifier, multiMaxSize); 126 } 127 128 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE", 129 justification = "None. Its always been this way.") 130 public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher, 131 int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier, int maxMultiSize) 132 throws IOException { 133 // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should. 134 this.retryCounterFactory = 135 new RetryCounterFactory(maxRetries + 1, retryIntervalMillis, maxSleepTime); 136 137 if (identifier == null || identifier.length() == 0) { 138 // the identifier = processID@hostName 139 identifier = ManagementFactory.getRuntimeMXBean().getName(); 140 } 141 LOG.info("Process identifier={} connecting to ZooKeeper ensemble={}", identifier, 142 quorumServers); 143 this.identifier = identifier; 144 this.id = Bytes.toBytes(identifier); 145 146 this.watcher = watcher; 147 this.sessionTimeout = sessionTimeout; 148 this.quorumServers = quorumServers; 149 this.maxMultiSize = maxMultiSize; 150 151 try { 152 checkZk(); 153 } catch (Exception x) { 154 /* ignore */ 155 } 156 } 157 158 /** 159 * Returns the maximum size (in bytes) that should be included in any single multi() call. NB: 160 * This is an approximation, so there may be variance in the msg actually sent over the wire. 161 * Please be sure to set this approximately, with respect to your ZK server configuration for 162 * jute.maxbuffer. 163 */ 164 public int getMaxMultiSizeLimit() { 165 return maxMultiSize; 166 } 167 168 /** 169 * Try to create a ZooKeeper connection. Turns any exception encountered into a 170 * KeeperException.OperationTimeoutException so it can retried. 171 * @return The created ZooKeeper connection object 172 * @throws KeeperException if a ZooKeeper operation fails 173 */ 174 protected synchronized ZooKeeper checkZk() throws KeeperException { 175 if (this.zk == null) { 176 try { 177 this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher); 178 } catch (IOException ex) { 179 LOG.warn("Unable to create ZooKeeper Connection", ex); 180 throw new KeeperException.OperationTimeoutException(); 181 } 182 } 183 return zk; 184 } 185 186 public synchronized void reconnectAfterExpiration() 187 throws IOException, KeeperException, InterruptedException { 188 if (zk != null) { 189 LOG.info("Closing dead ZooKeeper connection, session" + " was: 0x" 190 + Long.toHexString(zk.getSessionId())); 191 zk.close(); 192 // reset the ZooKeeper connection 193 zk = null; 194 } 195 checkZk(); 196 LOG.info("Recreated a ZooKeeper, session" + " is: 0x" + Long.toHexString(zk.getSessionId())); 197 } 198 199 /** 200 * delete is an idempotent operation. Retry before throwing exception. This function will not 201 * throw NoNodeException if the path does not exist. 202 */ 203 public void delete(String path, int version) throws InterruptedException, KeeperException { 204 final Span span = TraceUtil.createSpan("RecoverableZookeeper.delete"); 205 try (Scope ignored = span.makeCurrent()) { 206 RetryCounter retryCounter = retryCounterFactory.create(); 207 boolean isRetry = false; // False for first attempt, true for all retries. 208 while (true) { 209 try { 210 checkZk().delete(path, version); 211 span.setStatus(StatusCode.OK); 212 return; 213 } catch (KeeperException e) { 214 switch (e.code()) { 215 case NONODE: 216 if (isRetry) { 217 LOG.debug( 218 "Node " + path + " already deleted. Assuming a " + "previous attempt succeeded."); 219 return; 220 } 221 LOG.debug("Node {} already deleted, retry={}", path, isRetry); 222 TraceUtil.setError(span, e); 223 throw e; 224 225 case CONNECTIONLOSS: 226 case OPERATIONTIMEOUT: 227 case REQUESTTIMEOUT: 228 TraceUtil.setError(span, e); 229 retryOrThrow(retryCounter, e, "delete"); 230 break; 231 232 default: 233 TraceUtil.setError(span, e); 234 throw e; 235 } 236 } 237 retryCounter.sleepUntilNextRetry(); 238 isRetry = true; 239 } 240 } finally { 241 span.end(); 242 } 243 } 244 245 /** 246 * exists is an idempotent operation. Retry before throwing exception 247 * @return A Stat instance 248 */ 249 public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException { 250 return exists(path, watcher, null); 251 } 252 253 private Stat exists(String path, Watcher watcher, Boolean watch) 254 throws InterruptedException, KeeperException { 255 final Span span = TraceUtil.createSpan("RecoverableZookeeper.exists"); 256 try (Scope ignored = span.makeCurrent()) { 257 RetryCounter retryCounter = retryCounterFactory.create(); 258 while (true) { 259 try { 260 Stat nodeStat; 261 if (watch == null) { 262 nodeStat = checkZk().exists(path, watcher); 263 } else { 264 nodeStat = checkZk().exists(path, watch); 265 } 266 span.setStatus(StatusCode.OK); 267 return nodeStat; 268 } catch (KeeperException e) { 269 switch (e.code()) { 270 case CONNECTIONLOSS: 271 case OPERATIONTIMEOUT: 272 case REQUESTTIMEOUT: 273 TraceUtil.setError(span, e); 274 retryOrThrow(retryCounter, e, "exists"); 275 break; 276 277 default: 278 TraceUtil.setError(span, e); 279 throw e; 280 } 281 } 282 retryCounter.sleepUntilNextRetry(); 283 } 284 } finally { 285 span.end(); 286 } 287 } 288 289 /** 290 * exists is an idempotent operation. Retry before throwing exception 291 * @return A Stat instance 292 */ 293 public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException { 294 return exists(path, null, watch); 295 } 296 297 private void retryOrThrow(RetryCounter retryCounter, KeeperException e, String opName) 298 throws KeeperException { 299 if (!retryCounter.shouldRetry()) { 300 LOG.error("ZooKeeper {} failed after {} attempts", opName, retryCounter.getMaxAttempts()); 301 throw e; 302 } 303 LOG.debug("Retry, connectivity issue (JVM Pause?); quorum={},exception{}=", quorumServers, e); 304 } 305 306 /** 307 * getChildren is an idempotent operation. Retry before throwing exception 308 * @return List of children znodes 309 */ 310 public List<String> getChildren(String path, Watcher watcher) 311 throws KeeperException, InterruptedException { 312 return getChildren(path, watcher, null); 313 } 314 315 private List<String> getChildren(String path, Watcher watcher, Boolean watch) 316 throws InterruptedException, KeeperException { 317 final Span span = TraceUtil.createSpan("RecoverableZookeeper.getChildren"); 318 try (Scope ignored = span.makeCurrent()) { 319 RetryCounter retryCounter = retryCounterFactory.create(); 320 while (true) { 321 try { 322 List<String> children; 323 if (watch == null) { 324 children = checkZk().getChildren(path, watcher); 325 } else { 326 children = checkZk().getChildren(path, watch); 327 } 328 span.setStatus(StatusCode.OK); 329 return children; 330 } catch (KeeperException e) { 331 switch (e.code()) { 332 case CONNECTIONLOSS: 333 case OPERATIONTIMEOUT: 334 case REQUESTTIMEOUT: 335 TraceUtil.setError(span, e); 336 retryOrThrow(retryCounter, e, "getChildren"); 337 break; 338 339 default: 340 TraceUtil.setError(span, e); 341 throw e; 342 } 343 } 344 retryCounter.sleepUntilNextRetry(); 345 } 346 } finally { 347 span.end(); 348 } 349 } 350 351 /** 352 * getChildren is an idempotent operation. Retry before throwing exception 353 * @return List of children znodes 354 */ 355 public List<String> getChildren(String path, boolean watch) 356 throws KeeperException, InterruptedException { 357 return getChildren(path, null, watch); 358 } 359 360 /** 361 * getData is an idempotent operation. Retry before throwing exception 362 */ 363 public byte[] getData(String path, Watcher watcher, Stat stat) 364 throws KeeperException, InterruptedException { 365 return getData(path, watcher, null, stat); 366 } 367 368 private byte[] getData(String path, Watcher watcher, Boolean watch, Stat stat) 369 throws InterruptedException, KeeperException { 370 final Span span = TraceUtil.createSpan("RecoverableZookeeper.getData"); 371 try (Scope ignored = span.makeCurrent()) { 372 RetryCounter retryCounter = retryCounterFactory.create(); 373 while (true) { 374 try { 375 byte[] revData; 376 if (watch == null) { 377 revData = checkZk().getData(path, watcher, stat); 378 } else { 379 revData = checkZk().getData(path, watch, stat); 380 } 381 span.setStatus(StatusCode.OK); 382 return ZKMetadata.removeMetaData(revData); 383 } catch (KeeperException e) { 384 switch (e.code()) { 385 case CONNECTIONLOSS: 386 case OPERATIONTIMEOUT: 387 case REQUESTTIMEOUT: 388 TraceUtil.setError(span, e); 389 retryOrThrow(retryCounter, e, "getData"); 390 break; 391 392 default: 393 TraceUtil.setError(span, e); 394 throw e; 395 } 396 } 397 retryCounter.sleepUntilNextRetry(); 398 } 399 } finally { 400 span.end(); 401 } 402 } 403 404 /** 405 * getData is an idempotent operation. Retry before throwing exception 406 */ 407 public byte[] getData(String path, boolean watch, Stat stat) 408 throws KeeperException, InterruptedException { 409 return getData(path, null, watch, stat); 410 } 411 412 /** 413 * setData is NOT an idempotent operation. Retry may cause BadVersion Exception Adding an 414 * identifier field into the data to check whether badversion is caused by the result of previous 415 * correctly setData 416 * @return Stat instance 417 */ 418 public Stat setData(String path, byte[] data, int version) 419 throws KeeperException, InterruptedException { 420 final Span span = TraceUtil.createSpan("RecoverableZookeeper.setData"); 421 try (Scope ignored = span.makeCurrent()) { 422 RetryCounter retryCounter = retryCounterFactory.create(); 423 byte[] newData = ZKMetadata.appendMetaData(id, data); 424 boolean isRetry = false; 425 while (true) { 426 try { 427 span.setStatus(StatusCode.OK); 428 return checkZk().setData(path, newData, version); 429 } catch (KeeperException e) { 430 switch (e.code()) { 431 case CONNECTIONLOSS: 432 case OPERATIONTIMEOUT: 433 case REQUESTTIMEOUT: 434 TraceUtil.setError(span, e); 435 retryOrThrow(retryCounter, e, "setData"); 436 break; 437 case BADVERSION: 438 if (isRetry) { 439 // try to verify whether the previous setData success or not 440 try { 441 Stat stat = new Stat(); 442 byte[] revData = checkZk().getData(path, false, stat); 443 if (Bytes.compareTo(revData, newData) == 0) { 444 // the bad version is caused by previous successful setData 445 return stat; 446 } 447 } catch (KeeperException keeperException) { 448 // the ZK is not reliable at this moment. just throwing exception 449 TraceUtil.setError(span, e); 450 throw keeperException; 451 } 452 } 453 // throw other exceptions and verified bad version exceptions 454 default: 455 TraceUtil.setError(span, e); 456 throw e; 457 } 458 } 459 retryCounter.sleepUntilNextRetry(); 460 isRetry = true; 461 } 462 } finally { 463 span.end(); 464 } 465 } 466 467 /** 468 * getAcl is an idempotent operation. Retry before throwing exception 469 * @return list of ACLs 470 */ 471 public List<ACL> getAcl(String path, Stat stat) throws KeeperException, InterruptedException { 472 final Span span = TraceUtil.createSpan("RecoverableZookeeper.getAcl"); 473 try (Scope ignored = span.makeCurrent()) { 474 RetryCounter retryCounter = retryCounterFactory.create(); 475 while (true) { 476 try { 477 span.setStatus(StatusCode.OK); 478 return checkZk().getACL(path, stat); 479 } catch (KeeperException e) { 480 switch (e.code()) { 481 case CONNECTIONLOSS: 482 case OPERATIONTIMEOUT: 483 case REQUESTTIMEOUT: 484 TraceUtil.setError(span, e); 485 retryOrThrow(retryCounter, e, "getAcl"); 486 break; 487 488 default: 489 TraceUtil.setError(span, e); 490 throw e; 491 } 492 } 493 retryCounter.sleepUntilNextRetry(); 494 } 495 } finally { 496 span.end(); 497 } 498 } 499 500 /** 501 * setAcl is an idempotent operation. Retry before throwing exception 502 * @return list of ACLs 503 */ 504 public Stat setAcl(String path, List<ACL> acls, int version) 505 throws KeeperException, InterruptedException { 506 final Span span = TraceUtil.createSpan("RecoverableZookeeper.setAcl"); 507 try (Scope ignored = span.makeCurrent()) { 508 RetryCounter retryCounter = retryCounterFactory.create(); 509 while (true) { 510 try { 511 span.setStatus(StatusCode.OK); 512 return checkZk().setACL(path, acls, version); 513 } catch (KeeperException e) { 514 switch (e.code()) { 515 case CONNECTIONLOSS: 516 case OPERATIONTIMEOUT: 517 TraceUtil.setError(span, e); 518 retryOrThrow(retryCounter, e, "setAcl"); 519 break; 520 521 default: 522 TraceUtil.setError(span, e); 523 throw e; 524 } 525 } 526 retryCounter.sleepUntilNextRetry(); 527 } 528 } finally { 529 span.end(); 530 } 531 } 532 533 /** 534 * <p> 535 * NONSEQUENTIAL create is idempotent operation. Retry before throwing exceptions. But this 536 * function will not throw the NodeExist exception back to the application. 537 * </p> 538 * <p> 539 * But SEQUENTIAL is NOT idempotent operation. It is necessary to add identifier to the path to 540 * verify, whether the previous one is successful or not. 541 * </p> 542 */ 543 public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) 544 throws KeeperException, InterruptedException { 545 final Span span = TraceUtil.createSpan("RecoverableZookeeper.create"); 546 try (Scope ignored = span.makeCurrent()) { 547 byte[] newData = ZKMetadata.appendMetaData(id, data); 548 switch (createMode) { 549 case EPHEMERAL: 550 case PERSISTENT: 551 span.setStatus(StatusCode.OK); 552 return createNonSequential(path, newData, acl, createMode); 553 554 case EPHEMERAL_SEQUENTIAL: 555 case PERSISTENT_SEQUENTIAL: 556 span.setStatus(StatusCode.OK); 557 return createSequential(path, newData, acl, createMode); 558 559 default: 560 final IllegalArgumentException e = 561 new IllegalArgumentException("Unrecognized CreateMode: " + createMode); 562 TraceUtil.setError(span, e); 563 throw e; 564 } 565 } finally { 566 span.end(); 567 } 568 } 569 570 private String createNonSequential(String path, byte[] data, List<ACL> acl, CreateMode createMode) 571 throws KeeperException, InterruptedException { 572 RetryCounter retryCounter = retryCounterFactory.create(); 573 boolean isRetry = false; // False for first attempt, true for all retries. 574 while (true) { 575 try { 576 return checkZk().create(path, data, acl, createMode); 577 } catch (KeeperException e) { 578 switch (e.code()) { 579 case NODEEXISTS: 580 if (isRetry) { 581 // If the connection was lost, there is still a possibility that 582 // we have successfully created the node at our previous attempt, 583 // so we read the node and compare. 584 byte[] currentData = checkZk().getData(path, false, null); 585 if (currentData != null && Bytes.compareTo(currentData, data) == 0) { 586 // We successfully created a non-sequential node 587 return path; 588 } 589 LOG.error("Node " + path + " already exists with " + Bytes.toStringBinary(currentData) 590 + ", could not write " + Bytes.toStringBinary(data)); 591 throw e; 592 } 593 LOG.trace("Node {} already exists", path); 594 throw e; 595 596 case CONNECTIONLOSS: 597 case OPERATIONTIMEOUT: 598 case REQUESTTIMEOUT: 599 retryOrThrow(retryCounter, e, "create"); 600 break; 601 602 default: 603 throw e; 604 } 605 } 606 retryCounter.sleepUntilNextRetry(); 607 isRetry = true; 608 } 609 } 610 611 private String createSequential(String path, byte[] data, List<ACL> acl, CreateMode createMode) 612 throws KeeperException, InterruptedException { 613 RetryCounter retryCounter = retryCounterFactory.create(); 614 boolean first = true; 615 String newPath = path + this.identifier; 616 while (true) { 617 try { 618 if (!first) { 619 // Check if we succeeded on a previous attempt 620 String previousResult = findPreviousSequentialNode(newPath); 621 if (previousResult != null) { 622 return previousResult; 623 } 624 } 625 first = false; 626 return checkZk().create(newPath, data, acl, createMode); 627 } catch (KeeperException e) { 628 switch (e.code()) { 629 case CONNECTIONLOSS: 630 case OPERATIONTIMEOUT: 631 case REQUESTTIMEOUT: 632 retryOrThrow(retryCounter, e, "create"); 633 break; 634 635 default: 636 throw e; 637 } 638 } 639 retryCounter.sleepUntilNextRetry(); 640 } 641 } 642 643 /** 644 * Convert Iterable of {@link org.apache.zookeeper.Op} we got into the ZooKeeper.Op instances to 645 * actually pass to multi (need to do this in order to appendMetaData). 646 */ 647 private Iterable<Op> prepareZKMulti(Iterable<Op> ops) throws UnsupportedOperationException { 648 if (ops == null) { 649 return null; 650 } 651 652 List<Op> preparedOps = new LinkedList<>(); 653 for (Op op : ops) { 654 if (op.getType() == ZooDefs.OpCode.create) { 655 CreateRequest create = (CreateRequest) op.toRequestRecord(); 656 preparedOps.add(Op.create(create.getPath(), ZKMetadata.appendMetaData(id, create.getData()), 657 create.getAcl(), create.getFlags())); 658 } else if (op.getType() == ZooDefs.OpCode.delete) { 659 // no need to appendMetaData for delete 660 preparedOps.add(op); 661 } else if (op.getType() == ZooDefs.OpCode.setData) { 662 SetDataRequest setData = (SetDataRequest) op.toRequestRecord(); 663 preparedOps.add(Op.setData(setData.getPath(), 664 ZKMetadata.appendMetaData(id, setData.getData()), setData.getVersion())); 665 } else { 666 throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName()); 667 } 668 } 669 return preparedOps; 670 } 671 672 /** 673 * Run multiple operations in a transactional manner. Retry before throwing exception 674 */ 675 public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException { 676 final Span span = TraceUtil.createSpan("RecoverableZookeeper.multi"); 677 try (Scope ignored = span.makeCurrent()) { 678 RetryCounter retryCounter = retryCounterFactory.create(); 679 Iterable<Op> multiOps = prepareZKMulti(ops); 680 while (true) { 681 try { 682 span.setStatus(StatusCode.OK); 683 return checkZk().multi(multiOps); 684 } catch (KeeperException e) { 685 switch (e.code()) { 686 case CONNECTIONLOSS: 687 case OPERATIONTIMEOUT: 688 case REQUESTTIMEOUT: 689 TraceUtil.setError(span, e); 690 retryOrThrow(retryCounter, e, "multi"); 691 break; 692 693 default: 694 TraceUtil.setError(span, e); 695 throw e; 696 } 697 } 698 retryCounter.sleepUntilNextRetry(); 699 } 700 } finally { 701 span.end(); 702 } 703 } 704 705 private String findPreviousSequentialNode(String path) 706 throws KeeperException, InterruptedException { 707 int lastSlashIdx = path.lastIndexOf('/'); 708 assert (lastSlashIdx != -1); 709 String parent = path.substring(0, lastSlashIdx); 710 String nodePrefix = path.substring(lastSlashIdx + 1); 711 List<String> nodes = checkZk().getChildren(parent, false); 712 List<String> matching = filterByPrefix(nodes, nodePrefix); 713 for (String node : matching) { 714 String nodePath = parent + "/" + node; 715 Stat stat = checkZk().exists(nodePath, false); 716 if (stat != null) { 717 return nodePath; 718 } 719 } 720 return null; 721 } 722 723 public synchronized long getSessionId() { 724 return zk == null ? -1 : zk.getSessionId(); 725 } 726 727 public synchronized void close() throws InterruptedException { 728 if (zk != null) { 729 zk.close(); 730 } 731 } 732 733 public synchronized States getState() { 734 return zk == null ? null : zk.getState(); 735 } 736 737 public synchronized ZooKeeper getZooKeeper() { 738 return zk; 739 } 740 741 public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException { 742 checkZk().sync(path, cb, ctx); 743 } 744 745 /** 746 * Filters the given node list by the given prefixes. This method is all-inclusive--if any element 747 * in the node list starts with any of the given prefixes, then it is included in the result. 748 * @param nodes the nodes to filter 749 * @param prefixes the prefixes to include in the result 750 * @return list of every element that starts with one of the prefixes 751 */ 752 private static List<String> filterByPrefix(List<String> nodes, String... prefixes) { 753 List<String> lockChildren = new ArrayList<>(); 754 for (String child : nodes) { 755 for (String prefix : prefixes) { 756 if (child.startsWith(prefix)) { 757 lockChildren.add(child); 758 break; 759 } 760 } 761 } 762 return lockChildren; 763 } 764 765 public String getIdentifier() { 766 return identifier; 767 } 768}