001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.concurrent.CopyOnWriteArrayList; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.TimeUnit; 029import java.util.regex.Matcher; 030import java.util.regex.Pattern; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.Abortable; 033import org.apache.hadoop.hbase.AuthUtil; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.ZooKeeperConnectionException; 036import org.apache.hadoop.hbase.security.Superusers; 037import org.apache.hadoop.hbase.trace.TraceUtil; 038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 039import org.apache.hadoop.hbase.util.Threads; 040import org.apache.hadoop.security.UserGroupInformation; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.apache.zookeeper.KeeperException; 043import org.apache.zookeeper.WatchedEvent; 044import org.apache.zookeeper.Watcher; 045import org.apache.zookeeper.ZooDefs.Ids; 046import org.apache.zookeeper.ZooDefs.Perms; 047import org.apache.zookeeper.data.ACL; 048import org.apache.zookeeper.data.Id; 049import org.apache.zookeeper.data.Stat; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 054 055/** 056 * Acts as the single ZooKeeper Watcher. One instance of this is instantiated for each Master, 057 * RegionServer, and client process. 058 * <p> 059 * This is the only class that implements {@link Watcher}. Other internal classes which need to be 060 * notified of ZooKeeper events must register with the local instance of this watcher via 061 * {@link #registerListener}. 062 * <p> 063 * This class also holds and manages the connection to ZooKeeper. Code to deal with connection 064 * related events and exceptions are handled here. 065 */ 066@InterfaceAudience.Private 067public class ZKWatcher implements Watcher, Abortable, Closeable { 068 private static final Logger LOG = LoggerFactory.getLogger(ZKWatcher.class); 069 070 // Identifier for this watcher (for logging only). It is made of the prefix 071 // passed on construction and the zookeeper sessionid. 072 private final String prefix; 073 private String identifier; 074 075 // zookeeper quorum 076 private final String quorum; 077 078 // zookeeper connection 079 private final RecoverableZooKeeper recoverableZooKeeper; 080 081 // abortable in case of zk failure 082 protected Abortable abortable; 083 // Used if abortable is null 084 private boolean aborted = false; 085 086 private final ZNodePaths znodePaths; 087 088 // listeners to be notified 089 private final List<ZKListener> listeners = new CopyOnWriteArrayList<>(); 090 091 // Single threaded executor pool that processes event notifications from Zookeeper. Events are 092 // processed in the order in which they arrive (pool backed by an unbounded fifo queue). We do 093 // this to decouple the event processing from Zookeeper's ClientCnxn's EventThread context. 094 // EventThread internally runs a single while loop to serially process all the events. When events 095 // are processed by the listeners in the same thread, that blocks the EventThread from processing 096 // subsequent events. Processing events in a separate thread frees up the event thread to continue 097 // and further prevents deadlocks if the process method itself makes other zookeeper calls. 098 // It is ok to do it in a single thread because the Zookeeper ClientCnxn already serializes the 099 // requests using a single while loop and hence there is no performance degradation. 100 private final ExecutorService zkEventProcessor = Executors 101 .newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("zk-event-processor-pool-%d") 102 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 103 104 private final Configuration conf; 105 106 private final long zkSyncTimeout; 107 108 /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */ 109 private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)"); 110 111 /** 112 * Instantiate a ZooKeeper connection and watcher. 113 * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for 114 * this instance. Use null for default. 115 * @throws IOException if the connection to ZooKeeper fails 116 * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper 117 */ 118 public ZKWatcher(Configuration conf, String identifier, Abortable abortable) 119 throws ZooKeeperConnectionException, IOException { 120 this(conf, identifier, abortable, false); 121 } 122 123 /** 124 * Instantiate a ZooKeeper connection and watcher. 125 * @param conf the configuration to use 126 * @param identifier string that is passed to RecoverableZookeeper to be used as 127 * identifier for this instance. Use null for default. 128 * @param abortable Can be null if there is on error there is no host to abort: e.g. 129 * client context. 130 * @param canCreateBaseZNode true if a base ZNode can be created 131 * @throws IOException if the connection to ZooKeeper fails 132 * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper 133 */ 134 public ZKWatcher(Configuration conf, String identifier, Abortable abortable, 135 boolean canCreateBaseZNode) throws IOException, ZooKeeperConnectionException { 136 this(conf, identifier, abortable, canCreateBaseZNode, false); 137 } 138 139 /** 140 * Instantiate a ZooKeeper connection and watcher. 141 * @param conf the configuration to use 142 * @param identifier string that is passed to RecoverableZookeeper to be used as 143 * identifier for this instance. Use null for default. 144 * @param abortable Can be null if there is on error there is no host to abort: e.g. 145 * client context. 146 * @param canCreateBaseZNode true if a base ZNode can be created 147 * @param clientZK whether this watcher is set to access client ZK 148 * @throws IOException if the connection to ZooKeeper fails 149 * @throws ZooKeeperConnectionException if the connection to Zookeeper fails when create base 150 * ZNodes 151 */ 152 public ZKWatcher(Configuration conf, String identifier, Abortable abortable, 153 boolean canCreateBaseZNode, boolean clientZK) throws IOException, ZooKeeperConnectionException { 154 this.conf = conf; 155 if (clientZK) { 156 String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf); 157 String serverZkQuorumServers = ZKConfig.getZKQuorumServersString(conf); 158 if (clientZkQuorumServers != null) { 159 if (clientZkQuorumServers.equals(serverZkQuorumServers)) { 160 // Don't allow same settings to avoid dead loop when master trying 161 // to sync meta information from server ZK to client ZK 162 throw new IllegalArgumentException( 163 "The quorum settings for client ZK should be different from those for server"); 164 } 165 this.quorum = clientZkQuorumServers; 166 } else { 167 this.quorum = serverZkQuorumServers; 168 } 169 } else { 170 this.quorum = ZKConfig.getZKQuorumServersString(conf); 171 } 172 this.prefix = identifier; 173 // Identifier will get the sessionid appended later below down when we 174 // handle the syncconnect event. 175 this.identifier = identifier + "0x0"; 176 this.abortable = abortable; 177 this.znodePaths = new ZNodePaths(conf); 178 PendingWatcher pendingWatcher = new PendingWatcher(); 179 this.recoverableZooKeeper = RecoverableZooKeeper.connect(conf, quorum, pendingWatcher, 180 identifier, ZKConfig.getZKClientConfig(conf)); 181 pendingWatcher.prepare(this); 182 if (canCreateBaseZNode) { 183 try { 184 createBaseZNodes(); 185 } catch (ZooKeeperConnectionException zce) { 186 try { 187 this.recoverableZooKeeper.close(); 188 } catch (InterruptedException ie) { 189 LOG.debug("Encountered InterruptedException when closing {}", this.recoverableZooKeeper); 190 Thread.currentThread().interrupt(); 191 } 192 throw zce; 193 } 194 } 195 this.zkSyncTimeout = conf.getLong(HConstants.ZK_SYNC_BLOCKING_TIMEOUT_MS, 196 HConstants.ZK_SYNC_BLOCKING_TIMEOUT_DEFAULT_MS); 197 } 198 199 public List<ACL> createACL(String node) { 200 return createACL(node, ZKAuthentication.isSecureZooKeeper(getConfiguration())); 201 } 202 203 public List<ACL> createACL(String node, boolean isSecureZooKeeper) { 204 if (!node.startsWith(getZNodePaths().baseZNode)) { 205 return Ids.OPEN_ACL_UNSAFE; 206 } 207 if (isSecureZooKeeper) { 208 ArrayList<ACL> acls = new ArrayList<>(); 209 // add permission to hbase supper user 210 String[] superUsers = getConfiguration().getStrings(Superusers.SUPERUSER_CONF_KEY); 211 String hbaseUser = null; 212 try { 213 hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName(); 214 } catch (IOException e) { 215 LOG.debug("Could not acquire current User.", e); 216 } 217 if (superUsers != null) { 218 List<String> groups = new ArrayList<>(); 219 for (String user : superUsers) { 220 if (AuthUtil.isGroupPrincipal(user)) { 221 // TODO: Set node ACL for groups when ZK supports this feature 222 groups.add(user); 223 } else { 224 if (!user.equals(hbaseUser)) { 225 acls.add(new ACL(Perms.ALL, new Id("sasl", user))); 226 } 227 } 228 } 229 if (!groups.isEmpty()) { 230 LOG.warn("Znode ACL setting for group {} is skipped, ZooKeeper doesn't support this " 231 + "feature presently.", groups); 232 } 233 } 234 // Certain znodes are accessed directly by the client, 235 // so they must be readable by non-authenticated clients 236 if (getZNodePaths().isClientReadable(node)) { 237 acls.addAll(Ids.CREATOR_ALL_ACL); 238 acls.addAll(Ids.READ_ACL_UNSAFE); 239 } else { 240 acls.addAll(Ids.CREATOR_ALL_ACL); 241 } 242 return acls; 243 } else { 244 return Ids.OPEN_ACL_UNSAFE; 245 } 246 } 247 248 private void createBaseZNodes() throws ZooKeeperConnectionException { 249 try { 250 // Create all the necessary "directories" of znodes 251 ZKUtil.createWithParents(this, znodePaths.baseZNode); 252 ZKUtil.createAndFailSilent(this, znodePaths.rsZNode); 253 ZKUtil.createAndFailSilent(this, znodePaths.drainingZNode); 254 ZKUtil.createAndFailSilent(this, znodePaths.tableZNode); 255 ZKUtil.createAndFailSilent(this, znodePaths.splitLogZNode); 256 ZKUtil.createAndFailSilent(this, znodePaths.backupMasterAddressesZNode); 257 ZKUtil.createAndFailSilent(this, znodePaths.masterMaintZNode); 258 } catch (KeeperException e) { 259 throw new ZooKeeperConnectionException( 260 prefix("Unexpected KeeperException creating base node"), e); 261 } 262 } 263 264 /** 265 * On master start, we check the znode ACLs under the root directory and set the ACLs properly if 266 * needed. If the cluster goes from an unsecure setup to a secure setup, this step is needed so 267 * that the existing znodes created with open permissions are now changed with restrictive perms. 268 */ 269 public void checkAndSetZNodeAcls() { 270 if (!ZKAuthentication.isSecureZooKeeper(getConfiguration())) { 271 LOG.info("not a secure deployment, proceeding"); 272 return; 273 } 274 275 // Check the base znodes permission first. Only do the recursion if base znode's perms are not 276 // correct. 277 try { 278 List<ACL> actualAcls = recoverableZooKeeper.getAcl(znodePaths.baseZNode, new Stat()); 279 280 if (!isBaseZnodeAclSetup(actualAcls)) { 281 LOG.info("setting znode ACLs"); 282 setZnodeAclsRecursive(znodePaths.baseZNode); 283 } 284 } catch (KeeperException.NoNodeException nne) { 285 return; 286 } catch (InterruptedException ie) { 287 interruptedExceptionNoThrow(ie, false); 288 } catch (IOException | KeeperException e) { 289 LOG.warn("Received exception while checking and setting zookeeper ACLs", e); 290 } 291 } 292 293 /** 294 * Set the znode perms recursively. This will do post-order recursion, so that baseZnode ACLs will 295 * be set last in case the master fails in between. 296 * @param znode the ZNode to set the permissions for 297 */ 298 private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException { 299 List<String> children = recoverableZooKeeper.getChildren(znode, false); 300 301 for (String child : children) { 302 setZnodeAclsRecursive(ZNodePaths.joinZNode(znode, child)); 303 } 304 List<ACL> acls = createACL(znode, true); 305 LOG.info("Setting ACLs for znode:{} , acl:{}", znode, acls); 306 recoverableZooKeeper.setAcl(znode, acls, -1); 307 } 308 309 /** 310 * Checks whether the ACLs returned from the base znode (/hbase) is set for secure setup. 311 * @param acls acls from zookeeper 312 * @return whether ACLs are set for the base znode 313 * @throws IOException if getting the current user fails 314 */ 315 private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException { 316 if (LOG.isDebugEnabled()) { 317 LOG.debug("Checking znode ACLs"); 318 } 319 String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY); 320 // Check whether ACL set for all superusers 321 if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) { 322 return false; 323 } 324 325 // this assumes that current authenticated user is the same as zookeeper client user 326 // configured via JAAS 327 String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName(); 328 329 if (acls.isEmpty()) { 330 if (LOG.isDebugEnabled()) { 331 LOG.debug("ACL is empty"); 332 } 333 return false; 334 } 335 336 for (ACL acl : acls) { 337 int perms = acl.getPerms(); 338 Id id = acl.getId(); 339 // We should only set at most 3 possible ACLs for 3 Ids. One for everyone, one for superuser 340 // and one for the hbase user 341 if (Ids.ANYONE_ID_UNSAFE.equals(id)) { 342 if (perms != Perms.READ) { 343 if (LOG.isDebugEnabled()) { 344 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 345 id, perms, Perms.READ)); 346 } 347 return false; 348 } 349 } else if (superUsers != null && isSuperUserId(superUsers, id)) { 350 if (perms != Perms.ALL) { 351 if (LOG.isDebugEnabled()) { 352 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 353 id, perms, Perms.ALL)); 354 } 355 return false; 356 } 357 } else if ("sasl".equals(id.getScheme())) { 358 String name = id.getId(); 359 // If ZooKeeper recorded the Kerberos full name in the ACL, use only the shortname 360 Matcher match = NAME_PATTERN.matcher(name); 361 if (match.matches()) { 362 name = match.group(1); 363 } 364 if (name.equals(hbaseUser)) { 365 if (perms != Perms.ALL) { 366 if (LOG.isDebugEnabled()) { 367 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 368 id, perms, Perms.ALL)); 369 } 370 return false; 371 } 372 } else { 373 if (LOG.isDebugEnabled()) { 374 LOG.debug("Unexpected shortname in SASL ACL: {}", id); 375 } 376 return false; 377 } 378 } else { 379 if (LOG.isDebugEnabled()) { 380 LOG.debug("unexpected ACL id '{}'", id); 381 } 382 return false; 383 } 384 } 385 return true; 386 } 387 388 /* 389 * Validate whether ACL set for all superusers. 390 */ 391 private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) { 392 for (String user : superUsers) { 393 boolean hasAccess = false; 394 // TODO: Validate super group members also when ZK supports setting node ACL for groups. 395 if (!AuthUtil.isGroupPrincipal(user)) { 396 for (ACL acl : acls) { 397 if (user.equals(acl.getId().getId())) { 398 if (acl.getPerms() == Perms.ALL) { 399 hasAccess = true; 400 } else { 401 if (LOG.isDebugEnabled()) { 402 LOG.debug(String.format( 403 "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x", 404 acl.getId().getId(), acl.getPerms(), Perms.ALL)); 405 } 406 } 407 break; 408 } 409 } 410 if (!hasAccess) { 411 return false; 412 } 413 } 414 } 415 return true; 416 } 417 418 /* 419 * Validate whether ACL ID is superuser. 420 */ 421 public static boolean isSuperUserId(String[] superUsers, Id id) { 422 for (String user : superUsers) { 423 // TODO: Validate super group members also when ZK supports setting node ACL for groups. 424 if (!AuthUtil.isGroupPrincipal(user) && new Id("sasl", user).equals(id)) { 425 return true; 426 } 427 } 428 return false; 429 } 430 431 @Override 432 public String toString() { 433 return this.identifier + ", quorum=" + quorum + ", baseZNode=" + znodePaths.baseZNode; 434 } 435 436 /** 437 * Adds this instance's identifier as a prefix to the passed <code>str</code> 438 * @param str String to amend. 439 * @return A new string with this instance's identifier as prefix: e.g. if passed 'hello world', 440 * the returned string could be 441 */ 442 public String prefix(final String str) { 443 return this.toString() + " " + str; 444 } 445 446 /** 447 * Get the znodes corresponding to the meta replicas from ZK 448 * @return list of znodes 449 * @throws KeeperException if a ZooKeeper operation fails 450 */ 451 public List<String> getMetaReplicaNodes() throws KeeperException { 452 List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, znodePaths.baseZNode); 453 return filterMetaReplicaNodes(childrenOfBaseNode); 454 } 455 456 /** 457 * Same as {@link #getMetaReplicaNodes()} except that this also registers a watcher on base znode 458 * for subsequent CREATE/DELETE operations on child nodes. 459 */ 460 public List<String> getMetaReplicaNodesAndWatchChildren() throws KeeperException { 461 List<String> childrenOfBaseNode = 462 ZKUtil.listChildrenAndWatchForNewChildren(this, znodePaths.baseZNode); 463 // Need to throw here instead of returning an empty list if the base znode hasn't been created 464 // Caller should retry in that case, versus thinking the base znode has a watcher set 465 if (childrenOfBaseNode == null) { 466 keeperException(new KeeperException.NoNodeException(znodePaths.baseZNode)); 467 } 468 return filterMetaReplicaNodes(childrenOfBaseNode); 469 } 470 471 /** 472 * @param nodes Input list of znodes 473 * @return Filtered list of znodes from nodes that belong to meta replica(s). 474 */ 475 private List<String> filterMetaReplicaNodes(List<String> nodes) { 476 if (nodes == null || nodes.isEmpty()) { 477 return new ArrayList<>(); 478 } 479 List<String> metaReplicaNodes = new ArrayList<>(2); 480 String pattern = conf.get(ZNodePaths.META_ZNODE_PREFIX_CONF_KEY, ZNodePaths.META_ZNODE_PREFIX); 481 for (String child : nodes) { 482 if (child.startsWith(pattern)) { 483 metaReplicaNodes.add(child); 484 } 485 } 486 return metaReplicaNodes; 487 } 488 489 /** 490 * Register the specified listener to receive ZooKeeper events. 491 * @param listener the listener to register 492 */ 493 public void registerListener(ZKListener listener) { 494 listeners.add(listener); 495 } 496 497 /** 498 * Register the specified listener to receive ZooKeeper events and add it as the first in the list 499 * of current listeners. 500 * @param listener the listener to register 501 */ 502 public void registerListenerFirst(ZKListener listener) { 503 listeners.add(0, listener); 504 } 505 506 public void unregisterListener(ZKListener listener) { 507 listeners.remove(listener); 508 } 509 510 /** 511 * Clean all existing listeners 512 */ 513 public void unregisterAllListeners() { 514 listeners.clear(); 515 } 516 517 /** 518 * Get a copy of current registered listeners 519 */ 520 public List<ZKListener> getListeners() { 521 return new ArrayList<>(listeners); 522 } 523 524 /** Returns The number of currently registered listeners */ 525 public int getNumberOfListeners() { 526 return listeners.size(); 527 } 528 529 /** 530 * Get the connection to ZooKeeper. 531 * @return connection reference to zookeeper 532 */ 533 public RecoverableZooKeeper getRecoverableZooKeeper() { 534 return recoverableZooKeeper; 535 } 536 537 public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException { 538 recoverableZooKeeper.reconnectAfterExpiration(); 539 } 540 541 /** 542 * Get the quorum address of this instance. 543 * @return quorum string of this zookeeper connection instance 544 */ 545 public String getQuorum() { 546 return quorum; 547 } 548 549 /** 550 * Get the znodePaths. 551 * <p> 552 * Mainly used for mocking as mockito can not mock a field access. 553 */ 554 public ZNodePaths getZNodePaths() { 555 return znodePaths; 556 } 557 558 private void processEvent(WatchedEvent event) { 559 TraceUtil.trace(() -> { 560 switch (event.getType()) { 561 // If event type is NONE, this is a connection status change 562 case None: { 563 connectionEvent(event); 564 break; 565 } 566 567 // Otherwise pass along to the listeners 568 case NodeCreated: { 569 for (ZKListener listener : listeners) { 570 listener.nodeCreated(event.getPath()); 571 } 572 break; 573 } 574 575 case NodeDeleted: { 576 for (ZKListener listener : listeners) { 577 listener.nodeDeleted(event.getPath()); 578 } 579 break; 580 } 581 582 case NodeDataChanged: { 583 for (ZKListener listener : listeners) { 584 listener.nodeDataChanged(event.getPath()); 585 } 586 break; 587 } 588 589 case NodeChildrenChanged: { 590 for (ZKListener listener : listeners) { 591 listener.nodeChildrenChanged(event.getPath()); 592 } 593 break; 594 } 595 default: 596 LOG.error("Invalid event of type {} received for path {}. Ignoring.", event.getState(), 597 event.getPath()); 598 } 599 }, "ZKWatcher.processEvent: " + event.getType() + " " + event.getPath()); 600 } 601 602 /** 603 * Method called from ZooKeeper for events and connection status. 604 * <p> 605 * Valid events are passed along to listeners. Connection status changes are dealt with locally. 606 */ 607 @Override 608 public void process(WatchedEvent event) { 609 LOG.debug(prefix("Received ZooKeeper Event, " + "type=" + event.getType() + ", " + "state=" 610 + event.getState() + ", " + "path=" + event.getPath())); 611 final String spanName = ZKWatcher.class.getSimpleName() + "-" + identifier; 612 if (!zkEventProcessor.isShutdown()) { 613 zkEventProcessor.execute(TraceUtil.tracedRunnable(() -> processEvent(event), spanName)); 614 } 615 } 616 617 // Connection management 618 619 /** 620 * Called when there is a connection-related event via the Watcher callback. 621 * <p> 622 * If Disconnected or Expired, this should shutdown the cluster. But, since we send a 623 * KeeperException.SessionExpiredException along with the abort call, it's possible for the 624 * Abortable to catch it and try to create a new session with ZooKeeper. This is what the client 625 * does in HCM. 626 * <p> 627 * @param event the connection-related event 628 */ 629 private void connectionEvent(WatchedEvent event) { 630 switch (event.getState()) { 631 case SyncConnected: 632 this.identifier = 633 this.prefix + "-0x" + Long.toHexString(this.recoverableZooKeeper.getSessionId()); 634 // Update our identifier. Otherwise ignore. 635 LOG.debug("{} connected", this.identifier); 636 break; 637 638 // Abort the server if Disconnected or Expired 639 case Disconnected: 640 LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring")); 641 break; 642 643 case Closed: 644 LOG.debug(prefix("ZooKeeper client closed, ignoring")); 645 break; 646 647 case Expired: 648 String msg = prefix(this.identifier + " received expired from " + "ZooKeeper, aborting"); 649 // TODO: One thought is to add call to ZKListener so say, 650 // ZKNodeTracker can zero out its data values. 651 if (this.abortable != null) { 652 this.abortable.abort(msg, new KeeperException.SessionExpiredException()); 653 } 654 break; 655 656 case ConnectedReadOnly: 657 case SaslAuthenticated: 658 case AuthFailed: 659 break; 660 661 default: 662 throw new IllegalStateException("Received event is not valid: " + event.getState()); 663 } 664 } 665 666 /** 667 * Forces a synchronization of this ZooKeeper client connection within a timeout. Enforcing a 668 * timeout lets the callers fail-fast rather than wait forever for the sync to finish. 669 * <p> 670 * Executing this method before running other methods will ensure that the subsequent operations 671 * are up-to-date and consistent as of the time that the sync is complete. 672 * <p> 673 * This is used for compareAndSwap type operations where we need to read the data of an existing 674 * node and delete or transition that node, utilizing the previously read version and data. We 675 * want to ensure that the version read is up-to-date from when we begin the operation. 676 * <p> 677 */ 678 public void syncOrTimeout(String path) throws KeeperException { 679 final CountDownLatch latch = new CountDownLatch(1); 680 long startTime = EnvironmentEdgeManager.currentTime(); 681 this.recoverableZooKeeper.sync(path, (i, s, o) -> latch.countDown(), null); 682 try { 683 if (!latch.await(zkSyncTimeout, TimeUnit.MILLISECONDS)) { 684 LOG.warn("sync() operation to ZK timed out. Configured timeout: {}ms. This usually points " 685 + "to a ZK side issue. Check ZK server logs and metrics.", zkSyncTimeout); 686 throw new KeeperException.RequestTimeoutException(); 687 } 688 } catch (InterruptedException e) { 689 LOG.warn("Interrupted waiting for ZK sync() to finish.", e); 690 Thread.currentThread().interrupt(); 691 return; 692 } 693 if (LOG.isDebugEnabled()) { 694 // TODO: Switch to a metric once server side ZK watcher metrics are implemented. This is a 695 // useful metric to have since the latency of sync() impacts the callers. 696 LOG.debug("ZK sync() operation took {}ms", EnvironmentEdgeManager.currentTime() - startTime); 697 } 698 } 699 700 /** 701 * Handles KeeperExceptions in client calls. 702 * <p> 703 * This may be temporary but for now this gives one place to deal with these. 704 * <p> 705 * TODO: Currently this method rethrows the exception to let the caller handle 706 * <p> 707 * @param ke the exception to rethrow 708 * @throws KeeperException if a ZooKeeper operation fails 709 */ 710 public void keeperException(KeeperException ke) throws KeeperException { 711 LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke); 712 throw ke; 713 } 714 715 /** 716 * Handles InterruptedExceptions in client calls. 717 * @param ie the InterruptedException instance thrown 718 * @throws KeeperException the exception to throw, transformed from the InterruptedException 719 */ 720 public void interruptedException(InterruptedException ie) throws KeeperException { 721 interruptedExceptionNoThrow(ie, true); 722 // Throw a system error exception to let upper level handle it 723 KeeperException keeperException = new KeeperException.SystemErrorException(); 724 keeperException.initCause(ie); 725 throw keeperException; 726 } 727 728 /** 729 * Log the InterruptedException and interrupt current thread 730 * @param ie The IterruptedException to log 731 * @param throwLater Whether we will throw the exception latter 732 */ 733 public void interruptedExceptionNoThrow(InterruptedException ie, boolean throwLater) { 734 LOG.debug(prefix("Received InterruptedException, will interrupt current thread" 735 + (throwLater ? " and rethrow a SystemErrorException" : "")), ie); 736 // At least preserve interrupt. 737 Thread.currentThread().interrupt(); 738 } 739 740 /** 741 * Close the connection to ZooKeeper. 742 */ 743 @Override 744 public void close() { 745 zkEventProcessor.shutdown(); 746 try { 747 if (!zkEventProcessor.awaitTermination(15, TimeUnit.SECONDS)) { 748 LOG.warn("ZKWatcher event processor has not finished to terminate."); 749 zkEventProcessor.shutdownNow(); 750 } 751 } catch (InterruptedException e) { 752 Thread.currentThread().interrupt(); 753 } finally { 754 try { 755 recoverableZooKeeper.close(); 756 } catch (InterruptedException e) { 757 Thread.currentThread().interrupt(); 758 } 759 } 760 } 761 762 public Configuration getConfiguration() { 763 return conf; 764 } 765 766 @Override 767 public void abort(String why, Throwable e) { 768 if (this.abortable != null) { 769 this.abortable.abort(why, e); 770 } else { 771 this.aborted = true; 772 } 773 } 774 775 @Override 776 public boolean isAborted() { 777 return this.abortable == null ? this.aborted : this.abortable.isAborted(); 778 } 779}