001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.concurrent.CopyOnWriteArrayList; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.TimeUnit; 029import java.util.regex.Matcher; 030import java.util.regex.Pattern; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.Abortable; 033import org.apache.hadoop.hbase.AuthUtil; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.ZooKeeperConnectionException; 036import org.apache.hadoop.hbase.security.Superusers; 037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 038import org.apache.hadoop.hbase.util.Threads; 039import org.apache.hadoop.security.UserGroupInformation; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.apache.zookeeper.KeeperException; 042import org.apache.zookeeper.WatchedEvent; 043import org.apache.zookeeper.Watcher; 044import org.apache.zookeeper.ZooDefs.Ids; 045import org.apache.zookeeper.ZooDefs.Perms; 046import org.apache.zookeeper.data.ACL; 047import org.apache.zookeeper.data.Id; 048import org.apache.zookeeper.data.Stat; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 053 054/** 055 * Acts as the single ZooKeeper Watcher. One instance of this is instantiated for each Master, 056 * RegionServer, and client process. 057 * <p> 058 * This is the only class that implements {@link Watcher}. Other internal classes which need to be 059 * notified of ZooKeeper events must register with the local instance of this watcher via 060 * {@link #registerListener}. 061 * <p> 062 * This class also holds and manages the connection to ZooKeeper. Code to deal with connection 063 * related events and exceptions are handled here. 064 */ 065@InterfaceAudience.Private 066public class ZKWatcher implements Watcher, Abortable, Closeable { 067 private static final Logger LOG = LoggerFactory.getLogger(ZKWatcher.class); 068 069 // Identifier for this watcher (for logging only). It is made of the prefix 070 // passed on construction and the zookeeper sessionid. 071 private final String prefix; 072 private String identifier; 073 074 // zookeeper quorum 075 private final String quorum; 076 077 // zookeeper connection 078 private final RecoverableZooKeeper recoverableZooKeeper; 079 080 // abortable in case of zk failure 081 protected Abortable abortable; 082 // Used if abortable is null 083 private boolean aborted = false; 084 085 private final ZNodePaths znodePaths; 086 087 // listeners to be notified 088 private final List<ZKListener> listeners = new CopyOnWriteArrayList<>(); 089 090 // Single threaded executor pool that processes event notifications from Zookeeper. Events are 091 // processed in the order in which they arrive (pool backed by an unbounded fifo queue). We do 092 // this to decouple the event processing from Zookeeper's ClientCnxn's EventThread context. 093 // EventThread internally runs a single while loop to serially process all the events. When events 094 // are processed by the listeners in the same thread, that blocks the EventThread from processing 095 // subsequent events. Processing events in a separate thread frees up the event thread to continue 096 // and further prevents deadlocks if the process method itself makes other zookeeper calls. 097 // It is ok to do it in a single thread because the Zookeeper ClientCnxn already serializes the 098 // requests using a single while loop and hence there is no performance degradation. 099 private final ExecutorService zkEventProcessor = Executors 100 .newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("zk-event-processor-pool-%d") 101 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 102 103 private final Configuration conf; 104 105 private final long zkSyncTimeout; 106 107 /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */ 108 private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)"); 109 110 /** 111 * Instantiate a ZooKeeper connection and watcher. 112 * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for 113 * this instance. Use null for default. 114 * @throws IOException if the connection to ZooKeeper fails 115 * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper 116 */ 117 public ZKWatcher(Configuration conf, String identifier, Abortable abortable) 118 throws ZooKeeperConnectionException, IOException { 119 this(conf, identifier, abortable, false); 120 } 121 122 /** 123 * Instantiate a ZooKeeper connection and watcher. 124 * @param conf the configuration to use 125 * @param identifier string that is passed to RecoverableZookeeper to be used as 126 * identifier for this instance. Use null for default. 127 * @param abortable Can be null if there is on error there is no host to abort: e.g. 128 * client context. 129 * @param canCreateBaseZNode true if a base ZNode can be created 130 * @throws IOException if the connection to ZooKeeper fails 131 * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper 132 */ 133 public ZKWatcher(Configuration conf, String identifier, Abortable abortable, 134 boolean canCreateBaseZNode) throws IOException, ZooKeeperConnectionException { 135 this(conf, identifier, abortable, canCreateBaseZNode, false); 136 } 137 138 /** 139 * Instantiate a ZooKeeper connection and watcher. 140 * @param conf the configuration to use 141 * @param identifier string that is passed to RecoverableZookeeper to be used as 142 * identifier for this instance. Use null for default. 143 * @param abortable Can be null if there is on error there is no host to abort: e.g. 144 * client context. 145 * @param canCreateBaseZNode true if a base ZNode can be created 146 * @param clientZK whether this watcher is set to access client ZK 147 * @throws IOException if the connection to ZooKeeper fails 148 * @throws ZooKeeperConnectionException if the connection to Zookeeper fails when create base 149 * ZNodes 150 */ 151 public ZKWatcher(Configuration conf, String identifier, Abortable abortable, 152 boolean canCreateBaseZNode, boolean clientZK) throws IOException, ZooKeeperConnectionException { 153 this.conf = conf; 154 if (clientZK) { 155 String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf); 156 String serverZkQuorumServers = ZKConfig.getZKQuorumServersString(conf); 157 if (clientZkQuorumServers != null) { 158 if (clientZkQuorumServers.equals(serverZkQuorumServers)) { 159 // Don't allow same settings to avoid dead loop when master trying 160 // to sync meta information from server ZK to client ZK 161 throw new IllegalArgumentException( 162 "The quorum settings for client ZK should be different from those for server"); 163 } 164 this.quorum = clientZkQuorumServers; 165 } else { 166 this.quorum = serverZkQuorumServers; 167 } 168 } else { 169 this.quorum = ZKConfig.getZKQuorumServersString(conf); 170 } 171 this.prefix = identifier; 172 // Identifier will get the sessionid appended later below down when we 173 // handle the syncconnect event. 174 this.identifier = identifier + "0x0"; 175 this.abortable = abortable; 176 this.znodePaths = new ZNodePaths(conf); 177 PendingWatcher pendingWatcher = new PendingWatcher(); 178 this.recoverableZooKeeper = 179 RecoverableZooKeeper.connect(conf, quorum, pendingWatcher, identifier); 180 pendingWatcher.prepare(this); 181 if (canCreateBaseZNode) { 182 try { 183 createBaseZNodes(); 184 } catch (ZooKeeperConnectionException zce) { 185 try { 186 this.recoverableZooKeeper.close(); 187 } catch (InterruptedException ie) { 188 LOG.debug("Encountered InterruptedException when closing {}", this.recoverableZooKeeper); 189 Thread.currentThread().interrupt(); 190 } 191 throw zce; 192 } 193 } 194 this.zkSyncTimeout = conf.getLong(HConstants.ZK_SYNC_BLOCKING_TIMEOUT_MS, 195 HConstants.ZK_SYNC_BLOCKING_TIMEOUT_DEFAULT_MS); 196 } 197 198 public List<ACL> createACL(String node) { 199 return createACL(node, ZKAuthentication.isSecureZooKeeper(getConfiguration())); 200 } 201 202 public List<ACL> createACL(String node, boolean isSecureZooKeeper) { 203 if (!node.startsWith(getZNodePaths().baseZNode)) { 204 return Ids.OPEN_ACL_UNSAFE; 205 } 206 if (isSecureZooKeeper) { 207 ArrayList<ACL> acls = new ArrayList<>(); 208 // add permission to hbase supper user 209 String[] superUsers = getConfiguration().getStrings(Superusers.SUPERUSER_CONF_KEY); 210 String hbaseUser = null; 211 try { 212 hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName(); 213 } catch (IOException e) { 214 LOG.debug("Could not acquire current User.", e); 215 } 216 if (superUsers != null) { 217 List<String> groups = new ArrayList<>(); 218 for (String user : superUsers) { 219 if (AuthUtil.isGroupPrincipal(user)) { 220 // TODO: Set node ACL for groups when ZK supports this feature 221 groups.add(user); 222 } else { 223 if (!user.equals(hbaseUser)) { 224 acls.add(new ACL(Perms.ALL, new Id("sasl", user))); 225 } 226 } 227 } 228 if (!groups.isEmpty()) { 229 LOG.warn("Znode ACL setting for group {} is skipped, ZooKeeper doesn't support this " 230 + "feature presently.", groups); 231 } 232 } 233 // Certain znodes are accessed directly by the client, 234 // so they must be readable by non-authenticated clients 235 if (getZNodePaths().isClientReadable(node)) { 236 acls.addAll(Ids.CREATOR_ALL_ACL); 237 acls.addAll(Ids.READ_ACL_UNSAFE); 238 } else { 239 acls.addAll(Ids.CREATOR_ALL_ACL); 240 } 241 return acls; 242 } else { 243 return Ids.OPEN_ACL_UNSAFE; 244 } 245 } 246 247 private void createBaseZNodes() throws ZooKeeperConnectionException { 248 try { 249 // Create all the necessary "directories" of znodes 250 ZKUtil.createWithParents(this, znodePaths.baseZNode); 251 ZKUtil.createAndFailSilent(this, znodePaths.rsZNode); 252 ZKUtil.createAndFailSilent(this, znodePaths.drainingZNode); 253 ZKUtil.createAndFailSilent(this, znodePaths.tableZNode); 254 ZKUtil.createAndFailSilent(this, znodePaths.splitLogZNode); 255 ZKUtil.createAndFailSilent(this, znodePaths.backupMasterAddressesZNode); 256 ZKUtil.createAndFailSilent(this, znodePaths.masterMaintZNode); 257 } catch (KeeperException e) { 258 throw new ZooKeeperConnectionException( 259 prefix("Unexpected KeeperException creating base node"), e); 260 } 261 } 262 263 /** 264 * On master start, we check the znode ACLs under the root directory and set the ACLs properly if 265 * needed. If the cluster goes from an unsecure setup to a secure setup, this step is needed so 266 * that the existing znodes created with open permissions are now changed with restrictive perms. 267 */ 268 public void checkAndSetZNodeAcls() { 269 if (!ZKAuthentication.isSecureZooKeeper(getConfiguration())) { 270 LOG.info("not a secure deployment, proceeding"); 271 return; 272 } 273 274 // Check the base znodes permission first. Only do the recursion if base znode's perms are not 275 // correct. 276 try { 277 List<ACL> actualAcls = recoverableZooKeeper.getAcl(znodePaths.baseZNode, new Stat()); 278 279 if (!isBaseZnodeAclSetup(actualAcls)) { 280 LOG.info("setting znode ACLs"); 281 setZnodeAclsRecursive(znodePaths.baseZNode); 282 } 283 } catch (KeeperException.NoNodeException nne) { 284 return; 285 } catch (InterruptedException ie) { 286 interruptedExceptionNoThrow(ie, false); 287 } catch (IOException | KeeperException e) { 288 LOG.warn("Received exception while checking and setting zookeeper ACLs", e); 289 } 290 } 291 292 /** 293 * Set the znode perms recursively. This will do post-order recursion, so that baseZnode ACLs will 294 * be set last in case the master fails in between. 295 * @param znode the ZNode to set the permissions for 296 */ 297 private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException { 298 List<String> children = recoverableZooKeeper.getChildren(znode, false); 299 300 for (String child : children) { 301 setZnodeAclsRecursive(ZNodePaths.joinZNode(znode, child)); 302 } 303 List<ACL> acls = createACL(znode, true); 304 LOG.info("Setting ACLs for znode:{} , acl:{}", znode, acls); 305 recoverableZooKeeper.setAcl(znode, acls, -1); 306 } 307 308 /** 309 * Checks whether the ACLs returned from the base znode (/hbase) is set for secure setup. 310 * @param acls acls from zookeeper 311 * @return whether ACLs are set for the base znode 312 * @throws IOException if getting the current user fails 313 */ 314 private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException { 315 if (LOG.isDebugEnabled()) { 316 LOG.debug("Checking znode ACLs"); 317 } 318 String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY); 319 // Check whether ACL set for all superusers 320 if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) { 321 return false; 322 } 323 324 // this assumes that current authenticated user is the same as zookeeper client user 325 // configured via JAAS 326 String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName(); 327 328 if (acls.isEmpty()) { 329 if (LOG.isDebugEnabled()) { 330 LOG.debug("ACL is empty"); 331 } 332 return false; 333 } 334 335 for (ACL acl : acls) { 336 int perms = acl.getPerms(); 337 Id id = acl.getId(); 338 // We should only set at most 3 possible ACLs for 3 Ids. One for everyone, one for superuser 339 // and one for the hbase user 340 if (Ids.ANYONE_ID_UNSAFE.equals(id)) { 341 if (perms != Perms.READ) { 342 if (LOG.isDebugEnabled()) { 343 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 344 id, perms, Perms.READ)); 345 } 346 return false; 347 } 348 } else if (superUsers != null && isSuperUserId(superUsers, id)) { 349 if (perms != Perms.ALL) { 350 if (LOG.isDebugEnabled()) { 351 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 352 id, perms, Perms.ALL)); 353 } 354 return false; 355 } 356 } else if ("sasl".equals(id.getScheme())) { 357 String name = id.getId(); 358 // If ZooKeeper recorded the Kerberos full name in the ACL, use only the shortname 359 Matcher match = NAME_PATTERN.matcher(name); 360 if (match.matches()) { 361 name = match.group(1); 362 } 363 if (name.equals(hbaseUser)) { 364 if (perms != Perms.ALL) { 365 if (LOG.isDebugEnabled()) { 366 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 367 id, perms, Perms.ALL)); 368 } 369 return false; 370 } 371 } else { 372 if (LOG.isDebugEnabled()) { 373 LOG.debug("Unexpected shortname in SASL ACL: {}", id); 374 } 375 return false; 376 } 377 } else { 378 if (LOG.isDebugEnabled()) { 379 LOG.debug("unexpected ACL id '{}'", id); 380 } 381 return false; 382 } 383 } 384 return true; 385 } 386 387 /* 388 * Validate whether ACL set for all superusers. 389 */ 390 private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) { 391 for (String user : superUsers) { 392 boolean hasAccess = false; 393 // TODO: Validate super group members also when ZK supports setting node ACL for groups. 394 if (!AuthUtil.isGroupPrincipal(user)) { 395 for (ACL acl : acls) { 396 if (user.equals(acl.getId().getId())) { 397 if (acl.getPerms() == Perms.ALL) { 398 hasAccess = true; 399 } else { 400 if (LOG.isDebugEnabled()) { 401 LOG.debug(String.format( 402 "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x", 403 acl.getId().getId(), acl.getPerms(), Perms.ALL)); 404 } 405 } 406 break; 407 } 408 } 409 if (!hasAccess) { 410 return false; 411 } 412 } 413 } 414 return true; 415 } 416 417 /* 418 * Validate whether ACL ID is superuser. 419 */ 420 public static boolean isSuperUserId(String[] superUsers, Id id) { 421 for (String user : superUsers) { 422 // TODO: Validate super group members also when ZK supports setting node ACL for groups. 423 if (!AuthUtil.isGroupPrincipal(user) && new Id("sasl", user).equals(id)) { 424 return true; 425 } 426 } 427 return false; 428 } 429 430 @Override 431 public String toString() { 432 return this.identifier + ", quorum=" + quorum + ", baseZNode=" + znodePaths.baseZNode; 433 } 434 435 /** 436 * Adds this instance's identifier as a prefix to the passed <code>str</code> 437 * @param str String to amend. 438 * @return A new string with this instance's identifier as prefix: e.g. if passed 'hello world', 439 * the returned string could be 440 */ 441 public String prefix(final String str) { 442 return this.toString() + " " + str; 443 } 444 445 /** 446 * Get the znodes corresponding to the meta replicas from ZK 447 * @return list of znodes 448 * @throws KeeperException if a ZooKeeper operation fails 449 */ 450 public List<String> getMetaReplicaNodes() throws KeeperException { 451 List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, znodePaths.baseZNode); 452 return filterMetaReplicaNodes(childrenOfBaseNode); 453 } 454 455 /** 456 * Same as {@link #getMetaReplicaNodes()} except that this also registers a watcher on base znode 457 * for subsequent CREATE/DELETE operations on child nodes. 458 */ 459 public List<String> getMetaReplicaNodesAndWatchChildren() throws KeeperException { 460 List<String> childrenOfBaseNode = 461 ZKUtil.listChildrenAndWatchForNewChildren(this, znodePaths.baseZNode); 462 return filterMetaReplicaNodes(childrenOfBaseNode); 463 } 464 465 /** 466 * @param nodes Input list of znodes 467 * @return Filtered list of znodes from nodes that belong to meta replica(s). 468 */ 469 private List<String> filterMetaReplicaNodes(List<String> nodes) { 470 if (nodes == null || nodes.isEmpty()) { 471 return new ArrayList<>(); 472 } 473 List<String> metaReplicaNodes = new ArrayList<>(2); 474 String pattern = conf.get(ZNodePaths.META_ZNODE_PREFIX_CONF_KEY, ZNodePaths.META_ZNODE_PREFIX); 475 for (String child : nodes) { 476 if (child.startsWith(pattern)) { 477 metaReplicaNodes.add(child); 478 } 479 } 480 return metaReplicaNodes; 481 } 482 483 /** 484 * Register the specified listener to receive ZooKeeper events. 485 * @param listener the listener to register 486 */ 487 public void registerListener(ZKListener listener) { 488 listeners.add(listener); 489 } 490 491 /** 492 * Register the specified listener to receive ZooKeeper events and add it as the first in the list 493 * of current listeners. 494 * @param listener the listener to register 495 */ 496 public void registerListenerFirst(ZKListener listener) { 497 listeners.add(0, listener); 498 } 499 500 public void unregisterListener(ZKListener listener) { 501 listeners.remove(listener); 502 } 503 504 /** 505 * Clean all existing listeners 506 */ 507 public void unregisterAllListeners() { 508 listeners.clear(); 509 } 510 511 /** 512 * Get a copy of current registered listeners 513 */ 514 public List<ZKListener> getListeners() { 515 return new ArrayList<>(listeners); 516 } 517 518 /** 519 * @return The number of currently registered listeners 520 */ 521 public int getNumberOfListeners() { 522 return listeners.size(); 523 } 524 525 /** 526 * Get the connection to ZooKeeper. 527 * @return connection reference to zookeeper 528 */ 529 public RecoverableZooKeeper getRecoverableZooKeeper() { 530 return recoverableZooKeeper; 531 } 532 533 public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException { 534 recoverableZooKeeper.reconnectAfterExpiration(); 535 } 536 537 /** 538 * Get the quorum address of this instance. 539 * @return quorum string of this zookeeper connection instance 540 */ 541 public String getQuorum() { 542 return quorum; 543 } 544 545 /** 546 * Get the znodePaths. 547 * <p> 548 * Mainly used for mocking as mockito can not mock a field access. 549 */ 550 public ZNodePaths getZNodePaths() { 551 return znodePaths; 552 } 553 554 private void processEvent(WatchedEvent event) { 555 switch (event.getType()) { 556 // If event type is NONE, this is a connection status change 557 case None: { 558 connectionEvent(event); 559 break; 560 } 561 562 // Otherwise pass along to the listeners 563 case NodeCreated: { 564 for (ZKListener listener : listeners) { 565 listener.nodeCreated(event.getPath()); 566 } 567 break; 568 } 569 570 case NodeDeleted: { 571 for (ZKListener listener : listeners) { 572 listener.nodeDeleted(event.getPath()); 573 } 574 break; 575 } 576 577 case NodeDataChanged: { 578 for (ZKListener listener : listeners) { 579 listener.nodeDataChanged(event.getPath()); 580 } 581 break; 582 } 583 584 case NodeChildrenChanged: { 585 for (ZKListener listener : listeners) { 586 listener.nodeChildrenChanged(event.getPath()); 587 } 588 break; 589 } 590 default: 591 LOG.error("Invalid event of type {} received for path {}. Ignoring.", event.getState(), 592 event.getPath()); 593 } 594 } 595 596 /** 597 * Method called from ZooKeeper for events and connection status. 598 * <p> 599 * Valid events are passed along to listeners. Connection status changes are dealt with locally. 600 */ 601 @Override 602 public void process(WatchedEvent event) { 603 LOG.debug(prefix("Received ZooKeeper Event, " + "type=" + event.getType() + ", " + "state=" 604 + event.getState() + ", " + "path=" + event.getPath())); 605 zkEventProcessor.submit(() -> processEvent(event)); 606 } 607 608 // Connection management 609 610 /** 611 * Called when there is a connection-related event via the Watcher callback. 612 * <p> 613 * If Disconnected or Expired, this should shutdown the cluster. But, since we send a 614 * KeeperException.SessionExpiredException along with the abort call, it's possible for the 615 * Abortable to catch it and try to create a new session with ZooKeeper. This is what the client 616 * does in HCM. 617 * <p> 618 * @param event the connection-related event 619 */ 620 private void connectionEvent(WatchedEvent event) { 621 switch (event.getState()) { 622 case SyncConnected: 623 this.identifier = 624 this.prefix + "-0x" + Long.toHexString(this.recoverableZooKeeper.getSessionId()); 625 // Update our identifier. Otherwise ignore. 626 LOG.debug("{} connected", this.identifier); 627 break; 628 629 // Abort the server if Disconnected or Expired 630 case Disconnected: 631 LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring")); 632 break; 633 634 case Closed: 635 LOG.debug(prefix("ZooKeeper client closed, ignoring")); 636 break; 637 638 case Expired: 639 String msg = prefix(this.identifier + " received expired from " + "ZooKeeper, aborting"); 640 // TODO: One thought is to add call to ZKListener so say, 641 // ZKNodeTracker can zero out its data values. 642 if (this.abortable != null) { 643 this.abortable.abort(msg, new KeeperException.SessionExpiredException()); 644 } 645 break; 646 647 case ConnectedReadOnly: 648 case SaslAuthenticated: 649 case AuthFailed: 650 break; 651 652 default: 653 throw new IllegalStateException("Received event is not valid: " + event.getState()); 654 } 655 } 656 657 /** 658 * Forces a synchronization of this ZooKeeper client connection within a timeout. Enforcing a 659 * timeout lets the callers fail-fast rather than wait forever for the sync to finish. 660 * <p> 661 * Executing this method before running other methods will ensure that the subsequent operations 662 * are up-to-date and consistent as of the time that the sync is complete. 663 * <p> 664 * This is used for compareAndSwap type operations where we need to read the data of an existing 665 * node and delete or transition that node, utilizing the previously read version and data. We 666 * want to ensure that the version read is up-to-date from when we begin the operation. 667 * <p> 668 */ 669 public void syncOrTimeout(String path) throws KeeperException { 670 final CountDownLatch latch = new CountDownLatch(1); 671 long startTime = EnvironmentEdgeManager.currentTime(); 672 this.recoverableZooKeeper.sync(path, (i, s, o) -> latch.countDown(), null); 673 try { 674 if (!latch.await(zkSyncTimeout, TimeUnit.MILLISECONDS)) { 675 LOG.warn("sync() operation to ZK timed out. Configured timeout: {}ms. This usually points " 676 + "to a ZK side issue. Check ZK server logs and metrics.", zkSyncTimeout); 677 throw new KeeperException.RequestTimeoutException(); 678 } 679 } catch (InterruptedException e) { 680 LOG.warn("Interrupted waiting for ZK sync() to finish.", e); 681 Thread.currentThread().interrupt(); 682 return; 683 } 684 if (LOG.isDebugEnabled()) { 685 // TODO: Switch to a metric once server side ZK watcher metrics are implemented. This is a 686 // useful metric to have since the latency of sync() impacts the callers. 687 LOG.debug("ZK sync() operation took {}ms", EnvironmentEdgeManager.currentTime() - startTime); 688 } 689 } 690 691 /** 692 * Handles KeeperExceptions in client calls. 693 * <p> 694 * This may be temporary but for now this gives one place to deal with these. 695 * <p> 696 * TODO: Currently this method rethrows the exception to let the caller handle 697 * <p> 698 * @param ke the exception to rethrow 699 * @throws KeeperException if a ZooKeeper operation fails 700 */ 701 public void keeperException(KeeperException ke) throws KeeperException { 702 LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke); 703 throw ke; 704 } 705 706 /** 707 * Handles InterruptedExceptions in client calls. 708 * @param ie the InterruptedException instance thrown 709 * @throws KeeperException the exception to throw, transformed from the InterruptedException 710 */ 711 public void interruptedException(InterruptedException ie) throws KeeperException { 712 interruptedExceptionNoThrow(ie, true); 713 // Throw a system error exception to let upper level handle it 714 KeeperException keeperException = new KeeperException.SystemErrorException(); 715 keeperException.initCause(ie); 716 throw keeperException; 717 } 718 719 /** 720 * Log the InterruptedException and interrupt current thread 721 * @param ie The IterruptedException to log 722 * @param throwLater Whether we will throw the exception latter 723 */ 724 public void interruptedExceptionNoThrow(InterruptedException ie, boolean throwLater) { 725 LOG.debug(prefix("Received InterruptedException, will interrupt current thread" 726 + (throwLater ? " and rethrow a SystemErrorException" : "")), ie); 727 // At least preserve interrupt. 728 Thread.currentThread().interrupt(); 729 } 730 731 /** 732 * Close the connection to ZooKeeper. 733 */ 734 @Override 735 public void close() { 736 try { 737 recoverableZooKeeper.close(); 738 } catch (InterruptedException e) { 739 Thread.currentThread().interrupt(); 740 } finally { 741 zkEventProcessor.shutdownNow(); 742 } 743 } 744 745 public Configuration getConfiguration() { 746 return conf; 747 } 748 749 @Override 750 public void abort(String why, Throwable e) { 751 if (this.abortable != null) { 752 this.abortable.abort(why, e); 753 } else { 754 this.aborted = true; 755 } 756 } 757 758 @Override 759 public boolean isAborted() { 760 return this.abortable == null ? this.aborted : this.abortable.isAborted(); 761 } 762}